datafusion_remote_table/
insert.rs1use crate::{ConnectionOptions, DFResult, LazyPool, Literalize, RemoteSchemaRef};
2use arrow::array::{ArrayRef, Int64Array, RecordBatch};
3use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
4use datafusion_common::stats::Precision;
5use datafusion_execution::{SendableRecordBatchStream, TaskContext};
6use datafusion_physical_expr::EquivalenceProperties;
7use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
8use datafusion_physical_plan::{
9 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
10 PlanProperties,
11};
12use futures::StreamExt;
13use std::sync::Arc;
14
15#[derive(Debug)]
16pub struct RemoteTableInsertExec {
17 pub(crate) input: Arc<dyn ExecutionPlan>,
18 pub(crate) conn_options: Arc<ConnectionOptions>,
19 pub(crate) pool: LazyPool,
20 pub(crate) literalizer: Arc<dyn Literalize>,
21 pub(crate) table: Vec<String>,
22 pub(crate) remote_schema: RemoteSchemaRef,
23 plan_properties: PlanProperties,
24}
25
26impl RemoteTableInsertExec {
27 pub fn new(
28 input: Arc<dyn ExecutionPlan>,
29 conn_options: Arc<ConnectionOptions>,
30 pool: LazyPool,
31 literalizer: Arc<dyn Literalize>,
32 table: Vec<String>,
33 remote_schema: RemoteSchemaRef,
34 ) -> Self {
35 let plan_properties = PlanProperties::new(
37 EquivalenceProperties::new(make_count_schema()),
38 Partitioning::UnknownPartitioning(input.output_partitioning().partition_count()),
39 input.pipeline_behavior(),
40 input.boundedness(),
41 );
42 Self {
43 input,
44 conn_options,
45 pool,
46 literalizer,
47 table,
48 remote_schema,
49 plan_properties,
50 }
51 }
52}
53
54impl ExecutionPlan for RemoteTableInsertExec {
55 fn name(&self) -> &str {
56 "RemoteTableInsertExec"
57 }
58
59 fn as_any(&self) -> &dyn std::any::Any {
60 self
61 }
62
63 fn properties(&self) -> &PlanProperties {
64 &self.plan_properties
65 }
66
67 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
68 vec![&self.input]
69 }
70
71 fn with_new_children(
72 self: Arc<Self>,
73 children: Vec<Arc<dyn ExecutionPlan>>,
74 ) -> DFResult<Arc<dyn ExecutionPlan>> {
75 let input = children[0].clone();
76 let exec = Self::new(
77 input,
78 self.conn_options.clone(),
79 self.pool.clone(),
80 self.literalizer.clone(),
81 self.table.clone(),
82 self.remote_schema.clone(),
83 );
84 Ok(Arc::new(exec))
85 }
86
87 fn execute(
88 &self,
89 partition: usize,
90 context: Arc<TaskContext>,
91 ) -> DFResult<SendableRecordBatchStream> {
92 let mut input_stream = self.input.execute(partition, context)?;
93 let conn_options = self.conn_options.clone();
94 let literalizer = self.literalizer.clone();
95 let table = self.table.clone();
96 let remote_schema = self.remote_schema.clone();
97 let pool = self.pool.clone();
98
99 let stream = futures::stream::once(async move {
100 let conn = pool.get().await?;
101
102 let mut total_count = 0;
103 while let Some(batch) = input_stream.next().await {
104 let batch = batch?;
105 let count = conn
106 .insert(
107 &conn_options,
108 literalizer.clone(),
109 &table,
110 remote_schema.clone(),
111 batch,
112 )
113 .await?;
114 total_count += count;
115 }
116 make_result_batch(total_count as i64)
117 })
118 .boxed();
119
120 Ok(Box::pin(RecordBatchStreamAdapter::new(
121 make_count_schema(),
122 stream,
123 )))
124 }
125}
126
127impl DisplayAs for RemoteTableInsertExec {
128 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
129 write!(f, "RemoteTableInsertExec: table={}", self.table.join("."))?;
130 if let Ok(stats) = self.input.partition_statistics(None) {
131 match stats.num_rows {
132 Precision::Exact(rows) => write!(f, ", rows={rows}")?,
133 Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
134 Precision::Absent => {}
135 }
136 }
137 Ok(())
138 }
139}
140
141fn make_result_batch(count: i64) -> DFResult<RecordBatch> {
142 let schema = make_count_schema();
143 let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
144 let batch = RecordBatch::try_new(schema, vec![array])?;
145 Ok(batch)
146}
147
148fn make_count_schema() -> SchemaRef {
149 Arc::new(Schema::new(vec![Field::new(
150 "count",
151 DataType::Int64,
152 false,
153 )]))
154}