datafusion_delta_sharing/datasource/
table.rs

1//! Delta Sharing table
2
3use std::{any::Any, sync::Arc};
4
5use datafusion::{
6    arrow::datatypes::{Field, Schema, SchemaRef},
7    common::{stats::Statistics, Constraints},
8    datasource::TableProvider,
9    error::Result as DataFusionResult,
10    execution::context::SessionState,
11    logical_expr::{utils::conjunction, Expr, LogicalPlan, TableProviderFilterPushDown, TableType},
12    physical_plan::ExecutionPlan,
13};
14
15use crate::{
16    client::{
17        action::{File, Metadata, Protocol},
18        DeltaSharingClient,
19    },
20    securable::Table,
21    DeltaSharingError, Profile,
22};
23
24use super::{expr::Op, scan::DeltaSharingScanBuilder, schema::StructType};
25
26/// Builder for [`DeltaSharingTable`]
27#[derive(Debug, Default)]
28pub struct DeltaSharingTableBuilder {
29    profile: Option<Profile>,
30    table: Option<Table>,
31}
32
33impl DeltaSharingTableBuilder {
34    /// Create a new DeltaSharingTableBuilder
35    pub fn new() -> Self {
36        Default::default()
37    }
38
39    /// Set the profile for the DeltaSharingTable
40    pub fn with_profile(mut self, profile: Profile) -> Self {
41        self.profile = Some(profile);
42        self
43    }
44
45    /// Set the table for the DeltaSharingTable
46    pub fn with_table(mut self, table: Table) -> Self {
47        self.table = Some(table);
48        self
49    }
50
51    /// Build the DeltaSharingTable
52    pub async fn build(self) -> Result<DeltaSharingTable, DeltaSharingError> {
53        let (Some(profile), Some(table)) = (self.profile, self.table) else {
54            return Err(DeltaSharingError::other("Missing profile or table"));
55        };
56
57        let client = DeltaSharingClient::new(profile);
58        let (protocol, metadata) = client.get_table_metadata(&table).await?;
59
60        Ok(DeltaSharingTable {
61            client,
62            table,
63            _protocol: protocol,
64            metadata,
65        })
66    }
67}
68
69/// Delta Sharing implementation of [`TableProvider`]`
70pub struct DeltaSharingTable {
71    client: DeltaSharingClient,
72    table: Table,
73    _protocol: Protocol,
74    metadata: Metadata,
75}
76
77impl DeltaSharingTable {
78    /// Create a new DeltaSharingTable from a connection string
79    ///
80    /// The connection string should be formatted as
81    /// `<path/to/profile>#<share_name>.<schema_name>.<table_name>`
82    ///
83    /// Example:
84    /// ```no_run,rust
85    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
86    /// # async {
87    /// use datafusion_delta_sharing::DeltaSharingTable;
88    ///
89    /// let table = DeltaSharingTable::try_from_str("./path/to/profile.share#share.schema.table").await?;
90    /// # Ok::<(), Box<dyn std::error::Error>>(()) };
91    /// # Ok(()) }
92    /// ```
93    pub async fn try_from_str(s: &str) -> Result<Self, DeltaSharingError> {
94        let (profile_path, table_fqn) = s.split_once('#').ok_or(DeltaSharingError::other("The connection string should be formatted as `<path/to/profile>#<share_name>.<schema_name>.<table_name>"))?;
95        let profile = Profile::try_from_path(profile_path)?;
96        let table = table_fqn.parse::<Table>()?;
97
98        DeltaSharingTableBuilder::new()
99            .with_profile(profile)
100            .with_table(table)
101            .build()
102            .await
103    }
104
105    fn arrow_schema(&self) -> SchemaRef {
106        let s: StructType = serde_json::from_str(self.metadata.schema_string()).unwrap();
107        let fields = s
108            .fields()
109            .iter()
110            .map(|f| f.try_into())
111            .collect::<Result<Vec<Field>, _>>()
112            .unwrap();
113        Arc::new(Schema::new(fields))
114    }
115
116    fn schema(&self) -> Schema {
117        let s: StructType = serde_json::from_str(self.metadata.schema_string()).unwrap();
118        let fields = s
119            .fields()
120            .iter()
121            .map(|f| f.try_into())
122            .collect::<Result<Vec<Field>, _>>()
123            .unwrap();
124        Schema::new(fields)
125    }
126
127    async fn list_files_for_scan(
128        &self,
129        filter: Option<Op>,
130        limit: Option<usize>,
131    ) -> Result<Vec<File>, DeltaSharingError> {
132        let mapped_limit = limit.map(|l| l as u32);
133        let mapped_filter = filter.map(|f| f.to_string_repr());
134        self.client
135            .get_table_data(&self.table, mapped_filter, mapped_limit)
136            .await
137    }
138
139    fn partition_columns(&self) -> Vec<String> {
140        self.metadata.partition_columns().to_vec()
141    }
142}
143
144#[async_trait::async_trait]
145impl TableProvider for DeltaSharingTable {
146    fn as_any(&self) -> &dyn Any {
147        self
148    }
149
150    fn schema(&self) -> SchemaRef {
151        self.arrow_schema()
152    }
153
154    fn constraints(&self) -> Option<&Constraints> {
155        None
156    }
157
158    fn table_type(&self) -> TableType {
159        TableType::Base
160    }
161
162    fn get_table_definition(&self) -> Option<&str> {
163        None
164    }
165
166    fn get_logical_plan(&self) -> Option<&LogicalPlan> {
167        None
168    }
169
170    fn get_column_default(&self, _column: &str) -> Option<&Expr> {
171        None
172    }
173
174    async fn scan(
175        &self,
176        _state: &SessionState,
177        projection: Option<&Vec<usize>>,
178        filters: &[Expr],
179        limit: Option<usize>,
180    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
181        // Convert filters to Delta Sharing filter
182        let filter = conjunction(filters.iter().cloned())
183            .and_then(|expr| Op::from_expr(&expr, self.arrow_schema()).ok());
184
185        // Fetch files satisfying filters & limit (best effort)
186        let files = self.list_files_for_scan(filter, limit).await?;
187
188        // Build Delta Sharing scan
189        let scan = DeltaSharingScanBuilder::new(self.schema(), self.partition_columns())
190            .with_projection(projection.cloned())
191            .with_files(files)
192            .build()
193            .unwrap();
194
195        Ok(Arc::new(scan))
196    }
197
198    fn supports_filters_pushdown(
199        &self,
200        filters: &[&Expr],
201    ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
202        filters
203            .iter()
204            .map(|f| {
205                let op = Op::from_expr(f, self.arrow_schema());
206                if op.is_ok() {
207                    Ok(TableProviderFilterPushDown::Inexact)
208                } else {
209                    Ok(TableProviderFilterPushDown::Unsupported)
210                }
211            })
212            .collect()
213    }
214
215    fn statistics(&self) -> Option<Statistics> {
216        None
217    }
218}