1use std::sync::Arc;
2
3use arrow::datatypes::SchemaRef;
4use datafusion_common::DataFusionError;
5use datafusion_execution::TaskContext;
6use datafusion_expr::dml::InsertOp;
7use datafusion_physical_plan::ExecutionPlan;
8use datafusion_proto::logical_plan::DefaultLogicalExtensionCodec;
9use datafusion_proto::logical_plan::from_proto::parse_exprs;
10use datafusion_proto::logical_plan::to_proto::serialize_exprs;
11use datafusion_proto::physical_plan::PhysicalExtensionCodec;
12use datafusion_proto::protobuf::Schema as ProtoSchema;
13use indexlake::catalog::{DataFileRecord, RowValidity};
14use indexlake::storage::DataFileFormat;
15use prost::Message;
16use uuid::Uuid;
17
18use crate::index_lake_physical_plan_node::IndexLakePhysicalPlanType;
19use crate::{
20 DataFile, DataFiles, IndexLakeInsertExec, IndexLakeInsertExecNode, IndexLakePhysicalPlanNode,
21 IndexLakeScanExec, IndexLakeScanExecNode, LazyTable,
22};
23
/// Physical-plan extension codec that (de)serializes IndexLake execution
/// plans (`IndexLakeScanExec`, `IndexLakeInsertExec`) to/from protobuf for
/// DataFusion's distributed plan exchange.
#[derive(Debug)]
pub struct IndexLakePhysicalCodec {
    /// Shared IndexLake client; used during decode to build `LazyTable`
    /// handles that resolve the table lazily at execution time.
    client: Arc<indexlake::Client>,
}
28
29impl IndexLakePhysicalCodec {
30 pub fn new(client: Arc<indexlake::Client>) -> Self {
31 Self { client }
32 }
33}
34
impl PhysicalExtensionCodec for IndexLakePhysicalCodec {
    /// Decodes a protobuf-encoded `IndexLakePhysicalPlanNode` back into an
    /// `ExecutionPlan`.
    ///
    /// `inputs` carries the already-decoded child plans: a scan takes none,
    /// an insert takes exactly one. Logical filter expressions are decoded
    /// with the default logical extension codec.
    ///
    /// # Errors
    /// Returns `DataFusionError::Internal` on malformed bytes, a missing
    /// plan-type oneof, or invalid embedded data (schema, exprs, data files).
    fn try_decode(
        &self,
        buf: &[u8],
        inputs: &[Arc<dyn ExecutionPlan>],
        ctx: &TaskContext,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        // Prost decode of the envelope message.
        let indexlake_node = IndexLakePhysicalPlanNode::decode(buf).map_err(|e| {
            DataFusionError::Internal(format!(
                "Failed to decode indexlake physical plan node: {e:?}"
            ))
        })?;
        // The oneof plan-type field is optional in prost-generated code;
        // absence means a corrupt/incomplete message.
        let indexlake_plan = indexlake_node.index_lake_physical_plan_type.ok_or_else(|| {
            DataFusionError::Internal(
                "Failed to decode indexlake physical plan node due to physical plan type is none".to_string()
            )
        })?;

        match indexlake_plan {
            IndexLakePhysicalPlanType::Scan(node) => {
                // Output schema, data-file list, projection and filters were
                // all serialized inline by `try_encode` below.
                let schema = parse_schema(node.schema)?;

                let data_files = parse_data_files(node.data_files)?;

                let projection = parse_projection(node.projection.as_ref());
                let filters = parse_exprs(&node.filters, ctx, &DefaultLogicalExtensionCodec {})?;

                // The table itself is not serialized; it is re-resolved from
                // the client by namespace/table name at execution time.
                let lazy_table =
                    LazyTable::new(self.client.clone(), node.namespace_name, node.table_name);

                // Counts travel as u32 in the proto; widen back to usize.
                Ok(Arc::new(IndexLakeScanExec::try_new(
                    lazy_table,
                    schema,
                    node.partition_count as usize,
                    data_files,
                    projection,
                    filters,
                    node.batch_size as usize,
                    node.limit.map(|l| l as usize),
                )?))
            }
            IndexLakePhysicalPlanType::Insert(node) => {
                // An insert wraps exactly one child plan (the data source).
                if inputs.len() != 1 {
                    return Err(DataFusionError::Internal(format!(
                        "IndexLakeInsertExec requires exactly one input, got {}",
                        inputs.len()
                    )));
                }
                let input = inputs[0].clone();

                let insert_op = parse_insert_op(node.insert_op)?;

                let lazy_table =
                    LazyTable::new(self.client.clone(), node.namespace_name, node.table_name);

                Ok(Arc::new(IndexLakeInsertExec::try_new(
                    lazy_table,
                    input,
                    insert_op,
                    node.bypass_insert_threshold as usize,
                )?))
            }
        }
    }

    /// Encodes a supported IndexLake execution plan into `buf` as a
    /// protobuf `IndexLakePhysicalPlanNode`.
    ///
    /// Child plans of `IndexLakeInsertExec` are NOT encoded here; DataFusion
    /// serializes them separately and hands them back via `inputs` in
    /// `try_decode`.
    ///
    /// # Errors
    /// `DataFusionError::NotImplemented` for plan types this codec does not
    /// own; `Internal` on expression/schema/prost encoding failures.
    fn try_encode(
        &self,
        node: Arc<dyn ExecutionPlan>,
        buf: &mut Vec<u8>,
    ) -> Result<(), DataFusionError> {
        if let Some(exec) = node.as_any().downcast_ref::<IndexLakeScanExec>() {
            let projection = serialize_projection(exec.projection.as_ref());

            let filters = serialize_exprs(&exec.filters, &DefaultLogicalExtensionCodec {})?;

            let data_files = serialize_data_files(exec.data_files.as_ref())?;

            let schema = serialize_schema(&exec.output_schema)?;

            // NOTE(review): the `usize as u32` casts below (partition_count,
            // batch_size, limit) silently truncate values above u32::MAX —
            // presumably the proto schema fixes these at u32; confirm callers
            // never exceed that range.
            let proto = IndexLakePhysicalPlanNode {
                index_lake_physical_plan_type: Some(IndexLakePhysicalPlanType::Scan(
                    IndexLakeScanExecNode {
                        namespace_name: exec.lazy_table.namespace_name.clone(),
                        table_name: exec.lazy_table.table_name.clone(),
                        partition_count: exec.partition_count as u32,
                        data_files,
                        projection,
                        filters,
                        batch_size: exec.batch_size as u32,
                        limit: exec.limit.map(|l| l as u32),
                        schema: Some(schema),
                    },
                )),
            };

            proto.encode(buf).map_err(|e| {
                DataFusionError::Internal(format!(
                    "Failed to encode indexlake scan execution plan: {e:?}"
                ))
            })?;

            Ok(())
        } else if let Some(exec) = node.as_any().downcast_ref::<IndexLakeInsertExec>() {
            let insert_op = serialize_insert_op(exec.insert_op);

            let proto = IndexLakePhysicalPlanNode {
                index_lake_physical_plan_type: Some(IndexLakePhysicalPlanType::Insert(
                    IndexLakeInsertExecNode {
                        namespace_name: exec.lazy_table.namespace_name.clone(),
                        table_name: exec.lazy_table.table_name.clone(),
                        insert_op,
                        bypass_insert_threshold: exec.bypass_insert_threshold as u32,
                    },
                )),
            };

            proto.encode(buf).map_err(|e| {
                DataFusionError::Internal(format!(
                    "Failed to encode indexlake insert execution plan: {e:?}"
                ))
            })?;

            Ok(())
        } else {
            // Any other plan type belongs to a different codec.
            Err(DataFusionError::NotImplemented(format!(
                "IndexLakePhysicalCodec does not support encoding {}",
                node.name()
            )))
        }
    }
}
166
167fn serialize_schema(schema: &SchemaRef) -> Result<ProtoSchema, DataFusionError> {
168 let proto: ProtoSchema = schema
169 .as_ref()
170 .try_into()
171 .map_err(|e| DataFusionError::Internal(format!("Failed to serialize schema: {e:?}")))?;
172 Ok(proto)
173}
174
175fn parse_schema(proto: Option<ProtoSchema>) -> Result<SchemaRef, DataFusionError> {
176 let proto =
177 proto.ok_or_else(|| DataFusionError::Internal("Missing schema in protobuf".to_string()))?;
178 let schema: arrow::datatypes::Schema = (&proto)
179 .try_into()
180 .map_err(|e| DataFusionError::Internal(format!("Failed to parse schema: {e:?}")))?;
181 Ok(Arc::new(schema))
182}
183
184fn serialize_projection(projection: Option<&Vec<usize>>) -> Option<crate::protobuf::Projection> {
185 projection.map(|p| crate::protobuf::Projection {
186 projection: p.iter().map(|n| *n as u32).collect(),
187 })
188}
189
190fn parse_projection(projection: Option<&crate::protobuf::Projection>) -> Option<Vec<usize>> {
191 projection.map(|p| p.projection.iter().map(|n| *n as usize).collect())
192}
193
194fn serialize_insert_op(insert_op: InsertOp) -> i32 {
195 let proto = match insert_op {
196 InsertOp::Append => datafusion_proto::protobuf::InsertOp::Append,
197 InsertOp::Overwrite => datafusion_proto::protobuf::InsertOp::Overwrite,
198 InsertOp::Replace => datafusion_proto::protobuf::InsertOp::Replace,
199 };
200 proto.into()
201}
202
203fn parse_insert_op(insert_op: i32) -> Result<InsertOp, DataFusionError> {
204 let proto = datafusion_proto::protobuf::InsertOp::try_from(insert_op)
205 .map_err(|e| DataFusionError::Internal(format!("Failed to parse insert op: {e:?}")))?;
206 match proto {
207 datafusion_proto::protobuf::InsertOp::Append => Ok(InsertOp::Append),
208 datafusion_proto::protobuf::InsertOp::Overwrite => Ok(InsertOp::Overwrite),
209 datafusion_proto::protobuf::InsertOp::Replace => Ok(InsertOp::Replace),
210 }
211}
212
213fn serialize_data_files(
214 records: Option<&Arc<Vec<DataFileRecord>>>,
215) -> Result<Option<DataFiles>, DataFusionError> {
216 match records {
217 Some(records) => {
218 let mut proto_data_files = vec![];
219 for record in records.iter() {
220 proto_data_files.push(DataFile {
221 data_file_id: record.data_file_id.as_bytes().to_vec(),
222 table_id: record.table_id.as_bytes().to_vec(),
223 format: serialize_data_file_format(record.format),
224 relative_path: record.relative_path.clone(),
225 size: record.size,
226 record_count: record.record_count,
227 validity: record.validity.bytes().to_vec(),
228 });
229 }
230 Ok(Some(DataFiles {
231 files: proto_data_files,
232 }))
233 }
234 None => Ok(None),
235 }
236}
237
238fn parse_data_files(
239 proto_data_files: Option<DataFiles>,
240) -> Result<Option<Arc<Vec<DataFileRecord>>>, DataFusionError> {
241 match proto_data_files {
242 Some(proto_data_files) => {
243 let mut records = vec![];
244 for proto_data_file in proto_data_files.files {
245 records.push(DataFileRecord {
246 data_file_id: Uuid::from_slice(&proto_data_file.data_file_id).map_err(|e| {
247 DataFusionError::Internal(format!("Failed to parse data file id: {e:?}"))
248 })?,
249 table_id: Uuid::from_slice(&proto_data_file.table_id).map_err(|e| {
250 DataFusionError::Internal(format!("Failed to parse table id: {e:?}"))
251 })?,
252 format: parse_data_file_format(proto_data_file.format)?,
253 relative_path: proto_data_file.relative_path,
254 size: proto_data_file.size,
255 record_count: proto_data_file.record_count,
256 validity: RowValidity::from(
257 proto_data_file.validity,
258 proto_data_file.record_count as usize,
259 ),
260 });
261 }
262 Ok(Some(Arc::new(records)))
263 }
264 None => Ok(None),
265 }
266}
267
268fn serialize_data_file_format(format: DataFileFormat) -> i32 {
269 let proto_format = match format {
270 DataFileFormat::ParquetV1 => crate::protobuf::DataFileFormat::ParquetV1,
271 DataFileFormat::ParquetV2 => crate::protobuf::DataFileFormat::ParquetV2,
272 };
273 proto_format.into()
274}
275
276fn parse_data_file_format(format: i32) -> Result<DataFileFormat, DataFusionError> {
277 let proto_format = crate::protobuf::DataFileFormat::try_from(format).map_err(|e| {
278 DataFusionError::Internal(format!("Failed to parse data file format: {e:?}"))
279 })?;
280 match proto_format {
281 crate::protobuf::DataFileFormat::ParquetV1 => Ok(DataFileFormat::ParquetV1),
282 crate::protobuf::DataFileFormat::ParquetV2 => Ok(DataFileFormat::ParquetV2),
283 }
284}