vortex_layout/segments/
cache.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use moka::future::Cache;
9use moka::future::CacheBuilder;
10use moka::policy::EvictionPolicy;
11use rustc_hash::FxBuildHasher;
12use vortex_buffer::BufferHandle;
13use vortex_buffer::ByteBuffer;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_metrics::Counter;
17use vortex_metrics::VortexMetrics;
18
19use crate::segments::SegmentFuture;
20use crate::segments::SegmentId;
21use crate::segments::SegmentSource;
22
23#[async_trait]
25pub trait SegmentCache: Send + Sync {
26 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
27 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
28}
29
30pub struct NoOpSegmentCache;
31
32#[async_trait]
33impl SegmentCache for NoOpSegmentCache {
34 async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
35 Ok(None)
36 }
37
38 async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
39 Ok(())
40 }
41}
42
43pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
45
46impl MokaSegmentCache {
47 pub fn new(max_capacity_bytes: u64) -> Self {
48 Self(
49 CacheBuilder::new(max_capacity_bytes)
50 .name("vortex-segment-cache")
51 .weigher(|_, buffer: &ByteBuffer| {
53 u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
54 })
55 .eviction_policy(EvictionPolicy::tiny_lfu())
59 .build_with_hasher(FxBuildHasher),
60 )
61 }
62}
63
64#[async_trait]
65impl SegmentCache for MokaSegmentCache {
66 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
67 Ok(self.0.get(&id).await)
68 }
69
70 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
71 self.0.insert(id, buffer).await;
72 Ok(())
73 }
74}
75
76pub struct SegmentCacheMetrics<C> {
77 segment_cache: C,
78
79 hits: Arc<Counter>,
80 misses: Arc<Counter>,
81 stores: Arc<Counter>,
82}
83
84impl<C: SegmentCache> SegmentCacheMetrics<C> {
85 pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
86 Self {
87 segment_cache,
88 hits: metrics.counter("vortex.file.segments.cache.hits"),
89 misses: metrics.counter("vortex.file.segments.cache.misses"),
90 stores: metrics.counter("vortex.file.segments.cache.stores"),
91 }
92 }
93}
94
95#[async_trait]
96impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
97 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
98 let result = self.segment_cache.get(id).await?;
99 if result.is_some() {
100 self.hits.inc()
101 } else {
102 self.misses.inc()
103 }
104 Ok(result)
105 }
106
107 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
108 self.segment_cache.put(id, buffer).await?;
109 self.stores.inc();
110 Ok(())
111 }
112}
113
114pub struct SegmentCacheSourceAdapter {
115 cache: Arc<dyn SegmentCache>,
116 source: Arc<dyn SegmentSource>,
117}
118
119impl SegmentCacheSourceAdapter {
120 pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
121 Self { cache, source }
122 }
123}
124
125impl SegmentSource for SegmentCacheSourceAdapter {
126 fn request(&self, id: SegmentId) -> SegmentFuture {
127 let cache = self.cache.clone();
128 let delegate = self.source.request(id);
129
130 async move {
131 if let Ok(Some(segment)) = cache.get(id).await {
132 log::debug!("Resolved segment {} from cache", id);
133 return Ok(BufferHandle::Buffer(segment));
134 }
135 let result = delegate.await?;
136 if let BufferHandle::Buffer(ref buffer) = result
138 && let Err(e) = cache.put(id, buffer.clone()).await
139 {
140 log::warn!("Failed to store segment {} in cache: {}", id, e);
141 }
142 Ok(result)
143 }
144 .boxed()
145 }
146}