1use crate::query::df_graph::GraphExecutionContext;
22use crate::query::df_graph::common::{
23 arrow_err, collect_all_partitions, compute_plan_properties, execute_subplan, extract_row_params,
24};
25use crate::query::planner::LogicalPlan;
26use arrow_array::builder::{
27 BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder, UInt64Builder,
28};
29use arrow_array::{ArrayRef, RecordBatch};
30use arrow_schema::{DataType, SchemaRef};
31use datafusion::common::Result as DFResult;
32use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
35use datafusion::prelude::SessionContext;
36use futures::Stream;
37use parking_lot::RwLock;
38use std::any::Any;
39use std::collections::HashMap;
40use std::collections::hash_map::DefaultHasher;
41use std::fmt;
42use std::hash::{Hash, Hasher};
43use std::pin::Pin;
44use std::sync::Arc;
45use std::task::{Context, Poll};
46use uni_common::Value;
47use uni_common::core::schema::Schema as UniSchema;
48use uni_cypher::ast::{Expr, UnaryOp};
49use uni_store::storage::manager::StorageManager;
50
/// Physical operator implementing a correlated apply: for each row produced
/// by `input_exec`, the `subquery_plan` is executed and its output is
/// cross-joined onto that row.
pub struct GraphApplyExec {
    /// Producer of the outer (input) rows.
    input_exec: Arc<dyn ExecutionPlan>,

    /// Logical plan executed per input row (or once, on the batched path).
    subquery_plan: LogicalPlan,

    /// Optional predicate applied to input rows before running the subquery.
    input_filter: Option<Expr>,

    /// Graph execution state shared with subplan execution.
    graph_ctx: Arc<GraphExecutionContext>,

    /// DataFusion session used to build task contexts and run subplans.
    session_ctx: Arc<RwLock<SessionContext>>,

    /// Storage backend handed through to subplan execution.
    storage: Arc<StorageManager>,

    /// Graph schema metadata handed through to subplan execution.
    schema_info: Arc<UniSchema>,

    /// Query-level parameters merged into each subquery invocation.
    params: HashMap<String, Value>,

    /// Arrow schema of the joined output (input columns first, then
    /// subquery columns — see `append_cross_join_row`).
    output_schema: SchemaRef,

    /// Cached plan properties derived from `output_schema`.
    properties: PlanProperties,

    /// Execution metrics (baseline row/time counters).
    metrics: ExecutionPlanMetricsSet,
}
92
93impl fmt::Debug for GraphApplyExec {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95 f.debug_struct("GraphApplyExec")
96 .field("has_input_filter", &self.input_filter.is_some())
97 .finish()
98 }
99}
100
101impl GraphApplyExec {
102 #[expect(clippy::too_many_arguments)]
104 pub fn new(
105 input_exec: Arc<dyn ExecutionPlan>,
106 subquery_plan: LogicalPlan,
107 input_filter: Option<Expr>,
108 graph_ctx: Arc<GraphExecutionContext>,
109 session_ctx: Arc<RwLock<SessionContext>>,
110 storage: Arc<StorageManager>,
111 schema_info: Arc<UniSchema>,
112 params: HashMap<String, Value>,
113 output_schema: SchemaRef,
114 ) -> Self {
115 let properties = compute_plan_properties(output_schema.clone());
116
117 Self {
118 input_exec,
119 subquery_plan,
120 input_filter,
121 graph_ctx,
122 session_ctx,
123 storage,
124 schema_info,
125 params,
126 output_schema,
127 properties,
128 metrics: ExecutionPlanMetricsSet::new(),
129 }
130 }
131}
132
133impl DisplayAs for GraphApplyExec {
134 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
135 write!(
136 f,
137 "GraphApplyExec: filter={}",
138 if self.input_filter.is_some() {
139 "yes"
140 } else {
141 "none"
142 }
143 )
144 }
145}
146
147impl ExecutionPlan for GraphApplyExec {
148 fn name(&self) -> &str {
149 "GraphApplyExec"
150 }
151
152 fn as_any(&self) -> &dyn Any {
153 self
154 }
155
156 fn schema(&self) -> SchemaRef {
157 self.output_schema.clone()
158 }
159
160 fn properties(&self) -> &PlanProperties {
161 &self.properties
162 }
163
164 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
165 vec![]
167 }
168
169 fn with_new_children(
170 self: Arc<Self>,
171 children: Vec<Arc<dyn ExecutionPlan>>,
172 ) -> DFResult<Arc<dyn ExecutionPlan>> {
173 if !children.is_empty() {
174 return Err(datafusion::error::DataFusionError::Plan(
175 "GraphApplyExec has no children".to_string(),
176 ));
177 }
178 Ok(self)
179 }
180
181 fn execute(
182 &self,
183 partition: usize,
184 _context: Arc<TaskContext>,
185 ) -> DFResult<SendableRecordBatchStream> {
186 let metrics = BaselineMetrics::new(&self.metrics, partition);
187
188 let input_exec = self.input_exec.clone();
189 let subquery_plan = self.subquery_plan.clone();
190 let input_filter = self.input_filter.clone();
191 let graph_ctx = self.graph_ctx.clone();
192 let session_ctx = self.session_ctx.clone();
193 let storage = self.storage.clone();
194 let schema_info = self.schema_info.clone();
195 let params = self.params.clone();
196 let output_schema = self.output_schema.clone();
197
198 let fut = async move {
199 run_apply(
200 input_exec,
201 &subquery_plan,
202 input_filter.as_ref(),
203 &graph_ctx,
204 &session_ctx,
205 &storage,
206 &schema_info,
207 ¶ms,
208 &output_schema,
209 )
210 .await
211 };
212
213 Ok(Box::pin(ApplyStream {
214 state: ApplyStreamState::Running(Box::pin(fut)),
215 schema: self.output_schema.clone(),
216 metrics,
217 }))
218 }
219
220 fn metrics(&self) -> Option<MetricsSet> {
221 Some(self.metrics.clone_inner())
222 }
223}
224
225fn batches_to_row_maps(batches: &[RecordBatch]) -> Vec<HashMap<String, Value>> {
231 batches
232 .iter()
233 .flat_map(|batch| {
234 (0..batch.num_rows()).map(move |row_idx| extract_row_params(batch, row_idx))
235 })
236 .collect()
237}
238
239fn evaluate_filter(filter: &Expr, row: &HashMap<String, Value>) -> bool {
244 match filter {
245 Expr::BinaryOp { left, op, right } => {
246 use uni_cypher::ast::BinaryOp;
247 match op {
248 BinaryOp::And => evaluate_filter(left, row) && evaluate_filter(right, row),
249 BinaryOp::Or => evaluate_filter(left, row) || evaluate_filter(right, row),
250 _ => {
251 let left_val = resolve_expr_value(left, row);
252 let right_val = resolve_expr_value(right, row);
253 evaluate_comparison(op, &left_val, &right_val)
254 }
255 }
256 }
257 Expr::UnaryOp {
258 op: UnaryOp::Not,
259 expr,
260 } => !evaluate_filter(expr, row),
261 _ => {
262 let val = resolve_expr_value(filter, row);
264 val.as_bool().unwrap_or(false)
265 }
266 }
267}
268
269fn resolve_expr_value(expr: &Expr, row: &HashMap<String, Value>) -> Value {
271 match expr {
272 Expr::Literal(lit) => lit.to_value(),
273 Expr::Variable(name) => row.get(name).cloned().unwrap_or(Value::Null),
274 Expr::Property(base_expr, key) => {
275 if let Expr::Variable(var) = base_expr.as_ref() {
276 let col_name = format!("{}.{}", var, key);
278 row.get(&col_name).cloned().unwrap_or(Value::Null)
279 } else {
280 Value::Null
281 }
282 }
283 _ => Value::Null,
284 }
285}
286
287fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
289 match (a, b) {
290 (Value::Int(a), Value::Int(b)) => Some(a.cmp(b)),
291 (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
292 (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
293 (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)),
294 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
295 _ => None,
296 }
297}
298
299fn evaluate_comparison(op: &uni_cypher::ast::BinaryOp, left: &Value, right: &Value) -> bool {
304 use std::cmp::Ordering;
305 use uni_cypher::ast::BinaryOp;
306
307 match op {
308 BinaryOp::Eq => left == right,
309 BinaryOp::NotEq => left != right,
310 BinaryOp::Lt => compare_values(left, right) == Some(Ordering::Less),
311 BinaryOp::LtEq => matches!(
312 compare_values(left, right),
313 Some(Ordering::Less | Ordering::Equal)
314 ),
315 BinaryOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
316 BinaryOp::GtEq => matches!(
317 compare_values(left, right),
318 Some(Ordering::Greater | Ordering::Equal)
319 ),
320 _ => false,
321 }
322}
323
324fn build_column<B, T>(
329 rows: &[HashMap<String, Value>],
330 col_name: &str,
331 mut builder: B,
332 extract: impl Fn(&Value) -> Option<T>,
333) -> ArrayRef
334where
335 B: arrow_array::builder::ArrayBuilder,
336 B: PrimitiveAppend<T>,
337{
338 for row in rows {
339 match row.get(col_name).and_then(&extract) {
340 Some(v) => builder.append_typed_value(v),
341 None => builder.append_typed_null(),
342 }
343 }
344 Arc::new(builder.finish_to_array())
345}
346
/// Uniform append/finish interface over Arrow primitive builders so that
/// `build_column` and `single_row_array` can be generic over the native type.
trait PrimitiveAppend<T> {
    /// Appends one non-null native value.
    fn append_typed_value(&mut self, val: T);
    /// Appends a null slot.
    fn append_typed_null(&mut self);
    /// Consumes the builder and produces the finished array.
    fn finish_to_array(self) -> ArrayRef;
}
356
/// Implements `PrimitiveAppend` for a builder by delegating to its inherent
/// `append_value` / `append_null` / `finish` methods.
macro_rules! impl_primitive_append {
    ($builder:ty, $native:ty, $array:ty) => {
        impl PrimitiveAppend<$native> for $builder {
            fn append_typed_value(&mut self, val: $native) {
                self.append_value(val);
            }
            fn append_typed_null(&mut self) {
                self.append_null();
            }
            fn finish_to_array(mut self) -> ArrayRef {
                Arc::new(self.finish()) as ArrayRef
            }
        }
    };
}

// Builders covered by the generic column-construction helpers.
impl_primitive_append!(UInt64Builder, u64, arrow_array::UInt64Array);
impl_primitive_append!(Int64Builder, i64, arrow_array::Int64Array);
impl_primitive_append!(Int32Builder, i32, arrow_array::Int32Array);
impl_primitive_append!(Float64Builder, f64, arrow_array::Float64Array);
impl_primitive_append!(BooleanBuilder, bool, arrow_array::BooleanArray);
378
/// Materializes row maps into a `RecordBatch` conforming to `schema`.
///
/// Each column is built by looking up the field name in every row map;
/// missing or non-coercible values become nulls. Arrow types without a
/// dedicated branch fall back to a stringified (`Display`) UTF-8 column.
fn rows_to_batch(rows: &[HashMap<String, Value>], schema: &SchemaRef) -> DFResult<RecordBatch> {
    // No rows: an empty batch with the right schema suffices.
    if rows.is_empty() {
        return Ok(RecordBatch::new_empty(schema.clone()));
    }

    let num_rows = rows.len();
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());

    for field in schema.fields() {
        let col_name = field.name();
        let col = match field.data_type() {
            // Accept either an unsigned value or a signed one reinterpreted.
            DataType::UInt64 => build_column(
                rows,
                col_name,
                UInt64Builder::with_capacity(num_rows),
                |v| v.as_u64().or_else(|| v.as_i64().map(|i| i as u64)),
            ),
            DataType::Int64 => build_column(
                rows,
                col_name,
                Int64Builder::with_capacity(num_rows),
                Value::as_i64,
            ),
            // NOTE(review): the i64 -> i32 cast truncates out-of-range values;
            // confirm inputs are known to fit.
            DataType::Int32 => {
                build_column(rows, col_name, Int32Builder::with_capacity(num_rows), |v| {
                    v.as_i64().map(|i| i as i32)
                })
            }
            DataType::Float64 => build_column(
                rows,
                col_name,
                Float64Builder::with_capacity(num_rows),
                Value::as_f64,
            ),
            DataType::Boolean => build_column(
                rows,
                col_name,
                BooleanBuilder::with_capacity(num_rows),
                Value::as_bool,
            ),
            // Binary columns carry values serialized with the project's
            // cypher-value codec; nulls stay null. The 64-bytes-per-row
            // capacity is a size heuristic only.
            DataType::LargeBinary => {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(
                    num_rows,
                    num_rows * 64,
                );
                for row in rows {
                    match row.get(col_name) {
                        Some(val) if !val.is_null() => {
                            let cv_bytes = uni_common::cypher_value_codec::encode(val);
                            builder.append_value(&cv_bytes);
                        }
                        _ => builder.append_null(),
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
            // Only string-item lists are supported here; non-string items are
            // stringified via Display, null items become null elements, and
            // anything that isn't a list becomes a null list.
            DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
                let mut builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
                for row in rows {
                    match row.get(col_name) {
                        Some(Value::List(items)) => {
                            for item in items {
                                match item {
                                    Value::String(s) => builder.values().append_value(s),
                                    Value::Null => builder.values().append_null(),
                                    other => builder.values().append_value(format!("{other}")),
                                }
                            }
                            builder.append(true);
                        }
                        _ => builder.append_null(),
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
            DataType::Null => Arc::new(arrow_array::NullArray::new(num_rows)) as ArrayRef,
            // Fallback: stringify everything else via Display.
            _ => {
                let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
                for row in rows {
                    match row.get(col_name) {
                        Some(Value::Null) | None => builder.append_null(),
                        Some(Value::String(s)) => builder.append_value(s),
                        Some(other) => builder.append_value(format!("{other}")),
                    }
                }
                Arc::new(builder.finish()) as ArrayRef
            }
        };
        columns.push(col);
    }

    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
}
474
475fn slice_row(batch: &RecordBatch, row_idx: usize) -> Vec<ArrayRef> {
477 batch
478 .columns()
479 .iter()
480 .map(|col| col.slice(row_idx, 1))
481 .collect()
482}
483
484fn is_procedure_call(plan: &LogicalPlan) -> bool {
487 match plan {
488 LogicalPlan::ProcedureCall { .. } => true,
489 LogicalPlan::Project { input, .. }
490 | LogicalPlan::Filter { input, .. }
491 | LogicalPlan::Sort { input, .. }
492 | LogicalPlan::Limit { input, .. }
493 | LogicalPlan::Distinct { input } => is_procedure_call(input),
494 _ => false,
495 }
496}
497
498fn hash_row_params(params: &HashMap<String, Value>) -> u64 {
502 let mut hasher = DefaultHasher::new();
503 let mut entries: Vec<_> = params.iter().collect();
504 entries.sort_unstable_by_key(|(k, _)| *k);
505 for (key, val) in entries {
506 key.hash(&mut hasher);
507 format!("{val:?}").hash(&mut hasher);
508 }
509 hasher.finish()
510}
511
512fn is_batch_eligible(filtered_entries: &[(&RecordBatch, usize, HashMap<String, Value>)]) -> bool {
517 if filtered_entries.len() < 2 {
518 return false;
519 }
520
521 filtered_entries
523 .iter()
524 .any(|(_, _, row_params)| row_params.keys().any(|k| k.ends_with("._vid")))
525}
526
/// Drives the correlated apply: runs `subquery_plan` against the rows
/// produced by `input_exec` and cross-joins the results into one batch.
///
/// Three paths:
/// * no surviving input rows -> run the subquery once, uncorrelated;
/// * batch-eligible (>=2 rows with `._vid` columns, non-procedure, filtered)
///   -> run the subquery once with list-valued VID parameters and join back;
/// * otherwise -> run the subquery per input row, memoized by parameter hash.
#[expect(clippy::too_many_arguments)]
async fn run_apply(
    input_exec: Arc<dyn ExecutionPlan>,
    subquery_plan: &LogicalPlan,
    input_filter: Option<&Expr>,
    graph_ctx: &Arc<GraphExecutionContext>,
    session_ctx: &Arc<RwLock<SessionContext>>,
    storage: &Arc<StorageManager>,
    schema_info: &Arc<UniSchema>,
    params: &HashMap<String, Value>,
    output_schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let apply_start = std::time::Instant::now();
    let is_proc_call = is_procedure_call(subquery_plan);
    tracing::debug!("run_apply: is_procedure_call={}", is_proc_call);

    // Fully materialize the input side before driving the subquery.
    let task_ctx = session_ctx.read().task_ctx();
    let input_batches = collect_all_partitions(&input_exec, task_ctx).await?;

    // Rows surviving the optional pre-filter, as (batch, row index, row map).
    let mut filtered_entries: Vec<(&RecordBatch, usize, HashMap<String, Value>)> = Vec::new();
    for batch in &input_batches {
        for row_idx in 0..batch.num_rows() {
            let row_params = extract_row_params(batch, row_idx);
            if let Some(filter) = input_filter
                && !evaluate_filter(filter, &row_params)
            {
                continue;
            }
            filtered_entries.push((batch, row_idx, row_params));
        }
    }

    tracing::debug!(
        "run_apply: filtered_entries count = {}",
        filtered_entries.len()
    );

    // With no surviving input rows, run the subquery once with no outer
    // values and shape its rows directly to the output schema.
    if filtered_entries.is_empty() {
        let sub_batches = execute_subplan(
            subquery_plan,
            params,
            &HashMap::new(),
            graph_ctx,
            session_ctx,
            storage,
            schema_info,
        )
        .await?;
        let sub_rows = batches_to_row_maps(&sub_batches);
        return rows_to_batch(&sub_rows, output_schema);
    }

    let has_filter = input_filter.is_some();

    if is_batch_eligible(&filtered_entries) && !is_proc_call && has_filter {
        tracing::debug!("run_apply: batching eligible, attempting batch execution");

        // Collect every VID column's values across all surviving rows.
        let mut vid_values: HashMap<String, Vec<Value>> = HashMap::new();
        for (_, _, row_params) in &filtered_entries {
            for (key, value) in row_params {
                if key.ends_with("._vid") {
                    vid_values
                        .entry(key.clone())
                        .or_default()
                        .push(value.clone());
                }
            }
        }

        // Hand the subquery the whole VID set as list-valued parameters.
        let mut batched_params = params.clone();
        for (key, values) in &vid_values {
            batched_params.insert(key.clone(), Value::List(values.clone()));
        }

        // Non-VID correlated values are taken from the first row only.
        // NOTE(review): this assumes they are identical across rows on this
        // path — confirm against the planner's batching conditions.
        if let Some((_, _, first_row_params)) = filtered_entries.first() {
            for (key, value) in first_row_params {
                if !key.ends_with("._vid") {
                    batched_params
                        .entry(key.clone())
                        .or_insert_with(|| value.clone());
                }
            }
        }

        // One subplan execution for the entire VID batch.
        let subplan_start = std::time::Instant::now();
        let sub_batches = execute_subplan(
            subquery_plan,
            &batched_params,
            &HashMap::new(),
            graph_ctx,
            session_ctx,
            storage,
            schema_info,
        )
        .await?;
        let subplan_elapsed = subplan_start.elapsed();
        tracing::debug!(
            "run_apply: batch execute_subplan took {:?}",
            subplan_elapsed
        );

        // Index subquery rows by VID for the join-back.
        let sub_rows = batches_to_row_maps(&sub_batches);
        let mut sub_index: HashMap<i64, Vec<&HashMap<String, Value>>> = HashMap::new();

        // Non-empty by construction: is_batch_eligible saw a `._vid` key.
        // NOTE(review): with multiple VID columns this picks an arbitrary one
        // (HashMap iteration order) — confirm the single-VID assumption.
        let vid_key = vid_values.keys().next().expect("at least one VID key");

        for sub_row in &sub_rows {
            if let Some(Value::Int(vid)) = sub_row.get(vid_key) {
                sub_index.entry(*vid).or_default().push(sub_row);
            }
        }

        // Per-output-column accumulators of single-row arrays: input columns
        // first, subquery columns after `num_input_cols`.
        let input_schema = input_batches[0].schema();
        let num_input_cols = input_schema.fields().len();
        let num_output_cols = output_schema.fields().len();
        let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];

        for (batch, row_idx, row_params) in &filtered_entries {
            // Rows without an integer VID cannot be joined back; skip them.
            let input_vid = if let Some(Value::Int(vid)) = row_params.get(vid_key) {
                *vid
            } else {
                continue;
            };

            if let Some(matching_sub_rows) = sub_index.get(&input_vid) {
                let input_row_arrays = slice_row(batch, *row_idx);

                for sub_row in matching_sub_rows {
                    append_cross_join_row(
                        &mut column_arrays,
                        &input_row_arrays,
                        sub_row,
                        output_schema,
                        num_input_cols,
                    )?;
                }
            }
        }

        let result = concat_column_arrays(&column_arrays, output_schema);

        let apply_elapsed = apply_start.elapsed();
        tracing::debug!(
            "run_apply: completed (batched) in {:?}, 1 subplan execution",
            apply_elapsed
        );

        return result;
    }

    // Per-row path: execute the subplan once per surviving input row.
    let input_schema = input_batches[0].schema();
    let num_input_cols = input_schema.fields().len();
    let num_output_cols = output_schema.fields().len();
    let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];

    let mut total_subplan_time = std::time::Duration::ZERO;
    let mut subplan_executions = 0;

    // Memoizes subplan results per distinct row-parameter set.
    // NOTE(review): keyed by the 64-bit hash alone — a hash collision would
    // silently reuse the wrong rows; consider a full-equality key.
    let mut subplan_cache: HashMap<u64, Vec<HashMap<String, Value>>> = HashMap::new();
    let mut cache_hits = 0;

    for (batch, row_idx, row_params) in &filtered_entries {
        // Procedure calls receive the row as "outer values"; plain subqueries
        // get the row merged into the parameter map (row values win).
        // NOTE(review): `is_proc_call` from above could be reused here instead
        // of recomputing per row.
        let (sub_params, sub_outer_values) = if is_procedure_call(subquery_plan) {
            (params.clone(), row_params.clone())
        } else {
            let mut merged = params.clone();
            merged.extend(row_params.clone());
            (merged, HashMap::new())
        };

        let params_hash = hash_row_params(row_params);
        let sub_rows = if let Some(cached_rows) = subplan_cache.get(&params_hash) {
            cache_hits += 1;
            tracing::debug!(
                "run_apply: cache hit for params hash {}, skipping execute_subplan",
                params_hash
            );
            cached_rows.clone()
        } else {
            let subplan_start = std::time::Instant::now();
            let sub_batches = execute_subplan(
                subquery_plan,
                &sub_params,
                &sub_outer_values,
                graph_ctx,
                session_ctx,
                storage,
                schema_info,
            )
            .await?;
            let subplan_elapsed = subplan_start.elapsed();
            total_subplan_time += subplan_elapsed;
            subplan_executions += 1;

            tracing::debug!(
                "run_apply: execute_subplan #{} took {:?}",
                subplan_executions,
                subplan_elapsed
            );

            let rows = batches_to_row_maps(&sub_batches);
            subplan_cache.insert(params_hash, rows.clone());
            rows
        };

        let input_row_arrays = slice_row(batch, *row_idx);

        // Inner-join semantics: input rows with an empty subquery result are
        // dropped from the output.
        if sub_rows.is_empty() {
            continue;
        }

        for sub_row in &sub_rows {
            append_cross_join_row(
                &mut column_arrays,
                &input_row_arrays,
                sub_row,
                output_schema,
                num_input_cols,
            )?;
        }
    }

    let result = concat_column_arrays(&column_arrays, output_schema);

    let apply_elapsed = apply_start.elapsed();
    tracing::debug!(
        "run_apply: completed in {:?}, {} subplan executions, {} cache hits, {:?} total subplan time",
        apply_elapsed,
        subplan_executions,
        cache_hits,
        total_subplan_time
    );

    result
}
802
803fn single_row_array<B, T>(mut builder: B, val: Option<T>) -> ArrayRef
805where
806 B: PrimitiveAppend<T>,
807{
808 match val {
809 Some(v) => builder.append_typed_value(v),
810 None => builder.append_typed_null(),
811 }
812 builder.finish_to_array()
813}
814
815fn value_to_single_row_array(val: &Value, data_type: &DataType) -> DFResult<ArrayRef> {
817 Ok(match data_type {
818 DataType::UInt64 => single_row_array(
819 UInt64Builder::with_capacity(1),
820 val.as_u64().or_else(|| val.as_i64().map(|v| v as u64)),
821 ),
822 DataType::Int64 => single_row_array(Int64Builder::with_capacity(1), val.as_i64()),
823 DataType::Int32 => single_row_array(
824 Int32Builder::with_capacity(1),
825 val.as_i64().map(|v| v as i32),
826 ),
827 DataType::Float64 => single_row_array(Float64Builder::with_capacity(1), val.as_f64()),
828 DataType::Boolean => single_row_array(BooleanBuilder::with_capacity(1), val.as_bool()),
829 DataType::Null => Arc::new(arrow_array::NullArray::new(1)) as ArrayRef,
830 _ => {
831 let mut b = StringBuilder::with_capacity(1, 64);
832 match val {
833 Value::Null => b.append_null(),
834 Value::String(s) => b.append_value(s),
835 other => b.append_value(format!("{other}")),
836 }
837 Arc::new(b.finish()) as ArrayRef
838 }
839 })
840}
841
842fn append_cross_join_row(
847 column_arrays: &mut [Vec<ArrayRef>],
848 input_row_arrays: &[ArrayRef],
849 sub_row: &HashMap<String, Value>,
850 output_schema: &SchemaRef,
851 num_input_cols: usize,
852) -> DFResult<()> {
853 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
855 column_arrays[col_idx].push(arr.clone());
856 }
857
858 let num_output_cols = output_schema.fields().len();
860 for (col_arr, field) in column_arrays[num_input_cols..num_output_cols]
861 .iter_mut()
862 .zip(output_schema.fields()[num_input_cols..num_output_cols].iter())
863 {
864 let col_name = field.name();
865 let val = sub_row.get(col_name).cloned().unwrap_or(Value::Null);
866 let arr = value_to_single_row_array(&val, field.data_type())?;
867 col_arr.push(arr);
868 }
869 Ok(())
870}
871
872fn concat_column_arrays(
876 column_arrays: &[Vec<ArrayRef>],
877 output_schema: &SchemaRef,
878) -> DFResult<RecordBatch> {
879 if column_arrays[0].is_empty() {
880 return Ok(RecordBatch::new_empty(output_schema.clone()));
881 }
882
883 let mut final_columns: Vec<ArrayRef> = Vec::with_capacity(column_arrays.len());
884 for arrays in column_arrays {
885 let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(|a| a.as_ref()).collect();
886 let concatenated = arrow::compute::concat(&refs).map_err(arrow_err)?;
887 final_columns.push(concatenated);
888 }
889
890 RecordBatch::try_new(output_schema.clone(), final_columns).map_err(arrow_err)
891}
892
/// Lifecycle of the one-shot apply stream.
enum ApplyStreamState {
    /// The `run_apply` future is still in flight.
    Running(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// The single batch (or error) has already been emitted.
    Done,
}
904
/// `RecordBatchStream` adapter that yields exactly one batch produced by the
/// apply future, then ends.
struct ApplyStream {
    // Current position in the Running -> Done lifecycle.
    state: ApplyStreamState,
    // Output schema reported to DataFusion.
    schema: SchemaRef,
    // Records emitted row counts for the operator's metrics.
    metrics: BaselineMetrics,
}
911
impl Stream for ApplyStream {
    type Item = DFResult<RecordBatch>;

    /// Polls the inner future once to completion; after the single result
    /// (batch or error) is emitted, every subsequent poll yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        match &mut self.state {
            ApplyStreamState::Running(fut) => match fut.as_mut().poll(cx) {
                Poll::Ready(Ok(batch)) => {
                    // Count output rows before handing the batch downstream.
                    self.metrics.record_output(batch.num_rows());
                    self.state = ApplyStreamState::Done;
                    Poll::Ready(Some(Ok(batch)))
                }
                Poll::Ready(Err(e)) => {
                    // Surface the error once, then terminate the stream.
                    self.state = ApplyStreamState::Done;
                    Poll::Ready(Some(Err(e)))
                }
                Poll::Pending => Poll::Pending,
            },
            ApplyStreamState::Done => Poll::Ready(None),
        }
    }
}
933
934impl RecordBatchStream for ApplyStream {
935 fn schema(&self) -> SchemaRef {
936 self.schema.clone()
937 }
938}