Skip to main content

buoyant_kernel/
engine_data.rs

1//! Traits that engines need to implement in order to pass data between themselves and kernel.
2
3use std::collections::HashMap;
4use std::ops::Range;
5
6use tracing::debug;
7
8use crate::actions::visitors::SelectionVectorVisitor;
9use crate::expressions::ArrayData;
10use crate::log_replay::HasSelectionVector;
11use crate::schema::{ColumnName, DataType, SchemaRef};
12use crate::utils::require;
13use crate::{AsAny, DeltaResult, Error};
14
15/// Engine data paired with a selection vector indicating which rows are logically selected.
16///
17/// A value of `true` in the selection vector means the corresponding row is selected (i.e., not
18/// deleted), while `false` means the row is logically deleted and should be ignored. If the
19/// selection vector is shorter than the number of rows in `data` then all rows not covered by the
20/// selection vector are assumed to be selected.
21///
22/// Interpreting unselected (`false`) rows will result in incorrect/undefined behavior.
23pub struct FilteredEngineData {
24    // The underlying engine data
25    data: Box<dyn EngineData>,
26    // The selection vector where `true` marks rows to include in results. N.B. this selection
27    // vector may be less then `data.len()` and any gaps represent rows that are assumed to be
28    // selected.
29    selection_vector: Vec<bool>,
30}
31
32impl FilteredEngineData {
33    pub fn try_new(data: Box<dyn EngineData>, selection_vector: Vec<bool>) -> DeltaResult<Self> {
34        if selection_vector.len() > data.len() {
35            return Err(Error::InvalidSelectionVector(format!(
36                "Selection vector is larger than data length: {} > {}",
37                selection_vector.len(),
38                data.len()
39            )));
40        }
41        Ok(Self {
42            data,
43            selection_vector,
44        })
45    }
46
47    /// Returns a reference to the underlying engine data.
48    pub fn data(&self) -> &dyn EngineData {
49        &*self.data
50    }
51
52    /// Returns a reference to the selection vector.
53    pub fn selection_vector(&self) -> &[bool] {
54        &self.selection_vector
55    }
56
57    /// Consumes the FilteredEngineData and returns the underlying data and selection vector.
58    pub fn into_parts(self) -> (Box<dyn EngineData>, Vec<bool>) {
59        (self.data, self.selection_vector)
60    }
61
62    /// Creates a new `FilteredEngineData` with all rows selected.
63    ///
64    /// This is a convenience method for the common case where you want to wrap
65    /// `EngineData` in `FilteredEngineData` without any filtering.
66    pub fn with_all_rows_selected(data: Box<dyn EngineData>) -> Self {
67        Self {
68            data,
69            selection_vector: vec![],
70        }
71    }
72
73    /// Apply the contained selection vector and return an engine data with only the valid rows
74    /// included. This consumes the `FilteredEngineData`
75    pub fn apply_selection_vector(self) -> DeltaResult<Box<dyn EngineData>> {
76        self.data
77            .apply_selection_vector(self.selection_vector.clone())
78    }
79}
80
81impl HasSelectionVector for FilteredEngineData {
82    /// Returns true if any row in the selection vector is marked as selected
83    fn has_selected_rows(&self) -> bool {
84        // Per contract if selection is not as long as data then at least one row is selected.
85        if self.selection_vector.len() < self.data.len() {
86            return true;
87        }
88
89        self.selection_vector.contains(&true)
90    }
91}
92
93impl From<Box<dyn EngineData>> for FilteredEngineData {
94    /// Converts `EngineData` into `FilteredEngineData` with all rows selected.
95    ///
96    /// This is a convenience conversion that wraps the provided engine data
97    /// in a `FilteredEngineData` with an empty selection vector, meaning all
98    /// rows are logically selected.
99    ///
100    /// # Example
101    /// ```rust,ignore
102    /// let engine_data: Box<dyn EngineData> = ...;
103    /// let filtered: FilteredEngineData = engine_data.into();
104    /// ```
105    fn from(data: Box<dyn EngineData>) -> Self {
106        Self::with_all_rows_selected(data)
107    }
108}
109
/// Common read-only interface over a string array, hiding which concrete string representation
/// a list or map column uses (e.g. Utf8, LargeUtf8, Utf8View). Engines implement it for their
/// string array types; [`ListItem`] and [`MapItem`] then hold the accessor as a trait object, so
/// the concrete type is resolved once at construction and elements are read via virtual
/// dispatch afterwards.
pub trait StringArrayAccessor {
    /// Number of elements held by the array.
    fn len(&self) -> usize;
    /// String value stored at `index`. The caller must ensure `index < len()`.
    fn value(&self, index: usize) -> &str;
    /// Whether the element at `index` is non-null.
    fn is_valid(&self, index: usize) -> bool;
    /// Whether the array holds no elements at all (derived from [`len`](Self::len)).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
126
127/// A pre-resolved view into a single row's list of strings. The string array type is resolved
128/// once at construction, so subsequent element accesses use virtual dispatch rather than
129/// repeated downcasting.
130pub struct ListItem<'a> {
131    values: &'a dyn StringArrayAccessor,
132    offsets: Range<usize>,
133}
134
135impl<'a> ListItem<'a> {
136    pub fn new(values: &'a dyn StringArrayAccessor, offsets: Range<usize>) -> ListItem<'a> {
137        ListItem { values, offsets }
138    }
139
140    pub fn len(&self) -> usize {
141        self.offsets.len()
142    }
143
144    pub fn is_empty(&self) -> bool {
145        self.offsets.is_empty()
146    }
147
148    pub fn get(&self, list_index: usize) -> String {
149        self.values
150            .value(self.offsets.start + list_index)
151            .to_string()
152    }
153
154    pub fn materialize(&self) -> Vec<String> {
155        self.offsets
156            .clone()
157            .map(|i| self.values.value(i).to_string())
158            .collect()
159    }
160}
161
162/// A pre-resolved view into a single row's map of string keys to string values. Like
163/// [`ListItem`], the string array types are resolved once at construction.
164///
165/// Note: in conjunction with the `allow_null_container_values` attribute, [`materialize`]
166/// _drops_ any (key, value) pairs where the underlying value was null. If preserving null
167/// values is important, use the `allow_null_container_values` attribute and manually
168/// materialize the map using [`MapItem::get`].
169///
170/// [`materialize`]: MapItem::materialize
171pub struct MapItem<'a> {
172    keys: &'a dyn StringArrayAccessor,
173    values: &'a dyn StringArrayAccessor,
174    offsets: Range<usize>,
175}
176
177impl<'a> MapItem<'a> {
178    pub fn new(
179        keys: &'a dyn StringArrayAccessor,
180        values: &'a dyn StringArrayAccessor,
181        offsets: Range<usize>,
182    ) -> MapItem<'a> {
183        MapItem {
184            keys,
185            values,
186            offsets,
187        }
188    }
189
190    pub fn get(&self, key: &str) -> Option<&'a str> {
191        let idx = self
192            .offsets
193            .clone()
194            .rev()
195            .find(|&idx| self.keys.value(idx) == key)?;
196        self.values.is_valid(idx).then(|| self.values.value(idx))
197    }
198
199    pub fn materialize(&self) -> HashMap<String, String> {
200        let mut ret = HashMap::with_capacity(self.offsets.len());
201        for idx in self.offsets.clone() {
202            if self.values.is_valid(idx) {
203                ret.insert(
204                    self.keys.value(idx).to_string(),
205                    self.values.value(idx).to_string(),
206                );
207            }
208        }
209        ret
210    }
211}
212
// Generates default `GetData` accessor methods. For every `(method name, return type)` pair this
// expands to a method that logs the request at debug level and fails with
// `Error::UnexpectedColumnType`. The expansion names the lifetime `'a`, so it must be invoked
// where `'a` is in scope (as it is inside the `GetData<'a>` trait definition).
macro_rules! impl_default_get {
    ( $(($name: ident, $typ: ty)), * ) => {
        $(
            fn $name(&'a self, _row_index: usize, field_name: &str) -> DeltaResult<Option<$typ>> {
                debug!("Asked for type {} on {field_name}, but using default error impl.", stringify!($typ));
                Err(Error::UnexpectedColumnType(format!("{field_name} is not of type {}", stringify!($typ))).with_backtrace())
            }
        )*
    };
}
223
/// When calling back into a [`RowVisitor`], the engine needs to provide a slice of items that
/// implement this trait. This allows type-safe extraction from the raw data by the kernel. By
/// default all these methods will return an `Error` that an incorrect type has been asked
/// for. Therefore, for each "data container" an Engine has, it is only necessary to implement the
/// `get_x` method for the type it holds.
///
/// Every generated method takes a row index and a field name (the default implementations use
/// the name only for the error message) and returns `DeltaResult<Option<T>>`. Note that
/// `get_date`/`get_int` both return `i32` and `get_timestamp`/`get_long` both return `i64`.
pub trait GetData<'a> {
    impl_default_get!(
        (get_bool, bool),
        (get_byte, i8),
        (get_short, i16),
        (get_int, i32),
        (get_long, i64),
        (get_float, f32),
        (get_double, f64),
        (get_date, i32),
        (get_timestamp, i64),
        (get_decimal, i128),
        (get_str, &'a str),
        (get_binary, &'a [u8]),
        (get_list, ListItem<'a>),
        (get_map, MapItem<'a>)
    );
}
247
// Generates `GetData` accessor methods that ignore both the row index and the field name and
// always return `Ok(None)`. Like `impl_default_get!`, the expansion names the lifetime `'a`, so
// it must be invoked where `'a` is in scope.
macro_rules! impl_null_get {
    ( $(($name: ident, $typ: ty)), * ) => {
        $(
            fn $name(&'a self, _row_index: usize, _field_name: &str) -> DeltaResult<Option<$typ>> {
                Ok(None)
            }
        )*
    };
}
257
/// The unit type acts as a getter whose every typed accessor returns `Ok(None)`, whatever row or
/// field is asked for (all methods are overridden via `impl_null_get!`).
impl<'a> GetData<'a> for () {
    impl_null_get!(
        (get_bool, bool),
        (get_byte, i8),
        (get_short, i16),
        (get_int, i32),
        (get_long, i64),
        (get_float, f32),
        (get_double, f64),
        (get_date, i32),
        (get_timestamp, i64),
        (get_decimal, i128),
        (get_str, &'a str),
        (get_binary, &'a [u8]),
        (get_list, ListItem<'a>),
        (get_map, MapItem<'a>)
    );
}
276
277/// This is a convenience wrapper over `GetData` to allow code like: `let name: Option<String> =
278/// getters[1].get_opt(row_index, "metadata.name")?;`
279pub trait TypedGetData<'a, T> {
280    fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<T>>;
281    fn get(&'a self, row_index: usize, field_name: &str) -> DeltaResult<T> {
282        let val = self.get_opt(row_index, field_name)?;
283        val.ok_or_else(|| {
284            Error::MissingData(format!("Data missing for field {field_name}")).with_backtrace()
285        })
286    }
287}
288
// Generates `TypedGetData<'a, T>` impls for `dyn GetData<'a>`: for each `(getter, type)` pair,
// `get_opt` simply forwards to the named `GetData` getter. Each type may appear only once in an
// invocation, since two pairs with the same type would expand to conflicting impls.
macro_rules! impl_typed_get_data {
    ( $(($name: ident, $typ: ty)), * ) => {
        $(
            impl<'a> TypedGetData<'a, $typ> for dyn GetData<'a> +'_ {
                fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<$typ>> {
                    self.$name(row_index, field_name)
                }
            }
        )*
    };
}
300
// Note: get_date and get_timestamp are intentionally excluded because their return types (i32 and
// i64) collide with get_int and get_long, which would produce conflicting TypedGetData impls.
// Use get_date/get_timestamp directly instead of through TypedGetData.
//
// Owned conversions (`String`, `Vec<String>`, `HashMap<String, String>`) are not generated by
// this invocation; they have hand-written impls that build on `get_str`, `get_list` and `get_map`.
impl_typed_get_data!(
    (get_bool, bool),
    (get_byte, i8),
    (get_short, i16),
    (get_int, i32),
    (get_long, i64),
    (get_float, f32),
    (get_double, f64),
    (get_decimal, i128),
    (get_str, &'a str),
    (get_binary, &'a [u8]),
    (get_list, ListItem<'a>),
    (get_map, MapItem<'a>)
);
318
319impl<'a> TypedGetData<'a, String> for dyn GetData<'a> + '_ {
320    fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<String>> {
321        self.get_str(row_index, field_name)
322            .map(|s| s.map(|s| s.to_string()))
323    }
324}
325
326/// Provide an impl to get a list field as a `Vec<String>`. Note that this will allocate the vector
327/// and allocate for each string entry.
328impl<'a> TypedGetData<'a, Vec<String>> for dyn GetData<'a> + '_ {
329    fn get_opt(&'a self, row_index: usize, field_name: &str) -> DeltaResult<Option<Vec<String>>> {
330        let list_opt: Option<ListItem<'_>> = self.get_opt(row_index, field_name)?;
331        Ok(list_opt.map(|list| list.materialize()))
332    }
333}
334
335/// Provide an impl to get a map field as a `HashMap<String, String>`. Note that this will
336/// allocate the map and allocate for each entry
337impl<'a> TypedGetData<'a, HashMap<String, String>> for dyn GetData<'a> + '_ {
338    fn get_opt(
339        &'a self,
340        row_index: usize,
341        field_name: &str,
342    ) -> DeltaResult<Option<HashMap<String, String>>> {
343        let map_opt: Option<MapItem<'_>> = self.get_opt(row_index, field_name)?;
344        Ok(map_opt.map(|map| map.materialize()))
345    }
346}
347
/// Iterator that walks an engine-data batch and yields the index of every selected row.
///
/// Each call to [`Iterator::next`] returns the index of the next selected row, in increasing
/// order. Rows beyond the end of the selection vector count as selected.
///
/// Constructed internally and passed (alongside the column getters) to
/// [`FilteredRowVisitor::visit_filtered`].
pub struct RowIndexIterator<'sv> {
    sv_pos: usize,
    selection_vector: &'sv [bool],
    row_count: usize,
}

impl<'sv> RowIndexIterator<'sv> {
    /// Starts an iteration over a batch of `row_count` rows filtered by `selection_vector`.
    pub(crate) fn new(row_count: usize, selection_vector: &'sv [bool]) -> Self {
        RowIndexIterator {
            sv_pos: 0,
            selection_vector,
            row_count,
        }
    }

    /// Returns the total number of rows in the batch (selected and deselected).
    pub fn num_rows(&self) -> usize {
        self.row_count
    }
}

impl<'sv> Iterator for RowIndexIterator<'sv> {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        let mask = self.selection_vector;
        // A row is selected when its flag is `true` or when the mask has no entry for it.
        let hit = (self.sv_pos..self.row_count)
            .find(|&row| mask.get(row).copied().unwrap_or(true));
        // Resume right after the row just yielded, or park at the end once exhausted.
        self.sv_pos = hit.map_or(self.row_count, |row| row + 1);
        hit
    }
}
389
390/// A visitor that processes [`FilteredEngineData`] with automatic row filtering.
391///
392/// Implementors provide [`visit_filtered`] which receives the column getters and a
393/// [`RowIndexIterator`] that yields the index of each selected row.
394/// The default [`visit_rows_of`] method handles all the plumbing: extracting the selection
395/// vector, building the bridge, and calling [`EngineData::visit_rows`].
396///
397/// [`visit_filtered`]: FilteredRowVisitor::visit_filtered
398/// [`visit_rows_of`]: FilteredRowVisitor::visit_rows_of
399pub trait FilteredRowVisitor {
400    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]);
401
402    /// Process this batch. `getters` contains one [`GetData`] item per requested column.
403    /// Iterate `rows` to receive the index of each selected row. Use
404    /// [`RowIndexIterator::num_rows`] to get the total row count (for padding output
405    /// vectors with null values for deselected rows).
406    fn visit_filtered<'a>(
407        &mut self,
408        getters: &[&'a dyn GetData<'a>],
409        rows: RowIndexIterator<'_>,
410    ) -> DeltaResult<()>;
411
412    /// Visit the rows of a [`FilteredEngineData`], automatically respecting the selection vector.
413    ///
414    /// Extracts the selection vector and passes a [`RowIndexIterator`] of selected row indices
415    /// to [`FilteredRowVisitor::visit_filtered`].
416    fn visit_rows_of(&mut self, data: &FilteredEngineData) -> DeltaResult<()>
417    where
418        Self: Sized,
419    {
420        // column_names is 'static so this borrow ends immediately, before bridge borrows self
421        let column_names = self.selected_column_names_and_types().0;
422        let mut bridge = FilteredVisitorBridge {
423            visitor: self,
424            selection_vector: data.selection_vector(),
425        };
426        data.data().visit_rows(column_names, &mut bridge)
427    }
428}
429
430/// Private bridge that implements [`RowVisitor`] and forwards to a [`FilteredRowVisitor`].
431struct FilteredVisitorBridge<'bridge, V: FilteredRowVisitor> {
432    visitor: &'bridge mut V,
433    selection_vector: &'bridge [bool],
434}
435
436impl<V: FilteredRowVisitor> RowVisitor for FilteredVisitorBridge<'_, V> {
437    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
438        self.visitor.selected_column_names_and_types()
439    }
440
441    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
442        let rows = RowIndexIterator::new(row_count, self.selection_vector);
443        self.visitor.visit_filtered(getters, rows)
444    }
445}
446
447/// A `RowVisitor` can be called back to visit extracted data. Aside from calling
448/// [`RowVisitor::visit`] on the visitor passed to [`EngineData::visit_rows`], engines do
449/// not need to worry about this trait.
450pub trait RowVisitor {
451    /// The names and types of leaf fields this visitor accesses. The `EngineData` being visited
452    /// validates these types when extracting column getters, and [`RowVisitor::visit`] will receive
453    /// one getter for each selected field, in the requested order. The column names are used by
454    /// [`RowVisitor::visit_rows_of`] to select fields from a "typical" `EngineData`; callers whose
455    /// engine data has different column names can manually invoke [`EngineData::visit_rows`].
456    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]);
457
458    /// Have the visitor visit the data. This will be called on a visitor passed to
459    /// [`EngineData::visit_rows`]. For each leaf in the schema that was passed to `extract` a
460    /// "getter" of type [`GetData`] will be present. This can be used to actually get at the data
461    /// for each row. You can `use` the `TypedGetData` trait if you want to have a way to extract
462    /// typed data that will fail if the "getter" is for an unexpected type.  The data in `getters`
463    /// does not outlive the call to this function (i.e. it should be copied if needed).
464    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()>;
465
466    /// Visit the rows of an [`EngineData`], selecting the leaf column names given by
467    /// [`RowVisitor::selected_column_names_and_types`]. This is a thin wrapper around
468    /// [`EngineData::visit_rows`] which in turn will eventually invoke [`RowVisitor::visit`].
469    fn visit_rows_of(&mut self, data: &dyn EngineData) -> DeltaResult<()>
470    where
471        Self: Sized,
472    {
473        data.visit_rows(self.selected_column_names_and_types().0, self)
474    }
475}
476
/// Any type that an engine wants to return as "data" needs to implement this trait. The bulk of the
/// work is in the [`EngineData::visit_rows`] method. See the docs for that method for more details.
/// ```rust
/// # use std::any::Any;
/// # use buoyant_kernel as delta_kernel;
/// # use delta_kernel::DeltaResult;
/// # use delta_kernel::engine_data::{RowVisitor, EngineData, GetData};
/// # use delta_kernel::expressions::{ArrayData, ColumnName};
/// # use delta_kernel::schema::SchemaRef;
/// struct MyDataType; // Whatever the engine wants here
/// impl MyDataType {
///   fn do_extraction<'a>(&self) -> Vec<&'a dyn GetData<'a>> {
///      // Actually do the extraction into getters
///      todo!()
///   }
/// }
///
/// impl EngineData for MyDataType {
///   fn visit_rows(&self, leaf_columns: &[ColumnName], visitor: &mut dyn RowVisitor) -> DeltaResult<()> {
///     let getters = self.do_extraction(); // do the extraction
///     visitor.visit(self.len(), &getters)?; // call the visitor back, propagating any error
///     Ok(())
///   }
///   fn len(&self) -> usize {
///     todo!() // actually get the len here
///   }
///   fn append_columns(&self, schema: SchemaRef, columns: Vec<ArrayData>) -> DeltaResult<Box<dyn EngineData>> {
///     todo!() // convert `SchemaRef` and `ArrayData` into local representation and append them
///   }
///   fn apply_selection_vector(self: Box<Self>, selection_vector: Vec<bool>) -> DeltaResult<Box<dyn EngineData>> {
///     todo!() // filter out unselected rows and return the new set of data
///   }
///   fn has_field(&self, name: &ColumnName) -> bool {
///     todo!() // determine whether the field exists in the data
///   }
/// }
/// ```
pub trait EngineData: AsAny {
    /// Visits a subset of leaf columns in each row of this data, passing a `GetData` item for each
    /// requested column to the visitor's `visit` method (along with the number of rows of data to
    /// be visited).
    fn visit_rows(
        &self,
        column_names: &[ColumnName],
        visitor: &mut dyn RowVisitor,
    ) -> DeltaResult<()>;

    /// Returns the number of items (rows) in this data.
    fn len(&self) -> usize;

    /// Returns true if the data is empty (i.e., has no rows).
    fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Append new columns provided by Kernel to the existing data.
    ///
    /// This method creates a new [`EngineData`] instance that combines the existing columns
    /// with the provided new columns. The original data remains unchanged.
    ///
    /// # Parameters
    /// - `schema`: The schema of the columns being appended (not the entire resulting schema). This
    ///   schema must describe exactly the columns being added in the `columns` parameter.
    /// - `columns`: The column data to append. Each [`ArrayData`] corresponds to one field in the
    ///   schema.
    ///
    /// # Returns
    /// A new `EngineData` instance containing both the original columns and the appended columns.
    /// The schema of the result will contain all original fields followed by the new schema fields.
    ///
    /// # Errors
    /// Returns an error if:
    /// - The number of rows in any appended column doesn't match the existing data.
    /// - The number of new columns doesn't match the number of schema fields.
    /// - Data type conversion to the engine's native data types fails.
    /// - The engine cannot create the combined data structure.
    fn append_columns(
        &self,
        schema: SchemaRef,
        columns: Vec<ArrayData>,
    ) -> DeltaResult<Box<dyn EngineData>>;

    /// Apply a selection vector to the data and return a data where only the valid rows are
    /// included. This consumes the EngineData, allowing engines to implement this "in place" if
    /// desired.
    fn apply_selection_vector(
        self: Box<Self>,
        selection_vector: Vec<bool>,
    ) -> DeltaResult<Box<dyn EngineData>>;

    /// Returns `true` if a field at the given (possibly nested) path exists in this data's schema.
    ///
    /// For a top-level field named `"foo"`, use `ColumnName::new(["foo"])`. For nested fields,
    /// each non-leaf element of the path must be a struct field at that level.
    fn has_field(&self, name: &ColumnName) -> bool;
}
573
574/// Evaluates a predicate on the batch, extracts the resulting selection vector, and applies
575/// it to filter out rows that don't satisfy the predicate.
576pub(crate) fn filter_by_predicate(
577    filter: &dyn crate::PredicateEvaluator,
578    batch: Box<dyn EngineData>,
579) -> DeltaResult<Box<dyn EngineData>> {
580    let predicate_result = filter.evaluate(batch.as_ref())?;
581    let mut visitor = SelectionVectorVisitor::default();
582    visitor.visit_rows_of(predicate_result.as_ref())?;
583    require!(
584        visitor.selection_vector.len() == batch.len(),
585        Error::internal_error(format!(
586            "predicate output length {} != batch length {}",
587            visitor.selection_vector.len(),
588            batch.len()
589        ))
590    );
591    batch.apply_selection_vector(visitor.selection_vector)
592}
593
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use rstest::rstest;

    use super::*;
    use crate::arrow::array::{RecordBatch, StringArray};
    use crate::arrow::datatypes::{
        DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
    };
    use crate::engine::arrow_data::ArrowEngineData;

    /// Builds Arrow-backed engine data with a single nullable string column named `value` that
    /// holds `rows` rows (`"row0"`, `"row1"`, ...).
    fn get_engine_data(rows: usize) -> Box<dyn EngineData> {
        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "value",
            ArrowDataType::Utf8,
            true,
        )]));
        let data: Vec<String> = (0..rows).map(|i| format!("row{i}")).collect();
        Box::new(ArrowEngineData::new(
            RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(data))]).unwrap(),
        ))
    }

    #[test]
    fn test_with_all_rows_selected_empty_data() {
        // Test with empty data
        let data = get_engine_data(0);
        let filtered_data = FilteredEngineData::with_all_rows_selected(data);

        assert_eq!(filtered_data.selection_vector().len(), 0);
        assert!(filtered_data.selection_vector().is_empty());
        assert_eq!(filtered_data.data().len(), 0);
    }

    #[test]
    fn test_with_all_rows_selected_single_row() {
        // Test with single row
        let data = get_engine_data(1);
        let filtered_data = FilteredEngineData::with_all_rows_selected(data);

        // An empty selection vector means all rows are selected
        assert!(filtered_data.selection_vector().is_empty());
        assert_eq!(filtered_data.data().len(), 1);
        assert!(filtered_data.has_selected_rows());
    }

    #[test]
    fn test_with_all_rows_selected_multiple_rows() {
        // Test with multiple rows
        let data = get_engine_data(4);
        let filtered_data = FilteredEngineData::with_all_rows_selected(data);

        // An empty selection vector means all rows are selected
        assert!(filtered_data.selection_vector().is_empty());
        assert_eq!(filtered_data.data().len(), 4);
        assert!(filtered_data.has_selected_rows());
    }

    #[test]
    fn test_has_selected_rows_empty_data() {
        // Test with empty data
        let data = get_engine_data(0);
        let filtered_data = FilteredEngineData::try_new(data, vec![]).unwrap();

        // Empty data should return false even with empty selection vector
        assert!(!filtered_data.has_selected_rows());
    }

    #[test]
    fn test_has_selected_rows_selection_vector_shorter_than_data() {
        // Test with selection vector shorter than data length
        let data = get_engine_data(3);
        // Selection vector with only 2 elements for 3 rows of data
        let filtered_data = FilteredEngineData::try_new(data, vec![false, false]).unwrap();

        // Should return true because selection vector is shorter than data
        assert!(filtered_data.has_selected_rows());
    }

    #[test]
    fn test_has_selected_rows_selection_vector_same_length_all_false() {
        let data = get_engine_data(2);
        let filtered_data = FilteredEngineData::try_new(data, vec![false, false]).unwrap();

        // Should return false because no rows are selected
        assert!(!filtered_data.has_selected_rows());
    }

    #[test]
    fn test_has_selected_rows_selection_vector_same_length_some_true() {
        let data = get_engine_data(3);
        let filtered_data = FilteredEngineData::try_new(data, vec![true, false, true]).unwrap();

        // Should return true because some rows are selected
        assert!(filtered_data.has_selected_rows());
    }

    #[test]
    fn test_try_new_selection_vector_larger_than_data() {
        // Test with selection vector larger than data length - should return error
        let data = get_engine_data(2);
        // Selection vector with 3 elements for 2 rows of data - should fail
        let result = FilteredEngineData::try_new(data, vec![true, false, true]);

        // Should return an error
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e
                .to_string()
                .contains("Selection vector is larger than data length"));
            assert!(e.to_string().contains("3 > 2"));
        }
    }

    #[test]
    fn test_get_binary_some_value() {
        use crate::arrow::array::BinaryArray;

        // Use Arrow's BinaryArray implementation
        let binary_data: Vec<Option<&[u8]>> = vec![Some(b"hello"), Some(b"world"), None];
        let binary_array = BinaryArray::from(binary_data);

        // Cast to dyn GetData to use TypedGetData trait
        let getter: &dyn GetData<'_> = &binary_array;

        // Test getting first row
        let result: Option<&[u8]> = getter.get_opt(0, "binary_field").unwrap();
        assert_eq!(result, Some(b"hello".as_ref()));

        // Test getting second row
        let result: Option<&[u8]> = getter.get_opt(1, "binary_field").unwrap();
        assert_eq!(result, Some(b"world".as_ref()));

        // Test getting None value
        let result: Option<&[u8]> = getter.get_opt(2, "binary_field").unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_get_binary_required() {
        use crate::arrow::array::BinaryArray;

        let binary_data: Vec<Option<&[u8]>> = vec![Some(b"hello")];
        let binary_array = BinaryArray::from(binary_data);

        // Cast to dyn GetData to use TypedGetData trait
        let getter: &dyn GetData<'_> = &binary_array;

        // Test using get() for required field
        let result: &[u8] = getter.get(0, "binary_field").unwrap();
        assert_eq!(result, b"hello");
    }

    #[test]
    fn test_get_binary_required_missing() {
        use crate::arrow::array::BinaryArray;

        let binary_data: Vec<Option<&[u8]>> = vec![None];
        let binary_array = BinaryArray::from(binary_data);

        // Cast to dyn GetData to use TypedGetData trait
        let getter: &dyn GetData<'_> = &binary_array;

        // Test using get() for missing required field should error
        let result: DeltaResult<&[u8]> = getter.get(0, "binary_field");
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Data missing for field"));
        }
    }

    #[test]
    fn test_get_binary_empty_bytes() {
        use crate::arrow::array::BinaryArray;

        let binary_data: Vec<Option<&[u8]>> = vec![Some(b"")];
        let binary_array = BinaryArray::from(binary_data);

        // Cast to dyn GetData to use TypedGetData trait
        let getter: &dyn GetData<'_> = &binary_array;

        // Test getting empty bytes
        let result: Option<&[u8]> = getter.get_opt(0, "binary_field").unwrap();
        assert_eq!(result, Some([].as_ref()));
        assert_eq!(result.unwrap().len(), 0);
    }

    #[test]
    fn test_from_engine_data() {
        let data = get_engine_data(3);
        let data_len = data.len(); // Save length before move

        // Use the From trait to convert
        let filtered_data: FilteredEngineData = data.into();

        // Verify all rows are selected (empty selection vector)
        assert!(filtered_data.selection_vector().is_empty());
        assert_eq!(filtered_data.data().len(), data_len);
        assert_eq!(filtered_data.data().len(), 3);
        assert!(filtered_data.has_selected_rows());
    }

    #[test]
    fn filtered_apply_selection_vector_full() {
        // Selection vector covers every row: only the `true` rows survive
        let data = get_engine_data(4);
        let filtered = FilteredEngineData::try_new(data, vec![true, false, true, false]).unwrap();
        let data = filtered.apply_selection_vector().unwrap();
        assert_eq!(data.len(), 2);
    }

    #[test]
    fn filtered_apply_selection_vector_partial() {
        // Selection vector covers only the first two rows: the uncovered tail rows are kept
        let data = get_engine_data(4);
        let filtered = FilteredEngineData::try_new(data, vec![true, false]).unwrap();
        let data = filtered.apply_selection_vector().unwrap();
        assert_eq!(data.len(), 3);
    }

    /// Collects every index yielded by a `RowIndexIterator` built from the given inputs.
    fn collect_indices(row_count: usize, selection: &[bool]) -> Vec<usize> {
        RowIndexIterator::new(row_count, selection).collect()
    }

    #[rstest]
    #[case(0, &[], vec![])]
    #[case(3, &[], vec![0, 1, 2])]
    #[case(3, &[true, true, true], vec![0, 1, 2])]
    #[case(3, &[false, false, false], vec![])]
    #[case(5, &[true, false, false, true, true], vec![0, 3, 4])]
    #[case(4, &[false, false, true, true], vec![2, 3])]
    #[case(3, &[true, false, false], vec![0])]
    // sv shorter than row_count: tail rows implicitly selected
    #[case(4, &[false, true], vec![1, 2, 3])]
    #[case(4, &[true, false], vec![0, 2, 3])]
    #[case(4, &[false, true, false, true], vec![1, 3])]
    fn row_index_iter(
        #[case] row_count: usize,
        #[case] selection: &[bool],
        #[case] expected: Vec<usize>,
    ) {
        assert_eq!(collect_indices(row_count, selection), expected);
    }
}