use crate::{
    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
    transform_schema,
};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
};
use futures::TryStreamExt;
use std::any::Any;
use std::sync::Arc;

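/// Physical plan node that executes `sql` against a remote database over `conn` and streams
/// the results back as Arrow record batches, applying the configured [`Transform`] along with
/// any pushed-down projection, filters, and limit.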
#[derive(Debug)]
pub struct RemoteTableExec {
    pub(crate) conn_options: ConnectionOptions,
    pub(crate) sql: String,
    pub(crate) table_schema: SchemaRef,
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    pub(crate) projection: Option<Vec<usize>>,
    pub(crate) unparsed_filters: Vec<String>,
    pub(crate) limit: Option<usize>,
    pub(crate) transform: Arc<dyn Transform>,
    conn: Arc<dyn Connection>,
    plan_properties: PlanProperties,
}

impl RemoteTableExec {
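    /// Builds the plan, deriving its output schema by applying `transform` to
    /// `table_schema` (using `remote_schema` when present) and then the optional projection.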
    #[allow(clippy::too_many_arguments)]
    pub fn try_new(
        conn_options: ConnectionOptions,
        sql: String,
        table_schema: SchemaRef,
        remote_schema: Option<RemoteSchemaRef>,
        projection: Option<Vec<usize>>,
        unparsed_filters: Vec<String>,
        limit: Option<usize>,
        transform: Arc<dyn Transform>,
        conn: Arc<dyn Connection>,
    ) -> DFResult<Self> {
        let transformed_table_schema = transform_schema(
            table_schema.clone(),
            transform.as_ref(),
            remote_schema.as_ref(),
        )?;
        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(projected_schema),
            Partitioning::UnknownPartitioning(1),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Ok(Self {
            conn_options,
            sql,
            table_schema,
            remote_schema,
            projection,
            unparsed_filters,
            limit,
            transform,
            conn,
            plan_properties,
        })
    }
}

impl ExecutionPlan for RemoteTableExec {
    fn name(&self) -> &str {
        "RemoteTableExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

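    // The plan reports a single partition (`UnknownPartitioning(1)`), so `execute`
    // only accepts partition 0.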
    fn execute(
        &self,
        partition: usize,
        _context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        assert_eq!(partition, 0);
        let schema = self.schema();
        let fut = build_and_transform_stream(
            self.conn.clone(),
            self.conn_options.clone(),
            self.sql.clone(),
            self.table_schema.clone(),
            self.remote_schema.clone(),
            self.projection.clone(),
            self.unparsed_filters.clone(),
            self.limit,
            self.transform.clone(),
        );
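        // Defer opening the connection and issuing the query until the stream is first
        // polled, then flatten the returned record batch stream into this one.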
        let stream = futures::stream::once(fut).try_flatten();
        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
    }

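    // Limit (fetch) pushdown is only accepted when the database dialect can rewrite the
    // original SQL with filters and a limit; otherwise the limit is not absorbed into this node.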
    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
        if self
            .conn_options
            .db_type()
            .support_rewrite_with_filters_limit(&self.sql)
        {
            Some(Arc::new(Self {
                conn_options: self.conn_options.clone(),
                sql: self.sql.clone(),
                table_schema: self.table_schema.clone(),
                remote_schema: self.remote_schema.clone(),
                projection: self.projection.clone(),
                unparsed_filters: self.unparsed_filters.clone(),
                limit,
                transform: self.transform.clone(),
                conn: self.conn.clone(),
                plan_properties: self.plan_properties.clone(),
            }))
        } else {
            None
        }
    }

    fn fetch(&self) -> Option<usize> {
        self.limit
    }
}

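/// Runs `sql` on the remote database via `conn` and wraps the resulting stream in a
/// [`TransformStream`] that applies `transform` to every batch.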
#[allow(clippy::too_many_arguments)]
async fn build_and_transform_stream(
    conn: Arc<dyn Connection>,
    conn_options: ConnectionOptions,
    sql: String,
    table_schema: SchemaRef,
    remote_schema: Option<RemoteSchemaRef>,
    projection: Option<Vec<usize>>,
    unparsed_filters: Vec<String>,
    limit: Option<usize>,
    transform: Arc<dyn Transform>,
) -> DFResult<SendableRecordBatchStream> {
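    // Only push the limit into the remote query when the dialect supports rewriting the
    // original SQL with filters and a limit.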
    let limit = if conn_options
        .db_type()
        .support_rewrite_with_filters_limit(&sql)
    {
        limit
    } else {
        None
    };

    let stream = conn
        .query(
            &conn_options,
            &sql,
            table_schema.clone(),
            projection.as_ref(),
            unparsed_filters.as_slice(),
            limit,
        )
        .await?;

    Ok(Box::pin(TransformStream::try_new(
        stream,
        transform.clone(),
        table_schema,
        projection,
        remote_schema,
    )?))
}

impl DisplayAs for RemoteTableExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "RemoteTableExec: limit={:?}, filters=[{}]",
            self.limit,
            self.unparsed_filters.join(", ")
        )
    }
}