Skip to main content

deltalake_core/writer/
mod.rs

1//! Abstractions and implementations for writing data to delta tables
2
3use arrow::{datatypes::SchemaRef, error::ArrowError};
4use async_trait::async_trait;
5use object_store::Error as ObjectStoreError;
6use parquet::errors::ParquetError;
7use serde_json::Value;
8
9use crate::DeltaTable;
10use crate::errors::{DeltaTableError, unsupported_column_mapping_write};
11use crate::kernel::transaction::{CommitBuilder, CommitProperties};
12use crate::kernel::{Action, Add, Version};
13use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode};
14
15pub use json::JsonWriter;
16pub use record_batch::RecordBatchWriter;
17
18pub mod json;
19pub mod record_batch;
20pub(crate) mod stats;
21pub mod utils;
22
23#[cfg(test)]
24pub mod test_utils;
25
26pub(crate) fn ensure_legacy_writer_supports_table(
27    table: &DeltaTable,
28    operation: &str,
29) -> Result<(), DeltaTableError> {
30    if table
31        .snapshot()?
32        .table_config()
33        .column_mapping_mode
34        .is_some_and(|mode| mode != delta_kernel::table_features::ColumnMappingMode::None)
35    {
36        return Err(unsupported_column_mapping_write(operation));
37    }
38
39    Ok(())
40}
41
42/// Enum representing an error when calling [`DeltaWriter`].
43#[derive(thiserror::Error, Debug)]
44pub(crate) enum DeltaWriterError {
45    /// Partition column is missing in a record written to delta.
46    #[error("Missing partition column: {0}")]
47    MissingPartitionColumn(String),
48
49    /// The Arrow RecordBatch schema does not match the expected schema.
50    #[error(
51        "Arrow RecordBatch schema does not match: RecordBatch schema: {record_batch_schema}, {expected_schema}"
52    )]
53    SchemaMismatch {
54        /// The record batch schema.
55        record_batch_schema: SchemaRef,
56        /// The schema of the target delta table.
57        expected_schema: SchemaRef,
58    },
59
60    /// An Arrow RecordBatch could not be created from the JSON buffer.
61    #[error("Arrow RecordBatch created from JSON buffer is a None value")]
62    EmptyRecordBatch,
63
64    /// A record was written that was not a JSON object.
65    #[error("Record {0} is not a JSON object")]
66    InvalidRecord(String),
67
68    /// Indicates that a partial write was performed and error records were discarded.
69    #[error("Failed to write some values to parquet. Sample error: {sample_error}.")]
70    PartialParquetWrite {
71        /// Vec of tuples where the first element of each tuple is the skipped value and the second element is the [`ParquetError`] associated with it.
72        skipped_values: Vec<(Value, ParquetError)>,
73        /// A sample [`ParquetError`] representing the overall partial write.
74        sample_error: ParquetError,
75    },
76
77    /// Serialization of delta log statistics failed.
78    #[error("Failed to write statistics value {debug_value} with logical type {logical_type:?}")]
79    StatsParsingFailed {
80        debug_value: String,
81        logical_type: Option<parquet::basic::LogicalType>,
82    },
83
84    /// JSON serialization failed
85    #[error("Failed to serialize data to JSON: {source}")]
86    JSONSerializationFailed {
87        #[from]
88        source: serde_json::Error,
89    },
90
91    /// underlying object store returned an error.
92    #[error("ObjectStore interaction failed: {source}")]
93    ObjectStore {
94        /// The wrapped [`ObjectStoreError`]
95        #[from]
96        source: ObjectStoreError,
97    },
98
99    /// Arrow returned an error.
100    #[error("Arrow interaction failed: {source}")]
101    Arrow {
102        /// The wrapped [`ArrowError`]
103        #[from]
104        source: ArrowError,
105    },
106
107    /// Parquet write failed.
108    #[error("Parquet write failed: {source}")]
109    Parquet {
110        /// The wrapped [`ParquetError`]
111        #[from]
112        source: ParquetError,
113    },
114
115    /// Error returned from std::io
116    #[error("std::io::Error: {source}")]
117    Io {
118        /// The wrapped [`std::io::Error`]
119        #[from]
120        source: std::io::Error,
121    },
122
123    /// Error returned
124    #[error(transparent)]
125    DeltaTable(#[from] DeltaTableError),
126}
127
128impl From<DeltaWriterError> for DeltaTableError {
129    fn from(err: DeltaWriterError) -> Self {
130        match err {
131            DeltaWriterError::Arrow { source } => DeltaTableError::Arrow { source },
132            DeltaWriterError::Io { source } => DeltaTableError::Io { source },
133            DeltaWriterError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
134            DeltaWriterError::Parquet { source } => DeltaTableError::Parquet { source },
135            DeltaWriterError::DeltaTable(e) => e,
136            DeltaWriterError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
137                msg: err.to_string(),
138            },
139            _ => DeltaTableError::Generic(err.to_string()),
140        }
141    }
142}
143
/// Write mode for the [`DeltaWriter`]
///
/// Selects how a writer reacts when the schema of incoming data differs from the table schema.
// `Eq` is derived alongside `PartialEq`: the enum has no fields, so equality is total.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum WriteMode {
    /// Default write mode which will return an error if schemas do not match correctly
    Default,
    /// Merge the schema of the table with the newly written data
    ///
    /// [Read more here](https://delta.io/blog/2023-02-08-delta-lake-schema-evolution/)
    MergeSchema,
}
154
155#[async_trait]
156/// Trait for writing data to Delta tables
157pub trait DeltaWriter<T> {
158    /// Write a chunk of values into the internal write buffers with the default write mode
159    async fn write(&mut self, values: T) -> Result<(), DeltaTableError>;
160
161    /// Wreite a chunk of values into the internal write buffers with the specified [WriteMode]
162    async fn write_with_mode(&mut self, values: T, mode: WriteMode) -> Result<(), DeltaTableError>;
163
164    /// Flush the internal write buffers to files in the delta table folder structure.
165    /// The corresponding delta [`Add`] actions are returned and should be committed via a transaction.
166    async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError>;
167
168    /// Flush the internal write buffers to files in the delta table folder structure.
169    /// and commit the changes to the Delta log, creating a new table version.
170    async fn flush_and_commit(
171        &mut self,
172        table: &mut DeltaTable,
173    ) -> Result<Version, DeltaTableError> {
174        let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect();
175        flush_and_commit(adds, table, None).await
176    }
177}
178
179/// Method for flushing to be used by writers
180pub(crate) async fn flush_and_commit(
181    adds: Vec<Action>,
182    table: &mut DeltaTable,
183    commit_properties: Option<CommitProperties>,
184) -> Result<Version, DeltaTableError> {
185    let snapshot = table.snapshot()?;
186    let partition_cols: Vec<String> = snapshot.metadata().partition_columns().into();
187    let partition_by = if !partition_cols.is_empty() {
188        Some(partition_cols)
189    } else {
190        None
191    };
192    let operation = DeltaOperation::Write {
193        mode: SaveMode::Append,
194        partition_by,
195        predicate: None,
196    };
197
198    let finalized = CommitBuilder::from(commit_properties.unwrap_or_default())
199        .with_actions(adds)
200        .build(Some(snapshot), table.log_store.clone(), operation)
201        .await?;
202    table.state = Some(finalized.snapshot());
203    Ok(finalized.version())
204}
205
#[cfg(test)]
mod tests {
    use delta_kernel::schema::DataType;

    use super::*;
    use crate::DeltaResult;
    use pretty_assertions::assert_ne;

    /// Commits twice through [flush_and_commit] and checks that the two versions differ.
    ///
    /// This test doesn't have a great way to _validate_ that logs are not cleaned up as part of
    /// the second commit. Instead, prints were temporarily added to the [PostCommit] logic to
    /// validate that the property was getting pulled through correctly in the non-default case.
    ///
    /// The _ideal_ testing scenario would probably be to propagate metrics out of
    /// [flush_and_commit], but that's an API change that isn't desirable at the moment.
    #[tokio::test]
    async fn test_flush_and_commit() -> DeltaResult<()> {
        let id_type = DataType::Primitive(delta_kernel::schema::PrimitiveType::Long);
        let mut table = DeltaTable::new_in_memory()
            .create()
            .with_table_name("my_table")
            .with_column("id", id_type, true, None)
            .with_configuration_property(
                crate::TableProperty::LogRetentionDuration,
                Some("interval 0 days"),
            )
            .await?;

        // First commit: no explicit commit properties.
        let first_version =
            flush_and_commit(vec![Action::Add(Add::default())], &mut table, None).await?;

        // Second commit: explicitly disable cleanup of expired logs.
        let no_cleanup = CommitProperties::default().with_cleanup_expired_logs(Some(false));
        let second_version = flush_and_commit(
            vec![Action::Add(Add::default())],
            &mut table,
            Some(no_cleanup),
        )
        .await?;

        assert_ne!(
            second_version, first_version,
            "flush_and_commit did not create a version apparently?"
        );
        Ok(())
    }
}