// datafusion_remote_table/insert.rs

use crate::{ConnectionOptions, DFResult, LazyPool, Literalize, RemoteSchemaRef};
use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::stats::Precision;
use datafusion_common::DataFusionError;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    PlanProperties,
};
use futures::StreamExt;
use std::sync::Arc;

15#[derive(Debug)]
16pub struct RemoteTableInsertExec {
17    pub(crate) input: Arc<dyn ExecutionPlan>,
18    pub(crate) conn_options: Arc<ConnectionOptions>,
19    pub(crate) pool: LazyPool,
20    pub(crate) literalizer: Arc<dyn Literalize>,
21    pub(crate) table: Vec<String>,
22    pub(crate) remote_schema: RemoteSchemaRef,
23    plan_properties: PlanProperties,
24}
25
26impl RemoteTableInsertExec {
27    pub fn new(
28        input: Arc<dyn ExecutionPlan>,
29        conn_options: Arc<ConnectionOptions>,
30        pool: LazyPool,
31        literalizer: Arc<dyn Literalize>,
32        table: Vec<String>,
33        remote_schema: RemoteSchemaRef,
34    ) -> Self {
35        // TODO sqlite does not support parallel insert
36        let plan_properties = PlanProperties::new(
37            EquivalenceProperties::new(make_count_schema()),
38            Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
39            input.pipeline_behavior(),
40            input.boundedness(),
41        );
42        Self {
43            input,
44            conn_options,
45            pool,
46            literalizer,
47            table,
48            remote_schema,
49            plan_properties,
50        }
51    }
52}
53
54impl ExecutionPlan for RemoteTableInsertExec {
55    fn name(&self) -> &str {
56        "RemoteTableInsertExec"
57    }
58
59    fn as_any(&self) -> &dyn std::any::Any {
60        self
61    }
62
63    fn properties(&self) -> &PlanProperties {
64        &self.plan_properties
65    }
66
67    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
68        vec![&self.input]
69    }
70
71    fn with_new_children(
72        self: Arc<Self>,
73        children: Vec<Arc<dyn ExecutionPlan>>,
74    ) -> DFResult<Arc<dyn ExecutionPlan>> {
75        let input = children[0].clone();
76        let exec = Self::new(
77            input,
78            self.conn_options.clone(),
79            self.pool.clone(),
80            self.literalizer.clone(),
81            self.table.clone(),
82            self.remote_schema.clone(),
83        );
84        Ok(Arc::new(exec))
85    }
86
87    fn execute(
88        &self,
89        partition: usize,
90        context: Arc<TaskContext>,
91    ) -> DFResult<SendableRecordBatchStream> {
92        let mut input_stream = self.input.execute(partition, context)?;
93        let conn_options = self.conn_options.clone();
94        let literalizer = self.literalizer.clone();
95        let table = self.table.clone();
96        let remote_schema = self.remote_schema.clone();
97        let pool = self.pool.clone();
98
99        let stream = futures::stream::once(async move {
100            let conn = pool.get().await?;
101
102            let mut total_count = 0;
103            while let Some(batch) = input_stream.next().await {
104                let batch = batch?;
105                let count = conn
106                    .insert(
107                        &conn_options,
108                        literalizer.clone(),
109                        &table,
110                        remote_schema.clone(),
111                        batch,
112                    )
113                    .await?;
114                total_count += count;
115            }
116            make_result_batch(total_count as i64)
117        })
118        .boxed();
119
120        Ok(Box::pin(RecordBatchStreamAdapter::new(
121            make_count_schema(),
122            stream,
123        )))
124    }
125}
126
127impl DisplayAs for RemoteTableInsertExec {
128    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
129        write!(f, "RemoteTableInsertExec: table={}", self.table.join("."))?;
130        if let Ok(stats) = self.input.partition_statistics(None) {
131            match stats.num_rows {
132                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
133                Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
134                Precision::Absent => {}
135            }
136        }
137        Ok(())
138    }
139}
140
141fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
142    let schema = make_count_schema();
143    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
144    let batch = RecordBatch::try_new(schema, vec![array])?;
145    Ok(batch)
146}
147
148fn make_count_schema() -> SchemaRef {
149    Arc::new(Schema::new(vec![Field::new(
150        "count",
151        DataType::Int64,
152        false,
153    )]))
154}