#![allow(non_camel_case_types)]
//! Actions and protocol definitions for the Delta transaction log, with
//! helpers for locating and reading checkpoints.

use std::borrow::Borrow;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use std::mem::take;
use std::str::FromStr;
use std::sync::LazyLock;

use arrow_schema::ArrowError;
use futures::StreamExt;
use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
use regex::Regex;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{debug, error};

use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::{Add, CommitInfo, Metadata, Protocol, Remove, StructField, TableFeatures};
use crate::logstore::LogStore;
use crate::table::CheckPoint;

pub mod checkpoints;
mod parquet_read;
mod time_utils;

#[allow(missing_docs)]
/// Errors raised while reading or writing Delta protocol actions.
#[derive(thiserror::Error, Debug)]
pub enum ProtocolError {
    #[error("Table state does not contain metadata")]
    NoMetaData,

    #[error("Checkpoint file not found")]
    CheckpointNotFound,

    #[error("End of transaction log")]
    EndOfLog,

    /// The action contained an invalid field.
    #[error("Invalid action field: {0}")]
    InvalidField(String),

    /// A parquet checkpoint row contained an invalid action.
    #[error("Invalid action in parquet row: {0}")]
    InvalidRow(String),

    /// The transaction log referenced an unknown deletion vector storage type.
    #[error("Invalid deletion vector storage type: {0}")]
    InvalidDeletionVectorStorageType(String),

    /// A generic action error; the wrapped string carries the details.
    #[error("Generic action error: {0}")]
    Generic(String),

    /// Failed to parse a parquet checkpoint file.
    #[error("Failed to parse parquet checkpoint: {source}")]
    ParquetParseError {
        /// The underlying parquet error.
        #[from]
        source: parquet::errors::ParquetError,
    },

    /// Failed to serialize an operation for commit info.
    #[error("Failed to serialize operation: {source}")]
    SerializeOperation {
        /// The underlying serde_json error.
        #[from]
        source: serde_json::Error,
    },

    /// Failed to convert the table schema into an Arrow schema.
    #[error("Failed to convert into Arrow schema: {source}")]
    Arrow {
        /// The underlying Arrow error.
        #[from]
        source: ArrowError,
    },

    /// Passthrough error raised by the underlying object store.
    #[error("ObjectStoreError: {source}")]
    ObjectStore {
        /// The underlying object store error.
        #[from]
        source: ObjectStoreError,
    },

    #[error("Io: {source}")]
    IO {
        #[from]
        source: std::io::Error,
    },

    #[error("Kernel: {source}")]
    Kernel {
        #[from]
        source: crate::kernel::Error,
    },
}

/// Struct used to represent minValues and maxValues in add action statistics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnValueStat {
    /// Composite HashMap representation of statistics.
    Column(HashMap<String, ColumnValueStat>),
    /// Json representation of statistics.
    Value(Value),
}

impl ColumnValueStat {
    /// Returns the HashMap representation of the ColumnValueStat.
    pub fn as_column(&self) -> Option<&HashMap<String, ColumnValueStat>> {
        match self {
            ColumnValueStat::Column(m) => Some(m),
            _ => None,
        }
    }

    /// Returns the serde_json representation of the ColumnValueStat.
    pub fn as_value(&self) -> Option<&Value> {
        match self {
            ColumnValueStat::Value(v) => Some(v),
            _ => None,
        }
    }
}

/// Struct used to represent nullCount in add action statistics.
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(untagged)]
pub enum ColumnCountStat {
    /// Composite HashMap representation of statistics.
    Column(HashMap<String, ColumnCountStat>),
    /// Json representation of statistics.
    Value(i64),
}

impl ColumnCountStat {
    /// Returns the HashMap representation of the ColumnCountStat.
    pub fn as_column(&self) -> Option<&HashMap<String, ColumnCountStat>> {
        match self {
            ColumnCountStat::Column(m) => Some(m),
            _ => None,
        }
    }

    /// Returns the plain i64 representation of the ColumnCountStat.
    pub fn as_value(&self) -> Option<i64> {
        match self {
            ColumnCountStat::Value(v) => Some(*v),
            _ => None,
        }
    }
}

/// Statistics associated with Add actions contained in the Delta log.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// Number of records in the file associated with the log action.
    pub num_records: i64,

    /// Per-column minimum values present in the file.
    pub min_values: HashMap<String, ColumnValueStat>,
    /// Per-column maximum values present in the file.
    pub max_values: HashMap<String, ColumnValueStat>,
    /// Per-column number of null values.
    pub null_count: HashMap<String, ColumnCountStat>,
}

/// Statistics associated with Add actions contained in the Delta log.
/// min_values, max_values and null_count are optional to allow them to be missing.
#[derive(Serialize, Deserialize, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
struct PartialStats {
    /// Number of records in the file associated with the log action.
    pub num_records: i64,

    /// Per-column minimum values, if present.
    pub min_values: Option<HashMap<String, ColumnValueStat>>,
    /// Per-column maximum values, if present.
    pub max_values: Option<HashMap<String, ColumnValueStat>>,
    /// Per-column null counts, if present.
    pub null_count: Option<HashMap<String, ColumnCountStat>>,
}

impl PartialStats {
    /// Fills in missing HashMaps with empty ones so the result can be used as `Stats`.
    pub fn as_stats(&mut self) -> Stats {
        let min_values = take(&mut self.min_values);
        let max_values = take(&mut self.max_values);
        let null_count = take(&mut self.null_count);
        Stats {
            num_records: self.num_records,
            min_values: min_values.unwrap_or_default(),
            max_values: max_values.unwrap_or_default(),
            null_count: null_count.unwrap_or_default(),
        }
    }
}

/// File stats parsed from raw parquet format.
#[derive(Debug, Default)]
pub struct StatsParsed {
    /// Number of records in the file associated with the log action.
    pub num_records: i64,

    /// Per-column minimum values as parsed parquet fields.
    pub min_values: HashMap<String, parquet::record::Field>,
    /// Per-column maximum values as parsed parquet fields.
    pub max_values: HashMap<String, parquet::record::Field>,
    /// Per-column number of null values.
    pub null_count: HashMap<String, i64>,
}

impl Hash for Add {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.path.hash(state);
    }
}

impl PartialEq for Add {
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path
            && self.size == other.size
            && self.partition_values == other.partition_values
            && self.modification_time == other.modification_time
            && self.data_change == other.data_change
            && self.stats == other.stats
            && self.tags == other.tags
            && self.deletion_vector == other.deletion_vector
    }
}

impl Eq for Add {}

impl Add {
    /// Get whatever stats are available; parsed parquet stats take precedence,
    /// with the JSON stats string as a fallback.
    pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
        match self.get_stats_parsed() {
            Ok(Some(stats)) => Ok(Some(stats)),
            Ok(None) => self.get_json_stats(),
            Err(e) => {
                error!(
                    "Error when reading parquet stats {:?} {e}. Attempting to read json stats",
                    self.stats_parsed
                );
                self.get_json_stats()
            }
        }
    }

    /// Returns the serde_json representation of stats contained in the action, if present.
    /// Since stats are optional in the protocol, this may be `None`.
    pub fn get_json_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
        self.stats
            .as_ref()
            .map(|stats| serde_json::from_str(stats).map(|mut ps: PartialStats| ps.as_stats()))
            .transpose()
    }
}

impl Hash for Remove {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.path.hash(state);
    }
}

/// Borrow `Remove` as str so tombstone sets can be queried by the path of an
/// `Add` action.
impl Borrow<str> for Remove {
    fn borrow(&self) -> &str {
        self.path.as_ref()
    }
}

impl PartialEq for Remove {
    fn eq(&self, other: &Self) -> bool {
        self.path == other.path
            && self.deletion_timestamp == other.deletion_timestamp
            && self.data_change == other.data_change
            && self.extended_file_metadata == other.extended_file_metadata
            && self.partition_values == other.partition_values
            && self.size == other.size
            && self.tags == other.tags
            && self.deletion_vector == other.deletion_vector
    }
}

/// Predicate metadata recorded for a merge operation.
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub struct MergePredicate {
    /// The type of merge operation performed.
    pub action_type: String,
    /// The predicate used for the merge operation.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub predicate: Option<String>,
}

/// Operation performed when creating a new log entry with one or more actions.
#[allow(clippy::large_enum_variant)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(rename_all = "camelCase")]
pub enum DeltaOperation {
    /// Represents a Delta `Add Column` operation.
    /// Used to add new columns or nested fields within a struct.
    AddColumn {
        /// Fields added to the existing schema.
        fields: Vec<StructField>,
    },

    /// Represents a Delta `Create` operation.
    /// Usually only creates the table; if data is also written, a `Write`
    /// operation is more appropriate.
    Create {
        /// The save mode used during the create.
        mode: SaveMode,
        /// The storage location of the new table.
        location: String,
        /// The min reader and writer protocol versions of the table.
        protocol: Protocol,
        /// Metadata associated with the new table.
        metadata: Metadata,
    },

    /// Represents a Delta `Write` operation.
    /// Write operations will typically only include `Add` actions.
    #[serde(rename_all = "camelCase")]
    Write {
        /// The save mode used during the write.
        mode: SaveMode,
        /// The columns the write is partitioned by.
        partition_by: Option<Vec<String>>,
        /// The predicate used during the write.
        predicate: Option<String>,
    },

    /// Delete data matching a predicate from the table.
    Delete {
        /// The condition that rows must match to be deleted.
        predicate: Option<String>,
    },

    /// Update data matching a predicate in the table.
    Update {
        /// The update predicate.
        predicate: Option<String>,
    },

    /// Add a constraint to the table.
    AddConstraint {
        /// The name of the constraint.
        name: String,
        /// The expression to check against.
        expr: String,
    },

    /// Add table features to the table.
    AddFeature {
        /// The names of the features added.
        name: Vec<TableFeatures>,
    },

    /// Drop a constraint from the table.
    DropConstraint {
        /// The name of the constraint.
        name: String,
    },

    /// Merge data with a source using the following predicates.
    #[serde(rename_all = "camelCase")]
    Merge {
        /// Cleaned merge predicate used for conflict checks.
        predicate: Option<String>,
        /// The original merge predicate.
        merge_predicate: Option<String>,
        /// Matched operations performed.
        matched_predicates: Vec<MergePredicate>,
        /// Not-matched operations performed.
        not_matched_predicates: Vec<MergePredicate>,
        /// Not-matched-by-source operations performed.
        not_matched_by_source_predicates: Vec<MergePredicate>,
    },

    /// Represents a Delta `StreamingUpdate` operation.
    #[serde(rename_all = "camelCase")]
    StreamingUpdate {
        /// The output mode the streaming writer is using.
        output_mode: OutputMode,
        /// The query id of the streaming writer.
        query_id: String,
        /// The epoch id of the written micro-batch.
        epoch_id: i64,
    },

    /// Set table properties.
    #[serde(rename_all = "camelCase")]
    SetTableProperties {
        /// The table properties that were added.
        properties: HashMap<String, String>,
    },

    /// Represents an `Optimize` operation.
    #[serde(rename_all = "camelCase")]
    Optimize {
        /// The predicate used to determine which files were optimized.
        predicate: Option<String>,
        /// The target size of the written files.
        target_size: i64,
    },

    /// Represents a `FileSystemCheck` operation.
    #[serde(rename_all = "camelCase")]
    FileSystemCheck {},

    /// Represents a `Restore` operation.
    Restore {
        /// The version to restore.
        version: Option<i64>,
        /// The datetime to restore to.
        datetime: Option<i64>,
    },

    /// Represents the start of a `Vacuum` operation.
    #[serde(rename_all = "camelCase")]
    VacuumStart {
        /// Whether the retention check is enforced.
        retention_check_enabled: bool,
        /// The specified retention period in milliseconds.
        specified_retention_millis: Option<i64>,
        /// The default retention period in milliseconds.
        default_retention_millis: i64,
    },

    /// Represents the end of a `Vacuum` operation.
    VacuumEnd {
        /// The status of the operation.
        status: String,
    },

    /// Represents an `UpdateFieldMetadata` operation.
    #[serde(rename_all = "camelCase")]
    UpdateFieldMetadata {
        /// The fields whose metadata was changed.
        fields: Vec<StructField>,
    },
}

impl DeltaOperation {
    /// A human readable name for the operation.
    pub fn name(&self) -> &str {
        match &self {
            DeltaOperation::AddColumn { .. } => "ADD COLUMN",
            DeltaOperation::Create {
                mode: SaveMode::Overwrite,
                ..
            } => "CREATE OR REPLACE TABLE",
            DeltaOperation::Create { .. } => "CREATE TABLE",
            DeltaOperation::Write { .. } => "WRITE",
            DeltaOperation::Delete { .. } => "DELETE",
            DeltaOperation::Update { .. } => "UPDATE",
            DeltaOperation::Merge { .. } => "MERGE",
            DeltaOperation::StreamingUpdate { .. } => "STREAMING UPDATE",
            DeltaOperation::SetTableProperties { .. } => "SET TBLPROPERTIES",
            DeltaOperation::Optimize { .. } => "OPTIMIZE",
            DeltaOperation::FileSystemCheck { .. } => "FSCK",
            DeltaOperation::Restore { .. } => "RESTORE",
            DeltaOperation::VacuumStart { .. } => "VACUUM START",
            DeltaOperation::VacuumEnd { .. } => "VACUUM END",
            DeltaOperation::AddConstraint { .. } => "ADD CONSTRAINT",
            DeltaOperation::DropConstraint { .. } => "DROP CONSTRAINT",
            DeltaOperation::AddFeature { .. } => "ADD FEATURE",
            DeltaOperation::UpdateFieldMetadata { .. } => "UPDATE FIELD METADATA",
        }
    }

    /// Parameters configured for the operation, as recorded in commit info.
    pub fn operation_parameters(&self) -> DeltaResult<HashMap<String, Value>> {
        if let Some(Some(Some(map))) = serde_json::to_value(self)
            .map_err(|err| ProtocolError::SerializeOperation { source: err })?
            .as_object()
            .map(|p| p.values().next().map(|q| q.as_object()))
        {
            Ok(map
                .iter()
                .filter(|item| !item.1.is_null())
                .map(|(k, v)| {
                    (
                        k.to_owned(),
                        serde_json::Value::String(if v.is_string() {
                            String::from(v.as_str().unwrap())
                        } else {
                            v.to_string()
                        }),
                    )
                })
                .collect())
        } else {
            Err(ProtocolError::Generic(
                "Operation parameters serialized into unexpected shape".into(),
            )
            .into())
        }
    }

    /// Denotes if the operation changes the data contained in the table.
    pub fn changes_data(&self) -> bool {
        match self {
            Self::Optimize { .. }
            | Self::UpdateFieldMetadata { .. }
            | Self::SetTableProperties { .. }
            | Self::AddColumn { .. }
            | Self::AddFeature { .. }
            | Self::VacuumStart { .. }
            | Self::VacuumEnd { .. }
            | Self::AddConstraint { .. }
            | Self::DropConstraint { .. } => false,
            Self::Create { .. }
            | Self::FileSystemCheck {}
            | Self::StreamingUpdate { .. }
            | Self::Write { .. }
            | Self::Delete { .. }
            | Self::Merge { .. }
            | Self::Update { .. }
            | Self::Restore { .. } => true,
        }
    }

    /// Retrieve basic commit information to be added to Delta commits.
    pub fn get_commit_info(&self) -> CommitInfo {
        CommitInfo {
            operation: Some(self.name().into()),
            operation_parameters: self.operation_parameters().ok(),
            ..Default::default()
        }
    }

    /// Get the predicate expression applied when the operation reads data from the table.
    pub fn read_predicate(&self) -> Option<String> {
        match self {
            Self::Write { predicate, .. } => predicate.clone(),
            Self::Delete { predicate, .. } => predicate.clone(),
            Self::Update { predicate, .. } => predicate.clone(),
            Self::Merge { predicate, .. } => predicate.clone(),
            _ => None,
        }
    }

    /// Denotes if the operation reads the entire table.
    pub fn read_whole_table(&self) -> bool {
        match self {
            // A merge without a predicate has to join the full source and target.
            Self::Merge { predicate, .. } if predicate.is_none() => true,
            _ => false,
        }
    }
}

/// The SaveMode used when performing a DeltaOperation.
#[derive(Serialize, Deserialize, Debug, Copy, Clone, PartialEq, Eq)]
pub enum SaveMode {
    /// Files will be appended to the target location.
    Append,
    /// The target location will be overwritten.
    Overwrite,
    /// If files exist for the target, the operation must fail.
    ErrorIfExists,
    /// If files exist for the target, the operation must not proceed or change any data.
    Ignore,
}

impl FromStr for SaveMode {
    type Err = DeltaTableError;

    fn from_str(s: &str) -> DeltaResult<Self> {
        match s.to_ascii_lowercase().as_str() {
            "append" => Ok(SaveMode::Append),
            "overwrite" => Ok(SaveMode::Overwrite),
            "error" => Ok(SaveMode::ErrorIfExists),
            "ignore" => Ok(SaveMode::Ignore),
            _ => Err(DeltaTableError::Generic(format!(
                "Invalid save mode provided: {s}, only these are supported: ['append', 'overwrite', 'error', 'ignore']"
            ))),
        }
    }
}

/// The OutputMode used in streaming operations.
#[derive(Serialize, Deserialize, Debug, Copy, Clone)]
pub enum OutputMode {
    /// Only new rows will be written when new data is available.
    Append,
    /// The full output (all rows) will be written whenever new data is available.
    Complete,
    /// Only rows with updates will be written when new or changed data is available.
    Update,
}

/// Read the `_last_checkpoint` hint file, falling back to a listing-based
/// search of the `_delta_log` directory if the hint does not exist.
pub(crate) async fn get_last_checkpoint(
    log_store: &dyn LogStore,
) -> Result<CheckPoint, ProtocolError> {
    let last_checkpoint_path = Path::from_iter(["_delta_log", "_last_checkpoint"]);
    debug!("loading checkpoint from {last_checkpoint_path}");
    match log_store
        .object_store(None)
        .get(&last_checkpoint_path)
        .await
    {
        Ok(data) => Ok(serde_json::from_slice(&data.bytes().await?)?),
        Err(ObjectStoreError::NotFound { .. }) => {
            match find_latest_check_point_for_version(log_store, i64::MAX).await {
                Ok(Some(cp)) => Ok(cp),
                _ => Err(ProtocolError::CheckpointNotFound),
            }
        }
        Err(err) => Err(ProtocolError::ObjectStore { source: err }),
    }
}

/// Scan the `_delta_log` directory for the latest checkpoint at or below
/// `version`, recognizing both single-file and multi-part checkpoint names.
pub(crate) async fn find_latest_check_point_for_version(
    log_store: &dyn LogStore,
    version: i64,
) -> Result<Option<CheckPoint>, ProtocolError> {
    static CHECKPOINT_REGEX: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap());
    static CHECKPOINT_PARTS_REGEX: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap()
    });

    let mut cp: Option<CheckPoint> = None;
    let object_store = log_store.object_store(None);
    let mut stream = object_store.list(Some(log_store.log_path()));

    while let Some(obj_meta) = stream.next().await {
        // Exit early if any objects can't be listed, but tolerate not-found
        // errors on individual list entries. These mainly occur for local
        // stores when a temporary file has been deleted by a concurrent
        // writer or the table is vacuumed by another client.
        let obj_meta = match obj_meta {
            Ok(meta) => Ok(meta),
            Err(ObjectStoreError::NotFound { .. }) => continue,
            Err(err) => Err(err),
        }?;
        if let Some(captures) = CHECKPOINT_REGEX.captures(obj_meta.location.as_ref()) {
            let curr_ver_str = captures.get(1).unwrap().as_str();
            let curr_ver: i64 = curr_ver_str.parse().unwrap();
            if curr_ver > version {
                // skip checkpoints newer than the requested version
                continue;
            }
            if cp.is_none() || curr_ver > cp.unwrap().version {
                cp = Some(CheckPoint::new(curr_ver, 0, None));
            }
            continue;
        }

        if let Some(captures) = CHECKPOINT_PARTS_REGEX.captures(obj_meta.location.as_ref()) {
            let curr_ver_str = captures.get(1).unwrap().as_str();
            let curr_ver: i64 = curr_ver_str.parse().unwrap();
            if curr_ver > version {
                // skip checkpoints newer than the requested version
                continue;
            }
            if cp.is_none() || curr_ver > cp.unwrap().version {
                let parts_str = captures.get(2).unwrap().as_str();
                let parts = parts_str.parse().unwrap();
                cp = Some(CheckPoint::new(curr_ver, 0, Some(parts)));
            }
            continue;
        }
    }

    Ok(cp)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::kernel::Action;

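    // Editor-added illustrative test (a sketch, not part of the original
    // suite): `Stats` serializes with camelCase keys, matching the statistics
    // field names written to the Delta log.
    #[test]
    fn test_stats_serialize_camel_case() {
        let value = serde_json::to_value(Stats::default()).unwrap();
        let obj = value.as_object().unwrap();
        for key in ["numRecords", "minValues", "maxValues", "nullCount"] {
            assert!(obj.contains_key(key), "expected camelCase key {key}");
        }
    }
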
    #[test]
    fn test_load_table_stats() {
        let action = Add {
            stats: Some(
                serde_json::json!({
                    "numRecords": 22,
                    "minValues": {"a": 1, "nested": {"b": 2, "c": "a"}},
                    "maxValues": {"a": 10, "nested": {"b": 20, "c": "z"}},
                    "nullCount": {"a": 1, "nested": {"b": 0, "c": 1}},
                })
                .to_string(),
            ),
            path: Default::default(),
            data_change: true,
            deletion_vector: None,
            partition_values: Default::default(),
            stats_parsed: None,
            tags: None,
            size: 0,
            modification_time: 0,
            base_row_id: None,
            default_row_commit_version: None,
            clustering_provider: None,
        };

        let stats = action.get_stats().unwrap().unwrap();

        assert_eq!(stats.num_records, 22);

        assert_eq!(
            stats.min_values["a"].as_value().unwrap(),
            &serde_json::json!(1)
        );
        assert_eq!(
            stats.min_values["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            &serde_json::json!(2)
        );
        assert_eq!(
            stats.min_values["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            &serde_json::json!("a")
        );

        assert_eq!(
            stats.max_values["a"].as_value().unwrap(),
            &serde_json::json!(10)
        );
        assert_eq!(
            stats.max_values["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            &serde_json::json!(20)
        );
        assert_eq!(
            stats.max_values["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            &serde_json::json!("z")
        );

        assert_eq!(stats.null_count["a"].as_value().unwrap(), 1);
        assert_eq!(
            stats.null_count["nested"].as_column().unwrap()["b"]
                .as_value()
                .unwrap(),
            0
        );
        assert_eq!(
            stats.null_count["nested"].as_column().unwrap()["c"]
                .as_value()
                .unwrap(),
            1
        );
    }

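    // Editor-added illustrative test (a sketch): because `ColumnValueStat` and
    // `ColumnCountStat` are `#[serde(untagged)]`, a JSON object deserializes
    // into the `Column` variant and a scalar into the `Value` variant.
    #[test]
    fn test_column_stats_untagged_deserialization() {
        let nested: ColumnValueStat = serde_json::from_str(r#"{"b": 2}"#).unwrap();
        let column = nested.as_column().unwrap();
        assert_eq!(column["b"].as_value().unwrap(), &serde_json::json!(2));

        let scalar: ColumnValueStat = serde_json::from_str("42").unwrap();
        assert_eq!(scalar.as_value().unwrap(), &serde_json::json!(42));

        let count: ColumnCountStat = serde_json::from_str("7").unwrap();
        assert_eq!(count.as_value(), Some(7));
    }
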
    #[test]
    fn test_load_table_partial_stats() {
        let action = Add {
            stats: Some(
                serde_json::json!({
                    "numRecords": 22
                })
                .to_string(),
            ),
            path: Default::default(),
            data_change: true,
            deletion_vector: None,
            partition_values: Default::default(),
            stats_parsed: None,
            tags: None,
            size: 0,
            modification_time: 0,
            base_row_id: None,
            default_row_commit_version: None,
            clustering_provider: None,
        };

        let stats = action.get_stats().unwrap().unwrap();

        assert_eq!(stats.num_records, 22);
        assert_eq!(stats.min_values.len(), 0);
        assert_eq!(stats.max_values.len(), 0);
        assert_eq!(stats.null_count.len(), 0);
    }

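    // Editor-added illustrative test (a sketch): `SaveMode::from_str` is
    // case-insensitive and rejects anything outside the four documented modes.
    #[test]
    fn test_save_mode_from_str() {
        assert_eq!("APPEND".parse::<SaveMode>().unwrap(), SaveMode::Append);
        assert_eq!("overwrite".parse::<SaveMode>().unwrap(), SaveMode::Overwrite);
        assert_eq!("Error".parse::<SaveMode>().unwrap(), SaveMode::ErrorIfExists);
        assert_eq!("ignore".parse::<SaveMode>().unwrap(), SaveMode::Ignore);
        assert!("merge".parse::<SaveMode>().is_err());
    }
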
    #[test]
    fn test_read_commit_info() {
        let raw = r#"
        {
            "timestamp": 1670892998177,
            "operation": "WRITE",
            "operationParameters": {
                "mode": "Append",
                "partitionBy": "[\"c1\",\"c2\"]"
            },
            "isolationLevel": "Serializable",
            "isBlindAppend": true,
            "operationMetrics": {
                "numFiles": "3",
                "numOutputRows": "3",
                "numOutputBytes": "1356"
            },
            "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
            "txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c"
        }"#;

        let info = serde_json::from_str::<CommitInfo>(raw);
        assert!(info.is_ok());

        // assert that commit info can be deserialized from an empty object
        let raw = "{}";
        let info = serde_json::from_str::<CommitInfo>(raw);
        assert!(info.is_ok());

        // arbitrary extra fields are preserved in the commit info
        let raw = r#"
        {
            "timestamp": 1670892998177,
            "operation": "WRITE",
            "operationParameters": {
                "mode": "Append",
                "partitionBy": "[\"c1\",\"c2\"]"
            },
            "isolationLevel": "Serializable",
            "isBlindAppend": true,
            "operationMetrics": {
                "numFiles": "3",
                "numOutputRows": "3",
                "numOutputBytes": "1356"
            },
            "engineInfo": "Apache-Spark/3.3.1 Delta-Lake/2.2.0",
            "txnId": "046a258f-45e3-4657-b0bf-abfb0f76681c",
            "additionalField": "more data",
            "additionalStruct": {
                "key": "value",
                "otherKey": 123
            }
        }"#;

        let info = serde_json::from_str::<CommitInfo>(raw).expect("should parse");
        assert!(info.info.contains_key("additionalField"));
        assert!(info.info.contains_key("additionalStruct"));
    }

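    // Editor-added illustrative test (a sketch): `operation_parameters`
    // flattens the serialized operation into string-valued commitInfo
    // parameters; nulls are dropped and non-string values are JSON-encoded.
    #[test]
    fn test_operation_parameters_shape() {
        let op = DeltaOperation::Write {
            mode: SaveMode::Append,
            partition_by: Some(vec!["c1".to_string()]),
            predicate: None,
        };
        let params = op.operation_parameters().unwrap();
        assert_eq!(params["mode"], serde_json::json!("Append"));
        assert_eq!(params["partitionBy"], serde_json::json!(r#"["c1"]"#));
        assert!(!params.contains_key("predicate"));
    }
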
    #[test]
    fn test_read_domain_metadata() {
        let buf = r#"{"domainMetadata":{"domain":"delta.liquid","configuration":"{\"clusteringColumns\":[{\"physicalName\":[\"id\"]}],\"domainName\":\"delta.liquid\"}","removed":false}}"#;
        let _action: Action =
            serde_json::from_str(buf).expect("Expected to be able to deserialize");
    }

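    // Editor-added illustrative test (a sketch) for the `DeltaOperation`
    // helpers: operation names, data-change classification, and predicates.
    #[test]
    fn test_delta_operation_helpers() {
        let delete = DeltaOperation::Delete {
            predicate: Some("id > 10".to_string()),
        };
        assert_eq!(delete.name(), "DELETE");
        assert!(delete.changes_data());
        assert_eq!(delete.read_predicate(), Some("id > 10".to_string()));
        assert_eq!(
            delete.get_commit_info().operation.as_deref(),
            Some("DELETE")
        );

        // A merge without a cleaned predicate has to read the whole table.
        let merge = DeltaOperation::Merge {
            predicate: None,
            merge_predicate: None,
            matched_predicates: vec![],
            not_matched_predicates: vec![],
            not_matched_by_source_predicates: vec![],
        };
        assert!(merge.read_whole_table());

        let optimize = DeltaOperation::Optimize {
            predicate: None,
            target_size: 268435456,
        };
        assert!(!optimize.changes_data());
    }
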
    mod arrow_tests {
        use arrow::array::{self, ArrayRef, StructArray};
        use arrow::compute::kernels::cast_utils::Parser;
        use arrow::compute::sort_to_indices;
        use arrow::datatypes::{DataType, Date32Type, Field, Fields, TimestampMicrosecondType};
        use arrow::record_batch::RecordBatch;
        use std::sync::Arc;

        fn sort_batch_by(batch: &RecordBatch, column: &str) -> arrow::error::Result<RecordBatch> {
            let sort_column = batch.column(batch.schema().column_with_name(column).unwrap().0);
            let sort_indices = sort_to_indices(sort_column, None, None)?;
            let schema = batch.schema();
            let sorted_columns: Vec<(&String, ArrayRef)> = schema
                .fields()
                .iter()
                .zip(batch.columns().iter())
                .map(|(field, column)| {
                    Ok((
                        field.name(),
                        arrow::compute::take(column, &sort_indices, None)?,
                    ))
                })
                .collect::<arrow::error::Result<_>>()?;
            RecordBatch::try_from_iter(sorted_columns)
        }

        #[tokio::test]
        async fn test_with_partitions() {
            let path = "../test/tests/data/delta-0.8.0-null-partition";
            let table = crate::open_table(path).await.unwrap();
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();

            let mut expected_columns: Vec<(&str, ArrayRef)> = vec![
                ("path", Arc::new(array::StringArray::from(vec![
                    "k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet",
                    "k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet"
                ]))),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![460, 460]))),
                ("modification_time", Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                    1627990384000, 1627990384000
                ]))),
                ("data_change", Arc::new(array::BooleanArray::from(vec![true, true]))),
                ("partition.k", Arc::new(array::StringArray::from(vec![Some("A"), None]))),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);

            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();

            expected_columns[4] = (
                "partition_values",
                Arc::new(array::StructArray::new(
                    Fields::from(vec![Field::new("k", DataType::Utf8, true)]),
                    vec![Arc::new(array::StringArray::from(vec![Some("A"), None])) as ArrayRef],
                    None,
                )),
            );
            let expected = RecordBatch::try_from_iter(expected_columns).unwrap();

            assert_eq!(expected, actions);
        }

        #[tokio::test]
        #[ignore = "enable when deletion vector is supported"]
        async fn test_with_deletion_vector() {
            let path = "../test/tests/data/table_with_deletion_logs";
            let table = crate::open_table(path).await.unwrap();
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();
            let actions = actions
                .project(&[
                    actions.schema().index_of("path").unwrap(),
                    actions.schema().index_of("size_bytes").unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.storageType")
                        .unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.pathOrInlineDiv")
                        .unwrap(),
                    actions.schema().index_of("deletionVector.offset").unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.sizeInBytes")
                        .unwrap(),
                    actions
                        .schema()
                        .index_of("deletionVector.cardinality")
                        .unwrap(),
                ])
                .unwrap();
            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
                    ])),
                ),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
                (
                    "deletionVector.storageType",
                    Arc::new(array::StringArray::from(vec!["u"])),
                ),
                (
                    "deletionVector.pathOrInlineDiv",
                    Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"])),
                ),
                (
                    "deletionVector.offset",
                    Arc::new(array::Int32Array::from(vec![1])),
                ),
                (
                    "deletionVector.sizeInBytes",
                    Arc::new(array::Int32Array::from(vec![36])),
                ),
                (
                    "deletionVector.cardinality",
                    Arc::new(array::Int64Array::from(vec![2])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);

            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();
            let actions = actions
                .project(&[
                    actions.schema().index_of("path").unwrap(),
                    actions.schema().index_of("size_bytes").unwrap(),
                    actions.schema().index_of("deletionVector").unwrap(),
                ])
                .unwrap();
            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-cb251d5e-b665-437a-a9a7-fbfc5137c77d.c000.snappy.parquet",
                    ])),
                ),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![10499]))),
                (
                    "deletionVector",
                    Arc::new(array::StructArray::new(
                        Fields::from(vec![
                            Field::new("storageType", DataType::Utf8, false),
                            Field::new("pathOrInlineDiv", DataType::Utf8, false),
                            Field::new("offset", DataType::Int32, true),
                            Field::new("sizeInBytes", DataType::Int32, false),
                            Field::new("cardinality", DataType::Int64, false),
                        ]),
                        vec![
                            Arc::new(array::StringArray::from(vec!["u"])) as ArrayRef,
                            Arc::new(array::StringArray::from(vec!["Q6Kt3y1b)0MgZSWwPunr"]))
                                as ArrayRef,
                            Arc::new(array::Int32Array::from(vec![1])) as ArrayRef,
                            Arc::new(array::Int32Array::from(vec![36])) as ArrayRef,
                            Arc::new(array::Int64Array::from(vec![2])) as ArrayRef,
                        ],
                        None,
                    )),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns).unwrap();

            assert_eq!(expected, actions);
        }

        #[tokio::test]
        async fn test_without_partitions() {
            let path = "../test/tests/data/simple_table";
            let table = crate::open_table(path).await.unwrap();

            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();

            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-2befed33-c358-4768-a43c-3eda0d2a499d-c000.snappy.parquet",
                        "part-00000-c1777d7d-89d9-4790-b38a-6ee7e24456b1-c000.snappy.parquet",
                        "part-00001-7891c33d-cedc-47c3-88a6-abcfb049d3b4-c000.snappy.parquet",
                        "part-00004-315835fe-fb44-4562-98f6-5e6cfa3ae45d-c000.snappy.parquet",
                        "part-00007-3a0e4727-de0d-41b6-81ef-5223cf40f025-c000.snappy.parquet",
                    ])),
                ),
                (
                    "size_bytes",
                    Arc::new(array::Int64Array::from(vec![262, 262, 429, 429, 429])),
                ),
                (
                    "modification_time",
                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                        1587968626000,
                        1587968602000,
                        1587968602000,
                        1587968602000,
                        1587968602000,
                    ])),
                ),
                (
                    "data_change",
                    Arc::new(array::BooleanArray::from(vec![
                        true, true, true, true, true,
                    ])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);

            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();

            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);
        }

        #[tokio::test]
        #[ignore = "column mapping not yet supported."]
        async fn test_with_column_mapping() {
            let path = "../test/tests/data/table_with_column_mapping";
            let table = crate::open_table(path).await.unwrap();
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "BH/part-00000-4d6e745c-8e04-48d9-aa60-438228358f1a.c000.zstd.parquet",
                        "8v/part-00001-69b4a452-aeac-4ffa-bf5c-a0c2833d05eb.c000.zstd.parquet",
                    ])),
                ),
                (
                    "size_bytes",
                    Arc::new(array::Int64Array::from(vec![890, 810])),
                ),
                (
                    "modification_time",
                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                        1699946088000,
                        1699946088000,
                    ])),
                ),
                (
                    "data_change",
                    Arc::new(array::BooleanArray::from(vec![true, true])),
                ),
                (
                    "partition.Company Very Short",
                    Arc::new(array::StringArray::from(vec!["BMS", "BME"])),
                ),
                ("num_records", Arc::new(array::Int64Array::from(vec![4, 1]))),
                (
                    "null_count.Company Very Short",
                    Arc::new(array::NullArray::new(2)),
                ),
                ("min.Company Very Short", Arc::new(array::NullArray::new(2))),
                ("max.Company Very Short", Arc::new(array::NullArray::new(2))),
                ("null_count.Super Name", Arc::new(array::NullArray::new(2))),
                ("min.Super Name", Arc::new(array::NullArray::new(2))),
                ("max.Super Name", Arc::new(array::NullArray::new(2))),
                (
                    "tags.INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec![
                        "1699946088000000",
                        "1699946088000001",
                    ])),
                ),
                (
                    "tags.MAX_INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec![
                        "1699946088000000",
                        "1699946088000001",
                    ])),
                ),
                (
                    "tags.MIN_INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec![
                        "1699946088000000",
                        "1699946088000001",
                    ])),
                ),
                (
                    "tags.OPTIMIZE_TARGET_SIZE",
                    Arc::new(array::StringArray::from(vec!["33554432", "33554432"])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);
        }

        #[tokio::test]
        async fn test_with_stats() {
            let path = "../test/tests/data/delta-0.8.0";
            let table = crate::open_table(path).await.unwrap();
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();

            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-04ec9591-0b73-459e-8d18-ba5711d6cbe1-c000.snappy.parquet",
                        "part-00000-c9b90f86-73e6-46c8-93ba-ff6bfaf892a1-c000.snappy.parquet",
                    ])),
                ),
                (
                    "size_bytes",
                    Arc::new(array::Int64Array::from(vec![440, 440])),
                ),
                (
                    "modification_time",
                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                        1615043776000,
                        1615043767000,
                    ])),
                ),
                (
                    "data_change",
                    Arc::new(array::BooleanArray::from(vec![true, true])),
                ),
                ("num_records", Arc::new(array::Int64Array::from(vec![2, 2]))),
                (
                    "null_count.value",
                    Arc::new(array::Int64Array::from(vec![0, 0])),
                ),
                ("min.value", Arc::new(array::Int32Array::from(vec![2, 0]))),
                ("max.value", Arc::new(array::Int32Array::from(vec![4, 2]))),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(expected, actions);
        }

        #[tokio::test]
        async fn test_table_not_always_with_stats() {
            let path = "../test/tests/data/delta-stats-optional";
            let mut table = crate::open_table(path).await.unwrap();
            table.load().await.unwrap();
            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();
            let actions = sort_batch_by(&actions, "path").unwrap();
            let expected_path: ArrayRef = Arc::new(array::StringArray::from(vec![
                "part-00000-28925d3a-bdf2-411e-bca9-b067444cbcb0-c000.snappy.parquet",
                "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
            ]));
            let expected_num_records: ArrayRef =
                Arc::new(array::Int64Array::from(vec![None, Some(1)]));
            let expected_null_count: ArrayRef =
                Arc::new(array::Int64Array::from(vec![None, Some(0)]));

            let path_column = actions.column(0);
            let num_records_column = actions.column(4);
            let null_count_column = actions.column(5);

            assert_eq!(&expected_path, path_column);
            assert_eq!(&expected_num_records, num_records_column);
            assert_eq!(&expected_null_count, null_count_column);
        }

        #[tokio::test]
        async fn test_table_checkpoint_not_always_with_stats() {
            let path = "../test/tests/data/delta-checkpoint-stats-optional";
            let mut table = crate::open_table(path).await.unwrap();
            table.load().await.unwrap();

            assert_eq!(2, table.snapshot().unwrap().file_actions().unwrap().len());
        }

        #[tokio::test]
        async fn test_only_struct_stats() {
            let path = "../test/tests/data/delta-1.2.1-only-struct-stats";
            let mut table = crate::open_table(path).await.unwrap();
            table.load_version(1).await.unwrap();

            let actions = table.snapshot().unwrap().add_actions_table(true).unwrap();

            let expected_columns: Vec<(&str, ArrayRef)> = vec![
                (
                    "path",
                    Arc::new(array::StringArray::from(vec![
                        "part-00000-7a509247-4f58-4453-9202-51d75dee59af-c000.snappy.parquet",
                    ])),
                ),
                ("size_bytes", Arc::new(array::Int64Array::from(vec![5489]))),
                (
                    "modification_time",
                    Arc::new(arrow::array::TimestampMillisecondArray::from(vec![
                        1666652373000,
                    ])),
                ),
                (
                    "data_change",
                    Arc::new(array::BooleanArray::from(vec![true])),
                ),
                ("num_records", Arc::new(array::Int64Array::from(vec![1]))),
                (
                    "null_count.integer",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                ("min.integer", Arc::new(array::Int32Array::from(vec![0]))),
                ("max.integer", Arc::new(array::Int32Array::from(vec![0]))),
                (
                    "null_count.null",
                    Arc::new(array::Int64Array::from(vec![1])),
                ),
                ("min.null", Arc::new(array::NullArray::new(1))),
                ("max.null", Arc::new(array::NullArray::new(1))),
                (
                    "null_count.boolean",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                ("min.boolean", Arc::new(array::NullArray::new(1))),
                ("max.boolean", Arc::new(array::NullArray::new(1))),
                (
                    "null_count.double",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.double",
                    Arc::new(array::Float64Array::from(vec![1.234])),
                ),
                (
                    "max.double",
                    Arc::new(array::Float64Array::from(vec![1.234])),
                ),
                (
                    "null_count.decimal",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.decimal",
                    Arc::new(
                        array::Decimal128Array::from_iter_values([-567800])
                            .with_precision_and_scale(8, 5)
                            .unwrap(),
                    ),
                ),
                (
                    "max.decimal",
                    Arc::new(
                        array::Decimal128Array::from_iter_values([-567800])
                            .with_precision_and_scale(8, 5)
                            .unwrap(),
                    ),
                ),
                (
                    "null_count.string",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.string",
                    Arc::new(array::StringArray::from(vec!["string"])),
                ),
                (
                    "max.string",
                    Arc::new(array::StringArray::from(vec!["string"])),
                ),
                (
                    "null_count.binary",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                ("min.binary", Arc::new(array::NullArray::new(1))),
                ("max.binary", Arc::new(array::NullArray::new(1))),
                (
                    "null_count.date",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.date",
                    Arc::new(array::Date32Array::from(vec![Date32Type::parse(
                        "2022-10-24",
                    )])),
                ),
                (
                    "max.date",
                    Arc::new(array::Date32Array::from(vec![Date32Type::parse(
                        "2022-10-24",
                    )])),
                ),
                (
                    "null_count.timestamp",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.timestamp",
                    Arc::new(
                        array::TimestampMicrosecondArray::from(vec![
                            TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"),
                        ])
                        .with_timezone("UTC"),
                    ),
                ),
                (
                    "max.timestamp",
                    Arc::new(
                        array::TimestampMicrosecondArray::from(vec![
                            TimestampMicrosecondType::parse("2022-10-24T22:59:32.846Z"),
                        ])
                        .with_timezone("UTC"),
                    ),
                ),
                (
                    "null_count.struct.struct_element",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.struct.struct_element",
                    Arc::new(array::StringArray::from(vec!["struct_value"])),
                ),
                (
                    "max.struct.struct_element",
                    Arc::new(array::StringArray::from(vec!["struct_value"])),
                ),
                ("null_count.map", Arc::new(array::Int64Array::from(vec![0]))),
                (
                    "null_count.array",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "null_count.nested_struct.struct_element.nested_struct_element",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "min.nested_struct.struct_element.nested_struct_element",
                    Arc::new(array::StringArray::from(vec!["nested_struct_value"])),
                ),
                (
                    "max.nested_struct.struct_element.nested_struct_element",
                    Arc::new(array::StringArray::from(vec!["nested_struct_value"])),
                ),
                (
                    "null_count.struct_of_array_of_map.struct_element",
                    Arc::new(array::Int64Array::from(vec![0])),
                ),
                (
                    "tags.INSERTION_TIME",
                    Arc::new(array::StringArray::from(vec!["1666652373000000"])),
                ),
                (
                    "tags.OPTIMIZE_TARGET_SIZE",
                    Arc::new(array::StringArray::from(vec!["268435456"])),
                ),
            ];
            let expected = RecordBatch::try_from_iter(expected_columns.clone()).unwrap();

            assert_eq!(
                expected
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.name().as_str())
                    .collect::<Vec<&str>>(),
                actions
                    .schema()
                    .fields()
                    .iter()
                    .map(|field| field.name().as_str())
                    .collect::<Vec<&str>>()
            );
            assert_eq!(expected, actions);

            let actions = table.snapshot().unwrap().add_actions_table(false).unwrap();
            assert_eq!(
                actions
                    .get_field_at_path(&[
                        "null_count",
                        "nested_struct",
                        "struct_element",
                        "nested_struct_element"
                    ])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::Int64Array>()
                    .unwrap(),
                &array::Int64Array::from(vec![0]),
            );

            assert_eq!(
                actions
                    .get_field_at_path(&[
                        "min",
                        "nested_struct",
                        "struct_element",
                        "nested_struct_element"
                    ])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::StringArray>()
                    .unwrap(),
                &array::StringArray::from(vec!["nested_struct_value"]),
            );

            assert_eq!(
                actions
                    .get_field_at_path(&[
                        "max",
                        "nested_struct",
                        "struct_element",
                        "nested_struct_element"
                    ])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::StringArray>()
                    .unwrap(),
                &array::StringArray::from(vec!["nested_struct_value"]),
            );

            assert_eq!(
                actions
                    .get_field_at_path(&["null_count", "struct_of_array_of_map", "struct_element"])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::Int64Array>()
                    .unwrap(),
                &array::Int64Array::from(vec![0])
            );

            assert_eq!(
                actions
                    .get_field_at_path(&["tags", "OPTIMIZE_TARGET_SIZE"])
                    .unwrap()
                    .as_any()
                    .downcast_ref::<array::StringArray>()
                    .unwrap(),
                &array::StringArray::from(vec!["268435456"])
            );
        }

        /// Helper trait for looking up nested columns in a `RecordBatch` by path.
        trait NestedTabular {
            fn get_field_at_path(&self, path: &[&str]) -> Option<ArrayRef>;
        }

        impl NestedTabular for RecordBatch {
            fn get_field_at_path(&self, path: &[&str]) -> Option<ArrayRef> {
                // Get the top-level column from the batch, then walk any
                // remaining path segments through nested struct arrays.
                let (first_key, remainder) = path.split_at(1);
                let mut col = self.column(self.schema().column_with_name(first_key[0])?.0);

                if remainder.is_empty() {
                    return Some(Arc::clone(col));
                }

                for segment in remainder {
                    col = col
                        .as_any()
                        .downcast_ref::<StructArray>()?
                        .column_by_name(segment)?;
                }

                Some(Arc::clone(col))
            }
        }
    }
}