vortex_layout/segments/
cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use moka::future::Cache;
9use moka::future::CacheBuilder;
10use moka::policy::EvictionPolicy;
11use rustc_hash::FxBuildHasher;
12use vortex_buffer::BufferHandle;
13use vortex_buffer::ByteBuffer;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_metrics::Counter;
17use vortex_metrics::VortexMetrics;
18
19use crate::segments::SegmentFuture;
20use crate::segments::SegmentId;
21use crate::segments::SegmentSource;
22
23/// A cache for storing and retrieving individual segment data.
24#[async_trait]
25pub trait SegmentCache: Send + Sync {
26    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
27    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
28}
29
/// A [`SegmentCache`] that never stores anything.
///
/// Every lookup is a miss and every store is discarded, which makes this a convenient
/// stand-in when caching is not wanted.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoOpSegmentCache;
31
32#[async_trait]
33impl SegmentCache for NoOpSegmentCache {
34    async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
35        Ok(None)
36    }
37
38    async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
39        Ok(())
40    }
41}
42
43/// A [`SegmentCache`] based around an in-memory Moka cache.
44pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
45
46impl MokaSegmentCache {
47    pub fn new(max_capacity_bytes: u64) -> Self {
48        Self(
49            CacheBuilder::new(max_capacity_bytes)
50                .name("vortex-segment-cache")
51                // Weight each segment by the number of bytes in the buffer.
52                .weigher(|_, buffer: &ByteBuffer| {
53                    u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
54                })
55                // We configure LFU (vs LRU) since the cache is mostly used when re-reading the
56                // same file - it is _not_ used when reading the same segments during a single
57                // scan.
58                .eviction_policy(EvictionPolicy::tiny_lfu())
59                .build_with_hasher(FxBuildHasher),
60        )
61    }
62}
63
64#[async_trait]
65impl SegmentCache for MokaSegmentCache {
66    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
67        Ok(self.0.get(&id).await)
68    }
69
70    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
71        self.0.insert(id, buffer).await;
72        Ok(())
73    }
74}
75
76pub struct SegmentCacheMetrics<C> {
77    segment_cache: C,
78
79    hits: Arc<Counter>,
80    misses: Arc<Counter>,
81    stores: Arc<Counter>,
82}
83
84impl<C: SegmentCache> SegmentCacheMetrics<C> {
85    pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
86        Self {
87            segment_cache,
88            hits: metrics.counter("vortex.file.segments.cache.hits"),
89            misses: metrics.counter("vortex.file.segments.cache.misses"),
90            stores: metrics.counter("vortex.file.segments.cache.stores"),
91        }
92    }
93}
94
95#[async_trait]
96impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
97    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
98        let result = self.segment_cache.get(id).await?;
99        if result.is_some() {
100            self.hits.inc()
101        } else {
102            self.misses.inc()
103        }
104        Ok(result)
105    }
106
107    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
108        self.segment_cache.put(id, buffer).await?;
109        self.stores.inc();
110        Ok(())
111    }
112}
113
114pub struct SegmentCacheSourceAdapter {
115    cache: Arc<dyn SegmentCache>,
116    source: Arc<dyn SegmentSource>,
117}
118
119impl SegmentCacheSourceAdapter {
120    pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
121        Self { cache, source }
122    }
123}
124
125impl SegmentSource for SegmentCacheSourceAdapter {
126    fn request(&self, id: SegmentId) -> SegmentFuture {
127        let cache = self.cache.clone();
128        let delegate = self.source.request(id);
129
130        async move {
131            if let Ok(Some(segment)) = cache.get(id).await {
132                log::debug!("Resolved segment {} from cache", id);
133                return Ok(BufferHandle::Buffer(segment));
134            }
135            let result = delegate.await?;
136            // Cache only CPU buffers; device buffers are not cached.
137            if let BufferHandle::Buffer(ref buffer) = result
138                && let Err(e) = cache.put(id, buffer.clone()).await
139            {
140                log::warn!("Failed to store segment {} in cache: {}", id, e);
141            }
142            Ok(result)
143        }
144        .boxed()
145    }
146}