lance_encoding_datafusion/
zone.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    collections::{HashMap, VecDeque},
6    ops::Range,
7    sync::{Arc, Mutex},
8};
9
10use arrow_array::{cast::AsArray, types::UInt32Type, ArrayRef, RecordBatch, UInt32Array};
11use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
12use bytes::Bytes;
13use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
14use datafusion_common::{arrow::datatypes::DataType, DFSchema, DFSchemaRef, ScalarValue};
15use datafusion_expr::{
16    col,
17    execution_props::ExecutionProps,
18    interval_arithmetic::{Interval, NullableInterval},
19    simplify::SimplifyContext,
20    Accumulator, Expr,
21};
22use datafusion_functions::core::expr_ext::FieldAccessor;
23use datafusion_optimizer::simplify_expressions::ExprSimplifier;
24use futures::{future::BoxFuture, FutureExt};
25use lance_datafusion::planner::Planner;
26use lance_encoding::{
27    buffer::LanceBuffer,
28    decoder::{
29        decode_batch, ColumnInfoIter, DecoderPlugins, FieldScheduler, FilterExpression,
30        PriorityRange, ScheduledScanLine, SchedulerContext, SchedulingJob,
31    },
32    encoder::{
33        encode_batch, CoreFieldEncodingStrategy, EncodedBatch, EncodedColumn, EncodingOptions,
34        FieldEncoder, OutOfLineBuffers,
35    },
36    format::pb,
37    repdef::RepDefBuilder,
38    EncodingsIo,
39};
40
41use lance_core::{cache::FileMetadataCache, datatypes::Schema, Error, Result};
42use lance_file::{
43    v2::{reader::EncodedBatchReaderExt, writer::EncodedBatchWriteExt},
44    version::LanceFileVersion,
45};
46use snafu::location;
47
48use crate::substrait::FilterExpressionExt;
49
50#[derive(Debug)]
51struct CreatedZoneMap {
52    min: ScalarValue,
53    max: ScalarValue,
54    null_count: u32,
55}
56
/// Accumulates sorted ranges into a compact list.
///
/// A range that begins exactly where the previously added range ended is
/// folded into it; any other range starts a new entry.
#[derive(Default)]
struct RangesBuilder {
    ranges: Vec<Range<u64>>,
}

impl RangesBuilder {
    /// Appends `range`, extending the last stored range when the two touch.
    fn add_range(&mut self, range: Range<u64>) {
        match self.ranges.last_mut() {
            // Adjacent to the previous range: grow it in place
            Some(prev) if prev.end == range.start => prev.end = range.end,
            // First range, or a gap since the previous one: start a new entry
            _ => self.ranges.push(range),
        }
    }
}

/// Pairs a per-zone predicate with the zone size so that row ranges can be
/// narrowed down to the zones the predicate accepts.
struct ZoneMapsFilter<F: Fn(u64) -> bool> {
    filter: F,
    rows_per_zone: u64,
}

impl<F: Fn(u64) -> bool> ZoneMapsFilter<F> {
    fn new(filter: F, rows_per_zone: u64) -> Self {
        Self {
            filter,
            rows_per_zone,
        }
    }

    /// Narrows `range` to the parts that fall in zones accepted by the filter.
    ///
    /// The range is walked one zone at a time; the filter is called with each
    /// zone index and the accepted pieces are returned, with touching pieces
    /// merged into a single range.
    fn refine_range(&self, range: std::ops::Range<u64>) -> Vec<std::ops::Range<u64>> {
        let mut builder = RangesBuilder::default();
        let mut cursor = range.start;
        let mut zone = cursor / self.rows_per_zone;
        while cursor < range.end {
            // Stop at the zone boundary or the end of the request, whichever is first
            let zone_end = (zone + 1) * self.rows_per_zone;
            let stop = range.end.min(zone_end);

            if (self.filter)(zone) {
                builder.add_range(cursor..stop);
            }

            cursor = stop;
            zone += 1;
        }
        builder.ranges
    }

    /// Applies [`Self::refine_range`] to every range and concatenates the
    /// results in order (pieces from different input ranges are not merged).
    fn refine_ranges(&self, ranges: &[Range<u64>]) -> Vec<Range<u64>> {
        let mut refined = Vec::new();
        for requested in ranges {
            refined.extend(self.refine_range(requested.clone()));
        }
        refined
    }
}
119
120/// Substrait represents paths as a series of field indices
121///
122/// This method converts that into a datafusion expression
123#[allow(unused)]
124fn path_to_expr(path: &VecDeque<u32>) -> Expr {
125    let mut parts_iter = path.iter().map(|path_num| path_num.to_string());
126    let mut expr = col(parts_iter.next().unwrap());
127    for part in parts_iter {
128        expr = expr.field(part);
129    }
130    expr
131}
132
133/// If a column has zone info in the encoding description then extract it
134#[allow(unused)]
135pub(crate) fn extract_zone_info(
136    column_info: &mut ColumnInfoIter,
137    data_type: &DataType,
138    cur_path: &VecDeque<u32>,
139) -> Option<(u32, UnloadedPushdown)> {
140    let mut result: Option<(u32, UnloadedPushdown)> = None;
141    let result_ref = &mut result;
142    column_info.peek_transform(|col_info| {
143        let encoding = col_info.encoding.column_encoding.as_ref().unwrap();
144        match *encoding {
145            pb::column_encoding::ColumnEncoding::ZoneIndex(ref zone_index) => {
146                let mut zone_index = zone_index.clone();
147                let inner = zone_index.inner.take().unwrap();
148                let rows_per_zone = zone_index.rows_per_zone;
149                let zone_map_buffer = *zone_index.zone_map_buffer.as_ref().unwrap();
150                assert_eq!(
151                    zone_map_buffer.buffer_type,
152                    i32::from(pb::buffer::BufferType::Column)
153                );
154                let (position, size) =
155                    col_info.buffer_offsets_and_sizes[zone_map_buffer.buffer_index as usize];
156                let column = path_to_expr(cur_path);
157                let unloaded_pushdown = UnloadedPushdown {
158                    data_type: data_type.clone(),
159                    column,
160                    position,
161                    size,
162                };
163                *result_ref = Some((rows_per_zone, unloaded_pushdown));
164
165                let mut col_info = col_info.as_ref().clone();
166                col_info.encoding = *inner;
167                Arc::new(col_info)
168            }
169            _ => col_info,
170        }
171    });
172    result
173}
174
175/// Extracted pushdown information obtained from the column encoding
176/// description.
177///
178/// This is "unloaded" because we haven't yet loaded the actual zone
179/// maps from the file (though position and size tell us where they
180/// are)
181#[derive(Debug)]
182pub struct UnloadedPushdown {
183    data_type: DataType,
184    column: Expr,
185    position: u64,
186    size: u64,
187}
188
189#[derive(Debug)]
190struct ZoneMap {
191    items: Vec<(Expr, NullableInterval)>,
192}
193
194#[derive(Debug)]
195struct InitializedState {
196    zone_maps: Vec<ZoneMap>,
197    filter: Option<Expr>,
198    df_schema: Option<DFSchemaRef>,
199}
200
201/// A top level scheduler that refines the requested range based on
202/// pushdown filtering with zone maps
203#[derive(Debug)]
204pub struct ZoneMapsFieldScheduler {
205    inner: Arc<dyn FieldScheduler>,
206    schema: Arc<Schema>,
207    // A map from field id to unloaded zone map for that field
208    pushdown_buffers: HashMap<u32, UnloadedPushdown>,
209    rows_per_zone: u32,
210    num_rows: u64,
211    initialized_state: Mutex<Option<InitializedState>>,
212}
213
214impl ZoneMapsFieldScheduler {
215    pub fn new(
216        inner: Arc<dyn FieldScheduler>,
217        schema: Arc<Schema>,
218        pushdown_buffers: HashMap<u32, UnloadedPushdown>,
219        rows_per_zone: u32,
220        num_rows: u64,
221    ) -> Self {
222        Self {
223            inner,
224            schema,
225            pushdown_buffers,
226            rows_per_zone,
227            num_rows,
228            // These are set during initialization
229            initialized_state: Mutex::new(None),
230        }
231    }
232
233    async fn load_pushdowns(
234        &self,
235        io: &dyn EncodingsIo,
236        _cache: &FileMetadataCache,
237        pushdowns: &[&UnloadedPushdown],
238    ) -> Result<Vec<ZoneMap>> {
239        // TODO: Use cache
240        let ranges = pushdowns
241            .iter()
242            .map(|pushdown| pushdown.position..pushdown.position + pushdown.size)
243            .collect();
244        let buffers = io.submit_request(ranges, 0).await?;
245        let mut maps = Vec::new();
246        for (buffer, pushdown) in buffers.into_iter().zip(pushdowns.iter()) {
247            // There's no point in running this in parallel since it's actually synchronous
248            let map = self
249                .parse_zone(buffer, &pushdown.data_type, &pushdown.column)
250                .await?;
251            maps.push(map);
252        }
253        // A this point each item in `maps` is a vector of guarantees for a single field
254        // We need to transpose this so that each item is a vector of guarantees for a single zone
255        let zone_maps = transpose2(maps)
256            .into_iter()
257            .map(|items| ZoneMap { items })
258            .collect();
259        Ok(zone_maps)
260    }
261
262    /// Load the zone maps from the file
263    async fn load_maps(
264        &self,
265        io: &dyn EncodingsIo,
266        cache: &FileMetadataCache,
267        filter_schema: &Schema,
268    ) -> Result<Vec<ZoneMap>> {
269        let pushdowns_to_load = filter_schema
270            .fields
271            .iter()
272            .filter_map(|field| {
273                let field_id = field.id as u32;
274                let unloaded = self.pushdown_buffers.get(&field_id)?;
275                Some(unloaded)
276            })
277            .collect::<Vec<_>>();
278        self.load_pushdowns(io, cache, &pushdowns_to_load).await
279    }
280
281    async fn do_initialize(
282        &self,
283        io: &dyn EncodingsIo,
284        cache: &FileMetadataCache,
285        filter: &FilterExpression,
286    ) -> Result<()> {
287        if filter.is_noop() {
288            return Ok(());
289        }
290
291        let arrow_schema = ArrowSchema::from(self.schema.as_ref());
292        let df_schema = DFSchema::try_from(arrow_schema.clone())?;
293        let df_filter = filter.substrait_to_df(Arc::new(arrow_schema))?;
294
295        let columns = Planner::column_names_in_expr(&df_filter);
296        let referenced_schema = self.schema.project(&columns)?;
297
298        let df_schema = Some(Arc::new(df_schema));
299        let zone_maps = self.load_maps(io, cache, &referenced_schema).await?;
300        let filter = Some(df_filter);
301
302        let state = InitializedState {
303            zone_maps,
304            filter,
305            df_schema,
306        };
307        let mut initialized_state = self.initialized_state.lock().unwrap();
308        *initialized_state = Some(state);
309        Ok(())
310    }
311
312    fn create_filter(&self) -> Result<impl Fn(u64) -> bool + '_> {
313        Ok(move |zone_idx| {
314            let state = self.initialized_state.lock().unwrap();
315            let state = state.as_ref().unwrap();
316            let zone_map = &state.zone_maps[zone_idx as usize];
317            let props = ExecutionProps::new();
318            let context =
319                SimplifyContext::new(&props).with_schema(state.df_schema.as_ref().unwrap().clone());
320            let mut simplifier = ExprSimplifier::new(context);
321            simplifier = simplifier.with_guarantees(zone_map.items.clone());
322            match simplifier.simplify(state.filter.as_ref().unwrap().clone()) {
323                Ok(expr) => match expr {
324                    // Predicate, given guarantees, is always false, we can skip the zone
325                    Expr::Literal(ScalarValue::Boolean(Some(false))) => false,
326                    // Predicate may be true, need to load the zone
327                    _ => true,
328                },
329                Err(err) => {
330                    // TODO: this logs on each iteration, but maybe should should
331                    // only log once per call of this func?
332                    log::debug!("Failed to simplify predicate: {}", err);
333                    true
334                }
335            }
336        })
337    }
338
339    /// Parse the statistics into a set of guarantees for each batch.
340    fn extract_guarantees(
341        stats: &RecordBatch,
342        rows_per_zone: u32,
343        num_rows: u64,
344        data_type: &DataType,
345        col: Expr,
346    ) -> Vec<(Expr, NullableInterval)> {
347        let min_values = stats.column(0);
348        let max_values = stats.column(1);
349        let null_counts = stats.column(2).as_primitive::<UInt32Type>();
350
351        let mut guarantees = Vec::new();
352        for zone_idx in 0..stats.num_rows() {
353            let num_rows_in_zone = if zone_idx == stats.num_rows() - 1 {
354                (num_rows % rows_per_zone as u64) as u32
355            } else {
356                rows_per_zone
357            };
358            let min_value = ScalarValue::try_from_array(&min_values, zone_idx).unwrap();
359            let max_value = ScalarValue::try_from_array(&max_values, zone_idx).unwrap();
360            let null_count = null_counts.values()[zone_idx];
361
362            let values = Interval::try_new(min_value, max_value).unwrap();
363            let interval = match (null_count, num_rows_in_zone) {
364                (0, _) => NullableInterval::NotNull { values },
365                (null_count, num_rows_in_zone) if null_count == num_rows_in_zone => {
366                    NullableInterval::Null {
367                        datatype: data_type.clone(),
368                    }
369                }
370                _ => NullableInterval::MaybeNull { values },
371            };
372            guarantees.push((col.clone(), interval));
373        }
374        guarantees
375    }
376
377    async fn parse_zone(
378        &self,
379        buffer: Bytes,
380        data_type: &DataType,
381        col: &Expr,
382    ) -> Result<Vec<(Expr, NullableInterval)>> {
383        let zone_map_schema = Schema::try_from(&ArrowSchema::new(vec![
384            ArrowField::new("min", data_type.clone(), true),
385            ArrowField::new("max", data_type.clone(), true),
386            ArrowField::new("null_count", DataType::UInt32, false),
387        ]))
388        .unwrap();
389        let zone_maps_batch =
390            EncodedBatch::try_from_mini_lance(buffer, &zone_map_schema, LanceFileVersion::V2_0)?;
391        let zone_maps_batch = decode_batch(
392            &zone_maps_batch,
393            &FilterExpression::no_filter(),
394            Arc::<DecoderPlugins>::default(),
395            /*should_validate= */ false,
396            LanceFileVersion::default(),
397            None,
398        )
399        .await?;
400
401        Ok(Self::extract_guarantees(
402            &zone_maps_batch,
403            self.rows_per_zone,
404            self.num_rows,
405            data_type,
406            col.clone(),
407        ))
408    }
409}
410
// Utility function to transpose Vec<Vec<...>> from Stack Overflow
// https://stackoverflow.com/questions/64498617/how-to-transpose-a-vector-of-vectors-in-rust
// Author: https://stackoverflow.com/users/1695172/netwave
//
// The output has one row per element of the first input row; output row `i`
// holds element `i` of every input row, in input order.  An empty input
// yields an empty output.
//
// Panics if any input row is shorter than the first one.
fn transpose2<T>(v: Vec<Vec<T>>) -> Vec<Vec<T>> {
    // With no rows there is nothing to transpose (and no first row to size from)
    let len = match v.first() {
        Some(first_row) => first_row.len(),
        None => return Vec::new(),
    };
    let mut iters: Vec<_> = v.into_iter().map(Vec::into_iter).collect();
    (0..len)
        .map(|_| {
            iters
                .iter_mut()
                .map(|row| {
                    row.next()
                        .expect("every row must be at least as long as the first row")
                })
                .collect::<Vec<T>>()
        })
        .collect()
}
427
428// Schedulers don't always handle empty ranges well, so we need to provide a dummy job
429#[derive(Debug)]
430struct EmptySchedulingJob {}
431
432impl SchedulingJob for EmptySchedulingJob {
433    fn schedule_next(
434        &mut self,
435        _context: &mut SchedulerContext,
436        _priority: &dyn PriorityRange,
437    ) -> Result<ScheduledScanLine> {
438        Ok(ScheduledScanLine {
439            rows_scheduled: 0,
440            decoders: vec![],
441        })
442    }
443
444    fn num_rows(&self) -> u64 {
445        0
446    }
447}
448
449impl FieldScheduler for ZoneMapsFieldScheduler {
450    fn initialize<'a>(
451        &'a self,
452        filter: &'a FilterExpression,
453        context: &'a SchedulerContext,
454    ) -> BoxFuture<'a, Result<()>> {
455        async move {
456            self.do_initialize(context.io().as_ref(), context.cache(), filter)
457                .await
458        }
459        .boxed()
460    }
461
462    fn schedule_ranges<'a>(
463        &'a self,
464        ranges: &[std::ops::Range<u64>],
465        filter: &FilterExpression,
466    ) -> Result<Box<dyn SchedulingJob + 'a>> {
467        if filter.is_noop() {
468            return self.inner.schedule_ranges(ranges, filter);
469        }
470        let zone_filter_fn = self.create_filter()?;
471        let zone_filter = ZoneMapsFilter::new(zone_filter_fn, self.rows_per_zone as u64);
472        let ranges = zone_filter.refine_ranges(ranges);
473        if ranges.is_empty() {
474            Ok(Box::new(EmptySchedulingJob {}))
475        } else {
476            self.inner.schedule_ranges(&ranges, filter)
477        }
478    }
479
480    fn num_rows(&self) -> u64 {
481        self.inner.num_rows()
482    }
483}
484
485/// A field encoder that creates zone maps for the data it encodes
486///
487/// This encoder will create zone maps for the data it encodes.  The zone maps are created by
488/// dividing the data into zones of a fixed size and calculating the min/max values for each
489/// zone.  The zone maps are then encoded as metadata.
490///
491/// This metadata can be used by the reader to skip over zones that don't contain data that
492/// matches the query.
493pub struct ZoneMapsFieldEncoder {
494    items_encoder: Box<dyn FieldEncoder>,
495    items_type: DataType,
496
497    rows_per_map: u32,
498
499    maps: Vec<CreatedZoneMap>,
500    cur_offset: u32,
501    min: MinAccumulator,
502    max: MaxAccumulator,
503    null_count: u32,
504}
505
506impl ZoneMapsFieldEncoder {
507    pub fn try_new(
508        items_encoder: Box<dyn FieldEncoder>,
509        items_type: DataType,
510        rows_per_map: u32,
511    ) -> Result<Self> {
512        let min = MinAccumulator::try_new(&items_type)?;
513        let max = MaxAccumulator::try_new(&items_type)?;
514        Ok(Self {
515            rows_per_map,
516            items_encoder,
517            items_type,
518            min,
519            max,
520            null_count: 0,
521            cur_offset: 0,
522            maps: Vec::new(),
523        })
524    }
525}
526
527impl ZoneMapsFieldEncoder {
528    fn new_map(&mut self) -> Result<()> {
529        // TODO: We should be truncating the min/max values here
530        let map = CreatedZoneMap {
531            min: self.min.evaluate()?,
532            max: self.max.evaluate()?,
533            null_count: self.null_count,
534        };
535        self.maps.push(map);
536        self.min = MinAccumulator::try_new(&self.items_type)?;
537        self.max = MaxAccumulator::try_new(&self.items_type)?;
538        self.null_count = 0;
539        self.cur_offset = 0;
540        Ok(())
541    }
542
543    fn update_stats(&mut self, array: &ArrayRef) -> Result<()> {
544        self.null_count += array.null_count() as u32;
545        self.min.update_batch(&[array.clone()])?;
546        self.max.update_batch(&[array.clone()])?;
547        Ok(())
548    }
549
550    fn update(&mut self, array: &ArrayRef) -> Result<()> {
551        let mut remaining = array.len() as u32;
552        let mut offset = 0;
553
554        while remaining > 0 {
555            let desired = self.rows_per_map - self.cur_offset;
556            if desired > remaining {
557                // Not enough data to fill a map, increment counts and return
558                self.update_stats(&array.slice(offset, remaining as usize))?;
559                self.cur_offset += remaining;
560                break;
561            } else {
562                // We have enough data to fill a map
563                self.update_stats(&array.slice(offset, desired as usize))?;
564                self.new_map()?;
565            }
566            offset += desired as usize;
567            remaining = remaining.saturating_sub(desired);
568        }
569        Ok(())
570    }
571
572    async fn maps_to_metadata(maps: Vec<CreatedZoneMap>) -> Result<LanceBuffer> {
573        let (mins, (maxes, null_counts)): (Vec<_>, (Vec<_>, Vec<_>)) = maps
574            .into_iter()
575            .map(|mp| (mp.min, (mp.max, mp.null_count)))
576            .unzip();
577        let mins = ScalarValue::iter_to_array(mins.into_iter())?;
578        let maxes = ScalarValue::iter_to_array(maxes.into_iter())?;
579        let null_counts = Arc::new(UInt32Array::from_iter_values(null_counts.into_iter()));
580        let zone_map_schema = ArrowSchema::new(vec![
581            ArrowField::new("min", mins.data_type().clone(), true),
582            ArrowField::new("max", maxes.data_type().clone(), true),
583            ArrowField::new("null_count", DataType::UInt32, false),
584        ]);
585        let schema = Schema::try_from(&zone_map_schema)?;
586        let zone_maps =
587            RecordBatch::try_new(Arc::new(zone_map_schema), vec![mins, maxes, null_counts])?;
588        let encoding_strategy = CoreFieldEncodingStrategy::default();
589        let encoded_zone_maps = encode_batch(
590            &zone_maps,
591            Arc::new(schema),
592            &encoding_strategy,
593            &EncodingOptions {
594                cache_bytes_per_column: u64::MAX,
595                max_page_bytes: u64::MAX,
596                keep_original_array: true,
597                buffer_alignment: 8,
598            },
599        )
600        .await?;
601        let zone_maps_buffer = encoded_zone_maps.try_to_mini_lance()?;
602
603        Ok(LanceBuffer::from_bytes(zone_maps_buffer, 1))
604    }
605}
606
607impl FieldEncoder for ZoneMapsFieldEncoder {
608    fn maybe_encode(
609        &mut self,
610        array: ArrayRef,
611        external_buffers: &mut OutOfLineBuffers,
612        repdef: RepDefBuilder,
613        row_number: u64,
614        num_rows: u64,
615    ) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
616        // TODO: If we do the zone map calculation as part of the encoding task then we can
617        // parallelize statistics gathering.  Could be faster too since the encoding task is
618        // going to need to access the same data (although the input to an encoding task is
619        // probably too big for the CPU cache anyways).  We can worry about this if we need
620        // to improve write speed.
621        self.update(&array)?;
622        self.items_encoder
623            .maybe_encode(array, external_buffers, repdef, row_number, num_rows)
624    }
625
626    fn flush(
627        &mut self,
628        external_buffers: &mut OutOfLineBuffers,
629    ) -> Result<Vec<lance_encoding::encoder::EncodeTask>> {
630        self.items_encoder.flush(external_buffers)
631    }
632
633    fn finish(
634        &mut self,
635        external_buffers: &mut OutOfLineBuffers,
636    ) -> BoxFuture<'_, Result<Vec<EncodedColumn>>> {
637        if self.cur_offset > 0 {
638            // Create final map
639            if let Err(err) = self.new_map() {
640                return async move { Err(err) }.boxed();
641            }
642        }
643        let maps = std::mem::take(&mut self.maps);
644        let rows_per_zone = self.rows_per_map;
645        let items_columns = self.items_encoder.finish(external_buffers);
646
647        async move {
648            let items_columns = items_columns.await?;
649            if items_columns.is_empty() {
650                return Err(Error::invalid_input("attempt to apply zone maps to a field encoder that generated zero columns of data".to_string(), location!()))
651            }
652            let items_column = items_columns.into_iter().next().unwrap();
653            let final_pages = items_column.final_pages;
654            let mut column_buffers = items_column.column_buffers;
655            let zone_buffer_index = column_buffers.len();
656            column_buffers.push(Self::maps_to_metadata(maps).await?);
657            let column_encoding = pb::ColumnEncoding {
658                column_encoding: Some(pb::column_encoding::ColumnEncoding::ZoneIndex(Box::new(
659                    pb::ZoneIndex {
660                        inner: Some(Box::new(items_column.encoding)),
661                        rows_per_zone,
662                        zone_map_buffer: Some(pb::Buffer {
663                            buffer_index: zone_buffer_index as u32,
664                            buffer_type: i32::from(pb::buffer::BufferType::Column),
665                        }),
666                    },
667                ))),
668            };
669            Ok(vec![EncodedColumn {
670                encoding: column_encoding,
671                final_pages,
672                column_buffers,
673            }])
674        }
675        .boxed()
676    }
677
678    fn num_columns(&self) -> u32 {
679        self.items_encoder.num_columns()
680    }
681}
682
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, BinaryExpr, Expr, Operator};
    use lance_datagen::{BatchCount, RowCount};
    use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
    use lance_file::v2::{
        testing::{count_lance_file, write_lance_file, FsFixture},
        writer::FileWriterOptions,
    };

    use crate::{substrait::FilterExpressionExt, LanceDfFieldEncodingStrategy};

    /// Writes 30 batches of 1024 stepped Int32 values and checks the row
    /// counts seen without a filter and with two `"0" > literal` filters.
    #[ignore]
    #[test_log::test(tokio::test)]
    async fn test_basic_stats() {
        let data = lance_datagen::gen()
            .col("0", lance_datagen::array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(1024), BatchCount::from(30));

        let fs = FsFixture::default();

        let options = FileWriterOptions {
            encoding_strategy: Some(Arc::new(LanceDfFieldEncodingStrategy::default())),
            ..Default::default()
        };

        let written_file = write_lance_file(data, &fs, options).await;

        let decoder_middleware: Arc<DecoderPlugins> = Arc::default();

        // Builds the substrait filter `"0" > threshold` against the written schema
        let greater_than = |threshold: i32| {
            let predicate = Expr::BinaryExpr(BinaryExpr {
                left: Box::new(col("0")),
                op: Operator::Gt,
                right: Box::new(Expr::Literal(ScalarValue::Int32(Some(threshold)))),
            });
            FilterExpression::df_to_substrait(predicate, written_file.schema.as_ref()).unwrap()
        };

        let num_rows: usize = written_file.data.iter().map(|rb| rb.num_rows()).sum();

        // No filter: every written row is counted
        let unfiltered = count_lance_file(
            &fs,
            decoder_middleware.clone(),
            FilterExpression::no_filter(),
        )
        .await;
        assert_eq!(num_rows, unfiltered);

        // Threshold above every value: nothing is counted
        let above_all =
            count_lance_file(&fs, decoder_middleware.clone(), greater_than(50000)).await;
        assert_eq!(0, above_all);

        // Threshold inside the data
        let above_some = count_lance_file(&fs, decoder_middleware, greater_than(20000)).await;
        assert_eq!(30 * 1024 - 20000, above_some);
    }
}