1use crate::{
2 Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3 transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::error::DataFusionError;
9use datafusion::execution::{SendableRecordBatchStream, TaskContext};
10use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
12use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
13use datafusion::physical_plan::{
14 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
15};
16use futures::TryStreamExt;
17use log::{debug, warn};
18use std::any::Any;
19use std::sync::Arc;
20
/// Leaf execution plan that runs a SQL query against a remote database through
/// a [`Connection`] and streams the results, passed through a [`Transform`].
///
/// The node always exposes a single output partition (see `try_new`).
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Connection options; also used to look up the database type that decides
    /// whether filters/limit can be pushed into `sql`.
    pub(crate) conn_options: ConnectionOptions,
    /// Base SQL query sent to the remote database.
    pub(crate) sql: String,
    /// Table schema before transformation and projection.
    pub(crate) table_schema: SchemaRef,
    /// Optional remote-side schema handed to the transform.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional column indices used to project the (transformed) table schema.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filter expressions already rendered as SQL strings, forwarded to the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Optional row limit; only forwarded when the database type supports
    /// rewriting `sql` with filters and a limit.
    pub(crate) limit: Option<usize>,
    /// Transform applied to the schema and to every batch read from the remote side.
    pub(crate) transform: Arc<dyn Transform>,
    /// Connection used to execute the query.
    pub(crate) conn: Arc<dyn Connection>,
    /// Cached plan properties (output schema, partitioning, emission, boundedness).
    plan_properties: PlanProperties,
}
34
35impl RemoteTableExec {
36 #[allow(clippy::too_many_arguments)]
37 pub fn try_new(
38 conn_options: ConnectionOptions,
39 sql: String,
40 table_schema: SchemaRef,
41 remote_schema: Option<RemoteSchemaRef>,
42 projection: Option<Vec<usize>>,
43 unparsed_filters: Vec<String>,
44 limit: Option<usize>,
45 transform: Arc<dyn Transform>,
46 conn: Arc<dyn Connection>,
47 ) -> DFResult<Self> {
48 let transformed_table_schema = transform_schema(
49 table_schema.clone(),
50 transform.as_ref(),
51 remote_schema.as_ref(),
52 )?;
53 let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
54 let plan_properties = PlanProperties::new(
55 EquivalenceProperties::new(projected_schema),
56 Partitioning::UnknownPartitioning(1),
57 EmissionType::Incremental,
58 Boundedness::Bounded,
59 );
60 Ok(Self {
61 conn_options,
62 sql,
63 table_schema,
64 remote_schema,
65 projection,
66 unparsed_filters,
67 limit,
68 transform,
69 conn,
70 plan_properties,
71 })
72 }
73}
74
75impl ExecutionPlan for RemoteTableExec {
76 fn name(&self) -> &str {
77 "RemoteTableExec"
78 }
79
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn properties(&self) -> &PlanProperties {
85 &self.plan_properties
86 }
87
88 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
89 vec![]
90 }
91
92 fn with_new_children(
93 self: Arc<Self>,
94 _children: Vec<Arc<dyn ExecutionPlan>>,
95 ) -> DFResult<Arc<dyn ExecutionPlan>> {
96 Ok(self)
97 }
98
99 fn execute(
100 &self,
101 partition: usize,
102 _context: Arc<TaskContext>,
103 ) -> DFResult<SendableRecordBatchStream> {
104 assert_eq!(partition, 0);
105 let schema = self.schema();
106 let fut = build_and_transform_stream(
107 self.conn.clone(),
108 self.conn_options.clone(),
109 self.sql.clone(),
110 self.table_schema.clone(),
111 self.remote_schema.clone(),
112 self.projection.clone(),
113 self.unparsed_filters.clone(),
114 self.limit,
115 self.transform.clone(),
116 );
117 let stream = futures::stream::once(fut).try_flatten();
118 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
119 }
120
121 fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
122 if let Some(partition) = partition {
123 if partition != 0 {
124 return Err(DataFusionError::Plan(format!(
125 "Invalid partition index: {partition}"
126 )));
127 }
128 }
129 let db_type = self.conn_options.db_type();
130 let limit = if db_type.support_rewrite_with_filters_limit(&self.sql) {
131 self.limit
132 } else {
133 None
134 };
135 let real_sql = db_type.rewrite_query(&self.sql, &self.unparsed_filters, limit);
136
137 if let Some(count1_query) = db_type.try_count1_query(&real_sql) {
138 let conn = self.conn.clone();
139 let conn_options = self.conn_options.clone();
140 let row_count_result = tokio::task::block_in_place(|| {
141 tokio::runtime::Handle::current().block_on(async {
142 db_type
143 .fetch_count(conn, &conn_options, &count1_query)
144 .await
145 })
146 });
147
148 match row_count_result {
149 Ok(row_count) => {
150 let column_stat = Statistics::unknown_column(self.schema().as_ref());
151 Ok(Statistics {
152 num_rows: Precision::Exact(row_count),
153 total_byte_size: Precision::Absent,
154 column_statistics: column_stat,
155 })
156 }
157 Err(e) => {
158 warn!("[remote-table] Failed to fetch exec statistics: {e}");
159 Err(e)
160 }
161 }
162 } else {
163 debug!(
164 "[remote-table] Query can not be rewritten as count1 query: {}",
165 self.sql
166 );
167 Ok(Statistics::new_unknown(self.schema().as_ref()))
168 }
169 }
170
171 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
172 if self
173 .conn_options
174 .db_type()
175 .support_rewrite_with_filters_limit(&self.sql)
176 {
177 Some(Arc::new(Self {
178 conn_options: self.conn_options.clone(),
179 sql: self.sql.clone(),
180 table_schema: self.table_schema.clone(),
181 remote_schema: self.remote_schema.clone(),
182 projection: self.projection.clone(),
183 unparsed_filters: self.unparsed_filters.clone(),
184 limit,
185 transform: self.transform.clone(),
186 conn: self.conn.clone(),
187 plan_properties: self.plan_properties.clone(),
188 }))
189 } else {
190 None
191 }
192 }
193
194 fn fetch(&self) -> Option<usize> {
195 self.limit
196 }
197}
198
199#[allow(clippy::too_many_arguments)]
200async fn build_and_transform_stream(
201 conn: Arc<dyn Connection>,
202 conn_options: ConnectionOptions,
203 sql: String,
204 table_schema: SchemaRef,
205 remote_schema: Option<RemoteSchemaRef>,
206 projection: Option<Vec<usize>>,
207 unparsed_filters: Vec<String>,
208 limit: Option<usize>,
209 transform: Arc<dyn Transform>,
210) -> DFResult<SendableRecordBatchStream> {
211 let limit = if conn_options
212 .db_type()
213 .support_rewrite_with_filters_limit(&sql)
214 {
215 limit
216 } else {
217 None
218 };
219
220 let stream = conn
221 .query(
222 &conn_options,
223 &sql,
224 table_schema.clone(),
225 projection.as_ref(),
226 unparsed_filters.as_slice(),
227 limit,
228 )
229 .await?;
230
231 Ok(Box::pin(TransformStream::try_new(
232 stream,
233 transform.clone(),
234 table_schema,
235 projection,
236 remote_schema,
237 )?))
238}
239
240impl DisplayAs for RemoteTableExec {
241 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
242 write!(
243 f,
244 "RemoteTableExec: limit={:?}, filters=[{}]",
245 self.limit,
246 self.unparsed_filters.join(", ")
247 )
248 }
249}