datafusion_remote_table/insert.rs
use crate::{Connection, ConnectionOptions, DFResult, Literalize, RemoteSchemaRef};
use datafusion::arrow::array::{ArrayRef, Int64Array, RecordBatch};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::stats::Precision;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    PlanProperties,
};
use futures::StreamExt;
use std::sync::Arc;

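/// Physical plan node that inserts its input batches into a remote table and
/// reports the number of written rows as a single-row `count` batch.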
#[derive(Debug)]
pub struct RemoteTableInsertExec {
    pub(crate) input: Arc<dyn ExecutionPlan>,
    pub(crate) conn_options: Arc<ConnectionOptions>,
    pub(crate) literalizer: Arc<dyn Literalize>,
    pub(crate) table: Vec<String>,
    pub(crate) remote_schema: RemoteSchemaRef,
    pub(crate) conn: Arc<dyn Connection>,
    plan_properties: PlanProperties,
}

impl RemoteTableInsertExec {
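    /// Creates a new insert node. The plan keeps the input's partition count,
    /// emission type, and boundedness, but exposes the single-column `count`
    /// output schema.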
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        conn_options: Arc<ConnectionOptions>,
        literalizer: Arc<dyn Literalize>,
        table: Vec<String>,
        remote_schema: RemoteSchemaRef,
        conn: Arc<dyn Connection>,
    ) -> Self {
        let plan_properties = PlanProperties::new(
            EquivalenceProperties::new(make_count_schema()),
            Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
            input.pipeline_behavior(),
            input.boundedness(),
        );
        Self {
            input,
            conn_options,
            literalizer,
            table,
            remote_schema,
            conn,
            plan_properties,
        }
    }
}

impl ExecutionPlan for RemoteTableInsertExec {
    fn name(&self) -> &str {
        "RemoteTableInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.plan_properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        let input = children[0].clone();
        let exec = Self::new(
            input,
            self.conn_options.clone(),
            self.literalizer.clone(),
            self.table.clone(),
            self.remote_schema.clone(),
            self.conn.clone(),
        );
        Ok(Arc::new(exec))
    }

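    // Drains the assigned input partition, inserting each batch into the remote
    // table, and emits a single batch with the summed row count reported by the
    // remote inserts.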
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let mut input_stream = self.input.execute(partition, context)?;
        let conn_options = self.conn_options.clone();
        let literalizer = self.literalizer.clone();
        let table = self.table.clone();
        let remote_schema = self.remote_schema.clone();
        let conn = self.conn.clone();

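        // Wrap the insert loop in a one-element stream so the writes only run
        // when the output stream is polled; each input batch becomes one remote
        // insert and the returned counts are accumulated.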
        let stream = futures::stream::once(async move {
            let mut total_count = 0;
            while let Some(batch) = input_stream.next().await {
                let batch = batch?;
                let count = conn
                    .insert(
                        &conn_options,
                        literalizer.clone(),
                        &table,
                        remote_schema.clone(),
                        batch,
                    )
                    .await?;
                total_count += count;
            }
            make_result_batch(total_count as i64)
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            make_count_schema(),
            stream,
        )))
    }
}

impl DisplayAs for RemoteTableInsertExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "RemoteTableInsertExec: table={}", self.table.join("."))?;
        if let Ok(stats) = self.input.partition_statistics(None) {
            match stats.num_rows {
                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
                Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
                Precision::Absent => {}
            }
        }
        Ok(())
    }
}

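/// Builds the single-row result batch carrying the total insert count.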
fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
    let schema = make_count_schema();
    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
    let batch = RecordBatch::try_new(schema, vec![array])?;
    Ok(batch)
}

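/// Output schema of the insert plan: one non-nullable Int64 column named `count`.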
fn make_count_schema() -> SchemaRef {
    Arc::new(Schema::new(vec![Field::new(
        "count",
        DataType::Int64,
        false,
    )]))
}