Skip to main content

liquid_cache/cache/
io_context.rs

1use std::{fmt::Debug, ops::Range};
2
3use ahash::AHashMap;
4use bytes::Bytes;
5
6use crate::sync::{Arc, RwLock};
7use crate::{
8    cache::{
9        CacheExpression, Observer,
10        observer::InternalEvent,
11        utils::{EntryID, LiquidCompressorStates},
12    },
13    liquid_array::SqueezeIoHandler,
14};
15
16/// Per-entry metadata used by the cache.
17///
18/// This trait covers only the metadata side of the cache: where to find a
19/// batch's compressor and squeeze hints. All actual byte IO goes through the
20/// [`t4::Store`] held by the cache itself.
21pub trait EntryMetadata: Debug + Send + Sync {
22    /// Add a squeeze hint for an entry.
23    fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
24        // Do nothing by default
25    }
26
27    /// Get the squeeze hint for an entry.
28    /// If None, the entry will be evicted to disk entirely.
29    /// If Some, the entry will be squeezed according to the cache expressions previously recorded for this column.
30    /// For example, if expression is ExtractDate32 { field: Date32Field::Year },
31    /// the entry will be squeezed to a [crate::liquid_array::SqueezedDate32Array] with the year
32    /// component (Date32 or Timestamp input).
33    fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
34        None
35    }
36
37    /// Get the compressor for an entry.
38    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;
39}
40
41/// Convert an [`EntryID`] to a t4 key (8-byte little-endian representation).
42pub(crate) fn entry_id_to_key(entry_id: &EntryID) -> Vec<u8> {
43    usize::from(*entry_id).to_le_bytes().to_vec()
44}
45
46/// A default implementation of [`EntryMetadata`].
47///
48/// All entries share a single [`LiquidCompressorStates`] and squeeze hints are
49/// stored in a flat map keyed by [`EntryID`].
50#[derive(Debug, Default)]
51pub struct DefaultCacheMetadata {
52    compressor_state: Arc<LiquidCompressorStates>,
53    squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
54}
55
56impl DefaultCacheMetadata {
57    /// Create a new instance of [`DefaultCacheMetadata`].
58    pub fn new() -> Self {
59        Self {
60            compressor_state: Arc::new(LiquidCompressorStates::new()),
61            squeeze_hints: RwLock::new(AHashMap::new()),
62        }
63    }
64}
65
66impl EntryMetadata for DefaultCacheMetadata {
67    fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
68        let mut guard = self.squeeze_hints.write().unwrap();
69        guard.insert(*entry_id, expression);
70    }
71
72    fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
73        let guard = self.squeeze_hints.read().unwrap();
74        guard.get(entry_id).cloned()
75    }
76
77    fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
78        self.compressor_state.clone()
79    }
80}
81
82/// A default implementation of [SqueezeIoHandler] backed by a [`t4::Store`].
83#[derive(Debug)]
84pub struct DefaultSqueezeIo {
85    store: t4::Store,
86    entry_id: EntryID,
87    observer: Arc<Observer>,
88}
89
90impl DefaultSqueezeIo {
91    /// Create a new instance of [DefaultSqueezeIo].
92    pub fn new(store: t4::Store, entry_id: EntryID, observer: Arc<Observer>) -> Self {
93        Self {
94            store,
95            entry_id,
96            observer,
97        }
98    }
99}
100
101#[async_trait::async_trait]
102impl SqueezeIoHandler for DefaultSqueezeIo {
103    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
104        let key = entry_id_to_key(&self.entry_id);
105        let bytes = match range {
106            Some(range) => {
107                let len = range.end - range.start;
108                self.store
109                    .get_range(&key, range.start, len)
110                    .await
111                    .map_err(|e| std::io::Error::other(e.to_string()))?
112            }
113            None => self
114                .store
115                .get(&key)
116                .await
117                .map_err(|e| std::io::Error::other(e.to_string()))?,
118        };
119        let bytes = Bytes::from(bytes);
120        self.observer
121            .record_internal(InternalEvent::IoReadSqueezedBacking {
122                entry: self.entry_id,
123                bytes: bytes.len(),
124            });
125        Ok(bytes)
126    }
127
128    fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
129        self.observer
130            .record_internal(InternalEvent::DecompressSqueezed {
131                entry: self.entry_id,
132                decompressed: decompress_cnt,
133                total: total_cnt,
134            });
135    }
136
137    fn trace_io_saved(&self) {
138        self.observer.runtime_stats().incr_squeeze_io_saved();
139    }
140}
141
/// In-memory [`SqueezeIoHandler`] stand-in for unit tests.
///
/// It serves whatever backing was installed with `set_bytes` and counts how
/// many reads were issued.
#[cfg(test)]
#[derive(Debug, Default)]
pub(crate) struct TestSqueezeIo {
    /// Backing bytes served by reads; `None` until a test installs some.
    bytes: std::sync::Mutex<Option<Bytes>>,
    /// Number of reads issued against this handler.
    reads: std::sync::atomic::AtomicUsize,
}
148
#[cfg(test)]
impl TestSqueezeIo {
    /// Install `bytes` as the backing served by later reads, replacing any
    /// previous backing.
    pub(crate) fn set_bytes(&self, bytes: Bytes) {
        let mut slot = self.bytes.lock().unwrap();
        *slot = Some(bytes);
    }

    /// Current value of the read counter.
    pub(crate) fn reads(&self) -> usize {
        use std::sync::atomic::Ordering;
        self.reads.load(Ordering::SeqCst)
    }

    /// Set the read counter back to zero.
    pub(crate) fn reset_reads(&self) {
        use std::sync::atomic::Ordering;
        self.reads.store(0, Ordering::SeqCst);
    }
}
163
#[cfg(test)]
#[async_trait::async_trait]
impl SqueezeIoHandler for TestSqueezeIo {
    /// Count the call, then serve the installed backing: the whole value for
    /// `None`, or the `start..end` sub-slice for `Some(range)`.
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) if no backing was installed with `set_bytes`.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        use std::sync::atomic::Ordering;
        self.reads.fetch_add(1, Ordering::SeqCst);
        let backing = {
            let slot = self.bytes.lock().unwrap();
            slot.clone().expect("test squeeze backing set")
        };
        let served = match range {
            None => backing,
            Some(Range { start, end }) => backing.slice(start as usize..end as usize),
        };
        Ok(served)
    }
}