Skip to main content

liquid_cache/cache/
builders.rs

1use std::future::{Future, IntoFuture};
2use std::path::PathBuf;
3use std::pin::Pin;
4
5use arrow::array::{
6    Array, ArrayData, ArrayRef, BinaryViewArray, BooleanArray, StringViewArray, make_array,
7};
8use arrow::buffer::BooleanBuffer;
9use datafusion::physical_plan::PhysicalExpr;
10
11use super::cached_batch::CacheEntry;
12use super::core::LiquidCache;
13use super::io_context::{DefaultIoContext, IoContext};
14use super::policies::{CachePolicy, HydrationPolicy, SqueezePolicy, TranscodeSqueezeEvict};
15use super::{CacheExpression, EntryID, LiquidPolicy};
16use crate::sync::Arc;
17
/// Builder for [LiquidCache].
///
/// Example:
/// ```rust
/// use liquid_cache::cache::LiquidCacheBuilder;
/// use liquid_cache::cache_policies::LiquidPolicy;
///
///
/// let _storage = LiquidCacheBuilder::new()
///     .with_batch_size(8192)
///     .with_max_cache_bytes(1024 * 1024 * 1024)
///     .with_cache_policy(Box::new(LiquidPolicy::new()))
///     .build();
/// ```
pub struct LiquidCacheBuilder {
    // Number of rows per cached batch; defaults to 8192.
    batch_size: usize,
    // Cache budget in bytes; defaults to 1 GiB.
    max_cache_bytes: usize,
    // Directory for on-disk cache data; a temp directory is created when `None`.
    cache_dir: Option<PathBuf>,
    // Cache policy; defaults to [LiquidPolicy].
    cache_policy: Box<dyn CachePolicy>,
    // Hydration policy; defaults to [super::AlwaysHydrate].
    hydration_policy: Box<dyn HydrationPolicy>,
    // Squeeze policy; defaults to [TranscodeSqueezeEvict].
    squeeze_policy: Box<dyn SqueezePolicy>,
    // I/O context; [DefaultIoContext] over the cache dir is used when `None`.
    io_context: Option<Arc<dyn IoContext>>,
}
41
42impl Default for LiquidCacheBuilder {
43    fn default() -> Self {
44        Self::new()
45    }
46}
47
48impl LiquidCacheBuilder {
49    /// Create a new instance of [LiquidCacheBuilder].
50    pub fn new() -> Self {
51        Self {
52            batch_size: 8192,
53            max_cache_bytes: 1024 * 1024 * 1024,
54            cache_dir: None,
55            cache_policy: Box::new(LiquidPolicy::new()),
56            hydration_policy: Box::new(super::AlwaysHydrate::new()),
57            squeeze_policy: Box::new(TranscodeSqueezeEvict),
58            io_context: None,
59        }
60    }
61
62    /// Set the cache directory for the cache.
63    /// Default is a temporary directory.
64    pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
65        self.cache_dir = Some(cache_dir);
66        self
67    }
68
69    /// Set the batch size for the cache.
70    /// Default is 8192.
71    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
72        self.batch_size = batch_size;
73        self
74    }
75
76    /// Set the max cache bytes for the cache.
77    /// Default is 1GB.
78    pub fn with_max_cache_bytes(mut self, max_cache_bytes: usize) -> Self {
79        self.max_cache_bytes = max_cache_bytes;
80        self
81    }
82
83    /// Set the cache policy for the cache.
84    /// Default is [LiquidPolicy].
85    pub fn with_cache_policy(mut self, policy: Box<dyn CachePolicy>) -> Self {
86        self.cache_policy = policy;
87        self
88    }
89
90    /// Set the hydration policy for the cache.
91    /// Default is [crate::cache::NoHydration].
92    pub fn with_hydration_policy(mut self, policy: Box<dyn HydrationPolicy>) -> Self {
93        self.hydration_policy = policy;
94        self
95    }
96
97    /// Set the squeeze policy for the cache.
98    /// Default is [TranscodeSqueezeEvict].
99    pub fn with_squeeze_policy(mut self, policy: Box<dyn SqueezePolicy>) -> Self {
100        self.squeeze_policy = policy;
101        self
102    }
103
104    /// Set the [IoContext] for the cache.
105    /// Default is [DefaultIoContext].
106    pub fn with_io_context(mut self, io_context: Arc<dyn IoContext>) -> Self {
107        self.io_context = Some(io_context);
108        self
109    }
110
111    /// Build the cache storage.
112    ///
113    /// The cache storage is wrapped in an [Arc] to allow for concurrent access.
114    pub fn build(self) -> Arc<LiquidCache> {
115        let cache_dir = self
116            .cache_dir
117            .unwrap_or_else(|| tempfile::tempdir().unwrap().keep());
118        let io_worker = self
119            .io_context
120            .unwrap_or_else(|| Arc::new(DefaultIoContext::new(cache_dir.clone())));
121        Arc::new(LiquidCache::new(
122            self.batch_size,
123            self.max_cache_bytes,
124            cache_dir,
125            self.squeeze_policy,
126            self.cache_policy,
127            self.hydration_policy,
128            io_worker,
129        ))
130    }
131}
132
/// Builder returned by [`LiquidCache::insert`] for configuring cache writes.
#[derive(Debug)]
pub struct Insert<'a> {
    // Cache the entry will be written into.
    pub(super) storage: &'a Arc<LiquidCache>,
    // Identifier under which the batch is stored.
    pub(super) entry_id: EntryID,
    // Arrow array to cache.
    pub(super) batch: ArrayRef,
    // When true, skip view-array garbage collection before insertion.
    pub(super) skip_gc: bool,
    // Optional hint registered via `add_squeeze_hint` during the insert.
    pub(super) squeeze_hint: Option<Arc<CacheExpression>>,
}
142
143impl<'a> Insert<'a> {
144    pub(super) fn new(storage: &'a Arc<LiquidCache>, entry_id: EntryID, batch: ArrayRef) -> Self {
145        Self {
146            storage,
147            entry_id,
148            batch,
149            skip_gc: false,
150            squeeze_hint: None,
151        }
152    }
153
154    /// Skip garbage collection of view arrays.
155    pub fn with_skip_gc(mut self) -> Self {
156        self.skip_gc = true;
157        self
158    }
159
160    /// Set a squeeze hint for the entry.
161    pub fn with_squeeze_hint(mut self, expression: Arc<CacheExpression>) -> Self {
162        self.squeeze_hint = Some(expression);
163        self
164    }
165
166    async fn run(self) {
167        let batch = if self.skip_gc {
168            self.batch.clone()
169        } else {
170            maybe_gc_view_arrays(&self.batch).unwrap_or_else(|| self.batch.clone())
171        };
172        if let Some(squeeze_hint) = self.squeeze_hint {
173            self.storage.add_squeeze_hint(&self.entry_id, squeeze_hint);
174        }
175        let batch = CacheEntry::memory_arrow(batch);
176        self.storage.insert_inner(self.entry_id, batch).await;
177    }
178}
179
180impl<'a> IntoFuture for Insert<'a> {
181    type Output = ();
182    type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
183
184    fn into_future(self) -> Self::IntoFuture {
185        Box::pin(async move { self.run().await })
186    }
187}
188
/// Builder returned by [`LiquidCache::get`] for configuring cache reads.
#[derive(Debug)]
pub struct Get<'a> {
    // Cache the entry is read from.
    pub(super) storage: &'a LiquidCache,
    // Identifier of the entry to materialize.
    pub(super) entry_id: &'a EntryID,
    // Optional row-selection bitmap applied before materialization.
    pub(super) selection: Option<&'a BooleanBuffer>,
    // Optional expression hint passed through to `read_arrow_array`.
    pub(super) expression_hint: Option<Arc<CacheExpression>>,
}
197
198impl<'a> Get<'a> {
199    pub(super) fn new(storage: &'a LiquidCache, entry_id: &'a EntryID) -> Self {
200        Self {
201            storage,
202            entry_id,
203            selection: None,
204            expression_hint: None,
205        }
206    }
207
208    /// Attach a selection bitmap used to filter rows prior to materialization.
209    pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
210        self.selection = Some(selection);
211        self
212    }
213
214    /// Attach an expression hint that may help optimize cache materialization.
215    pub fn with_expression_hint(mut self, expression: Arc<CacheExpression>) -> Self {
216        self.expression_hint = Some(expression);
217        self
218    }
219
220    /// Attach an optional expression hint.
221    pub fn with_optional_expression_hint(
222        mut self,
223        expression: Option<Arc<CacheExpression>>,
224    ) -> Self {
225        self.expression_hint = expression;
226        self
227    }
228
229    /// Materialize the cached array as [`ArrayRef`].
230    pub async fn read(self) -> Option<ArrayRef> {
231        self.storage.observer().on_get(self.selection.is_some());
232        self.storage
233            .read_arrow_array(
234                self.entry_id,
235                self.selection,
236                self.expression_hint.as_deref(),
237            )
238            .await
239    }
240}
241
242impl<'a> IntoFuture for Get<'a> {
243    type Output = Option<ArrayRef>;
244    type IntoFuture = Pin<Box<dyn std::future::Future<Output = Option<ArrayRef>> + Send + 'a>>;
245
246    fn into_future(self) -> Self::IntoFuture {
247        Box::pin(async move { self.read().await })
248    }
249}
250
251/// Recursively garbage collects view arrays (BinaryView/StringView) within an array tree.
252fn maybe_gc_view_arrays(array: &ArrayRef) -> Option<ArrayRef> {
253    if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
254        return Some(Arc::new(binary_view.gc()));
255    }
256    if let Some(utf8_view) = array.as_any().downcast_ref::<StringViewArray>() {
257        return Some(Arc::new(utf8_view.gc()));
258    }
259
260    let data = array.to_data();
261    if data.child_data().is_empty() {
262        return None;
263    }
264
265    let mut changed = false;
266    let mut children: Vec<ArrayData> = Vec::with_capacity(data.child_data().len());
267    for child in data.child_data() {
268        let child_array = make_array(child.clone());
269        if let Some(gc_child) = maybe_gc_view_arrays(&child_array) {
270            changed = true;
271            children.push(gc_child.to_data());
272        } else {
273            children.push(child.clone());
274        }
275    }
276
277    if !changed {
278        return None;
279    }
280
281    let new_data = data.into_builder().child_data(children).build().ok()?;
282    Some(make_array(new_data))
283}
284
/// Builder for predicate evaluation on cached data.
#[derive(Debug)]
pub struct EvaluatePredicate<'a> {
    // Cache holding the entry the predicate is evaluated against.
    pub(super) storage: &'a LiquidCache,
    // Identifier of the entry to evaluate.
    pub(super) entry_id: &'a EntryID,
    // Physical expression evaluated against the cached data.
    pub(super) predicate: &'a Arc<dyn PhysicalExpr>,
    // Optional row-selection bitmap applied before evaluation.
    pub(super) selection: Option<&'a BooleanBuffer>,
}
293
294impl<'a> EvaluatePredicate<'a> {
295    pub(super) fn new(
296        storage: &'a LiquidCache,
297        entry_id: &'a EntryID,
298        predicate: &'a Arc<dyn PhysicalExpr>,
299    ) -> Self {
300        Self {
301            storage,
302            entry_id,
303            predicate,
304            selection: None,
305        }
306    }
307
308    /// Attach a selection bitmap used to pre-filter rows before predicate evaluation.
309    pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
310        self.selection = Some(selection);
311        self
312    }
313
314    /// Evaluate the predicate against the cached data.
315    pub async fn read(self) -> Option<Result<BooleanArray, ArrayRef>> {
316        self.storage
317            .eval_predicate_internal(self.entry_id, self.selection, self.predicate)
318            .await
319    }
320}
321
322impl<'a> IntoFuture for EvaluatePredicate<'a> {
323    type Output = Option<Result<BooleanArray, ArrayRef>>;
324    type IntoFuture = Pin<
325        Box<dyn std::future::Future<Output = Option<Result<BooleanArray, ArrayRef>>> + Send + 'a>,
326    >;
327
328    fn into_future(self) -> Self::IntoFuture {
329        Box::pin(async move { self.read().await })
330    }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{AsArray, StructArray};
    use arrow_schema::{DataType, Field, Fields};

    // End-to-end check that `insert` GC-compacts BinaryView/StringView arrays
    // at every nesting level (top-level columns and nested struct children)
    // while preserving the logical values.
    #[tokio::test]
    async fn insert_gcs_view_arrays_recursively() {
        // Build view arrays then slice to create non-zero offsets (and larger backing buffers).
        let bin = Arc::new(BinaryViewArray::from(vec![
            Some(b"long_prefix_m0" as &[u8]),
            Some(b"m1"),
        ])) as ArrayRef;
        let str_view = Arc::new(StringViewArray::from(vec![
            Some("long_prefix_s0"),
            Some("s1"),
        ])) as ArrayRef;
        let variant_metadata = Arc::new(BinaryViewArray::from(vec![
            Some(b"meta0" as &[u8]),
            Some(b"meta1"),
        ])) as ArrayRef;
        let variant_value = Arc::new(BinaryViewArray::from(vec![
            Some(b"value0" as &[u8]),
            Some(b"value1"),
        ])) as ArrayRef;

        // Slice to keep only the second element so buffers still reference unused bytes.
        let bin_slice = bin.slice(1, 1);
        let str_slice = str_view.slice(1, 1);
        let variant_metadata_slice = variant_metadata.slice(1, 1);
        let variant_value_slice = variant_value.slice(1, 1);

        // Variant-like struct: metadata (BinaryView), value (BinaryView), and a typed string view.
        let variant_typed_fields = Fields::from(vec![Arc::new(Field::new(
            "typed_str",
            DataType::Utf8View,
            true,
        ))]);
        let variant_struct_fields = Fields::from(vec![
            Arc::new(Field::new("metadata", DataType::BinaryView, true)),
            Arc::new(Field::new("value", DataType::BinaryView, true)),
            Arc::new(Field::new(
                "typed_value",
                DataType::Struct(variant_typed_fields.clone()),
                true,
            )),
        ]);
        let variant_struct = Arc::new(StructArray::new(
            variant_struct_fields.clone(),
            vec![
                variant_metadata_slice.clone(),
                variant_value_slice.clone(),
                Arc::new(StructArray::new(
                    variant_typed_fields.clone(),
                    vec![str_slice.clone()],
                    None,
                )) as ArrayRef,
            ],
            None,
        ));

        // Root struct: two view columns plus the nested variant struct, so GC
        // must recurse two levels deep to compact everything.
        let root_fields = Fields::from(vec![
            Arc::new(Field::new("bin_view", DataType::BinaryView, true)),
            Arc::new(Field::new("str_view", DataType::Utf8View, true)),
            Arc::new(Field::new(
                "variant",
                DataType::Struct(variant_struct_fields.clone()),
                true,
            )),
        ]);
        let root = Arc::new(StructArray::new(
            root_fields,
            vec![
                bin_slice.clone(),
                str_slice.clone(),
                variant_struct.clone() as ArrayRef,
            ],
            None,
        )) as ArrayRef;

        let pre_size = root.get_array_memory_size();

        let cache = LiquidCacheBuilder::new().build();
        let entry_id = EntryID::from(123usize);
        cache.insert(entry_id, root.clone()).await;

        let stored = cache.get(&entry_id).await.expect("array present");
        let post_size = stored.get_array_memory_size();

        // GC should have compacted the view arrays, reducing memory footprint.
        assert!(post_size < pre_size, "expected gc to reduce memory usage");

        // Validate values are preserved.
        let struct_out = stored
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct array");

        assert_eq!(struct_out.len(), 1);

        let bin_out = struct_out
            .column_by_name("bin_view")
            .unwrap()
            .as_binary_view();
        assert_eq!(bin_out.value(0), b"m1");

        let str_out = struct_out
            .column_by_name("str_view")
            .unwrap()
            .as_string_view();
        assert_eq!(str_out.value(0), "s1");

        let variant_out = struct_out.column_by_name("variant").unwrap().as_struct();
        let meta_out = variant_out
            .column_by_name("metadata")
            .unwrap()
            .as_binary_view();
        assert_eq!(meta_out.value(0), b"meta1");

        let val_out = variant_out
            .column_by_name("value")
            .unwrap()
            .as_binary_view();
        assert_eq!(val_out.value(0), b"value1");

        let typed_out = variant_out
            .column_by_name("typed_value")
            .unwrap()
            .as_struct();
        let typed_str_out = typed_out
            .column_by_name("typed_str")
            .unwrap()
            .as_string_view();
        assert_eq!(typed_str_out.value(0), "s1");
    }
}