liquid_cache/cache/
io_context.rs1use std::{fmt::Debug, ops::Range};
2
3use ahash::AHashMap;
4use bytes::Bytes;
5
6use crate::sync::{Arc, RwLock};
7use crate::{
8 cache::{
9 CacheExpression, Observer,
10 observer::InternalEvent,
11 utils::{EntryID, LiquidCompressorStates},
12 },
13 liquid_array::SqueezeIoHandler,
14};
15
16pub trait EntryMetadata: Debug + Send + Sync {
22 fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
24 }
26
27 fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
34 None
35 }
36
37 fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;
39}
40
41pub(crate) fn entry_id_to_key(entry_id: &EntryID) -> Vec<u8> {
43 usize::from(*entry_id).to_le_bytes().to_vec()
44}
45
46#[derive(Debug, Default)]
51pub struct DefaultCacheMetadata {
52 compressor_state: Arc<LiquidCompressorStates>,
53 squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
54}
55
56impl DefaultCacheMetadata {
57 pub fn new() -> Self {
59 Self {
60 compressor_state: Arc::new(LiquidCompressorStates::new()),
61 squeeze_hints: RwLock::new(AHashMap::new()),
62 }
63 }
64}
65
66impl EntryMetadata for DefaultCacheMetadata {
67 fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
68 let mut guard = self.squeeze_hints.write().unwrap();
69 guard.insert(*entry_id, expression);
70 }
71
72 fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
73 let guard = self.squeeze_hints.read().unwrap();
74 guard.get(entry_id).cloned()
75 }
76
77 fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
78 self.compressor_state.clone()
79 }
80}
81
82#[derive(Debug)]
84pub struct DefaultSqueezeIo {
85 store: t4::Store,
86 entry_id: EntryID,
87 observer: Arc<Observer>,
88}
89
90impl DefaultSqueezeIo {
91 pub fn new(store: t4::Store, entry_id: EntryID, observer: Arc<Observer>) -> Self {
93 Self {
94 store,
95 entry_id,
96 observer,
97 }
98 }
99}
100
101#[async_trait::async_trait]
102impl SqueezeIoHandler for DefaultSqueezeIo {
103 async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
104 let key = entry_id_to_key(&self.entry_id);
105 let bytes = match range {
106 Some(range) => {
107 let len = range.end - range.start;
108 self.store
109 .get_range(&key, range.start, len)
110 .await
111 .map_err(|e| std::io::Error::other(e.to_string()))?
112 }
113 None => self
114 .store
115 .get(&key)
116 .await
117 .map_err(|e| std::io::Error::other(e.to_string()))?,
118 };
119 let bytes = Bytes::from(bytes);
120 self.observer
121 .record_internal(InternalEvent::IoReadSqueezedBacking {
122 entry: self.entry_id,
123 bytes: bytes.len(),
124 });
125 Ok(bytes)
126 }
127
128 fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
129 self.observer
130 .record_internal(InternalEvent::DecompressSqueezed {
131 entry: self.entry_id,
132 decompressed: decompress_cnt,
133 total: total_cnt,
134 });
135 }
136
137 fn trace_io_saved(&self) {
138 self.observer.runtime_stats().incr_squeeze_io_saved();
139 }
140}
141
142#[cfg(test)]
143#[derive(Debug, Default)]
144pub(crate) struct TestSqueezeIo {
145 bytes: std::sync::Mutex<Option<Bytes>>,
146 reads: std::sync::atomic::AtomicUsize,
147}
148
149#[cfg(test)]
150impl TestSqueezeIo {
151 pub(crate) fn set_bytes(&self, bytes: Bytes) {
152 *self.bytes.lock().unwrap() = Some(bytes);
153 }
154
155 pub(crate) fn reads(&self) -> usize {
156 self.reads.load(std::sync::atomic::Ordering::SeqCst)
157 }
158
159 pub(crate) fn reset_reads(&self) {
160 self.reads.store(0, std::sync::atomic::Ordering::SeqCst);
161 }
162}
163
164#[cfg(test)]
165#[async_trait::async_trait]
166impl SqueezeIoHandler for TestSqueezeIo {
167 async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
168 self.reads.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
169 let bytes = self
170 .bytes
171 .lock()
172 .unwrap()
173 .clone()
174 .expect("test squeeze backing set");
175 Ok(match range {
176 Some(range) => bytes.slice(range.start as usize..range.end as usize),
177 None => bytes,
178 })
179 }
180}