Skip to main content

liquid_cache/cache/
io_context.rs

1use std::{fmt::Debug, ops::Range, path::PathBuf};
2
3use ahash::AHashMap;
4use bytes::Bytes;
5
6use crate::sync::{Arc, RwLock};
7use crate::{
8    cache::{
9        CacheExpression, Observer,
10        observer::InternalEvent,
11        utils::{EntryID, LiquidCompressorStates},
12    },
13    liquid_array::SqueezeIoHandler,
14};
15
16/// A trait for objects that can handle IO operations for the cache.
17#[async_trait::async_trait]
18pub trait IoContext: Debug + Send + Sync {
19    /// Add a squeeze hint for an entry.
20    fn add_squeeze_hint(&self, _entry_id: &EntryID, _expression: Arc<CacheExpression>) {
21        // Do nothing by default
22    }
23
24    /// Get the squeeze hint for an entry.
25    /// If None, the entry will be evicted to disk entirely.
26    /// If Some, the entry will be squeezed according to the cache expressions previously recorded for this column.
27    /// For example, if expression is ExtractDate32 { field: Date32Field::Year },
28    /// the entry will be squeezed to a [crate::liquid_array::SqueezedDate32Array] with the year
29    /// component (Date32 or Timestamp input).
30    fn squeeze_hint(&self, _entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
31        None
32    }
33
34    /// Get the compressor for an entry.
35    fn get_compressor(&self, entry_id: &EntryID) -> Arc<LiquidCompressorStates>;
36
37    /// Get the disk path for a cache entry.
38    fn disk_path(&self, entry_id: &EntryID) -> PathBuf;
39
40    /// Read bytes from the file at the given path, optionally restricted to the provided range.
41    async fn read(&self, path: PathBuf, range: Option<Range<u64>>)
42    -> Result<Bytes, std::io::Error>;
43
44    /// Write the entire buffer to a file at the given path.
45    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error>;
46}
47
48/// A default implementation of [IoContext] that uses the default compressor.
49/// It uses tokio's async IO by default.
50#[derive(Debug)]
51pub struct DefaultIoContext {
52    compressor_state: Arc<LiquidCompressorStates>,
53    squeeze_hints: RwLock<AHashMap<EntryID, Arc<CacheExpression>>>,
54    base_dir: PathBuf,
55}
56
57impl DefaultIoContext {
58    /// Create a new instance of [DefaultIoContext].
59    pub fn new(base_dir: PathBuf) -> Self {
60        Self {
61            compressor_state: Arc::new(LiquidCompressorStates::new()),
62            base_dir,
63            squeeze_hints: RwLock::new(AHashMap::new()),
64        }
65    }
66}
67
68#[async_trait::async_trait]
69impl IoContext for DefaultIoContext {
70    fn add_squeeze_hint(&self, entry_id: &EntryID, expression: Arc<CacheExpression>) {
71        let mut guard = self.squeeze_hints.write().unwrap();
72        guard.insert(*entry_id, expression);
73    }
74
75    fn squeeze_hint(&self, entry_id: &EntryID) -> Option<Arc<CacheExpression>> {
76        let guard = self.squeeze_hints.read().unwrap();
77        guard.get(entry_id).cloned()
78    }
79
80    fn get_compressor(&self, _entry_id: &EntryID) -> Arc<LiquidCompressorStates> {
81        self.compressor_state.clone()
82    }
83
84    fn disk_path(&self, entry_id: &EntryID) -> PathBuf {
85        self.base_dir
86            .join(format!("{:016x}.liquid", usize::from(*entry_id)))
87    }
88
89    async fn read(
90        &self,
91        path: PathBuf,
92        range: Option<Range<u64>>,
93    ) -> Result<Bytes, std::io::Error> {
94        if cfg!(test) {
95            let mut file = std::fs::File::open(path)?;
96            match range {
97                Some(range) => {
98                    let len = (range.end - range.start) as usize;
99                    let mut bytes = vec![0u8; len];
100                    std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(range.start))?;
101                    std::io::Read::read_exact(&mut file, &mut bytes)?;
102                    Ok(Bytes::from(bytes))
103                }
104                None => {
105                    let mut bytes = Vec::new();
106                    std::io::Read::read_to_end(&mut file, &mut bytes)?;
107                    Ok(Bytes::from(bytes))
108                }
109            }
110        } else {
111            use tokio::io::AsyncReadExt;
112            use tokio::io::AsyncSeekExt;
113            let mut file = tokio::fs::File::open(path).await?;
114
115            match range {
116                Some(range) => {
117                    let len = (range.end - range.start) as usize;
118                    let mut bytes = vec![0u8; len];
119                    file.seek(tokio::io::SeekFrom::Start(range.start)).await?;
120                    file.read_exact(&mut bytes).await?;
121                    Ok(Bytes::from(bytes))
122                }
123                None => {
124                    let mut bytes = Vec::new();
125                    file.read_to_end(&mut bytes).await?;
126                    Ok(Bytes::from(bytes))
127                }
128            }
129        }
130    }
131
132    async fn write_file(&self, path: PathBuf, data: Bytes) -> Result<(), std::io::Error> {
133        if cfg!(test) {
134            std::fs::write(path, data.as_ref())?;
135            Ok(())
136        } else {
137            use tokio::io::AsyncWriteExt;
138            let mut file = tokio::fs::File::create(path).await?;
139            file.write_all(&data).await?;
140            file.sync_all().await?;
141            Ok(())
142        }
143    }
144}
145
146/// A default implementation of [SqueezeIoHandler] that uses the default [IoContext].
147#[derive(Debug)]
148pub struct DefaultSqueezeIo {
149    io_context: Arc<dyn IoContext>,
150    entry_id: EntryID,
151    observer: Arc<Observer>,
152}
153
154impl DefaultSqueezeIo {
155    /// Create a new instance of [DefaultSqueezeIo].
156    pub fn new(io_context: Arc<dyn IoContext>, entry_id: EntryID, observer: Arc<Observer>) -> Self {
157        Self {
158            io_context,
159            entry_id,
160            observer,
161        }
162    }
163}
164
165#[async_trait::async_trait]
166impl SqueezeIoHandler for DefaultSqueezeIo {
167    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
168        let path = self.io_context.disk_path(&self.entry_id);
169        let bytes = self.io_context.read(path, range).await?;
170        self.observer
171            .record_internal(InternalEvent::IoReadSqueezedBacking {
172                entry: self.entry_id,
173                bytes: bytes.len(),
174            });
175        Ok(bytes)
176    }
177
178    fn tracing_decompress_count(&self, decompress_cnt: usize, total_cnt: usize) {
179        self.observer
180            .record_internal(InternalEvent::DecompressSqueezed {
181                entry: self.entry_id,
182                decompressed: decompress_cnt,
183                total: total_cnt,
184            });
185    }
186
187    fn trace_io_saved(&self) {
188        self.observer.runtime_stats().incr_squeeze_io_saved();
189    }
190}
191
/// Test-only [SqueezeIoHandler] that serves reads from an in-memory buffer
/// and counts how many reads were issued.
#[cfg(test)]
#[derive(Debug, Default)]
pub(crate) struct TestSqueezeIo {
    /// Backing bytes returned by reads; `None` until `set_bytes` is called.
    bytes: std::sync::Mutex<Option<Bytes>>,
    /// Number of `read` calls since creation or the last `reset_reads`.
    reads: std::sync::atomic::AtomicUsize,
}
198
#[cfg(test)]
impl TestSqueezeIo {
    /// Install `bytes` as the backing buffer, replacing any previous one.
    pub(crate) fn set_bytes(&self, bytes: Bytes) {
        let mut slot = self.bytes.lock().unwrap();
        *slot = Some(bytes);
    }

    /// Current value of the read counter.
    pub(crate) fn reads(&self) -> usize {
        use std::sync::atomic::Ordering;
        self.reads.load(Ordering::SeqCst)
    }

    /// Set the read counter back to zero.
    pub(crate) fn reset_reads(&self) {
        use std::sync::atomic::Ordering;
        self.reads.store(0, Ordering::SeqCst);
    }
}
213
#[cfg(test)]
#[async_trait::async_trait]
impl SqueezeIoHandler for TestSqueezeIo {
    /// Count the read, then return the backing buffer, or the `range` slice of it.
    ///
    /// # Panics
    ///
    /// Panics if no backing bytes have been set.
    async fn read(&self, range: Option<Range<u64>>) -> std::io::Result<Bytes> {
        use std::sync::atomic::Ordering;

        self.reads.fetch_add(1, Ordering::SeqCst);
        // The lock guard is a temporary, released at the end of this statement.
        let backing = self
            .bytes
            .lock()
            .unwrap()
            .clone()
            .expect("test squeeze backing set");
        let selected = if let Some(window) = range {
            backing.slice(window.start as usize..window.end as usize)
        } else {
            backing
        };
        Ok(selected)
    }
}