//! `datafusion_remote_table`: physical execution plan for INSERT into a remote table.
use crate::{Connection, ConnectionOptions, DFResult, Literalize, RemoteSchemaRef};
use datafusion::arrow::array::{ArrayRef, RecordBatch, UInt64Array};
use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion::common::internal_err;
use datafusion::common::stats::Precision;
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_expr::EquivalenceProperties;
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    PlanProperties,
};
use futures::StreamExt;
use std::sync::Arc;
14
/// Physical plan node that writes the rows produced by `input` into a remote
/// table via `conn`, and reports the number of inserted rows as its output.
#[derive(Debug)]
pub struct RemoteTableInsertExec {
    /// Child plan producing the rows to insert.
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// Options used to talk to the remote database.
    pub(crate) conn_options: Arc<ConnectionOptions>,
    /// Converts values into SQL literals for the generated INSERT statements.
    pub(crate) literalizer: Arc<dyn Literalize>,
    /// Fully-qualified target table name, one path segment per element
    /// (joined with "." for display).
    pub(crate) table: Vec<String>,
    /// Schema of the remote table being written to.
    pub(crate) remote_schema: RemoteSchemaRef,
    /// Live connection used to perform the insert.
    pub(crate) conn: Arc<dyn Connection>,
    /// Cached plan properties; output schema is the single "count" column.
    plan_properties: PlanProperties,
}
25
26impl RemoteTableInsertExec {
27 pub fn new(
28 input: Arc<dyn ExecutionPlan>,
29 conn_options: Arc<ConnectionOptions>,
30 literalizer: Arc<dyn Literalize>,
31 table: Vec<String>,
32 remote_schema: RemoteSchemaRef,
33 conn: Arc<dyn Connection>,
34 ) -> Self {
35 let plan_properties = PlanProperties::new(
37 EquivalenceProperties::new(make_count_schema()),
38 Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
39 input.pipeline_behavior(),
40 input.boundedness(),
41 );
42 Self {
43 input,
44 conn_options,
45 literalizer,
46 table,
47 remote_schema,
48 conn,
49 plan_properties,
50 }
51 }
52}
53
54impl ExecutionPlan for RemoteTableInsertExec {
55 fn name(&self) -> &str {
56 "RemoteTableInsertExec"
57 }
58
59 fn as_any(&self) -> &dyn std::any::Any {
60 self
61 }
62
63 fn properties(&self) -> &PlanProperties {
64 &self.plan_properties
65 }
66
67 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
68 vec![&self.input]
69 }
70
71 fn with_new_children(
72 self: Arc<Self>,
73 children: Vec<Arc<dyn ExecutionPlan>>,
74 ) -> DFResult<Arc<dyn ExecutionPlan>> {
75 let input = children[0].clone();
76 let exec = Self::new(
77 input,
78 self.conn_options.clone(),
79 self.literalizer.clone(),
80 self.table.clone(),
81 self.remote_schema.clone(),
82 self.conn.clone(),
83 );
84 Ok(Arc::new(exec))
85 }
86
87 fn execute(
88 &self,
89 partition: usize,
90 context: Arc<TaskContext>,
91 ) -> DFResult<SendableRecordBatchStream> {
92 let input_stream = self.input.execute(partition, context)?;
93 let conn_options = self.conn_options.clone();
94 let literalizer = self.literalizer.clone();
95 let table = self.table.clone();
96 let remote_schema = self.remote_schema.clone();
97 let conn = self.conn.clone();
98
99 let stream = futures::stream::once(async move {
100 let count = conn
101 .insert(
102 &conn_options,
103 literalizer.clone(),
104 &table,
105 remote_schema.clone(),
106 input_stream,
107 )
108 .await?;
109 make_result_batch(count as u64)
110 })
111 .boxed();
112
113 Ok(Box::pin(RecordBatchStreamAdapter::new(
114 make_count_schema(),
115 stream,
116 )))
117 }
118}
119
120impl DisplayAs for RemoteTableInsertExec {
121 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
122 write!(f, "RemoteTableInsertExec: table={}", self.table.join("."))?;
123 if let Ok(stats) = self.input.partition_statistics(None) {
124 match stats.num_rows {
125 Precision::Exact(rows) => write!(f, ", rows={rows}")?,
126 Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
127 Precision::Absent => {}
128 }
129 }
130 Ok(())
131 }
132}
133
134fn make_result_batch(count: u64) -> DFResult<RecordBatch> {
135 let schema = make_count_schema();
136 let array = Arc::new(UInt64Array::from(vec![count])) as ArrayRef;
137 let batch = RecordBatch::try_new(schema, vec![array])?;
138 Ok(batch)
139}
140
141fn make_count_schema() -> SchemaRef {
142 Arc::new(Schema::new(vec![Field::new(
143 "count",
144 DataType::UInt64,
145 false,
146 )]))
147}