vortex_layout/segments/
cache.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::FutureExt;
8use moka::future::{Cache, CacheBuilder};
9use moka::policy::EvictionPolicy;
10use rustc_hash::FxBuildHasher;
11use vortex_buffer::ByteBuffer;
12use vortex_error::{VortexExpect, VortexResult};
13use vortex_metrics::{Counter, VortexMetrics};
14
15use crate::segments::{SegmentFuture, SegmentId, SegmentSource};
16
17/// A cache for storing and retrieving individual segment data.
18#[async_trait]
19pub trait SegmentCache: Send + Sync {
20    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
21    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
22}
23
24pub struct NoOpSegmentCache;
25
26#[async_trait]
27impl SegmentCache for NoOpSegmentCache {
28    async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
29        Ok(None)
30    }
31
32    async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
33        Ok(())
34    }
35}
36
37/// A [`SegmentCache`] based around an in-memory Moka cache.
38pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
39
40impl MokaSegmentCache {
41    pub fn new(max_capacity_bytes: u64) -> Self {
42        Self(
43            CacheBuilder::new(max_capacity_bytes)
44                .name("vortex-segment-cache")
45                // Weight each segment by the number of bytes in the buffer.
46                .weigher(|_, buffer: &ByteBuffer| {
47                    u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
48                })
49                // We configure LFU (vs LRU) since the cache is mostly used when re-reading the
50                // same file - it is _not_ used when reading the same segments during a single
51                // scan.
52                .eviction_policy(EvictionPolicy::tiny_lfu())
53                .build_with_hasher(FxBuildHasher),
54        )
55    }
56}
57
58#[async_trait]
59impl SegmentCache for MokaSegmentCache {
60    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
61        Ok(self.0.get(&id).await)
62    }
63
64    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
65        self.0.insert(id, buffer).await;
66        Ok(())
67    }
68}
69
70pub struct SegmentCacheMetrics<C> {
71    segment_cache: C,
72
73    hits: Arc<Counter>,
74    misses: Arc<Counter>,
75    stores: Arc<Counter>,
76}
77
78impl<C: SegmentCache> SegmentCacheMetrics<C> {
79    pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
80        Self {
81            segment_cache,
82            hits: metrics.counter("vortex.file.segments.cache.hits"),
83            misses: metrics.counter("vortex.file.segments.cache.misses"),
84            stores: metrics.counter("vortex.file.segments.cache.stores"),
85        }
86    }
87}
88
89#[async_trait]
90impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
91    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
92        let result = self.segment_cache.get(id).await?;
93        if result.is_some() {
94            self.hits.inc()
95        } else {
96            self.misses.inc()
97        }
98        Ok(result)
99    }
100
101    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
102        self.segment_cache.put(id, buffer).await?;
103        self.stores.inc();
104        Ok(())
105    }
106}
107
108pub struct SegmentCacheSourceAdapter {
109    cache: Arc<dyn SegmentCache>,
110    source: Arc<dyn SegmentSource>,
111}
112
113impl SegmentCacheSourceAdapter {
114    pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
115        Self { cache, source }
116    }
117}
118
119impl SegmentSource for SegmentCacheSourceAdapter {
120    fn request(&self, id: SegmentId) -> SegmentFuture {
121        let cache = self.cache.clone();
122        let delegate = self.source.request(id);
123
124        async move {
125            if let Ok(Some(segment)) = cache.get(id).await {
126                log::debug!("Resolved segment {} from cache", id);
127                return Ok(segment);
128            }
129            let result = delegate.await?;
130            if let Err(e) = cache.put(id, result.clone()).await {
131                log::warn!("Failed to store segment {} in cache: {}", id, e);
132            }
133            Ok(result)
134        }
135        .boxed()
136    }
137}