datafusion_remote_table/
scan.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, DefaultTransform, RemoteSchemaRef, RemoteSource,
3    Transform, TransformStream, transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::error::DataFusionError;
9use datafusion::execution::{SendableRecordBatchStream, TaskContext};
10use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion::physical_plan::display::ProjectSchemaDisplay;
12use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use datafusion::physical_plan::{
15    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
16};
17use futures::TryStreamExt;
18use log::{debug, warn};
19use std::any::Any;
20use std::sync::Arc;
21
/// Execution plan that scans a remote database source (table or raw query)
/// and streams record batches back to DataFusion, optionally applying a
/// [`Transform`] to each batch.
#[derive(Debug)]
pub struct RemoteTableScanExec {
    /// Connection options for the remote database; also determines the db type.
    pub(crate) conn_options: Arc<ConnectionOptions>,
    /// Remote source to scan: either a raw query or a table reference.
    pub(crate) source: RemoteSource,
    /// Untransformed schema of the remote table.
    pub(crate) table_schema: SchemaRef,
    /// Remote-side schema; required when a non-default transform is used.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Column indices to project from the transformed schema, if any.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filters already rendered as SQL strings for pushdown into the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Row limit, pushed down only when the db type supports query rewriting.
    pub(crate) limit: Option<usize>,
    /// Transformation applied to remote batches before they are emitted.
    pub(crate) transform: Arc<dyn Transform>,
    /// Live connection to the remote database.
    pub(crate) conn: Arc<dyn Connection>,
    // Cached plan properties: projected output schema, single unknown partition,
    // incremental bounded emission (computed once in `try_new`).
    plan_properties: PlanProperties,
}
35
36impl RemoteTableScanExec {
37    #[allow(clippy::too_many_arguments)]
38    pub fn try_new(
39        conn_options: Arc<ConnectionOptions>,
40        source: RemoteSource,
41        table_schema: SchemaRef,
42        remote_schema: Option<RemoteSchemaRef>,
43        projection: Option<Vec<usize>>,
44        unparsed_filters: Vec<String>,
45        limit: Option<usize>,
46        transform: Arc<dyn Transform>,
47        conn: Arc<dyn Connection>,
48    ) -> DFResult<Self> {
49        let transformed_table_schema = transform_schema(
50            transform.as_ref(),
51            table_schema.clone(),
52            remote_schema.as_ref(),
53            conn_options.db_type(),
54        )?;
55        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
56        let plan_properties = PlanProperties::new(
57            EquivalenceProperties::new(projected_schema),
58            Partitioning::UnknownPartitioning(1),
59            EmissionType::Incremental,
60            Boundedness::Bounded,
61        );
62        Ok(Self {
63            conn_options,
64            source,
65            table_schema,
66            remote_schema,
67            projection,
68            unparsed_filters,
69            limit,
70            transform,
71            conn,
72            plan_properties,
73        })
74    }
75}
76
impl ExecutionPlan for RemoteTableScanExec {
    fn name(&self) -> &str {
        // NOTE(review): deliberately "RemoteTableExec" (matching the
        // `DisplayAs` prefix), not the struct name `RemoteTableScanExec`.
        "RemoteTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    // Leaf node: scans the remote source directly, no child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // No children to replace, so the plan is returned unchanged.
        Ok(self)
    }

    /// Executes the scan for the (single) partition.
    ///
    /// The remote query is wrapped in a future so no connection work happens
    /// until the returned stream is actually polled.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // The plan declares UnknownPartitioning(1), so only partition 0 is valid.
        assert_eq!(partition, 0);
        let schema = self.schema();
        let fut = build_and_transform_stream(
            self.conn.clone(),
            self.conn_options.clone(),
            self.source.clone(),
            self.table_schema.clone(),
            self.remote_schema.clone(),
            self.projection.clone(),
            self.unparsed_filters.clone(),
            self.limit,
            self.transform.clone(),
        );
        // once(fut) yields the inner stream when the future resolves;
        // try_flatten then forwards its record batches (or its error).
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// Computes statistics by rewriting the remote query as a `count(1)` query
    /// when the db type supports it; otherwise returns unknown statistics.
    fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
        // Only partition 0 exists; any other explicit index is a planning error.
        if let Some(partition) = partition
            && partition != 0
        {
            return Err(DataFusionError::Plan(format!(
                "Invalid partition index: {partition}"
            )));
        }
        let db_type = self.conn_options.db_type();
        // Mirror build_and_transform_stream: only push the limit into the
        // rewritten query when the db type can rewrite with filters/limit.
        let limit = if db_type.support_rewrite_with_filters_limit(&self.source) {
            self.limit
        } else {
            None
        };
        let real_sql = db_type.rewrite_query(&self.source, &self.unparsed_filters, limit);

        if let Some(count1_query) = db_type.try_count1_query(&RemoteSource::Query(real_sql)) {
            let conn = self.conn.clone();
            let conn_options = self.conn_options.clone();
            // NOTE(review): block_in_place + Handle::current().block_on panics
            // on a current-thread runtime or outside a tokio context — confirm
            // this path is only reached from a multi-threaded runtime.
            let row_count_result = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(async {
                    db_type
                        .fetch_count(conn, &conn_options, &count1_query)
                        .await
                })
            });

            match row_count_result {
                Ok(row_count) => {
                    // Exact row count; byte size and per-column stats stay unknown.
                    let column_stat = Statistics::unknown_column(self.schema().as_ref());
                    Ok(Statistics {
                        num_rows: Precision::Exact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: column_stat,
                    })
                }
                Err(e) => {
                    warn!("[remote-table] Failed to fetch exec statistics: {e}");
                    Err(e)
                }
            }
        } else {
            debug!(
                "[remote-table] Query can not be rewritten as count1 query: {}",
                self.source
            );
            Ok(Statistics::new_unknown(self.schema().as_ref()))
        }
    }

    /// Returns a copy of this plan carrying the new fetch (limit), or `None`
    /// when the db type cannot push the limit into the remote query.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        let db_type = self.conn_options.db_type();
        if db_type.support_rewrite_with_filters_limit(&self.source) {
            // The limit does not change the schema, so the cached
            // plan_properties can be reused as-is.
            Some(Arc::new(Self {
                conn_options: self.conn_options.clone(),
                source: self.source.clone(),
                table_schema: self.table_schema.clone(),
                remote_schema: self.remote_schema.clone(),
                projection: self.projection.clone(),
                unparsed_filters: self.unparsed_filters.clone(),
                limit,
                transform: self.transform.clone(),
                conn: self.conn.clone(),
                plan_properties: self.plan_properties.clone(),
            }))
        } else {
            None
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}
197
198#[allow(clippy::too_many_arguments)]
199async fn build_and_transform_stream(
200    conn: Arc<dyn Connection>,
201    conn_options: Arc<ConnectionOptions>,
202    source: RemoteSource,
203    table_schema: SchemaRef,
204    remote_schema: Option<RemoteSchemaRef>,
205    projection: Option<Vec<usize>>,
206    unparsed_filters: Vec<String>,
207    limit: Option<usize>,
208    transform: Arc<dyn Transform>,
209) -> DFResult<SendableRecordBatchStream> {
210    let db_type = conn_options.db_type();
211    let limit = if db_type.support_rewrite_with_filters_limit(&source) {
212        limit
213    } else {
214        None
215    };
216
217    if transform.as_any().is::<DefaultTransform>() {
218        conn.query(
219            &conn_options,
220            &source,
221            table_schema.clone(),
222            projection.as_ref(),
223            unparsed_filters.as_slice(),
224            limit,
225        )
226        .await
227    } else {
228        let Some(remote_schema) = remote_schema else {
229            return Err(DataFusionError::Execution(
230                "remote_schema is required for non-default transform".to_string(),
231            ));
232        };
233        let stream = conn
234            .query(
235                &conn_options,
236                &source,
237                table_schema.clone(),
238                None,
239                unparsed_filters.as_slice(),
240                limit,
241            )
242            .await?;
243        Ok(Box::pin(TransformStream::try_new(
244            stream,
245            transform.clone(),
246            table_schema,
247            projection,
248            remote_schema,
249            db_type,
250        )?))
251    }
252}
253
254impl DisplayAs for RemoteTableScanExec {
255    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
256        write!(
257            f,
258            "RemoteTableExec: source={}",
259            match &self.source {
260                RemoteSource::Query(_query) => "query".to_string(),
261                RemoteSource::Table(table) => table.join("."),
262            }
263        )?;
264        let projected_schema = self.schema();
265        if has_projection(self.projection.as_ref(), self.table_schema.fields().len()) {
266            write!(
267                f,
268                ", projection={}",
269                ProjectSchemaDisplay(&projected_schema)
270            )?;
271        }
272        if let Some(limit) = self.limit {
273            write!(f, ", limit={limit}")?;
274        }
275        if !self.unparsed_filters.is_empty() {
276            write!(f, ", filters=[{}]", self.unparsed_filters.join(", "))?;
277        }
278        Ok(())
279    }
280}
281
/// Returns `true` when `projection` selects a strict subset or a reordering
/// of the table's columns — i.e. it is not the identity projection
/// `[0, 1, ..., table_columns - 1]`.
///
/// `None` means "all columns" and is never considered a projection.
fn has_projection(projection: Option<&Vec<usize>>, table_columns: usize) -> bool {
    // Idiomatic form of the original manual loop: a projection is "real"
    // unless its length matches and every index equals its position.
    projection.is_some_and(|p| {
        p.len() != table_columns || p.iter().enumerate().any(|(i, &col)| col != i)
    })
}