datafusion_remote_table/
scan.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, RemoteSource, Transform,
3    TransformStream, transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::error::DataFusionError;
9use datafusion::execution::{SendableRecordBatchStream, TaskContext};
10use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion::physical_plan::display::ProjectSchemaDisplay;
12use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use datafusion::physical_plan::{
15    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
16};
17use futures::TryStreamExt;
18use log::{debug, warn};
19use std::any::Any;
20use std::sync::Arc;
21
22#[derive(Debug)]
23pub struct RemoteTableScanExec {
24    pub(crate) conn_options: Arc<ConnectionOptions>,
25    pub(crate) source: RemoteSource,
26    pub(crate) table_schema: SchemaRef,
27    pub(crate) remote_schema: Option<RemoteSchemaRef>,
28    pub(crate) projection: Option<Vec<usize>>,
29    pub(crate) unparsed_filters: Vec<String>,
30    pub(crate) limit: Option<usize>,
31    pub(crate) transform: Arc<dyn Transform>,
32    pub(crate) conn: Arc<dyn Connection>,
33    plan_properties: PlanProperties,
34}
35
36impl RemoteTableScanExec {
37    #[allow(clippy::too_many_arguments)]
38    pub fn try_new(
39        conn_options: Arc<ConnectionOptions>,
40        source: RemoteSource,
41        table_schema: SchemaRef,
42        remote_schema: Option<RemoteSchemaRef>,
43        projection: Option<Vec<usize>>,
44        unparsed_filters: Vec<String>,
45        limit: Option<usize>,
46        transform: Arc<dyn Transform>,
47        conn: Arc<dyn Connection>,
48    ) -> DFResult<Self> {
49        let transformed_table_schema = transform_schema(
50            table_schema.clone(),
51            transform.as_ref(),
52            remote_schema.as_ref(),
53        )?;
54        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
55        let plan_properties = PlanProperties::new(
56            EquivalenceProperties::new(projected_schema),
57            Partitioning::UnknownPartitioning(1),
58            EmissionType::Incremental,
59            Boundedness::Bounded,
60        );
61        Ok(Self {
62            conn_options,
63            source,
64            table_schema,
65            remote_schema,
66            projection,
67            unparsed_filters,
68            limit,
69            transform,
70            conn,
71            plan_properties,
72        })
73    }
74}
75
76impl ExecutionPlan for RemoteTableScanExec {
77    fn name(&self) -> &str {
78        "RemoteTableExec"
79    }
80
81    fn as_any(&self) -> &dyn Any {
82        self
83    }
84
85    fn properties(&self) -> &PlanProperties {
86        &self.plan_properties
87    }
88
89    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
90        vec![]
91    }
92
93    fn with_new_children(
94        self: Arc<Self>,
95        _children: Vec<Arc<dyn ExecutionPlan>>,
96    ) -> DFResult<Arc<dyn ExecutionPlan>> {
97        Ok(self)
98    }
99
100    fn execute(
101        &self,
102        partition: usize,
103        _context: Arc<TaskContext>,
104    ) -> DFResult<SendableRecordBatchStream> {
105        assert_eq!(partition, 0);
106        let schema = self.schema();
107        let fut = build_and_transform_stream(
108            self.conn.clone(),
109            self.conn_options.clone(),
110            self.source.clone(),
111            self.table_schema.clone(),
112            self.remote_schema.clone(),
113            self.projection.clone(),
114            self.unparsed_filters.clone(),
115            self.limit,
116            self.transform.clone(),
117        );
118        let stream = futures::stream::once(fut).try_flatten();
119        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
120    }
121
122    fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
123        if let Some(partition) = partition
124            && partition != 0
125        {
126            return Err(DataFusionError::Plan(format!(
127                "Invalid partition index: {partition}"
128            )));
129        }
130        let db_type = self.conn_options.db_type();
131        let limit = if db_type.support_rewrite_with_filters_limit(&self.source) {
132            self.limit
133        } else {
134            None
135        };
136        let real_sql = db_type.rewrite_query(&self.source, &self.unparsed_filters, limit);
137
138        if let Some(count1_query) = db_type.try_count1_query(&RemoteSource::Query(real_sql)) {
139            let conn = self.conn.clone();
140            let conn_options = self.conn_options.clone();
141            let row_count_result = tokio::task::block_in_place(|| {
142                tokio::runtime::Handle::current().block_on(async {
143                    db_type
144                        .fetch_count(conn, &conn_options, &count1_query)
145                        .await
146                })
147            });
148
149            match row_count_result {
150                Ok(row_count) => {
151                    let column_stat = Statistics::unknown_column(self.schema().as_ref());
152                    Ok(Statistics {
153                        num_rows: Precision::Exact(row_count),
154                        total_byte_size: Precision::Absent,
155                        column_statistics: column_stat,
156                    })
157                }
158                Err(e) => {
159                    warn!("[remote-table] Failed to fetch exec statistics: {e}");
160                    Err(e)
161                }
162            }
163        } else {
164            debug!(
165                "[remote-table] Query can not be rewritten as count1 query: {}",
166                self.source
167            );
168            Ok(Statistics::new_unknown(self.schema().as_ref()))
169        }
170    }
171
172    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
173        let db_type = self.conn_options.db_type();
174        if db_type.support_rewrite_with_filters_limit(&self.source) {
175            Some(Arc::new(Self {
176                conn_options: self.conn_options.clone(),
177                source: self.source.clone(),
178                table_schema: self.table_schema.clone(),
179                remote_schema: self.remote_schema.clone(),
180                projection: self.projection.clone(),
181                unparsed_filters: self.unparsed_filters.clone(),
182                limit,
183                transform: self.transform.clone(),
184                conn: self.conn.clone(),
185                plan_properties: self.plan_properties.clone(),
186            }))
187        } else {
188            None
189        }
190    }
191
192    fn fetch(&self) -> Option<usize> {
193        self.limit
194    }
195}
196
197#[allow(clippy::too_many_arguments)]
198async fn build_and_transform_stream(
199    conn: Arc<dyn Connection>,
200    conn_options: Arc<ConnectionOptions>,
201    source: RemoteSource,
202    table_schema: SchemaRef,
203    remote_schema: Option<RemoteSchemaRef>,
204    projection: Option<Vec<usize>>,
205    unparsed_filters: Vec<String>,
206    limit: Option<usize>,
207    transform: Arc<dyn Transform>,
208) -> DFResult<SendableRecordBatchStream> {
209    let limit = if conn_options
210        .db_type()
211        .support_rewrite_with_filters_limit(&source)
212    {
213        limit
214    } else {
215        None
216    };
217
218    let stream = conn
219        .query(
220            &conn_options,
221            &source,
222            table_schema.clone(),
223            projection.as_ref(),
224            unparsed_filters.as_slice(),
225            limit,
226        )
227        .await?;
228
229    Ok(Box::pin(TransformStream::try_new(
230        stream,
231        transform.clone(),
232        table_schema,
233        projection,
234        remote_schema,
235    )?))
236}
237
238impl DisplayAs for RemoteTableScanExec {
239    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
240        write!(
241            f,
242            "RemoteTableExec: source={}",
243            match &self.source {
244                RemoteSource::Query(_query) => "query".to_string(),
245                RemoteSource::Table(table) => table.join("."),
246            }
247        )?;
248        let projected_schema = self.schema();
249        if has_projection(self.projection.as_ref(), self.table_schema.fields().len()) {
250            write!(
251                f,
252                ", projection={}",
253                ProjectSchemaDisplay(&projected_schema)
254            )?;
255        }
256        if let Some(limit) = self.limit {
257            write!(f, ", limit={limit}")?;
258        }
259        if !self.unparsed_filters.is_empty() {
260            write!(f, ", filters=[{}]", self.unparsed_filters.join(", "))?;
261        }
262        Ok(())
263    }
264}
265
/// Reports whether `projection` actually changes the column layout.
///
/// Returns `false` when there is no projection, or when the projection lists
/// exactly the indices `0..table_columns` in ascending order. Any other
/// projection (different length, reordered or skipped columns) returns
/// `true`.
fn has_projection(projection: Option<&Vec<usize>>, table_columns: usize) -> bool {
    let Some(indices) = projection else {
        return false;
    };
    // An identity projection has one entry per column, each at its own index.
    let is_identity =
        indices.len() == table_columns && indices.iter().copied().eq(0..table_columns);
    !is_identity
}