1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// Result of one BFS traversal: (reached vertex, hop count, node path, edge path).
/// NOTE(review): not referenced in this part of the file — the tuple meaning is
/// inferred from its shape; confirm against the BFS implementation further down.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// One expansion record: (input row index, reached vertex, hop count,
/// node path, edge path). Structurally identical to `VarLengthExpansion`.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68fn prepend_existing_path(
75 existing_path: &arrow_array::StructArray,
76 row_idx: usize,
77 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79 query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81 let nodes_list = existing_path
83 .column(0)
84 .as_any()
85 .downcast_ref::<arrow_array::ListArray>()
86 .unwrap();
87 let node_values = nodes_list.value(row_idx);
88 let node_struct = node_values
89 .as_any()
90 .downcast_ref::<arrow_array::StructArray>()
91 .unwrap();
92 let vid_col = node_struct
93 .column(0)
94 .as_any()
95 .downcast_ref::<UInt64Array>()
96 .unwrap();
97 for i in 0..vid_col.len() {
98 append_node_to_struct(
99 nodes_builder.values(),
100 Vid::from(vid_col.value(i)),
101 query_ctx,
102 );
103 }
104
105 let rels_list = existing_path
107 .column(1)
108 .as_any()
109 .downcast_ref::<arrow_array::ListArray>()
110 .unwrap();
111 let edge_values = rels_list.value(row_idx);
112 let edge_struct = edge_values
113 .as_any()
114 .downcast_ref::<arrow_array::StructArray>()
115 .unwrap();
116 let eid_col = edge_struct
117 .column(0)
118 .as_any()
119 .downcast_ref::<UInt64Array>()
120 .unwrap();
121 let type_col = edge_struct
122 .column(1)
123 .as_any()
124 .downcast_ref::<arrow_array::StringArray>()
125 .unwrap();
126 let src_col = edge_struct
127 .column(2)
128 .as_any()
129 .downcast_ref::<UInt64Array>()
130 .unwrap();
131 let dst_col = edge_struct
132 .column(3)
133 .as_any()
134 .downcast_ref::<UInt64Array>()
135 .unwrap();
136 for i in 0..eid_col.len() {
137 append_edge_to_struct(
138 rels_builder.values(),
139 Eid::from(eid_col.value(i)),
140 type_col.value(i),
141 src_col.value(i),
142 dst_col.value(i),
143 query_ctx,
144 );
145 }
146}
147
148fn resolve_edge_property_type(
153 prop: &str,
154 schema_props: Option<
155 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156 >,
157) -> DataType {
158 if prop == "overflow_json" {
159 DataType::LargeBinary
160 } else {
161 schema_props
162 .and_then(|props| props.get(prop))
163 .map(|meta| meta.r#type.to_arrow())
164 .unwrap_or(DataType::LargeBinary)
165 }
166}
167
168use crate::query::df_graph::common::merged_edge_schema_props;
169
170type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
172
/// Physical operator that expands each input row along a set of edge types,
/// emitting one output row per (source row, surviving neighbor) pair.
///
/// Output schema = input columns + target `_vid`/`_labels`/property columns,
/// followed by edge `_eid`/`_type`/property columns when an edge variable is
/// bound, or a hidden `__eid_to_{target}` column otherwise (see `build_schema`).
pub struct GraphTraverseExec {
    /// Upstream plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse; each source row expands over all of them.
    edge_type_ids: Vec<u32>,

    /// Traversal direction (e.g. `Direction::Both` for undirected expansion).
    direction: Direction,

    /// Variable name of the target node; used to derive output column names.
    target_variable: String,

    /// Optional edge variable; when set, edge columns are emitted under this
    /// name instead of the hidden `__eid_to_*` column.
    edge_variable: Option<String>,

    /// Edge properties to materialize (only meaningful with `edge_variable`).
    edge_properties: Vec<String>,

    /// Target-node properties to materialize.
    target_properties: Vec<String>,

    /// Optional label filter for the target node, by name.
    target_label_name: Option<String>,

    /// Numeric id of the target label, when resolved.
    /// NOTE(review): only forwarded in this file, never read — confirm use.
    target_label_id: Option<u16>,

    /// Shared storage / adjacency / property access context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics: unmatched rows survive with NULL expansion columns.
    optional: bool,

    /// Variables belonging to the optional pattern; used to group input rows
    /// when deciding which unmatched rows get null-padded.
    optional_pattern_vars: HashSet<String>,

    /// When the target variable is already bound, the column holding its
    /// expected vid (neighbors not matching it are dropped).
    bound_target_column: Option<String>,

    /// Columns holding eids already used on this path (edge-uniqueness filter).
    used_edge_columns: Vec<String>,

    /// Cached output schema, computed once in `new`.
    schema: SchemaRef,

    /// Cached DataFusion plan properties.
    properties: PlanProperties,

    /// Execution metrics registry.
    metrics: ExecutionPlanMetricsSet,
}
255
256impl fmt::Debug for GraphTraverseExec {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 f.debug_struct("GraphTraverseExec")
259 .field("source_column", &self.source_column)
260 .field("edge_type_ids", &self.edge_type_ids)
261 .field("direction", &self.direction)
262 .field("target_variable", &self.target_variable)
263 .field("edge_variable", &self.edge_variable)
264 .finish()
265 }
266}
267
impl GraphTraverseExec {
    /// Create a traversal operator over `input`.
    ///
    /// Schema metadata for the target label and the traversed edge types is
    /// resolved eagerly so property columns get concrete Arrow types; the
    /// output schema and plan properties are then computed once and cached.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        target_label_id: Option<u16>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Property metadata for the target label (if any) plus the merged
        // metadata of every traversed edge type drive column typing below.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
        let edge_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            edge_variable.as_deref(),
            &edge_properties,
            &target_properties,
            label_props,
            edge_props,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            target_label_name,
            target_label_id,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Compute the output schema: input columns, then the target's `_vid`,
    /// `_labels` and property columns, then either the edge variable's
    /// `_eid`/`_type`/property columns or a hidden `__eid_to_{target}` column.
    ///
    /// NOTE: this column order must stay in sync with the order in which the
    /// output-batch builders append columns.
    #[expect(
        clippy::too_many_arguments,
        reason = "Schema construction needs all field metadata"
    )]
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        edge_variable: Option<&str>,
        edge_properties: &[String],
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        edge_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // The vid column is nullable only under OPTIONAL MATCH (null-padded rows).
        let target_vid_name = format!("{}._vid", target_variable);
        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));

        fields.push(Field::new(
            format!("{}._labels", target_variable),
            labels_data_type(),
            true,
        ));

        // One column per requested target property, typed from the label schema.
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            let arrow_type = resolve_property_type(prop_name, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        if let Some(edge_var) = edge_variable {
            let edge_id_name = format!("{}._eid", edge_var);
            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));

            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            for prop_name in edge_properties {
                let prop_col_name = format!("{}.{}", edge_var, prop_name);
                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
                fields.push(Field::new(&prop_col_name, arrow_type, true));
            }
        } else {
            // No edge variable: still record the traversed eid under a hidden
            // name — presumably so downstream traversals can exclude it via
            // `used_edge_columns`; see `expand_neighbors`.
            let internal_eid_name = format!("__eid_to_{}", target_variable);
            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
        }

        Arc::new(Schema::new(fields))
    }
}
424
425impl DisplayAs for GraphTraverseExec {
426 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427 write!(
428 f,
429 "GraphTraverseExec: {} --[{:?}]--> {}",
430 self.source_column, self.edge_type_ids, self.target_variable
431 )?;
432 if let Some(ref edge_var) = self.edge_variable {
433 write!(f, " as {}", edge_var)?;
434 }
435 Ok(())
436 }
437}
438
impl ExecutionPlan for GraphTraverseExec {
    fn name(&self) -> &str {
        "GraphTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuild the node over a replacement child. Re-runs `new` so the cached
    /// schema/properties are recomputed from the (possibly changed) child.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.target_label_id,
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
        )))
    }

    /// Start execution for one partition. The returned stream begins in the
    /// `Warming` state, awaiting `graph_ctx.warming_future` (presumably a
    /// cache warm-up for the traversed edge types) before reading input.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphTraverseStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            state: TraverseStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
527
/// State machine driving `GraphTraverseStream::poll_next`.
enum TraverseStreamState {
    /// Awaiting the warm-up future produced at `execute` time.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Ready to pull the next batch from the input stream.
    Reading,
    /// An async output-batch materialization future is in flight.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Input exhausted or a fatal error was returned; stream yields `None`.
    Done,
}
539
/// Stream implementation backing `GraphTraverseExec`: expands each input
/// batch into one output batch (plus null-padded rows under OPTIONAL MATCH).
///
/// Configuration fields mirror `GraphTraverseExec`; see the struct docs there.
struct GraphTraverseStream {
    /// Upstream batch stream being expanded.
    input: SendableRecordBatchStream,

    /// Input column holding source vertex ids.
    source_column: String,

    /// Edge type ids to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Edge variable name, when the pattern binds one.
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target-node properties to materialize.
    target_properties: Vec<String>,

    /// Optional target label filter.
    target_label_name: Option<String>,

    /// Shared storage / adjacency / property access context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics enabled.
    optional: bool,

    /// Variables of the optional pattern (row grouping for null-padding).
    optional_pattern_vars: HashSet<String>,

    /// Column holding an already-bound target vid, if any.
    bound_target_column: Option<String>,

    /// Columns of eids already used on the current path.
    used_edge_columns: Vec<String>,

    /// Output schema; the built column order must match it exactly.
    schema: SchemaRef,

    /// Current position in the Warming/Reading/Materializing/Done machine.
    state: TraverseStreamState,

    /// Baseline output metrics (row counts).
    metrics: BaselineMetrics,
}
594
impl GraphTraverseStream {
    /// Expand every non-null source vid in `batch` across all configured edge
    /// types, returning one `(input_row, target_vid, eid, edge_type_id)`
    /// tuple per neighbor that survives the filters below.
    ///
    /// Filter order matters and must not be rearranged:
    /// 1. edges whose eid appears in a `used_edge_columns` value for this row
    ///    are skipped (relationship uniqueness along the pattern);
    /// 2. for `Direction::Both`, each eid expands at most once per row (the
    ///    same physical edge is reported from both endpoints);
    /// 3. when the target is already bound, only the expected vid survives —
    ///    a NULL bound value matches nothing;
    /// 4. when a target label is set, vertices whose labels resolve and do
    ///    not contain it are dropped; vertices whose labels cannot be
    ///    resolved are let through.
    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column '{}' not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Column holding the already-bound target vid, if the pattern binds one.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Columns carrying eids consumed earlier on this path.
        // NOTE(review): a column that exists but is not UInt64 is silently
        // dropped here — confirm upstream always emits UInt64 eid columns.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
        let is_undirected = matches!(self.direction, Direction::Both);

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            // NULL source (e.g. from an earlier OPTIONAL MATCH) expands to nothing.
            let Some(src) = source_vid else {
                continue;
            };

            // None => no bound column; Some(None) => bound but NULL this row;
            // Some(Some(v)) => neighbor must equal v.
            let expected_target = bound_target_vids.map(|arr| {
                if arr.is_null(row_idx) {
                    None
                } else {
                    Some(arr.value(row_idx))
                }
            });

            // Eids already used on this row's path.
            let used_eids: HashSet<u64> = used_edge_arrays
                .iter()
                .filter_map(|arr| {
                    if arr.is_null(row_idx) {
                        None
                    } else {
                        Some(arr.value(row_idx))
                    }
                })
                .collect();

            let vid = Vid::from(src);
            // Per-row dedupe for undirected traversal (filter 2 above).
            let mut seen_edges: HashSet<u64> = HashSet::new();

            for &edge_type in &self.edge_type_ids {
                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);

                for (target_vid, eid) in neighbors {
                    let eid_u64 = eid.as_u64();

                    if used_eids.contains(&eid_u64) {
                        continue;
                    }

                    if is_undirected && !seen_edges.insert(eid_u64) {
                        continue;
                    }

                    if let Some(expected_opt) = expected_target {
                        let Some(expected) = expected_opt else {
                            continue;
                        };
                        if target_vid.as_u64() != expected {
                            continue;
                        }
                    }

                    if let Some(ref label_name) = self.target_label_name {
                        let query_ctx = self.graph_ctx.query_context();
                        if let Some(vertex_labels) =
                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
                        {
                            if !vertex_labels.contains(label_name) {
                                continue;
                            }
                        }
                    }

                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
                }
            }
        }

        Ok(expanded_rows)
    }
}
718
719fn build_target_labels_column(
721 target_vids: &[Vid],
722 target_label_name: &Option<String>,
723 graph_ctx: &GraphExecutionContext,
724) -> ArrayRef {
725 use arrow_array::builder::{ListBuilder, StringBuilder};
726 let mut labels_builder = ListBuilder::new(StringBuilder::new());
727 let query_ctx = graph_ctx.query_context();
728 for vid in target_vids {
729 let row_labels: Vec<String> =
730 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
731 Some(labels) => labels,
732 None => {
733 if let Some(label_name) = target_label_name {
735 vec![label_name.clone()]
736 } else {
737 vec![]
738 }
739 }
740 };
741 let values = labels_builder.values();
742 for lbl in &row_labels {
743 values.append_value(lbl);
744 }
745 labels_builder.append(true);
746 }
747 Arc::new(labels_builder.finish())
748}
749
750async fn build_target_property_columns(
752 target_vids: &[Vid],
753 target_properties: &[String],
754 target_label_name: &Option<String>,
755 graph_ctx: &Arc<GraphExecutionContext>,
756) -> DFResult<Vec<ArrayRef>> {
757 let mut columns = Vec::new();
758
759 if let Some(label_name) = target_label_name {
760 let property_manager = graph_ctx.property_manager();
761 let query_ctx = graph_ctx.query_context();
762
763 let props_map = property_manager
764 .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
765 .await
766 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
767
768 let uni_schema = graph_ctx.storage().schema_manager().schema();
769 let label_props = uni_schema.properties.get(label_name.as_str());
770
771 for prop_name in target_properties {
772 let data_type = resolve_property_type(prop_name, label_props);
773 let column =
774 build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
775 columns.push(column);
776 }
777 } else {
778 let non_internal_props: Vec<&str> = target_properties
780 .iter()
781 .filter(|p| *p != "_all_props")
782 .map(|s| s.as_str())
783 .collect();
784 let property_manager = graph_ctx.property_manager();
785 let query_ctx = graph_ctx.query_context();
786
787 let props_map = if !non_internal_props.is_empty() {
788 property_manager
789 .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
790 .await
791 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
792 } else {
793 std::collections::HashMap::new()
794 };
795
796 for prop_name in target_properties {
797 if prop_name == "_all_props" {
798 columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
799 } else {
800 let column = build_property_column_static(
801 target_vids,
802 &props_map,
803 prop_name,
804 &arrow::datatypes::DataType::LargeBinary,
805 )?;
806 columns.push(column);
807 }
808 }
809 }
810
811 Ok(columns)
812}
813
814fn build_all_props_column(
816 target_vids: &[Vid],
817 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
818 graph_ctx: &Arc<GraphExecutionContext>,
819) -> ArrayRef {
820 use crate::query::df_graph::scan::encode_cypher_value;
821 use arrow_array::builder::LargeBinaryBuilder;
822
823 let mut builder = LargeBinaryBuilder::new();
824 let l0_ctx = graph_ctx.l0_context();
825 for vid in target_vids {
826 let mut merged_props = serde_json::Map::new();
827 if let Some(vid_props) = props_map.get(vid) {
828 for (k, v) in vid_props.iter() {
829 let json_val: serde_json::Value = v.clone().into();
830 merged_props.insert(k.to_string(), json_val);
831 }
832 }
833 for l0 in l0_ctx.iter_l0_buffers() {
834 let guard = l0.read();
835 if let Some(l0_props) = guard.vertex_properties.get(vid) {
836 for (k, v) in l0_props.iter() {
837 let json_val: serde_json::Value = v.clone().into();
838 merged_props.insert(k.to_string(), json_val);
839 }
840 }
841 }
842 if merged_props.is_empty() {
843 builder.append_null();
844 } else {
845 let json = serde_json::Value::Object(merged_props);
846 match encode_cypher_value(&json) {
847 Ok(bytes) => builder.append_value(bytes),
848 Err(_) => builder.append_null(),
849 }
850 }
851 }
852 Arc::new(builder.finish())
853}
854
855async fn build_edge_columns(
857 expansions: &[(usize, Vid, u64, u32)],
858 edge_properties: &[String],
859 edge_type_ids: &[u32],
860 graph_ctx: &Arc<GraphExecutionContext>,
861) -> DFResult<Vec<ArrayRef>> {
862 let mut columns = Vec::new();
863
864 let eids: Vec<Eid> = expansions
865 .iter()
866 .map(|(_, _, eid, _)| Eid::from(*eid))
867 .collect();
868 let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
869 columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
870
871 {
873 let uni_schema = graph_ctx.storage().schema_manager().schema();
874 let mut type_builder = arrow_array::builder::StringBuilder::new();
875 for (_, _, _, edge_type_id) in expansions {
876 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
877 type_builder.append_value(&name);
878 } else {
879 type_builder.append_null();
880 }
881 }
882 columns.push(Arc::new(type_builder.finish()) as ArrayRef);
883 }
884
885 if !edge_properties.is_empty() {
886 let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
887 let property_manager = graph_ctx.property_manager();
888 let query_ctx = graph_ctx.query_context();
889
890 let props_map = property_manager
891 .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
892 .await
893 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
894
895 let uni_schema = graph_ctx.storage().schema_manager().schema();
896 let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
897 let edge_type_props = if merged_edge_props.is_empty() {
898 None
899 } else {
900 Some(&merged_edge_props)
901 };
902
903 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
904
905 for prop_name in edge_properties {
906 let data_type = resolve_edge_property_type(prop_name, edge_type_props);
907 let column =
908 build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
909 columns.push(column);
910 }
911 }
912
913 Ok(columns)
914}
915
/// Assemble the output batch for one input batch (async path: property
/// materialization may await storage).
///
/// Column append order must mirror `GraphTraverseExec::build_schema`:
/// replicated input columns, target `_vid`, target `_labels`, target
/// properties, then edge columns (or the hidden `__eid_to_*` column). Under
/// OPTIONAL MATCH, one null-padded representative row is appended for every
/// optional group that produced no expansion.
#[expect(
    clippy::too_many_arguments,
    reason = "Standalone async fn needs all context passed explicitly"
)]
async fn build_traverse_output_batch(
    input: RecordBatch,
    expansions: Vec<(usize, Vid, u64, u32)>,
    schema: SchemaRef,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    edge_type_ids: Vec<u32>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        // Nothing matched: empty result, unless OPTIONAL MATCH demands
        // null-padded representatives for every (now unmatched) group.
        if !optional {
            return Ok(RecordBatch::new_empty(schema));
        }
        let unmatched_reps = collect_unmatched_optional_group_rows(
            &input,
            &HashSet::new(),
            &schema,
            &optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            &input,
            &unmatched_reps,
            &schema,
            &optional_pattern_vars,
        );
    }

    // Replicate each input row once per expansion via the take kernel.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices_array, None))
        .collect::<Result<_, _>>()?;

    // Target `_vid` column.
    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));

    // Target `_labels` column.
    columns.push(build_target_labels_column(
        &target_vids,
        &target_label_name,
        &graph_ctx,
    ));

    // Target property columns (async storage fetch).
    if !target_properties.is_empty() {
        let prop_cols = build_target_property_columns(
            &target_vids,
            &target_properties,
            &target_label_name,
            &graph_ctx,
        )
        .await?;
        columns.extend(prop_cols);
    }

    // Edge columns: full set when an edge variable is bound, otherwise only
    // the hidden eid column.
    if edge_variable.is_some() {
        let edge_cols =
            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
        columns.extend(edge_cols);
    } else {
        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;

    // OPTIONAL MATCH: append null-padded rows for groups with no match.
    if optional {
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            &input,
            &matched_indices,
            &schema,
            &optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                &input,
                &unmatched,
                &schema,
                &optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
                .map_err(arrow_err)?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1032
1033fn build_optional_null_batch_for_rows(
1036 input: &RecordBatch,
1037 unmatched_indices: &[usize],
1038 schema: &SchemaRef,
1039) -> DFResult<RecordBatch> {
1040 let num_rows = unmatched_indices.len();
1041 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1042 let indices_array = UInt64Array::from(indices);
1043
1044 let mut columns: Vec<ArrayRef> = Vec::new();
1046 for col in input.columns() {
1047 let taken = take(col.as_ref(), &indices_array, None)?;
1048 columns.push(taken);
1049 }
1050 for field in schema.fields().iter().skip(input.num_columns()) {
1052 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1053 }
1054 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1055}
1056
/// Returns true when `col_name` belongs to one of the optional-pattern
/// variables: the variable's own column, a `var.prop` property column, or the
/// hidden `__eid_to_{var}` edge-id column.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    optional_vars.contains(col_name)
        || optional_vars.iter().any(|var| {
            // `var.prop` property columns of the optional variable.
            col_name
                .strip_prefix(var.as_str())
                .is_some_and(|rest| rest.starts_with('.'))
                // Hidden edge-id column targeting the optional variable.
                // Exact match after the prefix: the previous `ends_with(var)`
                // check also matched e.g. `__eid_to_ab` for var `b`.
                || col_name
                    .strip_prefix("__eid_to_")
                    .is_some_and(|rest| rest == var.as_str())
        })
}
1064
1065fn collect_unmatched_optional_group_rows(
1066 input: &RecordBatch,
1067 matched_indices: &HashSet<usize>,
1068 schema: &SchemaRef,
1069 optional_vars: &HashSet<String>,
1070) -> DFResult<Vec<usize>> {
1071 if input.num_rows() == 0 {
1072 return Ok(Vec::new());
1073 }
1074
1075 if optional_vars.is_empty() {
1076 return Ok((0..input.num_rows())
1077 .filter(|idx| !matched_indices.contains(idx))
1078 .collect());
1079 }
1080
1081 let source_vid_indices: Vec<usize> = schema
1082 .fields()
1083 .iter()
1084 .enumerate()
1085 .filter_map(|(idx, field)| {
1086 if idx >= input.num_columns() {
1087 return None;
1088 }
1089 let name = field.name();
1090 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1091 Some(idx)
1092 } else {
1093 None
1094 }
1095 })
1096 .collect();
1097
1098 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1101
1102 for row_idx in 0..input.num_rows() {
1103 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1104 let entry = groups.entry(key.clone());
1105 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1106 group_order.push(key.clone());
1107 }
1108 let matched = matched_indices.contains(&row_idx);
1109 entry
1110 .and_modify(|(_, any_matched)| *any_matched |= matched)
1111 .or_insert((row_idx, matched));
1112 }
1113
1114 Ok(group_order
1115 .into_iter()
1116 .filter_map(|key| {
1117 groups
1118 .get(&key)
1119 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1120 })
1121 .collect())
1122}
1123
1124fn compute_optional_group_key(
1125 batch: &RecordBatch,
1126 row_idx: usize,
1127 source_vid_indices: &[usize],
1128) -> DFResult<Vec<u8>> {
1129 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1130 for &col_idx in source_vid_indices {
1131 let col = batch.column(col_idx);
1132 let vid_cow = column_as_vid_array(col.as_ref())?;
1133 let arr: &UInt64Array = &vid_cow;
1134 if arr.is_null(row_idx) {
1135 key.extend_from_slice(&u64::MAX.to_le_bytes());
1136 } else {
1137 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1138 }
1139 }
1140 Ok(key)
1141}
1142
1143fn build_optional_null_batch_for_rows_with_optional_vars(
1144 input: &RecordBatch,
1145 unmatched_indices: &[usize],
1146 schema: &SchemaRef,
1147 optional_vars: &HashSet<String>,
1148) -> DFResult<RecordBatch> {
1149 if optional_vars.is_empty() {
1150 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1151 }
1152
1153 let num_rows = unmatched_indices.len();
1154 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1155 let indices_array = UInt64Array::from(indices);
1156
1157 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1158 for (col_idx, field) in schema.fields().iter().enumerate() {
1159 if col_idx < input.num_columns() {
1160 if is_optional_column_for_vars(field.name(), optional_vars) {
1161 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1162 } else {
1163 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1164 columns.push(taken);
1165 }
1166 } else {
1167 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1168 }
1169 }
1170
1171 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1172}
1173
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the state machine: Warming -> Reading -> (Materializing ->) Reading … -> Done.
    ///
    /// The state is moved out via `mem::replace` (leaving `Done` behind) so
    /// the owned future can be polled; every arm must restore the intended
    /// state before returning, otherwise the stream terminates early.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                // Await the warm-up future before touching adjacency data.
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Enforce the query timeout once per wakeup; the state was
                    // already replaced with Done above, so a timeout error
                    // permanently terminates the stream.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Fast path: no property columns requested means no
                            // storage awaits — build the batch synchronously.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Slow path: clone everything the async builder
                            // needs and park it as the Materializing state.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1299
1300fn build_traverse_output_batch_sync(
1305 input: &RecordBatch,
1306 expansions: &[(usize, Vid, u64, u32)],
1307 schema: &SchemaRef,
1308 edge_variable: Option<&String>,
1309 graph_ctx: &GraphExecutionContext,
1310 optional: bool,
1311 optional_pattern_vars: &HashSet<String>,
1312) -> DFResult<RecordBatch> {
1313 if expansions.is_empty() {
1314 if !optional {
1315 return Ok(RecordBatch::new_empty(schema.clone()));
1316 }
1317 let unmatched_reps = collect_unmatched_optional_group_rows(
1318 input,
1319 &HashSet::new(),
1320 schema,
1321 optional_pattern_vars,
1322 )?;
1323 if unmatched_reps.is_empty() {
1324 return Ok(RecordBatch::new_empty(schema.clone()));
1325 }
1326 return build_optional_null_batch_for_rows_with_optional_vars(
1327 input,
1328 &unmatched_reps,
1329 schema,
1330 optional_pattern_vars,
1331 );
1332 }
1333
1334 let indices: Vec<u64> = expansions
1335 .iter()
1336 .map(|(idx, _, _, _)| *idx as u64)
1337 .collect();
1338 let indices_array = UInt64Array::from(indices);
1339
1340 let mut columns: Vec<ArrayRef> = Vec::new();
1341 for col in input.columns() {
1342 let expanded = take(col.as_ref(), &indices_array, None)?;
1343 columns.push(expanded);
1344 }
1345
1346 let target_vids: Vec<u64> = expansions
1348 .iter()
1349 .map(|(_, vid, _, _)| vid.as_u64())
1350 .collect();
1351 columns.push(Arc::new(UInt64Array::from(target_vids)));
1352
1353 {
1355 use arrow_array::builder::{ListBuilder, StringBuilder};
1356 let l0_ctx = graph_ctx.l0_context();
1357 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1358 for (_, vid, _, _) in expansions {
1359 let mut row_labels: Vec<String> = Vec::new();
1360 for l0 in l0_ctx.iter_l0_buffers() {
1361 let guard = l0.read();
1362 if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1363 for lbl in l0_labels {
1364 if !row_labels.contains(lbl) {
1365 row_labels.push(lbl.clone());
1366 }
1367 }
1368 }
1369 }
1370 let values = labels_builder.values();
1371 for lbl in &row_labels {
1372 values.append_value(lbl);
1373 }
1374 labels_builder.append(true);
1375 }
1376 columns.push(Arc::new(labels_builder.finish()));
1377 }
1378
1379 if edge_variable.is_some() {
1381 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1382 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1383
1384 let uni_schema = graph_ctx.storage().schema_manager().schema();
1386 let mut type_builder = arrow_array::builder::StringBuilder::new();
1387 for (_, _, _, edge_type_id) in expansions {
1388 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1389 type_builder.append_value(&name);
1390 } else {
1391 type_builder.append_null();
1392 }
1393 }
1394 columns.push(Arc::new(type_builder.finish()));
1395 } else {
1396 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1398 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1399 }
1400
1401 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1402
1403 if optional {
1404 let matched_indices: HashSet<usize> =
1405 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1406 let unmatched = collect_unmatched_optional_group_rows(
1407 input,
1408 &matched_indices,
1409 schema,
1410 optional_pattern_vars,
1411 )?;
1412
1413 if !unmatched.is_empty() {
1414 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1415 input,
1416 &unmatched,
1417 schema,
1418 optional_pattern_vars,
1419 )?;
1420 let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1421 .map_err(|e| {
1422 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1423 })?;
1424 return Ok(combined);
1425 }
1426 }
1427
1428 Ok(expanded_batch)
1429}
1430
1431impl RecordBatchStream for GraphTraverseStream {
1432 fn schema(&self) -> SchemaRef {
1433 self.schema.clone()
1434 }
1435}
1436
/// Adjacency map from a source vertex id to its neighbor entries; each entry
/// is `(neighbor vid, edge id, edge type name, edge properties)`.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1439
/// Physical operator that expands each input row along edges of the given
/// types and direction, using an adjacency map built once per execution from
/// persistent storage plus the L0 write buffers (see `build_edge_adjacency_map`).
pub struct GraphTraverseMainExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type names to traverse.
    type_names: Vec<String>,

    /// Traversal direction (outgoing / incoming / both).
    direction: Direction,

    /// Target node variable; drives `<var>._vid` / `<var>._labels` columns.
    target_variable: String,

    /// Optional edge variable; when present, `<var>._eid` / `<var>._type`
    /// (and edge property) columns are emitted.
    edge_variable: Option<String>,

    /// Edge property names to project (CV-encoded LargeBinary columns).
    edge_properties: Vec<String>,

    /// Target node property names to project.
    target_properties: Vec<String>,

    /// Shared graph execution context (storage, L0 buffers, timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this traverse implements OPTIONAL MATCH semantics.
    optional: bool,

    /// Variables belonging to the optional pattern; their columns are nulled
    /// for unmatched rows.
    optional_pattern_vars: HashSet<String>,

    /// When the target is already bound, the column holding its expected vid.
    bound_target_column: Option<String>,

    /// Columns of edge ids consumed by earlier pattern steps; those edges are
    /// skipped during expansion.
    used_edge_columns: Vec<String>,

    /// Output schema (input columns plus traverse columns).
    schema: SchemaRef,

    /// Cached plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics registry.
    metrics: ExecutionPlanMetricsSet,
}
1514
1515impl fmt::Debug for GraphTraverseMainExec {
1516 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1517 f.debug_struct("GraphTraverseMainExec")
1518 .field("type_names", &self.type_names)
1519 .field("direction", &self.direction)
1520 .field("target_variable", &self.target_variable)
1521 .field("edge_variable", &self.edge_variable)
1522 .finish()
1523 }
1524}
1525
impl GraphTraverseMainExec {
    /// Creates a new traverse operator, deriving its output schema and plan
    /// properties from the child schema and the requested projections.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        let schema = Self::build_schema(
            &input.schema(),
            &target_variable,
            &edge_variable,
            &edge_properties,
            &target_properties,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            type_names,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Builds the output schema: all input columns, then the target `_vid` /
    /// `_labels` columns (only when not already present upstream), then the
    /// edge id (+ type and properties) columns, then one LargeBinary column
    /// per requested target property.
    ///
    /// NOTE(review): column append order here must match the order in which
    /// the stream pushes arrays when building batches.
    fn build_schema(
        input_schema: &SchemaRef,
        target_variable: &str,
        edge_variable: &Option<String>,
        edge_properties: &[String],
        target_properties: &[String],
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Target vid column, skipped if the input already carries it.
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Target labels column, likewise only added when absent.
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        if let Some(edge_var) = edge_variable {
            // Edge id is nullable only under OPTIONAL MATCH.
            fields.push(Field::new(
                format!("{}._eid", edge_var),
                DataType::UInt64,
                optional,
            ));

            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            // Edge properties are CV-encoded binary; the metadata flag marks
            // the encoding for downstream consumers.
            for prop in edge_properties {
                let col_name = format!("{}.{}", edge_var, prop);
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("cv_encoded".to_string(), "true".to_string());
                fields.push(
                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
                );
            }
        } else {
            // No edge variable: the edge id is still emitted under an
            // internal name so it remains available downstream.
            fields.push(Field::new(
                format!("__eid_to_{}", target_variable),
                DataType::UInt64,
                optional,
            ));
        }

        for prop in target_properties {
            fields.push(Field::new(
                format!("{}.{}", target_variable, prop),
                DataType::LargeBinary,
                true,
            ));
        }

        Arc::new(Schema::new(fields))
    }
}
1654
1655impl DisplayAs for GraphTraverseMainExec {
1656 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1657 write!(
1658 f,
1659 "GraphTraverseMainExec: types={:?}, direction={:?}",
1660 self.type_names, self.direction
1661 )
1662 }
1663}
1664
impl ExecutionPlan for GraphTraverseMainExec {
    fn name(&self) -> &str {
        "GraphTraverseMainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds the node with a replacement child; every other configuration
    /// field is carried over unchanged (schema/properties are reused since
    /// they do not depend on the child's identity).
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseMainExec expects exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self {
            input: children[0].clone(),
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            properties: self.properties.clone(),
            metrics: self.metrics.clone(),
        }))
    }

    /// Starts execution for one partition: wires the child stream into a
    /// `GraphTraverseMainStream`, which kicks off the adjacency-map load.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        Ok(Box::pin(GraphTraverseMainStream::new(
            input_stream,
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
1747
/// State machine for `GraphTraverseMainStream`.
enum GraphTraverseMainState {
    /// The edge adjacency map is still loading; the input stream is parked
    /// here until the future resolves.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        input_stream: SendableRecordBatchStream,
    },
    /// The adjacency map is ready; input batches are expanded as they arrive.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Stream exhausted or failed; nothing more to emit.
    Done,
}
1763
/// Streaming half of `GraphTraverseMainExec`: loads the adjacency map once,
/// then expands each input batch against it (see `expand_batch`).
struct GraphTraverseMainStream {
    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Target node variable; drives output column naming.
    target_variable: String,

    /// Optional edge variable; when set, edge id/type/property columns exist.
    edge_variable: Option<String>,

    /// Edge property names to project.
    edge_properties: Vec<String>,

    /// Target node property names to project.
    target_properties: Vec<String>,

    /// Shared graph execution context (storage, L0 buffers, timeout checks).
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics flag.
    optional: bool,

    /// Variables belonging to the optional pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column holding the expected target vid when the target is pre-bound.
    bound_target_column: Option<String>,

    /// Columns of edge ids already consumed earlier in the pattern.
    used_edge_columns: Vec<String>,

    /// Output schema shared with the parent exec node.
    schema: SchemaRef,

    /// Current position in the LoadingEdges -> Processing -> Done lifecycle.
    state: GraphTraverseMainState,

    /// Baseline metrics (output row counts).
    metrics: BaselineMetrics,
}
1805
impl GraphTraverseMainStream {
    /// Creates the stream and immediately constructs the async load of the
    /// edge adjacency map; batches are processed only after it resolves.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // Clone what the loading future captures so it can be boxed and held
        // across polls.
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };

        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expands one input batch against the pre-built adjacency map.
    ///
    /// For each input row: look up the source vid's neighbors, skip edges
    /// whose ids were already consumed by earlier pattern steps, and (when
    /// the target is bound) keep only the expected target vid. Matched
    /// expansions become output rows; under OPTIONAL MATCH, unmatched
    /// optional groups are appended as null-padded rows.
    ///
    /// NOTE(review): columns are pushed in the exact order `build_schema`
    /// declares the fields; keep the two in sync.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // When the target variable is pre-bound to a column, expansions must
        // land on exactly that vid.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Columns holding edge ids consumed by earlier pattern steps;
        // missing or non-UInt64 columns are silently ignored.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // (input_row, target_vid, edge_id, edge_type_name, edge_properties)
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Edge ids this row already traversed; re-using them is
                // disallowed.
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        // Bound target: keep only the expected vid; a null
                        // binding matches nothing.
                        if let Some(targets) = expected_targets {
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }

        // OPTIONAL MATCH with no matches at all: null-pad every input row.
        if expansions.is_empty() && self.optional {
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Row indices that produced at least one expansion; only needed for
        // OPTIONAL MATCH bookkeeping below.
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Repeat each input row once per expansion.
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Target `_vid` column (only when the schema added it).
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Target `_labels` column, merged across L0 buffers with
        // first-seen-wins de-duplication per row.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        if self.edge_variable.is_some() {
            // Edge `_eid` column.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            // Edge `_type` column (type names were resolved while building
            // the adjacency map).
            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            // Edge property columns, CV-encoded. `_all_props` packs every
            // property into a single JSON object per row; encoding failures
            // degrade to null.
            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // No edge variable: emit the internal `__eid_to_*` column.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        // Target property columns, resolved from L0 buffers. For a named
        // property, the first non-null encodable value across buffers wins;
        // `_all_props` merges all buffers into one JSON object.
        {
            use crate::query::df_graph::scan::encode_cypher_value;
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch =
            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;

        if self.optional {
            // Append null-padded rows for optional groups without any match.
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
        } else {
            Ok(matched_batch)
        }
    }
}
2169
/// Loads all edges of the given types from persistent storage and the L0
/// write buffers, then indexes them by traversal entry point according to
/// `direction`.
///
/// L0 entries are appended after storage entries, and the de-duplication pass
/// keeps the *last* occurrence of each edge id — so L0 data overrides storage
/// data. Edges tombstoned in any L0 buffer are dropped afterwards.
async fn build_edge_adjacency_map(
    graph_ctx: &GraphExecutionContext,
    type_names: &[String],
    direction: Direction,
) -> DFResult<EdgeAdjacencyMap> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();

    // Persistent edges first.
    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
    let edges_with_type = storage
        .find_edges_by_type_names(&type_refs)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

    // (eid, src, dst, type_name, properties)
    let mut edges: Vec<(
        uni_common::Eid,
        uni_common::Vid,
        uni_common::Vid,
        String,
        uni_common::Properties,
    )> = edges_with_type.into_iter().collect();

    // Then unflushed edges from every visible L0 buffer.
    for l0 in l0_ctx.iter_l0_buffers() {
        let l0_guard = l0.read();

        for type_name in type_names {
            let l0_eids = l0_guard.eids_for_type(type_name);

            for &eid in &l0_eids {
                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
                    let src_vid = edge_ref.src_vid;
                    let dst_vid = edge_ref.dst_vid;

                    let props = l0_guard
                        .edge_properties
                        .get(&eid)
                        .cloned()
                        .unwrap_or_default();

                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
                }
            }
        }
    }

    // De-duplicate by eid, keeping the last occurrence: iterate reversed so
    // the first hit per eid is the latest one, then restore original order.
    let mut seen_eids = HashSet::new();
    let mut unique_edges = Vec::new();
    for edge in edges.into_iter().rev() {
        if seen_eids.insert(edge.0) {
            unique_edges.push(edge);
        }
    }
    unique_edges.reverse();

    // Drop edges tombstoned in any L0 buffer.
    let mut tombstoned_eids = HashSet::new();
    for l0 in l0_ctx.iter_l0_buffers() {
        let l0_guard = l0.read();
        for eid in l0_guard.tombstones.keys() {
            tombstoned_eids.insert(*eid);
        }
    }
    if !tombstoned_eids.is_empty() {
        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
    }

    // Index by the traversal entry vertex; `Both` indexes each edge from
    // both endpoints.
    let mut adjacency: EdgeAdjacencyMap = HashMap::new();

    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
        match direction {
            Direction::Outgoing => {
                adjacency
                    .entry(src_vid)
                    .or_default()
                    .push((dst_vid, eid, edge_type, props));
            }
            Direction::Incoming => {
                adjacency
                    .entry(dst_vid)
                    .or_default()
                    .push((src_vid, eid, edge_type, props));
            }
            Direction::Both => {
                adjacency.entry(src_vid).or_default().push((
                    dst_vid,
                    eid,
                    edge_type.clone(),
                    props.clone(),
                ));
                adjacency
                    .entry(dst_vid)
                    .or_default()
                    .push((src_vid, eid, edge_type, props));
            }
        }
    }

    Ok(adjacency)
}
2281
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the two-phase state machine: first await the adjacency map,
    /// then expand input batches one at a time.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state so its contents can be moved;
            // `Done` is the temporary placeholder and the real state is
            // restored (or advanced) before every return.
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Map ready: transition and fall through to
                        // Processing on the next loop iteration.
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Abort if the query deadline has passed. NOTE(review):
                    // this path leaves the state as `Done` (placeholder), so
                    // a timed-out stream terminates on subsequent polls.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let result = self.expand_batch(&batch, &adjacency);

                            // Keep processing further batches regardless of
                            // this batch's outcome being Ok or Err.
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2364
2365impl RecordBatchStream for GraphTraverseMainStream {
2366 fn schema(&self) -> SchemaRef {
2367 self.schema.clone()
2368 }
2369}
2370
/// Physical operator for variable-length path traversal
/// (`-[:TYPE*min..max]->`), driven by a path NFA.
pub struct GraphVariableLengthTraverseExec {
    /// Child plan producing the rows whose source vids are expanded.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Numeric ids of the edge types to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops.
    min_hops: usize,

    /// Maximum number of hops.
    max_hops: usize,

    /// Target node variable; drives `<var>._vid` / `<var>._labels` columns.
    target_variable: String,

    /// Optional step variable; adds an edge-list column to the schema.
    step_variable: Option<String>,

    /// Optional path variable; adds a path struct column when the input does
    /// not already carry one.
    path_variable: Option<String>,

    /// Target node property names to project.
    target_properties: Vec<String>,

    /// Target label used to resolve property Arrow types for the schema.
    target_label_name: Option<String>,

    /// OPTIONAL MATCH semantics flag.
    is_optional: bool,

    /// Column holding the expected target vid when the target is pre-bound.
    bound_target_column: Option<String>,

    /// Optional filter string for edge loading — presumably a Lance filter
    /// expression pushed down to storage; confirm against the scan layer.
    edge_lance_filter: Option<String>,

    /// Property equality conditions applied to traversed edges.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Columns of edge ids already consumed earlier in the pattern.
    used_edge_columns: Vec<String>,

    /// Path matching mode (see `nfa::PathMode`).
    path_mode: super::nfa::PathMode,

    /// How matched paths are emitted (see `nfa::VlpOutputMode`).
    output_mode: super::nfa::VlpOutputMode,

    /// NFA describing admissible edge-type/direction sequences; built from
    /// the VLP spec unless a QPP-supplied NFA is given (see `new`).
    nfa: Arc<PathNfa>,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema.
    schema: SchemaRef,

    /// Cached plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics registry.
    metrics: ExecutionPlanMetricsSet,
}
2462
2463impl fmt::Debug for GraphVariableLengthTraverseExec {
2464 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2465 f.debug_struct("GraphVariableLengthTraverseExec")
2466 .field("source_column", &self.source_column)
2467 .field("edge_type_ids", &self.edge_type_ids)
2468 .field("direction", &self.direction)
2469 .field("min_hops", &self.min_hops)
2470 .field("max_hops", &self.max_hops)
2471 .field("target_variable", &self.target_variable)
2472 .finish()
2473 }
2474}
2475
impl GraphVariableLengthTraverseExec {
    /// Creates a new variable-length traverse operator.
    ///
    /// When `qpp_nfa` is provided it is used directly; otherwise an NFA is
    /// synthesized from the plain VLP spec (`edge_type_ids`, `direction`,
    /// `min_hops..max_hops`).
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
        qpp_nfa: Option<PathNfa>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Property metadata of the target label, used to type the projected
        // property columns.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
            label_props,
        );
        let properties = compute_plan_properties(schema.clone());

        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
        }));

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            target_label_name,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            nfa,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Builds the output schema: all input columns, target `_vid` / `_labels`
    /// (when absent upstream), typed target property columns, a `_hop_count`
    /// column, and optional edge-list / path-struct columns for the step and
    /// path variables.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Target vid column, skipped if the input already carries it.
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Target labels column, likewise only added when absent.
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Projected target properties, typed from the label's schema when
        // available.
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            if input_schema.column_with_name(&col_name).is_none() {
                let arrow_type = resolve_property_type(prop_name, label_props);
                fields.push(Field::new(&col_name, arrow_type, true));
            }
        }

        // Number of hops taken to reach the target (non-nullable).
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        // Relationship-list column for the step variable.
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        // Path struct column for the path variable, unless already present.
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        Arc::new(Schema::new(fields))
    }
}
2613
2614impl DisplayAs for GraphVariableLengthTraverseExec {
2615 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2616 write!(
2617 f,
2618 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2619 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2620 )
2621 }
2622}
2623
impl ExecutionPlan for GraphVariableLengthTraverseExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // Single-input operator: the only child is the upstream plan.
        vec![&self.input]
    }

    /// Rebuilds the node with a replacement child, copying every traversal
    /// parameter. NOTE: the argument order below must stay in sync with
    /// `Self::new`'s positional parameter list.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
            // Pass the already-built NFA so it is not reconstructed.
            Some((*self.nfa).clone()),
        )))
    }

    /// Starts the traversal stream. The stream first awaits the graph
    /// context's warming future (state `Warming`) before reading input rows.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Pre-warm caches for the edge types / direction this plan will scan.
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphVariableLengthTraverseStream {
            input: input_stream,
            // Detach a plain-data snapshot of the plan's parameters so the
            // stream does not borrow `self`.
            exec: Arc::new(self.clone_for_stream()),
            schema: self.schema.clone(),
            state: VarLengthStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
2706
2707impl GraphVariableLengthTraverseExec {
2708 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2710 GraphVariableLengthTraverseExecData {
2711 source_column: self.source_column.clone(),
2712 edge_type_ids: self.edge_type_ids.clone(),
2713 direction: self.direction,
2714 min_hops: self.min_hops,
2715 max_hops: self.max_hops,
2716 target_variable: self.target_variable.clone(),
2717 step_variable: self.step_variable.clone(),
2718 path_variable: self.path_variable.clone(),
2719 target_properties: self.target_properties.clone(),
2720 target_label_name: self.target_label_name.clone(),
2721 is_optional: self.is_optional,
2722 bound_target_column: self.bound_target_column.clone(),
2723 edge_lance_filter: self.edge_lance_filter.clone(),
2724 edge_property_conditions: self.edge_property_conditions.clone(),
2725 used_edge_columns: self.used_edge_columns.clone(),
2726 path_mode: self.path_mode.clone(),
2727 output_mode: self.output_mode.clone(),
2728 nfa: self.nfa.clone(),
2729 graph_ctx: self.graph_ctx.clone(),
2730 }
2731 }
2732}
2733
/// Owned, plain-data snapshot of `GraphVariableLengthTraverseExec`'s
/// parameters, handed to the stream so it need not borrow the plan node.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    // Name of the input column holding source vertex ids.
    source_column: String,
    // Edge type ids the traversal may follow.
    edge_type_ids: Vec<u32>,
    // Traversal direction; `Both` triggers undirected edge dedup.
    direction: Direction,
    // Minimum hop count (acceptance is enforced via the NFA, per the
    // `dead_code` note above).
    min_hops: usize,
    // Hard upper bound on BFS depth.
    max_hops: usize,
    // Variable name used to qualify target `_vid`/`_labels`/property columns.
    target_variable: String,
    // When set, an edge-list column is emitted under this name.
    step_variable: Option<String>,
    // When set, a path struct column is emitted (or merged) under this name.
    path_variable: Option<String>,
    // Target vertex properties to hydrate after traversal.
    target_properties: Vec<String>,
    // Optional label constraint on accepted target vertices.
    target_label_name: Option<String>,
    // OPTIONAL MATCH semantics: emit a null row when no expansion matched.
    is_optional: bool,
    // When set, only expansions whose target equals this column's value pass.
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    // Equality conditions an edge's properties must satisfy to be traversed.
    edge_property_conditions: Vec<(String, UniValue)>,
    // Input columns whose edge ids must not be reused by this traversal.
    used_edge_columns: Vec<String>,
    // Path uniqueness/selection semantics (e.g. trail vs walk).
    path_mode: super::nfa::PathMode,
    // Whether full paths or only endpoints are produced.
    output_mode: super::nfa::VlpOutputMode,
    // Compiled path NFA driving state transitions during BFS.
    nfa: Arc<PathNfa>,
    // Shared graph/store access (neighbors, properties, visibility).
    graph_ctx: Arc<GraphExecutionContext>,
}
2762
/// Hard cap on the BFS frontier; expansion stops (truncating results) beyond it.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Hard cap on the predecessor-DAG pool; expansion stops once exceeded.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2767
impl GraphVariableLengthTraverseExecData {
    /// Returns true when `vid` satisfies the optional target label filter.
    /// With no filter configured, every vertex passes.
    fn check_target_label(&self, vid: Vid) -> bool {
        if let Some(ref label_name) = self.target_label_name {
            let query_ctx = self.graph_ctx.query_context();
            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                Some(labels) => labels.contains(label_name),
                // No label info available: be permissive and keep the vertex.
                // NOTE(review): presumably this covers vertices visible only
                // in unflushed buffers — confirm intended semantics.
                None => true,
            }
        } else {
            true
        }
    }

    /// Evaluates an NFA per-state vertex constraint against `vid`.
    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
        match constraint {
            super::nfa::VertexConstraint::Label(label_name) => {
                let query_ctx = self.graph_ctx.query_context();
                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                    Some(labels) => labels.contains(label_name),
                    // Same permissive fallback as `check_target_label`.
                    None => true,
                }
            }
        }
    }

    /// Expands one (vertex, NFA state) pair: for each NFA transition leaving
    /// `state`, enumerates neighbors over that transition's edge types and
    /// direction, applying (in order) undirected-edge dedup, the edge-id
    /// filter, edge property equality conditions, the caller's used-edge
    /// exclusion, and the destination state's vertex constraint.
    fn expand_neighbors(
        &self,
        vid: Vid,
        state: NfaStateId,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
    ) -> Vec<(Vid, Eid, NfaStateId)> {
        let is_undirected = matches!(self.direction, Direction::Both);
        let mut results = Vec::new();

        for transition in self.nfa.transitions_from(state) {
            // Per-transition set: in undirected mode an edge can be yielded
            // once per direction, so drop the duplicate occurrence.
            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();

            for &etype in &transition.edge_type_ids {
                for (neighbor, eid) in
                    self.graph_ctx
                        .get_neighbors(vid, etype, transition.direction)
                {
                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
                        continue;
                    }

                    if !eid_filter.contains(eid) {
                        continue;
                    }

                    if !self.edge_property_conditions.is_empty() {
                        let query_ctx = self.graph_ctx.query_context();
                        let passes = if let Some(props) =
                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
                        {
                            // All configured (name, value) pairs must match.
                            self.edge_property_conditions
                                .iter()
                                .all(|(name, expected)| {
                                    props.get(name).is_some_and(|actual| actual == expected)
                                })
                        } else {
                            // No properties retrievable: edge passes.
                            // NOTE(review): permissive on missing props —
                            // confirm this is the intended filter semantics.
                            true
                        };
                        if !passes {
                            continue;
                        }
                    }

                    // Edges already consumed by earlier pattern parts may not
                    // be reused within this expansion.
                    if used_eids.contains(&eid.as_u64()) {
                        continue;
                    }

                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
                        && !self.check_state_constraint(neighbor, constraint)
                    {
                        continue;
                    }

                    results.push((neighbor, eid, transition.to));
                }
            }
        }

        results
    }

    /// Full-path BFS from `source`: runs a breadth-first expansion over
    /// (vertex, NFA state) pairs up to `max_hops`, recording every
    /// predecessor edge in a DAG, then enumerates concrete node/edge paths
    /// for each accepting (target, state, depth) triple.
    ///
    /// Returns one `(target, hop_count, node_path, edge_path)` tuple per
    /// enumerated path.
    fn bfs_with_dag(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<BfsResult> {
        let nfa = &self.nfa;
        let selector = PathSelector::All;
        let mut dag = PredecessorDag::new(selector);
        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();

        // Zero-hop acceptance: the source itself may satisfy the pattern.
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            accepting.push((source, nfa.start_state(), 0));
        }

        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            // Dedup (vertex, state) pairs within this depth level only; the
            // DAG still records every predecessor edge discovered.
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                        {
                            accepting.push((neighbor, dst_state, depth));
                        }
                    }
                }
            }

            // Safety valve: stop expanding (silently truncating results)
            // once the frontier or predecessor pool exceeds its cap.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        // Materialize concrete paths for every accepting triple by walking
        // the predecessor DAG backwards; `path_mode` governs selection.
        let mut results: Vec<BfsResult> = Vec::new();
        for &(target, state, depth) in &accepting {
            dag.enumerate_paths(
                source,
                target,
                state,
                depth,
                depth,
                &self.path_mode,
                &mut |nodes, edges| {
                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
                    std::ops::ControlFlow::Continue(())
                },
            );
        }

        results
    }

    /// Endpoint-only BFS: same expansion as `bfs_with_dag`, but instead of
    /// enumerating paths it emits `(target, depth)` pairs as soon as an
    /// accepting pair is first seen at a depth, provided at least one
    /// trail-valid path to it exists in the DAG.
    fn bfs_endpoints_only(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<(Vid, u32)> {
        let nfa = &self.nfa;
        // Any single witness path suffices for endpoint output.
        let selector = PathSelector::Any;
        let mut dag = PredecessorDag::new(selector);
        let mut results: Vec<(Vid, u32)> = Vec::new();

        // Zero-hop acceptance of the source itself.
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            results.push((source, 0));
        }

        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // NOTE(review): the trail-validity check runs only on
                        // the first sighting of (neighbor, dst_state) at this
                        // depth; a later predecessor added at the same depth
                        // could in principle create a valid trail after this
                        // check failed — confirm this cannot drop endpoints.
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
                        {
                            results.push((neighbor, depth));
                        }
                    }
                }
            }

            // Same truncating safety valve as `bfs_with_dag`.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        results
    }
}
3014
/// State machine for `GraphVariableLengthTraverseStream::poll_next`.
enum VarLengthStreamState {
    // Awaiting the graph context's cache-warming future before any input row
    // is processed.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    // Pulling batches from the input stream and running the traversal.
    Reading,
    // Awaiting the async hydration of target property columns for one batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    // Terminal: input exhausted or a fatal error was returned.
    Done,
}
3026
/// Record-batch stream that performs the variable-length traversal for each
/// input batch, driven by the `VarLengthStreamState` machine.
struct GraphVariableLengthTraverseStream {
    // Upstream batches containing the source (and optionally bound-target /
    // used-edge) columns.
    input: SendableRecordBatchStream,
    // Owned snapshot of the plan's traversal parameters.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    // Output schema; batch columns are built to match it positionally.
    schema: SchemaRef,
    state: VarLengthStreamState,
    metrics: BaselineMetrics,
}
3035
impl Stream for GraphVariableLengthTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the state machine: Warming -> Reading -> (Materializing ->)
    /// Reading ... -> Done. The current state is moved out with
    /// `mem::replace` (leaving `Done` as a placeholder) so owned futures can
    /// be polled; every branch must restore the correct state before
    /// returning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);

            match state {
                // Wait for cache warming to finish before reading input.
                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = VarLengthStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so it is polled again.
                        self.state = VarLengthStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthStreamState::Reading => {
                    // Per-poll timeout check; abort the query if exceeded.
                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // No bitmap restrictions at this call site: all
                            // edges and vertices are admissible.
                            let eid_filter = EidFilter::AllAllowed;
                            let vid_filter = VidFilter::AllAllowed;
                            let base_result =
                                self.process_batch_base(batch, &eid_filter, &vid_filter);
                            let base_batch = match base_result {
                                Ok(b) => b,
                                Err(e) => {
                                    // Batch-level error is surfaced but the
                                    // stream stays readable.
                                    self.state = VarLengthStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Without requested target properties the base
                            // batch is already complete.
                            if self.exec.target_properties.is_empty() {
                                self.state = VarLengthStreamState::Reading;
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Otherwise hand off to the async property
                            // hydration future.
                            let schema = self.schema.clone();
                            let target_variable = self.exec.target_variable.clone();
                            let target_properties = self.exec.target_properties.clone();
                            let target_label_name = self.exec.target_label_name.clone();
                            let graph_ctx = self.exec.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                            );

                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = VarLengthStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                // Await property hydration, then resume reading.
                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = VarLengthStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3143
impl GraphVariableLengthTraverseStream {
    /// Runs the traversal for every row of `batch` and assembles the output
    /// batch. For each non-null source vid it collects expansions
    /// `(input_row, target, hop_count, node_path, edge_path)`; in optional
    /// mode a sentinel expansion (target vid = u64::MAX) is emitted for rows
    /// with no match so the row survives as nulls.
    fn process_batch_base(
        &self,
        batch: RecordBatch,
        eid_filter: &EidFilter,
        vid_filter: &VidFilter,
    ) -> DFResult<RecordBatch> {
        let source_col = batch
            .column_by_name(&self.exec.source_column)
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Source column '{}' not found",
                    self.exec.source_column
                ))
            })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Optional column of already-bound target vids; when present, only
        // expansions landing on that exact vid are kept for the row.
        let bound_target_cow = self
            .exec
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Columns carrying edge ids consumed by earlier pattern parts;
        // non-UInt64 columns are silently skipped by the downcast.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .exec
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();

        let mut expansions: Vec<VarLengthExpansion> = Vec::new();

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            let mut emitted_for_row = false;

            if let Some(src) = source_vid {
                let vid = Vid::from(src);

                // Per-row set of edge ids this traversal must not reuse.
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                match &self.exec.output_mode {
                    // Endpoint mode: no path materialization, empty
                    // node/edge vectors in the expansion record.
                    VlpOutputMode::EndpointsOnly => {
                        let endpoints = self
                            .exec
                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
                        for (target, depth) in endpoints {
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
                            emitted_for_row = true;
                        }
                    }
                    // All other modes: full path enumeration via the DAG.
                    _ => {
                        let bfs_results = self
                            .exec
                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
                        for (target, hop_count, node_path, edge_path) in bfs_results {
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
                            emitted_for_row = true;
                        }
                    }
                }
            }

            // OPTIONAL MATCH: keep unmatched rows, marked by the u64::MAX
            // sentinel target vid (turned into nulls downstream).
            if self.exec.is_optional && !emitted_for_row {
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Materializes `expansions` into an Arrow batch matching `self.schema`.
    /// Input columns are replicated per expansion via `take`, then target
    /// vid/labels, property placeholders, hop count, step edge list, and the
    /// path struct are appended — in exactly the order `build_schema` used.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // One output row per expansion; `take` replicates each input row as
        // many times as it produced expansions.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // u64::MAX is the "no match" sentinel from optional mode; such rows
        // get a null target vid.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        // Only append columns the input did not already carry — mirrors the
        // `column_with_name(...).is_none()` checks in `build_schema`.
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                let Some(vid_u64) = target_vid else {
                    // Unmatched optional row: null labels list.
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            labels
                        }
                        None => {
                            // Labels not retrievable: fall back to the
                            // configured target label, if any.
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Property columns are appended as all-null placeholders here; the
        // async hydration pass fills them in later. The type is looked up by
        // position in the output schema, so column order must stay aligned.
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        // Step variable: list of edge structs per expansion.
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Unmatched optional row: null edge list.
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: empty (but non-null) edge list.
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        // Edge i connects node_path[i] -> node_path[i + 1].
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        // Path variable: a {nodes, relationships} struct. If the input
        // already carries a path column (a preceding pattern segment), the
        // existing prefix is prepended and that column is replaced in place;
        // otherwise a new column is appended.
        if let Some(path_var_name) = &self.exec.path_variable {
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            // Read from the already row-expanded columns so row indices line
            // up with `expansions`.
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Unmatched optional row: null path.
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                // When an existing path prefix is prepended, its last node is
                // the same vertex as this segment's first node, so skip the
                // duplicate.
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;

            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
}
3499
3500impl RecordBatchStream for GraphVariableLengthTraverseStream {
3501 fn schema(&self) -> SchemaRef {
3502 self.schema.clone()
3503 }
3504}
3505
/// Fills the null target-property placeholder columns of `base_batch` with
/// values fetched asynchronously from the property store, returning a new
/// batch with the same `schema`.
///
/// Property columns are matched by name *after* the target `_vid` column;
/// `property_columns` is consumed in `target_properties` order, which is
/// assumed to match the schema's column order (see NOTE below).
async fn hydrate_vlp_target_properties(
    base_batch: RecordBatch,
    schema: SchemaRef,
    target_variable: String,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
) -> DFResult<RecordBatch> {
    // Nothing to hydrate.
    if base_batch.num_rows() == 0 || target_properties.is_empty() {
        return Ok(base_batch);
    }

    // Search from the end so we find the column appended by this operator
    // rather than an earlier column of the same name.
    let target_vid_col_name = format!("{}._vid", target_variable);
    let vid_col_idx = schema
        .fields()
        .iter()
        .enumerate()
        .rev()
        .find(|(_, f)| f.name() == &target_vid_col_name)
        .map(|(i, _)| i);

    let Some(vid_col_idx) = vid_col_idx else {
        return Ok(base_batch);
    };

    let vid_col = base_batch.column(vid_col_idx);
    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
    let target_vid_array: &UInt64Array = &target_vid_cow;

    // Null vids (unmatched optional rows) become the u64::MAX sentinel; the
    // property lookup is expected to yield nothing for it.
    let target_vids: Vec<Vid> = target_vid_array
        .iter()
        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
        .collect();

    let mut property_columns: Vec<ArrayRef> = Vec::new();

    if let Some(ref label_name) = target_label_name {
        // Label known: fetch label-scoped properties and build columns with
        // the label's declared types.
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in &target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
            property_columns.push(column);
        }
    } else {
        // No label: fetch the named properties generically; `_all_props` is
        // handled separately below.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in &target_properties {
            if prop_name == "_all_props" {
                // `_all_props`: merge stored props with any L0-buffer
                // overrides (L0 wins) into one encoded binary blob per row.
                use crate::query::df_graph::scan::encode_cypher_value;
                use arrow_array::builder::LargeBinaryBuilder;

                let mut builder = LargeBinaryBuilder::new();
                let l0_ctx = graph_ctx.l0_context();
                for vid in &target_vids {
                    let mut merged_props = serde_json::Map::new();
                    if let Some(vid_props) = props_map.get(vid) {
                        for (k, v) in vid_props.iter() {
                            let json_val: serde_json::Value = v.clone().into();
                            merged_props.insert(k.to_string(), json_val);
                        }
                    }
                    for l0 in l0_ctx.iter_l0_buffers() {
                        let guard = l0.read();
                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
                            for (k, v) in l0_props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                merged_props.insert(k.to_string(), json_val);
                            }
                        }
                    }
                    if merged_props.is_empty() {
                        builder.append_null();
                    } else {
                        let json = serde_json::Value::Object(merged_props);
                        match encode_cypher_value(&json) {
                            // Encoding failure degrades to null rather than
                            // failing the batch.
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                property_columns.push(Arc::new(builder.finish()));
            } else {
                let column = build_property_column_static(
                    &target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                property_columns.push(column);
            }
        }
    }

    // Swap hydrated columns in by name. NOTE(review): `prop_idx` assumes the
    // schema lists the target property columns in `target_properties` order
    // after the vid column — a mismatch would pair values with the wrong
    // column. Confirm against `build_schema`.
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    let mut prop_idx = 0;
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let is_target_prop = col_idx > vid_col_idx
            && target_properties
                .iter()
                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
        if is_target_prop && prop_idx < property_columns.len() {
            new_columns.push(property_columns[prop_idx].clone());
            prop_idx += 1;
        } else {
            new_columns.push(base_batch.column(col_idx).clone());
        }
    }

    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
}
3665
/// Variant of the variable-length traversal operator configured with edge
/// type *names* (rather than pre-resolved ids) and LargeBinary-typed
/// property columns; see `build_schema` below.
pub struct GraphVariableLengthTraverseMainExec {
    // Upstream plan producing the source vid column.
    input: Arc<dyn ExecutionPlan>,

    // Name of the input column holding source vertex ids.
    source_column: String,

    // Edge type names the traversal may follow.
    type_names: Vec<String>,

    // Traversal direction.
    direction: Direction,

    // Minimum number of hops.
    min_hops: usize,

    // Maximum number of hops.
    max_hops: usize,

    // Variable name qualifying target `_vid`/`_labels`/property columns.
    target_variable: String,

    // When set, an edge-list column is emitted under this name.
    step_variable: Option<String>,

    // When set, a path struct column is emitted under this name.
    path_variable: Option<String>,

    // Target vertex properties to output (typed LargeBinary here).
    target_properties: Vec<String>,

    // OPTIONAL MATCH semantics: keep unmatched rows as nulls.
    is_optional: bool,

    // When set, only expansions reaching this column's vid are kept.
    bound_target_column: Option<String>,

    // Pushed-down edge filter expression, if any.
    edge_lance_filter: Option<String>,

    // Equality conditions an edge's properties must satisfy.
    edge_property_conditions: Vec<(String, UniValue)>,

    // Input columns whose edge ids must not be reused.
    used_edge_columns: Vec<String>,

    // Path uniqueness/selection semantics.
    path_mode: super::nfa::PathMode,

    // Whether full paths or only endpoints are produced.
    output_mode: super::nfa::VlpOutputMode,

    // Shared graph/store access.
    graph_ctx: Arc<GraphExecutionContext>,

    // Output schema computed once in `new`.
    schema: SchemaRef,

    // Cached DataFusion plan properties.
    properties: PlanProperties,

    metrics: ExecutionPlanMetricsSet,
}
3740
3741impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3742 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3743 f.debug_struct("GraphVariableLengthTraverseMainExec")
3744 .field("source_column", &self.source_column)
3745 .field("type_names", &self.type_names)
3746 .field("direction", &self.direction)
3747 .field("min_hops", &self.min_hops)
3748 .field("max_hops", &self.max_hops)
3749 .field("target_variable", &self.target_variable)
3750 .finish()
3751 }
3752}
3753
impl GraphVariableLengthTraverseMainExec {
    /// Constructs the operator, computing its output schema and plan
    /// properties up front. NOTE: the positional parameter order here must
    /// stay in sync with `with_new_children`'s reconstruction call.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
        );
        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            type_names,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Output schema: input fields, missing target `_vid`/`_labels`,
    /// `_hop_count`, optional step edge list, optional path struct, then the
    /// target property columns (all LargeBinary).
    ///
    /// NOTE(review): unlike the other operator's `build_schema`, property
    /// columns are appended *after* `_hop_count`/step/path here — presumably
    /// intentional for this variant; confirm downstream column matching.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Hop count is always produced and never null.
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        // Properties are untyped here (no label metadata): LargeBinary.
        for prop in target_properties {
            let prop_name = format!("{}.{}", target_variable, prop);
            if input_schema.column_with_name(&prop_name).is_none() {
                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
            }
        }

        Arc::new(Schema::new(fields))
    }
}
3869
3870impl DisplayAs for GraphVariableLengthTraverseMainExec {
3871 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3872 write!(
3873 f,
3874 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3875 self.source_column, self.type_names, self.min_hops, self.max_hops
3876 )
3877 }
3878}
3879
3880impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
3881 fn name(&self) -> &str {
3882 "GraphVariableLengthTraverseMainExec"
3883 }
3884
3885 fn as_any(&self) -> &dyn Any {
3886 self
3887 }
3888
3889 fn schema(&self) -> SchemaRef {
3890 self.schema.clone()
3891 }
3892
3893 fn properties(&self) -> &PlanProperties {
3894 &self.properties
3895 }
3896
3897 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3898 vec![&self.input]
3899 }
3900
3901 fn with_new_children(
3902 self: Arc<Self>,
3903 children: Vec<Arc<dyn ExecutionPlan>>,
3904 ) -> DFResult<Arc<dyn ExecutionPlan>> {
3905 if children.len() != 1 {
3906 return Err(datafusion::error::DataFusionError::Plan(
3907 "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
3908 ));
3909 }
3910
3911 Ok(Arc::new(Self::new(
3912 children[0].clone(),
3913 self.source_column.clone(),
3914 self.type_names.clone(),
3915 self.direction,
3916 self.min_hops,
3917 self.max_hops,
3918 self.target_variable.clone(),
3919 self.step_variable.clone(),
3920 self.path_variable.clone(),
3921 self.target_properties.clone(),
3922 self.graph_ctx.clone(),
3923 self.is_optional,
3924 self.bound_target_column.clone(),
3925 self.edge_lance_filter.clone(),
3926 self.edge_property_conditions.clone(),
3927 self.used_edge_columns.clone(),
3928 self.path_mode.clone(),
3929 self.output_mode.clone(),
3930 )))
3931 }
3932
3933 fn execute(
3934 &self,
3935 partition: usize,
3936 context: Arc<TaskContext>,
3937 ) -> DFResult<SendableRecordBatchStream> {
3938 let input_stream = self.input.execute(partition, context)?;
3939 let metrics = BaselineMetrics::new(&self.metrics, partition);
3940
3941 let graph_ctx = self.graph_ctx.clone();
3943 let type_names = self.type_names.clone();
3944 let direction = self.direction;
3945 let load_fut =
3946 async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };
3947
3948 Ok(Box::pin(GraphVariableLengthTraverseMainStream {
3949 input: input_stream,
3950 source_column: self.source_column.clone(),
3951 type_names: self.type_names.clone(),
3952 direction: self.direction,
3953 min_hops: self.min_hops,
3954 max_hops: self.max_hops,
3955 target_variable: self.target_variable.clone(),
3956 step_variable: self.step_variable.clone(),
3957 path_variable: self.path_variable.clone(),
3958 target_properties: self.target_properties.clone(),
3959 graph_ctx: self.graph_ctx.clone(),
3960 is_optional: self.is_optional,
3961 bound_target_column: self.bound_target_column.clone(),
3962 edge_lance_filter: self.edge_lance_filter.clone(),
3963 edge_property_conditions: self.edge_property_conditions.clone(),
3964 used_edge_columns: self.used_edge_columns.clone(),
3965 path_mode: self.path_mode.clone(),
3966 output_mode: self.output_mode.clone(),
3967 schema: self.schema.clone(),
3968 state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
3969 metrics,
3970 }))
3971 }
3972
3973 fn metrics(&self) -> Option<MetricsSet> {
3974 Some(self.metrics.clone_inner())
3975 }
3976}
3977
/// Driver state for `GraphVariableLengthTraverseMainStream::poll_next`.
enum VarLengthMainStreamState {
    /// Waiting for the edge adjacency map to finish loading.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Adjacency loaded; pulling and expanding input batches.
    Processing(EdgeAdjacencyMap),
    /// An expanded batch is having its target properties hydrated asynchronously.
    Materializing {
        // Retained so the stream can return to `Processing` after hydration.
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Input exhausted or a fatal error was surfaced.
    Done,
}
3992
/// Streaming state for the variable-length traversal operator: polls the
/// input stream, expands each row via BFS over a preloaded adjacency map,
/// and emits expanded batches (optionally hydrating target properties).
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    // Upstream stream supplying source rows.
    input: SendableRecordBatchStream,
    // Name of the input column holding source vids (resolved in `process_batch`).
    source_column: String,
    // Edge type names; joined with "|" when writing edge structs.
    type_names: Vec<String>,
    // Traversal direction; `Both` triggers per-step edge dedup in `bfs`.
    direction: Direction,
    // Inclusive hop window: results are emitted for depths in [min_hops, max_hops].
    min_hops: usize,
    max_hops: usize,
    // Prefix for the generated `<var>._vid` / `<var>._labels` output columns.
    target_variable: String,
    // When set, a list-of-edge-structs column is emitted for this variable.
    step_variable: Option<String>,
    // When set, a path struct (nodes + relationships) column is emitted/replaced.
    path_variable: Option<String>,
    // Property names hydrated as `<target>.<prop>` LargeBinary columns.
    target_properties: Vec<String>,
    // Supplies the query context used for label/property lookups.
    graph_ctx: Arc<GraphExecutionContext>,
    // When true, rows with no expansion still emit a null placeholder row.
    is_optional: bool,
    // When set, BFS results must land on this column's vid for the same row.
    bound_target_column: Option<String>,
    // NOTE(review): not used in this chunk — covered by the dead_code expect above.
    edge_lance_filter: Option<String>,
    // (property, value) equality predicates an edge must satisfy to be followed.
    edge_property_conditions: Vec<(String, UniValue)>,
    // Input columns whose eids are excluded from traversal, per row.
    used_edge_columns: Vec<String>,
    // NOTE(review): path_mode/output_mode unused here — Phase 3 per the expect above.
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    // Output schema produced by `build_schema`.
    schema: SchemaRef,
    // poll_next state machine.
    state: VarLengthMainStreamState,
    // Baseline metrics; `record_output` is called per emitted batch.
    metrics: BaselineMetrics,
}
4020
/// One BFS emission: (reached target, hop count, node path, edge path).
// NOTE(review): structurally identical to `BfsResult` at the top of the file —
// consider unifying the two aliases.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4023
4024impl GraphVariableLengthTraverseMainStream {
4025 fn bfs(
4031 &self,
4032 source: Vid,
4033 adjacency: &EdgeAdjacencyMap,
4034 used_eids: &FxHashSet<u64>,
4035 ) -> Vec<MainBfsResult> {
4036 let mut results = Vec::new();
4037 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4038
4039 queue.push_back((source, 0, vec![source], vec![]));
4040
4041 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4042 if depth >= self.min_hops && depth <= self.max_hops {
4044 results.push((current, depth, node_path.clone(), edge_path.clone()));
4045 }
4046
4047 if depth >= self.max_hops {
4049 continue;
4050 }
4051
4052 if let Some(neighbors) = adjacency.get(¤t) {
4054 let is_undirected = matches!(self.direction, Direction::Both);
4055 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4056
4057 for (neighbor, eid, _edge_type, props) in neighbors {
4058 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4060 continue;
4061 }
4062
4063 if edge_path.contains(eid) {
4065 continue;
4066 }
4067
4068 if used_eids.contains(&eid.as_u64()) {
4071 continue;
4072 }
4073
4074 if !self.edge_property_conditions.is_empty() {
4076 let passes =
4077 self.edge_property_conditions
4078 .iter()
4079 .all(|(name, expected)| {
4080 props.get(name).is_some_and(|actual| actual == expected)
4081 });
4082 if !passes {
4083 continue;
4084 }
4085 }
4086
4087 let mut new_node_path = node_path.clone();
4088 new_node_path.push(*neighbor);
4089 let mut new_edge_path = edge_path.clone();
4090 new_edge_path.push(*eid);
4091 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4092 }
4093 }
4094 }
4095
4096 results
4097 }
4098
4099 fn process_batch(
4101 &self,
4102 batch: RecordBatch,
4103 adjacency: &EdgeAdjacencyMap,
4104 ) -> DFResult<RecordBatch> {
4105 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4106 datafusion::error::DataFusionError::Execution(format!(
4107 "Source column '{}' not found in input batch",
4108 self.source_column
4109 ))
4110 })?;
4111
4112 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4113 let source_vids: &UInt64Array = &source_vid_cow;
4114
4115 let bound_target_cow = self
4117 .bound_target_column
4118 .as_ref()
4119 .and_then(|col| batch.column_by_name(col))
4120 .map(|c| column_as_vid_array(c.as_ref()))
4121 .transpose()?;
4122 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4123
4124 let used_edge_arrays: Vec<&UInt64Array> = self
4126 .used_edge_columns
4127 .iter()
4128 .filter_map(|col| {
4129 batch
4130 .column_by_name(col)?
4131 .as_any()
4132 .downcast_ref::<UInt64Array>()
4133 })
4134 .collect();
4135
4136 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4138
4139 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4140 let mut emitted_for_row = false;
4141
4142 if let Some(source_u64) = source_opt {
4143 let source = Vid::from(source_u64);
4144
4145 let used_eids: FxHashSet<u64> = used_edge_arrays
4147 .iter()
4148 .filter_map(|arr| {
4149 if arr.is_null(row_idx) {
4150 None
4151 } else {
4152 Some(arr.value(row_idx))
4153 }
4154 })
4155 .collect();
4156
4157 let bfs_results = self.bfs(source, adjacency, &used_eids);
4158
4159 for (target, hops, node_path, edge_path) in bfs_results {
4160 if let Some(targets) = expected_targets {
4163 if targets.is_null(row_idx) {
4164 continue;
4165 }
4166 let expected_vid = targets.value(row_idx);
4167 if target.as_u64() != expected_vid {
4168 continue;
4169 }
4170 }
4171
4172 expansions.push((row_idx, target, hops, node_path, edge_path));
4173 emitted_for_row = true;
4174 }
4175 }
4176
4177 if self.is_optional && !emitted_for_row {
4178 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4180 }
4181 }
4182
4183 if expansions.is_empty() {
4184 if self.is_optional {
4185 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4186 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4187 }
4188 return Ok(RecordBatch::new_empty(self.schema.clone()));
4189 }
4190
4191 let num_rows = expansions.len();
4192 self.metrics.record_output(num_rows);
4193
4194 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4196
4197 for col_idx in 0..batch.num_columns() {
4199 let array = batch.column(col_idx);
4200 let indices: Vec<u64> = expansions
4201 .iter()
4202 .map(|(idx, _, _, _, _)| *idx as u64)
4203 .collect();
4204 let take_indices = UInt64Array::from(indices);
4205 let expanded = arrow::compute::take(array, &take_indices, None)?;
4206 columns.push(expanded);
4207 }
4208
4209 let target_vid_name = format!("{}._vid", self.target_variable);
4211 if batch.schema().column_with_name(&target_vid_name).is_none() {
4212 let target_vids: Vec<Option<u64>> = expansions
4213 .iter()
4214 .map(|(_, vid, _, node_path, edge_path)| {
4215 if node_path.is_empty() && edge_path.is_empty() {
4216 None
4217 } else {
4218 Some(vid.as_u64())
4219 }
4220 })
4221 .collect();
4222 columns.push(Arc::new(UInt64Array::from(target_vids)));
4223 }
4224
4225 let target_labels_name = format!("{}._labels", self.target_variable);
4227 if batch
4228 .schema()
4229 .column_with_name(&target_labels_name)
4230 .is_none()
4231 {
4232 use arrow_array::builder::{ListBuilder, StringBuilder};
4233 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4234 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4235 if node_path.is_empty() && edge_path.is_empty() {
4236 labels_builder.append(false);
4237 continue;
4238 }
4239 let mut row_labels: Vec<String> = Vec::new();
4240 let labels =
4241 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4242 for lbl in &labels {
4243 if !row_labels.contains(lbl) {
4244 row_labels.push(lbl.clone());
4245 }
4246 }
4247 let values = labels_builder.values();
4248 for lbl in &row_labels {
4249 values.append_value(lbl);
4250 }
4251 labels_builder.append(true);
4252 }
4253 columns.push(Arc::new(labels_builder.finish()));
4254 }
4255
4256 let hop_counts: Vec<u64> = expansions
4258 .iter()
4259 .map(|(_, _, hops, _, _)| *hops as u64)
4260 .collect();
4261 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4262
4263 if self.step_variable.is_some() {
4265 let mut edges_builder = new_edge_list_builder();
4266 let query_ctx = self.graph_ctx.query_context();
4267 let type_names_str = self.type_names.join("|");
4268
4269 for (_, _, _, node_path, edge_path) in expansions.iter() {
4270 if node_path.is_empty() && edge_path.is_empty() {
4271 edges_builder.append_null();
4272 } else if edge_path.is_empty() {
4273 edges_builder.append(true);
4275 } else {
4276 for (i, eid) in edge_path.iter().enumerate() {
4277 append_edge_to_struct(
4278 edges_builder.values(),
4279 *eid,
4280 &type_names_str,
4281 node_path[i].as_u64(),
4282 node_path[i + 1].as_u64(),
4283 &query_ctx,
4284 );
4285 }
4286 edges_builder.append(true);
4287 }
4288 }
4289
4290 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4291 }
4292
4293 if let Some(path_var_name) = &self.path_variable {
4297 let existing_path_col_idx = batch
4298 .schema()
4299 .column_with_name(path_var_name)
4300 .map(|(idx, _)| idx);
4301 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4302 let existing_path = existing_path_arc
4303 .as_ref()
4304 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4305
4306 let mut nodes_builder = new_node_list_builder();
4307 let mut rels_builder = new_edge_list_builder();
4308 let query_ctx = self.graph_ctx.query_context();
4309 let type_names_str = self.type_names.join("|");
4310 let mut path_validity = Vec::with_capacity(expansions.len());
4311
4312 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4313 if node_path.is_empty() && edge_path.is_empty() {
4314 nodes_builder.append(false);
4315 rels_builder.append(false);
4316 path_validity.push(false);
4317 continue;
4318 }
4319
4320 let skip_first_vlp_node = if let Some(existing) = existing_path {
4322 if !existing.is_null(row_out_idx) {
4323 prepend_existing_path(
4324 existing,
4325 row_out_idx,
4326 &mut nodes_builder,
4327 &mut rels_builder,
4328 &query_ctx,
4329 );
4330 true
4331 } else {
4332 false
4333 }
4334 } else {
4335 false
4336 };
4337
4338 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4340 for vid in &node_path[start_idx..] {
4341 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4342 }
4343 nodes_builder.append(true);
4344
4345 for (i, eid) in edge_path.iter().enumerate() {
4346 append_edge_to_struct(
4347 rels_builder.values(),
4348 *eid,
4349 &type_names_str,
4350 node_path[i].as_u64(),
4351 node_path[i + 1].as_u64(),
4352 &query_ctx,
4353 );
4354 }
4355 rels_builder.append(true);
4356 path_validity.push(true);
4357 }
4358
4359 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4361 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4362
4363 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4365 let rels_field = Arc::new(Field::new(
4366 "relationships",
4367 rels_array.data_type().clone(),
4368 true,
4369 ));
4370
4371 let path_struct = arrow_array::StructArray::try_new(
4373 vec![nodes_field, rels_field].into(),
4374 vec![nodes_array, rels_array],
4375 Some(arrow::buffer::NullBuffer::from(path_validity)),
4376 )
4377 .map_err(arrow_err)?;
4378
4379 if let Some(idx) = existing_path_col_idx {
4380 columns[idx] = Arc::new(path_struct);
4381 } else {
4382 columns.push(Arc::new(path_struct));
4383 }
4384 }
4385
4386 for prop_name in &self.target_properties {
4389 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4390 if batch.schema().column_with_name(&full_prop_name).is_none() {
4391 columns.push(arrow_array::new_null_array(
4392 &DataType::LargeBinary,
4393 num_rows,
4394 ));
4395 }
4396 }
4397
4398 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4399 }
4400}
4401
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// State-machine driver: first awaits the adjacency load (`Loading`),
    /// then alternates between pulling/expanding input batches (`Processing`)
    /// and awaiting async property hydration (`Materializing`).
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state, leaving `Done` as a placeholder;
            // every arm below is responsible for writing the next state back.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Adjacency ready: loop around and start pulling input.
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // Surface the error; adjacency is retained
                                    // so further polls can continue.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // No properties to hydrate: emit the batch directly.
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                // NOTE(review): semantics of this `None` argument are
                                // not visible here — confirm against the signature of
                                // `hydrate_vlp_target_properties`.
                                None,
                                graph_ctx,
                            );

                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            // Hydration finished: emit and resume processing.
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4499
4500impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4501 fn schema(&self) -> SchemaRef {
4502 self.schema.clone()
4503 }
4504}
4505
#[cfg(test)]
mod tests {
    use super::*;

    /// One-column input schema carrying `a._vid`, shared by every test.
    fn vid_input_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// Field names in declaration order, for compact schema assertions.
    fn field_names(schema: &SchemaRef) -> Vec<&str> {
        schema.fields().iter().map(|f| f.name().as_str()).collect()
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let schema = GraphTraverseExec::build_schema(
            vid_input_schema(),
            "m",
            None,
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_eq!(
            field_names(&schema),
            ["a._vid", "m._vid", "m._labels", "__eid_to_m"]
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let schema = GraphTraverseExec::build_schema(
            vid_input_schema(),
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_eq!(
            field_names(&schema),
            ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"]
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];
        let schema = GraphTraverseExec::build_schema(
            vid_input_schema(),
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        assert_eq!(
            field_names(&schema),
            [
                "a._vid", "m._vid", "m._labels", "m.name", "m.age", "r._eid", "r._type"
            ]
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let schema = GraphVariableLengthTraverseExec::build_schema(
            vid_input_schema(),
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_eq!(
            field_names(&schema),
            ["a._vid", "b._vid", "b._labels", "_hop_count", "p"]
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let schema = GraphTraverseMainExec::build_schema(
            &vid_input_schema(),
            "m",
            &None,
            &[],
            &[],
            false,
        );

        assert_eq!(
            field_names(&schema),
            ["a._vid", "m._vid", "m._labels", "__eid_to_m"]
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let schema = GraphTraverseMainExec::build_schema(
            &vid_input_schema(),
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_eq!(
            field_names(&schema),
            ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"]
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let schema = GraphTraverseMainExec::build_schema(
            &vid_input_schema(),
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_eq!(
            field_names(&schema),
            [
                "a._vid", "m._vid", "m._labels", "r._eid", "r._type", "r.weight", "r.since"
            ]
        );
        // Edge properties materialize as LargeBinary.
        assert_eq!(schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(6).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];
        let schema = GraphTraverseMainExec::build_schema(
            &vid_input_schema(),
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        assert_eq!(
            field_names(&schema),
            [
                "a._vid", "m._vid", "m._labels", "r._eid", "r._type", "m.name", "m.age"
            ]
        );
        // Target properties materialize as LargeBinary.
        assert_eq!(schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(schema.field(6).data_type(), &DataType::LargeBinary);
    }
}
4719}