deltalake_core/protocol/mod.rs

1//! Actions included in Delta table transaction logs
2
3#![allow(non_camel_case_types)]
4
5use std::borrow::Borrow;
6use std::collections::HashMap;
7use std::hash::{Hash, Hasher};
8use std::mem::take;
9use std::str::FromStr;
10use std::sync::LazyLock;
11
12use arrow_schema::ArrowError;
13use futures::StreamExt;
14use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
15use regex::Regex;
16use serde::{Deserialize, Serialize};
17use serde_json::Value;
18use tracing::{debug, error};
19
20use crate::errors::{DeltaResult, DeltaTableError};
21use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField, TableFeatures};
22use crate::logstore::LogStore;
23use crate::table::CheckPoint;
24
25pub mod checkpoints;
26mod parquet_read;
27mod time_utils;
28
/// Error returned when an invalid Delta log action is encountered.
#[allow(missing_docs)]
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
    /// The table state does not carry a metadata action to read from.
    #[error("Table state does not contain metadata")]
    NoMetaData,

    /// No checkpoint file could be found for the table.
    #[error("Checkpoint file not found")]
    CheckpointNotFound,

    /// The end of the transaction log was reached while reading.
    #[error("End of transaction log")]
    EndOfLog,

    /// The action contains an invalid field.
    #[error("Invalid action field: {0}")]
    InvalidField(String),

    /// A parquet log checkpoint file contains an invalid action.
    #[error("Invalid action in parquet row: {0}")]
    InvalidRow(String),

    /// A transaction log contains an invalid deletion vector storage type.
    #[error("Invalid deletion vector storage type: {0}")]
    InvalidDeletionVectorStorageType(String),

    /// A generic action error. The wrapped error string describes the details.
    #[error("Generic action error: {0}")]
    Generic(String),

    /// Error returned when parsing checkpoint parquet using the parquet crate.
    #[error("Failed to parse parquet checkpoint: {source}")]
    ParquetParseError {
        /// Parquet error details returned when parsing the checkpoint parquet
        #[from]
        source: parquet::errors::ParquetError,
    },

    /// Failed to serialize operation
    #[error("Failed to serialize operation: {source}")]
    SerializeOperation {
        #[from]
        /// The source error
        source: serde_json::Error,
    },

    /// Error returned when converting the schema to Arrow format failed.
    #[error("Failed to convert into Arrow schema: {}", .source)]
    Arrow {
        /// Arrow error details returned when converting the schema in Arrow format failed
        #[from]
        source: ArrowError,
    },

    /// Passthrough error returned when calling ObjectStore.
    #[error("ObjectStoreError: {source}")]
    ObjectStore {
        /// The source ObjectStoreError.
        #[from]
        source: ObjectStoreError,
    },

    /// Passthrough I/O error.
    #[error("Io: {source}")]
    IO {
        /// The source I/O error.
        #[from]
        source: std::io::Error,
    },

    /// Passthrough error raised by the kernel crate.
    #[error("Kernel: {source}")]
    Kernel {
        /// The source kernel error.
        #[from]
        source: crate::kernel::Error,
    },
}
102
/// Struct used to represent minValues and maxValues in add action statistics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnValueStat {
    /// Nested (struct-typed) column statistics, keyed by child column name.
    Column(HashMap<String, ColumnValueStat>),
    /// Leaf statistic for a single column, stored as raw JSON.
    Value(Value),
}
112
113impl ColumnValueStat {
114    /// Returns the HashMap representation of the ColumnValueStat.
115    pub fn as_column(&self) -> Option<&HashMap<String, ColumnValueStat>> {
116        match self {
117            ColumnValueStat::Column(m) => Some(m),
118            _ => None,
119        }
120    }
121
122    /// Returns the serde_json representation of the ColumnValueStat.
123    pub fn as_value(&self) -> Option<&Value> {
124        match self {
125            ColumnValueStat::Value(v) => Some(v),
126            _ => None,
127        }
128    }
129}
130
/// Struct used to represent nullCount in add action statistics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnCountStat {
    /// Nested (struct-typed) column counts, keyed by child column name.
    Column(HashMap<String, ColumnCountStat>),
    /// Leaf null-count value for a single column.
    Value(i64),
}
140
141impl ColumnCountStat {
142    /// Returns the HashMap representation of the ColumnCountStat.
143    pub fn as_column(&self) -> Option<&HashMap<String, ColumnCountStat>> {
144        match self {
145            ColumnCountStat::Column(m) => Some(m),
146            _ => None,
147        }
148    }
149
150    /// Returns the serde_json representation of the ColumnCountStat.
151    pub fn as_value(&self) -> Option<i64> {
152        match self {
153            ColumnCountStat::Value(v) => Some(*v),
154            _ => None,
155        }
156    }
157}
158
/// Statistics associated with Add actions contained in the Delta log.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// Number of records in the file associated with the log action.
    pub num_records: i64,

    // start of per column stats
    /// Per-column lower bound: a value less than or equal to all values present in the file.
    pub min_values: HashMap<String, ColumnValueStat>,
    /// Per-column upper bound: a value greater than or equal to all values present in the file.
    pub max_values: HashMap<String, ColumnValueStat>,
    /// The number of null values for all columns.
    pub null_count: HashMap<String, ColumnCountStat>,
}
174
/// Statistics associated with Add actions contained in the Delta log.
/// min_values, max_values and null_count are optional to allow them to be missing
/// in the raw JSON payload; see [`PartialStats::as_stats`] for the conversion.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct PartialStats {
    /// Number of records in the file associated with the log action.
    pub num_records: i64,

    // start of per column stats
    /// Per-column lower bound, when present in the raw stats payload.
    pub min_values: Option<HashMap<String, ColumnValueStat>>,
    /// Per-column upper bound, when present in the raw stats payload.
    pub max_values: Option<HashMap<String, ColumnValueStat>>,
    /// Per-column null counts, when present in the raw stats payload.
    pub null_count: Option<HashMap<String, ColumnCountStat>>,
}
191
192impl PartialStats {
193    /// Fills in missing HashMaps
194    pub fn as_stats(&mut self) -> Stats {
195        let min_values = take(&mut self.min_values);
196        let max_values = take(&mut self.max_values);
197        let null_count = take(&mut self.null_count);
198        Stats {
199            num_records: self.num_records,
200            min_values: min_values.unwrap_or_default(),
201            max_values: max_values.unwrap_or_default(),
202            null_count: null_count.unwrap_or_default(),
203        }
204    }
205}
206
/// File stats parsed from raw parquet format.
#[derive(Debug, Default)]
pub struct StatsParsed {
    /// Number of records in the file associated with the log action.
    pub num_records: i64,

    // start of per column stats
    /// Contains a value smaller than all values present in the file for all columns.
    pub min_values: HashMap<String, parquet::record::Field>,
    /// Contains a value larger than all values present in the file for all columns.
    pub max_values: HashMap<String, parquet::record::Field>,
    /// The number of null values for all columns.
    pub null_count: HashMap<String, i64>,
}
222
/// Hash `Add` by file path only. The `PartialEq` impl below requires equal
/// paths, so equal values still produce equal hashes, as the `Hash`/`Eq`
/// contract requires.
impl Hash for Add {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.path.hash(state);
    }
}
228
impl PartialEq for Add {
    fn eq(&self, other: &Self) -> bool {
        // NOTE(review): `stats_parsed` and the row-tracking fields
        // (`base_row_id`, `default_row_commit_version`, `clustering_provider`)
        // are not part of the comparison — presumably because they are derived
        // or auxiliary data; confirm this is intentional.
        self.path == other.path
            && self.size == other.size
            && self.partition_values == other.partition_values
            && self.modification_time == other.modification_time
            && self.data_change == other.data_change
            && self.stats == other.stats
            && self.tags == other.tags
            && self.deletion_vector == other.deletion_vector
    }
}

impl Eq for Add {}
243
244impl Add {
245    /// Get whatever stats are available. Uses (parquet struct) parsed_stats if present falling back to json stats.
246    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
247        match self.get_stats_parsed() {
248            Ok(Some(stats)) => Ok(Some(stats)),
249            Ok(None) => self.get_json_stats(),
250            Err(e) => {
251                error!(
252                    "Error when reading parquet stats {:?} {e}. Attempting to read json stats",
253                    self.stats_parsed
254                );
255                self.get_json_stats()
256            }
257        }
258    }
259
260    /// Returns the serde_json representation of stats contained in the action if present.
261    /// Since stats are defined as optional in the protocol, this may be None.
262    pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
263        self.stats
264            .as_ref()
265            .map(|stats| serde_json::from_str(stats).map(|mut ps: PartialStats| ps.as_stats()))
266            .transpose()
267    }
268}
269
/// Hash `Remove` by file path only, so tombstones can be looked up by path
/// (see the `Borrow<str>` impl below).
impl Hash for Remove {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.path.hash(state);
    }
}
275
/// Borrow `Remove` as str so we can look at tombstones hashset in `DeltaTableState` by using
/// a path from action `Add`.
///
/// NOTE: the `Borrow` contract requires `Hash` to agree with the borrowed
/// form; the `Hash` impl above hashes only `path`, which satisfies this.
impl Borrow<str> for Remove {
    fn borrow(&self) -> &str {
        self.path.as_ref()
    }
}
283
284impl PartialEq for Remove {
285    fn eq(&self, other: &Self) -> bool {
286        self.path == other.path
287            && self.deletion_timestamp == other.deletion_timestamp
288            && self.data_change == other.data_change
289            && self.extended_file_metadata == other.extended_file_metadata
290            && self.partition_values == other.partition_values
291            && self.size == other.size
292            && self.tags == other.tags
293            && self.deletion_vector == other.deletion_vector
294    }
295}
296
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
/// Used to record the operations performed on the Delta log: one clause of a
/// merge operation, i.e. the kind of match and the optional predicate that
/// guarded it.
pub struct MergePredicate {
    /// The type of merge operation performed
    pub action_type: String,
    /// The predicate used for the merge operation
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate: Option<String>,
}
308
/// Operation performed when creating a new log entry with one or more actions.
/// This is a key element of the `CommitInfo` action.
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum DeltaOperation {
    /// Represents a Delta `Add Column` operation.
    /// Used to add new columns or fields in a struct.
    AddColumn {
        /// Fields added to existing schema
        fields: Vec<StructField>,
    },

    /// Represents a Delta `Create` operation.
    /// Would usually only create the table; if data is also written,
    /// a `Write` operation is more appropriate.
    Create {
        /// The save mode used during the create.
        mode: SaveMode,
        /// The storage location of the new table
        location: String,
        /// The min reader and writer protocol versions of the table
        protocol: Protocol,
        /// Metadata associated with the new table
        metadata: Metadata,
    },

    /// Represents a Delta `Write` operation.
    /// Write operations will typically only include `Add` actions.
    #[serde(rename_all = "camelCase")]
    Write {
        /// The save mode used during the write.
        mode: SaveMode,
        /// The columns the write is partitioned by.
        partition_by: Option<Vec<String>>,
        /// The predicate used during the write.
        predicate: Option<String>,
    },

    /// Delete data matching predicate from delta table
    Delete {
        /// The condition that data to be deleted must match
        predicate: Option<String>,
    },

    /// Update data matching predicate from delta table
    Update {
        /// The update predicate
        predicate: Option<String>,
    },
    /// Add constraints to a table
    AddConstraint {
        /// Constraint name
        name: String,
        /// Expression to check against
        expr: String,
    },

    /// Add table features to a table
    AddFeature {
        /// Names of the features added
        name: Vec<TableFeatures>,
    },

    /// Drops constraints from a table
    DropConstraint {
        /// Constraint name
        name: String,
    },

    /// Merge data with a source data with the following predicate
    #[serde(rename_all = "camelCase")]
    Merge {
        /// Cleaned merge predicate for conflict checks
        predicate: Option<String>,

        /// The original merge predicate
        merge_predicate: Option<String>,

        /// Match operations performed
        matched_predicates: Vec<MergePredicate>,

        /// Not Match operations performed
        not_matched_predicates: Vec<MergePredicate>,

        /// Not Match by Source operations performed
        not_matched_by_source_predicates: Vec<MergePredicate>,
    },

    /// Represents a Delta `StreamingUpdate` operation.
    #[serde(rename_all = "camelCase")]
    StreamingUpdate {
        /// The output mode the streaming writer is using.
        output_mode: OutputMode,
        /// The query id of the streaming writer.
        query_id: String,
        /// The epoch id of the written micro-batch.
        epoch_id: i64,
    },

    /// Set table properties operations
    #[serde(rename_all = "camelCase")]
    SetTableProperties {
        /// Table properties that were added
        properties: HashMap<String, String>,
    },

    #[serde(rename_all = "camelCase")]
    /// Represents an `Optimize` operation
    Optimize {
        // TODO: Create a string representation of the filter passed to optimize
        /// The filter used to determine which partitions to filter
        predicate: Option<String>,
        /// Target optimize size
        target_size: i64,
    },
    #[serde(rename_all = "camelCase")]
    /// Represents a `FileSystemCheck` operation
    FileSystemCheck {},

    /// Represents a `Restore` operation
    Restore {
        /// Version to restore
        version: Option<i64>,
        /// Datetime to restore
        datetime: Option<i64>,
    }, // TODO: Add more operations

    #[serde(rename_all = "camelCase")]
    /// Represents the start of a `Vacuum` operation
    VacuumStart {
        /// Whether the retention check is enforced
        retention_check_enabled: bool,
        /// The specified retention period in milliseconds
        specified_retention_millis: Option<i64>,
        /// The default delta table retention milliseconds policy
        default_retention_millis: i64,
    },

    /// Represents the end of a `Vacuum` operation
    VacuumEnd {
        /// The status of the operation
        status: String,
    },
    /// Set table field metadata operations
    #[serde(rename_all = "camelCase")]
    UpdateFieldMetadata {
        /// Fields added to existing schema
        fields: Vec<StructField>,
    },
}
460
461impl DeltaOperation {
462    /// A human readable name for the operation
463    pub fn name(&self) -> &str {
464        // operation names taken from https://learn.microsoft.com/en-us/azure/databricks/delta/history#--operation-metrics-keys
465        match &self {
466            DeltaOperation::AddColumn { .. } => "ADD COLUMN",
467            DeltaOperation::Create {
468                mode: SaveMode::Overwrite,
469                ..
470            } => "CREATE OR REPLACE TABLE",
471            DeltaOperation::Create { .. } => "CREATE TABLE",
472            DeltaOperation::Write { .. } => "WRITE",
473            DeltaOperation::Delete { .. } => "DELETE",
474            DeltaOperation::Update { .. } => "UPDATE",
475            DeltaOperation::Merge { .. } => "MERGE",
476            DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
477            DeltaOperation::SetTableProperties { .. } => "SET TBLPROPERTIES",
478            DeltaOperation::Optimize { .. } => "OPTIMIZE",
479            DeltaOperation::FileSystemCheck { .. } => "FSCK",
480            DeltaOperation::Restore { .. } => "RESTORE",
481            DeltaOperation::VacuumStart { .. } => "VACUUM START",
482            DeltaOperation::VacuumEnd { .. } => "VACUUM END",
483            DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT",
484            DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
485            DeltaOperation::AddFeature { .. } => "ADD FEATURE",
486            DeltaOperation::UpdateFieldMetadata { .. } => "UPDATE FIELD METADATA",
487        }
488    }
489
490    /// Parameters configured for operation.
491    pub fn operation_parameters(&self) -> DeltaResult<HashMap<String, Value>> {
492        if let Some(Some(Some(map))) = serde_json::to_value(self)
493            .map_err(|err| ProtocolError::SerializeOperation { source: err })?
494            .as_object()
495            .map(|p| p.values().next().map(|q| q.as_object()))
496        {
497            Ok(map
498                .iter()
499                .filter(|item| !item.1.is_null())
500                .map(|(k, v)| {
501                    (
502                        k.to_owned(),
503                        serde_json::Value::String(if v.is_string() {
504                            String::from(v.as_str().unwrap())
505                        } else {
506                            v.to_string()
507                        }),
508                    )
509                })
510                .collect())
511        } else {
512            Err(ProtocolError::Generic(
513                "Operation parameters serialized into unexpected shape".into(),
514            )
515            .into())
516        }
517    }
518
519    /// Denotes if the operation changes the data contained in the table
520    pub fn changes_data(&self) -> bool {
521        match self {
522            Self::Optimize { .. }
523            | Self::UpdateFieldMetadata { .. }
524            | Self::SetTableProperties { .. }
525            | Self::AddColumn { .. }
526            | Self::AddFeature { .. }
527            | Self::VacuumStart { .. }
528            | Self::VacuumEnd { .. }
529            | Self::AddConstraint { .. }
530            | Self::DropConstraint { .. } => false,
531            Self::Create { .. }
532            | Self::FileSystemCheck {}
533            | Self::StreamingUpdate { .. }
534            | Self::Write { .. }
535            | Self::Delete { .. }
536            | Self::Merge { .. }
537            | Self::Update { .. }
538            | Self::Restore { .. } => true,
539        }
540    }
541
542    /// Retrieve basic commit information to be added to Delta commits
543    pub fn get_commit_info(&self) -> CommitInfo {
544        // TODO infer additional info from operation parameters ...
545        CommitInfo {
546            operation: Some(self.name().into()),
547            operation_parameters: self.operation_parameters().ok(),
548            ..Default::default()
549        }
550    }
551
552    /// Get predicate expression applied when the operation reads data from the table.
553    pub fn read_predicate(&self) -> Option<String> {
554        match self {
555            // TODO add more operations
556            Self::Write { predicate, .. } => predicate.clone(),
557            Self::Delete { predicate, .. } => predicate.clone(),
558            Self::Update { predicate, .. } => predicate.clone(),
559            Self::Merge { predicate, .. } => predicate.clone(),
560            _ => None,
561        }
562    }
563
564    /// Denotes if the operation reads the entire table
565    pub fn read_whole_table(&self) -> bool {
566        match self {
567            // Predicate is none -> Merge operation had to join full source and target
568            Self::Merge { predicate, .. } if predicate.is_none() => true,
569            _ => false,
570        }
571    }
572}
573
/// The SaveMode used when performing a DeltaOperation
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
pub enum SaveMode {
    /// Files will be appended to the target location.
    Append,
    /// The target location will be overwritten.
    Overwrite,
    /// If files exist for the target, the operation must fail.
    ErrorIfExists,
    /// If files exist for the target, the operation must not proceed or change any data.
    Ignore,
}
586
587impl FromStr for SaveMode {
588    type Err = DeltaTableError;
589
590    fn from_str(s: &str) -> DeltaResult<Self> {
591        match s.to_ascii_lowercase().as_str() {
592            "append" => Ok(SaveMode::Append),
593            "overwrite" => Ok(SaveMode::Overwrite),
594            "error" => Ok(SaveMode::ErrorIfExists),
595            "ignore" => Ok(SaveMode::Ignore),
596            _ => Err(DeltaTableError::Generic(format!("Invalid save mode provided: {s}, only these are supported: ['append', 'overwrite', 'error', 'ignore']"))),
597        }
598    }
599}
600
/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
pub enum OutputMode {
    /// Only new rows will be written when new data is available.
    Append,
    /// The full output (all rows) will be written whenever new data is available.
    Complete,
    /// Only rows with updates will be written when new or changed data is available.
    Update,
}
611
612pub(crate) async fn get_last_checkpoint(
613    log_store: &dyn LogStore,
614) -> Result<CheckPoint, ProtocolError> {
615    let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
616    debug!("loading checkpoint from {last_checkpoint_path}");
617    match log_store
618        .object_store(None)
619        .get(&last_checkpoint_path)
620        .await
621    {
622        Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?),
623        Err(ObjectStoreError::NotFound { .. }) => {
624            match find_latest_check_point_for_version(log_store, i64::MAX).await {
625                Ok(Some(cp)) => Ok(cp),
626                _ => Err(ProtocolError::CheckpointNotFound),
627            }
628        }
629        Err(err) => Err(ProtocolError::ObjectStore { source: err }),
630    }
631}
632
633pub(crate) async fn find_latest_check_point_for_version(
634    log_store: &dyn LogStore,
635    version: i64,
636) -> Result<Option<CheckPoint>, ProtocolError> {
637    static CHECKPOINT_REGEX: LazyLock<Regex> =
638        LazyLock::new(|| Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap());
639    static CHECKPOINT_PARTS_REGEX: LazyLock<Regex> = LazyLock::new(|| {
640        Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap()
641    });
642
643    let mut cp: Option<CheckPoint> = None;
644    let object_store = log_store.object_store(None);
645    let mut stream = object_store.list(Some(log_store.log_path()));
646
647    while let Some(obj_meta) = stream.next().await {
648        // Exit early if any objects can't be listed.
649        // We exclude the special case of a not found error on some of the list entities.
650        // This error mainly occurs for local stores when a temporary file has been deleted by
651        // concurrent writers or if the table is vacuumed by another client.
652        let obj_meta = match obj_meta {
653            Ok(meta) => Ok(meta),
654            Err(ObjectStoreError::NotFound { .. }) => continue,
655            Err(err) => Err(err),
656        }?;
657        if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) {
658            let curr_ver_str = captures.get(1).unwrap().as_str();
659            let curr_ver: i64 = curr_ver_str.parse().unwrap();
660            if curr_ver > version {
661                // skip checkpoints newer than max version
662                continue;
663            }
664            if cp.is_none() || curr_ver > cp.unwrap().version {
665                cp = Some(CheckPoint::new(curr_ver, 0, None));
666            }
667            continue;
668        }
669
670        if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) {
671            let curr_ver_str = captures.get(1).unwrap().as_str();
672            let curr_ver: i64 = curr_ver_str.parse().unwrap();
673            if curr_ver > version {
674                // skip checkpoints newer than max version
675                continue;
676            }
677            if cp.is_none() || curr_ver > cp.unwrap().version {
678                let parts_str = captures.get(2).unwrap().as_str();
679                let parts = parts_str.parse().unwrap();
680                cp = Some(CheckPoint::new(curr_ver, 0, Some(parts)));
681            }
682            continue;
683        }
684    }
685
686    Ok(cp)
687}
688
689#[cfg(test)]
690mod tests {
691    use super::*;
692    use crate::kernel::Action;
693
    /// Verifies that nested JSON stats on an `Add` action round-trip through
    /// `get_stats`, including struct-typed min/max values and null counts
    /// exposed via the `ColumnValueStat`/`ColumnCountStat` accessors.
    #[test]
    fn test_load_table_stats() {
        let action = Add {
            stats: Some(
                serde_json::json!({
                    "numRecords": 22,
                    "minValues": {"a": 1, "nested": {"b": 2, "c": "a"}},
                    "maxValues": {"a": 10, "nested": {"b": 20, "c": "z"}},
                    "nullCount": {"a": 1, "nested": {"b": 0, "c": 1}},
                })
                .to_string(),
            ),
            path: Default::default(),
            data_change: true,
            deletion_vector: None,
            partition_values: Default::default(),
            stats_parsed: None,
            tags: None,
            size: 0,
            modification_time: 0,
            base_row_id: None,
            default_row_commit_version: None,
            clustering_provider: None,
        };

        let stats = action.get_stats().unwrap().unwrap();

        assert_eq!(stats.num_records, 22);

        // minValues: top-level scalar and nested struct fields
        assert_eq!(
            stats.min_values["a"].as_value().unwrap(),
            &serde_json::json!(1)
        );
        assert_eq!(
            stats.min_values["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            &serde_json::json!(2)
        );
        assert_eq!(
            stats.min_values["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            &serde_json::json!("a")
        );

        // maxValues: top-level scalar and nested struct fields
        assert_eq!(
            stats.max_values["a"].as_value().unwrap(),
            &serde_json::json!(10)
        );
        assert_eq!(
            stats.max_values["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            &serde_json::json!(20)
        );
        assert_eq!(
            stats.max_values["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            &serde_json::json!("z")
        );

        // nullCount: top-level scalar and nested struct fields
        assert_eq!(stats.null_count["a"].as_value().unwrap(), 1);
        assert_eq!(
            stats.null_count["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            0
        );
        assert_eq!(
            stats.null_count["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            1
        );
    }
771
    /// Verifies that a stats payload containing only `numRecords` still
    /// parses, with the missing sections defaulting to empty maps
    /// (see `PartialStats::as_stats`).
    #[test]
    fn test_load_table_partial_stats() {
        let action = Add {
            stats: Some(
                serde_json::json!({
                    "numRecords": 22
                })
                .to_string(),
            ),
            path: Default::default(),
            data_change: true,
            deletion_vector: None,
            partition_values: Default::default(),
            stats_parsed: None,
            tags: None,
            size: 0,
            modification_time: 0,
            base_row_id: None,
            default_row_commit_version: None,
            clustering_provider: None,
        };

        let stats = action.get_stats().unwrap().unwrap();

        assert_eq!(stats.num_records, 22);
        assert_eq!(stats.min_values.len(), 0);
        assert_eq!(stats.max_values.len(), 0);
        assert_eq!(stats.null_count.len(), 0);
    }
801
    /// Verifies `CommitInfo` deserialization: a typical Spark-written commit,
    /// an empty object (no required fields), and arbitrary extra fields which
    /// must be preserved in the `info` map.
    #[test]
    fn test_read_commit_info() {
        let raw = r#"
        {
            "timestamp": 1670892998177,
            "operation": "WRITE",
            "operationParameters": {
                "mode": "Append",
                "partitionBy": "[\"c1\",\"c2\"]"
            },
            "isolationLevel": "Serializable",
            "isBlindAppend": true,
            "operationMetrics": {
                "numFiles": "3",
                "numOutputRows": "3",
                "numOutputBytes": "1356"
            },
            "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
            "txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c"
        }"#;

        let info = serde_json::from_str::<CommitInfo>(raw);
        assert!(info.is_ok());

        // assert that commit info has no required fields
        let raw = "{}";
        let info = serde_json::from_str::<CommitInfo>(raw);
        assert!(info.is_ok());

        // arbitrary field data may be added to commit
        let raw = r#"
        {
            "timestamp": 1670892998177,
            "operation": "WRITE",
            "operationParameters": {
                "mode": "Append",
                "partitionBy": "[\"c1\",\"c2\"]"
            },
            "isolationLevel": "Serializable",
            "isBlindAppend": true,
            "operationMetrics": {
                "numFiles": "3",
                "numOutputRows": "3",
                "numOutputBytes": "1356"
            },
            "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
            "txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c",
            "additionalField": "more data",
            "additionalStruct": {
                "key": "value",
                "otherKey": 123
            }
        }"#;

        let info = serde_json::from_str::<CommitInfo>(raw).expect("should parse");
        assert!(info.info.contains_key("additionalField"));
        assert!(info.info.contains_key("additionalStruct"));
    }
860
    /// Verifies that a `domainMetadata` log entry deserializes into `Action`.
    #[test]
    fn test_read_domain_metadata() {
        let buf = r#"{"domainMetadata":{"domain":"delta.liquid","configuration":"{\"clusteringColumns\":[{\"physicalName\":[\"id\"]}],\"domainName\":\"delta.liquid\"}","removed":false}}"#;
        let _action: Action =
            serde_json::from_str(buf).expect("Expected to be able to deserialize");
    }
867
868    mod arrow_tests {
869        use arrow::array::{self, ArrayRef, StructArray};
870        use arrow::compute::kernels::cast_utils::Parser;
871        use arrow::compute::sort_to_indices;
872        use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimestampMicrosecondType};
873        use arrow::record_batch::RecordBatch;
874        use std::sync::Arc;
875
876        fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result<RecordBatch> {
877            let sort_column = batch.column(batch.schema().column_with_name(column).unwrap().0);
878            let sort_indices = sort_to_indices(sort_column, None, None)?;
879            let schema = batch.schema();
880            let sorted_columns: Vec<(&String, ArrayRef)> = schema
881                .fields()
882                .iter()
883                .zip(batch.columns().iter())
884                .map(|(field, column)| {
885                    Ok((
886                        field.name(),
887                        arrow::compute::take(column, &sort_indices, None)?,
888                    ))
889                })
890                .collect::<arrow::error::Result<_>>()?;
891            RecordBatch::try_from_iter(sorted_columns)
892        }
893
894        #[tokio::test]
895        async fn test_with_partitions() {
896            // test table with partitions
897            let path = "../test/tests/data/delta-0.8.0-null-partition";
898            let table = crate::open_table(path).await.unwrap();
899            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
900
901            let mut expected_columns: Vec<(&str, ArrayRef)> = vec![
902                ("path", Arc::new(array::StringArray::from(vec![
903                    "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet",
904                    "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"
905                ]))),
906                ("size_bytes", Arc::new(array::Int64Array::from(vec![460, 460]))),
907                ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
908                    1627990384000, 1627990384000
909                ]))),
910                ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))),
911                ("partition.k", Arc::new(array::StringArray::from(vec![Some("A"), None]))),
912            ];
913            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
914
915            assert_eq!(expected, actions);
916
917            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
918            let actions = sort_batch_by(&actions, "path").unwrap();
919
920            expected_columns[4] = (
921                "partition_values",
922                Arc::new(array::StructArray::new(
923                    Fields::from(vec![Field::new("k", DataType::Utf8, true)]),
924                    vec![Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef],
925                    None,
926                )),
927            );
928            let expected = RecordBatch::try_from_iter(expected_columns).unwrap();
929
930            assert_eq!(expected, actions);
931        }
932
        #[tokio::test]
        #[ignore = "enable when deletion vector is supported"]
        async fn test_with_deletion_vector() {
            // Table whose log carries deletion-vector metadata on its add actions.
            let path = "../test/tests/data/table_with_deletion_logs";
            let table = crate::open_table(path).await.unwrap();
            // Flat view: deletion-vector fields surface as `deletionVector.<field>` columns.
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();
            // Project down to just the columns this test asserts on, located by name
            // so the test is independent of the overall column layout.
            let actions = actions
                .project(&[
                    actions.schema().index_of("path").unwrap(),
                    actions.schema().index_of("size_bytes").unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.storageType")
                        .unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.pathOrInlineDiv")
                        .unwrap(),
                    actions.schema().index_of("deletionVector.offset").unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.sizeInBytes")
                        .unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.cardinality")
                        .unwrap(),
                ])
                .unwrap();
            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
                    ])),
                ),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
                (
                    "deletionVector.storageType",
                    Arc::new(array::StringArray::from(vec!["u"])),
                ),
                (
                    "deletionVector.pathOrInlineDiv",
                    Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"])),
                ),
                (
                    "deletionVector.offset",
                    Arc::new(array::Int32Array::from(vec![1])),
                ),
                (
                    "deletionVector.sizeInBytes",
                    Arc::new(array::Int32Array::from(vec![36])),
                ),
                (
                    "deletionVector.cardinality",
                    Arc::new(array::Int64Array::from(vec![2])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);

            // Nested view: the same fields are grouped under a single
            // `deletionVector` struct column.
            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();
            let actions = actions
                .project(&[
                    actions.schema().index_of("path").unwrap(),
                    actions.schema().index_of("size_bytes").unwrap(),
                    actions.schema().index_of("deletionVector").unwrap(),
                ])
                .unwrap();
            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
                    ])),
                ),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
                (
                    "deletionVector",
                    Arc::new(array::StructArray::new(
                        Fields::from(vec![
                            Field::new("storageType", DataType::Utf8, false),
                            Field::new("pathOrInlineDiv", DataType::Utf8, false),
                            Field::new("offset", DataType::Int32, true),
                            Field::new("sizeInBytes", DataType::Int32, false),
                            Field::new("cardinality", DataType::Int64, false),
                        ]),
                        vec![
                            Arc::new(array::StringArray::from(vec!["u"])) as ArrayRef,
                            Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"]))
                                as ArrayRef,
                            Arc::new(array::Int32Array::from(vec![1])) as ArrayRef,
                            Arc::new(array::Int32Array::from(vec![36])) as ArrayRef,
                            Arc::new(array::Int64Array::from(vec![2])) as ArrayRef,
                        ],
                        None,
                    )),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns).unwrap();

            assert_eq!(expected, actions);
        }
1040        #[tokio::test]
1041        async fn test_without_partitions() {
1042            // test table without partitions
1043            let path = "../test/tests/data/simple_table";
1044            let table = crate::open_table(path).await.unwrap();
1045
1046            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
1047            let actions = sort_batch_by(&actions, "path").unwrap();
1048
1049            let expected_columns: Vec<(&str, ArrayRef)> = vec![
1050                (
1051                    "path",
1052                    Arc::new(array::StringArray::from(vec![
1053                        "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
1054                        "part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet",
1055                        "part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet",
1056                        "part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet",
1057                        "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet",
1058                    ])),
1059                ),
1060                (
1061                    "size_bytes",
1062                    Arc::new(array::Int64Array::from(vec![262, 262, 429, 429, 429])),
1063                ),
1064                (
1065                    "modification_time",
1066                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1067                        1587968626000,
1068                        1587968602000,
1069                        1587968602000,
1070                        1587968602000,
1071                        1587968602000,
1072                    ])),
1073                ),
1074                (
1075                    "data_change",
1076                    Arc::new(array::BooleanArray::from(vec![
1077                        true, true, true, true, true,
1078                    ])),
1079                ),
1080            ];
1081            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
1082
1083            assert_eq!(expected, actions);
1084
1085            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
1086            let actions = sort_batch_by(&actions, "path").unwrap();
1087
1088            // For now, this column is ignored.
1089            // expected_columns.push((
1090            //     "partition_values",
1091            //     new_null_array(&DataType::Struct(vec![]), 5),
1092            // ));
1093            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
1094
1095            assert_eq!(expected, actions);
1096        }
1097
        #[tokio::test]
        #[ignore = "column mapping not yet supported."]
        async fn test_with_column_mapping() {
            // Partitioned table using column mapping (logical column names such as
            // "Company Very Short" differ from the physical parquet file names).
            let path = "../test/tests/data/table_with_column_mapping";
            let table = crate::open_table(path).await.unwrap();
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            // Expected flattened output: logical names must appear in the
            // partition/stats column headers, and file tags surface as `tags.<KEY>`.
            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet",
                        "8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet",
                    ])),
                ),
                (
                    "size_bytes",
                    Arc::new(array::Int64Array::from(vec![890, 810])),
                ),
                (
                    "modification_time",
                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                        1699946088000,
                        1699946088000,
                    ])),
                ),
                (
                    "data_change",
                    Arc::new(array::BooleanArray::from(vec![true, true])),
                ),
                (
                    "partition.Company Very Short",
                    Arc::new(array::StringArray::from(vec!["BMS", "BME"])),
                ),
                ("num_records", Arc::new(array::Int64Array::from(vec![4, 1]))),
                // String stats are not collected for these columns in the fixture,
                // so their min/max/null_count columns come back as all-null arrays.
                (
                    "null_count.Company Very Short",
                    Arc::new(array::NullArray::new(2)),
                ),
                ("min.Company Very Short", Arc::new(array::NullArray::new(2))),
                ("max.Company Very Short", Arc::new(array::NullArray::new(2))),
                ("null_count.Super Name", Arc::new(array::NullArray::new(2))),
                ("min.Super Name", Arc::new(array::NullArray::new(2))),
                ("max.Super Name", Arc::new(array::NullArray::new(2))),
                (
                    "tags.INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec![
                        "1699946088000000",
                        "1699946088000001",
                    ])),
                ),
                (
                    "tags.MAX_INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec![
                        "1699946088000000",
                        "1699946088000001",
                    ])),
                ),
                (
                    "tags.MIN_INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec![
                        "1699946088000000",
                        "1699946088000001",
                    ])),
                ),
                (
                    "tags.OPTIMIZE_TARGET_SIZE",
                    Arc::new(array::StringArray::from(vec!["33554432", "33554432"])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);
        }
1172
1173        #[tokio::test]
1174        async fn test_with_stats() {
1175            // test table with stats
1176            let path = "../test/tests/data/delta-0.8.0";
1177            let table = crate::open_table(path).await.unwrap();
1178            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
1179            let actions = sort_batch_by(&actions, "path").unwrap();
1180
1181            let expected_columns: Vec<(&str, ArrayRef)> = vec![
1182                (
1183                    "path",
1184                    Arc::new(array::StringArray::from(vec![
1185                        "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
1186                        "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
1187                    ])),
1188                ),
1189                (
1190                    "size_bytes",
1191                    Arc::new(array::Int64Array::from(vec![440, 440])),
1192                ),
1193                (
1194                    "modification_time",
1195                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
1196                        1615043776000,
1197                        1615043767000,
1198                    ])),
1199                ),
1200                (
1201                    "data_change",
1202                    Arc::new(array::BooleanArray::from(vec![true, true])),
1203                ),
1204                ("num_records", Arc::new(array::Int64Array::from(vec![2, 2]))),
1205                (
1206                    "null_count.value",
1207                    Arc::new(array::Int64Array::from(vec![0, 0])),
1208                ),
1209                ("min.value", Arc::new(array::Int32Array::from(vec![2, 0]))),
1210                ("max.value", Arc::new(array::Int32Array::from(vec![4, 2]))),
1211            ];
1212            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();
1213
1214            assert_eq!(expected, actions);
1215        }
1216
1217        #[tokio::test]
1218        async fn test_table_not_always_with_stats() {
1219            let path = "../test/tests/data/delta-stats-optional";
1220            let mut table = crate::open_table(path).await.unwrap();
1221            table.load().await.unwrap();
1222            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
1223            let actions = sort_batch_by(&actions, "path").unwrap();
1224            // get column-0 path, and column-4 num_records, and column_5 null_count.integer
1225            let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
1226                "part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet",
1227                "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
1228            ]));
1229            let expected_num_records: ArrayRef =
1230                Arc::new(array::Int64Array::from(vec![None, Some(1)]));
1231            let expected_null_count: ArrayRef =
1232                Arc::new(array::Int64Array::from(vec![None, Some(0)]));
1233
1234            let path_column = actions.column(0);
1235            let num_records_column = actions.column(4);
1236            let null_count_column = actions.column(5);
1237
1238            assert_eq!(&expected_path, path_column);
1239            assert_eq!(&expected_num_records, num_records_column);
1240            assert_eq!(&expected_null_count, null_count_column);
1241        }
1242
1243        #[tokio::test]
1244        async fn test_table_checkpoint_not_always_with_stats() {
1245            let path = "../test/tests/data/delta-checkpoint-stats-optional";
1246            let mut table = crate::open_table(path).await.unwrap();
1247            table.load().await.unwrap();
1248
1249            assert_eq!(2, table.snapshot().unwrap().file_actions().unwrap().len());
1250        }
1251
        #[tokio::test]
        async fn test_only_struct_stats() {
            // Table whose stats exist only in struct (parquet) form, not as JSON;
            // covers every stats-bearing data type in one fixture file.
            let path = "../test/tests/data/delta-1.2.1-only-struct-stats";
            let mut table = crate::open_table(path).await.unwrap();
            table.load_version(1).await.unwrap();

            // Flat view: stats fan out into `num_records`, `null_count.*`,
            // `min.*`, `max.*` and `tags.*` columns.
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();

            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
                    ])),
                ),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![5489]))),
                (
                    "modification_time",
                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                        1666652373000,
                    ])),
                ),
                (
                    "data_change",
                    Arc::new(array::BooleanArray::from(vec![true])),
                ),
                ("num_records", Arc::new(array::Int64Array::from(vec![1]))),
                (
                    "null_count.integer",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                ("min.integer", Arc::new(array::Int32Array::from(vec![0]))),
                ("max.integer", Arc::new(array::Int32Array::from(vec![0]))),
                (
                    "null_count.null",
                    Arc::new(array::Int64Array::from(vec![1])),
                ),
                // Types without min/max stats (null, boolean, binary) come back as
                // all-null arrays for those columns.
                ("min.null", Arc::new(array::NullArray::new(1))),
                ("max.null", Arc::new(array::NullArray::new(1))),
                (
                    "null_count.boolean",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                ("min.boolean", Arc::new(array::NullArray::new(1))),
                ("max.boolean", Arc::new(array::NullArray::new(1))),
                (
                    "null_count.double",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.double",
                    Arc::new(array::Float64Array::from(vec![1.234])),
                ),
                (
                    "max.double",
                    Arc::new(array::Float64Array::from(vec![1.234])),
                ),
                (
                    "null_count.decimal",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                // -567800 at precision 8 / scale 5 represents decimal -5.67800.
                (
                    "min.decimal",
                    Arc::new(
                        array::Decimal128Array::from_iter_values([-567800])
                            .with_precision_and_scale(8, 5)
                            .unwrap(),
                    ),
                ),
                (
                    "max.decimal",
                    Arc::new(
                        array::Decimal128Array::from_iter_values([-567800])
                            .with_precision_and_scale(8, 5)
                            .unwrap(),
                    ),
                ),
                (
                    "null_count.string",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.string",
                    Arc::new(array::StringArray::from(vec!["string"])),
                ),
                (
                    "max.string",
                    Arc::new(array::StringArray::from(vec!["string"])),
                ),
                (
                    "null_count.binary",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                ("min.binary", Arc::new(array::NullArray::new(1))),
                ("max.binary", Arc::new(array::NullArray::new(1))),
                (
                    "null_count.date",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.date",
                    Arc::new(array::Date32Array::from(vec![Date32Type::parse(
                        "2022-10-24",
                    )])),
                ),
                (
                    "max.date",
                    Arc::new(array::Date32Array::from(vec![Date32Type::parse(
                        "2022-10-24",
                    )])),
                ),
                (
                    "null_count.timestamp",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.timestamp",
                    Arc::new(
                        array::TimestampMicrosecondArray::from(vec![
                            TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"),
                        ])
                        .with_timezone("UTC"),
                    ),
                ),
                (
                    "max.timestamp",
                    Arc::new(
                        array::TimestampMicrosecondArray::from(vec![
                            TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"),
                        ])
                        .with_timezone("UTC"),
                    ),
                ),
                // Nested struct fields flatten with dotted paths.
                (
                    "null_count.struct.struct_element",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.struct.struct_element",
                    Arc::new(array::StringArray::from(vec!["struct_value"])),
                ),
                (
                    "max.struct.struct_element",
                    Arc::new(array::StringArray::from(vec!["struct_value"])),
                ),
                // Maps and arrays only carry null_count stats, no min/max.
                ("null_count.map", Arc::new(array::Int64Array::from(vec![0]))),
                (
                    "null_count.array",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "null_count.nested_struct.struct_element.nested_struct_element",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.nested_struct.struct_element.nested_struct_element",
                    Arc::new(array::StringArray::from(vec!["nested_struct_value"])),
                ),
                (
                    "max.nested_struct.struct_element.nested_struct_element",
                    Arc::new(array::StringArray::from(vec!["nested_struct_value"])),
                ),
                (
                    "null_count.struct_of_array_of_map.struct_element",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "tags.INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec!["1666652373000000"])),
                ),
                (
                    "tags.OPTIMIZE_TARGET_SIZE",
                    Arc::new(array::StringArray::from(vec!["268435456"])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            // Compare the field names first: column order matters, and this
            // produces a far more readable failure than the full batch diff.
            assert_eq!(
                expected
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.name().as_str())
                    .collect::<Vec<&str>>(),
                actions
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.name().as_str())
                    .collect::<Vec<&str>>()
            );
            assert_eq!(expected, actions);

            // Nested view: spot-check a few deeply nested stats via the
            // `NestedTabular` helper rather than rebuilding the whole batch.
            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
            // For brevity, just checking a few nested columns in stats

            assert_eq!(
                actions
                    .get_field_at_path(&[
                        "null_count",
                        "nested_struct",
                        "struct_element",
                        "nested_struct_element"
                    ])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::Int64Array>()
                    .unwrap(),
                &array::Int64Array::from(vec![0]),
            );

            assert_eq!(
                actions
                    .get_field_at_path(&[
                        "min",
                        "nested_struct",
                        "struct_element",
                        "nested_struct_element"
                    ])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::StringArray>()
                    .unwrap(),
                &array::StringArray::from(vec!["nested_struct_value"]),
            );

            assert_eq!(
                actions
                    .get_field_at_path(&[
                        "max",
                        "nested_struct",
                        "struct_element",
                        "nested_struct_element"
                    ])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::StringArray>()
                    .unwrap(),
                &array::StringArray::from(vec!["nested_struct_value"]),
            );

            assert_eq!(
                actions
                    .get_field_at_path(&["null_count", "struct_of_array_of_map", "struct_element"])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::Int64Array>()
                    .unwrap(),
                &array::Int64Array::from(vec![0])
            );

            assert_eq!(
                actions
                    .get_field_at_path(&["tags", "OPTIMIZE_TARGET_SIZE"])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::StringArray>()
                    .unwrap(),
                &array::StringArray::from(vec!["268435456"])
            );
        }
1514
        /// Trait to make it easier to access nested fields
        trait NestedTabular {
            /// Resolves `path` segment by segment against nested struct columns,
            /// returning the array at the final segment, or `None` if any segment
            /// cannot be found.
            fn get_field_at_path(&self, path: &[&str]) -> Option<ArrayRef>;
        }
1519
1520        impl NestedTabular for RecordBatch {
1521            fn get_field_at_path(&self, path: &[&str]) -> Option<ArrayRef> {
1522                // First, get array in the batch
1523                let (first_key, remainder) = path.split_at(1);
1524                let mut col = self.column(self.schema().column_with_name(first_key[0])?.0);
1525
1526                if remainder.is_empty() {
1527                    return Some(Arc::clone(col));
1528                }
1529
1530                for segment in remainder {
1531                    col = col
1532                        .as_any()
1533                        .downcast_ref::<StructArray>()?
1534                        .column_by_name(segment)?;
1535                }
1536
1537                Some(Arc::clone(col))
1538            }
1539        }
1540    }
1541}