1use crate::{
2 ConnectionOptions, DFResult, DefaultTransform, LazyPool, RemoteSchemaRef, RemoteSource,
3 Transform, TransformStream, transform_schema,
4};
5use arrow::datatypes::SchemaRef;
6use datafusion_common::DataFusionError;
7use datafusion_common::Statistics;
8use datafusion_common::stats::Precision;
9use datafusion_execution::{SendableRecordBatchStream, TaskContext};
10use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion_physical_plan::display::ProjectSchemaDisplay;
12use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
13use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
14use datafusion_physical_plan::{
15 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
16};
17use futures::TryStreamExt;
18use std::any::Any;
19use std::sync::Arc;
20
/// Physical plan node that scans a table or raw query on a remote database.
///
/// This is a leaf `ExecutionPlan` reporting a single unknown partition; the
/// remote query is issued lazily inside `execute` via the connection pool.
#[derive(Debug)]
pub struct RemoteTableScanExec {
    /// Connection settings; also supplies the remote database type.
    pub(crate) conn_options: Arc<ConnectionOptions>,
    /// Lazily-initialized connection pool the scan draws from at execute time.
    pub(crate) pool: LazyPool,
    /// What to scan: a remote table reference or a raw query.
    pub(crate) source: RemoteSource,
    /// Declared table schema, before transform and projection are applied.
    pub(crate) table_schema: SchemaRef,
    /// Schema as reported by the remote side; required when a non-default
    /// transform is configured (see `build_and_transform_stream`).
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Column indices into the transformed table schema; `None` = all columns.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filter predicates already rendered as strings, forwarded to the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Row limit; only pushed to the remote when the backend supports rewriting
    /// the query with filters + limit.
    pub(crate) limit: Option<usize>,
    /// Row-level transform applied to batches coming back from the remote.
    pub(crate) transform: Arc<dyn Transform>,
    /// Known total row count, if available; used for exact statistics.
    pub(crate) row_count: Option<usize>,
    // Cached at construction so `properties()` can return a reference.
    plan_properties: PlanProperties,
}
35
36impl RemoteTableScanExec {
37 #[allow(clippy::too_many_arguments)]
38 pub fn try_new(
39 conn_options: Arc<ConnectionOptions>,
40 pool: LazyPool,
41 source: RemoteSource,
42 table_schema: SchemaRef,
43 remote_schema: Option<RemoteSchemaRef>,
44 projection: Option<Vec<usize>>,
45 unparsed_filters: Vec<String>,
46 limit: Option<usize>,
47 transform: Arc<dyn Transform>,
48 row_count: Option<usize>,
49 ) -> DFResult<Self> {
50 let transformed_table_schema = transform_schema(
51 transform.as_ref(),
52 table_schema.clone(),
53 remote_schema.as_ref(),
54 conn_options.db_type(),
55 )?;
56 let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
57 let plan_properties = PlanProperties::new(
58 EquivalenceProperties::new(projected_schema),
59 Partitioning::UnknownPartitioning(1),
60 EmissionType::Incremental,
61 Boundedness::Bounded,
62 );
63 Ok(Self {
64 conn_options,
65 pool,
66 source,
67 table_schema,
68 remote_schema,
69 projection,
70 unparsed_filters,
71 limit,
72 transform,
73 row_count,
74 plan_properties,
75 })
76 }
77}
78
79impl ExecutionPlan for RemoteTableScanExec {
80 fn name(&self) -> &str {
81 "RemoteTableExec"
82 }
83
84 fn as_any(&self) -> &dyn Any {
85 self
86 }
87
88 fn properties(&self) -> &PlanProperties {
89 &self.plan_properties
90 }
91
92 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
93 vec![]
94 }
95
96 fn with_new_children(
97 self: Arc<Self>,
98 _children: Vec<Arc<dyn ExecutionPlan>>,
99 ) -> DFResult<Arc<dyn ExecutionPlan>> {
100 Ok(self)
101 }
102
103 fn execute(
104 &self,
105 partition: usize,
106 _context: Arc<TaskContext>,
107 ) -> DFResult<SendableRecordBatchStream> {
108 assert_eq!(partition, 0);
109 let schema = self.schema();
110 let fut = build_and_transform_stream(
111 self.pool.clone(),
112 self.conn_options.clone(),
113 self.source.clone(),
114 self.table_schema.clone(),
115 self.remote_schema.clone(),
116 self.projection.clone(),
117 self.unparsed_filters.clone(),
118 self.limit,
119 self.transform.clone(),
120 );
121 let stream = futures::stream::once(fut).try_flatten();
122 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
123 }
124
125 fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
126 if let Some(partition) = partition
127 && partition != 0
128 {
129 return Err(DataFusionError::Plan(format!(
130 "Invalid partition index: {partition}"
131 )));
132 }
133
134 if let Some(count) = self.row_count {
135 let column_stat = Statistics::unknown_column(self.schema().as_ref());
136 let row_count_after_limit = if let Some(limit) = self.limit {
137 std::cmp::min(count, limit)
138 } else {
139 count
140 };
141 Ok(Statistics {
142 num_rows: Precision::Exact(row_count_after_limit),
143 total_byte_size: Precision::Absent,
144 column_statistics: column_stat,
145 })
146 } else {
147 Ok(Statistics::new_unknown(self.schema().as_ref()))
148 }
149 }
150
151 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
152 let db_type = self.conn_options.db_type();
153 if db_type.support_rewrite_with_filters_limit(&self.source) {
154 Some(Arc::new(Self {
155 conn_options: self.conn_options.clone(),
156 source: self.source.clone(),
157 table_schema: self.table_schema.clone(),
158 remote_schema: self.remote_schema.clone(),
159 projection: self.projection.clone(),
160 unparsed_filters: self.unparsed_filters.clone(),
161 limit,
162 transform: self.transform.clone(),
163 pool: self.pool.clone(),
164 row_count: self.row_count,
165 plan_properties: self.plan_properties.clone(),
166 }))
167 } else {
168 None
169 }
170 }
171
172 fn fetch(&self) -> Option<usize> {
173 self.limit
174 }
175}
176
177#[allow(clippy::too_many_arguments)]
178async fn build_and_transform_stream(
179 pool: LazyPool,
180 conn_options: Arc<ConnectionOptions>,
181 source: RemoteSource,
182 table_schema: SchemaRef,
183 remote_schema: Option<RemoteSchemaRef>,
184 projection: Option<Vec<usize>>,
185 unparsed_filters: Vec<String>,
186 limit: Option<usize>,
187 transform: Arc<dyn Transform>,
188) -> DFResult<SendableRecordBatchStream> {
189 let db_type = conn_options.db_type();
190 let limit = if db_type.support_rewrite_with_filters_limit(&source) {
191 limit
192 } else {
193 None
194 };
195
196 let conn = pool.get().await?;
197
198 if transform.as_any().is::<DefaultTransform>() {
199 conn.query(
200 &conn_options,
201 &source,
202 table_schema.clone(),
203 projection.as_ref(),
204 unparsed_filters.as_slice(),
205 limit,
206 )
207 .await
208 } else {
209 let Some(remote_schema) = remote_schema else {
210 return Err(DataFusionError::Execution(
211 "remote_schema is required for non-default transform".to_string(),
212 ));
213 };
214 let stream = conn
215 .query(
216 &conn_options,
217 &source,
218 table_schema.clone(),
219 None,
220 unparsed_filters.as_slice(),
221 limit,
222 )
223 .await?;
224 Ok(Box::pin(TransformStream::try_new(
225 stream,
226 transform.clone(),
227 table_schema,
228 projection,
229 remote_schema,
230 db_type,
231 )?))
232 }
233}
234
235impl DisplayAs for RemoteTableScanExec {
236 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
237 write!(
238 f,
239 "RemoteTableExec: source={}",
240 match &self.source {
241 RemoteSource::Query(_query) => "query".to_string(),
242 RemoteSource::Table(table) => table.join("."),
243 }
244 )?;
245 let projected_schema = self.schema();
246 if has_projection(self.projection.as_ref(), self.table_schema.fields().len()) {
247 write!(
248 f,
249 ", projection={}",
250 ProjectSchemaDisplay(&projected_schema)
251 )?;
252 }
253 if let Some(limit) = self.limit {
254 write!(f, ", limit={limit}")?;
255 }
256 if !self.unparsed_filters.is_empty() {
257 write!(f, ", filters=[{}]", self.unparsed_filters.join(", "))?;
258 }
259 Ok(())
260 }
261}
262
/// Returns `true` when `projection` actually changes the column set: it
/// selects fewer/more columns than the table has, or reorders them.
///
/// `None`, and an identity projection `[0, 1, …, table_columns - 1]`, both
/// return `false`.
fn has_projection(projection: Option<&Vec<usize>>, table_columns: usize) -> bool {
    projection.is_some_and(|proj| {
        proj.len() != table_columns || proj.iter().enumerate().any(|(i, &col)| col != i)
    })
}