1use std::cmp::min;
2use std::ops::Not;
3use std::sync::Arc;
4use std::time::{SystemTime, UNIX_EPOCH};
5use std::{collections::HashMap, ops::AddAssign};
6
7use delta_kernel::expressions::Scalar;
8use delta_kernel::table_properties::DataSkippingNumIndexedCols;
9use indexmap::IndexMap;
10use itertools::Itertools;
11use parquet::basic::LogicalType;
12use parquet::basic::Type;
13use parquet::file::metadata::ParquetMetaData;
14use parquet::schema::types::{ColumnDescriptor, SchemaDescriptor};
15use parquet::{
16 basic::TimeUnit,
17 file::{metadata::RowGroupMetaData, statistics::Statistics},
18};
19use tracing::warn;
20
21use super::*;
22use crate::kernel::{Add, scalars::ScalarExt};
23use crate::protocol::{ColumnValueStat, Stats};
24
25pub(crate) fn create_add(
27 partition_values: &IndexMap<String, Scalar>,
28 path: String,
29 size: i64,
30 file_metadata: &ParquetMetaData,
31 num_indexed_cols: DataSkippingNumIndexedCols,
32 stats_columns: &Option<Vec<impl AsRef<str>>>,
33) -> Result<Add, DeltaTableError> {
34 let stats = stats_from_file_metadata(
35 partition_values,
36 file_metadata,
37 num_indexed_cols,
38 stats_columns,
39 )?;
40 let stats_string = serde_json::to_string(&stats)?;
41
42 let modification_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
45 let modification_time = modification_time.as_millis() as i64;
46
47 Ok(Add {
48 path,
49 size,
50 partition_values: partition_values
51 .iter()
52 .map(|(k, v)| {
53 (
54 k.clone(),
55 if v.is_null() {
56 None
57 } else {
58 Some(v.serialize())
59 },
60 )
61 })
62 .collect(),
63 modification_time,
64 data_change: true,
65 stats: Some(stats_string),
66 tags: None,
67 deletion_vector: None,
68 base_row_id: None,
69 default_row_commit_version: None,
70 clustering_provider: None,
71 })
72}
73
74pub(crate) fn stats_from_parquet_metadata(
81 partition_values: &IndexMap<String, Scalar>,
82 parquet_metadata: &ParquetMetaData,
83 num_indexed_cols: DataSkippingNumIndexedCols,
84 stats_columns: &Option<Vec<String>>,
85) -> Result<Stats, DeltaWriterError> {
86 let num_rows = parquet_metadata.file_metadata().num_rows();
87 let schema_descriptor = parquet_metadata.file_metadata().schema_descr_ptr();
88 let row_group_metadata = parquet_metadata.row_groups().to_vec();
89
90 stats_from_metadata(
91 partition_values,
92 schema_descriptor,
93 row_group_metadata,
94 num_rows,
95 num_indexed_cols,
96 stats_columns,
97 )
98}
99
100fn stats_from_file_metadata(
101 partition_values: &IndexMap<String, Scalar>,
102 file_metadata: &ParquetMetaData,
103 num_indexed_cols: DataSkippingNumIndexedCols,
104 stats_columns: &Option<Vec<impl AsRef<str>>>,
105) -> Result<Stats, DeltaWriterError> {
106 let schema_descriptor = file_metadata.file_metadata().schema_descr();
107
108 let row_group_metadata: Vec<RowGroupMetaData> = file_metadata.row_groups().to_vec();
109
110 stats_from_metadata(
111 partition_values,
112 Arc::new(schema_descriptor.clone()),
113 row_group_metadata,
114 file_metadata.file_metadata().num_rows(),
115 num_indexed_cols,
116 stats_columns,
117 )
118}
119
/// Computes Delta `Stats` (min/max values and null counts) from a parquet
/// schema descriptor plus its row-group metadata.
///
/// Column selection: when `stats_columns` is configured, only those columns
/// are indexed; otherwise the first `num_indexed_cols` leaf columns (or all
/// of them) are used. Partition columns are skipped because their values are
/// carried on the `Add` action itself.
fn stats_from_metadata(
    partition_values: &IndexMap<String, Scalar>,
    schema_descriptor: Arc<SchemaDescriptor>,
    row_group_metadata: Vec<RowGroupMetaData>,
    num_rows: i64,
    num_indexed_cols: DataSkippingNumIndexedCols,
    stats_columns: &Option<Vec<impl AsRef<str>>>,
) -> Result<Stats, DeltaWriterError> {
    let mut min_values: HashMap<String, ColumnValueStat> = HashMap::new();
    let mut max_values: HashMap<String, ColumnValueStat> = HashMap::new();
    let mut null_count: HashMap<String, ColumnCountStat> = HashMap::new();
    let dialect = sqlparser::dialect::GenericDialect {};

    // Determine which leaf-column indices to gather statistics for.
    let idx_to_iterate = if let Some(stats_cols) = stats_columns {
        // Normalize configured names: parse each as a (possibly quoted)
        // multipart SQL identifier and re-join the parts with `.`.
        let stats_cols = stats_cols
            .iter()
            .map(|v| {
                match sqlparser::parser::Parser::new(&dialect)
                    .try_with_sql(v.as_ref())
                    .map_err(|e| DeltaTableError::generic(e.to_string()))?
                    .parse_multipart_identifier()
                {
                    Ok(parts) => Ok(parts.into_iter().map(|v| v.value).join(".")),
                    Err(e) => Err(DeltaWriterError::DeltaTable(
                        DeltaTableError::GenericError {
                            source: Box::new(e),
                        },
                    )),
                }
            })
            .collect::<Result<Vec<String>, DeltaWriterError>>()?;

        // NOTE(review): this compares against the leaf column *name*, not the
        // full dotted path, so nested struct fields match by their leaf
        // segment only — confirm that is the intended semantics.
        schema_descriptor
            .columns()
            .iter()
            .enumerate()
            .filter_map(|(index, col)| {
                if stats_cols.contains(&col.name().to_string()) {
                    Some(index)
                } else {
                    None
                }
            })
            .collect()
    } else if num_indexed_cols == DataSkippingNumIndexedCols::AllColumns {
        (0..schema_descriptor.num_columns()).collect::<Vec<_>>()
    } else if let DataSkippingNumIndexedCols::NumColumns(n_cols) = num_indexed_cols {
        // Index only the first `n_cols` leaf columns in schema order.
        (0..min(n_cols as usize, schema_descriptor.num_columns())).collect::<Vec<_>>()
    } else {
        // Defensive: reachable only if new enum variants are added upstream.
        return Err(DeltaWriterError::DeltaTable(DeltaTableError::Generic(
            "delta.dataSkippingNumIndexedCols valid values are >=-1".to_string(),
        )));
    };

    for idx in idx_to_iterate {
        let column_descr = schema_descriptor.column(idx);

        let column_path = column_descr.path();
        let column_path_parts = column_path.parts();

        // Partition columns carry their value on the Add action, not in
        // file statistics — skip them.
        if partition_values.contains_key(&column_path_parts[0]) {
            continue;
        }

        // Fold this column's statistics across all row groups into one
        // aggregate (min of mins, max of maxes, summed null counts).
        let maybe_stats: Option<AggregatedStats> = row_group_metadata
            .iter()
            .flat_map(|g| {
                g.column(idx).statistics().into_iter().filter_map(|s| {
                    // Raw (non-UTF8) binary columns have no meaningful
                    // ordering for min/max — skip them entirely.
                    let is_binary = matches!(&column_descr.physical_type(), Type::BYTE_ARRAY)
                        && matches!(column_descr.logical_type_ref(), Some(LogicalType::String))
                            .not();
                    if is_binary {
                        warn!(
                            "Skipping column {} because it's a binary field.",
                            &column_descr.name().to_string()
                        );
                        None
                    } else {
                        Some(AggregatedStats::from((s, column_descr.logical_type_ref())))
                    }
                })
            })
            .reduce(|mut left, right| {
                left += right;
                left
            });

        if let Some(stats) = maybe_stats {
            apply_min_max_for_column(
                stats,
                column_descr.clone(),
                column_descr.path().parts(),
                &mut min_values,
                &mut max_values,
                &mut null_count,
            )?;
        }
    }

    Ok(Stats {
        min_values,
        max_values,
        num_records: num_rows,
        null_count,
    })
}
227
/// A single min/max value decoded from parquet column statistics, carrying
/// enough type information to serialize into Delta's JSON stats format.
#[derive(Debug, Clone, PartialEq, PartialOrd)]
enum StatsScalar {
    Boolean(bool),
    Int32(i32),
    Int64(i64),
    Float32(f32),
    Float64(f64),
    /// Date decoded from an INT32 days-since-epoch value.
    Date(chrono::NaiveDate),
    /// Timestamp normalized to naive UTC.
    Timestamp(chrono::NaiveDateTime),
    /// Decimal decoded to `f64`; `scale` is retained to drive serialization
    /// (scale 0 serializes as an integer).
    Decimal { value: f64, scale: i32 },
    String(String),
    /// Raw binary bounds (byte arrays without a logical type).
    Bytes(Vec<u8>),
    Uuid(uuid::Uuid),
}
248
impl StatsScalar {
    /// Decodes either the min (`use_min == true`) or the max statistic from
    /// raw parquet `Statistics`, guided by the column's logical type.
    ///
    /// # Errors
    /// Returns `DeltaWriterError::StatsParsingFailed` when the physical /
    /// logical type combination is unsupported or the bytes cannot be
    /// decoded.
    fn try_from_stats(
        stats: &Statistics,
        logical_type: Option<&LogicalType>,
        use_min: bool,
    ) -> Result<Self, DeltaWriterError> {
        // Selects the min or max from typed statistics. Callers verify both
        // bounds are present before calling (see `AggregatedStats::from`),
        // so the `unwrap` only fires on a broken invariant.
        macro_rules! get_stat {
            ($val: expr) => {
                if use_min {
                    *$val.min_opt().unwrap()
                } else {
                    *$val.max_opt().unwrap()
                }
            };
        }

        match (stats, logical_type) {
            (Statistics::Boolean(v), _) => Ok(Self::Boolean(get_stat!(v))),
            // INT32 dates are days since the UNIX epoch.
            (Statistics::Int32(v), Some(LogicalType::Date)) => {
                let epoch_start = chrono::NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
                let date = epoch_start + chrono::Duration::days(get_stat!(v) as i64);
                Ok(Self::Date(date))
            }
            // INT32 decimals store the unscaled integer; divide by 10^scale.
            (Statistics::Int32(v), Some(LogicalType::Decimal { scale, .. })) => {
                let val = get_stat!(v) as f64 / 10.0_f64.powi(*scale);
                Ok(Self::Decimal {
                    value: val,
                    scale: *scale,
                })
            }
            (Statistics::Int32(v), _) => Ok(Self::Int32(get_stat!(v))),
            // INT64 timestamps: interpretation depends on the declared unit.
            (Statistics::Int64(v), Some(LogicalType::Timestamp { unit, .. })) => {
                let v = get_stat!(v);
                let timestamp = match unit {
                    TimeUnit::MILLIS => chrono::DateTime::from_timestamp_millis(v),
                    TimeUnit::MICROS => chrono::DateTime::from_timestamp_micros(v),
                    TimeUnit::NANOS => {
                        // Split nanos into whole seconds + sub-second nanos.
                        let secs = v / 1_000_000_000;
                        let nanosecs = (v % 1_000_000_000) as u32;
                        chrono::DateTime::from_timestamp(secs, nanosecs)
                    }
                };
                // `None` means the value is outside chrono's representable
                // range — surface it as a parse failure.
                let timestamp = timestamp.ok_or(DeltaWriterError::StatsParsingFailed {
                    debug_value: v.to_string(),
                    logical_type: logical_type.cloned(),
                })?;
                Ok(Self::Timestamp(timestamp.naive_utc()))
            }
            (Statistics::Int64(v), Some(LogicalType::Decimal { scale, .. })) => {
                let val = get_stat!(v) as f64 / 10.0_f64.powi(*scale);
                Ok(Self::Decimal {
                    value: val,
                    scale: *scale,
                })
            }
            (Statistics::Int64(v), _) => Ok(Self::Int64(get_stat!(v))),
            (Statistics::Float(v), _) => Ok(Self::Float32(get_stat!(v))),
            (Statistics::Double(v), _) => Ok(Self::Float64(get_stat!(v))),
            (Statistics::ByteArray(v), logical_type) => {
                // Missing bounds degrade to an empty slice here rather than
                // panicking like `get_stat!` would.
                let bytes = if use_min {
                    v.min_bytes_opt()
                } else {
                    v.max_bytes_opt()
                }
                .unwrap_or_default();
                match logical_type {
                    None => Ok(Self::Bytes(bytes.to_vec())),
                    Some(LogicalType::String) => {
                        Ok(Self::String(String::from_utf8(bytes.to_vec()).map_err(
                            |_| DeltaWriterError::StatsParsingFailed {
                                debug_value: format!("{bytes:?}"),
                                logical_type: Some(LogicalType::String),
                            },
                        )?))
                    }
                    _ => Err(DeltaWriterError::StatsParsingFailed {
                        debug_value: format!("{bytes:?}"),
                        logical_type: logical_type.cloned(),
                    }),
                }
            }
            // Fixed-length decimals: big-endian two's-complement integers of
            // at most 16 bytes, sign-extended into an i128.
            (Statistics::FixedLenByteArray(v), Some(LogicalType::Decimal { scale, precision })) => {
                let val = if use_min {
                    v.min_bytes_opt()
                } else {
                    v.max_bytes_opt()
                }
                .unwrap_or_default();

                let val = if val.len() <= 16 {
                    i128::from_be_bytes(sign_extend_be(val)) as f64
                } else {
                    return Err(DeltaWriterError::StatsParsingFailed {
                        debug_value: format!("{val:?}"),
                        logical_type: Some(LogicalType::Decimal {
                            scale: *scale,
                            precision: *precision,
                        }),
                    });
                };

                let mut val = val / 10.0_f64.powi(*scale);

                // If f64 rounding pushed the integral part past the declared
                // precision, step one bit pattern down so the serialized
                // value stays within precision. NOTE(review): for negative
                // values `to_bits() - 1` moves toward zero, not downward —
                // presumably intentional (shrinks magnitude); verify.
                if val.is_normal()
                    && (val.trunc() as i128).to_string().len() > (precision - scale) as usize
                {
                    val = f64::from_bits(val.to_bits() - 1);
                }

                Ok(Self::Decimal {
                    value: val,
                    scale: *scale,
                })
            }
            (Statistics::FixedLenByteArray(v), Some(LogicalType::Uuid)) => {
                let val = if use_min {
                    v.min_bytes_opt()
                } else {
                    v.max_bytes_opt()
                }
                .unwrap_or_default();

                // A UUID is exactly 16 bytes.
                if val.len() != 16 {
                    return Err(DeltaWriterError::StatsParsingFailed {
                        debug_value: format!("{val:?}"),
                        logical_type: Some(LogicalType::Uuid),
                    });
                }

                let mut bytes = [0; 16];
                bytes.copy_from_slice(val);

                let val = uuid::Uuid::from_bytes(bytes);
                Ok(Self::Uuid(val))
            }
            // Every other physical/logical combination is unsupported.
            (stats, _) => Err(DeltaWriterError::StatsParsingFailed {
                debug_value: format!("{stats:?}"),
                logical_type: logical_type.cloned(),
            }),
        }
    }
}
400
/// Sign-extends a big-endian two's-complement byte slice to `N` bytes.
///
/// The result can be passed to e.g. `i128::from_be_bytes` to decode
/// fixed-width decimal statistics. An empty slice yields all zeroes (the
/// value 0) instead of panicking on the sign-byte read — callers obtain the
/// input via `min_bytes_opt().unwrap_or_default()`, which can legitimately
/// produce an empty slice.
///
/// # Panics
/// Panics if `b.len() > N`.
pub fn sign_extend_be<const N: usize>(b: &[u8]) -> [u8; N] {
    assert!(b.len() <= N, "Array too large, expected at most {N} bytes");
    let Some(&first) = b.first() else {
        // Empty input: treat as zero rather than indexing out of bounds.
        return [0u8; N];
    };
    // A set high bit in the leading byte marks a negative value, so the
    // padding must be 0xFF; otherwise pad with 0x00.
    let fill = if first & 0x80 != 0 { 0xFFu8 } else { 0u8 };
    let mut result = [fill; N];
    result[N - b.len()..].copy_from_slice(b);
    result
}
413
414impl From<StatsScalar> for serde_json::Value {
415 fn from(scalar: StatsScalar) -> Self {
416 match scalar {
417 StatsScalar::Boolean(v) => serde_json::Value::Bool(v),
418 StatsScalar::Int32(v) => serde_json::Value::from(v),
419 StatsScalar::Int64(v) => serde_json::Value::from(v),
420 StatsScalar::Float32(v) => serde_json::Value::from(v),
421 StatsScalar::Float64(v) => serde_json::Value::from(v),
422 StatsScalar::Date(v) => serde_json::Value::from(v.format("%Y-%m-%d").to_string()),
423 StatsScalar::Timestamp(v) => {
424 serde_json::Value::from(v.format("%Y-%m-%dT%H:%M:%S%.fZ").to_string())
425 }
426 StatsScalar::Decimal { value, scale } => {
427 if scale == 0 {
430 serde_json::Value::from(value.round() as i64)
431 } else {
432 serde_json::Value::from(value)
433 }
434 }
435 StatsScalar::String(v) => serde_json::Value::from(v),
436 StatsScalar::Bytes(v) => {
437 let escaped_bytes = v
438 .into_iter()
439 .flat_map(std::ascii::escape_default)
440 .collect::<Vec<u8>>();
441 let escaped_string = String::from_utf8(escaped_bytes).unwrap();
442 serde_json::Value::from(escaped_string)
443 }
444 StatsScalar::Uuid(v) => serde_json::Value::from(v.hyphenated().to_string()),
445 }
446 }
447}
448
/// Min/max/null-count statistics for a single column, mergeable across
/// parquet row groups via `AddAssign`.
struct AggregatedStats {
    /// Smallest observed value, when bounds were present and decodable.
    pub min: Option<StatsScalar>,
    /// Largest observed value, when bounds were present and decodable.
    pub max: Option<StatsScalar>,
    /// Total null count across the merged row groups.
    pub null_count: u64,
}
455
456impl From<(&Statistics, Option<&LogicalType>)> for AggregatedStats {
457 fn from(value: (&Statistics, Option<&LogicalType>)) -> Self {
458 let (stats, logical_type) = value;
459 let null_count = stats.null_count_opt().unwrap_or_default();
460 if stats.min_bytes_opt().is_some() && stats.max_bytes_opt().is_some() {
461 let min = StatsScalar::try_from_stats(stats, logical_type, true).ok();
462 let max = StatsScalar::try_from_stats(stats, logical_type, false).ok();
463 Self {
464 min,
465 max,
466 null_count,
467 }
468 } else {
469 Self {
470 min: None,
471 max: None,
472 null_count,
473 }
474 }
475 }
476}
477
478impl AddAssign for AggregatedStats {
479 fn add_assign(&mut self, rhs: Self) {
480 self.min = match (self.min.take(), rhs.min) {
481 (Some(lhs), Some(rhs)) => {
482 if lhs < rhs {
483 Some(lhs)
484 } else {
485 Some(rhs)
486 }
487 }
488 (lhs, rhs) => lhs.or(rhs),
489 };
490 self.max = match (self.max.take(), rhs.max) {
491 (Some(lhs), Some(rhs)) => {
492 if lhs > rhs {
493 Some(lhs)
494 } else {
495 Some(rhs)
496 }
497 }
498 (lhs, rhs) => lhs.or(rhs),
499 };
500
501 self.null_count += rhs.null_count;
502 }
503}
504
/// Determines the stats key to record for a repeated (list-typed) column, or
/// `None` when the column path nests deeper than its repetition level allows.
///
/// Parquet encodes lists as `outer.list.element` path segments; this walks
/// the path from the leaf upward, counting `list`/`element` wrapper segments,
/// and returns the first non-wrapper segment — or the literal `"list"` /
/// `"element"` when that wrapper segment itself sits at the outermost
/// repetition level.
fn get_list_field_name(column_descr: &Arc<ColumnDescriptor>) -> Option<String> {
    let max_rep_levels = column_descr.max_rep_level();
    let column_path_parts = column_descr.path().parts();

    // More segments than `2 * rep_levels + 1` means the list element is
    // itself nested (e.g. a struct inside a list) — no single key applies.
    if column_path_parts.len() > (2 * max_rep_levels + 1) as usize {
        return None;
    }

    // Walk from the leaf toward the root.
    let mut column_path_parts = column_path_parts.to_vec();
    let mut items_seen = 0;
    let mut lists_seen = 0;
    while let Some(part) = column_path_parts.pop() {
        match (part.as_str(), lists_seen, items_seen) {
            // Wrapper segment found at the outermost repetition level: the
            // column is literally named "list"/"element".
            ("list", seen, _) if seen == max_rep_levels => return Some("list".to_string()),
            ("element", _, seen) if seen == max_rep_levels => return Some("element".to_string()),
            ("list", _, _) => lists_seen += 1,
            ("element", _, _) => items_seen += 1,
            // First non-wrapper segment is the field name to use.
            (other, _, _) => return Some(other.to_string()),
        }
    }
    None
}
542
/// Inserts a column's aggregated min/max/null-count into the (possibly
/// nested) stats maps, keyed by the column path.
///
/// Struct columns recurse one path segment at a time, creating intermediate
/// `Column` maps as needed; repeated (list) columns record only a null count.
fn apply_min_max_for_column(
    statistics: AggregatedStats,
    column_descr: Arc<ColumnDescriptor>,
    column_path_parts: &[String],
    min_values: &mut HashMap<String, ColumnValueStat>,
    max_values: &mut HashMap<String, ColumnValueStat>,
    null_counts: &mut HashMap<String, ColumnCountStat>,
) -> Result<(), DeltaWriterError> {
    // Repeated columns (lists): min/max are not tracked, only null counts.
    if column_descr.max_rep_level() > 0 {
        let key = get_list_field_name(&column_descr);

        if let Some(key) = key {
            null_counts.insert(key, ColumnCountStat::Value(statistics.null_count as i64));
        }

        return Ok(());
    }

    match (column_path_parts.len(), column_path_parts.first()) {
        // Leaf level: record the values under the column's own name.
        (1, _) => {
            let key = column_descr.name().to_string();

            if let Some(min) = statistics.min {
                let min = ColumnValueStat::Value(min.into());
                min_values.insert(key.clone(), min);
            }

            if let Some(max) = statistics.max {
                let max = ColumnValueStat::Value(max.into());
                max_values.insert(key.clone(), max);
            }

            null_counts.insert(key, ColumnCountStat::Value(statistics.null_count as i64));

            Ok(())
        }
        // Nested: descend into (or create) the child maps for the first path
        // segment, then recurse with the remaining segments.
        (_, Some(key)) => {
            let child_min_values = min_values
                .entry(key.to_owned())
                .or_insert_with(|| ColumnValueStat::Column(HashMap::new()));
            let child_max_values = max_values
                .entry(key.to_owned())
                .or_insert_with(|| ColumnValueStat::Column(HashMap::new()));
            let child_null_counts = null_counts
                .entry(key.to_owned())
                .or_insert_with(|| ColumnCountStat::Column(HashMap::new()));

            match (child_min_values, child_max_values, child_null_counts) {
                (
                    ColumnValueStat::Column(mins),
                    ColumnValueStat::Column(maxes),
                    ColumnCountStat::Column(null_counts),
                ) => {
                    let remaining_parts: Vec<String> = column_path_parts
                        .iter()
                        .skip(1)
                        .map(|s| s.to_string())
                        .collect();

                    apply_min_max_for_column(
                        statistics,
                        column_descr,
                        remaining_parts.as_slice(),
                        mins,
                        maxes,
                        null_counts,
                    )?;

                    Ok(())
                }
                _ => {
                    // Entries at this level were created as `Column` above;
                    // a `Value` here would mean conflicting leaf/struct
                    // paths for the same key.
                    unreachable!();
                }
            }
        }
        (_, None) => {
            // An empty path cannot occur for a real column descriptor.
            unreachable!();
        }
    }
}
627
628#[cfg(test)]
629mod tests {
630 use super::utils::record_batch_from_message;
631 use super::*;
632 use crate::{
633 DeltaTable,
634 errors::DeltaTableError,
635 protocol::{ColumnCountStat, ColumnValueStat},
636 table::builder::DeltaTableBuilder,
637 };
638 use parquet::data_type::{ByteArray, FixedLenByteArray};
639 use parquet::file::statistics::ValueStatistics;
640 use parquet::{basic::Compression, file::properties::WriterProperties};
641 use serde_json::{Value, json};
642 use std::collections::HashMap;
643 use std::path::Path;
644 use std::sync::LazyLock;
645 use url::Url;
646
    /// Builds typed parquet `Statistics` where min == max == `$value`, the
    /// null count is zero, and distinct-count/exactness flags are unset.
    macro_rules! simple_parquet_stat {
        ($variant:expr, $value:expr) => {
            $variant(ValueStatistics::new(
                Some($value),
                Some($value),
                None,
                Some(0),
                false,
            ))
        };
    }
658
    /// Table-driven check of `StatsScalar::try_from_stats` followed by JSON
    /// serialization, covering each (physical type, logical type) pairing.
    #[test]
    fn test_stats_scalar_serialization() {
        // Each case: (parquet statistics with min == max, logical type,
        // expected JSON value after extracting the min).
        let cases = &[
            (
                simple_parquet_stat!(Statistics::Boolean, true),
                Some(LogicalType::Integer {
                    bit_width: 1,
                    is_signed: true,
                }),
                Value::Bool(true),
            ),
            (
                simple_parquet_stat!(Statistics::Int32, 1),
                Some(LogicalType::Integer {
                    bit_width: 32,
                    is_signed: true,
                }),
                Value::from(1),
            ),
            (
                simple_parquet_stat!(Statistics::Int32, 1234),
                Some(LogicalType::Decimal {
                    scale: 3,
                    precision: 4,
                }),
                Value::from(1.234),
            ),
            // Negative scale multiplies instead of divides.
            (
                simple_parquet_stat!(Statistics::Int32, 1234),
                Some(LogicalType::Decimal {
                    scale: -1,
                    precision: 4,
                }),
                Value::from(12340.0),
            ),
            // Scale 0 serializes as a JSON integer.
            (
                simple_parquet_stat!(Statistics::Int32, 1234),
                Some(LogicalType::Decimal {
                    scale: 0,
                    precision: 4,
                }),
                Value::from(1234),
            ),
            (
                simple_parquet_stat!(Statistics::Int32, 10561),
                Some(LogicalType::Date),
                Value::from("1998-12-01"),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1641040496789123456),
                Some(LogicalType::Timestamp {
                    is_adjusted_to_u_t_c: true,
                    unit: parquet::basic::TimeUnit::NANOS,
                }),
                Value::from("2022-01-01T12:34:56.789123456Z"),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1641040496789123),
                Some(LogicalType::Timestamp {
                    is_adjusted_to_u_t_c: true,
                    unit: parquet::basic::TimeUnit::MICROS,
                }),
                Value::from("2022-01-01T12:34:56.789123Z"),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1641040496789),
                Some(LogicalType::Timestamp {
                    is_adjusted_to_u_t_c: true,
                    unit: parquet::basic::TimeUnit::MILLIS,
                }),
                Value::from("2022-01-01T12:34:56.789Z"),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1234),
                Some(LogicalType::Decimal {
                    scale: 3,
                    precision: 4,
                }),
                Value::from(1.234),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1234),
                Some(LogicalType::Decimal {
                    scale: -1,
                    precision: 4,
                }),
                Value::from(12340.0),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1234),
                Some(LogicalType::Decimal {
                    scale: 0,
                    precision: 4,
                }),
                Value::from(1234),
            ),
            (
                simple_parquet_stat!(Statistics::Int64, 1234),
                None,
                Value::from(1234),
            ),
            (
                simple_parquet_stat!(Statistics::ByteArray, ByteArray::from(b"hello".to_vec())),
                Some(LogicalType::String),
                Value::from("hello"),
            ),
            // Raw bytes without a logical type are escaped (`\xNN`).
            (
                simple_parquet_stat!(Statistics::ByteArray, ByteArray::from(b"\x00\\".to_vec())),
                None,
                Value::from("\\x00\\\\"),
            ),
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,
                    FixedLenByteArray::from(1243124142314423i128.to_be_bytes().to_vec())
                ),
                Some(LogicalType::Decimal {
                    scale: 3,
                    precision: 16,
                }),
                Value::from(1243124142314.423),
            ),
            // Short (3-byte) big-endian decimal exercises sign extension.
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,
                    FixedLenByteArray::from(vec![0, 39, 16])
                ),
                Some(LogicalType::Decimal {
                    scale: 3,
                    precision: 5,
                }),
                Value::from(10.0),
            ),
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,
                    FixedLenByteArray::from(1234i128.to_be_bytes().to_vec())
                ),
                Some(LogicalType::Decimal {
                    scale: 0,
                    precision: 4,
                }),
                Value::from(1234),
            ),
            // Values that round past the declared precision get nudged back
            // inside it (positive case).
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,
                    FixedLenByteArray::from(vec![
                        75, 59, 76, 168, 90, 134, 196, 122, 9, 138, 34, 63, 255, 255, 255, 255
                    ])
                ),
                Some(LogicalType::Decimal {
                    scale: 6,
                    precision: 38,
                }),
                Value::from(9.999999999999999e31),
            ),
            // Same precision-nudge behavior for a negative value.
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,
                    FixedLenByteArray::from(vec![
                        180, 196, 179, 87, 165, 121, 59, 133, 246, 117, 221, 192, 0, 0, 0, 1
                    ])
                ),
                Some(LogicalType::Decimal {
                    scale: 6,
                    precision: 38,
                }),
                Value::from(-9.999999999999999e31),
            ),
            (
                simple_parquet_stat!(
                    Statistics::FixedLenByteArray,
                    FixedLenByteArray::from(
                        [
                            0xc2, 0xe8, 0xc7, 0xf7, 0xd1, 0xf9, 0x4b, 0x49, 0xa5, 0xd9, 0x4b, 0xfe,
                            0x75, 0xc3, 0x17, 0xe2
                        ]
                        .to_vec()
                    )
                ),
                Some(LogicalType::Uuid),
                Value::from("c2e8c7f7-d1f9-4b49-a5d9-4bfe75c317e2"),
            ),
        ];

        for (stats, logical_type, expected) in cases {
            let scalar = StatsScalar::try_from_stats(stats, logical_type.as_ref(), true).unwrap();
            let actual = serde_json::Value::from(scalar);
            assert_eq!(&actual, expected);
        }
    }
851
    /// End-to-end check: write three 100-row batches to a partitioned table
    /// and verify the min/max/null-count stats on the resulting Add action.
    #[tokio::test]
    async fn test_delta_stats() {
        let temp_dir = tempfile::tempdir().unwrap();
        let table_path = temp_dir.path();
        create_temp_table(table_path);

        let table_uri = Url::from_directory_path(table_path).unwrap();
        let table = load_table(&table_uri, HashMap::new()).await.unwrap();

        // A small row-group size forces multiple row groups per file, which
        // exercises the cross-row-group aggregation.
        let mut writer = RecordBatchWriter::for_table(&table).unwrap();
        writer = writer.with_writer_properties(
            WriterProperties::builder()
                .set_compression(Compression::SNAPPY)
                .set_max_row_group_size(128)
                .build(),
        );

        let arrow_schema = writer.arrow_schema();
        let batch = record_batch_from_message(arrow_schema, JSON_ROWS.clone().as_ref()).unwrap();

        writer.write(batch).await.unwrap();
        let add = writer.flush().await.unwrap();
        assert_eq!(add.len(), 1);
        let stats = add[0].get_stats().unwrap().unwrap();

        // `date` is a partition column and list columns only track null
        // counts, so neither appears in min/max.
        let min_max_keys = vec!["meta", "some_int", "some_string", "some_bool", "uuid"];
        let mut null_count_keys = vec!["some_list", "some_nested_list"];
        null_count_keys.extend_from_slice(min_max_keys.as_slice());

        assert_eq!(
            min_max_keys.len(),
            stats.min_values.len(),
            "min values don't match"
        );
        assert_eq!(
            min_max_keys.len(),
            stats.max_values.len(),
            "max values don't match"
        );
        assert_eq!(
            null_count_keys.len(),
            stats.null_count.len(),
            "null counts don't match"
        );

        // Minimums, including nested struct columns under `meta`.
        for (k, v) in stats.min_values.iter() {
            match (k.as_str(), v) {
                ("meta", ColumnValueStat::Column(map)) => {
                    assert_eq!(2, map.len());

                    let kafka = map.get("kafka").unwrap().as_column().unwrap();
                    assert_eq!(3, kafka.len());
                    let partition = kafka.get("partition").unwrap().as_value().unwrap();
                    assert_eq!(0, partition.as_i64().unwrap());

                    let producer = map.get("producer").unwrap().as_column().unwrap();
                    assert_eq!(1, producer.len());
                    let timestamp = producer.get("timestamp").unwrap().as_value().unwrap();
                    assert_eq!("2021-06-22", timestamp.as_str().unwrap());
                }
                ("some_int", ColumnValueStat::Value(v)) => assert_eq!(302, v.as_i64().unwrap()),
                ("some_bool", ColumnValueStat::Value(v)) => assert!(!v.as_bool().unwrap()),
                ("some_string", ColumnValueStat::Value(v)) => {
                    assert_eq!("GET", v.as_str().unwrap())
                }
                ("date", ColumnValueStat::Value(v)) => {
                    assert_eq!("2021-06-22", v.as_str().unwrap())
                }
                ("uuid", ColumnValueStat::Value(v)) => {
                    assert_eq!("176c770d-92af-4a21-bf76-5d8c5261d659", v.as_str().unwrap())
                }
                k => panic!("Key {k:?} should not be present in min_values"),
            }
        }

        // Maximums.
        for (k, v) in stats.max_values.iter() {
            match (k.as_str(), v) {
                ("meta", ColumnValueStat::Column(map)) => {
                    assert_eq!(2, map.len());

                    let kafka = map.get("kafka").unwrap().as_column().unwrap();
                    assert_eq!(3, kafka.len());
                    let partition = kafka.get("partition").unwrap().as_value().unwrap();
                    assert_eq!(1, partition.as_i64().unwrap());

                    let producer = map.get("producer").unwrap().as_column().unwrap();
                    assert_eq!(1, producer.len());
                    let timestamp = producer.get("timestamp").unwrap().as_value().unwrap();
                    assert_eq!("2021-06-22", timestamp.as_str().unwrap());
                }
                ("some_int", ColumnValueStat::Value(v)) => assert_eq!(400, v.as_i64().unwrap()),
                ("some_bool", ColumnValueStat::Value(v)) => assert!(v.as_bool().unwrap()),
                ("some_string", ColumnValueStat::Value(v)) => {
                    assert_eq!("PUT", v.as_str().unwrap())
                }
                ("date", ColumnValueStat::Value(v)) => {
                    assert_eq!("2021-06-22", v.as_str().unwrap())
                }
                ("uuid", ColumnValueStat::Value(v)) => {
                    assert_eq!("a98bea04-d119-4f21-8edc-eb218b5849af", v.as_str().unwrap())
                }
                k => panic!("Key {k:?} should not be present in max_values"),
            }
        }

        // Null counts: the third batch omits several columns, so those
        // columns show 100 nulls.
        for (k, v) in stats.null_count.iter() {
            match (k.as_str(), v) {
                ("meta", ColumnCountStat::Column(map)) => {
                    assert_eq!(2, map.len());

                    let kafka = map.get("kafka").unwrap().as_column().unwrap();
                    assert_eq!(3, kafka.len());
                    let partition = kafka.get("partition").unwrap().as_value().unwrap();
                    assert_eq!(0, partition);

                    let producer = map.get("producer").unwrap().as_column().unwrap();
                    assert_eq!(1, producer.len());
                    let timestamp = producer.get("timestamp").unwrap().as_value().unwrap();
                    assert_eq!(0, timestamp);
                }
                ("some_int", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
                ("some_bool", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
                ("some_string", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
                ("some_list", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
                ("some_nested_list", ColumnCountStat::Value(v)) => assert_eq!(100, *v),
                ("date", ColumnCountStat::Value(v)) => assert_eq!(0, *v),
                ("uuid", ColumnCountStat::Value(v)) => assert_eq!(0, *v),
                k => panic!("Key {k:?} should not be present in null_count"),
            }
        }
    }
986
987 async fn load_table(
988 table_url: &Url,
989 options: HashMap<String, String>,
990 ) -> Result<DeltaTable, DeltaTableError> {
991 DeltaTableBuilder::from_url(table_url.clone())?
992 .with_storage_options(options)
993 .load()
994 .await
995 }
996
997 fn create_temp_table(table_path: &Path) {
998 let log_path = table_path.join("_delta_log");
999
1000 std::fs::create_dir(log_path.as_path()).unwrap();
1001 std::fs::write(
1002 log_path.join("00000000000000000000.json"),
1003 V0_COMMIT.as_str(),
1004 )
1005 .unwrap();
1006 }
1007
    /// Delta schema for the test table: a nested `meta` struct, scalar
    /// columns, two list columns, and `date`/`uuid` strings (`date` is the
    /// partition column declared in `V0_COMMIT`).
    static SCHEMA: LazyLock<Value> = LazyLock::new(|| {
        json!({
            "type": "struct",
            "fields": [
                {
                    "name": "meta",
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "name": "kafka",
                                "type": {
                                    "type": "struct",
                                    "fields": [
                                        {
                                            "name": "topic",
                                            "type": "string",
                                            "nullable": true, "metadata": {}
                                        },
                                        {
                                            "name": "partition",
                                            "type": "integer",
                                            "nullable": true, "metadata": {}
                                        },
                                        {
                                            "name": "offset",
                                            "type": "long",
                                            "nullable": true, "metadata": {}
                                        }
                                    ],
                                },
                                "nullable": true, "metadata": {}
                            },
                            {
                                "name": "producer",
                                "type": {
                                    "type": "struct",
                                    "fields": [
                                        {
                                            "name": "timestamp",
                                            "type": "string",
                                            "nullable": true, "metadata": {}
                                        }
                                    ],
                                },
                                "nullable": true, "metadata": {}
                            }
                        ]
                    },
                    "nullable": true, "metadata": {}
                },
                { "name": "some_string", "type": "string", "nullable": true, "metadata": {} },
                { "name": "some_int", "type": "integer", "nullable": true, "metadata": {} },
                { "name": "some_bool", "type": "boolean", "nullable": true, "metadata": {} },
                {
                    "name": "some_list",
                    "type": {
                        "type": "array",
                        "elementType": "string",
                        "containsNull": true
                    },
                    "nullable": true, "metadata": {}
                },
                {
                    "name": "some_nested_list",
                    "type": {
                        "type": "array",
                        "elementType": {
                            "type": "array",
                            "elementType": "integer",
                            "containsNull": true
                        },
                        "containsNull": true
                    },
                    "nullable": true, "metadata": {}
                },
                { "name": "date", "type": "string", "nullable": true, "metadata": {} },
                { "name": "uuid", "type": "string", "nullable": true, "metadata": {} },
            ]
        })
    });
    /// A minimal version-0 commit: a protocol action plus a metaData action
    /// (with `SCHEMA` above and `date` as the partition column), one JSON
    /// object per line.
    static V0_COMMIT: LazyLock<String> = LazyLock::new(|| {
        let schema_string = serde_json::to_string(&SCHEMA.clone()).unwrap();
        let jsons = [
            json!({
                "protocol":{"minReaderVersion":1,"minWriterVersion":2}
            }),
            json!({
                "metaData": {
                    "id": "22ef18ba-191c-4c36-a606-3dad5cdf3830",
                    "format": {
                        "provider": "parquet", "options": {}
                    },
                    "schemaString": schema_string,
                    "partitionColumns": ["date"], "configuration": {}, "createdTime": 1564524294376i64
                }
            }),
        ];

        jsons
            .iter()
            .map(|j| serde_json::to_string(j).unwrap())
            .collect::<Vec<String>>()
            .join("\n")
    });
    /// 300 test rows: 100 "GET" rows, 100 "PUT" rows, and 100 rows with most
    /// columns absent (null) to exercise null counting.
    static JSON_ROWS: LazyLock<Vec<Value>> = LazyLock::new(|| {
        std::iter::repeat_n(
            json!({
                "meta": {
                    "kafka": {
                        "offset": 0,
                        "partition": 0,
                        "topic": "some_topic"
                    },
                    "producer": {
                        "timestamp": "2021-06-22"
                    },
                },
                "some_string": "GET",
                "some_int": 302,
                "some_bool": true,
                "some_list": ["a", "b", "c"],
                "some_nested_list": [[42], [84]],
                "date": "2021-06-22",
                "uuid": "176c770d-92af-4a21-bf76-5d8c5261d659",
            }),
            100,
        )
        .chain(std::iter::repeat_n(
            json!({
                "meta": {
                    "kafka": {
                        "offset": 100,
                        "partition": 1,
                        "topic": "another_topic"
                    },
                    "producer": {
                        "timestamp": "2021-06-22"
                    },
                },
                "some_string": "PUT",
                "some_int": 400,
                "some_bool": false,
                "some_list": ["x", "y", "z"],
                "some_nested_list": [[42], [84]],
                "date": "2021-06-22",
                "uuid": "54f3e867-3f7b-4122-a452-9d74fb4fe1ba",
            }),
            100,
        ))
        .chain(std::iter::repeat_n(
            // Sparse rows: scalar and list columns omitted → counted as null.
            json!({
                "meta": {
                    "kafka": {
                        "offset": 0,
                        "partition": 0,
                        "topic": "some_topic"
                    },
                    "producer": {
                        "timestamp": "2021-06-22"
                    },
                },
                "some_nested_list": [[42], null],
                "date": "2021-06-22",
                "uuid": "a98bea04-d119-4f21-8edc-eb218b5849af",
            }),
            100,
        ))
        .collect()
    });
1178}