datafusion_remote_table/
exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
10use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
11use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
12use datafusion::physical_plan::{
13    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
14};
15use futures::TryStreamExt;
16use log::{debug, warn};
17use std::any::Any;
18use std::sync::Arc;
19
/// Physical plan node that executes a SQL query against a remote database
/// and streams the results back as Arrow record batches.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options describing how to reach the remote database.
    pub(crate) conn_options: ConnectionOptions,
    /// The SQL text sent to the remote database.
    pub(crate) sql: String,
    /// Arrow schema of the full remote table (before transform/projection).
    pub(crate) table_schema: SchemaRef,
    /// Remote-side schema metadata, when available.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Column indices (into the transformed table schema) to emit; `None` = all.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filters already rendered as SQL strings, pushed down into the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Optional row-count limit pushed down into the remote query.
    pub(crate) limit: Option<usize>,
    /// Transformation applied to batches fetched from the remote source.
    pub(crate) transform: Arc<dyn Transform>,
    // Live connection used to issue the query at execution time.
    conn: Arc<dyn Connection>,
    // Cached plan properties (schema, partitioning, boundedness) computed in `try_new`.
    plan_properties: PlanProperties,
}
33
34impl RemoteTableExec {
35    #[allow(clippy::too_many_arguments)]
36    pub fn try_new(
37        conn_options: ConnectionOptions,
38        sql: String,
39        table_schema: SchemaRef,
40        remote_schema: Option<RemoteSchemaRef>,
41        projection: Option<Vec<usize>>,
42        unparsed_filters: Vec<String>,
43        limit: Option<usize>,
44        transform: Arc<dyn Transform>,
45        conn: Arc<dyn Connection>,
46    ) -> DFResult<Self> {
47        let transformed_table_schema = transform_schema(
48            table_schema.clone(),
49            transform.as_ref(),
50            remote_schema.as_ref(),
51        )?;
52        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
53        let plan_properties = PlanProperties::new(
54            EquivalenceProperties::new(projected_schema),
55            Partitioning::UnknownPartitioning(1),
56            EmissionType::Incremental,
57            Boundedness::Bounded,
58        );
59        Ok(Self {
60            conn_options,
61            sql,
62            table_schema,
63            remote_schema,
64            projection,
65            unparsed_filters,
66            limit,
67            transform,
68            conn,
69            plan_properties,
70        })
71    }
72}
73
impl ExecutionPlan for RemoteTableExec {
    fn name(&self) -> &str {
        "RemoteTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    /// Leaf node: data comes from the remote database, not from child plans.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // No children to replace on a leaf node; return self unchanged.
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        // The plan advertises UnknownPartitioning(1), so only partition 0 is valid.
        assert_eq!(partition, 0);
        let schema = self.schema();
        // `fut` resolves (lazily, on first poll) to the transformed remote
        // stream; `stream::once(fut).try_flatten()` turns the
        // Future<Output = Result<Stream>> into a single flat stream whose
        // first poll performs the remote query.
        let fut = build_and_transform_stream(
            self.conn.clone(),
            self.conn_options.clone(),
            self.sql.clone(),
            self.table_schema.clone(),
            self.remote_schema.clone(),
            self.projection.clone(),
            self.unparsed_filters.clone(),
            self.limit,
            self.transform.clone(),
        );
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    /// Best-effort row-count statistics, fetched synchronously by rewriting
    /// the query as a `count(1)` query when the dialect supports that.
    fn statistics(&self) -> DFResult<Statistics> {
        if let Some(count1_query) = self.conn_options.db_type().try_count1_query(&self.sql) {
            let conn = self.conn.clone();
            let conn_options = self.conn_options.clone();
            // NOTE(review): `block_in_place` panics on a current-thread tokio
            // runtime, and `Handle::current()` panics outside any runtime —
            // this assumes statistics() is always called from a multi-thread
            // tokio runtime worker. Confirm against callers.
            let row_count_result = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(async {
                    conn_options
                        .db_type()
                        .fetch_count(conn, &conn_options, &count1_query)
                        .await
                })
            });

            match row_count_result {
                Ok(row_count) => {
                    // Exact row count, but per-column stats remain unknown.
                    let column_stat = Statistics::unknown_column(self.schema().as_ref());
                    Ok(Statistics {
                        num_rows: Precision::Exact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: column_stat,
                    })
                }
                Err(e) => {
                    // A failed count query propagates as an error rather than
                    // degrading to unknown statistics.
                    warn!("[remote-table] Failed to fetch exec statistics: {e}");
                    Err(e)
                }
            }
        } else {
            debug!(
                "[remote-table] Query can not be rewritten as count1 query: {}",
                self.sql
            );
            Ok(Statistics::new_unknown(self.schema().as_ref()))
        }
    }

    /// Returns a copy of this plan with a new fetch limit, if the dialect
    /// supports rewriting the query with filters/limit; otherwise `None`.
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        if self
            .conn_options
            .db_type()
            .support_rewrite_with_filters_limit(&self.sql)
        {
            // The limit does not change the schema or partitioning, so the
            // cached plan_properties can be reused as-is.
            Some(Arc::new(Self {
                conn_options: self.conn_options.clone(),
                sql: self.sql.clone(),
                table_schema: self.table_schema.clone(),
                remote_schema: self.remote_schema.clone(),
                projection: self.projection.clone(),
                unparsed_filters: self.unparsed_filters.clone(),
                limit,
                transform: self.transform.clone(),
                conn: self.conn.clone(),
                plan_properties: self.plan_properties.clone(),
            }))
        } else {
            None
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}
183
184#[allow(clippy::too_many_arguments)]
185async fn build_and_transform_stream(
186    conn: Arc<dyn Connection>,
187    conn_options: ConnectionOptions,
188    sql: String,
189    table_schema: SchemaRef,
190    remote_schema: Option<RemoteSchemaRef>,
191    projection: Option<Vec<usize>>,
192    unparsed_filters: Vec<String>,
193    limit: Option<usize>,
194    transform: Arc<dyn Transform>,
195) -> DFResult<SendableRecordBatchStream> {
196    let limit = if conn_options
197        .db_type()
198        .support_rewrite_with_filters_limit(&sql)
199    {
200        limit
201    } else {
202        None
203    };
204
205    let stream = conn
206        .query(
207            &conn_options,
208            &sql,
209            table_schema.clone(),
210            projection.as_ref(),
211            unparsed_filters.as_slice(),
212            limit,
213        )
214        .await?;
215
216    Ok(Box::pin(TransformStream::try_new(
217        stream,
218        transform.clone(),
219        table_schema,
220        projection,
221        remote_schema,
222    )?))
223}
224
225impl DisplayAs for RemoteTableExec {
226    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
227        write!(
228            f,
229            "RemoteTableExec: limit={:?}, filters=[{}]",
230            self.limit,
231            self.unparsed_filters.join(", ")
232        )
233    }
234}