use std::collections::HashMap;
use std::iter::Iterator;
use std::sync::LazyLock;

use arrow_json::ReaderBuilder;
use arrow_schema::ArrowError;

use chrono::{Datelike, NaiveDate, NaiveDateTime, Utc};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use object_store::{Error, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::basic::Encoding;
use parquet::errors::ParquetError;
use parquet::file::properties::WriterProperties;
use regex::Regex;
use serde_json::Value;
use tracing::{debug, error};
use uuid::Uuid;

use super::{time_utils, ProtocolError};
use crate::kernel::arrow::delta_log_schema_for_table;
use crate::kernel::{
    Action, Add as AddAction, DataType, PrimitiveType, Protocol, Remove, StructField,
};
use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
use crate::{open_table_with_version, DeltaTable};

type SchemaPath = Vec<String>;

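/// Error raised while creating a checkpoint for a Delta table.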
#[derive(thiserror::Error, Debug)]
enum CheckpointError {
    #[error("Partition value {0} cannot be parsed from string.")]
    PartitionValueNotParseable(String),

    #[error("Attempted to create a checkpoint for a version {0} that does not match the table state {1}")]
    StaleTableVersion(i64, i64),

    #[error("Failed to write parquet: {}", .source)]
    Parquet {
        #[from]
        source: ParquetError,
    },

    #[error("Failed to convert into Arrow schema: {}", .source)]
    Arrow {
        #[from]
        source: ArrowError,
    },

    #[error("missing required action type in snapshot: {0}")]
    MissingActionType(String),
}

impl From<CheckpointError> for ProtocolError {
    fn from(value: CheckpointError) -> Self {
        match value {
            CheckpointError::PartitionValueNotParseable(_) => Self::InvalidField(value.to_string()),
            CheckpointError::Arrow { source } => Self::Arrow { source },
            CheckpointError::StaleTableVersion(..) => Self::Generic(value.to_string()),
            CheckpointError::Parquet { source } => Self::ParquetParseError { source },
            CheckpointError::MissingActionType(_) => Self::Generic(value.to_string()),
        }
    }
}

use core::str::Utf8Error;
impl From<Utf8Error> for ProtocolError {
    fn from(value: Utf8Error) -> Self {
        Self::Generic(value.to_string())
    }
}

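/// The record batch size used when writing checkpoint parquet files.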
pub const CHECKPOINT_RECORD_BATCH_SIZE: usize = 5000;

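/// Creates a checkpoint at the table's current version.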
pub async fn create_checkpoint(
    table: &DeltaTable,
    operation_id: Option<Uuid>,
) -> Result<(), ProtocolError> {
    create_checkpoint_for(
        table.version(),
        table.snapshot().map_err(|_| ProtocolError::NoMetaData)?,
        table.log_store.as_ref(),
        operation_id,
    )
    .await?;
    Ok(())
}

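/// Deletes expired log files before the current table version, using the table's
/// `logRetentionDuration` configuration to compute the cutoff. Returns the number of
/// deleted log files.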
pub async fn cleanup_metadata(
    table: &DeltaTable,
    operation_id: Option<Uuid>,
) -> Result<usize, ProtocolError> {
    let log_retention_timestamp = Utc::now().timestamp_millis()
        - table
            .snapshot()
            .map_err(|_| ProtocolError::NoMetaData)?
            .table_config()
            .log_retention_duration()
            .as_millis() as i64;
    cleanup_expired_logs_for(
        table.version(),
        table.log_store.as_ref(),
        log_retention_timestamp,
        operation_id,
    )
    .await
}

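/// Loads the table at `table_uri` and `version`, creates a checkpoint for that version, and
/// optionally cleans up expired logs. When `cleanup` is `None`, the table's
/// `enableExpiredLogCleanup` configuration decides whether cleanup runs.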
pub async fn create_checkpoint_from_table_uri_and_cleanup(
    table_uri: &str,
    version: i64,
    cleanup: Option<bool>,
    operation_id: Option<Uuid>,
) -> Result<(), ProtocolError> {
    let table = open_table_with_version(table_uri, version)
        .await
        .map_err(|err| ProtocolError::Generic(err.to_string()))?;
    let snapshot = table.snapshot().map_err(|_| ProtocolError::NoMetaData)?;
    create_checkpoint_for(version, snapshot, table.log_store.as_ref(), None).await?;

    let enable_expired_log_cleanup =
        cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());

    if table.version() >= 0 && enable_expired_log_cleanup {
        let deleted_log_num = cleanup_metadata(&table, operation_id).await?;
        debug!("Deleted {deleted_log_num:?} log files.");
    }

    Ok(())
}

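/// Creates a checkpoint for the given table version and state, writing the checkpoint parquet
/// file and the `_last_checkpoint` pointer through the provided log store.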
pub async fn create_checkpoint_for(
    version: i64,
    state: &DeltaTableState,
    log_store: &dyn LogStore,
    operation_id: Option<Uuid>,
) -> Result<(), ProtocolError> {
    if !state.load_config().require_files {
        return Err(ProtocolError::Generic(
            "Table has not yet been initialized with files, therefore creating a checkpoint is not possible.".to_string()
        ));
    }

    if version != state.version() {
        error!(
            "create_checkpoint_for called with version {version} but table state contains: {}. The table state may need to be reloaded",
            state.version()
        );
        return Err(CheckpointError::StaleTableVersion(version, state.version()).into());
    }

    let last_checkpoint_path = log_store.log_path().child("_last_checkpoint");

    debug!("Writing parquet bytes to checkpoint buffer.");
    let tombstones = state
        .unexpired_tombstones(log_store.object_store(None).clone())
        .await
        .map_err(|_| ProtocolError::Generic("failed to get tombstones".into()))?
        .collect::<Vec<_>>();
    let (checkpoint, parquet_bytes) = parquet_bytes_from_state(state, tombstones)?;

    let file_name = format!("{version:020}.checkpoint.parquet");
    let checkpoint_path = log_store.log_path().child(file_name);

    let object_store = log_store.object_store(operation_id);
    debug!("Writing checkpoint to {checkpoint_path:?}.");
    object_store
        .put(&checkpoint_path, parquet_bytes.into())
        .await?;

    let last_checkpoint_content: Value = serde_json::to_value(checkpoint)?;
    let last_checkpoint_content = bytes::Bytes::from(serde_json::to_vec(&last_checkpoint_content)?);

    debug!("Writing _last_checkpoint to {last_checkpoint_path:?}.");
    object_store
        .put(&last_checkpoint_path, last_checkpoint_content.into())
        .await?;

    Ok(())
}

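/// Deletes all delta log files that are both older than `cutoff_timestamp` and earlier than
/// `until_version`, capped at the version of the last checkpoint. Returns the number of
/// deleted files.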
pub async fn cleanup_expired_logs_for(
    until_version: i64,
    log_store: &dyn LogStore,
    cutoff_timestamp: i64,
    operation_id: Option<Uuid>,
) -> Result<usize, ProtocolError> {
    static DELTA_LOG_REGEX: LazyLock<Regex> = LazyLock::new(|| {
        Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap()
    });

    // Without a checkpoint there is nothing that can safely be cleaned up.
    let object_store = log_store.object_store(None);
    let maybe_last_checkpoint = object_store
        .get(&log_store.log_path().child("_last_checkpoint"))
        .await;

    if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
        return Ok(0);
    }

    let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
    let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint)?;
    // Never delete logs newer than what the last checkpoint covers.
    let until_version = i64::min(until_version, last_checkpoint.version);

    let object_store = log_store.object_store(operation_id);
    let deleted = object_store
        .delete_stream(
            object_store
                .list(Some(log_store.log_path()))
                .filter_map(|meta: Result<crate::ObjectMeta, _>| async move {
                    if meta.is_err() {
                        error!("Error received while cleaning up expired logs: {meta:?}");
                        return None;
                    }
                    let meta = meta.unwrap();
                    let ts = meta.last_modified.timestamp_millis();

                    match DELTA_LOG_REGEX.captures(meta.location.as_ref()) {
                        Some(captures) => {
                            let log_ver_str = captures.get(1).unwrap().as_str();
                            let log_ver: i64 = log_ver_str.parse().unwrap();
                            if log_ver < until_version && ts <= cutoff_timestamp {
                                Some(Ok(meta.location))
                            } else {
                                None
                            }
                        }
                        None => None,
                    }
                })
                .boxed(),
        )
        .try_collect::<Vec<_>>()
        .await?;

    debug!("Deleted {} expired logs", deleted.len());
    Ok(deleted.len())
}

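/// Serializes the table state (plus the provided tombstones) into checkpoint parquet bytes,
/// returning the `CheckPoint` metadata alongside the encoded buffer.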
fn parquet_bytes_from_state(
    state: &DeltaTableState,
    mut tombstones: Vec<Remove>,
) -> Result<(CheckPoint, bytes::Bytes), ProtocolError> {
    let current_metadata = state.metadata();
    let schema = current_metadata.schema()?;

    let partition_col_data_types = get_partition_col_data_types(&schema, current_metadata);

    // Collect the paths of all timestamp fields so their stats can be converted below.
    let mut stats_conversions: Vec<(SchemaPath, DataType)> = Vec::new();
    let fields = schema.fields().collect_vec();
    collect_stats_conversions(&mut stats_conversions, fields.as_slice());

    // The extended remove schema can only be used when every tombstone carries the
    // extended file metadata; otherwise fall back to the reduced schema.
    let use_extended_remove_schema = tombstones
        .iter()
        .all(|r| r.extended_file_metadata == Some(true) && r.size.is_some());

    if !use_extended_remove_schema {
        for remove in tombstones.iter_mut() {
            remove.extended_file_metadata = Some(false);
        }
    }
    let files = state
        .file_actions_iter()
        .map_err(|e| ProtocolError::Generic(e.to_string()))?;
    // Build the checkpoint action stream: protocol, metadata, app transactions,
    // tombstones, and finally the add actions.
    let jsons = std::iter::once(Action::Protocol(Protocol {
        min_reader_version: state.protocol().min_reader_version,
        min_writer_version: state.protocol().min_writer_version,
        writer_features: if state.protocol().min_writer_version >= 7 {
            Some(state.protocol().writer_features.clone().unwrap_or_default())
        } else {
            None
        },
        reader_features: if state.protocol().min_reader_version >= 3 {
            Some(state.protocol().reader_features.clone().unwrap_or_default())
        } else {
            None
        },
    }))
    .chain(std::iter::once(Action::Metadata(current_metadata.clone())))
    .chain(
        state
            .app_transaction_version()
            .map_err(|_| CheckpointError::MissingActionType("txn".to_string()))?
            .map(Action::Txn),
    )
    .chain(tombstones.iter().map(|r| {
        let mut r = (*r).clone();

        if r.extended_file_metadata.is_none() {
            r.extended_file_metadata = Some(false);
        }

        Action::Remove(r)
    }))
    .map(|a| serde_json::to_value(a).map_err(ProtocolError::from))
    .chain(files.map(|f| {
        checkpoint_add_from_state(
            &f,
            partition_col_data_types.as_slice(),
            &stats_conversions,
            state.table_config().write_stats_as_json(),
            state.table_config().write_stats_as_struct(),
        )
    }));

    let arrow_schema = delta_log_schema_for_table(
        (&schema).try_into()?,
        current_metadata.partition_columns.as_slice(),
        use_extended_remove_schema,
        state.table_config().write_stats_as_json(),
        state.table_config().write_stats_as_struct(),
    );

    debug!("Writing to checkpoint parquet buffer...");

    // Unless RLE is enabled via the table configuration, disable dictionary encoding
    // and force plain encoding.
    let writer_properties = if state.table_config().use_checkpoint_rle() {
        WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .build()
    } else {
        WriterProperties::builder()
            .set_compression(Compression::SNAPPY)
            .set_dictionary_enabled(false)
            .set_encoding(Encoding::PLAIN)
            .build()
    };

    // Stream the JSON actions through the Arrow JSON decoder into the parquet writer
    // in batches of CHECKPOINT_RECORD_BATCH_SIZE.
    let mut bytes = vec![];
    let mut writer =
        ArrowWriter::try_new(&mut bytes, arrow_schema.clone(), Some(writer_properties))?;
    let mut decoder = ReaderBuilder::new(arrow_schema)
        .with_batch_size(CHECKPOINT_RECORD_BATCH_SIZE)
        .build_decoder()?;

    let mut total_actions = 0;

    let span = tracing::debug_span!("serialize_checkpoint").entered();
    for chunk in &jsons.chunks(CHECKPOINT_RECORD_BATCH_SIZE) {
        let mut buf = Vec::new();
        for j in chunk {
            serde_json::to_writer(&mut buf, &j?)?;
            total_actions += 1;
        }
        let _ = decoder.decode(&buf)?;
        while let Some(batch) = decoder.flush()? {
            writer.write(&batch)?;
        }
    }
    drop(span);

    let _ = writer.close()?;
    debug!(total_actions, "Finished writing checkpoint parquet buffer.");

    let checkpoint = CheckPointBuilder::new(state.version(), total_actions)
        .with_size_in_bytes(bytes.len() as i64)
        .build();
    Ok((checkpoint, bytes::Bytes::from(bytes)))
}

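/// Builds the checkpoint representation of an `add` action: `dataChange` is forced to `false`,
/// and parsed partition values / parsed stats are attached when struct stats are enabled.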
fn checkpoint_add_from_state(
    add: &AddAction,
    partition_col_data_types: &[(&String, &DataType)],
    stats_conversions: &[(SchemaPath, DataType)],
    write_stats_as_json: bool,
    write_stats_as_struct: bool,
) -> Result<Value, ProtocolError> {
    let mut v = serde_json::to_value(Action::Add(add.clone()))
        .map_err(|err| ArrowError::JsonError(err.to_string()))?;

    v["add"]["dataChange"] = Value::Bool(false);

    if !add.partition_values.is_empty() && write_stats_as_struct {
        let mut partition_values_parsed: HashMap<String, Value> = HashMap::new();

        for (field_name, data_type) in partition_col_data_types.iter() {
            if let Some(string_value) = add.partition_values.get(*field_name) {
                let v = typed_partition_value_from_option_string(string_value, data_type)?;

                partition_values_parsed.insert(field_name.to_string(), v);
            }
        }

        let partition_values_parsed = serde_json::to_value(partition_values_parsed)
            .map_err(|err| ArrowError::JsonError(err.to_string()))?;
        v["add"]["partitionValues_parsed"] = partition_values_parsed;
    }

    if write_stats_as_struct {
        if let Ok(Some(stats)) = add.get_stats() {
            let mut stats = serde_json::to_value(stats)
                .map_err(|err| ArrowError::JsonError(err.to_string()))?;
            let min_values = stats.get_mut("minValues").and_then(|v| v.as_object_mut());

            if let Some(min_values) = min_values {
                for (path, data_type) in stats_conversions {
                    apply_stats_conversion(min_values, path.as_slice(), data_type)
                }
            }

            let max_values = stats.get_mut("maxValues").and_then(|v| v.as_object_mut());
            if let Some(max_values) = max_values {
                for (path, data_type) in stats_conversions {
                    apply_stats_conversion(max_values, path.as_slice(), data_type)
                }
            }

            v["add"]["stats_parsed"] = stats;
        }
    }

    // Drop the JSON stats entirely when the table is configured not to write them.
    if !write_stats_as_json {
        v.get_mut("add")
            .and_then(|v| v.as_object_mut())
            .and_then(|v| v.remove("stats"));
    }
    Ok(v)
}

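/// Parses a string-encoded partition value into a typed JSON value according to the column's
/// primitive data type.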
fn typed_partition_value_from_string(
    string_value: &str,
    data_type: &DataType,
) -> Result<Value, ProtocolError> {
    match data_type {
        DataType::Primitive(primitive_type) => match primitive_type {
            PrimitiveType::String | PrimitiveType::Binary => Ok(string_value.to_owned().into()),
            PrimitiveType::Long
            | PrimitiveType::Integer
            | PrimitiveType::Short
            | PrimitiveType::Byte => Ok(string_value
                .parse::<i64>()
                .map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
                .into()),
            PrimitiveType::Boolean => Ok(string_value
                .parse::<bool>()
                .map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
                .into()),
            PrimitiveType::Float | PrimitiveType::Double => Ok(string_value
                .parse::<f64>()
                .map_err(|_| CheckpointError::PartitionValueNotParseable(string_value.to_owned()))?
                .into()),
            PrimitiveType::Date => {
                let d = NaiveDate::parse_from_str(string_value, "%Y-%m-%d").map_err(|_| {
                    CheckpointError::PartitionValueNotParseable(string_value.to_owned())
                })?;
                // Dates are stored as days since 1970-01-01; 719_163 is the number of days
                // from 0001-01-01 (CE) to the epoch.
                Ok((d.num_days_from_ce() - 719_163).into())
            }
            PrimitiveType::Timestamp | PrimitiveType::TimestampNtz => {
                let ts = NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S.%6f");
                let ts: NaiveDateTime = match ts {
                    Ok(_) => ts,
                    Err(_) => NaiveDateTime::parse_from_str(string_value, "%Y-%m-%d %H:%M:%S"),
                }
                .map_err(|_| {
                    CheckpointError::PartitionValueNotParseable(string_value.to_owned())
                })?;
                // Timestamps are stored as microseconds since the epoch.
                Ok((ts.and_utc().timestamp_millis() * 1000).into())
            }
            s => unimplemented!("Primitive type {s} is not supported for partition column values."),
        },
        d => unimplemented!("Data type {d:?} is not supported for partition column values."),
    }
}

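/// Like [`typed_partition_value_from_string`], but maps `None` and empty strings to JSON null.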
fn typed_partition_value_from_option_string(
    string_value: &Option<String>,
    data_type: &DataType,
) -> Result<Value, ProtocolError> {
    match string_value {
        Some(s) => {
            if s.is_empty() {
                Ok(Value::Null)
            } else {
                typed_partition_value_from_string(s, data_type)
            }
        }
        None => Ok(Value::Null),
    }
}

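/// Collects the schema paths of all timestamp fields; their stats values must be converted
/// from string representations to microsecond integers in the checkpoint.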
fn collect_stats_conversions(paths: &mut Vec<(SchemaPath, DataType)>, fields: &[&StructField]) {
    let mut _path = SchemaPath::new();
    fields
        .iter()
        .for_each(|f| collect_field_conversion(&mut _path, paths, f));
}

fn collect_field_conversion(
    current_path: &mut SchemaPath,
    all_paths: &mut Vec<(SchemaPath, DataType)>,
    field: &StructField,
) {
    match field.data_type() {
        DataType::Primitive(PrimitiveType::Timestamp) => {
            let mut key_path = current_path.clone();
            key_path.push(field.name().to_owned());
            all_paths.push((key_path, field.data_type().to_owned()));
        }
        DataType::Struct(struct_field) => {
            // Recurse into nested structs, tracking the path to each field.
            let struct_fields = struct_field.fields();
            current_path.push(field.name().to_owned());
            struct_fields.for_each(|f| collect_field_conversion(current_path, all_paths, f));
            current_path.pop();
        }
        _ => { /* no conversion needed for other types */ }
    }
}

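/// Walks `path` into the stats object and converts the leaf timestamp value from its string
/// representation to microseconds since the epoch.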
fn apply_stats_conversion(
    context: &mut serde_json::Map<String, Value>,
    path: &[String],
    data_type: &DataType,
) {
    if path.len() == 1 {
        if let DataType::Primitive(PrimitiveType::Timestamp) = data_type {
            let v = context.get_mut(&path[0]);

            if let Some(v) = v {
                let ts = v
                    .as_str()
                    .and_then(|s| time_utils::timestamp_micros_from_stats_string(s).ok())
                    .map(|n| Value::Number(serde_json::Number::from(n)));

                if let Some(ts) = ts {
                    *v = ts;
                }
            }
        }
    } else {
        let next_context = context.get_mut(&path[0]).and_then(|v| v.as_object_mut());
        if let Some(next_context) = next_context {
            apply_stats_conversion(next_context, &path[1..], data_type);
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::builder::{Int32Builder, ListBuilder, StructBuilder};
    use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    use arrow_schema::Schema as ArrowSchema;
    use chrono::Duration;
    use object_store::path::Path;
    use serde_json::json;

    use super::*;
    use crate::kernel::transaction::{CommitBuilder, TableReference};
    use crate::kernel::StructType;
    use crate::operations::DeltaOps;
    use crate::protocol::Metadata;
    use crate::writer::test_utils::get_delta_schema;
    use crate::DeltaResult;

    #[tokio::test]
    async fn test_create_checkpoint_for() {
        let table_schema = get_delta_schema();

        let table = DeltaOps::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(crate::protocol::SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
        assert_eq!(table.get_schema().unwrap(), &table_schema);
        let res =
            create_checkpoint_for(0, table.snapshot().unwrap(), table.log_store.as_ref(), None)
                .await;
        assert!(res.is_ok());

        let path = Path::from("_delta_log/_last_checkpoint");
        let last_checkpoint = table
            .object_store()
            .get(&path)
            .await
            .expect("Failed to get the _last_checkpoint")
            .bytes()
            .await
            .expect("Failed to get bytes for _last_checkpoint");
        let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
        assert_eq!(last_checkpoint.version, 0);
    }

    #[tokio::test]
    async fn test_create_checkpoint_with_metadata() {
        let table_schema = get_delta_schema();

        let mut table = DeltaOps::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(crate::protocol::SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
        assert_eq!(table.get_schema().unwrap(), &table_schema);

        let part_cols: Vec<String> = vec![];
        let metadata = Metadata::try_new(table_schema, part_cols, HashMap::new()).unwrap();
        let actions = vec![Action::Metadata(metadata)];

        let epoch_id = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("Time went backwards")
            .as_millis() as i64;

        let operation = crate::protocol::DeltaOperation::StreamingUpdate {
            output_mode: crate::protocol::OutputMode::Append,
            query_id: "test".into(),
            epoch_id,
        };
        let finalized_commit = CommitBuilder::default()
            .with_actions(actions)
            .build(
                table.state.as_ref().map(|f| f as &dyn TableReference),
                table.log_store(),
                operation,
            )
            .await
            .unwrap();

        assert_eq!(
            1,
            finalized_commit.version(),
            "Expected the commit to create table version 1"
        );
        assert_eq!(
            0, finalized_commit.metrics.num_retries,
            "Expected no retries"
        );
        assert_eq!(
            0, finalized_commit.metrics.num_log_files_cleaned_up,
            "Expected no log files cleaned up"
        );
        assert!(
            !finalized_commit.metrics.new_checkpoint_created,
            "Expected no checkpoint to be created."
        );
        table.load().await.expect("Failed to reload table");
        assert_eq!(
            table.version(),
            1,
            "The loaded version of the table is not up to date"
        );

        let res = create_checkpoint_for(
            table.version(),
            table.state.as_ref().unwrap(),
            table.log_store.as_ref(),
            None,
        )
        .await;
        assert!(res.is_ok());

        let path = Path::from("_delta_log/_last_checkpoint");
        let last_checkpoint = table
            .object_store()
            .get(&path)
            .await
            .expect("Failed to get the _last_checkpoint")
            .bytes()
            .await
            .expect("Failed to get bytes for _last_checkpoint");
        let last_checkpoint: CheckPoint = serde_json::from_slice(&last_checkpoint).expect("Fail");
        assert_eq!(last_checkpoint.version, 1);

        table.load().await.expect("Failed to reload the table, this likely means that the optional createdTime was not actually optional");
        assert_eq!(
            1,
            table.version(),
            "The reloaded table doesn't have the right version"
        );
    }

    #[tokio::test]
    async fn test_create_checkpoint_for_invalid_version() {
        let table_schema = get_delta_schema();

        let table = DeltaOps::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .with_save_mode(crate::protocol::SaveMode::Ignore)
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
        assert_eq!(table.get_schema().unwrap(), &table_schema);
        match create_checkpoint_for(1, table.snapshot().unwrap(), table.log_store.as_ref(), None)
            .await
        {
            Ok(_) => {
                panic!(
                    "We should not allow creating a checkpoint for a version which doesn't exist!"
                );
            }
            Err(_) => {}
        }
    }

    #[test]
    fn typed_partition_value_from_string_test() {
        let string_value: Value = "Hello World!".into();
        assert_eq!(
            string_value,
            typed_partition_value_from_option_string(
                &Some("Hello World!".to_string()),
                &DataType::Primitive(PrimitiveType::String),
            )
            .unwrap()
        );

        let bool_value: Value = true.into();
        assert_eq!(
            bool_value,
            typed_partition_value_from_option_string(
                &Some("true".to_string()),
                &DataType::Primitive(PrimitiveType::Boolean),
            )
            .unwrap()
        );

        let number_value: Value = 42.into();
        assert_eq!(
            number_value,
            typed_partition_value_from_option_string(
                &Some("42".to_string()),
                &DataType::Primitive(PrimitiveType::Integer),
            )
            .unwrap()
        );

        for (s, v) in [
            ("2021-08-08", 18_847),
            ("1970-01-02", 1),
            ("1970-01-01", 0),
            ("1969-12-31", -1),
            ("1-01-01", -719_162),
        ] {
            let date_value: Value = v.into();
            assert_eq!(
                date_value,
                typed_partition_value_from_option_string(
                    &Some(s.to_string()),
                    &DataType::Primitive(PrimitiveType::Date),
                )
                .unwrap()
            );
        }

        for (s, v) in [
            ("2021-08-08 01:00:01.000000", 1628384401000000i64),
            ("2021-08-08 01:00:01", 1628384401000000i64),
            ("1970-01-02 12:59:59.000000", 133199000000i64),
            ("1970-01-02 12:59:59", 133199000000i64),
            ("1970-01-01 13:00:01.000000", 46801000000i64),
            ("1970-01-01 13:00:01", 46801000000i64),
            ("1969-12-31 00:00:00", -86400000000i64),
            ("1677-09-21 00:12:44", -9223372036000000i64),
        ] {
            let timestamp_value: Value = v.into();
            assert_eq!(
                timestamp_value,
                typed_partition_value_from_option_string(
                    &Some(s.to_string()),
                    &DataType::Primitive(PrimitiveType::Timestamp),
                )
                .unwrap()
            );
        }

        let binary_value: Value = "\u{2081}\u{2082}\u{2083}\u{2084}".into();
        assert_eq!(
            binary_value,
            typed_partition_value_from_option_string(
                &Some("₁₂₃₄".to_string()),
                &DataType::Primitive(PrimitiveType::Binary),
            )
            .unwrap()
        );
    }

    #[test]
    fn null_partition_value_from_string_test() {
        assert_eq!(
            Value::Null,
            typed_partition_value_from_option_string(
                &None,
                &DataType::Primitive(PrimitiveType::Integer),
            )
            .unwrap()
        );

        assert_eq!(
            Value::Null,
            typed_partition_value_from_option_string(
                &Some("".to_string()),
                &DataType::Primitive(PrimitiveType::Integer),
            )
            .unwrap()
        );
    }

    #[test]
    fn collect_stats_conversions_test() {
        let delta_schema: StructType = serde_json::from_value(SCHEMA.clone()).unwrap();
        let fields = delta_schema.fields().collect_vec();
        let mut paths = Vec::new();
        collect_stats_conversions(&mut paths, fields.as_slice());

        assert_eq!(2, paths.len());
        assert_eq!(
            (
                vec!["some_struct".to_string(), "struct_timestamp".to_string()],
                DataType::Primitive(PrimitiveType::Timestamp)
            ),
            paths[0]
        );
        assert_eq!(
            (
                vec!["some_timestamp".to_string()],
                DataType::Primitive(PrimitiveType::Timestamp)
            ),
            paths[1]
        );
    }

    async fn setup_table() -> DeltaTable {
        use arrow_schema::{DataType, Field};
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Utf8,
            false,
        )]));

        let data =
            vec![Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])) as ArrayRef];
        let batches = vec![RecordBatch::try_new(schema.clone(), data).unwrap()];

        let table = DeltaOps::new_in_memory()
            .write(batches.clone())
            .await
            .unwrap();

        DeltaOps(table)
            .write(batches)
            .with_save_mode(crate::protocol::SaveMode::Overwrite)
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test_cleanup_no_checkpoints() {
        let table = setup_table().await;

        let log_retention_timestamp = (Utc::now().timestamp_millis()
            + Duration::days(31).num_milliseconds())
            - table
                .snapshot()
                .unwrap()
                .table_config()
                .log_retention_duration()
                .as_millis() as i64;
        let count = cleanup_expired_logs_for(
            table.version(),
            table.log_store().as_ref(),
            log_retention_timestamp,
            None,
        )
        .await
        .unwrap();
        assert_eq!(count, 0);
        println!("{count:?}");

        let path = Path::from("_delta_log/00000000000000000000.json");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_ok());
    }

    #[tokio::test]
    async fn test_cleanup_with_checkpoints() {
        let table = setup_table().await;
        create_checkpoint(&table, None).await.unwrap();

        let log_retention_timestamp = (Utc::now().timestamp_millis()
            + Duration::days(32).num_milliseconds())
            - table
                .snapshot()
                .unwrap()
                .table_config()
                .log_retention_duration()
                .as_millis() as i64;
        let count = cleanup_expired_logs_for(
            table.version(),
            table.log_store().as_ref(),
            log_retention_timestamp,
            None,
        )
        .await
        .unwrap();
        assert_eq!(count, 1);

        let log_store = table.log_store();

        let path = log_store.log_path().child("00000000000000000000.json");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_err());

        let path = log_store
            .log_path()
            .child("00000000000000000001.checkpoint.parquet");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_ok());

        let path = log_store.log_path().child("00000000000000000001.json");
        let res = table.log_store().object_store(None).get(&path).await;
        assert!(res.is_ok());
    }

    #[test]
    fn apply_stats_conversion_test() {
        let mut stats = STATS_JSON.clone();

        let min_values = stats.get_mut("minValues").unwrap().as_object_mut().unwrap();

        apply_stats_conversion(
            min_values,
            &["some_struct".to_string(), "struct_string".to_string()],
            &DataType::Primitive(PrimitiveType::String),
        );
        apply_stats_conversion(
            min_values,
            &["some_struct".to_string(), "struct_timestamp".to_string()],
            &DataType::Primitive(PrimitiveType::Timestamp),
        );
        apply_stats_conversion(
            min_values,
            &["some_string".to_string()],
            &DataType::Primitive(PrimitiveType::String),
        );
        apply_stats_conversion(
            min_values,
            &["some_timestamp".to_string()],
            &DataType::Primitive(PrimitiveType::Timestamp),
        );

        let max_values = stats.get_mut("maxValues").unwrap().as_object_mut().unwrap();

        apply_stats_conversion(
            max_values,
            &["some_struct".to_string(), "struct_string".to_string()],
            &DataType::Primitive(PrimitiveType::String),
        );
        apply_stats_conversion(
            max_values,
            &["some_struct".to_string(), "struct_timestamp".to_string()],
            &DataType::Primitive(PrimitiveType::Timestamp),
        );
        apply_stats_conversion(
            max_values,
            &["some_string".to_string()],
            &DataType::Primitive(PrimitiveType::String),
        );
        apply_stats_conversion(
            max_values,
            &["some_timestamp".to_string()],
            &DataType::Primitive(PrimitiveType::Timestamp),
        );

        assert_eq!(
            "A",
            stats["minValues"]["some_struct"]["struct_string"]
                .as_str()
                .unwrap()
        );
        assert_eq!(
            1627668684594000i64,
            stats["minValues"]["some_struct"]["struct_timestamp"]
                .as_i64()
                .unwrap()
        );
        assert_eq!("P", stats["minValues"]["some_string"].as_str().unwrap());
        assert_eq!(
            1627668684594000i64,
            stats["minValues"]["some_timestamp"].as_i64().unwrap()
        );

        assert_eq!(
            "B",
            stats["maxValues"]["some_struct"]["struct_string"]
                .as_str()
                .unwrap()
        );
        assert_eq!(
            1627668685594000i64,
            stats["maxValues"]["some_struct"]["struct_timestamp"]
                .as_i64()
                .unwrap()
        );
        assert_eq!("Q", stats["maxValues"]["some_string"].as_str().unwrap());
        assert_eq!(
            1627668685594000i64,
            stats["maxValues"]["some_timestamp"].as_i64().unwrap()
        );
    }

    #[tokio::test]
    async fn test_struct_with_single_list_field() {
        let other_column_array: ArrayRef = Arc::new(Int32Array::from(vec![1]));

        let mut list_item_builder = Int32Builder::new();
        list_item_builder.append_value(1);

        let mut list_in_struct_builder = ListBuilder::new(list_item_builder);
        list_in_struct_builder.append(true);

        let mut struct_builder = StructBuilder::new(
            vec![arrow_schema::Field::new(
                "list_in_struct",
                arrow_schema::DataType::List(Arc::new(arrow_schema::Field::new(
                    "item",
                    arrow_schema::DataType::Int32,
                    true,
                ))),
                true,
            )],
            vec![Box::new(list_in_struct_builder)],
        );
        struct_builder.append(true);

        let struct_with_list_array: ArrayRef = Arc::new(struct_builder.finish());
        let batch = RecordBatch::try_from_iter(vec![
            ("other_column", other_column_array),
            ("struct_with_list", struct_with_list_array),
        ])
        .unwrap();
        let table = DeltaOps::new_in_memory().write(vec![batch]).await.unwrap();

        create_checkpoint(&table, None).await.unwrap();
    }

    static SCHEMA: LazyLock<Value> = LazyLock::new(|| {
        json!({
            "type": "struct",
            "fields": [
                {
                    "name": "some_struct",
                    "type": {
                        "type": "struct",
                        "fields": [
                            {
                                "name": "struct_string",
                                "type": "string",
                                "nullable": true, "metadata": {}
                            },
                            {
                                "name": "struct_timestamp",
                                "type": "timestamp",
                                "nullable": true, "metadata": {}
                            }]
                    },
                    "nullable": true, "metadata": {}
                },
                { "name": "some_string", "type": "string", "nullable": true, "metadata": {} },
                { "name": "some_timestamp", "type": "timestamp", "nullable": true, "metadata": {} },
            ]
        })
    });
    static STATS_JSON: LazyLock<Value> = LazyLock::new(|| {
        json!({
            "minValues": {
                "some_struct": {
                    "struct_string": "A",
                    "struct_timestamp": "2021-07-30T18:11:24.594Z"
                },
                "some_string": "P",
                "some_timestamp": "2021-07-30T18:11:24.594Z"
            },
            "maxValues": {
                "some_struct": {
                    "struct_string": "B",
                    "struct_timestamp": "2021-07-30T18:11:25.594Z"
                },
                "some_string": "Q",
                "some_timestamp": "2021-07-30T18:11:25.594Z"
            }
        })
    });

    #[ignore = "This test is only useful if the batch size has been made small"]
    #[tokio::test]
    async fn test_checkpoint_large_table() -> DeltaResult<()> {
        use crate::writer::test_utils::get_arrow_schema;

        let table_schema = get_delta_schema();
        let temp_dir = tempfile::tempdir()?;
        let table_path = temp_dir.path().to_str().unwrap();
        let mut table = DeltaOps::try_from_uri(&table_path)
            .await?
            .create()
            .with_columns(table_schema.fields().cloned())
            .await
            .unwrap();
        assert_eq!(table.version(), 0);
        let count = 20;

        for _ in 0..count {
            table.load().await?;
            let batch = RecordBatch::try_new(
                Arc::clone(&get_arrow_schema(&None)),
                vec![
                    Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
                    Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        "2021-02-02",
                        "2021-02-03",
                        "2021-02-02",
                        "2021-02-04",
                    ])),
                ],
            )
            .unwrap();
            let _ = DeltaOps(table.clone()).write(vec![batch]).await?;
        }

        table.load().await?;
        assert_eq!(table.version(), count, "Expected {count} transactions");
        let pre_checkpoint_actions = table.snapshot()?.file_actions()?;

        let before = table.version();
        let res = create_checkpoint(&table, None).await;
        assert!(res.is_ok(), "Failed to create the checkpoint! {res:#?}");

        let table = crate::open_table(&table_path).await?;
        assert_eq!(
            before,
            table.version(),
            "Why on earth did a checkpoint create a version?"
        );

        let post_checkpoint_actions = table.snapshot()?.file_actions()?;

        assert_eq!(
            pre_checkpoint_actions.len(),
            post_checkpoint_actions.len(),
            "The number of actions read from the table after checkpointing is wrong!"
        );
        Ok(())
    }

    #[cfg(feature = "datafusion")]
    #[tokio::test]
    async fn test_create_checkpoint_overwrite() -> DeltaResult<()> {
        use crate::protocol::SaveMode;
        use crate::writer::test_utils::datafusion::get_data_sorted;
        use crate::writer::test_utils::get_arrow_schema;
        use datafusion::assert_batches_sorted_eq;

        let tmp_dir = tempfile::tempdir().unwrap();
        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();

        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["C"])),
                Arc::new(arrow::array::Int32Array::from(vec![30])),
                Arc::new(arrow::array::StringArray::from(vec!["2021-02-03"])),
            ],
        )
        .unwrap();

        let mut table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
            .await?
            .write(vec![batch])
            .await?;
        table.load().await?;
        assert_eq!(table.version(), 0);

        create_checkpoint(&table, None).await?;

        let batch = RecordBatch::try_new(
            Arc::clone(&get_arrow_schema(&None)),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["A"])),
                Arc::new(arrow::array::Int32Array::from(vec![0])),
                Arc::new(arrow::array::StringArray::from(vec!["2021-02-02"])),
            ],
        )
        .unwrap();

        let table = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
            .await?
            .write(vec![batch])
            .with_save_mode(SaveMode::Overwrite)
            .await?;
        assert_eq!(table.version(), 1);

        let expected = [
            "+----+-------+------------+",
            "| id | value | modified   |",
            "+----+-------+------------+",
            "| A  | 0     | 2021-02-02 |",
            "+----+-------+------------+",
        ];
        let actual = get_data_sorted(&table, "id,value,modified").await;
        assert_batches_sorted_eq!(&expected, &actual);
        Ok(())
    }
}