1use std::collections::HashMap;
4use std::num::NonZeroU64;
5use std::sync::OnceLock;
6
7use arrow_array::RecordBatch;
8use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
9use delta_kernel::expressions::Scalar;
10use delta_kernel::table_properties::DataSkippingNumIndexedCols;
11use futures::{StreamExt, TryStreamExt};
12use indexmap::IndexMap;
13use object_store::buffered::BufWriter;
14use object_store::path::Path;
15use parquet::arrow::AsyncArrowWriter;
16use parquet::arrow::async_writer::ParquetObjectWriter;
17use parquet::basic::Compression;
18use parquet::file::properties::WriterProperties;
19use tokio::task::JoinSet;
20use tracing::*;
21
22use crate::errors::{DeltaResult, DeltaTableError};
23use crate::kernel::{Add, PartitionsExt};
24use crate::logstore::ObjectStoreRef;
25use crate::parquet_utils::default_writer_properties;
26use crate::writer::record_batch::{PartitionResult, divide_by_partition_values};
27use crate::writer::stats::create_add;
28use crate::writer::utils::{
29 arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
30};
31
32use parquet::file::metadata::ParquetMetaData;
33
/// Default number of rows handed to the parquet writer per write call.
const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
/// Default, and minimum accepted, multipart upload part size: 5 MiB.
const DEFAULT_UPLOAD_PART_SIZE: usize = 5 * 1024 * 1024;
/// Default cap on concurrent part uploads for a single data file.
const DEFAULT_MAX_CONCURRENCY_TASKS: usize = 10;
37
38fn upload_part_size() -> usize {
39 static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
40 *UPLOAD_SIZE.get_or_init(|| {
41 std::env::var("DELTARS_UPLOAD_PART_SIZE")
42 .ok()
43 .and_then(|s| s.parse::<usize>().ok())
44 .map(|size| {
45 if size < DEFAULT_UPLOAD_PART_SIZE {
46 debug!("DELTARS_UPLOAD_PART_SIZE must be at least 5MB, therefore falling back on default of 5MB.");
48 DEFAULT_UPLOAD_PART_SIZE
49 } else if size > 1024 * 1024 * 1024 * 5 {
50 debug!("DELTARS_UPLOAD_PART_SIZE must not be higher than 5GB, therefore capping it at 5GB.");
52 1024 * 1024 * 1024 * 5
53 } else {
54 size
55 }
56 })
57 .unwrap_or(DEFAULT_UPLOAD_PART_SIZE)
58 })
59}
60
61fn get_max_concurrency_tasks() -> usize {
62 static MAX_CONCURRENCY_TASKS: OnceLock<usize> = OnceLock::new();
63 *MAX_CONCURRENCY_TASKS.get_or_init(|| {
64 std::env::var("DELTARS_MAX_CONCURRENCY_TASKS")
65 .ok()
66 .and_then(|s| s.parse::<usize>().ok())
67 .unwrap_or(DEFAULT_MAX_CONCURRENCY_TASKS)
68 })
69}
70
71#[instrument(skip(arrow_writer), fields(rows = 0, size = 0))]
73async fn upload_parquet_file(
74 mut arrow_writer: AsyncArrowWriter<ParquetObjectWriter>,
75 path: Path,
76) -> DeltaResult<(Path, usize, ParquetMetaData)> {
77 let metadata = arrow_writer.finish().await?;
78 let file_size = arrow_writer.bytes_written();
79 Span::current().record("rows", metadata.file_metadata().num_rows());
80 Span::current().record("size", file_size);
81 debug!("multipart upload completed successfully");
82
83 Ok((path, file_size, metadata))
84}
85
86fn sort_completed_writes_by_path<T>(results: &mut [(Path, usize, T)]) {
87 results.sort_unstable_by(|a, b| a.0.cmp(&b.0));
88}
89
90#[derive(thiserror::Error, Debug)]
91enum WriteError {
92 #[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")]
93 SchemaMismatch {
94 schema: ArrowSchemaRef,
95 expected_schema: ArrowSchemaRef,
96 },
97
98 #[error("Error creating add action: {source}")]
99 CreateAdd {
100 source: Box<dyn std::error::Error + Send + Sync + 'static>,
101 },
102
103 #[error("Error handling Arrow data: {source}")]
104 Arrow {
105 #[from]
106 source: ArrowError,
107 },
108
109 #[error("Error partitioning record batch: {0}")]
110 Partitioning(String),
111}
112
113impl From<WriteError> for DeltaTableError {
114 fn from(err: WriteError) -> Self {
115 match err {
116 WriteError::SchemaMismatch { .. } => DeltaTableError::SchemaMismatch {
117 msg: err.to_string(),
118 },
119 WriteError::Arrow { source } => DeltaTableError::Arrow { source },
120 _ => DeltaTableError::GenericError {
121 source: Box::new(err),
122 },
123 }
124 }
125}
126
127#[derive(Debug, Clone)]
129pub struct WriterConfig {
130 table_schema: ArrowSchemaRef,
132 partition_columns: Vec<String>,
134 writer_properties: WriterProperties,
136 target_file_size: Option<NonZeroU64>,
139 write_batch_size: usize,
142 num_indexed_cols: DataSkippingNumIndexedCols,
144 stats_columns: Option<Vec<String>>,
146}
147
148impl WriterConfig {
149 pub fn new(
151 table_schema: ArrowSchemaRef,
152 partition_columns: Vec<String>,
153 writer_properties: Option<WriterProperties>,
154 target_file_size: Option<NonZeroU64>,
155 write_batch_size: Option<usize>,
156 num_indexed_cols: DataSkippingNumIndexedCols,
157 stats_columns: Option<Vec<String>>,
158 ) -> Self {
159 let writer_properties =
160 writer_properties.unwrap_or_else(|| default_writer_properties(Compression::SNAPPY));
161 let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
162
163 Self {
164 table_schema,
165 partition_columns,
166 writer_properties,
167 target_file_size,
168 write_batch_size,
169 num_indexed_cols,
170 stats_columns,
171 }
172 }
173
174 pub fn file_schema(&self) -> ArrowSchemaRef {
176 arrow_schema_without_partitions(&self.table_schema, &self.partition_columns)
177 }
178}
179
180pub struct DeltaWriter {
182 object_store: ObjectStoreRef,
184 config: WriterConfig,
186 partition_writers: HashMap<Path, PartitionWriter>,
188}
189
190impl DeltaWriter {
191 pub fn new(object_store: ObjectStoreRef, config: WriterConfig) -> Self {
193 Self {
194 object_store,
195 config,
196 partition_writers: HashMap::new(),
197 }
198 }
199
200 pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
202 self.config.writer_properties = writer_properties;
203 self
204 }
205
206 fn divide_by_partition_values(
207 &mut self,
208 values: &RecordBatch,
209 ) -> DeltaResult<Vec<PartitionResult>> {
210 Ok(divide_by_partition_values(
211 self.config.file_schema(),
212 self.config.partition_columns.clone(),
213 values,
214 )
215 .map_err(|err| WriteError::Partitioning(err.to_string()))?)
216 }
217
218 pub async fn write_partition(
222 &mut self,
223 record_batch: RecordBatch,
224 partition_values: &IndexMap<String, Scalar>,
225 ) -> DeltaResult<()> {
226 let partition_key = Path::parse(partition_values.hive_partition_path())?;
227
228 let record_batch =
229 record_batch_without_partitions(&record_batch, &self.config.partition_columns)?;
230
231 match self.partition_writers.get_mut(&partition_key) {
232 Some(writer) => {
233 writer.write(&record_batch).await?;
234 }
235 None => {
236 let config = PartitionWriterConfig::try_new(
237 self.config.file_schema(),
238 partition_values.clone(),
239 Some(self.config.writer_properties.clone()),
240 self.config.target_file_size,
241 Some(self.config.write_batch_size),
242 None,
243 )?;
244 let mut writer = PartitionWriter::try_with_config(
245 self.object_store.clone(),
246 config,
247 self.config.num_indexed_cols,
248 self.config.stats_columns.clone(),
249 )?;
250 writer.write(&record_batch).await?;
251 let _ = self.partition_writers.insert(partition_key, writer);
252 }
253 }
254
255 Ok(())
256 }
257
258 pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
264 for result in self.divide_by_partition_values(batch)? {
265 self.write_partition(result.record_batch, &result.partition_values)
266 .await?;
267 }
268 Ok(())
269 }
270
271 pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
275 let writers = std::mem::take(&mut self.partition_writers);
276 let actions = futures::stream::iter(writers)
277 .map(|(_, writer)| async move {
278 let writer_actions = writer.close().await?;
279 Ok::<_, DeltaTableError>(writer_actions)
280 })
281 .buffered(num_cpus::get())
282 .try_fold(Vec::new(), |mut acc, actions| {
283 acc.extend(actions);
284 futures::future::ready(Ok(acc))
285 })
286 .await?;
287
288 Ok(actions)
289 }
290}
291
292#[derive(Debug, Clone)]
294pub struct PartitionWriterConfig {
295 file_schema: ArrowSchemaRef,
297 prefix: Path,
299 partition_values: IndexMap<String, Scalar>,
301 writer_properties: WriterProperties,
303 target_file_size: Option<NonZeroU64>,
306 write_batch_size: usize,
309 max_concurrency_tasks: usize,
311}
312
313impl PartitionWriterConfig {
314 pub fn try_new(
316 file_schema: ArrowSchemaRef,
317 partition_values: IndexMap<String, Scalar>,
318 writer_properties: Option<WriterProperties>,
319 target_file_size: Option<NonZeroU64>,
320 write_batch_size: Option<usize>,
321 max_concurrency_tasks: Option<usize>,
322 ) -> DeltaResult<Self> {
323 let part_path = partition_values.hive_partition_path();
324 let prefix = Path::parse(part_path)?;
325 let writer_properties =
326 writer_properties.unwrap_or_else(|| default_writer_properties(Compression::SNAPPY));
327 let write_batch_size = write_batch_size.unwrap_or(DEFAULT_WRITE_BATCH_SIZE);
328
329 Ok(Self {
330 file_schema,
331 prefix,
332 partition_values,
333 writer_properties,
334 target_file_size,
335 write_batch_size,
336 max_concurrency_tasks: max_concurrency_tasks.unwrap_or_else(get_max_concurrency_tasks),
337 })
338 }
339}
340
341enum LazyArrowWriter {
342 Initialized(Path, ObjectStoreRef, PartitionWriterConfig),
343 Writing(Path, AsyncArrowWriter<ParquetObjectWriter>),
344}
345
346impl LazyArrowWriter {
347 async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
348 match self {
349 LazyArrowWriter::Initialized(path, object_store, config) => {
350 let writer = ParquetObjectWriter::from_buf_writer(
351 BufWriter::with_capacity(
352 object_store.clone(),
353 path.clone(),
354 upload_part_size(),
355 )
356 .with_max_concurrency(config.max_concurrency_tasks),
357 );
358 let mut arrow_writer = AsyncArrowWriter::try_new(
359 writer,
360 config.file_schema.clone(),
361 Some(config.writer_properties.clone()),
362 )?;
363 arrow_writer.write(batch).await?;
364 *self = LazyArrowWriter::Writing(path.clone(), arrow_writer);
365 }
366 LazyArrowWriter::Writing(_, arrow_writer) => {
367 arrow_writer.write(batch).await?;
368 }
369 }
370
371 Ok(())
372 }
373
374 fn estimated_size(&self) -> usize {
375 match self {
376 LazyArrowWriter::Initialized(_, _, _) => 0,
377 LazyArrowWriter::Writing(_, arrow_writer) => {
378 arrow_writer.bytes_written() + arrow_writer.in_progress_size()
379 }
380 }
381 }
382}
383
384pub struct PartitionWriter {
389 object_store: ObjectStoreRef,
390 writer_id: uuid::Uuid,
391 config: PartitionWriterConfig,
392 writer: LazyArrowWriter,
393 part_counter: usize,
394 num_indexed_cols: DataSkippingNumIndexedCols,
396 stats_columns: Option<Vec<String>>,
398 in_flight_writers: JoinSet<DeltaResult<(Path, usize, ParquetMetaData)>>,
399}
400
401impl PartitionWriter {
402 pub fn try_with_config(
404 object_store: ObjectStoreRef,
405 config: PartitionWriterConfig,
406 num_indexed_cols: DataSkippingNumIndexedCols,
407 stats_columns: Option<Vec<String>>,
408 ) -> DeltaResult<Self> {
409 let writer_id = uuid::Uuid::new_v4();
410 let first_path = next_data_path(&config.prefix, 0, &writer_id, &config.writer_properties);
411 let writer = Self::create_writer(object_store.clone(), first_path.clone(), &config)?;
412
413 Ok(Self {
414 object_store,
415 writer_id,
416 config,
417 writer,
418 part_counter: 0,
419 num_indexed_cols,
420 stats_columns,
421 in_flight_writers: JoinSet::new(),
422 })
423 }
424
425 fn create_writer(
426 object_store: ObjectStoreRef,
427 path: Path,
428 config: &PartitionWriterConfig,
429 ) -> DeltaResult<LazyArrowWriter> {
430 let state = LazyArrowWriter::Initialized(path, object_store.clone(), config.clone());
431 Ok(state)
432 }
433
434 fn next_data_path(&mut self) -> Path {
435 self.part_counter += 1;
436
437 next_data_path(
438 &self.config.prefix,
439 self.part_counter,
440 &self.writer_id,
441 &self.config.writer_properties,
442 )
443 }
444
445 fn reset_writer(&mut self) -> DeltaResult<()> {
446 let next_path = self.next_data_path();
447 let new_writer = Self::create_writer(self.object_store.clone(), next_path, &self.config)?;
448 let state = std::mem::replace(&mut self.writer, new_writer);
449
450 if let LazyArrowWriter::Writing(path, arrow_writer) = state {
451 self.in_flight_writers
452 .spawn(upload_parquet_file(arrow_writer, path));
453 }
454 Ok(())
455 }
456
457 pub async fn write(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
463 if batch.schema() != self.config.file_schema {
464 return Err(WriteError::SchemaMismatch {
465 schema: batch.schema(),
466 expected_schema: self.config.file_schema.clone(),
467 }
468 .into());
469 }
470
471 let max_offset = batch.num_rows();
472 for offset in (0..max_offset).step_by(self.config.write_batch_size) {
473 let length = usize::min(self.config.write_batch_size, max_offset - offset);
474 self.writer
475 .write_batch(&batch.slice(offset, length))
476 .await?;
477 if let Some(target_file_size) = self.config.target_file_size {
478 let estimated_size = self.writer.estimated_size();
479 if estimated_size as u64 >= target_file_size.get() {
481 debug!("Writing file with estimated size {estimated_size:?} in background.");
482 self.reset_writer()?;
483 }
484 }
485 }
486
487 Ok(())
488 }
489
490 pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
494 if let LazyArrowWriter::Writing(path, arrow_writer) = self.writer {
495 self.in_flight_writers
496 .spawn(upload_parquet_file(arrow_writer, path));
497 }
498
499 let mut results = Vec::new();
500 while let Some(result) = self.in_flight_writers.join_next().await {
501 match result {
502 Ok(Ok(data)) => results.push(data),
503 Ok(Err(e)) => {
504 return Err(e);
505 }
506 Err(e) => {
507 return Err(DeltaTableError::GenericError {
508 source: Box::new(e),
509 });
510 }
511 }
512 }
513
514 sort_completed_writes_by_path(&mut results);
515
516 let adds = results
517 .into_iter()
518 .map(|(path, file_size, metadata)| {
519 create_add(
520 &self.config.partition_values,
521 path.to_string(),
522 file_size as i64,
523 &metadata,
524 self.num_indexed_cols,
525 &self.stats_columns,
526 )
527 .map_err(|err| WriteError::CreateAdd {
528 source: Box::new(err),
529 })
530 })
531 .collect::<Result<Vec<_>, _>>()?;
532
533 Ok(adds)
534 }
535}
536
#[cfg(test)]
mod tests {
    use super::*;
    use crate::DeltaTableBuilder;
    use crate::crate_version;
    use crate::logstore::tests::flatten_list_stream as list;
    use crate::table::config::DEFAULT_NUM_INDEX_COLS;
    use crate::writer::test_utils::*;
    use arrow::array::{Int32Array, StringArray};
    use arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
    use object_store::ObjectStoreExt as _;
    use parquet::schema::types::ColumnPath;
    use std::sync::Arc;

    /// Fresh in-memory object store.
    fn memory_object_store() -> ObjectStoreRef {
        DeltaTableBuilder::from_url(url::Url::parse("memory:///").unwrap())
            .unwrap()
            .build_storage()
            .unwrap()
            .object_store(None)
    }

    /// Batch with columns `id: Utf8` (all "A") and `value: Int32` (0..10000), 10000 rows.
    fn ten_thousand_row_batch() -> RecordBatch {
        let base_int = Arc::new(Int32Array::from((0..10000_i32).collect::<Vec<i32>>()));
        let base_str = Arc::new(StringArray::from(vec!["A"; 10000]));
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));
        RecordBatch::try_new(schema, vec![base_str, base_int]).unwrap()
    }

    /// Unpartitioned [`DeltaWriter`] whose table schema is `batch`'s schema.
    fn get_delta_writer(
        object_store: ObjectStoreRef,
        batch: &RecordBatch,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
    ) -> DeltaWriter {
        let config = WriterConfig::new(
            batch.schema(),
            vec![],
            writer_properties,
            target_file_size,
            write_batch_size,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        );
        DeltaWriter::new(object_store, config)
    }

    /// [`PartitionWriter`] for the root (no partition values) partition using `batch`'s schema.
    fn get_partition_writer(
        object_store: ObjectStoreRef,
        batch: &RecordBatch,
        writer_properties: Option<WriterProperties>,
        target_file_size: Option<NonZeroU64>,
        write_batch_size: Option<usize>,
    ) -> PartitionWriter {
        let config = PartitionWriterConfig::try_new(
            batch.schema(),
            IndexMap::new(),
            writer_properties,
            target_file_size,
            write_batch_size,
            None,
        )
        .unwrap();
        PartitionWriter::try_with_config(
            object_store,
            config,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        )
        .unwrap()
    }

    /// Assert that `writer_properties` carries the default delta-rs `created_by` string.
    fn assert_default_created_by(writer_properties: &WriterProperties) {
        assert_eq!(
            writer_properties.created_by(),
            format!("delta-rs version {}", crate_version())
        );
    }

    /// Number of add actions whose file is strictly larger than `size` bytes.
    fn count_files_larger_than(adds: &[Add], size: i64) -> usize {
        adds.iter().filter(|add| add.size > size).count()
    }

    #[test]
    fn test_writer_config_defaults_include_delta_rs_created_by() {
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Int32,
            true,
        )]));
        let config = WriterConfig::new(
            schema,
            vec![],
            None,
            None,
            None,
            DataSkippingNumIndexedCols::NumColumns(DEFAULT_NUM_INDEX_COLS),
            None,
        );

        assert_default_created_by(&config.writer_properties);
        assert_eq!(
            config
                .writer_properties
                .compression(&ColumnPath::from("id")),
            Compression::SNAPPY
        );
    }

    #[test]
    fn test_partition_writer_config_defaults_include_delta_rs_created_by() {
        let schema = Arc::new(ArrowSchema::new(vec![Field::new(
            "id",
            DataType::Int32,
            true,
        )]));
        let config =
            PartitionWriterConfig::try_new(schema, IndexMap::new(), None, None, None, None)
                .unwrap();

        assert_default_created_by(&config.writer_properties);
        assert_eq!(
            config
                .writer_properties
                .compression(&ColumnPath::from("id")),
            Compression::SNAPPY
        );
    }

    #[tokio::test]
    async fn test_write_partition() {
        let object_store = memory_object_store();
        let batch = get_record_batch(None, false);

        // Nothing reaches the store until the writer is closed.
        let mut writer = get_partition_writer(object_store.clone(), &batch, None, None, None);
        writer.write(&batch).await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 0);

        let adds = writer.close().await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 1);
        assert_eq!(files.len(), adds.len());

        // The size recorded on the add action matches the stored object.
        let head = object_store
            .head(&Path::from(adds[0].path.clone()))
            .await
            .unwrap();
        assert_eq!(head.size, adds[0].size as u64);
    }

    #[tokio::test]
    async fn test_write_partition_with_parts() {
        let batch = ten_thousand_row_batch();
        let properties = WriterProperties::builder()
            .set_max_row_group_row_count(Some(1024))
            .build();
        let mut writer = get_partition_writer(
            memory_object_store(),
            &batch,
            Some(properties),
            Some(NonZeroU64::new(10_000).unwrap()),
            None,
        );
        writer.write(&batch).await.unwrap();

        let adds = writer.close().await.unwrap();
        // Several files were produced, and at most one of them is below the target size.
        assert!(adds.len() > 1);
        assert!(count_files_larger_than(&adds, 10_000) >= adds.len() - 1);
    }

    #[tokio::test]
    async fn test_unflushed_row_group_size() {
        let batch = ten_thousand_row_batch();
        let mut writer = get_partition_writer(
            memory_object_store(),
            &batch,
            None,
            Some(NonZeroU64::new(10_000).unwrap()),
            None,
        );
        writer.write(&batch).await.unwrap();

        let adds = writer.close().await.unwrap();
        // Rollover must account for data still buffered in the open row group.
        assert!(adds.len() > 1);
        assert!(count_files_larger_than(&adds, 10_000) >= adds.len() - 1);
    }

    #[tokio::test]
    async fn test_do_not_write_empty_file_on_close() {
        let batch = ten_thousand_row_batch();
        // The whole batch is written in one chunk, which exceeds the target size and triggers a
        // rollover. The fresh writer never receives data, so close must not emit a second file.
        let mut writer = get_partition_writer(
            memory_object_store(),
            &batch,
            None,
            Some(NonZeroU64::new(9000).unwrap()),
            Some(10000),
        );
        writer.write(&batch).await.unwrap();

        let adds = writer.close().await.unwrap();
        assert_eq!(adds.len(), 1);
    }

    #[test]
    fn test_sort_completed_writes_by_path() {
        let mut results = vec![
            (Path::from("part-00002.parquet"), 3, 2_u8),
            (Path::from("part-00000.parquet"), 1, 0_u8),
            (Path::from("part-00001.parquet"), 2, 1_u8),
        ];

        sort_completed_writes_by_path(&mut results);

        let ordered_paths = results
            .iter()
            .map(|(path, _, _)| path.as_ref())
            .collect::<Vec<_>>();
        assert_eq!(
            ordered_paths,
            vec![
                "part-00000.parquet",
                "part-00001.parquet",
                "part-00002.parquet"
            ]
        );
    }

    #[tokio::test]
    async fn test_write_mismatched_schema() {
        let object_store = memory_object_store();
        let batch = get_record_batch(None, false);

        let mut writer = get_delta_writer(object_store.clone(), &batch, None, None, None);
        writer.write(&batch).await.unwrap();
        let files = list(object_store.as_ref(), None).await.unwrap();
        assert_eq!(files.len(), 0);

        // Same column names, different types/order than the first batch.
        let second_schema = Arc::new(ArrowSchema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]));
        let second_batch = RecordBatch::try_new(
            second_schema,
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])),
                Arc::new(StringArray::from(vec![Some("will"), Some("robert")])),
            ],
        )
        .unwrap();

        let result = writer.write(&second_batch).await;
        assert!(
            matches!(result, Err(DeltaTableError::SchemaMismatch { .. })),
            "expected a schema mismatch error, got: {result:?}"
        );
    }
}