deltalake_core/protocol/
checkpoints.rs

1//! Implementation for writing delta checkpoints.
2
3use std::sync::LazyLock;
4
5use url::Url;
6
7use arrow::compute::filter_record_batch;
8use arrow_array::{BooleanArray, RecordBatch};
9use chrono::{TimeZone, Utc};
10use delta_kernel::engine::arrow_data::ArrowEngineData;
11use delta_kernel::engine_data::FilteredEngineData;
12use delta_kernel::snapshot::Snapshot;
13use delta_kernel::FileMeta;
14use futures::{StreamExt, TryStreamExt};
15use object_store::path::Path;
16use object_store::ObjectStore;
17use parquet::arrow::async_writer::ParquetObjectWriter;
18use parquet::arrow::AsyncArrowWriter;
19use regex::Regex;
20use tokio::task::spawn_blocking;
21use tracing::{debug, error};
22use uuid::Uuid;
23
24use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
25use crate::table::config::TablePropertiesExt as _;
26use crate::{open_table_with_version, DeltaTable};
27use crate::{DeltaResult, DeltaTableError};
28
/// Matches checkpoint files under `_delta_log/`, capturing the zero-padded
/// 20-digit version number in group 1. The trailing `.*` allows any checkpoint
/// file naming flavor (single-file, multi-part, etc.) after `.checkpoint`.
static CHECKPOINT_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap());
31
/// Creates checkpoint for a given table version, table state and object store
///
/// Builds a kernel [`Snapshot`] at `version`, streams the checkpoint data into a
/// single parquet file via [`AsyncArrowWriter`], and finally asks the kernel's
/// checkpoint writer to finalize (which writes the `_last_checkpoint` hint).
///
/// All kernel calls that perform blocking I/O run on `spawn_blocking` so they do
/// not stall the async executor; the data iterator `cp_data` is moved into each
/// blocking task and handed back, threading it through the loop.
pub(crate) async fn create_checkpoint_for(
    version: u64,
    log_store: &dyn LogStore,
    operation_id: Option<Uuid>,
) -> DeltaResult<()> {
    // When an operation id is present, resolve the transaction-scoped URL so the
    // checkpoint is written through the same (possibly staged) location.
    let table_root = if let Some(op_id) = operation_id {
        #[allow(deprecated)]
        log_store.transaction_url(op_id, &log_store.table_root_url())?
    } else {
        log_store.table_root_url()
    };
    let engine = log_store.engine(operation_id);

    // Snapshot construction reads the log synchronously through the kernel engine.
    let task_engine = engine.clone();
    let snapshot = spawn_blocking(move || {
        Snapshot::builder_for(table_root)
            .at_version(version)
            .build(task_engine.as_ref())
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

    let cp_writer = snapshot.checkpoint()?;

    let cp_url = cp_writer.checkpoint_path()?;
    let cp_path = Path::from_url_path(cp_url.path())?;
    let mut cp_data = cp_writer.checkpoint_data(engine.as_ref())?;

    // Pull the first batch eagerly: its Arrow schema is needed to construct the
    // parquet writer. An empty iterator is an error — a checkpoint always has data.
    let (first_batch, mut cp_data) = spawn_blocking(move || {
        let Some(first_batch) = cp_data.next() else {
            return Err(DeltaTableError::Generic("No data".to_string()));
        };
        Ok((to_rb(first_batch?)?, cp_data))
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

    let root_store = log_store.root_object_store(operation_id);
    let object_store_writer = ParquetObjectWriter::new(root_store.clone(), cp_path.clone());
    let mut writer = AsyncArrowWriter::try_new(object_store_writer, first_batch.schema(), None)?;
    writer.write(&first_batch).await?;

    // Hold onto the schema used for future batches.
    // This ensures that each batch is consistent since the kernel will yeet back the data that it
    // read from prior checkpoints regardless of whether they are identical in schema.
    //
    // See: <https://github.com/delta-io/delta-rs/issues/3527>!
    let checkpoint_schema = first_batch.schema();

    let mut current_batch;
    loop {
        // Each iteration moves `cp_data` into the blocking task and gets it back,
        // since the kernel iterator is not `Send`-friendly for direct async use.
        (current_batch, cp_data) = spawn_blocking(move || {
            let Some(first_batch) = cp_data.next() else {
                return Ok::<_, DeltaTableError>((None, cp_data));
            };
            Ok((Some(to_rb(first_batch?)?), cp_data))
        })
        .await
        .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

        let Some(batch) = current_batch else {
            break;
        };

        // If the subsequently yielded batches do not match the first batch written for whatever
        // reason, attempt to safely cast the batches to ensure a coherent checkpoint parquet file
        //
        // See also: <https://github.com/delta-io/delta-rs/issues/3527>
        let batch = if batch.schema() != checkpoint_schema {
            crate::cast_record_batch(&batch, checkpoint_schema.clone(), true, true)?
        } else {
            batch
        };

        writer.write(&batch).await?;
    }

    // Close the parquet writer, then stat the written object: finalize() needs the
    // real on-disk size and mtime for the checkpoint metadata.
    let _pq_meta = writer.close().await?;
    let file_meta = root_store.head(&cp_path).await?;
    let file_meta = FileMeta {
        location: cp_url,
        size: file_meta.size,
        last_modified: file_meta.last_modified.timestamp_millis(),
    };

    spawn_blocking(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data))
        .await
        .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

    Ok(())
}
124
125fn to_rb(data: FilteredEngineData) -> DeltaResult<RecordBatch> {
126    let engine_data = ArrowEngineData::try_from_engine_data(data.data)?;
127    let predicate = BooleanArray::from(data.selection_vector);
128    let batch = filter_record_batch(engine_data.record_batch(), &predicate)?;
129    Ok(batch)
130}
131
132/// Creates checkpoint at current table version
133pub async fn create_checkpoint(table: &DeltaTable, operation_id: Option<Uuid>) -> DeltaResult<()> {
134    let snapshot = table.snapshot()?;
135    create_checkpoint_for(
136        snapshot.version() as u64,
137        table.log_store.as_ref(),
138        operation_id,
139    )
140    .await?;
141    Ok(())
142}
143
144/// Delete expires log files before given version from table. The table log retention is based on
145/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
146pub async fn cleanup_metadata(
147    table: &DeltaTable,
148    operation_id: Option<Uuid>,
149) -> DeltaResult<usize> {
150    let snapshot = table.snapshot()?;
151    let log_retention_timestamp = Utc::now().timestamp_millis()
152        - snapshot.table_config().log_retention_duration().as_millis() as i64;
153    cleanup_expired_logs_for(
154        snapshot.version(),
155        table.log_store.as_ref(),
156        log_retention_timestamp,
157        operation_id,
158    )
159    .await
160}
161
162/// Loads table from given `table_uri` at given `version` and creates checkpoint for it.
163/// The `cleanup` param decides whether to run metadata cleanup of obsolete logs.
164/// If it's empty then the table's `enableExpiredLogCleanup` is used.
165pub async fn create_checkpoint_from_table_uri_and_cleanup(
166    table_url: Url,
167    version: i64,
168    cleanup: Option<bool>,
169    operation_id: Option<Uuid>,
170) -> DeltaResult<()> {
171    let table = open_table_with_version(table_url, version).await?;
172    let snapshot = table.snapshot()?;
173    create_checkpoint_for(version as u64, table.log_store.as_ref(), operation_id).await?;
174
175    let enable_expired_log_cleanup =
176        cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());
177
178    if snapshot.version() >= 0 && enable_expired_log_cleanup {
179        let deleted_log_num = cleanup_metadata(&table, operation_id).await?;
180        debug!("Deleted {deleted_log_num:?} log files.");
181    }
182
183    Ok(())
184}
185
/// Delete expired Delta log files up to a safe checkpoint boundary.
///
/// This routine removes JSON commit files, in-progress JSON temp files, and
/// checkpoint files under `_delta_log/` that are both:
/// - older than the provided `cutoff_timestamp` (milliseconds since epoch), and
/// - strictly less than the provided `until_version`.
///
/// Safety guarantee:
/// To avoid deleting files that might still be required to reconstruct the
/// table state at or before the requested cutoff, the function first identifies
/// the most recent checkpoint whose version is `<= until_version` and whose file
/// modification time is `<= cutoff_timestamp`. Only files strictly older than
/// this checkpoint (both by version and timestamp) are considered for deletion.
/// If no such checkpoint exists (including when there is no `_last_checkpoint`),
/// the function performs no deletions and returns `Ok(0)`.
///
/// See also: https://github.com/delta-io/delta-rs/issues/3692 for background on
/// why cleanup must align to an existing checkpoint.
pub async fn cleanup_expired_logs_for(
    mut keep_version: i64,
    log_store: &dyn LogStore,
    cutoff_timestamp: i64,
    operation_id: Option<Uuid>,
) -> DeltaResult<usize> {
    debug!("called cleanup_expired_logs_for");
    let object_store = log_store.object_store(operation_id);
    let log_path = log_store.log_path();

    // List all log entries under _delta_log
    // NOTE(review): the full listing is buffered in memory; assumes _delta_log
    // is of manageable size — confirm for very large tables.
    let log_entries: Vec<Result<crate::ObjectMeta, _>> =
        object_store.list(Some(log_path)).collect().await;

    debug!("starting keep_version: {:?}", keep_version);
    debug!(
        "starting cutoff_timestamp: {:?}",
        Utc.timestamp_millis_opt(cutoff_timestamp).unwrap()
    );

    // Step 1: Find min_retention_version among DELTA_LOG files with ts >= cutoff_timestamp
    let min_retention_version = log_entries
        .iter()
        .filter_map(|m| m.as_ref().ok())
        .filter_map(|m| {
            let path = m.location.as_ref();
            DELTA_LOG_REGEX
                .captures(path)
                .and_then(|caps| caps.get(1))
                .and_then(|v| v.as_str().parse::<i64>().ok())
                .map(|ver| (ver, m.last_modified.timestamp_millis()))
        })
        .filter(|(_, ts)| *ts >= cutoff_timestamp)
        .map(|(ver, _)| ver)
        .min();

    // If no log file falls inside the retention window, keep_version stays as passed in.
    let min_retention_version = min_retention_version.unwrap_or(keep_version);

    // Step 2: Move keep_version down to the minimum version inside the retention period to make sure
    // every version inside the retention period is kept.
    keep_version = keep_version.min(min_retention_version);

    // Step 3: Find safe checkpoint with checkpoint_version <= keep_version (no ts restriction)
    let safe_checkpoint_version_opt = log_entries
        .iter()
        .filter_map(|m| m.as_ref().ok())
        .filter_map(|m| {
            let path = m.location.as_ref();
            CHECKPOINT_REGEX
                .captures(path)
                .and_then(|caps| caps.get(1))
                .and_then(|v| v.as_str().parse::<i64>().ok())
        })
        .filter(|ver| *ver <= keep_version)
        .max();

    // Exit if no safe_checkpoint file was found.
    let Some(safe_checkpoint_version) = safe_checkpoint_version_opt else {
        debug!(
            "Not cleaning metadata files, could not find a checkpoint with version <= keep_version ({})",
            keep_version
        );
        return Ok(0);
    };

    debug!("safe_checkpoint_version: {}", safe_checkpoint_version);

    // Step 4: Delete DELTA_LOG files where log_ver < safe_checkpoint_version && ts <= cutoff_timestamp
    // Listing errors are logged and skipped rather than aborting the whole cleanup.
    let locations = futures::stream::iter(log_entries.into_iter())
        .filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
            let meta = match meta {
                Ok(m) => m,
                Err(err) => {
                    error!("Error received while cleaning up expired logs: {err:?}");
                    return None;
                }
            };
            let path_str = meta.location.as_ref();
            let captures = DELTA_LOG_REGEX.captures(path_str)?;
            let ts = meta.last_modified.timestamp_millis();
            let log_ver_str = captures.get(1).unwrap().as_str();
            let Ok(log_ver) = log_ver_str.parse::<i64>() else {
                return None;
            };
            if log_ver < safe_checkpoint_version && ts <= cutoff_timestamp {
                debug!("file to delete: {:?}", meta.location);
                Some(Ok(meta.location))
            } else {
                None
            }
        })
        .boxed();

    // Batched delete; any individual failure surfaces as Err from try_collect.
    let deleted = object_store
        .delete_stream(locations)
        .try_collect::<Vec<_>>()
        .await?;

    debug!("Deleted {} expired logs", deleted.len());
    Ok(deleted.len())
}
305
306#[cfg(test)]
307mod tests {
308    use std::sync::Arc;
309
310    use arrow_array::builder::{Int32Builder, ListBuilder, StructBuilder};
311    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
312    use arrow_schema::Schema as ArrowSchema;
313    use chrono::Duration;
314    use delta_kernel::last_checkpoint_hint::LastCheckpointHint;
315    use object_store::path::Path;
316    use object_store::Error;
317    use tracing::warn;
318
319    use super::*;
320    use crate::ensure_table_uri;
321    use crate::kernel::transaction::{CommitBuilder, TableReference};
322    use crate::kernel::Action;
323    use crate::operations::DeltaOps;
324    use crate::writer::test_utils::get_delta_schema;
325    use crate::DeltaResult;
326
327    /// Try reading the `_last_checkpoint` file.
328    ///
329    /// Note that we typically want to ignore a missing/invalid `_last_checkpoint` file without failing
330    /// the read. Thus, the semantics of this function are to return `None` if the file is not found or
331    /// is invalid JSON. Unexpected/unrecoverable errors are returned as `Err` case and are assumed to
332    /// cause failure.
333    async fn read_last_checkpoint(
334        storage: &dyn ObjectStore,
335        log_path: &Path,
336    ) -> DeltaResult<Option<LastCheckpointHint>> {
337        const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
338        let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
339        let maybe_data = storage.get(&file_path).await;
340        let data = match maybe_data {
341            Ok(data) => data.bytes().await?,
342            Err(Error::NotFound { .. }) => return Ok(None),
343            Err(err) => return Err(err.into()),
344        };
345        Ok(serde_json::from_slice(&data)
346            .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
347            .ok())
348    }
349
350    #[tokio::test]
351    async fn test_create_checkpoint_for() {
352        let table_schema = get_delta_schema();
353
354        let table = DeltaOps::new_in_memory()
355            .create()
356            .with_columns(table_schema.fields().cloned())
357            .with_save_mode(crate::protocol::SaveMode::Ignore)
358            .await
359            .unwrap();
360        assert_eq!(table.version(), Some(0));
361        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema);
362        let res = create_checkpoint_for(0, table.log_store.as_ref(), None).await;
363        assert!(res.is_ok());
364
365        // Look at the "files" and verify that the _last_checkpoint has the right version
366        let log_path = Path::from("_delta_log");
367        let store = table.log_store().object_store(None);
368        let last_checkpoint = read_last_checkpoint(store.as_ref(), &log_path)
369            .await
370            .expect("Failed to get the _last_checkpoint")
371            .expect("Expected checkpoint hint");
372        assert_eq!(last_checkpoint.version, 0);
373    }
374
375    /// This test validates that a checkpoint can be written and re-read with the minimum viable
376    /// Metadata. There was a bug which didn't handle the optionality of createdTime.
377    #[cfg(feature = "datafusion")]
378    #[tokio::test]
379    async fn test_create_checkpoint_with_metadata() {
380        use crate::kernel::new_metadata;
381
382        let table_schema = get_delta_schema();
383
384        let mut table = DeltaOps::new_in_memory()
385            .create()
386            .with_columns(table_schema.fields().cloned())
387            .with_save_mode(crate::protocol::SaveMode::Ignore)
388            .await
389            .unwrap();
390        assert_eq!(table.version(), Some(0));
391        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema);
392
393        let part_cols: Vec<String> = vec![];
394        let metadata =
395            new_metadata(&table_schema, part_cols, std::iter::empty::<(&str, &str)>()).unwrap();
396        let actions = vec![Action::Metadata(metadata)];
397
398        let epoch_id = std::time::SystemTime::now()
399            .duration_since(std::time::UNIX_EPOCH)
400            .expect("Time went backwards")
401            .as_millis() as i64;
402
403        let operation = crate::protocol::DeltaOperation::StreamingUpdate {
404            output_mode: crate::protocol::OutputMode::Append,
405            query_id: "test".into(),
406            epoch_id,
407        };
408        let finalized_commit = CommitBuilder::default()
409            .with_actions(actions)
410            .build(
411                table.state.as_ref().map(|f| f as &dyn TableReference),
412                table.log_store(),
413                operation,
414            )
415            .await
416            .unwrap();
417
418        assert_eq!(
419            1,
420            finalized_commit.version(),
421            "Expected the commit to create table version 1"
422        );
423        assert_eq!(
424            0, finalized_commit.metrics.num_retries,
425            "Expected no retries"
426        );
427        assert_eq!(
428            0, finalized_commit.metrics.num_log_files_cleaned_up,
429            "Expected no log files cleaned up"
430        );
431        assert!(
432            !finalized_commit.metrics.new_checkpoint_created,
433            "Expected checkpoint created."
434        );
435        table.load().await.expect("Failed to reload table");
436        assert_eq!(
437            table.version(),
438            Some(1),
439            "The loaded version of the table is not up to date"
440        );
441
442        let res = create_checkpoint_for(
443            table.version().unwrap() as u64,
444            table.log_store.as_ref(),
445            None,
446        )
447        .await;
448        assert!(res.is_ok());
449
450        // Look at the "files" and verify that the _last_checkpoint has the right version
451        let log_path = Path::from("_delta_log");
452        let store = table.log_store().object_store(None);
453        let last_checkpoint = read_last_checkpoint(store.as_ref(), &log_path)
454            .await
455            .expect("Failed to get the _last_checkpoint")
456            .expect("Expected checkpoint hint");
457        assert_eq!(last_checkpoint.version, 1);
458
459        // If the regression exists, this will fail
460        table.load().await.expect("Failed to reload the table, this likely means that the optional createdTime was not actually optional");
461        assert_eq!(
462            Some(1),
463            table.version(),
464            "The reloaded table doesn't have the right version"
465        );
466    }
467
468    #[tokio::test]
469    async fn test_create_checkpoint_for_invalid_version() {
470        let table_schema = get_delta_schema();
471
472        let table = DeltaOps::new_in_memory()
473            .create()
474            .with_columns(table_schema.fields().cloned())
475            .with_save_mode(crate::protocol::SaveMode::Ignore)
476            .await
477            .unwrap();
478        assert_eq!(table.version(), Some(0));
479        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema);
480        match create_checkpoint_for(1, table.log_store.as_ref(), None).await {
481            Ok(_) => {
482                /*
483                 * If a checkpoint is allowed to be created here, it will use the passed in
484                 * version, but _last_checkpoint is generated from the table state will point to a
485                 * version 0 checkpoint.
486                 * E.g.
487                 *
488                 * Path { raw: "_delta_log/00000000000000000000.json" }
489                 * Path { raw: "_delta_log/00000000000000000001.checkpoint.parquet" }
490                 * Path { raw: "_delta_log/_last_checkpoint" }
491                 *
492                 */
493                panic!(
494                    "We should not allow creating a checkpoint for a version which doesn't exist!"
495                );
496            }
497            Err(_) => { /* We should expect an error in the "right" case */ }
498        }
499    }
500
501    #[cfg(feature = "datafusion")]
502    async fn setup_table() -> DeltaTable {
503        use arrow_schema::{DataType, Field};
504        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
505            "id",
506            DataType::Utf8,
507            false,
508        )]));
509
510        let data =
511            vec![Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef];
512        let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];
513
514        let table = DeltaOps::new_in_memory()
515            .write(batches.clone())
516            .await
517            .unwrap();
518
519        DeltaOps(table)
520            .write(batches)
521            .with_save_mode(crate::protocol::SaveMode::Overwrite)
522            .await
523            .unwrap()
524    }
525
526    #[cfg(feature = "datafusion")]
527    #[tokio::test]
528    async fn test_cleanup_no_checkpoints() {
529        // Test that metadata clean up does not corrupt the table when no checkpoints exist
530        let table = setup_table().await;
531
532        let log_retention_timestamp = (Utc::now().timestamp_millis()
533            + Duration::days(31).num_milliseconds())
534            - table
535                .snapshot()
536                .unwrap()
537                .table_config()
538                .log_retention_duration()
539                .as_millis() as i64;
540        let count = cleanup_expired_logs_for(
541            table.version().unwrap(),
542            table.log_store().as_ref(),
543            log_retention_timestamp,
544            None,
545        )
546        .await
547        .unwrap();
548        assert_eq!(count, 0);
549        println!("{count:?}");
550
551        let path = Path::from("_delta_log/00000000000000000000.json");
552        let res = table.log_store().object_store(None).get(&path).await;
553        assert!(res.is_ok());
554    }
555
556    #[cfg(feature = "datafusion")]
557    #[tokio::test]
558    async fn test_cleanup_with_checkpoints() {
559        let table = setup_table().await;
560        create_checkpoint(&table, None).await.unwrap();
561
562        let log_retention_timestamp = (Utc::now().timestamp_millis()
563            + Duration::days(32).num_milliseconds())
564            - table
565                .snapshot()
566                .unwrap()
567                .table_config()
568                .log_retention_duration()
569                .as_millis() as i64;
570        let count = cleanup_expired_logs_for(
571            table.version().unwrap(),
572            table.log_store().as_ref(),
573            log_retention_timestamp,
574            None,
575        )
576        .await
577        .unwrap();
578        assert_eq!(count, 1);
579
580        let log_store = table.log_store();
581
582        let path = log_store.log_path().child("00000000000000000000.json");
583        let res = table.log_store().object_store(None).get(&path).await;
584        assert!(res.is_err());
585
586        let path = log_store
587            .log_path()
588            .child("00000000000000000001.checkpoint.parquet");
589        let res = table.log_store().object_store(None).get(&path).await;
590        assert!(res.is_ok());
591
592        let path = log_store.log_path().child("00000000000000000001.json");
593        let res = table.log_store().object_store(None).get(&path).await;
594        assert!(res.is_ok());
595    }
596
597    #[cfg(feature = "datafusion")]
598    #[tokio::test]
599    async fn test_struct_with_single_list_field() {
600        // you need another column otherwise the entire stats struct is empty
601        // which also fails parquet write during checkpoint
602        let other_column_array: ArrayRef = Arc::new(Int32Array::from(vec![1]));
603
604        let mut list_item_builder = Int32Builder::new();
605        list_item_builder.append_value(1);
606
607        let mut list_in_struct_builder = ListBuilder::new(list_item_builder);
608        list_in_struct_builder.append(true);
609
610        let mut struct_builder = StructBuilder::new(
611            vec![arrow_schema::Field::new(
612                "list_in_struct",
613                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
614                    "item",
615                    arrow_schema::DataType::Int32,
616                    true,
617                ))),
618                true,
619            )],
620            vec![Box::new(list_in_struct_builder)],
621        );
622        struct_builder.append(true);
623
624        let struct_with_list_array: ArrayRef = Arc::new(struct_builder.finish());
625        let batch = RecordBatch::try_from_iter(vec![
626            ("other_column", other_column_array),
627            ("struct_with_list", struct_with_list_array),
628        ])
629        .unwrap();
630        let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();
631
632        create_checkpoint(&table, None).await.unwrap();
633    }
634
635    #[ignore = "This test is only useful if the batch size has been made small"]
636    #[cfg(feature = "datafusion")]
637    #[tokio::test]
638    async fn test_checkpoint_large_table() -> DeltaResult<()> {
639        use crate::writer::test_utils::get_arrow_schema;
640
641        let table_schema = get_delta_schema();
642        let temp_dir = tempfile::tempdir()?;
643        let table_path = temp_dir.path().to_str().unwrap();
644        let table_uri = ensure_table_uri(table_path).unwrap();
645        let mut table = DeltaOps::try_from_uri(table_uri)
646            .await?
647            .create()
648            .with_columns(table_schema.fields().cloned())
649            .await
650            .unwrap();
651        assert_eq!(table.version(), Some(0));
652        let count = 20;
653
654        for _ in 0..count {
655            table.load().await?;
656            let batch = RecordBatch::try_new(
657                Arc::clone(&get_arrow_schema(&None)),
658                vec![
659                    Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
660                    Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
661                    Arc::new(arrow::array::StringArray::from(vec![
662                        "2021-02-02",
663                        "2021-02-03",
664                        "2021-02-02",
665                        "2021-02-04",
666                    ])),
667                ],
668            )
669            .unwrap();
670            let _ = DeltaOps(table.clone()).write(vec![batch]).await?;
671        }
672
673        table.load().await?;
674        assert_eq!(
675            table.version().unwrap(),
676            count,
677            "Expected {count} transactions"
678        );
679        let pre_checkpoint_actions = table.snapshot()?.file_actions(&table.log_store).await?;
680
681        let before = table.version();
682        let res = create_checkpoint(&table, None).await;
683        assert!(res.is_ok(), "Failed to create the checkpoint! {res:#?}");
684
685        let table =
686            crate::open_table(Url::from_directory_path(std::path::Path::new(table_path)).unwrap())
687                .await?;
688        assert_eq!(
689            before,
690            table.version(),
691            "Why on earth did a checkpoint creata version?"
692        );
693
694        let post_checkpoint_actions = table.snapshot()?.file_actions(&table.log_store).await?;
695
696        assert_eq!(
697            pre_checkpoint_actions.len(),
698            post_checkpoint_actions.len(),
699            "The number of actions read from the table after checkpointing is wrong!"
700        );
701        Ok(())
702    }
703
    /// <https://github.com/delta-io/delta-rs/issues/3030>
    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_create_checkpoint_overwrite() -> DeltaResult<()> {
        use crate::protocol::SaveMode;
        use crate::writer::test_utils::datafusion::get_data_sorted;
        use crate::writer::test_utils::get_arrow_schema;
        use datafusion::assert_batches_sorted_eq;

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();

        // Version 0: write a single row ("C", 30, "2021-02-03").
        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["C"])),
                Arc::new(arrow::array::Int32Array::from(vec![30])),
                Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])),
            ],
        )
        .unwrap();

        let table_uri = Url::from_directory_path(&tmp_path).unwrap();
        let mut table = DeltaOps::try_from_uri(table_uri)
            .await?
            .write(vec![batch])
            .await?;
        table.load().await?;
        assert_eq!(table.version(), Some(0));

        // Checkpoint version 0 before the overwrite.
        create_checkpoint(&table, None).await?;

        // Version 1: overwrite the table with a different single row.
        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A"])),
                Arc::new(arrow::array::Int32Array::from(vec![0])),
                Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])),
            ],
        )
        .unwrap();

        let table_uri = Url::from_directory_path(&tmp_path).unwrap();
        let table = DeltaOps::try_from_uri(table_uri)
            .await?
            .write(vec![batch])
            .with_save_mode(SaveMode::Overwrite)
            .await?;
        assert_eq!(table.version(), Some(1));

        // After checkpointing + overwrite, only the overwrite's row may be visible.
        let expected = [
            "+----+-------+------------+",
            "| id | value | modified   |",
            "+----+-------+------------+",
            "| A  | 0     | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        let actual = get_data_sorted(&table, "id,value,modified").await;
        assert_batches_sorted_eq!(&expected, &actual);
        Ok(())
    }
765}