1use std::collections::HashMap;
24use std::fmt;
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::SchemaRef;
30use datafusion::catalog::Session;
31use datafusion::execution::context::SessionState;
32use datafusion::execution::memory_pool::FairSpillPool;
33use datafusion::execution::runtime_env::RuntimeEnvBuilder;
34use datafusion::execution::SessionStateBuilder;
35use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
36use delta_kernel::expressions::Scalar;
37use delta_kernel::table_properties::DataSkippingNumIndexedCols;
38use futures::future::BoxFuture;
39use futures::stream::BoxStream;
40use futures::{Future, StreamExt, TryStreamExt};
41use indexmap::IndexMap;
42use itertools::Itertools;
43use num_cpus;
44use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
45use parquet::basic::{Compression, ZstdLevel};
46use parquet::errors::ParquetError;
47use parquet::file::properties::WriterProperties;
48use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
49use tracing::*;
50use uuid::Uuid;
51
52use super::write::writer::{PartitionWriter, PartitionWriterConfig};
53use super::{CustomExecuteHandler, Operation};
54use crate::delta_datafusion::DeltaTableProvider;
55use crate::errors::{DeltaResult, DeltaTableError};
56use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL};
57use crate::kernel::{resolve_snapshot, EagerSnapshot};
58use crate::kernel::{scalars::ScalarExt, Action, Add, PartitionsExt, Remove};
59use crate::logstore::{LogStore, LogStoreRef, ObjectStoreRef};
60use crate::protocol::DeltaOperation;
61use crate::table::config::TablePropertiesExt as _;
62use crate::table::state::DeltaTableState;
63use crate::writer::utils::arrow_schema_without_partitions;
64use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};
65
/// Metrics reported from an optimize operation.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Number of optimized files added by the operation
    pub num_files_added: u64,
    /// Number of unoptimized files removed by the operation
    pub num_files_removed: u64,
    /// Detailed metrics for the files added (serialized as a JSON string in
    /// the commit metadata, matching the `Display` form of [MetricDetails])
    #[serde(
        serialize_with = "serialize_metric_details",
        deserialize_with = "deserialize_metric_details"
    )]
    pub files_added: MetricDetails,
    /// Detailed metrics for the files removed (serialized as a JSON string)
    #[serde(
        serialize_with = "serialize_metric_details",
        deserialize_with = "deserialize_metric_details"
    )]
    pub files_removed: MetricDetails,
    /// Number of partitions that had at least one file optimized
    pub partitions_optimized: u64,
    /// Number of record batches written during the rewrite
    pub num_batches: u64,
    /// How many files were considered; not every considered file is optimized
    pub total_considered_files: usize,
    /// How many considered files were skipped (too large, or alone in a bin)
    pub total_files_skipped: usize,
    /// Whether the insertion order of records was preserved
    pub preserve_insertion_order: bool,
}
97
98fn serialize_metric_details<S>(value: &MetricDetails, serializer: S) -> Result<S::Ok, S::Error>
100where
101 S: Serializer,
102{
103 serializer.serialize_str(&value.to_string())
104}
105
106fn deserialize_metric_details<'de, D>(deserializer: D) -> Result<MetricDetails, D::Error>
108where
109 D: Deserializer<'de>,
110{
111 let s: String = Deserialize::deserialize(deserializer)?;
112 serde_json::from_str(&s).map_err(DeError::custom)
113}
114
/// Statistics on the file sizes touched by one side (add or remove) of an
/// optimize operation.
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MetricDetails {
    /// Average file size, in bytes
    pub avg: f64,
    /// Maximum file size, in bytes
    pub max: i64,
    /// Minimum file size, in bytes
    pub min: i64,
    /// Number of files encountered
    pub total_files: usize,
    /// Sum of all file sizes, in bytes
    pub total_size: i64,
}
131
132impl MetricDetails {
133 pub fn add(&mut self, partial: &MetricDetails) {
135 self.min = std::cmp::min(self.min, partial.min);
136 self.max = std::cmp::max(self.max, partial.max);
137 self.total_files += partial.total_files;
138 self.total_size += partial.total_size;
139 self.avg = self.total_size as f64 / self.total_files as f64;
140 }
141}
142
143impl fmt::Display for MetricDetails {
144 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146 serde_json::to_string(self).map_err(|_| fmt::Error)?.fmt(f)
147 }
148}
149
/// Metrics produced by rewriting a single bin of files; folded into the
/// overall [Metrics] via [Metrics::add].
#[derive(Debug)]
pub struct PartialMetrics {
    /// Number of optimized files added by this task
    pub num_files_added: u64,
    /// Number of unoptimized files removed by this task
    pub num_files_removed: u64,
    /// Size statistics for the files added
    pub files_added: MetricDetails,
    /// Size statistics for the files removed
    pub files_removed: MetricDetails,
    /// Number of record batches written by this task
    pub num_batches: u64,
}
164
165impl Metrics {
166 pub fn add(&mut self, partial: &PartialMetrics) {
168 self.num_files_added += partial.num_files_added;
169 self.num_files_removed += partial.num_files_removed;
170 self.files_added.add(&partial.files_added);
171 self.files_removed.add(&partial.files_removed);
172 self.num_batches += partial.num_batches;
173 }
174}
175
176impl Default for MetricDetails {
177 fn default() -> Self {
178 MetricDetails {
179 min: i64::MAX,
180 max: 0,
181 avg: 0.0,
182 total_files: 0,
183 total_size: 0,
184 }
185 }
186}
187
/// Type of optimization to perform.
#[derive(Debug)]
pub enum OptimizeType {
    /// Compact files into pre-determined bins
    Compact,
    /// Z-order files based on the provided columns
    ZOrder(Vec<String>),
}
196
/// Optimize a Delta table with the given options.
///
/// When no target file size is provided, the table's configured
/// `targetFileSize` property is used (see `create_merge_plan`).
pub struct OptimizeBuilder<'a> {
    /// A snapshot of the table's state, if already loaded
    snapshot: Option<EagerSnapshot>,
    /// Delta log store for handling data files
    log_store: LogStoreRef,
    /// Filters selecting which partitions to optimize
    filters: &'a [PartitionFilter],
    /// Desired file size after bin-packing files
    target_size: Option<u64>,
    /// Properties passed to the underlying parquet writer
    writer_properties: Option<WriterProperties>,
    /// Additional metadata / configuration for the commit(s)
    commit_properties: CommitProperties,
    /// Whether to preserve insertion order within files
    preserve_insertion_order: bool,
    /// Maximum number of concurrent rewrite tasks
    max_concurrent_tasks: usize,
    /// Spill budget (bytes) for the default memory pool; ignored when a
    /// session is supplied
    max_spill_size: usize,
    /// Which optimization (compaction or z-order) to run
    optimize_type: OptimizeType,
    /// DataFusion session used for planning/execution, if provided
    session: Option<Arc<dyn Session>>,
    /// Minimum interval before a new incremental commit is created
    min_commit_interval: Option<Duration>,
    /// Optional hook invoked around operation execution
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
227
impl super::Operation for OptimizeBuilder<'_> {
    /// Log store this operation reads from and commits to.
    fn log_store(&self) -> &LogStoreRef {
        &self.log_store
    }
    /// Optional custom handler invoked around execution.
    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
        self.custom_execute_handler.clone()
    }
}
236
impl<'a> OptimizeBuilder<'a> {
    /// Create a new [OptimizeBuilder]. Defaults: compaction, one task per
    /// CPU, 20 GiB spill budget, no partition filters, no minimum commit
    /// interval.
    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
        Self {
            snapshot,
            log_store,
            filters: &[],
            target_size: None,
            writer_properties: None,
            commit_properties: CommitProperties::default(),
            preserve_insertion_order: false,
            max_concurrent_tasks: num_cpus::get(),
            // 20 GiB default spill budget for the fair spill pool.
            max_spill_size: 20 * 1024 * 1024 * 1024,
            optimize_type: OptimizeType::Compact,
            min_commit_interval: None,
            session: None,
            custom_execute_handler: None,
        }
    }

    /// Choose the type of optimization to perform (compaction or z-order).
    pub fn with_type(mut self, optimize_type: OptimizeType) -> Self {
        self.optimize_type = optimize_type;
        self
    }

    /// Only optimize files matching the given partition filters.
    pub fn with_filters(mut self, filters: &'a [PartitionFilter]) -> Self {
        self.filters = filters;
        self
    }

    /// Set the target file size (bytes) produced by the rewrite.
    pub fn with_target_size(mut self, target: u64) -> Self {
        self.target_size = Some(target);
        self
    }

    /// Writer properties passed to the underlying parquet writer.
    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
        self.writer_properties = Some(writer_properties);
        self
    }

    /// Additional metadata / configuration to attach to the commit(s).
    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
        self.commit_properties = commit_properties;
        self
    }

    /// Whether to preserve insertion order within files.
    pub fn with_preserve_insertion_order(mut self, preserve_insertion_order: bool) -> Self {
        self.preserve_insertion_order = preserve_insertion_order;
        self
    }

    /// Maximum number of concurrent rewrite tasks.
    pub fn with_max_concurrent_tasks(mut self, max_concurrent_tasks: usize) -> Self {
        self.max_concurrent_tasks = max_concurrent_tasks;
        self
    }

    /// Spill budget (bytes) for the default memory pool. Only used when no
    /// session state is supplied via [Self::with_session_state].
    #[deprecated(
        since = "0.29.0",
        note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
    )]
    pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
        self.max_spill_size = max_spill_size;
        self
    }

    /// Minimum elapsed time between incremental commits while tasks finish.
    pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
        self.min_commit_interval = Some(min_commit_interval);
        self
    }

    /// Register a custom handler invoked around execution.
    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
        self.custom_execute_handler = Some(handler);
        self
    }

    /// Use the provided DataFusion session. It is downcast to `SessionState`
    /// at execution time; otherwise a default session is built.
    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
        self.session = Some(session);
        self
    }
}
327
impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
    type Output = DeltaResult<(DeltaTable, Metrics)>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Drive the optimize operation: plan, execute, and return the updated
    /// table together with the collected metrics.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;

        Box::pin(async move {
            // Resolve the latest snapshot and verify this client is allowed
            // to write under the table's protocol.
            let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?;
            PROTOCOL.can_write_to(&snapshot)?;

            let operation_id = this.get_operation_id();
            this.pre_execute(operation_id).await?;

            // Default writer settings: ZSTD level 4 and a delta-rs
            // `created_by` tag.
            let writer_properties = this.writer_properties.unwrap_or_else(|| {
                WriterProperties::builder()
                    .set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
                    .set_created_by(format!("delta-rs version {}", crate_version()))
                    .build()
            });
            // Use the caller's session when it is a `SessionState`; otherwise
            // build one backed by a fair spill pool capped at max_spill_size.
            let session = this
                .session
                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
                .unwrap_or_else(|| {
                    let memory_pool = FairSpillPool::new(this.max_spill_size);
                    let runtime = RuntimeEnvBuilder::new()
                        .with_memory_pool(Arc::new(memory_pool))
                        .build_arc()
                        .unwrap();
                    SessionStateBuilder::new()
                        .with_default_features()
                        .with_runtime_env(runtime)
                        .build()
                });
            let plan = create_merge_plan(
                &this.log_store,
                this.optimize_type,
                &snapshot,
                this.filters,
                this.target_size.to_owned(),
                writer_properties,
                session,
            )
            .await?;

            let metrics = plan
                .execute(
                    this.log_store.clone(),
                    &snapshot,
                    this.max_concurrent_tasks,
                    this.min_commit_interval,
                    this.commit_properties.clone(),
                    operation_id,
                    this.custom_execute_handler.as_ref(),
                )
                .await?;

            if let Some(handler) = this.custom_execute_handler {
                handler.post_execute(&this.log_store, operation_id).await?;
            }
            // Refresh the table so the returned handle reflects the commits
            // made during execution.
            let mut table =
                DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot));
            table.update().await?;
            Ok((table, metrics))
        })
    }
}
395
/// Parameters of the optimize run, recorded in the commit log as the
/// operation's input (see the `From<OptimizeInput> for DeltaOperation` impl).
#[derive(Debug, Clone)]
struct OptimizeInput {
    /// Desired maximum size (bytes) of rewritten files
    target_size: u64,
    /// Serialized partition filters the run was restricted to, if any
    predicate: Option<String>,
}
401
402impl From<OptimizeInput> for DeltaOperation {
403 fn from(opt_input: OptimizeInput) -> Self {
404 DeltaOperation::Optimize {
405 target_size: opt_input.target_size as i64,
406 predicate: opt_input.predicate,
407 }
408 }
409}
410
411fn create_remove(
413 path: &str,
414 partitions: &IndexMap<String, Scalar>,
415 size: i64,
416) -> Result<Action, DeltaTableError> {
417 let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
419 let deletion_time = deletion_time.as_millis() as i64;
420
421 Ok(Action::Remove(Remove {
422 path: path.to_string(),
423 deletion_timestamp: Some(deletion_time),
424 data_change: false,
425 extended_file_metadata: None,
426 partition_values: Some(
427 partitions
428 .iter()
429 .map(|(k, v)| {
430 (
431 k.clone(),
432 if v.is_null() {
433 None
434 } else {
435 Some(v.serialize())
436 },
437 )
438 })
439 .collect(),
440 ),
441 size: Some(size),
442 deletion_vector: None,
443 tags: None,
444 base_row_id: None,
445 default_row_commit_version: None,
446 }))
447}
448
/// The rewrite work to perform, grouped by partition (keyed by the
/// partition's hive path).
#[derive(Debug)]
enum OptimizeOperations {
    /// Compaction: per partition, the partition values plus one or more bins
    /// of files to merge together.
    Compact(HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)>),
    /// Z-order: the ordering columns and, per partition, a single bin holding
    /// every file to rewrite, plus the session state used to run the sort.
    ZOrder(
        Vec<String>,
        HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
        Box<SessionState>,
    ),
}
468
469impl Default for OptimizeOperations {
470 fn default() -> Self {
471 OptimizeOperations::Compact(HashMap::new())
472 }
473}
474
/// A plan for optimizing the table, produced by [create_merge_plan].
#[derive(Debug)]
pub struct MergePlan {
    /// Which files to rewrite, grouped per partition
    operations: OptimizeOperations,
    /// Metrics gathered during planning (considered/skipped counts)
    metrics: Metrics,
    /// Parameters shared by all rewrite tasks
    task_parameters: Arc<MergeTaskParameters>,
    /// Table version the plan was created from; recorded as `readVersion`
    /// in each commit's app metadata
    read_table_version: i64,
}
486
/// Parameters shared between all file-rewrite tasks of one plan.
#[derive(Debug)]
pub struct MergeTaskParameters {
    /// Parameters of the overall optimize operation
    input_parameters: OptimizeInput,
    /// Schema of the data files (partition columns stripped)
    file_schema: SchemaRef,
    /// Properties passed to the parquet writer
    writer_properties: WriterProperties,
    /// Number of leading columns to collect statistics for
    num_indexed_cols: DataSkippingNumIndexedCols,
    /// Explicit list of columns to collect statistics for, if configured
    stats_columns: Option<Vec<String>>,
}
501
/// Stream of record batches read back for rewriting, either directly from
/// parquet or from the z-order scan.
type ParquetReadStream = BoxStream<'static, Result<RecordBatch, ParquetError>>;
504
impl MergePlan {
    /// Rewrite one bin of files into (ideally) a single larger file.
    ///
    /// Emits `Remove` actions for every input file and `Add` actions for the
    /// rewritten output, plus partial metrics for the bin. `data_change` is
    /// false on both sides since the data itself is unchanged.
    async fn rewrite_files<F>(
        task_parameters: Arc<MergeTaskParameters>,
        partition_values: IndexMap<String, Scalar>,
        files: MergeBin,
        object_store: ObjectStoreRef,
        read_stream: F,
    ) -> Result<(Vec<Action>, PartialMetrics), DeltaTableError>
    where
        F: Future<Output = Result<ParquetReadStream, DeltaTableError>> + Send + 'static,
    {
        debug!("Rewriting files in partition: {partition_values:?}");
        // Remove actions for every source file in the bin.
        let mut partial_actions = files
            .iter()
            .map(|file_meta| {
                create_remove(file_meta.path.as_ref(), &partition_values, file_meta.size)
            })
            .collect::<Result<Vec<_>, DeltaTableError>>()?;

        // Size statistics over the removed files.
        let files_removed = files
            .iter()
            .fold(MetricDetails::default(), |mut curr, file| {
                curr.total_files += 1;
                curr.total_size += file.size;
                curr.max = std::cmp::max(curr.max, file.size);
                curr.min = std::cmp::min(curr.min, file.size);
                curr
            });

        let mut partial_metrics = PartialMetrics {
            num_files_added: 0,
            num_files_removed: files.len() as u64,
            files_added: MetricDetails::default(),
            files_removed,
            num_batches: 0,
        };

        // The writer rolls to a new file once `target_size` is reached.
        let writer_config = PartitionWriterConfig::try_new(
            task_parameters.file_schema.clone(),
            partition_values.clone(),
            Some(task_parameters.writer_properties.clone()),
            Some(task_parameters.input_parameters.target_size as usize),
            None,
            None,
        )?;
        let mut writer = PartitionWriter::try_with_config(
            object_store,
            writer_config,
            task_parameters.num_indexed_cols,
            task_parameters.stats_columns.clone(),
        )?;

        let mut read_stream = read_stream.await?;

        while let Some(maybe_batch) = read_stream.next().await {
            let mut batch = maybe_batch?;

            // Normalize each batch to the table's file schema before writing.
            batch = crate::kernel::schema::cast::cast_record_batch(
                &batch,
                task_parameters.file_schema.clone(),
                false,
                true,
            )?;
            partial_metrics.num_batches += 1;
            writer.write(&batch).await?;
        }

        // Collect Add actions for the newly written files, updating the
        // added-file metrics along the way.
        let add_actions = writer.close().await?.into_iter().map(|mut add| {
            add.data_change = false;

            let size = add.size;

            partial_metrics.num_files_added += 1;
            partial_metrics.files_added.total_files += 1;
            partial_metrics.files_added.total_size += size;
            partial_metrics.files_added.max = std::cmp::max(partial_metrics.files_added.max, size);
            partial_metrics.files_added.min = std::cmp::min(partial_metrics.files_added.min, size);

            Action::Add(add)
        });
        partial_actions.extend(add_actions);

        debug!("Finished rewriting files in partition: {partition_values:?}");

        Ok((partial_actions, partial_metrics))
    }

    /// Read the records of a bin sorted by their z-order key.
    ///
    /// Builds a DataFusion plan that scans only this bin's files and sorts by
    /// the `zorder_key` UDF applied to the configured columns.
    async fn read_zorder(
        files: MergeBin,
        context: Arc<zorder::ZOrderExecContext>,
        table_provider: DeltaTableProvider,
    ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
        use datafusion::common::Column;
        use datafusion::logical_expr::expr::ScalarFunction;
        use datafusion::logical_expr::{Expr, ScalarUDF};

        // Restrict the provider to exactly the files in this bin.
        let provider = table_provider.with_files(files.files);
        let df = context.ctx.read_table(Arc::new(provider))?;

        let cols = context
            .columns
            .iter()
            .map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col)))
            .collect_vec();
        let expr = Expr::ScalarFunction(ScalarFunction::new_udf(
            Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)),
            cols,
        ));
        // Ascending sort on the z-order key, nulls first.
        let df = df.sort(vec![expr.sort(true, true)])?;

        let stream = df
            .execute_stream()
            .await?
            .map_err(|err| {
                ParquetError::General(format!("Z-order failed while scanning data: {err:?}"))
            })
            .boxed();

        Ok(stream)
    }

    /// Perform the operations outlined in the plan.
    ///
    /// Rewrite tasks run with up to `max_concurrent_tasks` in flight.
    /// Finished work is committed incrementally — at most once per
    /// `min_commit_interval`, or in a single final commit when no interval is
    /// given. Returns the metrics accumulated over all commits.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(operation = "optimize", version = snapshot.version()))]
    pub async fn execute(
        mut self,
        log_store: LogStoreRef,
        snapshot: &EagerSnapshot,
        max_concurrent_tasks: usize,
        min_commit_interval: Option<Duration>,
        commit_properties: CommitProperties,
        operation_id: Uuid,
        handle: Option<&Arc<dyn CustomExecuteHandler>>,
    ) -> Result<Metrics, DeltaTableError> {
        let operations = std::mem::take(&mut self.operations);
        info!("starting optimize execution");
        let object_store = log_store.object_store(Some(operation_id));

        // Build a stream of rewrite futures, one per bin.
        let stream = match operations {
            OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
                .flat_map(|(_, (partition, bins))| {
                    futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
                })
                .map(|(partition, files)| {
                    debug!(
                        "merging a group of {} files in partition {partition:?}",
                        files.len(),
                    );
                    for file in files.iter() {
                        debug!(" file {}", file.path);
                    }
                    let object_store_ref = object_store.clone();
                    // Lazily open each parquet file and flatten all batches
                    // into one stream for the rewrite task.
                    let batch_stream = futures::stream::iter(files.clone())
                        .then(move |file| {
                            let object_store_ref = object_store_ref.clone();
                            let meta = ObjectMeta::try_from(file).unwrap();
                            async move {
                                let file_reader =
                                    ParquetObjectReader::new(object_store_ref, meta.location)
                                        .with_file_size(meta.size);
                                ParquetRecordBatchStreamBuilder::new(file_reader)
                                    .await?
                                    .build()
                            }
                        })
                        .try_flatten()
                        .boxed();

                    let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                        self.task_parameters.clone(),
                        partition,
                        files,
                        object_store.clone(),
                        futures::future::ready(Ok(batch_stream)),
                    ));
                    util::flatten_join_error(rewrite_result)
                })
                .boxed(),
            OptimizeOperations::ZOrder(zorder_columns, bins, state) => {
                debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

                let exec_context = Arc::new(zorder::ZOrderExecContext::new(
                    zorder_columns,
                    *state,
                    object_store,
                )?);
                let task_parameters = self.task_parameters.clone();

                use crate::delta_datafusion::DataFusionMixins;
                use crate::delta_datafusion::DeltaScanConfigBuilder;
                use crate::delta_datafusion::DeltaTableProvider;

                let scan_config = DeltaScanConfigBuilder::default()
                    .with_file_column(false)
                    .with_schema(snapshot.input_schema())
                    .build(snapshot)?;

                let log_store = log_store.clone();
                futures::stream::iter(bins)
                    .map(move |(_, (partition, files))| {
                        // Each bin is read back in z-order before rewriting.
                        let batch_stream = Self::read_zorder(
                            files.clone(),
                            exec_context.clone(),
                            DeltaTableProvider::try_new(
                                snapshot.clone(),
                                log_store.clone(),
                                scan_config.clone(),
                            )
                            .unwrap(),
                        );
                        let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                            task_parameters.clone(),
                            partition,
                            files,
                            log_store.object_store(Some(operation_id)),
                            batch_stream,
                        ));
                        util::flatten_join_error(rewrite_result)
                    })
                    .boxed()
            }
        };

        let mut stream = stream.buffer_unordered(max_concurrent_tasks);

        let mut table =
            DeltaTable::new_with_state(log_store.clone(), DeltaTableState::new(snapshot.clone()));

        // Actions buffered for the next commit.
        let mut actions = vec![];

        // `buffered_metrics` covers the window since the previous commit and
        // is reset on each commit; `total_metrics` spans the whole run.
        let orig_metrics = std::mem::take(&mut self.metrics);
        let mut buffered_metrics = orig_metrics.clone();
        let mut total_metrics = orig_metrics.clone();

        let mut last_commit = Instant::now();
        let mut commits_made = 0;
        let mut snapshot = snapshot.clone();
        loop {
            let next = stream.next().await.transpose()?;

            let end = next.is_none();

            if let Some((partial_actions, partial_metrics)) = next {
                debug!("Recording metrics for a completed partition");
                actions.extend(partial_actions);
                buffered_metrics.add(&partial_metrics);
                total_metrics.add(&partial_metrics);
            }

            // Commit when the interval has elapsed, or once at the end.
            let now = Instant::now();
            let mature = match min_commit_interval {
                None => false,
                Some(i) => now.duration_since(last_commit) > i,
            };
            if !actions.is_empty() && (mature || end) {
                let actions = std::mem::take(&mut actions);
                last_commit = now;

                buffered_metrics.preserve_insertion_order = true;
                let mut properties = CommitProperties::default();
                properties.app_metadata = commit_properties.app_metadata.clone();
                properties
                    .app_metadata
                    .insert("readVersion".to_owned(), self.read_table_version.into());
                // Record this window's metrics and reset the buffer for the
                // next commit window.
                let maybe_map_metrics = serde_json::to_value(std::mem::replace(
                    &mut buffered_metrics,
                    orig_metrics.clone(),
                ));
                if let Ok(map) = maybe_map_metrics {
                    properties
                        .app_metadata
                        .insert("operationMetrics".to_owned(), map);
                }

                debug!("committing {} actions", actions.len());

                // The retry budget grows with each commit already made.
                let commit = CommitBuilder::from(properties)
                    .with_actions(actions)
                    .with_operation_id(operation_id)
                    .with_post_commit_hook_handler(handle.cloned())
                    .with_max_retries(DEFAULT_RETRIES + commits_made)
                    .build(
                        Some(&snapshot),
                        log_store.clone(),
                        self.task_parameters.input_parameters.clone().into(),
                    )
                    .await?;
                snapshot = commit.snapshot().snapshot;
                commits_made += 1;
            }

            if end {
                break;
            }
        }

        total_metrics.preserve_insertion_order = true;
        // `min` starts at i64::MAX; zero it when nothing was added/removed.
        if total_metrics.num_files_added == 0 {
            total_metrics.files_added.min = 0;
        }
        if total_metrics.num_files_removed == 0 {
            total_metrics.files_removed.min = 0;
        }

        table.state = Some(DeltaTableState::new(snapshot));

        Ok(total_metrics)
    }
}
826
/// Build a plan for optimizing the table.
///
/// Produces a [MergePlan] describing which files to rewrite — either
/// compaction bins or z-order groups — along with planning metrics. The
/// target file size falls back to the table's configured `targetFileSize`
/// property when not supplied.
#[instrument(skip_all, fields(operation = "create_merge_plan", version = snapshot.version()))]
pub async fn create_merge_plan(
    log_store: &dyn LogStore,
    optimize_type: OptimizeType,
    snapshot: &EagerSnapshot,
    filters: &[PartitionFilter],
    target_size: Option<u64>,
    writer_properties: WriterProperties,
    session: SessionState,
) -> Result<MergePlan, DeltaTableError> {
    let target_size =
        target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size().get());
    let partitions_keys = snapshot.metadata().partition_columns();

    let (operations, metrics) = match optimize_type {
        OptimizeType::Compact => {
            info!("building compaction plan");
            build_compaction_plan(log_store, snapshot, filters, target_size).await?
        }
        OptimizeType::ZOrder(zorder_columns) => {
            info!("building z-order plan");
            build_zorder_plan(
                log_store,
                zorder_columns,
                snapshot,
                partitions_keys,
                filters,
                session,
            )
            .await?
        }
    };

    info!(
        partitions_optimized = metrics.partitions_optimized,
        total_considered_files = metrics.total_considered_files,
        "merge plan created"
    );

    let input_parameters = OptimizeInput {
        target_size,
        predicate: serde_json::to_string(filters).ok(),
    };
    // Data files do not store partition columns, so strip them from the
    // schema used by the rewrite writer.
    let file_schema = arrow_schema_without_partitions(
        &Arc::new(snapshot.schema().as_ref().try_into_arrow()?),
        partitions_keys,
    );

    Ok(MergePlan {
        operations,
        metrics,
        task_parameters: Arc::new(MergeTaskParameters {
            input_parameters,
            file_schema,
            writer_properties,
            num_indexed_cols: snapshot.table_properties().num_indexed_cols(),
            stats_columns: snapshot
                .table_properties()
                .data_skipping_stats_columns
                .as_ref()
                .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
        }),
        read_table_version: snapshot.version(),
    })
}
893
/// A group of files that will be rewritten together into one output file.
#[derive(Debug, Clone)]
struct MergeBin {
    /// Files assigned to this bin
    files: Vec<Add>,
    /// Running total of the files' on-disk sizes, in bytes
    size_bytes: u64,
}
900
901impl MergeBin {
902 pub fn new() -> Self {
903 MergeBin {
904 files: Vec::new(),
905 size_bytes: 0,
906 }
907 }
908
909 fn total_file_size(&self) -> u64 {
910 self.size_bytes
911 }
912
913 fn len(&self) -> usize {
914 self.files.len()
915 }
916
917 fn add(&mut self, add: Add) {
918 self.size_bytes += add.size as u64;
919 self.files.push(add);
920 }
921
922 fn iter(&self) -> impl Iterator<Item = &Add> {
923 self.files.iter()
924 }
925}
926
927impl IntoIterator for MergeBin {
928 type Item = Add;
929 type IntoIter = std::vec::IntoIter<Self::Item>;
930
931 fn into_iter(self) -> Self::IntoIter {
932 self.files.into_iter()
933 }
934}
935
/// Build a compaction plan: group candidate files per partition, then
/// first-fit bin-pack them (largest first) into bins of at most
/// `target_size` bytes.
///
/// Files already above the target size are skipped, as are bins that end up
/// containing a single file (nothing to merge).
async fn build_compaction_plan(
    log_store: &dyn LogStore,
    snapshot: &EagerSnapshot,
    filters: &[PartitionFilter],
    target_size: u64,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
    let mut metrics = Metrics::default();

    // Hive partition path -> (partition values, candidate files).
    let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, Vec<Add>)> = HashMap::new();
    let mut file_stream = snapshot.file_views_by_partitions(log_store, filters);
    while let Some(file) = file_stream.next().await {
        let file = file?;
        metrics.total_considered_files += 1;
        let object_meta = ObjectMeta::try_from(&file)?;
        // Files already larger than the target cannot benefit from compaction.
        if object_meta.size > target_size {
            metrics.total_files_skipped += 1;
            continue;
        }
        let partition_values = file
            .partition_values()
            .map(|v| {
                v.fields()
                    .iter()
                    .zip(v.values().iter())
                    .map(|(k, v)| (k.name().to_string(), v.clone()))
                    .collect::<IndexMap<_, _>>()
            })
            .unwrap_or_default();

        partition_files
            .entry(partition_values.hive_partition_path())
            .or_insert_with(|| (partition_values, vec![]))
            .1
            .push(file.add_action());
    }

    for (_, file) in partition_files.values_mut() {
        // Sort descending by size so bin-packing fills bins tightly.
        file.sort_by(|a, b| b.size.cmp(&a.size));
    }

    let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
    for (part, (partition, files)) in partition_files {
        let mut merge_bins = vec![MergeBin::new()];

        // First-fit: place each file in the first bin with room; otherwise
        // open a new bin.
        'files: for file in files {
            for bin in merge_bins.iter_mut() {
                if bin.total_file_size() + file.size as u64 <= target_size {
                    bin.add(file);
                    continue 'files;
                }
            }
            let mut new_bin = MergeBin::new();
            new_bin.add(file);
            merge_bins.push(new_bin);
        }

        operations.insert(part, (partition, merge_bins));
    }

    // A bin with a single file would just rewrite that file to itself; drop
    // those and count them as skipped.
    for (_, (_, bins)) in operations.iter_mut() {
        bins.retain(|bin| {
            if bin.len() == 1 {
                metrics.total_files_skipped += 1;
                false
            } else {
                true
            }
        })
    }
    operations.retain(|_, (_, files)| !files.is_empty());

    metrics.partitions_optimized = operations.len() as u64;

    Ok((OptimizeOperations::Compact(operations), metrics))
}
1015
/// Build a z-order plan: validate the z-order columns, then collect every
/// matching file into a single bin per partition.
///
/// Z-order columns must exist in the table schema and must not be partition
/// columns.
async fn build_zorder_plan(
    log_store: &dyn LogStore,
    zorder_columns: Vec<String>,
    snapshot: &EagerSnapshot,
    partition_keys: &[String],
    filters: &[PartitionFilter],
    session: SessionState,
) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
    if zorder_columns.is_empty() {
        return Err(DeltaTableError::Generic(
            "Z-order requires at least one column".to_string(),
        ));
    }
    // Ordering by a partition column is meaningless (constant per partition).
    let zorder_partition_cols = zorder_columns
        .iter()
        .filter(|col| partition_keys.contains(col))
        .collect_vec();
    if !zorder_partition_cols.is_empty() {
        return Err(DeltaTableError::Generic(format!(
            "Z-order columns cannot be partition columns. Found: {zorder_partition_cols:?}"
        )));
    }
    let field_names = snapshot
        .schema()
        .fields()
        .map(|field| field.name().to_string())
        .collect_vec();
    let unknown_columns = zorder_columns
        .iter()
        .filter(|col| !field_names.contains(col))
        .collect_vec();
    if !unknown_columns.is_empty() {
        return Err(DeltaTableError::Generic(
            format!("Z-order columns must be present in the table schema. Unknown columns: {unknown_columns:?}"),
        ));
    }

    let mut metrics = Metrics::default();

    // Hive partition path -> (partition values, single bin of all files).
    let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, MergeBin)> = HashMap::new();
    let mut file_stream = snapshot.file_views_by_partitions(log_store, filters);
    while let Some(file) = file_stream.next().await {
        let file = file?;
        let partition_values = file
            .partition_values()
            .map(|v| {
                v.fields()
                    .iter()
                    .zip(v.values().iter())
                    .map(|(k, v)| (k.name().to_string(), v.clone()))
                    .collect::<IndexMap<_, _>>()
            })
            .unwrap_or_default();
        metrics.total_considered_files += 1;
        partition_files
            .entry(partition_values.hive_partition_path())
            .or_insert_with(|| (partition_values, MergeBin::new()))
            .1
            .add(file.add_action());
        debug!("partition_files inside the zorder plan: {partition_files:?}");
    }

    let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, Box::new(session));
    Ok((operation, metrics))
}
1082
1083pub(super) mod util {
1084 use super::*;
1085 use futures::Future;
1086 use tokio::task::JoinError;
1087
1088 pub async fn flatten_join_error<T, E>(
1089 future: impl Future<Output = Result<Result<T, E>, JoinError>>,
1090 ) -> Result<T, DeltaTableError>
1091 where
1092 E: Into<DeltaTableError>,
1093 {
1094 match future.await {
1095 Ok(Ok(result)) => Ok(result),
1096 Ok(Err(error)) => Err(error.into()),
1097 Err(error) => Err(DeltaTableError::GenericError {
1098 source: Box::new(error),
1099 }),
1100 }
1101 }
1102}
1103
1104pub(super) mod zorder {
1106 use super::*;
1107
1108 use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
1109 use arrow_array::{Array, ArrayRef, BinaryArray};
1110 use arrow_buffer::bit_util::{get_bit_raw, set_bit_raw, unset_bit_raw};
1111 use arrow_row::{Row, RowConverter, SortField};
1112 use arrow_schema::ArrowError;
1113 pub use self::datafusion::ZOrderExecContext;
1116
1117 pub(super) mod datafusion {
1118 use super::*;
1119 use url::Url;
1120
1121 use ::datafusion::common::DataFusionError;
1122 use ::datafusion::logical_expr::{
1123 ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
1124 Volatility,
1125 };
1126 use ::datafusion::prelude::SessionContext;
1127 use arrow_schema::DataType;
1128 use itertools::Itertools;
1129 use std::any::Any;
1130
        /// Name under which the z-order key UDF is registered with DataFusion.
        pub const ZORDER_UDF_NAME: &str = "zorder_key";
1132
        /// Execution context for the z-order scan: the columns forming the
        /// z-order key and the DataFusion session used to read and sort data.
        pub struct ZOrderExecContext {
            /// Columns participating in the z-order key, in the given order
            pub columns: Arc<[String]>,
            /// Session with the z-order UDF and object store registered
            pub ctx: SessionContext,
        }
1137
        impl ZOrderExecContext {
            /// Build the session context for z-ordering: registers the
            /// `zorder_key` UDF and the table's object store (under the
            /// `delta-rs://` URL scheme) on the provided session state.
            pub fn new(
                columns: Vec<String>,
                session: SessionState,
                object_store_ref: ObjectStoreRef,
            ) -> Result<Self, DataFusionError> {
                let columns = columns.into();

                let ctx = SessionContext::new_with_state(session);
                ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
                ctx.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store_ref);
                Ok(Self { columns, ctx })
            }
        }
1152
        /// Scalar UDF computing a binary z-order key from its arguments.
        #[derive(Debug, Hash, PartialEq, Eq)]
        pub struct ZOrderUDF;
1156
        impl ScalarUDFImpl for ZOrderUDF {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn name(&self) -> &str {
                ZORDER_UDF_NAME
            }

            // Accepts any number of arguments of any type; the key is a pure
            // function of its inputs.
            fn signature(&self) -> &Signature {
                &Signature {
                    type_signature: TypeSignature::VariadicAny,
                    volatility: Volatility::Immutable,
                }
            }

            // The z-order key is an opaque byte string.
            fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
                Ok(DataType::Binary)
            }

            fn invoke_with_args(
                &self,
                args: ScalarFunctionArgs,
            ) -> ::datafusion::common::Result<ColumnarValue> {
                zorder_key_datafusion(&args.args)
            }
        }
1184
1185 fn zorder_key_datafusion(
1187 columns: &[ColumnarValue],
1188 ) -> Result<ColumnarValue, DataFusionError> {
1189 debug!("zorder_key_datafusion: {columns:#?}");
1190 let length = columns
1191 .iter()
1192 .map(|col| match col {
1193 ColumnarValue::Array(array) => array.len(),
1194 ColumnarValue::Scalar(_) => 1,
1195 })
1196 .max()
1197 .ok_or(DataFusionError::NotImplemented(
1198 "z-order on zero columns.".to_string(),
1199 ))?;
1200 let columns: Vec<ArrayRef> = columns
1201 .iter()
1202 .map(|col| col.clone().into_array(length))
1203 .try_collect()?;
1204 let array = zorder_key(&columns)?;
1205 Ok(ColumnarValue::Array(array))
1206 }
1207
#[cfg(test)]
mod tests {
    use super::*;
    use ::datafusion::assert_batches_eq;
    use arrow_array::{Int32Array, StringArray};
    use arrow_ord::sort::sort_to_indices;
    use arrow_schema::Field;
    use arrow_select::take::take;
    use rand::Rng;
    // Sorting rows by a z-order key built from the first i columns must yield
    // the expected row order for each i = 1..=3, even after the input rows are
    // randomly shuffled first.
    #[test]
    fn test_order() {
        let int: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
        let str: ArrayRef = Arc::new(StringArray::from(vec![
            Some("a"),
            Some("x"),
            Some("a"),
            Some("x"),
            None,
        ]));
        let int_large: ArrayRef = Arc::new(Int32Array::from(vec![10000, 2000, 300, 40, 5]));
        let batch = RecordBatch::try_from_iter(vec![
            ("int", int),
            ("str", str),
            ("int_large", int_large),
        ])
        .unwrap();

        // Expected row order when keying on `int` only.
        let expected_1 = vec![
            "+-----+-----+-----------+",
            "| int | str | int_large |",
            "+-----+-----+-----------+",
            "| 1   | a   | 10000     |",
            "| 2   | x   | 2000      |",
            "| 3   | a   | 300       |",
            "| 4   | x   | 40        |",
            "| 5   |     | 5         |",
            "+-----+-----+-----------+",
        ];
        // Expected row order when keying on (`int`, `str`).
        let expected_2 = vec![
            "+-----+-----+-----------+",
            "| int | str | int_large |",
            "+-----+-----+-----------+",
            "| 5   |     | 5         |",
            "| 1   | a   | 10000     |",
            "| 3   | a   | 300       |",
            "| 2   | x   | 2000      |",
            "| 4   | x   | 40        |",
            "+-----+-----+-----------+",
        ];
        // Expected row order when keying on all three columns.
        let expected_3 = vec![
            "+-----+-----+-----------+",
            "| int | str | int_large |",
            "+-----+-----+-----------+",
            "| 5   |     | 5         |",
            "| 4   | x   | 40        |",
            "| 2   | x   | 2000      |",
            "| 3   | a   | 300       |",
            "| 1   | a   | 10000     |",
            "+-----+-----+-----------+",
        ];

        let expected = [expected_1, expected_2, expected_3];

        // Shuffle the rows so the result cannot depend on the input order.
        let indices = Int32Array::from(shuffled_indices().to_vec());
        let shuffled_columns = batch
            .columns()
            .iter()
            .map(|c| take(c, &indices, None).unwrap())
            .collect::<Vec<_>>();
        let shuffled_batch =
            RecordBatch::try_new(batch.schema(), shuffled_columns).unwrap();

        for i in 1..=batch.num_columns() {
            // Key over the first i columns, then sort the whole batch by it.
            let columns = (0..i)
                .map(|idx| shuffled_batch.column(idx).clone())
                .collect::<Vec<_>>();

            let order_keys = zorder_key(&columns).unwrap();
            let indices = sort_to_indices(order_keys.as_ref(), None, None).unwrap();
            let sorted_columns = shuffled_batch
                .columns()
                .iter()
                .map(|c| take(c, &indices, None).unwrap())
                .collect::<Vec<_>>();
            let sorted_batch =
                RecordBatch::try_new(batch.schema(), sorted_columns).unwrap();

            assert_batches_eq!(expected[i - 1], &[sorted_batch]);
        }
    }
    // Fisher-Yates shuffle of the indices 0..5; uses a non-seeded RNG, so each
    // run exercises a different permutation.
    fn shuffled_indices() -> [i32; 5] {
        let mut rng = rand::thread_rng();
        let mut array = [0, 1, 2, 3, 4];
        for i in (1..array.len()).rev() {
            let j = rng.gen_range(0..=i);
            array.swap(i, j);
        }
        array
    }

    // Z-order optimize must succeed when column names use mixed case
    // (`moDified` here) rather than matching the lowercase form.
    #[tokio::test]
    async fn test_zorder_mixed_case() {
        use arrow_schema::Schema as ArrowSchema;
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", DataType::Utf8, true),
            Field::new("ID", DataType::Utf8, true),
            Field::new("vaLue", DataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let res = crate::DeltaOps(table)
            .optimize()
            .with_type(OptimizeType::ZOrder(vec!["moDified".into()]))
            .await;
        assert!(res.is_ok());
    }

    // Z-order optimize must handle partition values containing spaces
    // ("Dominican Republic"), which affect the encoded partition paths.
    #[tokio::test]
    async fn test_zorder_space_in_partition_value() {
        use arrow_schema::Schema as ArrowSchema;
        let _ = pretty_env_logger::try_init();
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", DataType::Utf8, true),
            Field::new("country", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "Germany",
                    "China",
                    "Canada",
                    "Dominican Republic",
                ])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch.clone()])
            .with_partition_columns(vec!["country"])
            .with_save_mode(crate::protocol::SaveMode::Overwrite)
            .await
            .unwrap();

        let res = crate::DeltaOps(table)
            .optimize()
            .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
            .await;
        assert!(res.is_ok(), "Failed to optimize: {res:#?}");
    }

    // Same as above, but with non-alphanumeric characters ("USA$$!") in a
    // partition value.
    #[tokio::test]
    async fn test_zorder_space_in_partition_value_garbage() {
        use arrow_schema::Schema as ArrowSchema;
        let _ = pretty_env_logger::try_init();
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", DataType::Utf8, true),
            Field::new("country", DataType::Utf8, true),
            Field::new("value", DataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    "Germany", "China", "Canada", "USA$$!",
                ])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        let table = crate::DeltaOps::new_in_memory()
            .write(vec![batch.clone()])
            .with_partition_columns(vec!["country"])
            .with_save_mode(crate::protocol::SaveMode::Overwrite)
            .await
            .unwrap();

        let res = crate::DeltaOps(table)
            .optimize()
            .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
            .await;
        assert!(res.is_ok(), "Failed to optimize: {res:#?}");
    }
}
1433 }
1434
1435 pub fn zorder_key(columns: &[ArrayRef]) -> Result<ArrayRef, ArrowError> {
1441 if columns.is_empty() {
1442 return Err(ArrowError::InvalidArgumentError(
1443 "Cannot zorder empty columns".to_string(),
1444 ));
1445 }
1446
1447 let out_length = columns[0].len();
1449
1450 if columns.iter().any(|col| col.len() != out_length) {
1451 return Err(ArrowError::InvalidArgumentError(
1452 "All columns must have the same length".to_string(),
1453 ));
1454 }
1455
1456 let value_size: usize = columns.len() * 16;
1458
1459 let mut out: Vec<u8> = vec![0; out_length * value_size];
1461
1462 for (col_pos, col) in columns.iter().enumerate() {
1463 set_bits_for_column(col.clone(), col_pos, columns.len(), &mut out)?;
1464 }
1465
1466 let offsets = (0..=out_length)
1467 .map(|i| (i * value_size) as i32)
1468 .collect::<Vec<i32>>();
1469
1470 let out_arr = BinaryArray::try_new(
1471 OffsetBuffer::new(ScalarBuffer::from(offsets)),
1472 Buffer::from_vec(out),
1473 None,
1474 )?;
1475
1476 Ok(Arc::new(out_arr))
1477 }
1478
/// Scatters the bits of one column into the shared z-order key buffer.
///
/// The column is converted to arrow's comparable row format (lexicographic
/// byte order matches the column's sort order), then bit `b` of each row's
/// encoding is written to bit slot `b * num_columns + col_pos` of that row's
/// key, interleaving this column with the others.
///
/// `out` must already be sized to `num_columns * 16` bytes per row (as
/// `zorder_key` allocates it); the unsafe writes below rely on that and do
/// not check bounds themselves.
fn set_bits_for_column(
    input: ArrayRef,
    col_pos: usize,
    num_columns: usize,
    out: &mut Vec<u8>,
) -> Result<(), ArrowError> {
    let converter = RowConverter::new(vec![SortField::new(input.data_type().clone())])?;
    let rows = converter.convert_columns(&[input])?;

    for (row_i, row) in rows.iter().enumerate() {
        // Byte offset of this row's key: num_columns * 16 bytes per row.
        let row_offset = row_i * num_columns * 16;
        // Only the first 128 bits (16 bytes) of the row encoding are used;
        // shorter encodings read as zero bits via `RowBitUtil::get_bit`.
        for bit_i in 0..128 {
            let bit = row.get_bit(bit_i);
            // Interleave: bit b of this column lands at b * num_columns + col_pos.
            let bit_pos = (bit_i * num_columns) + col_pos;
            let out_pos = (row_offset * 8) + bit_pos;
            if bit {
                // SAFETY: out_pos < (row_i + 1) * num_columns * 128, which is
                // within out.len() * 8 provided the caller sized `out` for at
                // least rows.num_rows() rows of num_columns * 16 bytes each.
                unsafe { set_bit_raw(out.as_mut_ptr(), out_pos) };
            } else {
                // SAFETY: same in-bounds argument as the branch above.
                unsafe { unset_bit_raw(out.as_mut_ptr(), out_pos) };
            }
        }
    }

    Ok(())
}
1519
/// Helper for reading single bits out of a row's byte encoding.
trait RowBitUtil {
    /// Returns bit `bit_i` of the underlying bytes, or `false` when the index
    /// lies past the end of the byte slice.
    fn get_bit(&self, bit_i: usize) -> bool;
}
1523
1524 impl RowBitUtil for Row<'_> {
1525 fn get_bit(&self, bit_i: usize) -> bool {
1527 let byte_i = bit_i / 8;
1528 let bytes = self.as_ref();
1529 if byte_i >= bytes.len() {
1530 return false;
1531 }
1532 unsafe { get_bit_raw(bytes.as_ptr(), bit_i) }
1534 }
1535 }
1536
#[cfg(test)]
mod test {
    use arrow_array::{
        cast::as_generic_binary_array, new_empty_array, StringArray, UInt8Array,
    };
    use arrow_schema::DataType;

    use super::*;
    use crate::ensure_table_uri;

    // zorder_key must refuse an empty column list with an error.
    #[test]
    fn test_rejects_no_columns() {
        let columns = vec![];
        let result = zorder_key(&columns);
        assert!(result.is_err());
    }

    // Zero-row columns are valid input and yield an empty key array.
    #[test]
    fn test_handles_no_rows() {
        let columns: Vec<ArrayRef> = vec![
            Arc::new(new_empty_array(&DataType::Int64)),
            Arc::new(new_empty_array(&DataType::Utf8)),
        ];
        let result = zorder_key(columns.as_slice());
        assert!(result.is_ok());
        let result = result.unwrap();
        assert_eq!(result.len(), 0);
    }

    // Mixed column types, nulls, and long strings all produce a non-null
    // binary key of exactly 16 bytes per column for every row.
    #[test]
    fn test_basics() {
        let columns: Vec<ArrayRef> = vec![
            Arc::new(StringArray::from(vec![Some("a"), Some("b"), None])),
            Arc::new(StringArray::from(vec![
                "delta-rs: A native Rust library for Delta Lake, with bindings into Python",
                "cat",
                "",
            ])),
            Arc::new(UInt8Array::from(vec![Some(1), Some(4), None])),
        ];
        let result = zorder_key(columns.as_slice()).unwrap();
        assert_eq!(result.len(), 3);
        assert_eq!(result.data_type(), &DataType::Binary);
        assert_eq!(result.null_count(), 0);

        // 3 rows x 3 columns x 16 bytes per column.
        let data: &BinaryArray = as_generic_binary_array(result.as_ref());
        assert_eq!(data.value_data().len(), 3 * 16 * 3);
        assert!(data.iter().all(|x| x.unwrap().len() == 3 * 16));
    }

    // Optimize must compact a checked-in table fixture (copied into a temp
    // dir so the run cannot mutate the original test data) into one file.
    #[tokio::test]
    async fn works_on_spark_table() {
        use crate::DeltaOps;
        use tempfile::TempDir;
        let tmp_dir = TempDir::new().expect("Failed to make temp dir");
        let table_name = "delta-1.2.1-only-struct-stats";

        let source_path = format!("../test/tests/data/{table_name}");
        fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default()).unwrap();

        let table_uri =
            ensure_table_uri(tmp_dir.path().join(table_name).to_str().unwrap()).unwrap();
        let (_, metrics) = DeltaOps::try_from_uri(table_uri)
            .await
            .unwrap()
            .optimize()
            .await
            .unwrap();

        assert_eq!(metrics.num_files_added, 1);
    }
}
1615}