vortex_layout/segments/
cache.rs1use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use moka::future::Cache;
9use moka::future::CacheBuilder;
10use moka::policy::EvictionPolicy;
11use rustc_hash::FxBuildHasher;
12use vortex_array::buffer::BufferHandle;
13use vortex_buffer::ByteBuffer;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_metrics::Counter;
17use vortex_metrics::Label;
18use vortex_metrics::MetricBuilder;
19use vortex_metrics::MetricsRegistry;
20
21use crate::segments::SegmentFuture;
22use crate::segments::SegmentId;
23use crate::segments::SegmentSource;
24
25#[async_trait]
27pub trait SegmentCache: Send + Sync {
28 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
29 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
30}
31
32pub struct NoOpSegmentCache;
33
34#[async_trait]
35impl SegmentCache for NoOpSegmentCache {
36 async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
37 Ok(None)
38 }
39
40 async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
41 Ok(())
42 }
43}
44
45pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
47
48impl MokaSegmentCache {
49 pub fn new(max_capacity_bytes: u64) -> Self {
50 Self(
51 CacheBuilder::new(max_capacity_bytes)
52 .name("vortex-segment-cache")
53 .weigher(|_, buffer: &ByteBuffer| {
55 u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
56 })
57 .eviction_policy(EvictionPolicy::tiny_lfu())
61 .build_with_hasher(FxBuildHasher),
62 )
63 }
64}
65
66#[async_trait]
67impl SegmentCache for MokaSegmentCache {
68 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
69 Ok(self.0.get(&id).await)
70 }
71
72 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
73 self.0.insert(id, buffer).await;
74 Ok(())
75 }
76}
77
78pub struct InstrumentedSegmentCache<C> {
80 segment_cache: C,
81
82 hits: Counter,
83 misses: Counter,
84 stores: Counter,
85}
86
87impl<C: SegmentCache> InstrumentedSegmentCache<C> {
88 pub fn new(
89 segment_cache: C,
90 metrics_registry: &dyn MetricsRegistry,
91 labels: Vec<Label>,
92 ) -> Self {
93 Self {
94 segment_cache,
95 hits: MetricBuilder::new(metrics_registry)
96 .add_labels(labels.clone())
97 .counter("vortex.file.segments.cache.hits"),
98 misses: MetricBuilder::new(metrics_registry)
99 .add_labels(labels.clone())
100 .counter("vortex.file.segments.cache.misses"),
101 stores: MetricBuilder::new(metrics_registry)
102 .add_labels(labels)
103 .counter("vortex.file.segments.cache.stores"),
104 }
105 }
106}
107
108#[async_trait]
109impl<C: SegmentCache> SegmentCache for InstrumentedSegmentCache<C> {
110 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
111 let result = self.segment_cache.get(id).await?;
112 if result.is_some() {
113 self.hits.add(1);
114 } else {
115 self.misses.add(1);
116 }
117 Ok(result)
118 }
119
120 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
121 self.segment_cache.put(id, buffer).await?;
122 self.stores.add(1);
123 Ok(())
124 }
125}
126
127pub struct SegmentCacheSourceAdapter {
128 cache: Arc<dyn SegmentCache>,
129 source: Arc<dyn SegmentSource>,
130}
131
132impl SegmentCacheSourceAdapter {
133 pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
134 Self { cache, source }
135 }
136}
137
138impl SegmentSource for SegmentCacheSourceAdapter {
139 fn request(&self, id: SegmentId) -> SegmentFuture {
140 let cache = self.cache.clone();
141 let delegate = self.source.request(id);
142
143 async move {
144 if let Ok(Some(segment)) = cache.get(id).await {
145 tracing::debug!("Resolved segment {} from cache", id);
146 return Ok(BufferHandle::new_host(segment));
147 }
148 let result = delegate.await?;
149 if let Some(buffer) = result.as_host_opt()
151 && let Err(e) = cache.put(id, buffer.clone()).await
152 {
153 tracing::warn!("Failed to store segment {} in cache: {}", id, e);
154 }
155 Ok(result)
156 }
157 .boxed()
158 }
159}