1use crate::{
2 Connection, ConnectionOptions, DFResult, RemoteSchemaRef, RemoteSource, Transform,
3 TransformStream, transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::error::DataFusionError;
9use datafusion::execution::{SendableRecordBatchStream, TaskContext};
10use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion::physical_plan::display::ProjectSchemaDisplay;
12use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use datafusion::physical_plan::{
15 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
16};
17use futures::TryStreamExt;
18use log::{debug, warn};
19use std::any::Any;
20use std::sync::Arc;
21
/// Physical plan node that scans a remote database table or query.
///
/// The node is a leaf (no children) exposing a single partition; the actual
/// remote connection work is deferred until the stream produced by
/// `execute` is first polled.
#[derive(Debug)]
pub struct RemoteTableScanExec {
    // Connection options (incl. database type) used to rewrite and run queries.
    pub(crate) conn_options: Arc<ConnectionOptions>,
    // The remote source: either a raw query or a (possibly qualified) table name.
    pub(crate) source: RemoteSource,
    // Full (untransformed, unprojected) schema of the remote table.
    pub(crate) table_schema: SchemaRef,
    // Remote-side schema, when known; feeds `transform_schema`/`TransformStream`.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    // Column indices into `table_schema` to keep; `None` means all columns.
    pub(crate) projection: Option<Vec<usize>>,
    // Filters already rendered to SQL text, pushed into the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    // Row limit; only pushed to the remote side when the db type supports it.
    pub(crate) limit: Option<usize>,
    // Per-batch transform applied to the fetched record batches.
    pub(crate) transform: Arc<dyn Transform>,
    // Live connection used to issue the remote query.
    pub(crate) conn: Arc<dyn Connection>,
    // Cached plan properties (projected schema, single unknown partition).
    plan_properties: PlanProperties,
}
35
36impl RemoteTableScanExec {
37 #[allow(clippy::too_many_arguments)]
38 pub fn try_new(
39 conn_options: Arc<ConnectionOptions>,
40 source: RemoteSource,
41 table_schema: SchemaRef,
42 remote_schema: Option<RemoteSchemaRef>,
43 projection: Option<Vec<usize>>,
44 unparsed_filters: Vec<String>,
45 limit: Option<usize>,
46 transform: Arc<dyn Transform>,
47 conn: Arc<dyn Connection>,
48 ) -> DFResult<Self> {
49 let transformed_table_schema = transform_schema(
50 table_schema.clone(),
51 transform.as_ref(),
52 remote_schema.as_ref(),
53 )?;
54 let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
55 let plan_properties = PlanProperties::new(
56 EquivalenceProperties::new(projected_schema),
57 Partitioning::UnknownPartitioning(1),
58 EmissionType::Incremental,
59 Boundedness::Bounded,
60 );
61 Ok(Self {
62 conn_options,
63 source,
64 table_schema,
65 remote_schema,
66 projection,
67 unparsed_filters,
68 limit,
69 transform,
70 conn,
71 plan_properties,
72 })
73 }
74}
75
76impl ExecutionPlan for RemoteTableScanExec {
77 fn name(&self) -> &str {
78 "RemoteTableExec"
79 }
80
81 fn as_any(&self) -> &dyn Any {
82 self
83 }
84
85 fn properties(&self) -> &PlanProperties {
86 &self.plan_properties
87 }
88
89 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
90 vec![]
91 }
92
93 fn with_new_children(
94 self: Arc<Self>,
95 _children: Vec<Arc<dyn ExecutionPlan>>,
96 ) -> DFResult<Arc<dyn ExecutionPlan>> {
97 Ok(self)
98 }
99
100 fn execute(
101 &self,
102 partition: usize,
103 _context: Arc<TaskContext>,
104 ) -> DFResult<SendableRecordBatchStream> {
105 assert_eq!(partition, 0);
106 let schema = self.schema();
107 let fut = build_and_transform_stream(
108 self.conn.clone(),
109 self.conn_options.clone(),
110 self.source.clone(),
111 self.table_schema.clone(),
112 self.remote_schema.clone(),
113 self.projection.clone(),
114 self.unparsed_filters.clone(),
115 self.limit,
116 self.transform.clone(),
117 );
118 let stream = futures::stream::once(fut).try_flatten();
119 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
120 }
121
122 fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
123 if let Some(partition) = partition
124 && partition != 0
125 {
126 return Err(DataFusionError::Plan(format!(
127 "Invalid partition index: {partition}"
128 )));
129 }
130 let db_type = self.conn_options.db_type();
131 let limit = if db_type.support_rewrite_with_filters_limit(&self.source) {
132 self.limit
133 } else {
134 None
135 };
136 let real_sql = db_type.rewrite_query(&self.source, &self.unparsed_filters, limit);
137
138 if let Some(count1_query) = db_type.try_count1_query(&RemoteSource::Query(real_sql)) {
139 let conn = self.conn.clone();
140 let conn_options = self.conn_options.clone();
141 let row_count_result = tokio::task::block_in_place(|| {
142 tokio::runtime::Handle::current().block_on(async {
143 db_type
144 .fetch_count(conn, &conn_options, &count1_query)
145 .await
146 })
147 });
148
149 match row_count_result {
150 Ok(row_count) => {
151 let column_stat = Statistics::unknown_column(self.schema().as_ref());
152 Ok(Statistics {
153 num_rows: Precision::Exact(row_count),
154 total_byte_size: Precision::Absent,
155 column_statistics: column_stat,
156 })
157 }
158 Err(e) => {
159 warn!("[remote-table] Failed to fetch exec statistics: {e}");
160 Err(e)
161 }
162 }
163 } else {
164 debug!(
165 "[remote-table] Query can not be rewritten as count1 query: {}",
166 self.source
167 );
168 Ok(Statistics::new_unknown(self.schema().as_ref()))
169 }
170 }
171
172 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
173 let db_type = self.conn_options.db_type();
174 if db_type.support_rewrite_with_filters_limit(&self.source) {
175 Some(Arc::new(Self {
176 conn_options: self.conn_options.clone(),
177 source: self.source.clone(),
178 table_schema: self.table_schema.clone(),
179 remote_schema: self.remote_schema.clone(),
180 projection: self.projection.clone(),
181 unparsed_filters: self.unparsed_filters.clone(),
182 limit,
183 transform: self.transform.clone(),
184 conn: self.conn.clone(),
185 plan_properties: self.plan_properties.clone(),
186 }))
187 } else {
188 None
189 }
190 }
191
192 fn fetch(&self) -> Option<usize> {
193 self.limit
194 }
195}
196
197#[allow(clippy::too_many_arguments)]
198async fn build_and_transform_stream(
199 conn: Arc<dyn Connection>,
200 conn_options: Arc<ConnectionOptions>,
201 source: RemoteSource,
202 table_schema: SchemaRef,
203 remote_schema: Option<RemoteSchemaRef>,
204 projection: Option<Vec<usize>>,
205 unparsed_filters: Vec<String>,
206 limit: Option<usize>,
207 transform: Arc<dyn Transform>,
208) -> DFResult<SendableRecordBatchStream> {
209 let limit = if conn_options
210 .db_type()
211 .support_rewrite_with_filters_limit(&source)
212 {
213 limit
214 } else {
215 None
216 };
217
218 let stream = conn
219 .query(
220 &conn_options,
221 &source,
222 table_schema.clone(),
223 projection.as_ref(),
224 unparsed_filters.as_slice(),
225 limit,
226 )
227 .await?;
228
229 Ok(Box::pin(TransformStream::try_new(
230 stream,
231 transform.clone(),
232 table_schema,
233 projection,
234 remote_schema,
235 )?))
236}
237
238impl DisplayAs for RemoteTableScanExec {
239 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
240 write!(
241 f,
242 "RemoteTableExec: source={}",
243 match &self.source {
244 RemoteSource::Query(_query) => "query".to_string(),
245 RemoteSource::Table(table) => table.join("."),
246 }
247 )?;
248 let projected_schema = self.schema();
249 if has_projection(self.projection.as_ref(), self.table_schema.fields().len()) {
250 write!(
251 f,
252 ", projection={}",
253 ProjectSchemaDisplay(&projected_schema)
254 )?;
255 }
256 if let Some(limit) = self.limit {
257 write!(f, ", limit={limit}")?;
258 }
259 if !self.unparsed_filters.is_empty() {
260 write!(f, ", filters=[{}]", self.unparsed_filters.join(", "))?;
261 }
262 Ok(())
263 }
264}
265
/// Returns `true` when `projection` is a real projection — i.e. it selects
/// fewer columns than `table_columns`, or reorders/duplicates them — and
/// `false` for `None` (all columns) or the identity projection
/// `[0, 1, …, table_columns - 1]`.
///
/// Rewritten from a manual indexed loop with early returns to the
/// equivalent iterator form (`Option::is_some_and` + `Iterator::any`).
fn has_projection(projection: Option<&Vec<usize>>, table_columns: usize) -> bool {
    projection.is_some_and(|proj| {
        proj.len() != table_columns || proj.iter().enumerate().any(|(i, &col)| col != i)
    })
}