1use crate::query::df_graph::common::{
22 arrow_err, collect_all_partitions, compute_plan_properties, execute_subplan, extract_row_params,
23};
24use crate::query::df_graph::{GraphExecutionContext, MutationContext};
25use crate::query::planner::LogicalPlan;
26use arrow_array::builder::{
27 BooleanBuilder, Float64Builder, Int32Builder, Int64Builder, StringBuilder, UInt64Builder,
28};
29use arrow_array::{ArrayRef, RecordBatch};
30use arrow_schema::{DataType, SchemaRef};
31use datafusion::common::Result as DFResult;
32use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
33use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
34use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
35use datafusion::prelude::SessionContext;
36use futures::Stream;
37use parking_lot::RwLock;
38use std::any::Any;
39use std::collections::{BTreeMap, HashMap};
40use std::fmt;
41use std::pin::Pin;
42use std::sync::Arc;
43use std::task::{Context, Poll};
44use uni_common::Value;
45use uni_common::core::schema::Schema as UniSchema;
46use uni_cypher::ast::{Expr, UnaryOp};
47use uni_store::storage::manager::StorageManager;
48
49pub struct GraphApplyExec {
56 input_exec: Arc<dyn ExecutionPlan>,
59
60 subquery_plan: LogicalPlan,
62
63 input_filter: Option<Expr>,
65
66 graph_ctx: Arc<GraphExecutionContext>,
68
69 session_ctx: Arc<RwLock<SessionContext>>,
71
72 storage: Arc<StorageManager>,
74
75 schema_info: Arc<UniSchema>,
77
78 params: HashMap<String, Value>,
80
81 output_schema: SchemaRef,
84
85 kept_input_indices: Arc<[usize]>,
91
92 kept_input_overrides: Arc<[Option<(String, String)>]>,
98
99 properties: Arc<PlanProperties>,
101
102 mutation_ctx: Option<Arc<MutationContext>>,
106
107 metrics: ExecutionPlanMetricsSet,
109}
110
111impl fmt::Debug for GraphApplyExec {
112 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
113 f.debug_struct("GraphApplyExec")
114 .field("has_input_filter", &self.input_filter.is_some())
115 .finish()
116 }
117}
118
119impl GraphApplyExec {
120 #[expect(clippy::too_many_arguments)]
122 pub fn new(
123 input_exec: Arc<dyn ExecutionPlan>,
124 subquery_plan: LogicalPlan,
125 input_filter: Option<Expr>,
126 graph_ctx: Arc<GraphExecutionContext>,
127 session_ctx: Arc<RwLock<SessionContext>>,
128 storage: Arc<StorageManager>,
129 schema_info: Arc<UniSchema>,
130 params: HashMap<String, Value>,
131 output_schema: SchemaRef,
132 kept_input_indices: Vec<usize>,
133 kept_input_overrides: Vec<Option<(String, String)>>,
134 mutation_ctx: Option<Arc<MutationContext>>,
135 ) -> Self {
136 let properties = compute_plan_properties(output_schema.clone());
137
138 Self {
139 input_exec,
140 subquery_plan,
141 input_filter,
142 graph_ctx,
143 session_ctx,
144 storage,
145 schema_info,
146 params,
147 output_schema,
148 kept_input_indices: kept_input_indices.into(),
149 kept_input_overrides: kept_input_overrides.into(),
150 properties,
151 mutation_ctx,
152 metrics: ExecutionPlanMetricsSet::new(),
153 }
154 }
155}
156
157impl DisplayAs for GraphApplyExec {
158 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
159 write!(
160 f,
161 "GraphApplyExec: filter={}",
162 if self.input_filter.is_some() {
163 "yes"
164 } else {
165 "none"
166 }
167 )
168 }
169}
170
171impl ExecutionPlan for GraphApplyExec {
172 fn name(&self) -> &str {
173 "GraphApplyExec"
174 }
175
176 fn as_any(&self) -> &dyn Any {
177 self
178 }
179
180 fn schema(&self) -> SchemaRef {
181 self.output_schema.clone()
182 }
183
184 fn properties(&self) -> &Arc<PlanProperties> {
185 &self.properties
186 }
187
188 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
189 vec![]
191 }
192
193 fn with_new_children(
194 self: Arc<Self>,
195 children: Vec<Arc<dyn ExecutionPlan>>,
196 ) -> DFResult<Arc<dyn ExecutionPlan>> {
197 if !children.is_empty() {
198 return Err(datafusion::error::DataFusionError::Plan(
199 "GraphApplyExec has no children".to_string(),
200 ));
201 }
202 Ok(self)
203 }
204
205 fn execute(
206 &self,
207 partition: usize,
208 _context: Arc<TaskContext>,
209 ) -> DFResult<SendableRecordBatchStream> {
210 let metrics = BaselineMetrics::new(&self.metrics, partition);
211
212 let input_exec = self.input_exec.clone();
213 let subquery_plan = self.subquery_plan.clone();
214 let input_filter = self.input_filter.clone();
215 let graph_ctx = self.graph_ctx.clone();
216 let session_ctx = self.session_ctx.clone();
217 let storage = self.storage.clone();
218 let schema_info = self.schema_info.clone();
219 let params = self.params.clone();
220 let output_schema = self.output_schema.clone();
221 let kept_input_indices = self.kept_input_indices.clone();
222 let kept_input_overrides = self.kept_input_overrides.clone();
223 let mutation_ctx = self.mutation_ctx.clone();
224
225 let fut = async move {
226 run_apply(
227 input_exec,
228 &subquery_plan,
229 input_filter.as_ref(),
230 &graph_ctx,
231 &session_ctx,
232 &storage,
233 &schema_info,
234 ¶ms,
235 &output_schema,
236 &kept_input_indices,
237 &kept_input_overrides,
238 mutation_ctx.as_ref(),
239 )
240 .await
241 };
242
243 Ok(Box::pin(ApplyStream {
244 state: ApplyStreamState::Running(Box::pin(fut)),
245 schema: self.output_schema.clone(),
246 metrics,
247 }))
248 }
249
250 fn metrics(&self) -> Option<MetricsSet> {
251 Some(self.metrics.clone_inner())
252 }
253}
254
255fn batches_to_row_maps(batches: &[RecordBatch]) -> Vec<HashMap<String, Value>> {
261 batches
262 .iter()
263 .flat_map(|batch| {
264 (0..batch.num_rows()).map(move |row_idx| extract_row_params(batch, row_idx))
265 })
266 .collect()
267}
268
269fn evaluate_filter(filter: &Expr, row: &HashMap<String, Value>) -> bool {
274 match filter {
275 Expr::BinaryOp { left, op, right } => {
276 use uni_cypher::ast::BinaryOp;
277 match op {
278 BinaryOp::And => evaluate_filter(left, row) && evaluate_filter(right, row),
279 BinaryOp::Or => evaluate_filter(left, row) || evaluate_filter(right, row),
280 _ => {
281 let left_val = resolve_expr_value(left, row);
282 let right_val = resolve_expr_value(right, row);
283 evaluate_comparison(op, &left_val, &right_val)
284 }
285 }
286 }
287 Expr::UnaryOp {
288 op: UnaryOp::Not,
289 expr,
290 } => !evaluate_filter(expr, row),
291 _ => {
292 let val = resolve_expr_value(filter, row);
294 val.as_bool().unwrap_or(false)
295 }
296 }
297}
298
299fn resolve_expr_value(expr: &Expr, row: &HashMap<String, Value>) -> Value {
301 match expr {
302 Expr::Literal(lit) => lit.to_value(),
303 Expr::Variable(name) => row.get(name).cloned().unwrap_or(Value::Null),
304 Expr::Property(base_expr, key) => {
305 if let Expr::Variable(var) = base_expr.as_ref() {
306 let col_name = format!("{}.{}", var, key);
308 row.get(&col_name).cloned().unwrap_or(Value::Null)
309 } else {
310 Value::Null
311 }
312 }
313 _ => Value::Null,
314 }
315}
316
317fn compare_values(a: &Value, b: &Value) -> Option<std::cmp::Ordering> {
319 match (a, b) {
320 (Value::Int(a), Value::Int(b)) => Some(a.cmp(b)),
321 (Value::Float(a), Value::Float(b)) => a.partial_cmp(b),
322 (Value::Int(a), Value::Float(b)) => (*a as f64).partial_cmp(b),
323 (Value::Float(a), Value::Int(b)) => a.partial_cmp(&(*b as f64)),
324 (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
325 _ => None,
326 }
327}
328
329fn evaluate_comparison(op: &uni_cypher::ast::BinaryOp, left: &Value, right: &Value) -> bool {
334 use std::cmp::Ordering;
335 use uni_cypher::ast::BinaryOp;
336
337 match op {
338 BinaryOp::Eq => left == right,
339 BinaryOp::NotEq => left != right,
340 BinaryOp::Lt => compare_values(left, right) == Some(Ordering::Less),
341 BinaryOp::LtEq => matches!(
342 compare_values(left, right),
343 Some(Ordering::Less | Ordering::Equal)
344 ),
345 BinaryOp::Gt => compare_values(left, right) == Some(Ordering::Greater),
346 BinaryOp::GtEq => matches!(
347 compare_values(left, right),
348 Some(Ordering::Greater | Ordering::Equal)
349 ),
350 _ => false,
351 }
352}
353
354fn build_column<B, T>(
359 rows: &[HashMap<String, Value>],
360 col_name: &str,
361 mut builder: B,
362 extract: impl Fn(&Value) -> Option<T>,
363) -> ArrayRef
364where
365 B: arrow_array::builder::ArrayBuilder,
366 B: PrimitiveAppend<T>,
367{
368 for row in rows {
369 match row.get(col_name).and_then(&extract) {
370 Some(v) => builder.append_typed_value(v),
371 None => builder.append_typed_null(),
372 }
373 }
374 Arc::new(builder.finish_to_array())
375}
376
377trait PrimitiveAppend<T> {
382 fn append_typed_value(&mut self, val: T);
383 fn append_typed_null(&mut self);
384 fn finish_to_array(self) -> ArrayRef;
385}
386
387macro_rules! impl_primitive_append {
388 ($builder:ty, $native:ty, $array:ty) => {
389 impl PrimitiveAppend<$native> for $builder {
390 fn append_typed_value(&mut self, val: $native) {
391 self.append_value(val);
392 }
393 fn append_typed_null(&mut self) {
394 self.append_null();
395 }
396 fn finish_to_array(mut self) -> ArrayRef {
397 Arc::new(self.finish()) as ArrayRef
398 }
399 }
400 };
401}
402
403impl_primitive_append!(UInt64Builder, u64, arrow_array::UInt64Array);
404impl_primitive_append!(Int64Builder, i64, arrow_array::Int64Array);
405impl_primitive_append!(Int32Builder, i32, arrow_array::Int32Array);
406impl_primitive_append!(Float64Builder, f64, arrow_array::Float64Array);
407impl_primitive_append!(BooleanBuilder, bool, arrow_array::BooleanArray);
408
409fn rows_to_batch(rows: &[HashMap<String, Value>], schema: &SchemaRef) -> DFResult<RecordBatch> {
411 if rows.is_empty() {
412 return Ok(RecordBatch::new_empty(schema.clone()));
413 }
414
415 let num_rows = rows.len();
416 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
417
418 for field in schema.fields() {
419 let col_name = field.name();
420 let col = match field.data_type() {
421 DataType::UInt64 => build_column(
422 rows,
423 col_name,
424 UInt64Builder::with_capacity(num_rows),
425 |v| v.as_u64().or_else(|| v.as_i64().map(|i| i as u64)),
426 ),
427 DataType::Int64 => build_column(
428 rows,
429 col_name,
430 Int64Builder::with_capacity(num_rows),
431 Value::as_i64,
432 ),
433 DataType::Int32 => {
434 build_column(rows, col_name, Int32Builder::with_capacity(num_rows), |v| {
435 v.as_i64().map(|i| i as i32)
436 })
437 }
438 DataType::Float64 => build_column(
439 rows,
440 col_name,
441 Float64Builder::with_capacity(num_rows),
442 Value::as_f64,
443 ),
444 DataType::Boolean => build_column(
445 rows,
446 col_name,
447 BooleanBuilder::with_capacity(num_rows),
448 Value::as_bool,
449 ),
450 DataType::LargeBinary => {
451 let mut builder = arrow_array::builder::LargeBinaryBuilder::with_capacity(
452 num_rows,
453 num_rows * 64,
454 );
455 for row in rows {
456 match row.get(col_name) {
457 Some(val) if !val.is_null() => {
458 let cv_bytes = uni_common::cypher_value_codec::encode(val);
459 builder.append_value(&cv_bytes);
460 }
461 _ => builder.append_null(),
462 }
463 }
464 Arc::new(builder.finish()) as ArrayRef
465 }
466 DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
467 let mut builder = arrow_array::builder::ListBuilder::new(StringBuilder::new());
468 for row in rows {
469 match row.get(col_name) {
470 Some(Value::List(items)) => {
471 for item in items {
472 match item {
473 Value::String(s) => builder.values().append_value(s),
474 Value::Null => builder.values().append_null(),
475 other => builder.values().append_value(format!("{other}")),
476 }
477 }
478 builder.append(true);
479 }
480 _ => builder.append_null(),
481 }
482 }
483 Arc::new(builder.finish()) as ArrayRef
484 }
485 DataType::Null => Arc::new(arrow_array::NullArray::new(num_rows)) as ArrayRef,
486 _ => {
488 let mut builder = StringBuilder::with_capacity(num_rows, num_rows * 32);
489 for row in rows {
490 match row.get(col_name) {
491 Some(Value::Null) | None => builder.append_null(),
492 Some(Value::String(s)) => builder.append_value(s),
493 Some(other) => builder.append_value(format!("{other}")),
494 }
495 }
496 Arc::new(builder.finish()) as ArrayRef
497 }
498 };
499 columns.push(col);
500 }
501
502 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
503}
504
505fn slice_kept_row(batch: &RecordBatch, row_idx: usize, kept: &[usize]) -> Vec<ArrayRef> {
508 kept.iter()
509 .map(|&i| batch.column(i).slice(row_idx, 1))
510 .collect()
511}
512
513fn is_procedure_call(plan: &LogicalPlan) -> bool {
516 match plan {
517 LogicalPlan::ProcedureCall { .. } => true,
518 LogicalPlan::Project { input, .. }
519 | LogicalPlan::Filter { input, .. }
520 | LogicalPlan::Sort { input, .. }
521 | LogicalPlan::Limit { input, .. }
522 | LogicalPlan::Distinct { input } => is_procedure_call(input),
523 _ => false,
524 }
525}
526
527fn plan_contains_writes(plan: &LogicalPlan) -> bool {
532 use crate::query::planner::LogicalPlan as LP;
533 match plan {
534 LP::Create { .. }
535 | LP::CreateBatch { .. }
536 | LP::Merge { .. }
537 | LP::Delete { .. }
538 | LP::Set { .. }
539 | LP::Remove { .. }
540 | LP::Foreach { .. } => true,
541 LP::Project { input, .. }
542 | LP::Filter { input, .. }
543 | LP::Sort { input, .. }
544 | LP::Limit { input, .. }
545 | LP::Distinct { input }
546 | LP::Unwind { input, .. }
547 | LP::Aggregate { input, .. } => plan_contains_writes(input),
548 LP::Apply {
549 input, subquery, ..
550 }
551 | LP::SubqueryCall { input, subquery } => {
552 plan_contains_writes(input) || plan_contains_writes(subquery)
553 }
554 _ => false,
555 }
556}
557
558fn canonical_params_key(params: &HashMap<String, Value>) -> BTreeMap<String, Value> {
566 params.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
567}
568
569fn is_batch_eligible(filtered_entries: &[(&RecordBatch, usize, HashMap<String, Value>)]) -> bool {
574 if filtered_entries.len() < 2 {
575 return false;
576 }
577
578 filtered_entries
580 .iter()
581 .any(|(_, _, row_params)| row_params.keys().any(|k| k.ends_with("._vid")))
582}
583
584#[expect(clippy::too_many_arguments)]
589async fn run_apply(
590 input_exec: Arc<dyn ExecutionPlan>,
591 subquery_plan: &LogicalPlan,
592 input_filter: Option<&Expr>,
593 graph_ctx: &Arc<GraphExecutionContext>,
594 session_ctx: &Arc<RwLock<SessionContext>>,
595 storage: &Arc<StorageManager>,
596 schema_info: &Arc<UniSchema>,
597 params: &HashMap<String, Value>,
598 output_schema: &SchemaRef,
599 kept_input_indices: &[usize],
600 kept_input_overrides: &[Option<(String, String)>],
601 mutation_ctx: Option<&Arc<MutationContext>>,
602) -> DFResult<RecordBatch> {
603 let apply_start = std::time::Instant::now();
604 let is_proc_call = is_procedure_call(subquery_plan);
605 tracing::debug!("run_apply: is_procedure_call={}", is_proc_call);
606
607 let task_ctx = session_ctx.read().task_ctx();
609 let input_batches = collect_all_partitions(&input_exec, task_ctx).await?;
610
611 let mut filtered_entries: Vec<(&RecordBatch, usize, HashMap<String, Value>)> = Vec::new();
614 for batch in &input_batches {
615 for row_idx in 0..batch.num_rows() {
616 let row_params = extract_row_params(batch, row_idx);
617 if let Some(filter) = input_filter
618 && !evaluate_filter(filter, &row_params)
619 {
620 continue;
621 }
622 filtered_entries.push((batch, row_idx, row_params));
623 }
624 }
625
626 tracing::debug!(
627 "run_apply: filtered_entries count = {}",
628 filtered_entries.len()
629 );
630
631 let subquery_has_writes = plan_contains_writes(subquery_plan);
632
633 let is_unit_subquery = output_schema.fields().len() == kept_input_indices.len();
642 if filtered_entries.is_empty() {
643 if is_unit_subquery || subquery_has_writes {
644 return Ok(RecordBatch::new_empty(output_schema.clone()));
645 }
646 let sub_batches = execute_subplan(
647 subquery_plan,
648 params,
649 &HashMap::new(), graph_ctx,
651 session_ctx,
652 storage,
653 schema_info,
654 mutation_ctx,
655 )
656 .await?;
657 let sub_rows = batches_to_row_maps(&sub_batches);
658 return rows_to_batch(&sub_rows, output_schema);
659 }
660
661 let has_filter = input_filter.is_some();
668
669 if is_batch_eligible(&filtered_entries) && !is_proc_call && has_filter && !subquery_has_writes {
670 tracing::debug!("run_apply: batching eligible, attempting batch execution");
671
672 let mut vid_values: HashMap<String, Vec<Value>> = HashMap::new();
674 for (_, _, row_params) in &filtered_entries {
675 for (key, value) in row_params {
676 if key.ends_with("._vid") {
677 vid_values
678 .entry(key.clone())
679 .or_default()
680 .push(value.clone());
681 }
682 }
683 }
684
685 let mut batched_params = params.clone();
687 for (key, values) in &vid_values {
688 batched_params.insert(key.clone(), Value::List(values.clone()));
689 }
690
691 if let Some((_, _, first_row_params)) = filtered_entries.first() {
694 for (key, value) in first_row_params {
695 if !key.ends_with("._vid") {
696 batched_params
697 .entry(key.clone())
698 .or_insert_with(|| value.clone());
699 }
700 }
701 }
702
703 let subplan_start = std::time::Instant::now();
705 let sub_batches = execute_subplan(
706 subquery_plan,
707 &batched_params,
708 &HashMap::new(),
709 graph_ctx,
710 session_ctx,
711 storage,
712 schema_info,
713 mutation_ctx,
714 )
715 .await?;
716 let subplan_elapsed = subplan_start.elapsed();
717 tracing::debug!(
718 "run_apply: batch execute_subplan took {:?}",
719 subplan_elapsed
720 );
721
722 let sub_rows = batches_to_row_maps(&sub_batches);
724 let mut sub_index: HashMap<i64, Vec<&HashMap<String, Value>>> = HashMap::new();
725
726 let vid_key = vid_values.keys().next().expect("at least one VID key");
728
729 for sub_row in &sub_rows {
730 if let Some(Value::Int(vid)) = sub_row.get(vid_key) {
731 sub_index.entry(*vid).or_default().push(sub_row);
732 }
733 }
734
735 let num_input_cols = kept_input_indices.len();
739 let num_output_cols = output_schema.fields().len();
740 let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
741
742 for (batch, row_idx, row_params) in &filtered_entries {
743 let input_vid = if let Some(Value::Int(vid)) = row_params.get(vid_key) {
745 *vid
746 } else {
747 continue; };
749
750 let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
751
752 if let Some(matching_sub_rows) = sub_index.get(&input_vid) {
754 for sub_row in matching_sub_rows {
755 append_cross_join_row(
756 &mut column_arrays,
757 &input_row_arrays,
758 sub_row,
759 output_schema,
760 num_input_cols,
761 kept_input_overrides,
762 is_unit_subquery,
763 )?;
764 }
765 } else if is_unit_subquery {
766 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
769 column_arrays[col_idx].push(arr.clone());
770 }
771 }
772 }
774
775 let result = concat_column_arrays(&column_arrays, output_schema);
776
777 let apply_elapsed = apply_start.elapsed();
778 tracing::debug!(
779 "run_apply: completed (batched) in {:?}, 1 subplan execution",
780 apply_elapsed
781 );
782
783 return result;
784 }
785
786 let num_input_cols = kept_input_indices.len();
793 let num_output_cols = output_schema.fields().len();
794 let mut column_arrays: Vec<Vec<ArrayRef>> = vec![Vec::new(); num_output_cols];
796
797 let mut total_subplan_time = std::time::Duration::ZERO;
798 let mut subplan_executions = 0;
799
800 let mut subplan_cache: HashMap<BTreeMap<String, Value>, Vec<HashMap<String, Value>>> =
804 HashMap::new();
805 let mut cache_hits = 0;
806
807 for (batch, row_idx, row_params) in &filtered_entries {
808 let (sub_params, sub_outer_values) = if is_procedure_call(subquery_plan) {
812 (params.clone(), row_params.clone())
814 } else {
815 let mut merged = params.clone();
817 merged.extend(row_params.clone());
818 (merged, HashMap::new())
819 };
820
821 let params_key = canonical_params_key(row_params);
823 let sub_rows = if let Some(cached_rows) = subplan_cache.get(¶ms_key) {
824 cache_hits += 1;
826 tracing::debug!("run_apply: cache hit for row params, skipping execute_subplan");
827 cached_rows.clone()
828 } else {
829 let subplan_start = std::time::Instant::now();
831 let sub_batches = execute_subplan(
832 subquery_plan,
833 &sub_params,
834 &sub_outer_values,
835 graph_ctx,
836 session_ctx,
837 storage,
838 schema_info,
839 mutation_ctx,
840 )
841 .await?;
842 let subplan_elapsed = subplan_start.elapsed();
843 total_subplan_time += subplan_elapsed;
844 subplan_executions += 1;
845
846 tracing::debug!(
847 "run_apply: execute_subplan #{} took {:?}",
848 subplan_executions,
849 subplan_elapsed
850 );
851
852 let rows = batches_to_row_maps(&sub_batches);
853 subplan_cache.insert(params_key, rows.clone());
854 rows
855 };
856
857 let input_row_arrays = slice_kept_row(batch, *row_idx, kept_input_indices);
858
859 if sub_rows.is_empty() {
860 if is_unit_subquery {
861 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
864 column_arrays[col_idx].push(arr.clone());
865 }
866 }
867 continue;
869 }
870
871 for sub_row in &sub_rows {
872 append_cross_join_row(
873 &mut column_arrays,
874 &input_row_arrays,
875 sub_row,
876 output_schema,
877 num_input_cols,
878 kept_input_overrides,
879 is_unit_subquery,
880 )?;
881 }
882 }
883
884 let result = concat_column_arrays(&column_arrays, output_schema);
886
887 let apply_elapsed = apply_start.elapsed();
888 tracing::debug!(
889 "run_apply: completed in {:?}, {} subplan executions, {} cache hits, {:?} total subplan time",
890 apply_elapsed,
891 subplan_executions,
892 cache_hits,
893 total_subplan_time
894 );
895
896 result
897}
898
899fn single_row_array<B, T>(mut builder: B, val: Option<T>) -> ArrayRef
901where
902 B: PrimitiveAppend<T>,
903{
904 match val {
905 Some(v) => builder.append_typed_value(v),
906 None => builder.append_typed_null(),
907 }
908 builder.finish_to_array()
909}
910
911fn value_to_single_row_array(val: &Value, data_type: &DataType) -> DFResult<ArrayRef> {
913 Ok(match data_type {
914 DataType::UInt64 => single_row_array(
915 UInt64Builder::with_capacity(1),
916 val.as_u64().or_else(|| val.as_i64().map(|v| v as u64)),
917 ),
918 DataType::Int64 => single_row_array(Int64Builder::with_capacity(1), val.as_i64()),
919 DataType::Int32 => single_row_array(
920 Int32Builder::with_capacity(1),
921 val.as_i64().map(|v| v as i32),
922 ),
923 DataType::Float64 => single_row_array(Float64Builder::with_capacity(1), val.as_f64()),
924 DataType::Boolean => single_row_array(BooleanBuilder::with_capacity(1), val.as_bool()),
925 DataType::Null => Arc::new(arrow_array::NullArray::new(1)) as ArrayRef,
926 DataType::LargeBinary => {
927 let mut b = arrow_array::builder::LargeBinaryBuilder::with_capacity(1, 64);
928 if val.is_null() {
929 b.append_null();
930 } else {
931 let cv_bytes = uni_common::cypher_value_codec::encode(val);
932 b.append_value(&cv_bytes);
933 }
934 Arc::new(b.finish()) as ArrayRef
935 }
936 DataType::Utf8 => {
937 let mut b = StringBuilder::with_capacity(1, 64);
938 match val {
939 Value::Null => b.append_null(),
940 Value::String(s) => b.append_value(s),
941 other => b.append_value(format!("{other}")),
942 }
943 Arc::new(b.finish()) as ArrayRef
944 }
945 DataType::List(inner_field) if inner_field.data_type() == &DataType::Utf8 => {
946 let mut b = arrow_array::builder::ListBuilder::new(StringBuilder::new());
947 match val {
948 Value::List(items) => {
949 for item in items {
950 match item {
951 Value::String(s) => b.values().append_value(s),
952 Value::Null => b.values().append_null(),
953 other => b.values().append_value(format!("{other}")),
954 }
955 }
956 b.append(true);
957 }
958 Value::Null => b.append_null(),
959 other => {
960 b.values().append_value(format!("{other}"));
961 b.append(true);
962 }
963 }
964 Arc::new(b.finish()) as ArrayRef
965 }
966 DataType::Struct(fields) => {
967 let map_view: Option<&HashMap<String, Value>> = match val {
975 Value::Map(m) => Some(m),
976 Value::Node(n) => Some(&n.properties),
977 Value::Edge(e) => Some(&e.properties),
978 _ => None,
979 };
980 let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(fields.len());
981 for child_field in fields.iter() {
982 let child_val = map_view
983 .and_then(|m| m.get(child_field.name()))
984 .cloned()
985 .unwrap_or(Value::Null);
986 child_arrays.push(value_to_single_row_array(
987 &child_val,
988 child_field.data_type(),
989 )?);
990 }
991 let pairs: Vec<(Arc<arrow_schema::Field>, ArrayRef)> =
992 fields.iter().cloned().zip(child_arrays).collect();
993 Arc::new(arrow_array::StructArray::from(pairs)) as ArrayRef
994 }
995 _ => {
996 debug_assert!(
997 false,
998 "value_to_single_row_array: unhandled DataType {:?} — mirror the arm in rows_to_batch",
999 data_type
1000 );
1001 let mut b = StringBuilder::with_capacity(1, 64);
1002 match val {
1003 Value::Null => b.append_null(),
1004 Value::String(s) => b.append_value(s),
1005 other => b.append_value(format!("{other}")),
1006 }
1007 Arc::new(b.finish()) as ArrayRef
1008 }
1009 })
1010}
1011
1012fn append_cross_join_row(
1028 column_arrays: &mut [Vec<ArrayRef>],
1029 input_row_arrays: &[ArrayRef],
1030 sub_row: &HashMap<String, Value>,
1031 output_schema: &SchemaRef,
1032 num_input_cols: usize,
1033 kept_input_overrides: &[Option<(String, String)>],
1034 is_unit_subquery: bool,
1035) -> DFResult<()> {
1036 for (col_idx, arr) in input_row_arrays.iter().enumerate() {
1039 if let Some(Some((var, prop))) = kept_input_overrides.get(col_idx) {
1040 let extracted = match sub_row.get(var) {
1041 Some(Value::Map(m)) => m.get(prop).cloned().unwrap_or(Value::Null),
1042 Some(Value::Node(n)) => n.properties.get(prop).cloned().unwrap_or(Value::Null),
1043 Some(Value::Edge(e)) => e.properties.get(prop).cloned().unwrap_or(Value::Null),
1044 _ => Value::Null,
1045 };
1046 let field = &output_schema.fields()[col_idx];
1047 let new_arr = value_to_single_row_array(&extracted, field.data_type())?;
1048 column_arrays[col_idx].push(new_arr);
1049 continue;
1050 }
1051 if is_unit_subquery {
1052 let field = &output_schema.fields()[col_idx];
1065 let refreshed: Option<Value> = if let Some(dot) = field.name().find('.') {
1066 let base = &field.name()[..dot];
1067 let prop = &field.name()[dot + 1..];
1068 match sub_row.get(base) {
1069 Some(Value::Map(m)) => m.get(prop).cloned(),
1070 Some(Value::Node(n)) => n.properties.get(prop).cloned(),
1071 Some(Value::Edge(e)) => e.properties.get(prop).cloned(),
1072 _ => None,
1073 }
1074 } else {
1075 sub_row.get(field.name()).cloned()
1076 };
1077 if let Some(val) = refreshed {
1078 let new_arr = value_to_single_row_array(&val, field.data_type())?;
1079 column_arrays[col_idx].push(new_arr);
1080 continue;
1081 }
1082 }
1083 column_arrays[col_idx].push(arr.clone());
1084 }
1085
1086 let num_output_cols = output_schema.fields().len();
1088 for (col_arr, field) in column_arrays[num_input_cols..num_output_cols]
1089 .iter_mut()
1090 .zip(output_schema.fields()[num_input_cols..num_output_cols].iter())
1091 {
1092 let col_name = field.name();
1093 let val = sub_row.get(col_name).cloned().unwrap_or(Value::Null);
1094 let arr = value_to_single_row_array(&val, field.data_type())?;
1095 col_arr.push(arr);
1096 }
1097 Ok(())
1098}
1099
1100fn concat_column_arrays(
1104 column_arrays: &[Vec<ArrayRef>],
1105 output_schema: &SchemaRef,
1106) -> DFResult<RecordBatch> {
1107 if column_arrays[0].is_empty() {
1108 return Ok(RecordBatch::new_empty(output_schema.clone()));
1109 }
1110
1111 let mut final_columns: Vec<ArrayRef> = Vec::with_capacity(column_arrays.len());
1112 for arrays in column_arrays {
1113 let refs: Vec<&dyn arrow_array::Array> = arrays.iter().map(|a| a.as_ref()).collect();
1114 let concatenated = arrow::compute::concat(&refs).map_err(arrow_err)?;
1115 final_columns.push(concatenated);
1116 }
1117
1118 RecordBatch::try_new(output_schema.clone(), final_columns).map_err(arrow_err)
1119}
1120
1121enum ApplyStreamState {
1127 Running(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
1129 Done,
1131}
1132
1133struct ApplyStream {
1135 state: ApplyStreamState,
1136 schema: SchemaRef,
1137 metrics: BaselineMetrics,
1138}
1139
1140impl Stream for ApplyStream {
1141 type Item = DFResult<RecordBatch>;
1142
1143 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1144 let metrics = self.metrics.clone();
1145 let _timer = metrics.elapsed_compute().timer();
1146 match &mut self.state {
1147 ApplyStreamState::Running(fut) => match fut.as_mut().poll(cx) {
1148 Poll::Ready(Ok(batch)) => {
1149 self.metrics.record_output(batch.num_rows());
1150 self.state = ApplyStreamState::Done;
1151 Poll::Ready(Some(Ok(batch)))
1152 }
1153 Poll::Ready(Err(e)) => {
1154 self.state = ApplyStreamState::Done;
1155 Poll::Ready(Some(Err(e)))
1156 }
1157 Poll::Pending => Poll::Pending,
1158 },
1159 ApplyStreamState::Done => Poll::Ready(None),
1160 }
1161 }
1162}
1163
1164impl RecordBatchStream for ApplyStream {
1165 fn schema(&self) -> SchemaRef {
1166 self.schema.clone()
1167 }
1168}
1169
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned parameter map from borrowed `(name, value)` pairs.
    fn params(pairs: &[(&str, Value)]) -> HashMap<String, Value> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.clone()))
            .collect()
    }

    /// The canonical key must separate different values, agree for equal
    /// values regardless of insertion order, and work as a hash-map key.
    #[test]
    fn test_canonical_params_key_distinguishes_by_value() {
        let a = canonical_params_key(&params(&[("x", Value::Int(1))]));
        let b = canonical_params_key(&params(&[("x", Value::Int(2))]));
        let c = canonical_params_key(&params(&[("x", Value::Int(1))]));
        assert_ne!(a, b, "different values must yield different keys");
        assert_eq!(a, c, "equal values must yield equal keys");

        // Same entries, different insertion order.
        let m1 = params(&[("a", Value::Int(1)), ("b", Value::String("z".into()))]);
        let m2 = params(&[("b", Value::String("z".into())), ("a", Value::Int(1))]);
        assert_eq!(canonical_params_key(&m1), canonical_params_key(&m2));

        // Usable as a cache key: distinct keys keep distinct entries.
        let mut cache: HashMap<BTreeMap<String, Value>, &str> = HashMap::new();
        cache.insert(a.clone(), "row-1");
        cache.insert(b.clone(), "row-2");
        assert_eq!(cache.get(&a), Some(&"row-1"));
        assert_eq!(cache.get(&b), Some(&"row-2"));
        assert_eq!(cache.len(), 2);
    }
}