deltalake_core/protocol/
checkpoints.rs

1//! Implementation for writing delta checkpoints.
2
3use std::collections::HashMap;
4use std::iter::Iterator;
5use std::sync::LazyLock;
6
7use arrow_json::ReaderBuilder;
8use arrow_schema::ArrowError;
9
10use chrono::{Datelike, NaiveDate, NaiveDateTime, Utc};
11use futures::{StreamExt, TryStreamExt};
12use itertools::Itertools;
13use object_store::{Error, ObjectStore};
14use parquet::arrow::ArrowWriter;
15use parquet::basic::Compression;
16use parquet::basic::Encoding;
17use parquet::errors::ParquetError;
18use parquet::file::properties::WriterProperties;
19use regex::Regex;
20use serde_json::Value;
21use tracing::{debug, error};
22use uuid::Uuid;
23
24use super::{time_utils, ProtocolError};
25use crate::kernel::arrow::delta_log_schema_for_table;
26use crate::kernel::{
27    Action, Add as AddAction, DataType, PrimitiveType, Protocol, Remove, StructField,
28};
29use crate::logstore::LogStore;
30use crate::table::state::DeltaTableState;
31use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
32use crate::{open_table_with_version, DeltaTable};
33
34type SchemaPath = Vec<String>;
35
/// Error returned when there is an error during creating a checkpoint.
#[derive(thiserror::Error, Debug)]
enum CheckpointError {
    /// Error returned when a string formatted partition value cannot be parsed to its appropriate
    /// data type.
    #[error("Partition value {0} cannot be parsed from string.")]
    PartitionValueNotParseable(String),

    /// Caller attempt to create a checkpoint for a version which does not exist on the table state
    #[error("Attempted to create a checkpoint for a version {0} that does not match the table state {1}")]
    StaleTableVersion(i64, i64),

    /// Error returned when the parquet writer fails while writing the checkpoint.
    #[error("Failed to write parquet: {}", .source)]
    Parquet {
        /// Parquet error details returned when writing the checkpoint failed.
        #[from]
        source: ParquetError,
    },

    /// Error returned when converting the schema to Arrow format failed.
    #[error("Failed to convert into Arrow schema: {}", .source)]
    Arrow {
        /// Arrow error details returned when converting the schema in Arrow format failed
        #[from]
        source: ArrowError,
    },

    /// Error returned when a required action type (e.g. `txn`) could not be read
    /// from the snapshot while assembling the checkpoint.
    #[error("missing required action type in snapshot: {0}")]
    MissingActionType(String),
}
67
68impl From<CheckpointError> for ProtocolError {
69    fn from(value: CheckpointError) -> Self {
70        match value {
71            CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()),
72            CheckpointError::Arrow { source } => Self::Arrow { source },
73            CheckpointError::StaleTableVersion(..) => Self::Generic(value.to_string()),
74            CheckpointError::Parquet { source } => Self::ParquetParseError { source },
75            CheckpointError::MissingActionType(_) => Self::Generic(value.to_string()),
76        }
77    }
78}
79
use core::str::Utf8Error;
/// Allow `?` on UTF-8 decoding failures by folding them into a generic
/// [`ProtocolError`] with the error's display text.
impl From<Utf8Error> for ProtocolError {
    fn from(value: Utf8Error) -> Self {
        Self::Generic(value.to_string())
    }
}
86
/// Number of actions serialized per Arrow record batch when writing the
/// checkpoint parquet file.
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;
89
90/// Creates checkpoint at current table version
91pub async fn create_checkpoint(
92    table: &DeltaTable,
93    operation_id: Option<Uuid>,
94) -> Result<(), ProtocolError> {
95    create_checkpoint_for(
96        table.version(),
97        table.snapshot().map_err(|_| ProtocolError::NoMetaData)?,
98        table.log_store.as_ref(),
99        operation_id,
100    )
101    .await?;
102    Ok(())
103}
104
105/// Delete expires log files before given version from table. The table log retention is based on
106/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
107pub async fn cleanup_metadata(
108    table: &DeltaTable,
109    operation_id: Option<Uuid>,
110) -> Result<usize, ProtocolError> {
111    let log_retention_timestamp = Utc::now().timestamp_millis()
112        - table
113            .snapshot()
114            .map_err(|_| ProtocolError::NoMetaData)?
115            .table_config()
116            .log_retention_duration()
117            .as_millis() as i64;
118    cleanup_expired_logs_for(
119        table.version(),
120        table.log_store.as_ref(),
121        log_retention_timestamp,
122        operation_id,
123    )
124    .await
125}
126
127/// Loads table from given `table_uri` at given `version` and creates checkpoint for it.
128/// The `cleanup` param decides whether to run metadata cleanup of obsolete logs.
129/// If it's empty then the table's `enableExpiredLogCleanup` is used.
130pub async fn create_checkpoint_from_table_uri_and_cleanup(
131    table_uri: &str,
132    version: i64,
133    cleanup: Option<bool>,
134    operation_id: Option<Uuid>,
135) -> Result<(), ProtocolError> {
136    let table = open_table_with_version(table_uri, version)
137        .await
138        .map_err(|err| ProtocolError::Generic(err.to_string()))?;
139    let snapshot = table.snapshot().map_err(|_| ProtocolError::NoMetaData)?;
140    create_checkpoint_for(version, snapshot, table.log_store.as_ref(), None).await?;
141
142    let enable_expired_log_cleanup =
143        cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());
144
145    if table.version() >= 0 && enable_expired_log_cleanup {
146        let deleted_log_num = cleanup_metadata(&table, operation_id).await?;
147        debug!("Deleted {deleted_log_num:?} log files.");
148    }
149
150    Ok(())
151}
152
153/// Creates checkpoint for a given table version, table state and object store
154pub async fn create_checkpoint_for(
155    version: i64,
156    state: &DeltaTableState,
157    log_store: &dyn LogStore,
158    operation_id: Option<Uuid>,
159) -> Result<(), ProtocolError> {
160    if !state.load_config().require_files {
161        return Err(ProtocolError::Generic(
162            "Table has not yet been initialized with files, therefore creating a checkpoint is not possible.".to_string()
163        ));
164    }
165
166    if version != state.version() {
167        error!(
168            "create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded",
169            state.version()
170        );
171        return Err(CheckpointError::StaleTableVersion(version, state.version()).into());
172    }
173
174    // TODO: checkpoints _can_ be multi-part... haven't actually found a good reference for
175    // an appropriate split point yet though so only writing a single part currently.
176    // See https://github.com/delta-io/delta-rs/issues/288
177    let last_checkpoint_path = log_store.log_path().child("_last_checkpoint");
178
179    debug!("Writing parquet bytes to checkpoint buffer.");
180    let tombstones = state
181        .unexpired_tombstones(log_store.object_store(None).clone())
182        .await
183        .map_err(|_| ProtocolError::Generic("filed to get tombstones".into()))?
184        .collect::<Vec<_>>();
185    let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state, tombstones)?;
186
187    let file_name = format!("{version:020}.checkpoint.parquet");
188    let checkpoint_path = log_store.log_path().child(file_name);
189
190    let object_store = log_store.object_store(operation_id);
191    debug!("Writing checkpoint to {checkpoint_path:?}.");
192    object_store
193        .put(&checkpoint_path, parquet_bytes.into())
194        .await?;
195
196    let last_checkpoint_content: Value = serde_json::to_value(checkpoint)?;
197    let last_checkpoint_content = bytes::Bytes::from(serde_json::to_vec(&last_checkpoint_content)?);
198
199    debug!("Writing _last_checkpoint to {last_checkpoint_path:?}.");
200    object_store
201        .put(&last_checkpoint_path, last_checkpoint_content.into())
202        .await?;
203
204    Ok(())
205}
206
/// Deletes all delta log commits that are older than the cutoff time
/// and less than the specified version.
///
/// Returns the number of deleted log files. If no `_last_checkpoint` file
/// exists yet, nothing is deleted and `Ok(0)` is returned.
pub async fn cleanup_expired_logs_for(
    until_version: i64,
    log_store: &dyn LogStore,
    cutoff_timestamp: i64,
    operation_id: Option<Uuid>,
) -> Result<usize, ProtocolError> {
    // Matches commit json, checkpoint parquet, and temporary json files inside
    // `_delta_log/`; capture group 1 is the zero-padded 20-digit version number.
    static DELTA_LOG_REGEX: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap()
    });

    let object_store = log_store.object_store(None);
    let maybe_last_checkpoint = object_store
        .get(&log_store.log_path().child("_last_checkpoint"))
        .await;

    // Without a checkpoint there is nothing safe to clean up: the commit files
    // are still the only source of table state.
    if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
        return Ok(0);
    }

    let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
    let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint)?;
    // Never delete logs at or beyond the last checkpoint version — readers need
    // them to reconstruct state from the checkpoint forward.
    let until_version = i64::min(until_version, last_checkpoint.version);

    // Feed a stream of candidate deletion files directly into the delete_stream
    // function to try to improve the speed of cleanup and reduce the need for
    // intermediate memory.
    let object_store = log_store.object_store(operation_id);
    let deleted = object_store
        .delete_stream(
            object_store
                .list(Some(log_store.log_path()))
                // This predicate function will filter out any locations that don't
                // match the given timestamp range
                .filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
                    if meta.is_err() {
                        // Listing errors are logged and skipped rather than
                        // aborting the whole cleanup.
                        error!("Error received while cleaning up expired logs: {meta:?}");
                        return None;
                    }
                    let meta = meta.unwrap();
                    let ts = meta.last_modified.timestamp_millis();

                    match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
                        Some(captures) => {
                            let log_ver_str = captures.get(1).unwrap().as_str();
                            let log_ver: i64 = log_ver_str.parse().unwrap();
                            // Delete only files strictly older than both the
                            // version bound and the timestamp cutoff.
                            if log_ver < until_version && ts <= cutoff_timestamp {
                                // This location is ready to be deleted
                                Some(Ok(meta.location))
                            } else {
                                None
                            }
                        }
                        None => None,
                    }
                })
                .boxed(),
        )
        .try_collect::<Vec<_>>()
        .await?;

    debug!("Deleted {} expired logs", deleted.len());
    Ok(deleted.len())
}
272
/// Serialize the given table state (plus the provided unexpired tombstones)
/// into checkpoint parquet bytes.
///
/// Actions are emitted in order: protocol, metaData, txn, remove, add. The
/// returned [`CheckPoint`] records the state version, total action count, and
/// the byte size of the produced parquet buffer.
fn parquet_bytes_from_state(
    state: &DeltaTableState,
    mut tombstones: Vec<Remove>,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
    let current_metadata = state.metadata();
    let schema = current_metadata.schema()?;

    let partition_col_data_types = get_partition_col_data_types(&schema, current_metadata);

    // Collect a map of paths that require special stats conversion.
    let mut stats_conversions: Vec<(SchemaPath, DataType)> = Vec::new();
    let fields = schema.fields().collect_vec();
    collect_stats_conversions(&mut stats_conversions, fields.as_slice());

    // if any, tombstones do not include extended file metadata, we must omit the extended metadata fields from the remove schema
    // See https://github.com/delta-io/delta/blob/master/PROTOCOL.md#add-file-and-remove-file
    //
    // DBR version 8.x and greater have different behaviors of reading the parquet file depending
    // on the `extended_file_metadata` flag, hence this is safer to set `extended_file_metadata=false`
    // and omit metadata columns if at least one remove action has `extended_file_metadata=false`.
    // We've added the additional check on `size.is_some` because in delta-spark the primitive long type
    // is used, hence we want to omit possible errors when `extended_file_metadata=true`, but `size=null`
    let use_extended_remove_schema = tombstones
        .iter()
        .all(|r| r.extended_file_metadata == Some(true) && r.size.is_some());

    // If use_extended_remove_schema=false for some of the tombstones, then it should be for each.
    if !use_extended_remove_schema {
        for remove in tombstones.iter_mut() {
            remove.extended_file_metadata = Some(false);
        }
    }
    let files = state
        .file_actions_iter()
        .map_err(|e| ProtocolError::Generic(e.to_string()))?;
    // protocol
    let jsons = std::iter::once(Action::Protocol(Protocol {
        min_reader_version: state.protocol().min_reader_version,
        min_writer_version: state.protocol().min_writer_version,
        // Feature lists are only emitted at the protocol versions that define
        // them (writer >= 7, reader >= 3).
        writer_features: if state.protocol().min_writer_version >= 7 {
            Some(state.protocol().writer_features.clone().unwrap_or_default())
        } else {
            None
        },
        reader_features: if state.protocol().min_reader_version >= 3 {
            Some(state.protocol().reader_features.clone().unwrap_or_default())
        } else {
            None
        },
    }))
    // metaData
    .chain(std::iter::once(Action::Metadata(current_metadata.clone())))
    // txns
    .chain(
        state
            .app_transaction_version()
            .map_err(|_| CheckpointError::MissingActionType("txn".to_string()))?
            .map(Action::Txn),
    )
    // removes
    .chain(tombstones.iter().map(|r| {
        let mut r = (*r).clone();

        // As a "new writer", we should always set `extendedFileMetadata` when writing, and include/ignore the other three fields accordingly.
        // https://github.com/delta-io/delta/blob/fb0452c2fb142310211c6d3604eefb767bb4a134/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L311-L314
        if r.extended_file_metadata.is_none() {
            r.extended_file_metadata = Some(false);
        }

        Action::Remove(r)
    }))
    .map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
    // adds
    .chain(files.map(|f| {
        checkpoint_add_from_state(
            &f,
            partition_col_data_types.as_slice(),
            &stats_conversions,
            state.table_config().write_stats_as_json(),
            state.table_config().write_stats_as_struct(),
        )
    }));

    // Create the arrow schema that represents the Checkpoint parquet file.
    let arrow_schema = delta_log_schema_for_table(
        (&schema).try_into()?,
        current_metadata.partition_columns.as_slice(),
        use_extended_remove_schema,
        state.table_config().write_stats_as_json(),
        state.table_config().write_stats_as_struct(),
    );

    debug!("Writing to checkpoint parquet buffer...");

    // When RLE is disabled via table config, force plain encoding with
    // dictionaries off; otherwise use the writer defaults. Snappy either way.
    let writer_properties = if state.table_config().use_checkpoint_rle() {
        WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build()
    } else {
        WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .build()
    };

    // Write the Checkpoint parquet file.
    let mut bytes = vec![];
    let mut writer =
        ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), Some(writer_properties))?;
    // JSON actions are decoded into Arrow record batches against the same schema.
    let mut decoder = ReaderBuilder::new(arrow_schema)
        .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
        .build_decoder()?;

    // Count of actions
    let mut total_actions = 0;

    let span = tracing::debug_span!("serialize_checkpoint").entered();
    // Serialize actions in fixed-size chunks so memory stays bounded regardless
    // of table size.
    for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) {
        let mut buf = Vec::new();
        for j in chunk {
            serde_json::to_writer(&mut buf, &j?)?;
            total_actions += 1;
        }
        let _ = decoder.decode(&buf)?;
        while let Some(batch) = decoder.flush()? {
            writer.write(&batch)?;
        }
    }
    drop(span);

    let _ = writer.close()?;
    debug!(total_actions, "Finished writing checkpoint parquet buffer.");

    let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
        .with_size_in_bytes(bytes.len() as i64)
        .build();
    Ok((checkpoint, bytes::Bytes::from(bytes)))
}
412
/// Convert an [`AddAction`] from table state into the JSON value written to the
/// checkpoint.
///
/// Always forces `dataChange` to `false`. When `write_stats_as_struct` is set,
/// attaches `partitionValues_parsed` (typed partition values) and
/// `stats_parsed` (with timestamp stats converted to integers). When
/// `write_stats_as_json` is unset, the JSON `stats` field is removed.
fn checkpoint_add_from_state(
    add: &AddAction,
    partition_col_data_types: &[(&String, &DataType)],
    stats_conversions: &[(SchemaPath, DataType)],
    write_stats_as_json: bool,
    write_stats_as_struct: bool,
) -> Result<Value, ProtocolError> {
    let mut v = serde_json::to_value(Action::Add(add.clone()))
        .map_err(|err| ArrowError::JsonError(err.to_string()))?;

    // Checkpoint entries never represent a data change themselves.
    v["add"]["dataChange"] = Value::Bool(false);

    // Only created partitionValues_parsed when delta.checkpoint.writeStatsAsStruct is enabled
    if !add.partition_values.is_empty() && write_stats_as_struct {
        let mut partition_values_parsed: HashMap<String, Value> = HashMap::new();

        for (field_name, data_type) in partition_col_data_types.iter() {
            if let Some(string_value) = add.partition_values.get(*field_name) {
                let v = typed_partition_value_from_option_string(string_value, data_type)?;

                partition_values_parsed.insert(field_name.to_string(), v);
            }
        }

        let partition_values_parsed = serde_json::to_value(partition_values_parsed)
            .map_err(|err| ArrowError::JsonError(err.to_string()))?;
        v["add"]["partitionValues_parsed"] = partition_values_parsed;
    }

    // Only created stats_parsed when delta.checkpoint.writeStatsAsStruct is enabled
    if write_stats_as_struct {
        if let Ok(Some(stats)) = add.get_stats() {
            let mut stats = serde_json::to_value(stats)
                .map_err(|err| ArrowError::JsonError(err.to_string()))?;
            // Rewrite timestamp min/max stats from strings to integer values at
            // the paths collected by collect_stats_conversions.
            let min_values = stats.get_mut("minValues").and_then(|v| v.as_object_mut());

            if let Some(min_values) = min_values {
                for (path, data_type) in stats_conversions {
                    apply_stats_conversion(min_values, path.as_slice(), data_type)
                }
            }

            let max_values = stats.get_mut("maxValues").and_then(|v| v.as_object_mut());
            if let Some(max_values) = max_values {
                for (path, data_type) in stats_conversions {
                    apply_stats_conversion(max_values, path.as_slice(), data_type)
                }
            }

            v["add"]["stats_parsed"] = stats;
        }
    }

    // Don't write stats when delta.checkpoint.writeStatsAsJson is disabled
    if !write_stats_as_json {
        v.get_mut("add")
            .and_then(|v| v.as_object_mut())
            .and_then(|v| v.remove("stats"));
    }
    Ok(v)
}
474
475fn typed_partition_value_from_string(
476    string_value: &str,
477    data_type: &DataType,
478) -> Result<Value, ProtocolError> {
479    match data_type {
480        DataType::Primitive(primitive_type) => match primitive_type {
481            PrimitiveType::String | PrimitiveType::Binary => Ok(string_value.to_owned().into()),
482            PrimitiveType::Long
483            | PrimitiveType::Integer
484            | PrimitiveType::Short
485            | PrimitiveType::Byte => Ok(string_value
486                .parse::<i64>()
487                .map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
488                .into()),
489            PrimitiveType::Boolean => Ok(string_value
490                .parse::<bool>()
491                .map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
492                .into()),
493            PrimitiveType::Float | PrimitiveType::Double => Ok(string_value
494                .parse::<f64>()
495                .map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
496                .into()),
497            PrimitiveType::Date => {
498                let d = NaiveDate::parse_from_str(string_value, "%Y-%m-%d").map_err(|_| {
499                    CheckpointError::PartitionValueNotParseable(string_value.to_owned())
500                })?;
501                // day 0 is 1970-01-01 (719163 days from ce)
502                Ok((d.num_days_from_ce() - 719_163).into())
503            }
504            PrimitiveType::Timestamp | PrimitiveType::TimestampNtz => {
505                let ts = NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S.%6f");
506                let ts: NaiveDateTime = match ts {
507                    Ok(_) => ts,
508                    Err(_) => NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S"),
509                }
510                .map_err(|_| {
511                    CheckpointError::PartitionValueNotParseable(string_value.to_owned())
512                })?;
513                Ok((ts.and_utc().timestamp_millis() * 1000).into())
514            }
515            s => unimplemented!("Primitive type {s} is not supported for partition column values."),
516        },
517        d => unimplemented!("Data type {d:?} is not supported for partition column values."),
518    }
519}
520
521fn typed_partition_value_from_option_string(
522    string_value: &Option<String>,
523    data_type: &DataType,
524) -> Result<Value, ProtocolError> {
525    match string_value {
526        Some(s) => {
527            if s.is_empty() {
528                Ok(Value::Null) // empty string should be deserialized as null
529            } else {
530                typed_partition_value_from_string(s, data_type)
531            }
532        }
533        None => Ok(Value::Null),
534    }
535}
536
537fn collect_stats_conversions(paths: &mut Vec<(SchemaPath, DataType)>, fields: &[&StructField]) {
538    let mut _path = SchemaPath::new();
539    fields
540        .iter()
541        .for_each(|f| collect_field_conversion(&mut _path, paths, f));
542}
543
544fn collect_field_conversion(
545    current_path: &mut SchemaPath,
546    all_paths: &mut Vec<(SchemaPath, DataType)>,
547    field: &StructField,
548) {
549    match field.data_type() {
550        DataType::Primitive(PrimitiveType::Timestamp) => {
551            let mut key_path = current_path.clone();
552            key_path.push(field.name().to_owned());
553            all_paths.push((key_path, field.data_type().to_owned()));
554        }
555        DataType::Struct(struct_field) => {
556            let struct_fields = struct_field.fields();
557            current_path.push(field.name().to_owned());
558            struct_fields.for_each(|f| collect_field_conversion(current_path, all_paths, f));
559            current_path.pop();
560        }
561        _ => { /* noop */ }
562    }
563}
564
565fn apply_stats_conversion(
566    context: &mut serde_json::Map<String, Value>,
567    path: &[String],
568    data_type: &DataType,
569) {
570    if path.len() == 1 {
571        if let DataType::Primitive(PrimitiveType::Timestamp) = data_type {
572            let v = context.get_mut(&path[0]);
573
574            if let Some(v) = v {
575                let ts = v
576                    .as_str()
577                    .and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
578                    .map(|n| Value::Number(serde_json::Number::from(n)));
579
580                if let Some(ts) = ts {
581                    *v = ts;
582                }
583            }
584        }
585    } else {
586        let next_context = context.get_mut(&path[0]).and_then(|v| v.as_object_mut());
587        if let Some(next_context) = next_context {
588            apply_stats_conversion(next_context, &path[1..], data_type);
589        }
590    }
591}
592
593#[cfg(test)]
594mod tests {
595    use std::sync::Arc;
596
597    use arrow_array::builder::{Int32Builder, ListBuilder, StructBuilder};
598    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
599    use arrow_schema::Schema as ArrowSchema;
600    use chrono::Duration;
601    use object_store::path::Path;
602    use serde_json::json;
603
604    use super::*;
605    use crate::kernel::transaction::{CommitBuilder, TableReference};
606    use crate::kernel::StructType;
607    use crate::operations::DeltaOps;
608    use crate::protocol::Metadata;
609    use crate::writer::test_utils::get_delta_schema;
610    use crate::DeltaResult;
611
612    #[tokio::test]
613    async fn test_create_checkpoint_for() {
614        let table_schema = get_delta_schema();
615
616        let table = DeltaOps::new_in_memory()
617            .create()
618            .with_columns(table_schema.fields().cloned())
619            .with_save_mode(crate::protocol::SaveMode::Ignore)
620            .await
621            .unwrap();
622        assert_eq!(table.version(), 0);
623        assert_eq!(table.get_schema().unwrap(), &table_schema);
624        let res =
625            create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref(), None)
626                .await;
627        assert!(res.is_ok());
628
629        // Look at the "files" and verify that the _last_checkpoint has the right version
630        let path = Path::from("_delta_log/_last_checkpoint");
631        let last_checkpoint = table
632            .object_store()
633            .get(&path)
634            .await
635            .expect("Failed to get the _last_checkpoint")
636            .bytes()
637            .await
638            .expect("Failed to get bytes for _last_checkpoint");
639        let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
640        assert_eq!(last_checkpoint.version, 0);
641    }
642
643    /// This test validates that a checkpoint can be written and re-read with the minimum viable
644    /// Metadata. There was a bug which didn't handle the optionality of createdTime.
645    #[tokio::test]
646    async fn test_create_checkpoint_with_metadata() {
647        let table_schema = get_delta_schema();
648
649        let mut table = DeltaOps::new_in_memory()
650            .create()
651            .with_columns(table_schema.fields().cloned())
652            .with_save_mode(crate::protocol::SaveMode::Ignore)
653            .await
654            .unwrap();
655        assert_eq!(table.version(), 0);
656        assert_eq!(table.get_schema().unwrap(), &table_schema);
657
658        let part_cols: Vec<String> = vec![];
659        let metadata = Metadata::try_new(table_schema, part_cols, HashMap::new()).unwrap();
660        let actions = vec![Action::Metadata(metadata)];
661
662        let epoch_id = std::time::SystemTime::now()
663            .duration_since(std::time::UNIX_EPOCH)
664            .expect("Time went backwards")
665            .as_millis() as i64;
666
667        let operation = crate::protocol::DeltaOperation::StreamingUpdate {
668            output_mode: crate::protocol::OutputMode::Append,
669            query_id: "test".into(),
670            epoch_id,
671        };
672        let finalized_commit = CommitBuilder::default()
673            .with_actions(actions)
674            .build(
675                table.state.as_ref().map(|f| f as &dyn TableReference),
676                table.log_store(),
677                operation,
678            )
679            .await
680            .unwrap();
681
682        assert_eq!(
683            1,
684            finalized_commit.version(),
685            "Expected the commit to create table version 1"
686        );
687        assert_eq!(
688            0, finalized_commit.metrics.num_retries,
689            "Expected no retries"
690        );
691        assert_eq!(
692            0, finalized_commit.metrics.num_log_files_cleaned_up,
693            "Expected no log files cleaned up"
694        );
695        assert!(
696            !finalized_commit.metrics.new_checkpoint_created,
697            "Expected checkpoint created."
698        );
699        table.load().await.expect("Failed to reload table");
700        assert_eq!(
701            table.version(),
702            1,
703            "The loaded version of the table is not up to date"
704        );
705
706        let res = create_checkpoint_for(
707            table.version(),
708            table.state.as_ref().unwrap(),
709            table.log_store.as_ref(),
710            None,
711        )
712        .await;
713        assert!(res.is_ok());
714
715        // Look at the "files" and verify that the _last_checkpoint has the right version
716        let path = Path::from("_delta_log/_last_checkpoint");
717        let last_checkpoint = table
718            .object_store()
719            .get(&path)
720            .await
721            .expect("Failed to get the _last_checkpoint")
722            .bytes()
723            .await
724            .expect("Failed to get bytes for _last_checkpoint");
725        let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
726        assert_eq!(last_checkpoint.version, 1);
727
728        // If the regression exists, this will fail
729        table.load().await.expect("Failed to reload the table, this likely means that the optional createdTime was not actually optional");
730        assert_eq!(
731            1,
732            table.version(),
733            "The reloaded table doesn't have the right version"
734        );
735    }
736
737    #[tokio::test]
738    async fn test_create_checkpoint_for_invalid_version() {
739        let table_schema = get_delta_schema();
740
741        let table = DeltaOps::new_in_memory()
742            .create()
743            .with_columns(table_schema.fields().cloned())
744            .with_save_mode(crate::protocol::SaveMode::Ignore)
745            .await
746            .unwrap();
747        assert_eq!(table.version(), 0);
748        assert_eq!(table.get_schema().unwrap(), &table_schema);
749        match create_checkpoint_for(1, table.snapshot().unwrap(), table.log_store.as_ref(), None)
750            .await
751        {
752            Ok(_) => {
753                /*
754                 * If a checkpoint is allowed to be created here, it will use the passed in
755                 * version, but _last_checkpoint is generated from the table state will point to a
756                 * version 0 checkpoint.
757                 * E.g.
758                 *
759                 * Path { raw: "_delta_log/00000000000000000000.json" }
760                 * Path { raw: "_delta_log/00000000000000000001.checkpoint.parquet" }
761                 * Path { raw: "_delta_log/_last_checkpoint" }
762                 *
763                 */
764                panic!(
765                    "We should not allow creating a checkpoint for a version which doesn't exist!"
766                );
767            }
768            Err(_) => { /* We should expect an error in the "right" case */ }
769        }
770    }
771
772    #[test]
773    fn typed_partition_value_from_string_test() {
774        let string_value: Value = "Hello World!".into();
775        assert_eq!(
776            string_value,
777            typed_partition_value_from_option_string(
778                &Some("Hello World!".to_string()),
779                &DataType::Primitive(PrimitiveType::String),
780            )
781            .unwrap()
782        );
783
784        let bool_value: Value = true.into();
785        assert_eq!(
786            bool_value,
787            typed_partition_value_from_option_string(
788                &Some("true".to_string()),
789                &DataType::Primitive(PrimitiveType::Boolean),
790            )
791            .unwrap()
792        );
793
794        let number_value: Value = 42.into();
795        assert_eq!(
796            number_value,
797            typed_partition_value_from_option_string(
798                &Some("42".to_string()),
799                &DataType::Primitive(PrimitiveType::Integer),
800            )
801            .unwrap()
802        );
803
804        for (s, v) in [
805            ("2021-08-08", 18_847),
806            ("1970-01-02", 1),
807            ("1970-01-01", 0),
808            ("1969-12-31", -1),
809            ("1-01-01", -719_162),
810        ] {
811            let date_value: Value = v.into();
812            assert_eq!(
813                date_value,
814                typed_partition_value_from_option_string(
815                    &Some(s.to_string()),
816                    &DataType::Primitive(PrimitiveType::Date),
817                )
818                .unwrap()
819            );
820        }
821
822        for (s, v) in [
823            ("2021-08-08 01:00:01.000000", 1628384401000000i64),
824            ("2021-08-08 01:00:01", 1628384401000000i64),
825            ("1970-01-02 12:59:59.000000", 133199000000i64),
826            ("1970-01-02 12:59:59", 133199000000i64),
827            ("1970-01-01 13:00:01.000000", 46801000000i64),
828            ("1970-01-01 13:00:01", 46801000000i64),
829            ("1969-12-31 00:00:00", -86400000000i64),
830            ("1677-09-21 00:12:44", -9223372036000000i64),
831        ] {
832            let timestamp_value: Value = v.into();
833            assert_eq!(
834                timestamp_value,
835                typed_partition_value_from_option_string(
836                    &Some(s.to_string()),
837                    &DataType::Primitive(PrimitiveType::Timestamp),
838                )
839                .unwrap()
840            );
841        }
842
843        let binary_value: Value = "\u{2081}\u{2082}\u{2083}\u{2084}".into();
844        assert_eq!(
845            binary_value,
846            typed_partition_value_from_option_string(
847                &Some("₁₂₃₄".to_string()),
848                &DataType::Primitive(PrimitiveType::Binary),
849            )
850            .unwrap()
851        );
852    }
853
854    #[test]
855    fn null_partition_value_from_string_test() {
856        assert_eq!(
857            Value::Null,
858            typed_partition_value_from_option_string(
859                &None,
860                &DataType::Primitive(PrimitiveType::Integer),
861            )
862            .unwrap()
863        );
864
865        // empty string should be treated as null
866        assert_eq!(
867            Value::Null,
868            typed_partition_value_from_option_string(
869                &Some("".to_string()),
870                &DataType::Primitive(PrimitiveType::Integer),
871            )
872            .unwrap()
873        );
874    }
875
876    #[test]
877    fn collect_stats_conversions_test() {
878        let delta_schema: StructType = serde_json::from_value(SCHEMA.clone()).unwrap();
879        let fields = delta_schema.fields().collect_vec();
880        let mut paths = Vec::new();
881        collect_stats_conversions(&mut paths, fields.as_slice());
882
883        assert_eq!(2, paths.len());
884        assert_eq!(
885            (
886                vec!["some_struct".to_string(), "struct_timestamp".to_string()],
887                DataType::Primitive(PrimitiveType::Timestamp)
888            ),
889            paths[0]
890        );
891        assert_eq!(
892            (
893                vec!["some_timestamp".to_string()],
894                DataType::Primitive(PrimitiveType::Timestamp)
895            ),
896            paths[1]
897        );
898    }
899
900    async fn setup_table() -> DeltaTable {
901        use arrow_schema::{DataType, Field};
902        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
903            "id",
904            DataType::Utf8,
905            false,
906        )]));
907
908        let data =
909            vec![Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef];
910        let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];
911
912        let table = DeltaOps::new_in_memory()
913            .write(batches.clone())
914            .await
915            .unwrap();
916
917        DeltaOps(table)
918            .write(batches)
919            .with_save_mode(crate::protocol::SaveMode::Overwrite)
920            .await
921            .unwrap()
922    }
923
924    #[tokio::test]
925    async fn test_cleanup_no_checkpoints() {
926        // Test that metadata clean up does not corrupt the table when no checkpoints exist
927        let table = setup_table().await;
928
929        let log_retention_timestamp = (Utc::now().timestamp_millis()
930            + Duration::days(31).num_milliseconds())
931            - table
932                .snapshot()
933                .unwrap()
934                .table_config()
935                .log_retention_duration()
936                .as_millis() as i64;
937        let count = cleanup_expired_logs_for(
938            table.version(),
939            table.log_store().as_ref(),
940            log_retention_timestamp,
941            None,
942        )
943        .await
944        .unwrap();
945        assert_eq!(count, 0);
946        println!("{count:?}");
947
948        let path = Path::from("_delta_log/00000000000000000000.json");
949        let res = table.log_store().object_store(None).get(&path).await;
950        assert!(res.is_ok());
951    }
952
953    #[tokio::test]
954    async fn test_cleanup_with_checkpoints() {
955        let table = setup_table().await;
956        create_checkpoint(&table, None).await.unwrap();
957
958        let log_retention_timestamp = (Utc::now().timestamp_millis()
959            + Duration::days(32).num_milliseconds())
960            - table
961                .snapshot()
962                .unwrap()
963                .table_config()
964                .log_retention_duration()
965                .as_millis() as i64;
966        let count = cleanup_expired_logs_for(
967            table.version(),
968            table.log_store().as_ref(),
969            log_retention_timestamp,
970            None,
971        )
972        .await
973        .unwrap();
974        assert_eq!(count, 1);
975
976        let log_store = table.log_store();
977
978        let path = log_store.log_path().child("00000000000000000000.json");
979        let res = table.log_store().object_store(None).get(&path).await;
980        assert!(res.is_err());
981
982        let path = log_store
983            .log_path()
984            .child("00000000000000000001.checkpoint.parquet");
985        let res = table.log_store().object_store(None).get(&path).await;
986        assert!(res.is_ok());
987
988        let path = log_store.log_path().child("00000000000000000001.json");
989        let res = table.log_store().object_store(None).get(&path).await;
990        assert!(res.is_ok());
991    }
992
993    #[test]
994    fn apply_stats_conversion_test() {
995        let mut stats = STATS_JSON.clone();
996
997        let min_values = stats.get_mut("minValues").unwrap().as_object_mut().unwrap();
998
999        apply_stats_conversion(
1000            min_values,
1001            &["some_struct".to_string(), "struct_string".to_string()],
1002            &DataType::Primitive(PrimitiveType::String),
1003        );
1004        apply_stats_conversion(
1005            min_values,
1006            &["some_struct".to_string(), "struct_timestamp".to_string()],
1007            &DataType::Primitive(PrimitiveType::Timestamp),
1008        );
1009        apply_stats_conversion(
1010            min_values,
1011            &["some_string".to_string()],
1012            &DataType::Primitive(PrimitiveType::String),
1013        );
1014        apply_stats_conversion(
1015            min_values,
1016            &["some_timestamp".to_string()],
1017            &DataType::Primitive(PrimitiveType::Timestamp),
1018        );
1019
1020        let max_values = stats.get_mut("maxValues").unwrap().as_object_mut().unwrap();
1021
1022        apply_stats_conversion(
1023            max_values,
1024            &["some_struct".to_string(), "struct_string".to_string()],
1025            &DataType::Primitive(PrimitiveType::String),
1026        );
1027        apply_stats_conversion(
1028            max_values,
1029            &["some_struct".to_string(), "struct_timestamp".to_string()],
1030            &DataType::Primitive(PrimitiveType::Timestamp),
1031        );
1032        apply_stats_conversion(
1033            max_values,
1034            &["some_string".to_string()],
1035            &DataType::Primitive(PrimitiveType::String),
1036        );
1037        apply_stats_conversion(
1038            max_values,
1039            &["some_timestamp".to_string()],
1040            &DataType::Primitive(PrimitiveType::Timestamp),
1041        );
1042
1043        // minValues
1044        assert_eq!(
1045            "A",
1046            stats["minValues"]["some_struct"]["struct_string"]
1047                .as_str()
1048                .unwrap()
1049        );
1050        assert_eq!(
1051            1627668684594000i64,
1052            stats["minValues"]["some_struct"]["struct_timestamp"]
1053                .as_i64()
1054                .unwrap()
1055        );
1056        assert_eq!("P", stats["minValues"]["some_string"].as_str().unwrap());
1057        assert_eq!(
1058            1627668684594000i64,
1059            stats["minValues"]["some_timestamp"].as_i64().unwrap()
1060        );
1061
1062        // maxValues
1063        assert_eq!(
1064            "B",
1065            stats["maxValues"]["some_struct"]["struct_string"]
1066                .as_str()
1067                .unwrap()
1068        );
1069        assert_eq!(
1070            1627668685594000i64,
1071            stats["maxValues"]["some_struct"]["struct_timestamp"]
1072                .as_i64()
1073                .unwrap()
1074        );
1075        assert_eq!("Q", stats["maxValues"]["some_string"].as_str().unwrap());
1076        assert_eq!(
1077            1627668685594000i64,
1078            stats["maxValues"]["some_timestamp"].as_i64().unwrap()
1079        );
1080    }
1081
    #[tokio::test]
    async fn test_struct_with_single_list_field() {
        // Regression test: a checkpoint for a table whose schema contains a
        // struct holding a single list field must be writable as parquet.
        // you need another column otherwise the entire stats struct is empty
        // which also fails parquet write during checkpoint
        let other_column_array: ArrayRef = Arc::new(Int32Array::from(vec![1]));

        // Build struct_with_list = { "list_in_struct": [1] } using arrow builders.
        let mut list_item_builder = Int32Builder::new();
        list_item_builder.append_value(1);

        let mut list_in_struct_builder = ListBuilder::new(list_item_builder);
        list_in_struct_builder.append(true);

        let mut struct_builder = StructBuilder::new(
            vec![arrow_schema::Field::new(
                "list_in_struct",
                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
                    "item",
                    arrow_schema::DataType::Int32,
                    true,
                ))),
                true,
            )],
            vec![Box::new(list_in_struct_builder)],
        );
        struct_builder.append(true);

        // Write a single-row batch and verify checkpoint creation succeeds.
        let struct_with_list_array: ArrayRef = Arc::new(struct_builder.finish());
        let batch = RecordBatch::try_from_iter(vec![
            ("other_column", other_column_array),
            ("struct_with_list", struct_with_list_array),
        ])
        .unwrap();
        let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();

        create_checkpoint(&table, None).await.unwrap();
    }
1118
    // Delta schema fixture used by `collect_stats_conversions_test`: two
    // timestamp fields (one nested inside `some_struct`, one top-level) plus
    // string fields that must be left untouched by stats conversion.
    static SCHEMA: LazyLock<Value> = LazyLock::new(|| {
        json!({
            "type": "struct",
            "fields": [
                {
                    "name": "some_struct",
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "name": "struct_string",
                                "type": "string",
                                "nullable": true, "metadata": {}
                            },
                            {
                                "name": "struct_timestamp",
                                "type": "timestamp",
                                "nullable": true, "metadata": {}
                            }]
                    },
                    "nullable": true, "metadata": {}
                },
                { "name": "some_string", "type": "string", "nullable": true, "metadata": {} },
                { "name": "some_timestamp", "type": "timestamp", "nullable": true, "metadata": {} },
            ]
        })
    });
    // Stats fixture used by `apply_stats_conversion_test`: timestamps are
    // RFC 3339 strings that the conversion is expected to rewrite as epoch
    // microseconds, while the string values stay as-is.
    static STATS_JSON: LazyLock<Value> = LazyLock::new(|| {
        json!({
            "minValues": {
                "some_struct": {
                    "struct_string": "A",
                    "struct_timestamp": "2021-07-30T18:11:24.594Z"
                },
                "some_string": "P",
                "some_timestamp": "2021-07-30T18:11:24.594Z"
            },
            "maxValues": {
                "some_struct": {
                    "struct_string": "B",
                    "struct_timestamp": "2021-07-30T18:11:25.594Z"
                },
                "some_string": "Q",
                "some_timestamp": "2021-07-30T18:11:25.594Z"
            }
        })
    });
1166
1167    #[ignore = "This test is only useful if the batch size has been made small"]
1168    #[tokio::test]
1169    async fn test_checkpoint_large_table() -> DeltaResult<()> {
1170        use crate::writer::test_utils::get_arrow_schema;
1171
1172        let table_schema = get_delta_schema();
1173        let temp_dir = tempfile::tempdir()?;
1174        let table_path = temp_dir.path().to_str().unwrap();
1175        let mut table = DeltaOps::try_from_uri(&table_path)
1176            .await?
1177            .create()
1178            .with_columns(table_schema.fields().cloned())
1179            .await
1180            .unwrap();
1181        assert_eq!(table.version(), 0);
1182        let count = 20;
1183
1184        for _ in 0..count {
1185            table.load().await?;
1186            let batch = RecordBatch::try_new(
1187                Arc::clone(&get_arrow_schema(&None)),
1188                vec![
1189                    Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
1190                    Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
1191                    Arc::new(arrow::array::StringArray::from(vec![
1192                        "2021-02-02",
1193                        "2021-02-03",
1194                        "2021-02-02",
1195                        "2021-02-04",
1196                    ])),
1197                ],
1198            )
1199            .unwrap();
1200            let _ = DeltaOps(table.clone()).write(vec![batch]).await?;
1201        }
1202
1203        table.load().await?;
1204        assert_eq!(table.version(), count, "Expected {count} transactions");
1205        let pre_checkpoint_actions = table.snapshot()?.file_actions()?;
1206
1207        let before = table.version();
1208        let res = create_checkpoint(&table, None).await;
1209        assert!(res.is_ok(), "Failed to create the checkpoint! {res:#?}");
1210
1211        let table = crate::open_table(&table_path).await?;
1212        assert_eq!(
1213            before,
1214            table.version(),
1215            "Why on earth did a checkpoint creata version?"
1216        );
1217
1218        let post_checkpoint_actions = table.snapshot()?.file_actions()?;
1219
1220        assert_eq!(
1221            pre_checkpoint_actions.len(),
1222            post_checkpoint_actions.len(),
1223            "The number of actions read from the table after checkpointing is wrong!"
1224        );
1225        Ok(())
1226    }
1227
1228    /// <https://github.com/delta-io/delta-rs/issues/3030>
1229    #[cfg(feature = "datafusion")]
1230    #[tokio::test]
1231    async fn test_create_checkpoint_overwrite() -> DeltaResult<()> {
1232        use crate::protocol::SaveMode;
1233        use crate::writer::test_utils::datafusion::get_data_sorted;
1234        use crate::writer::test_utils::get_arrow_schema;
1235        use datafusion::assert_batches_sorted_eq;
1236
1237        let tmp_dir = tempfile::tempdir().unwrap();
1238        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
1239
1240        let batch = RecordBatch::try_new(
1241            Arc::clone(&get_arrow_schema(&None)),
1242            vec![
1243                Arc::new(arrow::array::StringArray::from(vec!["C"])),
1244                Arc::new(arrow::array::Int32Array::from(vec![30])),
1245                Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])),
1246            ],
1247        )
1248        .unwrap();
1249
1250        let mut table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
1251            .await?
1252            .write(vec![batch])
1253            .await?;
1254        table.load().await?;
1255        assert_eq!(table.version(), 0);
1256
1257        create_checkpoint(&table, None).await?;
1258
1259        let batch = RecordBatch::try_new(
1260            Arc::clone(&get_arrow_schema(&None)),
1261            vec![
1262                Arc::new(arrow::array::StringArray::from(vec!["A"])),
1263                Arc::new(arrow::array::Int32Array::from(vec![0])),
1264                Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])),
1265            ],
1266        )
1267        .unwrap();
1268
1269        let table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
1270            .await?
1271            .write(vec![batch])
1272            .with_save_mode(SaveMode::Overwrite)
1273            .await?;
1274        assert_eq!(table.version(), 1);
1275
1276        let expected = [
1277            "+----+-------+------------+",
1278            "| id | value | modified   |",
1279            "+----+-------+------------+",
1280            "| A  | 0     | 2021-02-02 |",
1281            "+----+-------+------------+",
1282        ];
1283        let actual = get_data_sorted(&table, "id,value,modified").await;
1284        assert_batches_sorted_eq!(&expected, &actual);
1285        Ok(())
1286    }
1287}