1use crate::{
2 Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3 transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
8use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{
11 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
12};
13use futures::TryStreamExt;
14use std::any::Any;
15use std::sync::Arc;
16
/// Physical plan node that runs a SQL statement through a [`Connection`] and
/// exposes the result as a single-partition record batch stream.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options handed to `Connection::query` on every execution.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL text handed to `Connection::query`.
    pub(crate) sql: String,
    /// Table schema as supplied by the caller, i.e. before the optional
    /// transform and projection are applied to derive the plan's output schema.
    pub(crate) table_schema: SchemaRef,
    /// Optional remote-side schema; forwarded to `transform_schema` and to
    /// `TransformStream` when a transform is configured.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional column indices; applied to the transformed schema when the plan
    /// is built and forwarded to `Connection::query` at execution time.
    pub(crate) projection: Option<Vec<usize>>,
    /// Optional limit forwarded to `Connection::query`; reported by `fetch()`
    /// and replaced by `with_fetch()`.
    pub(crate) limit: Option<usize>,
    /// Optional transform; when present the query stream is wrapped in a
    /// `TransformStream`.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Connection used to issue the query.
    conn: Arc<dyn Connection>,
    /// Plan properties computed once in `try_new` (one partition of unknown
    /// partitioning, incremental emission, bounded).
    plan_properties: PlanProperties,
}
29
30impl RemoteTableExec {
31 #[allow(clippy::too_many_arguments)]
32 pub fn try_new(
33 conn_options: ConnectionOptions,
34 sql: String,
35 table_schema: SchemaRef,
36 remote_schema: Option<RemoteSchemaRef>,
37 projection: Option<Vec<usize>>,
38 limit: Option<usize>,
39 transform: Option<Arc<dyn Transform>>,
40 conn: Arc<dyn Connection>,
41 ) -> DFResult<Self> {
42 let transformed_table_schema = transform_schema(
43 table_schema.clone(),
44 transform.as_ref(),
45 remote_schema.as_ref(),
46 )?;
47 let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
48 let plan_properties = PlanProperties::new(
49 EquivalenceProperties::new(projected_schema),
50 Partitioning::UnknownPartitioning(1),
51 EmissionType::Incremental,
52 Boundedness::Bounded,
53 );
54 Ok(Self {
55 conn_options,
56 sql,
57 table_schema,
58 remote_schema,
59 projection,
60 limit,
61 transform,
62 conn,
63 plan_properties,
64 })
65 }
66}
67
68impl ExecutionPlan for RemoteTableExec {
69 fn name(&self) -> &str {
70 "RemoteTableExec"
71 }
72
73 fn as_any(&self) -> &dyn Any {
74 self
75 }
76
77 fn properties(&self) -> &PlanProperties {
78 &self.plan_properties
79 }
80
81 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
82 vec![]
83 }
84
85 fn with_new_children(
86 self: Arc<Self>,
87 _children: Vec<Arc<dyn ExecutionPlan>>,
88 ) -> DFResult<Arc<dyn ExecutionPlan>> {
89 Ok(self)
90 }
91
92 fn execute(
93 &self,
94 partition: usize,
95 _context: Arc<TaskContext>,
96 ) -> DFResult<SendableRecordBatchStream> {
97 assert_eq!(partition, 0);
98 let schema = self.schema();
99 let fut = build_and_transform_stream(
100 self.conn.clone(),
101 self.conn_options.clone(),
102 self.sql.clone(),
103 self.table_schema.clone(),
104 self.remote_schema.clone(),
105 self.projection.clone(),
106 self.limit,
107 self.transform.clone(),
108 );
109 let stream = futures::stream::once(fut).try_flatten();
110 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
111 }
112
113 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
114 Some(Arc::new(Self {
115 conn_options: self.conn_options.clone(),
116 sql: self.sql.clone(),
117 table_schema: self.table_schema.clone(),
118 remote_schema: self.remote_schema.clone(),
119 projection: self.projection.clone(),
120 limit,
121 transform: self.transform.clone(),
122 conn: self.conn.clone(),
123 plan_properties: self.plan_properties.clone(),
124 }))
125 }
126
127 fn fetch(&self) -> Option<usize> {
128 self.limit
129 }
130}
131
132#[allow(clippy::too_many_arguments)]
133async fn build_and_transform_stream(
134 conn: Arc<dyn Connection>,
135 conn_options: ConnectionOptions,
136 sql: String,
137 table_schema: SchemaRef,
138 remote_schema: Option<RemoteSchemaRef>,
139 projection: Option<Vec<usize>>,
140 limit: Option<usize>,
141 transform: Option<Arc<dyn Transform>>,
142) -> DFResult<SendableRecordBatchStream> {
143 let stream = conn
144 .query(
145 &conn_options,
146 &sql,
147 table_schema.clone(),
148 projection.as_ref(),
149 limit,
150 )
151 .await?;
152 if let Some(transform) = transform.as_ref() {
153 Ok(Box::pin(TransformStream::try_new(
154 stream,
155 transform.clone(),
156 table_schema,
157 projection,
158 remote_schema,
159 )?))
160 } else {
161 Ok(stream)
162 }
163}
164
165impl DisplayAs for RemoteTableExec {
166 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
167 write!(f, "RemoteTableExec: limit={:?}", self.limit)
168 }
169}