deltalake_core/operations/optimize.rs

1//! Optimize a Delta Table
2//!
3//! Perform bin-packing on a Delta Table which merges small files into a large
4//! file. Bin-packing reduces the number of API calls required for read
5//! operations.
6//!
7//! Optimize will fail if a concurrent write operation removes files from the
8//! table (such as in an overwrite). It will always succeed if concurrent writers
9//! are only appending.
10//!
11//! Optimize increments the table's version and creates remove actions for
12//! optimized files. Optimize does not delete files from storage. To delete
13//! files that were removed, call `vacuum` on [`DeltaTable`].
14//!
15//! See [`OptimizeBuilder`] for configuration.
16//!
17//! # Example
18//! ```rust ignore
19//! let table = open_table("../path/to/table")?;
20//! let (table, metrics) = OptimizeBuilder::new(table.object_store(), table.state).await?;
//! ```
22
23use std::collections::HashMap;
24use std::fmt;
25use std::sync::Arc;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27
28use arrow::array::RecordBatch;
29use arrow::datatypes::SchemaRef;
30use datafusion::catalog::Session;
31use datafusion::execution::context::SessionState;
32use datafusion::execution::memory_pool::FairSpillPool;
33use datafusion::execution::runtime_env::RuntimeEnvBuilder;
34use datafusion::execution::SessionStateBuilder;
35use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
36use delta_kernel::expressions::Scalar;
37use delta_kernel::table_properties::DataSkippingNumIndexedCols;
38use futures::future::BoxFuture;
39use futures::stream::BoxStream;
40use futures::{Future, StreamExt, TryStreamExt};
41use indexmap::IndexMap;
42use itertools::Itertools;
43use num_cpus;
44use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
45use parquet::basic::{Compression, ZstdLevel};
46use parquet::errors::ParquetError;
47use parquet::file::properties::WriterProperties;
48use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
49use tracing::*;
50use uuid::Uuid;
51
52use super::write::writer::{PartitionWriter, PartitionWriterConfig};
53use super::{CustomExecuteHandler, Operation};
54use crate::delta_datafusion::DeltaTableProvider;
55use crate::errors::{DeltaResult, DeltaTableError};
56use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL};
57use crate::kernel::{resolve_snapshot, EagerSnapshot};
58use crate::kernel::{scalars::ScalarExt, Action, Add, PartitionsExt, Remove};
59use crate::logstore::{LogStore, LogStoreRef, ObjectStoreRef};
60use crate::protocol::DeltaOperation;
61use crate::table::config::TablePropertiesExt as _;
62use crate::table::state::DeltaTableState;
63use crate::writer::utils::arrow_schema_without_partitions;
64use crate::{crate_version, DeltaTable, ObjectMeta, PartitionFilter};
65
66/// Metrics from Optimize
/// Metrics from Optimize
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Number of optimized files added
    pub num_files_added: u64,
    /// Number of unoptimized files removed
    pub num_files_removed: u64,
    /// Detailed metrics for the add operation
    // Serialized as a JSON *string* (not a nested object) via the helper
    // functions defined below.
    #[serde(
        serialize_with = "serialize_metric_details",
        deserialize_with = "deserialize_metric_details"
    )]
    pub files_added: MetricDetails,
    /// Detailed metrics for the remove operation
    // Serialized as a JSON string, matching `files_added`.
    #[serde(
        serialize_with = "serialize_metric_details",
        deserialize_with = "deserialize_metric_details"
    )]
    pub files_removed: MetricDetails,
    /// Number of partitions that had at least one file optimized
    pub partitions_optimized: u64,
    /// The number of batches written
    pub num_batches: u64,
    /// How many files were considered during optimization. Not every file considered is optimized
    pub total_considered_files: usize,
    /// How many files were considered for optimization but were skipped
    pub total_files_skipped: usize,
    /// The order of records from source files is preserved
    pub preserve_insertion_order: bool,
}
97
98// Custom serialization function that serializes metric details as a string
99fn serialize_metric_details<S>(value: &MetricDetails, serializer: S) -> Result<S::Ok, S::Error>
100where
101    S: Serializer,
102{
103    serializer.serialize_str(&value.to_string())
104}
105
106// Custom deserialization that parses a JSON string into MetricDetails
107fn deserialize_metric_details<'de, D>(deserializer: D) -> Result<MetricDetails, D::Error>
108where
109    D: Deserializer<'de>,
110{
111    let s: String = Deserialize::deserialize(deserializer)?;
112    serde_json::from_str(&s).map_err(DeError::custom)
113}
114
/// Statistics on files for a particular operation
/// Operation can be remove or add
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct MetricDetails {
    /// Average file size of an operation
    pub avg: f64,
    /// Maximum file size of an operation
    pub max: i64,
    /// Minimum file size of an operation
    pub min: i64,
    /// Number of files encountered during operation
    pub total_files: usize,
    /// Sum of file sizes of an operation
    pub total_size: i64,
}
131
132impl MetricDetails {
133    /// Add a partial metric to the metrics
134    pub fn add(&mut self, partial: &MetricDetails) {
135        self.min = std::cmp::min(self.min, partial.min);
136        self.max = std::cmp::max(self.max, partial.max);
137        self.total_files += partial.total_files;
138        self.total_size += partial.total_size;
139        self.avg = self.total_size as f64 / self.total_files as f64;
140    }
141}
142
143impl fmt::Display for MetricDetails {
144    /// Display the metric details using serde serialization
145    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
146        serde_json::to_string(self).map_err(|_| fmt::Error)?.fmt(f)
147    }
148}
149
#[derive(Debug)]
/// Metrics for a single partition
// Produced per rewrite task and folded into the overall [`Metrics`] via
// `Metrics::add`.
pub struct PartialMetrics {
    /// Number of optimized files added
    pub num_files_added: u64,
    /// Number of unoptimized files removed
    pub num_files_removed: u64,
    /// Detailed metrics for the add operation
    pub files_added: MetricDetails,
    /// Detailed metrics for the remove operation
    pub files_removed: MetricDetails,
    /// The number of batches written
    pub num_batches: u64,
}
164
165impl Metrics {
166    /// Add a partial metric to the metrics
167    pub fn add(&mut self, partial: &PartialMetrics) {
168        self.num_files_added += partial.num_files_added;
169        self.num_files_removed += partial.num_files_removed;
170        self.files_added.add(&partial.files_added);
171        self.files_removed.add(&partial.files_removed);
172        self.num_batches += partial.num_batches;
173    }
174}
175
176impl Default for MetricDetails {
177    fn default() -> Self {
178        MetricDetails {
179            min: i64::MAX,
180            max: 0,
181            avg: 0.0,
182            total_files: 0,
183            total_size: 0,
184        }
185    }
186}
187
/// Type of optimization to perform.
#[derive(Debug)]
pub enum OptimizeType {
    /// Compact files into pre-determined bins
    Compact,
    /// Z-order files based on provided columns
    // The payload is the list of column names to z-order by.
    ZOrder(Vec<String>),
}
196
/// Optimize a Delta table with given options
///
/// If a target file size is not provided then `delta.targetFileSize` from the
/// table's configuration is read. Otherwise a default value is used.
pub struct OptimizeBuilder<'a> {
    /// A snapshot of the to-be-optimized table's state
    snapshot: Option<EagerSnapshot>,
    /// Delta object store for handling data files
    log_store: LogStoreRef,
    /// Filters to select specific table partitions to be optimized
    filters: &'a [PartitionFilter],
    /// Desired file size after bin-packing files
    target_size: Option<u64>,
    /// Properties passed to underlying parquet writer
    writer_properties: Option<WriterProperties>,
    /// Commit properties and configuration
    commit_properties: CommitProperties,
    /// Whether to preserve insertion order within files (default false)
    preserve_insertion_order: bool,
    /// Maximum number of concurrent tasks (default is number of cpus)
    max_concurrent_tasks: usize,
    /// Maximum number of bytes allowed in memory before spilling to disk
    max_spill_size: usize,
    /// Optimize type
    optimize_type: OptimizeType,
    /// Datafusion session state relevant for executing the input plan
    session: Option<Arc<dyn Session>>,
    /// Minimum interval between intermediate commits while results stream in;
    /// `None` means commit only once at the end
    min_commit_interval: Option<Duration>,
    /// Optional handler invoked before and after execution
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
}
227
228impl super::Operation for OptimizeBuilder<'_> {
229    fn log_store(&self) -> &LogStoreRef {
230        &self.log_store
231    }
232    fn get_custom_execute_handler(&self) -> Option<Arc<dyn CustomExecuteHandler>> {
233        self.custom_execute_handler.clone()
234    }
235}
236
237impl<'a> OptimizeBuilder<'a> {
238    /// Create a new [`OptimizeBuilder`]
239    pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
240        Self {
241            snapshot,
242            log_store,
243            filters: &[],
244            target_size: None,
245            writer_properties: None,
246            commit_properties: CommitProperties::default(),
247            preserve_insertion_order: false,
248            max_concurrent_tasks: num_cpus::get(),
249            max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
250            optimize_type: OptimizeType::Compact,
251            min_commit_interval: None,
252            session: None,
253            custom_execute_handler: None,
254        }
255    }
256
257    /// Choose the type of optimization to perform. Defaults to [OptimizeType::Compact].
258    pub fn with_type(mut self, optimize_type: OptimizeType) -> Self {
259        self.optimize_type = optimize_type;
260        self
261    }
262
263    /// Only optimize files that return true for the specified partition filter
264    pub fn with_filters(mut self, filters: &'a [PartitionFilter]) -> Self {
265        self.filters = filters;
266        self
267    }
268
269    /// Set the target file size
270    pub fn with_target_size(mut self, target: u64) -> Self {
271        self.target_size = Some(target);
272        self
273    }
274
275    /// Writer properties passed to parquet writer
276    pub fn with_writer_properties(mut self, writer_properties: WriterProperties) -> Self {
277        self.writer_properties = Some(writer_properties);
278        self
279    }
280
281    /// Additional information to write to the commit
282    pub fn with_commit_properties(mut self, commit_properties: CommitProperties) -> Self {
283        self.commit_properties = commit_properties;
284        self
285    }
286
287    /// Whether to preserve insertion order within files
288    pub fn with_preserve_insertion_order(mut self, preserve_insertion_order: bool) -> Self {
289        self.preserve_insertion_order = preserve_insertion_order;
290        self
291    }
292
293    /// Max number of concurrent tasks
294    pub fn with_max_concurrent_tasks(mut self, max_concurrent_tasks: usize) -> Self {
295        self.max_concurrent_tasks = max_concurrent_tasks;
296        self
297    }
298
299    /// Max spill size
300    #[deprecated(
301        since = "0.29.0",
302        note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
303    )]
304    pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
305        self.max_spill_size = max_spill_size;
306        self
307    }
308
309    /// Min commit interval
310    pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
311        self.min_commit_interval = Some(min_commit_interval);
312        self
313    }
314
315    /// Set a custom execute handler, for pre and post execution
316    pub fn with_custom_execute_handler(mut self, handler: Arc<dyn CustomExecuteHandler>) -> Self {
317        self.custom_execute_handler = Some(handler);
318        self
319    }
320
321    /// The Datafusion session state to use
322    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
323        self.session = Some(session);
324        self
325    }
326}
327
impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
    type Output = DeltaResult<(DeltaTable, Metrics)>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Drive the optimize operation: resolve the snapshot, build the merge
    /// plan, execute it, then refresh and return the table with metrics.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;

        Box::pin(async move {
            // Resolve the latest snapshot (or use the provided one) and make
            // sure this client is allowed to write under the table protocol.
            let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?;
            PROTOCOL.can_write_to(&snapshot)?;

            let operation_id = this.get_operation_id();
            this.pre_execute(operation_id).await?;

            // Default writer properties: ZSTD level 4 with a delta-rs
            // created-by marker, used when the caller supplied none.
            let writer_properties = this.writer_properties.unwrap_or_else(|| {
                WriterProperties::builder()
                    .set_compression(Compression::ZSTD(ZstdLevel::try_new(4).unwrap()))
                    .set_created_by(format!("delta-rs version {}", crate_version()))
                    .build()
            });
            // Use the caller's session if it downcasts to a SessionState;
            // otherwise build a default one backed by a FairSpillPool sized
            // by `max_spill_size`. NOTE(review): a caller-provided session of
            // a different concrete type is silently ignored here — confirm
            // that fallback is intended.
            let session = this
                .session
                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
                .unwrap_or_else(|| {
                    let memory_pool = FairSpillPool::new(this.max_spill_size);
                    let runtime = RuntimeEnvBuilder::new()
                        .with_memory_pool(Arc::new(memory_pool))
                        .build_arc()
                        .unwrap();
                    SessionStateBuilder::new()
                        .with_default_features()
                        .with_runtime_env(runtime)
                        .build()
                });
            let plan = create_merge_plan(
                &this.log_store,
                this.optimize_type,
                &snapshot,
                this.filters,
                this.target_size.to_owned(),
                writer_properties,
                session,
            )
            .await?;

            let metrics = plan
                .execute(
                    this.log_store.clone(),
                    &snapshot,
                    this.max_concurrent_tasks,
                    this.min_commit_interval,
                    this.commit_properties.clone(),
                    operation_id,
                    this.custom_execute_handler.as_ref(),
                )
                .await?;

            if let Some(handler) = this.custom_execute_handler {
                handler.post_execute(&this.log_store, operation_id).await?;
            }
            // Re-load the table so the returned handle reflects the commits
            // made during execution.
            let mut table =
                DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot));
            table.update().await?;
            Ok((table, metrics))
        })
    }
}
395
/// Input parameters recorded in the commit for an optimize operation.
#[derive(Debug, Clone)]
struct OptimizeInput {
    /// Desired maximum size (in bytes) of rewritten files
    target_size: u64,
    /// Optional predicate string recorded in the commit — presumably the
    /// serialized partition filters; verify against `create_merge_plan`.
    predicate: Option<String>,
}
401
402impl From<OptimizeInput> for DeltaOperation {
403    fn from(opt_input: OptimizeInput) -> Self {
404        DeltaOperation::Optimize {
405            target_size: opt_input.target_size as i64,
406            predicate: opt_input.predicate,
407        }
408    }
409}
410
411/// Generate an appropriate remove action for the optimization task
412fn create_remove(
413    path: &str,
414    partitions: &IndexMap<String, Scalar>,
415    size: i64,
416) -> Result<Action, DeltaTableError> {
417    // NOTE unwrap is safe since UNIX_EPOCH will always be earlier then now.
418    let deletion_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
419    let deletion_time = deletion_time.as_millis() as i64;
420
421    Ok(Action::Remove(Remove {
422        path: path.to_string(),
423        deletion_timestamp: Some(deletion_time),
424        data_change: false,
425        extended_file_metadata: None,
426        partition_values: Some(
427            partitions
428                .iter()
429                .map(|(k, v)| {
430                    (
431                        k.clone(),
432                        if v.is_null() {
433                            None
434                        } else {
435                            Some(v.serialize())
436                        },
437                    )
438                })
439                .collect(),
440        ),
441        size: Some(size),
442        deletion_vector: None,
443        tags: None,
444        base_row_id: None,
445        default_row_commit_version: None,
446    }))
447}
448
/// Layout for optimizing a plan
///
/// Within each partition, we identify a set of files that need to be merged
/// together and/or sorted together.
#[derive(Debug)]
enum OptimizeOperations {
    /// Plan to compact files into pre-determined bins
    ///
    /// Bins are determined by the bin-packing algorithm to reach an optimal size.
    /// Files that are large enough already are skipped. Bins of size 1 are dropped.
    // Keyed by partition; the value pairs the partition's values with its bins.
    Compact(HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)>),
    /// Plan to Z-order each partition
    ZOrder(
        // Columns to z-order by.
        Vec<String>,
        // Partition key -> (partition values, single bin of files to rewrite).
        HashMap<String, (IndexMap<String, Scalar>, MergeBin)>,
        // Session state used to execute the z-order read (boxed to keep the
        // variant small).
        Box<SessionState>,
    ),
    // TODO: Sort
}
468
469impl Default for OptimizeOperations {
470    fn default() -> Self {
471        OptimizeOperations::Compact(HashMap::new())
472    }
473}
474
#[derive(Debug)]
/// Encapsulates the operations required to optimize a Delta Table
pub struct MergePlan {
    /// Per-partition layout of bins/files to rewrite; taken (via `mem::take`)
    /// by `execute`
    operations: OptimizeOperations,
    /// Metrics collected during operation
    metrics: Metrics,
    /// Parameters passed down to merge tasks
    task_parameters: Arc<MergeTaskParameters>,
    /// Version of the table at beginning of optimization. Used for conflict resolution.
    read_table_version: i64,
}
486
/// Parameters passed to individual merge tasks
// Shared read-only (via `Arc`) across all concurrent rewrite tasks.
#[derive(Debug)]
pub struct MergeTaskParameters {
    /// Parameters passed to optimize operation
    input_parameters: OptimizeInput,
    /// Schema of written files
    file_schema: SchemaRef,
    /// Properties passed to parquet writer
    writer_properties: WriterProperties,
    /// Num index cols to collect stats for
    num_indexed_cols: DataSkippingNumIndexedCols,
    /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
    stats_columns: Option<Vec<String>>,
}
501
/// A stream of record batches, with a ParquetError on failure.
/// Input type consumed by [`MergePlan::rewrite_files`].
type ParquetReadStream = BoxStream<'static, Result<RecordBatch, ParquetError>>;
504
impl MergePlan {
    /// Rewrites files in a single partition.
    ///
    /// Returns a vector of add and remove actions, as well as the partial metrics
    /// collected during the operation.
    ///
    /// `read_stream` is a future resolving to the stream of input record
    /// batches; it is awaited only after the writer is set up.
    async fn rewrite_files<F>(
        task_parameters: Arc<MergeTaskParameters>,
        partition_values: IndexMap<String, Scalar>,
        files: MergeBin,
        object_store: ObjectStoreRef,
        read_stream: F,
    ) -> Result<(Vec<Action>, PartialMetrics), DeltaTableError>
    where
        F: Future<Output = Result<ParquetReadStream, DeltaTableError>> + Send + 'static,
    {
        debug!("Rewriting files in partition: {partition_values:?}");
        // First, initialize metrics
        // Every input file is tombstoned with a remove action up front.
        let mut partial_actions = files
            .iter()
            .map(|file_meta| {
                create_remove(file_meta.path.as_ref(), &partition_values, file_meta.size)
            })
            .collect::<Result<Vec<_>, DeltaTableError>>()?;

        // Aggregate size statistics over the files being removed.
        let files_removed = files
            .iter()
            .fold(MetricDetails::default(), |mut curr, file| {
                curr.total_files += 1;
                curr.total_size += file.size;
                curr.max = std::cmp::max(curr.max, file.size);
                curr.min = std::cmp::min(curr.min, file.size);
                curr
            });

        let mut partial_metrics = PartialMetrics {
            num_files_added: 0,
            num_files_removed: files.len() as u64,
            files_added: MetricDetails::default(),
            files_removed,
            num_batches: 0,
        };

        // Next, initialize the writer
        let writer_config = PartitionWriterConfig::try_new(
            task_parameters.file_schema.clone(),
            partition_values.clone(),
            Some(task_parameters.writer_properties.clone()),
            Some(task_parameters.input_parameters.target_size as usize),
            None,
            None,
        )?;
        let mut writer = PartitionWriter::try_with_config(
            object_store,
            writer_config,
            task_parameters.num_indexed_cols,
            task_parameters.stats_columns.clone(),
        )?;

        let mut read_stream = read_stream.await?;

        while let Some(maybe_batch) = read_stream.next().await {
            let mut batch = maybe_batch?;

            // Normalize each batch to the table's file schema before writing.
            batch = crate::kernel::schema::cast::cast_record_batch(
                &batch,
                task_parameters.file_schema.clone(),
                false,
                true,
            )?;
            partial_metrics.num_batches += 1;
            writer.write(&batch).await?;
        }

        // data_change is false on the add actions too: optimize rearranges
        // data without changing table contents.
        let add_actions = writer.close().await?.into_iter().map(|mut add| {
            add.data_change = false;

            let size = add.size;

            partial_metrics.num_files_added += 1;
            partial_metrics.files_added.total_files += 1;
            partial_metrics.files_added.total_size += size;
            partial_metrics.files_added.max = std::cmp::max(partial_metrics.files_added.max, size);
            partial_metrics.files_added.min = std::cmp::min(partial_metrics.files_added.min, size);

            Action::Add(add)
        });
        partial_actions.extend(add_actions);

        debug!("Finished rewriting files in partition: {partition_values:?}");

        Ok((partial_actions, partial_metrics))
    }

    /// Datafusion-based z-order read.
    ///
    /// Reads the bin's files through the table provider and sorts them by the
    /// z-order UDF applied to the configured columns, returning the sorted
    /// batch stream.
    async fn read_zorder(
        files: MergeBin,
        context: Arc<zorder::ZOrderExecContext>,
        table_provider: DeltaTableProvider,
    ) -> Result<BoxStream<'static, Result<RecordBatch, ParquetError>>, DeltaTableError> {
        use datafusion::common::Column;
        use datafusion::logical_expr::expr::ScalarFunction;
        use datafusion::logical_expr::{Expr, ScalarUDF};

        // Restrict the provider to exactly the files in this bin.
        let provider = table_provider.with_files(files.files);
        let df = context.ctx.read_table(Arc::new(provider))?;

        let cols = context
            .columns
            .iter()
            .map(|col| Expr::Column(Column::from_qualified_name_ignore_case(col)))
            .collect_vec();
        // Sort ascending with nulls first on the z-order key.
        let expr = Expr::ScalarFunction(ScalarFunction::new_udf(
            Arc::new(ScalarUDF::from(zorder::datafusion::ZOrderUDF)),
            cols,
        ));
        let df = df.sort(vec![expr.sort(true, true)])?;

        // Surface datafusion errors as ParquetError to match the stream type
        // expected by `rewrite_files`.
        let stream = df
            .execute_stream()
            .await?
            .map_err(|err| {
                ParquetError::General(format!("Z-order failed while scanning data: {err:?}"))
            })
            .boxed();

        Ok(stream)
    }

    /// Perform the operations outlined in the plan.
    ///
    /// Runs up to `max_concurrent_tasks` rewrite tasks concurrently, buffering
    /// their actions and committing either at the end or whenever
    /// `min_commit_interval` has elapsed since the last commit.
    #[allow(clippy::too_many_arguments)]
    #[instrument(skip_all, fields(operation = "optimize", version = snapshot.version()))]
    pub async fn execute(
        mut self,
        log_store: LogStoreRef,
        snapshot: &EagerSnapshot,
        max_concurrent_tasks: usize,
        min_commit_interval: Option<Duration>,
        commit_properties: CommitProperties,
        operation_id: Uuid,
        handle: Option<&Arc<dyn CustomExecuteHandler>>,
    ) -> Result<Metrics, DeltaTableError> {
        let operations = std::mem::take(&mut self.operations);
        info!("starting optimize execution");
        let object_store = log_store.object_store(Some(operation_id));

        // Build a stream of rewrite futures, one per (partition, bin).
        let stream = match operations {
            OptimizeOperations::Compact(bins) => futures::stream::iter(bins)
                .flat_map(|(_, (partition, bins))| {
                    futures::stream::iter(bins).map(move |bin| (partition.clone(), bin))
                })
                .map(|(partition, files)| {
                    debug!(
                        "merging a group of {} files in partition {partition:?}",
                        files.len(),
                    );
                    for file in files.iter() {
                        debug!("  file {}", file.path);
                    }
                    let object_store_ref = object_store.clone();
                    // Chain the parquet record-batch streams of all files in
                    // the bin into one input stream for the rewrite task.
                    let batch_stream = futures::stream::iter(files.clone())
                        .then(move |file| {
                            let object_store_ref = object_store_ref.clone();
                            let meta = ObjectMeta::try_from(file).unwrap();
                            async move {
                                let file_reader =
                                    ParquetObjectReader::new(object_store_ref, meta.location)
                                        .with_file_size(meta.size);
                                ParquetRecordBatchStreamBuilder::new(file_reader)
                                    .await?
                                    .build()
                            }
                        })
                        .try_flatten()
                        .boxed();

                    // Run each rewrite on its own tokio task.
                    let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                        self.task_parameters.clone(),
                        partition,
                        files,
                        object_store.clone(),
                        futures::future::ready(Ok(batch_stream)),
                    ));
                    util::flatten_join_error(rewrite_result)
                })
                .boxed(),
            OptimizeOperations::ZOrder(zorder_columns, bins, state) => {
                debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}");

                let exec_context = Arc::new(zorder::ZOrderExecContext::new(
                    zorder_columns,
                    *state,
                    object_store,
                )?);
                let task_parameters = self.task_parameters.clone();

                use crate::delta_datafusion::DataFusionMixins;
                use crate::delta_datafusion::DeltaScanConfigBuilder;
                use crate::delta_datafusion::DeltaTableProvider;

                let scan_config = DeltaScanConfigBuilder::default()
                    .with_file_column(false)
                    .with_schema(snapshot.input_schema())
                    .build(snapshot)?;

                // For each rewrite evaluate the predicate and then modify each expression
                // to either compute the new value or obtain the old one then write these batches
                let log_store = log_store.clone();
                futures::stream::iter(bins)
                    .map(move |(_, (partition, files))| {
                        let batch_stream = Self::read_zorder(
                            files.clone(),
                            exec_context.clone(),
                            DeltaTableProvider::try_new(
                                snapshot.clone(),
                                log_store.clone(),
                                scan_config.clone(),
                            )
                            .unwrap(),
                        );
                        let rewrite_result = tokio::task::spawn(Self::rewrite_files(
                            task_parameters.clone(),
                            partition,
                            files,
                            log_store.object_store(Some(operation_id)),
                            batch_stream,
                        ));
                        util::flatten_join_error(rewrite_result)
                    })
                    .boxed()
            }
        };

        // Limit in-flight rewrite tasks; completion order is not preserved.
        let mut stream = stream.buffer_unordered(max_concurrent_tasks);

        // NOTE(review): `table` is constructed and its state replaced at the
        // end, but it is never returned or otherwise read here — confirm
        // whether it is still needed.
        let mut table =
            DeltaTable::new_with_state(log_store.clone(), DeltaTableState::new(snapshot.clone()));

        // Actions buffered so far. These will be flushed either at the end
        // or when we reach the commit interval.
        let mut actions = vec![];

        // Each time we commit, we'll reset buffered_metrics to orig_metrics.
        let orig_metrics = std::mem::take(&mut self.metrics);
        let mut buffered_metrics = orig_metrics.clone();
        let mut total_metrics = orig_metrics.clone();

        let mut last_commit = Instant::now();
        let mut commits_made = 0;
        let mut snapshot = snapshot.clone();
        loop {
            let next = stream.next().await.transpose()?;

            // `None` from the stream means all rewrite tasks have finished.
            let end = next.is_none();

            if let Some((partial_actions, partial_metrics)) = next {
                debug!("Recording metrics for a completed partition");
                actions.extend(partial_actions);
                buffered_metrics.add(&partial_metrics);
                total_metrics.add(&partial_metrics);
            }

            // Commit when we're done, or when the configured minimum commit
            // interval has elapsed (never mid-stream if no interval is set).
            let now = Instant::now();
            let mature = match min_commit_interval {
                None => false,
                Some(i) => now.duration_since(last_commit) > i,
            };
            if !actions.is_empty() && (mature || end) {
                let actions = std::mem::take(&mut actions);
                last_commit = now;

                // NOTE(review): set to true unconditionally, regardless of the
                // builder's `preserve_insertion_order` flag — confirm intended.
                buffered_metrics.preserve_insertion_order = true;
                let mut properties = CommitProperties::default();
                properties.app_metadata = commit_properties.app_metadata.clone();
                properties
                    .app_metadata
                    .insert("readVersion".to_owned(), self.read_table_version.into());
                // Attach the metrics accumulated since the last commit, then
                // reset the buffer for the next batch.
                let maybe_map_metrics = serde_json::to_value(std::mem::replace(
                    &mut buffered_metrics,
                    orig_metrics.clone(),
                ));
                if let Ok(map) = maybe_map_metrics {
                    properties
                        .app_metadata
                        .insert("operationMetrics".to_owned(), map);
                }

                debug!("committing {} actions", actions.len());

                // Allow an extra retry per commit already made this run.
                let commit = CommitBuilder::from(properties)
                    .with_actions(actions)
                    .with_operation_id(operation_id)
                    .with_post_commit_hook_handler(handle.cloned())
                    .with_max_retries(DEFAULT_RETRIES + commits_made)
                    .build(
                        Some(&snapshot),
                        log_store.clone(),
                        self.task_parameters.input_parameters.clone().into(),
                    )
                    .await?;
                snapshot = commit.snapshot().snapshot;
                commits_made += 1;
            }

            if end {
                break;
            }
        }

        total_metrics.preserve_insertion_order = true;
        // When nothing was added/removed, `min` still holds its i64::MAX
        // sentinel from Default — normalize it to 0 for reporting.
        if total_metrics.num_files_added == 0 {
            total_metrics.files_added.min = 0;
        }
        if total_metrics.num_files_removed == 0 {
            total_metrics.files_removed.min = 0;
        }

        table.state = Some(DeltaTableState::new(snapshot));

        Ok(total_metrics)
    }
}
826
827/// Build a Plan on which files to merge together. See [OptimizeBuilder]
828#[instrument(skip_all, fields(operation = "create_merge_plan", version = snapshot.version()))]
829pub async fn create_merge_plan(
830    log_store: &dyn LogStore,
831    optimize_type: OptimizeType,
832    snapshot: &EagerSnapshot,
833    filters: &[PartitionFilter],
834    target_size: Option<u64>,
835    writer_properties: WriterProperties,
836    session: SessionState,
837) -> Result<MergePlan, DeltaTableError> {
838    let target_size =
839        target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size().get());
840    let partitions_keys = snapshot.metadata().partition_columns();
841
842    let (operations, metrics) = match optimize_type {
843        OptimizeType::Compact => {
844            info!("building compaction plan");
845            build_compaction_plan(log_store, snapshot, filters, target_size).await?
846        }
847        OptimizeType::ZOrder(zorder_columns) => {
848            info!("building z-order plan");
849            build_zorder_plan(
850                log_store,
851                zorder_columns,
852                snapshot,
853                partitions_keys,
854                filters,
855                session,
856            )
857            .await?
858        }
859    };
860
861    info!(
862        partitions_optimized = metrics.partitions_optimized,
863        total_considered_files = metrics.total_considered_files,
864        "merge plan created"
865    );
866
867    let input_parameters = OptimizeInput {
868        target_size,
869        predicate: serde_json::to_string(filters).ok(),
870    };
871    let file_schema = arrow_schema_without_partitions(
872        &Arc::new(snapshot.schema().as_ref().try_into_arrow()?),
873        partitions_keys,
874    );
875
876    Ok(MergePlan {
877        operations,
878        metrics,
879        task_parameters: Arc::new(MergeTaskParameters {
880            input_parameters,
881            file_schema,
882            writer_properties,
883            num_indexed_cols: snapshot.table_properties().num_indexed_cols(),
884            stats_columns: snapshot
885                .table_properties()
886                .data_skipping_stats_columns
887                .as_ref()
888                .map(|v| v.iter().map(|v| v.to_string()).collect::<Vec<String>>()),
889        }),
890        read_table_version: snapshot.version(),
891    })
892}
893
894/// A collection of bins for a particular partition
895#[derive(Debug, Clone)]
896struct MergeBin {
897    files: Vec<Add>,
898    size_bytes: u64,
899}
900
901impl MergeBin {
902    pub fn new() -> Self {
903        MergeBin {
904            files: Vec::new(),
905            size_bytes: 0,
906        }
907    }
908
909    fn total_file_size(&self) -> u64 {
910        self.size_bytes
911    }
912
913    fn len(&self) -> usize {
914        self.files.len()
915    }
916
917    fn add(&mut self, add: Add) {
918        self.size_bytes += add.size as u64;
919        self.files.push(add);
920    }
921
922    fn iter(&self) -> impl Iterator<Item = &Add> {
923        self.files.iter()
924    }
925}
926
927impl IntoIterator for MergeBin {
928    type Item = Add;
929    type IntoIter = std::vec::IntoIter<Self::Item>;
930
931    fn into_iter(self) -> Self::IntoIter {
932        self.files.into_iter()
933    }
934}
935
936async fn build_compaction_plan(
937    log_store: &dyn LogStore,
938    snapshot: &EagerSnapshot,
939    filters: &[PartitionFilter],
940    target_size: u64,
941) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
942    let mut metrics = Metrics::default();
943
944    let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, Vec<Add>)> = HashMap::new();
945    let mut file_stream = snapshot.file_views_by_partitions(log_store, filters);
946    while let Some(file) = file_stream.next().await {
947        let file = file?;
948        metrics.total_considered_files += 1;
949        let object_meta = ObjectMeta::try_from(&file)?;
950        if object_meta.size > target_size {
951            metrics.total_files_skipped += 1;
952            continue;
953        }
954        let partition_values = file
955            .partition_values()
956            .map(|v| {
957                v.fields()
958                    .iter()
959                    .zip(v.values().iter())
960                    .map(|(k, v)| (k.name().to_string(), v.clone()))
961                    .collect::<IndexMap<_, _>>()
962            })
963            .unwrap_or_default();
964
965        partition_files
966            .entry(partition_values.hive_partition_path())
967            .or_insert_with(|| (partition_values, vec![]))
968            .1
969            .push(file.add_action());
970    }
971
972    for (_, file) in partition_files.values_mut() {
973        // Sort files by size: largest to smallest
974        file.sort_by(|a, b| b.size.cmp(&a.size));
975    }
976
977    let mut operations: HashMap<String, (IndexMap<String, Scalar>, Vec<MergeBin>)> = HashMap::new();
978    for (part, (partition, files)) in partition_files {
979        let mut merge_bins = vec![MergeBin::new()];
980
981        'files: for file in files {
982            for bin in merge_bins.iter_mut() {
983                if bin.total_file_size() + file.size as u64 <= target_size {
984                    bin.add(file);
985                    // Move to next file
986                    continue 'files;
987                }
988            }
989            // Didn't find a bin to add to, so create a new one
990            let mut new_bin = MergeBin::new();
991            new_bin.add(file);
992            merge_bins.push(new_bin);
993        }
994
995        operations.insert(part, (partition, merge_bins));
996    }
997
998    // Prune merge bins with only 1 file, since they have no effect
999    for (_, (_, bins)) in operations.iter_mut() {
1000        bins.retain(|bin| {
1001            if bin.len() == 1 {
1002                metrics.total_files_skipped += 1;
1003                false
1004            } else {
1005                true
1006            }
1007        })
1008    }
1009    operations.retain(|_, (_, files)| !files.is_empty());
1010
1011    metrics.partitions_optimized = operations.len() as u64;
1012
1013    Ok((OptimizeOperations::Compact(operations), metrics))
1014}
1015
1016async fn build_zorder_plan(
1017    log_store: &dyn LogStore,
1018    zorder_columns: Vec<String>,
1019    snapshot: &EagerSnapshot,
1020    partition_keys: &[String],
1021    filters: &[PartitionFilter],
1022    session: SessionState,
1023) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
1024    if zorder_columns.is_empty() {
1025        return Err(DeltaTableError::Generic(
1026            "Z-order requires at least one column".to_string(),
1027        ));
1028    }
1029    let zorder_partition_cols = zorder_columns
1030        .iter()
1031        .filter(|col| partition_keys.contains(col))
1032        .collect_vec();
1033    if !zorder_partition_cols.is_empty() {
1034        return Err(DeltaTableError::Generic(format!(
1035            "Z-order columns cannot be partition columns. Found: {zorder_partition_cols:?}"
1036        )));
1037    }
1038    let field_names = snapshot
1039        .schema()
1040        .fields()
1041        .map(|field| field.name().to_string())
1042        .collect_vec();
1043    let unknown_columns = zorder_columns
1044        .iter()
1045        .filter(|col| !field_names.contains(col))
1046        .collect_vec();
1047    if !unknown_columns.is_empty() {
1048        return Err(DeltaTableError::Generic(
1049            format!("Z-order columns must be present in the table schema. Unknown columns: {unknown_columns:?}"),
1050        ));
1051    }
1052
1053    // For now, just be naive and optimize all files in each selected partition.
1054    let mut metrics = Metrics::default();
1055
1056    let mut partition_files: HashMap<String, (IndexMap<String, Scalar>, MergeBin)> = HashMap::new();
1057    let mut file_stream = snapshot.file_views_by_partitions(log_store, filters);
1058    while let Some(file) = file_stream.next().await {
1059        let file = file?;
1060        let partition_values = file
1061            .partition_values()
1062            .map(|v| {
1063                v.fields()
1064                    .iter()
1065                    .zip(v.values().iter())
1066                    .map(|(k, v)| (k.name().to_string(), v.clone()))
1067                    .collect::<IndexMap<_, _>>()
1068            })
1069            .unwrap_or_default();
1070        metrics.total_considered_files += 1;
1071        partition_files
1072            .entry(partition_values.hive_partition_path())
1073            .or_insert_with(|| (partition_values, MergeBin::new()))
1074            .1
1075            .add(file.add_action());
1076        debug!("partition_files inside the zorder plan: {partition_files:?}");
1077    }
1078
1079    let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, Box::new(session));
1080    Ok((operation, metrics))
1081}
1082
1083pub(super) mod util {
1084    use super::*;
1085    use futures::Future;
1086    use tokio::task::JoinError;
1087
1088    pub async fn flatten_join_error<T, E>(
1089        future: impl Future<Output = Result<Result<T, E>, JoinError>>,
1090    ) -> Result<T, DeltaTableError>
1091    where
1092        E: Into<DeltaTableError>,
1093    {
1094        match future.await {
1095            Ok(Ok(result)) => Ok(result),
1096            Ok(Err(error)) => Err(error.into()),
1097            Err(error) => Err(DeltaTableError::GenericError {
1098                source: Box::new(error),
1099            }),
1100        }
1101    }
1102}
1103
1104/// Z-order utilities
1105pub(super) mod zorder {
1106    use super::*;
1107
1108    use arrow::buffer::{Buffer, OffsetBuffer, ScalarBuffer};
1109    use arrow_array::{Array, ArrayRef, BinaryArray};
1110    use arrow_buffer::bit_util::{get_bit_raw, set_bit_raw, unset_bit_raw};
1111    use arrow_row::{Row, RowConverter, SortField};
1112    use arrow_schema::ArrowError;
1113    // use arrow_schema::Schema as ArrowSchema;
1114
1115    pub use self::datafusion::ZOrderExecContext;
1116
1117    pub(super) mod datafusion {
1118        use super::*;
1119        use url::Url;
1120
1121        use ::datafusion::common::DataFusionError;
1122        use ::datafusion::logical_expr::{
1123            ColumnarValue, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature, TypeSignature,
1124            Volatility,
1125        };
1126        use ::datafusion::prelude::SessionContext;
1127        use arrow_schema::DataType;
1128        use itertools::Itertools;
1129        use std::any::Any;
1130
1131        pub const ZORDER_UDF_NAME: &str = "zorder_key";
1132
1133        pub struct ZOrderExecContext {
1134            pub columns: Arc<[String]>,
1135            pub ctx: SessionContext,
1136        }
1137
1138        impl ZOrderExecContext {
1139            pub fn new(
1140                columns: Vec<String>,
1141                session: SessionState,
1142                object_store_ref: ObjectStoreRef,
1143            ) -> Result<Self, DataFusionError> {
1144                let columns = columns.into();
1145
1146                let ctx = SessionContext::new_with_state(session);
1147                ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
1148                ctx.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store_ref);
1149                Ok(Self { columns, ctx })
1150            }
1151        }
1152
        // DataFusion UDF impl for zorder_key
        #[derive(Debug, Hash, PartialEq, Eq)]
        pub struct ZOrderUDF;

        impl ScalarUDFImpl for ZOrderUDF {
            fn as_any(&self) -> &dyn Any {
                self
            }

            // Registered under the shared constant so SQL/expressions can
            // reference the UDF by a single well-known name.
            fn name(&self) -> &str {
                ZORDER_UDF_NAME
            }

            fn signature(&self) -> &Signature {
                // Any number of arguments of any type; Immutable volatility
                // declares the result deterministic for identical inputs.
                &Signature {
                    type_signature: TypeSignature::VariadicAny,
                    volatility: Volatility::Immutable,
                }
            }

            fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
                // The z-order key is an opaque, sortable byte string.
                Ok(DataType::Binary)
            }

            fn invoke_with_args(
                &self,
                args: ScalarFunctionArgs,
            ) -> ::datafusion::common::Result<ColumnarValue> {
                zorder_key_datafusion(&args.args)
            }
        }
1184
1185        /// Datafusion zorder UDF body
1186        fn zorder_key_datafusion(
1187            columns: &[ColumnarValue],
1188        ) -> Result<ColumnarValue, DataFusionError> {
1189            debug!("zorder_key_datafusion: {columns:#?}");
1190            let length = columns
1191                .iter()
1192                .map(|col| match col {
1193                    ColumnarValue::Array(array) => array.len(),
1194                    ColumnarValue::Scalar(_) => 1,
1195                })
1196                .max()
1197                .ok_or(DataFusionError::NotImplemented(
1198                    "z-order on zero columns.".to_string(),
1199                ))?;
1200            let columns: Vec<ArrayRef> = columns
1201                .iter()
1202                .map(|col| col.clone().into_array(length))
1203                .try_collect()?;
1204            let array = zorder_key(&columns)?;
1205            Ok(ColumnarValue::Array(array))
1206        }
1207
1208        #[cfg(test)]
1209        mod tests {
1210            use super::*;
1211            use ::datafusion::assert_batches_eq;
1212            use arrow_array::{Int32Array, StringArray};
1213            use arrow_ord::sort::sort_to_indices;
1214            use arrow_schema::Field;
1215            use arrow_select::take::take;
1216            use rand::Rng;
            /// Sorting by the z-order key over a growing prefix of columns
            /// must reproduce these reference orderings: one column behaves
            /// like a plain sort; adding columns interleaves their influence.
            #[test]
            fn test_order() {
                let int: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
                let str: ArrayRef = Arc::new(StringArray::from(vec![
                    Some("a"),
                    Some("x"),
                    Some("a"),
                    Some("x"),
                    None,
                ]));
                let int_large: ArrayRef = Arc::new(Int32Array::from(vec![10000, 2000, 300, 40, 5]));
                let batch = RecordBatch::try_from_iter(vec![
                    ("int", int),
                    ("str", str),
                    ("int_large", int_large),
                ])
                .unwrap();

                // Expected order when keying on ["int"] only.
                let expected_1 = vec![
                    "+-----+-----+-----------+",
                    "| int | str | int_large |",
                    "+-----+-----+-----------+",
                    "| 1   | a   | 10000     |",
                    "| 2   | x   | 2000      |",
                    "| 3   | a   | 300       |",
                    "| 4   | x   | 40        |",
                    "| 5   |     | 5         |",
                    "+-----+-----+-----------+",
                ];
                // Expected order when keying on ["int", "str"].
                let expected_2 = vec![
                    "+-----+-----+-----------+",
                    "| int | str | int_large |",
                    "+-----+-----+-----------+",
                    "| 5   |     | 5         |",
                    "| 1   | a   | 10000     |",
                    "| 3   | a   | 300       |",
                    "| 2   | x   | 2000      |",
                    "| 4   | x   | 40        |",
                    "+-----+-----+-----------+",
                ];
                // Expected order when keying on all three columns.
                let expected_3 = vec![
                    "+-----+-----+-----------+",
                    "| int | str | int_large |",
                    "+-----+-----+-----------+",
                    "| 5   |     | 5         |",
                    "| 4   | x   | 40        |",
                    "| 2   | x   | 2000      |",
                    "| 3   | a   | 300       |",
                    "| 1   | a   | 10000     |",
                    "+-----+-----+-----------+",
                ];

                let expected = [expected_1, expected_2, expected_3];

                // Scramble the rows so the sort below has real work to do.
                let indices = Int32Array::from(shuffled_indices().to_vec());
                let shuffled_columns = batch
                    .columns()
                    .iter()
                    .map(|c| take(c, &indices, None).unwrap())
                    .collect::<Vec<_>>();
                let shuffled_batch =
                    RecordBatch::try_new(batch.schema(), shuffled_columns).unwrap();

                // Key on the first i columns, sort by the key, and compare
                // against the matching reference table.
                for i in 1..=batch.num_columns() {
                    let columns = (0..i)
                        .map(|idx| shuffled_batch.column(idx).clone())
                        .collect::<Vec<_>>();

                    let order_keys = zorder_key(&columns).unwrap();
                    let indices = sort_to_indices(order_keys.as_ref(), None, None).unwrap();
                    let sorted_columns = shuffled_batch
                        .columns()
                        .iter()
                        .map(|c| take(c, &indices, None).unwrap())
                        .collect::<Vec<_>>();
                    let sorted_batch =
                        RecordBatch::try_new(batch.schema(), sorted_columns).unwrap();

                    assert_batches_eq!(expected[i - 1], &[sorted_batch]);
                }
            }
            /// Produce the indices 0..5 in random order (Fisher-Yates
            /// shuffle); used to scramble row order before sorting by key.
            fn shuffled_indices() -> [i32; 5] {
                let mut rng = rand::thread_rng();
                let mut array = [0, 1, 2, 3, 4];
                // Swap each position with a random earlier-or-equal position.
                for i in (1..array.len()).rev() {
                    let j = rng.gen_range(0..=i);
                    array.swap(i, j);
                }
                array
            }
1307
            /// Z-order must resolve column names with mixed-case schema
            /// fields ("moDified") without erroring.
            #[tokio::test]
            async fn test_zorder_mixed_case() {
                use arrow_schema::Schema as ArrowSchema;
                let schema = Arc::new(ArrowSchema::new(vec![
                    Field::new("moDified", DataType::Utf8, true),
                    Field::new("ID", DataType::Utf8, true),
                    Field::new("vaLue", DataType::Int32, true),
                ]));

                let batch = RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(arrow::array::StringArray::from(vec![
                            "2021-02-01",
                            "2021-02-01",
                            "2021-02-02",
                            "2021-02-02",
                        ])),
                        Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                        Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
                    ],
                )
                .unwrap();
                // write some data
                let table = crate::DeltaOps::new_in_memory()
                    .write(vec![batch.clone()])
                    .with_save_mode(crate::protocol::SaveMode::Append)
                    .await
                    .unwrap();

                let res = crate::DeltaOps(table)
                    .optimize()
                    .with_type(OptimizeType::ZOrder(vec!["moDified".into()]))
                    .await;
                assert!(res.is_ok());
            }
1344
            /// Issue <https://github.com/delta-io/delta-rs/issues/2834>
            ///
            /// Z-order must handle partition values containing spaces
            /// ("Dominican Republic") without failing on path handling.
            #[tokio::test]
            async fn test_zorder_space_in_partition_value() {
                use arrow_schema::Schema as ArrowSchema;
                let _ = pretty_env_logger::try_init();
                let schema = Arc::new(ArrowSchema::new(vec![
                    Field::new("modified", DataType::Utf8, true),
                    Field::new("country", DataType::Utf8, true),
                    Field::new("value", DataType::Int32, true),
                ]));

                let batch = RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(arrow::array::StringArray::from(vec![
                            "2021-02-01",
                            "2021-02-01",
                            "2021-02-02",
                            "2021-02-02",
                        ])),
                        Arc::new(arrow::array::StringArray::from(vec![
                            "Germany",
                            "China",
                            "Canada",
                            "Dominican Republic",
                        ])),
                        Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
                        //Arc::new(arrow::array::StringArray::from(vec!["Dominican Republic"])),
                        //Arc::new(arrow::array::Int32Array::from(vec![100])),
                    ],
                )
                .unwrap();
                // write some data
                let table = crate::DeltaOps::new_in_memory()
                    .write(vec![batch.clone()])
                    .with_partition_columns(vec!["country"])
                    .with_save_mode(crate::protocol::SaveMode::Overwrite)
                    .await
                    .unwrap();

                let res = crate::DeltaOps(table)
                    .optimize()
                    .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
                    .await;
                assert!(res.is_ok(), "Failed to optimize: {res:#?}");
            }
1391
            /// Like the test above, but with special characters ("USA$$!")
            /// in a partition value; optimize must still succeed.
            #[tokio::test]
            async fn test_zorder_space_in_partition_value_garbage() {
                use arrow_schema::Schema as ArrowSchema;
                let _ = pretty_env_logger::try_init();
                let schema = Arc::new(ArrowSchema::new(vec![
                    Field::new("modified", DataType::Utf8, true),
                    Field::new("country", DataType::Utf8, true),
                    Field::new("value", DataType::Int32, true),
                ]));

                let batch = RecordBatch::try_new(
                    schema.clone(),
                    vec![
                        Arc::new(arrow::array::StringArray::from(vec![
                            "2021-02-01",
                            "2021-02-01",
                            "2021-02-02",
                            "2021-02-02",
                        ])),
                        Arc::new(arrow::array::StringArray::from(vec![
                            "Germany", "China", "Canada", "USA$$!",
                        ])),
                        Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
                    ],
                )
                .unwrap();
                // write some data
                let table = crate::DeltaOps::new_in_memory()
                    .write(vec![batch.clone()])
                    .with_partition_columns(vec!["country"])
                    .with_save_mode(crate::protocol::SaveMode::Overwrite)
                    .await
                    .unwrap();

                let res = crate::DeltaOps(table)
                    .optimize()
                    .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
                    .await;
                assert!(res.is_ok(), "Failed to optimize: {res:#?}");
            }
1432        }
1433    }
1434
1435    /// Creates a new binary array containing the zorder keys for the given columns
1436    ///
1437    /// Each value is 16 bytes * number of columns. Each column is converted into
1438    /// its row binary representation, and then the first 16 bytes are taken.
1439    /// These truncated values are interleaved in the array values.
1440    pub fn zorder_key(columns: &[ArrayRef]) -> Result<ArrayRef, ArrowError> {
1441        if columns.is_empty() {
1442            return Err(ArrowError::InvalidArgumentError(
1443                "Cannot zorder empty columns".to_string(),
1444            ));
1445        }
1446
1447        // length is length of first array or 1 if all scalars
1448        let out_length = columns[0].len();
1449
1450        if columns.iter().any(|col| col.len() != out_length) {
1451            return Err(ArrowError::InvalidArgumentError(
1452                "All columns must have the same length".to_string(),
1453            ));
1454        }
1455
1456        // We are taking 128 bits (16 bytes) from each value. Shorter values will be padded.
1457        let value_size: usize = columns.len() * 16;
1458
1459        // Initialize with zeros
1460        let mut out: Vec<u8> = vec![0; out_length * value_size];
1461
1462        for (col_pos, col) in columns.iter().enumerate() {
1463            set_bits_for_column(col.clone(), col_pos, columns.len(), &mut out)?;
1464        }
1465
1466        let offsets = (0..=out_length)
1467            .map(|i| (i * value_size) as i32)
1468            .collect::<Vec<i32>>();
1469
1470        let out_arr = BinaryArray::try_new(
1471            OffsetBuffer::new(ScalarBuffer::from(offsets)),
1472            Buffer::from_vec(out),
1473            None,
1474        )?;
1475
1476        Ok(Arc::new(out_arr))
1477    }
1478
    /// Given an input array, will set the bits in the output array
    ///
    /// Arguments:
    /// * `input` - The input array
    /// * `col_pos` - The position of the column. Used to determine position
    ///   when interleaving.
    /// * `num_columns` - The number of columns in the input array. Used to
    ///   determine offset when interleaving.
    /// * `out` - Output bit buffer; must hold `num_rows * num_columns * 16`
    ///   bytes (as allocated by `zorder_key`). Only the bit positions that
    ///   belong to `col_pos` are written.
    fn set_bits_for_column(
        input: ArrayRef,
        col_pos: usize,
        num_columns: usize,
        out: &mut Vec<u8>,
    ) -> Result<(), ArrowError> {
        // Convert array to rows; the arrow row format yields bytes whose
        // lexicographic order matches the column's sort order, which is what
        // makes truncating and interleaving them meaningful.
        let converter = RowConverter::new(vec![SortField::new(input.data_type().clone())])?;
        let rows = converter.convert_columns(&[input])?;

        for (row_i, row) in rows.iter().enumerate() {
            // How many bytes to get to this row's out position
            let row_offset = row_i * num_columns * 16;
            // Only the first 128 bits of each row encoding are used; rows
            // shorter than 16 bytes read as zero bits via `get_bit`.
            for bit_i in 0..128 {
                let bit = row.get_bit(bit_i);
                // Position of bit within the value. We place a value every
                // `num_columns` bits, offset by `col_pos` when interleaving.
                // So if there are 3 columns, and we are the second column, then
                // we place values at index: 1, 4, 7, 10, etc.
                let bit_pos = (bit_i * num_columns) + col_pos;
                let out_pos = (row_offset * 8) + bit_pos;
                // Safety: we pre-sized the output vector in the outer function
                // (`num_rows * num_columns * 16` bytes), and `out_pos` is at
                // most `row_offset * 8 + 128 * num_columns - 1`, which stays
                // inside that allocation for every row.
                if bit {
                    unsafe { set_bit_raw(out.as_mut_ptr(), out_pos) };
                } else {
                    unsafe { unset_bit_raw(out.as_mut_ptr(), out_pos) };
                }
            }
        }

        Ok(())
    }
1519
1520    trait RowBitUtil {
1521        fn get_bit(&self, bit_i: usize) -> bool;
1522    }
1523
1524    impl RowBitUtil for Row<'_> {
1525        /// Get the bit at the given index, or just give false if the index is out of bounds
1526        fn get_bit(&self, bit_i: usize) -> bool {
1527            let byte_i = bit_i / 8;
1528            let bytes = self.as_ref();
1529            if byte_i >= bytes.len() {
1530                return false;
1531            }
1532            // Safety: we just did a bounds check above
1533            unsafe { get_bit_raw(bytes.as_ptr(), bit_i) }
1534        }
1535    }
1536
1537    #[cfg(test)]
1538    mod test {
1539        use arrow_array::{
1540            cast::as_generic_binary_array, new_empty_array, StringArray, UInt8Array,
1541        };
1542        use arrow_schema::DataType;
1543
1544        use super::*;
1545        use crate::ensure_table_uri;
1546
1547        #[test]
1548        fn test_rejects_no_columns() {
1549            let columns = vec![];
1550            let result = zorder_key(&columns);
1551            assert!(result.is_err());
1552        }
1553
        /// Zero-row input columns should produce an empty (zero-length) key
        /// array rather than an error.
        #[test]
        fn test_handles_no_rows() {
            let columns: Vec<ArrayRef> = vec![
                Arc::new(new_empty_array(&DataType::Int64)),
                Arc::new(new_empty_array(&DataType::Utf8)),
            ];
            let result = zorder_key(columns.as_slice());
            assert!(result.is_ok());
            let result = result.unwrap();
            assert_eq!(result.len(), 0);
        }
1565
        /// Sanity-check the key layout: 3 rows x 3 columns x 16 bytes per
        /// column, Binary type, and no null keys even for null inputs.
        #[test]
        fn test_basics() {
            let columns: Vec<ArrayRef> = vec![
                // Small strings
                Arc::new(StringArray::from(vec![Some("a"), Some("b"), None])),
                // Strings of various sizes
                Arc::new(StringArray::from(vec![
                    "delta-rs: A native Rust library for Delta Lake, with bindings into Python",
                    "cat",
                    "",
                ])),
                Arc::new(UInt8Array::from(vec![Some(1), Some(4), None])),
            ];
            let result = zorder_key(columns.as_slice()).unwrap();
            assert_eq!(result.len(), 3);
            assert_eq!(result.data_type(), &DataType::Binary);
            // Null inputs become key bytes, never null keys.
            assert_eq!(result.null_count(), 0);

            let data: &BinaryArray = as_generic_binary_array(result.as_ref());
            // 3 rows * (3 columns * 16 bytes) of value data in total.
            assert_eq!(data.value_data().len(), 3 * 16 * 3);
            assert!(data.iter().all(|x| x.unwrap().len() == 3 * 16));
        }
1588
        /// End-to-end compaction of a table written by Spark, copied into a
        /// temp dir so that optimize can commit to it.
        #[tokio::test]
        async fn works_on_spark_table() {
            use crate::DeltaOps;
            use tempfile::TempDir;
            // Create a temporary directory
            let tmp_dir = TempDir::new().expect("Failed to make temp dir");
            let table_name = "delta-1.2.1-only-struct-stats";

            // Copy recursively from the test data directory to the temporary directory
            let source_path = format!("../test/tests/data/{table_name}");
            fs_extra::dir::copy(source_path, tmp_dir.path(), &Default::default()).unwrap();

            let table_uri =
                ensure_table_uri(tmp_dir.path().join(table_name).to_str().unwrap()).unwrap();
            // Run optimize
            let (_, metrics) = DeltaOps::try_from_uri(table_uri)
                .await
                .unwrap()
                .optimize()
                .await
                .unwrap();

            // Verify it worked
            assert_eq!(metrics.num_files_added, 1);
        }
1614    }
1615}