datafusion_remote_table/
exec.rs1use crate::{Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::execution::{SendableRecordBatchStream, TaskContext};
4use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
5use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
6use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
7use datafusion::physical_plan::{
8 project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
9};
10use futures::TryStreamExt;
11use std::any::Any;
12use std::sync::Arc;
13
14#[derive(Debug)]
15pub struct RemoteTableExec {
16 pub(crate) conn_options: ConnectionOptions,
17 pub(crate) sql: String,
18 pub(crate) table_schema: SchemaRef,
19 pub(crate) remote_schema: Option<RemoteSchemaRef>,
20 pub(crate) projection: Option<Vec<usize>>,
21 pub(crate) transform: Option<Arc<dyn Transform>>,
22 conn: Arc<dyn Connection>,
23 plan_properties: PlanProperties,
24}
25
26impl RemoteTableExec {
27 pub fn try_new(
28 conn_options: ConnectionOptions,
29 sql: String,
30 table_schema: SchemaRef,
31 remote_schema: Option<RemoteSchemaRef>,
32 projection: Option<Vec<usize>>,
33 transform: Option<Arc<dyn Transform>>,
34 conn: Arc<dyn Connection>,
35 ) -> DFResult<Self> {
36 let projected_schema = project_schema(&table_schema, projection.as_ref())?;
37 let plan_properties = PlanProperties::new(
38 EquivalenceProperties::new(projected_schema),
39 Partitioning::UnknownPartitioning(1),
40 EmissionType::Incremental,
41 Boundedness::Bounded,
42 );
43 Ok(Self {
44 conn_options,
45 sql,
46 table_schema,
47 remote_schema,
48 projection,
49 transform,
50 conn,
51 plan_properties,
52 })
53 }
54}
55
56impl ExecutionPlan for RemoteTableExec {
57 fn name(&self) -> &str {
58 "RemoteTableExec"
59 }
60
61 fn as_any(&self) -> &dyn Any {
62 self
63 }
64
65 fn properties(&self) -> &PlanProperties {
66 &self.plan_properties
67 }
68
69 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
70 vec![]
71 }
72
73 fn with_new_children(
74 self: Arc<Self>,
75 _children: Vec<Arc<dyn ExecutionPlan>>,
76 ) -> DFResult<Arc<dyn ExecutionPlan>> {
77 Ok(self)
78 }
79
80 fn execute(
81 &self,
82 partition: usize,
83 _context: Arc<TaskContext>,
84 ) -> DFResult<SendableRecordBatchStream> {
85 assert_eq!(partition, 0);
86 let schema = self.schema();
87 let fut = build_and_transform_stream(
88 self.conn.clone(),
89 self.conn_options.clone(),
90 self.sql.clone(),
91 self.table_schema.clone(),
92 self.remote_schema.clone(),
93 self.projection.clone(),
94 self.transform.clone(),
95 );
96 let stream = futures::stream::once(fut).try_flatten();
97 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
98 }
99}
100
101async fn build_and_transform_stream(
102 conn: Arc<dyn Connection>,
103 conn_options: ConnectionOptions,
104 sql: String,
105 table_schema: SchemaRef,
106 remote_schema: Option<RemoteSchemaRef>,
107 projection: Option<Vec<usize>>,
108 transform: Option<Arc<dyn Transform>>,
109) -> DFResult<SendableRecordBatchStream> {
110 let stream = conn
111 .query(
112 &conn_options,
113 &sql,
114 table_schema.clone(),
115 projection.as_ref(),
116 )
117 .await?;
118 if let Some(transform) = transform.as_ref() {
119 Ok(Box::pin(TransformStream::try_new(
120 stream,
121 transform.clone(),
122 table_schema,
123 projection,
124 remote_schema,
125 )?))
126 } else {
127 Ok(stream)
128 }
129}
130
131impl DisplayAs for RemoteTableExec {
132 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
133 write!(f, "RemoteTableExec")
134 }
135}