deltalake_core/writer/
mod.rs1use arrow::{datatypes::SchemaRef, error::ArrowError};
4use async_trait::async_trait;
5use object_store::Error as ObjectStoreError;
6use parquet::errors::ParquetError;
7use serde_json::Value;
8
9use crate::DeltaTable;
10use crate::errors::{DeltaTableError, unsupported_column_mapping_write};
11use crate::kernel::transaction::{CommitBuilder, CommitProperties};
12use crate::kernel::{Action, Add, Version};
13use crate::protocol::{ColumnCountStat, DeltaOperation, SaveMode};
14
15pub use json::JsonWriter;
16pub use record_batch::RecordBatchWriter;
17
18pub mod json;
19pub mod record_batch;
20pub(crate) mod stats;
21pub mod utils;
22
23#[cfg(test)]
24pub mod test_utils;
25
26pub(crate) fn ensure_legacy_writer_supports_table(
27 table: &DeltaTable,
28 operation: &str,
29) -> Result<(), DeltaTableError> {
30 if table
31 .snapshot()?
32 .table_config()
33 .column_mapping_mode
34 .is_some_and(|mode| mode != delta_kernel::table_features::ColumnMappingMode::None)
35 {
36 return Err(unsupported_column_mapping_write(operation));
37 }
38
39 Ok(())
40}
41
42#[derive(thiserror::Error, Debug)]
44pub(crate) enum DeltaWriterError {
45 #[error("Missing partition column: {0}")]
47 MissingPartitionColumn(String),
48
49 #[error(
51 "Arrow RecordBatch schema does not match: RecordBatch schema: {record_batch_schema}, {expected_schema}"
52 )]
53 SchemaMismatch {
54 record_batch_schema: SchemaRef,
56 expected_schema: SchemaRef,
58 },
59
60 #[error("Arrow RecordBatch created from JSON buffer is a None value")]
62 EmptyRecordBatch,
63
64 #[error("Record {0} is not a JSON object")]
66 InvalidRecord(String),
67
68 #[error("Failed to write some values to parquet. Sample error: {sample_error}.")]
70 PartialParquetWrite {
71 skipped_values: Vec<(Value, ParquetError)>,
73 sample_error: ParquetError,
75 },
76
77 #[error("Failed to write statistics value {debug_value} with logical type {logical_type:?}")]
79 StatsParsingFailed {
80 debug_value: String,
81 logical_type: Option<parquet::basic::LogicalType>,
82 },
83
84 #[error("Failed to serialize data to JSON: {source}")]
86 JSONSerializationFailed {
87 #[from]
88 source: serde_json::Error,
89 },
90
91 #[error("ObjectStore interaction failed: {source}")]
93 ObjectStore {
94 #[from]
96 source: ObjectStoreError,
97 },
98
99 #[error("Arrow interaction failed: {source}")]
101 Arrow {
102 #[from]
104 source: ArrowError,
105 },
106
107 #[error("Parquet write failed: {source}")]
109 Parquet {
110 #[from]
112 source: ParquetError,
113 },
114
115 #[error("std::io::Error: {source}")]
117 Io {
118 #[from]
120 source: std::io::Error,
121 },
122
123 #[error(transparent)]
125 DeltaTable(#[from] DeltaTableError),
126}
127
128impl From<DeltaWriterError> for DeltaTableError {
129 fn from(err: DeltaWriterError) -> Self {
130 match err {
131 DeltaWriterError::Arrow { source } => DeltaTableError::Arrow { source },
132 DeltaWriterError::Io { source } => DeltaTableError::Io { source },
133 DeltaWriterError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
134 DeltaWriterError::Parquet { source } => DeltaTableError::Parquet { source },
135 DeltaWriterError::DeltaTable(e) => e,
136 DeltaWriterError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
137 msg: err.to_string(),
138 },
139 _ => DeltaTableError::Generic(err.to_string()),
140 }
141 }
142}
143
/// Mode selector passed to [`DeltaWriter::write_with_mode`].
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` so the mode can be
/// used as a set member or map key; the enum has no fields, so equality is
/// total.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum WriteMode {
    /// The default mode.
    /// NOTE(review): exact semantics are defined by the `DeltaWriter`
    /// implementors, not by this enum.
    Default,
    /// Mode named for schema merging.
    /// NOTE(review): presumably lets the written data's schema be merged with
    /// the table schema — confirm against the writer implementations.
    MergeSchema,
}
154
155#[async_trait]
156pub trait DeltaWriter<T> {
158 async fn write(&mut self, values: T) -> Result<(), DeltaTableError>;
160
161 async fn write_with_mode(&mut self, values: T, mode: WriteMode) -> Result<(), DeltaTableError>;
163
164 async fn flush(&mut self) -> Result<Vec<Add>, DeltaTableError>;
167
168 async fn flush_and_commit(
171 &mut self,
172 table: &mut DeltaTable,
173 ) -> Result<Version, DeltaTableError> {
174 let adds: Vec<_> = self.flush().await?.drain(..).map(Action::Add).collect();
175 flush_and_commit(adds, table, None).await
176 }
177}
178
179pub(crate) async fn flush_and_commit(
181 adds: Vec<Action>,
182 table: &mut DeltaTable,
183 commit_properties: Option<CommitProperties>,
184) -> Result<Version, DeltaTableError> {
185 let snapshot = table.snapshot()?;
186 let partition_cols: Vec<String> = snapshot.metadata().partition_columns().into();
187 let partition_by = if !partition_cols.is_empty() {
188 Some(partition_cols)
189 } else {
190 None
191 };
192 let operation = DeltaOperation::Write {
193 mode: SaveMode::Append,
194 partition_by,
195 predicate: None,
196 };
197
198 let finalized = CommitBuilder::from(commit_properties.unwrap_or_default())
199 .with_actions(adds)
200 .build(Some(snapshot), table.log_store.clone(), operation)
201 .await?;
202 table.state = Some(finalized.snapshot());
203 Ok(finalized.version())
204}
205
#[cfg(test)]
mod tests {
    use delta_kernel::schema::{DataType, PrimitiveType};
    use pretty_assertions::assert_ne;

    use super::*;
    use crate::{DeltaResult, TableProperty};

    /// Commits twice through `flush_and_commit` — once without commit
    /// properties and once with explicit `CommitProperties` — and checks that
    /// the two calls report different versions.
    #[tokio::test]
    async fn test_flush_and_commit() -> DeltaResult<()> {
        // In-memory table with a single nullable long column and a log
        // retention of "interval 0 days".
        let mut table = DeltaTable::new_in_memory()
            .create()
            .with_table_name("my_table")
            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
            .with_configuration_property(
                TableProperty::LogRetentionDuration,
                Some("interval 0 days"),
            )
            .await?;

        // Each commit carries one default `Add` action.
        let single_add = || vec![Action::Add(Add::default())];

        let first_version = flush_and_commit(single_add(), &mut table, None).await?;

        let properties = CommitProperties::default().with_cleanup_expired_logs(Some(false));
        let second_version =
            flush_and_commit(single_add(), &mut table, Some(properties)).await?;

        assert_ne!(
            second_version, first_version,
            "flush_and_commit did not create a version apparently?"
        );
        Ok(())
    }
}