1use std::future::{Future, IntoFuture};
2use std::path::PathBuf;
3use std::pin::Pin;
4
5use arrow::array::{
6 Array, ArrayData, ArrayRef, BinaryViewArray, BooleanArray, StringViewArray, make_array,
7};
8use arrow::buffer::BooleanBuffer;
9use datafusion::physical_plan::PhysicalExpr;
10
11use super::cached_batch::CacheEntry;
12use super::core::LiquidCache;
13use super::io_context::{DefaultIoContext, IoContext};
14use super::policies::{CachePolicy, HydrationPolicy, SqueezePolicy, TranscodeSqueezeEvict};
15use super::{CacheExpression, EntryID, LiquidPolicy};
16use crate::sync::Arc;
17
/// Builder for a [`LiquidCache`].
///
/// Start from [`LiquidCacheBuilder::new`] (or `Default::default()`), adjust the
/// settings with the `with_*` methods, then call `build` to obtain an
/// `Arc<LiquidCache>`.
pub struct LiquidCacheBuilder {
    /// Batch size forwarded to `LiquidCache::new` (default `8192`).
    batch_size: usize,
    /// Byte limit forwarded to `LiquidCache::new` (default 1 GiB).
    max_cache_bytes: usize,
    /// Cache directory; when `None`, `build` creates a temporary directory.
    cache_dir: Option<PathBuf>,
    /// Cache policy (default `LiquidPolicy`).
    cache_policy: Box<dyn CachePolicy>,
    /// Hydration policy (default `AlwaysHydrate`).
    hydration_policy: Box<dyn HydrationPolicy>,
    /// Squeeze policy (default `TranscodeSqueezeEvict`).
    squeeze_policy: Box<dyn SqueezePolicy>,
    /// I/O context; when `None`, `build` creates a `DefaultIoContext` from the
    /// cache directory.
    io_context: Option<Arc<dyn IoContext>>,
}
41
42impl Default for LiquidCacheBuilder {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl LiquidCacheBuilder {
49 pub fn new() -> Self {
51 Self {
52 batch_size: 8192,
53 max_cache_bytes: 1024 * 1024 * 1024,
54 cache_dir: None,
55 cache_policy: Box::new(LiquidPolicy::new()),
56 hydration_policy: Box::new(super::AlwaysHydrate::new()),
57 squeeze_policy: Box::new(TranscodeSqueezeEvict),
58 io_context: None,
59 }
60 }
61
62 pub fn with_cache_dir(mut self, cache_dir: PathBuf) -> Self {
65 self.cache_dir = Some(cache_dir);
66 self
67 }
68
69 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
72 self.batch_size = batch_size;
73 self
74 }
75
76 pub fn with_max_cache_bytes(mut self, max_cache_bytes: usize) -> Self {
79 self.max_cache_bytes = max_cache_bytes;
80 self
81 }
82
83 pub fn with_cache_policy(mut self, policy: Box<dyn CachePolicy>) -> Self {
86 self.cache_policy = policy;
87 self
88 }
89
90 pub fn with_hydration_policy(mut self, policy: Box<dyn HydrationPolicy>) -> Self {
93 self.hydration_policy = policy;
94 self
95 }
96
97 pub fn with_squeeze_policy(mut self, policy: Box<dyn SqueezePolicy>) -> Self {
100 self.squeeze_policy = policy;
101 self
102 }
103
104 pub fn with_io_context(mut self, io_context: Arc<dyn IoContext>) -> Self {
107 self.io_context = Some(io_context);
108 self
109 }
110
111 pub fn build(self) -> Arc<LiquidCache> {
115 let cache_dir = self
116 .cache_dir
117 .unwrap_or_else(|| tempfile::tempdir().unwrap().keep());
118 let io_worker = self
119 .io_context
120 .unwrap_or_else(|| Arc::new(DefaultIoContext::new(cache_dir.clone())));
121 Arc::new(LiquidCache::new(
122 self.batch_size,
123 self.max_cache_bytes,
124 cache_dir,
125 self.squeeze_policy,
126 self.cache_policy,
127 self.hydration_policy,
128 io_worker,
129 ))
130 }
131}
132
/// Pending insertion of an array into a [`LiquidCache`].
///
/// Nothing happens until the value is `.await`ed (via `IntoFuture`).
#[derive(Debug)]
pub struct Insert<'a> {
    /// Cache that receives the entry.
    pub(super) storage: &'a Arc<LiquidCache>,
    /// Identifier under which the array is stored.
    pub(super) entry_id: EntryID,
    /// Array to store.
    pub(super) batch: ArrayRef,
    /// When `true`, the view-array compaction pass is skipped before insertion.
    pub(super) skip_gc: bool,
    /// Optional squeeze hint registered with the cache before insertion.
    pub(super) squeeze_hint: Option<Arc<CacheExpression>>,
}
142
143impl<'a> Insert<'a> {
144 pub(super) fn new(storage: &'a Arc<LiquidCache>, entry_id: EntryID, batch: ArrayRef) -> Self {
145 Self {
146 storage,
147 entry_id,
148 batch,
149 skip_gc: false,
150 squeeze_hint: None,
151 }
152 }
153
154 pub fn with_skip_gc(mut self) -> Self {
156 self.skip_gc = true;
157 self
158 }
159
160 pub fn with_squeeze_hint(mut self, expression: Arc<CacheExpression>) -> Self {
162 self.squeeze_hint = Some(expression);
163 self
164 }
165
166 async fn run(self) {
167 let batch = if self.skip_gc {
168 self.batch.clone()
169 } else {
170 maybe_gc_view_arrays(&self.batch).unwrap_or_else(|| self.batch.clone())
171 };
172 if let Some(squeeze_hint) = self.squeeze_hint {
173 self.storage.add_squeeze_hint(&self.entry_id, squeeze_hint);
174 }
175 let batch = CacheEntry::memory_arrow(batch);
176 self.storage.insert_inner(self.entry_id, batch).await;
177 }
178}
179
180impl<'a> IntoFuture for Insert<'a> {
181 type Output = ();
182 type IntoFuture = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
183
184 fn into_future(self) -> Self::IntoFuture {
185 Box::pin(async move { self.run().await })
186 }
187}
188
/// Pending read of an entry from a [`LiquidCache`].
///
/// Nothing happens until the value is `.await`ed (via `IntoFuture`) or
/// `read` is called and awaited.
#[derive(Debug)]
pub struct Get<'a> {
    /// Cache to read from.
    pub(super) storage: &'a LiquidCache,
    /// Identifier of the entry to read.
    pub(super) entry_id: &'a EntryID,
    /// Optional selection bitmap forwarded to the read (presumably a row
    /// filter — confirm against `read_arrow_array`).
    pub(super) selection: Option<&'a BooleanBuffer>,
    /// Optional expression hint forwarded to the read.
    pub(super) expression_hint: Option<Arc<CacheExpression>>,
}
197
198impl<'a> Get<'a> {
199 pub(super) fn new(storage: &'a LiquidCache, entry_id: &'a EntryID) -> Self {
200 Self {
201 storage,
202 entry_id,
203 selection: None,
204 expression_hint: None,
205 }
206 }
207
208 pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
210 self.selection = Some(selection);
211 self
212 }
213
214 pub fn with_expression_hint(mut self, expression: Arc<CacheExpression>) -> Self {
216 self.expression_hint = Some(expression);
217 self
218 }
219
220 pub fn with_optional_expression_hint(
222 mut self,
223 expression: Option<Arc<CacheExpression>>,
224 ) -> Self {
225 self.expression_hint = expression;
226 self
227 }
228
229 pub async fn read(self) -> Option<ArrayRef> {
231 self.storage.observer().on_get(self.selection.is_some());
232 self.storage
233 .read_arrow_array(
234 self.entry_id,
235 self.selection,
236 self.expression_hint.as_deref(),
237 )
238 .await
239 }
240}
241
242impl<'a> IntoFuture for Get<'a> {
243 type Output = Option<ArrayRef>;
244 type IntoFuture = Pin<Box<dyn std::future::Future<Output = Option<ArrayRef>> + Send + 'a>>;
245
246 fn into_future(self) -> Self::IntoFuture {
247 Box::pin(async move { self.read().await })
248 }
249}
250
251fn maybe_gc_view_arrays(array: &ArrayRef) -> Option<ArrayRef> {
253 if let Some(binary_view) = array.as_any().downcast_ref::<BinaryViewArray>() {
254 return Some(Arc::new(binary_view.gc()));
255 }
256 if let Some(utf8_view) = array.as_any().downcast_ref::<StringViewArray>() {
257 return Some(Arc::new(utf8_view.gc()));
258 }
259
260 let data = array.to_data();
261 if data.child_data().is_empty() {
262 return None;
263 }
264
265 let mut changed = false;
266 let mut children: Vec<ArrayData> = Vec::with_capacity(data.child_data().len());
267 for child in data.child_data() {
268 let child_array = make_array(child.clone());
269 if let Some(gc_child) = maybe_gc_view_arrays(&child_array) {
270 changed = true;
271 children.push(gc_child.to_data());
272 } else {
273 children.push(child.clone());
274 }
275 }
276
277 if !changed {
278 return None;
279 }
280
281 let new_data = data.into_builder().child_data(children).build().ok()?;
282 Some(make_array(new_data))
283}
284
/// Pending evaluation of a physical predicate against a cached entry.
///
/// Nothing happens until the value is `.await`ed (via `IntoFuture`) or
/// `read` is called and awaited.
#[derive(Debug)]
pub struct EvaluatePredicate<'a> {
    /// Cache holding the entry.
    pub(super) storage: &'a LiquidCache,
    /// Identifier of the entry to evaluate against.
    pub(super) entry_id: &'a EntryID,
    /// Predicate forwarded to `eval_predicate_internal`.
    pub(super) predicate: &'a Arc<dyn PhysicalExpr>,
    /// Optional selection bitmap forwarded to the evaluation.
    pub(super) selection: Option<&'a BooleanBuffer>,
}
293
294impl<'a> EvaluatePredicate<'a> {
295 pub(super) fn new(
296 storage: &'a LiquidCache,
297 entry_id: &'a EntryID,
298 predicate: &'a Arc<dyn PhysicalExpr>,
299 ) -> Self {
300 Self {
301 storage,
302 entry_id,
303 predicate,
304 selection: None,
305 }
306 }
307
308 pub fn with_selection(mut self, selection: &'a BooleanBuffer) -> Self {
310 self.selection = Some(selection);
311 self
312 }
313
314 pub async fn read(self) -> Option<Result<BooleanArray, ArrayRef>> {
316 self.storage
317 .eval_predicate_internal(self.entry_id, self.selection, self.predicate)
318 .await
319 }
320}
321
322impl<'a> IntoFuture for EvaluatePredicate<'a> {
323 type Output = Option<Result<BooleanArray, ArrayRef>>;
324 type IntoFuture = Pin<
325 Box<dyn std::future::Future<Output = Option<Result<BooleanArray, ArrayRef>>> + Send + 'a>,
326 >;
327
328 fn into_future(self) -> Self::IntoFuture {
329 Box::pin(async move { self.read().await })
330 }
331}
332
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::{AsArray, StructArray};
    use arrow_schema::{DataType, Field, Fields};

    /// Builds a `BinaryViewArray` whose slots are all non-null.
    fn binary_view_of(values: &[&[u8]]) -> ArrayRef {
        let slots: Vec<Option<&[u8]>> = values.iter().map(|v| Some(*v)).collect();
        Arc::new(BinaryViewArray::from(slots))
    }

    /// Builds a `StringViewArray` whose slots are all non-null.
    fn string_view_of(values: &[&str]) -> ArrayRef {
        let slots: Vec<Option<&str>> = values.iter().map(|v| Some(*v)).collect();
        Arc::new(StringViewArray::from(slots))
    }

    /// Builds a nullable field.
    fn nullable(name: &str, data_type: DataType) -> Arc<Field> {
        Arc::new(Field::new(name, data_type, true))
    }

    #[tokio::test]
    async fn insert_gcs_view_arrays_recursively() {
        // Keep only the second value of each array. Values longer than 12
        // bytes are stored out of line, and the slice still references that
        // buffer, so only a GC pass can release it.
        let bin_slice = binary_view_of(&[b"long_prefix_m0" as &[u8], b"m1"]).slice(1, 1);
        let str_slice = string_view_of(&["long_prefix_s0", "s1"]).slice(1, 1);
        let metadata_slice = binary_view_of(&[b"meta0" as &[u8], b"meta1"]).slice(1, 1);
        let value_slice = binary_view_of(&[b"value0" as &[u8], b"value1"]).slice(1, 1);

        // variant.typed_value { typed_str: Utf8View }
        let typed_fields = Fields::from(vec![nullable("typed_str", DataType::Utf8View)]);
        let typed_struct: ArrayRef = Arc::new(StructArray::new(
            typed_fields.clone(),
            vec![str_slice.clone()],
            None,
        ));

        // variant { metadata, value, typed_value }
        let variant_fields = Fields::from(vec![
            nullable("metadata", DataType::BinaryView),
            nullable("value", DataType::BinaryView),
            nullable("typed_value", DataType::Struct(typed_fields)),
        ]);
        let variant_struct: ArrayRef = Arc::new(StructArray::new(
            variant_fields.clone(),
            vec![metadata_slice, value_slice, typed_struct],
            None,
        ));

        // root { bin_view, str_view, variant }
        let root_fields = Fields::from(vec![
            nullable("bin_view", DataType::BinaryView),
            nullable("str_view", DataType::Utf8View),
            nullable("variant", DataType::Struct(variant_fields)),
        ]);
        let root: ArrayRef = Arc::new(StructArray::new(
            root_fields,
            vec![bin_slice, str_slice, variant_struct],
            None,
        ));

        let size_before = root.get_array_memory_size();

        let cache = LiquidCacheBuilder::new().build();
        let entry_id = EntryID::from(123usize);
        cache.insert(entry_id, root.clone()).await;

        let stored = cache.get(&entry_id).await.expect("array present");
        let size_after = stored.get_array_memory_size();
        assert!(size_after < size_before, "expected gc to reduce memory usage");

        let root_out = stored
            .as_any()
            .downcast_ref::<StructArray>()
            .expect("struct array");
        assert_eq!(root_out.len(), 1);

        assert_eq!(
            root_out
                .column_by_name("bin_view")
                .unwrap()
                .as_binary_view()
                .value(0),
            b"m1"
        );
        assert_eq!(
            root_out
                .column_by_name("str_view")
                .unwrap()
                .as_string_view()
                .value(0),
            "s1"
        );

        let variant_out = root_out.column_by_name("variant").unwrap().as_struct();
        assert_eq!(
            variant_out
                .column_by_name("metadata")
                .unwrap()
                .as_binary_view()
                .value(0),
            b"meta1"
        );
        assert_eq!(
            variant_out
                .column_by_name("value")
                .unwrap()
                .as_binary_view()
                .value(0),
            b"value1"
        );

        let typed_out = variant_out
            .column_by_name("typed_value")
            .unwrap()
            .as_struct();
        assert_eq!(
            typed_out
                .column_by_name("typed_str")
                .unwrap()
                .as_string_view()
                .value(0),
            "s1"
        );
    }
}
468}