1use crate::query::df_graph::common::{
22 arrow_err, collect_all_partitions, compute_plan_properties, execute_subplan, extract_row_params,
23};
24use crate::query::df_graph::{GraphExecutionContext, MutationContext};
25use crate::query::planner::LogicalPlan;
26use arrow_array::builder::{
27 BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder, UInt64Builder,
28};
29use arrow_array::{ArrayRef, RecordBatch};
30use arrow_schema::{DataType, SchemaRef};
31use datafusion::common::Result as DFResult;
32use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
35use datafusion::prelude::SessionContext;
36use futures::Stream;
37use parking_lot::RwLock;
38use std::any::Any;
39use std::collections::HashMap;
40use std::collections::hash_map::DefaultHasher;
41use std::fmt;
42use std::hash::{Hash, Hasher};
43use std::pin::Pin;
44use std::sync::Arc;
45use std::task::{Context, Poll};
46use uni_common::Value;
47use uni_common::core::schema::Schema as UniSchema;
48use uni_cypher::ast::{Expr, UnaryOp};
49use uni_store::storage::manager::StorageManager;
50
51pub struct GraphApplyExec {
58 input_exec: Arc<dyn ExecutionPlan>,
61
62 subquery_plan: LogicalPlan,
64
65 input_filter: Option<Expr>,
67
68 graph_ctx: Arc<GraphExecutionContext>,
70
71 session_ctx: Arc<RwLock<SessionContext>>,
73
74 storage: Arc<StorageManager>,
76
77 schema_info: Arc<UniSchema>,
79
80 params: HashMap<String, Value>,
82
83 output_schema: SchemaRef,
86
87 kept_input_indices: Arc<[usize]>,
93
94 kept_input_overrides: Arc<[Option<(String, String)>]>,
100
101 properties: Arc<PlanProperties>,
103
104 mutation_ctx: Option<Arc<MutationContext>>,
108
109 metrics: ExecutionPlanMetricsSet,
111}
112
113impl fmt::Debug for GraphApplyExec {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 f.debug_struct("GraphApplyExec")
116 .field("has_input_filter", &self.input_filter.is_some())
117 .finish()
118 }
119}
120
121impl GraphApplyExec {
122 #[expect(clippy::too_many_arguments)]
124 pub fn new(
125 input_exec: Arc<dyn ExecutionPlan>,
126 subquery_plan: LogicalPlan,
127 input_filter: Option<Expr>,
128 graph_ctx: Arc<GraphExecutionContext>,
129 session_ctx: Arc<RwLock<SessionContext>>,
130 storage: Arc<StorageManager>,
131 schema_info: Arc<UniSchema>,
132 params: HashMap<String, Value>,
133 output_schema: SchemaRef,
134 kept_input_indices: Vec<usize>,
135 kept_input_overrides: Vec<Option<(String, String)>>,
136 mutation_ctx: Option<Arc<MutationContext>>,
137 ) -> Self {
138 let properties = compute_plan_properties(output_schema.clone());
139
140 Self {
141 input_exec,
142 subquery_plan,
143 input_filter,
144 graph_ctx,
145 session_ctx,
146 storage,
147 schema_info,
148 params,
149 output_schema,
150 kept_input_indices: kept_input_indices.into(),
151 kept_input_overrides: kept_input_overrides.into(),
152 properties,
153 mutation_ctx,
154 metrics: ExecutionPlanMetricsSet::new(),
155 }
156 }
157}
158
159impl DisplayAs for GraphApplyExec {
160 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
161 write!(
162 f,
163 "GraphApplyExec: filter={}",
164 if self.input_filter.is_some() {
165 "yes"
166 } else {
167 "none"
168 }
169 )
170 }
171}
172
173impl ExecutionPlan for GraphApplyExec {
174 fn name(&self) -> &str {
175 "GraphApplyExec"
176 }
177
178 fn as_any(&self) -> &dyn Any {
179 self
180 }
181
182 fn schema(&self) -> SchemaRef {
183 self.output_schema.clone()
184 }
185
186 fn properties(&self) -> &Arc<PlanProperties> {
187 &self.properties
188 }
189
190 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
191 vec![]
193 }
194
195 fn with_new_children(
196 self: Arc<Self>,
197 children: Vec<Arc<dyn ExecutionPlan>>,
198 ) -> DFResult<Arc<dyn ExecutionPlan>> {
199 if !children.is_empty() {
200 return Err(datafusion::error::DataFusionError::Plan(
201 "GraphApplyExec has no children".to_string(),
202 ));
203 }
204 Ok(self)
205 }
206
207 fn execute(
208 &self,
209 partition: usize,
210 _context: Arc<TaskContext>,
211 ) -> DFResult<SendableRecordBatchStream> {
212 let metrics = BaselineMetrics::new(&self.metrics, partition);
213
214 let input_exec = self.input_exec.clone();
215 let subquery_plan = self.subquery_plan.clone();
216 let input_filter = self.input_filter.clone();
217 let graph_ctx = self.graph_ctx.clone();
218 let session_ctx = self.session_ctx.clone();
219 let storage = self.storage.clone();
220 let schema_info = self.schema_info.clone();
221 let params = self.params.clone();
222 let output_schema = self.output_schema.clone();
223 let kept_input_indices = self.kept_input_indices.clone();
224 let kept_input_overrides = self.kept_input_overrides.clone();
225 let mutation_ctx = self.mutation_ctx.clone();
226
227 let fut = async move {
228 run_apply(
229 input_exec,
230 &subquery_plan,
231 input_filter.as_ref(),
232 &graph_ctx,
233 &session_ctx,
234 &storage,
235 &schema_info,
236 ¶ms,
237 &output_schema,
238 &kept_input_indices,
239 &kept_input_overrides,
240 mutation_ctx.as_ref(),
241 )
242 .await
243 };
244
245 Ok(Box::pin(ApplyStream {
246 state: ApplyStreamState::Running(Box::pin(fut)),
247 schema: self.output_schema.clone(),
248 metrics,
249 }))
250 }
251
252 fn metrics(&self) -> Option<MetricsSet> {
253 Some(self.metrics.clone_inner())
254 }
255}
256
257fn batches_to_row_maps(batches: &[RecordBatch]) -> Vec<HashMap<String, Value>> {
263 batches
264 .iter()
265 .flat_map(|batch| {
266 (0..batch.num_rows()).map(move |row_idx| extract_row_params(batch, row_idx))
267 })
268 .collect()
269}
270
271fn evaluate_filter(filter: &Expr, row: &HashMap<String, Value>) -> bool {
276 match filter {
277 Expr::BinaryOp { left, op, right } => {
278 use uni_cypher::ast::BinaryOp;
279 match op {
280 BinaryOp::And => evaluate_filter(left, row) && evaluate_filter(right, row),
281 BinaryOp::Or => evaluate_filter(left, row) || evaluate_filter(right, row),
282 _ => {
283 let left_val = resolve_expr_value(left, row);
284 let right_val = resolve_expr_value(right, row);
285 evaluate_comparison(op, &left_val, &right_val)
286 }
287 }
288 }
289 Expr::UnaryOp {
290 op: UnaryOp::Not,
291 expr,
292 } => !evaluate_filter(expr, row),
293 _ => {
294 let val = resolve_expr_value(filter, row);
296 val.as_bool().unwrap_or(false)
297 }
298 }
299}
300
301fn resolve_expr_value(expr: &Expr, row: &HashMap<String, Value>) -> Value {
303 match expr {
304 Expr::Literal(lit) => lit.to_value(),
305 Expr::Variable(name) => row.get(name).cloned().unwrap_or(Value::Null),
306 Expr::Property(base_expr, key) => {
307 if let Expr::Variable(var) = base_expr.as_ref() {
308 let col_name = format!("{}.{}", var, key);
310 row.get(&col_name).cloned().unwrap_or(Value::Null)
311 } else {
312 Value::Null
313 }
314 }
315 _ => Value::Null,
316 }
317}
318
319fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
321 match (a, b) {
322 (Value::Int(a), Value::Int(b)) => Some(a.cmp(b)),
323 (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
324 (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
325 (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)),
326 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
327 _ => None,
328 }
329}
330
331fn evaluate_comparison(op: &uni_cypher::ast::BinaryOp, left: &Value, right: &Value) -> bool {
336 use std::cmp::Ordering;
337 use uni_cypher::ast::BinaryOp;
338
339 match op {
340 BinaryOp::Eq => left == right,
341 BinaryOp::NotEq => left != right,
342 BinaryOp::Lt => compare_values(left, right) == Some(Ordering::Less),
343 BinaryOp::LtEq => matches!(
344 compare_values(left, right),
345 Some(Ordering::Less | Ordering::Equal)
346 ),
347 BinaryOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
348 BinaryOp::GtEq => matches!(
349 compare_values(left, right),
350 Some(Ordering::Greater | Ordering::Equal)
351 ),
352 _ => false,
353 }
354}
355
356fn build_column<B, T>(
361 rows: &[HashMap<String, Value>],
362 col_name: &str,
363 mut builder: B,
364 extract: impl Fn(&Value) -> Option<T>,
365) -> ArrayRef
366where
367 B: arrow_array::builder::ArrayBuilder,
368 B: PrimitiveAppend<T>,
369{
370 for row in rows {
371 match row.get(col_name).and_then(&extract) {
372 Some(v) => builder.append_typed_value(v),
373 None => builder.append_typed_null(),
374 }
375 }
376 Arc::new(builder.finish_to_array())
377}
378
379trait PrimitiveAppend<T> {
384 fn append_typed_value(&mut self, val: T);
385 fn append_typed_null(&mut self);
386 fn finish_to_array(self) -> ArrayRef;
387}
388
389macro_rules! impl_primitive_append {
390 ($builder:ty, $native:ty, $array:ty) => {
391 impl PrimitiveAppend<$native> for $builder {
392 fn append_typed_value(&mut self, val: $native) {
393 self.append_value(val);
394 }
395 fn append_typed_null(&mut self) {
396 self.append_null();
397 }
398 fn finish_to_array(mut self) -> ArrayRef {
399 Arc::new(self.finish()) as ArrayRef
400 }
401 }
402 };
403}
404
405impl_primitive_append!(UInt64Builder, u64, arrow_array::UInt64Array);
406impl_primitive_append!(Int64Builder, i64, arrow_array::Int64Array);
407impl_primitive_append!(Int32Builder, i32, arrow_array::Int32Array);
408impl_primitive_append!(Float64Builder, f64, arrow_array::Float64Array);
409impl_primitive_append!(BooleanBuilder, bool, arrow_array::BooleanArray);
410
411fn rows_to_batch(rows: &[HashMap<String, Value>], schema: &SchemaRef) -> DFResult<RecordBatch> {
413 if rows.is_empty() {
414 return Ok(RecordBatch::new_empty(schema.clone()));
415 }
416
417 let num_rows = rows.len();
418 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
419
420 for field in schema.fields() {
421 let col_name = field.name();
422 let col = match field.data_type() {
423 DataType::UInt64 => build_column(
424 rows,
425 col_name,
426 UInt64Builder::with_capacity(num_rows),
427 |v| v.as_u64().or_else(|| v.as_i64().map(|i| i as u64)),
428 ),
429 DataType::Int64 => build_column(
430 rows,
431 col_name,
432 Int64Builder::with_capacity(num_rows),
433 Value::as_i64,
434 ),
435 DataType::Int32 => {
436 build_column(rows, col_name, Int32Builder::with_capacity(num_rows), |v| {
437 v.as_i64().map(|i| i as i32)
438 })
439 }
440 DataType::Float64 => build_column(
441 rows,
442 col_name,
443 Float64Builder::with_capacity(num_rows),
444 Value::as_f64,
445 ),
446 DataType::Boolean => build_column(
447 rows,
448 col_name,
449 BooleanBuilder::with_capacity(num_rows),
450 Value::as_bool,
451 ),
452 DataType::LargeBinary => {
453 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(
454 num_rows,
455 num_rows * 64,
456 );
457 for row in rows {
458 match row.get(col_name) {
459 Some(val) if !val.is_null() => {
460 let cv_bytes = uni_common::cypher_value_codec::encode(val);
461 builder.append_value(&cv_bytes);
462 }
463 _ => builder.append_null(),
464 }
465 }
466 Arc::new(builder.finish()) as ArrayRef
467 }
468 DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
469 let mut builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
470 for row in rows {
471 match row.get(col_name) {
472 Some(Value::List(items)) => {
473 for item in items {
474 match item {
475 Value::String(s) => builder.values().append_value(s),
476 Value::Null => builder.values().append_null(),
477 other => builder.values().append_value(format!("{other}")),
478 }
479 }
480 builder.append(true);
481 }
482 _ => builder.append_null(),
483 }
484 }
485 Arc::new(builder.finish()) as ArrayRef
486 }
487 DataType::Null => Arc::new(arrow_array::NullArray::new(num_rows)) as ArrayRef,
488 _ => {
490 let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
491 for row in rows {
492 match row.get(col_name) {
493 Some(Value::Null) | None => builder.append_null(),
494 Some(Value::String(s)) => builder.append_value(s),
495 Some(other) => builder.append_value(format!("{other}")),
496 }
497 }
498 Arc::new(builder.finish()) as ArrayRef
499 }
500 };
501 columns.push(col);
502 }
503
504 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
505}
506
507fn slice_kept_row(batch: &RecordBatch, row_idx: usize, kept: &[usize]) -> Vec<ArrayRef> {
510 kept.iter()
511 .map(|&i| batch.column(i).slice(row_idx, 1))
512 .collect()
513}
514
515fn is_procedure_call(plan: &LogicalPlan) -> bool {
518 match plan {
519 LogicalPlan::ProcedureCall { .. } => true,
520 LogicalPlan::Project { input, .. }
521 | LogicalPlan::Filter { input, .. }
522 | LogicalPlan::Sort { input, .. }
523 | LogicalPlan::Limit { input, .. }
524 | LogicalPlan::Distinct { input } => is_procedure_call(input),
525 _ => false,
526 }
527}
528
529fn plan_contains_writes(plan: &LogicalPlan) -> bool {
534 use crate::query::planner::LogicalPlan as LP;
535 match plan {
536 LP::Create { .. }
537 | LP::CreateBatch { .. }
538 | LP::Merge { .. }
539 | LP::Delete { .. }
540 | LP::Set { .. }
541 | LP::Remove { .. }
542 | LP::Foreach { .. } => true,
543 LP::Project { input, .. }
544 | LP::Filter { input, .. }
545 | LP::Sort { input, .. }
546 | LP::Limit { input, .. }
547 | LP::Distinct { input }
548 | LP::Unwind { input, .. }
549 | LP::Aggregate { input, .. } => plan_contains_writes(input),
550 LP::Apply {
551 input, subquery, ..
552 }
553 | LP::SubqueryCall { input, subquery } => {
554 plan_contains_writes(input) || plan_contains_writes(subquery)
555 }
556 _ => false,
557 }
558}
559
560fn hash_row_params(params: &HashMap<String, Value>) -> u64 {
564 let mut hasher = DefaultHasher::new();
565 let mut entries: Vec<_> = params.iter().collect();
566 entries.sort_unstable_by_key(|(k, _)| *k);
567 for (key, val) in entries {
568 key.hash(&mut hasher);
569 format!("{val:?}").hash(&mut hasher);
570 }
571 hasher.finish()
572}
573
574fn is_batch_eligible(filtered_entries: &[(&RecordBatch, usize, HashMap<String, Value>)]) -> bool {
579 if filtered_entries.len() < 2 {
580 return false;
581 }
582
583 filtered_entries
585 .iter()
586 .any(|(_, _, row_params)| row_params.keys().any(|k| k.ends_with("._vid")))
587}
588
589#[expect(clippy::too_many_arguments)]
594async fn run_apply(
595 input_exec: Arc<dyn ExecutionPlan>,
596 subquery_plan: &LogicalPlan,
597 input_filter: Option<&Expr>,
598 graph_ctx: &Arc<GraphExecutionContext>,
599 session_ctx: &Arc<RwLock<SessionContext>>,
600 storage: &Arc<StorageManager>,
601 schema_info: &Arc<UniSchema>,
602 params: &HashMap<String, Value>,
603 output_schema: &SchemaRef,
604 kept_input_indices: &[usize],
605 kept_input_overrides: &[Option<(String, String)>],
606 mutation_ctx: Option<&Arc<MutationContext>>,
607) -> DFResult<RecordBatch> {
608 let apply_start = std::time::Instant::now();
609 let is_proc_call = is_procedure_call(subquery_plan);
610 tracing::debug!("run_apply: is_procedure_call={}", is_proc_call);
611
612 let task_ctx = session_ctx.read().task_ctx();
614 let input_batches = collect_all_partitions(&input_exec, task_ctx).await?;
615
616 let mut filtered_entries: Vec<(&RecordBatch, usize, HashMap<String, Value>)> = Vec::new();
619 for batch in &input_batches {
620 for row_idx in 0..batch.num_rows() {
621 let row_params = extract_row_params(batch, row_idx);
622 if let Some(filter) = input_filter
623 && !evaluate_filter(filter, &row_params)
624 {
625 continue;
626 }
627 filtered_entries.push((batch, row_idx, row_params));
628 }
629 }
630
631 tracing::debug!(
632 "run_apply: filtered_entries count = {}",
633 filtered_entries.len()
634 );
635
636 let subquery_has_writes = plan_contains_writes(subquery_plan);
637
638 let is_unit_subquery = output_schema.fields().len() == kept_input_indices.len();
647 if filtered_entries.is_empty() {
648 if is_unit_subquery || subquery_has_writes {
649 return Ok(RecordBatch::new_empty(output_schema.clone()));
650 }
651 let sub_batches = execute_subplan(
652 subquery_plan,
653 params,
654 &HashMap::new(), graph_ctx,
656 session_ctx,
657 storage,
658 schema_info,
659 mutation_ctx,
660 )
661 .await?;
662 let sub_rows = batches_to_row_maps(&sub_batches);
663 return rows_to_batch(&sub_rows, output_schema);
664 }
665
666 let has_filter = input_filter.is_some();
673
674 if is_batch_eligible(&filtered_entries) && !is_proc_call && has_filter && !subquery_has_writes {
675 tracing::debug!("run_apply: batching eligible, attempting batch execution");
676
677 let mut vid_values: HashMap<String, Vec<Value>> = HashMap::new();
679 for (_, _, row_params) in &filtered_entries {
680 for (key, value) in row_params {
681 if key.ends_with("._vid") {
682 vid_values
683 .entry(key.clone())
684 .or_default()
685 .push(value.clone());
686 }
687 }
688 }
689
690 let mut batched_params = params.clone();
692 for (key, values) in &vid_values {
693 batched_params.insert(key.clone(), Value::List(values.clone()));
694 }
695
696 if let Some((_, _, first_row_params)) = filtered_entries.first() {
699 for (key, value) in first_row_params {
700 if !key.ends_with("._vid") {
701 batched_params
702 .entry(key.clone())
703 .or_insert_with(|| value.clone());
704 }
705 }
706 }
707
708 let subplan_start = std::time::Instant::now();
710 let sub_batches = execute_subplan(
711 subquery_plan,
712 &batched_params,
713 &HashMap::new(),
714 graph_ctx,
715 session_ctx,
716 storage,
717 schema_info,
718 mutation_ctx,
719 )
720 .await?;
721 let subplan_elapsed = subplan_start.elapsed();
722 tracing::debug!(
723 "run_apply: batch execute_subplan took {:?}",
724 subplan_elapsed
725 );
726
727 let sub_rows = batches_to_row_maps(&sub_batches);
729 let mut sub_index: HashMap<i64, Vec<&HashMap<String, Value>>> = HashMap::new();
730
731 let vid_key = vid_values.keys().next().expect("at least one VID key");
733
734 for sub_row in &sub_rows {
735 if let Some(Value::Int(vid)) = sub_row.get(vid_key) {
736 sub_index.entry(*vid).or_default().push(sub_row);
737 }
738 }
739
740 let num_input_cols = kept_input_indices.len();
744 let num_output_cols = output_schema.fields().len();
745 let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
746
747 for (batch, row_idx, row_params) in &filtered_entries {
748 let input_vid = if let Some(Value::Int(vid)) = row_params.get(vid_key) {
750 *vid
751 } else {
752 continue; };
754
755 let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
756
757 if let Some(matching_sub_rows) = sub_index.get(&input_vid) {
759 for sub_row in matching_sub_rows {
760 append_cross_join_row(
761 &mut column_arrays,
762 &input_row_arrays,
763 sub_row,
764 output_schema,
765 num_input_cols,
766 kept_input_overrides,
767 is_unit_subquery,
768 )?;
769 }
770 } else if is_unit_subquery {
771 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
774 column_arrays[col_idx].push(arr.clone());
775 }
776 }
777 }
779
780 let result = concat_column_arrays(&column_arrays, output_schema);
781
782 let apply_elapsed = apply_start.elapsed();
783 tracing::debug!(
784 "run_apply: completed (batched) in {:?}, 1 subplan execution",
785 apply_elapsed
786 );
787
788 return result;
789 }
790
791 let num_input_cols = kept_input_indices.len();
798 let num_output_cols = output_schema.fields().len();
799 let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
801
802 let mut total_subplan_time = std::time::Duration::ZERO;
803 let mut subplan_executions = 0;
804
805 let mut subplan_cache: HashMap<u64, Vec<HashMap<String, Value>>> = HashMap::new();
807 let mut cache_hits = 0;
808
809 for (batch, row_idx, row_params) in &filtered_entries {
810 let (sub_params, sub_outer_values) = if is_procedure_call(subquery_plan) {
814 (params.clone(), row_params.clone())
816 } else {
817 let mut merged = params.clone();
819 merged.extend(row_params.clone());
820 (merged, HashMap::new())
821 };
822
823 let params_hash = hash_row_params(row_params);
825 let sub_rows = if let Some(cached_rows) = subplan_cache.get(¶ms_hash) {
826 cache_hits += 1;
828 tracing::debug!(
829 "run_apply: cache hit for params hash {}, skipping execute_subplan",
830 params_hash
831 );
832 cached_rows.clone()
833 } else {
834 let subplan_start = std::time::Instant::now();
836 let sub_batches = execute_subplan(
837 subquery_plan,
838 &sub_params,
839 &sub_outer_values,
840 graph_ctx,
841 session_ctx,
842 storage,
843 schema_info,
844 mutation_ctx,
845 )
846 .await?;
847 let subplan_elapsed = subplan_start.elapsed();
848 total_subplan_time += subplan_elapsed;
849 subplan_executions += 1;
850
851 tracing::debug!(
852 "run_apply: execute_subplan #{} took {:?}",
853 subplan_executions,
854 subplan_elapsed
855 );
856
857 let rows = batches_to_row_maps(&sub_batches);
858 subplan_cache.insert(params_hash, rows.clone());
859 rows
860 };
861
862 let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
863
864 if sub_rows.is_empty() {
865 if is_unit_subquery {
866 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
869 column_arrays[col_idx].push(arr.clone());
870 }
871 }
872 continue;
874 }
875
876 for sub_row in &sub_rows {
877 append_cross_join_row(
878 &mut column_arrays,
879 &input_row_arrays,
880 sub_row,
881 output_schema,
882 num_input_cols,
883 kept_input_overrides,
884 is_unit_subquery,
885 )?;
886 }
887 }
888
889 let result = concat_column_arrays(&column_arrays, output_schema);
891
892 let apply_elapsed = apply_start.elapsed();
893 tracing::debug!(
894 "run_apply: completed in {:?}, {} subplan executions, {} cache hits, {:?} total subplan time",
895 apply_elapsed,
896 subplan_executions,
897 cache_hits,
898 total_subplan_time
899 );
900
901 result
902}
903
904fn single_row_array<B, T>(mut builder: B, val: Option<T>) -> ArrayRef
906where
907 B: PrimitiveAppend<T>,
908{
909 match val {
910 Some(v) => builder.append_typed_value(v),
911 None => builder.append_typed_null(),
912 }
913 builder.finish_to_array()
914}
915
916fn value_to_single_row_array(val: &Value, data_type: &DataType) -> DFResult<ArrayRef> {
918 Ok(match data_type {
919 DataType::UInt64 => single_row_array(
920 UInt64Builder::with_capacity(1),
921 val.as_u64().or_else(|| val.as_i64().map(|v| v as u64)),
922 ),
923 DataType::Int64 => single_row_array(Int64Builder::with_capacity(1), val.as_i64()),
924 DataType::Int32 => single_row_array(
925 Int32Builder::with_capacity(1),
926 val.as_i64().map(|v| v as i32),
927 ),
928 DataType::Float64 => single_row_array(Float64Builder::with_capacity(1), val.as_f64()),
929 DataType::Boolean => single_row_array(BooleanBuilder::with_capacity(1), val.as_bool()),
930 DataType::Null => Arc::new(arrow_array::NullArray::new(1)) as ArrayRef,
931 DataType::LargeBinary => {
932 let mut b = arrow_array::builder::LargeBinaryBuilder::with_capacity(1, 64);
933 if val.is_null() {
934 b.append_null();
935 } else {
936 let cv_bytes = uni_common::cypher_value_codec::encode(val);
937 b.append_value(&cv_bytes);
938 }
939 Arc::new(b.finish()) as ArrayRef
940 }
941 DataType::Utf8 => {
942 let mut b = StringBuilder::with_capacity(1, 64);
943 match val {
944 Value::Null => b.append_null(),
945 Value::String(s) => b.append_value(s),
946 other => b.append_value(format!("{other}")),
947 }
948 Arc::new(b.finish()) as ArrayRef
949 }
950 DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
951 let mut b = arrow_array::builder::ListBuilder::new(StringBuilder::new());
952 match val {
953 Value::List(items) => {
954 for item in items {
955 match item {
956 Value::String(s) => b.values().append_value(s),
957 Value::Null => b.values().append_null(),
958 other => b.values().append_value(format!("{other}")),
959 }
960 }
961 b.append(true);
962 }
963 Value::Null => b.append_null(),
964 other => {
965 b.values().append_value(format!("{other}"));
966 b.append(true);
967 }
968 }
969 Arc::new(b.finish()) as ArrayRef
970 }
971 DataType::Struct(fields) => {
972 let map_view: Option<&HashMap<String, Value>> = match val {
980 Value::Map(m) => Some(m),
981 Value::Node(n) => Some(&n.properties),
982 Value::Edge(e) => Some(&e.properties),
983 _ => None,
984 };
985 let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(fields.len());
986 for child_field in fields.iter() {
987 let child_val = map_view
988 .and_then(|m| m.get(child_field.name()))
989 .cloned()
990 .unwrap_or(Value::Null);
991 child_arrays.push(value_to_single_row_array(
992 &child_val,
993 child_field.data_type(),
994 )?);
995 }
996 let pairs: Vec<(Arc<arrow_schema::Field>, ArrayRef)> =
997 fields.iter().cloned().zip(child_arrays).collect();
998 Arc::new(arrow_array::StructArray::from(pairs)) as ArrayRef
999 }
1000 _ => {
1001 debug_assert!(
1002 false,
1003 "value_to_single_row_array: unhandled DataType {:?} — mirror the arm in rows_to_batch",
1004 data_type
1005 );
1006 let mut b = StringBuilder::with_capacity(1, 64);
1007 match val {
1008 Value::Null => b.append_null(),
1009 Value::String(s) => b.append_value(s),
1010 other => b.append_value(format!("{other}")),
1011 }
1012 Arc::new(b.finish()) as ArrayRef
1013 }
1014 })
1015}
1016
1017fn append_cross_join_row(
1033 column_arrays: &mut [Vec<ArrayRef>],
1034 input_row_arrays: &[ArrayRef],
1035 sub_row: &HashMap<String, Value>,
1036 output_schema: &SchemaRef,
1037 num_input_cols: usize,
1038 kept_input_overrides: &[Option<(String, String)>],
1039 is_unit_subquery: bool,
1040) -> DFResult<()> {
1041 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
1044 if let Some(Some((var, prop))) = kept_input_overrides.get(col_idx) {
1045 let extracted = match sub_row.get(var) {
1046 Some(Value::Map(m)) => m.get(prop).cloned().unwrap_or(Value::Null),
1047 Some(Value::Node(n)) => n.properties.get(prop).cloned().unwrap_or(Value::Null),
1048 Some(Value::Edge(e)) => e.properties.get(prop).cloned().unwrap_or(Value::Null),
1049 _ => Value::Null,
1050 };
1051 let field = &output_schema.fields()[col_idx];
1052 let new_arr = value_to_single_row_array(&extracted, field.data_type())?;
1053 column_arrays[col_idx].push(new_arr);
1054 continue;
1055 }
1056 if is_unit_subquery {
1057 let field = &output_schema.fields()[col_idx];
1070 let refreshed: Option<Value> = if let Some(dot) = field.name().find('.') {
1071 let base = &field.name()[..dot];
1072 let prop = &field.name()[dot + 1..];
1073 match sub_row.get(base) {
1074 Some(Value::Map(m)) => m.get(prop).cloned(),
1075 Some(Value::Node(n)) => n.properties.get(prop).cloned(),
1076 Some(Value::Edge(e)) => e.properties.get(prop).cloned(),
1077 _ => None,
1078 }
1079 } else {
1080 sub_row.get(field.name()).cloned()
1081 };
1082 if let Some(val) = refreshed {
1083 let new_arr = value_to_single_row_array(&val, field.data_type())?;
1084 column_arrays[col_idx].push(new_arr);
1085 continue;
1086 }
1087 }
1088 column_arrays[col_idx].push(arr.clone());
1089 }
1090
1091 let num_output_cols = output_schema.fields().len();
1093 for (col_arr, field) in column_arrays[num_input_cols..num_output_cols]
1094 .iter_mut()
1095 .zip(output_schema.fields()[num_input_cols..num_output_cols].iter())
1096 {
1097 let col_name = field.name();
1098 let val = sub_row.get(col_name).cloned().unwrap_or(Value::Null);
1099 let arr = value_to_single_row_array(&val, field.data_type())?;
1100 col_arr.push(arr);
1101 }
1102 Ok(())
1103}
1104
1105fn concat_column_arrays(
1109 column_arrays: &[Vec<ArrayRef>],
1110 output_schema: &SchemaRef,
1111) -> DFResult<RecordBatch> {
1112 if column_arrays[0].is_empty() {
1113 return Ok(RecordBatch::new_empty(output_schema.clone()));
1114 }
1115
1116 let mut final_columns: Vec<ArrayRef> = Vec::with_capacity(column_arrays.len());
1117 for arrays in column_arrays {
1118 let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(|a| a.as_ref()).collect();
1119 let concatenated = arrow::compute::concat(&refs).map_err(arrow_err)?;
1120 final_columns.push(concatenated);
1121 }
1122
1123 RecordBatch::try_new(output_schema.clone(), final_columns).map_err(arrow_err)
1124}
1125
1126enum ApplyStreamState {
1132 Running(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
1134 Done,
1136}
1137
1138struct ApplyStream {
1140 state: ApplyStreamState,
1141 schema: SchemaRef,
1142 metrics: BaselineMetrics,
1143}
1144
1145impl Stream for ApplyStream {
1146 type Item = DFResult<RecordBatch>;
1147
1148 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1149 let metrics = self.metrics.clone();
1150 let _timer = metrics.elapsed_compute().timer();
1151 match &mut self.state {
1152 ApplyStreamState::Running(fut) => match fut.as_mut().poll(cx) {
1153 Poll::Ready(Ok(batch)) => {
1154 self.metrics.record_output(batch.num_rows());
1155 self.state = ApplyStreamState::Done;
1156 Poll::Ready(Some(Ok(batch)))
1157 }
1158 Poll::Ready(Err(e)) => {
1159 self.state = ApplyStreamState::Done;
1160 Poll::Ready(Some(Err(e)))
1161 }
1162 Poll::Pending => Poll::Pending,
1163 },
1164 ApplyStreamState::Done => Poll::Ready(None),
1165 }
1166 }
1167}
1168
1169impl RecordBatchStream for ApplyStream {
1170 fn schema(&self) -> SchemaRef {
1171 self.schema.clone()
1172 }
1173}