liquid_cache/cache/
io_context.rs1use std::{fmt::Debug, ops::Range, path::PathBuf};
2
3use ahash::AHashMap;
4use bytes::Bytes;
5
6use crate::sync::{Arc, RwLock};
7use crate::{
8 cache::{
9 CacheExpression, Observer,
10 observer::InternalEvent,
11 utils::{EntryID, LiquidCompressorStates},
12 },
13 liquid_array::SqueezeIoHandler,
14};
15
#[async_trait::async_trait]
/// Abstraction over the I/O services the cache needs: compressor lookup,
/// on-disk path resolution, raw file reads/writes, and optional per-entry
/// "squeeze" expression hints.
///
/// Implementations must be shareable across threads (`Send + Sync`).
pub trait IoContext: Debug + Send + Sync {
    /// Records a squeeze expression hint for `entry_id`.
    ///
    /// Default implementation is a deliberate no-op; implementations that
    /// track hints (e.g. [`DefaultIoContext`]) override it.
    fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
    }

    /// Returns the squeeze expression previously hinted for `entry_id`,
    /// if any. Default implementation always returns `None`.
    fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
        None
    }

    /// Returns the compressor state to use for `entry_id`.
    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;

    /// Resolves the on-disk file path backing `entry_id`.
    fn disk_path(&self, entry_id: &EntryID) -> PathBuf;

    /// Reads the file at `path`. With `Some(range)`, reads exactly that byte
    /// range; with `None`, reads the whole file.
    async fn read(&self, path: PathBuf, range: Option<Range<u64>>)
        -> Result<Bytes, std::io::Error>;

    /// Writes `data` to `path`, replacing any existing content.
    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error>;
}
47
#[derive(Debug)]
/// Default [`IoContext`] backed by the local filesystem: one shared
/// compressor state for all entries, and files stored under `base_dir`.
pub struct DefaultIoContext {
    // Single compressor state returned for every entry (see `get_compressor`).
    compressor_state: Arc<LiquidCompressorStates>,
    // Per-entry squeeze hints recorded via `add_squeeze_hint`.
    squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
    // Directory under which `disk_path` places the per-entry `.liquid` files.
    base_dir: PathBuf,
}
56
57impl DefaultIoContext {
58 pub fn new(base_dir: PathBuf) -> Self {
60 Self {
61 compressor_state: Arc::new(LiquidCompressorStates::new()),
62 base_dir,
63 squeeze_hints: RwLock::new(AHashMap::new()),
64 }
65 }
66}
67
#[async_trait::async_trait]
impl IoContext for DefaultIoContext {
    /// Stores (or replaces) the squeeze hint for `entry_id`.
    fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
        let mut guard = self.squeeze_hints.write().unwrap();
        guard.insert(*entry_id, expression);
    }

    /// Looks up the previously recorded hint for `entry_id`, cloning the Arc.
    fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
        let guard = self.squeeze_hints.read().unwrap();
        guard.get(entry_id).cloned()
    }

    /// All entries share the single compressor state; the entry id is ignored.
    fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
        self.compressor_state.clone()
    }

    /// Path is `<base_dir>/<entry-id as 16-hex-digit>.liquid`.
    fn disk_path(&self, entry_id: &EntryID) -> PathBuf {
        self.base_dir
            .join(format!("{:016x}.liquid", usize::from(*entry_id)))
    }

    /// Reads `path`, either a byte `range` or the whole file.
    ///
    /// NOTE(review): `cfg!(test)` selects blocking `std::fs` I/O when THIS
    /// crate's tests are compiled — blocking inside an async fn is acceptable
    /// there, but downstream users always get the tokio branch. Presumably
    /// the std branch exists so tests can run under a simulated/loom-style
    /// runtime via `crate::sync` — confirm with the crate's test setup.
    async fn read(
        &self,
        path: PathBuf,
        range: Option<Range<u64>>,
    ) -> Result<Bytes, std::io::Error> {
        if cfg!(test) {
            let mut file = std::fs::File::open(path)?;
            match range {
                Some(range) => {
                    // Exact-size buffer; `read_exact` errors if the file is
                    // shorter than the requested range.
                    let len = (range.end - range.start) as usize;
                    let mut bytes = vec![0u8; len];
                    std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(range.start))?;
                    std::io::Read::read_exact(&mut file, &mut bytes)?;
                    Ok(Bytes::from(bytes))
                }
                None => {
                    let mut bytes = Vec::new();
                    std::io::Read::read_to_end(&mut file, &mut bytes)?;
                    Ok(Bytes::from(bytes))
                }
            }
        } else {
            use tokio::io::AsyncReadExt;
            use tokio::io::AsyncSeekExt;
            let mut file = tokio::fs::File::open(path).await?;

            match range {
                Some(range) => {
                    let len = (range.end - range.start) as usize;
                    let mut bytes = vec![0u8; len];
                    file.seek(tokio::io::SeekFrom::Start(range.start)).await?;
                    file.read_exact(&mut bytes).await?;
                    Ok(Bytes::from(bytes))
                }
                None => {
                    let mut bytes = Vec::new();
                    file.read_to_end(&mut bytes).await?;
                    Ok(Bytes::from(bytes))
                }
            }
        }
    }

    /// Creates/truncates `path` and writes `data`.
    ///
    /// NOTE(review): the tokio branch calls `sync_all` (durable on return);
    /// the test branch uses plain `std::fs::write` with no fsync — a
    /// deliberate test shortcut, not a durability guarantee.
    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
        if cfg!(test) {
            std::fs::write(path, data.as_ref())?;
            Ok(())
        } else {
            use tokio::io::AsyncWriteExt;
            let mut file = tokio::fs::File::create(path).await?;
            file.write_all(&data).await?;
            file.sync_all().await?;
            Ok(())
        }
    }
}
145
#[derive(Debug)]
/// [`SqueezeIoHandler`] that reads an entry's squeezed backing file through
/// an [`IoContext`] and reports I/O events to an [`Observer`].
pub struct DefaultSqueezeIo {
    // Performs path resolution and the actual file reads.
    io_context: Arc<dyn IoContext>,
    // The cache entry this handler serves.
    entry_id: EntryID,
    // Receives telemetry events for reads and decompression.
    observer: Arc<Observer>,
}
153
154impl DefaultSqueezeIo {
155 pub fn new(io_context: Arc<dyn IoContext>, entry_id: EntryID, observer: Arc<Observer>) -> Self {
157 Self {
158 io_context,
159 entry_id,
160 observer,
161 }
162 }
163}
164
#[async_trait::async_trait]
impl SqueezeIoHandler for DefaultSqueezeIo {
    /// Reads this entry's squeezed backing file (optionally a byte range)
    /// and records the number of bytes read with the observer.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        let path = self.io_context.disk_path(&self.entry_id);
        let bytes = self.io_context.read(path, range).await?;
        self.observer
            .record_internal(InternalEvent::IoReadSqueezedBacking {
                entry: self.entry_id,
                bytes: bytes.len(),
            });
        Ok(bytes)
    }

    /// Reports how many of `total_cnt` items were actually decompressed.
    fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
        self.observer
            .record_internal(InternalEvent::DecompressSqueezed {
                entry: self.entry_id,
                decompressed: decompress_cnt,
                total: total_cnt,
            });
    }

    /// Bumps the runtime counter of reads avoided thanks to squeezing.
    fn trace_io_saved(&self) {
        self.observer.runtime_stats().incr_squeeze_io_saved();
    }
}
191
#[cfg(test)]
#[derive(Debug, Default)]
/// In-memory [`SqueezeIoHandler`] for tests: serves bytes set via
/// `set_bytes` and counts how many reads were issued.
pub(crate) struct TestSqueezeIo {
    // Backing payload returned by `read`; must be set before the first read.
    bytes: std::sync::Mutex<Option<Bytes>>,
    // Number of `read` calls since construction or the last `reset_reads`.
    reads: std::sync::atomic::AtomicUsize,
}
198
#[cfg(test)]
impl TestSqueezeIo {
    /// Installs the backing payload that subsequent `read` calls will serve.
    pub(crate) fn set_bytes(&self, bytes: Bytes) {
        let mut slot = self.bytes.lock().unwrap();
        *slot = Some(bytes);
    }

    /// Returns how many `read` calls have happened since the last reset.
    pub(crate) fn reads(&self) -> usize {
        use std::sync::atomic::Ordering;
        self.reads.load(Ordering::SeqCst)
    }

    /// Zeroes the read counter.
    pub(crate) fn reset_reads(&self) {
        use std::sync::atomic::Ordering;
        self.reads.store(0, Ordering::SeqCst);
    }
}
213
#[cfg(test)]
#[async_trait::async_trait]
impl SqueezeIoHandler for TestSqueezeIo {
    /// Serves the configured payload (or a sub-slice of it) and bumps the
    /// read counter. Panics if `set_bytes` was never called.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        self.reads.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        let backing = self
            .bytes
            .lock()
            .unwrap()
            .clone()
            .expect("test squeeze backing set");
        match range {
            Some(r) => Ok(backing.slice(r.start as usize..r.end as usize)),
            None => Ok(backing),
        }
    }
}