1use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow::datatypes::Schema;
22use arrow::ipc::writer::StreamWriter;
23use datafusion_common::{
24 internal_datafusion_err, internal_err, not_impl_err, DataFusionError, Result,
25};
26use datafusion_datasource::file_scan_config::FileScanConfig;
27use datafusion_datasource::file_sink_config::FileSink;
28use datafusion_datasource::file_sink_config::FileSinkConfig;
29use datafusion_datasource::{FileRange, PartitionedFile};
30use datafusion_datasource_csv::file_format::CsvSink;
31use datafusion_datasource_json::file_format::JsonSink;
32#[cfg(feature = "parquet")]
33use datafusion_datasource_parquet::file_format::ParquetSink;
34use datafusion_expr::WindowFrame;
35use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindowExpr};
36use datafusion_physical_expr::ScalarFunctionExpr;
37use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr;
38use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
39use datafusion_physical_plan::expressions::LikeExpr;
40use datafusion_physical_plan::expressions::{
41 BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr,
42 Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn,
43};
44use datafusion_physical_plan::udaf::AggregateFunctionExpr;
45use datafusion_physical_plan::windows::{PlainAggregateWindowExpr, WindowUDFExpr};
46use datafusion_physical_plan::{Partitioning, PhysicalExpr, WindowExpr};
47
48use crate::protobuf::{
49 self, physical_aggregate_expr_node, physical_window_expr_node, PhysicalSortExprNode,
50 PhysicalSortExprNodeCollection,
51};
52
53use super::PhysicalExtensionCodec;
54
55pub fn serialize_physical_aggr_expr(
56 aggr_expr: Arc<AggregateFunctionExpr>,
57 codec: &dyn PhysicalExtensionCodec,
58) -> Result<protobuf::PhysicalExprNode> {
59 let expressions = serialize_physical_exprs(&aggr_expr.expressions(), codec)?;
60 let order_bys =
61 serialize_physical_sort_exprs(aggr_expr.order_bys().iter().cloned(), codec)?;
62
63 let name = aggr_expr.fun().name().to_string();
64 let mut buf = Vec::new();
65 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
66 Ok(protobuf::PhysicalExprNode {
67 expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr(
68 protobuf::PhysicalAggregateExprNode {
69 aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)),
70 expr: expressions,
71 ordering_req: order_bys,
72 distinct: aggr_expr.is_distinct(),
73 ignore_nulls: aggr_expr.ignore_nulls(),
74 fun_definition: (!buf.is_empty()).then_some(buf),
75 human_display: aggr_expr.human_display().to_string(),
76 },
77 )),
78 })
79}
80
81fn serialize_physical_window_aggr_expr(
82 aggr_expr: &AggregateFunctionExpr,
83 _window_frame: &WindowFrame,
84 codec: &dyn PhysicalExtensionCodec,
85) -> Result<(physical_window_expr_node::WindowFunction, Option<Vec<u8>>)> {
86 let mut buf = Vec::new();
89 codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?;
90 Ok((
91 physical_window_expr_node::WindowFunction::UserDefinedAggrFunction(
92 aggr_expr.fun().name().to_string(),
93 ),
94 (!buf.is_empty()).then_some(buf),
95 ))
96}
97
98pub fn serialize_physical_window_expr(
99 window_expr: &Arc<dyn WindowExpr>,
100 codec: &dyn PhysicalExtensionCodec,
101) -> Result<protobuf::PhysicalWindowExprNode> {
102 let expr = window_expr.as_any();
103 let args = window_expr.expressions().to_vec();
104 let window_frame = window_expr.get_window_frame();
105
106 let (window_function, fun_definition, ignore_nulls, distinct) =
107 if let Some(plain_aggr_window_expr) =
108 expr.downcast_ref::<PlainAggregateWindowExpr>()
109 {
110 let aggr_expr = plain_aggr_window_expr.get_aggregate_expr();
111 let (window_function, fun_definition) =
112 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
113 (
114 window_function,
115 fun_definition,
116 aggr_expr.ignore_nulls(),
117 aggr_expr.is_distinct(),
118 )
119 } else if let Some(sliding_aggr_window_expr) =
120 expr.downcast_ref::<SlidingAggregateWindowExpr>()
121 {
122 let aggr_expr = sliding_aggr_window_expr.get_aggregate_expr();
123 let (window_function, fun_definition) =
124 serialize_physical_window_aggr_expr(aggr_expr, window_frame, codec)?;
125 (
126 window_function,
127 fun_definition,
128 aggr_expr.ignore_nulls(),
129 aggr_expr.is_distinct(),
130 )
131 } else if let Some(udf_window_expr) = expr.downcast_ref::<StandardWindowExpr>() {
132 if let Some(expr) = udf_window_expr
133 .get_standard_func_expr()
134 .as_any()
135 .downcast_ref::<WindowUDFExpr>()
136 {
137 let mut buf = Vec::new();
138 codec.try_encode_udwf(expr.fun(), &mut buf)?;
139 (
140 physical_window_expr_node::WindowFunction::UserDefinedWindowFunction(
141 expr.fun().name().to_string(),
142 ),
143 (!buf.is_empty()).then_some(buf),
144 false, false,
146 )
147 } else {
148 return not_impl_err!(
149 "User-defined window function not supported: {window_expr:?}"
150 );
151 }
152 } else {
153 return not_impl_err!("WindowExpr not supported: {window_expr:?}");
154 };
155
156 let args = serialize_physical_exprs(&args, codec)?;
157 let partition_by = serialize_physical_exprs(window_expr.partition_by(), codec)?;
158 let order_by = serialize_physical_sort_exprs(window_expr.order_by().to_vec(), codec)?;
159 let window_frame: protobuf::WindowFrame = window_frame
160 .as_ref()
161 .try_into()
162 .map_err(|e| internal_datafusion_err!("{e}"))?;
163
164 Ok(protobuf::PhysicalWindowExprNode {
165 args,
166 partition_by,
167 order_by,
168 window_frame: Some(window_frame),
169 window_function: Some(window_function),
170 name: window_expr.name().to_string(),
171 fun_definition,
172 ignore_nulls,
173 distinct,
174 })
175}
176
177pub fn serialize_physical_sort_exprs<I>(
178 sort_exprs: I,
179 codec: &dyn PhysicalExtensionCodec,
180) -> Result<Vec<PhysicalSortExprNode>>
181where
182 I: IntoIterator<Item = PhysicalSortExpr>,
183{
184 sort_exprs
185 .into_iter()
186 .map(|sort_expr| serialize_physical_sort_expr(sort_expr, codec))
187 .collect()
188}
189
190pub fn serialize_physical_sort_expr(
191 sort_expr: PhysicalSortExpr,
192 codec: &dyn PhysicalExtensionCodec,
193) -> Result<PhysicalSortExprNode> {
194 let PhysicalSortExpr { expr, options } = sort_expr;
195 let expr = serialize_physical_expr(&expr, codec)?;
196 Ok(PhysicalSortExprNode {
197 expr: Some(Box::new(expr)),
198 asc: !options.descending,
199 nulls_first: options.nulls_first,
200 })
201}
202
203pub fn serialize_physical_exprs<'a, I>(
204 values: I,
205 codec: &dyn PhysicalExtensionCodec,
206) -> Result<Vec<protobuf::PhysicalExprNode>>
207where
208 I: IntoIterator<Item = &'a Arc<dyn PhysicalExpr>>,
209{
210 values
211 .into_iter()
212 .map(|value| serialize_physical_expr(value, codec))
213 .collect()
214}
215
216pub fn serialize_physical_expr(
221 value: &Arc<dyn PhysicalExpr>,
222 codec: &dyn PhysicalExtensionCodec,
223) -> Result<protobuf::PhysicalExprNode> {
224 let value = snapshot_physical_expr(Arc::clone(value))?;
227 let expr = value.as_any();
228
229 if let Some(expr) = expr.downcast_ref::<Column>() {
230 Ok(protobuf::PhysicalExprNode {
231 expr_type: Some(protobuf::physical_expr_node::ExprType::Column(
232 protobuf::PhysicalColumn {
233 name: expr.name().to_string(),
234 index: expr.index() as u32,
235 },
236 )),
237 })
238 } else if let Some(expr) = expr.downcast_ref::<UnKnownColumn>() {
239 Ok(protobuf::PhysicalExprNode {
240 expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn(
241 protobuf::UnknownColumn {
242 name: expr.name().to_string(),
243 },
244 )),
245 })
246 } else if let Some(expr) = expr.downcast_ref::<BinaryExpr>() {
247 let binary_expr = Box::new(protobuf::PhysicalBinaryExprNode {
248 l: Some(Box::new(serialize_physical_expr(expr.left(), codec)?)),
249 r: Some(Box::new(serialize_physical_expr(expr.right(), codec)?)),
250 op: format!("{:?}", expr.op()),
251 });
252
253 Ok(protobuf::PhysicalExprNode {
254 expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr(
255 binary_expr,
256 )),
257 })
258 } else if let Some(expr) = expr.downcast_ref::<CaseExpr>() {
259 Ok(protobuf::PhysicalExprNode {
260 expr_type: Some(
261 protobuf::physical_expr_node::ExprType::Case(
262 Box::new(
263 protobuf::PhysicalCaseNode {
264 expr: expr
265 .expr()
266 .map(|exp| {
267 serialize_physical_expr(exp, codec).map(Box::new)
268 })
269 .transpose()?,
270 when_then_expr: expr
271 .when_then_expr()
272 .iter()
273 .map(|(when_expr, then_expr)| {
274 serialize_when_then_expr(when_expr, then_expr, codec)
275 })
276 .collect::<Result<
277 Vec<protobuf::PhysicalWhenThen>,
278 DataFusionError,
279 >>()?,
280 else_expr: expr
281 .else_expr()
282 .map(|a| serialize_physical_expr(a, codec).map(Box::new))
283 .transpose()?,
284 },
285 ),
286 ),
287 ),
288 })
289 } else if let Some(expr) = expr.downcast_ref::<NotExpr>() {
290 Ok(protobuf::PhysicalExprNode {
291 expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new(
292 protobuf::PhysicalNot {
293 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
294 },
295 ))),
296 })
297 } else if let Some(expr) = expr.downcast_ref::<IsNullExpr>() {
298 Ok(protobuf::PhysicalExprNode {
299 expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr(
300 Box::new(protobuf::PhysicalIsNull {
301 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
302 }),
303 )),
304 })
305 } else if let Some(expr) = expr.downcast_ref::<IsNotNullExpr>() {
306 Ok(protobuf::PhysicalExprNode {
307 expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr(
308 Box::new(protobuf::PhysicalIsNotNull {
309 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
310 }),
311 )),
312 })
313 } else if let Some(expr) = expr.downcast_ref::<InListExpr>() {
314 Ok(protobuf::PhysicalExprNode {
315 expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new(
316 protobuf::PhysicalInListNode {
317 expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
318 list: serialize_physical_exprs(expr.list(), codec)?,
319 negated: expr.negated(),
320 },
321 ))),
322 })
323 } else if let Some(expr) = expr.downcast_ref::<NegativeExpr>() {
324 Ok(protobuf::PhysicalExprNode {
325 expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new(
326 protobuf::PhysicalNegativeNode {
327 expr: Some(Box::new(serialize_physical_expr(expr.arg(), codec)?)),
328 },
329 ))),
330 })
331 } else if let Some(lit) = expr.downcast_ref::<Literal>() {
332 Ok(protobuf::PhysicalExprNode {
333 expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(
334 lit.value().try_into()?,
335 )),
336 })
337 } else if let Some(cast) = expr.downcast_ref::<CastExpr>() {
338 Ok(protobuf::PhysicalExprNode {
339 expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new(
340 protobuf::PhysicalCastNode {
341 expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
342 arrow_type: Some(cast.cast_type().try_into()?),
343 },
344 ))),
345 })
346 } else if let Some(cast) = expr.downcast_ref::<TryCastExpr>() {
347 Ok(protobuf::PhysicalExprNode {
348 expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new(
349 protobuf::PhysicalTryCastNode {
350 expr: Some(Box::new(serialize_physical_expr(cast.expr(), codec)?)),
351 arrow_type: Some(cast.cast_type().try_into()?),
352 },
353 ))),
354 })
355 } else if let Some(expr) = expr.downcast_ref::<ScalarFunctionExpr>() {
356 let mut buf = Vec::new();
357 codec.try_encode_udf(expr.fun(), &mut buf)?;
358 Ok(protobuf::PhysicalExprNode {
359 expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf(
360 protobuf::PhysicalScalarUdfNode {
361 name: expr.name().to_string(),
362 args: serialize_physical_exprs(expr.args(), codec)?,
363 fun_definition: (!buf.is_empty()).then_some(buf),
364 return_type: Some(expr.return_type().try_into()?),
365 nullable: expr.nullable(),
366 return_field_name: expr
367 .return_field(&Schema::empty())?
368 .name()
369 .to_string(),
370 },
371 )),
372 })
373 } else if let Some(expr) = expr.downcast_ref::<LikeExpr>() {
374 Ok(protobuf::PhysicalExprNode {
375 expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new(
376 protobuf::PhysicalLikeExprNode {
377 negated: expr.negated(),
378 case_insensitive: expr.case_insensitive(),
379 expr: Some(Box::new(serialize_physical_expr(expr.expr(), codec)?)),
380 pattern: Some(Box::new(serialize_physical_expr(
381 expr.pattern(),
382 codec,
383 )?)),
384 },
385 ))),
386 })
387 } else {
388 let mut buf: Vec<u8> = vec![];
389 match codec.try_encode_expr(&value, &mut buf) {
390 Ok(_) => {
391 let inputs: Vec<protobuf::PhysicalExprNode> = value
392 .children()
393 .into_iter()
394 .map(|e| serialize_physical_expr(e, codec))
395 .collect::<Result<_>>()?;
396 Ok(protobuf::PhysicalExprNode {
397 expr_type: Some(protobuf::physical_expr_node::ExprType::Extension(
398 protobuf::PhysicalExtensionExprNode { expr: buf, inputs },
399 )),
400 })
401 }
402 Err(e) => internal_err!(
403 "Unsupported physical expr and extension codec failed with [{e}]. Expr: {value:?}"
404 ),
405 }
406 }
407}
408
409pub fn serialize_partitioning(
410 partitioning: &Partitioning,
411 codec: &dyn PhysicalExtensionCodec,
412) -> Result<protobuf::Partitioning> {
413 let serialized_partitioning = match partitioning {
414 Partitioning::RoundRobinBatch(partition_count) => protobuf::Partitioning {
415 partition_method: Some(protobuf::partitioning::PartitionMethod::RoundRobin(
416 *partition_count as u64,
417 )),
418 },
419 Partitioning::Hash(exprs, partition_count) => {
420 let serialized_exprs = serialize_physical_exprs(exprs, codec)?;
421 protobuf::Partitioning {
422 partition_method: Some(protobuf::partitioning::PartitionMethod::Hash(
423 protobuf::PhysicalHashRepartition {
424 hash_expr: serialized_exprs,
425 partition_count: *partition_count as u64,
426 },
427 )),
428 }
429 }
430 Partitioning::UnknownPartitioning(partition_count) => protobuf::Partitioning {
431 partition_method: Some(protobuf::partitioning::PartitionMethod::Unknown(
432 *partition_count as u64,
433 )),
434 },
435 };
436 Ok(serialized_partitioning)
437}
438
439fn serialize_when_then_expr(
440 when_expr: &Arc<dyn PhysicalExpr>,
441 then_expr: &Arc<dyn PhysicalExpr>,
442 codec: &dyn PhysicalExtensionCodec,
443) -> Result<protobuf::PhysicalWhenThen> {
444 Ok(protobuf::PhysicalWhenThen {
445 when_expr: Some(serialize_physical_expr(when_expr, codec)?),
446 then_expr: Some(serialize_physical_expr(then_expr, codec)?),
447 })
448}
449
450impl TryFrom<&PartitionedFile> for protobuf::PartitionedFile {
451 type Error = DataFusionError;
452
453 fn try_from(pf: &PartitionedFile) -> Result<Self> {
454 let last_modified = pf.object_meta.last_modified;
455 let last_modified_ns = last_modified.timestamp_nanos_opt().ok_or_else(|| {
456 DataFusionError::Plan(format!(
457 "Invalid timestamp on PartitionedFile::ObjectMeta: {last_modified}"
458 ))
459 })? as u64;
460 Ok(protobuf::PartitionedFile {
461 path: pf.object_meta.location.as_ref().to_owned(),
462 size: pf.object_meta.size,
463 last_modified_ns,
464 partition_values: pf
465 .partition_values
466 .iter()
467 .map(|v| v.try_into())
468 .collect::<Result<Vec<_>, _>>()?,
469 range: pf.range.as_ref().map(|r| r.try_into()).transpose()?,
470 statistics: pf.statistics.as_ref().map(|s| s.as_ref().into()),
471 })
472 }
473}
474
475impl TryFrom<&FileRange> for protobuf::FileRange {
476 type Error = DataFusionError;
477
478 fn try_from(value: &FileRange) -> Result<Self> {
479 Ok(protobuf::FileRange {
480 start: value.start,
481 end: value.end,
482 })
483 }
484}
485
486impl TryFrom<&[PartitionedFile]> for protobuf::FileGroup {
487 type Error = DataFusionError;
488
489 fn try_from(gr: &[PartitionedFile]) -> Result<Self, Self::Error> {
490 Ok(protobuf::FileGroup {
491 files: gr
492 .iter()
493 .map(|f| f.try_into())
494 .collect::<Result<Vec<_>, _>>()?,
495 })
496 }
497}
498
499pub fn serialize_file_scan_config(
500 conf: &FileScanConfig,
501 codec: &dyn PhysicalExtensionCodec,
502) -> Result<protobuf::FileScanExecConf> {
503 let file_groups = conf
504 .file_groups
505 .iter()
506 .map(|p| p.files().try_into())
507 .collect::<Result<Vec<_>, _>>()?;
508
509 let mut output_orderings = vec![];
510 for order in &conf.output_ordering {
511 let ordering = serialize_physical_sort_exprs(order.to_vec(), codec)?;
512 output_orderings.push(ordering)
513 }
514
515 let mut fields = conf
518 .file_schema()
519 .fields()
520 .iter()
521 .cloned()
522 .collect::<Vec<_>>();
523 fields.extend(conf.table_partition_cols().iter().cloned());
524
525 let schema = Arc::new(
526 arrow::datatypes::Schema::new(fields.clone())
527 .with_metadata(conf.file_schema().metadata.clone()),
528 );
529
530 Ok(protobuf::FileScanExecConf {
531 file_groups,
532 statistics: Some((&conf.file_source.statistics().unwrap()).into()),
533 limit: conf.limit.map(|l| protobuf::ScanLimit { limit: l as u32 }),
534 projection: conf
535 .projection_exprs
536 .as_ref()
537 .map(|p| p.column_indices())
538 .unwrap_or((0..schema.fields().len()).collect::<Vec<_>>())
539 .iter()
540 .map(|n| *n as u32)
541 .collect(),
542 schema: Some(schema.as_ref().try_into()?),
543 table_partition_cols: conf
544 .table_partition_cols()
545 .iter()
546 .map(|x| x.name().clone())
547 .collect::<Vec<_>>(),
548 object_store_url: conf.object_store_url.to_string(),
549 output_ordering: output_orderings
550 .into_iter()
551 .map(|e| PhysicalSortExprNodeCollection {
552 physical_sort_expr_nodes: e,
553 })
554 .collect::<Vec<_>>(),
555 constraints: Some(conf.constraints.clone().into()),
556 batch_size: conf.batch_size.map(|s| s as u64),
557 })
558}
559
560pub fn serialize_maybe_filter(
561 expr: Option<Arc<dyn PhysicalExpr>>,
562 codec: &dyn PhysicalExtensionCodec,
563) -> Result<protobuf::MaybeFilter> {
564 match expr {
565 None => Ok(protobuf::MaybeFilter { expr: None }),
566 Some(expr) => Ok(protobuf::MaybeFilter {
567 expr: Some(serialize_physical_expr(&expr, codec)?),
568 }),
569 }
570}
571
572pub fn serialize_record_batches(batches: &[RecordBatch]) -> Result<Vec<u8>> {
573 if batches.is_empty() {
574 return Ok(vec![]);
575 }
576 let schema = batches[0].schema();
577 let mut buf = Vec::new();
578 let mut writer = StreamWriter::try_new(&mut buf, &schema)?;
579 for batch in batches {
580 writer.write(batch)?;
581 }
582 writer.finish()?;
583 Ok(buf)
584}
585
586impl TryFrom<&JsonSink> for protobuf::JsonSink {
587 type Error = DataFusionError;
588
589 fn try_from(value: &JsonSink) -> Result<Self, Self::Error> {
590 Ok(Self {
591 config: Some(value.config().try_into()?),
592 writer_options: Some(value.writer_options().try_into()?),
593 })
594 }
595}
596
597impl TryFrom<&CsvSink> for protobuf::CsvSink {
598 type Error = DataFusionError;
599
600 fn try_from(value: &CsvSink) -> Result<Self, Self::Error> {
601 Ok(Self {
602 config: Some(value.config().try_into()?),
603 writer_options: Some(value.writer_options().try_into()?),
604 })
605 }
606}
607
#[cfg(feature = "parquet")]
impl TryFrom<&ParquetSink> for protobuf::ParquetSink {
    type Error = DataFusionError;

    /// Converts a [`ParquetSink`] by converting its sink config and then its
    /// parquet options.
    ///
    /// # Errors
    /// Propagates the first conversion failure.
    fn try_from(value: &ParquetSink) -> Result<Self, Self::Error> {
        let config = value.config().try_into()?;
        let parquet_options = value.parquet_options().try_into()?;

        Ok(Self {
            config: Some(config),
            parquet_options: Some(parquet_options),
        })
    }
}
619
620impl TryFrom<&FileSinkConfig> for protobuf::FileSinkConfig {
621 type Error = DataFusionError;
622
623 fn try_from(conf: &FileSinkConfig) -> Result<Self, Self::Error> {
624 let file_groups = conf
625 .file_group
626 .iter()
627 .map(TryInto::try_into)
628 .collect::<Result<Vec<_>>>()?;
629 let table_paths = conf
630 .table_paths
631 .iter()
632 .map(ToString::to_string)
633 .collect::<Vec<_>>();
634 let table_partition_cols = conf
635 .table_partition_cols
636 .iter()
637 .map(|(name, data_type)| {
638 Ok(protobuf::PartitionColumn {
639 name: name.to_owned(),
640 arrow_type: Some(data_type.try_into()?),
641 })
642 })
643 .collect::<Result<Vec<_>>>()?;
644 Ok(Self {
645 object_store_url: conf.object_store_url.to_string(),
646 file_groups,
647 table_paths,
648 output_schema: Some(conf.output_schema.as_ref().try_into()?),
649 table_partition_cols,
650 keep_partition_by_columns: conf.keep_partition_by_columns,
651 insert_op: conf.insert_op as i32,
652 file_extension: conf.file_extension.to_string(),
653 })
654 }
655}