Skip to main content

deltalake_core/writer/
stats.rs

1use std::cmp::min;
2use std::ops::Not;
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5use std::{collections::HashMap, ops::AddAssign};
6
7use delta_kernel::expressions::Scalar;
8use delta_kernel::table_properties::DataSkippingNumIndexedCols;
9use indexmap::IndexMap;
10use itertools::Itertools;
11use parquet::basic::LogicalType;
12use parquet::basic::Type;
13use parquet::file::metadata::ParquetMetaData;
14use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
15use parquet::{
16    basic::TimeUnit,
17    file::{metadata::RowGroupMetaData, statistics::Statistics},
18};
19use tracing::warn;
20
21use super::*;
22use crate::kernel::{Add, scalars::ScalarExt};
23use crate::protocol::{ColumnValueStat, Stats};
24
25/// Creates an [`Add`] log action struct.
26pub(crate) fn create_add(
27    partition_values: &IndexMap<String, Scalar>,
28    path: String,
29    size: i64,
30    file_metadata: &ParquetMetaData,
31    num_indexed_cols: DataSkippingNumIndexedCols,
32    stats_columns: &Option<Vec<impl AsRef<str>>>,
33) -> Result<Add, DeltaTableError> {
34    let stats = stats_from_file_metadata(
35        partition_values,
36        file_metadata,
37        num_indexed_cols,
38        stats_columns,
39    )?;
40    let stats_string = serde_json::to_string(&stats)?;
41
42    // Determine the modification timestamp to include in the add action - milliseconds since epoch
43    // Err should be impossible in this case since `SystemTime::now()` is always greater than `UNIX_EPOCH`
44    let modification_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
45    let modification_time = modification_time.as_millis() as i64;
46
47    Ok(Add {
48        path,
49        size,
50        partition_values: partition_values
51            .iter()
52            .map(|(k, v)| {
53                (
54                    k.clone(),
55                    if v.is_null() {
56                        None
57                    } else {
58                        Some(v.serialize())
59                    },
60                )
61            })
62            .collect(),
63        modification_time,
64        data_change: true,
65        stats: Some(stats_string),
66        tags: None,
67        deletion_vector: None,
68        base_row_id: None,
69        default_row_commit_version: None,
70        clustering_provider: None,
71    })
72}
73
74// As opposed to `stats_from_file_metadata` which operates on `parquet::format::FileMetaData`,
75// this function produces the stats by reading the metadata from already written out files.
76//
77// Note that the file metadata used here is actually `parquet::file::metadata::FileMetaData`
78// which is a thrift decoding of the `parquet::format::FileMetaData` which is typically obtained
79// when flushing the write.
80pub(crate) fn stats_from_parquet_metadata(
81    partition_values: &IndexMap<String, Scalar>,
82    parquet_metadata: &ParquetMetaData,
83    num_indexed_cols: DataSkippingNumIndexedCols,
84    stats_columns: &Option<Vec<String>>,
85) -> Result<Stats, DeltaWriterError> {
86    let num_rows = parquet_metadata.file_metadata().num_rows();
87    let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
88    let row_group_metadata = parquet_metadata.row_groups().to_vec();
89
90    stats_from_metadata(
91        partition_values,
92        schema_descriptor,
93        row_group_metadata,
94        num_rows,
95        num_indexed_cols,
96        stats_columns,
97    )
98}
99
100fn stats_from_file_metadata(
101    partition_values: &IndexMap<String, Scalar>,
102    file_metadata: &ParquetMetaData,
103    num_indexed_cols: DataSkippingNumIndexedCols,
104    stats_columns: &Option<Vec<impl AsRef<str>>>,
105) -> Result<Stats, DeltaWriterError> {
106    let schema_descriptor = file_metadata.file_metadata().schema_descr();
107
108    let row_group_metadata: Vec<RowGroupMetaData> = file_metadata.row_groups().to_vec();
109
110    stats_from_metadata(
111        partition_values,
112        Arc::new(schema_descriptor.clone()),
113        row_group_metadata,
114        file_metadata.file_metadata().num_rows(),
115        num_indexed_cols,
116        stats_columns,
117    )
118}
119
120fn stats_from_metadata(
121    partition_values: &IndexMap<String, Scalar>,
122    schema_descriptor: Arc<SchemaDescriptor>,
123    row_group_metadata: Vec<RowGroupMetaData>,
124    num_rows: i64,
125    num_indexed_cols: DataSkippingNumIndexedCols,
126    stats_columns: &Option<Vec<impl AsRef<str>>>,
127) -> Result<Stats, DeltaWriterError> {
128    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
129    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
130    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
131    let dialect = sqlparser::dialect::GenericDialect {};
132
133    let idx_to_iterate = if let Some(stats_cols) = stats_columns {
134        let stats_cols = stats_cols
135            .iter()
136            .map(|v| {
137                match sqlparser::parser::Parser::new(&dialect)
138                    .try_with_sql(v.as_ref())
139                    .map_err(|e| DeltaTableError::generic(e.to_string()))?
140                    .parse_multipart_identifier()
141                {
142                    Ok(parts) => Ok(parts.into_iter().map(|v| v.value).join(".")),
143                    Err(e) => Err(DeltaWriterError::DeltaTable(
144                        DeltaTableError::GenericError {
145                            source: Box::new(e),
146                        },
147                    )),
148                }
149            })
150            .collect::<Result<Vec<String>, DeltaWriterError>>()?;
151
152        schema_descriptor
153            .columns()
154            .iter()
155            .enumerate()
156            .filter_map(|(index, col)| {
157                if stats_cols.contains(&col.name().to_string()) {
158                    Some(index)
159                } else {
160                    None
161                }
162            })
163            .collect()
164    } else if num_indexed_cols == DataSkippingNumIndexedCols::AllColumns {
165        (0..schema_descriptor.num_columns()).collect::<Vec<_>>()
166    } else if let DataSkippingNumIndexedCols::NumColumns(n_cols) = num_indexed_cols {
167        (0..min(n_cols as usize, schema_descriptor.num_columns())).collect::<Vec<_>>()
168    } else {
169        return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic(
170            "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(),
171        )));
172    };
173
174    for idx in idx_to_iterate {
175        let column_descr = schema_descriptor.column(idx);
176
177        let column_path = column_descr.path();
178        let column_path_parts = column_path.parts();
179
180        // Do not include partition columns in statistics
181        if partition_values.contains_key(&column_path_parts[0]) {
182            continue;
183        }
184
185        let maybe_stats: Option<AggregatedStats> = row_group_metadata
186            .iter()
187            .flat_map(|g| {
188                g.column(idx).statistics().into_iter().filter_map(|s| {
189                    let is_binary = matches!(&column_descr.physical_type(), Type::BYTE_ARRAY)
190                        && matches!(column_descr.logical_type_ref(), Some(LogicalType::String))
191                            .not();
192                    if is_binary {
193                        warn!(
194                            "Skipping column {} because it's a binary field.",
195                            &column_descr.name().to_string()
196                        );
197                        None
198                    } else {
199                        Some(AggregatedStats::from((s, column_descr.logical_type_ref())))
200                    }
201                })
202            })
203            .reduce(|mut left, right| {
204                left += right;
205                left
206            });
207
208        if let Some(stats) = maybe_stats {
209            apply_min_max_for_column(
210                stats,
211                column_descr.clone(),
212                column_descr.path().parts(),
213                &mut min_values,
214                &mut max_values,
215                &mut null_count,
216            )?;
217        }
218    }
219
220    Ok(Stats {
221        min_values,
222        max_values,
223        num_records: num_rows,
224        null_count,
225    })
226}
227
228/// Logical scalars extracted from statistics. These are used to aggregate
229/// minimums and maximums. We can't use the physical scalars because they
230/// are not ordered correctly for some types. For example, decimals are stored
231/// as fixed length binary, and can't be sorted leixcographically.
232#[derive(Debug, Clone, PartialEq, PartialOrd)]
233enum StatsScalar {
234    Boolean(bool),
235    Int32(i32),
236    Int64(i64),
237    Float32(f32),
238    Float64(f64),
239    Date(chrono::NaiveDate),
240    Timestamp(chrono::NaiveDateTime),
241    // We are serializing to f64 later and the ordering should be the same
242    // Scale is stored to handle scale=0 serialization correctly
243    Decimal { value: f64, scale: i32 },
244    String(String),
245    Bytes(Vec<u8>),
246    Uuid(uuid::Uuid),
247}
248
249impl StatsScalar {
250    fn try_from_stats(
251        stats: &Statistics,
252        logical_type: Option<&LogicalType>,
253        use_min: bool,
254    ) -> Result<Self, DeltaWriterError> {
255        macro_rules! get_stat {
256            ($val: expr) => {
257                if use_min {
258                    *$val.min_opt().unwrap()
259                } else {
260                    *$val.max_opt().unwrap()
261                }
262            };
263        }
264
265        match (stats, logical_type) {
266            (Statistics::Boolean(v), _) => Ok(Self::Boolean(get_stat!(v))),
267            // Int32 can be date, decimal, or just int32
268            (Statistics::Int32(v), Some(LogicalType::Date)) => {
269                let epoch_start = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); // creating from epoch should be infallible
270                let date = epoch_start + chrono::Duration::days(get_stat!(v) as i64);
271                Ok(Self::Date(date))
272            }
273            (Statistics::Int32(v), Some(LogicalType::Decimal { scale, .. })) => {
274                let val = get_stat!(v) as f64 / 10.0_f64.powi(*scale);
275                // Spark serializes these as numbers
276                Ok(Self::Decimal {
277                    value: val,
278                    scale: *scale,
279                })
280            }
281            (Statistics::Int32(v), _) => Ok(Self::Int32(get_stat!(v))),
282            // Int64 can be timestamp, decimal, or integer
283            (Statistics::Int64(v), Some(LogicalType::Timestamp { unit, .. })) => {
284                // For now, we assume timestamps are adjusted to UTC. Non-UTC timestamps
285                // are behind a feature gate in Delta:
286                // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#timestamp-without-timezone-timestampntz
287                let v = get_stat!(v);
288                let timestamp = match unit {
289                    TimeUnit::MILLIS => chrono::DateTime::from_timestamp_millis(v),
290                    TimeUnit::MICROS => chrono::DateTime::from_timestamp_micros(v),
291                    TimeUnit::NANOS => {
292                        let secs = v / 1_000_000_000;
293                        let nanosecs = (v % 1_000_000_000) as u32;
294                        chrono::DateTime::from_timestamp(secs, nanosecs)
295                    }
296                };
297                let timestamp = timestamp.ok_or(DeltaWriterError::StatsParsingFailed {
298                    debug_value: v.to_string(),
299                    logical_type: logical_type.cloned(),
300                })?;
301                Ok(Self::Timestamp(timestamp.naive_utc()))
302            }
303            (Statistics::Int64(v), Some(LogicalType::Decimal { scale, .. })) => {
304                let val = get_stat!(v) as f64 / 10.0_f64.powi(*scale);
305                // Spark serializes these as numbers
306                Ok(Self::Decimal {
307                    value: val,
308                    scale: *scale,
309                })
310            }
311            (Statistics::Int64(v), _) => Ok(Self::Int64(get_stat!(v))),
312            (Statistics::Float(v), _) => Ok(Self::Float32(get_stat!(v))),
313            (Statistics::Double(v), _) => Ok(Self::Float64(get_stat!(v))),
314            (Statistics::ByteArray(v), logical_type) => {
315                let bytes = if use_min {
316                    v.min_bytes_opt()
317                } else {
318                    v.max_bytes_opt()
319                }
320                .unwrap_or_default();
321                match logical_type {
322                    None => Ok(Self::Bytes(bytes.to_vec())),
323                    Some(LogicalType::String) => {
324                        Ok(Self::String(String::from_utf8(bytes.to_vec()).map_err(
325                            |_| DeltaWriterError::StatsParsingFailed {
326                                debug_value: format!("{bytes:?}"),
327                                logical_type: Some(LogicalType::String),
328                            },
329                        )?))
330                    }
331                    _ => Err(DeltaWriterError::StatsParsingFailed {
332                        debug_value: format!("{bytes:?}"),
333                        logical_type: logical_type.cloned(),
334                    }),
335                }
336            }
337            (Statistics::FixedLenByteArray(v), Some(LogicalType::Decimal { scale, precision })) => {
338                let val = if use_min {
339                    v.min_bytes_opt()
340                } else {
341                    v.max_bytes_opt()
342                }
343                .unwrap_or_default();
344
345                let val = if val.len() <= 16 {
346                    i128::from_be_bytes(sign_extend_be(val)) as f64
347                } else {
348                    return Err(DeltaWriterError::StatsParsingFailed {
349                        debug_value: format!("{val:?}"),
350                        logical_type: Some(LogicalType::Decimal {
351                            scale: *scale,
352                            precision: *precision,
353                        }),
354                    });
355                };
356
357                let mut val = val / 10.0_f64.powi(*scale);
358
359                if val.is_normal()
360                    && (val.trunc() as i128).to_string().len() > (precision - scale) as usize
361                {
362                    // For normal values with integer parts that get rounded to a number beyond
363                    // the precision - scale range take the next smaller (by magnitude) value
364                    val = f64::from_bits(val.to_bits() - 1);
365                }
366
367                Ok(Self::Decimal {
368                    value: val,
369                    scale: *scale,
370                })
371            }
372            (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
373                let val = if use_min {
374                    v.min_bytes_opt()
375                } else {
376                    v.max_bytes_opt()
377                }
378                .unwrap_or_default();
379
380                if val.len() != 16 {
381                    return Err(DeltaWriterError::StatsParsingFailed {
382                        debug_value: format!("{val:?}"),
383                        logical_type: Some(LogicalType::Uuid),
384                    });
385                }
386
387                let mut bytes = [0; 16];
388                bytes.copy_from_slice(val);
389
390                let val = uuid::Uuid::from_bytes(bytes);
391                Ok(Self::Uuid(val))
392            }
393            (stats, _) => Err(DeltaWriterError::StatsParsingFailed {
394                debug_value: format!("{stats:?}"),
395                logical_type: logical_type.cloned(),
396            }),
397        }
398    }
399}
400
401/// Performs big endian sign extension
402/// Copied from arrow-rs repo/parquet crate:
403/// https://github.com/apache/arrow-rs/blob/b25c441745602c9967b1e3cc4a28bc469cfb1311/parquet/src/arrow/buffer/bit_util.rs#L54
404pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
405    assert!(b.len() <= N, "Array too large, expected less than {N}");
406    let is_negative = (b[0] & 128u8) == 128u8;
407    let mut result = if is_negative { [255u8; N] } else { [0u8; N] };
408    for (d, s) in result.iter_mut().skip(N - b.len()).zip(b) {
409        *d = *s;
410    }
411    result
412}
413
414impl From<StatsScalar> for serde_json::Value {
415    fn from(scalar: StatsScalar) -> Self {
416        match scalar {
417            StatsScalar::Boolean(v) => serde_json::Value::Bool(v),
418            StatsScalar::Int32(v) => serde_json::Value::from(v),
419            StatsScalar::Int64(v) => serde_json::Value::from(v),
420            StatsScalar::Float32(v) => serde_json::Value::from(v),
421            StatsScalar::Float64(v) => serde_json::Value::from(v),
422            StatsScalar::Date(v) => serde_json::Value::from(v.format("%Y-%m-%d").to_string()),
423            StatsScalar::Timestamp(v) => {
424                serde_json::Value::from(v.format("%Y-%m-%dT%H:%M:%S%.fZ").to_string())
425            }
426            StatsScalar::Decimal { value, scale } => {
427                // For scale=0, serialize as integer since serde_json would otherwise
428                // serialize f64 as "1234.0" instead of "1234"
429                if scale == 0 {
430                    serde_json::Value::from(value.round() as i64)
431                } else {
432                    serde_json::Value::from(value)
433                }
434            }
435            StatsScalar::String(v) => serde_json::Value::from(v),
436            StatsScalar::Bytes(v) => {
437                let escaped_bytes = v
438                    .into_iter()
439                    .flat_map(std::ascii::escape_default)
440                    .collect::<Vec<u8>>();
441                let escaped_string = String::from_utf8(escaped_bytes).unwrap();
442                serde_json::Value::from(escaped_string)
443            }
444            StatsScalar::Uuid(v) => serde_json::Value::from(v.hyphenated().to_string()),
445        }
446    }
447}
448
449/// Aggregated stats
450struct AggregatedStats {
451    pub min: Option<StatsScalar>,
452    pub max: Option<StatsScalar>,
453    pub null_count: u64,
454}
455
456impl From<(&Statistics, Option<&LogicalType>)> for AggregatedStats {
457    fn from(value: (&Statistics, Option<&LogicalType>)) -> Self {
458        let (stats, logical_type) = value;
459        let null_count = stats.null_count_opt().unwrap_or_default();
460        if stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some() {
461            let min = StatsScalar::try_from_stats(stats, logical_type, true).ok();
462            let max = StatsScalar::try_from_stats(stats, logical_type, false).ok();
463            Self {
464                min,
465                max,
466                null_count,
467            }
468        } else {
469            Self {
470                min: None,
471                max: None,
472                null_count,
473            }
474        }
475    }
476}
477
478impl AddAssign for AggregatedStats {
479    fn add_assign(&mut self, rhs: Self) {
480        self.min = match (self.min.take(), rhs.min) {
481            (Some(lhs), Some(rhs)) => {
482                if lhs < rhs {
483                    Some(lhs)
484                } else {
485                    Some(rhs)
486                }
487            }
488            (lhs, rhs) => lhs.or(rhs),
489        };
490        self.max = match (self.max.take(), rhs.max) {
491            (Some(lhs), Some(rhs)) => {
492                if lhs > rhs {
493                    Some(lhs)
494                } else {
495                    Some(rhs)
496                }
497            }
498            (lhs, rhs) => lhs.or(rhs),
499        };
500
501        self.null_count += rhs.null_count;
502    }
503}
504
505/// For a list field, we don't want the inner field names. We need to chuck out
506/// the list and items fields from the path, but also need to handle the
507/// peculiar case where the user named the list field "list" or "item".
508///
509/// NOTE: As of delta_kernel 0.3.1 the name switched from `item` to `element` to line up with the
510/// parquet spec, see
511/// [here](https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists)
512///
513/// For example:
514///
515/// * ["some_nested_list", "list", "item", "list", "item"] -> "some_nested_list"
516/// * ["some_list", "list", "item"] -> "some_list"
517/// * ["list", "list", "item"] -> "list"
518/// * ["item", "list", "item"] -> "item"
519fn get_list_field_name(column_descr: &Arc<ColumnDescriptor>) -> Option<String> {
520    let max_rep_levels = column_descr.max_rep_level();
521    let column_path_parts = column_descr.path().parts();
522
523    // If there are more nested names, we can't handle them yet.
524    if column_path_parts.len() > (2 * max_rep_levels + 1) as usize {
525        return None;
526    }
527
528    let mut column_path_parts = column_path_parts.to_vec();
529    let mut items_seen = 0;
530    let mut lists_seen = 0;
531    while let Some(part) = column_path_parts.pop() {
532        match (part.as_str(), lists_seen, items_seen) {
533            ("list", seen, _) if seen == max_rep_levels => return Some("list".to_string()),
534            ("element", _, seen) if seen == max_rep_levels => return Some("element".to_string()),
535            ("list", _, _) => lists_seen += 1,
536            ("element", _, _) => items_seen += 1,
537            (other, _, _) => return Some(other.to_string()),
538        }
539    }
540    None
541}
542
543fn apply_min_max_for_column(
544    statistics: AggregatedStats,
545    column_descr: Arc<ColumnDescriptor>,
546    column_path_parts: &[String],
547    min_values: &mut HashMap<String, ColumnValueStat>,
548    max_values: &mut HashMap<String, ColumnValueStat>,
549    null_counts: &mut HashMap<String, ColumnCountStat>,
550) -> Result<(), DeltaWriterError> {
551    // Special handling for list column
552    if column_descr.max_rep_level() > 0 {
553        let key = get_list_field_name(&column_descr);
554
555        if let Some(key) = key {
556            null_counts.insert(key, ColumnCountStat::Value(statistics.null_count as i64));
557        }
558
559        return Ok(());
560    }
561
562    match (column_path_parts.len(), column_path_parts.first()) {
563        // Base case - we are at the leaf struct level in the path
564        (1, _) => {
565            let key = column_descr.name().to_string();
566
567            if let Some(min) = statistics.min {
568                let min = ColumnValueStat::Value(min.into());
569                min_values.insert(key.clone(), min);
570            }
571
572            if let Some(max) = statistics.max {
573                let max = ColumnValueStat::Value(max.into());
574                max_values.insert(key.clone(), max);
575            }
576
577            null_counts.insert(key, ColumnCountStat::Value(statistics.null_count as i64));
578
579            Ok(())
580        }
581        // Recurse to load value at the appropriate level of HashMap
582        (_, Some(key)) => {
583            let child_min_values = min_values
584                .entry(key.to_owned())
585                .or_insert_with(|| ColumnValueStat::Column(HashMap::new()));
586            let child_max_values = max_values
587                .entry(key.to_owned())
588                .or_insert_with(|| ColumnValueStat::Column(HashMap::new()));
589            let child_null_counts = null_counts
590                .entry(key.to_owned())
591                .or_insert_with(|| ColumnCountStat::Column(HashMap::new()));
592
593            match (child_min_values, child_max_values, child_null_counts) {
594                (
595                    ColumnValueStat::Column(mins),
596                    ColumnValueStat::Column(maxes),
597                    ColumnCountStat::Column(null_counts),
598                ) => {
599                    let remaining_parts: Vec<String> = column_path_parts
600                        .iter()
601                        .skip(1)
602                        .map(|s| s.to_string())
603                        .collect();
604
605                    apply_min_max_for_column(
606                        statistics,
607                        column_descr,
608                        remaining_parts.as_slice(),
609                        mins,
610                        maxes,
611                        null_counts,
612                    )?;
613
614                    Ok(())
615                }
616                _ => {
617                    unreachable!();
618                }
619            }
620        }
621        // column path parts will always have at least one element.
622        (_, None) => {
623            unreachable!();
624        }
625    }
626}
627
628#[cfg(test)]
629mod tests {
630    use super::utils::record_batch_from_message;
631    use super::*;
632    use crate::{
633        DeltaTable,
634        errors::DeltaTableError,
635        protocol::{ColumnCountStat, ColumnValueStat},
636        table::builder::DeltaTableBuilder,
637    };
638    use parquet::data_type::{ByteArray, FixedLenByteArray};
639    use parquet::file::statistics::ValueStatistics;
640    use parquet::{basic::Compression, file::properties::WriterProperties};
641    use serde_json::{Value, json};
642    use std::collections::HashMap;
643    use std::path::Path;
644    use std::sync::LazyLock;
645    use url::Url;
646
647    macro_rules! simple_parquet_stat {
648        ($variant:expr, $value:expr) => {
649            $variant(ValueStatistics::new(
650                Some($value),
651                Some($value),
652                None,
653                Some(0),
654                false,
655            ))
656        };
657    }
658
659    #[test]
660    fn test_stats_scalar_serialization() {
661        let cases = &[
662            (
663                simple_parquet_stat!(Statistics::Boolean, true),
664                Some(LogicalType::Integer {
665                    bit_width: 1,
666                    is_signed: true,
667                }),
668                Value::Bool(true),
669            ),
670            (
671                simple_parquet_stat!(Statistics::Int32, 1),
672                Some(LogicalType::Integer {
673                    bit_width: 32,
674                    is_signed: true,
675                }),
676                Value::from(1),
677            ),
678            (
679                simple_parquet_stat!(Statistics::Int32, 1234),
680                Some(LogicalType::Decimal {
681                    scale: 3,
682                    precision: 4,
683                }),
684                Value::from(1.234),
685            ),
686            (
687                simple_parquet_stat!(Statistics::Int32, 1234),
688                Some(LogicalType::Decimal {
689                    scale: -1,
690                    precision: 4,
691                }),
692                Value::from(12340.0),
693            ),
694            (
695                simple_parquet_stat!(Statistics::Int32, 1234),
696                Some(LogicalType::Decimal {
697                    scale: 0,
698                    precision: 4,
699                }),
700                Value::from(1234),
701            ),
702            (
703                simple_parquet_stat!(Statistics::Int32, 10561),
704                Some(LogicalType::Date),
705                Value::from("1998-12-01"),
706            ),
707            (
708                simple_parquet_stat!(Statistics::Int64, 1641040496789123456),
709                Some(LogicalType::Timestamp {
710                    is_adjusted_to_u_t_c: true,
711                    unit: parquet::basic::TimeUnit::NANOS,
712                }),
713                Value::from("2022-01-01T12:34:56.789123456Z"),
714            ),
715            (
716                simple_parquet_stat!(Statistics::Int64, 1641040496789123),
717                Some(LogicalType::Timestamp {
718                    is_adjusted_to_u_t_c: true,
719                    unit: parquet::basic::TimeUnit::MICROS,
720                }),
721                Value::from("2022-01-01T12:34:56.789123Z"),
722            ),
723            (
724                simple_parquet_stat!(Statistics::Int64, 1641040496789),
725                Some(LogicalType::Timestamp {
726                    is_adjusted_to_u_t_c: true,
727                    unit: parquet::basic::TimeUnit::MILLIS,
728                }),
729                Value::from("2022-01-01T12:34:56.789Z"),
730            ),
731            (
732                simple_parquet_stat!(Statistics::Int64, 1234),
733                Some(LogicalType::Decimal {
734                    scale: 3,
735                    precision: 4,
736                }),
737                Value::from(1.234),
738            ),
739            (
740                simple_parquet_stat!(Statistics::Int64, 1234),
741                Some(LogicalType::Decimal {
742                    scale: -1,
743                    precision: 4,
744                }),
745                Value::from(12340.0),
746            ),
747            (
748                simple_parquet_stat!(Statistics::Int64, 1234),
749                Some(LogicalType::Decimal {
750                    scale: 0,
751                    precision: 4,
752                }),
753                Value::from(1234),
754            ),
755            (
756                simple_parquet_stat!(Statistics::Int64, 1234),
757                None,
758                Value::from(1234),
759            ),
760            (
761                simple_parquet_stat!(Statistics::ByteArray, ByteArray::from(b"hello".to_vec())),
762                Some(LogicalType::String),
763                Value::from("hello"),
764            ),
765            (
766                simple_parquet_stat!(Statistics::ByteArray, ByteArray::from(b"\x00\\".to_vec())),
767                None,
768                Value::from("\\x00\\\\"),
769            ),
770            (
771                simple_parquet_stat!(
772                    Statistics::FixedLenByteArray,
773                    FixedLenByteArray::from(1243124142314423i128.to_be_bytes().to_vec())
774                ),
775                Some(LogicalType::Decimal {
776                    scale: 3,
777                    precision: 16,
778                }),
779                Value::from(1243124142314.423),
780            ),
781            (
782                simple_parquet_stat!(
783                    Statistics::FixedLenByteArray,
784                    FixedLenByteArray::from(vec![0, 39, 16])
785                ),
786                Some(LogicalType::Decimal {
787                    scale: 3,
788                    precision: 5,
789                }),
790                Value::from(10.0),
791            ),
792            (
793                simple_parquet_stat!(
794                    Statistics::FixedLenByteArray,
795                    FixedLenByteArray::from(1234i128.to_be_bytes().to_vec())
796                ),
797                Some(LogicalType::Decimal {
798                    scale: 0,
799                    precision: 4,
800                }),
801                Value::from(1234),
802            ),
803            (
804                simple_parquet_stat!(
805                    Statistics::FixedLenByteArray,
806                    FixedLenByteArray::from(vec![
807                        75, 59, 76, 168, 90, 134, 196, 122, 9, 138, 34, 63, 255, 255, 255, 255
808                    ])
809                ),
810                Some(LogicalType::Decimal {
811                    scale: 6,
812                    precision: 38,
813                }),
814                Value::from(9.999999999999999e31),
815            ),
816            (
817                simple_parquet_stat!(
818                    Statistics::FixedLenByteArray,
819                    FixedLenByteArray::from(vec![
820                        180, 196, 179, 87, 165, 121, 59, 133, 246, 117, 221, 192, 0, 0, 0, 1
821                    ])
822                ),
823                Some(LogicalType::Decimal {
824                    scale: 6,
825                    precision: 38,
826                }),
827                Value::from(-9.999999999999999e31),
828            ),
829            (
830                simple_parquet_stat!(
831                    Statistics::FixedLenByteArray,
832                    FixedLenByteArray::from(
833                        [
834                            0xc2, 0xe8, 0xc7, 0xf7, 0xd1, 0xf9, 0x4b, 0x49, 0xa5, 0xd9, 0x4b, 0xfe,
835                            0x75, 0xc3, 0x17, 0xe2
836                        ]
837                        .to_vec()
838                    )
839                ),
840                Some(LogicalType::Uuid),
841                Value::from("c2e8c7f7-d1f9-4b49-a5d9-4bfe75c317e2"),
842            ),
843        ];
844
845        for (stats, logical_type, expected) in cases {
846            let scalar = StatsScalar::try_from_stats(stats, logical_type.as_ref(), true).unwrap();
847            let actual = serde_json::Value::from(scalar);
848            assert_eq!(&actual, expected);
849        }
850    }
851
852    #[tokio::test]
853    async fn test_delta_stats() {
854        let temp_dir = tempfile::tempdir().unwrap();
855        let table_path = temp_dir.path();
856        create_temp_table(table_path);
857
858        let table_uri = Url::from_directory_path(table_path).unwrap();
859        let table = load_table(&table_uri, HashMap::new()).await.unwrap();
860
861        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
862        writer = writer.with_writer_properties(
863            WriterProperties::builder()
864                .set_compression(Compression::SNAPPY)
865                .set_max_row_group_size(128)
866                .build(),
867        );
868
869        let arrow_schema = writer.arrow_schema();
870        let batch = record_batch_from_message(arrow_schema, JSON_ROWS.clone().as_ref()).unwrap();
871
872        writer.write(batch).await.unwrap();
873        let add = writer.flush().await.unwrap();
874        assert_eq!(add.len(), 1);
875        let stats = add[0].get_stats().unwrap().unwrap();
876
877        let min_max_keys = vec!["meta", "some_int", "some_string", "some_bool", "uuid"];
878        let mut null_count_keys = vec!["some_list", "some_nested_list"];
879        null_count_keys.extend_from_slice(min_max_keys.as_slice());
880
881        assert_eq!(
882            min_max_keys.len(),
883            stats.min_values.len(),
884            "min values don't match"
885        );
886        assert_eq!(
887            min_max_keys.len(),
888            stats.max_values.len(),
889            "max values don't match"
890        );
891        assert_eq!(
892            null_count_keys.len(),
893            stats.null_count.len(),
894            "null counts don't match"
895        );
896
897        // assert on min values
898        for (k, v) in stats.min_values.iter() {
899            match (k.as_str(), v) {
900                ("meta", ColumnValueStat::Column(map)) => {
901                    assert_eq!(2, map.len());
902
903                    let kafka = map.get("kafka").unwrap().as_column().unwrap();
904                    assert_eq!(3, kafka.len());
905                    let partition = kafka.get("partition").unwrap().as_value().unwrap();
906                    assert_eq!(0, partition.as_i64().unwrap());
907
908                    let producer = map.get("producer").unwrap().as_column().unwrap();
909                    assert_eq!(1, producer.len());
910                    let timestamp = producer.get("timestamp").unwrap().as_value().unwrap();
911                    assert_eq!("2021-06-22", timestamp.as_str().unwrap());
912                }
913                ("some_int", ColumnValueStat::Value(v)) => assert_eq!(302, v.as_i64().unwrap()),
914                ("some_bool", ColumnValueStat::Value(v)) => assert!(!v.as_bool().unwrap()),
915                ("some_string", ColumnValueStat::Value(v)) => {
916                    assert_eq!("GET", v.as_str().unwrap())
917                }
918                ("date", ColumnValueStat::Value(v)) => {
919                    assert_eq!("2021-06-22", v.as_str().unwrap())
920                }
921                ("uuid", ColumnValueStat::Value(v)) => {
922                    assert_eq!("176c770d-92af-4a21-bf76-5d8c5261d659", v.as_str().unwrap())
923                }
924                k => panic!("Key {k:?} should not be present in min_values"),
925            }
926        }
927
928        // assert on max values
929        for (k, v) in stats.max_values.iter() {
930            match (k.as_str(), v) {
931                ("meta", ColumnValueStat::Column(map)) => {
932                    assert_eq!(2, map.len());
933
934                    let kafka = map.get("kafka").unwrap().as_column().unwrap();
935                    assert_eq!(3, kafka.len());
936                    let partition = kafka.get("partition").unwrap().as_value().unwrap();
937                    assert_eq!(1, partition.as_i64().unwrap());
938
939                    let producer = map.get("producer").unwrap().as_column().unwrap();
940                    assert_eq!(1, producer.len());
941                    let timestamp = producer.get("timestamp").unwrap().as_value().unwrap();
942                    assert_eq!("2021-06-22", timestamp.as_str().unwrap());
943                }
944                ("some_int", ColumnValueStat::Value(v)) => assert_eq!(400, v.as_i64().unwrap()),
945                ("some_bool", ColumnValueStat::Value(v)) => assert!(v.as_bool().unwrap()),
946                ("some_string", ColumnValueStat::Value(v)) => {
947                    assert_eq!("PUT", v.as_str().unwrap())
948                }
949                ("date", ColumnValueStat::Value(v)) => {
950                    assert_eq!("2021-06-22", v.as_str().unwrap())
951                }
952                ("uuid", ColumnValueStat::Value(v)) => {
953                    assert_eq!("a98bea04-d119-4f21-8edc-eb218b5849af", v.as_str().unwrap())
954                }
955                k => panic!("Key {k:?} should not be present in max_values"),
956            }
957        }
958
959        // assert on null count
960        for (k, v) in stats.null_count.iter() {
961            match (k.as_str(), v) {
962                ("meta", ColumnCountStat::Column(map)) => {
963                    assert_eq!(2, map.len());
964
965                    let kafka = map.get("kafka").unwrap().as_column().unwrap();
966                    assert_eq!(3, kafka.len());
967                    let partition = kafka.get("partition").unwrap().as_value().unwrap();
968                    assert_eq!(0, partition);
969
970                    let producer = map.get("producer").unwrap().as_column().unwrap();
971                    assert_eq!(1, producer.len());
972                    let timestamp = producer.get("timestamp").unwrap().as_value().unwrap();
973                    assert_eq!(0, timestamp);
974                }
975                ("some_int", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
976                ("some_bool", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
977                ("some_string", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
978                ("some_list", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
979                ("some_nested_list", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
980                ("date", ColumnCountStat::Value(v)) => assert_eq!(0, *v),
981                ("uuid", ColumnCountStat::Value(v)) => assert_eq!(0, *v),
982                k => panic!("Key {k:?} should not be present in null_count"),
983            }
984        }
985    }
986
987    async fn load_table(
988        table_url: &Url,
989        options: HashMap<String, String>,
990    ) -> Result<DeltaTable, DeltaTableError> {
991        DeltaTableBuilder::from_url(table_url.clone())?
992            .with_storage_options(options)
993            .load()
994            .await
995    }
996
997    fn create_temp_table(table_path: &Path) {
998        let log_path = table_path.join("_delta_log");
999
1000        std::fs::create_dir(log_path.as_path()).unwrap();
1001        std::fs::write(
1002            log_path.join("00000000000000000000.json"),
1003            V0_COMMIT.as_str(),
1004        )
1005        .unwrap();
1006    }
1007
1008    static SCHEMA: LazyLock<Value> = LazyLock::new(|| {
1009        json!({
1010            "type": "struct",
1011            "fields": [
1012                {
1013                    "name": "meta",
1014                    "type": {
1015                        "type": "struct",
1016                        "fields": [
1017                            {
1018                                "name": "kafka",
1019                                "type": {
1020                                    "type": "struct",
1021                                    "fields": [
1022                                        {
1023                                            "name": "topic",
1024                                            "type": "string",
1025                                            "nullable": true, "metadata": {}
1026                                        },
1027                                        {
1028                                            "name": "partition",
1029                                            "type": "integer",
1030                                            "nullable": true, "metadata": {}
1031                                        },
1032                                        {
1033                                            "name": "offset",
1034                                            "type": "long",
1035                                            "nullable": true, "metadata": {}
1036                                        }
1037                                    ],
1038                                },
1039                                "nullable": true, "metadata": {}
1040                            },
1041                            {
1042                                "name": "producer",
1043                                "type": {
1044                                    "type": "struct",
1045                                    "fields": [
1046                                        {
1047                                            "name": "timestamp",
1048                                            "type": "string",
1049                                            "nullable": true, "metadata": {}
1050                                        }
1051                                    ],
1052                                },
1053                                "nullable": true, "metadata": {}
1054                            }
1055                        ]
1056                    },
1057                    "nullable": true, "metadata": {}
1058                },
1059                { "name": "some_string", "type": "string", "nullable": true, "metadata": {} },
1060                { "name": "some_int", "type": "integer", "nullable": true, "metadata": {} },
1061                { "name": "some_bool", "type": "boolean", "nullable": true, "metadata": {} },
1062                {
1063                    "name": "some_list",
1064                    "type": {
1065                        "type": "array",
1066                        "elementType": "string",
1067                        "containsNull": true
1068                    },
1069                    "nullable": true, "metadata": {}
1070                },
1071                {
1072                    "name": "some_nested_list",
1073                    "type": {
1074                        "type": "array",
1075                        "elementType": {
1076                            "type": "array",
1077                            "elementType": "integer",
1078                            "containsNull": true
1079                        },
1080                        "containsNull": true
1081                    },
1082                    "nullable": true, "metadata": {}
1083               },
1084               { "name": "date", "type": "string", "nullable": true, "metadata": {} },
1085               { "name": "uuid", "type": "string", "nullable": true, "metadata": {} },
1086            ]
1087        })
1088    });
1089    static V0_COMMIT: LazyLock<String> = LazyLock::new(|| {
1090        let schema_string = serde_json::to_string(&SCHEMA.clone()).unwrap();
1091        let jsons = [
1092            json!({
1093                "protocol":{"minReaderVersion":1,"minWriterVersion":2}
1094            }),
1095            json!({
1096                "metaData": {
1097                    "id": "22ef18ba-191c-4c36-a606-3dad5cdf3830",
1098                    "format": {
1099                        "provider": "parquet", "options": {}
1100                    },
1101                    "schemaString": schema_string,
1102                    "partitionColumns": ["date"], "configuration": {}, "createdTime": 1564524294376i64
1103                }
1104            }),
1105        ];
1106
1107        jsons
1108            .iter()
1109            .map(|j| serde_json::to_string(j).unwrap())
1110            .collect::<Vec<String>>()
1111            .join("\n")
1112    });
1113    static JSON_ROWS: LazyLock<Vec<Value>> = LazyLock::new(|| {
1114        std::iter::repeat_n(
1115            json!({
1116                "meta": {
1117                    "kafka": {
1118                        "offset": 0,
1119                        "partition": 0,
1120                        "topic": "some_topic"
1121                    },
1122                    "producer": {
1123                        "timestamp": "2021-06-22"
1124                    },
1125                },
1126                "some_string": "GET",
1127                "some_int": 302,
1128                "some_bool": true,
1129                "some_list": ["a", "b", "c"],
1130                "some_nested_list": [[42], [84]],
1131                "date": "2021-06-22",
1132                "uuid": "176c770d-92af-4a21-bf76-5d8c5261d659",
1133            }),
1134            100,
1135        )
1136        .chain(std::iter::repeat_n(
1137            json!({
1138                "meta": {
1139                    "kafka": {
1140                        "offset": 100,
1141                        "partition": 1,
1142                        "topic": "another_topic"
1143                    },
1144                    "producer": {
1145                        "timestamp": "2021-06-22"
1146                    },
1147                },
1148                "some_string": "PUT",
1149                "some_int": 400,
1150                "some_bool": false,
1151                "some_list": ["x", "y", "z"],
1152                "some_nested_list": [[42], [84]],
1153                "date": "2021-06-22",
1154                "uuid": "54f3e867-3f7b-4122-a452-9d74fb4fe1ba",
1155            }),
1156            100,
1157        ))
1158        .chain(std::iter::repeat_n(
1159            json!({
1160                "meta": {
1161                    "kafka": {
1162                        "offset": 0,
1163                        "partition": 0,
1164                        "topic": "some_topic"
1165                    },
1166                    "producer": {
1167                        "timestamp": "2021-06-22"
1168                    },
1169                },
1170                "some_nested_list": [[42], null],
1171                "date": "2021-06-22",
1172                "uuid": "a98bea04-d119-4f21-8edc-eb218b5849af",
1173            }),
1174            100,
1175        ))
1176        .collect()
1177    });
1178}