1use std::future::{Future, IntoFuture};
2use std::pin::Pin;
3
4use arrow::array::{
5 Array, ArrayData, ArrayRef, BinaryViewArray, BooleanArray, StringViewArray, make_array,
6};
7use arrow::buffer::BooleanBuffer;
8
9use super::cached_batch::CacheEntry;
10use super::core::LiquidCache;
11use super::io_context::{DefaultCacheMetadata, EntryMetadata};
12use super::policies::{CachePolicy, HydrationPolicy, SqueezePolicy, TranscodeSqueezeEvict};
13use super::{CacheExpression, CacheFull, EntryID, LiquidExpr, LiquidPolicy};
14use crate::sync::Arc;
15
16pub struct LiquidCacheBuilder {
33 batch_size: usize,
34 max_memory_bytes: usize,
35 max_disk_bytes: usize,
36 cache_policy: Box<dyn CachePolicy>,
37 hydration_policy: Box<dyn HydrationPolicy>,
38 squeeze_policy: Box<dyn SqueezePolicy>,
39 metadata: Option<Arc<dyn EntryMetadata>>,
40 store: Option<t4::Store>,
41 squeeze_victims_concurrently: bool,
42}
43
44impl Default for LiquidCacheBuilder {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl LiquidCacheBuilder {
51 pub fn new() -> Self {
53 Self {
54 batch_size: 8192,
55 max_memory_bytes: 1024 * 1024 * 1024,
56 max_disk_bytes: usize::MAX,
57 cache_policy: Box::new(LiquidPolicy::new()),
58 hydration_policy: Box::new(super::AlwaysHydrate::new()),
59 squeeze_policy: Box::new(TranscodeSqueezeEvict),
60 metadata: None,
61 store: None,
62 squeeze_victims_concurrently: !cfg!(test),
63 }
64 }
65
66 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
69 self.batch_size = batch_size;
70 self
71 }
72
73 pub fn with_max_memory_bytes(mut self, max_memory_bytes: usize) -> Self {
76 self.max_memory_bytes = max_memory_bytes;
77 self
78 }
79
80 pub fn with_max_disk_bytes(mut self, max_disk_bytes: usize) -> Self {
83 self.max_disk_bytes = max_disk_bytes;
84 self
85 }
86
87 pub fn with_cache_policy(mut self, policy: Box<dyn CachePolicy>) -> Self {
90 self.cache_policy = policy;
91 self
92 }
93
94 pub fn with_hydration_policy(mut self, policy: Box<dyn HydrationPolicy>) -> Self {
97 self.hydration_policy = policy;
98 self
99 }
100
101 pub fn with_squeeze_policy(mut self, policy: Box<dyn SqueezePolicy>) -> Self {
104 self.squeeze_policy = policy;
105 self
106 }
107
108 pub fn with_metadata(mut self, metadata: Arc<dyn EntryMetadata>) -> Self {
111 self.metadata = Some(metadata);
112 self
113 }
114
115 pub fn with_store(mut self, store: t4::Store) -> Self {
118 self.store = Some(store);
119 self
120 }
121
122 pub fn with_squeeze_victims_concurrently(mut self, enabled: bool) -> Self {
124 self.squeeze_victims_concurrently = enabled;
125 self
126 }
127
128 pub async fn build(self) -> Arc<LiquidCache> {
133 let store = match self.store {
134 Some(store) => store,
135 None => {
136 let cache_dir = tempfile::tempdir().unwrap().keep();
137 let store_path = cache_dir.join("liquid_cache.t4");
138 t4::mount(&store_path)
139 .await
140 .expect("failed to mount t4 store")
141 }
142 };
143 let metadata = self
144 .metadata
145 .unwrap_or_else(|| Arc::new(DefaultCacheMetadata::new()));
146 Arc::new(LiquidCache::new(
147 self.batch_size,
148 self.max_memory_bytes,
149 self.max_disk_bytes,
150 self.squeeze_policy,
151 self.cache_policy,
152 self.hydration_policy,
153 metadata,
154 store,
155 self.squeeze_victims_concurrently,
156 ))
157 }
158}
159
160#[derive(Debug)]
162pub struct Insert<'a> {
163 pub(super) storage: &'a Arc<LiquidCache>,
164 pub(super) entry_id: EntryID,
165 pub(super) batch: ArrayRef,
166 pub(super) skip_gc: bool,
167 pub(super) squeeze_hint: Option<Arc<CacheExpression>>,
168}
169
170impl<'a> Insert<'a> {
171 pub(super) fn new(storage: &'a Arc<LiquidCache>, entry_id: EntryID, batch: ArrayRef) -> Self {
172 Self {
173 storage,
174 entry_id,
175 batch,
176 skip_gc: false,
177 squeeze_hint: None,
178 }
179 }
180
181 pub fn with_skip_gc(mut self) -> Self {
183 self.skip_gc = true;
184 self
185 }
186
187 pub fn with_squeeze_hint(mut self, expression: Arc<CacheExpression>) -> Self {
189 self.squeeze_hint = Some(expression);
190 self
191 }
192
193 async fn run(self) -> Result<(), CacheFull> {
194 let batch = if self.skip_gc {
195 self.batch.clone()
196 } else {
197 maybe_gc_view_arrays(&self.batch).unwrap_or_else(|| self.batch.clone())
198 };
199 if let Some(squeeze_hint) = self.squeeze_hint {
200 self.storage.add_squeeze_hint(&self.entry_id, squeeze_hint);
201 }
202 let batch = CacheEntry::memory_arrow(batch);
203 self.storage.insert_inner(self.entry_id, batch).await
204 }
205}
206
207impl<'a> IntoFuture for Insert<'a> {
208 type Output = Result<(), CacheFull>;
209 type IntoFuture = Pin<Box<dyn Future<Output = Result<(), CacheFull>> + Send + 'a>>;
210
211 fn into_future(self) -> Self::IntoFuture {
212 Box::pin(async move { self.run().await })
213 }
214}
215
216#[derive(Debug)]
218pub struct Get<'a> {
219 pub(super) storage: &'a LiquidCache,
220 pub(super) entry_id: &'a EntryID,
221 pub(super) selection: Option<&'a BooleanBuffer>,
222 pub(super) expression_hint: Option<Arc<CacheExpression>>,
223}
224
225impl<'a> Get<'a> {
226 pub(super) fn new(storage: &'a LiquidCache, entry_id: &'a EntryID) -> Self {
227 Self {
228 storage,
229 entry_id,
230 selection: None,
231 expression_hint: None,
232 }
233 }
234
235 pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
237 self.selection = Some(selection);
238 self
239 }
240
241 pub fn with_expression_hint(mut self, expression: Arc<CacheExpression>) -> Self {
243 self.expression_hint = Some(expression);
244 self
245 }
246
247 pub fn with_optional_expression_hint(
249 mut self,
250 expression: Option<Arc<CacheExpression>>,
251 ) -> Self {
252 self.expression_hint = expression;
253 self
254 }
255
256 pub async fn read(self) -> Option<ArrayRef> {
258 self.storage.observer().on_get(self.selection.is_some());
259 self.storage
260 .read_arrow_array(
261 self.entry_id,
262 self.selection,
263 self.expression_hint.as_deref(),
264 )
265 .await
266 }
267}
268
269impl<'a> IntoFuture for Get<'a> {
270 type Output = Option<ArrayRef>;
271 type IntoFuture = Pin<Box<dyn std::future::Future<Output = Option<ArrayRef>> + Send + 'a>>;
272
273 fn into_future(self) -> Self::IntoFuture {
274 Box::pin(async move { self.read().await })
275 }
276}
277
278fn maybe_gc_view_arrays(array: &ArrayRef) -> Option<ArrayRef> {
280 if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
281 return Some(Arc::new(binary_view.gc()));
282 }
283 if let Some(utf8_view) = array.as_any().downcast_ref::<StringViewArray>() {
284 return Some(Arc::new(utf8_view.gc()));
285 }
286
287 let data = array.to_data();
288 if data.child_data().is_empty() {
289 return None;
290 }
291
292 let mut changed = false;
293 let mut children: Vec<ArrayData> = Vec::with_capacity(data.child_data().len());
294 for child in data.child_data() {
295 let child_array = make_array(child.clone());
296 if let Some(gc_child) = maybe_gc_view_arrays(&child_array) {
297 changed = true;
298 children.push(gc_child.to_data());
299 } else {
300 children.push(child.clone());
301 }
302 }
303
304 if !changed {
305 return None;
306 }
307
308 let new_data = data.into_builder().child_data(children).build().ok()?;
309 Some(make_array(new_data))
310}
311
312#[derive(Debug)]
314pub struct EvaluatePredicate<'a> {
315 pub(super) storage: &'a LiquidCache,
316 pub(super) entry_id: &'a EntryID,
317 pub(super) predicate: &'a LiquidExpr,
318 pub(super) selection: Option<&'a BooleanBuffer>,
319}
320
321impl<'a> EvaluatePredicate<'a> {
322 pub(super) fn new(
323 storage: &'a LiquidCache,
324 entry_id: &'a EntryID,
325 predicate: &'a LiquidExpr,
326 ) -> Self {
327 Self {
328 storage,
329 entry_id,
330 predicate,
331 selection: None,
332 }
333 }
334
335 pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
337 self.selection = Some(selection);
338 self
339 }
340
341 pub async fn read(self) -> Option<BooleanArray> {
343 self.storage
344 .eval_predicate_internal(self.entry_id, self.selection, self.predicate)
345 .await
346 }
347}
348
349impl<'a> IntoFuture for EvaluatePredicate<'a> {
350 type Output = Option<BooleanArray>;
351 type IntoFuture = Pin<Box<dyn std::future::Future<Output = Option<BooleanArray>> + Send + 'a>>;
352
353 fn into_future(self) -> Self::IntoFuture {
354 Box::pin(async move { self.read().await })
355 }
356}
357
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{AsArray, StructArray};
    use arrow_schema::{DataType, Field, Fields};

    /// Builds a two-element binary view array and returns a slice holding only
    /// the second element.
    fn sliced_binary_view(first: &[u8], second: &[u8]) -> ArrayRef {
        let full = Arc::new(BinaryViewArray::from(vec![Some(first), Some(second)])) as ArrayRef;
        full.slice(1, 1)
    }

    /// Builds a two-element string view array and returns a slice holding only
    /// the second element.
    fn sliced_string_view(first: &str, second: &str) -> ArrayRef {
        let full = Arc::new(StringViewArray::from(vec![Some(first), Some(second)])) as ArrayRef;
        full.slice(1, 1)
    }

    /// Shorthand for a nullable field wrapped in an `Arc`.
    fn nullable(name: &str, data_type: DataType) -> Arc<Field> {
        Arc::new(Field::new(name, data_type, true))
    }

    /// Inserting a nested struct whose leaves are sliced view arrays should
    /// store a smaller array with the same visible values at every level.
    #[tokio::test]
    async fn insert_gcs_view_arrays_recursively() {
        // Each leaf is a one-row slice of a two-row view array.
        let bin_slice = sliced_binary_view(b"long_prefix_m0", b"m1");
        let str_slice = sliced_string_view("long_prefix_s0", "s1");
        let metadata_slice = sliced_binary_view(b"meta0", b"meta1");
        let value_slice = sliced_binary_view(b"value0", b"value1");

        // variant.typed_value: struct { typed_str: Utf8View }
        let typed_fields = Fields::from(vec![nullable("typed_str", DataType::Utf8View)]);
        let typed_struct = Arc::new(StructArray::new(
            typed_fields.clone(),
            vec![str_slice.clone()],
            None,
        )) as ArrayRef;

        // variant: struct { metadata, value, typed_value }
        let variant_fields = Fields::from(vec![
            nullable("metadata", DataType::BinaryView),
            nullable("value", DataType::BinaryView),
            nullable("typed_value", DataType::Struct(typed_fields)),
        ]);
        let variant_struct = Arc::new(StructArray::new(
            variant_fields.clone(),
            vec![metadata_slice, value_slice, typed_struct],
            None,
        )) as ArrayRef;

        // root: struct { bin_view, str_view, variant }
        let root_fields = Fields::from(vec![
            nullable("bin_view", DataType::BinaryView),
            nullable("str_view", DataType::Utf8View),
            nullable("variant", DataType::Struct(variant_fields)),
        ]);
        let root = Arc::new(StructArray::new(
            root_fields,
            vec![bin_slice, str_slice, variant_struct],
            None,
        )) as ArrayRef;

        let size_before = root.get_array_memory_size();

        let cache = LiquidCacheBuilder::new().build().await;
        let entry_id = EntryID::from(123usize);
        cache.insert(entry_id, root.clone()).await.unwrap();

        let stored = cache.get(&entry_id).await.expect("array present");
        let size_after = stored.get_array_memory_size();
        assert!(size_after < size_before, "expected gc to reduce memory usage");

        let root_out = stored
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct array");
        assert_eq!(root_out.len(), 1);

        // Top-level view columns.
        let bin_out = root_out.column_by_name("bin_view").unwrap().as_binary_view();
        assert_eq!(bin_out.value(0), b"m1");
        let str_out = root_out.column_by_name("str_view").unwrap().as_string_view();
        assert_eq!(str_out.value(0), "s1");

        // First nesting level.
        let variant_out = root_out.column_by_name("variant").unwrap().as_struct();
        let metadata_out = variant_out
            .column_by_name("metadata")
            .unwrap()
            .as_binary_view();
        assert_eq!(metadata_out.value(0), b"meta1");
        let value_out = variant_out.column_by_name("value").unwrap().as_binary_view();
        assert_eq!(value_out.value(0), b"value1");

        // Second nesting level.
        let typed_out = variant_out
            .column_by_name("typed_value")
            .unwrap()
            .as_struct();
        let typed_str_out = typed_out
            .column_by_name("typed_str")
            .unwrap()
            .as_string_view();
        assert_eq!(typed_str_out.value(0), "s1");
    }
}