Skip to main content

vortex_layout/segments/
cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use moka::future::Cache;
9use moka::future::CacheBuilder;
10use moka::policy::EvictionPolicy;
11use rustc_hash::FxBuildHasher;
12use vortex_array::buffer::BufferHandle;
13use vortex_buffer::ByteBuffer;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_metrics::Counter;
17use vortex_metrics::Label;
18use vortex_metrics::MetricBuilder;
19use vortex_metrics::MetricsRegistry;
20
21use crate::segments::SegmentFuture;
22use crate::segments::SegmentId;
23use crate::segments::SegmentSource;
24
/// A cache for storing and retrieving individual segment data.
///
/// Entries are keyed by [`SegmentId`] and hold the segment's bytes as a [`ByteBuffer`]. Both
/// operations are fallible so that an implementation can report its own failures.
#[async_trait]
pub trait SegmentCache: Send + Sync {
    /// Looks up the segment stored under `id`, returning `Ok(None)` when there is no entry.
    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
    /// Stores `buffer` as the segment for `id`.
    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
}
31
32pub struct NoOpSegmentCache;
33
34#[async_trait]
35impl SegmentCache for NoOpSegmentCache {
36    async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
37        Ok(None)
38    }
39
40    async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
41        Ok(())
42    }
43}
44
45/// A [`SegmentCache`] based around an in-memory Moka cache.
46pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
47
48impl MokaSegmentCache {
49    pub fn new(max_capacity_bytes: u64) -> Self {
50        Self(
51            CacheBuilder::new(max_capacity_bytes)
52                .name("vortex-segment-cache")
53                // Weight each segment by the number of bytes in the buffer.
54                .weigher(|_, buffer: &ByteBuffer| {
55                    u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
56                })
57                // We configure LFU (vs LRU) since the cache is mostly used when re-reading the
58                // same file - it is _not_ used when reading the same segments during a single
59                // scan.
60                .eviction_policy(EvictionPolicy::tiny_lfu())
61                .build_with_hasher(FxBuildHasher),
62        )
63    }
64}
65
66#[async_trait]
67impl SegmentCache for MokaSegmentCache {
68    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
69        Ok(self.0.get(&id).await)
70    }
71
72    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
73        self.0.insert(id, buffer).await;
74        Ok(())
75    }
76}
77
78/// Wrapper for [`SegmentCache`] that tracks its hit rate.
79pub struct InstrumentedSegmentCache<C> {
80    segment_cache: C,
81
82    hits: Counter,
83    misses: Counter,
84    stores: Counter,
85}
86
87impl<C: SegmentCache> InstrumentedSegmentCache<C> {
88    pub fn new(
89        segment_cache: C,
90        metrics_registry: &dyn MetricsRegistry,
91        labels: Vec<Label>,
92    ) -> Self {
93        Self {
94            segment_cache,
95            hits: MetricBuilder::new(metrics_registry)
96                .add_labels(labels.clone())
97                .counter("vortex.file.segments.cache.hits"),
98            misses: MetricBuilder::new(metrics_registry)
99                .add_labels(labels.clone())
100                .counter("vortex.file.segments.cache.misses"),
101            stores: MetricBuilder::new(metrics_registry)
102                .add_labels(labels)
103                .counter("vortex.file.segments.cache.stores"),
104        }
105    }
106}
107
108#[async_trait]
109impl<C: SegmentCache> SegmentCache for InstrumentedSegmentCache<C> {
110    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
111        let result = self.segment_cache.get(id).await?;
112        if result.is_some() {
113            self.hits.add(1);
114        } else {
115            self.misses.add(1);
116        }
117        Ok(result)
118    }
119
120    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
121        self.segment_cache.put(id, buffer).await?;
122        self.stores.add(1);
123        Ok(())
124    }
125}
126
/// A [`SegmentSource`] that answers requests from a [`SegmentCache`] when it can and falls back
/// to an underlying source otherwise.
pub struct SegmentCacheSourceAdapter {
    // Consulted first on every request; written to after a segment is read from `source`.
    cache: Arc<dyn SegmentCache>,
    // The source that segments are requested from.
    source: Arc<dyn SegmentSource>,
}

impl SegmentCacheSourceAdapter {
    /// Creates an adapter that puts `cache` in front of `source`.
    pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
        Self { cache, source }
    }
}
137
138impl SegmentSource for SegmentCacheSourceAdapter {
139    fn request(&self, id: SegmentId) -> SegmentFuture {
140        let cache = self.cache.clone();
141        let delegate = self.source.request(id);
142
143        async move {
144            if let Ok(Some(segment)) = cache.get(id).await {
145                tracing::debug!("Resolved segment {} from cache", id);
146                return Ok(BufferHandle::new_host(segment));
147            }
148            let result = delegate.await?;
149            // Cache only CPU buffers; device buffers are not cached.
150            if let Some(buffer) = result.as_host_opt()
151                && let Err(e) = cache.put(id, buffer.clone()).await
152            {
153                tracing::warn!("Failed to store segment {} in cache: {}", id, e);
154            }
155            Ok(result)
156        }
157        .boxed()
158    }
159}