vortex_file/segments/
cache.rs1use std::sync::{Arc, RwLock};
2
3use async_trait::async_trait;
4use futures::FutureExt;
5use moka::future::{Cache, CacheBuilder};
6use moka::policy::EvictionPolicy;
7use rustc_hash::FxBuildHasher;
8use vortex_array::aliases::hash_map::HashMap;
9use vortex_buffer::ByteBuffer;
10use vortex_error::{VortexExpect, VortexResult};
11use vortex_layout::segments::{SegmentFuture, SegmentId, SegmentSource};
12use vortex_metrics::{Counter, VortexMetrics};
13
14#[async_trait]
16pub trait SegmentCache: Send + Sync {
17 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
18 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
19}
20
/// A [`SegmentCache`] that stores nothing: every lookup misses and every
/// store is discarded.
pub(crate) struct NoOpSegmentCache;
22
23#[async_trait]
24impl SegmentCache for NoOpSegmentCache {
25 async fn get(&self, _id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
26 Ok(None)
27 }
28
29 async fn put(&self, _id: SegmentId, _buffer: ByteBuffer) -> VortexResult<()> {
30 Ok(())
31 }
32}
33
34pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
36
37impl MokaSegmentCache {
38 pub fn new(max_capacity_bytes: u64) -> Self {
39 Self(
40 CacheBuilder::new(max_capacity_bytes)
41 .name("vortex-segment-cache")
42 .weigher(|_, buffer: &ByteBuffer| {
44 u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
45 })
46 .eviction_policy(EvictionPolicy::tiny_lfu())
50 .build_with_hasher(FxBuildHasher),
51 )
52 }
53}
54
55#[async_trait]
56impl SegmentCache for MokaSegmentCache {
57 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
58 Ok(self.0.get(&id).await)
59 }
60
61 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
62 self.0.insert(id, buffer).await;
63 Ok(())
64 }
65}
66
67pub(crate) struct InitialReadSegmentCache {
69 pub(crate) initial: RwLock<HashMap<SegmentId, ByteBuffer>>,
70 pub(crate) fallback: Arc<dyn SegmentCache>,
71}
72
73#[async_trait]
74impl SegmentCache for InitialReadSegmentCache {
75 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
76 if let Some(buffer) = self.initial.read().vortex_expect("poisoned lock").get(&id) {
77 return Ok(Some(buffer.clone()));
78 }
79 self.fallback.get(id).await
80 }
81
82 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
83 self.fallback.put(id, buffer).await
84 }
85}
86
87pub struct SegmentCacheMetrics<C> {
88 segment_cache: C,
89
90 hits: Arc<Counter>,
91 misses: Arc<Counter>,
92 stores: Arc<Counter>,
93}
94
95impl<C: SegmentCache> SegmentCacheMetrics<C> {
96 pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
97 Self {
98 segment_cache,
99 hits: metrics.counter("vortex.file.segments.cache.hits"),
100 misses: metrics.counter("vortex.file.segments.cache.misses"),
101 stores: metrics.counter("vortex.file.segments.cache.stores"),
102 }
103 }
104}
105
106#[async_trait]
107impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
108 async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
109 let result = self.segment_cache.get(id).await?;
110 if result.is_some() {
111 self.hits.inc()
112 } else {
113 self.misses.inc()
114 }
115 Ok(result)
116 }
117
118 async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
119 self.segment_cache.put(id, buffer).await?;
120 self.stores.inc();
121 Ok(())
122 }
123}
124
125pub struct SegmentCacheSourceAdapter {
126 cache: Arc<dyn SegmentCache>,
127 source: Arc<dyn SegmentSource>,
128}
129
130impl SegmentCacheSourceAdapter {
131 pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
132 Self { cache, source }
133 }
134}
135
136impl SegmentSource for SegmentCacheSourceAdapter {
137 fn request(&self, id: SegmentId, for_whom: &Arc<str>) -> SegmentFuture {
138 let cache = self.cache.clone();
139 let delegate = self.source.request(id, for_whom);
140 let for_whom = for_whom.clone();
141
142 async move {
143 if let Ok(Some(segment)) = cache.get(id).await {
144 log::debug!("Resolved segment {} for {} from cache", id, &for_whom);
145 return Ok(segment);
146 }
147 let result = delegate.await?;
148 if let Err(e) = cache.put(id, result.clone()).await {
149 log::warn!(
150 "Failed to store segment {} for {} in cache: {}",
151 id,
152 &for_whom,
153 e
154 );
155 }
156 Ok(result)
157 }
158 .boxed()
159 }
160}