// indexlake_datafusion/insert.rs
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array, RecordBatch};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use datafusion_common::DataFusionError;
use datafusion_common::stats::Precision;
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
    Partitioning, PlanProperties,
};
use futures::{StreamExt, TryStreamExt};
use indexlake::ILError;
use indexlake::table::TableInsertion;

use crate::LazyTable;

/// Physical plan node that inserts the rows produced by `input` into an
/// IndexLake table and emits a single-row batch containing the inserted
/// row count (see `make_count_schema`).
#[derive(Debug)]
pub struct IndexLakeInsertExec {
    /// Target table, resolved lazily via `get_or_load` at execution time.
    pub lazy_table: LazyTable,
    /// Child plan producing the rows to insert.
    pub input: Arc<dyn ExecutionPlan>,
    /// Append or Overwrite; `Replace` is rejected in `try_new`.
    pub insert_op: InsertOp,
    /// Estimated-row-count threshold above which `execute` takes the
    /// streaming `bypass_insert` path instead of batch-at-a-time inserts.
    pub bypass_insert_threshold: usize,
    /// Plan properties computed once in `try_new` (count schema, single
    /// unknown partition, child's pipeline behavior and boundedness).
    cache: PlanProperties,
}
29
30impl IndexLakeInsertExec {
31 pub fn try_new(
32 lazy_table: LazyTable,
33 input: Arc<dyn ExecutionPlan>,
34 insert_op: InsertOp,
35 bypass_insert_threshold: usize,
36 ) -> Result<Self, DataFusionError> {
37 match insert_op {
38 InsertOp::Append | InsertOp::Overwrite => {}
39 InsertOp::Replace => {
40 return Err(DataFusionError::NotImplemented(
41 "Replace is not supported for indexlake table".to_string(),
42 ));
43 }
44 }
45
46 let cache = PlanProperties::new(
47 EquivalenceProperties::new(make_count_schema()),
48 Partitioning::UnknownPartitioning(1),
49 input.pipeline_behavior(),
50 input.boundedness(),
51 );
52
53 Ok(Self {
54 lazy_table,
55 input,
56 insert_op,
57 bypass_insert_threshold,
58 cache,
59 })
60 }
61}
62
impl ExecutionPlan for IndexLakeInsertExec {
    fn name(&self) -> &str {
        "IndexLakeInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    // Properties were computed once in `try_new` and cached.
    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    // All input rows must arrive on one partition; `execute` only accepts
    // partition 0.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        // NOTE(review): indexes `children[0]` without a length check, so an
        // empty vector panics — presumably DataFusion always passes exactly
        // one child here (matching `children()`); confirm.
        let exec = IndexLakeInsertExec::try_new(
            self.lazy_table.clone(),
            children[0].clone(),
            self.insert_op,
            self.bypass_insert_threshold,
        )?;
        Ok(Arc::new(exec))
    }

    /// Runs the insert and returns a one-batch stream holding the number of
    /// rows written (schema from `make_count_schema`).
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream, DataFusionError> {
        // Single-partition node: only partition 0 is valid.
        if partition != 0 {
            return Err(DataFusionError::Execution(
                "IndexLakeInsertExec can only be executed on a single partition".to_string(),
            ));
        }

        let mut input_stream = self.input.execute(partition, context)?;
        // Clone everything the async block needs so it can be 'static.
        let lazy_table = self.lazy_table.clone();
        let input = self.input.clone();
        let insert_op = self.insert_op;
        let bypass_insert_threshold = self.bypass_insert_threshold;

        // One-shot stream: the whole insert runs inside a single future that
        // yields exactly one count batch (or an error).
        let stream = futures::stream::once(async move {
            let table = lazy_table
                .get_or_load()
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            match insert_op {
                InsertOp::Append => {}
                // Overwrite = truncate the table first, then insert as usual.
                InsertOp::Overwrite => {
                    table
                        .truncate()
                        .await
                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                }
                // Defensive: `try_new` already rejects Replace, so this arm
                // should be unreachable through normal construction.
                InsertOp::Replace => {
                    return Err(DataFusionError::Execution(
                        "Replace is not supported".to_string(),
                    ));
                }
            }

            // Choose the insert path from the input's estimated row count.
            // Both Exact and Inexact estimates may trigger the bypass path.
            match input.partition_statistics(None).map(|stat| stat.num_rows) {
                Ok(Precision::Exact(num_rows)) | Ok(Precision::Inexact(num_rows))
                    if num_rows > bypass_insert_threshold =>
                {
                    // Large insert: hand the whole record-batch stream to the
                    // table's bulk `bypass_insert`, which reports the count.
                    let stream = input_stream
                        .map_err(|err| {
                            ILError::invalid_input(format!(
                                "Failed to get batch from stream: {err}"
                            ))
                        })
                        .boxed();
                    let count = table.bypass_insert(stream).await.map_err(|e| {
                        DataFusionError::Execution(format!(
                            "Failed to bypass insert into indexlake: {e}"
                        ))
                    })?;
                    make_result_batch(count as i64)
                }
                // Small insert or unknown statistics: insert batch by batch
                // and count rows ourselves.
                _ => {
                    let mut count = 0i64;
                    while let Some(batch) = input_stream.next().await {
                        let batch = batch?;
                        count += batch.num_rows() as i64;

                        table
                            .insert(TableInsertion::new(vec![batch]).with_try_dump(false))
                            .await
                            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                    }

                    // Final empty insertion with try_dump(true) — presumably
                    // flushes/dumps rows buffered by the per-batch inserts
                    // above; confirm against indexlake's TableInsertion docs.
                    table
                        .insert(TableInsertion::new(vec![]).with_try_dump(true))
                        .await
                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;

                    make_result_batch(count)
                }
            }
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            make_count_schema(),
            stream,
        )))
    }
}
183
184fn make_result_batch(count: i64) -> Result<RecordBatch, DataFusionError> {
185 let schema = make_count_schema();
186 let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
187 let batch = RecordBatch::try_new(schema, vec![array])?;
188 Ok(batch)
189}
190
191pub fn make_count_schema() -> SchemaRef {
192 Arc::new(Schema::new(vec![Field::new(
193 "count",
194 DataType::Int64,
195 false,
196 )]))
197}
198
199impl DisplayAs for IndexLakeInsertExec {
200 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201 write!(
202 f,
203 "IndexLakeInsertExec: table={}.{}",
204 self.lazy_table.namespace_name, self.lazy_table.table_name
205 )?;
206 if let Ok(stats) = self.input.partition_statistics(None) {
207 match stats.num_rows {
208 Precision::Exact(rows) => write!(f, ", rows={rows}")?,
209 Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
210 Precision::Absent => {}
211 }
212 }
213 Ok(())
214 }
215}