liquid_cache_storage/cache/
utils.rs1#[cfg(test)]
2use crate::cache::cached_batch::CachedBatch;
3use crate::sync::{Arc, RwLock};
4use arrow::array::ArrayRef;
5use arrow_schema::ArrowError;
6use bytes::Bytes;
7use std::path::PathBuf;
8
9#[derive(Debug)]
10pub struct CacheConfig {
11 batch_size: usize,
12 max_cache_bytes: usize,
13 cache_root_dir: PathBuf,
14}
15
16impl CacheConfig {
17 pub(super) fn new(batch_size: usize, max_cache_bytes: usize, cache_root_dir: PathBuf) -> Self {
18 Self {
19 batch_size,
20 max_cache_bytes,
21 cache_root_dir,
22 }
23 }
24
25 pub fn batch_size(&self) -> usize {
26 self.batch_size
27 }
28
29 pub fn max_cache_bytes(&self) -> usize {
30 self.max_cache_bytes
31 }
32
33 pub fn cache_root_dir(&self) -> &PathBuf {
34 &self.cache_root_dir
35 }
36}
37
38#[cfg(test)]
40pub(crate) fn create_test_array(size: usize) -> CachedBatch {
41 use arrow::array::Int64Array;
42 use std::sync::Arc;
43
44 CachedBatch::MemoryArrow(Arc::new(Int64Array::from_iter_values(0..size as i64)))
45}
46
47#[cfg(test)]
49pub(crate) fn create_test_arrow_array(size: usize) -> ArrayRef {
50 use arrow::array::Int64Array;
51 Arc::new(Int64Array::from_iter_values(0..size as i64))
52}
53
54#[cfg(test)]
55pub(crate) fn create_cache_store(
56 max_cache_bytes: usize,
57 policy: Box<dyn super::cache_policies::CachePolicy>,
58) -> Arc<super::core::CacheStorage> {
59 use tempfile::tempdir;
60
61 use crate::cache::{
62 CacheStorageBuilder, core::BlockingIoContext, squeeze_policies::TranscodeSqueezeEvict,
63 };
64
65 let temp_dir = tempdir().unwrap();
66 let base_dir = temp_dir.keep();
67 let batch_size = 128;
68
69 let builder = CacheStorageBuilder::new()
70 .with_batch_size(batch_size)
71 .with_max_cache_bytes(max_cache_bytes)
72 .with_cache_dir(base_dir.clone())
73 .with_squeeze_policy(Box::new(TranscodeSqueezeEvict))
74 .with_cache_policy(policy)
75 .with_io_worker(Arc::new(BlockingIoContext::new(base_dir)));
76 builder.build()
77}
78
79#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Ord, PartialOrd)]
81pub struct EntryID {
82 val: usize,
83}
84
85impl From<usize> for EntryID {
86 fn from(val: usize) -> Self {
87 Self { val }
88 }
89}
90
91impl From<EntryID> for usize {
92 fn from(val: EntryID) -> Self {
93 val.val
94 }
95}
96
97pub struct LiquidCompressorStates {
99 fsst_compressor: RwLock<Option<Arc<fsst::Compressor>>>,
100}
101
102impl std::fmt::Debug for LiquidCompressorStates {
103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
104 write!(f, "EtcCompressorStates")
105 }
106}
107
108impl Default for LiquidCompressorStates {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl LiquidCompressorStates {
115 pub fn new() -> Self {
117 Self {
118 fsst_compressor: RwLock::new(None),
119 }
120 }
121
122 pub fn new_with_fsst_compressor(fsst_compressor: Arc<fsst::Compressor>) -> Self {
124 Self {
125 fsst_compressor: RwLock::new(Some(fsst_compressor)),
126 }
127 }
128
129 pub fn fsst_compressor(&self) -> Option<Arc<fsst::Compressor>> {
131 self.fsst_compressor.read().unwrap().clone()
132 }
133
134 pub fn fsst_compressor_raw(&self) -> &RwLock<Option<Arc<fsst::Compressor>>> {
136 &self.fsst_compressor
137 }
138}
139
140pub(crate) fn arrow_to_bytes(array: &ArrayRef) -> Result<Bytes, ArrowError> {
141 use arrow::array::RecordBatch;
142 use arrow::ipc::writer::StreamWriter;
143
144 let mut bytes = Vec::new();
145
146 let field =
149 arrow_schema::Field::new("column", array.data_type().clone(), array.null_count() > 0);
150 let schema = std::sync::Arc::new(arrow_schema::Schema::new(vec![field]));
151 let batch = RecordBatch::try_new(schema.clone(), vec![array.clone()])?;
152
153 let mut stream_writer = StreamWriter::try_new(&mut bytes, &schema)?;
154 stream_writer.write(&batch)?;
155 stream_writer.finish()?;
156
157 Ok(Bytes::from(bytes))
158}