use std::{
    collections::{HashMap, VecDeque},
    ops::Range,
    sync::{Arc, Mutex},
};

use arrow_array::{cast::AsArray, types::UInt32Type, ArrayRef, RecordBatch, UInt32Array};
use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
use bytes::Bytes;
use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
use datafusion_common::{arrow::datatypes::DataType, DFSchema, DFSchemaRef, ScalarValue};
use datafusion_expr::{
    col,
    execution_props::ExecutionProps,
    interval_arithmetic::{Interval, NullableInterval},
    simplify::SimplifyContext,
    Accumulator, Expr,
};
use datafusion_functions::core::expr_ext::FieldAccessor;
use datafusion_optimizer::simplify_expressions::ExprSimplifier;
use futures::{future::BoxFuture, FutureExt};
use lance_datafusion::planner::Planner;
use lance_encoding::{
    buffer::LanceBuffer,
    decoder::{
        decode_batch, ColumnInfoIter, DecoderPlugins, FieldScheduler, FilterExpression,
        PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
    },
    encoder::{
        encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodedColumn, EncodingOptions,
        FieldEncoder, OutOfLineBuffers,
    },
    format::pb,
    repdef::RepDefBuilder,
    EncodingsIo,
};

use lance_core::{cache::FileMetadataCache, datatypes::Schema, Error, Result};
use lance_file::{
    v2::{reader::EncodedBatchReaderExt, writer::EncodedBatchWriteExt},
    version::LanceFileVersion,
};
use snafu::location;

use crate::substrait::FilterExpressionExt;

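/// The min/max/null-count statistics gathered for a single zone while encoding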
#[derive(Debug)]
struct CreatedZoneMap {
    min: ScalarValue,
    max: ScalarValue,
    null_count: u32,
}

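/// Accumulates row ranges, merging each new range into the previous one
/// when the two are adjacent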
#[derive(Default)]
struct RangesBuilder {
    ranges: Vec<Range<u64>>,
}

impl RangesBuilder {
    fn add_range(&mut self, range: Range<u64>) {
        if let Some(cur) = self.ranges.last_mut() {
            if cur.end == range.start {
                // The new range is adjacent to the last one; merge them
                cur.end = range.end;
            } else {
                self.ranges.push(range);
            }
        } else {
            self.ranges.push(range);
        }
    }
}

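/// Refines requested row ranges by dropping the portions that fall in zones
/// rejected by `filter`
///
/// For example, with `rows_per_zone = 100` and a filter that keeps only
/// even-numbered zones, refining `50..250` yields `[50..100, 200..250]`.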
struct ZoneMapsFilter<F: Fn(u64) -> bool> {
    filter: F,
    rows_per_zone: u64,
}

impl<F: Fn(u64) -> bool> ZoneMapsFilter<F> {
    fn new(filter: F, rows_per_zone: u64) -> Self {
        Self {
            filter,
            rows_per_zone,
        }
    }

    fn refine_range(&self, mut range: Range<u64>) -> Vec<Range<u64>> {
        // Walk the range zone by zone, keeping only the pieces that fall in
        // zones the filter accepts.  Adjacent survivors are merged by the
        // ranges builder.
        let mut ranges_builder = RangesBuilder::default();
        let mut zone_idx = range.start / self.rows_per_zone;
        while !range.is_empty() {
            let end = range.end.min((zone_idx + 1) * self.rows_per_zone);

            if (self.filter)(zone_idx) {
                let zone_range = range.start..end;
                ranges_builder.add_range(zone_range);
            }

            range.start = end;
            zone_idx += 1;
        }
        ranges_builder.ranges
    }

    fn refine_ranges(&self, ranges: &[Range<u64>]) -> Vec<Range<u64>> {
        ranges
            .iter()
            .flat_map(|r| self.refine_range(r.clone()))
            .collect()
    }
}

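/// Converts a path of field indices into a DataFusion expression,
/// e.g. `[1, 2]` becomes `col("1").field("2")`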
#[allow(unused)]
fn path_to_expr(path: &VecDeque<u32>) -> Expr {
    let mut parts_iter = path.iter().map(|path_num| path_num.to_string());
    let mut expr = col(parts_iter.next().unwrap());
    for part in parts_iter {
        expr = expr.field(part);
    }
    expr
}

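/// Peeks at a column's encoding and, if it is wrapped in a zone index,
/// unwraps it
///
/// On a match this returns the rows-per-zone and an [`UnloadedPushdown`]
/// describing where the zone map buffer lives in the file, and rewrites the
/// column info so downstream decoders see the inner encoding.  Returns
/// `None` for any other encoding.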
#[allow(unused)]
pub(crate) fn extract_zone_info(
    column_info: &mut ColumnInfoIter,
    data_type: &DataType,
    cur_path: &VecDeque<u32>,
) -> Option<(u32, UnloadedPushdown)> {
    let mut result: Option<(u32, UnloadedPushdown)> = None;
    let result_ref = &mut result;
    column_info.peek_transform(|col_info| {
        let encoding = col_info.encoding.column_encoding.as_ref().unwrap();
        match *encoding {
            pb::column_encoding::ColumnEncoding::ZoneIndex(ref zone_index) => {
                let mut zone_index = zone_index.clone();
                let inner = zone_index.inner.take().unwrap();
                let rows_per_zone = zone_index.rows_per_zone;
                let zone_map_buffer = *zone_index.zone_map_buffer.as_ref().unwrap();
                assert_eq!(
                    zone_map_buffer.buffer_type,
                    i32::from(pb::buffer::BufferType::Column)
                );
                let (position, size) =
                    col_info.buffer_offsets_and_sizes[zone_map_buffer.buffer_index as usize];
                let column = path_to_expr(cur_path);
                let unloaded_pushdown = UnloadedPushdown {
                    data_type: data_type.clone(),
                    column,
                    position,
                    size,
                };
                *result_ref = Some((rows_per_zone, unloaded_pushdown));

                // Strip the zone index wrapper so downstream decoders see the
                // inner encoding
                let mut col_info = col_info.as_ref().clone();
                col_info.encoding = *inner;
                Arc::new(col_info)
            }
            _ => col_info,
        }
    });
    result
}

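/// A zone map pushdown whose buffer has been located in the file but not yet
/// loaded or decoded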
#[derive(Debug)]
pub struct UnloadedPushdown {
    data_type: DataType,
    column: Expr,
    // Position and size of the zone map buffer within the file
    position: u64,
    size: u64,
}

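/// The guarantees for a single zone, one interval per referenced column,
/// in the form expected by DataFusion's expression simplifier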
#[derive(Debug)]
struct ZoneMap {
    items: Vec<(Expr, NullableInterval)>,
}

#[derive(Debug)]
struct InitializedState {
    zone_maps: Vec<ZoneMap>,
    filter: Option<Expr>,
    df_schema: Option<DFSchemaRef>,
}

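/// A field scheduler that wraps another scheduler and uses zone maps to skip
/// zones that cannot possibly satisfy the filter
///
/// On initialization the zone map buffers for the filtered columns are read
/// and decoded.  When ranges are scheduled, each zone's min/max/null-count
/// guarantees are fed to DataFusion's expression simplifier; if the filter
/// simplifies to `false` the zone is pruned.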
#[derive(Debug)]
pub struct ZoneMapsFieldScheduler {
    inner: Arc<dyn FieldScheduler>,
    schema: Arc<Schema>,
    /// Unloaded zone map buffers, keyed by field id
    pushdown_buffers: HashMap<u32, UnloadedPushdown>,
    rows_per_zone: u32,
    num_rows: u64,
    /// Lazily populated the first time a filter is provided
    initialized_state: Mutex<Option<InitializedState>>,
}

impl ZoneMapsFieldScheduler {
    pub fn new(
        inner: Arc<dyn FieldScheduler>,
        schema: Arc<Schema>,
        pushdown_buffers: HashMap<u32, UnloadedPushdown>,
        rows_per_zone: u32,
        num_rows: u64,
    ) -> Self {
        Self {
            inner,
            schema,
            pushdown_buffers,
            rows_per_zone,
            num_rows,
            initialized_state: Mutex::new(None),
        }
    }

    async fn load_pushdowns(
        &self,
        io: &dyn EncodingsIo,
        _cache: &FileMetadataCache,
        pushdowns: &[&UnloadedPushdown],
    ) -> Result<Vec<ZoneMap>> {
        // Note: the metadata cache is currently unused here; zone map buffers
        // are read from the file on every initialization
        if pushdowns.is_empty() {
            // None of the filtered columns have zone maps; `transpose2`
            // requires non-empty input, so return early
            return Ok(vec![]);
        }
        let ranges = pushdowns
            .iter()
            .map(|pushdown| pushdown.position..pushdown.position + pushdown.size)
            .collect();
        let buffers = io.submit_request(ranges, 0).await?;
        let mut maps = Vec::new();
        for (buffer, pushdown) in buffers.into_iter().zip(pushdowns.iter()) {
            let map = self
                .parse_zone(buffer, &pushdown.data_type, &pushdown.column)
                .await?;
            maps.push(map);
        }
        // `maps` is indexed by column, then by zone.  Transpose it so that
        // each ZoneMap holds the guarantees for every column of one zone.
        let zone_maps = transpose2(maps)
            .into_iter()
            .map(|items| ZoneMap { items })
            .collect();
        Ok(zone_maps)
    }

    async fn load_maps(
        &self,
        io: &dyn EncodingsIo,
        cache: &FileMetadataCache,
        filter_schema: &Schema,
    ) -> Result<Vec<ZoneMap>> {
        // Only load zone maps for columns referenced by the filter
        let pushdowns_to_load = filter_schema
            .fields
            .iter()
            .filter_map(|field| {
                let field_id = field.id as u32;
                let unloaded = self.pushdown_buffers.get(&field_id)?;
                Some(unloaded)
            })
            .collect::<Vec<_>>();
        self.load_pushdowns(io, cache, &pushdowns_to_load).await
    }

    async fn do_initialize(
        &self,
        io: &dyn EncodingsIo,
        cache: &FileMetadataCache,
        filter: &FilterExpression,
    ) -> Result<()> {
        if filter.is_noop() {
            // Nothing to prune without a filter
            return Ok(());
        }

        let arrow_schema = ArrowSchema::from(self.schema.as_ref());
        let df_schema = DFSchema::try_from(arrow_schema.clone())?;
        let df_filter = filter.substrait_to_df(Arc::new(arrow_schema))?;

        // Project the schema down to the columns the filter references
        let columns = Planner::column_names_in_expr(&df_filter);
        let referenced_schema = self.schema.project(&columns)?;

        let df_schema = Some(Arc::new(df_schema));
        let zone_maps = self.load_maps(io, cache, &referenced_schema).await?;
        let filter = Some(df_filter);

        let state = InitializedState {
            zone_maps,
            filter,
            df_schema,
        };
        let mut initialized_state = self.initialized_state.lock().unwrap();
        *initialized_state = Some(state);
        Ok(())
    }

    fn create_filter(&self) -> Result<impl Fn(u64) -> bool + '_> {
        Ok(move |zone_idx| {
            let state = self.initialized_state.lock().unwrap();
            let state = state.as_ref().unwrap();
            let Some(zone_map) = state.zone_maps.get(zone_idx as usize) else {
                // No zone map is available for this zone; we cannot prune it
                return true;
            };
            let props = ExecutionProps::new();
            let context =
                SimplifyContext::new(&props).with_schema(state.df_schema.as_ref().unwrap().clone());
            let mut simplifier = ExprSimplifier::new(context);
            simplifier = simplifier.with_guarantees(zone_map.items.clone());
            match simplifier.simplify(state.filter.as_ref().unwrap().clone()) {
                Ok(expr) => match expr {
                    // The filter simplified to false: no row in this zone can
                    // match, so the zone can be skipped
                    Expr::Literal(ScalarValue::Boolean(Some(false))) => false,
                    // Otherwise the zone may contain matching rows
                    _ => true,
                },
                Err(err) => {
                    // If simplification fails, err on the side of reading the zone
                    log::debug!("Failed to simplify predicate: {}", err);
                    true
                }
            }
        })
    }

    fn extract_guarantees(
        stats: &RecordBatch,
        rows_per_zone: u32,
        num_rows: u64,
        data_type: &DataType,
        col: Expr,
    ) -> Vec<(Expr, NullableInterval)> {
        let min_values = stats.column(0);
        let max_values = stats.column(1);
        let null_counts = stats.column(2).as_primitive::<UInt32Type>();

        let mut guarantees = Vec::new();
        for zone_idx in 0..stats.num_rows() {
            let num_rows_in_zone = if zone_idx == stats.num_rows() - 1 {
                // The last zone may be partial; when num_rows is an exact
                // multiple of rows_per_zone the final zone is still full
                let remainder = (num_rows % rows_per_zone as u64) as u32;
                if remainder == 0 {
                    rows_per_zone
                } else {
                    remainder
                }
            } else {
                rows_per_zone
            };
            let min_value = ScalarValue::try_from_array(&min_values, zone_idx).unwrap();
            let max_value = ScalarValue::try_from_array(&max_values, zone_idx).unwrap();
            let null_count = null_counts.values()[zone_idx];

            let values = Interval::try_new(min_value, max_value).unwrap();
            let interval = match (null_count, num_rows_in_zone) {
                // No nulls in the zone
                (0, _) => NullableInterval::NotNull { values },
                // Every row in the zone is null
                (null_count, num_rows_in_zone) if null_count == num_rows_in_zone => {
                    NullableInterval::Null {
                        datatype: data_type.clone(),
                    }
                }
                // A mix of null and non-null rows
                _ => NullableInterval::MaybeNull { values },
            };
            guarantees.push((col.clone(), interval));
        }
        guarantees
    }

    async fn parse_zone(
        &self,
        buffer: Bytes,
        data_type: &DataType,
        col: &Expr,
    ) -> Result<Vec<(Expr, NullableInterval)>> {
        // Zone maps are stored as a mini Lance file with min/max/null_count columns
        let zone_map_schema = Schema::try_from(&ArrowSchema::new(vec![
            ArrowField::new("min", data_type.clone(), true),
            ArrowField::new("max", data_type.clone(), true),
            ArrowField::new("null_count", DataType::UInt32, false),
        ]))
        .unwrap();
        let zone_maps_batch =
            EncodedBatch::try_from_mini_lance(buffer, &zone_map_schema, LanceFileVersion::V2_0)?;
        let zone_maps_batch = decode_batch(
            &zone_maps_batch,
            &FilterExpression::no_filter(),
            Arc::<DecoderPlugins>::default(),
            false,
            LanceFileVersion::default(),
            None,
        )
        .await?;

        Ok(Self::extract_guarantees(
            &zone_maps_batch,
            self.rows_per_zone,
            self.num_rows,
            data_type,
            col.clone(),
        ))
    }
}

/// Transposes `v` from `v[i][j]` to `v[j][i]`
///
/// All inner vectors must have the same length and `v` must be non-empty.
fn transpose2<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
    assert!(!v.is_empty());
    let len = v[0].len();
    let mut iters: Vec<_> = v.into_iter().map(|n| n.into_iter()).collect();
    (0..len)
        .map(|_| {
            iters
                .iter_mut()
                .map(|n| n.next().unwrap())
                .collect::<Vec<T>>()
        })
        .collect()
}

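/// A scheduling job that schedules zero rows, used when the zone maps prune
/// every requested range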
#[derive(Debug)]
struct EmptySchedulingJob {}

impl SchedulingJob for EmptySchedulingJob {
    fn schedule_next(
        &mut self,
        _context: &mut SchedulerContext,
        _priority: &dyn PriorityRange,
    ) -> Result<ScheduledScanLine> {
        Ok(ScheduledScanLine {
            rows_scheduled: 0,
            decoders: vec![],
        })
    }

    fn num_rows(&self) -> u64 {
        0
    }
}

impl FieldScheduler for ZoneMapsFieldScheduler {
    fn initialize<'a>(
        &'a self,
        filter: &'a FilterExpression,
        context: &'a SchedulerContext,
    ) -> BoxFuture<'a, Result<()>> {
        async move {
            self.do_initialize(context.io().as_ref(), context.cache(), filter)
                .await
        }
        .boxed()
    }

    fn schedule_ranges<'a>(
        &'a self,
        ranges: &[Range<u64>],
        filter: &FilterExpression,
    ) -> Result<Box<dyn SchedulingJob + 'a>> {
        if filter.is_noop() {
            return self.inner.schedule_ranges(ranges, filter);
        }
        let zone_filter_fn = self.create_filter()?;
        let zone_filter = ZoneMapsFilter::new(zone_filter_fn, self.rows_per_zone as u64);
        let ranges = zone_filter.refine_ranges(ranges);
        if ranges.is_empty() {
            Ok(Box::new(EmptySchedulingJob {}))
        } else {
            self.inner.schedule_ranges(&ranges, filter)
        }
    }

    fn num_rows(&self) -> u64 {
        self.inner.num_rows()
    }
}

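/// A field encoder that wraps another field encoder and computes zone maps
///
/// For every `rows_per_map` rows the encoder records the min, max, and null
/// count of the values it has seen.  When the field is finished, the zone
/// maps are serialized as a miniature Lance file, appended as an extra column
/// buffer, and the column encoding is wrapped in a `ZoneIndex` encoding that
/// points at that buffer.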
pub struct ZoneMapsFieldEncoder {
    items_encoder: Box<dyn FieldEncoder>,
    items_type: DataType,

    rows_per_map: u32,

    maps: Vec<CreatedZoneMap>,
    cur_offset: u32,
    min: MinAccumulator,
    max: MaxAccumulator,
    null_count: u32,
}

impl ZoneMapsFieldEncoder {
    pub fn try_new(
        items_encoder: Box<dyn FieldEncoder>,
        items_type: DataType,
        rows_per_map: u32,
    ) -> Result<Self> {
        let min = MinAccumulator::try_new(&items_type)?;
        let max = MaxAccumulator::try_new(&items_type)?;
        Ok(Self {
            rows_per_map,
            items_encoder,
            items_type,
            min,
            max,
            null_count: 0,
            cur_offset: 0,
            maps: Vec::new(),
        })
    }
}

impl ZoneMapsFieldEncoder {
    fn new_map(&mut self) -> Result<()> {
        // Seal the current accumulators into a zone map and reset them
        let map = CreatedZoneMap {
            min: self.min.evaluate()?,
            max: self.max.evaluate()?,
            null_count: self.null_count,
        };
        self.maps.push(map);
        self.min = MinAccumulator::try_new(&self.items_type)?;
        self.max = MaxAccumulator::try_new(&self.items_type)?;
        self.null_count = 0;
        self.cur_offset = 0;
        Ok(())
    }

    fn update_stats(&mut self, array: &ArrayRef) -> Result<()> {
        self.null_count += array.null_count() as u32;
        self.min.update_batch(&[array.clone()])?;
        self.max.update_batch(&[array.clone()])?;
        Ok(())
    }

    fn update(&mut self, array: &ArrayRef) -> Result<()> {
        let mut remaining = array.len() as u32;
        let mut offset = 0;

        while remaining > 0 {
            let desired = self.rows_per_map - self.cur_offset;
            if desired > remaining {
                // Not enough data to fill the current zone; just accumulate
                self.update_stats(&array.slice(offset, remaining as usize))?;
                self.cur_offset += remaining;
                break;
            } else {
                // Enough data to finish the current zone
                self.update_stats(&array.slice(offset, desired as usize))?;
                self.new_map()?;
            }
            offset += desired as usize;
            remaining -= desired;
        }
        Ok(())
    }

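    /// Serializes the collected zone maps into a column metadata buffer
    ///
    /// The maps are written as a three-column (`min`, `max`, `null_count`)
    /// record batch encoded as a self-contained "mini" Lance file.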
    async fn maps_to_metadata(maps: Vec<CreatedZoneMap>) -> Result<LanceBuffer> {
        let (mins, (maxes, null_counts)): (Vec<_>, (Vec<_>, Vec<_>)) = maps
            .into_iter()
            .map(|mp| (mp.min, (mp.max, mp.null_count)))
            .unzip();
        let mins = ScalarValue::iter_to_array(mins.into_iter())?;
        let maxes = ScalarValue::iter_to_array(maxes.into_iter())?;
        let null_counts = Arc::new(UInt32Array::from_iter_values(null_counts.into_iter()));
        let zone_map_schema = ArrowSchema::new(vec![
            ArrowField::new("min", mins.data_type().clone(), true),
            ArrowField::new("max", maxes.data_type().clone(), true),
            ArrowField::new("null_count", DataType::UInt32, false),
        ]);
        let schema = Schema::try_from(&zone_map_schema)?;
        let zone_maps =
            RecordBatch::try_new(Arc::new(zone_map_schema), vec![mins, maxes, null_counts])?;
        let encoding_strategy = CoreFieldEncodingStrategy::default();
        let encoded_zone_maps = encode_batch(
            &zone_maps,
            Arc::new(schema),
            &encoding_strategy,
            &EncodingOptions {
                cache_bytes_per_column: u64::MAX,
                max_page_bytes: u64::MAX,
                keep_original_array: true,
                buffer_alignment: 8,
            },
        )
        .await?;
        let zone_maps_buffer = encoded_zone_maps.try_to_mini_lance()?;

        Ok(LanceBuffer::from_bytes(zone_maps_buffer, 1))
    }
}

impl FieldEncoder for ZoneMapsFieldEncoder {
    fn maybe_encode(
        &mut self,
        array: ArrayRef,
        external_buffers: &mut OutOfLineBuffers,
        repdef: RepDefBuilder,
        row_number: u64,
        num_rows: u64,
    ) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
        // Update the zone map stats before delegating to the wrapped encoder
        self.update(&array)?;
        self.items_encoder
            .maybe_encode(array, external_buffers, repdef, row_number, num_rows)
    }

    fn flush(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
        self.items_encoder.flush(external_buffers)
    }

    fn finish(
        &mut self,
        external_buffers: &mut OutOfLineBuffers,
    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
        if self.cur_offset > 0 {
            // Create the final (possibly partial) zone map
            if let Err(err) = self.new_map() {
                return async move { Err(err) }.boxed();
            }
        }
        let maps = std::mem::take(&mut self.maps);
        let rows_per_zone = self.rows_per_map;
        let items_columns = self.items_encoder.finish(external_buffers);

        async move {
            let items_columns = items_columns.await?;
            if items_columns.is_empty() {
                return Err(Error::invalid_input(
                    "attempt to apply zone maps to a field encoder that generated zero columns of data"
                        .to_string(),
                    location!(),
                ));
            }
            let items_column = items_columns.into_iter().next().unwrap();
            let final_pages = items_column.final_pages;
            let mut column_buffers = items_column.column_buffers;
            let zone_buffer_index = column_buffers.len();
            column_buffers.push(Self::maps_to_metadata(maps).await?);
            let column_encoding = pb::ColumnEncoding {
                column_encoding: Some(pb::column_encoding::ColumnEncoding::ZoneIndex(Box::new(
                    pb::ZoneIndex {
                        inner: Some(Box::new(items_column.encoding)),
                        rows_per_zone,
                        zone_map_buffer: Some(pb::Buffer {
                            buffer_index: zone_buffer_index as u32,
                            buffer_type: i32::from(pb::buffer::BufferType::Column),
                        }),
                    },
                ))),
            };
            Ok(vec![EncodedColumn {
                encoding: column_encoding,
                final_pages,
                column_buffers,
            }])
        }
        .boxed()
    }

    fn num_columns(&self) -> u32 {
        self.items_encoder.num_columns()
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, BinaryExpr, Expr, Operator};
    use lance_datagen::{BatchCount, RowCount};
    use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
    use lance_file::v2::{
        testing::{count_lance_file, write_lance_file, FsFixture},
        writer::FileWriterOptions,
    };

    use crate::{substrait::FilterExpressionExt, LanceDfFieldEncodingStrategy};

    #[ignore]
    #[test_log::test(tokio::test)]
    async fn test_basic_stats() {
        // Write 30 batches of 1024 rows; column "0" is a monotonically
        // increasing step sequence
        let data = lance_datagen::gen()
            .col("0", lance_datagen::array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(1024), BatchCount::from(30));

        let fs = FsFixture::default();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::new(LanceDfFieldEncodingStrategy::default())),
            ..Default::default()
        };

        let written_file = write_lance_file(data, &fs, options).await;

        let decoder_middleware: Arc<DecoderPlugins> = Arc::default();

        let num_rows = written_file
            .data
            .iter()
            .map(|rb| rb.num_rows())
            .sum::<usize>();

        // With no filter every row should be read
        let result = count_lance_file(
            &fs,
            decoder_middleware.clone(),
            FilterExpression::no_filter(),
        )
        .await;
        assert_eq!(num_rows, result);

        // Every value is below 50000, so all zones should be pruned
        let result = count_lance_file(
            &fs,
            decoder_middleware.clone(),
            FilterExpression::df_to_substrait(
                Expr::BinaryExpr(BinaryExpr {
                    left: Box::new(col("0")),
                    op: Operator::Gt,
                    right: Box::new(Expr::Literal(ScalarValue::Int32(Some(50000)))),
                }),
                written_file.schema.as_ref(),
            )
            .unwrap(),
        )
        .await;
        assert_eq!(0, result);

        // Only the tail of the file exceeds 20000; the zones covering it
        // should be read and the filter applied to them
        let result = count_lance_file(
            &fs,
            decoder_middleware,
            FilterExpression::df_to_substrait(
                Expr::BinaryExpr(BinaryExpr {
                    left: Box::new(col("0")),
                    op: Operator::Gt,
                    right: Box::new(Expr::Literal(ScalarValue::Int32(Some(20000)))),
                }),
                written_file.schema.as_ref(),
            )
            .unwrap(),
        )
        .await;
        assert_eq!(30 * 1024 - 20000, result);
    }
}