// datafusion_remote_table/exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
10use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
11use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
12use datafusion::physical_plan::{
13    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
14};
15use futures::TryStreamExt;
16use log::{debug, warn};
17use std::any::Any;
18use std::sync::Arc;
19
/// Physical plan node that executes a SQL query against a remote database
/// over a [`Connection`] and streams the results back as record batches.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options used to connect to the remote database.
    pub(crate) conn_options: ConnectionOptions,
    /// The SQL query to run on the remote side.
    pub(crate) sql: String,
    /// Arrow schema of the full (untransformed, unprojected) remote table.
    pub(crate) table_schema: SchemaRef,
    /// Remote-side schema information, when available.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Indices of the columns to project out of the transformed table schema.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filters rendered as SQL strings, pushed down into the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Optional row limit, pushed down when the database type supports it.
    pub(crate) limit: Option<usize>,
    /// Transform applied to batches fetched from the remote source.
    pub(crate) transform: Arc<dyn Transform>,
    /// Connection used to issue the query.
    pub(crate) conn: Arc<dyn Connection>,
    // Cached plan properties (output schema, partitioning, emission,
    // boundedness), computed once in `try_new`.
    plan_properties: PlanProperties,
}
33
34impl RemoteTableExec {
35    #[allow(clippy::too_many_arguments)]
36    pub fn try_new(
37        conn_options: ConnectionOptions,
38        sql: String,
39        table_schema: SchemaRef,
40        remote_schema: Option<RemoteSchemaRef>,
41        projection: Option<Vec<usize>>,
42        unparsed_filters: Vec<String>,
43        limit: Option<usize>,
44        transform: Arc<dyn Transform>,
45        conn: Arc<dyn Connection>,
46    ) -> DFResult<Self> {
47        let transformed_table_schema = transform_schema(
48            table_schema.clone(),
49            transform.as_ref(),
50            remote_schema.as_ref(),
51        )?;
52        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
53        let plan_properties = PlanProperties::new(
54            EquivalenceProperties::new(projected_schema),
55            Partitioning::UnknownPartitioning(1),
56            EmissionType::Incremental,
57            Boundedness::Bounded,
58        );
59        Ok(Self {
60            conn_options,
61            sql,
62            table_schema,
63            remote_schema,
64            projection,
65            unparsed_filters,
66            limit,
67            transform,
68            conn,
69            plan_properties,
70        })
71    }
72}
73
impl ExecutionPlan for RemoteTableExec {
    fn name(&self) -> &str {
        "RemoteTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    /// Leaf node: a remote table scan has no child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // No children to replace, so the plan is returned unchanged.
        Ok(self)
    }

    /// Starts the remote query and returns the transformed record batch
    /// stream for the (single) partition.
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // Plan properties declare UnknownPartitioning(1), so only
        // partition 0 is ever valid; anything else is a planner bug.
        assert_eq!(partition, 0);
        let schema = self.schema();
        // The remote query is built inside a future so `execute` stays
        // synchronous; `once(fut).try_flatten()` turns the one-shot future
        // into the batch stream it resolves to (or a single error).
        let fut = build_and_transform_stream(
            self.conn.clone(),
            self.conn_options.clone(),
            self.sql.clone(),
            self.table_schema.clone(),
            self.remote_schema.clone(),
            self.projection.clone(),
            self.unparsed_filters.clone(),
            self.limit,
            self.transform.clone(),
        );
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// Tries to produce an exact row count by rewriting the query as a
    /// `count(1)` query and running it synchronously on the remote side;
    /// falls back to unknown statistics when no rewrite is possible.
    fn statistics(&self) -> DFResult<Statistics> {
        let db_type = self.conn_options.db_type();
        // Only include the pushed-down limit if this database type supports
        // rewriting the query with filters/limit (mirrors the logic used
        // when the query actually executes).
        let limit = if db_type.support_rewrite_with_filters_limit(&self.sql) {
            self.limit
        } else {
            None
        };
        let real_sql = db_type.rewrite_query(&self.sql, &self.unparsed_filters, limit);

        if let Some(count1_query) = db_type.try_count1_query(&real_sql) {
            let conn = self.conn.clone();
            let conn_options = self.conn_options.clone();
            // `statistics` is a sync trait method, so the async count query
            // is driven to completion here.
            // NOTE(review): `block_in_place` + `Handle::current().block_on`
            // panics outside a multi-thread Tokio runtime — confirm this is
            // only ever called from a multi-thread runtime worker.
            let row_count_result = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(async {
                    db_type
                        .fetch_count(conn, &conn_options, &count1_query)
                        .await
                })
            });

            match row_count_result {
                Ok(row_count) => {
                    // Exact row count, but per-column stats stay unknown.
                    let column_stat = Statistics::unknown_column(self.schema().as_ref());
                    Ok(Statistics {
                        num_rows: Precision::Exact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: column_stat,
                    })
                }
                Err(e) => {
                    warn!("[remote-table] Failed to fetch exec statistics: {e}");
                    Err(e)
                }
            }
        } else {
            debug!(
                "[remote-table] Query can not be rewritten as count1 query: {}",
                self.sql
            );
            Ok(Statistics::new_unknown(self.schema().as_ref()))
        }
    }

    /// Returns a copy of this plan with the new fetch limit when the
    /// database type can push the limit into the remote query; otherwise
    /// `None`, leaving limit enforcement to an upstream operator.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        if self
            .conn_options
            .db_type()
            .support_rewrite_with_filters_limit(&self.sql)
        {
            Some(Arc::new(Self {
                conn_options: self.conn_options.clone(),
                sql: self.sql.clone(),
                table_schema: self.table_schema.clone(),
                remote_schema: self.remote_schema.clone(),
                projection: self.projection.clone(),
                unparsed_filters: self.unparsed_filters.clone(),
                limit,
                transform: self.transform.clone(),
                conn: self.conn.clone(),
                // Plan properties are unaffected by the limit, so the cached
                // value is reused.
                plan_properties: self.plan_properties.clone(),
            }))
        } else {
            None
        }
    }

    /// The currently pushed-down row limit, if any.
    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}
190
191#[allow(clippy::too_many_arguments)]
192async fn build_and_transform_stream(
193    conn: Arc<dyn Connection>,
194    conn_options: ConnectionOptions,
195    sql: String,
196    table_schema: SchemaRef,
197    remote_schema: Option<RemoteSchemaRef>,
198    projection: Option<Vec<usize>>,
199    unparsed_filters: Vec<String>,
200    limit: Option<usize>,
201    transform: Arc<dyn Transform>,
202) -> DFResult<SendableRecordBatchStream> {
203    let limit = if conn_options
204        .db_type()
205        .support_rewrite_with_filters_limit(&sql)
206    {
207        limit
208    } else {
209        None
210    };
211
212    let stream = conn
213        .query(
214            &conn_options,
215            &sql,
216            table_schema.clone(),
217            projection.as_ref(),
218            unparsed_filters.as_slice(),
219            limit,
220        )
221        .await?;
222
223    Ok(Box::pin(TransformStream::try_new(
224        stream,
225        transform.clone(),
226        table_schema,
227        projection,
228        remote_schema,
229    )?))
230}
231
232impl DisplayAs for RemoteTableExec {
233    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
234        write!(
235            f,
236            "RemoteTableExec: limit={:?}, filters=[{}]",
237            self.limit,
238            self.unparsed_filters.join(", ")
239        )
240    }
241}