Skip to main content

liquid_cache/cache/
utils.rs

1#[cfg(test)]
2use crate::cache::cached_batch::CacheEntry;
3use crate::sync::{Arc, RwLock};
4use arrow::array::ArrayRef;
5use arrow_schema::ArrowError;
6use bytes::Bytes;
7use std::path::PathBuf;
8
9#[derive(Debug)]
10pub struct CacheConfig {
11    batch_size: usize,
12    max_cache_bytes: usize,
13    cache_root_dir: PathBuf,
14}
15
16impl CacheConfig {
17    pub(super) fn new(batch_size: usize, max_cache_bytes: usize, cache_root_dir: PathBuf) -> Self {
18        Self {
19            batch_size,
20            max_cache_bytes,
21            cache_root_dir,
22        }
23    }
24
25    pub fn batch_size(&self) -> usize {
26        self.batch_size
27    }
28
29    pub fn max_cache_bytes(&self) -> usize {
30        self.max_cache_bytes
31    }
32
33    pub fn cache_root_dir(&self) -> &PathBuf {
34        &self.cache_root_dir
35    }
36}
37
// Test helpers

/// Test helper: a [`CacheEntry`] built with `CacheEntry::memory_arrow` from an
/// `Int64` array holding the values `0..size`.
#[cfg(test)]
pub(crate) fn create_test_array(size: usize) -> CacheEntry {
    // Delegate to the raw-array helper so both helpers always produce the same data.
    CacheEntry::memory_arrow(create_test_arrow_array(size))
}
46
// Test helpers

/// Test helper: an Arrow [`ArrayRef`] holding the `Int64` values `0..size`.
#[cfg(test)]
pub(crate) fn create_test_arrow_array(size: usize) -> ArrayRef {
    use arrow::array::Int64Array;

    // `ArrayRef` is `std::sync::Arc<dyn Array>`, so build it with the std `Arc`
    // explicitly. The module-level `Arc` comes from `crate::sync`, which may
    // not be the std type under an alternative sync backend (TODO confirm).
    std::sync::Arc::new(Int64Array::from_iter_values(0..size as i64))
}
53
/// Test helper: build a `LiquidCache` rooted in a fresh temporary directory.
///
/// The cache is configured with:
/// - batch size 128;
/// - the caller's `max_cache_bytes` and cache `policy`;
/// - `TranscodeSqueezeEvict` as the squeeze policy;
/// - `AlwaysHydrate` as the hydration policy;
/// - a `DefaultIoContext` on the same directory used as the cache dir.
///
/// # Panics
/// Panics if the temporary directory cannot be created.
#[cfg(test)]
pub(crate) fn create_cache_store(
    max_cache_bytes: usize,
    policy: Box<dyn super::policies::CachePolicy>,
) -> Arc<super::core::LiquidCache> {
    use crate::cache::{
        AlwaysHydrate, DefaultIoContext, LiquidCacheBuilder, TranscodeSqueezeEvict,
    };
    use tempfile::tempdir;

    // `keep()` consumes the `TempDir` guard without deleting the directory,
    // so it outlives this function.
    let cache_dir = tempdir().unwrap().keep();

    LiquidCacheBuilder::new()
        .with_batch_size(128)
        .with_max_cache_bytes(max_cache_bytes)
        .with_cache_dir(cache_dir.clone())
        .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
        .with_hydration_policy(Box::new(AlwaysHydrate::new()))
        .with_cache_policy(policy)
        .with_io_context(Arc::new(DefaultIoContext::new(cache_dir)))
        .build()
}
79
80/// EntryID is a unique identifier for a batch of rows, i.e., the cache key.
81#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, serde::Serialize)]
82pub struct EntryID {
83    val: usize,
84}
85
86impl From<usize> for EntryID {
87    fn from(val: usize) -> Self {
88        Self { val }
89    }
90}
91
92impl From<EntryID> for usize {
93    fn from(val: EntryID) -> Self {
94        val.val
95    }
96}
97
98/// States for liquid compressor.
99pub struct LiquidCompressorStates {
100    fsst_compressor: RwLock<Option<Arc<fsst::Compressor>>>,
101}
102
103impl std::fmt::Debug for LiquidCompressorStates {
104    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
105        write!(f, "EtcCompressorStates")
106    }
107}
108
109impl Default for LiquidCompressorStates {
110    fn default() -> Self {
111        Self::new()
112    }
113}
114
115impl LiquidCompressorStates {
116    /// Create a new instance of LiquidCompressorStates.
117    pub fn new() -> Self {
118        Self {
119            fsst_compressor: RwLock::new(None),
120        }
121    }
122
123    /// Create a new instance of LiquidCompressorStates with an fsst compressor.
124    pub fn new_with_fsst_compressor(fsst_compressor: Arc<fsst::Compressor>) -> Self {
125        Self {
126            fsst_compressor: RwLock::new(Some(fsst_compressor)),
127        }
128    }
129
130    /// Get the fsst compressor.
131    pub fn fsst_compressor(&self) -> Option<Arc<fsst::Compressor>> {
132        self.fsst_compressor.read().unwrap().clone()
133    }
134
135    /// Get the fsst compressor .
136    pub fn fsst_compressor_raw(&self) -> &RwLock<Option<Arc<fsst::Compressor>>> {
137        &self.fsst_compressor
138    }
139}
140
141pub(crate) fn arrow_to_bytes(array: &ArrayRef) -> Result<Bytes, ArrowError> {
142    use arrow::array::RecordBatch;
143    use arrow::ipc::writer::StreamWriter;
144
145    let mut bytes = Vec::new();
146
147    // Create a record batch with the single array
148    // We need to create a dummy field since we don't have the original field here
149    let field =
150        arrow_schema::Field::new("column", array.data_type().clone(), array.null_count() > 0);
151    let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![field]));
152    let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()])?;
153
154    let mut stream_writer = StreamWriter::try_new(&mut bytes, &schema)?;
155    stream_writer.write(&batch)?;
156    stream_writer.finish()?;
157
158    Ok(Bytes::from(bytes))
159}