Skip to main content

deltalake_core/delta_datafusion/
mod.rs

1//! Datafusion integration for Delta Table
2//!
3//! Example:
4//!
5//! ```rust
6//! use std::sync::Arc;
7//! use datafusion::execution::context::SessionContext;
8//!
9//! async {
10//!   let mut ctx = SessionContext::new();
11//!   let table = deltalake_core::open_table_with_storage_options(
12//!       url::Url::parse("memory://").unwrap(),
13//!       std::collections::HashMap::new()
14//!   )
15//!       .await
16//!       .unwrap();
17//!   ctx.register_table("demo", Arc::new(table)).unwrap();
18//!
19//!   let batches = ctx
20//!       .sql("SELECT * FROM demo").await.unwrap()
21//!       .collect()
22//!       .await.unwrap();
23//! };
24//! ```
25
26use std::fmt::Debug;
27use std::sync::Arc;
28
29use arrow::array::types::UInt16Type;
30use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
31use arrow_cast::display::array_value_to_string;
32use arrow_cast::{CastOptions, cast_with_options};
33use arrow_schema::{
34    DataType as ArrowDataType, Schema as ArrowSchema, SchemaRef, SchemaRef as ArrowSchemaRef,
35    TimeUnit,
36};
37use datafusion::catalog::{Session, TableProviderFactory};
38use datafusion::common::scalar::ScalarValue;
39use datafusion::common::{
40    Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
41};
42use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
43use datafusion::datasource::{MemTable, TableProvider};
44use datafusion::execution::TaskContext;
45use datafusion::execution::context::SessionContext;
46use datafusion::execution::runtime_env::RuntimeEnv;
47use datafusion::logical_expr::logical_plan::CreateExternalTable;
48use datafusion::logical_expr::utils::conjunction;
49use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
50use datafusion::physical_optimizer::pruning::PruningPredicate;
51use datafusion::physical_plan::{ExecutionPlan, Statistics};
52use datafusion_proto::logical_plan::LogicalExtensionCodec;
53use datafusion_proto::physical_plan::PhysicalExtensionCodec;
54use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
55use either::Either;
56use itertools::Itertools;
57use url::Url;
58
59use crate::delta_datafusion::expr::parse_predicate_expression;
60use crate::delta_datafusion::table_provider::DeltaScanWire;
61use crate::ensure_table_uri;
62use crate::errors::{DeltaResult, DeltaTableError};
63use crate::kernel::{
64    Add, DataCheck, EagerSnapshot, Invariant, LogDataHandler, Snapshot, StructTypeExt,
65};
66use crate::logstore::{LogStore, LogStoreRef};
67use crate::table::config::TablePropertiesExt as _;
68use crate::table::state::DeltaTableState;
69use crate::table::{Constraint, GeneratedColumn};
70use crate::{DeltaTable, open_table, open_table_with_storage_options};
71
72pub(crate) use self::session::session_state_from_session;
73pub use self::session::{
74    DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
75    create_session,
76};
77pub use self::table_provider::next::DeltaScan as DeltaScanNext;
78pub use self::table_provider::next::SnapshotWrapper;
79pub(crate) use find_files::*;
80
/// Reserved column name used internally by delta-rs to attach file-path
/// information to record batches (see `get_path_column`).
pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
82
83pub mod cdf;
84pub mod engine;
85pub mod expr;
86mod find_files;
87pub mod logical;
88pub mod physical;
89pub mod planner;
90mod schema_adapter;
91mod session;
92mod table_provider;
93
94pub use cdf::scan::DeltaCdfTableProvider;
95pub(crate) use table_provider::DeltaScanBuilder;
96pub use table_provider::{DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider};
97
98impl From<DeltaTableError> for DataFusionError {
99    fn from(err: DeltaTableError) -> Self {
100        match err {
101            DeltaTableError::Arrow { source } => DataFusionError::from(source),
102            DeltaTableError::Io { source } => DataFusionError::IoError(source),
103            DeltaTableError::ObjectStore { source } => DataFusionError::from(source),
104            DeltaTableError::Parquet { source } => DataFusionError::from(source),
105            _ => DataFusionError::External(Box::new(err)),
106        }
107    }
108}
109
110impl From<DataFusionError> for DeltaTableError {
111    fn from(err: DataFusionError) -> Self {
112        match err {
113            DataFusionError::ArrowError(source, _) => DeltaTableError::from(*source),
114            DataFusionError::IoError(source) => DeltaTableError::Io { source },
115            DataFusionError::ObjectStore(source) => DeltaTableError::from(*source),
116            DataFusionError::ParquetError(source) => DeltaTableError::from(*source),
117            _ => DeltaTableError::Generic(err.to_string()),
118        }
119    }
120}
121
/// Convenience trait for calling common methods on snapshot hierarchies
pub trait DataFusionMixins {
    /// The physical datafusion schema of a table
    ///
    /// Implementations in this module place partition columns after the data
    /// columns and may wrap large string/binary partition types in a
    /// dictionary encoding.
    fn read_schema(&self) -> ArrowSchemaRef;

    /// Get the table schema as an [`ArrowSchemaRef`]
    ///
    /// Same column ordering as `read_schema`, but partition columns keep
    /// their plain (un-wrapped) data types.
    fn input_schema(&self) -> ArrowSchemaRef;

    /// Parse an expression string into a datafusion [`Expr`]
    ///
    /// The expression is resolved against the `read_schema` of the snapshot.
    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr>;
}
137
138impl DataFusionMixins for Snapshot {
139    fn read_schema(&self) -> ArrowSchemaRef {
140        _arrow_schema(
141            self.arrow_schema(),
142            self.metadata().partition_columns(),
143            true,
144        )
145    }
146
147    fn input_schema(&self) -> ArrowSchemaRef {
148        _arrow_schema(
149            self.arrow_schema(),
150            self.metadata().partition_columns(),
151            false,
152        )
153    }
154
155    fn parse_predicate_expression(
156        &self,
157        expr: impl AsRef<str>,
158        session: &dyn Session,
159    ) -> DeltaResult<Expr> {
160        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
161        parse_predicate_expression(&schema, expr, session)
162    }
163}
164
165impl DataFusionMixins for LogDataHandler<'_> {
166    fn read_schema(&self) -> ArrowSchemaRef {
167        _arrow_schema(
168            Arc::new(
169                self.table_configuration()
170                    .schema()
171                    .as_ref()
172                    .try_into_arrow()
173                    .unwrap(),
174            ),
175            self.table_configuration().metadata().partition_columns(),
176            true,
177        )
178    }
179
180    fn input_schema(&self) -> ArrowSchemaRef {
181        _arrow_schema(
182            Arc::new(
183                self.table_configuration()
184                    .schema()
185                    .as_ref()
186                    .try_into_arrow()
187                    .unwrap(),
188            ),
189            self.table_configuration().metadata().partition_columns(),
190            false,
191        )
192    }
193
194    fn parse_predicate_expression(
195        &self,
196        expr: impl AsRef<str>,
197        session: &dyn Session,
198    ) -> DeltaResult<Expr> {
199        let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
200        parse_predicate_expression(&schema, expr, session)
201    }
202}
203
impl DataFusionMixins for EagerSnapshot {
    // All methods simply delegate to the underlying `Snapshot`.
    fn read_schema(&self) -> ArrowSchemaRef {
        self.snapshot().read_schema()
    }

    fn input_schema(&self) -> ArrowSchemaRef {
        self.snapshot().input_schema()
    }

    fn parse_predicate_expression(
        &self,
        expr: impl AsRef<str>,
        session: &dyn Session,
    ) -> DeltaResult<Expr> {
        self.snapshot().parse_predicate_expression(expr, session)
    }
}
221
222fn _arrow_schema(
223    schema: SchemaRef,
224    partition_columns: &[String],
225    wrap_partitions: bool,
226) -> ArrowSchemaRef {
227    let fields = schema
228        .fields()
229        .into_iter()
230        .filter(|f| !partition_columns.contains(&f.name().to_string()))
231        .cloned()
232        .chain(
233            // We need stable order between logical and physical schemas, but the order of
234            // partitioning columns is not always the same in the json schema and the array
235            partition_columns.iter().map(|partition_col| {
236                let field = schema.field_with_name(partition_col).unwrap();
237                let corrected = if wrap_partitions {
238                    match field.data_type() {
239                        // Only dictionary-encode types that may be large
240                        // // https://github.com/apache/arrow-datafusion/pull/5545
241                        ArrowDataType::Utf8
242                        | ArrowDataType::LargeUtf8
243                        | ArrowDataType::Binary
244                        | ArrowDataType::LargeBinary => {
245                            wrap_partition_type_in_dict(field.data_type().clone())
246                        }
247                        _ => field.data_type().clone(),
248                    }
249                } else {
250                    field.data_type().clone()
251                };
252                Arc::new(field.clone().with_data_type(corrected))
253            }),
254        )
255        .collect::<Vec<_>>();
256    Arc::new(ArrowSchema::new(fields))
257}
258
/// Filter a snapshot's files by a set of datafusion filter expressions.
///
/// When `filters` is non-empty, their conjunction is compiled into a
/// [`PruningPredicate`] and evaluated against the per-file log statistics;
/// only files that may contain matching rows are yielded. With no filters
/// (or when the conjunction collapses to none), every file is returned.
/// Either way the result is a lazy iterator over `Add` actions.
pub(crate) fn files_matching_predicate<'a>(
    log_data: LogDataHandler<'a>,
    filters: &[Expr],
) -> DeltaResult<impl Iterator<Item = Add> + 'a> {
    if let Some(Some(predicate)) =
        (!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
    {
        let expr = SessionContext::new()
            .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
        let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
        // One keep/skip flag per file, in the same order as `log_data`'s files.
        let mask = pruning_predicate.prune(&log_data)?;

        Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
            |(file, keep_file)| {
                if keep_file {
                    Some(file.add_action())
                } else {
                    None
                }
            },
        )))
    } else {
        Ok(Either::Right(
            log_data.into_iter().map(|file| file.add_action()),
        ))
    }
}
286
287pub(crate) fn get_path_column<'a>(
288    batch: &'a RecordBatch,
289    path_column: &str,
290) -> DeltaResult<TypedDictionaryArray<'a, UInt16Type, StringArray>> {
291    let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
292    batch
293        .column_by_name(path_column)
294        .unwrap()
295        .as_any()
296        .downcast_ref::<DictionaryArray<UInt16Type>>()
297        .ok_or_else(err)?
298        .downcast_dict::<StringArray>()
299        .ok_or_else(err)
300}
301
impl DeltaTableState {
    /// Provide table level statistics to Datafusion
    ///
    /// Derived from the snapshot's log data; `None` when statistics are
    /// unavailable.
    pub fn datafusion_table_statistics(&self) -> Option<Statistics> {
        self.snapshot.log_data().statistics()
    }
}
308
309// each delta table must register a specific object store, since paths are internally
310// handled relative to the table root.
311pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) {
312    let object_store_url = store.object_store_url();
313    let url: &Url = object_store_url.as_ref();
314    env.register_object_store(url, store.object_store(None));
315}
316
/// Build a typed null [`ScalarValue`] for the given arrow data type.
///
/// Used e.g. by `to_correct_scalar_value` to materialize JSON `null`
/// statistics. Returns an error for arrow types delta-rs does not support.
pub(crate) fn get_null_of_arrow_type(t: &ArrowDataType) -> DeltaResult<ScalarValue> {
    match t {
        ArrowDataType::Null => Ok(ScalarValue::Null),
        ArrowDataType::Boolean => Ok(ScalarValue::Boolean(None)),
        ArrowDataType::Int8 => Ok(ScalarValue::Int8(None)),
        ArrowDataType::Int16 => Ok(ScalarValue::Int16(None)),
        ArrowDataType::Int32 => Ok(ScalarValue::Int32(None)),
        ArrowDataType::Int64 => Ok(ScalarValue::Int64(None)),
        ArrowDataType::UInt8 => Ok(ScalarValue::UInt8(None)),
        ArrowDataType::UInt16 => Ok(ScalarValue::UInt16(None)),
        ArrowDataType::UInt32 => Ok(ScalarValue::UInt32(None)),
        ArrowDataType::UInt64 => Ok(ScalarValue::UInt64(None)),
        ArrowDataType::Float32 => Ok(ScalarValue::Float32(None)),
        ArrowDataType::Float64 => Ok(ScalarValue::Float64(None)),
        ArrowDataType::Date32 => Ok(ScalarValue::Date32(None)),
        ArrowDataType::Date64 => Ok(ScalarValue::Date64(None)),
        ArrowDataType::Binary => Ok(ScalarValue::Binary(None)),
        ArrowDataType::FixedSizeBinary(size) => {
            Ok(ScalarValue::FixedSizeBinary(size.to_owned(), None))
        }
        ArrowDataType::LargeBinary => Ok(ScalarValue::LargeBinary(None)),
        ArrowDataType::Utf8 => Ok(ScalarValue::Utf8(None)),
        ArrowDataType::LargeUtf8 => Ok(ScalarValue::LargeUtf8(None)),
        ArrowDataType::Decimal128(precision, scale) => Ok(ScalarValue::Decimal128(
            None,
            precision.to_owned(),
            scale.to_owned(),
        )),
        // Timestamps preserve the (optional) timezone of the target type.
        ArrowDataType::Timestamp(unit, tz) => {
            let tz = tz.to_owned();
            Ok(match unit {
                TimeUnit::Second => ScalarValue::TimestampSecond(None, tz),
                TimeUnit::Millisecond => ScalarValue::TimestampMillisecond(None, tz),
                TimeUnit::Microsecond => ScalarValue::TimestampMicrosecond(None, tz),
                TimeUnit::Nanosecond => ScalarValue::TimestampNanosecond(None, tz),
            })
        }
        // Dictionaries wrap a null of the value type; recurse on it.
        ArrowDataType::Dictionary(k, v) => Ok(ScalarValue::Dictionary(
            k.clone(),
            Box::new(get_null_of_arrow_type(v)?),
        )),
        //Unsupported types...
        ArrowDataType::Float16
        | ArrowDataType::Decimal32(_, _)
        | ArrowDataType::Decimal64(_, _)
        | ArrowDataType::Decimal256(_, _)
        | ArrowDataType::Union(_, _)
        | ArrowDataType::LargeList(_)
        | ArrowDataType::Struct(_)
        | ArrowDataType::List(_)
        | ArrowDataType::FixedSizeList(_, _)
        | ArrowDataType::Time32(_)
        | ArrowDataType::Time64(_)
        | ArrowDataType::Duration(_)
        | ArrowDataType::Interval(_)
        | ArrowDataType::RunEndEncoded(_, _)
        | ArrowDataType::BinaryView
        | ArrowDataType::Utf8View
        | ArrowDataType::LargeListView(_)
        | ArrowDataType::ListView(_)
        | ArrowDataType::Map(_, _) => Err(DeltaTableError::Generic(format!(
            "Unsupported data type for Delta Lake {t}"
        ))),
    }
}
382
383fn parse_date(
384    stat_val: &serde_json::Value,
385    field_dt: &ArrowDataType,
386) -> DataFusionResult<ScalarValue> {
387    let string = match stat_val {
388        serde_json::Value::String(s) => s.to_owned(),
389        _ => stat_val.to_string(),
390    };
391
392    let time_micro = ScalarValue::try_from_string(string, &ArrowDataType::Date32)?;
393    let cast_arr = cast_with_options(
394        &time_micro.to_array()?,
395        field_dt,
396        &CastOptions {
397            safe: false,
398            ..Default::default()
399        },
400    )?;
401    ScalarValue::try_from_array(&cast_arr, 0)
402}
403
404fn parse_timestamp(
405    stat_val: &serde_json::Value,
406    field_dt: &ArrowDataType,
407) -> DataFusionResult<ScalarValue> {
408    let string = match stat_val {
409        serde_json::Value::String(s) => s.to_owned(),
410        _ => stat_val.to_string(),
411    };
412
413    let time_micro = ScalarValue::try_from_string(
414        string,
415        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
416    )?;
417    let cast_arr = cast_with_options(
418        &time_micro.to_array()?,
419        field_dt,
420        &CastOptions {
421            safe: false,
422            ..Default::default()
423        },
424    )?;
425    ScalarValue::try_from_array(&cast_arr, 0)
426}
427
428pub(crate) fn to_correct_scalar_value(
429    stat_val: &serde_json::Value,
430    field_dt: &ArrowDataType,
431) -> DataFusionResult<Option<ScalarValue>> {
432    match stat_val {
433        serde_json::Value::Array(_) => Ok(None),
434        serde_json::Value::Object(_) => Ok(None),
435        serde_json::Value::Null => Ok(Some(get_null_of_arrow_type(field_dt)?)),
436        serde_json::Value::String(string_val) => match field_dt {
437            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
438            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
439            _ => Ok(Some(ScalarValue::try_from_string(
440                string_val.to_owned(),
441                field_dt,
442            )?)),
443        },
444        other => match field_dt {
445            ArrowDataType::Timestamp(_, _) => Ok(Some(parse_timestamp(stat_val, field_dt)?)),
446            ArrowDataType::Date32 => Ok(Some(parse_date(stat_val, field_dt)?)),
447            _ => Ok(Some(ScalarValue::try_from_string(
448                other.to_string(),
449                field_dt,
450            )?)),
451        },
452    }
453}
454
/// Responsible for checking batches of data conform to table's invariants, constraints and nullability.
#[derive(Clone, Default)]
pub struct DeltaDataChecker {
    /// CHECK constraints from the table properties.
    constraints: Vec<Constraint>,
    /// Column invariants declared in the table schema.
    invariants: Vec<Invariant>,
    /// Generated-column expressions declared in the table schema.
    generated_columns: Vec<GeneratedColumn>,
    /// Names of columns declared NOT NULL in the table schema.
    non_nullable_columns: Vec<String>,
    /// Session used to evaluate check expressions via SQL.
    ctx: SessionContext,
}
464
465impl DeltaDataChecker {
466    /// Create a new DeltaDataChecker with no invariants or constraints
467    pub fn empty() -> Self {
468        Self {
469            invariants: vec![],
470            constraints: vec![],
471            generated_columns: vec![],
472            non_nullable_columns: vec![],
473            ctx: DeltaSessionContext::default().into(),
474        }
475    }
476
477    /// Create a new DeltaDataChecker with a specified set of invariants
478    pub fn new_with_invariants(invariants: Vec<Invariant>) -> Self {
479        Self {
480            invariants,
481            constraints: vec![],
482            generated_columns: vec![],
483            non_nullable_columns: vec![],
484            ctx: DeltaSessionContext::default().into(),
485        }
486    }
487
488    /// Create a new DeltaDataChecker with a specified set of constraints
489    pub fn new_with_constraints(constraints: Vec<Constraint>) -> Self {
490        Self {
491            constraints,
492            invariants: vec![],
493            generated_columns: vec![],
494            non_nullable_columns: vec![],
495            ctx: DeltaSessionContext::default().into(),
496        }
497    }
498
499    /// Create a new DeltaDataChecker with a specified set of generated columns
500    pub fn new_with_generated_columns(generated_columns: Vec<GeneratedColumn>) -> Self {
501        Self {
502            constraints: vec![],
503            invariants: vec![],
504            generated_columns,
505            non_nullable_columns: vec![],
506            ctx: DeltaSessionContext::default().into(),
507        }
508    }
509
510    /// Specify the Datafusion context
511    pub fn with_session_context(mut self, context: SessionContext) -> Self {
512        self.ctx = context;
513        self
514    }
515
516    /// Add the specified set of constraints to the current DeltaDataChecker's constraints
517    pub fn with_extra_constraints(mut self, constraints: Vec<Constraint>) -> Self {
518        self.constraints.extend(constraints);
519        self
520    }
521
522    /// Create a new DeltaDataChecker
523    pub fn new(snapshot: &EagerSnapshot) -> Self {
524        let invariants = snapshot.schema().get_invariants().unwrap_or_default();
525        let generated_columns = snapshot
526            .schema()
527            .get_generated_columns()
528            .unwrap_or_default();
529        let constraints = snapshot.table_properties().get_constraints();
530        let non_nullable_columns = snapshot
531            .schema()
532            .fields()
533            .filter_map(|f| {
534                if !f.is_nullable() {
535                    Some(f.name().clone())
536                } else {
537                    None
538                }
539            })
540            .collect_vec();
541        Self {
542            invariants,
543            constraints,
544            generated_columns,
545            non_nullable_columns,
546            ctx: DeltaSessionContext::default().into(),
547        }
548    }
549
550    /// Check that a record batch conforms to table's invariants.
551    ///
552    /// If it does not, it will return [DeltaTableError::InvalidData] with a list
553    /// of values that violated each invariant.
554    pub async fn check_batch(&self, record_batch: &RecordBatch) -> Result<(), DeltaTableError> {
555        self.check_nullability(record_batch)?;
556        self.enforce_checks(record_batch, &self.invariants).await?;
557        self.enforce_checks(record_batch, &self.constraints).await?;
558        self.enforce_checks(record_batch, &self.generated_columns)
559            .await
560    }
561
562    /// Return true if all the nullability checks are valid
563    fn check_nullability(&self, record_batch: &RecordBatch) -> Result<bool, DeltaTableError> {
564        let mut violations = Vec::with_capacity(self.non_nullable_columns.len());
565        for col in self.non_nullable_columns.iter() {
566            if let Some(arr) = record_batch.column_by_name(col) {
567                if arr.null_count() > 0 {
568                    violations.push(format!(
569                        "Non-nullable column violation for {col}, found {} null values",
570                        arr.null_count()
571                    ));
572                }
573            } else {
574                violations.push(format!(
575                    "Non-nullable column violation for {col}, not found in batch!"
576                ));
577            }
578        }
579        if !violations.is_empty() {
580            Err(DeltaTableError::InvalidData { violations })
581        } else {
582            Ok(true)
583        }
584    }
585
586    async fn enforce_checks<C: DataCheck>(
587        &self,
588        record_batch: &RecordBatch,
589        checks: &[C],
590    ) -> Result<(), DeltaTableError> {
591        if checks.is_empty() {
592            return Ok(());
593        }
594        let table = MemTable::try_new(record_batch.schema(), vec![vec![record_batch.clone()]])?;
595        table.schema();
596        // Use a random table name to avoid clashes when running multiple parallel tasks, e.g. when using a partitioned table
597        let table_name: String = uuid::Uuid::new_v4().to_string();
598        self.ctx.register_table(&table_name, Arc::new(table))?;
599
600        let mut violations: Vec<String> = Vec::with_capacity(checks.len());
601
602        for check in checks {
603            if check.get_name().contains('.') {
604                return Err(DeltaTableError::Generic(
605                    "delta constraints for nested columns are not supported at the moment."
606                        .to_string(),
607                ));
608            }
609
610            let field_to_select = if check.as_any().is::<Constraint>() {
611                "*"
612            } else {
613                check.get_name()
614            };
615            let sql = format!(
616                "SELECT {} FROM `{table_name}` WHERE NOT ({}) LIMIT 1",
617                field_to_select,
618                check.get_expression()
619            );
620
621            let dfs: Vec<RecordBatch> = self.ctx.sql(&sql).await?.collect().await?;
622            if !dfs.is_empty() && dfs[0].num_rows() > 0 {
623                let value: String = dfs[0]
624                    .columns()
625                    .iter()
626                    .map(|c| array_value_to_string(c, 0).unwrap_or(String::from("null")))
627                    .join(", ");
628
629                let msg = format!(
630                    "Check or Invariant ({}) violated by value in row: [{value}]",
631                    check.get_expression(),
632                );
633                violations.push(msg);
634            }
635        }
636
637        self.ctx.deregister_table(&table_name)?;
638        if !violations.is_empty() {
639            Err(DeltaTableError::InvalidData { violations })
640        } else {
641            Ok(())
642        }
643    }
644}
645
/// A codec for deltalake physical plans
///
/// Serializes/deserializes [`DeltaScan`] nodes as JSON wire structs
/// ([`DeltaScanWire`]) for distributed execution.
#[derive(Debug)]
pub struct DeltaPhysicalCodec {}
649
650impl PhysicalExtensionCodec for DeltaPhysicalCodec {
651    fn try_decode(
652        &self,
653        buf: &[u8],
654        inputs: &[Arc<dyn ExecutionPlan>],
655        _registry: &TaskContext,
656    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
657        let wire: DeltaScanWire = serde_json::from_reader(buf)
658            .map_err(|_| DataFusionError::Internal("Unable to decode DeltaScan".to_string()))?;
659        let delta_scan = DeltaScan::new(
660            &wire.table_url,
661            wire.config,
662            (*inputs)[0].clone(),
663            wire.logical_schema,
664        );
665        Ok(Arc::new(delta_scan))
666    }
667
668    fn try_encode(
669        &self,
670        node: Arc<dyn ExecutionPlan>,
671        buf: &mut Vec<u8>,
672    ) -> Result<(), DataFusionError> {
673        let delta_scan = node
674            .as_any()
675            .downcast_ref::<DeltaScan>()
676            .ok_or_else(|| DataFusionError::Internal("Not a delta scan!".to_string()))?;
677
678        let wire = DeltaScanWire::from(delta_scan);
679        serde_json::to_writer(buf, &wire)
680            .map_err(|_| DataFusionError::Internal("Unable to encode delta scan!".to_string()))?;
681        Ok(())
682    }
683}
684
/// Does serde on DeltaTables
///
/// Only table providers are (de)serialized today; logical extension nodes
/// are unimplemented (`todo!`).
#[derive(Debug)]
pub struct DeltaLogicalCodec {}
688
689impl LogicalExtensionCodec for DeltaLogicalCodec {
690    fn try_decode(
691        &self,
692        _buf: &[u8],
693        _inputs: &[LogicalPlan],
694        _ctx: &TaskContext,
695    ) -> Result<Extension, DataFusionError> {
696        todo!("DeltaLogicalCodec")
697    }
698
699    fn try_encode(&self, _node: &Extension, _buf: &mut Vec<u8>) -> Result<(), DataFusionError> {
700        todo!("DeltaLogicalCodec")
701    }
702
703    fn try_decode_table_provider(
704        &self,
705        buf: &[u8],
706        _table_ref: &TableReference,
707        _schema: SchemaRef,
708        _ctx: &TaskContext,
709    ) -> Result<Arc<dyn TableProvider>, DataFusionError> {
710        let provider: DeltaTable = serde_json::from_slice(buf)
711            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))?;
712        Ok(Arc::new(provider))
713    }
714
715    fn try_encode_table_provider(
716        &self,
717        _table_ref: &TableReference,
718        node: Arc<dyn TableProvider>,
719        buf: &mut Vec<u8>,
720    ) -> Result<(), DataFusionError> {
721        let table = node
722            .as_ref()
723            .as_any()
724            .downcast_ref::<DeltaTable>()
725            .ok_or_else(|| {
726                DataFusionError::Internal("Can't encode non-delta tables".to_string())
727            })?;
728        serde_json::to_writer(buf, table)
729            .map_err(|_| DataFusionError::Internal("Error encoding delta table".to_string()))
730    }
731}
732
/// Responsible for creating deltatables
///
/// A [`TableProviderFactory`] that opens a delta table from the location (and
/// optional storage options) of a `CREATE EXTERNAL TABLE` command.
#[derive(Debug)]
pub struct DeltaTableFactory {}
736
737#[async_trait::async_trait]
738impl TableProviderFactory for DeltaTableFactory {
739    async fn create(
740        &self,
741        _ctx: &dyn Session,
742        cmd: &CreateExternalTable,
743    ) -> datafusion::error::Result<Arc<dyn TableProvider>> {
744        let provider = if cmd.options.is_empty() {
745            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
746            open_table(table_url).await?
747        } else {
748            let table_url = ensure_table_uri(&cmd.to_owned().location)?;
749            open_table_with_storage_options(table_url, cmd.to_owned().options).await?
750        };
751        Ok(Arc::new(provider))
752    }
753}
754
/// A wrapper for Deltafusion's Column to preserve case-sensitivity during string conversion
pub struct DeltaColumn {
    /// The wrapped datafusion column, built with case-insensitive
    /// qualified-name parsing in the `From<&str>`/`From<String>` impls.
    inner: Column,
}
759
760impl From<&str> for DeltaColumn {
761    fn from(c: &str) -> Self {
762        DeltaColumn {
763            inner: Column::from_qualified_name_ignore_case(c),
764        }
765    }
766}
767
768/// Create a column, cloning the string
769impl From<&String> for DeltaColumn {
770    fn from(c: &String) -> Self {
771        DeltaColumn {
772            inner: Column::from_qualified_name_ignore_case(c),
773        }
774    }
775}
776
777/// Create a column, reusing the existing string
778impl From<String> for DeltaColumn {
779    fn from(c: String) -> Self {
780        DeltaColumn {
781            inner: Column::from_qualified_name_ignore_case(c),
782        }
783    }
784}
785
786impl From<DeltaColumn> for Column {
787    fn from(value: DeltaColumn) -> Self {
788        value.inner
789    }
790}
791
792/// Create a column, reusing the existing datafusion column
793impl From<Column> for DeltaColumn {
794    fn from(c: Column) -> Self {
795        DeltaColumn { inner: c }
796    }
797}
798
799#[cfg(test)]
800mod tests {
801    use crate::logstore::ObjectStoreRef;
802    use crate::logstore::default_logstore::DefaultLogStore;
803    use crate::operations::write::SchemaMode;
804    use crate::writer::test_utils::get_delta_schema;
805    use arrow::array::StructArray;
806    use arrow::datatypes::{Field, Schema};
807    use arrow_array::cast::AsArray;
808    use bytes::Bytes;
809    use datafusion::assert_batches_sorted_eq;
810    use datafusion::config::TableParquetOptions;
811    use datafusion::datasource::physical_plan::{FileScanConfig, ParquetSource};
812    use datafusion::datasource::source::DataSourceExec;
813    use datafusion::logical_expr::lit;
814    use datafusion::physical_plan::empty::EmptyExec;
815    use datafusion::physical_plan::{ExecutionPlanVisitor, PhysicalExpr, visit_execution_plan};
816    use datafusion::prelude::{SessionConfig, col};
817    use datafusion_datasource::file::FileSource as _;
818    use datafusion_proto::physical_plan::AsExecutionPlan;
819    use datafusion_proto::protobuf;
820    use delta_kernel::path::{LogPathFileType, ParsedLogPath};
821    use delta_kernel::schema::ArrayType;
822    use futures::{StreamExt, stream::BoxStream};
823    use object_store::ObjectMeta;
824    use object_store::{
825        GetOptions, GetResult, ListResult, MultipartUpload, ObjectStore, PutMultipartOptions,
826        PutOptions, PutPayload, PutResult, path::Path,
827    };
828    use serde_json::json;
829    use std::fmt::{self, Debug, Display, Formatter};
830    use std::ops::Range;
831    use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel};
832
833    use super::*;
834
835    // test deserialization of serialized partition values.
836    // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
837    #[test]
838    fn test_to_correct_scalar_value() {
839        let reference_pairs = &[
840            (
841                json!("2015"),
842                ArrowDataType::Int16,
843                ScalarValue::Int16(Some(2015)),
844            ),
845            (
846                json!("2015"),
847                ArrowDataType::Int32,
848                ScalarValue::Int32(Some(2015)),
849            ),
850            (
851                json!("2015"),
852                ArrowDataType::Int64,
853                ScalarValue::Int64(Some(2015)),
854            ),
855            (
856                json!("2015"),
857                ArrowDataType::Float32,
858                ScalarValue::Float32(Some(2015_f32)),
859            ),
860            (
861                json!("2015"),
862                ArrowDataType::Float64,
863                ScalarValue::Float64(Some(2015_f64)),
864            ),
865            (
866                json!(2015),
867                ArrowDataType::Float64,
868                ScalarValue::Float64(Some(2015_f64)),
869            ),
870            (
871                json!("2015-01-01"),
872                ArrowDataType::Date32,
873                ScalarValue::Date32(Some(16436)),
874            ),
875            // (
876            //     json!("2015-01-01"),
877            //     ArrowDataType::Date64,
878            //     ScalarValue::Date64(Some(16436)),
879            // ),
880            // TODO(roeap) there seem to be differences in how precisions are handled locally and in CI, need to investigate
881            // (
882            //     json!("2020-09-08 13:42:29"),
883            //     ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
884            //     ScalarValue::TimestampNanosecond(Some(1599565349000000000), None),
885            // ),
886            // (
887            //     json!("2020-09-08 13:42:29"),
888            //     ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
889            //     ScalarValue::TimestampMicrosecond(Some(1599565349000000), None),
890            // ),
891            // (
892            //     json!("2020-09-08 13:42:29"),
893            //     ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
894            //     ScalarValue::TimestampMillisecond(Some(1599565349000), None),
895            // ),
896            (
897                json!(true),
898                ArrowDataType::Boolean,
899                ScalarValue::Boolean(Some(true)),
900            ),
901        ];
902
903        for (raw, data_type, ref_scalar) in reference_pairs {
904            let scalar = to_correct_scalar_value(raw, data_type).unwrap().unwrap();
905            assert_eq!(*ref_scalar, scalar)
906        }
907    }
908
    /// Delta invariants must be enforced by `DeltaDataChecker::check_batch`:
    /// an empty invariant list passes, satisfied invariants pass, violated
    /// invariants fail with `DeltaTableError::InvalidData` carrying one entry
    /// per violation, and invariants on nested columns are rejected.
    #[tokio::test]
    async fn test_enforce_invariants() {
        // Simple two-column batch shared by the checks below.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // Empty invariants is okay
        let invariants: Vec<Invariant> = vec![];
        assert!(
            DeltaDataChecker::new_with_invariants(invariants)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Valid invariants return Ok(())
        let invariants = vec![
            Invariant::new("a", "a is not null"),
            Invariant::new("b", "b < 1000"),
        ];
        assert!(
            DeltaDataChecker::new_with_invariants(invariants)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Violated invariants returns an error with list of violations
        let invariants = vec![
            Invariant::new("a", "a is null"),
            Invariant::new("b", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            // Both invariants fail, so both must be reported.
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant invariants return a different error
        let invariants = vec![Invariant::new("c", "c > 2000")];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());

        // Nested invariants are unsupported
        // Wrap the original batch in a single struct column "x" so the
        // invariant below targets the nested field "x.b".
        let struct_fields = schema.fields().clone();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "x",
            ArrowDataType::Struct(struct_fields),
            false,
        )]));
        let inner = Arc::new(StructArray::from(batch));
        let batch = RecordBatch::try_new(schema, vec![inner]).unwrap();

        let invariants = vec![Invariant::new("x.b", "x.b < 1000")];
        let result = DeltaDataChecker::new_with_invariants(invariants)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::Generic { .. })));
    }
982
    /// CHECK constraints must be enforced by `DeltaDataChecker::check_batch`:
    /// empty constraint lists pass, satisfied constraints pass, and violated
    /// constraints fail with `DeltaTableError::InvalidData` listing one entry
    /// per violated constraint.
    #[tokio::test]
    async fn test_enforce_constraints() {
        // Simple two-column batch shared by the checks below.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec!["a", "b", "c", "d"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )
        .unwrap();
        // Empty constraints is okay
        let constraints: Vec<Constraint> = vec![];
        assert!(
            DeltaDataChecker::new_with_constraints(constraints)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Valid constraints return Ok(())
        let constraints = vec![
            Constraint::new("custom_a", "a is not null"),
            Constraint::new("custom_b", "b < 1000"),
        ];
        assert!(
            DeltaDataChecker::new_with_constraints(constraints)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Violated constraints return an error with the list of violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "b < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            // Both constraints fail, so both must be reported.
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant constraints return a different error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
    }
1039
    /// Ensure that constraints when there are spaces in the field name still work
    ///
    /// Field names containing spaces must be quoted in the constraint SQL —
    /// both backtick (`` `b bop` ``) and double-quote (`"b bop"`) quoting are
    /// exercised below.
    ///
    /// See <https://github.com/delta-io/delta-rs/pull/3374>
    #[tokio::test]
    async fn test_constraints_with_spacey_fields() -> DeltaResult<()> {
        // Column "b bop" deliberately contains a space.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b bop", ArrowDataType::Int32, false),
        ]));
        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "a", "b bop", "c", "d",
                ])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 10, 100])),
            ],
        )?;

        // Valid constraints return Ok(())
        let constraints = vec![
            Constraint::new("custom a", "a is not null"),
            Constraint::new("custom_b", "`b bop` < 1000"),
        ];
        assert!(
            DeltaDataChecker::new_with_constraints(constraints)
                .check_batch(&batch)
                .await
                .is_ok()
        );

        // Violated constraints return an error with the list of violations
        let constraints = vec![
            Constraint::new("custom_a", "a is null"),
            Constraint::new("custom_B", "\"b bop\" < 100"),
        ];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        assert!(matches!(result, Err(DeltaTableError::InvalidData { .. })));
        if let Err(DeltaTableError::InvalidData { violations }) = result {
            assert_eq!(violations.len(), 2);
        }

        // Irrelevant constraints return a different error
        let constraints = vec![Constraint::new("custom_c", "c > 2000")];
        let result = DeltaDataChecker::new_with_constraints(constraints)
            .check_batch(&batch)
            .await;
        assert!(result.is_err());
        Ok(())
    }
1093
    /// A `DeltaScan` must survive a round trip through the protobuf physical
    /// plan codec (`DeltaPhysicalCodec`): serialize to a `PhysicalPlanNode`,
    /// deserialize back, and compare the `Debug` renderings for equality.
    #[test]
    fn roundtrip_test_delta_exec_plan() {
        let ctx = SessionContext::new();
        let codec = DeltaPhysicalCodec {};

        let schema = Arc::new(Schema::new(vec![
            Field::new("a", ArrowDataType::Utf8, false),
            Field::new("b", ArrowDataType::Int32, false),
        ]));
        // An EmptyExec child is sufficient; only the wrapper's metadata is
        // being round-tripped here, not actual data.
        let exec_plan = Arc::from(DeltaScan::new(
            &Url::parse("s3://my_bucket/this/is/some/path").unwrap(),
            DeltaScanConfig::default(),
            Arc::from(EmptyExec::new(schema.clone())),
            schema.clone(),
        ));
        let proto: protobuf::PhysicalPlanNode =
            protobuf::PhysicalPlanNode::try_from_physical_plan(exec_plan.clone(), &codec)
                .expect("to proto");

        let task_ctx = ctx.task_ctx();
        let result_exec_plan: Arc<dyn ExecutionPlan> = proto
            .try_into_physical_plan(&task_ctx, &codec)
            .expect("from proto");
        // Debug output is used as a structural fingerprint of the plan.
        assert_eq!(format!("{exec_plan:?}"), format!("{result_exec_plan:?}"));
    }
1119
    /// Configuring `DeltaScanConfigBuilder::with_file_column_name` must add a
    /// synthetic column to the scan output containing the source file path of
    /// each row, alongside the table's regular columns.
    #[tokio::test]
    async fn delta_table_provider_with_config() {
        // Fixture table checked into the repo (requires the test data dir).
        let table_path = std::path::Path::new("../test/tests/data/delta-2.2.0-partitioned-types")
            .canonicalize()
            .unwrap();
        let table_url = url::Url::from_directory_path(table_path).unwrap();
        let table = crate::open_table(table_url).await.unwrap();
        // NOTE(review): the extra borrow (`&"file_source"`) suggests the
        // builder takes a reference to `impl ToString` — confirm against the
        // builder's signature before "simplifying" it away.
        let config = DeltaScanConfigBuilder::new()
            .with_file_column_name(&"file_source")
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        let log_store = table.log_store();
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            log_store,
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        // The "file_source" column carries the partitioned parquet file path.
        let expected = vec![
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| c3 | c1 | c2 | file_source                                                                   |",
            "+----+----+----+-------------------------------------------------------------------------------+",
            "| 4  | 6  | a  | c1=6/c2=a/part-00011-10619b10-b691-4fd0-acc4-2a9608499d7c.c000.snappy.parquet |",
            "| 5  | 4  | c  | c1=4/c2=c/part-00003-f525f459-34f9-46f5-82d6-d42121d883fd.c000.snappy.parquet |",
            "| 6  | 5  | b  | c1=5/c2=b/part-00007-4e73fa3b-2c88-424a-8051-f8b54328ffdb.c000.snappy.parquet |",
            "+----+----+----+-------------------------------------------------------------------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1155
    /// Regression test for issue #1787: partition column values must be
    /// correct even when the partition columns appear in a different order in
    /// the table metadata than in the logical table schema.
    #[tokio::test]
    async fn delta_scan_mixed_partition_order() {
        // Tests issue (1787) where partition columns were incorrect when they
        // have a different order in the metadata and table schema
        // Write schema: partition columns ("modified", "id") listed first.
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("modified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("value", ArrowDataType::Int32, true),
        ]));

        let table = DeltaTable::new_in_memory()
            .create()
            .with_columns(get_delta_schema().fields().cloned())
            .with_partition_columns(["modified", "id"])
            .await
            .unwrap();
        assert_eq!(table.version(), Some(0));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = table
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        let log_store = table.log_store();
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            log_store,
            config,
        )
        .unwrap();
        let logical_schema = provider.schema();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        // The provider's logical schema places data columns before partition
        // columns, regardless of the order used when writing.
        let expected_logical_order = vec!["value", "modified", "id"];
        let actual_order: Vec<String> = logical_schema
            .fields()
            .iter()
            .map(|f| f.name().to_owned())
            .collect();

        let df = ctx.sql("select * from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+------------+----+",
            "| value | modified   | id |",
            "+-------+------------+----+",
            "| 1     | 2021-02-01 | A  |",
            "| 10    | 2021-02-01 | B  |",
            "| 100   | 2021-02-02 | D  |",
            "| 20    | 2021-02-02 | C  |",
            "+-------+------------+----+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
        assert_eq!(expected_logical_order, actual_order);
    }
1232
    /// Mixed-case column names ("moDified", "ID", "vaLue") must be queryable
    /// through SQL when using the Delta-tuned session context.
    #[tokio::test]
    async fn delta_scan_case_sensitive() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("ID", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        // DeltaSessionContext configures DataFusion for Delta's
        // case-sensitivity expectations.
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select ID, moDified, vaLue from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+----+------------+-------+",
            "| ID | moDified   | vaLue |",
            "+----+------------+-------+",
            "| A  | 2021-02-01 | 1     |",
            "| B  | 2021-02-01 | 10    |",
            "| C  | 2021-02-02 | 20    |",
            "| D  | 2021-02-02 | 100   |",
            "+----+------------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);

        /* TODO: Datafusion doesn't have any options to prevent case-sensitivity with the col func */
        /*
        let df = ctx
            .table("test")
            .await
            .unwrap()
            .select(vec![col("ID"), col("moDified"), col("vaLue")])
            .unwrap();
        let actual = df.collect().await.unwrap();
        assert_batches_sorted_eq!(&expected, &actual);
        */
    }
1302
    /// After a schema-merge write adds a column, scanning must fill the new
    /// column with nulls for files written before the column existed.
    #[tokio::test]
    async fn delta_scan_supports_missing_columns() {
        // First write: only "col_1".
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Utf8,
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(arrow::array::StringArray::from(vec![
                Some("A"),
                Some("B"),
            ]))],
        )
        .unwrap();

        // Second write: "col_1" plus a new "col_2".
        let schema2 = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, true),
            Field::new("col_2", ArrowDataType::Utf8, true),
        ]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E"),
                    Some("F"),
                    Some("G"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("E2"),
                    Some("F2"),
                    Some("G2"),
                ])),
            ],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch2])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // Appending the narrower batch requires SchemaMode::Merge.
        let table = table
            .write(vec![batch1])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx.sql("select col_1, col_2 from test").await.unwrap();
        let actual = df.collect().await.unwrap();
        // Rows A/B predate col_2, so their col_2 cells are null (blank).
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A     |       |",
            "| B     |       |",
            "| E     | E2    |",
            "| F     | F2    |",
            "| G     | G2    |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1381
    /// With `pushdown_filters` enabled in the session config, a filtered query
    /// against the Delta provider must still return the correct rows.
    #[tokio::test]
    async fn delta_scan_supports_pushdown() {
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("col_1", ArrowDataType::Utf8, false),
            Field::new("col_2", ArrowDataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                    Some("C"),
                ])),
                Arc::new(arrow::array::StringArray::from(vec![
                    Some("A2"),
                    Some("B2"),
                    Some("C2"),
                ])),
            ],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();

        // Enable parquet filter pushdown so the WHERE clause is evaluated
        // inside the parquet scan rather than in a separate filter node.
        let mut cfg = SessionConfig::default();
        cfg.options_mut().execution.parquet.pushdown_filters = true;
        let ctx = SessionContext::new_with_config(cfg);
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1, col_2 from test WHERE col_1 = 'A'")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        let expected = vec![
            "+-------+-------+",
            "| col_1 | col_2 |",
            "+-------+-------+",
            "| A     | A2    |",
            "+-------+-------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1440
    /// Schema-merge must also work for nested struct fields: files written
    /// before a nested field ("col_1.col_1b") existed must read it as null.
    #[tokio::test]
    async fn delta_scan_supports_nested_missing_columns() {
        // First write: struct "col_1" with only child "col_1a".
        let column1_schema1: arrow::datatypes::Fields =
            vec![Field::new("col_1a", ArrowDataType::Utf8, true)].into();
        let schema1 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema1.clone()),
            true,
        )]));

        let batch1 = RecordBatch::try_new(
            schema1.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema1,
                vec![Arc::new(arrow::array::StringArray::from(vec![
                    Some("A"),
                    Some("B"),
                ]))],
                None,
            ))],
        )
        .unwrap();

        // Second write: struct "col_1" gains a new child "col_1b".
        let column1_schema2: arrow_schema::Fields = vec![
            Field::new("col_1a", ArrowDataType::Utf8, true),
            Field::new("col_1b", ArrowDataType::Utf8, true),
        ]
        .into();
        let schema2 = Arc::new(ArrowSchema::new(vec![Field::new(
            "col_1",
            ArrowDataType::Struct(column1_schema2.clone()),
            true,
        )]));

        let batch2 = RecordBatch::try_new(
            schema2.clone(),
            vec![Arc::new(StructArray::new(
                column1_schema2,
                vec![
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E"),
                        Some("F"),
                        Some("G"),
                    ])),
                    Arc::new(arrow::array::StringArray::from(vec![
                        Some("E2"),
                        Some("F2"),
                        Some("G2"),
                    ])),
                ],
                None,
            ))],
        )
        .unwrap();

        let table = DeltaTable::new_in_memory()
            .write(vec![batch1])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        // The widened struct requires SchemaMode::Merge to append.
        let table = table
            .write(vec![batch2])
            .with_schema_mode(SchemaMode::Merge)
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();
        let log = table.log_store();

        let provider =
            DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config)
                .unwrap();
        let ctx: SessionContext = DeltaSessionContext::default().into();
        ctx.register_table("test", Arc::new(provider)).unwrap();

        let df = ctx
            .sql("select col_1.col_1a, col_1.col_1b from test")
            .await
            .unwrap();
        let actual = df.collect().await.unwrap();
        // Rows A/B predate col_1b, so its cells are null (blank).
        let expected = vec![
            "+--------------------+--------------------+",
            "| test.col_1[col_1a] | test.col_1[col_1b] |",
            "+--------------------+--------------------+",
            "| A                  |                    |",
            "| B                  |                    |",
            "| E                  | E2                 |",
            "| F                  | F2                 |",
            "| G                  | G2                 |",
            "+--------------------+--------------------+",
        ];
        assert_batches_sorted_eq!(&expected, &actual);
    }
1538
    /// Regression test: a query with multiple range predicates on a string
    /// column must plan and execute without error. Only successful execution
    /// is asserted, not the (empty) result content.
    #[tokio::test]
    async fn test_multiple_predicate_pushdown() {
        use crate::datafusion::prelude::SessionContext;
        let schema = Arc::new(ArrowSchema::new(vec![
            Field::new("moDified", ArrowDataType::Utf8, true),
            Field::new("id", ArrowDataType::Utf8, true),
            Field::new("vaLue", ArrowDataType::Int32, true),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(arrow::array::StringArray::from(vec![
                    "2021-02-01",
                    "2021-02-01",
                    "2021-02-02",
                    "2021-02-02",
                ])),
                Arc::new(arrow::array::StringArray::from(vec!["A", "B", "C", "D"])),
                Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
            ],
        )
        .unwrap();
        // write some data
        let table = DeltaTable::new_in_memory()
            .write(vec![batch.clone()])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let datafusion = SessionContext::new();
        let table = Arc::new(table);

        datafusion.register_table("snapshot", table).unwrap();

        // NOTE: "id" is a Utf8 column compared against integers — the point
        // is that both predicates are coerced and pushed down without error.
        let df = datafusion
            .sql("select * from snapshot where id > 10000 and id < 20000")
            .await
            .unwrap();

        df.collect().await.unwrap();
    }
1581
    /// Without an explicit `DeltaScanConfig`, `DeltaScanBuilder` must push a
    /// supplied filter down into the parquet scan's predicate.
    #[tokio::test]
    async fn test_delta_scan_builder_no_scan_config() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let ctx = SessionContext::new();
        let state = ctx.state();
        let scan = DeltaScanBuilder::new(
            table.snapshot().unwrap().snapshot(),
            table.log_store(),
            &state,
        )
        .with_filter(Some(col("a").eq(lit("s"))))
        .build()
        .await
        .unwrap();

        // Walk the plan and capture the parquet source's predicate.
        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        assert_eq!(visitor.predicate.unwrap().to_string(), "a@0 = s");
    }
1609
    /// With `with_parquet_pushdown(false)` in the scan config, the filter must
    /// NOT be pushed into the parquet scan — no predicate should be present.
    #[tokio::test]
    async fn test_delta_scan_builder_scan_config_disable_pushdown() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let snapshot = table.snapshot().unwrap();
        let ctx = SessionContext::new();
        let state = ctx.state();
        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
            .with_filter(Some(col("a").eq(lit("s"))))
            .with_scan_config(
                DeltaScanConfigBuilder::new()
                    .with_parquet_pushdown(false)
                    .build(snapshot.snapshot())
                    .unwrap(),
            )
            .build()
            .await
            .unwrap();

        // Walk the plan and capture the parquet source's predicate.
        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        assert!(visitor.predicate.is_none());
    }
1640
    /// The session's `TableParquetOptions` (here with `pushdown_filters`
    /// enabled) must be propagated into the parquet source built by
    /// `DeltaScanBuilder`.
    #[tokio::test]
    async fn test_delta_scan_applies_parquet_options() {
        let arr: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["s"]));
        let batch = RecordBatch::try_from_iter_with_nullable(vec![("a", arr, false)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let snapshot = table.snapshot().unwrap();

        // Non-default parquet option so propagation is observable.
        let mut config = SessionConfig::default();
        config.options_mut().execution.parquet.pushdown_filters = true;
        let ctx = SessionContext::new_with_config(config);
        let state = ctx.state();

        let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state)
            .build()
            .await
            .unwrap();

        // Walk the plan and capture the parquet source's options.
        let mut visitor = ParquetVisitor::default();
        visit_execution_plan(&scan, &mut visitor).unwrap();

        assert_eq!(ctx.copied_table_options().parquet, visitor.options.unwrap());
    }
1668
    /// Extracts fields from the parquet scan
    #[derive(Default)]
    struct ParquetVisitor {
        // Pushed-down physical predicate found on the `ParquetSource`, if any.
        predicate: Option<Arc<dyn PhysicalExpr>>,
        // Parquet options carried by the `ParquetSource`, if one was visited.
        options: Option<TableParquetOptions>,
    }
1675
1676    impl ExecutionPlanVisitor for ParquetVisitor {
1677        type Error = DataFusionError;
1678
1679        fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
1680            let Some(datasource_exec) = plan.as_any().downcast_ref::<DataSourceExec>() else {
1681                return Ok(true);
1682            };
1683
1684            let Some(scan_config) = datasource_exec
1685                .data_source()
1686                .as_any()
1687                .downcast_ref::<FileScanConfig>()
1688            else {
1689                return Ok(true);
1690            };
1691
1692            if let Some(parquet_source) = scan_config
1693                .file_source
1694                .as_any()
1695                .downcast_ref::<ParquetSource>()
1696            {
1697                self.options = Some(parquet_source.table_parquet_options().clone());
1698                self.predicate = parquet_source.filter();
1699            }
1700
1701            Ok(true)
1702        }
1703    }
1704
1705    #[tokio::test]
1706    async fn passes_sanity_checker_when_all_files_filtered() {
1707        // Run a query that filters out all files and sorts.
1708        // Verify that it returns an empty set of rows without panicking.
1709        //
1710        // Historically, we had a bug that caused us to emit a query plan with 0 partitions, which
1711        // datafusion rejected.
1712        let table_path = std::path::Path::new("../test/tests/data/delta-2.2.0-partitioned-types")
1713            .canonicalize()
1714            .unwrap();
1715        let table_url = url::Url::from_directory_path(table_path).unwrap();
1716        let table = crate::open_table(table_url).await.unwrap();
1717        let ctx = SessionContext::new();
1718        ctx.register_table("test", Arc::new(table)).unwrap();
1719
1720        let df = ctx
1721            .sql("select * from test where c3 = 100 ORDER BY c1 ASC")
1722            .await
1723            .unwrap();
1724        let actual = df.collect().await.unwrap();
1725
1726        assert_eq!(actual.len(), 0);
1727    }
1728
1729    #[tokio::test]
1730    async fn test_check_nullability() -> DeltaResult<()> {
1731        use arrow::array::StringArray;
1732
1733        let data_checker = DeltaDataChecker {
1734            non_nullable_columns: vec!["zed".to_string(), "yap".to_string()],
1735            ..Default::default()
1736        };
1737
1738        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
1739        let nulls: Arc<dyn Array> = Arc::new(StringArray::new_null(1));
1740        let batch = RecordBatch::try_from_iter(vec![("a", arr), ("zed", nulls)]).unwrap();
1741
1742        let result = data_checker.check_nullability(&batch);
1743        assert!(
1744            result.is_err(),
1745            "The result should have errored! {result:?}"
1746        );
1747
1748        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
1749        let batch = RecordBatch::try_from_iter(vec![("zed", arr)]).unwrap();
1750        let result = data_checker.check_nullability(&batch);
1751        assert!(
1752            result.is_err(),
1753            "The result should have errored! {result:?}"
1754        );
1755
1756        let arr: Arc<dyn Array> = Arc::new(StringArray::from(vec!["s"]));
1757        let batch = RecordBatch::try_from_iter(vec![("zed", arr.clone()), ("yap", arr)]).unwrap();
1758        let _ = data_checker.check_nullability(&batch)?;
1759
1760        Ok(())
1761    }
1762
    #[tokio::test]
    async fn test_delta_scan_uses_parquet_column_pruning() {
        // Table with one tiny column ("small") and one ~1 KiB column ("large");
        // projecting only "small" should avoid reading the large column chunk.
        let small: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec!["a"]));
        let large: Arc<dyn Array> = Arc::new(arrow::array::StringArray::from(vec![
            "b".repeat(1024).as_str(),
        ]));
        let batch = RecordBatch::try_from_iter(vec![("small", small), ("large", large)]).unwrap();
        let table = DeltaTable::new_in_memory()
            .write(vec![batch])
            .with_save_mode(crate::protocol::SaveMode::Append)
            .await
            .unwrap();

        let config = DeltaScanConfigBuilder::new()
            .build(table.snapshot().unwrap().snapshot())
            .unwrap();

        // Wrap the table's object store so every read operation is recorded
        // on the `operations` channel.
        let (object_store, mut operations) =
            RecordingObjectStore::new(table.log_store().object_store(None));
        // this uses an in memory store pointing at root...
        let both_store = Arc::new(object_store);
        let log_store = DefaultLogStore::new(
            both_store.clone(),
            both_store,
            table.log_store().config().clone(),
        );
        let provider = DeltaTableProvider::try_new(
            table.snapshot().unwrap().snapshot().clone(),
            Arc::new(log_store),
            config,
        )
        .unwrap();
        let ctx = SessionContext::new();
        ctx.register_table("test", Arc::new(provider)).unwrap();
        let state = ctx.state();

        // Project only the small column.
        let df = ctx.sql("select small from test").await.unwrap();
        let plan = df.create_physical_plan().await.unwrap();

        // Exactly one batch with one column and one row is expected.
        let mut stream = plan.execute(0, state.task_ctx()).unwrap();
        let Some(Ok(batch)) = stream.next().await else {
            panic!()
        };
        assert!(stream.next().await.is_none());
        assert_eq!(1, batch.num_columns());
        assert_eq!(1, batch.num_rows());
        let small = batch.column_by_name("small").unwrap().as_string::<i32>();
        assert_eq!("a", small.iter().next().unwrap().unwrap());

        // Expected reads: one commit fetch, then two ranged reads of the data
        // file (presumably the 8-byte parquet footer tail, then metadata plus
        // the projected column chunk). NOTE(review): these byte offsets are
        // tied to the exact parquet layout produced above and will shift if
        // the writer or encodings change.
        let expected = vec![
            ObjectStoreOperation::Get(LocationType::Commit),
            ObjectStoreOperation::GetRange(LocationType::Data, 957..965),
            ObjectStoreOperation::GetRange(LocationType::Data, 326..957),
        ];
        let mut actual = Vec::new();
        operations.recv_many(&mut actual, 3).await;
        assert_eq!(expected, actual);
    }
1821
    #[tokio::test]
    async fn test_push_down_filter_panic_2602() -> DeltaResult<()> {
        // Regression test for delta-rs issue 2602: pushing a range filter down
        // over a table with this mix of column types used to panic.
        use crate::kernel::schema::{DataType, PrimitiveType};
        let ctx = SessionContext::new();
        // Create an empty table covering several primitive types plus an
        // array column, to exercise type handling in filter push-down.
        let table = DeltaTable::new_in_memory()
            .create()
            .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
            .with_column(
                "name",
                DataType::Primitive(PrimitiveType::String),
                true,
                None,
            )
            .with_column("b", DataType::Primitive(PrimitiveType::Boolean), true, None)
            .with_column(
                "ts",
                DataType::Primitive(PrimitiveType::Timestamp),
                true,
                None,
            )
            .with_column("dt", DataType::Primitive(PrimitiveType::Date), true, None)
            .with_column(
                "zap",
                DataType::Array(Box::new(ArrayType::new(
                    DataType::Primitive(PrimitiveType::Boolean),
                    true,
                ))),
                true,
                None,
            )
            .await?;

        ctx.register_table("snapshot", Arc::new(table)).unwrap();

        // Range predicate on `id` triggers the push-down path; success is
        // simply not panicking while collecting.
        let df = ctx
            .sql("select * from snapshot where id > 10000 and id < 20000")
            .await
            .unwrap();

        let _ = df.collect().await?;
        Ok(())
    }
1864
    /// Records operations made by the inner object store on a channel obtained at construction
    struct RecordingObjectStore {
        // Store that all operations are delegated to.
        inner: ObjectStoreRef,
        // Sender on which each observed read operation is recorded.
        operations: UnboundedSender<ObjectStoreOperation>,
    }
1870
1871    impl RecordingObjectStore {
1872        /// Returns an object store and a channel recording all operations made by the inner object store
1873        fn new(inner: ObjectStoreRef) -> (Self, UnboundedReceiver<ObjectStoreOperation>) {
1874            let (operations, operations_receiver) = unbounded_channel();
1875            (Self { inner, operations }, operations_receiver)
1876        }
1877    }
1878
1879    impl Display for RecordingObjectStore {
1880        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1881            Display::fmt(&self.inner, f)
1882        }
1883    }
1884
1885    impl Debug for RecordingObjectStore {
1886        fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1887            Debug::fmt(&self.inner, f)
1888        }
1889    }
1890
    // Read operations observed by `RecordingObjectStore`, including any
    // requested byte ranges.
    #[derive(Debug, PartialEq)]
    enum ObjectStoreOperation {
        // `get_ranges` call with the requested ranges.
        GetRanges(LocationType, Vec<Range<u64>>),
        // `get_range` call with the requested range.
        GetRange(LocationType, Range<u64>),
        // `get_opts` call.
        GetOpts(LocationType),
        // Plain `get` call.
        Get(LocationType),
    }
1898
    // Coarse classification of an object-store path accessed during a scan.
    #[derive(Debug, PartialEq)]
    enum LocationType {
        // A data file, identified by a `part-` filename prefix.
        Data,
        // A Delta log commit file, identified via `ParsedLogPath`.
        Commit,
    }
1904
1905    impl From<&Path> for LocationType {
1906        fn from(value: &Path) -> Self {
1907            let dummy_url = Url::parse("dummy:///").unwrap();
1908            let parsed = ParsedLogPath::try_from(dummy_url.join(value.as_ref()).unwrap()).unwrap();
1909            if let Some(parsed) = parsed
1910                && matches!(parsed.file_type, LogPathFileType::Commit)
1911            {
1912                return LocationType::Commit;
1913            }
1914            if value.to_string().starts_with("part-") {
1915                LocationType::Data
1916            } else {
1917                panic!("Unknown location type: {value:?}")
1918            }
1919        }
1920    }
1921
    // Currently only read operations are recorded. Extend as necessary.
    // All methods delegate to `self.inner`; the four read methods (`get`,
    // `get_opts`, `get_range`, `get_ranges`) additionally report themselves
    // on the `operations` channel before delegating.
    #[async_trait::async_trait]
    impl ObjectStore for RecordingObjectStore {
        // Write path: delegated, not recorded.
        async fn put(
            &self,
            location: &Path,
            payload: PutPayload,
        ) -> object_store::Result<PutResult> {
            self.inner.put(location, payload).await
        }

        async fn put_opts(
            &self,
            location: &Path,
            payload: PutPayload,
            opts: PutOptions,
        ) -> object_store::Result<PutResult> {
            self.inner.put_opts(location, payload, opts).await
        }

        async fn put_multipart(
            &self,
            location: &Path,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart(location).await
        }

        async fn put_multipart_opts(
            &self,
            location: &Path,
            opts: PutMultipartOptions,
        ) -> object_store::Result<Box<dyn MultipartUpload>> {
            self.inner.put_multipart_opts(location, opts).await
        }

        // Recorded: full-object read.
        async fn get(&self, location: &Path) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::Get(location.into()))
                .unwrap();
            self.inner.get(location).await
        }

        // Recorded: read with options (the options themselves are not captured).
        async fn get_opts(
            &self,
            location: &Path,
            options: GetOptions,
        ) -> object_store::Result<GetResult> {
            self.operations
                .send(ObjectStoreOperation::GetOpts(location.into()))
                .unwrap();
            self.inner.get_opts(location, options).await
        }

        // Recorded: single ranged read, including the byte range.
        async fn get_range(
            &self,
            location: &Path,
            range: Range<u64>,
        ) -> object_store::Result<Bytes> {
            self.operations
                .send(ObjectStoreOperation::GetRange(
                    location.into(),
                    range.clone(),
                ))
                .unwrap();
            self.inner.get_range(location, range).await
        }

        // Recorded: multi-range read, including all requested ranges.
        async fn get_ranges(
            &self,
            location: &Path,
            ranges: &[Range<u64>],
        ) -> object_store::Result<Vec<Bytes>> {
            self.operations
                .send(ObjectStoreOperation::GetRanges(
                    location.into(),
                    ranges.to_vec(),
                ))
                .unwrap();
            self.inner.get_ranges(location, ranges).await
        }

        // Metadata, deletion, listing, and copy/rename operations:
        // delegated without recording.
        async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
            self.inner.head(location).await
        }

        async fn delete(&self, location: &Path) -> object_store::Result<()> {
            self.inner.delete(location).await
        }

        fn delete_stream<'a>(
            &'a self,
            locations: BoxStream<'a, object_store::Result<Path>>,
        ) -> BoxStream<'a, object_store::Result<Path>> {
            self.inner.delete_stream(locations)
        }

        fn list(
            &self,
            prefix: Option<&Path>,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list(prefix)
        }

        fn list_with_offset(
            &self,
            prefix: Option<&Path>,
            offset: &Path,
        ) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
            self.inner.list_with_offset(prefix, offset)
        }

        async fn list_with_delimiter(
            &self,
            prefix: Option<&Path>,
        ) -> object_store::Result<ListResult> {
            self.inner.list_with_delimiter(prefix).await
        }

        async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy(from, to).await
        }

        async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename(from, to).await
        }

        async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.copy_if_not_exists(from, to).await
        }

        async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
            self.inner.rename_if_not_exists(from, to).await
        }
    }
2056}