use crate::{
    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
    transform_schema,
};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::common::stats::Precision;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
};
use futures::TryStreamExt;
use log::{debug, warn};
use std::any::Any;
use std::sync::Arc;

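/// Leaf execution plan that pushes a single SQL query down to a remote database
/// and streams the results back as Arrow record batches, applying the configured
/// [`Transform`] and projection on the way out.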
#[derive(Debug)]
pub struct RemoteTableExec {
    pub(crate) conn_options: ConnectionOptions,
    pub(crate) sql: String,
    pub(crate) table_schema: SchemaRef,
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    pub(crate) projection: Option<Vec<usize>>,
    pub(crate) unparsed_filters: Vec<String>,
    pub(crate) limit: Option<usize>,
    pub(crate) transform: Arc<dyn Transform>,
    pub(crate) conn: Arc<dyn Connection>,
    plan_properties: PlanProperties,
}

impl RemoteTableExec {
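    /// Creates a new `RemoteTableExec`, deriving the plan's output schema from the
    /// table schema, the transform, and the optional projection.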
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        conn_options: ConnectionOptions,
        sql: String,
        table_schema: SchemaRef,
        remote_schema: Option<RemoteSchemaRef>,
        projection: Option<Vec<usize>>,
        unparsed_filters: Vec<String>,
        limit: Option<usize>,
        transform: Arc<dyn Transform>,
        conn: Arc<dyn Connection>,
    ) -> DFResult<Self> {
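        // Transform the table schema, project it, and use the result to describe this
        // plan's output: a single unknown partition, emitted incrementally and bounded.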
        let transformed_table_schema = transform_schema(
            table_schema.clone(),
            transform.as_ref(),
            remote_schema.as_ref(),
        )?;
        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(projected_schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Ok(Self {
            conn_options,
            sql,
            table_schema,
            remote_schema,
            projection,
            unparsed_filters,
            limit,
            transform,
            conn,
            plan_properties,
        })
    }
}

impl ExecutionPlan for RemoteTableExec {
    fn name(&self) -> &str {
        "RemoteTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
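        // This is a leaf plan with no children, so there is nothing to replace.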
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
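        // The plan declares a single (unknown) partition, so only partition 0 is valid.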
        assert_eq!(partition, 0);
        let schema = self.schema();
        let fut = build_and_transform_stream(
            self.conn.clone(),
            self.conn_options.clone(),
            self.sql.clone(),
            self.table_schema.clone(),
            self.remote_schema.clone(),
            self.projection.clone(),
            self.unparsed_filters.clone(),
            self.limit,
            self.transform.clone(),
        );
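        // Defer the remote query until the stream is first polled: wrap the async
        // setup in a one-element stream and flatten the record batch stream it yields.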
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

    fn statistics(&self) -> DFResult<Statistics> {
        let db_type = self.conn_options.db_type();
        let limit = if db_type.support_rewrite_with_filters_limit(&self.sql) {
            self.limit
        } else {
            None
        };
        let real_sql = db_type.rewrite_query(&self.sql, &self.unparsed_filters, limit);

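        // If the dialect can express the rewritten query as a count query, run it
        // remotely to report an exact row count; otherwise fall back to unknown stats.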
        if let Some(count1_query) = db_type.try_count1_query(&real_sql) {
            let conn = self.conn.clone();
            let conn_options = self.conn_options.clone();
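            // `statistics` is a synchronous API, so block in place on the current
            // Tokio runtime while the count query runs against the remote database.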
            let row_count_result = tokio::task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(async {
                    db_type
                        .fetch_count(conn, &conn_options, &count1_query)
                        .await
                })
            });

            match row_count_result {
                Ok(row_count) => {
                    let column_stat = Statistics::unknown_column(self.schema().as_ref());
                    Ok(Statistics {
                        num_rows: Precision::Exact(row_count),
                        total_byte_size: Precision::Absent,
                        column_statistics: column_stat,
                    })
                }
                Err(e) => {
                    warn!("[remote-table] Failed to fetch exec statistics: {e}");
                    Err(e)
                }
            }
        } else {
            debug!(
                "[remote-table] Query cannot be rewritten as a count1 query: {}",
                self.sql
            );
            Ok(Statistics::new_unknown(self.schema().as_ref()))
        }
    }

    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
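        // Absorb the new fetch limit only when the dialect can rewrite the query with
        // it; otherwise decline so DataFusion keeps its own limit operator.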
        if self
            .conn_options
            .db_type()
            .support_rewrite_with_filters_limit(&self.sql)
        {
            Some(Arc::new(Self {
                conn_options: self.conn_options.clone(),
                sql: self.sql.clone(),
                table_schema: self.table_schema.clone(),
                remote_schema: self.remote_schema.clone(),
                projection: self.projection.clone(),
                unparsed_filters: self.unparsed_filters.clone(),
                limit,
                transform: self.transform.clone(),
                conn: self.conn.clone(),
                plan_properties: self.plan_properties.clone(),
            }))
        } else {
            None
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}

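/// Runs the query on the remote connection and wraps the resulting stream in a
/// [`TransformStream`] that applies the configured transform and projection.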
#[allow(clippy::too_many_arguments)]
async fn build_and_transform_stream(
    conn: Arc<dyn Connection>,
    conn_options: ConnectionOptions,
    sql: String,
    table_schema: SchemaRef,
    remote_schema: Option<RemoteSchemaRef>,
    projection: Option<Vec<usize>>,
    unparsed_filters: Vec<String>,
    limit: Option<usize>,
    transform: Arc<dyn Transform>,
) -> DFResult<SendableRecordBatchStream> {
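    // Only push the limit down to the remote query when the dialect supports
    // rewriting the statement with filters and a limit.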
    let limit = if conn_options
        .db_type()
        .support_rewrite_with_filters_limit(&sql)
    {
        limit
    } else {
        None
    };

    let stream = conn
        .query(
            &conn_options,
            &sql,
            table_schema.clone(),
            projection.as_ref(),
            unparsed_filters.as_slice(),
            limit,
        )
        .await?;

    Ok(Box::pin(TransformStream::try_new(
        stream,
        transform.clone(),
        table_schema,
        projection,
        remote_schema,
    )?))
}

impl DisplayAs for RemoteTableExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "RemoteTableExec: limit={:?}, filters=[{}]",
            self.limit,
            self.unparsed_filters.join(", ")
        )
    }
}