Skip to main content

deltalake_core/operations/write/
writer.rs

1//! Abstractions and implementations for writing data to delta tables
2
3use std::collections::HashMap;
4use std::num::NonZeroU64;
5use std::sync::OnceLock;
6
7use arrow_array::RecordBatch;
8use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
9use delta_kernel::expressions::Scalar;
10use delta_kernel::table_properties::DataSkippingNumIndexedCols;
11use futures::{StreamExt, TryStreamExt};
12use indexmap::IndexMap;
13use object_store::buffered::BufWriter;
14use object_store::path::Path;
15use parquet::arrow::AsyncArrowWriter;
16use parquet::arrow::async_writer::ParquetObjectWriter;
17use parquet::basic::Compression;
18use parquet::file::properties::WriterProperties;
19use tokio::task::JoinSet;
20use tracing::*;
21
22use crate::errors::{DeltaResult, DeltaTableError};
23use crate::kernel::{Add, PartitionsExt};
24use crate::logstore::ObjectStoreRef;
25use crate::parquet_utils::default_writer_properties;
26use crate::writer::record_batch::{PartitionResult, divide_by_partition_values};
27use crate::writer::stats::create_add;
28use crate::writer::utils::{
29    arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
30};
31
32use parquet::file::metadata::ParquetMetaData;
33
/// Number of rows per slice handed to the parquet writer when no batch size is configured.
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
/// Default multipart upload part size in bytes (5MB).
const DEFAULT_UPLOAD_PART_SIZE: usize = 5 * 1024 * 1024;
/// Default concurrency level for writing to object store.
const DEFAULT_MAX_CONCURRENCY_TASKS: usize = 10;
37
38fn upload_part_size() -> usize {
39    static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
40    *UPLOAD_SIZE.get_or_init(|| {
41        std::env::var("DELTARS_UPLOAD_PART_SIZE")
42            .ok()
43            .and_then(|s| s.parse::<usize>().ok())
44            .map(|size| {
45                if size < DEFAULT_UPLOAD_PART_SIZE {
46                    // Minimum part size in GCS and S3
47                    debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
48                    DEFAULT_UPLOAD_PART_SIZE
49                } else if size > 1024 * 1024 * 1024 * 5 {
50                    // Maximum part size in GCS and S3
51                    debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
52                    1024 * 1024 * 1024 * 5
53                } else {
54                    size
55                }
56            })
57            .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
58    })
59}
60
61fn get_max_concurrency_tasks() -> usize {
62    static MAX_CONCURRENCY_TASKS: OnceLock<usize> = OnceLock::new();
63    *MAX_CONCURRENCY_TASKS.get_or_init(|| {
64        std::env::var("DELTARS_MAX_CONCURRENCY_TASKS")
65            .ok()
66            .and_then(|s| s.parse::<usize>().ok())
67            .unwrap_or(DEFAULT_MAX_CONCURRENCY_TASKS)
68    })
69}
70
71/// Upload a parquet file to object store and return metadata for creating an Add action
72#[instrument(skip(arrow_writer), fields(rows = 0, size = 0))]
73async fn upload_parquet_file(
74    mut arrow_writer: AsyncArrowWriter<ParquetObjectWriter>,
75    path: Path,
76) -> DeltaResult<(Path, usize, ParquetMetaData)> {
77    let metadata = arrow_writer.finish().await?;
78    let file_size = arrow_writer.bytes_written();
79    Span::current().record("rows", metadata.file_metadata().num_rows());
80    Span::current().record("size", file_size);
81    debug!("multipart upload completed successfully");
82
83    Ok((path, file_size, metadata))
84}
85
86fn sort_completed_writes_by_path<T>(results: &mut [(Path, usize, T)]) {
87    results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
88}
89
90#[derive(thiserror::Error, Debug)]
91enum WriteError {
92    #[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")]
93    SchemaMismatch {
94        schema: ArrowSchemaRef,
95        expected_schema: ArrowSchemaRef,
96    },
97
98    #[error("Error creating add action: {source}")]
99    CreateAdd {
100        source: Box<dyn std::error::Error + Send + Sync + 'static>,
101    },
102
103    #[error("Error handling Arrow data: {source}")]
104    Arrow {
105        #[from]
106        source: ArrowError,
107    },
108
109    #[error("Error partitioning record batch: {0}")]
110    Partitioning(String),
111}
112
113impl From<WriteError> for DeltaTableError {
114    fn from(err: WriteError) -> Self {
115        match err {
116            WriteError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
117                msg: err.to_string(),
118            },
119            WriteError::Arrow { source } => DeltaTableError::Arrow { source },
120            _ => DeltaTableError::GenericError {
121                source: Box::new(err),
122            },
123        }
124    }
125}
126
127/// Configuration to write data into Delta tables
128#[derive(Debug, Clone)]
129pub struct WriterConfig {
130    /// Schema of the delta table
131    table_schema: ArrowSchemaRef,
132    /// Column names for columns the table is partitioned by
133    partition_columns: Vec<String>,
134    /// Properties passed to underlying parquet writer
135    writer_properties: WriterProperties,
136    /// Size above which we will write a buffered parquet file to disk.
137    /// If None, the writer will not create a new file until the writer is closed.
138    target_file_size: Option<NonZeroU64>,
139    /// Row chunks passed to parquet writer. This and the internal parquet writer settings
140    /// determine how fine granular we can track / control the size of resulting files.
141    write_batch_size: usize,
142    /// Num index cols to collect stats for
143    num_indexed_cols: DataSkippingNumIndexedCols,
144    /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
145    stats_columns: Option<Vec<String>>,
146}
147
148impl WriterConfig {
149    /// Create a new instance of [WriterConfig].
150    pub fn new(
151        table_schema: ArrowSchemaRef,
152        partition_columns: Vec<String>,
153        writer_properties: Option<WriterProperties>,
154        target_file_size: Option<NonZeroU64>,
155        write_batch_size: Option<usize>,
156        num_indexed_cols: DataSkippingNumIndexedCols,
157        stats_columns: Option<Vec<String>>,
158    ) -> Self {
159        let writer_properties =
160            writer_properties.unwrap_or_else(|| default_writer_properties(Compression::SNAPPY));
161        let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
162
163        Self {
164            table_schema,
165            partition_columns,
166            writer_properties,
167            target_file_size,
168            write_batch_size,
169            num_indexed_cols,
170            stats_columns,
171        }
172    }
173
174    /// Schema of files written to disk
175    pub fn file_schema(&self) -> ArrowSchemaRef {
176        arrow_schema_without_partitions(&self.table_schema, &self.partition_columns)
177    }
178}
179
180/// A parquet writer implementation tailored to the needs of writing data to a delta table.
181pub struct DeltaWriter {
182    /// An object store pointing at Delta table root
183    object_store: ObjectStoreRef,
184    /// configuration for the writers
185    config: WriterConfig,
186    /// partition writers for individual partitions
187    partition_writers: HashMap<Path, PartitionWriter>,
188}
189
190impl DeltaWriter {
191    /// Create a new instance of [`DeltaWriter`]
192    pub fn new(object_store: ObjectStoreRef, config: WriterConfig) -> Self {
193        Self {
194            object_store,
195            config,
196            partition_writers: HashMap::new(),
197        }
198    }
199
200    /// Apply custom writer_properties to the underlying parquet writer
201    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
202        self.config.writer_properties = writer_properties;
203        self
204    }
205
206    fn divide_by_partition_values(
207        &mut self,
208        values: &RecordBatch,
209    ) -> DeltaResult<Vec<PartitionResult>> {
210        Ok(divide_by_partition_values(
211            self.config.file_schema(),
212            self.config.partition_columns.clone(),
213            values,
214        )
215        .map_err(|err| WriteError::Partitioning(err.to_string()))?)
216    }
217
218    /// Write a batch to the partition induced by the partition_values. The record batch is expected
219    /// to be pre-partitioned and only contain rows that belong into the same partition.
220    /// However, it should still contain the partition columns.
221    pub async fn write_partition(
222        &mut self,
223        record_batch: RecordBatch,
224        partition_values: &IndexMap<String, Scalar>,
225    ) -> DeltaResult<()> {
226        let partition_key = Path::parse(partition_values.hive_partition_path())?;
227
228        let record_batch =
229            record_batch_without_partitions(&record_batch, &self.config.partition_columns)?;
230
231        match self.partition_writers.get_mut(&partition_key) {
232            Some(writer) => {
233                writer.write(&record_batch).await?;
234            }
235            None => {
236                let config = PartitionWriterConfig::try_new(
237                    self.config.file_schema(),
238                    partition_values.clone(),
239                    Some(self.config.writer_properties.clone()),
240                    self.config.target_file_size,
241                    Some(self.config.write_batch_size),
242                    None,
243                )?;
244                let mut writer = PartitionWriter::try_with_config(
245                    self.object_store.clone(),
246                    config,
247                    self.config.num_indexed_cols,
248                    self.config.stats_columns.clone(),
249                )?;
250                writer.write(&record_batch).await?;
251                let _ = self.partition_writers.insert(partition_key, writer);
252            }
253        }
254
255        Ok(())
256    }
257
258    /// Buffers record batches in-memory per partition up to appx. `target_file_size` for a partition.
259    /// Flushes data to storage once a full file can be written.
260    ///
261    /// The `close` method has to be invoked to write all data still buffered
262    /// and get the list of all written files.
263    pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
264        for result in self.divide_by_partition_values(batch)? {
265            self.write_partition(result.record_batch, &result.partition_values)
266                .await?;
267        }
268        Ok(())
269    }
270
271    /// Close the writer and get the new [Add] actions.
272    ///
273    /// This will flush all remaining data.
274    pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
275        let writers = std::mem::take(&mut self.partition_writers);
276        let actions = futures::stream::iter(writers)
277            .map(|(_, writer)| async move {
278                let writer_actions = writer.close().await?;
279                Ok::<_, DeltaTableError>(writer_actions)
280            })
281            .buffered(num_cpus::get())
282            .try_fold(Vec::new(), |mut acc, actions| {
283                acc.extend(actions);
284                futures::future::ready(Ok(acc))
285            })
286            .await?;
287
288        Ok(actions)
289    }
290}
291
292/// Write configuration for partition writers
293#[derive(Debug, Clone)]
294pub struct PartitionWriterConfig {
295    /// Schema of the data written to disk
296    file_schema: ArrowSchemaRef,
297    /// Prefix applied to all paths
298    prefix: Path,
299    /// Values for all partition columns
300    partition_values: IndexMap<String, Scalar>,
301    /// Properties passed to underlying parquet writer
302    writer_properties: WriterProperties,
303    /// Size above which we will write a buffered parquet file to disk.
304    /// If None, the writer will not create a new file until the writer is closed.
305    target_file_size: Option<NonZeroU64>,
306    /// Row chunks passed to parquet writer. This and the internal parquet writer settings
307    /// determine how fine granular we can track / control the size of resulting files.
308    write_batch_size: usize,
309    /// Concurrency level for writing to object store
310    max_concurrency_tasks: usize,
311}
312
313impl PartitionWriterConfig {
314    /// Create a new instance of [PartitionWriterConfig]
315    pub fn try_new(
316        file_schema: ArrowSchemaRef,
317        partition_values: IndexMap<String, Scalar>,
318        writer_properties: Option<WriterProperties>,
319        target_file_size: Option<NonZeroU64>,
320        write_batch_size: Option<usize>,
321        max_concurrency_tasks: Option<usize>,
322    ) -> DeltaResult<Self> {
323        let part_path = partition_values.hive_partition_path();
324        let prefix = Path::parse(part_path)?;
325        let writer_properties =
326            writer_properties.unwrap_or_else(|| default_writer_properties(Compression::SNAPPY));
327        let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
328
329        Ok(Self {
330            file_schema,
331            prefix,
332            partition_values,
333            writer_properties,
334            target_file_size,
335            write_batch_size,
336            max_concurrency_tasks: max_concurrency_tasks.unwrap_or_else(get_max_concurrency_tasks),
337        })
338    }
339}
340
341enum LazyArrowWriter {
342    Initialized(Path, ObjectStoreRef, PartitionWriterConfig),
343    Writing(Path, AsyncArrowWriter<ParquetObjectWriter>),
344}
345
346impl LazyArrowWriter {
347    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
348        match self {
349            LazyArrowWriter::Initialized(path, object_store, config) => {
350                let writer = ParquetObjectWriter::from_buf_writer(
351                    BufWriter::with_capacity(
352                        object_store.clone(),
353                        path.clone(),
354                        upload_part_size(),
355                    )
356                    .with_max_concurrency(config.max_concurrency_tasks),
357                );
358                let mut arrow_writer = AsyncArrowWriter::try_new(
359                    writer,
360                    config.file_schema.clone(),
361                    Some(config.writer_properties.clone()),
362                )?;
363                arrow_writer.write(batch).await?;
364                *self = LazyArrowWriter::Writing(path.clone(), arrow_writer);
365            }
366            LazyArrowWriter::Writing(_, arrow_writer) => {
367                arrow_writer.write(batch).await?;
368            }
369        }
370
371        Ok(())
372    }
373
374    fn estimated_size(&self) -> usize {
375        match self {
376            LazyArrowWriter::Initialized(_, _, _) => 0,
377            LazyArrowWriter::Writing(_, arrow_writer) => {
378                arrow_writer.bytes_written() + arrow_writer.in_progress_size()
379            }
380        }
381    }
382}
383
384/// Partition writer implementation
385/// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
386/// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
387/// When you complete writing you get back a list of Add actions that can be used to update the Delta table commit log.
388pub struct PartitionWriter {
389    object_store: ObjectStoreRef,
390    writer_id: uuid::Uuid,
391    config: PartitionWriterConfig,
392    writer: LazyArrowWriter,
393    part_counter: usize,
394    /// Num index cols to collect stats for
395    num_indexed_cols: DataSkippingNumIndexedCols,
396    /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
397    stats_columns: Option<Vec<String>>,
398    in_flight_writers: JoinSet<DeltaResult<(Path, usize, ParquetMetaData)>>,
399}
400
401impl PartitionWriter {
402    /// Create a new instance of [`PartitionWriter`] from [`PartitionWriterConfig`]
403    pub fn try_with_config(
404        object_store: ObjectStoreRef,
405        config: PartitionWriterConfig,
406        num_indexed_cols: DataSkippingNumIndexedCols,
407        stats_columns: Option<Vec<String>>,
408    ) -> DeltaResult<Self> {
409        let writer_id = uuid::Uuid::new_v4();
410        let first_path = next_data_path(&config.prefix, 0, &writer_id, &config.writer_properties);
411        let writer = Self::create_writer(object_store.clone(), first_path.clone(), &config)?;
412
413        Ok(Self {
414            object_store,
415            writer_id,
416            config,
417            writer,
418            part_counter: 0,
419            num_indexed_cols,
420            stats_columns,
421            in_flight_writers: JoinSet::new(),
422        })
423    }
424
425    fn create_writer(
426        object_store: ObjectStoreRef,
427        path: Path,
428        config: &PartitionWriterConfig,
429    ) -> DeltaResult<LazyArrowWriter> {
430        let state = LazyArrowWriter::Initialized(path, object_store.clone(), config.clone());
431        Ok(state)
432    }
433
434    fn next_data_path(&mut self) -> Path {
435        self.part_counter += 1;
436
437        next_data_path(
438            &self.config.prefix,
439            self.part_counter,
440            &self.writer_id,
441            &self.config.writer_properties,
442        )
443    }
444
445    fn reset_writer(&mut self) -> DeltaResult<()> {
446        let next_path = self.next_data_path();
447        let new_writer = Self::create_writer(self.object_store.clone(), next_path, &self.config)?;
448        let state = std::mem::replace(&mut self.writer, new_writer);
449
450        if let LazyArrowWriter::Writing(path, arrow_writer) = state {
451            self.in_flight_writers
452                .spawn(upload_parquet_file(arrow_writer, path));
453        }
454        Ok(())
455    }
456
457    /// Buffers record batches in-memory up to appx. `target_file_size`.
458    /// Flushes data to storage once a full file can be written.
459    ///
460    /// The `close` method has to be invoked to write all data still buffered
461    /// and get the list of all written files.
462    pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
463        if batch.schema() != self.config.file_schema {
464            return Err(WriteError::SchemaMismatch {
465                schema: batch.schema(),
466                expected_schema: self.config.file_schema.clone(),
467            }
468            .into());
469        }
470
471        let max_offset = batch.num_rows();
472        for offset in (0..max_offset).step_by(self.config.write_batch_size) {
473            let length = usize::min(self.config.write_batch_size, max_offset - offset);
474            self.writer
475                .write_batch(&batch.slice(offset, length))
476                .await?;
477            if let Some(target_file_size) = self.config.target_file_size {
478                let estimated_size = self.writer.estimated_size();
479                // flush currently buffered data to disk once we meet or exceed the target file size.
480                if estimated_size as u64 >= target_file_size.get() {
481                    debug!("Writing file with estimated size {estimated_size:?} in background.");
482                    self.reset_writer()?;
483                }
484            }
485        }
486
487        Ok(())
488    }
489
490    /// Close the writer and get the new [Add] actions.
491    ///
492    /// This will flush any remaining data and collect all Add actions from background tasks.
493    pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
494        if let LazyArrowWriter::Writing(path, arrow_writer) = self.writer {
495            self.in_flight_writers
496                .spawn(upload_parquet_file(arrow_writer, path));
497        }
498
499        let mut results = Vec::new();
500        while let Some(result) = self.in_flight_writers.join_next().await {
501            match result {
502                Ok(Ok(data)) => results.push(data),
503                Ok(Err(e)) => {
504                    return Err(e);
505                }
506                Err(e) => {
507                    return Err(DeltaTableError::GenericError {
508                        source: Box::new(e),
509                    });
510                }
511            }
512        }
513
514        sort_completed_writes_by_path(&mut results);
515
516        let adds = results
517            .into_iter()
518            .map(|(path, file_size, metadata)| {
519                create_add(
520                    &self.config.partition_values,
521                    path.to_string(),
522                    file_size as i64,
523                    &metadata,
524                    self.num_indexed_cols,
525                    &self.stats_columns,
526                )
527                .map_err(|err| WriteError::CreateAdd {
528                    source: Box::new(err),
529                })
530            })
531            .collect::<Result<Vec<_>, _>>()?;
532
533        Ok(adds)
534    }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DeltaTableBuilder;
    use crate::crate_version;
    use crate::logstore::tests::flatten_list_stream as list;
    use crate::table::config::DEFAULT_NUM_INDEX_COLS;
    use crate::writer::test_utils::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use object_store::ObjectStoreExt as _;
    use parquet::schema::types::ColumnPath;
    use std::sync::Arc;

    /// Object store of a table built from the `memory:///` URL.
    fn memory_object_store() -> ObjectStoreRef {
        DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap()
            .object_store(None)
    }

    /// Batch of 10000 rows with a constant Utf8 `id` column and an ascending Int32 `value` column.
    fn ten_thousand_row_batch() -> RecordBatch {
        let base_int = Arc::new(Int32Array::from((0..10000).collect::<Vec<i32>>()));
        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));
        RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap()
    }

    /// Schema with a single nullable Int32 `id` column.
    fn id_only_schema() -> ArrowSchemaRef {
        Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Int32,
            true,
        )]))
    }

    /// Asserts that more than one file was written and that no more than one of them is
    /// not above the 10_000 byte target size.
    fn assert_split_into_target_sized_files(adds: &[Add]) {
        assert!(adds.len() > 1);
        let target_file_count = adds.iter().filter(|add| add.size > 10_000).count();
        assert!(target_file_count + 1 >= adds.len())
    }

    fn get_delta_writer(
        object_store: ObjectStoreRef,
        batch: &RecordBatch,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
    ) -> DeltaWriter {
        let config = WriterConfig::new(
            batch.schema(),
            vec![],
            writer_properties,
            target_file_size,
            write_batch_size,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        );
        DeltaWriter::new(object_store, config)
    }

    fn get_partition_writer(
        object_store: ObjectStoreRef,
        batch: &RecordBatch,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
    ) -> PartitionWriter {
        let config = PartitionWriterConfig::try_new(
            batch.schema(),
            IndexMap::new(),
            writer_properties,
            target_file_size,
            write_batch_size,
            None,
        )
        .unwrap();
        PartitionWriter::try_with_config(
            object_store,
            config,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        )
        .unwrap()
    }

    fn assert_default_created_by(writer_properties: &WriterProperties) {
        assert_eq!(
            writer_properties.created_by(),
            format!("delta-rs version {}", crate_version())
        );
    }

    #[test]
    fn test_writer_config_defaults_include_delta_rs_created_by() {
        let config = WriterConfig::new(
            id_only_schema(),
            vec![],
            None,
            None,
            None,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        );

        assert_default_created_by(&config.writer_properties);
        assert_eq!(
            config
                .writer_properties
                .compression(&ColumnPath::from("id")),
            Compression::SNAPPY
        );
    }

    #[test]
    fn test_partition_writer_config_defaults_include_delta_rs_created_by() {
        let config = PartitionWriterConfig::try_new(
            id_only_schema(),
            IndexMap::new(),
            None,
            None,
            None,
            None,
        )
        .unwrap();

        assert_default_created_by(&config.writer_properties);
        assert_eq!(
            config
                .writer_properties
                .compression(&ColumnPath::from("id")),
            Compression::SNAPPY
        );
    }

    #[tokio::test]
    async fn test_write_partition() {
        let object_store = memory_object_store();
        let batch = get_record_batch(None, false);

        // write single un-partitioned batch
        let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None);
        writer.write(&batch).await.unwrap();
        // nothing is visible in the store until the writer is closed
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 0);
        let adds = writer.close().await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 1);
        assert_eq!(files.len(), adds.len());
        let head = object_store
            .head(&Path::from(adds[0].path.clone()))
            .await
            .unwrap();
        assert_eq!(head.size, adds[0].size as u64)
    }

    #[tokio::test]
    async fn test_write_partition_with_parts() {
        let batch = ten_thousand_row_batch();
        let properties = WriterProperties::builder()
            .set_max_row_group_row_count(Some(1024))
            .build();
        // configure small target file size and row group size so we can observe multiple files written
        let mut writer = get_partition_writer(
            memory_object_store(),
            &batch,
            Some(properties),
            Some(NonZeroU64::new(10_000).unwrap()),
            None,
        );
        writer.write(&batch).await.unwrap();

        // check that we have written more than one file, and no more than 1 is below target size
        let adds = writer.close().await.unwrap();
        assert_split_into_target_sized_files(&adds);
    }

    #[tokio::test]
    async fn test_unflushed_row_group_size() {
        let batch = ten_thousand_row_batch();
        // configure small target file size so we can observe multiple files written
        let mut writer = get_partition_writer(
            memory_object_store(),
            &batch,
            None,
            Some(NonZeroU64::new(10_000).unwrap()),
            None,
        );
        writer.write(&batch).await.unwrap();

        // check that we have written more than one file, and no more than 1 is below target size
        let adds = writer.close().await.unwrap();
        assert_split_into_target_sized_files(&adds);
    }

    #[tokio::test]
    async fn test_do_not_write_empty_file_on_close() {
        let batch = ten_thousand_row_batch();
        // configure high batch size and low file size to observe one file written and flushed immediately
        // upon writing batch, then ensures the buffer is empty upon closing writer
        let mut writer = get_partition_writer(
            memory_object_store(),
            &batch,
            None,
            Some(NonZeroU64::new(9000).unwrap()),
            Some(10000),
        );
        writer.write(&batch).await.unwrap();

        let adds = writer.close().await.unwrap();
        assert_eq!(adds.len(), 1);
    }

    #[test]
    fn test_sort_completed_writes_by_path() {
        let mut results = vec![
            (Path::from("part-00002.parquet"), 3, 2_u8),
            (Path::from("part-00000.parquet"), 1, 0_u8),
            (Path::from("part-00001.parquet"), 2, 1_u8),
        ];

        sort_completed_writes_by_path(&mut results);

        let ordered_paths = results
            .iter()
            .map(|(path, _, _)| path.as_ref())
            .collect::<Vec<_>>();
        assert_eq!(
            ordered_paths,
            vec![
                "part-00000.parquet",
                "part-00001.parquet",
                "part-00002.parquet"
            ]
        );
    }

    #[tokio::test]
    async fn test_write_mismatched_schema() {
        let object_store = memory_object_store();
        let batch = get_record_batch(None, false);

        // write single un-partitioned batch
        let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None);
        writer.write(&batch).await.unwrap();
        // Ensure the write hasn't been flushed
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 0);

        // Create a second batch with a different schema
        let second_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let second_batch = RecordBatch::try_new(
            second_schema,
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
                Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
            ],
        )
        .unwrap();

        match writer.write(&second_batch).await {
            Err(DeltaTableError::SchemaMismatch { .. }) => {
                // this is expected
            }
            Err(others) => {
                panic!("Got the wrong error: {others:?}");
            }
            Ok(_) => {
                panic!("Should not have successfully written");
            }
        }
    }
}