Skip to main content

deltalake_core/operations/write/
mod.rs

1//!
2//! New Table Semantics
3//!  - The schema of the [Plan] is used to initialize the table.
4//!  - The partition columns will be used to partition the table.
5//!
6//! Existing Table Semantics
7//!  - The save mode will control how existing data is handled (i.e. overwrite, append, etc)
//!  - Conflicting column types (e.g. an INT and a STRING)
//!    will result in an error.
10//!    Partition columns, if present, are validated against the existing metadata.
11//!    When omitted, the table partitioning is respected.
12//!    Full table overwrite with `SchemaMode::Overwrite` and no replaceWhere predicate may
13//!    replace the partition columns.
14//!
15//! In combination with `Overwrite`, a `replaceWhere` option can be used to transactionally
16//! replace data that matches a predicate.
17//!
18//! # Example
19//! ```rust ignore
20//! let id_field = arrow::datatypes::Field::new("id", arrow::datatypes::DataType::Int32, false);
21//! let schema = Arc::new(arrow::datatypes::Schema::new(vec![id_field]));
22//! let ids = arrow::array::Int32Array::from(vec![1, 2, 3, 4, 5]);
23//! let batch = RecordBatch::try_new(schema, vec![Arc::new(ids)])?;
24//! let ops = DeltaOps::try_from_url("../path/to/empty/dir").await?;
25//! let table = ops.write(vec![batch]).await?;
//! ```
27
28use std::collections::HashMap;
29use std::num::NonZeroU64;
30use std::str::FromStr;
31use std::sync::Arc;
32use std::time::Instant;
33use std::vec;
34
35use arrow::array::RecordBatch;
36use datafusion::catalog::{Session, TableProvider};
37use datafusion::common::Result;
38use datafusion::datasource::{MemTable, provider_as_source};
39use datafusion::logical_expr::{LogicalPlan, LogicalPlanBuilder, UNNAMED_TABLE};
40use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
41use delta_kernel::table_features::ColumnMappingMode;
42use futures::future::BoxFuture;
43use parquet::file::properties::WriterProperties;
44use serde::{Deserialize, Serialize};
45use tracing::Instrument;
46use url::Url;
47
48pub use self::configs::WriterStatsConfig;
49use self::execution::write_execution_plan_v2;
50use self::metrics::{SOURCE_COUNT_ID, SOURCE_COUNT_METRIC};
51use super::{CreateBuilder, CustomExecuteHandler, Operation};
52use crate::DeltaTable;
53use crate::delta_datafusion::Expression;
54use crate::delta_datafusion::expr::fmt_expr_to_sql;
55use crate::delta_datafusion::physical::{find_metric_node, get_metric};
56use crate::delta_datafusion::{
57    DeltaSessionExt, SessionFallbackPolicy, SessionResolveContext, create_session,
58    resolve_session_state, update_datafusion_session,
59};
60use crate::errors::{DeltaResult, DeltaTableError, unsupported_column_mapping_write};
61use crate::kernel::schema::cast::normalize_for_delta;
62use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL, TableReference};
63use crate::kernel::{Action, EagerSnapshot, StructType};
64use crate::logstore::LogStoreRef;
65use crate::protocol::{DeltaOperation, SaveMode};
66
67pub mod configs;
68pub(crate) mod execution;
69pub(crate) mod generated_columns;
70pub(crate) mod metrics;
71mod plan;
72pub(crate) mod schema_evolution;
73pub mod writer;
74
75#[derive(thiserror::Error, Debug)]
76pub(crate) enum WriteError {
77    #[error("No data source supplied to write command.")]
78    MissingData,
79
80    #[error("A table already exists at: {0}")]
81    AlreadyExists(Url),
82
83    #[error(
84        "Specified table partitioning does not match table partitioning: expected: {expected:?}, got: {got:?}. To change partition columns, use full table overwrite with schema overwrite and no replaceWhere predicate."
85    )]
86    PartitionColumnMismatch {
87        expected: Vec<String>,
88        got: Vec<String>,
89    },
90
91    #[error("Partition column(s) not found in write schema: {}", columns.join(", "))]
92    MissingPartitionColumns { columns: Vec<String> },
93}
94
95impl From<WriteError> for DeltaTableError {
96    fn from(err: WriteError) -> Self {
97        DeltaTableError::GenericError {
98            source: Box::new(err),
99        }
100    }
101}
102
/// Specifies how to handle schema drift between the incoming data and the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SchemaMode {
    /// Overwrite the schema with the new schema
    Overwrite,
    /// Append the new schema to the existing schema
    Merge,
}
111
112impl FromStr for SchemaMode {
113    type Err = DeltaTableError;
114
115    fn from_str(s: &str) -> DeltaResult<Self> {
116        match s.to_ascii_lowercase().as_str() {
117            "overwrite" => Ok(SchemaMode::Overwrite),
118            "merge" => Ok(SchemaMode::Merge),
119            _ => Err(DeltaTableError::Generic(format!(
120                "Invalid schema write mode provided: {s}, only these are supported: ['overwrite', 'merge']"
121            ))),
122        }
123    }
124}
125
126/// Write data into a DeltaTable
127pub struct WriteBuilder {
128    /// A snapshot of the to-be-loaded table's state
129    snapshot: Option<EagerSnapshot>,
130    /// Delta object store for handling data files
131    log_store: LogStoreRef,
132    /// The input plan
133    input: Option<LogicalPlan>,
134    /// Datafusion session state relevant for executing the input plan
135    session: Option<Arc<dyn Session>>,
136    session_fallback_policy: SessionFallbackPolicy,
137    /// SaveMode defines how to treat data already written to table location
138    mode: SaveMode,
139    /// Column names for table partitioning
140    partition_columns: Option<Vec<String>>,
141    /// When using `Overwrite` mode, replace data that matches a predicate
142    predicate: Option<Expression>,
143    /// Size above which we will write a buffered parquet file to disk.
144    /// If None, the writer will not create a new file until the writer is closed.
145    target_file_size: Option<Option<NonZeroU64>>,
146    /// Number of records to be written in single batch to underlying writer
147    write_batch_size: Option<usize>,
148    /// whether to overwrite the schema or to merge it. None means to fail on schmema drift
149    schema_mode: Option<SchemaMode>,
150    /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
151    safe_cast: bool,
152    /// Parquet writer properties
153    writer_properties: Option<WriterProperties>,
154    /// Additional information to add to the commit
155    commit_properties: CommitProperties,
156    /// Name of the table, only used when table doesn't exist yet
157    name: Option<String>,
158    /// Description of the table, only used when table doesn't exist yet
159    description: Option<String>,
160    /// Configurations of the delta table, only used when table doesn't exist
161    configuration: HashMap<String, Option<String>>,
162    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
163}
164
165#[derive(Default, Debug, Serialize, Deserialize)]
166/// Metrics for the Write Operation
167pub struct WriteMetrics {
168    /// Number of files added
169    pub num_added_files: usize,
170    /// Number of files removed
171    pub num_removed_files: usize,
172    /// Number of partitions
173    pub num_partitions: usize,
174    /// Number of rows added
175    pub num_added_rows: usize,
176    /// Time taken to execute the entire operation
177    pub execution_time_ms: u64,
178}
179
180impl super::Operation for WriteBuilder {
181    fn log_store(&self) -> &LogStoreRef {
182        &self.log_store
183    }
184    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
185        self.custom_execute_handler.clone()
186    }
187}
188
189impl WriteBuilder {
190    /// Create a new [`WriteBuilder`]
191    pub fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
192        Self {
193            snapshot,
194            log_store,
195            input: None,
196            session: None,
197            session_fallback_policy: SessionFallbackPolicy::default(),
198            mode: SaveMode::Append,
199            partition_columns: None,
200            predicate: None,
201            target_file_size: None,
202            write_batch_size: None,
203            safe_cast: false,
204            schema_mode: None,
205            writer_properties: None,
206            commit_properties: CommitProperties::default(),
207            name: None,
208            description: None,
209            configuration: Default::default(),
210            custom_execute_handler: None,
211        }
212    }
213
214    /// Specify the behavior when a table exists at location
215    pub fn with_save_mode(mut self, save_mode: SaveMode) -> Self {
216        self.mode = save_mode;
217        self
218    }
219
220    /// Add Schema Write Mode
221    pub fn with_schema_mode(mut self, schema_mode: SchemaMode) -> Self {
222        self.schema_mode = Some(schema_mode);
223        self
224    }
225
226    /// When using `Overwrite` mode, replace data that matches a predicate
227    pub fn with_replace_where(mut self, predicate: impl Into<Expression>) -> Self {
228        self.predicate = Some(predicate.into());
229        self
230    }
231
232    /// (Optional) Specify table partitioning. For existing tables this must match the
233    /// current partitioning, except full table overwrite with schema overwrite and
234    /// no replaceWhere predicate may replace the partitioning. For new tables, the
235    /// partitioning is applied.
236    pub fn with_partition_columns(
237        mut self,
238        partition_columns: impl IntoIterator<Item = impl Into<String>>,
239    ) -> Self {
240        self.partition_columns = Some(partition_columns.into_iter().map(|s| s.into()).collect());
241        self
242    }
243
244    /// Logical execution plan that produces the data to be written to the delta table
245    #[deprecated(since = "0.31.0", note = "Use `with_input_plan` instead")]
246    pub fn with_input_execution_plan(self, plan: Arc<LogicalPlan>) -> Self {
247        self.with_input_plan(plan.as_ref().clone())
248    }
249
250    /// Logical plan that produces the data to be written to the delta table
251    pub fn with_input_plan(mut self, plan: LogicalPlan) -> Self {
252        self.input = Some(plan);
253        self
254    }
255
256    /// Set the DataFusion session used for planning and execution.
257    ///
258    /// The provided `session` should wrap a concrete `datafusion::execution::context::SessionState`.
259    ///
260    /// If `session` is not a `SessionState`, the default policy is to log a warning and fall back to
261    /// internal defaults. To make this strict (error instead), set
262    /// `with_session_fallback_policy(SessionFallbackPolicy::RequireSessionState)`.
263    ///
264    /// Example: `Arc::new(create_session().state())`.
265    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
266        self.session = Some(session);
267        self
268    }
269
270    /// Control how delta-rs resolves the provided session when it is not a concrete `SessionState`.
271    ///
272    /// Defaults to `SessionFallbackPolicy::InternalDefaults` to preserve existing behavior.
273    pub fn with_session_fallback_policy(mut self, policy: SessionFallbackPolicy) -> Self {
274        self.session_fallback_policy = policy;
275        self
276    }
277
278    /// Specify the target file size for data files written to the delta table.
279    pub fn with_target_file_size(mut self, target_file_size: Option<NonZeroU64>) -> Self {
280        self.target_file_size = Some(target_file_size);
281        self
282    }
283
284    /// Specify the target batch size for row groups written to parquet files.
285    pub fn with_write_batch_size(mut self, write_batch_size: usize) -> Self {
286        self.write_batch_size = Some(write_batch_size);
287        self
288    }
289
290    /// Specify the safety of the casting operation
291    /// how to handle cast failures, either return NULL (safe=true) or return ERR (safe=false)
292    pub fn with_cast_safety(mut self, safe: bool) -> Self {
293        self.safe_cast = safe;
294        self
295    }
296
297    /// Specify the writer properties to use when writing a parquet file
298    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
299        self.writer_properties = Some(writer_properties);
300        self
301    }
302
303    /// Additional metadata to be added to commit info
304    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
305        self.commit_properties = commit_properties;
306        self
307    }
308
309    /// Specify the table name. Optionally qualified with
310    /// a database name [database_name.] table_name.
311    pub fn with_table_name(mut self, name: impl Into<String>) -> Self {
312        self.name = Some(name.into());
313        self
314    }
315
316    /// Comment to describe the table.
317    pub fn with_description(mut self, description: impl Into<String>) -> Self {
318        self.description = Some(description.into());
319        self
320    }
321
322    /// Set a custom execute handler, for pre and post execution
323    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
324        self.custom_execute_handler = Some(handler);
325        self
326    }
327
328    /// Set configuration on created table
329    pub fn with_configuration(
330        mut self,
331        configuration: impl IntoIterator<Item = (impl Into<String>, Option<impl Into<String>>)>,
332    ) -> Self {
333        self.configuration = configuration
334            .into_iter()
335            .map(|(k, v)| (k.into(), v.map(|s| s.into())))
336            .collect();
337        self
338    }
339
340    /// Execution plan that produces the data to be written to the delta table
341    pub fn with_input_batches(mut self, batches: impl IntoIterator<Item = RecordBatch>) -> Self {
342        let batches: Vec<RecordBatch> = batches.into_iter().collect();
343        if !batches.is_empty() {
344            let table_provider: Arc<dyn TableProvider> =
345                Arc::new(MemTable::try_new(batches[0].schema(), vec![batches]).unwrap());
346            let source_plan =
347                LogicalPlanBuilder::scan(UNNAMED_TABLE, provider_as_source(table_provider), None)
348                    .unwrap()
349                    .build()
350                    .unwrap();
351            self.input = Some(source_plan);
352        }
353        self
354    }
355
356    /// Partition layout changes require a full table rewrite. Predicate overwrites
357    /// replace only a table subset and must keep the existing layout.
358    fn can_overwrite_partition_columns(&self) -> bool {
359        self.mode == SaveMode::Overwrite
360            && self.schema_mode == Some(SchemaMode::Overwrite)
361            && self.predicate.is_none()
362    }
363
364    fn get_partition_columns(&self) -> Result<Vec<String>, WriteError> {
365        // validate partition columns
366        let active_partitions = self
367            .snapshot
368            .as_ref()
369            .map(|s| s.metadata().partition_columns().to_vec());
370
371        if let Some(active_part) = active_partitions {
372            if let Some(ref partition_columns) = self.partition_columns {
373                if &active_part != partition_columns {
374                    if self.can_overwrite_partition_columns() {
375                        Ok(partition_columns.clone())
376                    } else {
377                        Err(WriteError::PartitionColumnMismatch {
378                            expected: active_part,
379                            got: partition_columns.to_vec(),
380                        })
381                    }
382                } else {
383                    Ok(partition_columns.clone())
384                }
385            } else {
386                Ok(active_part)
387            }
388        } else {
389            Ok(self.partition_columns.clone().unwrap_or_default().to_vec())
390        }
391    }
392
393    async fn check_preconditions(&self) -> DeltaResult<Vec<Action>> {
394        if self.schema_mode == Some(SchemaMode::Overwrite) && self.mode != SaveMode::Overwrite {
395            return Err(DeltaTableError::Generic(
396                "Schema overwrite not supported for Append".to_string(),
397            ));
398        }
399
400        let input = self
401            .input
402            .as_ref()
403            .ok_or::<DeltaTableError>(WriteError::MissingData.into())?;
404        let normalized_arrow = normalize_for_delta(input.schema().inner());
405        let schema: StructType = normalized_arrow.try_into_kernel()?;
406
407        match &self.snapshot {
408            Some(snapshot) => {
409                if snapshot.table_configuration().column_mapping_mode() != ColumnMappingMode::None {
410                    return Err(unsupported_column_mapping_write("WRITE"));
411                }
412
413                if self.mode == SaveMode::Overwrite {
414                    PROTOCOL.check_append_only(snapshot)?;
415                    if !snapshot.load_config().require_files {
416                        return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into()));
417                    }
418                }
419
420                PROTOCOL.can_write_to(snapshot)?;
421
422                if self.schema_mode.is_none() {
423                    PROTOCOL.check_can_write_timestamp_ntz(snapshot, &schema)?;
424                    #[cfg(feature = "nanosecond-timestamps")]
425                    PROTOCOL.check_can_write_timestamp_nanos(snapshot, &schema)?;
426                }
427                match self.mode {
428                    SaveMode::ErrorIfExists => {
429                        Err(WriteError::AlreadyExists(self.log_store.root_url().clone()).into())
430                    }
431                    _ => Ok(vec![]),
432                }
433            }
434            None => {
435                let mut builder = CreateBuilder::new()
436                    .with_log_store(self.log_store.clone())
437                    .with_columns(schema.fields().cloned())
438                    .with_configuration(self.configuration.clone());
439                if let Some(partition_columns) = self.partition_columns.as_ref() {
440                    builder = builder.with_partition_columns(partition_columns.clone())
441                }
442
443                if let Some(name) = self.name.as_ref() {
444                    builder = builder.with_table_name(name.clone());
445                };
446
447                if let Some(desc) = self.description.as_ref() {
448                    builder = builder.with_comment(desc.clone());
449                };
450
451                let (_, actions, _, _) = builder.into_table_and_actions().await?;
452                Ok(actions)
453            }
454        }
455    }
456}
457
458impl std::future::IntoFuture for WriteBuilder {
459    type Output = DeltaResult<DeltaTable>;
460    type IntoFuture = BoxFuture<'static, Self::Output>;
461
462    fn into_future(self) -> Self::IntoFuture {
463        let mut this = self;
464        let table_uri = this.log_store.root_url().clone();
465        let mode = this.mode;
466
467        Box::pin(
468            async move {
469                // Runs pre execution handler.
470                let operation_id = this.get_operation_id();
471                this.pre_execute(operation_id).await?;
472
473                let mut metrics = WriteMetrics::default();
474                let exec_start = Instant::now();
475
476                // Create table actions to initialize table in case it does not yet exist
477                // and should be created
478                let mut actions = this.check_preconditions().await?;
479
480                let partition_columns = this.get_partition_columns()?;
481
482                let Some(source) = this.input.take() else {
483                    return Err(WriteError::MissingData.into());
484                };
485
486                let (session, _) = resolve_session_state(
487                    this.session.as_deref(),
488                    this.session_fallback_policy,
489                    || create_session().state(),
490                    SessionResolveContext {
491                        operation: "write",
492                        table_uri: Some(this.log_store.root_url()),
493                        cdc: false,
494                    },
495                )?;
496
497                update_datafusion_session(&session, &this.log_store, Some(operation_id))?;
498                session.ensure_log_store_registered(this.log_store.as_ref())?;
499
500                let prepared_write = plan::prepare_write(plan::WritePreparationInput {
501                    snapshot: this.snapshot.as_ref(),
502                    session: &session,
503                    source,
504                    mode: this.mode,
505                    schema_mode: this.schema_mode,
506                    safe_cast: this.safe_cast,
507                    partition_columns: partition_columns.clone(),
508                    predicate: this.predicate,
509                    target_file_size: this.target_file_size,
510                    write_batch_size: this.write_batch_size,
511                    writer_properties: this.writer_properties.clone(),
512                    configuration: &this.configuration,
513                })?;
514
515                let overwrite_plan = plan::plan_overwrite_rewrite(
516                    this.snapshot.as_ref(),
517                    &this.log_store,
518                    &session,
519                    this.mode,
520                    &prepared_write,
521                    operation_id,
522                )
523                .await?;
524
525                if overwrite_plan.diagnostics.dropped_pruning_term_count > 0 {
526                    tracing::warn!(
527                        rewrite_kind = ?overwrite_plan.kind,
528                        matched_file_count = overwrite_plan.diagnostics.matched_file_count,
529                        translated_pruning_term_count =
530                            overwrite_plan.diagnostics.translated_pruning_term_count,
531                        dropped_pruning_term_count =
532                            overwrite_plan.diagnostics.dropped_pruning_term_count,
533                        "overwrite rewrite predicate was only partially translated for pruning; exact validation remains enabled"
534                    );
535                }
536
537                let plan::PreparedWrite {
538                    schema_delta,
539                    exact_validation,
540                    exec_options,
541                    ..
542                } = prepared_write;
543                actions.extend(schema_delta.into_actions());
544
545                metrics.num_removed_files = overwrite_plan.num_removed_files();
546
547                let plan::WriteExecOptions {
548                    partition_columns,
549                    target_file_size,
550                    write_batch_size,
551                    writer_properties,
552                    writer_stats_config,
553                } = exec_options;
554                let predicate_sql = exact_validation.as_ref().map(fmt_expr_to_sql).transpose()?;
555                let (sink_plan, contains_cdc, insert_marker_column) =
556                    overwrite_plan.build_sink_plan()?;
557                let source_plan = session.create_physical_plan(&sink_plan).await?;
558
559                // Here we need to validate if the new data conforms to a predicate if one is provided
560                let (add_actions, _) = write_execution_plan_v2(
561                    this.snapshot.as_ref(),
562                    &session,
563                    source_plan.clone(),
564                    partition_columns.clone(),
565                    this.log_store.object_store(Some(operation_id)).clone(),
566                    target_file_size,
567                    write_batch_size,
568                    writer_properties,
569                    writer_stats_config,
570                    exact_validation,
571                    contains_cdc,
572                    insert_marker_column,
573                )
574                .await?;
575
576                actions.extend(
577                    overwrite_plan
578                        .matched_existing
579                        .into_actions(overwrite_plan.deletion_timestamp)?,
580                );
581
582                let source_count =
583                    find_metric_node(SOURCE_COUNT_ID, &source_plan).ok_or_else(|| {
584                        DeltaTableError::Generic("Unable to locate expected metric node".into())
585                    })?;
586                let source_count_metrics = source_count.metrics().unwrap();
587                let num_added_rows = get_metric(&source_count_metrics, SOURCE_COUNT_METRIC);
588                metrics.num_added_rows = num_added_rows;
589
590                metrics.num_added_files = add_actions.len();
591                actions.extend(add_actions);
592
593                metrics.execution_time_ms =
594                    Instant::now().duration_since(exec_start).as_millis() as u64;
595
596                let operation = DeltaOperation::Write {
597                    mode: this.mode,
598                    partition_by: if !partition_columns.is_empty() {
599                        Some(partition_columns)
600                    } else {
601                        None
602                    },
603                    predicate: predicate_sql,
604                };
605
606                let mut commit_properties = this.commit_properties.clone();
607                commit_properties.app_metadata.insert(
608                    "operationMetrics".to_owned(),
609                    serde_json::to_value(&metrics)?,
610                );
611
612                let commit = CommitBuilder::from(commit_properties)
613                    .with_actions(actions)
614                    .with_post_commit_hook_handler(this.custom_execute_handler.clone())
615                    .with_operation_id(operation_id)
616                    .build(
617                        this.snapshot.as_ref().map(|f| f as &dyn TableReference),
618                        this.log_store.clone(),
619                        operation.clone(),
620                    )
621                    .await?;
622
623                if let Some(handler) = this.custom_execute_handler {
624                    handler.post_execute(&this.log_store, operation_id).await?;
625                }
626
627                Ok(DeltaTable::new_with_state(this.log_store, commit.snapshot))
628            }
629            .instrument(tracing::info_span!(
630                "write_operation",
631                operation = "write",
632                mode = ?mode,
633                table_uri = %table_uri
634            )),
635        )
636    }
637}
638
639#[cfg(test)]
640mod tests {
641    use super::*;
642    use crate::TableProperty;
643    use crate::ensure_table_uri;
644    use crate::kernel::CommitInfo;
645    use crate::logstore::get_actions;
646    use crate::operations::collect_sendable_stream;
647    use crate::protocol::SaveMode;
648    use crate::test_utils::{TestResult, TestSchemas};
649    use crate::writer::test_utils::datafusion::{get_data, get_data_sorted, write_batch};
650    use crate::writer::test_utils::{
651        get_arrow_schema, get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
652        get_record_batch_with_nested_struct, setup_table_with_configuration,
653    };
654    use arrow_array::{
655        Float64Array, Int32Array, Int64Array, StringArray, TimestampMicrosecondArray,
656    };
657    use arrow_schema::{DataType, Field, Fields, Schema as ArrowSchema, TimeUnit};
658    use datafusion::physical_plan::collect;
659    use datafusion::prelude::*;
660    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
661    use delta_kernel::engine::arrow_conversion::TryIntoArrow;
662    use delta_kernel::schema::MetadataValue;
663    use futures::TryStreamExt;
664    use itertools::Itertools;
665    use serde_json::{Value, json};
666
667    async fn get_write_metrics(table: &DeltaTable) -> WriteMetrics {
668        let mut commit_info: Vec<_> = table.history(Some(1)).await.unwrap().collect();
669        let metrics = commit_info
670            .first_mut()
671            .unwrap()
672            .info
673            .remove("operationMetrics")
674            .unwrap();
675        serde_json::from_value(metrics).unwrap()
676    }
677
678    async fn query_table(table: &DeltaTable, sql: &str) -> TestResult<Vec<RecordBatch>> {
679        let table = DeltaTable::new_with_state(
680            table.log_store.clone(),
681            table.state.as_ref().unwrap().clone(),
682        );
683        let ctx = SessionContext::new();
684        table.update_datafusion_session(&ctx.state()).unwrap();
685        ctx.register_table("test", table.table_provider().await.unwrap())
686            .unwrap();
687
688        Ok(ctx.sql(sql).await?.collect().await?)
689    }
690
691    async fn query_single_i64_row(table: &DeltaTable, sql: &str) -> TestResult<Vec<i64>> {
692        let batches = query_table(table, sql).await?;
693        let batch = batches
694            .first()
695            .expect("expected aggregate query to return a single batch");
696
697        Ok(batch
698            .columns()
699            .iter()
700            .map(|column| {
701                column
702                    .as_any()
703                    .downcast_ref::<Int64Array>()
704                    .expect("expected Int64 aggregate column")
705                    .value(0)
706            })
707            .collect())
708    }
709
710    async fn query_i32_rows(table: &DeltaTable, sql: &str, column: &str) -> TestResult<Vec<i32>> {
711        let mut values = Vec::new();
712        for batch in query_table(table, sql).await? {
713            let array = batch
714                .column_by_name(column)
715                .expect("expected query column")
716                .as_any()
717                .downcast_ref::<Int32Array>()
718                .expect("expected Int32 query column");
719            values.extend(
720                array
721                    .iter()
722                    .map(|value| value.expect("expected non-null Int32 value")),
723            );
724        }
725        Ok(values)
726    }
727
728    async fn open_copied_table_fixture(
729        fixture_source: &std::path::Path,
730        table_dir_name: &str,
731    ) -> TestResult<(tempfile::TempDir, DeltaTable)> {
732        let temp_dir = tempfile::tempdir()?;
733        fs_extra::dir::copy(fixture_source, temp_dir.path(), &Default::default())?;
734        let table_url =
735            url::Url::from_directory_path(temp_dir.path().join(table_dir_name).canonicalize()?)
736                .unwrap();
737        Ok((temp_dir, crate::open_table(table_url).await?))
738    }
739
740    async fn latest_remove_actions(table: &DeltaTable) -> TestResult<Vec<crate::kernel::Remove>> {
741        let version = table
742            .version()
743            .expect("expected committed version for latest remove actions");
744        let snapshot_bytes = table
745            .log_store
746            .read_commit_entry(version)
747            .await?
748            .expect("failed to get snapshot bytes");
749        Ok(get_actions(version, &snapshot_bytes)?
750            .into_iter()
751            .filter_map(|action| match action {
752                Action::Remove(remove) => Some(remove),
753                _ => None,
754            })
755            .collect())
756    }
757
758    async fn modified_partitioned_table(batch: &RecordBatch) -> TestResult<DeltaTable> {
759        Ok(DeltaTable::new_in_memory()
760            .write(vec![batch.clone()])
761            .with_partition_columns(["modified"])
762            .await?)
763    }
764
765    fn expect_write_error(err: &DeltaTableError) -> &WriteError {
766        let DeltaTableError::GenericError { source } = err else {
767            panic!("expected WriteError, got {err:?}");
768        };
769        source
770            .downcast_ref::<WriteError>()
771            .expect("expected WriteError source")
772    }
773
774    fn assert_common_write_metrics(write_metrics: WriteMetrics) {
775        // assert!(write_metrics.execution_time_ms > 0);
776        assert!(write_metrics.num_added_files > 0);
777    }
778
779    #[tokio::test]
780    async fn test_write_when_delta_table_is_append_only() {
781        let table = setup_table_with_configuration(TableProperty::AppendOnly, Some("true")).await;
782        let batch = get_record_batch(None, false);
783        // Append
784        let table = write_batch(table, batch.clone()).await;
785        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
786        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
787        assert_eq!(write_metrics.num_removed_files, 0);
788        assert_common_write_metrics(write_metrics);
789
790        // Overwrite
791        let _err = table
792            .write(vec![batch])
793            .with_save_mode(SaveMode::Overwrite)
794            .await
795            .expect_err("Remove action is included when Delta table is append-only. Should error");
796    }
797
798    #[tokio::test]
799    async fn test_create_write() {
800        let table_schema = get_delta_schema();
801        let batch = get_record_batch(None, false);
802
803        let table = DeltaTable::new_in_memory()
804            .create()
805            .with_columns(table_schema.fields().cloned())
806            .await
807            .unwrap();
808        assert_eq!(table.version(), Some(0));
809
810        // write some data
811        let metadata = HashMap::from_iter(vec![("k1".to_string(), json!("v1.1"))]);
812        let mut table = table
813            .write(vec![batch.clone()])
814            .with_save_mode(SaveMode::Append)
815            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
816            .await
817            .unwrap();
818        assert_eq!(table.version(), Some(1));
819        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
820
821        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
822        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
823        assert_eq!(
824            write_metrics.num_added_files,
825            table.snapshot().unwrap().log_data().num_files()
826        );
827        assert_common_write_metrics(write_metrics);
828
829        table.load().await.unwrap();
830        let history: Vec<CommitInfo> = table.history(None).await.unwrap().collect();
831        assert_eq!(history.len(), 2);
832        assert_eq!(
833            history[0]
834                .info
835                .clone()
836                .into_iter()
837                .filter(|(k, _)| k == "k1")
838                .collect::<HashMap<String, Value>>(),
839            metadata
840        );
841
842        // append some data
843        let metadata: HashMap<String, Value> =
844            HashMap::from_iter(vec![("k1".to_string(), json!("v1.2"))]);
845        let mut table = table
846            .write(vec![batch.clone()])
847            .with_save_mode(SaveMode::Append)
848            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
849            .await
850            .unwrap();
851        assert_eq!(table.version(), Some(2));
852        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
853        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
854        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
855        assert_eq!(write_metrics.num_added_files, 1);
856        assert_common_write_metrics(write_metrics);
857
858        table.load().await.unwrap();
859        let history: Vec<CommitInfo> = table.history(None).await.unwrap().collect();
860        assert_eq!(history.len(), 3);
861        assert_eq!(
862            history[0]
863                .info
864                .clone()
865                .into_iter()
866                .filter(|(k, _)| k == "k1")
867                .collect::<HashMap<String, Value>>(),
868            metadata
869        );
870
871        // overwrite table
872        let metadata: HashMap<String, Value> =
873            HashMap::from_iter(vec![("k2".to_string(), json!("v2.1"))]);
874        let mut table = table
875            .write(vec![batch.clone()])
876            .with_save_mode(SaveMode::Overwrite)
877            .with_commit_properties(CommitProperties::default().with_metadata(metadata.clone()))
878            .await
879            .unwrap();
880        assert_eq!(table.version(), Some(3));
881        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
882        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
883        assert_eq!(write_metrics.num_added_rows, batch.num_rows());
884        assert!(write_metrics.num_removed_files > 0);
885        assert_common_write_metrics(write_metrics);
886
887        table.load().await.unwrap();
888        let history: Vec<CommitInfo> = table.history(None).await.unwrap().collect();
889        assert_eq!(history.len(), 4);
890        assert_eq!(
891            history[0]
892                .info
893                .clone()
894                .into_iter()
895                .filter(|(k, _)| k == "k2")
896                .collect::<HashMap<String, Value>>(),
897            metadata
898        );
899    }
900
901    #[tokio::test]
902    async fn test_write_different_types() {
903        // Ensure write data is casted when data of a different type from the table is provided.
904
905        // Validate String -> Int is err
906        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
907            "value",
908            DataType::Int32,
909            true,
910        )]));
911
912        let batch = RecordBatch::try_new(
913            Arc::clone(&schema),
914            vec![Arc::new(Int32Array::from(vec![Some(0), None]))],
915        )
916        .unwrap();
917        let table = DeltaTable::new_in_memory()
918            .write(vec![batch])
919            .await
920            .unwrap();
921        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
922        assert_eq!(write_metrics.num_added_rows, 2);
923        assert_common_write_metrics(write_metrics);
924
925        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
926            "value",
927            DataType::Utf8,
928            true,
929        )]));
930
931        let batch = RecordBatch::try_new(
932            Arc::clone(&schema),
933            vec![Arc::new(StringArray::from(vec![
934                Some("Test123".to_owned()),
935                Some("123".to_owned()),
936                None,
937            ]))],
938        )
939        .unwrap();
940
941        // Test cast options
942        let table = table
943            .write(vec![batch.clone()])
944            .with_cast_safety(true)
945            .await
946            .unwrap();
947
948        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
949        assert_eq!(write_metrics.num_added_rows, 3);
950        assert_common_write_metrics(write_metrics);
951
952        let expected = [
953            "+-------+",
954            "| value |",
955            "+-------+",
956            "|       |",
957            "|       |",
958            "|       |",
959            "| 123   |",
960            "| 0     |",
961            "+-------+",
962        ];
963        let actual = get_data(&table).await;
964        assert_batches_sorted_eq!(&expected, &actual);
965
966        let res = table.write(vec![batch]).await;
967        assert!(res.is_err());
968
969        // Validate the datetime -> string behavior
970        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
971            "value",
972            arrow::datatypes::DataType::Utf8,
973            true,
974        )]));
975
976        let batch = RecordBatch::try_new(
977            Arc::clone(&schema),
978            vec![Arc::new(StringArray::from(vec![Some(
979                "2023-06-03 15:35:00".to_owned(),
980            )]))],
981        )
982        .unwrap();
983        let table = DeltaTable::new_in_memory()
984            .write(vec![batch])
985            .await
986            .unwrap();
987
988        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
989        assert_eq!(write_metrics.num_added_rows, 1);
990        assert_common_write_metrics(write_metrics);
991
992        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
993            "value",
994            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".to_string().into())),
995            true,
996        )]));
997        let batch = RecordBatch::try_new(
998            Arc::clone(&schema),
999            vec![Arc::new(
1000                TimestampMicrosecondArray::from(vec![Some(10000)]).with_timezone("UTC"),
1001            )],
1002        )
1003        .unwrap();
1004
1005        let _res = table.write(vec![batch]).await.unwrap();
1006        let expected = [
1007            "+--------------------------+",
1008            "| value                    |",
1009            "+--------------------------+",
1010            "| 1970-01-01T00:00:00.010Z |",
1011            "| 2023-06-03 15:35:00      |",
1012            "+--------------------------+",
1013        ];
1014        let actual = get_data(&_res).await;
1015        assert_batches_sorted_eq!(&expected, &actual);
1016    }
1017
1018    #[tokio::test]
1019    async fn test_write_nonexistent() {
1020        let batch = get_record_batch(None, false);
1021        let table = DeltaTable::new_in_memory()
1022            .write(vec![batch])
1023            .with_save_mode(SaveMode::ErrorIfExists)
1024            .await
1025            .unwrap();
1026        assert_eq!(table.version(), Some(0));
1027        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 1);
1028        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1029        assert_common_write_metrics(write_metrics);
1030    }
1031
1032    #[tokio::test]
1033    async fn test_write_partitioned() {
1034        let batch = get_record_batch(None, false);
1035        let table = DeltaTable::new_in_memory()
1036            .write(vec![batch.clone()])
1037            .with_save_mode(SaveMode::ErrorIfExists)
1038            .with_partition_columns(["modified"])
1039            .await
1040            .unwrap();
1041        assert_eq!(table.version(), Some(0));
1042        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 2);
1043        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1044        assert_eq!(write_metrics.num_added_files, 2);
1045        assert_common_write_metrics(write_metrics);
1046
1047        let table = DeltaTable::new_in_memory()
1048            .write(vec![batch])
1049            .with_save_mode(SaveMode::ErrorIfExists)
1050            .with_partition_columns(["modified", "id"])
1051            .await
1052            .unwrap();
1053        assert_eq!(table.version(), Some(0));
1054        assert_eq!(table.snapshot().unwrap().log_data().num_files(), 4);
1055
1056        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1057        assert_eq!(write_metrics.num_added_files, 4);
1058        assert_common_write_metrics(write_metrics);
1059    }
1060
1061    #[tokio::test]
1062    async fn test_write_partitioned_parallel_writers() {
1063        let batch = get_record_batch(None, false);
1064
1065        let multi_stream_input: Arc<dyn TableProvider> = Arc::new(
1066            MemTable::try_new(
1067                batch.schema(),
1068                vec![
1069                    vec![batch.clone()],
1070                    vec![batch.clone()],
1071                    vec![batch.clone()],
1072                ],
1073            )
1074            .unwrap(),
1075        );
1076        let multi_stream_plan =
1077            LogicalPlanBuilder::scan("source", provider_as_source(multi_stream_input), None)
1078                .unwrap()
1079                .build()
1080                .unwrap();
1081
1082        let parallel_table = DeltaTable::new_in_memory()
1083            .write(vec![])
1084            .with_save_mode(SaveMode::ErrorIfExists)
1085            .with_input_plan(multi_stream_plan)
1086            .with_partition_columns(["modified"])
1087            .await
1088            .unwrap();
1089
1090        let single_writer_table = DeltaTable::new_in_memory()
1091            .write(vec![batch.clone(), batch.clone(), batch.clone()])
1092            .with_save_mode(SaveMode::ErrorIfExists)
1093            .with_partition_columns(["modified"])
1094            .await
1095            .unwrap();
1096
1097        let parallel_data = get_data_sorted(&parallel_table, "modified, id, value").await;
1098        let single_writer_data = get_data_sorted(&single_writer_table, "modified, id, value").await;
1099        assert_eq!(parallel_data, single_writer_data);
1100
1101        let parallel_files = parallel_table.snapshot().unwrap().log_data().num_files();
1102        let single_writer_files = single_writer_table
1103            .snapshot()
1104            .unwrap()
1105            .log_data()
1106            .num_files();
1107        assert_eq!(parallel_files, single_writer_files);
1108        assert_eq!(parallel_files, 2);
1109
1110        let parallel_write_metrics: WriteMetrics = get_write_metrics(&parallel_table).await;
1111        let single_writer_metrics: WriteMetrics = get_write_metrics(&single_writer_table).await;
1112        assert_eq!(
1113            parallel_write_metrics.num_added_files,
1114            single_writer_metrics.num_added_files
1115        );
1116        assert_eq!(parallel_write_metrics.num_added_files, 2);
1117    }
1118
1119    #[tokio::test]
1120    async fn test_write_partitioned_parallel_writers_error_propagation() {
1121        let batch = get_record_batch(None, false);
1122
1123        let schema: StructType = serde_json::from_value(json!({
1124            "type": "struct",
1125            "fields": [
1126                {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1127                {"name": "value", "type": "integer", "nullable": true, "metadata": {
1128                    "delta.invariants": "{\"expression\": { \"expression\": \"value < 6\"} }"
1129                }},
1130                {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1131            ]
1132        }))
1133        .unwrap();
1134
1135        let table = DeltaTable::new_in_memory()
1136            .create()
1137            .with_save_mode(SaveMode::ErrorIfExists)
1138            .with_columns(schema.fields().cloned())
1139            .with_partition_columns(["modified"])
1140            .await
1141            .unwrap();
1142
1143        let multi_stream_input: Arc<dyn TableProvider> = Arc::new(
1144            MemTable::try_new(
1145                batch.schema(),
1146                vec![
1147                    vec![batch.clone()],
1148                    vec![batch.clone()],
1149                    vec![batch.clone()],
1150                ],
1151            )
1152            .unwrap(),
1153        );
1154        let multi_stream_plan =
1155            LogicalPlanBuilder::scan("source", provider_as_source(multi_stream_input), None)
1156                .unwrap()
1157                .build()
1158                .unwrap();
1159
1160        let result = table
1161            .write(vec![])
1162            .with_save_mode(SaveMode::Append)
1163            .with_input_plan(multi_stream_plan)
1164            .await;
1165
1166        assert!(
1167            result.is_err(),
1168            "write should fail when invariant is violated in parallel writers"
1169        );
1170    }
1171
1172    #[tokio::test]
1173    async fn test_merge_schema() {
1174        let batch = get_record_batch(None, false);
1175        let table = DeltaTable::new_in_memory()
1176            .write(vec![batch.clone()])
1177            .with_save_mode(SaveMode::ErrorIfExists)
1178            .await
1179            .unwrap();
1180        assert_eq!(table.version(), Some(0));
1181
1182        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1183        assert_common_write_metrics(write_metrics);
1184
1185        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1186        for field in batch.schema().fields() {
1187            if field.name() != "modified" {
1188                new_schema_builder.push(field.clone());
1189            }
1190        }
1191        new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1192        let new_schema = new_schema_builder.finish();
1193        let new_fields = new_schema.fields();
1194        let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1195        assert_eq!(new_names, vec!["id", "value", "inserted_by"]);
1196        let inserted_by = StringArray::from(vec![
1197            Some("A1"),
1198            Some("B1"),
1199            None,
1200            Some("B2"),
1201            Some("A3"),
1202            Some("A4"),
1203            None,
1204            None,
1205            Some("B4"),
1206            Some("A5"),
1207            Some("A7"),
1208        ]);
1209        let new_batch = RecordBatch::try_new(
1210            Arc::new(new_schema),
1211            vec![
1212                Arc::new(batch.column_by_name("id").unwrap().clone()),
1213                Arc::new(batch.column_by_name("value").unwrap().clone()),
1214                Arc::new(inserted_by),
1215            ],
1216        )
1217        .unwrap();
1218
1219        let mut table = table
1220            .write(vec![new_batch])
1221            .with_save_mode(SaveMode::Append)
1222            .with_schema_mode(SchemaMode::Merge)
1223            .await
1224            .unwrap();
1225        table.load().await.unwrap();
1226        assert_eq!(table.version(), Some(1));
1227        let new_schema = table.snapshot().unwrap().metadata().parse_schema().unwrap();
1228        let fields = new_schema.fields();
1229        let names = fields.map(|f| f.name()).collect::<Vec<_>>();
1230        assert_eq!(names, vec!["id", "value", "modified", "inserted_by"]);
1231
1232        // <https://github.com/delta-io/delta-rs/issues/2925>
1233        let metadata = table
1234            .snapshot()
1235            .expect("Failed to retrieve updated snapshot")
1236            .metadata();
1237        assert_ne!(
1238            None,
1239            metadata.created_time(),
1240            "Created time should be the milliseconds since epoch of when the action was created"
1241        );
1242
1243        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1244        assert_common_write_metrics(write_metrics);
1245    }
1246
1247    #[tokio::test]
1248    async fn test_merge_schema_with_partitions() {
1249        let batch = get_record_batch(None, false);
1250        let table = DeltaTable::new_in_memory()
1251            .write(vec![batch.clone()])
1252            .with_partition_columns(vec!["id", "value"])
1253            .with_save_mode(SaveMode::ErrorIfExists)
1254            .await
1255            .unwrap();
1256        assert_eq!(table.version(), Some(0));
1257
1258        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1259        assert_common_write_metrics(write_metrics);
1260
1261        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1262        for field in batch.schema().fields() {
1263            if field.name() != "modified" {
1264                new_schema_builder.push(field.clone());
1265            }
1266        }
1267        new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1268        let new_schema = new_schema_builder.finish();
1269        let new_fields = new_schema.fields();
1270        let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1271        assert_eq!(new_names, vec!["id", "value", "inserted_by"]);
1272        let inserted_by = StringArray::from(vec![
1273            Some("A1"),
1274            Some("B1"),
1275            None,
1276            Some("B2"),
1277            Some("A3"),
1278            Some("A4"),
1279            None,
1280            None,
1281            Some("B4"),
1282            Some("A5"),
1283            Some("A7"),
1284        ]);
1285        let new_batch = RecordBatch::try_new(
1286            Arc::new(new_schema),
1287            vec![
1288                Arc::new(batch.column_by_name("id").unwrap().clone()),
1289                Arc::new(batch.column_by_name("value").unwrap().clone()),
1290                Arc::new(inserted_by),
1291            ],
1292        )
1293        .unwrap();
1294        let table = table
1295            .write(vec![new_batch])
1296            .with_save_mode(SaveMode::Append)
1297            .with_schema_mode(SchemaMode::Merge)
1298            .await
1299            .unwrap();
1300
1301        assert_eq!(table.version(), Some(1));
1302        let new_schema = table.snapshot().unwrap().metadata().parse_schema().unwrap();
1303        let fields = new_schema.fields();
1304        let mut names = fields.map(|f| f.name()).collect::<Vec<_>>();
1305        names.sort();
1306        assert_eq!(names, vec!["id", "inserted_by", "modified", "value"]);
1307        let part_cols = table.snapshot().unwrap().metadata().partition_columns();
1308        assert_eq!(part_cols, ["id".to_string(), "value".to_string()]); // we want to preserve partitions
1309
1310        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1311        assert_common_write_metrics(write_metrics);
1312    }
1313
1314    #[tokio::test]
1315    async fn test_merge_schema_with_partitions_allows_source_missing_partition_column() -> TestResult
1316    {
1317        let batch = get_record_batch(None, false);
1318        let table = modified_partitioned_table(&batch).await?;
1319
1320        let evolved_schema = Arc::new(ArrowSchema::new(vec![
1321            batch.schema().field(0).as_ref().clone(),
1322            batch.schema().field(1).as_ref().clone(),
1323            Field::new("inserted_by", DataType::Utf8, true),
1324        ]));
1325        let evolved_batch = RecordBatch::try_new(
1326            evolved_schema,
1327            vec![
1328                batch.column(0).clone(),
1329                batch.column(1).clone(),
1330                Arc::new(StringArray::from(vec![
1331                    Some("A1"),
1332                    Some("B1"),
1333                    None,
1334                    Some("B2"),
1335                    Some("A3"),
1336                    Some("A4"),
1337                    None,
1338                    None,
1339                    Some("B4"),
1340                    Some("A5"),
1341                    Some("A7"),
1342                ])),
1343            ],
1344        )?;
1345
1346        let table = table
1347            .write(vec![evolved_batch])
1348            .with_save_mode(SaveMode::Append)
1349            .with_schema_mode(SchemaMode::Merge)
1350            .await?;
1351
1352        assert_eq!(table.version(), Some(1));
1353        let schema = table.snapshot().unwrap().metadata().parse_schema()?;
1354        let names = schema
1355            .fields()
1356            .map(|field| field.name())
1357            .collect::<Vec<_>>();
1358        assert_eq!(names, vec!["id", "value", "modified", "inserted_by"]);
1359        assert_eq!(
1360            table.snapshot().unwrap().metadata().partition_columns(),
1361            &vec!["modified".to_string()]
1362        );
1363
1364        Ok(())
1365    }
1366
1367    #[tokio::test]
1368    async fn test_merge_schema_preserves_existing_field_metadata() {
1369        let schema: StructType = serde_json::from_value(json!({
1370            "type": "struct",
1371            "fields": [
1372                {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1373                {"name": "value", "type": "integer", "nullable": true, "metadata": {
1374                    "delta.invariants": "{\"expression\": { \"expression\": \"value < 12\"} }",
1375                    "delta.userMetadata": "preserve-me"
1376                }},
1377                {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1378            ]
1379        }))
1380        .unwrap();
1381
1382        let table = DeltaTable::new_in_memory()
1383            .create()
1384            .with_save_mode(SaveMode::ErrorIfExists)
1385            .with_columns(schema.fields().cloned())
1386            .await
1387            .unwrap()
1388            .write(vec![get_record_batch(None, false)])
1389            .await
1390            .unwrap();
1391
1392        let batch = get_record_batch(None, false);
1393        let evolved_schema = Arc::new(ArrowSchema::new(vec![
1394            batch.schema().field(0).as_ref().clone(),
1395            batch.schema().field(1).as_ref().clone(),
1396            batch.schema().field(2).as_ref().clone(),
1397            Field::new("inserted_by", DataType::Utf8, true),
1398        ]));
1399        let evolved_batch = RecordBatch::try_new(
1400            evolved_schema,
1401            vec![
1402                batch.column(0).clone(),
1403                batch.column(1).clone(),
1404                batch.column(2).clone(),
1405                Arc::new(StringArray::from(vec![
1406                    Some("A1"),
1407                    Some("B1"),
1408                    None,
1409                    Some("B2"),
1410                    Some("A3"),
1411                    Some("A4"),
1412                    None,
1413                    None,
1414                    Some("B4"),
1415                    Some("A5"),
1416                    Some("A7"),
1417                ])),
1418            ],
1419        )
1420        .unwrap();
1421
1422        let table = table
1423            .write(vec![evolved_batch])
1424            .with_save_mode(SaveMode::Append)
1425            .with_schema_mode(SchemaMode::Merge)
1426            .await
1427            .unwrap();
1428
1429        let schema = table.snapshot().unwrap().metadata().parse_schema().unwrap();
1430        let value = schema.field("value").unwrap();
1431        assert_eq!(
1432            value.metadata.get("delta.invariants"),
1433            Some(&MetadataValue::String(
1434                "{\"expression\": { \"expression\": \"value < 12\"} }".to_string()
1435            ))
1436        );
1437        assert_eq!(
1438            value.metadata.get("delta.userMetadata"),
1439            Some(&MetadataValue::String("preserve-me".to_string()))
1440        );
1441    }
1442
1443    #[tokio::test]
1444    async fn test_overwrite_schema() {
1445        let batch = get_record_batch(None, false);
1446        let table = DeltaTable::new_in_memory()
1447            .write(vec![batch.clone()])
1448            .with_save_mode(SaveMode::ErrorIfExists)
1449            .await
1450            .unwrap();
1451        assert_eq!(table.version(), Some(0));
1452        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1453        assert_common_write_metrics(write_metrics);
1454        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1455        for field in batch.schema().fields() {
1456            if field.name() != "modified" {
1457                new_schema_builder.push(field.clone());
1458            }
1459        }
1460        new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1461        let new_schema = new_schema_builder.finish();
1462        let new_fields = new_schema.fields();
1463        let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1464        assert_eq!(new_names, vec!["id", "value", "inserted_by"]);
1465        let inserted_by = StringArray::from(vec![
1466            Some("A1"),
1467            Some("B1"),
1468            None,
1469            Some("B2"),
1470            Some("A3"),
1471            Some("A4"),
1472            None,
1473            None,
1474            Some("B4"),
1475            Some("A5"),
1476            Some("A7"),
1477        ]);
1478        let new_batch = RecordBatch::try_new(
1479            Arc::new(new_schema),
1480            vec![
1481                Arc::new(batch.column_by_name("id").unwrap().clone()),
1482                Arc::new(batch.column_by_name("value").unwrap().clone()),
1483                Arc::new(inserted_by),
1484            ],
1485        )
1486        .unwrap();
1487
1488        let table = table
1489            .write(vec![new_batch])
1490            .with_save_mode(SaveMode::Append)
1491            .with_schema_mode(SchemaMode::Overwrite)
1492            .await;
1493        assert!(table.is_err());
1494    }
1495
1496    #[tokio::test]
1497    async fn test_overwrite_schema_can_change_partition_columns_without_schema_change() -> TestResult
1498    {
1499        let batch = get_record_batch(None, false);
1500
1501        let table = modified_partitioned_table(&batch).await?;
1502
1503        assert_eq!(
1504            table.snapshot().unwrap().metadata().partition_columns(),
1505            &vec!["modified".to_string()]
1506        );
1507        let initial_num_files = table.snapshot().unwrap().log_data().num_files();
1508
1509        let table = table
1510            .write(vec![batch])
1511            .with_save_mode(SaveMode::Overwrite)
1512            .with_schema_mode(SchemaMode::Overwrite)
1513            .with_partition_columns(["id"])
1514            .await?;
1515
1516        assert_eq!(table.version(), Some(1));
1517        assert_eq!(
1518            table.snapshot().unwrap().metadata().partition_columns(),
1519            &vec!["id".to_string()]
1520        );
1521
1522        let add_paths = table
1523            .snapshot()
1524            .unwrap()
1525            .log_data()
1526            .iter()
1527            .map(|add| add.path().into_owned())
1528            .collect::<Vec<_>>();
1529        assert!(!add_paths.is_empty());
1530        assert!(add_paths.iter().all(|path| path.contains("id=")));
1531
1532        let remove_actions = latest_remove_actions(&table).await?;
1533        assert_eq!(remove_actions.len(), initial_num_files);
1534        assert!(
1535            remove_actions
1536                .iter()
1537                .all(|remove| remove.deletion_timestamp.is_some())
1538        );
1539
1540        let commit_info: Vec<_> = table.history(Some(1)).await?.collect();
1541        let operation_parameters = commit_info[0].operation_parameters.as_ref().unwrap();
1542        assert_eq!(operation_parameters["partitionBy"], json!("[\"id\"]"));
1543
1544        Ok(())
1545    }
1546
1547    #[tokio::test]
1548    async fn test_overwrite_schema_can_change_schema_and_partition_columns() -> TestResult {
1549        let batch = get_record_batch(None, false);
1550        let table = modified_partitioned_table(&batch).await?;
1551
1552        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1553        for field in batch.schema().fields() {
1554            if field.name() != "modified" {
1555                new_schema_builder.push(field.clone());
1556            }
1557        }
1558        new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1559        let new_schema = new_schema_builder.finish();
1560        let inserted_by = StringArray::from(vec![
1561            Some("A1"),
1562            Some("B1"),
1563            None,
1564            Some("B2"),
1565            Some("A3"),
1566            Some("A4"),
1567            None,
1568            None,
1569            Some("B4"),
1570            Some("A5"),
1571            Some("A7"),
1572        ]);
1573        let new_batch = RecordBatch::try_new(
1574            Arc::new(new_schema),
1575            vec![
1576                Arc::new(batch.column_by_name("id").unwrap().clone()),
1577                Arc::new(batch.column_by_name("value").unwrap().clone()),
1578                Arc::new(inserted_by),
1579            ],
1580        )?;
1581
1582        let table = table
1583            .write(vec![new_batch])
1584            .with_save_mode(SaveMode::Overwrite)
1585            .with_schema_mode(SchemaMode::Overwrite)
1586            .with_partition_columns(["inserted_by"])
1587            .await?;
1588
1589        let schema = table.snapshot().unwrap().metadata().parse_schema()?;
1590        let names = schema
1591            .fields()
1592            .map(|field| field.name())
1593            .collect::<Vec<_>>();
1594        assert_eq!(names, vec!["id", "value", "inserted_by"]);
1595        assert_eq!(
1596            table.snapshot().unwrap().metadata().partition_columns(),
1597            &vec!["inserted_by".to_string()]
1598        );
1599
1600        let add_paths = table
1601            .snapshot()
1602            .unwrap()
1603            .log_data()
1604            .iter()
1605            .map(|add| add.path().into_owned())
1606            .collect::<Vec<_>>();
1607        assert!(!add_paths.is_empty());
1608        assert!(add_paths.iter().all(|path| path.contains("inserted_by=")));
1609
1610        Ok(())
1611    }
1612
1613    #[tokio::test]
1614    async fn test_overwrite_schema_partition_change_preserves_table_metadata() -> TestResult {
1615        let batch = get_record_batch(None, false);
1616        let table = DeltaTable::new_in_memory()
1617            .write(vec![batch.clone()])
1618            .with_partition_columns(["modified"])
1619            .with_table_name("preserve_name")
1620            .with_description("preserve_description")
1621            .await?;
1622
1623        let initial_metadata = table.snapshot().unwrap().metadata().clone();
1624        let initial_created_time = initial_metadata.created_time();
1625
1626        let table = table
1627            .write(vec![batch])
1628            .with_save_mode(SaveMode::Overwrite)
1629            .with_schema_mode(SchemaMode::Overwrite)
1630            .with_partition_columns(["id"])
1631            .await?;
1632
1633        let metadata = table.snapshot().unwrap().metadata();
1634        assert_eq!(metadata.id(), initial_metadata.id());
1635        assert_eq!(metadata.name(), Some("preserve_name"));
1636        assert_eq!(metadata.description(), Some("preserve_description"));
1637        assert_eq!(metadata.created_time(), initial_created_time);
1638        assert_eq!(metadata.partition_columns(), &vec!["id".to_string()]);
1639
1640        Ok(())
1641    }
1642
1643    #[tokio::test]
1644    async fn test_overwrite_schema_can_remove_partition_columns() -> TestResult {
1645        let batch = get_record_batch(None, false);
1646        let table = modified_partitioned_table(&batch).await?;
1647
1648        let table = table
1649            .write(vec![batch])
1650            .with_save_mode(SaveMode::Overwrite)
1651            .with_schema_mode(SchemaMode::Overwrite)
1652            .with_partition_columns(std::iter::empty::<&str>())
1653            .await?;
1654
1655        assert_eq!(table.version(), Some(1));
1656        assert!(
1657            table
1658                .snapshot()
1659                .unwrap()
1660                .metadata()
1661                .partition_columns()
1662                .is_empty()
1663        );
1664
1665        let add_paths = table
1666            .snapshot()
1667            .unwrap()
1668            .log_data()
1669            .iter()
1670            .map(|add| add.path().into_owned())
1671            .collect::<Vec<_>>();
1672        assert!(!add_paths.is_empty());
1673        assert!(add_paths.iter().all(|path| !path.contains('/')));
1674
1675        Ok(())
1676    }
1677
1678    #[tokio::test]
1679    async fn test_append_rejects_partition_column_change() -> TestResult {
1680        let batch = get_record_batch(None, false);
1681        let table = modified_partitioned_table(&batch).await?;
1682
1683        let result = table
1684            .write(vec![batch])
1685            .with_save_mode(SaveMode::Append)
1686            .with_partition_columns(["id"])
1687            .await;
1688
1689        let err = result.expect_err("append should reject partition column change");
1690        match expect_write_error(&err) {
1691            WriteError::PartitionColumnMismatch { expected, got } => {
1692                assert_eq!(expected, &vec!["modified".to_string()]);
1693                assert_eq!(got, &vec!["id".to_string()]);
1694            }
1695            other => panic!("unexpected error: {other:?}"),
1696        }
1697        Ok(())
1698    }
1699
1700    #[tokio::test]
1701    async fn test_overwrite_without_schema_overwrite_rejects_partition_column_change() -> TestResult
1702    {
1703        let batch = get_record_batch(None, false);
1704        let table = modified_partitioned_table(&batch).await?;
1705
1706        let result = table
1707            .write(vec![batch])
1708            .with_save_mode(SaveMode::Overwrite)
1709            .with_partition_columns(["id"])
1710            .await;
1711
1712        assert!(matches!(result, Err(DeltaTableError::GenericError { .. })));
1713        Ok(())
1714    }
1715
1716    #[tokio::test]
1717    async fn test_replace_where_rejects_partition_column_change() -> TestResult {
1718        let batch = get_record_batch(None, false);
1719        let table = modified_partitioned_table(&batch).await?;
1720
1721        let result = table
1722            .write(vec![batch])
1723            .with_save_mode(SaveMode::Overwrite)
1724            .with_schema_mode(SchemaMode::Overwrite)
1725            .with_partition_columns(["id"])
1726            .with_replace_where(col("id").eq(lit("A")))
1727            .await;
1728
1729        assert!(matches!(result, Err(DeltaTableError::GenericError { .. })));
1730        Ok(())
1731    }
1732
1733    #[tokio::test]
1734    async fn test_overwrite_schema_preserves_partition_columns_when_omitted() -> TestResult {
1735        let batch = get_record_batch(None, false);
1736        let table = modified_partitioned_table(&batch).await?;
1737
1738        let table = table
1739            .write(vec![batch])
1740            .with_save_mode(SaveMode::Overwrite)
1741            .with_schema_mode(SchemaMode::Overwrite)
1742            .await?;
1743
1744        assert_eq!(
1745            table.snapshot().unwrap().metadata().partition_columns(),
1746            &vec!["modified".to_string()]
1747        );
1748
1749        Ok(())
1750    }
1751
1752    #[tokio::test]
1753    async fn test_overwrite_schema_rejects_missing_new_partition_column() -> TestResult {
1754        let batch = get_record_batch(None, false);
1755        let table = modified_partitioned_table(&batch).await?;
1756
1757        let result = table
1758            .write(vec![batch])
1759            .with_save_mode(SaveMode::Overwrite)
1760            .with_schema_mode(SchemaMode::Overwrite)
1761            .with_partition_columns(["missing_partition"])
1762            .await;
1763
1764        let err = result.expect_err("missing partition column should fail");
1765        match expect_write_error(&err) {
1766            WriteError::MissingPartitionColumns { columns } => {
1767                assert_eq!(columns, &vec!["missing_partition".to_string()]);
1768            }
1769            other => panic!("unexpected error: {other:?}"),
1770        }
1771
1772        Ok(())
1773    }
1774
1775    #[tokio::test]
1776    async fn test_overwrite_check() {
1777        // If you do not pass a schema mode, we want to check the schema
1778        let batch = get_record_batch(None, false);
1779        let table = DeltaTable::new_in_memory()
1780            .write(vec![batch.clone()])
1781            .with_save_mode(SaveMode::ErrorIfExists)
1782            .await
1783            .unwrap();
1784        assert_eq!(table.version(), Some(0));
1785        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1786        assert_common_write_metrics(write_metrics);
1787
1788        let mut new_schema_builder = arrow_schema::SchemaBuilder::new();
1789
1790        new_schema_builder.push(Field::new("inserted_by", DataType::Utf8, true));
1791        let new_schema = new_schema_builder.finish();
1792        let new_fields = new_schema.fields();
1793        let new_names = new_fields.iter().map(|f| f.name()).collect::<Vec<_>>();
1794        assert_eq!(new_names, vec!["inserted_by"]);
1795        let inserted_by = StringArray::from(vec![
1796            Some("A1"),
1797            Some("B1"),
1798            None,
1799            Some("B2"),
1800            Some("A3"),
1801            Some("A4"),
1802            None,
1803            None,
1804            Some("B4"),
1805            Some("A5"),
1806            Some("A7"),
1807        ]);
1808        let new_batch =
1809            RecordBatch::try_new(Arc::new(new_schema), vec![Arc::new(inserted_by)]).unwrap();
1810
1811        let table = table
1812            .write(vec![new_batch])
1813            .with_save_mode(SaveMode::Append)
1814            .await;
1815        assert!(table.is_err());
1816    }
1817
1818    #[tokio::test]
1819    async fn test_check_invariants() {
1820        let batch = get_record_batch(None, false);
1821        let schema: StructType = serde_json::from_value(json!({
1822            "type": "struct",
1823            "fields": [
1824                {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1825                {"name": "value", "type": "integer", "nullable": true, "metadata": {
1826                    "delta.invariants": "{\"expression\": { \"expression\": \"value < 12\"} }"
1827                }},
1828                {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1829            ]
1830        }))
1831        .unwrap();
1832        let table = DeltaTable::new_in_memory()
1833            .create()
1834            .with_save_mode(SaveMode::ErrorIfExists)
1835            .with_columns(schema.fields().cloned())
1836            .await
1837            .unwrap();
1838        assert_eq!(table.version(), Some(0));
1839
1840        let table = table.write(vec![batch.clone()]).await.unwrap();
1841        assert_eq!(table.version(), Some(1));
1842        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1843        assert_common_write_metrics(write_metrics);
1844
1845        let schema: StructType = serde_json::from_value(json!({
1846            "type": "struct",
1847            "fields": [
1848                {"name": "id", "type": "string", "nullable": true, "metadata": {}},
1849                {"name": "value", "type": "integer", "nullable": true, "metadata": {
1850                    "delta.invariants": "{\"expression\": { \"expression\": \"value < 6\"} }"
1851                }},
1852                {"name": "modified", "type": "string", "nullable": true, "metadata": {}},
1853            ]
1854        }))
1855        .unwrap();
1856        let table = DeltaTable::new_in_memory()
1857            .create()
1858            .with_save_mode(SaveMode::ErrorIfExists)
1859            .with_columns(schema.fields().cloned())
1860            .await
1861            .unwrap();
1862        assert_eq!(table.version(), Some(0));
1863
1864        let table = table.write(vec![batch.clone()]).await;
1865        assert!(table.is_err());
1866    }
1867
1868    #[tokio::test]
1869    async fn test_nested_struct() {
1870        let table_schema = get_delta_schema_with_nested_struct();
1871        let batch = get_record_batch_with_nested_struct();
1872
1873        let table = DeltaTable::new_in_memory()
1874            .create()
1875            .with_columns(table_schema.fields().cloned())
1876            .await
1877            .unwrap();
1878        assert_eq!(table.version(), Some(0));
1879
1880        let table = table
1881            .write(vec![batch.clone()])
1882            .with_save_mode(SaveMode::Append)
1883            .await
1884            .unwrap();
1885        assert_eq!(table.version(), Some(1));
1886        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1887        assert_common_write_metrics(write_metrics);
1888
1889        let actual = get_data(&table).await;
1890        let expected = DataType::Struct(Fields::from(vec![Field::new(
1891            "count",
1892            DataType::Int32,
1893            true,
1894        )]));
1895        assert_eq!(
1896            actual[0].column_by_name("nested").unwrap().data_type(),
1897            &expected
1898        );
1899    }
1900
1901    #[tokio::test]
1902    async fn test_special_characters_write_read() {
1903        let tmp_dir = tempfile::tempdir().unwrap();
1904        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
1905
1906        let schema = Arc::new(ArrowSchema::new(vec![
1907            Field::new("string", DataType::Utf8, true),
1908            Field::new("data", DataType::Utf8, true),
1909        ]));
1910
1911        let str_values = StringArray::from(vec![r#"$%&/()=^"[]#*?._- {=}|`<>~/\r\n+"#]);
1912        let data_values = StringArray::from(vec!["test"]);
1913
1914        let batch = RecordBatch::try_new(schema, vec![Arc::new(str_values), Arc::new(data_values)])
1915            .unwrap();
1916
1917        let ops = DeltaTable::try_from_url(
1918            ensure_table_uri(tmp_path.as_os_str().to_str().unwrap()).unwrap(),
1919        )
1920        .await
1921        .unwrap();
1922
1923        let table = ops
1924            .write([batch.clone()])
1925            .with_partition_columns(["string"])
1926            .await
1927            .unwrap();
1928        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1929        assert_common_write_metrics(write_metrics);
1930
1931        let table_uri = url::Url::from_directory_path(&tmp_path).unwrap();
1932        let table = crate::open_table(table_uri).await.unwrap();
1933        let (_table, stream) = table.scan_table().await.unwrap();
1934        let data: Vec<RecordBatch> = collect_sendable_stream(stream).await.unwrap();
1935
1936        let expected = vec![
1937            r#"+----------------------------------+------+"#,
1938            r#"| string                           | data |"#,
1939            r#"+----------------------------------+------+"#,
1940            r#"| $%&/()=^"[]#*?._- {=}|`<>~/\r\n+ | test |"#,
1941            r#"+----------------------------------+------+"#,
1942        ];
1943
1944        assert_batches_eq!(&expected, &data);
1945    }
1946
1947    #[tokio::test]
1948    async fn test_replace_where() {
1949        let schema = get_arrow_schema(&None);
1950
1951        let batch = RecordBatch::try_new(
1952            Arc::clone(&schema),
1953            vec![
1954                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
1955                Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
1956                Arc::new(arrow::array::StringArray::from(vec![
1957                    "2021-02-02",
1958                    "2021-02-03",
1959                    "2021-02-02",
1960                    "2021-02-04",
1961                ])),
1962            ],
1963        )
1964        .unwrap();
1965
1966        let table = DeltaTable::new_in_memory()
1967            .write(vec![batch])
1968            .with_save_mode(SaveMode::Append)
1969            .await
1970            .unwrap();
1971        assert_eq!(table.version(), Some(0));
1972        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1973        assert_eq!(write_metrics.num_added_rows, 4);
1974        assert_common_write_metrics(write_metrics);
1975
1976        let batch_add = RecordBatch::try_new(
1977            Arc::clone(&schema),
1978            vec![
1979                Arc::new(arrow::array::StringArray::from(vec!["C"])),
1980                Arc::new(arrow::array::Int32Array::from(vec![50])),
1981                Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
1982            ],
1983        )
1984        .unwrap();
1985
1986        let table = table
1987            .write(vec![batch_add])
1988            .with_save_mode(SaveMode::Overwrite)
1989            .with_replace_where(col("id").eq(lit("C")))
1990            .await
1991            .unwrap();
1992        assert_eq!(table.version(), Some(1));
1993        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
1994        assert_eq!(write_metrics.num_added_rows, 1);
1995        assert_common_write_metrics(write_metrics);
1996
1997        let expected = [
1998            "+----+-------+------------+",
1999            "| id | value | modified   |",
2000            "+----+-------+------------+",
2001            "| A  | 0     | 2021-02-02 |",
2002            "| B  | 20    | 2021-02-03 |",
2003            "| C  | 50    | 2023-01-01 |",
2004            "+----+-------+------------+",
2005        ];
2006        let actual = get_data(&table).await;
2007        assert_batches_sorted_eq!(&expected, &actual);
2008    }
2009
2010    #[tokio::test]
2011    async fn test_replace_where_fail_not_matching_predicate() {
2012        let schema = get_arrow_schema(&None);
2013        let batch = RecordBatch::try_new(
2014            Arc::clone(&schema),
2015            vec![
2016                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "C"])),
2017                Arc::new(arrow::array::Int32Array::from(vec![0, 20, 10, 100])),
2018                Arc::new(arrow::array::StringArray::from(vec![
2019                    "2021-02-02",
2020                    "2021-02-03",
2021                    "2021-02-02",
2022                    "2021-02-04",
2023                ])),
2024            ],
2025        )
2026        .unwrap();
2027
2028        let table = DeltaTable::new_in_memory()
2029            .write(vec![batch])
2030            .with_save_mode(SaveMode::Append)
2031            .await
2032            .unwrap();
2033        assert_eq!(table.version(), Some(0));
2034        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2035        assert_common_write_metrics(write_metrics);
2036
2037        // Take clones of these before an operation resulting in error, otherwise it will
2038        // be impossible to refer to an in-memory table
2039        let table_logstore = table.log_store.clone();
2040        let table_state = table.state.clone().unwrap();
2041
2042        // An attempt to write records non conforming to predicate should fail
2043        let batch_fail = RecordBatch::try_new(
2044            Arc::clone(&schema),
2045            vec![
2046                Arc::new(arrow::array::StringArray::from(vec!["D"])),
2047                Arc::new(arrow::array::Int32Array::from(vec![1000])),
2048                Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
2049            ],
2050        )
2051        .unwrap();
2052
2053        let table = table
2054            .write(vec![batch_fail])
2055            .with_save_mode(SaveMode::Overwrite)
2056            .with_replace_where(col("id").eq(lit("C")))
2057            .await;
2058        assert!(table.is_err());
2059
2060        // Verify that table state hasn't changed
2061        let table = DeltaTable::new_with_state(table_logstore, table_state);
2062        assert_eq!(table.get_latest_version().await.unwrap(), 0);
2063    }
2064
2065    #[tokio::test]
2066    async fn test_replace_where_no_matching_files_still_validates_input() {
2067        let schema = get_arrow_schema(&None);
2068        let batch = RecordBatch::try_new(
2069            Arc::clone(&schema),
2070            vec![
2071                Arc::new(arrow::array::StringArray::from(vec!["A", "B"])),
2072                Arc::new(arrow::array::Int32Array::from(vec![10, 20])),
2073                Arc::new(arrow::array::StringArray::from(vec![
2074                    "2021-02-02",
2075                    "2021-02-03",
2076                ])),
2077            ],
2078        )
2079        .unwrap();
2080
2081        let table = DeltaTable::new_in_memory()
2082            .write(vec![batch])
2083            .with_save_mode(SaveMode::Append)
2084            .await
2085            .unwrap();
2086        assert_eq!(table.version(), Some(0));
2087
2088        let table_logstore = table.log_store();
2089        let table_state = table.state.clone().unwrap();
2090
2091        let batch_fail = RecordBatch::try_new(
2092            Arc::clone(&schema),
2093            vec![
2094                Arc::new(arrow::array::StringArray::from(vec!["D"])),
2095                Arc::new(arrow::array::Int32Array::from(vec![1000])),
2096                Arc::new(arrow::array::StringArray::from(vec!["2023-01-01"])),
2097            ],
2098        )
2099        .unwrap();
2100
2101        let result = table
2102            .write(vec![batch_fail])
2103            .with_save_mode(SaveMode::Overwrite)
2104            .with_replace_where(col("id").eq(lit("Z")))
2105            .await;
2106        assert!(result.is_err());
2107
2108        let table = DeltaTable::new_with_state(table_logstore, table_state);
2109        assert_eq!(table.get_latest_version().await.unwrap(), 0);
2110    }
2111
2112    #[tokio::test]
2113    async fn test_write_preserves_user_insert_marker_column_outside_rewrite() {
2114        let schema = Arc::new(ArrowSchema::new(vec![
2115            Field::new("id", DataType::Utf8, true),
2116            Field::new("value", DataType::Int32, true),
2117            Field::new("modified", DataType::Utf8, true),
2118            Field::new(
2119                super::plan::WRITE_INSERT_MARKER_COLUMN,
2120                DataType::Boolean,
2121                true,
2122            ),
2123        ]));
2124
2125        let batch = RecordBatch::try_new(
2126            schema,
2127            vec![
2128                Arc::new(StringArray::from(vec![Some("A"), Some("B"), Some("C")])),
2129                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2130                Arc::new(StringArray::from(vec![
2131                    Some("2021-02-02"),
2132                    Some("2021-02-03"),
2133                    Some("2021-02-04"),
2134                ])),
2135                Arc::new(arrow::array::BooleanArray::from(vec![
2136                    Some(false),
2137                    Some(true),
2138                    Some(false),
2139                ])),
2140            ],
2141        )
2142        .unwrap();
2143
2144        let table = DeltaTable::new_in_memory()
2145            .write(vec![batch])
2146            .with_save_mode(SaveMode::Append)
2147            .await
2148            .unwrap();
2149
2150        let actual = get_data_sorted(
2151            &table,
2152            format!(
2153                "id,value,modified,{}",
2154                super::plan::WRITE_INSERT_MARKER_COLUMN
2155            )
2156            .as_str(),
2157        )
2158        .await;
2159        assert_batches_sorted_eq!(
2160            &[
2161                "+----+-------+------------+-------------------------+",
2162                "| id | value | modified   | __delta_rs_write_insert |",
2163                "+----+-------+------------+-------------------------+",
2164                "| A  | 1     | 2021-02-02 | false                   |",
2165                "| B  | 2     | 2021-02-03 | true                    |",
2166                "| C  | 3     | 2021-02-04 | false                   |",
2167                "+----+-------+------------+-------------------------+",
2168            ],
2169            &actual
2170        );
2171    }
2172
2173    #[tokio::test]
2174    async fn test_replace_where_preserves_user_insert_marker_column() {
2175        let schema = Arc::new(ArrowSchema::new(vec![
2176            Field::new("id", DataType::Utf8, true),
2177            Field::new("value", DataType::Int32, true),
2178            Field::new("modified", DataType::Utf8, true),
2179            Field::new(
2180                super::plan::WRITE_INSERT_MARKER_COLUMN,
2181                DataType::Boolean,
2182                true,
2183            ),
2184        ]));
2185
2186        let batch = RecordBatch::try_new(
2187            Arc::clone(&schema),
2188            vec![
2189                Arc::new(StringArray::from(vec![Some("A"), Some("B"), Some("C")])),
2190                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2191                Arc::new(StringArray::from(vec![
2192                    Some("2021-02-02"),
2193                    Some("2021-02-03"),
2194                    Some("2021-02-04"),
2195                ])),
2196                Arc::new(arrow::array::BooleanArray::from(vec![
2197                    Some(false),
2198                    Some(false),
2199                    Some(true),
2200                ])),
2201            ],
2202        )
2203        .unwrap();
2204
2205        let table = DeltaTable::new_in_memory()
2206            .write(vec![batch])
2207            .with_save_mode(SaveMode::Append)
2208            .await
2209            .unwrap();
2210
2211        let replacement_batch = RecordBatch::try_new(
2212            schema,
2213            vec![
2214                Arc::new(StringArray::from(vec![Some("C")])),
2215                Arc::new(Int32Array::from(vec![Some(3)])),
2216                Arc::new(StringArray::from(vec![Some("2023-01-01")])),
2217                Arc::new(arrow::array::BooleanArray::from(vec![Some(false)])),
2218            ],
2219        )
2220        .unwrap();
2221
2222        let table = table
2223            .write(vec![replacement_batch])
2224            .with_save_mode(SaveMode::Overwrite)
2225            .with_replace_where(col("value").eq(lit(3)))
2226            .await
2227            .expect("replaceWhere should preserve user columns named like internal markers");
2228
2229        let actual = get_data_sorted(
2230            &table,
2231            format!(
2232                "id,value,modified,{}",
2233                super::plan::WRITE_INSERT_MARKER_COLUMN
2234            )
2235            .as_str(),
2236        )
2237        .await;
2238        assert_batches_sorted_eq!(
2239            &[
2240                "+----+-------+------------+-------------------------+",
2241                "| id | value | modified   | __delta_rs_write_insert |",
2242                "+----+-------+------------+-------------------------+",
2243                "| A  | 1     | 2021-02-02 | false                   |",
2244                "| B  | 2     | 2021-02-03 | false                   |",
2245                "| C  | 3     | 2023-01-01 | false                   |",
2246                "+----+-------+------------+-------------------------+",
2247            ],
2248            &actual
2249        );
2250    }
2251
2252    #[tokio::test]
2253    async fn test_replace_where_merge_schema_rescues_existing_rows() -> TestResult {
2254        let base_schema = Arc::new(ArrowSchema::new(vec![
2255            Field::new("id", DataType::Utf8, true),
2256            Field::new("value", DataType::Int32, true),
2257            Field::new("modified", DataType::Utf8, true),
2258        ]));
2259        let base_batch = RecordBatch::try_new(
2260            Arc::clone(&base_schema),
2261            vec![
2262                Arc::new(StringArray::from(vec![Some("A"), Some("B"), Some("C")])),
2263                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2264                Arc::new(StringArray::from(vec![
2265                    Some("2021-02-02"),
2266                    Some("2021-02-03"),
2267                    Some("2021-02-04"),
2268                ])),
2269            ],
2270        )?;
2271
2272        let table = DeltaTable::new_in_memory()
2273            .write(vec![base_batch])
2274            .with_save_mode(SaveMode::Append)
2275            .await?;
2276
2277        let merge_schema = Arc::new(ArrowSchema::new(vec![
2278            Field::new("id", DataType::Utf8, true),
2279            Field::new("value", DataType::Int32, true),
2280            Field::new("modified", DataType::Utf8, true),
2281            Field::new("inserted_by", DataType::Utf8, true),
2282        ]));
2283        let replacement_batch = RecordBatch::try_new(
2284            merge_schema,
2285            vec![
2286                Arc::new(StringArray::from(vec![Some("C")])),
2287                Arc::new(Int32Array::from(vec![Some(3)])),
2288                Arc::new(StringArray::from(vec![Some("2023-01-01")])),
2289                Arc::new(StringArray::from(vec![Some("rewrite")])),
2290            ],
2291        )?;
2292
2293        let table = table
2294            .write(vec![replacement_batch])
2295            .with_save_mode(SaveMode::Overwrite)
2296            .with_schema_mode(SchemaMode::Merge)
2297            .with_replace_where(col("value").eq(lit(3)))
2298            .await?;
2299
2300        let actual = get_data_sorted(&table, "id,value,modified,inserted_by").await;
2301        assert_batches_sorted_eq!(
2302            &[
2303                "+----+-------+------------+-------------+",
2304                "| id | value | modified   | inserted_by |",
2305                "+----+-------+------------+-------------+",
2306                "| A  | 1     | 2021-02-02 |             |",
2307                "| B  | 2     | 2021-02-03 |             |",
2308                "| C  | 3     | 2023-01-01 | rewrite     |",
2309                "+----+-------+------------+-------------+",
2310            ],
2311            &actual
2312        );
2313
2314        Ok(())
2315    }
2316
2317    fn mixed_case_replace_where_batches() -> TestResult<(Arc<ArrowSchema>, RecordBatch, RecordBatch)>
2318    {
2319        let schema = Arc::new(ArrowSchema::new(vec![
2320            Field::new("utcDate", DataType::Utf8, true),
2321            Field::new("homeTeam", DataType::Utf8, true),
2322            Field::new("score", DataType::Utf8, true),
2323        ]));
2324        let base_batch = RecordBatch::try_new(
2325            Arc::clone(&schema),
2326            vec![
2327                Arc::new(StringArray::from(vec![
2328                    Some("2008-08-16T15:00:00Z"),
2329                    Some("2009-05-16T15:00:00Z"),
2330                ])),
2331                Arc::new(StringArray::from(vec![Some("Everton"), Some("Everton")])),
2332                Arc::new(StringArray::from(vec![Some("0-1"), Some("3-1")])),
2333            ],
2334        )?;
2335        let replacement_batch = RecordBatch::try_new(
2336            Arc::clone(&schema),
2337            vec![
2338                Arc::new(StringArray::from(vec![Some("2010-01-01T15:00:00Z")])),
2339                Arc::new(StringArray::from(vec![Some("Everton")])),
2340                Arc::new(StringArray::from(vec![Some("0-1")])),
2341            ],
2342        )?;
2343
2344        Ok((schema, base_batch, replacement_batch))
2345    }
2346
2347    #[tokio::test]
2348    async fn test_replace_where_preserves_mixed_case_columns_when_rescuing_rows() -> TestResult {
2349        let (_, base_batch, replacement_batch) = mixed_case_replace_where_batches()?;
2350
2351        let table = DeltaTable::new_in_memory()
2352            .write(vec![base_batch])
2353            .with_save_mode(SaveMode::Append)
2354            .await?;
2355
2356        let table = table
2357            .write(vec![replacement_batch])
2358            .with_save_mode(SaveMode::Overwrite)
2359            .with_schema_mode(SchemaMode::Overwrite)
2360            .with_replace_where(col("score").eq(lit("0-1")))
2361            .await?;
2362
2363        let actual = get_data_sorted(&table, r#""utcDate","homeTeam",score"#).await;
2364        assert_batches_sorted_eq!(
2365            &[
2366                "+----------------------+----------+-------+",
2367                "| utcDate              | homeTeam | score |",
2368                "+----------------------+----------+-------+",
2369                "| 2009-05-16T15:00:00Z | Everton  | 3-1   |",
2370                "| 2010-01-01T15:00:00Z | Everton  | 0-1   |",
2371                "+----------------------+----------+-------+",
2372            ],
2373            &actual
2374        );
2375
2376        Ok(())
2377    }
2378
2379    #[tokio::test]
2380    async fn test_replace_where_preserves_live_rows_with_deletion_vectors() -> TestResult {
2381        let (_temp_dir, table) = open_copied_table_fixture(
2382            &crate::test_utils::TestTables::WithDvSmall.as_path(),
2383            "table-with-dv-small",
2384        )
2385        .await?;
2386
2387        let source_files = table
2388            .get_active_add_actions_by_partitions(&[])
2389            .try_collect::<Vec<_>>()
2390            .await?;
2391        assert_eq!(source_files.len(), 1);
2392        let source_path = source_files[0].path().to_string();
2393        let source_deletion_vector = source_files[0].deletion_vector_descriptor();
2394        assert!(
2395            source_deletion_vector.is_some(),
2396            "expected DV-backed source file"
2397        );
2398        assert_eq!(
2399            query_i32_rows(&table, "SELECT value FROM test ORDER BY value", "value").await?,
2400            vec![1, 2, 3, 4, 5, 6, 7, 8]
2401        );
2402
2403        let replacement_batch = RecordBatch::try_new(
2404            Arc::new(ArrowSchema::new(vec![Field::new(
2405                "value",
2406                DataType::Int32,
2407                true,
2408            )])),
2409            vec![Arc::new(Int32Array::from(vec![Some(50)]))],
2410        )?;
2411
2412        let table = table
2413            .write(vec![replacement_batch])
2414            .with_save_mode(SaveMode::Overwrite)
2415            .with_replace_where("value = 5 OR value = 50")
2416            .await?;
2417        assert_eq!(table.version(), Some(2));
2418
2419        assert_eq!(
2420            query_i32_rows(&table, "SELECT value FROM test ORDER BY value", "value").await?,
2421            vec![1, 2, 3, 4, 6, 7, 8, 50]
2422        );
2423
2424        let remove_actions = latest_remove_actions(&table).await?;
2425
2426        assert_eq!(remove_actions.len(), 1);
2427        let remove = &remove_actions[0];
2428        assert_eq!(remove.path, source_path);
2429        assert_eq!(remove.deletion_vector, source_deletion_vector);
2430
2431        Ok(())
2432    }
2433
2434    #[tokio::test]
2435    async fn test_replace_where_rewrites_multiple_files_with_deletion_vectors() -> TestResult {
2436        let (_temp_dir, table) = open_copied_table_fixture(
2437            &crate::test_utils::TestTables::WithDvSmall.as_path(),
2438            "table-with-dv-small",
2439        )
2440        .await?;
2441
2442        let source_files = table
2443            .get_active_add_actions_by_partitions(&[])
2444            .try_collect::<Vec<_>>()
2445            .await?;
2446        assert_eq!(source_files.len(), 1);
2447        let dv_source = source_files
2448            .into_iter()
2449            .next()
2450            .expect("expected DV-backed source file");
2451        assert!(
2452            dv_source.deletion_vector_descriptor().is_some(),
2453            "expected DV-backed source file"
2454        );
2455
2456        let append_batch = RecordBatch::try_new(
2457            Arc::new(ArrowSchema::new(vec![Field::new(
2458                "value",
2459                DataType::Int32,
2460                true,
2461            )])),
2462            vec![Arc::new(Int32Array::from(vec![Some(0), Some(9)]))],
2463        )?;
2464
2465        let table = table
2466            .write(vec![append_batch])
2467            .with_save_mode(SaveMode::Append)
2468            .await?;
2469
2470        let source_files = table
2471            .get_active_add_actions_by_partitions(&[])
2472            .try_collect::<Vec<_>>()
2473            .await?;
2474        assert_eq!(source_files.len(), 2);
2475        let appended_source = source_files
2476            .iter()
2477            .find(|file| file.path() != dv_source.path())
2478            .expect("expected appended source file");
2479        assert!(
2480            appended_source.deletion_vector_descriptor().is_none(),
2481            "expected appended source without DV metadata"
2482        );
2483
2484        let replacement_batch = RecordBatch::try_new(
2485            Arc::new(ArrowSchema::new(vec![Field::new(
2486                "value",
2487                DataType::Int32,
2488                true,
2489            )])),
2490            vec![Arc::new(Int32Array::from(vec![Some(50)]))],
2491        )?;
2492
2493        let table = table
2494            .write(vec![replacement_batch])
2495            .with_save_mode(SaveMode::Overwrite)
2496            .with_replace_where("value >= 5")
2497            .await?;
2498        assert_eq!(table.version(), Some(3));
2499
2500        assert_eq!(
2501            query_i32_rows(&table, "SELECT value FROM test ORDER BY value", "value").await?,
2502            vec![0, 1, 2, 3, 4, 50]
2503        );
2504
2505        let remove_actions = latest_remove_actions(&table).await?;
2506
2507        assert_eq!(remove_actions.len(), 2);
2508        assert!(
2509            remove_actions.iter().any(|remove| {
2510                remove.path == dv_source.path()
2511                    && remove.deletion_vector == dv_source.deletion_vector_descriptor()
2512            }),
2513            "expected tombstone for DV-backed source file"
2514        );
2515        assert!(
2516            remove_actions.iter().any(|remove| {
2517                remove.path == appended_source.path() && remove.deletion_vector.is_none()
2518            }),
2519            "expected tombstone for appended non-DV source file"
2520        );
2521
2522        Ok(())
2523    }
2524
2525    #[tokio::test]
2526    async fn test_replace_where_real_world_deletion_logs_preserve_live_rows() -> TestResult {
2527        let fixture_path = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
2528            .join("../test/tests/data/table_with_deletion_logs");
2529        let (_temp_dir, table) =
2530            open_copied_table_fixture(&fixture_path, "table_with_deletion_logs").await?;
2531
2532        let source_files = table
2533            .get_active_add_actions_by_partitions(&[])
2534            .try_collect::<Vec<_>>()
2535            .await?;
2536        let dv_sources = source_files
2537            .iter()
2538            .filter_map(|file| {
2539                file.deletion_vector_descriptor()
2540                    .map(|descriptor| (file.path().to_string(), descriptor))
2541            })
2542            .collect::<std::collections::HashMap<_, _>>();
2543        assert!(
2544            !dv_sources.is_empty(),
2545            "expected at least one active DV-backed source file"
2546        );
2547
2548        let initial_counts = query_single_i64_row(
2549            &table,
2550            "SELECT \
2551                SUM(CASE WHEN id < 100 THEN 1 ELSE 0 END) AS matching_rows, \
2552                SUM(CASE WHEN id >= 100 THEN 1 ELSE 0 END) AS preserved_rows \
2553             FROM test",
2554        )
2555        .await?;
2556        let matching_rows = initial_counts[0];
2557        let preserved_rows = initial_counts[1];
2558        assert!(matching_rows > 0, "expected fixture rows matching id < 100");
2559        assert!(preserved_rows > 0, "expected fixture rows with id >= 100");
2560
2561        let replacement_batch = RecordBatch::try_new(
2562            Arc::new(ArrowSchema::new(vec![
2563                Field::new("address", DataType::Utf8, true),
2564                Field::new("age", DataType::Float64, true),
2565                Field::new("company", DataType::Utf8, true),
2566                Field::new("id", DataType::Int64, true),
2567                Field::new("name", DataType::Utf8, true),
2568                Field::new("nbr", DataType::Int64, true),
2569                Field::new("phone_number", DataType::Utf8, true),
2570            ])),
2571            vec![
2572                Arc::new(StringArray::from(vec![Some("Replacement Ave")])),
2573                Arc::new(Float64Array::from(vec![Some(42.0)])),
2574                Arc::new(StringArray::from(vec![Some("delta-rs")])),
2575                Arc::new(Int64Array::from(vec![Some(42)])),
2576                Arc::new(StringArray::from(vec![Some("replacement")])),
2577                Arc::new(Int64Array::from(vec![Some(4242)])),
2578                Arc::new(StringArray::from(vec![Some("555-4242")])),
2579            ],
2580        )?;
2581
2582        let table = table
2583            .write(vec![replacement_batch])
2584            .with_save_mode(SaveMode::Overwrite)
2585            .with_replace_where("id < 100")
2586            .await?;
2587
2588        let final_counts = query_single_i64_row(
2589            &table,
2590            "SELECT \
2591                COUNT(*) AS total_rows, \
2592                SUM(CASE WHEN id < 100 THEN 1 ELSE 0 END) AS matching_rows, \
2593                SUM(CASE WHEN id >= 100 THEN 1 ELSE 0 END) AS preserved_rows, \
2594                SUM(CASE WHEN id = 42 AND name = 'replacement' THEN 1 ELSE 0 END) AS replacement_rows \
2595             FROM test",
2596        )
2597        .await?;
2598
2599        assert_eq!(final_counts[0], preserved_rows + 1);
2600        assert_eq!(final_counts[1], 1);
2601        assert_eq!(final_counts[2], preserved_rows);
2602        assert_eq!(final_counts[3], 1);
2603
2604        let remove_actions = latest_remove_actions(&table).await?;
2605
2606        assert!(
2607            remove_actions.iter().any(|remove| {
2608                dv_sources
2609                    .get(&remove.path)
2610                    .is_some_and(|descriptor| remove.deletion_vector.as_ref() == Some(descriptor))
2611            }),
2612            "expected at least one DV-backed tombstone preserving its deletion vector"
2613        );
2614
2615        Ok(())
2616    }
2617
2618    #[tokio::test]
2619    async fn test_overwrite_without_files_is_rejected() -> TestResult {
2620        let temp_dir = tempfile::tempdir()?;
2621        let table_path = temp_dir.path().join("without_files_overwrite");
2622        std::fs::create_dir(&table_path)?;
2623        let table_uri = ensure_table_uri(table_path.to_str().unwrap())?;
2624
2625        DeltaTable::try_from_url(table_uri.clone())
2626            .await?
2627            .write(vec![get_record_batch(None, false)])
2628            .await?;
2629
2630        let table = crate::DeltaTableBuilder::from_url(table_uri)?
2631            .without_files()
2632            .load()
2633            .await?;
2634
2635        assert_eq!(table.version(), Some(0));
2636
2637        // Phase 3 now routes overwrite planning through matched-file discovery, so this guard
2638        // stays covered here to ensure we still fail before any rewrite planning starts.
2639        let err = table
2640            .write(vec![get_record_batch(None, false)])
2641            .with_save_mode(SaveMode::Overwrite)
2642            .await
2643            .expect_err("overwrite should fail when table was loaded without files");
2644
2645        assert!(matches!(
2646            err,
2647            DeltaTableError::NotInitializedWithFiles(operation) if operation == "WRITE"
2648        ));
2649
2650        Ok(())
2651    }
2652
2653    #[tokio::test]
2654    async fn test_replace_where_partitioned() {
2655        let schema = get_arrow_schema(&None);
2656
2657        let batch = get_record_batch(None, false);
2658
2659        let table = DeltaTable::new_in_memory()
2660            .write(vec![batch])
2661            .with_partition_columns(["id", "value"])
2662            .with_save_mode(SaveMode::Append)
2663            .await
2664            .unwrap();
2665        assert_eq!(table.version(), Some(0));
2666        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2667        assert_common_write_metrics(write_metrics);
2668
2669        let batch_add = RecordBatch::try_new(
2670            Arc::clone(&schema),
2671            vec![
2672                Arc::new(arrow::array::StringArray::from(vec!["A", "A", "A"])),
2673                Arc::new(arrow::array::Int32Array::from(vec![11, 13, 15])),
2674                Arc::new(arrow::array::StringArray::from(vec![
2675                    "2024-02-02",
2676                    "2024-02-02",
2677                    "2024-02-01",
2678                ])),
2679            ],
2680        )
2681        .unwrap();
2682
2683        let table = table
2684            .write(vec![batch_add])
2685            .with_save_mode(SaveMode::Overwrite)
2686            .with_replace_where(col("id").eq(lit("A")))
2687            .await
2688            .unwrap();
2689        assert_eq!(table.version(), Some(1));
2690        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2691        assert_eq!(write_metrics.num_added_rows, 3);
2692        assert_common_write_metrics(write_metrics);
2693
2694        let expected = [
2695            "+----+-------+------------+",
2696            "| id | value | modified   |",
2697            "+----+-------+------------+",
2698            "| A  | 11    | 2024-02-02 |",
2699            "| A  | 13    | 2024-02-02 |",
2700            "| A  | 15    | 2024-02-01 |",
2701            "| B  | 2     | 2021-02-02 |",
2702            "| B  | 4     | 2021-02-01 |",
2703            "| B  | 8     | 2021-02-01 |",
2704            "| B  | 9     | 2021-02-01 |",
2705            "+----+-------+------------+",
2706        ];
2707        let actual = get_data_sorted(&table, "id,value,modified").await;
2708        assert_batches_sorted_eq!(&expected, &actual);
2709    }
2710
2711    #[tokio::test]
2712    async fn test_dont_write_cdc_with_overwrite() -> TestResult {
2713        let delta_schema = TestSchemas::simple();
2714        let table: DeltaTable = DeltaTable::new_in_memory()
2715            .create()
2716            .with_columns(delta_schema.fields().cloned())
2717            .with_partition_columns(["id"])
2718            .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2719            .await
2720            .unwrap();
2721        assert_eq!(table.version(), Some(0));
2722
2723        let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
2724
2725        let batch = RecordBatch::try_new(
2726            Arc::clone(&schema),
2727            vec![
2728                Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
2729                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2730                Arc::new(StringArray::from(vec![
2731                    Some("yes"),
2732                    Some("yes"),
2733                    Some("no"),
2734                ])),
2735            ],
2736        )
2737        .unwrap();
2738
2739        let second_batch = RecordBatch::try_new(
2740            Arc::clone(&schema),
2741            vec![
2742                Arc::new(StringArray::from(vec![Some("3")])),
2743                Arc::new(Int32Array::from(vec![Some(10)])),
2744                Arc::new(StringArray::from(vec![Some("yes")])),
2745            ],
2746        )
2747        .unwrap();
2748
2749        let table = table
2750            .write(vec![batch])
2751            .await
2752            .expect("Failed to write first batch");
2753        assert_eq!(table.version(), Some(1));
2754        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2755        assert_eq!(write_metrics.num_added_rows, 3);
2756        assert_common_write_metrics(write_metrics);
2757
2758        let table = table
2759            .write([second_batch])
2760            .with_save_mode(crate::protocol::SaveMode::Overwrite)
2761            .await
2762            .unwrap();
2763        assert_eq!(table.version(), Some(2));
2764        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2765        assert_eq!(write_metrics.num_added_rows, 1);
2766        assert!(write_metrics.num_removed_files > 0);
2767        assert_common_write_metrics(write_metrics);
2768
2769        let snapshot_bytes = table
2770            .log_store
2771            .read_commit_entry(2)
2772            .await?
2773            .expect("failed to get snapshot bytes");
2774        let version_actions = get_actions(2, &snapshot_bytes)?;
2775
2776        let cdc_actions = version_actions
2777            .iter()
2778            .filter(|action| matches!(action, &&Action::Cdc(_)))
2779            .collect_vec();
2780        assert!(cdc_actions.is_empty());
2781        Ok(())
2782    }
2783
2784    #[tokio::test]
2785    async fn test_dont_write_cdc_with_overwrite_predicate_partitioned() -> TestResult {
2786        let delta_schema = TestSchemas::simple();
2787        let table: DeltaTable = DeltaTable::new_in_memory()
2788            .create()
2789            .with_columns(delta_schema.fields().cloned())
2790            .with_partition_columns(["id"])
2791            .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2792            .await
2793            .unwrap();
2794        assert_eq!(table.version(), Some(0));
2795
2796        let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
2797
2798        let batch = RecordBatch::try_new(
2799            Arc::clone(&schema),
2800            vec![
2801                Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
2802                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2803                Arc::new(StringArray::from(vec![
2804                    Some("yes"),
2805                    Some("yes"),
2806                    Some("no"),
2807                ])),
2808            ],
2809        )
2810        .unwrap();
2811
2812        let second_batch = RecordBatch::try_new(
2813            Arc::clone(&schema),
2814            vec![
2815                Arc::new(StringArray::from(vec![Some("3")])),
2816                Arc::new(Int32Array::from(vec![Some(10)])),
2817                Arc::new(StringArray::from(vec![Some("yes")])),
2818            ],
2819        )
2820        .unwrap();
2821
2822        let table = table
2823            .write(vec![batch])
2824            .await
2825            .expect("Failed to write first batch");
2826        assert_eq!(table.version(), Some(1));
2827        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2828        assert_eq!(write_metrics.num_added_rows, 3);
2829        assert_common_write_metrics(write_metrics);
2830
2831        let table = table
2832            .write([second_batch])
2833            .with_save_mode(crate::protocol::SaveMode::Overwrite)
2834            .with_replace_where("id='3'")
2835            .await
2836            .unwrap();
2837        assert_eq!(table.version(), Some(2));
2838        let write_metrics: WriteMetrics = get_write_metrics(&table).await;
2839        assert_eq!(write_metrics.num_added_rows, 1);
2840        assert!(write_metrics.num_removed_files > 0);
2841        assert_common_write_metrics(write_metrics);
2842
2843        let snapshot_bytes = table
2844            .log_store
2845            .read_commit_entry(2)
2846            .await?
2847            .expect("failed to get snapshot bytes");
2848        let version_actions = get_actions(2, &snapshot_bytes)?;
2849
2850        let cdc_actions = version_actions
2851            .iter()
2852            .filter(|action| matches!(action, &&Action::Cdc(_)))
2853            .collect_vec();
2854        assert!(cdc_actions.is_empty());
2855        Ok(())
2856    }
2857
2858    #[tokio::test]
2859    async fn test_dont_write_cdc_with_overwrite_predicate_unpartitioned() -> TestResult {
2860        let delta_schema = TestSchemas::simple();
2861        let table: DeltaTable = DeltaTable::new_in_memory()
2862            .create()
2863            .with_columns(delta_schema.fields().cloned())
2864            .with_partition_columns(["id"])
2865            .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2866            .await
2867            .unwrap();
2868        assert_eq!(table.version(), Some(0));
2869
2870        let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
2871
2872        let batch = RecordBatch::try_new(
2873            Arc::clone(&schema),
2874            vec![
2875                Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
2876                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
2877                Arc::new(StringArray::from(vec![
2878                    Some("yes"),
2879                    Some("yes"),
2880                    Some("no"),
2881                ])),
2882            ],
2883        )
2884        .unwrap();
2885
2886        let second_batch = RecordBatch::try_new(
2887            Arc::clone(&schema),
2888            vec![
2889                Arc::new(StringArray::from(vec![Some("3")])),
2890                Arc::new(Int32Array::from(vec![Some(3)])),
2891                Arc::new(StringArray::from(vec![Some("yes")])),
2892            ],
2893        )
2894        .unwrap();
2895
2896        let table = table
2897            .write(vec![batch])
2898            .await
2899            .expect("Failed to write first batch");
2900        assert_eq!(table.version(), Some(1));
2901
2902        let table = table
2903            .write([second_batch])
2904            .with_save_mode(crate::protocol::SaveMode::Overwrite)
2905            .with_replace_where("value=3")
2906            .await
2907            .unwrap();
2908        assert_eq!(table.version(), Some(2));
2909
2910        let ctx = SessionContext::new();
2911        let cdf_scan = table
2912            .clone()
2913            .scan_cdf()
2914            .with_starting_version(0)
2915            .build(&ctx.state(), None)
2916            .await
2917            .expect("Failed to load CDF");
2918
2919        let mut batches = collect(cdf_scan, ctx.state().task_ctx())
2920            .await
2921            .expect("Failed to collect batches");
2922
2923        // The batches will contain a current _commit_timestamp which shouldn't be check_append_only
2924        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect();
2925
2926        assert_batches_sorted_eq! {[
2927            "+-------+----------+----+--------------+-----------------+",
2928            "| value | modified | id | _change_type | _commit_version |",
2929            "+-------+----------+----+--------------+-----------------+",
2930            "| 1     | yes      | 1  | insert       | 1               |",
2931            "| 2     | yes      | 2  | insert       | 1               |",
2932            "| 3     | no       | 3  | delete       | 2               |",
2933            "| 3     | no       | 3  | insert       | 1               |",
2934            "| 3     | yes      | 3  | insert       | 2               |",
2935            "+-------+----------+----+--------------+-----------------+",
2936        ], &batches }
2937
2938        let snapshot_bytes = table
2939            .log_store
2940            .read_commit_entry(2)
2941            .await?
2942            .expect("failed to get snapshot bytes");
2943        let version_actions = get_actions(2, &snapshot_bytes)?;
2944
2945        let cdc_actions = version_actions
2946            .iter()
2947            .filter(|action| matches!(action, &&Action::Cdc(_)))
2948            .collect_vec();
2949        assert!(!cdc_actions.is_empty());
2950        Ok(())
2951    }
2952
2953    #[tokio::test]
2954    async fn test_write_cdc_with_replace_where_preserves_mixed_case_columns() -> TestResult {
2955        let (schema, base_batch, replacement_batch) = mixed_case_replace_where_batches()?;
2956        let delta_schema: StructType = Arc::clone(&schema).try_into_kernel()?;
2957        let table: DeltaTable = DeltaTable::new_in_memory()
2958            .create()
2959            .with_columns(delta_schema.fields().cloned())
2960            .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
2961            .await?;
2962        assert_eq!(table.version(), Some(0));
2963
2964        let table = table.write(vec![base_batch]).await?;
2965        assert_eq!(table.version(), Some(1));
2966
2967        let table = table
2968            .write(vec![replacement_batch])
2969            .with_save_mode(crate::protocol::SaveMode::Overwrite)
2970            .with_replace_where("score='0-1'")
2971            .await?;
2972        assert_eq!(table.version(), Some(2));
2973
2974        let ctx = SessionContext::new();
2975        let cdf_scan = table
2976            .clone()
2977            .scan_cdf()
2978            .with_starting_version(0)
2979            .build(&ctx.state(), None)
2980            .await
2981            .expect("Failed to load CDF");
2982        let mut batches = collect(cdf_scan, ctx.state().task_ctx())
2983            .await
2984            .expect("Failed to collect CDF batches");
2985
2986        let commit_timestamp_index = batches
2987            .first()
2988            .expect("expected CDF batches")
2989            .schema()
2990            .index_of("_commit_timestamp")
2991            .expect("expected CDF commit timestamp column");
2992        let _: Vec<_> = batches
2993            .iter_mut()
2994            .map(|batch| batch.remove_column(commit_timestamp_index))
2995            .collect();
2996
2997        assert_batches_sorted_eq! {[
2998            "+----------------------+----------+-------+--------------+-----------------+",
2999            "| utcDate              | homeTeam | score | _change_type | _commit_version |",
3000            "+----------------------+----------+-------+--------------+-----------------+",
3001            "| 2008-08-16T15:00:00Z | Everton  | 0-1   | delete       | 2               |",
3002            "| 2008-08-16T15:00:00Z | Everton  | 0-1   | insert       | 1               |",
3003            "| 2009-05-16T15:00:00Z | Everton  | 3-1   | insert       | 1               |",
3004            "| 2010-01-01T15:00:00Z | Everton  | 0-1   | insert       | 2               |",
3005            "+----------------------+----------+-------+--------------+-----------------+",
3006        ], &batches }
3007
3008        Ok(())
3009    }
3010
3011    #[tokio::test]
3012    async fn test_write_cdc_with_overwrite_predicate_partitioned_parallel_input() -> TestResult {
3013        let delta_schema = TestSchemas::simple();
3014        let table: DeltaTable = DeltaTable::new_in_memory()
3015            .create()
3016            .with_columns(delta_schema.fields().cloned())
3017            .with_partition_columns(["id"])
3018            .with_configuration_property(TableProperty::EnableChangeDataFeed, Some("true"))
3019            .await
3020            .unwrap();
3021        assert_eq!(table.version(), Some(0));
3022
3023        let schema: Arc<ArrowSchema> = Arc::new(delta_schema.try_into_arrow()?);
3024
3025        let batch = RecordBatch::try_new(
3026            Arc::clone(&schema),
3027            vec![
3028                Arc::new(StringArray::from(vec![Some("1"), Some("2"), Some("3")])),
3029                Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)])),
3030                Arc::new(StringArray::from(vec![
3031                    Some("yes"),
3032                    Some("yes"),
3033                    Some("no"),
3034                ])),
3035            ],
3036        )
3037        .unwrap();
3038
3039        let second_batch = RecordBatch::try_new(
3040            Arc::clone(&schema),
3041            vec![
3042                Arc::new(StringArray::from(vec![Some("3")])),
3043                Arc::new(Int32Array::from(vec![Some(3)])),
3044                Arc::new(StringArray::from(vec![Some("updated")])),
3045            ],
3046        )
3047        .unwrap();
3048
3049        let table = table
3050            .write(vec![batch])
3051            .await
3052            .expect("Failed to write first batch");
3053        assert_eq!(table.version(), Some(1));
3054
3055        let multi_stream_input: Arc<dyn TableProvider> = Arc::new(
3056            MemTable::try_new(
3057                second_batch.schema(),
3058                vec![
3059                    vec![second_batch.clone()],
3060                    vec![second_batch.clone()],
3061                    vec![second_batch.clone()],
3062                ],
3063            )
3064            .unwrap(),
3065        );
3066        let multi_stream_plan =
3067            LogicalPlanBuilder::scan("source", provider_as_source(multi_stream_input), None)?
3068                .build()?;
3069
3070        let table = table
3071            .write(vec![])
3072            .with_input_plan(multi_stream_plan)
3073            .with_save_mode(crate::protocol::SaveMode::Overwrite)
3074            .with_replace_where("value=3")
3075            .await
3076            .unwrap();
3077        assert_eq!(table.version(), Some(2));
3078
3079        let snapshot_bytes = table
3080            .log_store
3081            .read_commit_entry(2)
3082            .await?
3083            .expect("failed to get snapshot bytes");
3084        let version_actions = get_actions(2, &snapshot_bytes)?;
3085
3086        let cdc_actions = version_actions
3087            .iter()
3088            .filter(|action| matches!(action, &&Action::Cdc(_)))
3089            .collect_vec();
3090        assert!(!cdc_actions.is_empty());
3091
3092        let ctx = SessionContext::new();
3093        let cdf_scan = table
3094            .clone()
3095            .scan_cdf()
3096            .with_starting_version(0)
3097            .build(&ctx.state(), None)
3098            .await
3099            .expect("Failed to load CDF");
3100        let mut batches = collect(cdf_scan, ctx.state().task_ctx())
3101            .await
3102            .expect("Failed to collect CDF batches");
3103
3104        // _commit_timestamp is dynamic, drop it for stable assertions.
3105        let _: Vec<_> = batches.iter_mut().map(|b| b.remove_column(5)).collect();
3106
3107        assert_batches_sorted_eq! {[
3108            "+-------+----------+----+--------------+-----------------+",
3109            "| value | modified | id | _change_type | _commit_version |",
3110            "+-------+----------+----+--------------+-----------------+",
3111            "| 1     | yes      | 1  | insert       | 1               |",
3112            "| 2     | yes      | 2  | insert       | 1               |",
3113            "| 3     | no       | 3  | delete       | 2               |",
3114            "| 3     | no       | 3  | insert       | 1               |",
3115            "| 3     | updated  | 3  | insert       | 2               |",
3116            "| 3     | updated  | 3  | insert       | 2               |",
3117            "| 3     | updated  | 3  | insert       | 2               |",
3118            "+-------+----------+----+--------------+-----------------+",
3119        ], &batches }
3120
3121        let expected_table = [
3122            "+-------+----------+----+",
3123            "| value | modified | id |",
3124            "+-------+----------+----+",
3125            "| 1     | yes      | 1  |",
3126            "| 2     | yes      | 2  |",
3127            "| 3     | updated  | 3  |",
3128            "| 3     | updated  | 3  |",
3129            "| 3     | updated  | 3  |",
3130            "+-------+----------+----+",
3131        ];
3132        let actual_table = get_data_sorted(&table, "value, modified, id").await;
3133        assert_batches_sorted_eq!(&expected_table, &actual_table);
3134        Ok(())
3135    }
3136
3137    /// SMall module to collect test cases which validate the [WriteBuilder]'s
3138    /// check_preconditions() function
3139    mod check_preconditions_test {
3140        use super::*;
3141
3142        #[tokio::test]
3143        async fn test_schema_overwrite_on_append() -> DeltaResult<()> {
3144            let table_schema = get_delta_schema();
3145            let batch = get_record_batch(None, false);
3146            let table = DeltaTable::new_in_memory()
3147                .create()
3148                .with_columns(table_schema.fields().cloned())
3149                .await?;
3150            let writer = table
3151                .write(vec![batch])
3152                .with_schema_mode(SchemaMode::Overwrite)
3153                .with_save_mode(SaveMode::Append);
3154
3155            let check = writer.check_preconditions().await;
3156            assert!(check.is_err());
3157            Ok(())
3158        }
3159
3160        #[tokio::test]
3161        async fn test_savemode_overwrite_on_append_table() -> DeltaResult<()> {
3162            let table_schema = get_delta_schema();
3163            let batch = get_record_batch(None, false);
3164            let table = DeltaTable::new_in_memory()
3165                .create()
3166                .with_configuration_property(TableProperty::AppendOnly, Some("true".to_string()))
3167                .with_columns(table_schema.fields().cloned())
3168                .await?;
3169            let writer = table.write(vec![batch]).with_save_mode(SaveMode::Overwrite);
3170
3171            let check = writer.check_preconditions().await;
3172            assert!(check.is_err());
3173            Ok(())
3174        }
3175
3176        #[tokio::test]
3177        async fn test_empty_set_of_batches() -> DeltaResult<()> {
3178            let table_schema = get_delta_schema();
3179            let table = DeltaTable::new_in_memory()
3180                .create()
3181                .with_columns(table_schema.fields().cloned())
3182                .await?;
3183            let writer = table.write(vec![]);
3184
3185            match writer.check_preconditions().await {
3186                Ok(_) => panic!("Expected check_preconditions to fail!"),
3187                Err(DeltaTableError::GenericError { .. }) => {}
3188                Err(e) => panic!("Unexpected error returned: {e:#?}"),
3189            }
3190            Ok(())
3191        }
3192
3193        #[tokio::test]
3194        async fn test_errorifexists() -> DeltaResult<()> {
3195            let table_schema = get_delta_schema();
3196            let batch = get_record_batch(None, false);
3197            let table = DeltaTable::new_in_memory()
3198                .create()
3199                .with_columns(table_schema.fields().cloned())
3200                .await?;
3201            let writer = table
3202                .write(vec![batch])
3203                .with_save_mode(SaveMode::ErrorIfExists);
3204
3205            match writer.check_preconditions().await {
3206                Ok(_) => panic!("Expected check_preconditions to fail!"),
3207                Err(DeltaTableError::GenericError { .. }) => {}
3208                Err(e) => panic!("Unexpected error returned: {e:#?}"),
3209            }
3210            Ok(())
3211        }
3212
3213        #[tokio::test]
3214        async fn test_allow_empty_batches_with_input_plan() -> DeltaResult<()> {
3215            let table_schema = get_delta_schema();
3216            let table = DeltaTable::new_in_memory()
3217                .create()
3218                .with_columns(table_schema.fields().cloned())
3219                .await?;
3220
3221            let ctx = SessionContext::new();
3222            let plan = ctx
3223                .sql("SELECT 1 as id")
3224                .await
3225                .unwrap()
3226                .logical_plan()
3227                .clone();
3228            let writer =
3229                WriteBuilder::new(table.log_store.clone(), table.state.map(|f| f.snapshot))
3230                    .with_input_plan(plan)
3231                    .with_save_mode(SaveMode::Overwrite);
3232
3233            let _ = writer.check_preconditions().await?;
3234            Ok(())
3235        }
3236
3237        #[tokio::test]
3238        async fn test_no_snapshot_create_actions() -> DeltaResult<()> {
3239            let table_schema = get_delta_schema();
3240            let table = DeltaTable::new_in_memory()
3241                .create()
3242                .with_columns(table_schema.fields().cloned())
3243                .await?;
3244            let batch = get_record_batch(None, false);
3245            let writer =
3246                WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![batch]);
3247
3248            let actions = writer.check_preconditions().await?;
3249            assert_eq!(
3250                actions.len(),
3251                2,
3252                "Expecting a Protocol and a Metadata action in {actions:?}"
3253            );
3254
3255            Ok(())
3256        }
3257
3258        #[tokio::test]
3259        async fn test_no_snapshot_err_no_batches_check() -> DeltaResult<()> {
3260            let table_schema = get_delta_schema();
3261            let table = DeltaTable::new_in_memory()
3262                .create()
3263                .with_columns(table_schema.fields().cloned())
3264                .await?;
3265            let writer =
3266                WriteBuilder::new(table.log_store.clone(), None).with_input_batches(vec![]);
3267
3268            match writer.check_preconditions().await {
3269                Ok(_) => panic!("Expected check_preconditions to fail!"),
3270                Err(DeltaTableError::GenericError { .. }) => {}
3271                Err(e) => panic!("Unexpected error returned: {e:#?}"),
3272            }
3273
3274            Ok(())
3275        }
3276    }
3277
3278    #[tokio::test]
3279    async fn test_preserve_nullability_on_overwrite() -> TestResult {
3280        // Test that nullability constraints are preserved when overwriting with mode=overwrite, schema_mode=None
3281        use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
3282        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3283        use std::sync::Arc;
3284
3285        // Create initial table with non-nullable columns
3286        let initial_schema = Arc::new(ArrowSchema::new(vec![
3287            Field::new("id", DataType::Int64, false), // non-nullable
3288            Field::new("name", DataType::Utf8, true), // nullable
3289            Field::new("active", DataType::Boolean, false), // non-nullable
3290            Field::new("count", DataType::Int32, false), // non-nullable
3291        ]));
3292
3293        let initial_batch = RecordBatch::try_new(
3294            initial_schema.clone(),
3295            vec![
3296                Arc::new(Int64Array::from(vec![1, 2, 3])),
3297                Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])),
3298                Arc::new(BooleanArray::from(vec![true, false, true])),
3299                Arc::new(Int32Array::from(vec![10, 20, 30])),
3300            ],
3301        )?;
3302
3303        // Create initial table
3304        let table = DeltaTable::new_in_memory()
3305            .write(vec![initial_batch])
3306            .with_save_mode(SaveMode::Overwrite)
3307            .await?;
3308
3309        // Verify initial schema has correct nullability
3310        let schema = table.snapshot().unwrap().schema();
3311        let schema_fields: Vec<_> = schema.fields().collect();
3312        assert!(!schema_fields[0].is_nullable(), "id should be non-nullable");
3313        assert!(schema_fields[1].is_nullable(), "name should be nullable");
3314        assert!(
3315            !schema_fields[2].is_nullable(),
3316            "active should be non-nullable"
3317        );
3318        assert!(
3319            !schema_fields[3].is_nullable(),
3320            "count should be non-nullable"
3321        );
3322
3323        // Create new data with all nullable fields (simulating data from sources like Pandas)
3324        let new_schema = Arc::new(ArrowSchema::new(vec![
3325            Field::new("id", DataType::Int64, true), // nullable in new data
3326            Field::new("name", DataType::Utf8, true), // nullable in new data
3327            Field::new("active", DataType::Boolean, true), // nullable in new data
3328            Field::new("count", DataType::Int32, true), // nullable in new data
3329        ]));
3330
3331        let new_batch = RecordBatch::try_new(
3332            new_schema,
3333            vec![
3334                Arc::new(Int64Array::from(vec![Some(4), Some(5), Some(6)])),
3335                Arc::new(StringArray::from(vec![
3336                    Some("David"),
3337                    Some("Eve"),
3338                    Some("Frank"),
3339                ])),
3340                Arc::new(BooleanArray::from(vec![
3341                    Some(false),
3342                    Some(true),
3343                    Some(false),
3344                ])),
3345                Arc::new(Int32Array::from(vec![Some(40), Some(50), Some(60)])),
3346            ],
3347        )?;
3348
3349        // Overwrite with schema_mode=None (default) - should preserve nullability
3350        let table = table
3351            .write(vec![new_batch])
3352            .with_save_mode(SaveMode::Overwrite)
3353            // schema_mode is None by default
3354            .await?;
3355
3356        // Verify that nullability constraints are preserved
3357        let schema = table.snapshot().unwrap().schema();
3358        let final_fields: Vec<_> = schema.fields().collect();
3359        assert!(
3360            !final_fields[0].is_nullable(),
3361            "id should remain non-nullable after overwrite"
3362        );
3363        assert!(
3364            final_fields[1].is_nullable(),
3365            "name should remain nullable after overwrite"
3366        );
3367        assert!(
3368            !final_fields[2].is_nullable(),
3369            "active should remain non-nullable after overwrite"
3370        );
3371        assert!(
3372            !final_fields[3].is_nullable(),
3373            "count should remain non-nullable after overwrite"
3374        );
3375
3376        // Verify the data was actually overwritten by checking version increased
3377        assert_eq!(table.version(), Some(1)); // Version should be 1 after overwrite
3378
3379        Ok(())
3380    }
3381
3382    #[tokio::test]
3383    async fn test_schema_mode_none_enforces_constraints_on_overwrite() -> TestResult {
3384        // Test that schema_mode=None with mode=overwrite:
3385        // 1. Does NOT update/overwrite the schema
3386        // 2. ENFORCES existing constraints (e.g., non-nullable fields)
3387        use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
3388        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3389        use std::sync::Arc;
3390
3391        // Create initial table with strict constraints
3392        let initial_schema = Arc::new(ArrowSchema::new(vec![
3393            Field::new("id", DataType::Int64, false), // NON-NULLABLE
3394            Field::new("name", DataType::Utf8, true), // nullable
3395            Field::new("active", DataType::Boolean, false), // NON-NULLABLE
3396            Field::new("count", DataType::Int32, false), // NON-NULLABLE
3397        ]));
3398
3399        let initial_batch = RecordBatch::try_new(
3400            initial_schema.clone(),
3401            vec![
3402                Arc::new(Int64Array::from(vec![1, 2, 3])),
3403                Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob"), None])),
3404                Arc::new(BooleanArray::from(vec![true, false, true])),
3405                Arc::new(Int32Array::from(vec![10, 20, 30])),
3406            ],
3407        )?;
3408
3409        // Create initial table
3410        let table = DeltaTable::new_in_memory()
3411            .write(vec![initial_batch])
3412            .with_save_mode(SaveMode::Overwrite)
3413            .await?;
3414
3415        // Capture initial schema for comparison
3416        let initial_schema_fields: Vec<_> = table
3417            .snapshot()
3418            .unwrap()
3419            .schema()
3420            .fields()
3421            .cloned()
3422            .collect();
3423
3424        // Test 1: Verify schema is NOT changed even when incoming data has different nullability
3425        let new_schema_all_nullable = Arc::new(ArrowSchema::new(vec![
3426            Field::new("id", DataType::Int64, true), // nullable in new data
3427            Field::new("name", DataType::Utf8, true), // nullable in new data
3428            Field::new("active", DataType::Boolean, true), // nullable in new data
3429            Field::new("count", DataType::Int32, true), // nullable in new data
3430        ]));
3431
3432        let valid_batch = RecordBatch::try_new(
3433            new_schema_all_nullable.clone(),
3434            vec![
3435                Arc::new(Int64Array::from(vec![Some(4), Some(5), Some(6)])),
3436                Arc::new(StringArray::from(vec![
3437                    Some("David"),
3438                    Some("Eve"),
3439                    Some("Frank"),
3440                ])),
3441                Arc::new(BooleanArray::from(vec![
3442                    Some(false),
3443                    Some(true),
3444                    Some(false),
3445                ])),
3446                Arc::new(Int32Array::from(vec![Some(40), Some(50), Some(60)])),
3447            ],
3448        )?;
3449
3450        // This should succeed - data is valid even though schema differs
3451        let table = table
3452            .write(vec![valid_batch])
3453            .with_save_mode(SaveMode::Overwrite)
3454            // schema_mode is None by default
3455            .await?;
3456
3457        // Verify schema was NOT updated
3458        let schema = table.snapshot().unwrap().schema();
3459        let after_overwrite_fields: Vec<_> = schema.fields().collect();
3460
3461        // Schema should be EXACTLY the same as before
3462        assert_eq!(
3463            after_overwrite_fields.len(),
3464            initial_schema_fields.len(),
3465            "Schema should have same number of fields"
3466        );
3467
3468        for (i, field) in after_overwrite_fields.iter().enumerate() {
3469            assert_eq!(
3470                field.is_nullable(),
3471                initial_schema_fields[i].is_nullable(),
3472                "Field '{}' nullability should not change",
3473                field.name()
3474            );
3475        }
3476
3477        // Specifically verify non-nullable fields are still non-nullable
3478        assert!(
3479            !after_overwrite_fields[0].is_nullable(),
3480            "id must remain non-nullable"
3481        );
3482        assert!(
3483            !after_overwrite_fields[2].is_nullable(),
3484            "active must remain non-nullable"
3485        );
3486        assert!(
3487            !after_overwrite_fields[3].is_nullable(),
3488            "count must remain non-nullable"
3489        );
3490
3491        // Test 2: Verify constraints are ENFORCED - attempt to write NULL to non-nullable field
3492        // This should FAIL because we're trying to violate the non-nullable constraint
3493
3494        // Create data with NULL in a non-nullable field
3495        let invalid_batch = RecordBatch::try_new(
3496            new_schema_all_nullable.clone(),
3497            vec![
3498                Arc::new(Int64Array::from(vec![Some(7), None, Some(9)])), // NULL in non-nullable id!
3499                Arc::new(StringArray::from(vec![
3500                    Some("George"),
3501                    Some("Helen"),
3502                    Some("Ivan"),
3503                ])),
3504                Arc::new(BooleanArray::from(vec![
3505                    Some(true),
3506                    Some(false),
3507                    Some(true),
3508                ])),
3509                Arc::new(Int32Array::from(vec![Some(70), Some(80), Some(90)])),
3510            ],
3511        )?;
3512
3513        // This should fail because id is non-nullable in the table schema
3514        let result = table
3515            .clone()
3516            .write(vec![invalid_batch])
3517            .with_save_mode(SaveMode::Overwrite)
3518            .await;
3519
3520        // The write should fail due to constraint violation
3521        assert!(
3522            result.is_err(),
3523            "Writing NULL to non-nullable field should fail"
3524        );
3525
3526        // Test 3: Also test with NULL in another non-nullable field (active)
3527        let invalid_batch_2 = RecordBatch::try_new(
3528            new_schema_all_nullable.clone(),
3529            vec![
3530                Arc::new(Int64Array::from(vec![Some(10), Some(11), Some(12)])),
3531                Arc::new(StringArray::from(vec![
3532                    Some("Jane"),
3533                    Some("Karl"),
3534                    Some("Lisa"),
3535                ])),
3536                Arc::new(BooleanArray::from(vec![Some(true), None, Some(false)])), // NULL in non-nullable active!
3537                Arc::new(Int32Array::from(vec![Some(100), Some(110), Some(120)])),
3538            ],
3539        )?;
3540
3541        let result2 = table
3542            .clone()
3543            .write(vec![invalid_batch_2])
3544            .with_save_mode(SaveMode::Overwrite)
3545            .await;
3546
3547        // This should also fail
3548        assert!(
3549            result2.is_err(),
3550            "Writing NULL to non-nullable 'active' field should fail"
3551        );
3552
3553        // Verify the table data and schema remain unchanged after failed writes
3554        let schema = table.snapshot().unwrap().schema();
3555        let final_fields: Vec<_> = schema.fields().collect();
3556
3557        // Schema should still be the original schema
3558        assert!(
3559            !final_fields[0].is_nullable(),
3560            "id still non-nullable after failed writes"
3561        );
3562        assert!(
3563            !final_fields[2].is_nullable(),
3564            "active still non-nullable after failed writes"
3565        );
3566        assert!(
3567            !final_fields[3].is_nullable(),
3568            "count still non-nullable after failed writes"
3569        );
3570
3571        Ok(())
3572    }
3573
3574    #[tokio::test]
3575    async fn test_schema_preserved_with_replace_where() -> TestResult {
3576        // Test that schema is preserved when using overwrite with predicate (replaceWhere)
3577        use arrow_array::{BooleanArray, Int32Array, Int64Array, RecordBatch, StringArray};
3578        use arrow_schema::{DataType, Field, Schema as ArrowSchema};
3579        use std::sync::Arc;
3580
3581        // Create initial table with mixed nullability
3582        let initial_schema = Arc::new(ArrowSchema::new(vec![
3583            Field::new("id", DataType::Int64, false), // non-nullable
3584            Field::new("name", DataType::Utf8, true), // nullable
3585            Field::new("active", DataType::Boolean, false), // non-nullable
3586            Field::new("count", DataType::Int32, false), // non-nullable
3587        ]));
3588
3589        let initial_batch = RecordBatch::try_new(
3590            initial_schema.clone(),
3591            vec![
3592                Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5])),
3593                Arc::new(StringArray::from(vec![
3594                    Some("Alice"),
3595                    Some("Bob"),
3596                    None,
3597                    Some("David"),
3598                    Some("Eve"),
3599                ])),
3600                Arc::new(BooleanArray::from(vec![true, false, true, false, true])),
3601                Arc::new(Int32Array::from(vec![10, 20, 30, 40, 50])),
3602            ],
3603        )?;
3604
3605        let table = DeltaTable::new_in_memory()
3606            .write(vec![initial_batch])
3607            .with_save_mode(SaveMode::Overwrite)
3608            .await?;
3609
3610        // Capture initial schema
3611        let initial_fields: Vec<_> = table
3612            .snapshot()
3613            .unwrap()
3614            .schema()
3615            .fields()
3616            .cloned()
3617            .collect();
3618
3619        // Create new data with all nullable fields (typical from Pandas)
3620        let new_schema = Arc::new(ArrowSchema::new(vec![
3621            Field::new("id", DataType::Int64, true), // nullable in new data
3622            Field::new("name", DataType::Utf8, true), // nullable
3623            Field::new("active", DataType::Boolean, true), // nullable
3624            Field::new("count", DataType::Int32, true), // nullable
3625        ]));
3626
3627        let replacement_batch = RecordBatch::try_new(
3628            new_schema.clone(),
3629            vec![
3630                Arc::new(Int64Array::from(vec![Some(2), Some(4)])), // Replace ids 2 and 4
3631                Arc::new(StringArray::from(vec![Some("Bob2"), Some("David2")])),
3632                Arc::new(BooleanArray::from(vec![Some(true), Some(true)])),
3633                Arc::new(Int32Array::from(vec![Some(200), Some(400)])),
3634            ],
3635        )?;
3636
3637        // Use replaceWhere to selectively overwrite
3638        let table = table
3639            .write(vec![replacement_batch])
3640            .with_save_mode(SaveMode::Overwrite)
3641            .with_replace_where("id = 2 OR id = 4")
3642            .await?;
3643
3644        // Verify schema is preserved
3645        let schema = table.snapshot().unwrap().schema();
3646        let final_fields: Vec<_> = schema.fields().collect();
3647
3648        for (i, field) in final_fields.iter().enumerate() {
3649            assert_eq!(
3650                field.is_nullable(),
3651                initial_fields[i].is_nullable(),
3652                "Field '{}' nullability should be preserved with replaceWhere",
3653                field.name()
3654            );
3655        }
3656
3657        // Now test that constraints are still enforced with replaceWhere
3658        let invalid_batch = RecordBatch::try_new(
3659            new_schema,
3660            vec![
3661                Arc::new(Int64Array::from(vec![None, Some(3)])), // NULL in non-nullable id!
3662                Arc::new(StringArray::from(vec![Some("Invalid"), Some("Valid")])),
3663                Arc::new(BooleanArray::from(vec![Some(false), Some(false)])),
3664                Arc::new(Int32Array::from(vec![Some(999), Some(333)])),
3665            ],
3666        )?;
3667
3668        let result = table
3669            .write(vec![invalid_batch])
3670            .with_save_mode(SaveMode::Overwrite)
3671            .with_replace_where("id = 1 OR id = 3")
3672            .await;
3673
3674        assert!(
3675            result.is_err(),
3676            "replaceWhere should still enforce non-nullable constraints"
3677        );
3678
3679        Ok(())
3680    }
3681
3682    #[tokio::test]
3683    async fn test_write_date64_normalizes_to_date32() {
3684        use arrow_array::Date64Array;
3685
3686        let schema = Arc::new(ArrowSchema::new(vec![
3687            Field::new("id", DataType::Int32, false),
3688            Field::new("sales_date", DataType::Date64, true),
3689        ]));
3690        let millis = 1760918400000i64; // 2025 10 20 in ms since epoch
3691        let batch = RecordBatch::try_new(
3692            schema,
3693            vec![
3694                Arc::new(Int32Array::from(vec![1])),
3695                Arc::new(Date64Array::from(vec![millis])),
3696            ],
3697        )
3698        .unwrap();
3699
3700        let table = DeltaTable::new_in_memory()
3701            .write(vec![batch])
3702            .await
3703            .unwrap();
3704
3705        let table_schema = table.snapshot().unwrap().schema();
3706        let date_field = table_schema.field("sales_date").unwrap();
3707        assert_eq!(date_field.data_type(), &crate::kernel::DataType::DATE);
3708
3709        let batches = get_data(&table).await;
3710        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
3711        assert_eq!(total_rows, 1);
3712        assert_eq!(
3713            batches[0]
3714                .schema()
3715                .field_with_name("sales_date")
3716                .unwrap()
3717                .data_type(),
3718            &DataType::Date32,
3719        );
3720    }
3721
3722    #[cfg(not(feature = "nanosecond-timestamps"))]
3723    #[tokio::test]
3724    async fn test_write_timestamp_ns_normalizes_to_us() {
3725        test_write_timestamp_ns_maybe_normalization(TimeUnit::Microsecond).await;
3726    }
3727
3728    #[cfg(feature = "nanosecond-timestamps")]
3729    #[tokio::test]
3730    async fn test_write_timestamp_ns_stays_ns() {
3731        test_write_timestamp_ns_maybe_normalization(TimeUnit::Nanosecond).await;
3732    }
3733
3734    async fn test_write_timestamp_ns_maybe_normalization(unit: TimeUnit) {
3735        use arrow_array::TimestampNanosecondArray;
3736
3737        let schema = Arc::new(ArrowSchema::new(vec![
3738            Field::new("id", DataType::Int32, false),
3739            Field::new(
3740                "ts",
3741                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
3742                true,
3743            ),
3744        ]));
3745        let nanos = 1_760_961_600_123_456_789_i64;
3746        let batch = RecordBatch::try_new(
3747            schema,
3748            vec![
3749                Arc::new(Int32Array::from(vec![1])),
3750                Arc::new(TimestampNanosecondArray::from(vec![nanos]).with_timezone("UTC")),
3751            ],
3752        )
3753        .unwrap();
3754
3755        let table = DeltaTable::new_in_memory()
3756            .write(vec![batch])
3757            .await
3758            .unwrap();
3759
3760        let batches = get_data(&table).await;
3761        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 1);
3762        let schema = batches[0].schema();
3763        let result_field = schema.field_with_name("ts").unwrap();
3764        assert_eq!(
3765            result_field.data_type(),
3766            &DataType::Timestamp(unit, Some("UTC".into())),
3767        );
3768    }
3769
3770    #[tokio::test]
3771    async fn test_write_large_utf8_normalizes_to_utf8() {
3772        use arrow_array::LargeStringArray;
3773
3774        let schema = Arc::new(ArrowSchema::new(vec![
3775            Field::new("id", DataType::Int32, false),
3776            Field::new("name", DataType::LargeUtf8, true),
3777        ]));
3778        let batch = RecordBatch::try_new(
3779            schema,
3780            vec![
3781                Arc::new(Int32Array::from(vec![1, 2])),
3782                Arc::new(LargeStringArray::from(vec!["hello", "world"])),
3783            ],
3784        )
3785        .unwrap();
3786
3787        let table = DeltaTable::new_in_memory()
3788            .write(vec![batch])
3789            .await
3790            .unwrap();
3791
3792        let table_schema = table.snapshot().unwrap().schema();
3793        assert_eq!(
3794            table_schema.field("name").unwrap().data_type(),
3795            &crate::kernel::DataType::STRING,
3796        );
3797
3798        let batches = get_data(&table).await;
3799        assert_eq!(batches.iter().map(|b| b.num_rows()).sum::<usize>(), 2);
3800    }
3801
3802    #[tokio::test]
3803    async fn test_append_date64_to_existing_date32_table() {
3804        use arrow_array::{Date32Array, Date64Array};
3805
3806        let schema32 = Arc::new(ArrowSchema::new(vec![
3807            Field::new("id", DataType::Int32, false),
3808            Field::new("d", DataType::Date32, true),
3809        ]));
3810        let batch32 = RecordBatch::try_new(
3811            schema32,
3812            vec![
3813                Arc::new(Int32Array::from(vec![1])),
3814                Arc::new(Date32Array::from(vec![19650])),
3815            ],
3816        )
3817        .unwrap();
3818
3819        let table = DeltaTable::new_in_memory()
3820            .write(vec![batch32])
3821            .await
3822            .unwrap();
3823
3824        let schema64 = Arc::new(ArrowSchema::new(vec![
3825            Field::new("id", DataType::Int32, false),
3826            Field::new("d", DataType::Date64, true),
3827        ]));
3828        let batch64 = RecordBatch::try_new(
3829            schema64,
3830            vec![
3831                Arc::new(Int32Array::from(vec![2])),
3832                Arc::new(Date64Array::from(vec![1760918400000i64])),
3833            ],
3834        )
3835        .unwrap();
3836
3837        let table = table
3838            .write(vec![batch64])
3839            .with_save_mode(SaveMode::Append)
3840            .await
3841            .unwrap();
3842
3843        let batches = get_data(&table).await;
3844        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
3845        assert_eq!(total_rows, 2);
3846    }
3847}