vortex_file/segments/
cache.rs

1use std::sync::{Arc, RwLock};
2
3use async_trait::async_trait;
4use futures::FutureExt;
5use moka::future::{Cache, CacheBuilder};
6use moka::policy::EvictionPolicy;
7use rustc_hash::FxBuildHasher;
8use vortex_array::aliases::hash_map::HashMap;
9use vortex_buffer::ByteBuffer;
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_layout::segments::{SegmentFuture, SegmentId, SegmentSource};
12use vortex_metrics::{Counter, VortexMetrics};
13
14/// A cache for storing and retrieving individual segment data.
15#[async_trait]
16pub trait SegmentCache: Send + Sync {
17    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
18    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
19}
20
21pub(crate) struct NoOpSegmentCache;
22
23#[async_trait]
24impl SegmentCache for NoOpSegmentCache {
25    async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
26        Ok(None)
27    }
28
29    async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
30        Ok(())
31    }
32}
33
34/// A [`SegmentCache`] based around an in-memory Moka cache.
35pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
36
37impl MokaSegmentCache {
38    pub fn new(max_capacity_bytes: u64) -> Self {
39        Self(
40            CacheBuilder::new(max_capacity_bytes)
41                .name("vortex-segment-cache")
42                // Weight each segment by the number of bytes in the buffer.
43                .weigher(|_, buffer: &ByteBuffer| {
44                    u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
45                })
46                // We configure LFU (vs LRU) since the cache is mostly used when re-reading the
47                // same file - it is _not_ used when reading the same segments during a single
48                // scan.
49                .eviction_policy(EvictionPolicy::tiny_lfu())
50                .build_with_hasher(FxBuildHasher),
51        )
52    }
53}
54
55#[async_trait]
56impl SegmentCache for MokaSegmentCache {
57    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
58        Ok(self.0.get(&id).await)
59    }
60
61    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
62        self.0.insert(id, buffer).await;
63        Ok(())
64    }
65}
66
67/// Segment cache containing the initial read segments.
68pub(crate) struct InitialReadSegmentCache {
69    pub(crate) initial: RwLock<HashMap<SegmentId, ByteBuffer>>,
70    pub(crate) fallback: Arc<dyn SegmentCache>,
71}
72
73#[async_trait]
74impl SegmentCache for InitialReadSegmentCache {
75    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
76        if let Some(buffer) = self.initial.read().vortex_expect("poisoned lock").get(&id) {
77            return Ok(Some(buffer.clone()));
78        }
79        self.fallback.get(id).await
80    }
81
82    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
83        self.fallback.put(id, buffer).await
84    }
85}
86
87pub struct SegmentCacheMetrics<C> {
88    segment_cache: C,
89
90    hits: Arc<Counter>,
91    misses: Arc<Counter>,
92    stores: Arc<Counter>,
93}
94
95impl<C: SegmentCache> SegmentCacheMetrics<C> {
96    pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
97        Self {
98            segment_cache,
99            hits: metrics.counter("vortex.file.segments.cache.hits"),
100            misses: metrics.counter("vortex.file.segments.cache.misses"),
101            stores: metrics.counter("vortex.file.segments.cache.stores"),
102        }
103    }
104}
105
106#[async_trait]
107impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
108    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
109        let result = self.segment_cache.get(id).await?;
110        if result.is_some() {
111            self.hits.inc()
112        } else {
113            self.misses.inc()
114        }
115        Ok(result)
116    }
117
118    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
119        self.segment_cache.put(id, buffer).await?;
120        self.stores.inc();
121        Ok(())
122    }
123}
124
125pub struct SegmentCacheSourceAdapter {
126    cache: Arc<dyn SegmentCache>,
127    source: Arc<dyn SegmentSource>,
128}
129
130impl SegmentCacheSourceAdapter {
131    pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
132        Self { cache, source }
133    }
134}
135
136impl SegmentSource for SegmentCacheSourceAdapter {
137    fn request(&self, id: SegmentId, for_whom: &Arc<str>) -> SegmentFuture {
138        let cache = self.cache.clone();
139        let delegate = self.source.request(id, for_whom);
140        let for_whom = for_whom.clone();
141
142        async move {
143            if let Ok(Some(segment)) = cache.get(id).await {
144                log::debug!("Resolved segment {} for {} from cache", id, &for_whom);
145                return Ok(segment);
146            }
147            let result = delegate.await?;
148            if let Err(e) = cache.put(id, result.clone()).await {
149                log::warn!(
150                    "Failed to store segment {} for {} in cache: {}",
151                    id,
152                    &for_whom,
153                    e
154                );
155            }
156            Ok(result)
157        }
158        .boxed()
159    }
160}