1use std::collections::HashMap;
29use std::num::NonZeroU64;
30use std::str::FromStr;
31use std::sync::Arc;
32use std::time::Instant;
33use std::vec;
34
35use arrow::array::RecordBatch;
36use datafusion::catalog::{Session, TableProvider};
37use datafusion::common::Result;
38use datafusion::datasource::{MemTable, provider_as_source};
39use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE};
40use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
41use delta_kernel::table_features::ColumnMappingMode;
42use futures::future::BoxFuture;
43use parquet::file::properties::WriterProperties;
44use serde::{Deserialize, Serialize};
45use tracing::Instrument;
46use url::Url;
47
48pub use self::configs::WriterStatsConfig;
49use self::execution::write_execution_plan_v2;
50use self::metrics::{SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
51use super::{CreateBuilder, CustomExecuteHandler, Operation};
52use crate::DeltaTable;
53use crate::delta_datafusion::Expression;
54use crate::delta_datafusion::expr::fmt_expr_to_sql;
55use crate::delta_datafusion::physical::{find_metric_node, get_metric};
56use crate::delta_datafusion::{
57 DeltaSessionExt, SessionFallbackPolicy, SessionResolveContext, create_session,
58 resolve_session_state, update_datafusion_session,
59};
60use crate::errors::{DeltaResult, DeltaTableError, unsupported_column_mapping_write};
61use crate::kernel::schema::cast::normalize_for_delta;
62use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL, TableReference};
63use crate::kernel::{Action, EagerSnapshot, StructType};
64use crate::logstore::LogStoreRef;
65use crate::protocol::{DeltaOperation, SaveMode};
66
67pub mod configs;
68pub(crate) mod execution;
69pub(crate) mod generated_columns;
70pub(crate) mod metrics;
71mod plan;
72pub(crate) mod schema_evolution;
73pub mod writer;
74
75#[derive(thiserror::Error, Debug)]
76pub(crate) enum WriteError {
77 #[error("No data source supplied to write command.")]
78 MissingData,
79
80 #[error("A table already exists at: {0}")]
81 AlreadyExists(Url),
82
83 #[error(
84 "Specified table partitioning does not match table partitioning: expected: {expected:?}, got: {got:?}. To change partition columns, use full table overwrite with schema overwrite and no replaceWhere predicate."
85 )]
86 PartitionColumnMismatch {
87 expected: Vec<String>,
88 got: Vec<String>,
89 },
90
91 #[error("Partition column(s) not found in write schema: {}", columns.join(", "))]
92 MissingPartitionColumns { columns: Vec<String> },
93}
94
95impl From<WriteError> for DeltaTableError {
96 fn from(err: WriteError) -> Self {
97 DeltaTableError::GenericError {
98 source: Box::new(err),
99 }
100 }
101}
102
/// How the table schema may change as part of a write.
///
/// `Debug` and `Eq` are derived so the mode can be logged and used in exhaustive
/// comparisons like any other public configuration enum.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum SchemaMode {
    /// Overwrite the table schema with the schema of the incoming data.
    ///
    /// Only accepted together with [`SaveMode::Overwrite`]; any other save mode is rejected
    /// during precondition checks.
    Overwrite,
    /// Merge the schema of the incoming data into the existing table schema.
    Merge,
}
111
112impl FromStr for SchemaMode {
113 type Err = DeltaTableError;
114
115 fn from_str(s: &str) -> DeltaResult<Self> {
116 match s.to_ascii_lowercase().as_str() {
117 "overwrite" => Ok(SchemaMode::Overwrite),
118 "merge" => Ok(SchemaMode::Merge),
119 _ => Err(DeltaTableError::Generic(format!(
120 "Invalid schema write mode provided: {s}, only these are supported: ['overwrite', 'merge']"
121 ))),
122 }
123 }
124}
125
/// Builder for writing data into a Delta table; `.await` it to execute the write.
///
/// Created with `WriteBuilder::new` and configured through the `with_*` methods.
pub struct WriteBuilder {
    /// Current table state, or `None` when the table does not exist yet (the write then
    /// also emits the actions that create it).
    snapshot: Option<EagerSnapshot>,
    /// Log store of the target table.
    log_store: LogStoreRef,
    /// Logical plan producing the rows to write; a write without input fails with
    /// `WriteError::MissingData`.
    input: Option<LogicalPlan>,
    /// Caller-supplied DataFusion session, if any.
    session: Option<Arc<dyn Session>>,
    /// Policy passed to session resolution when picking the session to run with.
    session_fallback_policy: SessionFallbackPolicy,
    /// Save mode of the write (`SaveMode::Append` unless changed).
    mode: SaveMode,
    /// Requested partition columns; `None` keeps the table's current partitioning
    /// (or no partitioning for a new table).
    partition_columns: Option<Vec<String>>,
    /// Optional replace-where predicate.
    predicate: Option<Expression>,
    /// Outer `None`: never configured. `Some(inner)`: configured via
    /// `with_target_file_size`, where `inner` may itself be `None`.
    target_file_size: Option<Option<NonZeroU64>>,
    /// Optional batch size forwarded to the write planner.
    write_batch_size: Option<usize>,
    /// Optional schema evolution mode.
    schema_mode: Option<SchemaMode>,
    /// Whether input values are cast to the table schema "safely" (see `with_cast_safety`).
    safe_cast: bool,
    /// Optional Parquet writer properties.
    writer_properties: Option<WriterProperties>,
    /// Properties applied to the commit.
    commit_properties: CommitProperties,
    /// Table name, used when the write creates a new table.
    name: Option<String>,
    /// Table description, used as the comment when the write creates a new table.
    description: Option<String>,
    /// Table configuration, used for table creation and passed to the write planner.
    configuration: HashMap<String, Option<String>>,
    /// Optional hooks invoked around execution and after commit.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
164
/// Metrics collected for one write; serialized into the commit's `operationMetrics`
/// app metadata.
#[derive(Default, Debug, Serialize, Deserialize)]
pub struct WriteMetrics {
    /// Number of data files added by the write.
    pub num_added_files: usize,
    /// Number of existing files removed by the write (e.g. by an overwrite).
    pub num_removed_files: usize,
    /// Number of partitions written.
    /// NOTE(review): not assigned by the write path in this module, so it stays at its
    /// default of 0 — confirm whether another code path fills it.
    pub num_partitions: usize,
    /// Number of rows written, read from the source-count metric node of the executed plan.
    pub num_added_rows: usize,
    /// Elapsed execution time in milliseconds, measured before the commit is made.
    pub execution_time_ms: u64,
}
179
180impl super::Operation for WriteBuilder {
181 fn log_store(&self) -> &LogStoreRef {
182 &self.log_store
183 }
184 fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
185 self.custom_execute_handler.clone()
186 }
187}
188
189impl WriteBuilder {
190 pub fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
192 Self {
193 snapshot,
194 log_store,
195 input: None,
196 session: None,
197 session_fallback_policy: SessionFallbackPolicy::default(),
198 mode: SaveMode::Append,
199 partition_columns: None,
200 predicate: None,
201 target_file_size: None,
202 write_batch_size: None,
203 safe_cast: false,
204 schema_mode: None,
205 writer_properties: None,
206 commit_properties: CommitProperties::default(),
207 name: None,
208 description: None,
209 configuration: Default::default(),
210 custom_execute_handler: None,
211 }
212 }
213
214 pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
216 self.mode = save_mode;
217 self
218 }
219
220 pub fn with_schema_mode(mut self, schema_mode: SchemaMode) -> Self {
222 self.schema_mode = Some(schema_mode);
223 self
224 }
225
226 pub fn with_replace_where(mut self, predicate: impl Into<Expression>) -> Self {
228 self.predicate = Some(predicate.into());
229 self
230 }
231
232 pub fn with_partition_columns(
237 mut self,
238 partition_columns: impl IntoIterator<Item = impl Into<String>>,
239 ) -> Self {
240 self.partition_columns = Some(partition_columns.into_iter().map(|s| s.into()).collect());
241 self
242 }
243
244 #[deprecated(since = "0.31.0", note = "Use `with_input_plan` instead")]
246 pub fn with_input_execution_plan(self, plan: Arc<LogicalPlan>) -> Self {
247 self.with_input_plan(plan.as_ref().clone())
248 }
249
250 pub fn with_input_plan(mut self, plan: LogicalPlan) -> Self {
252 self.input = Some(plan);
253 self
254 }
255
256 pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
266 self.session = Some(session);
267 self
268 }
269
270 pub fn with_session_fallback_policy(mut self, policy: SessionFallbackPolicy) -> Self {
274 self.session_fallback_policy = policy;
275 self
276 }
277
278 pub fn with_target_file_size(mut self, target_file_size: Option<NonZeroU64>) -> Self {
280 self.target_file_size = Some(target_file_size);
281 self
282 }
283
284 pub fn with_write_batch_size(mut self, write_batch_size: usize) -> Self {
286 self.write_batch_size = Some(write_batch_size);
287 self
288 }
289
290 pub fn with_cast_safety(mut self, safe: bool) -> Self {
293 self.safe_cast = safe;
294 self
295 }
296
297 pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
299 self.writer_properties = Some(writer_properties);
300 self
301 }
302
303 pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
305 self.commit_properties = commit_properties;
306 self
307 }
308
309 pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
312 self.name = Some(name.into());
313 self
314 }
315
316 pub fn with_description(mut self, description: impl Into<String>) -> Self {
318 self.description = Some(description.into());
319 self
320 }
321
322 pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
324 self.custom_execute_handler = Some(handler);
325 self
326 }
327
328 pub fn with_configuration(
330 mut self,
331 configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
332 ) -> Self {
333 self.configuration = configuration
334 .into_iter()
335 .map(|(k, v)| (k.into(), v.map(|s| s.into())))
336 .collect();
337 self
338 }
339
340 pub fn with_input_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
342 let batches: Vec<RecordBatch> = batches.into_iter().collect();
343 if !batches.is_empty() {
344 let table_provider: Arc<dyn TableProvider> =
345 Arc::new(MemTable::try_new(batches[0].schema(), vec![batches]).unwrap());
346 let source_plan =
347 LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(table_provider), None)
348 .unwrap()
349 .build()
350 .unwrap();
351 self.input = Some(source_plan);
352 }
353 self
354 }
355
356 fn can_overwrite_partition_columns(&self) -> bool {
359 self.mode == SaveMode::Overwrite
360 && self.schema_mode == Some(SchemaMode::Overwrite)
361 && self.predicate.is_none()
362 }
363
364 fn get_partition_columns(&self) -> Result<Vec<String>, WriteError> {
365 let active_partitions = self
367 .snapshot
368 .as_ref()
369 .map(|s| s.metadata().partition_columns().to_vec());
370
371 if let Some(active_part) = active_partitions {
372 if let Some(ref partition_columns) = self.partition_columns {
373 if &active_part != partition_columns {
374 if self.can_overwrite_partition_columns() {
375 Ok(partition_columns.clone())
376 } else {
377 Err(WriteError::PartitionColumnMismatch {
378 expected: active_part,
379 got: partition_columns.to_vec(),
380 })
381 }
382 } else {
383 Ok(partition_columns.clone())
384 }
385 } else {
386 Ok(active_part)
387 }
388 } else {
389 Ok(self.partition_columns.clone().unwrap_or_default().to_vec())
390 }
391 }
392
393 async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
394 if self.schema_mode == Some(SchemaMode::Overwrite) && self.mode != SaveMode::Overwrite {
395 return Err(DeltaTableError::Generic(
396 "Schema overwrite not supported for Append".to_string(),
397 ));
398 }
399
400 let input = self
401 .input
402 .as_ref()
403 .ok_or::<DeltaTableError>(WriteError::MissingData.into())?;
404 let normalized_arrow = normalize_for_delta(input.schema().inner());
405 let schema: StructType = normalized_arrow.try_into_kernel()?;
406
407 match &self.snapshot {
408 Some(snapshot) => {
409 if snapshot.table_configuration().column_mapping_mode() != ColumnMappingMode::None {
410 return Err(unsupported_column_mapping_write("WRITE"));
411 }
412
413 if self.mode == SaveMode::Overwrite {
414 PROTOCOL.check_append_only(snapshot)?;
415 if !snapshot.load_config().require_files {
416 return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
417 }
418 }
419
420 PROTOCOL.can_write_to(snapshot)?;
421
422 if self.schema_mode.is_none() {
423 PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?;
424 #[cfg(feature = "nanosecond-timestamps")]
425 PROTOCOL.check_can_write_timestamp_nanos(snapshot, &schema)?;
426 }
427 match self.mode {
428 SaveMode::ErrorIfExists => {
429 Err(WriteError::AlreadyExists(self.log_store.root_url().clone()).into())
430 }
431 _ => Ok(vec![]),
432 }
433 }
434 None => {
435 let mut builder = CreateBuilder::new()
436 .with_log_store(self.log_store.clone())
437 .with_columns(schema.fields().cloned())
438 .with_configuration(self.configuration.clone());
439 if let Some(partition_columns) = self.partition_columns.as_ref() {
440 builder = builder.with_partition_columns(partition_columns.clone())
441 }
442
443 if let Some(name) = self.name.as_ref() {
444 builder = builder.with_table_name(name.clone());
445 };
446
447 if let Some(desc) = self.description.as_ref() {
448 builder = builder.with_comment(desc.clone());
449 };
450
451 let (_, actions, _, _) = builder.into_table_and_actions().await?;
452 Ok(actions)
453 }
454 }
455 }
456}
457
458impl std::future::IntoFuture for WriteBuilder {
459 type Output = DeltaResult<DeltaTable>;
460 type IntoFuture = BoxFuture<'static, Self::Output>;
461
462 fn into_future(self) -> Self::IntoFuture {
463 let mut this = self;
464 let table_uri = this.log_store.root_url().clone();
465 let mode = this.mode;
466
467 Box::pin(
468 async move {
469 let operation_id = this.get_operation_id();
471 this.pre_execute(operation_id).await?;
472
473 let mut metrics = WriteMetrics::default();
474 let exec_start = Instant::now();
475
476 let mut actions = this.check_preconditions().await?;
479
480 let partition_columns = this.get_partition_columns()?;
481
482 let Some(source) = this.input.take() else {
483 return Err(WriteError::MissingData.into());
484 };
485
486 let (session, _) = resolve_session_state(
487 this.session.as_deref(),
488 this.session_fallback_policy,
489 || create_session().state(),
490 SessionResolveContext {
491 operation: "write",
492 table_uri: Some(this.log_store.root_url()),
493 cdc: false,
494 },
495 )?;
496
497 update_datafusion_session(&session, &this.log_store, Some(operation_id))?;
498 session.ensure_log_store_registered(this.log_store.as_ref())?;
499
500 let prepared_write = plan::prepare_write(plan::WritePreparationInput {
501 snapshot: this.snapshot.as_ref(),
502 session: &session,
503 source,
504 mode: this.mode,
505 schema_mode: this.schema_mode,
506 safe_cast: this.safe_cast,
507 partition_columns: partition_columns.clone(),
508 predicate: this.predicate,
509 target_file_size: this.target_file_size,
510 write_batch_size: this.write_batch_size,
511 writer_properties: this.writer_properties.clone(),
512 configuration: &this.configuration,
513 })?;
514
515 let overwrite_plan = plan::plan_overwrite_rewrite(
516 this.snapshot.as_ref(),
517 &this.log_store,
518 &session,
519 this.mode,
520 &prepared_write,
521 operation_id,
522 )
523 .await?;
524
525 if overwrite_plan.diagnostics.dropped_pruning_term_count > 0 {
526 tracing::warn!(
527 rewrite_kind = ?overwrite_plan.kind,
528 matched_file_count = overwrite_plan.diagnostics.matched_file_count,
529 translated_pruning_term_count =
530 overwrite_plan.diagnostics.translated_pruning_term_count,
531 dropped_pruning_term_count =
532 overwrite_plan.diagnostics.dropped_pruning_term_count,
533 "overwrite rewrite predicate was only partially translated for pruning; exact validation remains enabled"
534 );
535 }
536
537 let plan::PreparedWrite {
538 schema_delta,
539 exact_validation,
540 exec_options,
541 ..
542 } = prepared_write;
543 actions.extend(schema_delta.into_actions());
544
545 metrics.num_removed_files = overwrite_plan.num_removed_files();
546
547 let plan::WriteExecOptions {
548 partition_columns,
549 target_file_size,
550 write_batch_size,
551 writer_properties,
552 writer_stats_config,
553 } = exec_options;
554 let predicate_sql = exact_validation.as_ref().map(fmt_expr_to_sql).transpose()?;
555 let (sink_plan, contains_cdc, insert_marker_column) =
556 overwrite_plan.build_sink_plan()?;
557 let source_plan = session.create_physical_plan(&sink_plan).await?;
558
559 let (add_actions, _) = write_execution_plan_v2(
561 this.snapshot.as_ref(),
562 &session,
563 source_plan.clone(),
564 partition_columns.clone(),
565 this.log_store.object_store(Some(operation_id)).clone(),
566 target_file_size,
567 write_batch_size,
568 writer_properties,
569 writer_stats_config,
570 exact_validation,
571 contains_cdc,
572 insert_marker_column,
573 )
574 .await?;
575
576 actions.extend(
577 overwrite_plan
578 .matched_existing
579 .into_actions(overwrite_plan.deletion_timestamp)?,
580 );
581
582 let source_count =
583 find_metric_node(SOURCE_COUNT_ID, &source_plan).ok_or_else(|| {
584 DeltaTableError::Generic("Unable to locate expected metric node".into())
585 })?;
586 let source_count_metrics = source_count.metrics().unwrap();
587 let num_added_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
588 metrics.num_added_rows = num_added_rows;
589
590 metrics.num_added_files = add_actions.len();
591 actions.extend(add_actions);
592
593 metrics.execution_time_ms =
594 Instant::now().duration_since(exec_start).as_millis() as u64;
595
596 let operation = DeltaOperation::Write {
597 mode: this.mode,
598 partition_by: if !partition_columns.is_empty() {
599 Some(partition_columns)
600 } else {
601 None
602 },
603 predicate: predicate_sql,
604 };
605
606 let mut commit_properties = this.commit_properties.clone();
607 commit_properties.app_metadata.insert(
608 "operationMetrics".to_owned(),
609 serde_json::to_value(&metrics)?,
610 );
611
612 let commit = CommitBuilder::from(commit_properties)
613 .with_actions(actions)
614 .with_post_commit_hook_handler(this.custom_execute_handler.clone())
615 .with_operation_id(operation_id)
616 .build(
617 this.snapshot.as_ref().map(|f| f as &dyn TableReference),
618 this.log_store.clone(),
619 operation.clone(),
620 )
621 .await?;
622
623 if let Some(handler) = this.custom_execute_handler {
624 handler.post_execute(&this.log_store, operation_id).await?;
625 }
626
627 Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot))
628 }
629 .instrument(tracing::info_span!(
630 "write_operation",
631 operation = "write",
632 mode = ?mode,
633 table_uri = %table_uri
634 )),
635 )
636 }
637}
638
639#[cfg(test)]
640mod tests {
641 use super::*;
642 use crate::TableProperty;
643 use crate::ensure_table_uri;
644 use crate::kernel::CommitInfo;
645 use crate::logstore::get_actions;
646 use crate::operations::collect_sendable_stream;
647 use crate::protocol::SaveMode;
648 use crate::test_utils::{TestResult, TestSchemas};
649 use crate::writer::test_utils::datafusion::{get_data, get_data_sorted, write_batch};
650 use crate::writer::test_utils::{
651 get_arrow_schema, get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
652 get_record_batch_with_nested_struct, setup_table_with_configuration,
653 };
654 use arrow_array::{
655 Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
656 };
657 use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
658 use datafusion::physical_plan::collect;
659 use datafusion::prelude::*;
660 use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
661 use delta_kernel::engine::arrow_conversion::TryIntoArrow;
662 use delta_kernel::schema::MetadataValue;
663 use futures::TryStreamExt;
664 use itertools::Itertools;
665 use serde_json::{Value, json};
666
667 async fn get_write_metrics(table: &DeltaTable) -> WriteMetrics {
668 let mut commit_info: Vec<_> = table.history(Some(1)).await.unwrap().collect();
669 let metrics = commit_info
670 .first_mut()
671 .unwrap()
672 .info
673 .remove("operationMetrics")
674 .unwrap();
675 serde_json::from_value(metrics).unwrap()
676 }
677
678 async fn query_table(table: &DeltaTable, sql: &str) -> TestResult<Vec<RecordBatch>> {
679 let table = DeltaTable::new_with_state(
680 table.log_store.clone(),
681 table.state.as_ref().unwrap().clone(),
682 );
683 let ctx = SessionContext::new();
684 table.update_datafusion_session(&ctx.state()).unwrap();
685 ctx.register_table("test", table.table_provider().await.unwrap())
686 .unwrap();
687
688 Ok(ctx.sql(sql).await?.collect().await?)
689 }
690
691 async fn query_single_i64_row(table: &DeltaTable, sql: &str) -> TestResult<Vec<i64>> {
692 let batches = query_table(table, sql).await?;
693 let batch = batches
694 .first()
695 .expect("expected aggregate query to return a single batch");
696
697 Ok(batch
698 .columns()
699 .iter()
700 .map(|column| {
701 column
702 .as_any()
703 .downcast_ref::<Int64Array>()
704 .expect("expected Int64 aggregate column")
705 .value(0)
706 })
707 .collect())
708 }
709
710 async fn query_i32_rows(table: &DeltaTable, sql: &str, column: &str) -> TestResult<Vec<i32>> {
711 let mut values = Vec::new();
712 for batch in query_table(table, sql).await? {
713 let array = batch
714 .column_by_name(column)
715 .expect("expected query column")
716 .as_any()
717 .downcast_ref::<Int32Array>()
718 .expect("expected Int32 query column");
719 values.extend(
720 array
721 .iter()
722 .map(|value| value.expect("expected non-null Int32 value")),
723 );
724 }
725 Ok(values)
726 }
727
728 async fn open_copied_table_fixture(
729 fixture_source: &std::path::Path,
730 table_dir_name: &str,
731 ) -> TestResult<(tempfile::TempDir, DeltaTable)> {
732 let temp_dir = tempfile::tempdir()?;
733 fs_extra::dir::copy(fixture_source, temp_dir.path(), &Default::default())?;
734 let table_url =
735 url::Url::from_directory_path(temp_dir.path().join(table_dir_name).canonicalize()?)
736 .unwrap();
737 Ok((temp_dir, crate::open_table(table_url).await?))
738 }
739
740 async fn latest_remove_actions(table: &DeltaTable) -> TestResult<Vec<crate::kernel::Remove>> {
741 let version = table
742 .version()
743 .expect("expected committed version for latest remove actions");
744 let snapshot_bytes = table
745 .log_store
746 .read_commit_entry(version)
747 .await?
748 .expect("failed to get snapshot bytes");
749 Ok(get_actions(version, &snapshot_bytes)?
750 .into_iter()
751 .filter_map(|action| match action {
752 Action::Remove(remove) => Some(remove),
753 _ => None,
754 })
755 .collect())
756 }
757
758 async fn modified_partitioned_table(batch: &RecordBatch) -> TestResult<DeltaTable> {
759 Ok(DeltaTable::new_in_memory()
760 .write(vec![batch.clone()])
761 .with_partition_columns(["modified"])
762 .await?)
763 }
764
765 fn expect_write_error(err: &DeltaTableError) -> &WriteError {
766 let DeltaTableError::GenericError { source } = err else {
767 panic!("expected WriteError, got {err:?}");
768 };
769 source
770 .downcast_ref::<WriteError>()
771 .expect("expected WriteError source")
772 }
773
    /// Assert the invariant every successful write in these tests must satisfy: at least one
    /// data file was added.
    fn assert_common_write_metrics(write_metrics: WriteMetrics) {
        assert!(write_metrics.num_added_files > 0);
    }
778
779 #[tokio::test]
780 async fn test_write_when_delta_table_is_append_only() {
781 let table = setup_table_with_configuration(TableProperty::AppendOnly, Some("true")).await;
782 let batch = get_record_batch(None, false);
783 let table = write_batch(table, batch.clone()).await;
785 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
786 assert_eq!(write_metrics.num_added_rows, batch.num_rows());
787 assert_eq!(write_metrics.num_removed_files, 0);
788 assert_common_write_metrics(write_metrics);
789
790 let _err = table
792 .write(vec![batch])
793 .with_save_mode(SaveMode::Overwrite)
794 .await
795 .expect_err("Remove action is included when Delta table is append-only. Should error");
796 }
797
    /// Create an empty table, then run Append, Append and Overwrite writes with custom commit
    /// metadata. Each step checks the version, the file count, the write metrics, and that the
    /// newest history entry carries that write's metadata.
    #[tokio::test]
    async fn test_create_write() {
        let table_schema = get_delta_schema();
        let batch = get_record_batch(None, false);

        // Version 0: table created with the schema but no data.
        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(table_schema.fields().cloned())
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));

        // Version 1: first append, tagged with commit metadata `k1 = v1.1`.
        let metadata = HashMap::from_iter(vec![("k1".to_string(), json!("v1.1"))]);
        let mut table = table
            .write(vec![batch.clone()])
            .with_save_mode(SaveMode::Append)
            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
            .await
            .unwrap();
        assert_eq!(table.version(), Some(1));
        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);

        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
        assert_eq!(
            write_metrics.num_added_files,
            table.snapshot().unwrap().log_data().num_files()
        );
        assert_common_write_metrics(write_metrics);

        // history[0] is the latest commit; only its `k1` entry is compared.
        table.load().await.unwrap();
        let history: Vec<CommitInfo> = table.history(None).await.unwrap().collect();
        assert_eq!(history.len(), 2);
        assert_eq!(
            history[0]
                .info
                .clone()
                .into_iter()
                .filter(|(k, _)| k == "k1")
                .collect::<HashMap<String, Value>>(),
            metadata
        );

        // Version 2: second append adds one more file; metadata `k1 = v1.2`.
        let metadata: HashMap<String, Value> =
            HashMap::from_iter(vec![("k1".to_string(), json!("v1.2"))]);
        let mut table = table
            .write(vec![batch.clone()])
            .with_save_mode(SaveMode::Append)
            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
            .await
            .unwrap();
        assert_eq!(table.version(), Some(2));
        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
        assert_eq!(write_metrics.num_added_files, 1);
        assert_common_write_metrics(write_metrics);

        table.load().await.unwrap();
        let history: Vec<CommitInfo> = table.history(None).await.unwrap().collect();
        assert_eq!(history.len(), 3);
        assert_eq!(
            history[0]
                .info
                .clone()
                .into_iter()
                .filter(|(k, _)| k == "k1")
                .collect::<HashMap<String, Value>>(),
            metadata
        );

        // Version 3: overwrite leaves a single file and reports removed files;
        // metadata `k2 = v2.1`.
        let metadata: HashMap<String, Value> =
            HashMap::from_iter(vec![("k2".to_string(), json!("v2.1"))]);
        let mut table = table
            .write(vec![batch.clone()])
            .with_save_mode(SaveMode::Overwrite)
            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
            .await
            .unwrap();
        assert_eq!(table.version(), Some(3));
        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
        assert!(write_metrics.num_removed_files > 0);
        assert_common_write_metrics(write_metrics);

        table.load().await.unwrap();
        let history: Vec<CommitInfo> = table.history(None).await.unwrap().collect();
        assert_eq!(history.len(), 4);
        assert_eq!(
            history[0]
                .info
                .clone()
                .into_iter()
                .filter(|(k, _)| k == "k2")
                .collect::<HashMap<String, Value>>(),
            metadata
        );
    }
900
    /// Appending data whose column type differs from the table's.
    ///
    /// Covers Utf8 into Int32 (with and without safe casting) and a UTC timestamp into Utf8.
    #[tokio::test]
    async fn test_write_different_types() {
        // Table with a nullable Int32 `value` column holding 0 and null.
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
            DataType::Int32,
            true,
        )]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(Int32Array::from(vec![Some(0), None]))],
        )
        .unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .await
            .unwrap();
        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
        assert_eq!(write_metrics.num_added_rows, 2);
        assert_common_write_metrics(write_metrics);

        // Utf8 values appended to the Int32 column.
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
            DataType::Utf8,
            true,
        )]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(StringArray::from(vec![
                Some("Test123".to_owned()),
                Some("123".to_owned()),
                None,
            ]))],
        )
        .unwrap();

        // With safe casting, "123" becomes 123 and the uncastable "Test123" becomes null
        // (see the three empty rows in the expected output).
        let table = table
            .write(vec![batch.clone()])
            .with_cast_safety(true)
            .await
            .unwrap();

        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
        assert_eq!(write_metrics.num_added_rows, 3);
        assert_common_write_metrics(write_metrics);

        let expected = [
            "+-------+",
            "| value |",
            "+-------+",
            "|       |",
            "|       |",
            "|       |",
            "| 123   |",
            "| 0     |",
            "+-------+",
        ];
        let actual = get_data(&table).await;
        assert_batches_sorted_eq!(&expected, &actual);

        // Without safe casting, the same write must fail.
        let res = table.write(vec![batch]).await;
        assert!(res.is_err());

        // Fresh table with a Utf8 `value` column.
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
            arrow::datatypes::DataType::Utf8,
            true,
        )]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(StringArray::from(vec![Some(
                "2023-06-03 15:35:00".to_owned(),
            )]))],
        )
        .unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .await
            .unwrap();

        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
        assert_eq!(write_metrics.num_added_rows, 1);
        assert_common_write_metrics(write_metrics);

        // A UTC microsecond timestamp (10_000 µs = 10 ms after the epoch) appended to the
        // Utf8 column is stored as its string rendering.
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "value",
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())),
            true,
        )]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![Arc::new(
                TimestampMicrosecondArray::from(vec![Some(10000)]).with_timezone("UTC"),
            )],
        )
        .unwrap();

        let _res = table.write(vec![batch]).await.unwrap();
        let expected = [
            "+--------------------------+",
            "| value                    |",
            "+--------------------------+",
            "| 1970-01-01T00:00:00.010Z |",
            "| 2023-06-03 15:35:00      |",
            "+--------------------------+",
        ];
        let actual = get_data(&_res).await;
        assert_batches_sorted_eq!(&expected, &actual);
    }
1017
1018 #[tokio::test]
1019 async fn test_write_nonexistent() {
1020 let batch = get_record_batch(None, false);
1021 let table = DeltaTable::new_in_memory()
1022 .write(vec![batch])
1023 .with_save_mode(SaveMode::ErrorIfExists)
1024 .await
1025 .unwrap();
1026 assert_eq!(table.version(), Some(0));
1027 assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
1028 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1029 assert_common_write_metrics(write_metrics);
1030 }
1031
1032 #[tokio::test]
1033 async fn test_write_partitioned() {
1034 let batch = get_record_batch(None, false);
1035 let table = DeltaTable::new_in_memory()
1036 .write(vec![batch.clone()])
1037 .with_save_mode(SaveMode::ErrorIfExists)
1038 .with_partition_columns(["modified"])
1039 .await
1040 .unwrap();
1041 assert_eq!(table.version(), Some(0));
1042 assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
1043 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1044 assert_eq!(write_metrics.num_added_files, 2);
1045 assert_common_write_metrics(write_metrics);
1046
1047 let table = DeltaTable::new_in_memory()
1048 .write(vec![batch])
1049 .with_save_mode(SaveMode::ErrorIfExists)
1050 .with_partition_columns(["modified", "id"])
1051 .await
1052 .unwrap();
1053 assert_eq!(table.version(), Some(0));
1054 assert_eq!(table.snapshot().unwrap().log_data().num_files(), 4);
1055
1056 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1057 assert_eq!(write_metrics.num_added_files, 4);
1058 assert_common_write_metrics(write_metrics);
1059 }
1060
1061 #[tokio::test]
1062 async fn test_write_partitioned_parallel_writers() {
1063 let batch = get_record_batch(None, false);
1064
1065 let multi_stream_input: Arc<dyn TableProvider> = Arc::new(
1066 MemTable::try_new(
1067 batch.schema(),
1068 vec![
1069 vec![batch.clone()],
1070 vec![batch.clone()],
1071 vec![batch.clone()],
1072 ],
1073 )
1074 .unwrap(),
1075 );
1076 let multi_stream_plan =
1077 LogicalPlanBuilder::scan("source", provider_as_source(multi_stream_input), None)
1078 .unwrap()
1079 .build()
1080 .unwrap();
1081
1082 let parallel_table = DeltaTable::new_in_memory()
1083 .write(vec![])
1084 .with_save_mode(SaveMode::ErrorIfExists)
1085 .with_input_plan(multi_stream_plan)
1086 .with_partition_columns(["modified"])
1087 .await
1088 .unwrap();
1089
1090 let single_writer_table = DeltaTable::new_in_memory()
1091 .write(vec![batch.clone(), batch.clone(), batch.clone()])
1092 .with_save_mode(SaveMode::ErrorIfExists)
1093 .with_partition_columns(["modified"])
1094 .await
1095 .unwrap();
1096
1097 let parallel_data = get_data_sorted(¶llel_table, "modified, id, value").await;
1098 let single_writer_data = get_data_sorted(&single_writer_table, "modified, id, value").await;
1099 assert_eq!(parallel_data, single_writer_data);
1100
1101 let parallel_files = parallel_table.snapshot().unwrap().log_data().num_files();
1102 let single_writer_files = single_writer_table
1103 .snapshot()
1104 .unwrap()
1105 .log_data()
1106 .num_files();
1107 assert_eq!(parallel_files, single_writer_files);
1108 assert_eq!(parallel_files, 2);
1109
1110 let parallel_write_metrics: WriteMetrics = get_write_metrics(¶llel_table).await;
1111 let single_writer_metrics: WriteMetrics = get_write_metrics(&single_writer_table).await;
1112 assert_eq!(
1113 parallel_write_metrics.num_added_files,
1114 single_writer_metrics.num_added_files
1115 );
1116 assert_eq!(parallel_write_metrics.num_added_files, 2);
1117 }
1118
1119 #[tokio::test]
1120 async fn test_write_partitioned_parallel_writers_error_propagation() {
1121 let batch = get_record_batch(None, false);
1122
1123 let schema: StructType = serde_json::from_value(json!({
1124 "type": "struct",
1125 "fields": [
1126 {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1127 {"name": "value", "type": "integer", "nullable": true, "metadata": {
1128 "delta.invariants": "{\"expression\": { \"expression\": \"value < 6\"} }"
1129 }},
1130 {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1131 ]
1132 }))
1133 .unwrap();
1134
1135 let table = DeltaTable::new_in_memory()
1136 .create()
1137 .with_save_mode(SaveMode::ErrorIfExists)
1138 .with_columns(schema.fields().cloned())
1139 .with_partition_columns(["modified"])
1140 .await
1141 .unwrap();
1142
1143 let multi_stream_input: Arc<dyn TableProvider> = Arc::new(
1144 MemTable::try_new(
1145 batch.schema(),
1146 vec![
1147 vec![batch.clone()],
1148 vec![batch.clone()],
1149 vec![batch.clone()],
1150 ],
1151 )
1152 .unwrap(),
1153 );
1154 let multi_stream_plan =
1155 LogicalPlanBuilder::scan("source", provider_as_source(multi_stream_input), None)
1156 .unwrap()
1157 .build()
1158 .unwrap();
1159
1160 let result = table
1161 .write(vec![])
1162 .with_save_mode(SaveMode::Append)
1163 .with_input_plan(multi_stream_plan)
1164 .await;
1165
1166 assert!(
1167 result.is_err(),
1168 "write should fail when invariant is violated in parallel writers"
1169 );
1170 }
1171
1172 #[tokio::test]
1173 async fn test_merge_schema() {
1174 let batch = get_record_batch(None, false);
1175 let table = DeltaTable::new_in_memory()
1176 .write(vec![batch.clone()])
1177 .with_save_mode(SaveMode::ErrorIfExists)
1178 .await
1179 .unwrap();
1180 assert_eq!(table.version(), Some(0));
1181
1182 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1183 assert_common_write_metrics(write_metrics);
1184
1185 let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1186 for field in batch.schema().fields() {
1187 if field.name() != "modified" {
1188 new_schema_builder.push(field.clone());
1189 }
1190 }
1191 new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1192 let new_schema = new_schema_builder.finish();
1193 let new_fields = new_schema.fields();
1194 let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1195 assert_eq!(new_names, vec!["id", "value", "inserted_by"]);
1196 let inserted_by = StringArray::from(vec![
1197 Some("A1"),
1198 Some("B1"),
1199 None,
1200 Some("B2"),
1201 Some("A3"),
1202 Some("A4"),
1203 None,
1204 None,
1205 Some("B4"),
1206 Some("A5"),
1207 Some("A7"),
1208 ]);
1209 let new_batch = RecordBatch::try_new(
1210 Arc::new(new_schema),
1211 vec![
1212 Arc::new(batch.column_by_name("id").unwrap().clone()),
1213 Arc::new(batch.column_by_name("value").unwrap().clone()),
1214 Arc::new(inserted_by),
1215 ],
1216 )
1217 .unwrap();
1218
1219 let mut table = table
1220 .write(vec![new_batch])
1221 .with_save_mode(SaveMode::Append)
1222 .with_schema_mode(SchemaMode::Merge)
1223 .await
1224 .unwrap();
1225 table.load().await.unwrap();
1226 assert_eq!(table.version(), Some(1));
1227 let new_schema = table.snapshot().unwrap().metadata().parse_schema().unwrap();
1228 let fields = new_schema.fields();
1229 let names = fields.map(|f| f.name()).collect::<Vec<_>>();
1230 assert_eq!(names, vec!["id", "value", "modified", "inserted_by"]);
1231
1232 let metadata = table
1234 .snapshot()
1235 .expect("Failed to retrieve updated snapshot")
1236 .metadata();
1237 assert_ne!(
1238 None,
1239 metadata.created_time(),
1240 "Created time should be the milliseconds since epoch of when the action was created"
1241 );
1242
1243 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1244 assert_common_write_metrics(write_metrics);
1245 }
1246
1247 #[tokio::test]
1248 async fn test_merge_schema_with_partitions() {
1249 let batch = get_record_batch(None, false);
1250 let table = DeltaTable::new_in_memory()
1251 .write(vec![batch.clone()])
1252 .with_partition_columns(vec!["id", "value"])
1253 .with_save_mode(SaveMode::ErrorIfExists)
1254 .await
1255 .unwrap();
1256 assert_eq!(table.version(), Some(0));
1257
1258 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1259 assert_common_write_metrics(write_metrics);
1260
1261 let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1262 for field in batch.schema().fields() {
1263 if field.name() != "modified" {
1264 new_schema_builder.push(field.clone());
1265 }
1266 }
1267 new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1268 let new_schema = new_schema_builder.finish();
1269 let new_fields = new_schema.fields();
1270 let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1271 assert_eq!(new_names, vec!["id", "value", "inserted_by"]);
1272 let inserted_by = StringArray::from(vec![
1273 Some("A1"),
1274 Some("B1"),
1275 None,
1276 Some("B2"),
1277 Some("A3"),
1278 Some("A4"),
1279 None,
1280 None,
1281 Some("B4"),
1282 Some("A5"),
1283 Some("A7"),
1284 ]);
1285 let new_batch = RecordBatch::try_new(
1286 Arc::new(new_schema),
1287 vec![
1288 Arc::new(batch.column_by_name("id").unwrap().clone()),
1289 Arc::new(batch.column_by_name("value").unwrap().clone()),
1290 Arc::new(inserted_by),
1291 ],
1292 )
1293 .unwrap();
1294 let table = table
1295 .write(vec![new_batch])
1296 .with_save_mode(SaveMode::Append)
1297 .with_schema_mode(SchemaMode::Merge)
1298 .await
1299 .unwrap();
1300
1301 assert_eq!(table.version(), Some(1));
1302 let new_schema = table.snapshot().unwrap().metadata().parse_schema().unwrap();
1303 let fields = new_schema.fields();
1304 let mut names = fields.map(|f| f.name()).collect::<Vec<_>>();
1305 names.sort();
1306 assert_eq!(names, vec!["id", "inserted_by", "modified", "value"]);
1307 let part_cols = table.snapshot().unwrap().metadata().partition_columns();
1308 assert_eq!(part_cols, ["id".to_string(), "value".to_string()]); let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1311 assert_common_write_metrics(write_metrics);
1312 }
1313
1314 #[tokio::test]
1315 async fn test_merge_schema_with_partitions_allows_source_missing_partition_column() -> TestResult
1316 {
1317 let batch = get_record_batch(None, false);
1318 let table = modified_partitioned_table(&batch).await?;
1319
1320 let evolved_schema = Arc::new(ArrowSchema::new(vec![
1321 batch.schema().field(0).as_ref().clone(),
1322 batch.schema().field(1).as_ref().clone(),
1323 Field::new("inserted_by", DataType::Utf8, true),
1324 ]));
1325 let evolved_batch = RecordBatch::try_new(
1326 evolved_schema,
1327 vec![
1328 batch.column(0).clone(),
1329 batch.column(1).clone(),
1330 Arc::new(StringArray::from(vec![
1331 Some("A1"),
1332 Some("B1"),
1333 None,
1334 Some("B2"),
1335 Some("A3"),
1336 Some("A4"),
1337 None,
1338 None,
1339 Some("B4"),
1340 Some("A5"),
1341 Some("A7"),
1342 ])),
1343 ],
1344 )?;
1345
1346 let table = table
1347 .write(vec![evolved_batch])
1348 .with_save_mode(SaveMode::Append)
1349 .with_schema_mode(SchemaMode::Merge)
1350 .await?;
1351
1352 assert_eq!(table.version(), Some(1));
1353 let schema = table.snapshot().unwrap().metadata().parse_schema()?;
1354 let names = schema
1355 .fields()
1356 .map(|field| field.name())
1357 .collect::<Vec<_>>();
1358 assert_eq!(names, vec!["id", "value", "modified", "inserted_by"]);
1359 assert_eq!(
1360 table.snapshot().unwrap().metadata().partition_columns(),
1361 &vec!["modified".to_string()]
1362 );
1363
1364 Ok(())
1365 }
1366
1367 #[tokio::test]
1368 async fn test_merge_schema_preserves_existing_field_metadata() {
1369 let schema: StructType = serde_json::from_value(json!({
1370 "type": "struct",
1371 "fields": [
1372 {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1373 {"name": "value", "type": "integer", "nullable": true, "metadata": {
1374 "delta.invariants": "{\"expression\": { \"expression\": \"value < 12\"} }",
1375 "delta.userMetadata": "preserve-me"
1376 }},
1377 {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1378 ]
1379 }))
1380 .unwrap();
1381
1382 let table = DeltaTable::new_in_memory()
1383 .create()
1384 .with_save_mode(SaveMode::ErrorIfExists)
1385 .with_columns(schema.fields().cloned())
1386 .await
1387 .unwrap()
1388 .write(vec![get_record_batch(None, false)])
1389 .await
1390 .unwrap();
1391
1392 let batch = get_record_batch(None, false);
1393 let evolved_schema = Arc::new(ArrowSchema::new(vec![
1394 batch.schema().field(0).as_ref().clone(),
1395 batch.schema().field(1).as_ref().clone(),
1396 batch.schema().field(2).as_ref().clone(),
1397 Field::new("inserted_by", DataType::Utf8, true),
1398 ]));
1399 let evolved_batch = RecordBatch::try_new(
1400 evolved_schema,
1401 vec![
1402 batch.column(0).clone(),
1403 batch.column(1).clone(),
1404 batch.column(2).clone(),
1405 Arc::new(StringArray::from(vec![
1406 Some("A1"),
1407 Some("B1"),
1408 None,
1409 Some("B2"),
1410 Some("A3"),
1411 Some("A4"),
1412 None,
1413 None,
1414 Some("B4"),
1415 Some("A5"),
1416 Some("A7"),
1417 ])),
1418 ],
1419 )
1420 .unwrap();
1421
1422 let table = table
1423 .write(vec![evolved_batch])
1424 .with_save_mode(SaveMode::Append)
1425 .with_schema_mode(SchemaMode::Merge)
1426 .await
1427 .unwrap();
1428
1429 let schema = table.snapshot().unwrap().metadata().parse_schema().unwrap();
1430 let value = schema.field("value").unwrap();
1431 assert_eq!(
1432 value.metadata.get("delta.invariants"),
1433 Some(&MetadataValue::String(
1434 "{\"expression\": { \"expression\": \"value < 12\"} }".to_string()
1435 ))
1436 );
1437 assert_eq!(
1438 value.metadata.get("delta.userMetadata"),
1439 Some(&MetadataValue::String("preserve-me".to_string()))
1440 );
1441 }
1442
1443 #[tokio::test]
1444 async fn test_overwrite_schema() {
1445 let batch = get_record_batch(None, false);
1446 let table = DeltaTable::new_in_memory()
1447 .write(vec![batch.clone()])
1448 .with_save_mode(SaveMode::ErrorIfExists)
1449 .await
1450 .unwrap();
1451 assert_eq!(table.version(), Some(0));
1452 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1453 assert_common_write_metrics(write_metrics);
1454 let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1455 for field in batch.schema().fields() {
1456 if field.name() != "modified" {
1457 new_schema_builder.push(field.clone());
1458 }
1459 }
1460 new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1461 let new_schema = new_schema_builder.finish();
1462 let new_fields = new_schema.fields();
1463 let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1464 assert_eq!(new_names, vec!["id", "value", "inserted_by"]);
1465 let inserted_by = StringArray::from(vec![
1466 Some("A1"),
1467 Some("B1"),
1468 None,
1469 Some("B2"),
1470 Some("A3"),
1471 Some("A4"),
1472 None,
1473 None,
1474 Some("B4"),
1475 Some("A5"),
1476 Some("A7"),
1477 ]);
1478 let new_batch = RecordBatch::try_new(
1479 Arc::new(new_schema),
1480 vec![
1481 Arc::new(batch.column_by_name("id").unwrap().clone()),
1482 Arc::new(batch.column_by_name("value").unwrap().clone()),
1483 Arc::new(inserted_by),
1484 ],
1485 )
1486 .unwrap();
1487
1488 let table = table
1489 .write(vec![new_batch])
1490 .with_save_mode(SaveMode::Append)
1491 .with_schema_mode(SchemaMode::Overwrite)
1492 .await;
1493 assert!(table.is_err());
1494 }
1495
1496 #[tokio::test]
1497 async fn test_overwrite_schema_can_change_partition_columns_without_schema_change() -> TestResult
1498 {
1499 let batch = get_record_batch(None, false);
1500
1501 let table = modified_partitioned_table(&batch).await?;
1502
1503 assert_eq!(
1504 table.snapshot().unwrap().metadata().partition_columns(),
1505 &vec!["modified".to_string()]
1506 );
1507 let initial_num_files = table.snapshot().unwrap().log_data().num_files();
1508
1509 let table = table
1510 .write(vec![batch])
1511 .with_save_mode(SaveMode::Overwrite)
1512 .with_schema_mode(SchemaMode::Overwrite)
1513 .with_partition_columns(["id"])
1514 .await?;
1515
1516 assert_eq!(table.version(), Some(1));
1517 assert_eq!(
1518 table.snapshot().unwrap().metadata().partition_columns(),
1519 &vec!["id".to_string()]
1520 );
1521
1522 let add_paths = table
1523 .snapshot()
1524 .unwrap()
1525 .log_data()
1526 .iter()
1527 .map(|add| add.path().into_owned())
1528 .collect::<Vec<_>>();
1529 assert!(!add_paths.is_empty());
1530 assert!(add_paths.iter().all(|path| path.contains("id=")));
1531
1532 let remove_actions = latest_remove_actions(&table).await?;
1533 assert_eq!(remove_actions.len(), initial_num_files);
1534 assert!(
1535 remove_actions
1536 .iter()
1537 .all(|remove| remove.deletion_timestamp.is_some())
1538 );
1539
1540 let commit_info: Vec<_> = table.history(Some(1)).await?.collect();
1541 let operation_parameters = commit_info[0].operation_parameters.as_ref().unwrap();
1542 assert_eq!(operation_parameters["partitionBy"], json!("[\"id\"]"));
1543
1544 Ok(())
1545 }
1546
1547 #[tokio::test]
1548 async fn test_overwrite_schema_can_change_schema_and_partition_columns() -> TestResult {
1549 let batch = get_record_batch(None, false);
1550 let table = modified_partitioned_table(&batch).await?;
1551
1552 let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1553 for field in batch.schema().fields() {
1554 if field.name() != "modified" {
1555 new_schema_builder.push(field.clone());
1556 }
1557 }
1558 new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1559 let new_schema = new_schema_builder.finish();
1560 let inserted_by = StringArray::from(vec![
1561 Some("A1"),
1562 Some("B1"),
1563 None,
1564 Some("B2"),
1565 Some("A3"),
1566 Some("A4"),
1567 None,
1568 None,
1569 Some("B4"),
1570 Some("A5"),
1571 Some("A7"),
1572 ]);
1573 let new_batch = RecordBatch::try_new(
1574 Arc::new(new_schema),
1575 vec![
1576 Arc::new(batch.column_by_name("id").unwrap().clone()),
1577 Arc::new(batch.column_by_name("value").unwrap().clone()),
1578 Arc::new(inserted_by),
1579 ],
1580 )?;
1581
1582 let table = table
1583 .write(vec![new_batch])
1584 .with_save_mode(SaveMode::Overwrite)
1585 .with_schema_mode(SchemaMode::Overwrite)
1586 .with_partition_columns(["inserted_by"])
1587 .await?;
1588
1589 let schema = table.snapshot().unwrap().metadata().parse_schema()?;
1590 let names = schema
1591 .fields()
1592 .map(|field| field.name())
1593 .collect::<Vec<_>>();
1594 assert_eq!(names, vec!["id", "value", "inserted_by"]);
1595 assert_eq!(
1596 table.snapshot().unwrap().metadata().partition_columns(),
1597 &vec!["inserted_by".to_string()]
1598 );
1599
1600 let add_paths = table
1601 .snapshot()
1602 .unwrap()
1603 .log_data()
1604 .iter()
1605 .map(|add| add.path().into_owned())
1606 .collect::<Vec<_>>();
1607 assert!(!add_paths.is_empty());
1608 assert!(add_paths.iter().all(|path| path.contains("inserted_by=")));
1609
1610 Ok(())
1611 }
1612
1613 #[tokio::test]
1614 async fn test_overwrite_schema_partition_change_preserves_table_metadata() -> TestResult {
1615 let batch = get_record_batch(None, false);
1616 let table = DeltaTable::new_in_memory()
1617 .write(vec![batch.clone()])
1618 .with_partition_columns(["modified"])
1619 .with_table_name("preserve_name")
1620 .with_description("preserve_description")
1621 .await?;
1622
1623 let initial_metadata = table.snapshot().unwrap().metadata().clone();
1624 let initial_created_time = initial_metadata.created_time();
1625
1626 let table = table
1627 .write(vec![batch])
1628 .with_save_mode(SaveMode::Overwrite)
1629 .with_schema_mode(SchemaMode::Overwrite)
1630 .with_partition_columns(["id"])
1631 .await?;
1632
1633 let metadata = table.snapshot().unwrap().metadata();
1634 assert_eq!(metadata.id(), initial_metadata.id());
1635 assert_eq!(metadata.name(), Some("preserve_name"));
1636 assert_eq!(metadata.description(), Some("preserve_description"));
1637 assert_eq!(metadata.created_time(), initial_created_time);
1638 assert_eq!(metadata.partition_columns(), &vec!["id".to_string()]);
1639
1640 Ok(())
1641 }
1642
1643 #[tokio::test]
1644 async fn test_overwrite_schema_can_remove_partition_columns() -> TestResult {
1645 let batch = get_record_batch(None, false);
1646 let table = modified_partitioned_table(&batch).await?;
1647
1648 let table = table
1649 .write(vec![batch])
1650 .with_save_mode(SaveMode::Overwrite)
1651 .with_schema_mode(SchemaMode::Overwrite)
1652 .with_partition_columns(std::iter::empty::<&str>())
1653 .await?;
1654
1655 assert_eq!(table.version(), Some(1));
1656 assert!(
1657 table
1658 .snapshot()
1659 .unwrap()
1660 .metadata()
1661 .partition_columns()
1662 .is_empty()
1663 );
1664
1665 let add_paths = table
1666 .snapshot()
1667 .unwrap()
1668 .log_data()
1669 .iter()
1670 .map(|add| add.path().into_owned())
1671 .collect::<Vec<_>>();
1672 assert!(!add_paths.is_empty());
1673 assert!(add_paths.iter().all(|path| !path.contains('/')));
1674
1675 Ok(())
1676 }
1677
1678 #[tokio::test]
1679 async fn test_append_rejects_partition_column_change() -> TestResult {
1680 let batch = get_record_batch(None, false);
1681 let table = modified_partitioned_table(&batch).await?;
1682
1683 let result = table
1684 .write(vec![batch])
1685 .with_save_mode(SaveMode::Append)
1686 .with_partition_columns(["id"])
1687 .await;
1688
1689 let err = result.expect_err("append should reject partition column change");
1690 match expect_write_error(&err) {
1691 WriteError::PartitionColumnMismatch { expected, got } => {
1692 assert_eq!(expected, &vec!["modified".to_string()]);
1693 assert_eq!(got, &vec!["id".to_string()]);
1694 }
1695 other => panic!("unexpected error: {other:?}"),
1696 }
1697 Ok(())
1698 }
1699
1700 #[tokio::test]
1701 async fn test_overwrite_without_schema_overwrite_rejects_partition_column_change() -> TestResult
1702 {
1703 let batch = get_record_batch(None, false);
1704 let table = modified_partitioned_table(&batch).await?;
1705
1706 let result = table
1707 .write(vec![batch])
1708 .with_save_mode(SaveMode::Overwrite)
1709 .with_partition_columns(["id"])
1710 .await;
1711
1712 assert!(matches!(result, Err(DeltaTableError::GenericError { .. })));
1713 Ok(())
1714 }
1715
1716 #[tokio::test]
1717 async fn test_replace_where_rejects_partition_column_change() -> TestResult {
1718 let batch = get_record_batch(None, false);
1719 let table = modified_partitioned_table(&batch).await?;
1720
1721 let result = table
1722 .write(vec![batch])
1723 .with_save_mode(SaveMode::Overwrite)
1724 .with_schema_mode(SchemaMode::Overwrite)
1725 .with_partition_columns(["id"])
1726 .with_replace_where(col("id").eq(lit("A")))
1727 .await;
1728
1729 assert!(matches!(result, Err(DeltaTableError::GenericError { .. })));
1730 Ok(())
1731 }
1732
1733 #[tokio::test]
1734 async fn test_overwrite_schema_preserves_partition_columns_when_omitted() -> TestResult {
1735 let batch = get_record_batch(None, false);
1736 let table = modified_partitioned_table(&batch).await?;
1737
1738 let table = table
1739 .write(vec![batch])
1740 .with_save_mode(SaveMode::Overwrite)
1741 .with_schema_mode(SchemaMode::Overwrite)
1742 .await?;
1743
1744 assert_eq!(
1745 table.snapshot().unwrap().metadata().partition_columns(),
1746 &vec!["modified".to_string()]
1747 );
1748
1749 Ok(())
1750 }
1751
1752 #[tokio::test]
1753 async fn test_overwrite_schema_rejects_missing_new_partition_column() -> TestResult {
1754 let batch = get_record_batch(None, false);
1755 let table = modified_partitioned_table(&batch).await?;
1756
1757 let result = table
1758 .write(vec![batch])
1759 .with_save_mode(SaveMode::Overwrite)
1760 .with_schema_mode(SchemaMode::Overwrite)
1761 .with_partition_columns(["missing_partition"])
1762 .await;
1763
1764 let err = result.expect_err("missing partition column should fail");
1765 match expect_write_error(&err) {
1766 WriteError::MissingPartitionColumns { columns } => {
1767 assert_eq!(columns, &vec!["missing_partition".to_string()]);
1768 }
1769 other => panic!("unexpected error: {other:?}"),
1770 }
1771
1772 Ok(())
1773 }
1774
1775 #[tokio::test]
1776 async fn test_overwrite_check() {
1777 let batch = get_record_batch(None, false);
1779 let table = DeltaTable::new_in_memory()
1780 .write(vec![batch.clone()])
1781 .with_save_mode(SaveMode::ErrorIfExists)
1782 .await
1783 .unwrap();
1784 assert_eq!(table.version(), Some(0));
1785 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1786 assert_common_write_metrics(write_metrics);
1787
1788 let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1789
1790 new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1791 let new_schema = new_schema_builder.finish();
1792 let new_fields = new_schema.fields();
1793 let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1794 assert_eq!(new_names, vec!["inserted_by"]);
1795 let inserted_by = StringArray::from(vec![
1796 Some("A1"),
1797 Some("B1"),
1798 None,
1799 Some("B2"),
1800 Some("A3"),
1801 Some("A4"),
1802 None,
1803 None,
1804 Some("B4"),
1805 Some("A5"),
1806 Some("A7"),
1807 ]);
1808 let new_batch =
1809 RecordBatch::try_new(Arc::new(new_schema), vec![Arc::new(inserted_by)]).unwrap();
1810
1811 let table = table
1812 .write(vec![new_batch])
1813 .with_save_mode(SaveMode::Append)
1814 .await;
1815 assert!(table.is_err());
1816 }
1817
1818 #[tokio::test]
1819 async fn test_check_invariants() {
1820 let batch = get_record_batch(None, false);
1821 let schema: StructType = serde_json::from_value(json!({
1822 "type": "struct",
1823 "fields": [
1824 {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1825 {"name": "value", "type": "integer", "nullable": true, "metadata": {
1826 "delta.invariants": "{\"expression\": { \"expression\": \"value < 12\"} }"
1827 }},
1828 {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1829 ]
1830 }))
1831 .unwrap();
1832 let table = DeltaTable::new_in_memory()
1833 .create()
1834 .with_save_mode(SaveMode::ErrorIfExists)
1835 .with_columns(schema.fields().cloned())
1836 .await
1837 .unwrap();
1838 assert_eq!(table.version(), Some(0));
1839
1840 let table = table.write(vec![batch.clone()]).await.unwrap();
1841 assert_eq!(table.version(), Some(1));
1842 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1843 assert_common_write_metrics(write_metrics);
1844
1845 let schema: StructType = serde_json::from_value(json!({
1846 "type": "struct",
1847 "fields": [
1848 {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1849 {"name": "value", "type": "integer", "nullable": true, "metadata": {
1850 "delta.invariants": "{\"expression\": { \"expression\": \"value < 6\"} }"
1851 }},
1852 {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1853 ]
1854 }))
1855 .unwrap();
1856 let table = DeltaTable::new_in_memory()
1857 .create()
1858 .with_save_mode(SaveMode::ErrorIfExists)
1859 .with_columns(schema.fields().cloned())
1860 .await
1861 .unwrap();
1862 assert_eq!(table.version(), Some(0));
1863
1864 let table = table.write(vec![batch.clone()]).await;
1865 assert!(table.is_err());
1866 }
1867
1868 #[tokio::test]
1869 async fn test_nested_struct() {
1870 let table_schema = get_delta_schema_with_nested_struct();
1871 let batch = get_record_batch_with_nested_struct();
1872
1873 let table = DeltaTable::new_in_memory()
1874 .create()
1875 .with_columns(table_schema.fields().cloned())
1876 .await
1877 .unwrap();
1878 assert_eq!(table.version(), Some(0));
1879
1880 let table = table
1881 .write(vec![batch.clone()])
1882 .with_save_mode(SaveMode::Append)
1883 .await
1884 .unwrap();
1885 assert_eq!(table.version(), Some(1));
1886 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1887 assert_common_write_metrics(write_metrics);
1888
1889 let actual = get_data(&table).await;
1890 let expected = DataType::Struct(Fields::from(vec![Field::new(
1891 "count",
1892 DataType::Int32,
1893 true,
1894 )]));
1895 assert_eq!(
1896 actual[0].column_by_name("nested").unwrap().data_type(),
1897 &expected
1898 );
1899 }
1900
1901 #[tokio::test]
1902 async fn test_special_characters_write_read() {
1903 let tmp_dir = tempfile::tempdir().unwrap();
1904 let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
1905
1906 let schema = Arc::new(ArrowSchema::new(vec![
1907 Field::new("string", DataType::Utf8, true),
1908 Field::new("data", DataType::Utf8, true),
1909 ]));
1910
1911 let str_values = StringArray::from(vec![r#"$%&/()=^"[]#*?._- {=}|`<>~/\r\n+"#]);
1912 let data_values = StringArray::from(vec!["test"]);
1913
1914 let batch = RecordBatch::try_new(schema, vec![Arc::new(str_values), Arc::new(data_values)])
1915 .unwrap();
1916
1917 let ops = DeltaTable::try_from_url(
1918 ensure_table_uri(tmp_path.as_os_str().to_str().unwrap()).unwrap(),
1919 )
1920 .await
1921 .unwrap();
1922
1923 let table = ops
1924 .write([batch.clone()])
1925 .with_partition_columns(["string"])
1926 .await
1927 .unwrap();
1928 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1929 assert_common_write_metrics(write_metrics);
1930
1931 let table_uri = url::Url::from_directory_path(&tmp_path).unwrap();
1932 let table = crate::open_table(table_uri).await.unwrap();
1933 let (_table, stream) = table.scan_table().await.unwrap();
1934 let data: Vec<RecordBatch> = collect_sendable_stream(stream).await.unwrap();
1935
1936 let expected = vec![
1937 r#"+----------------------------------+------+"#,
1938 r#"| string | data |"#,
1939 r#"+----------------------------------+------+"#,
1940 r#"| $%&/()=^"[]#*?._- {=}|`<>~/\r\n+ | test |"#,
1941 r#"+----------------------------------+------+"#,
1942 ];
1943
1944 assert_batches_eq!(&expected, &data);
1945 }
1946
1947 #[tokio::test]
1948 async fn test_replace_where() {
1949 let schema = get_arrow_schema(&None);
1950
1951 let batch = RecordBatch::try_new(
1952 Arc::clone(&schema),
1953 vec![
1954 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
1955 Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
1956 Arc::new(arrow::array::StringArray::from(vec![
1957 "2021-02-02",
1958 "2021-02-03",
1959 "2021-02-02",
1960 "2021-02-04",
1961 ])),
1962 ],
1963 )
1964 .unwrap();
1965
1966 let table = DeltaTable::new_in_memory()
1967 .write(vec![batch])
1968 .with_save_mode(SaveMode::Append)
1969 .await
1970 .unwrap();
1971 assert_eq!(table.version(), Some(0));
1972 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1973 assert_eq!(write_metrics.num_added_rows, 4);
1974 assert_common_write_metrics(write_metrics);
1975
1976 let batch_add = RecordBatch::try_new(
1977 Arc::clone(&schema),
1978 vec![
1979 Arc::new(arrow::array::StringArray::from(vec!["C"])),
1980 Arc::new(arrow::array::Int32Array::from(vec![50])),
1981 Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
1982 ],
1983 )
1984 .unwrap();
1985
1986 let table = table
1987 .write(vec![batch_add])
1988 .with_save_mode(SaveMode::Overwrite)
1989 .with_replace_where(col("id").eq(lit("C")))
1990 .await
1991 .unwrap();
1992 assert_eq!(table.version(), Some(1));
1993 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1994 assert_eq!(write_metrics.num_added_rows, 1);
1995 assert_common_write_metrics(write_metrics);
1996
1997 let expected = [
1998 "+----+-------+------------+",
1999 "| id | value | modified |",
2000 "+----+-------+------------+",
2001 "| A | 0 | 2021-02-02 |",
2002 "| B | 20 | 2021-02-03 |",
2003 "| C | 50 | 2023-01-01 |",
2004 "+----+-------+------------+",
2005 ];
2006 let actual = get_data(&table).await;
2007 assert_batches_sorted_eq!(&expected, &actual);
2008 }
2009
2010 #[tokio::test]
2011 async fn test_replace_where_fail_not_matching_predicate() {
2012 let schema = get_arrow_schema(&None);
2013 let batch = RecordBatch::try_new(
2014 Arc::clone(&schema),
2015 vec![
2016 Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
2017 Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
2018 Arc::new(arrow::array::StringArray::from(vec![
2019 "2021-02-02",
2020 "2021-02-03",
2021 "2021-02-02",
2022 "2021-02-04",
2023 ])),
2024 ],
2025 )
2026 .unwrap();
2027
2028 let table = DeltaTable::new_in_memory()
2029 .write(vec![batch])
2030 .with_save_mode(SaveMode::Append)
2031 .await
2032 .unwrap();
2033 assert_eq!(table.version(), Some(0));
2034 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2035 assert_common_write_metrics(write_metrics);
2036
2037 let table_logstore = table.log_store.clone();
2040 let table_state = table.state.clone().unwrap();
2041
2042 let batch_fail = RecordBatch::try_new(
2044 Arc::clone(&schema),
2045 vec![
2046 Arc::new(arrow::array::StringArray::from(vec!["D"])),
2047 Arc::new(arrow::array::Int32Array::from(vec![1000])),
2048 Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
2049 ],
2050 )
2051 .unwrap();
2052
2053 let table = table
2054 .write(vec![batch_fail])
2055 .with_save_mode(SaveMode::Overwrite)
2056 .with_replace_where(col("id").eq(lit("C")))
2057 .await;
2058 assert!(table.is_err());
2059
2060 let table = DeltaTable::new_with_state(table_logstore, table_state);
2062 assert_eq!(table.get_latest_version().await.unwrap(), 0);
2063 }
2064
2065 #[tokio::test]
2066 async fn test_replace_where_no_matching_files_still_validates_input() {
2067 let schema = get_arrow_schema(&None);
2068 let batch = RecordBatch::try_new(
2069 Arc::clone(&schema),
2070 vec![
2071 Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
2072 Arc::new(arrow::array::Int32Array::from(vec![10, 20])),
2073 Arc::new(arrow::array::StringArray::from(vec![
2074 "2021-02-02",
2075 "2021-02-03",
2076 ])),
2077 ],
2078 )
2079 .unwrap();
2080
2081 let table = DeltaTable::new_in_memory()
2082 .write(vec![batch])
2083 .with_save_mode(SaveMode::Append)
2084 .await
2085 .unwrap();
2086 assert_eq!(table.version(), Some(0));
2087
2088 let table_logstore = table.log_store();
2089 let table_state = table.state.clone().unwrap();
2090
2091 let batch_fail = RecordBatch::try_new(
2092 Arc::clone(&schema),
2093 vec![
2094 Arc::new(arrow::array::StringArray::from(vec!["D"])),
2095 Arc::new(arrow::array::Int32Array::from(vec![1000])),
2096 Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
2097 ],
2098 )
2099 .unwrap();
2100
2101 let result = table
2102 .write(vec![batch_fail])
2103 .with_save_mode(SaveMode::Overwrite)
2104 .with_replace_where(col("id").eq(lit("Z")))
2105 .await;
2106 assert!(result.is_err());
2107
2108 let table = DeltaTable::new_with_state(table_logstore, table_state);
2109 assert_eq!(table.get_latest_version().await.unwrap(), 0);
2110 }
2111
2112 #[tokio::test]
2113 async fn test_write_preserves_user_insert_marker_column_outside_rewrite() {
2114 let schema = Arc::new(ArrowSchema::new(vec![
2115 Field::new("id", DataType::Utf8, true),
2116 Field::new("value", DataType::Int32, true),
2117 Field::new("modified", DataType::Utf8, true),
2118 Field::new(
2119 super::plan::WRITE_INSERT_MARKER_COLUMN,
2120 DataType::Boolean,
2121 true,
2122 ),
2123 ]));
2124
2125 let batch = RecordBatch::try_new(
2126 schema,
2127 vec![
2128 Arc::new(StringArray::from(vec![Some("A"), Some("B"), Some("C")])),
2129 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2130 Arc::new(StringArray::from(vec![
2131 Some("2021-02-02"),
2132 Some("2021-02-03"),
2133 Some("2021-02-04"),
2134 ])),
2135 Arc::new(arrow::array::BooleanArray::from(vec![
2136 Some(false),
2137 Some(true),
2138 Some(false),
2139 ])),
2140 ],
2141 )
2142 .unwrap();
2143
2144 let table = DeltaTable::new_in_memory()
2145 .write(vec![batch])
2146 .with_save_mode(SaveMode::Append)
2147 .await
2148 .unwrap();
2149
2150 let actual = get_data_sorted(
2151 &table,
2152 format!(
2153 "id,value,modified,{}",
2154 super::plan::WRITE_INSERT_MARKER_COLUMN
2155 )
2156 .as_str(),
2157 )
2158 .await;
2159 assert_batches_sorted_eq!(
2160 &[
2161 "+----+-------+------------+-------------------------+",
2162 "| id | value | modified | __delta_rs_write_insert |",
2163 "+----+-------+------------+-------------------------+",
2164 "| A | 1 | 2021-02-02 | false |",
2165 "| B | 2 | 2021-02-03 | true |",
2166 "| C | 3 | 2021-02-04 | false |",
2167 "+----+-------+------------+-------------------------+",
2168 ],
2169 &actual
2170 );
2171 }
2172
2173 #[tokio::test]
2174 async fn test_replace_where_preserves_user_insert_marker_column() {
2175 let schema = Arc::new(ArrowSchema::new(vec![
2176 Field::new("id", DataType::Utf8, true),
2177 Field::new("value", DataType::Int32, true),
2178 Field::new("modified", DataType::Utf8, true),
2179 Field::new(
2180 super::plan::WRITE_INSERT_MARKER_COLUMN,
2181 DataType::Boolean,
2182 true,
2183 ),
2184 ]));
2185
2186 let batch = RecordBatch::try_new(
2187 Arc::clone(&schema),
2188 vec![
2189 Arc::new(StringArray::from(vec![Some("A"), Some("B"), Some("C")])),
2190 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2191 Arc::new(StringArray::from(vec![
2192 Some("2021-02-02"),
2193 Some("2021-02-03"),
2194 Some("2021-02-04"),
2195 ])),
2196 Arc::new(arrow::array::BooleanArray::from(vec![
2197 Some(false),
2198 Some(false),
2199 Some(true),
2200 ])),
2201 ],
2202 )
2203 .unwrap();
2204
2205 let table = DeltaTable::new_in_memory()
2206 .write(vec![batch])
2207 .with_save_mode(SaveMode::Append)
2208 .await
2209 .unwrap();
2210
2211 let replacement_batch = RecordBatch::try_new(
2212 schema,
2213 vec![
2214 Arc::new(StringArray::from(vec![Some("C")])),
2215 Arc::new(Int32Array::from(vec![Some(3)])),
2216 Arc::new(StringArray::from(vec![Some("2023-01-01")])),
2217 Arc::new(arrow::array::BooleanArray::from(vec![Some(false)])),
2218 ],
2219 )
2220 .unwrap();
2221
2222 let table = table
2223 .write(vec![replacement_batch])
2224 .with_save_mode(SaveMode::Overwrite)
2225 .with_replace_where(col("value").eq(lit(3)))
2226 .await
2227 .expect("replaceWhere should preserve user columns named like internal markers");
2228
2229 let actual = get_data_sorted(
2230 &table,
2231 format!(
2232 "id,value,modified,{}",
2233 super::plan::WRITE_INSERT_MARKER_COLUMN
2234 )
2235 .as_str(),
2236 )
2237 .await;
2238 assert_batches_sorted_eq!(
2239 &[
2240 "+----+-------+------------+-------------------------+",
2241 "| id | value | modified | __delta_rs_write_insert |",
2242 "+----+-------+------------+-------------------------+",
2243 "| A | 1 | 2021-02-02 | false |",
2244 "| B | 2 | 2021-02-03 | false |",
2245 "| C | 3 | 2023-01-01 | false |",
2246 "+----+-------+------------+-------------------------+",
2247 ],
2248 &actual
2249 );
2250 }
2251
2252 #[tokio::test]
2253 async fn test_replace_where_merge_schema_rescues_existing_rows() -> TestResult {
2254 let base_schema = Arc::new(ArrowSchema::new(vec![
2255 Field::new("id", DataType::Utf8, true),
2256 Field::new("value", DataType::Int32, true),
2257 Field::new("modified", DataType::Utf8, true),
2258 ]));
2259 let base_batch = RecordBatch::try_new(
2260 Arc::clone(&base_schema),
2261 vec![
2262 Arc::new(StringArray::from(vec![Some("A"), Some("B"), Some("C")])),
2263 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2264 Arc::new(StringArray::from(vec![
2265 Some("2021-02-02"),
2266 Some("2021-02-03"),
2267 Some("2021-02-04"),
2268 ])),
2269 ],
2270 )?;
2271
2272 let table = DeltaTable::new_in_memory()
2273 .write(vec![base_batch])
2274 .with_save_mode(SaveMode::Append)
2275 .await?;
2276
2277 let merge_schema = Arc::new(ArrowSchema::new(vec![
2278 Field::new("id", DataType::Utf8, true),
2279 Field::new("value", DataType::Int32, true),
2280 Field::new("modified", DataType::Utf8, true),
2281 Field::new("inserted_by", DataType::Utf8, true),
2282 ]));
2283 let replacement_batch = RecordBatch::try_new(
2284 merge_schema,
2285 vec![
2286 Arc::new(StringArray::from(vec![Some("C")])),
2287 Arc::new(Int32Array::from(vec![Some(3)])),
2288 Arc::new(StringArray::from(vec![Some("2023-01-01")])),
2289 Arc::new(StringArray::from(vec![Some("rewrite")])),
2290 ],
2291 )?;
2292
2293 let table = table
2294 .write(vec![replacement_batch])
2295 .with_save_mode(SaveMode::Overwrite)
2296 .with_schema_mode(SchemaMode::Merge)
2297 .with_replace_where(col("value").eq(lit(3)))
2298 .await?;
2299
2300 let actual = get_data_sorted(&table, "id,value,modified,inserted_by").await;
2301 assert_batches_sorted_eq!(
2302 &[
2303 "+----+-------+------------+-------------+",
2304 "| id | value | modified | inserted_by |",
2305 "+----+-------+------------+-------------+",
2306 "| A | 1 | 2021-02-02 | |",
2307 "| B | 2 | 2021-02-03 | |",
2308 "| C | 3 | 2023-01-01 | rewrite |",
2309 "+----+-------+------------+-------------+",
2310 ],
2311 &actual
2312 );
2313
2314 Ok(())
2315 }
2316
2317 fn mixed_case_replace_where_batches() -> TestResult<(Arc<ArrowSchema>, RecordBatch, RecordBatch)>
2318 {
2319 let schema = Arc::new(ArrowSchema::new(vec![
2320 Field::new("utcDate", DataType::Utf8, true),
2321 Field::new("homeTeam", DataType::Utf8, true),
2322 Field::new("score", DataType::Utf8, true),
2323 ]));
2324 let base_batch = RecordBatch::try_new(
2325 Arc::clone(&schema),
2326 vec![
2327 Arc::new(StringArray::from(vec![
2328 Some("2008-08-16T15:00:00Z"),
2329 Some("2009-05-16T15:00:00Z"),
2330 ])),
2331 Arc::new(StringArray::from(vec![Some("Everton"), Some("Everton")])),
2332 Arc::new(StringArray::from(vec![Some("0-1"), Some("3-1")])),
2333 ],
2334 )?;
2335 let replacement_batch = RecordBatch::try_new(
2336 Arc::clone(&schema),
2337 vec![
2338 Arc::new(StringArray::from(vec![Some("2010-01-01T15:00:00Z")])),
2339 Arc::new(StringArray::from(vec![Some("Everton")])),
2340 Arc::new(StringArray::from(vec![Some("0-1")])),
2341 ],
2342 )?;
2343
2344 Ok((schema, base_batch, replacement_batch))
2345 }
2346
2347 #[tokio::test]
2348 async fn test_replace_where_preserves_mixed_case_columns_when_rescuing_rows() -> TestResult {
2349 let (_, base_batch, replacement_batch) = mixed_case_replace_where_batches()?;
2350
2351 let table = DeltaTable::new_in_memory()
2352 .write(vec![base_batch])
2353 .with_save_mode(SaveMode::Append)
2354 .await?;
2355
2356 let table = table
2357 .write(vec![replacement_batch])
2358 .with_save_mode(SaveMode::Overwrite)
2359 .with_schema_mode(SchemaMode::Overwrite)
2360 .with_replace_where(col("score").eq(lit("0-1")))
2361 .await?;
2362
2363 let actual = get_data_sorted(&table, r#""utcDate","homeTeam",score"#).await;
2364 assert_batches_sorted_eq!(
2365 &[
2366 "+----------------------+----------+-------+",
2367 "| utcDate | homeTeam | score |",
2368 "+----------------------+----------+-------+",
2369 "| 2009-05-16T15:00:00Z | Everton | 3-1 |",
2370 "| 2010-01-01T15:00:00Z | Everton | 0-1 |",
2371 "+----------------------+----------+-------+",
2372 ],
2373 &actual
2374 );
2375
2376 Ok(())
2377 }
2378
2379 #[tokio::test]
2380 async fn test_replace_where_preserves_live_rows_with_deletion_vectors() -> TestResult {
2381 let (_temp_dir, table) = open_copied_table_fixture(
2382 &crate::test_utils::TestTables::WithDvSmall.as_path(),
2383 "table-with-dv-small",
2384 )
2385 .await?;
2386
2387 let source_files = table
2388 .get_active_add_actions_by_partitions(&[])
2389 .try_collect::<Vec<_>>()
2390 .await?;
2391 assert_eq!(source_files.len(), 1);
2392 let source_path = source_files[0].path().to_string();
2393 let source_deletion_vector = source_files[0].deletion_vector_descriptor();
2394 assert!(
2395 source_deletion_vector.is_some(),
2396 "expected DV-backed source file"
2397 );
2398 assert_eq!(
2399 query_i32_rows(&table, "SELECT value FROM test ORDER BY value", "value").await?,
2400 vec![1, 2, 3, 4, 5, 6, 7, 8]
2401 );
2402
2403 let replacement_batch = RecordBatch::try_new(
2404 Arc::new(ArrowSchema::new(vec![Field::new(
2405 "value",
2406 DataType::Int32,
2407 true,
2408 )])),
2409 vec![Arc::new(Int32Array::from(vec![Some(50)]))],
2410 )?;
2411
2412 let table = table
2413 .write(vec![replacement_batch])
2414 .with_save_mode(SaveMode::Overwrite)
2415 .with_replace_where("value = 5 OR value = 50")
2416 .await?;
2417 assert_eq!(table.version(), Some(2));
2418
2419 assert_eq!(
2420 query_i32_rows(&table, "SELECT value FROM test ORDER BY value", "value").await?,
2421 vec![1, 2, 3, 4, 6, 7, 8, 50]
2422 );
2423
2424 let remove_actions = latest_remove_actions(&table).await?;
2425
2426 assert_eq!(remove_actions.len(), 1);
2427 let remove = &remove_actions[0];
2428 assert_eq!(remove.path, source_path);
2429 assert_eq!(remove.deletion_vector, source_deletion_vector);
2430
2431 Ok(())
2432 }
2433
2434 #[tokio::test]
2435 async fn test_replace_where_rewrites_multiple_files_with_deletion_vectors() -> TestResult {
2436 let (_temp_dir, table) = open_copied_table_fixture(
2437 &crate::test_utils::TestTables::WithDvSmall.as_path(),
2438 "table-with-dv-small",
2439 )
2440 .await?;
2441
2442 let source_files = table
2443 .get_active_add_actions_by_partitions(&[])
2444 .try_collect::<Vec<_>>()
2445 .await?;
2446 assert_eq!(source_files.len(), 1);
2447 let dv_source = source_files
2448 .into_iter()
2449 .next()
2450 .expect("expected DV-backed source file");
2451 assert!(
2452 dv_source.deletion_vector_descriptor().is_some(),
2453 "expected DV-backed source file"
2454 );
2455
2456 let append_batch = RecordBatch::try_new(
2457 Arc::new(ArrowSchema::new(vec![Field::new(
2458 "value",
2459 DataType::Int32,
2460 true,
2461 )])),
2462 vec![Arc::new(Int32Array::from(vec![Some(0), Some(9)]))],
2463 )?;
2464
2465 let table = table
2466 .write(vec![append_batch])
2467 .with_save_mode(SaveMode::Append)
2468 .await?;
2469
2470 let source_files = table
2471 .get_active_add_actions_by_partitions(&[])
2472 .try_collect::<Vec<_>>()
2473 .await?;
2474 assert_eq!(source_files.len(), 2);
2475 let appended_source = source_files
2476 .iter()
2477 .find(|file| file.path() != dv_source.path())
2478 .expect("expected appended source file");
2479 assert!(
2480 appended_source.deletion_vector_descriptor().is_none(),
2481 "expected appended source without DV metadata"
2482 );
2483
2484 let replacement_batch = RecordBatch::try_new(
2485 Arc::new(ArrowSchema::new(vec![Field::new(
2486 "value",
2487 DataType::Int32,
2488 true,
2489 )])),
2490 vec![Arc::new(Int32Array::from(vec![Some(50)]))],
2491 )?;
2492
2493 let table = table
2494 .write(vec![replacement_batch])
2495 .with_save_mode(SaveMode::Overwrite)
2496 .with_replace_where("value >= 5")
2497 .await?;
2498 assert_eq!(table.version(), Some(3));
2499
2500 assert_eq!(
2501 query_i32_rows(&table, "SELECT value FROM test ORDER BY value", "value").await?,
2502 vec![0, 1, 2, 3, 4, 50]
2503 );
2504
2505 let remove_actions = latest_remove_actions(&table).await?;
2506
2507 assert_eq!(remove_actions.len(), 2);
2508 assert!(
2509 remove_actions.iter().any(|remove| {
2510 remove.path == dv_source.path()
2511 && remove.deletion_vector == dv_source.deletion_vector_descriptor()
2512 }),
2513 "expected tombstone for DV-backed source file"
2514 );
2515 assert!(
2516 remove_actions.iter().any(|remove| {
2517 remove.path == appended_source.path() && remove.deletion_vector.is_none()
2518 }),
2519 "expected tombstone for appended non-DV source file"
2520 );
2521
2522 Ok(())
2523 }
2524
2525 #[tokio::test]
2526 async fn test_replace_where_real_world_deletion_logs_preserve_live_rows() -> TestResult {
2527 let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
2528 .join("../test/tests/data/table_with_deletion_logs");
2529 let (_temp_dir, table) =
2530 open_copied_table_fixture(&fixture_path, "table_with_deletion_logs").await?;
2531
2532 let source_files = table
2533 .get_active_add_actions_by_partitions(&[])
2534 .try_collect::<Vec<_>>()
2535 .await?;
2536 let dv_sources = source_files
2537 .iter()
2538 .filter_map(|file| {
2539 file.deletion_vector_descriptor()
2540 .map(|descriptor| (file.path().to_string(), descriptor))
2541 })
2542 .collect::<std::collections::HashMap<_, _>>();
2543 assert!(
2544 !dv_sources.is_empty(),
2545 "expected at least one active DV-backed source file"
2546 );
2547
2548 let initial_counts = query_single_i64_row(
2549 &table,
2550 "SELECT \
2551 SUM(CASE WHEN id < 100 THEN 1 ELSE 0 END) AS matching_rows, \
2552 SUM(CASE WHEN id >= 100 THEN 1 ELSE 0 END) AS preserved_rows \
2553 FROM test",
2554 )
2555 .await?;
2556 let matching_rows = initial_counts[0];
2557 let preserved_rows = initial_counts[1];
2558 assert!(matching_rows > 0, "expected fixture rows matching id < 100");
2559 assert!(preserved_rows > 0, "expected fixture rows with id >= 100");
2560
2561 let replacement_batch = RecordBatch::try_new(
2562 Arc::new(ArrowSchema::new(vec![
2563 Field::new("address", DataType::Utf8, true),
2564 Field::new("age", DataType::Float64, true),
2565 Field::new("company", DataType::Utf8, true),
2566 Field::new("id", DataType::Int64, true),
2567 Field::new("name", DataType::Utf8, true),
2568 Field::new("nbr", DataType::Int64, true),
2569 Field::new("phone_number", DataType::Utf8, true),
2570 ])),
2571 vec![
2572 Arc::new(StringArray::from(vec![Some("Replacement Ave")])),
2573 Arc::new(Float64Array::from(vec![Some(42.0)])),
2574 Arc::new(StringArray::from(vec![Some("delta-rs")])),
2575 Arc::new(Int64Array::from(vec![Some(42)])),
2576 Arc::new(StringArray::from(vec![Some("replacement")])),
2577 Arc::new(Int64Array::from(vec![Some(4242)])),
2578 Arc::new(StringArray::from(vec![Some("555-4242")])),
2579 ],
2580 )?;
2581
2582 let table = table
2583 .write(vec![replacement_batch])
2584 .with_save_mode(SaveMode::Overwrite)
2585 .with_replace_where("id < 100")
2586 .await?;
2587
2588 let final_counts = query_single_i64_row(
2589 &table,
2590 "SELECT \
2591 COUNT(*) AS total_rows, \
2592 SUM(CASE WHEN id < 100 THEN 1 ELSE 0 END) AS matching_rows, \
2593 SUM(CASE WHEN id >= 100 THEN 1 ELSE 0 END) AS preserved_rows, \
2594 SUM(CASE WHEN id = 42 AND name = 'replacement' THEN 1 ELSE 0 END) AS replacement_rows \
2595 FROM test",
2596 )
2597 .await?;
2598
2599 assert_eq!(final_counts[0], preserved_rows + 1);
2600 assert_eq!(final_counts[1], 1);
2601 assert_eq!(final_counts[2], preserved_rows);
2602 assert_eq!(final_counts[3], 1);
2603
2604 let remove_actions = latest_remove_actions(&table).await?;
2605
2606 assert!(
2607 remove_actions.iter().any(|remove| {
2608 dv_sources
2609 .get(&remove.path)
2610 .is_some_and(|descriptor| remove.deletion_vector.as_ref() == Some(descriptor))
2611 }),
2612 "expected at least one DV-backed tombstone preserving its deletion vector"
2613 );
2614
2615 Ok(())
2616 }
2617
2618 #[tokio::test]
2619 async fn test_overwrite_without_files_is_rejected() -> TestResult {
2620 let temp_dir = tempfile::tempdir()?;
2621 let table_path = temp_dir.path().join("without_files_overwrite");
2622 std::fs::create_dir(&table_path)?;
2623 let table_uri = ensure_table_uri(table_path.to_str().unwrap())?;
2624
2625 DeltaTable::try_from_url(table_uri.clone())
2626 .await?
2627 .write(vec![get_record_batch(None, false)])
2628 .await?;
2629
2630 let table = crate::DeltaTableBuilder::from_url(table_uri)?
2631 .without_files()
2632 .load()
2633 .await?;
2634
2635 assert_eq!(table.version(), Some(0));
2636
2637 let err = table
2640 .write(vec![get_record_batch(None, false)])
2641 .with_save_mode(SaveMode::Overwrite)
2642 .await
2643 .expect_err("overwrite should fail when table was loaded without files");
2644
2645 assert!(matches!(
2646 err,
2647 DeltaTableError::NotInitializedWithFiles(operation) if operation == "WRITE"
2648 ));
2649
2650 Ok(())
2651 }
2652
2653 #[tokio::test]
2654 async fn test_replace_where_partitioned() {
2655 let schema = get_arrow_schema(&None);
2656
2657 let batch = get_record_batch(None, false);
2658
2659 let table = DeltaTable::new_in_memory()
2660 .write(vec![batch])
2661 .with_partition_columns(["id", "value"])
2662 .with_save_mode(SaveMode::Append)
2663 .await
2664 .unwrap();
2665 assert_eq!(table.version(), Some(0));
2666 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2667 assert_common_write_metrics(write_metrics);
2668
2669 let batch_add = RecordBatch::try_new(
2670 Arc::clone(&schema),
2671 vec![
2672 Arc::new(arrow::array::StringArray::from(vec!["A", "A", "A"])),
2673 Arc::new(arrow::array::Int32Array::from(vec![11, 13, 15])),
2674 Arc::new(arrow::array::StringArray::from(vec![
2675 "2024-02-02",
2676 "2024-02-02",
2677 "2024-02-01",
2678 ])),
2679 ],
2680 )
2681 .unwrap();
2682
2683 let table = table
2684 .write(vec![batch_add])
2685 .with_save_mode(SaveMode::Overwrite)
2686 .with_replace_where(col("id").eq(lit("A")))
2687 .await
2688 .unwrap();
2689 assert_eq!(table.version(), Some(1));
2690 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2691 assert_eq!(write_metrics.num_added_rows, 3);
2692 assert_common_write_metrics(write_metrics);
2693
2694 let expected = [
2695 "+----+-------+------------+",
2696 "| id | value | modified |",
2697 "+----+-------+------------+",
2698 "| A | 11 | 2024-02-02 |",
2699 "| A | 13 | 2024-02-02 |",
2700 "| A | 15 | 2024-02-01 |",
2701 "| B | 2 | 2021-02-02 |",
2702 "| B | 4 | 2021-02-01 |",
2703 "| B | 8 | 2021-02-01 |",
2704 "| B | 9 | 2021-02-01 |",
2705 "+----+-------+------------+",
2706 ];
2707 let actual = get_data_sorted(&table, "id,value,modified").await;
2708 assert_batches_sorted_eq!(&expected, &actual);
2709 }
2710
2711 #[tokio::test]
2712 async fn test_dont_write_cdc_with_overwrite() -> TestResult {
2713 let delta_schema = TestSchemas::simple();
2714 let table: DeltaTable = DeltaTable::new_in_memory()
2715 .create()
2716 .with_columns(delta_schema.fields().cloned())
2717 .with_partition_columns(["id"])
2718 .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2719 .await
2720 .unwrap();
2721 assert_eq!(table.version(), Some(0));
2722
2723 let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
2724
2725 let batch = RecordBatch::try_new(
2726 Arc::clone(&schema),
2727 vec![
2728 Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
2729 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2730 Arc::new(StringArray::from(vec![
2731 Some("yes"),
2732 Some("yes"),
2733 Some("no"),
2734 ])),
2735 ],
2736 )
2737 .unwrap();
2738
2739 let second_batch = RecordBatch::try_new(
2740 Arc::clone(&schema),
2741 vec![
2742 Arc::new(StringArray::from(vec![Some("3")])),
2743 Arc::new(Int32Array::from(vec![Some(10)])),
2744 Arc::new(StringArray::from(vec![Some("yes")])),
2745 ],
2746 )
2747 .unwrap();
2748
2749 let table = table
2750 .write(vec![batch])
2751 .await
2752 .expect("Failed to write first batch");
2753 assert_eq!(table.version(), Some(1));
2754 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2755 assert_eq!(write_metrics.num_added_rows, 3);
2756 assert_common_write_metrics(write_metrics);
2757
2758 let table = table
2759 .write([second_batch])
2760 .with_save_mode(crate::protocol::SaveMode::Overwrite)
2761 .await
2762 .unwrap();
2763 assert_eq!(table.version(), Some(2));
2764 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2765 assert_eq!(write_metrics.num_added_rows, 1);
2766 assert!(write_metrics.num_removed_files > 0);
2767 assert_common_write_metrics(write_metrics);
2768
2769 let snapshot_bytes = table
2770 .log_store
2771 .read_commit_entry(2)
2772 .await?
2773 .expect("failed to get snapshot bytes");
2774 let version_actions = get_actions(2, &snapshot_bytes)?;
2775
2776 let cdc_actions = version_actions
2777 .iter()
2778 .filter(|action| matches!(action, &&Action::Cdc(_)))
2779 .collect_vec();
2780 assert!(cdc_actions.is_empty());
2781 Ok(())
2782 }
2783
2784 #[tokio::test]
2785 async fn test_dont_write_cdc_with_overwrite_predicate_partitioned() -> TestResult {
2786 let delta_schema = TestSchemas::simple();
2787 let table: DeltaTable = DeltaTable::new_in_memory()
2788 .create()
2789 .with_columns(delta_schema.fields().cloned())
2790 .with_partition_columns(["id"])
2791 .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2792 .await
2793 .unwrap();
2794 assert_eq!(table.version(), Some(0));
2795
2796 let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
2797
2798 let batch = RecordBatch::try_new(
2799 Arc::clone(&schema),
2800 vec![
2801 Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
2802 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2803 Arc::new(StringArray::from(vec![
2804 Some("yes"),
2805 Some("yes"),
2806 Some("no"),
2807 ])),
2808 ],
2809 )
2810 .unwrap();
2811
2812 let second_batch = RecordBatch::try_new(
2813 Arc::clone(&schema),
2814 vec![
2815 Arc::new(StringArray::from(vec![Some("3")])),
2816 Arc::new(Int32Array::from(vec![Some(10)])),
2817 Arc::new(StringArray::from(vec![Some("yes")])),
2818 ],
2819 )
2820 .unwrap();
2821
2822 let table = table
2823 .write(vec![batch])
2824 .await
2825 .expect("Failed to write first batch");
2826 assert_eq!(table.version(), Some(1));
2827 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2828 assert_eq!(write_metrics.num_added_rows, 3);
2829 assert_common_write_metrics(write_metrics);
2830
2831 let table = table
2832 .write([second_batch])
2833 .with_save_mode(crate::protocol::SaveMode::Overwrite)
2834 .with_replace_where("id='3'")
2835 .await
2836 .unwrap();
2837 assert_eq!(table.version(), Some(2));
2838 let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2839 assert_eq!(write_metrics.num_added_rows, 1);
2840 assert!(write_metrics.num_removed_files > 0);
2841 assert_common_write_metrics(write_metrics);
2842
2843 let snapshot_bytes = table
2844 .log_store
2845 .read_commit_entry(2)
2846 .await?
2847 .expect("failed to get snapshot bytes");
2848 let version_actions = get_actions(2, &snapshot_bytes)?;
2849
2850 let cdc_actions = version_actions
2851 .iter()
2852 .filter(|action| matches!(action, &&Action::Cdc(_)))
2853 .collect_vec();
2854 assert!(cdc_actions.is_empty());
2855 Ok(())
2856 }
2857
2858 #[tokio::test]
2859 async fn test_dont_write_cdc_with_overwrite_predicate_unpartitioned() -> TestResult {
2860 let delta_schema = TestSchemas::simple();
2861 let table: DeltaTable = DeltaTable::new_in_memory()
2862 .create()
2863 .with_columns(delta_schema.fields().cloned())
2864 .with_partition_columns(["id"])
2865 .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2866 .await
2867 .unwrap();
2868 assert_eq!(table.version(), Some(0));
2869
2870 let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
2871
2872 let batch = RecordBatch::try_new(
2873 Arc::clone(&schema),
2874 vec![
2875 Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
2876 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2877 Arc::new(StringArray::from(vec![
2878 Some("yes"),
2879 Some("yes"),
2880 Some("no"),
2881 ])),
2882 ],
2883 )
2884 .unwrap();
2885
2886 let second_batch = RecordBatch::try_new(
2887 Arc::clone(&schema),
2888 vec![
2889 Arc::new(StringArray::from(vec![Some("3")])),
2890 Arc::new(Int32Array::from(vec![Some(3)])),
2891 Arc::new(StringArray::from(vec![Some("yes")])),
2892 ],
2893 )
2894 .unwrap();
2895
2896 let table = table
2897 .write(vec![batch])
2898 .await
2899 .expect("Failed to write first batch");
2900 assert_eq!(table.version(), Some(1));
2901
2902 let table = table
2903 .write([second_batch])
2904 .with_save_mode(crate::protocol::SaveMode::Overwrite)
2905 .with_replace_where("value=3")
2906 .await
2907 .unwrap();
2908 assert_eq!(table.version(), Some(2));
2909
2910 let ctx = SessionContext::new();
2911 let cdf_scan = table
2912 .clone()
2913 .scan_cdf()
2914 .with_starting_version(0)
2915 .build(&ctx.state(), None)
2916 .await
2917 .expect("Failed to load CDF");
2918
2919 let mut batches = collect(cdf_scan, ctx.state().task_ctx())
2920 .await
2921 .expect("Failed to collect batches");
2922
2923 let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect();
2925
2926 assert_batches_sorted_eq! {[
2927 "+-------+----------+----+--------------+-----------------+",
2928 "| value | modified | id | _change_type | _commit_version |",
2929 "+-------+----------+----+--------------+-----------------+",
2930 "| 1 | yes | 1 | insert | 1 |",
2931 "| 2 | yes | 2 | insert | 1 |",
2932 "| 3 | no | 3 | delete | 2 |",
2933 "| 3 | no | 3 | insert | 1 |",
2934 "| 3 | yes | 3 | insert | 2 |",
2935 "+-------+----------+----+--------------+-----------------+",
2936 ], &batches }
2937
2938 let snapshot_bytes = table
2939 .log_store
2940 .read_commit_entry(2)
2941 .await?
2942 .expect("failed to get snapshot bytes");
2943 let version_actions = get_actions(2, &snapshot_bytes)?;
2944
2945 let cdc_actions = version_actions
2946 .iter()
2947 .filter(|action| matches!(action, &&Action::Cdc(_)))
2948 .collect_vec();
2949 assert!(!cdc_actions.is_empty());
2950 Ok(())
2951 }
2952
2953 #[tokio::test]
2954 async fn test_write_cdc_with_replace_where_preserves_mixed_case_columns() -> TestResult {
2955 let (schema, base_batch, replacement_batch) = mixed_case_replace_where_batches()?;
2956 let delta_schema: StructType = Arc::clone(&schema).try_into_kernel()?;
2957 let table: DeltaTable = DeltaTable::new_in_memory()
2958 .create()
2959 .with_columns(delta_schema.fields().cloned())
2960 .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2961 .await?;
2962 assert_eq!(table.version(), Some(0));
2963
2964 let table = table.write(vec![base_batch]).await?;
2965 assert_eq!(table.version(), Some(1));
2966
2967 let table = table
2968 .write(vec![replacement_batch])
2969 .with_save_mode(crate::protocol::SaveMode::Overwrite)
2970 .with_replace_where("score='0-1'")
2971 .await?;
2972 assert_eq!(table.version(), Some(2));
2973
2974 let ctx = SessionContext::new();
2975 let cdf_scan = table
2976 .clone()
2977 .scan_cdf()
2978 .with_starting_version(0)
2979 .build(&ctx.state(), None)
2980 .await
2981 .expect("Failed to load CDF");
2982 let mut batches = collect(cdf_scan, ctx.state().task_ctx())
2983 .await
2984 .expect("Failed to collect CDF batches");
2985
2986 let commit_timestamp_index = batches
2987 .first()
2988 .expect("expected CDF batches")
2989 .schema()
2990 .index_of("_commit_timestamp")
2991 .expect("expected CDF commit timestamp column");
2992 let _: Vec<_> = batches
2993 .iter_mut()
2994 .map(|batch| batch.remove_column(commit_timestamp_index))
2995 .collect();
2996
2997 assert_batches_sorted_eq! {[
2998 "+----------------------+----------+-------+--------------+-----------------+",
2999 "| utcDate | homeTeam | score | _change_type | _commit_version |",
3000 "+----------------------+----------+-------+--------------+-----------------+",
3001 "| 2008-08-16T15:00:00Z | Everton | 0-1 | delete | 2 |",
3002 "| 2008-08-16T15:00:00Z | Everton | 0-1 | insert | 1 |",
3003 "| 2009-05-16T15:00:00Z | Everton | 3-1 | insert | 1 |",
3004 "| 2010-01-01T15:00:00Z | Everton | 0-1 | insert | 2 |",
3005 "+----------------------+----------+-------+--------------+-----------------+",
3006 ], &batches }
3007
3008 Ok(())
3009 }
3010
3011 #[tokio::test]
3012 async fn test_write_cdc_with_overwrite_predicate_partitioned_parallel_input() -> TestResult {
3013 let delta_schema = TestSchemas::simple();
3014 let table: DeltaTable = DeltaTable::new_in_memory()
3015 .create()
3016 .with_columns(delta_schema.fields().cloned())
3017 .with_partition_columns(["id"])
3018 .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
3019 .await
3020 .unwrap();
3021 assert_eq!(table.version(), Some(0));
3022
3023 let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
3024
3025 let batch = RecordBatch::try_new(
3026 Arc::clone(&schema),
3027 vec![
3028 Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
3029 Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
3030 Arc::new(StringArray::from(vec![
3031 Some("yes"),
3032 Some("yes"),
3033 Some("no"),
3034 ])),
3035 ],
3036 )
3037 .unwrap();
3038
3039 let second_batch = RecordBatch::try_new(
3040 Arc::clone(&schema),
3041 vec![
3042 Arc::new(StringArray::from(vec![Some("3")])),
3043 Arc::new(Int32Array::from(vec![Some(3)])),
3044 Arc::new(StringArray::from(vec![Some("updated")])),
3045 ],
3046 )
3047 .unwrap();
3048
3049 let table = table
3050 .write(vec![batch])
3051 .await
3052 .expect("Failed to write first batch");
3053 assert_eq!(table.version(), Some(1));
3054
3055 let multi_stream_input: Arc<dyn TableProvider> = Arc::new(
3056 MemTable::try_new(
3057 second_batch.schema(),
3058 vec![
3059 vec![second_batch.clone()],
3060 vec![second_batch.clone()],
3061 vec![second_batch.clone()],
3062 ],
3063 )
3064 .unwrap(),
3065 );
3066 let multi_stream_plan =
3067 LogicalPlanBuilder::scan("source", provider_as_source(multi_stream_input), None)?
3068 .build()?;
3069
3070 let table = table
3071 .write(vec![])
3072 .with_input_plan(multi_stream_plan)
3073 .with_save_mode(crate::protocol::SaveMode::Overwrite)
3074 .with_replace_where("value=3")
3075 .await
3076 .unwrap();
3077 assert_eq!(table.version(), Some(2));
3078
3079 let snapshot_bytes = table
3080 .log_store
3081 .read_commit_entry(2)
3082 .await?
3083 .expect("failed to get snapshot bytes");
3084 let version_actions = get_actions(2, &snapshot_bytes)?;
3085
3086 let cdc_actions = version_actions
3087 .iter()
3088 .filter(|action| matches!(action, &&Action::Cdc(_)))
3089 .collect_vec();
3090 assert!(!cdc_actions.is_empty());
3091
3092 let ctx = SessionContext::new();
3093 let cdf_scan = table
3094 .clone()
3095 .scan_cdf()
3096 .with_starting_version(0)
3097 .build(&ctx.state(), None)
3098 .await
3099 .expect("Failed to load CDF");
3100 let mut batches = collect(cdf_scan, ctx.state().task_ctx())
3101 .await
3102 .expect("Failed to collect CDF batches");
3103
3104 let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect();
3106
3107 assert_batches_sorted_eq! {[
3108 "+-------+----------+----+--------------+-----------------+",
3109 "| value | modified | id | _change_type | _commit_version |",
3110 "+-------+----------+----+--------------+-----------------+",
3111 "| 1 | yes | 1 | insert | 1 |",
3112 "| 2 | yes | 2 | insert | 1 |",
3113 "| 3 | no | 3 | delete | 2 |",
3114 "| 3 | no | 3 | insert | 1 |",
3115 "| 3 | updated | 3 | insert | 2 |",
3116 "| 3 | updated | 3 | insert | 2 |",
3117 "| 3 | updated | 3 | insert | 2 |",
3118 "+-------+----------+----+--------------+-----------------+",
3119 ], &batches }
3120
3121 let expected_table = [
3122 "+-------+----------+----+",
3123 "| value | modified | id |",
3124 "+-------+----------+----+",
3125 "| 1 | yes | 1 |",
3126 "| 2 | yes | 2 |",
3127 "| 3 | updated | 3 |",
3128 "| 3 | updated | 3 |",
3129 "| 3 | updated | 3 |",
3130 "+-------+----------+----+",
3131 ];
3132 let actual_table = get_data_sorted(&table, "value, modified, id").await;
3133 assert_batches_sorted_eq!(&expected_table, &actual_table);
3134 Ok(())
3135 }
3136
3137 mod check_preconditions_test {
3140 use super::*;
3141
3142 #[tokio::test]
3143 async fn test_schema_overwrite_on_append() -> DeltaResult<()> {
3144 let table_schema = get_delta_schema();
3145 let batch = get_record_batch(None, false);
3146 let table = DeltaTable::new_in_memory()
3147 .create()
3148 .with_columns(table_schema.fields().cloned())
3149 .await?;
3150 let writer = table
3151 .write(vec![batch])
3152 .with_schema_mode(SchemaMode::Overwrite)
3153 .with_save_mode(SaveMode::Append);
3154
3155 let check = writer.check_preconditions().await;
3156 assert!(check.is_err());
3157 Ok(())
3158 }
3159
3160 #[tokio::test]
3161 async fn test_savemode_overwrite_on_append_table() -> DeltaResult<()> {
3162 let table_schema = get_delta_schema();
3163 let batch = get_record_batch(None, false);
3164 let table = DeltaTable::new_in_memory()
3165 .create()
3166 .with_configuration_property(TableProperty::AppendOnly, Some("true".to_string()))
3167 .with_columns(table_schema.fields().cloned())
3168 .await?;
3169 let writer = table.write(vec![batch]).with_save_mode(SaveMode::Overwrite);
3170
3171 let check = writer.check_preconditions().await;
3172 assert!(check.is_err());
3173 Ok(())
3174 }
3175
3176 #[tokio::test]
3177 async fn test_empty_set_of_batches() -> DeltaResult<()> {
3178 let table_schema = get_delta_schema();
3179 let table = DeltaTable::new_in_memory()
3180 .create()
3181 .with_columns(table_schema.fields().cloned())
3182 .await?;
3183 let writer = table.write(vec![]);
3184
3185 match writer.check_preconditions().await {
3186 Ok(_) => panic!("Expected check_preconditions to fail!"),
3187 Err(DeltaTableError::GenericError { .. }) => {}
3188 Err(e) => panic!("Unexpected error returned: {e:#?}"),
3189 }
3190 Ok(())
3191 }
3192
3193 #[tokio::test]
3194 async fn test_errorifexists() -> DeltaResult<()> {
3195 let table_schema = get_delta_schema();
3196 let batch = get_record_batch(None, false);
3197 let table = DeltaTable::new_in_memory()
3198 .create()
3199 .with_columns(table_schema.fields().cloned())
3200 .await?;
3201 let writer = table
3202 .write(vec![batch])
3203 .with_save_mode(SaveMode::ErrorIfExists);
3204
3205 match writer.check_preconditions().await {
3206 Ok(_) => panic!("Expected check_preconditions to fail!"),
3207 Err(DeltaTableError::GenericError { .. }) => {}
3208 Err(e) => panic!("Unexpected error returned: {e:#?}"),
3209 }
3210 Ok(())
3211 }
3212
3213 #[tokio::test]
3214 async fn test_allow_empty_batches_with_input_plan() -> DeltaResult<()> {
3215 let table_schema = get_delta_schema();
3216 let table = DeltaTable::new_in_memory()
3217 .create()
3218 .with_columns(table_schema.fields().cloned())
3219 .await?;
3220
3221 let ctx = SessionContext::new();
3222 let plan = ctx
3223 .sql("SELECT 1 as id")
3224 .await
3225 .unwrap()
3226 .logical_plan()
3227 .clone();
3228 let writer =
3229 WriteBuilder::new(table.log_store.clone(), table.state.map(|f| f.snapshot))
3230 .with_input_plan(plan)
3231 .with_save_mode(SaveMode::Overwrite);
3232
3233 let _ = writer.check_preconditions().await?;
3234 Ok(())
3235 }
3236
3237 #[tokio::test]
3238 async fn test_no_snapshot_create_actions() -> DeltaResult<()> {
3239 let table_schema = get_delta_schema();
3240 let table = DeltaTable::new_in_memory()
3241 .create()
3242 .with_columns(table_schema.fields().cloned())
3243 .await?;
3244 let batch = get_record_batch(None, false);
3245 let writer =
3246 WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![batch]);
3247
3248 let actions = writer.check_preconditions().await?;
3249 assert_eq!(
3250 actions.len(),
3251 2,
3252 "Expecting a Protocol and a Metadata action in {actions:?}"
3253 );
3254
3255 Ok(())
3256 }
3257
3258 #[tokio::test]
3259 async fn test_no_snapshot_err_no_batches_check() -> DeltaResult<()> {
3260 let table_schema = get_delta_schema();
3261 let table = DeltaTable::new_in_memory()
3262 .create()
3263 .with_columns(table_schema.fields().cloned())
3264 .await?;
3265 let writer =
3266 WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![]);
3267
3268 match writer.check_preconditions().await {
3269 Ok(_) => panic!("Expected check_preconditions to fail!"),
3270 Err(DeltaTableError::GenericError { .. }) => {}
3271 Err(e) => panic!("Unexpected error returned: {e:#?}"),
3272 }
3273
3274 Ok(())
3275 }
3276 }
3277
3278 #[tokio::test]
3279 async fn test_preserve_nullability_on_overwrite() -> TestResult {
3280 use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
3282 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3283 use std::sync::Arc;
3284
3285 let initial_schema = Arc::new(ArrowSchema::new(vec![
3287 Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, true), Field::new("active", DataType::Boolean, false), Field::new("count", DataType::Int32, false), ]));
3292
3293 let initial_batch = RecordBatch::try_new(
3294 initial_schema.clone(),
3295 vec![
3296 Arc::new(Int64Array::from(vec![1, 2, 3])),
3297 Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])),
3298 Arc::new(BooleanArray::from(vec![true, false, true])),
3299 Arc::new(Int32Array::from(vec![10, 20, 30])),
3300 ],
3301 )?;
3302
3303 let table = DeltaTable::new_in_memory()
3305 .write(vec![initial_batch])
3306 .with_save_mode(SaveMode::Overwrite)
3307 .await?;
3308
3309 let schema = table.snapshot().unwrap().schema();
3311 let schema_fields: Vec<_> = schema.fields().collect();
3312 assert!(!schema_fields[0].is_nullable(), "id should be non-nullable");
3313 assert!(schema_fields[1].is_nullable(), "name should be nullable");
3314 assert!(
3315 !schema_fields[2].is_nullable(),
3316 "active should be non-nullable"
3317 );
3318 assert!(
3319 !schema_fields[3].is_nullable(),
3320 "count should be non-nullable"
3321 );
3322
3323 let new_schema = Arc::new(ArrowSchema::new(vec![
3325 Field::new("id", DataType::Int64, true), Field::new("name", DataType::Utf8, true), Field::new("active", DataType::Boolean, true), Field::new("count", DataType::Int32, true), ]));
3330
3331 let new_batch = RecordBatch::try_new(
3332 new_schema,
3333 vec![
3334 Arc::new(Int64Array::from(vec![Some(4), Some(5), Some(6)])),
3335 Arc::new(StringArray::from(vec![
3336 Some("David"),
3337 Some("Eve"),
3338 Some("Frank"),
3339 ])),
3340 Arc::new(BooleanArray::from(vec![
3341 Some(false),
3342 Some(true),
3343 Some(false),
3344 ])),
3345 Arc::new(Int32Array::from(vec![Some(40), Some(50), Some(60)])),
3346 ],
3347 )?;
3348
3349 let table = table
3351 .write(vec![new_batch])
3352 .with_save_mode(SaveMode::Overwrite)
3353 .await?;
3355
3356 let schema = table.snapshot().unwrap().schema();
3358 let final_fields: Vec<_> = schema.fields().collect();
3359 assert!(
3360 !final_fields[0].is_nullable(),
3361 "id should remain non-nullable after overwrite"
3362 );
3363 assert!(
3364 final_fields[1].is_nullable(),
3365 "name should remain nullable after overwrite"
3366 );
3367 assert!(
3368 !final_fields[2].is_nullable(),
3369 "active should remain non-nullable after overwrite"
3370 );
3371 assert!(
3372 !final_fields[3].is_nullable(),
3373 "count should remain non-nullable after overwrite"
3374 );
3375
3376 assert_eq!(table.version(), Some(1)); Ok(())
3380 }
3381
3382 #[tokio::test]
3383 async fn test_schema_mode_none_enforces_constraints_on_overwrite() -> TestResult {
3384 use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
3388 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3389 use std::sync::Arc;
3390
3391 let initial_schema = Arc::new(ArrowSchema::new(vec![
3393 Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, true), Field::new("active", DataType::Boolean, false), Field::new("count", DataType::Int32, false), ]));
3398
3399 let initial_batch = RecordBatch::try_new(
3400 initial_schema.clone(),
3401 vec![
3402 Arc::new(Int64Array::from(vec![1, 2, 3])),
3403 Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])),
3404 Arc::new(BooleanArray::from(vec![true, false, true])),
3405 Arc::new(Int32Array::from(vec![10, 20, 30])),
3406 ],
3407 )?;
3408
3409 let table = DeltaTable::new_in_memory()
3411 .write(vec![initial_batch])
3412 .with_save_mode(SaveMode::Overwrite)
3413 .await?;
3414
3415 let initial_schema_fields: Vec<_> = table
3417 .snapshot()
3418 .unwrap()
3419 .schema()
3420 .fields()
3421 .cloned()
3422 .collect();
3423
3424 let new_schema_all_nullable = Arc::new(ArrowSchema::new(vec![
3426 Field::new("id", DataType::Int64, true), Field::new("name", DataType::Utf8, true), Field::new("active", DataType::Boolean, true), Field::new("count", DataType::Int32, true), ]));
3431
3432 let valid_batch = RecordBatch::try_new(
3433 new_schema_all_nullable.clone(),
3434 vec![
3435 Arc::new(Int64Array::from(vec![Some(4), Some(5), Some(6)])),
3436 Arc::new(StringArray::from(vec![
3437 Some("David"),
3438 Some("Eve"),
3439 Some("Frank"),
3440 ])),
3441 Arc::new(BooleanArray::from(vec![
3442 Some(false),
3443 Some(true),
3444 Some(false),
3445 ])),
3446 Arc::new(Int32Array::from(vec![Some(40), Some(50), Some(60)])),
3447 ],
3448 )?;
3449
3450 let table = table
3452 .write(vec![valid_batch])
3453 .with_save_mode(SaveMode::Overwrite)
3454 .await?;
3456
3457 let schema = table.snapshot().unwrap().schema();
3459 let after_overwrite_fields: Vec<_> = schema.fields().collect();
3460
3461 assert_eq!(
3463 after_overwrite_fields.len(),
3464 initial_schema_fields.len(),
3465 "Schema should have same number of fields"
3466 );
3467
3468 for (i, field) in after_overwrite_fields.iter().enumerate() {
3469 assert_eq!(
3470 field.is_nullable(),
3471 initial_schema_fields[i].is_nullable(),
3472 "Field '{}' nullability should not change",
3473 field.name()
3474 );
3475 }
3476
3477 assert!(
3479 !after_overwrite_fields[0].is_nullable(),
3480 "id must remain non-nullable"
3481 );
3482 assert!(
3483 !after_overwrite_fields[2].is_nullable(),
3484 "active must remain non-nullable"
3485 );
3486 assert!(
3487 !after_overwrite_fields[3].is_nullable(),
3488 "count must remain non-nullable"
3489 );
3490
3491 let invalid_batch = RecordBatch::try_new(
3496 new_schema_all_nullable.clone(),
3497 vec![
3498 Arc::new(Int64Array::from(vec![Some(7), None, Some(9)])), Arc::new(StringArray::from(vec![
3500 Some("George"),
3501 Some("Helen"),
3502 Some("Ivan"),
3503 ])),
3504 Arc::new(BooleanArray::from(vec![
3505 Some(true),
3506 Some(false),
3507 Some(true),
3508 ])),
3509 Arc::new(Int32Array::from(vec![Some(70), Some(80), Some(90)])),
3510 ],
3511 )?;
3512
3513 let result = table
3515 .clone()
3516 .write(vec![invalid_batch])
3517 .with_save_mode(SaveMode::Overwrite)
3518 .await;
3519
3520 assert!(
3522 result.is_err(),
3523 "Writing NULL to non-nullable field should fail"
3524 );
3525
3526 let invalid_batch_2 = RecordBatch::try_new(
3528 new_schema_all_nullable.clone(),
3529 vec![
3530 Arc::new(Int64Array::from(vec![Some(10), Some(11), Some(12)])),
3531 Arc::new(StringArray::from(vec![
3532 Some("Jane"),
3533 Some("Karl"),
3534 Some("Lisa"),
3535 ])),
3536 Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)])), Arc::new(Int32Array::from(vec![Some(100), Some(110), Some(120)])),
3538 ],
3539 )?;
3540
3541 let result2 = table
3542 .clone()
3543 .write(vec![invalid_batch_2])
3544 .with_save_mode(SaveMode::Overwrite)
3545 .await;
3546
3547 assert!(
3549 result2.is_err(),
3550 "Writing NULL to non-nullable 'active' field should fail"
3551 );
3552
3553 let schema = table.snapshot().unwrap().schema();
3555 let final_fields: Vec<_> = schema.fields().collect();
3556
3557 assert!(
3559 !final_fields[0].is_nullable(),
3560 "id still non-nullable after failed writes"
3561 );
3562 assert!(
3563 !final_fields[2].is_nullable(),
3564 "active still non-nullable after failed writes"
3565 );
3566 assert!(
3567 !final_fields[3].is_nullable(),
3568 "count still non-nullable after failed writes"
3569 );
3570
3571 Ok(())
3572 }
3573
3574 #[tokio::test]
3575 async fn test_schema_preserved_with_replace_where() -> TestResult {
3576 use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
3578 use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3579 use std::sync::Arc;
3580
3581 let initial_schema = Arc::new(ArrowSchema::new(vec![
3583 Field::new("id", DataType::Int64, false), Field::new("name", DataType::Utf8, true), Field::new("active", DataType::Boolean, false), Field::new("count", DataType::Int32, false), ]));
3588
3589 let initial_batch = RecordBatch::try_new(
3590 initial_schema.clone(),
3591 vec![
3592 Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
3593 Arc::new(StringArray::from(vec![
3594 Some("Alice"),
3595 Some("Bob"),
3596 None,
3597 Some("David"),
3598 Some("Eve"),
3599 ])),
3600 Arc::new(BooleanArray::from(vec![true, false, true, false, true])),
3601 Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
3602 ],
3603 )?;
3604
3605 let table = DeltaTable::new_in_memory()
3606 .write(vec![initial_batch])
3607 .with_save_mode(SaveMode::Overwrite)
3608 .await?;
3609
3610 let initial_fields: Vec<_> = table
3612 .snapshot()
3613 .unwrap()
3614 .schema()
3615 .fields()
3616 .cloned()
3617 .collect();
3618
3619 let new_schema = Arc::new(ArrowSchema::new(vec![
3621 Field::new("id", DataType::Int64, true), Field::new("name", DataType::Utf8, true), Field::new("active", DataType::Boolean, true), Field::new("count", DataType::Int32, true), ]));
3626
3627 let replacement_batch = RecordBatch::try_new(
3628 new_schema.clone(),
3629 vec![
3630 Arc::new(Int64Array::from(vec![Some(2), Some(4)])), Arc::new(StringArray::from(vec![Some("Bob2"), Some("David2")])),
3632 Arc::new(BooleanArray::from(vec![Some(true), Some(true)])),
3633 Arc::new(Int32Array::from(vec![Some(200), Some(400)])),
3634 ],
3635 )?;
3636
3637 let table = table
3639 .write(vec![replacement_batch])
3640 .with_save_mode(SaveMode::Overwrite)
3641 .with_replace_where("id = 2 OR id = 4")
3642 .await?;
3643
3644 let schema = table.snapshot().unwrap().schema();
3646 let final_fields: Vec<_> = schema.fields().collect();
3647
3648 for (i, field) in final_fields.iter().enumerate() {
3649 assert_eq!(
3650 field.is_nullable(),
3651 initial_fields[i].is_nullable(),
3652 "Field '{}' nullability should be preserved with replaceWhere",
3653 field.name()
3654 );
3655 }
3656
3657 let invalid_batch = RecordBatch::try_new(
3659 new_schema,
3660 vec![
3661 Arc::new(Int64Array::from(vec![None, Some(3)])), Arc::new(StringArray::from(vec![Some("Invalid"), Some("Valid")])),
3663 Arc::new(BooleanArray::from(vec![Some(false), Some(false)])),
3664 Arc::new(Int32Array::from(vec![Some(999), Some(333)])),
3665 ],
3666 )?;
3667
3668 let result = table
3669 .write(vec![invalid_batch])
3670 .with_save_mode(SaveMode::Overwrite)
3671 .with_replace_where("id = 1 OR id = 3")
3672 .await;
3673
3674 assert!(
3675 result.is_err(),
3676 "replaceWhere should still enforce non-nullable constraints"
3677 );
3678
3679 Ok(())
3680 }
3681
3682 #[tokio::test]
3683 async fn test_write_date64_normalizes_to_date32() {
3684 use arrow_array::Date64Array;
3685
3686 let schema = Arc::new(ArrowSchema::new(vec![
3687 Field::new("id", DataType::Int32, false),
3688 Field::new("sales_date", DataType::Date64, true),
3689 ]));
3690 let millis = 1760918400000i64; let batch = RecordBatch::try_new(
3692 schema,
3693 vec![
3694 Arc::new(Int32Array::from(vec![1])),
3695 Arc::new(Date64Array::from(vec![millis])),
3696 ],
3697 )
3698 .unwrap();
3699
3700 let table = DeltaTable::new_in_memory()
3701 .write(vec![batch])
3702 .await
3703 .unwrap();
3704
3705 let table_schema = table.snapshot().unwrap().schema();
3706 let date_field = table_schema.field("sales_date").unwrap();
3707 assert_eq!(date_field.data_type(), &crate::kernel::DataType::DATE);
3708
3709 let batches = get_data(&table).await;
3710 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
3711 assert_eq!(total_rows, 1);
3712 assert_eq!(
3713 batches[0]
3714 .schema()
3715 .field_with_name("sales_date")
3716 .unwrap()
3717 .data_type(),
3718 &DataType::Date32,
3719 );
3720 }
3721
3722 #[cfg(not(feature = "nanosecond-timestamps"))]
3723 #[tokio::test]
3724 async fn test_write_timestamp_ns_normalizes_to_us() {
3725 test_write_timestamp_ns_maybe_normalization(TimeUnit::Microsecond).await;
3726 }
3727
#[cfg(feature = "nanosecond-timestamps")]
#[tokio::test]
async fn test_write_timestamp_ns_stays_ns() {
    // With the `nanosecond-timestamps` feature, the nanosecond unit is expected to survive.
    let expected_unit = TimeUnit::Nanosecond;
    test_write_timestamp_ns_maybe_normalization(expected_unit).await
}
3733
3734 async fn test_write_timestamp_ns_maybe_normalization(unit: TimeUnit) {
3735 use arrow_array::TimestampNanosecondArray;
3736
3737 let schema = Arc::new(ArrowSchema::new(vec![
3738 Field::new("id", DataType::Int32, false),
3739 Field::new(
3740 "ts",
3741 DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
3742 true,
3743 ),
3744 ]));
3745 let nanos = 1_760_961_600_123_456_789_i64;
3746 let batch = RecordBatch::try_new(
3747 schema,
3748 vec![
3749 Arc::new(Int32Array::from(vec![1])),
3750 Arc::new(TimestampNanosecondArray::from(vec![nanos]).with_timezone("UTC")),
3751 ],
3752 )
3753 .unwrap();
3754
3755 let table = DeltaTable::new_in_memory()
3756 .write(vec![batch])
3757 .await
3758 .unwrap();
3759
3760 let batches = get_data(&table).await;
3761 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
3762 let schema = batches[0].schema();
3763 let result_field = schema.field_with_name("ts").unwrap();
3764 assert_eq!(
3765 result_field.data_type(),
3766 &DataType::Timestamp(unit, Some("UTC".into())),
3767 );
3768 }
3769
3770 #[tokio::test]
3771 async fn test_write_large_utf8_normalizes_to_utf8() {
3772 use arrow_array::LargeStringArray;
3773
3774 let schema = Arc::new(ArrowSchema::new(vec![
3775 Field::new("id", DataType::Int32, false),
3776 Field::new("name", DataType::LargeUtf8, true),
3777 ]));
3778 let batch = RecordBatch::try_new(
3779 schema,
3780 vec![
3781 Arc::new(Int32Array::from(vec![1, 2])),
3782 Arc::new(LargeStringArray::from(vec!["hello", "world"])),
3783 ],
3784 )
3785 .unwrap();
3786
3787 let table = DeltaTable::new_in_memory()
3788 .write(vec![batch])
3789 .await
3790 .unwrap();
3791
3792 let table_schema = table.snapshot().unwrap().schema();
3793 assert_eq!(
3794 table_schema.field("name").unwrap().data_type(),
3795 &crate::kernel::DataType::STRING,
3796 );
3797
3798 let batches = get_data(&table).await;
3799 assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
3800 }
3801
3802 #[tokio::test]
3803 async fn test_append_date64_to_existing_date32_table() {
3804 use arrow_array::{Date32Array, Date64Array};
3805
3806 let schema32 = Arc::new(ArrowSchema::new(vec![
3807 Field::new("id", DataType::Int32, false),
3808 Field::new("d", DataType::Date32, true),
3809 ]));
3810 let batch32 = RecordBatch::try_new(
3811 schema32,
3812 vec![
3813 Arc::new(Int32Array::from(vec![1])),
3814 Arc::new(Date32Array::from(vec![19650])),
3815 ],
3816 )
3817 .unwrap();
3818
3819 let table = DeltaTable::new_in_memory()
3820 .write(vec![batch32])
3821 .await
3822 .unwrap();
3823
3824 let schema64 = Arc::new(ArrowSchema::new(vec![
3825 Field::new("id", DataType::Int32, false),
3826 Field::new("d", DataType::Date64, true),
3827 ]));
3828 let batch64 = RecordBatch::try_new(
3829 schema64,
3830 vec![
3831 Arc::new(Int32Array::from(vec![2])),
3832 Arc::new(Date64Array::from(vec![1760918400000i64])),
3833 ],
3834 )
3835 .unwrap();
3836
3837 let table = table
3838 .write(vec![batch64])
3839 .with_save_mode(SaveMode::Append)
3840 .await
3841 .unwrap();
3842
3843 let batches = get_data(&table).await;
3844 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
3845 assert_eq!(total_rows, 2);
3846 }
3847}