datafusion_remote_table/
exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::error::DataFusionError;
9use datafusion::execution::{SendableRecordBatchStream, TaskContext};
10use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
12use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
13use datafusion::physical_plan::{
14    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
15};
16use futures::TryStreamExt;
17use log::{debug, warn};
18use std::any::Any;
19use std::sync::Arc;
20
21#[derive(Debug)]
22pub struct RemoteTableExec {
23    pub(crate) conn_options: ConnectionOptions,
24    pub(crate) sql: String,
25    pub(crate) table_schema: SchemaRef,
26    pub(crate) remote_schema: Option<RemoteSchemaRef>,
27    pub(crate) projection: Option<Vec<usize>>,
28    pub(crate) unparsed_filters: Vec<String>,
29    pub(crate) limit: Option<usize>,
30    pub(crate) transform: Arc<dyn Transform>,
31    pub(crate) conn: Arc<dyn Connection>,
32    plan_properties: PlanProperties,
33}
34
35impl RemoteTableExec {
36    #[allow(clippy::too_many_arguments)]
37    pub fn try_new(
38        conn_options: ConnectionOptions,
39        sql: String,
40        table_schema: SchemaRef,
41        remote_schema: Option<RemoteSchemaRef>,
42        projection: Option<Vec<usize>>,
43        unparsed_filters: Vec<String>,
44        limit: Option<usize>,
45        transform: Arc<dyn Transform>,
46        conn: Arc<dyn Connection>,
47    ) -> DFResult<Self> {
48        let transformed_table_schema = transform_schema(
49            table_schema.clone(),
50            transform.as_ref(),
51            remote_schema.as_ref(),
52        )?;
53        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
54        let plan_properties = PlanProperties::new(
55            EquivalenceProperties::new(projected_schema),
56            Partitioning::UnknownPartitioning(1),
57            EmissionType::Incremental,
58            Boundedness::Bounded,
59        );
60        Ok(Self {
61            conn_options,
62            sql,
63            table_schema,
64            remote_schema,
65            projection,
66            unparsed_filters,
67            limit,
68            transform,
69            conn,
70            plan_properties,
71        })
72    }
73}
74
75impl ExecutionPlan for RemoteTableExec {
76    fn name(&self) -> &str {
77        "RemoteTableExec"
78    }
79
80    fn as_any(&self) -> &dyn Any {
81        self
82    }
83
84    fn properties(&self) -> &PlanProperties {
85        &self.plan_properties
86    }
87
88    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
89        vec![]
90    }
91
92    fn with_new_children(
93        self: Arc<Self>,
94        _children: Vec<Arc<dyn ExecutionPlan>>,
95    ) -> DFResult<Arc<dyn ExecutionPlan>> {
96        Ok(self)
97    }
98
99    fn execute(
100        &self,
101        partition: usize,
102        _context: Arc<TaskContext>,
103    ) -> DFResult<SendableRecordBatchStream> {
104        assert_eq!(partition, 0);
105        let schema = self.schema();
106        let fut = build_and_transform_stream(
107            self.conn.clone(),
108            self.conn_options.clone(),
109            self.sql.clone(),
110            self.table_schema.clone(),
111            self.remote_schema.clone(),
112            self.projection.clone(),
113            self.unparsed_filters.clone(),
114            self.limit,
115            self.transform.clone(),
116        );
117        let stream = futures::stream::once(fut).try_flatten();
118        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
119    }
120
121    fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
122        if let Some(partition) = partition {
123            if partition != 0 {
124                return Err(DataFusionError::Plan(format!(
125                    "Invalid partition index: {partition}"
126                )));
127            }
128        }
129        let db_type = self.conn_options.db_type();
130        let limit = if db_type.support_rewrite_with_filters_limit(&self.sql) {
131            self.limit
132        } else {
133            None
134        };
135        let real_sql = db_type.rewrite_query(&self.sql, &self.unparsed_filters, limit);
136
137        if let Some(count1_query) = db_type.try_count1_query(&real_sql) {
138            let conn = self.conn.clone();
139            let conn_options = self.conn_options.clone();
140            let row_count_result = tokio::task::block_in_place(|| {
141                tokio::runtime::Handle::current().block_on(async {
142                    db_type
143                        .fetch_count(conn, &conn_options, &count1_query)
144                        .await
145                })
146            });
147
148            match row_count_result {
149                Ok(row_count) => {
150                    let column_stat = Statistics::unknown_column(self.schema().as_ref());
151                    Ok(Statistics {
152                        num_rows: Precision::Exact(row_count),
153                        total_byte_size: Precision::Absent,
154                        column_statistics: column_stat,
155                    })
156                }
157                Err(e) => {
158                    warn!("[remote-table] Failed to fetch exec statistics: {e}");
159                    Err(e)
160                }
161            }
162        } else {
163            debug!(
164                "[remote-table] Query can not be rewritten as count1 query: {}",
165                self.sql
166            );
167            Ok(Statistics::new_unknown(self.schema().as_ref()))
168        }
169    }
170
171    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
172        if self
173            .conn_options
174            .db_type()
175            .support_rewrite_with_filters_limit(&self.sql)
176        {
177            Some(Arc::new(Self {
178                conn_options: self.conn_options.clone(),
179                sql: self.sql.clone(),
180                table_schema: self.table_schema.clone(),
181                remote_schema: self.remote_schema.clone(),
182                projection: self.projection.clone(),
183                unparsed_filters: self.unparsed_filters.clone(),
184                limit,
185                transform: self.transform.clone(),
186                conn: self.conn.clone(),
187                plan_properties: self.plan_properties.clone(),
188            }))
189        } else {
190            None
191        }
192    }
193
194    fn fetch(&self) -> Option<usize> {
195        self.limit
196    }
197}
198
199#[allow(clippy::too_many_arguments)]
200async fn build_and_transform_stream(
201    conn: Arc<dyn Connection>,
202    conn_options: ConnectionOptions,
203    sql: String,
204    table_schema: SchemaRef,
205    remote_schema: Option<RemoteSchemaRef>,
206    projection: Option<Vec<usize>>,
207    unparsed_filters: Vec<String>,
208    limit: Option<usize>,
209    transform: Arc<dyn Transform>,
210) -> DFResult<SendableRecordBatchStream> {
211    let limit = if conn_options
212        .db_type()
213        .support_rewrite_with_filters_limit(&sql)
214    {
215        limit
216    } else {
217        None
218    };
219
220    let stream = conn
221        .query(
222            &conn_options,
223            &sql,
224            table_schema.clone(),
225            projection.as_ref(),
226            unparsed_filters.as_slice(),
227            limit,
228        )
229        .await?;
230
231    Ok(Box::pin(TransformStream::try_new(
232        stream,
233        transform.clone(),
234        table_schema,
235        projection,
236        remote_schema,
237    )?))
238}
239
240impl DisplayAs for RemoteTableExec {
241    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
242        write!(
243            f,
244            "RemoteTableExec: limit={:?}, filters=[{}]",
245            self.limit,
246            self.unparsed_filters.join(", ")
247        )
248    }
249}