//! `indexlake_datafusion/insert.rs` — physical insert execution for indexlake tables.
1use std::sync::Arc;
2
3use arrow::array::{ArrayRef, Int64Array, RecordBatch};
4use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
5use datafusion_common::DataFusionError;
6use datafusion_common::stats::Precision;
7use datafusion_execution::{SendableRecordBatchStream, TaskContext};
8use datafusion_expr::dml::InsertOp;
9use datafusion_physical_expr::EquivalenceProperties;
10use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
11use datafusion_physical_plan::{
12    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
13    Partitioning, PlanProperties,
14};
15use futures::{StreamExt, TryStreamExt};
16use indexlake::ILError;
17use indexlake::table::TableInsertion;
18
19use crate::LazyTable;
20
/// Physical plan node that writes the rows produced by `input` into an
/// indexlake table and emits a single one-row batch containing the number
/// of rows inserted (schema from `make_count_schema`).
#[derive(Debug)]
pub struct IndexLakeInsertExec {
    /// Lazily resolved handle to the target indexlake table.
    pub lazy_table: LazyTable,
    /// Child plan producing the rows to insert.
    pub input: Arc<dyn ExecutionPlan>,
    /// Append or Overwrite; Replace is rejected by `try_new`.
    pub insert_op: InsertOp,
    /// Row-count estimate above which `execute` streams the whole input
    /// through `bypass_insert` instead of inserting batch-by-batch.
    pub bypass_insert_threshold: usize,
    // Precomputed plan properties (single unknown partition, count schema).
    cache: PlanProperties,
}
29
30impl IndexLakeInsertExec {
31    pub fn try_new(
32        lazy_table: LazyTable,
33        input: Arc<dyn ExecutionPlan>,
34        insert_op: InsertOp,
35        bypass_insert_threshold: usize,
36    ) -> Result<Self, DataFusionError> {
37        match insert_op {
38            InsertOp::Append | InsertOp::Overwrite => {}
39            InsertOp::Replace => {
40                return Err(DataFusionError::NotImplemented(
41                    "Replace is not supported for indexlake table".to_string(),
42                ));
43            }
44        }
45
46        let cache = PlanProperties::new(
47            EquivalenceProperties::new(make_count_schema()),
48            Partitioning::UnknownPartitioning(1),
49            input.pipeline_behavior(),
50            input.boundedness(),
51        );
52
53        Ok(Self {
54            lazy_table,
55            input,
56            insert_op,
57            bypass_insert_threshold,
58            cache,
59        })
60    }
61}
62
impl ExecutionPlan for IndexLakeInsertExec {
    fn name(&self) -> &str {
        "IndexLakeInsertExec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    // Force all input rows through one partition so a single total count
    // can be produced by `execute`.
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::SinglePartition]
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        // NOTE(review): indexes children[0] directly, so this panics if the
        // optimizer ever supplies an empty child list — conventional for
        // single-child DataFusion nodes, but worth confirming.
        let exec = IndexLakeInsertExec::try_new(
            self.lazy_table.clone(),
            children[0].clone(),
            self.insert_op,
            self.bypass_insert_threshold,
        )?;
        Ok(Arc::new(exec))
    }

    /// Drives the insert: loads the table, truncates it first for Overwrite,
    /// then either hands the whole input stream to `bypass_insert` (when the
    /// input's row-count statistic exceeds `bypass_insert_threshold`) or
    /// inserts batch-by-batch, and finally yields one batch with the count.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream, DataFusionError> {
        // Only partition 0 is valid (see `required_input_distribution`).
        if partition != 0 {
            return Err(DataFusionError::Execution(
                "IndexLakeInsertExec can only be executed on a single partition".to_string(),
            ));
        }

        let mut input_stream = self.input.execute(partition, context)?;
        // Clone everything the async block captures so it is self-contained.
        let lazy_table = self.lazy_table.clone();
        let input = self.input.clone();
        let insert_op = self.insert_op;
        let bypass_insert_threshold = self.bypass_insert_threshold;

        // One-shot stream: the entire insert runs when its single item is
        // polled, producing the final count batch (or an error).
        let stream = futures::stream::once(async move {
            let table = lazy_table
                .get_or_load()
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;

            match insert_op {
                InsertOp::Append => {}
                // Overwrite: clear existing rows before inserting.
                InsertOp::Overwrite => {
                    table
                        .truncate()
                        .await
                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                }
                // Defensive: `try_new` already rejects Replace, so this arm
                // should be unreachable in practice.
                InsertOp::Replace => {
                    return Err(DataFusionError::Execution(
                        "Replace is not supported".to_string(),
                    ));
                }
            }

            // Choose the insert strategy from the input's row-count
            // statistic; both Exact and Inexact estimates are trusted.
            match input.partition_statistics(None).map(|stat| stat.num_rows) {
                Ok(Precision::Exact(num_rows)) | Ok(Precision::Inexact(num_rows))
                    if num_rows > bypass_insert_threshold =>
                {
                    // Large input: stream everything through the table's
                    // bypass path, which returns the total row count itself.
                    let stream = input_stream
                        .map_err(|err| {
                            ILError::invalid_input(format!(
                                "Failed to get batch from stream: {err}"
                            ))
                        })
                        .boxed();
                    let count = table.bypass_insert(stream).await.map_err(|e| {
                        DataFusionError::Execution(format!(
                            "Failed to bypass insert into indexlake: {e}"
                        ))
                    })?;
                    make_result_batch(count as i64)
                }
                _ => {
                    // Small or unknown-size input: insert one batch at a
                    // time, counting rows locally; dumping is deferred.
                    let mut count = 0i64;
                    while let Some(batch) = input_stream.next().await {
                        let batch = batch?;
                        count += batch.num_rows() as i64;

                        table
                            .insert(TableInsertion::new(vec![batch]).with_try_dump(false))
                            .await
                            .map_err(|e| DataFusionError::Execution(e.to_string()))?;
                    }

                    // trigger dump
                    // NOTE(review): the empty insertion with try_dump=true
                    // presumably flushes accumulated rows once at the end
                    // rather than per batch — confirm with TableInsertion docs.
                    table
                        .insert(TableInsertion::new(vec![]).with_try_dump(true))
                        .await
                        .map_err(|e| DataFusionError::Execution(e.to_string()))?;

                    make_result_batch(count)
                }
            }
        })
        .boxed();

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            make_count_schema(),
            stream,
        )))
    }
}
183
184fn make_result_batch(count: i64) -> Result<RecordBatch, DataFusionError> {
185    let schema = make_count_schema();
186    let array = Arc::new(Int64Array::from(vec![count])) as ArrayRef;
187    let batch = RecordBatch::try_new(schema, vec![array])?;
188    Ok(batch)
189}
190
191pub fn make_count_schema() -> SchemaRef {
192    Arc::new(Schema::new(vec![Field::new(
193        "count",
194        DataType::Int64,
195        false,
196    )]))
197}
198
199impl DisplayAs for IndexLakeInsertExec {
200    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
201        write!(
202            f,
203            "IndexLakeInsertExec: table={}.{}",
204            self.lazy_table.namespace_name, self.lazy_table.table_name
205        )?;
206        if let Ok(stats) = self.input.partition_statistics(None) {
207            match stats.num_rows {
208                Precision::Exact(rows) => write!(f, ", rows={rows}")?,
209                Precision::Inexact(rows) => write!(f, ", rows~={rows}")?,
210                Precision::Absent => {}
211            }
212        }
213        Ok(())
214    }
215}