datafusion_delta_sharing/datasource/
table.rs1use std::{any::Any, sync::Arc};
4
5use datafusion::{
6 arrow::datatypes::{Field, Schema, SchemaRef},
7 common::{stats::Statistics, Constraints},
8 datasource::TableProvider,
9 error::Result as DataFusionResult,
10 execution::context::SessionState,
11 logical_expr::{utils::conjunction, Expr, LogicalPlan, TableProviderFilterPushDown, TableType},
12 physical_plan::ExecutionPlan,
13};
14
15use crate::{
16 client::{
17 action::{File, Metadata, Protocol},
18 DeltaSharingClient,
19 },
20 securable::Table,
21 DeltaSharingError, Profile,
22};
23
24use super::{expr::Op, scan::DeltaSharingScanBuilder, schema::StructType};
25
26#[derive(Debug, Default)]
28pub struct DeltaSharingTableBuilder {
29 profile: Option<Profile>,
30 table: Option<Table>,
31}
32
33impl DeltaSharingTableBuilder {
34 pub fn new() -> Self {
36 Default::default()
37 }
38
39 pub fn with_profile(mut self, profile: Profile) -> Self {
41 self.profile = Some(profile);
42 self
43 }
44
45 pub fn with_table(mut self, table: Table) -> Self {
47 self.table = Some(table);
48 self
49 }
50
51 pub async fn build(self) -> Result<DeltaSharingTable, DeltaSharingError> {
53 let (Some(profile), Some(table)) = (self.profile, self.table) else {
54 return Err(DeltaSharingError::other("Missing profile or table"));
55 };
56
57 let client = DeltaSharingClient::new(profile);
58 let (protocol, metadata) = client.get_table_metadata(&table).await?;
59
60 Ok(DeltaSharingTable {
61 client,
62 table,
63 _protocol: protocol,
64 metadata,
65 })
66 }
67}
68
69pub struct DeltaSharingTable {
71 client: DeltaSharingClient,
72 table: Table,
73 _protocol: Protocol,
74 metadata: Metadata,
75}
76
77impl DeltaSharingTable {
78 pub async fn try_from_str(s: &str) -> Result<Self, DeltaSharingError> {
94 let (profile_path, table_fqn) = s.split_once('#').ok_or(DeltaSharingError::other("The connection string should be formatted as `<path/to/profile>#<share_name>.<schema_name>.<table_name>"))?;
95 let profile = Profile::try_from_path(profile_path)?;
96 let table = table_fqn.parse::<Table>()?;
97
98 DeltaSharingTableBuilder::new()
99 .with_profile(profile)
100 .with_table(table)
101 .build()
102 .await
103 }
104
105 fn arrow_schema(&self) -> SchemaRef {
106 let s: StructType = serde_json::from_str(self.metadata.schema_string()).unwrap();
107 let fields = s
108 .fields()
109 .iter()
110 .map(|f| f.try_into())
111 .collect::<Result<Vec<Field>, _>>()
112 .unwrap();
113 Arc::new(Schema::new(fields))
114 }
115
116 fn schema(&self) -> Schema {
117 let s: StructType = serde_json::from_str(self.metadata.schema_string()).unwrap();
118 let fields = s
119 .fields()
120 .iter()
121 .map(|f| f.try_into())
122 .collect::<Result<Vec<Field>, _>>()
123 .unwrap();
124 Schema::new(fields)
125 }
126
127 async fn list_files_for_scan(
128 &self,
129 filter: Option<Op>,
130 limit: Option<usize>,
131 ) -> Result<Vec<File>, DeltaSharingError> {
132 let mapped_limit = limit.map(|l| l as u32);
133 let mapped_filter = filter.map(|f| f.to_string_repr());
134 self.client
135 .get_table_data(&self.table, mapped_filter, mapped_limit)
136 .await
137 }
138
139 fn partition_columns(&self) -> Vec<String> {
140 self.metadata.partition_columns().to_vec()
141 }
142}
143
144#[async_trait::async_trait]
145impl TableProvider for DeltaSharingTable {
146 fn as_any(&self) -> &dyn Any {
147 self
148 }
149
150 fn schema(&self) -> SchemaRef {
151 self.arrow_schema()
152 }
153
154 fn constraints(&self) -> Option<&Constraints> {
155 None
156 }
157
158 fn table_type(&self) -> TableType {
159 TableType::Base
160 }
161
162 fn get_table_definition(&self) -> Option<&str> {
163 None
164 }
165
166 fn get_logical_plan(&self) -> Option<&LogicalPlan> {
167 None
168 }
169
170 fn get_column_default(&self, _column: &str) -> Option<&Expr> {
171 None
172 }
173
174 async fn scan(
175 &self,
176 _state: &SessionState,
177 projection: Option<&Vec<usize>>,
178 filters: &[Expr],
179 limit: Option<usize>,
180 ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
181 let filter = conjunction(filters.iter().cloned())
183 .and_then(|expr| Op::from_expr(&expr, self.arrow_schema()).ok());
184
185 let files = self.list_files_for_scan(filter, limit).await?;
187
188 let scan = DeltaSharingScanBuilder::new(self.schema(), self.partition_columns())
190 .with_projection(projection.cloned())
191 .with_files(files)
192 .build()
193 .unwrap();
194
195 Ok(Arc::new(scan))
196 }
197
198 fn supports_filters_pushdown(
199 &self,
200 filters: &[&Expr],
201 ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
202 filters
203 .iter()
204 .map(|f| {
205 let op = Op::from_expr(f, self.arrow_schema());
206 if op.is_ok() {
207 Ok(TableProviderFilterPushDown::Inexact)
208 } else {
209 Ok(TableProviderFilterPushDown::Unsupported)
210 }
211 })
212 .collect()
213 }
214
215 fn statistics(&self) -> Option<Statistics> {
216 None
217 }
218}