vortex_layout/segments/
cache.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use moka::future::{Cache, CacheBuilder};
9use moka::policy::EvictionPolicy;
10use rustc_hash::FxBuildHasher;
11use vortex_buffer::ByteBuffer;
12use vortex_error::{VortexExpect, VortexResult};
13use vortex_metrics::{Counter, VortexMetrics};
14
15use crate::segments::{SegmentFuture, SegmentId, SegmentSource};
16
17#[async_trait]
19pub trait SegmentCache: Send + Sync {
20 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
21 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
22}
23
/// A [`SegmentCache`] that stores nothing: every lookup is a miss and every
/// store is silently discarded.
///
/// Useful as a default when caching is disabled.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpSegmentCache;
25
26#[async_trait]
27impl SegmentCache for NoOpSegmentCache {
28 async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
29 Ok(None)
30 }
31
32 async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
33 Ok(())
34 }
35}
36
37pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
39
40impl MokaSegmentCache {
41 pub fn new(max_capacity_bytes: u64) -> Self {
42 Self(
43 CacheBuilder::new(max_capacity_bytes)
44 .name("vortex-segment-cache")
45 .weigher(|_, buffer: &ByteBuffer| {
47 u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
48 })
49 .eviction_policy(EvictionPolicy::tiny_lfu())
53 .build_with_hasher(FxBuildHasher),
54 )
55 }
56}
57
58#[async_trait]
59impl SegmentCache for MokaSegmentCache {
60 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
61 Ok(self.0.get(&id).await)
62 }
63
64 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
65 self.0.insert(id, buffer).await;
66 Ok(())
67 }
68}
69
70pub struct SegmentCacheMetrics<C> {
71 segment_cache: C,
72
73 hits: Arc<Counter>,
74 misses: Arc<Counter>,
75 stores: Arc<Counter>,
76}
77
78impl<C: SegmentCache> SegmentCacheMetrics<C> {
79 pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
80 Self {
81 segment_cache,
82 hits: metrics.counter("vortex.file.segments.cache.hits"),
83 misses: metrics.counter("vortex.file.segments.cache.misses"),
84 stores: metrics.counter("vortex.file.segments.cache.stores"),
85 }
86 }
87}
88
89#[async_trait]
90impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
91 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
92 let result = self.segment_cache.get(id).await?;
93 if result.is_some() {
94 self.hits.inc()
95 } else {
96 self.misses.inc()
97 }
98 Ok(result)
99 }
100
101 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
102 self.segment_cache.put(id, buffer).await?;
103 self.stores.inc();
104 Ok(())
105 }
106}
107
108pub struct SegmentCacheSourceAdapter {
109 cache: Arc<dyn SegmentCache>,
110 source: Arc<dyn SegmentSource>,
111}
112
113impl SegmentCacheSourceAdapter {
114 pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
115 Self { cache, source }
116 }
117}
118
119impl SegmentSource for SegmentCacheSourceAdapter {
120 fn request(&self, id: SegmentId) -> SegmentFuture {
121 let cache = self.cache.clone();
122 let delegate = self.source.request(id);
123
124 async move {
125 if let Ok(Some(segment)) = cache.get(id).await {
126 log::debug!("Resolved segment {} from cache", id);
127 return Ok(segment);
128 }
129 let result = delegate.await?;
130 if let Err(e) = cache.put(id, result.clone()).await {
131 log::warn!("Failed to store segment {} in cache: {}", id, e);
132 }
133 Ok(result)
134 }
135 .boxed()
136 }
137}