datafusion_remote_table/
scan.rs

1use crate::{
2    ConnectionOptions, DFResult, DefaultTransform, LazyPool, RemoteSchemaRef, RemoteSource,
3    Transform, TransformStream, transform_schema,
4};
5use arrow::datatypes::SchemaRef;
6use datafusion_common::DataFusionError;
7use datafusion_common::Statistics;
8use datafusion_common::stats::Precision;
9use datafusion_execution::{SendableRecordBatchStream, TaskContext};
10use datafusion_physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion_physical_plan::display::ProjectSchemaDisplay;
12use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
13use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
14use datafusion_physical_plan::{
15    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
16};
17use futures::TryStreamExt;
18use std::any::Any;
19use std::sync::Arc;
20
/// Physical plan node that scans a table or query on a remote database,
/// optionally pushing projection, filters and limit down to the remote side.
#[derive(Debug)]
pub struct RemoteTableScanExec {
    /// Connection options; also identifies the remote database type.
    pub(crate) conn_options: Arc<ConnectionOptions>,
    /// Lazily-initialized connection pool used at execution time.
    pub(crate) pool: LazyPool,
    /// What to scan remotely: a raw query or a table reference.
    pub(crate) source: RemoteSource,
    /// Untransformed schema of the remote table as modeled locally.
    pub(crate) table_schema: SchemaRef,
    /// Remote-side schema; required when a non-default transform is applied
    /// (see `build_and_transform_stream`).
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional column projection (indices into the transformed table schema).
    pub(crate) projection: Option<Vec<usize>>,
    /// Filters already rendered as SQL strings for push-down.
    pub(crate) unparsed_filters: Vec<String>,
    /// Row limit, pushed to the remote query when the dialect supports it.
    pub(crate) limit: Option<usize>,
    /// Transform applied to record batches coming back from the remote side.
    pub(crate) transform: Arc<dyn Transform>,
    /// Known total row count, if available; used for plan statistics.
    pub(crate) row_count: Option<usize>,
    // Cached properties (projected schema, single partition, bounded,
    // incremental emission) computed once in `try_new`.
    plan_properties: PlanProperties,
}
35
36impl RemoteTableScanExec {
37    #[allow(clippy::too_many_arguments)]
38    pub fn try_new(
39        conn_options: Arc<ConnectionOptions>,
40        pool: LazyPool,
41        source: RemoteSource,
42        table_schema: SchemaRef,
43        remote_schema: Option<RemoteSchemaRef>,
44        projection: Option<Vec<usize>>,
45        unparsed_filters: Vec<String>,
46        limit: Option<usize>,
47        transform: Arc<dyn Transform>,
48        row_count: Option<usize>,
49    ) -> DFResult<Self> {
50        let transformed_table_schema = transform_schema(
51            transform.as_ref(),
52            table_schema.clone(),
53            remote_schema.as_ref(),
54            conn_options.db_type(),
55        )?;
56        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
57        let plan_properties = PlanProperties::new(
58            EquivalenceProperties::new(projected_schema),
59            Partitioning::UnknownPartitioning(1),
60            EmissionType::Incremental,
61            Boundedness::Bounded,
62        );
63        Ok(Self {
64            conn_options,
65            pool,
66            source,
67            table_schema,
68            remote_schema,
69            projection,
70            unparsed_filters,
71            limit,
72            transform,
73            row_count,
74            plan_properties,
75        })
76    }
77}
78
79impl ExecutionPlan for RemoteTableScanExec {
80    fn name(&self) -> &str {
81        "RemoteTableExec"
82    }
83
84    fn as_any(&self) -> &dyn Any {
85        self
86    }
87
88    fn properties(&self) -> &PlanProperties {
89        &self.plan_properties
90    }
91
92    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
93        vec![]
94    }
95
96    fn with_new_children(
97        self: Arc<Self>,
98        _children: Vec<Arc<dyn ExecutionPlan>>,
99    ) -> DFResult<Arc<dyn ExecutionPlan>> {
100        Ok(self)
101    }
102
103    fn execute(
104        &self,
105        partition: usize,
106        _context: Arc<TaskContext>,
107    ) -> DFResult<SendableRecordBatchStream> {
108        assert_eq!(partition, 0);
109        let schema = self.schema();
110        let fut = build_and_transform_stream(
111            self.pool.clone(),
112            self.conn_options.clone(),
113            self.source.clone(),
114            self.table_schema.clone(),
115            self.remote_schema.clone(),
116            self.projection.clone(),
117            self.unparsed_filters.clone(),
118            self.limit,
119            self.transform.clone(),
120        );
121        let stream = futures::stream::once(fut).try_flatten();
122        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
123    }
124
125    fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
126        if let Some(partition) = partition
127            && partition != 0
128        {
129            return Err(DataFusionError::Plan(format!(
130                "Invalid partition index: {partition}"
131            )));
132        }
133
134        if let Some(count) = self.row_count {
135            let column_stat = Statistics::unknown_column(self.schema().as_ref());
136            let row_count_after_limit = if let Some(limit) = self.limit {
137                std::cmp::min(count, limit)
138            } else {
139                count
140            };
141            Ok(Statistics {
142                num_rows: Precision::Exact(row_count_after_limit),
143                total_byte_size: Precision::Absent,
144                column_statistics: column_stat,
145            })
146        } else {
147            Ok(Statistics::new_unknown(self.schema().as_ref()))
148        }
149    }
150
151    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
152        let db_type = self.conn_options.db_type();
153        if db_type.support_rewrite_with_filters_limit(&self.source) {
154            Some(Arc::new(Self {
155                conn_options: self.conn_options.clone(),
156                source: self.source.clone(),
157                table_schema: self.table_schema.clone(),
158                remote_schema: self.remote_schema.clone(),
159                projection: self.projection.clone(),
160                unparsed_filters: self.unparsed_filters.clone(),
161                limit,
162                transform: self.transform.clone(),
163                pool: self.pool.clone(),
164                row_count: self.row_count,
165                plan_properties: self.plan_properties.clone(),
166            }))
167        } else {
168            None
169        }
170    }
171
172    fn fetch(&self) -> Option<usize> {
173        self.limit
174    }
175}
176
177#[allow(clippy::too_many_arguments)]
178async fn build_and_transform_stream(
179    pool: LazyPool,
180    conn_options: Arc<ConnectionOptions>,
181    source: RemoteSource,
182    table_schema: SchemaRef,
183    remote_schema: Option<RemoteSchemaRef>,
184    projection: Option<Vec<usize>>,
185    unparsed_filters: Vec<String>,
186    limit: Option<usize>,
187    transform: Arc<dyn Transform>,
188) -> DFResult<SendableRecordBatchStream> {
189    let db_type = conn_options.db_type();
190    let limit = if db_type.support_rewrite_with_filters_limit(&source) {
191        limit
192    } else {
193        None
194    };
195
196    let conn = pool.get().await?;
197
198    if transform.as_any().is::<DefaultTransform>() {
199        conn.query(
200            &conn_options,
201            &source,
202            table_schema.clone(),
203            projection.as_ref(),
204            unparsed_filters.as_slice(),
205            limit,
206        )
207        .await
208    } else {
209        let Some(remote_schema) = remote_schema else {
210            return Err(DataFusionError::Execution(
211                "remote_schema is required for non-default transform".to_string(),
212            ));
213        };
214        let stream = conn
215            .query(
216                &conn_options,
217                &source,
218                table_schema.clone(),
219                None,
220                unparsed_filters.as_slice(),
221                limit,
222            )
223            .await?;
224        Ok(Box::pin(TransformStream::try_new(
225            stream,
226            transform.clone(),
227            table_schema,
228            projection,
229            remote_schema,
230            db_type,
231        )?))
232    }
233}
234
235impl DisplayAs for RemoteTableScanExec {
236    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
237        write!(
238            f,
239            "RemoteTableExec: source={}",
240            match &self.source {
241                RemoteSource::Query(_query) => "query".to_string(),
242                RemoteSource::Table(table) => table.join("."),
243            }
244        )?;
245        let projected_schema = self.schema();
246        if has_projection(self.projection.as_ref(), self.table_schema.fields().len()) {
247            write!(
248                f,
249                ", projection={}",
250                ProjectSchemaDisplay(&projected_schema)
251            )?;
252        }
253        if let Some(limit) = self.limit {
254            write!(f, ", limit={limit}")?;
255        }
256        if !self.unparsed_filters.is_empty() {
257            write!(f, ", filters=[{}]", self.unparsed_filters.join(", "))?;
258        }
259        Ok(())
260    }
261}
262
/// Returns true when `projection` actually narrows or reorders the table's
/// columns, i.e. it is `Some` and is not the identity projection
/// `[0, 1, ..., table_columns - 1]`.
fn has_projection(projection: Option<&Vec<usize>>, table_columns: usize) -> bool {
    projection.is_some_and(|proj| {
        // Anything other than "every column, in order" is a real projection.
        proj.len() != table_columns || proj.iter().enumerate().any(|(i, &col)| col != i)
    })
}