datafusion_remote_table/insert.rs

use crate::{Connection, ConnectionOptions, DFResult, Literalize, RemoteSchemaRef};
use datafusion::arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::stats::Precision;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    PlanProperties,
};
use futures::StreamExt;
use std::sync::Arc;

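/// Physical plan that streams the record batches produced by `input` into a
/// remote table via `Connection::insert` and reports the number of inserted
/// rows as a single-row `count` batch.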
#[derive(Debug)]
pub struct RemoteTableInsertExec {
    pub(crate) input: Arc<dyn ExecutionPlan>,
    pub(crate) conn_options: Arc<ConnectionOptions>,
    pub(crate) literalizer: Arc<dyn Literalize>,
    pub(crate) table: Vec<String>,
    pub(crate) remote_schema: RemoteSchemaRef,
    pub(crate) conn: Arc<dyn Connection>,
    plan_properties: PlanProperties,
}

impl RemoteTableInsertExec {
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        conn_options: Arc<ConnectionOptions>,
        literalizer: Arc<dyn Literalize>,
        table: Vec<String>,
        remote_schema: RemoteSchemaRef,
        conn: Arc<dyn Connection>,
    ) -> Self {
        // TODO sqlite does not support parallel insert
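        // Output partitioning mirrors the input's partition count so each input
        // partition issues its own insert; emission type and boundedness are
        // inherited from the input plan.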
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(make_count_schema()),
            Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
            input.pipeline_behavior(),
            input.boundedness(),
        );
        Self {
            input,
            conn_options,
            literalizer,
            table,
            remote_schema,
            conn,
            plan_properties,
        }
    }
}

impl ExecutionPlan for RemoteTableInsertExec {
    fn name(&self) -> &str {
        "RemoteTableInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let input = children[0].clone();
        let exec = Self::new(
            input,
            self.conn_options.clone(),
            self.literalizer.clone(),
            self.table.clone(),
            self.remote_schema.clone(),
            self.conn.clone(),
        );
        Ok(Arc::new(exec))
    }

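    // Forwards the batches of one input partition to the remote connection's
    // insert path and returns a single-row stream with the number of rows written.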
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let conn_options = self.conn_options.clone();
        let literalizer = self.literalizer.clone();
        let table = self.table.clone();
        let remote_schema = self.remote_schema.clone();
        let conn = self.conn.clone();

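        // Run the insert as a one-element stream: the future drives the remote
        // insert over the whole input stream and yields one batch with the count.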
        let stream = futures::stream::once(async move {
            let count = conn
                .insert(
                    &conn_options,
                    literalizer.clone(),
                    &table,
                    remote_schema.clone(),
                    input_stream,
                )
                .await?;
            make_result_batch(count as u64)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            make_count_schema(),
            stream,
        )))
    }
}

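// Displays as "RemoteTableInsertExec: table=<name>", appending the input's
// row-count statistics when they are available.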
impl DisplayAs for RemoteTableInsertExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "RemoteTableInsertExec: table={}", self.table.join("."))?;
        if let Ok(stats) = self.input.partition_statistics(None) {
            match stats.num_rows {
                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
                Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
                Precision::Absent => {}
            }
        }
        Ok(())
    }
}

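/// Builds the single-row result batch that carries the insert count.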
fn make_result_batch(count: u64) -> DFResult<RecordBatch> {
    let schema = make_count_schema();
    let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
    let batch = RecordBatch::try_new(schema, vec![array])?;
    Ok(batch)
}

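/// Schema of the insert result: a single non-nullable UInt64 column named "count".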
fn make_count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::UInt64,
        false,
    )]))
}