// liquid_cache/cache/builders.rs

use std::future::{Future, IntoFuture};
use std::pin::Pin;

use arrow::array::{
    Array, ArrayData, ArrayRef, BinaryViewArray, BooleanArray, StringViewArray, make_array,
};
use arrow::buffer::BooleanBuffer;

use super::cached_batch::CacheEntry;
use super::core::LiquidCache;
use super::io_context::{DefaultCacheMetadata, EntryMetadata};
use super::policies::{CachePolicy, HydrationPolicy, SqueezePolicy, TranscodeSqueezeEvict};
use super::{CacheExpression, CacheFull, EntryID, LiquidExpr, LiquidPolicy};
use crate::sync::Arc;
15
16/// Builder for [LiquidCache].
17///
18/// Example:
19/// ```rust
20/// use liquid_cache::cache::LiquidCacheBuilder;
21/// use liquid_cache::cache_policies::LiquidPolicy;
22///
23/// tokio_test::block_on(async {
24///     let _storage = LiquidCacheBuilder::new()
25///         .with_batch_size(8192)
26///         .with_max_memory_bytes(1024 * 1024 * 1024)
27///         .with_cache_policy(Box::new(LiquidPolicy::new()))
28///         .build()
29///         .await;
30/// });
31/// ```
32pub struct LiquidCacheBuilder {
33    batch_size: usize,
34    max_memory_bytes: usize,
35    max_disk_bytes: usize,
36    cache_policy: Box<dyn CachePolicy>,
37    hydration_policy: Box<dyn HydrationPolicy>,
38    squeeze_policy: Box<dyn SqueezePolicy>,
39    metadata: Option<Arc<dyn EntryMetadata>>,
40    store: Option<t4::Store>,
41    squeeze_victims_concurrently: bool,
42}
43
44impl Default for LiquidCacheBuilder {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl LiquidCacheBuilder {
51    /// Create a new instance of [LiquidCacheBuilder].
52    pub fn new() -> Self {
53        Self {
54            batch_size: 8192,
55            max_memory_bytes: 1024 * 1024 * 1024,
56            max_disk_bytes: usize::MAX,
57            cache_policy: Box::new(LiquidPolicy::new()),
58            hydration_policy: Box::new(super::AlwaysHydrate::new()),
59            squeeze_policy: Box::new(TranscodeSqueezeEvict),
60            metadata: None,
61            store: None,
62            squeeze_victims_concurrently: !cfg!(test),
63        }
64    }
65
66    /// Set the batch size for the cache.
67    /// Default is 8192.
68    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
69        self.batch_size = batch_size;
70        self
71    }
72
73    /// Set the max memory bytes for the cache.
74    /// Default is 1GB.
75    pub fn with_max_memory_bytes(mut self, max_memory_bytes: usize) -> Self {
76        self.max_memory_bytes = max_memory_bytes;
77        self
78    }
79
80    /// Set the max disk bytes for the cache.
81    /// Default is unlimited.
82    pub fn with_max_disk_bytes(mut self, max_disk_bytes: usize) -> Self {
83        self.max_disk_bytes = max_disk_bytes;
84        self
85    }
86
87    /// Set the cache policy for the cache.
88    /// Default is [LiquidPolicy].
89    pub fn with_cache_policy(mut self, policy: Box<dyn CachePolicy>) -> Self {
90        self.cache_policy = policy;
91        self
92    }
93
94    /// Set the hydration policy for the cache.
95    /// Default is [crate::cache::NoHydration].
96    pub fn with_hydration_policy(mut self, policy: Box<dyn HydrationPolicy>) -> Self {
97        self.hydration_policy = policy;
98        self
99    }
100
101    /// Set the squeeze policy for the cache.
102    /// Default is [TranscodeSqueezeEvict].
103    pub fn with_squeeze_policy(mut self, policy: Box<dyn SqueezePolicy>) -> Self {
104        self.squeeze_policy = policy;
105        self
106    }
107
108    /// Set the [EntryMetadata] for the cache.
109    /// Default is [DefaultCacheMetadata].
110    pub fn with_metadata(mut self, metadata: Arc<dyn EntryMetadata>) -> Self {
111        self.metadata = Some(metadata);
112        self
113    }
114
115    /// Set the [`t4::Store`] used for on-disk IO.
116    /// If not provided, the builder mounts a fresh store at a temporary directory.
117    pub fn with_store(mut self, store: t4::Store) -> Self {
118        self.store = Some(store);
119        self
120    }
121
122    /// Set whether cache victims are squeezed concurrently.
123    pub fn with_squeeze_victims_concurrently(mut self, enabled: bool) -> Self {
124        self.squeeze_victims_concurrently = enabled;
125        self
126    }
127
128    /// Build the cache storage.
129    ///
130    /// The cache storage is wrapped in an [Arc] to allow for concurrent access.
131    /// When no [`t4::Store`] is provided, one is mounted at a temporary directory.
132    pub async fn build(self) -> Arc<LiquidCache> {
133        let store = match self.store {
134            Some(store) => store,
135            None => {
136                let cache_dir = tempfile::tempdir().unwrap().keep();
137                let store_path = cache_dir.join("liquid_cache.t4");
138                t4::mount(&store_path)
139                    .await
140                    .expect("failed to mount t4 store")
141            }
142        };
143        let metadata = self
144            .metadata
145            .unwrap_or_else(|| Arc::new(DefaultCacheMetadata::new()));
146        Arc::new(LiquidCache::new(
147            self.batch_size,
148            self.max_memory_bytes,
149            self.max_disk_bytes,
150            self.squeeze_policy,
151            self.cache_policy,
152            self.hydration_policy,
153            metadata,
154            store,
155            self.squeeze_victims_concurrently,
156        ))
157    }
158}
159
160/// Builder returned by [`LiquidCache::insert`] for configuring cache writes.
161#[derive(Debug)]
162pub struct Insert<'a> {
163    pub(super) storage: &'a Arc<LiquidCache>,
164    pub(super) entry_id: EntryID,
165    pub(super) batch: ArrayRef,
166    pub(super) skip_gc: bool,
167    pub(super) squeeze_hint: Option<Arc<CacheExpression>>,
168}
169
170impl<'a> Insert<'a> {
171    pub(super) fn new(storage: &'a Arc<LiquidCache>, entry_id: EntryID, batch: ArrayRef) -> Self {
172        Self {
173            storage,
174            entry_id,
175            batch,
176            skip_gc: false,
177            squeeze_hint: None,
178        }
179    }
180
181    /// Skip garbage collection of view arrays.
182    pub fn with_skip_gc(mut self) -> Self {
183        self.skip_gc = true;
184        self
185    }
186
187    /// Set a squeeze hint for the entry.
188    pub fn with_squeeze_hint(mut self, expression: Arc<CacheExpression>) -> Self {
189        self.squeeze_hint = Some(expression);
190        self
191    }
192
193    async fn run(self) -> Result<(), CacheFull> {
194        let batch = if self.skip_gc {
195            self.batch.clone()
196        } else {
197            maybe_gc_view_arrays(&self.batch).unwrap_or_else(|| self.batch.clone())
198        };
199        if let Some(squeeze_hint) = self.squeeze_hint {
200            self.storage.add_squeeze_hint(&self.entry_id, squeeze_hint);
201        }
202        let batch = CacheEntry::memory_arrow(batch);
203        self.storage.insert_inner(self.entry_id, batch).await
204    }
205}
206
207impl<'a> IntoFuture for Insert<'a> {
208    type Output = Result<(), CacheFull>;
209    type IntoFuture = Pin<Box<dyn Future<Output = Result<(), CacheFull>> + Send + 'a>>;
210
211    fn into_future(self) -> Self::IntoFuture {
212        Box::pin(async move { self.run().await })
213    }
214}
215
216/// Builder returned by [`LiquidCache::get`] for configuring cache reads.
217#[derive(Debug)]
218pub struct Get<'a> {
219    pub(super) storage: &'a LiquidCache,
220    pub(super) entry_id: &'a EntryID,
221    pub(super) selection: Option<&'a BooleanBuffer>,
222    pub(super) expression_hint: Option<Arc<CacheExpression>>,
223}
224
225impl<'a> Get<'a> {
226    pub(super) fn new(storage: &'a LiquidCache, entry_id: &'a EntryID) -> Self {
227        Self {
228            storage,
229            entry_id,
230            selection: None,
231            expression_hint: None,
232        }
233    }
234
235    /// Attach a selection bitmap used to filter rows prior to materialization.
236    pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
237        self.selection = Some(selection);
238        self
239    }
240
241    /// Attach an expression hint that may help optimize cache materialization.
242    pub fn with_expression_hint(mut self, expression: Arc<CacheExpression>) -> Self {
243        self.expression_hint = Some(expression);
244        self
245    }
246
247    /// Attach an optional expression hint.
248    pub fn with_optional_expression_hint(
249        mut self,
250        expression: Option<Arc<CacheExpression>>,
251    ) -> Self {
252        self.expression_hint = expression;
253        self
254    }
255
256    /// Materialize the cached array as [`ArrayRef`].
257    pub async fn read(self) -> Option<ArrayRef> {
258        self.storage.observer().on_get(self.selection.is_some());
259        self.storage
260            .read_arrow_array(
261                self.entry_id,
262                self.selection,
263                self.expression_hint.as_deref(),
264            )
265            .await
266    }
267}
268
269impl<'a> IntoFuture for Get<'a> {
270    type Output = Option<ArrayRef>;
271    type IntoFuture = Pin<Box<dyn std::future::Future<Output = Option<ArrayRef>> + Send + 'a>>;
272
273    fn into_future(self) -> Self::IntoFuture {
274        Box::pin(async move { self.read().await })
275    }
276}
277
278/// Recursively garbage collects view arrays (BinaryView/StringView) within an array tree.
279fn maybe_gc_view_arrays(array: &ArrayRef) -> Option<ArrayRef> {
280    if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
281        return Some(Arc::new(binary_view.gc()));
282    }
283    if let Some(utf8_view) = array.as_any().downcast_ref::<StringViewArray>() {
284        return Some(Arc::new(utf8_view.gc()));
285    }
286
287    let data = array.to_data();
288    if data.child_data().is_empty() {
289        return None;
290    }
291
292    let mut changed = false;
293    let mut children: Vec<ArrayData> = Vec::with_capacity(data.child_data().len());
294    for child in data.child_data() {
295        let child_array = make_array(child.clone());
296        if let Some(gc_child) = maybe_gc_view_arrays(&child_array) {
297            changed = true;
298            children.push(gc_child.to_data());
299        } else {
300            children.push(child.clone());
301        }
302    }
303
304    if !changed {
305        return None;
306    }
307
308    let new_data = data.into_builder().child_data(children).build().ok()?;
309    Some(make_array(new_data))
310}
311
312/// Builder for predicate evaluation on cached data.
313#[derive(Debug)]
314pub struct EvaluatePredicate<'a> {
315    pub(super) storage: &'a LiquidCache,
316    pub(super) entry_id: &'a EntryID,
317    pub(super) predicate: &'a LiquidExpr,
318    pub(super) selection: Option<&'a BooleanBuffer>,
319}
320
321impl<'a> EvaluatePredicate<'a> {
322    pub(super) fn new(
323        storage: &'a LiquidCache,
324        entry_id: &'a EntryID,
325        predicate: &'a LiquidExpr,
326    ) -> Self {
327        Self {
328            storage,
329            entry_id,
330            predicate,
331            selection: None,
332        }
333    }
334
335    /// Attach a selection bitmap used to pre-filter rows before predicate evaluation.
336    pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
337        self.selection = Some(selection);
338        self
339    }
340
341    /// Evaluate the predicate against the cached data.
342    pub async fn read(self) -> Option<BooleanArray> {
343        self.storage
344            .eval_predicate_internal(self.entry_id, self.selection, self.predicate)
345            .await
346    }
347}
348
349impl<'a> IntoFuture for EvaluatePredicate<'a> {
350    type Output = Option<BooleanArray>;
351    type IntoFuture = Pin<Box<dyn std::future::Future<Output = Option<BooleanArray>> + Send + 'a>>;
352
353    fn into_future(self) -> Self::IntoFuture {
354        Box::pin(async move { self.read().await })
355    }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{AsArray, StructArray};
    use arrow_schema::{DataType, Field, Fields};

    /// Two-row binary view array sliced down to its second row, so the slice still
    /// references the buffers of the full array.
    fn sliced_binary_view(first: &[u8], second: &[u8]) -> ArrayRef {
        let full = Arc::new(BinaryViewArray::from(vec![Some(first), Some(second)])) as ArrayRef;
        full.slice(1, 1)
    }

    /// Two-row string view array sliced down to its second row.
    fn sliced_string_view(first: &str, second: &str) -> ArrayRef {
        let full = Arc::new(StringViewArray::from(vec![Some(first), Some(second)])) as ArrayRef;
        full.slice(1, 1)
    }

    /// Look up a struct column that the test expects to exist.
    fn column<'a>(parent: &'a StructArray, name: &str) -> &'a ArrayRef {
        parent.column_by_name(name).unwrap()
    }

    #[tokio::test]
    async fn insert_gcs_view_arrays_recursively() {
        // Slices keep only the second element, leaving unused bytes in the shared buffers.
        let bin_slice = sliced_binary_view(b"long_prefix_m0", b"m1");
        let str_slice = sliced_string_view("long_prefix_s0", "s1");
        let variant_metadata_slice = sliced_binary_view(b"meta0", b"meta1");
        let variant_value_slice = sliced_binary_view(b"value0", b"value1");

        // Variant-like struct: metadata (BinaryView), value (BinaryView), and a typed string view.
        let variant_typed_fields =
            Fields::from(vec![Field::new("typed_str", DataType::Utf8View, true)]);
        let variant_struct_fields = Fields::from(vec![
            Field::new("metadata", DataType::BinaryView, true),
            Field::new("value", DataType::BinaryView, true),
            Field::new(
                "typed_value",
                DataType::Struct(variant_typed_fields.clone()),
                true,
            ),
        ]);
        let typed_struct = Arc::new(StructArray::new(
            variant_typed_fields,
            vec![str_slice.clone()],
            None,
        )) as ArrayRef;
        let variant_struct = Arc::new(StructArray::new(
            variant_struct_fields.clone(),
            vec![variant_metadata_slice, variant_value_slice, typed_struct],
            None,
        )) as ArrayRef;

        let root_fields = Fields::from(vec![
            Field::new("bin_view", DataType::BinaryView, true),
            Field::new("str_view", DataType::Utf8View, true),
            Field::new("variant", DataType::Struct(variant_struct_fields), true),
        ]);
        let root = Arc::new(StructArray::new(
            root_fields,
            vec![bin_slice, str_slice, variant_struct],
            None,
        )) as ArrayRef;

        let pre_size = root.get_array_memory_size();

        let cache = LiquidCacheBuilder::new().build().await;
        let entry_id = EntryID::from(123usize);
        cache.insert(entry_id, root.clone()).await.unwrap();

        let stored = cache.get(&entry_id).await.expect("array present");
        let post_size = stored.get_array_memory_size();

        // GC should have compacted the view arrays, reducing memory footprint.
        assert!(post_size < pre_size, "expected gc to reduce memory usage");

        // Validate values are preserved.
        let struct_out = stored.as_struct_opt().expect("struct array");
        assert_eq!(struct_out.len(), 1);

        assert_eq!(column(struct_out, "bin_view").as_binary_view().value(0), b"m1");
        assert_eq!(column(struct_out, "str_view").as_string_view().value(0), "s1");

        let variant_out = column(struct_out, "variant").as_struct();
        assert_eq!(
            column(variant_out, "metadata").as_binary_view().value(0),
            b"meta1"
        );
        assert_eq!(
            column(variant_out, "value").as_binary_view().value(0),
            b"value1"
        );

        let typed_out = column(variant_out, "typed_value").as_struct();
        assert_eq!(column(typed_out, "typed_str").as_string_view().value(0), "s1");
    }
}