Skip to main content

liquid_cache/cache/
utils.rs

1#[cfg(test)]
2use crate::cache::cached_batch::CacheEntry;
3use crate::sync::{Arc, RwLock};
4use arrow::array::ArrayRef;
5use arrow_schema::ArrowError;
6use bytes::Bytes;
7
/// Sizing parameters a cache is configured with.
///
/// This is a small value type: it is cheap to copy and can be compared for equality.
/// Fields are private and exposed through read-only accessor methods.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CacheConfig {
    /// Configured batch size (presumably rows per batch — confirm against the builder).
    batch_size: usize,
    /// Configured maximum memory usage, in bytes.
    max_memory_bytes: usize,
    /// Configured maximum disk usage, in bytes.
    max_disk_bytes: usize,
}
14
15impl CacheConfig {
16    pub(super) fn new(batch_size: usize, max_memory_bytes: usize, max_disk_bytes: usize) -> Self {
17        Self {
18            batch_size,
19            max_memory_bytes,
20            max_disk_bytes,
21        }
22    }
23
24    pub fn batch_size(&self) -> usize {
25        self.batch_size
26    }
27
28    pub fn max_memory_bytes(&self) -> usize {
29        self.max_memory_bytes
30    }
31
32    pub fn max_disk_bytes(&self) -> usize {
33        self.max_disk_bytes
34    }
35}
36
/// Test helper: an in-memory Arrow `CacheEntry` holding an `Int64Array` with values `0..size`.
#[cfg(test)]
pub(crate) fn create_test_array(size: usize) -> CacheEntry {
    // Reuse the Arrow fixture so both test helpers always produce identical data.
    CacheEntry::memory_arrow(create_test_arrow_array(size))
}
45
/// Test helper: builds an `Int64Array` holding the values `0..size` and returns it as an
/// `ArrayRef`.
#[cfg(test)]
pub(crate) fn create_test_arrow_array(size: usize) -> ArrayRef {
    use arrow::array::Int64Array;

    // Arrow's `ArrayRef` is `std::sync::Arc<dyn Array>`, so build it with the std `Arc`
    // explicitly rather than the `crate::sync::Arc` re-export, matching the other arrow call
    // sites in this file. NOTE(review): `crate::sync` may swap `Arc` under some configurations.
    std::sync::Arc::new(Int64Array::from_iter_values(0..size as i64))
}
52
/// Test helper: builds a `LiquidCache` with the given memory budget and cache policy.
///
/// The remaining knobs are fixed for tests:
/// - batch size 128;
/// - the `TranscodeSqueezeEvict` squeeze policy;
/// - the `AlwaysHydrate` hydration policy.
///
/// No disk budget is configured here (`with_max_disk_bytes` is not called).
#[cfg(test)]
pub(crate) async fn create_cache_store(
    max_memory_bytes: usize,
    policy: Box<dyn super::policies::CachePolicy>,
) -> Arc<super::core::LiquidCache> {
    use crate::cache::{AlwaysHydrate, LiquidCacheBuilder, TranscodeSqueezeEvict};

    LiquidCacheBuilder::new()
        // Fixed batch size used by every test cache.
        .with_batch_size(128)
        .with_max_memory_bytes(max_memory_bytes)
        .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
        .with_hydration_policy(Box::new(AlwaysHydrate::new()))
        .with_cache_policy(policy)
        .build()
        .await
}
70
71/// EntryID is a unique identifier for a batch of rows, i.e., the cache key.
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd, serde::Serialize)]
73pub struct EntryID {
74    val: usize,
75}
76
77impl From<usize> for EntryID {
78    fn from(val: usize) -> Self {
79        Self { val }
80    }
81}
82
83impl From<EntryID> for usize {
84    fn from(val: EntryID) -> Self {
85        val.val
86    }
87}
88
89/// States for liquid compressor.
90pub struct LiquidCompressorStates {
91    fsst_compressor: RwLock<Option<Arc<fsst::Compressor>>>,
92}
93
94impl std::fmt::Debug for LiquidCompressorStates {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        write!(f, "EtcCompressorStates")
97    }
98}
99
100impl Default for LiquidCompressorStates {
101    fn default() -> Self {
102        Self::new()
103    }
104}
105
106impl LiquidCompressorStates {
107    /// Create a new instance of LiquidCompressorStates.
108    pub fn new() -> Self {
109        Self {
110            fsst_compressor: RwLock::new(None),
111        }
112    }
113
114    /// Create a new instance of LiquidCompressorStates with an fsst compressor.
115    pub fn new_with_fsst_compressor(fsst_compressor: Arc<fsst::Compressor>) -> Self {
116        Self {
117            fsst_compressor: RwLock::new(Some(fsst_compressor)),
118        }
119    }
120
121    /// Get the fsst compressor.
122    pub fn fsst_compressor(&self) -> Option<Arc<fsst::Compressor>> {
123        self.fsst_compressor.read().unwrap().clone()
124    }
125
126    /// Get the fsst compressor .
127    pub fn fsst_compressor_raw(&self) -> &RwLock<Option<Arc<fsst::Compressor>>> {
128        &self.fsst_compressor
129    }
130}
131
132pub(crate) fn arrow_to_bytes(array: &ArrayRef) -> Result<Bytes, ArrowError> {
133    use arrow::array::RecordBatch;
134    use arrow::ipc::writer::StreamWriter;
135
136    let mut bytes = Vec::new();
137
138    // Create a record batch with the single array
139    // We need to create a dummy field since we don't have the original field here
140    let field =
141        arrow_schema::Field::new("column", array.data_type().clone(), array.null_count() > 0);
142    let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![field]));
143    let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()])?;
144
145    let mut stream_writer = StreamWriter::try_new(&mut bytes, &schema)?;
146    stream_writer.write(&batch)?;
147    stream_writer.finish()?;
148
149    Ok(Bytes::from(bytes))
150}