1use std::sync::LazyLock;
4
5use url::Url;
6
7use arrow::compute::filter_record_batch;
8use arrow_array::{BooleanArray, RecordBatch};
9use chrono::{TimeZone, Utc};
10use delta_kernel::engine::arrow_data::ArrowEngineData;
11use delta_kernel::engine_data::FilteredEngineData;
12use delta_kernel::snapshot::Snapshot;
13use delta_kernel::FileMeta;
14use futures::{StreamExt, TryStreamExt};
15use object_store::path::Path;
16use object_store::ObjectStore;
17use parquet::arrow::async_writer::ParquetObjectWriter;
18use parquet::arrow::AsyncArrowWriter;
19use regex::Regex;
20use tokio::task::spawn_blocking;
21use tracing::{debug, error};
22use uuid::Uuid;
23
24use crate::logstore::{LogStore, LogStoreExt, DELTA_LOG_REGEX};
25use crate::table::config::TablePropertiesExt as _;
26use crate::{open_table_with_version, DeltaTable};
27use crate::{DeltaResult, DeltaTableError};
28
/// Matches checkpoint files under `_delta_log/`, capturing the zero-padded
/// 20-digit table version as group 1. The trailing `.*$` after `.checkpoint`
/// lets this match single-file, multipart, and UUID-suffixed checkpoint names.
static CHECKPOINT_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"_delta_log/(\d{20})\.(checkpoint).*$").unwrap());
31
/// Write a checkpoint parquet file for `version` of the table behind `log_store`,
/// then finalize it through the kernel (which also updates `_last_checkpoint`).
///
/// All kernel calls that perform blocking work (snapshot construction, pulling
/// batches from the checkpoint data iterator, finalization) are moved onto the
/// tokio blocking pool via `spawn_blocking`; the iterator is moved into each
/// closure and handed back so iteration can resume on the next round trip.
///
/// # Errors
/// Fails if the snapshot for `version` cannot be built (e.g. the version does
/// not exist), if the kernel yields no checkpoint data, or on any storage /
/// parquet write error.
pub(crate) async fn create_checkpoint_for(
    version: u64,
    log_store: &dyn LogStore,
    operation_id: Option<Uuid>,
) -> DeltaResult<()> {
    // When running inside an operation, resolve the (deprecated) per-transaction
    // URL so the checkpoint is built against that transaction's view of the table.
    let table_root = if let Some(op_id) = operation_id {
        #[allow(deprecated)]
        log_store.transaction_url(op_id, &log_store.table_root_url())?
    } else {
        log_store.table_root_url()
    };
    let engine = log_store.engine(operation_id);

    // Snapshot construction does blocking I/O through the kernel engine;
    // run it off the async executor.
    let task_engine = engine.clone();
    let snapshot = spawn_blocking(move || {
        Snapshot::builder_for(table_root)
            .at_version(version)
            .build(task_engine.as_ref())
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

    let cp_writer = snapshot.checkpoint()?;

    let cp_url = cp_writer.checkpoint_path()?;
    let cp_path = Path::from_url_path(cp_url.path())?;
    let mut cp_data = cp_writer.checkpoint_data(engine.as_ref())?;

    // Pull the first batch eagerly: it establishes the parquet schema, and a
    // completely empty checkpoint stream is treated as an error.
    let (first_batch, mut cp_data) = spawn_blocking(move || {
        let Some(first_batch) = cp_data.next() else {
            return Err(DeltaTableError::Generic("No data".to_string()));
        };
        Ok((to_rb(first_batch?)?, cp_data))
    })
    .await
    .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

    let root_store = log_store.root_object_store(operation_id);
    let object_store_writer = ParquetObjectWriter::new(root_store.clone(), cp_path.clone());
    let mut writer = AsyncArrowWriter::try_new(object_store_writer, first_batch.schema(), None)?;
    writer.write(&first_batch).await?;

    // Schema every subsequent batch must conform to before being written.
    let checkpoint_schema = first_batch.schema();

    // Drain the remaining batches, one blocking hop per batch; `None` from the
    // iterator signals the end of the checkpoint data.
    let mut current_batch;
    loop {
        (current_batch, cp_data) = spawn_blocking(move || {
            let Some(first_batch) = cp_data.next() else {
                return Ok::<_, DeltaTableError>((None, cp_data));
            };
            Ok((Some(to_rb(first_batch?)?), cp_data))
        })
        .await
        .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

        let Some(batch) = current_batch else {
            break;
        };

        // Later batches can disagree with the first batch's schema (e.g. missing
        // optional fields); cast them into the checkpoint schema before writing.
        let batch = if batch.schema() != checkpoint_schema {
            crate::cast_record_batch(&batch, checkpoint_schema.clone(), true, true)?
        } else {
            batch
        };

        writer.write(&batch).await?;
    }

    // Close the parquet writer, then describe the resulting object so the
    // kernel can record its size and modification time during finalization.
    let _pq_meta = writer.close().await?;
    let file_meta = root_store.head(&cp_path).await?;
    let file_meta = FileMeta {
        location: cp_url,
        size: file_meta.size,
        last_modified: file_meta.last_modified.timestamp_millis(),
    };

    // Finalize (blocking): hands the written file plus any leftover iterator
    // state back to the kernel checkpoint writer.
    spawn_blocking(move || cp_writer.finalize(engine.as_ref(), &file_meta, cp_data))
        .await
        .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

    Ok(())
}
124
125fn to_rb(data: FilteredEngineData) -> DeltaResult<RecordBatch> {
126 let engine_data = ArrowEngineData::try_from_engine_data(data.data)?;
127 let predicate = BooleanArray::from(data.selection_vector);
128 let batch = filter_record_batch(engine_data.record_batch(), &predicate)?;
129 Ok(batch)
130}
131
132pub async fn create_checkpoint(table: &DeltaTable, operation_id: Option<Uuid>) -> DeltaResult<()> {
134 let snapshot = table.snapshot()?;
135 create_checkpoint_for(
136 snapshot.version() as u64,
137 table.log_store.as_ref(),
138 operation_id,
139 )
140 .await?;
141 Ok(())
142}
143
144pub async fn cleanup_metadata(
147 table: &DeltaTable,
148 operation_id: Option<Uuid>,
149) -> DeltaResult<usize> {
150 let snapshot = table.snapshot()?;
151 let log_retention_timestamp = Utc::now().timestamp_millis()
152 - snapshot.table_config().log_retention_duration().as_millis() as i64;
153 cleanup_expired_logs_for(
154 snapshot.version(),
155 table.log_store.as_ref(),
156 log_retention_timestamp,
157 operation_id,
158 )
159 .await
160}
161
162pub async fn create_checkpoint_from_table_uri_and_cleanup(
166 table_url: Url,
167 version: i64,
168 cleanup: Option<bool>,
169 operation_id: Option<Uuid>,
170) -> DeltaResult<()> {
171 let table = open_table_with_version(table_url, version).await?;
172 let snapshot = table.snapshot()?;
173 create_checkpoint_for(version as u64, table.log_store.as_ref(), operation_id).await?;
174
175 let enable_expired_log_cleanup =
176 cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());
177
178 if snapshot.version() >= 0 && enable_expired_log_cleanup {
179 let deleted_log_num = cleanup_metadata(&table, operation_id).await?;
180 debug!("Deleted {deleted_log_num:?} log files.");
181 }
182
183 Ok(())
184}
185
/// Delete expired delta log files for the table behind `log_store`, returning
/// the number of objects removed.
///
/// A log file is deleted only when BOTH hold:
/// 1. its version is strictly below the newest checkpoint whose version is
///    `<= keep_version` (so every surviving version stays reconstructable), and
/// 2. its last-modified timestamp is `<= cutoff_timestamp` (milliseconds since
///    the Unix epoch).
///
/// If no such checkpoint exists, nothing is deleted and `Ok(0)` is returned.
pub async fn cleanup_expired_logs_for(
    mut keep_version: i64,
    log_store: &dyn LogStore,
    cutoff_timestamp: i64,
    operation_id: Option<Uuid>,
) -> DeltaResult<usize> {
    debug!("called cleanup_expired_logs_for");
    let object_store = log_store.object_store(operation_id);
    let log_path = log_store.log_path();

    // Materialize the full `_delta_log/` listing; it is scanned several times below.
    let log_entries: Vec<Result<crate::ObjectMeta, _>> =
        object_store.list(Some(log_path)).collect().await;

    debug!("starting keep_version: {:?}", keep_version);
    debug!(
        "starting cutoff_timestamp: {:?}",
        Utc.timestamp_millis_opt(cutoff_timestamp).unwrap()
    );

    // Smallest commit version whose file is still within the retention window
    // (modified at or after the cutoff). Such versions must never be deleted.
    let min_retention_version = log_entries
        .iter()
        .filter_map(|m| m.as_ref().ok())
        .filter_map(|m| {
            let path = m.location.as_ref();
            DELTA_LOG_REGEX
                .captures(path)
                .and_then(|caps| caps.get(1))
                .and_then(|v| v.as_str().parse::<i64>().ok())
                .map(|ver| (ver, m.last_modified.timestamp_millis()))
        })
        .filter(|(_, ts)| *ts >= cutoff_timestamp)
        .map(|(ver, _)| ver)
        .min();

    let min_retention_version = min_retention_version.unwrap_or(keep_version);

    // Lower keep_version if retention protects an even older version.
    keep_version = keep_version.min(min_retention_version);

    // Newest checkpoint at or below keep_version; logs older than it are
    // redundant because that checkpoint can reconstruct the table state.
    let safe_checkpoint_version_opt = log_entries
        .iter()
        .filter_map(|m| m.as_ref().ok())
        .filter_map(|m| {
            let path = m.location.as_ref();
            CHECKPOINT_REGEX
                .captures(path)
                .and_then(|caps| caps.get(1))
                .and_then(|v| v.as_str().parse::<i64>().ok())
        })
        .filter(|ver| *ver <= keep_version)
        .max();

    // Without a qualifying checkpoint, deleting any log would lose history.
    let Some(safe_checkpoint_version) = safe_checkpoint_version_opt else {
        debug!(
            "Not cleaning metadata files, could not find a checkpoint with version <= keep_version ({})",
            keep_version
        );
        return Ok(0);
    };

    debug!("safe_checkpoint_version: {}", safe_checkpoint_version);

    // Stream of locations to delete: log entries older than the safe checkpoint
    // AND past the cutoff. Listing errors are logged and skipped, not fatal.
    let locations = futures::stream::iter(log_entries.into_iter())
        .filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
            let meta = match meta {
                Ok(m) => m,
                Err(err) => {
                    error!("Error received while cleaning up expired logs: {err:?}");
                    return None;
                }
            };
            let path_str = meta.location.as_ref();
            let captures = DELTA_LOG_REGEX.captures(path_str)?;
            let ts = meta.last_modified.timestamp_millis();
            let log_ver_str = captures.get(1).unwrap().as_str();
            let Ok(log_ver) = log_ver_str.parse::<i64>() else {
                return None;
            };
            if log_ver < safe_checkpoint_version && ts <= cutoff_timestamp {
                debug!("file to delete: {:?}", meta.location);
                Some(Ok(meta.location))
            } else {
                None
            }
        })
        .boxed();

    let deleted = object_store
        .delete_stream(locations)
        .try_collect::<Vec<_>>()
        .await?;

    debug!("Deleted {} expired logs", deleted.len());
    Ok(deleted.len())
}
305
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::builder::{Int32Builder, ListBuilder, StructBuilder};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::Schema as ArrowSchema;
    use chrono::Duration;
    use delta_kernel::last_checkpoint_hint::LastCheckpointHint;
    use object_store::path::Path;
    use object_store::Error;
    use tracing::warn;

    use super::*;
    use crate::ensure_table_uri;
    use crate::kernel::transaction::{CommitBuilder, TableReference};
    use crate::kernel::Action;
    use crate::operations::DeltaOps;
    use crate::writer::test_utils::get_delta_schema;
    use crate::DeltaResult;

    /// Read and deserialize the `_last_checkpoint` hint file, returning `None`
    /// when the file is missing or its JSON is invalid.
    async fn read_last_checkpoint(
        storage: &dyn ObjectStore,
        log_path: &Path,
    ) -> DeltaResult<Option<LastCheckpointHint>> {
        const LAST_CHECKPOINT_FILE_NAME: &str = "_last_checkpoint";
        let file_path = log_path.child(LAST_CHECKPOINT_FILE_NAME);
        let maybe_data = storage.get(&file_path).await;
        let data = match maybe_data {
            Ok(data) => data.bytes().await?,
            Err(Error::NotFound { .. }) => return Ok(None),
            Err(err) => return Err(err.into()),
        };
        Ok(serde_json::from_slice(&data)
            .inspect_err(|e| warn!("invalid _last_checkpoint JSON: {e}"))
            .ok())
    }

    #[tokio::test]
    async fn test_create_checkpoint_for() {
        let table_schema = get_delta_schema();

        let table = DeltaOps::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(crate::protocol::SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema);
        let res = create_checkpoint_for(0, table.log_store.as_ref(), None).await;
        assert!(res.is_ok());

        // The checkpoint must also have written a `_last_checkpoint` hint.
        let log_path = Path::from("_delta_log");
        let store = table.log_store().object_store(None);
        let last_checkpoint = read_last_checkpoint(store.as_ref(), &log_path)
            .await
            .expect("Failed to get the _last_checkpoint")
            .expect("Expected checkpoint hint");
        assert_eq!(last_checkpoint.version, 0);
    }

    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_create_checkpoint_with_metadata() {
        use crate::kernel::new_metadata;

        let table_schema = get_delta_schema();

        let mut table = DeltaOps::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(crate::protocol::SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema);

        let part_cols: Vec<String> = vec![];
        let metadata =
            new_metadata(&table_schema, part_cols, std::iter::empty::<(&str, &str)>()).unwrap();
        let actions = vec![Action::Metadata(metadata)];

        let epoch_id = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis() as i64;

        let operation = crate::protocol::DeltaOperation::StreamingUpdate {
            output_mode: crate::protocol::OutputMode::Append,
            query_id: "test".into(),
            epoch_id,
        };
        let finalized_commit = CommitBuilder::default()
            .with_actions(actions)
            .build(
                table.state.as_ref().map(|f| f as &dyn TableReference),
                table.log_store(),
                operation,
            )
            .await
            .unwrap();

        assert_eq!(
            1,
            finalized_commit.version(),
            "Expected the commit to create table version 1"
        );
        assert_eq!(
            0, finalized_commit.metrics.num_retries,
            "Expected no retries"
        );
        assert_eq!(
            0, finalized_commit.metrics.num_log_files_cleaned_up,
            "Expected no log files cleaned up"
        );
        assert!(
            !finalized_commit.metrics.new_checkpoint_created,
            "Expected no checkpoint to be created."
        );
        table.load().await.expect("Failed to reload table");
        assert_eq!(
            table.version(),
            Some(1),
            "The loaded version of the table is not up to date"
        );

        let res = create_checkpoint_for(
            table.version().unwrap() as u64,
            table.log_store.as_ref(),
            None,
        )
        .await;
        assert!(res.is_ok());

        let log_path = Path::from("_delta_log");
        let store = table.log_store().object_store(None);
        let last_checkpoint = read_last_checkpoint(store.as_ref(), &log_path)
            .await
            .expect("Failed to get the _last_checkpoint")
            .expect("Expected checkpoint hint");
        assert_eq!(last_checkpoint.version, 1);

        table.load().await.expect("Failed to reload the table, this likely means that the optional createdTime was not actually optional");
        assert_eq!(
            Some(1),
            table.version(),
            "The reloaded table doesn't have the right version"
        );
    }

    #[tokio::test]
    async fn test_create_checkpoint_for_invalid_version() {
        let table_schema = get_delta_schema();

        let table = DeltaOps::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(crate::protocol::SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema);
        // Version 1 does not exist on a freshly created table.
        let res = create_checkpoint_for(1, table.log_store.as_ref(), None).await;
        assert!(
            res.is_err(),
            "We should not allow creating a checkpoint for a version which doesn't exist!"
        );
    }

    /// Build an in-memory table with two commits (append + overwrite).
    #[cfg(feature = "datafusion")]
    async fn setup_table() -> DeltaTable {
        use arrow_schema::{DataType, Field};
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Utf8,
            false,
        )]));

        let data =
            vec![Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef];
        let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];

        let table = DeltaOps::new_in_memory()
            .write(batches.clone())
            .await
            .unwrap();

        DeltaOps(table)
            .write(batches)
            .with_save_mode(crate::protocol::SaveMode::Overwrite)
            .await
            .unwrap()
    }

    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_cleanup_no_checkpoints() {
        let table = setup_table().await;

        // Push the cutoff far into the future so only the absence of a
        // checkpoint prevents deletion.
        let log_retention_timestamp = (Utc::now().timestamp_millis()
            + Duration::days(31).num_milliseconds())
            - table
                .snapshot()
                .unwrap()
                .table_config()
                .log_retention_duration()
                .as_millis() as i64;
        let count = cleanup_expired_logs_for(
            table.version().unwrap(),
            table.log_store().as_ref(),
            log_retention_timestamp,
            None,
        )
        .await
        .unwrap();
        assert_eq!(count, 0);

        // The initial commit file must still exist.
        let path = Path::from("_delta_log/00000000000000000000.json");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_ok());
    }

    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_cleanup_with_checkpoints() {
        let table = setup_table().await;
        create_checkpoint(&table, None).await.unwrap();

        let log_retention_timestamp = (Utc::now().timestamp_millis()
            + Duration::days(32).num_milliseconds())
            - table
                .snapshot()
                .unwrap()
                .table_config()
                .log_retention_duration()
                .as_millis() as i64;
        let count = cleanup_expired_logs_for(
            table.version().unwrap(),
            table.log_store().as_ref(),
            log_retention_timestamp,
            None,
        )
        .await
        .unwrap();
        assert_eq!(count, 1);

        let log_store = table.log_store();

        // Version 0 commit is below the checkpoint and expired: gone.
        let path = log_store.log_path().child("00000000000000000000.json");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_err());

        // The checkpoint itself and its commit survive.
        let path = log_store
            .log_path()
            .child("00000000000000000001.checkpoint.parquet");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_ok());

        let path = log_store.log_path().child("00000000000000000001.json");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_ok());
    }

    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_struct_with_single_list_field() {
        let other_column_array: ArrayRef = Arc::new(Int32Array::from(vec![1]));

        let mut list_item_builder = Int32Builder::new();
        list_item_builder.append_value(1);

        let mut list_in_struct_builder = ListBuilder::new(list_item_builder);
        list_in_struct_builder.append(true);

        let mut struct_builder = StructBuilder::new(
            vec![arrow_schema::Field::new(
                "list_in_struct",
                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
                    "item",
                    arrow_schema::DataType::Int32,
                    true,
                ))),
                true,
            )],
            vec![Box::new(list_in_struct_builder)],
        );
        struct_builder.append(true);

        let struct_with_list_array: ArrayRef = Arc::new(struct_builder.finish());
        let batch = RecordBatch::try_from_iter(vec![
            ("other_column", other_column_array),
            ("struct_with_list", struct_with_list_array),
        ])
        .unwrap();
        let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();

        create_checkpoint(&table, None).await.unwrap();
    }

    #[ignore = "This test is only useful if the batch size has been made small"]
    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_checkpoint_large_table() -> DeltaResult<()> {
        use crate::writer::test_utils::get_arrow_schema;

        let table_schema = get_delta_schema();
        let temp_dir = tempfile::tempdir()?;
        let table_path = temp_dir.path().to_str().unwrap();
        let table_uri = ensure_table_uri(table_path).unwrap();
        let mut table = DeltaOps::try_from_uri(table_uri)
            .await?
            .create()
            .with_columns(table_schema.fields().cloned())
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));
        let count = 20;

        for _ in 0..count {
            table.load().await?;
            let batch = RecordBatch::try_new(
                Arc::clone(&get_arrow_schema(&None)),
                vec![
                    Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
                    Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        "2021-02-02",
                        "2021-02-03",
                        "2021-02-02",
                        "2021-02-04",
                    ])),
                ],
            )
            .unwrap();
            let _ = DeltaOps(table.clone()).write(vec![batch]).await?;
        }

        table.load().await?;
        assert_eq!(
            table.version().unwrap(),
            count,
            "Expected {count} transactions"
        );
        let pre_checkpoint_actions = table.snapshot()?.file_actions(&table.log_store).await?;

        let before = table.version();
        let res = create_checkpoint(&table, None).await;
        assert!(res.is_ok(), "Failed to create the checkpoint! {res:#?}");

        let table =
            crate::open_table(Url::from_directory_path(std::path::Path::new(table_path)).unwrap())
                .await?;
        assert_eq!(
            before,
            table.version(),
            "Why on earth did a checkpoint create a version?"
        );

        let post_checkpoint_actions = table.snapshot()?.file_actions(&table.log_store).await?;

        assert_eq!(
            pre_checkpoint_actions.len(),
            post_checkpoint_actions.len(),
            "The number of actions read from the table after checkpointing is wrong!"
        );
        Ok(())
    }

    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_create_checkpoint_overwrite() -> DeltaResult<()> {
        use crate::protocol::SaveMode;
        use crate::writer::test_utils::datafusion::get_data_sorted;
        use crate::writer::test_utils::get_arrow_schema;
        use datafusion::assert_batches_sorted_eq;

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["C"])),
                Arc::new(arrow::array::Int32Array::from(vec![30])),
                Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])),
            ],
        )
        .unwrap();

        let table_uri = Url::from_directory_path(&tmp_path).unwrap();
        let mut table = DeltaOps::try_from_uri(table_uri)
            .await?
            .write(vec![batch])
            .await?;
        table.load().await?;
        assert_eq!(table.version(), Some(0));

        create_checkpoint(&table, None).await?;

        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A"])),
                Arc::new(arrow::array::Int32Array::from(vec![0])),
                Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])),
            ],
        )
        .unwrap();

        let table_uri = Url::from_directory_path(&tmp_path).unwrap();
        let table = DeltaOps::try_from_uri(table_uri)
            .await?
            .write(vec![batch])
            .with_save_mode(SaveMode::Overwrite)
            .await?;
        assert_eq!(table.version(), Some(1));

        // After the overwrite, only the newly written row should remain.
        let expected = [
            "+----+-------+------------+",
            "| id | value | modified   |",
            "+----+-------+------------+",
            "| A  | 0     | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        let actual = get_data_sorted(&table, "id,value,modified").await;
        assert_batches_sorted_eq!(&expected, &actual);
        Ok(())
    }
}
765}