1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// Result of a BFS expansion:
/// (reached vertex, hop count, vertices along the path, edges along the path).
/// NOTE(review): field meanings inferred from the tuple shape — confirm at use sites.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// A BFS expansion keyed back to its originating input row:
/// (input row index, reached vertex, hop count, path vertices, path edges).
/// NOTE(review): same caveat as `BfsResult` — meanings inferred from shape.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);

68fn prepend_existing_path(
75 existing_path: &arrow_array::StructArray,
76 row_idx: usize,
77 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79 query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81 let nodes_list = existing_path
83 .column(0)
84 .as_any()
85 .downcast_ref::<arrow_array::ListArray>()
86 .unwrap();
87 let node_values = nodes_list.value(row_idx);
88 let node_struct = node_values
89 .as_any()
90 .downcast_ref::<arrow_array::StructArray>()
91 .unwrap();
92 let vid_col = node_struct
93 .column(0)
94 .as_any()
95 .downcast_ref::<UInt64Array>()
96 .unwrap();
97 for i in 0..vid_col.len() {
98 append_node_to_struct(
99 nodes_builder.values(),
100 Vid::from(vid_col.value(i)),
101 query_ctx,
102 );
103 }
104
105 let rels_list = existing_path
107 .column(1)
108 .as_any()
109 .downcast_ref::<arrow_array::ListArray>()
110 .unwrap();
111 let edge_values = rels_list.value(row_idx);
112 let edge_struct = edge_values
113 .as_any()
114 .downcast_ref::<arrow_array::StructArray>()
115 .unwrap();
116 let eid_col = edge_struct
117 .column(0)
118 .as_any()
119 .downcast_ref::<UInt64Array>()
120 .unwrap();
121 let type_col = edge_struct
122 .column(1)
123 .as_any()
124 .downcast_ref::<arrow_array::StringArray>()
125 .unwrap();
126 let src_col = edge_struct
127 .column(2)
128 .as_any()
129 .downcast_ref::<UInt64Array>()
130 .unwrap();
131 let dst_col = edge_struct
132 .column(3)
133 .as_any()
134 .downcast_ref::<UInt64Array>()
135 .unwrap();
136 for i in 0..eid_col.len() {
137 append_edge_to_struct(
138 rels_builder.values(),
139 Eid::from(eid_col.value(i)),
140 type_col.value(i),
141 src_col.value(i),
142 dst_col.value(i),
143 query_ctx,
144 );
145 }
146}
147
148fn resolve_edge_property_type(
153 prop: &str,
154 schema_props: Option<
155 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156 >,
157) -> DataType {
158 if prop == "overflow_json" {
159 DataType::LargeBinary
160 } else {
161 schema_props
162 .and_then(|props| props.get(prop))
163 .map(|meta| meta.r#type.to_arrow())
164 .unwrap_or(DataType::LargeBinary)
165 }
166}
167
168use crate::query::df_graph::common::merged_edge_schema_props;
169
/// A variable-length-path expansion keyed back to its input row:
/// (input row index, reached vertex, hop count, path vertices, path edges).
/// NOTE(review): identical shape to `ExpansionRecord` above — consider unifying.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);

/// Physical operator that expands each input row one hop along the graph:
/// for every source vertex in `source_column` it emits one output row per
/// matching (edge, target vertex) pair, appending target/edge columns to the
/// input schema.
pub struct GraphTraverseExec {
    /// Upstream plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse; multiple ids are unioned.
    edge_type_ids: Vec<u32>,

    /// Traversal direction (outgoing / incoming / both).
    direction: Direction,

    /// Variable name used to prefix the target vertex output columns.
    target_variable: String,

    /// Optional edge variable; when set, `<var>._eid` / `<var>._type` and
    /// edge property columns are emitted instead of the internal eid column.
    edge_variable: Option<String>,

    /// Edge property names to materialize as output columns.
    edge_properties: Vec<String>,

    /// Target vertex property names to materialize as output columns.
    target_properties: Vec<String>,

    /// Optional label constraint on the target vertex (by name).
    target_label_name: Option<String>,

    /// Optional label constraint id.
    /// NOTE(review): carried through constructors but not read in this
    /// file's visible code — confirm external use.
    target_label_id: Option<u16>,

    /// Shared storage / visibility context for the query.
    graph_ctx: Arc<GraphExecutionContext>,

    /// True for OPTIONAL MATCH semantics: unmatched rows survive with nulls.
    optional: bool,

    /// Variables belonging to the optional pattern; decides which columns
    /// are nulled out for unmatched rows.
    optional_pattern_vars: HashSet<String>,

    /// When the target is already bound, the column holding the expected vid.
    bound_target_column: Option<String>,

    /// Edge-id columns consumed by earlier hops; those edges are skipped
    /// during expansion (relationship uniqueness).
    used_edge_columns: Vec<String>,

    /// Output schema: input fields plus appended target/edge fields.
    schema: SchemaRef,

    /// Plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics shared with produced streams.
    metrics: ExecutionPlanMetricsSet,
}
255
256impl fmt::Debug for GraphTraverseExec {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 f.debug_struct("GraphTraverseExec")
259 .field("source_column", &self.source_column)
260 .field("edge_type_ids", &self.edge_type_ids)
261 .field("direction", &self.direction)
262 .field("target_variable", &self.target_variable)
263 .field("edge_variable", &self.edge_variable)
264 .finish()
265 }
266}
267
impl GraphTraverseExec {
    /// Creates a traverse operator, precomputing the output schema and plan
    /// properties from the graph schema and the requested variables.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        target_label_id: Option<u16>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Look up schema-declared property metadata so output columns get
        // concrete Arrow types instead of the LargeBinary fallback.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
        let edge_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            edge_variable.as_deref(),
            &edge_properties,
            &target_properties,
            label_props,
            edge_props,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            target_label_name,
            target_label_id,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Builds the output schema: all input fields, then `<target>._vid`,
    /// `<target>._labels`, target property columns, and — with an edge
    /// variable — `<edge>._eid`, `<edge>._type` plus edge property columns;
    /// otherwise a single internal `__eid_to_<target>` column.
    ///
    /// The field order here must stay in lockstep with the column order
    /// produced by `build_traverse_output_batch`.
    #[expect(
        clippy::too_many_arguments,
        reason = "Schema construction needs all field metadata"
    )]
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        edge_variable: Option<&str>,
        edge_properties: &[String],
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        edge_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // The target vid is nullable only under OPTIONAL MATCH semantics.
        let target_vid_name = format!("{}._vid", target_variable);
        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));

        fields.push(Field::new(
            format!("{}._labels", target_variable),
            labels_data_type(),
            true,
        ));

        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            let arrow_type = resolve_property_type(prop_name, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        if let Some(edge_var) = edge_variable {
            let edge_id_name = format!("{}._eid", edge_var);
            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));

            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            for prop_name in edge_properties {
                let prop_col_name = format!("{}.{}", edge_var, prop_name);
                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
                fields.push(Field::new(&prop_col_name, arrow_type, true));
            }
        } else {
            // No edge variable: still record the traversed edge id — presumably
            // so later hops can enforce edge uniqueness via `used_edge_columns`.
            let internal_eid_name = format!("__eid_to_{}", target_variable);
            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
        }

        Arc::new(Schema::new(fields))
    }
}
424
425impl DisplayAs for GraphTraverseExec {
426 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427 write!(
428 f,
429 "GraphTraverseExec: {} --[{:?}]--> {}",
430 self.source_column, self.edge_type_ids, self.target_variable
431 )?;
432 if let Some(ref edge_var) = self.edge_variable {
433 write!(f, " as {}", edge_var)?;
434 }
435 Ok(())
436 }
437}
438
impl ExecutionPlan for GraphTraverseExec {
    fn name(&self) -> &str {
        "GraphTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds this node over a replacement child; the schema is recomputed
    /// by `Self::new` from the same configuration.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.target_label_id,
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
        )))
    }

    /// Starts execution: kicks off cache warming for the traversed edge
    /// types and returns a stream that begins in the `Warming` state.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Warm adjacency data before the first batch is expanded.
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphTraverseStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            state: TraverseStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
527
/// State machine for `GraphTraverseStream`.
enum TraverseStreamState {
    /// Waiting for adjacency-cache warming to complete before reading input.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for the next batch.
    Reading,
    /// Awaiting asynchronous materialization (property fetches) of a batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Input exhausted or a fatal error was emitted.
    Done,
}
539
/// Stream adapter that performs per-batch expansion for `GraphTraverseExec`.
/// Configuration fields mirror the exec node; see its struct-level docs.
struct GraphTraverseStream {
    /// Upstream record batches to expand.
    input: SendableRecordBatchStream,

    /// Input column holding source vertex ids.
    source_column: String,

    /// Edge type ids to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Optional edge variable; controls which edge columns are emitted.
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize.
    target_properties: Vec<String>,

    /// Optional label filter for target vertices.
    target_label_name: Option<String>,

    /// Shared storage / visibility context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics flag.
    optional: bool,

    /// Variables belonging to the optional pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column holding an already-bound target vid, if any.
    bound_target_column: Option<String>,

    /// Edge-id columns consumed by earlier hops.
    used_edge_columns: Vec<String>,

    /// Output schema; must match the exec node's schema.
    schema: SchemaRef,

    /// Current position in the Warming/Reading/Materializing machine.
    state: TraverseStreamState,

    /// Output row-count metrics.
    metrics: BaselineMetrics,
}
594
impl GraphTraverseStream {
    /// Expands each row of `batch` one hop and returns
    /// `(input row index, target vid, eid, edge type id)` tuples.
    ///
    /// Filters applied per candidate edge, in order:
    /// 1. edges listed in the row's `used_edge_columns` values are skipped
    ///    (relationship uniqueness across hops);
    /// 2. for `Direction::Both`, each eid is emitted at most once per row;
    /// 3. with a bound target, only edges landing on that exact vid pass,
    ///    and a null bound target matches nothing;
    /// 4. with a target label, vertices whose resolved labels lack it are
    ///    skipped (vertices whose labels cannot be resolved are kept).
    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column '{}' not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Optional column of pre-bound target vids (target already matched
        // elsewhere in the pattern).
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Eid columns from earlier hops; missing or mistyped columns are
        // silently ignored here.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
        let is_undirected = matches!(self.direction, Direction::Both);

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            // Null source (e.g. produced by an earlier OPTIONAL): no expansion.
            let Some(src) = source_vid else {
                continue;
            };

            // Some(None) means the bound column exists but is null for this row.
            let expected_target = bound_target_vids.map(|arr| {
                if arr.is_null(row_idx) {
                    None
                } else {
                    Some(arr.value(row_idx))
                }
            });

            let used_eids: HashSet<u64> = used_edge_arrays
                .iter()
                .filter_map(|arr| {
                    if arr.is_null(row_idx) {
                        None
                    } else {
                        Some(arr.value(row_idx))
                    }
                })
                .collect();

            let vid = Vid::from(src);
            // Per-row dedup set for undirected traversal, where the same edge
            // can be reported more than once.
            let mut seen_edges: HashSet<u64> = HashSet::new();

            for &edge_type in &self.edge_type_ids {
                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);

                for (target_vid, eid) in neighbors {
                    let eid_u64 = eid.as_u64();

                    if used_eids.contains(&eid_u64) {
                        continue;
                    }

                    if is_undirected && !seen_edges.insert(eid_u64) {
                        continue;
                    }

                    if let Some(expected_opt) = expected_target {
                        let Some(expected) = expected_opt else {
                            continue;
                        };
                        if target_vid.as_u64() != expected {
                            continue;
                        }
                    }

                    if let Some(ref label_name) = self.target_label_name {
                        let query_ctx = self.graph_ctx.query_context();
                        if let Some(vertex_labels) =
                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
                        {
                            if !vertex_labels.contains(label_name) {
                                continue;
                            }
                        }
                    }

                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
                }
            }
        }

        Ok(expanded_rows)
    }
}
718
719fn build_target_labels_column(
721 target_vids: &[Vid],
722 target_label_name: &Option<String>,
723 graph_ctx: &GraphExecutionContext,
724) -> ArrayRef {
725 use arrow_array::builder::{ListBuilder, StringBuilder};
726 let mut labels_builder = ListBuilder::new(StringBuilder::new());
727 let query_ctx = graph_ctx.query_context();
728 for vid in target_vids {
729 let row_labels: Vec<String> =
730 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
731 Some(labels) => labels,
732 None => {
733 if let Some(label_name) = target_label_name {
735 vec![label_name.clone()]
736 } else {
737 vec![]
738 }
739 }
740 };
741 let values = labels_builder.values();
742 for lbl in &row_labels {
743 values.append_value(lbl);
744 }
745 labels_builder.append(true);
746 }
747 Arc::new(labels_builder.finish())
748}
749
/// Fetches and builds one Arrow column per requested target property.
///
/// With a target label, property types come from that label's schema entry
/// and a label-scoped batched fetch is used; without one, properties are
/// fetched untyped and emitted as `LargeBinary`, and the pseudo-property
/// `_all_props` is synthesized as a JSON blob of every vertex property.
async fn build_target_property_columns(
    target_vids: &[Vid],
    target_properties: &[String],
    target_label_name: &Option<String>,
    graph_ctx: &Arc<GraphExecutionContext>,
) -> DFResult<Vec<ArrayRef>> {
    let mut columns = Vec::new();

    if let Some(label_name) = target_label_name {
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        // One batched fetch for all vids of this label.
        let props_map = property_manager
            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
            columns.push(column);
        }
    } else {
        // `_all_props` is synthesized below rather than fetched by name.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in target_properties {
            if prop_name == "_all_props" {
                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
            } else {
                // No label schema available, so fall back to LargeBinary.
                let column = build_property_column_static(
                    target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                columns.push(column);
            }
        }
    }

    Ok(columns)
}
813
814fn build_all_props_column(
816 target_vids: &[Vid],
817 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
818 graph_ctx: &Arc<GraphExecutionContext>,
819) -> ArrayRef {
820 use crate::query::df_graph::scan::encode_cypher_value;
821 use arrow_array::builder::LargeBinaryBuilder;
822
823 let mut builder = LargeBinaryBuilder::new();
824 let l0_ctx = graph_ctx.l0_context();
825 for vid in target_vids {
826 let mut merged_props = serde_json::Map::new();
827 if let Some(vid_props) = props_map.get(vid) {
828 for (k, v) in vid_props.iter() {
829 let json_val: serde_json::Value = v.clone().into();
830 merged_props.insert(k.to_string(), json_val);
831 }
832 }
833 for l0 in l0_ctx.iter_l0_buffers() {
834 let guard = l0.read();
835 if let Some(l0_props) = guard.vertex_properties.get(vid) {
836 for (k, v) in l0_props.iter() {
837 let json_val: serde_json::Value = v.clone().into();
838 merged_props.insert(k.to_string(), json_val);
839 }
840 }
841 }
842 if merged_props.is_empty() {
843 builder.append_null();
844 } else {
845 let json = serde_json::Value::Object(merged_props);
846 match encode_cypher_value(&json) {
847 Ok(bytes) => builder.append_value(bytes),
848 Err(_) => builder.append_null(),
849 }
850 }
851 }
852 Arc::new(builder.finish())
853}
854
/// Builds the edge output columns in schema order: `<edge>._eid`,
/// `<edge>._type`, then one column per requested edge property.
async fn build_edge_columns(
    expansions: &[(usize, Vid, u64, u32)],
    edge_properties: &[String],
    edge_type_ids: &[u32],
    graph_ctx: &Arc<GraphExecutionContext>,
) -> DFResult<Vec<ArrayRef>> {
    let mut columns = Vec::new();

    // Column 1: edge ids.
    let eids: Vec<Eid> = expansions
        .iter()
        .map(|(_, _, eid, _)| Eid::from(*eid))
        .collect();
    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);

    // Column 2: edge type names; null when the id is unknown to the schema.
    {
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
    }

    if !edge_properties.is_empty() {
        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
        let edge_type_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        // NOTE(review): eids are re-wrapped as Vid keys so the vid-keyed
        // `build_property_column_static` helper can be reused; this relies on
        // `props_map` being keyed by the same raw u64 values — confirm.
        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();

        for prop_name in edge_properties {
            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
            let column =
                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
            columns.push(column);
        }
    }

    Ok(columns)
}
915
/// Builds the output batch for one input batch: gathers input rows once per
/// expansion, then appends target vid/labels/properties and edge columns in
/// exactly the order `build_schema` declared them. Property columns are
/// fetched asynchronously.
///
/// OPTIONAL MATCH handling: with no expansions, returns either an empty
/// batch (non-optional) or one null-padded row per unmatched group; with
/// some matches, null rows for unmatched groups are concatenated after the
/// expanded rows.
#[expect(
    clippy::too_many_arguments,
    reason = "Standalone async fn needs all context passed explicitly"
)]
async fn build_traverse_output_batch(
    input: RecordBatch,
    expansions: Vec<(usize, Vid, u64, u32)>,
    schema: SchemaRef,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    edge_type_ids: Vec<u32>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        if !optional {
            return Ok(RecordBatch::new_empty(schema));
        }
        // OPTIONAL with zero matches: emit one null row per input group.
        let unmatched_reps = collect_unmatched_optional_group_rows(
            &input,
            &HashSet::new(),
            &schema,
            &optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            &input,
            &unmatched_reps,
            &schema,
            &optional_pattern_vars,
        );
    }

    // Replicate each input row once per expansion via `take`.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices_array, None))
        .collect::<Result<_, _>>()?;

    // Appended columns must follow build_schema order: vid, labels,
    // target properties, then edge columns.
    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));

    columns.push(build_target_labels_column(
        &target_vids,
        &target_label_name,
        &graph_ctx,
    ));

    if !target_properties.is_empty() {
        let prop_cols = build_target_property_columns(
            &target_vids,
            &target_properties,
            &target_label_name,
            &graph_ctx,
        )
        .await?;
        columns.extend(prop_cols);
    }

    if edge_variable.is_some() {
        let edge_cols =
            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
        columns.extend(edge_cols);
    } else {
        // No edge variable: only the internal eid column is appended.
        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;

    if optional {
        // Append null rows for input groups that produced no expansion.
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            &input,
            &matched_indices,
            &schema,
            &optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                &input,
                &unmatched,
                &schema,
                &optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
                .map_err(arrow_err)?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1032
1033fn build_optional_null_batch_for_rows(
1036 input: &RecordBatch,
1037 unmatched_indices: &[usize],
1038 schema: &SchemaRef,
1039) -> DFResult<RecordBatch> {
1040 let num_rows = unmatched_indices.len();
1041 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1042 let indices_array = UInt64Array::from(indices);
1043
1044 let mut columns: Vec<ArrayRef> = Vec::new();
1046 for col in input.columns() {
1047 let taken = take(col.as_ref(), &indices_array, None)?;
1048 columns.push(taken);
1049 }
1050 for field in schema.fields().iter().skip(input.num_columns()) {
1052 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1053 }
1054 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1055}
1056
/// Returns true when `col_name` belongs to one of the optional pattern
/// variables: the variable itself, any `var.<suffix>` column, or an internal
/// `__eid_to_*` column ending in the variable name.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }
    optional_vars.iter().any(|var| {
        col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'))
            || (col_name.starts_with("__eid_to_") && col_name.ends_with(var.as_str()))
    })
}
1064
1065fn collect_unmatched_optional_group_rows(
1066 input: &RecordBatch,
1067 matched_indices: &HashSet<usize>,
1068 schema: &SchemaRef,
1069 optional_vars: &HashSet<String>,
1070) -> DFResult<Vec<usize>> {
1071 if input.num_rows() == 0 {
1072 return Ok(Vec::new());
1073 }
1074
1075 if optional_vars.is_empty() {
1076 return Ok((0..input.num_rows())
1077 .filter(|idx| !matched_indices.contains(idx))
1078 .collect());
1079 }
1080
1081 let source_vid_indices: Vec<usize> = schema
1082 .fields()
1083 .iter()
1084 .enumerate()
1085 .filter_map(|(idx, field)| {
1086 if idx >= input.num_columns() {
1087 return None;
1088 }
1089 let name = field.name();
1090 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1091 Some(idx)
1092 } else {
1093 None
1094 }
1095 })
1096 .collect();
1097
1098 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1101
1102 for row_idx in 0..input.num_rows() {
1103 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1104 let entry = groups.entry(key.clone());
1105 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1106 group_order.push(key.clone());
1107 }
1108 let matched = matched_indices.contains(&row_idx);
1109 entry
1110 .and_modify(|(_, any_matched)| *any_matched |= matched)
1111 .or_insert((row_idx, matched));
1112 }
1113
1114 Ok(group_order
1115 .into_iter()
1116 .filter_map(|key| {
1117 groups
1118 .get(&key)
1119 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1120 })
1121 .collect())
1122}
1123
1124fn compute_optional_group_key(
1125 batch: &RecordBatch,
1126 row_idx: usize,
1127 source_vid_indices: &[usize],
1128) -> DFResult<Vec<u8>> {
1129 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1130 for &col_idx in source_vid_indices {
1131 let col = batch.column(col_idx);
1132 let vid_cow = column_as_vid_array(col.as_ref())?;
1133 let arr: &UInt64Array = &vid_cow;
1134 if arr.is_null(row_idx) {
1135 key.extend_from_slice(&u64::MAX.to_le_bytes());
1136 } else {
1137 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1138 }
1139 }
1140 Ok(key)
1141}
1142
1143fn build_optional_null_batch_for_rows_with_optional_vars(
1144 input: &RecordBatch,
1145 unmatched_indices: &[usize],
1146 schema: &SchemaRef,
1147 optional_vars: &HashSet<String>,
1148) -> DFResult<RecordBatch> {
1149 if optional_vars.is_empty() {
1150 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1151 }
1152
1153 let num_rows = unmatched_indices.len();
1154 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1155 let indices_array = UInt64Array::from(indices);
1156
1157 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1158 for (col_idx, field) in schema.fields().iter().enumerate() {
1159 if col_idx < input.num_columns() {
1160 if is_optional_column_for_vars(field.name(), optional_vars) {
1161 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1162 } else {
1163 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1164 columns.push(taken);
1165 }
1166 } else {
1167 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1168 }
1169 }
1170
1171 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1172}
1173
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the Warming -> Reading -> Materializing state machine.
    ///
    /// The state is swapped out for `Done` at the top of each iteration so
    /// owned futures can be moved and polled; every branch must restore the
    /// correct state before returning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                // Wait for adjacency warming before touching the input.
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Check the query deadline once per poll. The state was
                    // already swapped to Done, so a timeout ends the stream.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Fast path: no property fetches needed, so the
                            // output batch can be built synchronously.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Slow path: clone everything the async
                            // materializer needs and park it in Materializing.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1299
/// Synchronous fast path for materializing a traversal output batch, used by
/// the caller only when no target or edge properties need to be fetched
/// (see the `target_properties.is_empty() && edge_properties.is_empty()`
/// branch in `poll_next`).
///
/// Each entry of `expansions` is
/// `(input row index, target vid, edge id as u64, edge type id)`.
/// Input rows are repeated once per expansion via `take`, then the target
/// vid/labels and edge columns are appended in exactly the order the output
/// `schema` declares them.
///
/// For OPTIONAL MATCH (`optional == true`), rows whose optional group
/// produced no expansion are appended as null-padded rows.
fn build_traverse_output_batch_sync(
    input: &RecordBatch,
    expansions: &[(usize, Vid, u64, u32)],
    schema: &SchemaRef,
    edge_variable: Option<&String>,
    graph_ctx: &GraphExecutionContext,
    optional: bool,
    optional_pattern_vars: &HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        // Non-optional traversal with no expansions produces no rows at all.
        if !optional {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        // OPTIONAL MATCH: every input row is unmatched; pick the group
        // representatives that should survive with nulls.
        let unmatched_reps = collect_unmatched_optional_group_rows(
            input,
            &HashSet::new(),
            schema,
            optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            input,
            &unmatched_reps,
            schema,
            optional_pattern_vars,
        );
    }

    // Repeat each input row once per expansion that originated from it.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);

    let mut columns: Vec<ArrayRef> = Vec::new();
    for col in input.columns() {
        let expanded = take(col.as_ref(), &indices_array, None)?;
        columns.push(expanded);
    }

    // Target vertex id column.
    let target_vids: Vec<u64> = expansions
        .iter()
        .map(|(_, vid, _, _)| vid.as_u64())
        .collect();
    columns.push(Arc::new(UInt64Array::from(target_vids)));

    {
        // Target labels column. NOTE(review): labels are gathered from the
        // L0 (in-memory) buffers only here — confirm main-store labels are
        // not required on this fast path.
        use arrow_array::builder::{ListBuilder, StringBuilder};
        let l0_ctx = graph_ctx.l0_context();
        let mut labels_builder = ListBuilder::new(StringBuilder::new());
        for (_, vid, _, _) in expansions {
            let mut row_labels: Vec<String> = Vec::new();
            for l0 in l0_ctx.iter_l0_buffers() {
                let guard = l0.read();
                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
                    for lbl in l0_labels {
                        // Deduplicate labels across L0 buffers, preserving order.
                        if !row_labels.contains(lbl) {
                            row_labels.push(lbl.clone());
                        }
                    }
                }
            }
            let values = labels_builder.values();
            for lbl in &row_labels {
                values.append_value(lbl);
            }
            labels_builder.append(true);
        }
        columns.push(Arc::new(labels_builder.finish()));
    }

    if edge_variable.is_some() {
        // Bound edge variable: emit both the edge id and its type name.
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));

        // Resolve numeric edge type ids to their names via the unified schema.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()));
    } else {
        // No edge variable: still emit the edge id (schema carries it under a
        // synthetic `__eid_to_*` column).
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;

    if optional {
        // Append null-padded rows for optional groups that never matched.
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            input,
            &matched_indices,
            schema,
            optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                schema,
                optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
                .map_err(|e| {
                    datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
                })?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1430
1431impl RecordBatchStream for GraphTraverseStream {
1432 fn schema(&self) -> SchemaRef {
1433 self.schema.clone()
1434 }
1435}
1436
/// Adjacency index from a vertex to its traversable edges. Each entry is
/// `(neighbor vid, edge id, edge type name, edge properties)`. Depending on
/// the traversal direction, the key is the source vertex, the destination
/// vertex, or both (see `build_edge_adjacency_map`).
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1439
/// Physical operator for a single-hop graph traversal that reads its edges
/// from the main store (plus L0 buffers) up front and expands each input row
/// via an in-memory adjacency map.
pub struct GraphTraverseMainExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type names to traverse.
    type_names: Vec<String>,

    /// Traversal direction (outgoing / incoming / both).
    direction: Direction,

    /// Cypher variable bound to the target vertex.
    target_variable: String,

    /// Cypher variable bound to the edge, if any.
    edge_variable: Option<String>,

    /// Edge property names to materialize (requires `edge_variable`).
    edge_properties: Vec<String>,

    /// Target vertex property names to materialize.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout, ...).
    graph_ctx: Arc<GraphExecutionContext>,

    /// True for OPTIONAL MATCH semantics (unmatched rows survive with nulls).
    optional: bool,

    /// Variables belonging to the optional pattern (nulled for unmatched rows).
    optional_pattern_vars: HashSet<String>,

    /// If set, the target vid is already bound by this column and expansions
    /// must agree with it.
    bound_target_column: Option<String>,

    /// Columns carrying edge ids already used earlier in the pattern;
    /// those edges are skipped to enforce edge uniqueness.
    used_edge_columns: Vec<String>,

    /// Precomputed output schema (input columns + traversal columns).
    schema: SchemaRef,

    /// Cached plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics shared with spawned streams.
    metrics: ExecutionPlanMetricsSet,
}
1514
1515impl fmt::Debug for GraphTraverseMainExec {
1516 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1517 f.debug_struct("GraphTraverseMainExec")
1518 .field("type_names", &self.type_names)
1519 .field("direction", &self.direction)
1520 .field("target_variable", &self.target_variable)
1521 .field("edge_variable", &self.edge_variable)
1522 .finish()
1523 }
1524}
1525
1526impl GraphTraverseMainExec {
1527 #[expect(clippy::too_many_arguments)]
1529 pub fn new(
1530 input: Arc<dyn ExecutionPlan>,
1531 source_column: impl Into<String>,
1532 type_names: Vec<String>,
1533 direction: Direction,
1534 target_variable: impl Into<String>,
1535 edge_variable: Option<String>,
1536 edge_properties: Vec<String>,
1537 target_properties: Vec<String>,
1538 graph_ctx: Arc<GraphExecutionContext>,
1539 optional: bool,
1540 optional_pattern_vars: HashSet<String>,
1541 bound_target_column: Option<String>,
1542 used_edge_columns: Vec<String>,
1543 ) -> Self {
1544 let source_column = source_column.into();
1545 let target_variable = target_variable.into();
1546
1547 let schema = Self::build_schema(
1549 &input.schema(),
1550 &target_variable,
1551 &edge_variable,
1552 &edge_properties,
1553 &target_properties,
1554 optional,
1555 );
1556
1557 let properties = compute_plan_properties(schema.clone());
1558
1559 Self {
1560 input,
1561 source_column,
1562 type_names,
1563 direction,
1564 target_variable,
1565 edge_variable,
1566 edge_properties,
1567 target_properties,
1568 graph_ctx,
1569 optional,
1570 optional_pattern_vars,
1571 bound_target_column,
1572 used_edge_columns,
1573 schema,
1574 properties,
1575 metrics: ExecutionPlanMetricsSet::new(),
1576 }
1577 }
1578
1579 fn build_schema(
1581 input_schema: &SchemaRef,
1582 target_variable: &str,
1583 edge_variable: &Option<String>,
1584 edge_properties: &[String],
1585 target_properties: &[String],
1586 optional: bool,
1587 ) -> SchemaRef {
1588 let mut fields: Vec<Field> = input_schema
1589 .fields()
1590 .iter()
1591 .map(|f| f.as_ref().clone())
1592 .collect();
1593
1594 let target_vid_name = format!("{}._vid", target_variable);
1596 if input_schema.column_with_name(&target_vid_name).is_none() {
1597 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1598 }
1599
1600 let target_labels_name = format!("{}._labels", target_variable);
1602 if input_schema.column_with_name(&target_labels_name).is_none() {
1603 fields.push(Field::new(target_labels_name, labels_data_type(), true));
1604 }
1605
1606 if let Some(edge_var) = edge_variable {
1608 fields.push(Field::new(
1609 format!("{}._eid", edge_var),
1610 DataType::UInt64,
1611 optional,
1612 ));
1613
1614 fields.push(Field::new(
1616 format!("{}._type", edge_var),
1617 DataType::Utf8,
1618 true,
1619 ));
1620
1621 for prop in edge_properties {
1625 let col_name = format!("{}.{}", edge_var, prop);
1626 let mut metadata = std::collections::HashMap::new();
1627 metadata.insert("cv_encoded".to_string(), "true".to_string());
1628 fields.push(
1629 Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1630 );
1631 }
1632 } else {
1633 fields.push(Field::new(
1636 format!("__eid_to_{}", target_variable),
1637 DataType::UInt64,
1638 optional,
1639 ));
1640 }
1641
1642 for prop in target_properties {
1644 fields.push(Field::new(
1645 format!("{}.{}", target_variable, prop),
1646 DataType::LargeBinary,
1647 true,
1648 ));
1649 }
1650
1651 Arc::new(Schema::new(fields))
1652 }
1653}
1654
1655impl DisplayAs for GraphTraverseMainExec {
1656 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1657 write!(
1658 f,
1659 "GraphTraverseMainExec: types={:?}, direction={:?}",
1660 self.type_names, self.direction
1661 )
1662 }
1663}
1664
impl ExecutionPlan for GraphTraverseMainExec {
    fn name(&self) -> &str {
        "GraphTraverseMainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds the operator with a new child plan.
    ///
    /// All configuration fields are cloned manually (rather than calling
    /// `Self::new`) so the already-computed `schema`/`properties`/`metrics`
    /// are reused instead of being recomputed.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseMainExec expects exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self {
            input: children[0].clone(),
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            properties: self.properties.clone(),
            metrics: self.metrics.clone(),
        }))
    }

    /// Starts execution for one partition: wraps the child's stream in a
    /// `GraphTraverseMainStream`, which first loads the edge adjacency map
    /// asynchronously and then expands batches as they arrive.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        Ok(Box::pin(GraphTraverseMainStream::new(
            input_stream,
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
            self.schema.clone(),
            metrics,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
1747
/// State machine for `GraphTraverseMainStream`.
enum GraphTraverseMainState {
    /// Waiting for the edge adjacency map to be loaded from storage;
    /// the input stream is parked until loading completes.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        input_stream: SendableRecordBatchStream,
    },
    /// Adjacency map is ready; input batches are expanded as they arrive.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Stream exhausted or failed; always yields `None`.
    Done,
}
1763
/// Stream driving `GraphTraverseMainExec`: loads the edge adjacency map once,
/// then expands each input batch synchronously against it.
///
/// `type_names` and `direction` are not stored here — they are consumed by
/// the adjacency-loading future created in `new`.
struct GraphTraverseMainStream {
    /// Input column holding source vertex ids.
    source_column: String,

    /// Variable bound to the target vertex (names the output columns).
    target_variable: String,

    /// Variable bound to the edge, if any.
    edge_variable: Option<String>,

    /// Edge property names to materialize.
    edge_properties: Vec<String>,

    /// Target vertex property names to materialize.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics flag.
    optional: bool,

    /// Variables belonging to the optional pattern.
    optional_pattern_vars: HashSet<String>,

    /// Column that pre-binds the target vid, if any.
    bound_target_column: Option<String>,

    /// Columns with edge ids already used earlier in the pattern.
    used_edge_columns: Vec<String>,

    /// Output schema (must match `expand_batch`'s column push order).
    schema: SchemaRef,

    /// Current state of the load-then-process state machine.
    state: GraphTraverseMainState,

    /// Baseline metrics (output row counts).
    metrics: BaselineMetrics,
}
1805
impl GraphTraverseMainStream {
    /// Creates the stream and immediately kicks off (but does not poll) the
    /// async adjacency-map load; `type_names` and `direction` are moved into
    /// that future rather than stored on the stream.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // Clone what the loading future needs so the stream keeps its own
        // handle on the context.
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };

        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expands one input batch against the preloaded adjacency map.
    ///
    /// For every non-null source vid, each adjacency entry that (a) is not an
    /// already-used edge and (b) agrees with the bound target column (if any)
    /// becomes one output row. Columns are appended in exactly the order the
    /// output schema was built in `GraphTraverseMainExec::build_schema`.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Optional column that pre-binds the target vid for each row.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Edge-id columns from earlier hops; edges listed here are skipped
        // to enforce per-path edge uniqueness.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // One expansion = (input row index, target vid, eid, type name, props).
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Edge ids consumed by this row earlier in the pattern.
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        // Enforce a pre-bound target vid, when present.
                        if let Some(targets) = expected_targets {
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }

        // OPTIONAL MATCH with no expansions: keep every input row, nulls in
        // the traversal columns.
        if expansions.is_empty() && self.optional {
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        // Plain MATCH with no expansions: empty output.
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Only needed under OPTIONAL MATCH, to find the unmatched rows below.
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Repeat each input row once per expansion via `take`.
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Target vid column, only when the schema added one.
        // NOTE(review): `target_vids` is collected even when the column
        // already exists and the Vec goes unused — could be hoisted into the
        // `if` below.
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Target labels column, only when the schema added one. Labels are
        // gathered from the L0 buffers, deduplicated per row.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        if self.edge_variable.is_some() {
            // Bound edge variable: eid column, then type-name column, then
            // one CV-encoded column per requested edge property.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // Special pseudo-property: encode the whole property map
                    // as one JSON object (null when the edge has none).
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    // Single property: encode it per row, null when missing
                    // or explicitly Null. Encoding failures degrade to null.
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // No edge variable: the schema still has a synthetic eid column.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        // Target-vertex property columns, CV-encoded, sourced from L0 buffers.
        // NOTE(review): like the labels above, only L0 properties are read
        // here — confirm main-store properties are provided elsewhere.
        {
            use crate::query::df_graph::scan::encode_cypher_value;
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    // Merge all properties across L0 buffers into one JSON
                    // object per row; later buffers overwrite earlier keys.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    // Single property: first non-null value found in any L0
                    // buffer wins; null when absent everywhere.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch =
            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;

        if self.optional {
            // Append null-padded rows for optional groups that never matched.
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
        } else {
            Ok(matched_batch)
        }
    }
}
2169
2170async fn build_edge_adjacency_map(
2176 graph_ctx: &GraphExecutionContext,
2177 type_names: &[String],
2178 direction: Direction,
2179) -> DFResult<EdgeAdjacencyMap> {
2180 use uni_store::storage::main_edge::MainEdgeDataset;
2181
2182 let storage = graph_ctx.storage();
2183 let l0_ctx = graph_ctx.l0_context();
2184 let lancedb_store = storage.lancedb_store();
2185
2186 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2188 let edges_with_type = MainEdgeDataset::find_edges_by_type_names(lancedb_store, &type_refs)
2189 .await
2190 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2191
2192 let mut edges: Vec<(
2194 uni_common::Eid,
2195 uni_common::Vid,
2196 uni_common::Vid,
2197 String,
2198 uni_common::Properties,
2199 )> = edges_with_type.into_iter().collect();
2200
2201 for l0 in l0_ctx.iter_l0_buffers() {
2203 let l0_guard = l0.read();
2204
2205 for type_name in type_names {
2206 let l0_eids = l0_guard.eids_for_type(type_name);
2207
2208 for &eid in &l0_eids {
2210 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2211 let src_vid = edge_ref.src_vid;
2212 let dst_vid = edge_ref.dst_vid;
2213
2214 let props = l0_guard
2216 .edge_properties
2217 .get(&eid)
2218 .cloned()
2219 .unwrap_or_default();
2220
2221 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2222 }
2223 }
2224 }
2225 }
2226
2227 let mut seen_eids = HashSet::new();
2229 let mut unique_edges = Vec::new();
2230 for edge in edges.into_iter().rev() {
2231 if seen_eids.insert(edge.0) {
2232 unique_edges.push(edge);
2233 }
2234 }
2235 unique_edges.reverse();
2236
2237 let mut tombstoned_eids = HashSet::new();
2239 for l0 in l0_ctx.iter_l0_buffers() {
2240 let l0_guard = l0.read();
2241 for eid in l0_guard.tombstones.keys() {
2242 tombstoned_eids.insert(*eid);
2243 }
2244 }
2245 if !tombstoned_eids.is_empty() {
2246 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2247 }
2248
2249 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2251
2252 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2253 match direction {
2254 Direction::Outgoing => {
2255 adjacency
2256 .entry(src_vid)
2257 .or_default()
2258 .push((dst_vid, eid, edge_type, props));
2259 }
2260 Direction::Incoming => {
2261 adjacency
2262 .entry(dst_vid)
2263 .or_default()
2264 .push((src_vid, eid, edge_type, props));
2265 }
2266 Direction::Both => {
2267 adjacency.entry(src_vid).or_default().push((
2268 dst_vid,
2269 eid,
2270 edge_type.clone(),
2271 props.clone(),
2272 ));
2273 adjacency
2274 .entry(dst_vid)
2275 .or_default()
2276 .push((src_vid, eid, edge_type, props));
2277 }
2278 }
2279 }
2280
2281 Ok(adjacency)
2282}
2283
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the load-then-process state machine.
    ///
    /// The current state is taken out with `mem::replace` (leaving `Done` as
    /// a placeholder) so its owned future/stream can be polled by value; every
    /// branch is responsible for writing the next state back before returning
    /// or looping.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Adjacency map ready: switch to processing and loop
                        // to poll the input stream immediately.
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Query-timeout check once per poll; on timeout the state
                    // stays `Done` (set by the mem::replace above), which
                    // terminates the stream after this error.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let result = self.expand_batch(&batch, &adjacency);

                            // Restore the state before returning so the next
                            // poll continues processing.
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2366
2367impl RecordBatchStream for GraphTraverseMainStream {
2368 fn schema(&self) -> SchemaRef {
2369 self.schema.clone()
2370 }
2371}
2372
/// Physical operator for variable-length path traversal
/// (`-[:TYPE*min..max]->`), driven by a path NFA.
pub struct GraphVariableLengthTraverseExec {
    /// Child plan producing rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Input column holding the source vertex ids.
    source_column: String,

    /// Numeric ids of the edge types to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops (inclusive).
    min_hops: usize,

    /// Maximum number of hops (inclusive).
    max_hops: usize,

    /// Variable bound to the path's end vertex.
    target_variable: String,

    /// Variable bound to the list of traversed edges, if any.
    step_variable: Option<String>,

    /// Variable bound to the whole path struct, if any.
    path_variable: Option<String>,

    /// End-vertex property names to materialize.
    target_properties: Vec<String>,

    /// Label of the end vertex, used to resolve property types.
    target_label_name: Option<String>,

    /// OPTIONAL MATCH semantics flag.
    is_optional: bool,

    /// If set, the end vertex is pre-bound by this column.
    bound_target_column: Option<String>,

    /// Optional Lance-level filter expression for edges.
    edge_lance_filter: Option<String>,

    /// Property equality conditions every traversed edge must satisfy.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Columns with edge ids already used earlier in the pattern.
    used_edge_columns: Vec<String>,

    /// Path uniqueness mode (e.g. trail/walk semantics).
    path_mode: super::nfa::PathMode,

    /// Output shaping mode for variable-length paths.
    output_mode: super::nfa::VlpOutputMode,

    /// NFA driving the traversal; either supplied (QPP) or derived from the
    /// plain VLP parameters in `new`.
    nfa: Arc<PathNfa>,

    /// Shared execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Precomputed output schema.
    schema: SchemaRef,

    /// Cached plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics shared with spawned streams.
    metrics: ExecutionPlanMetricsSet,
}
2464
2465impl fmt::Debug for GraphVariableLengthTraverseExec {
2466 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2467 f.debug_struct("GraphVariableLengthTraverseExec")
2468 .field("source_column", &self.source_column)
2469 .field("edge_type_ids", &self.edge_type_ids)
2470 .field("direction", &self.direction)
2471 .field("min_hops", &self.min_hops)
2472 .field("max_hops", &self.max_hops)
2473 .field("target_variable", &self.target_variable)
2474 .finish()
2475 }
2476}
2477
impl GraphVariableLengthTraverseExec {
    /// Creates a variable-length traversal operator.
    ///
    /// When `qpp_nfa` is `None`, a plain VLP NFA is derived from
    /// `edge_type_ids`/`direction`/`min_hops`/`max_hops`; otherwise the
    /// supplied (quantified-path-pattern) NFA is used as-is.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
        qpp_nfa: Option<PathNfa>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Look up the target label's property metadata so property columns
        // get precise Arrow types instead of a generic fallback.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
            label_props,
        );
        let properties = compute_plan_properties(schema.clone());

        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
        }));

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            target_label_name,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            nfa,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Computes the output schema: all input columns, then (when not already
    /// present) the target vid, labels, and requested target properties,
    /// then the mandatory `_hop_count`, the step edge-list (if a step
    /// variable is bound) and the path struct (if a path variable is bound
    /// and not already an input column).
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Target vertex id, unless already produced upstream.
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        // Target vertex labels, likewise only when missing.
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        // Requested target properties, typed via the label's metadata.
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            if input_schema.column_with_name(&col_name).is_none() {
                let arrow_type = resolve_property_type(prop_name, label_props);
                fields.push(Field::new(&col_name, arrow_type, true));
            }
        }

        // Number of hops taken to reach the target; always emitted.
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        // List of traversed edges for the step variable, if bound.
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        // Full path struct for the path variable, if bound and not already
        // present in the input.
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        Arc::new(Schema::new(fields))
    }
}
2615
2616impl DisplayAs for GraphVariableLengthTraverseExec {
2617 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2618 write!(
2619 f,
2620 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2621 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2622 )
2623 }
2624}
2625
impl ExecutionPlan for GraphVariableLengthTraverseExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds this node around a replacement child, forwarding every
    /// configuration field unchanged. The NFA is cloned out of its `Arc`
    /// because the constructor accepts it as an owned `Option<PathNfa>`.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
            ));
        }

        // NOTE: argument order must stay in sync with `Self::new`.
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
            Some((*self.nfa).clone()),
        )))
    }

    /// Starts execution for one partition: kicks off adjacency warming for
    /// the configured edge types, then returns a stream that begins in the
    /// `Warming` state and only reads input once warming completes.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Pre-warm neighbor caches for the traversed edge types/direction.
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphVariableLengthTraverseStream {
            input: input_stream,
            // Snapshot the node's configuration so the stream owns it.
            exec: Arc::new(self.clone_for_stream()),
            schema: self.schema.clone(),
            state: VarLengthStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
2708
2709impl GraphVariableLengthTraverseExec {
2710 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2712 GraphVariableLengthTraverseExecData {
2713 source_column: self.source_column.clone(),
2714 edge_type_ids: self.edge_type_ids.clone(),
2715 direction: self.direction,
2716 min_hops: self.min_hops,
2717 max_hops: self.max_hops,
2718 target_variable: self.target_variable.clone(),
2719 step_variable: self.step_variable.clone(),
2720 path_variable: self.path_variable.clone(),
2721 target_properties: self.target_properties.clone(),
2722 target_label_name: self.target_label_name.clone(),
2723 is_optional: self.is_optional,
2724 bound_target_column: self.bound_target_column.clone(),
2725 edge_lance_filter: self.edge_lance_filter.clone(),
2726 edge_property_conditions: self.edge_property_conditions.clone(),
2727 used_edge_columns: self.used_edge_columns.clone(),
2728 path_mode: self.path_mode.clone(),
2729 output_mode: self.output_mode.clone(),
2730 nfa: self.nfa.clone(),
2731 graph_ctx: self.graph_ctx.clone(),
2732 }
2733 }
2734}
2735
/// Plain-data snapshot of `GraphVariableLengthTraverseExec`'s configuration,
/// owned by the stream via `Arc` so execution does not borrow the plan node.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Input column holding source vertex ids.
    source_column: String,
    /// Pre-resolved edge type ids to traverse.
    edge_type_ids: Vec<u32>,
    /// Traversal direction (outgoing / incoming / both).
    direction: Direction,
    /// Minimum hops for a match (per the struct-level note, enforced via the NFA).
    min_hops: usize,
    /// Maximum hops explored by the BFS.
    max_hops: usize,
    /// Variable bound to the traversal target vertex.
    target_variable: String,
    /// Optional variable bound to the list of traversed edges.
    step_variable: Option<String>,
    /// Optional variable bound to the full path struct.
    path_variable: Option<String>,
    /// Target vertex properties to hydrate into output columns.
    target_properties: Vec<String>,
    /// Optional label filter applied to accepting target vertices.
    target_label_name: Option<String>,
    /// OPTIONAL MATCH semantics: emit a NULL row when nothing matches.
    is_optional: bool,
    /// Column holding an already-bound target vid to match against, if any.
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Edge property equality conditions checked during expansion.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns carrying edge ids already used earlier in the pattern
    /// (trail semantics: those edges must not be reused).
    used_edge_columns: Vec<String>,
    /// Path selection / uniqueness mode for path enumeration.
    path_mode: super::nfa::PathMode,
    /// Output shape: full paths or endpoints only.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled path NFA driving the traversal.
    nfa: Arc<PathNfa>,
    /// Shared graph execution context (neighbors, properties, query context).
    graph_ctx: Arc<GraphExecutionContext>,
}
2764
/// Safety cap on the BFS frontier size; expansion stops once exceeded.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Safety cap on the predecessor-DAG pool size; expansion stops once exceeded.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2769
2770impl GraphVariableLengthTraverseExecData {
2771 fn check_target_label(&self, vid: Vid) -> bool {
2773 if let Some(ref label_name) = self.target_label_name {
2774 let query_ctx = self.graph_ctx.query_context();
2775 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2776 Some(labels) => labels.contains(label_name),
2777 None => true, }
2779 } else {
2780 true
2781 }
2782 }
2783
2784 fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
2786 match constraint {
2787 super::nfa::VertexConstraint::Label(label_name) => {
2788 let query_ctx = self.graph_ctx.query_context();
2789 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2790 Some(labels) => labels.contains(label_name),
2791 None => true, }
2793 }
2794 }
2795 }
2796
2797 fn expand_neighbors(
2800 &self,
2801 vid: Vid,
2802 state: NfaStateId,
2803 eid_filter: &EidFilter,
2804 used_eids: &FxHashSet<u64>,
2805 ) -> Vec<(Vid, Eid, NfaStateId)> {
2806 let is_undirected = matches!(self.direction, Direction::Both);
2807 let mut results = Vec::new();
2808
2809 for transition in self.nfa.transitions_from(state) {
2810 let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
2811
2812 for &etype in &transition.edge_type_ids {
2813 for (neighbor, eid) in
2814 self.graph_ctx
2815 .get_neighbors(vid, etype, transition.direction)
2816 {
2817 if is_undirected && !seen_edges.insert(eid.as_u64()) {
2819 continue;
2820 }
2821
2822 if !eid_filter.contains(eid) {
2824 continue;
2825 }
2826
2827 if !self.edge_property_conditions.is_empty() {
2829 let query_ctx = self.graph_ctx.query_context();
2830 let passes = if let Some(props) =
2831 l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
2832 {
2833 self.edge_property_conditions
2834 .iter()
2835 .all(|(name, expected)| {
2836 props.get(name).is_some_and(|actual| actual == expected)
2837 })
2838 } else {
2839 true
2843 };
2844 if !passes {
2845 continue;
2846 }
2847 }
2848
2849 if used_eids.contains(&eid.as_u64()) {
2851 continue;
2852 }
2853
2854 if let Some(constraint) = self.nfa.state_constraint(transition.to)
2856 && !self.check_state_constraint(neighbor, constraint)
2857 {
2858 continue;
2859 }
2860
2861 results.push((neighbor, eid, transition.to));
2862 }
2863 }
2864 }
2865
2866 results
2867 }
2868
2869 fn bfs_with_dag(
2874 &self,
2875 source: Vid,
2876 eid_filter: &EidFilter,
2877 used_eids: &FxHashSet<u64>,
2878 vid_filter: &VidFilter,
2879 ) -> Vec<BfsResult> {
2880 let nfa = &self.nfa;
2881 let selector = PathSelector::All;
2882 let mut dag = PredecessorDag::new(selector);
2883 let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
2884
2885 if nfa.is_accepting(nfa.start_state())
2887 && self.check_target_label(source)
2888 && vid_filter.contains(source)
2889 {
2890 accepting.push((source, nfa.start_state(), 0));
2891 }
2892
2893 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
2895 let mut depth: u32 = 0;
2896
2897 while !frontier.is_empty() && depth < self.max_hops as u32 {
2898 depth += 1;
2899 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
2900 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
2901
2902 for &(vid, state) in &frontier {
2903 for (neighbor, eid, dst_state) in
2904 self.expand_neighbors(vid, state, eid_filter, used_eids)
2905 {
2906 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
2908
2909 if seen_at_depth.insert((neighbor, dst_state)) {
2911 next_frontier.push((neighbor, dst_state));
2912
2913 if nfa.is_accepting(dst_state)
2915 && self.check_target_label(neighbor)
2916 && vid_filter.contains(neighbor)
2917 {
2918 accepting.push((neighbor, dst_state, depth));
2919 }
2920 }
2921 }
2922 }
2923
2924 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
2926 break;
2927 }
2928
2929 frontier = next_frontier;
2930 }
2931
2932 let mut results: Vec<BfsResult> = Vec::new();
2934 for &(target, state, depth) in &accepting {
2935 dag.enumerate_paths(
2936 source,
2937 target,
2938 state,
2939 depth,
2940 depth,
2941 &self.path_mode,
2942 &mut |nodes, edges| {
2943 results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
2944 std::ops::ControlFlow::Continue(())
2945 },
2946 );
2947 }
2948
2949 results
2950 }
2951
2952 fn bfs_endpoints_only(
2957 &self,
2958 source: Vid,
2959 eid_filter: &EidFilter,
2960 used_eids: &FxHashSet<u64>,
2961 vid_filter: &VidFilter,
2962 ) -> Vec<(Vid, u32)> {
2963 let nfa = &self.nfa;
2964 let selector = PathSelector::Any; let mut dag = PredecessorDag::new(selector);
2966 let mut results: Vec<(Vid, u32)> = Vec::new();
2967
2968 if nfa.is_accepting(nfa.start_state())
2970 && self.check_target_label(source)
2971 && vid_filter.contains(source)
2972 {
2973 results.push((source, 0));
2974 }
2975
2976 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
2978 let mut depth: u32 = 0;
2979
2980 while !frontier.is_empty() && depth < self.max_hops as u32 {
2981 depth += 1;
2982 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
2983 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
2984
2985 for &(vid, state) in &frontier {
2986 for (neighbor, eid, dst_state) in
2987 self.expand_neighbors(vid, state, eid_filter, used_eids)
2988 {
2989 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
2990
2991 if seen_at_depth.insert((neighbor, dst_state)) {
2992 next_frontier.push((neighbor, dst_state));
2993
2994 if nfa.is_accepting(dst_state)
2996 && self.check_target_label(neighbor)
2997 && vid_filter.contains(neighbor)
2998 && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
2999 {
3000 results.push((neighbor, depth));
3001 }
3002 }
3003 }
3004 }
3005
3006 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3007 break;
3008 }
3009
3010 frontier = next_frontier;
3011 }
3012
3013 results
3014 }
3015}
3016
/// State machine for `GraphVariableLengthTraverseStream`.
enum VarLengthStreamState {
    /// Waiting for the adjacency warming future to complete.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Pulling batches from the input stream and expanding them.
    Reading,
    /// Awaiting async hydration of target property columns for one batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Input exhausted or a fatal error occurred.
    Done,
}
3028
/// Streaming executor for the variable-length traversal operator.
struct GraphVariableLengthTraverseStream {
    /// Upstream child stream producing input batches.
    input: SendableRecordBatchStream,
    /// Immutable operator configuration snapshot shared with the exec node.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema (input schema + traversal columns).
    schema: SchemaRef,
    /// Current position in the Warming → Reading → Materializing machine.
    state: VarLengthStreamState,
    /// DataFusion baseline metrics (output row counts, timing).
    metrics: BaselineMetrics,
}
3037
3038impl Stream for GraphVariableLengthTraverseStream {
3039 type Item = DFResult<RecordBatch>;
3040
3041 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3042 loop {
3043 let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3044
3045 match state {
3046 VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3047 Poll::Ready(Ok(())) => {
3048 self.state = VarLengthStreamState::Reading;
3049 }
3051 Poll::Ready(Err(e)) => {
3052 self.state = VarLengthStreamState::Done;
3053 return Poll::Ready(Some(Err(e)));
3054 }
3055 Poll::Pending => {
3056 self.state = VarLengthStreamState::Warming(fut);
3057 return Poll::Pending;
3058 }
3059 },
3060 VarLengthStreamState::Reading => {
3061 if let Err(e) = self.exec.graph_ctx.check_timeout() {
3063 return Poll::Ready(Some(Err(
3064 datafusion::error::DataFusionError::Execution(e.to_string()),
3065 )));
3066 }
3067
3068 match self.input.poll_next_unpin(cx) {
3069 Poll::Ready(Some(Ok(batch))) => {
3070 let eid_filter = EidFilter::AllAllowed;
3073 let vid_filter = VidFilter::AllAllowed;
3074 let base_result =
3075 self.process_batch_base(batch, &eid_filter, &vid_filter);
3076 let base_batch = match base_result {
3077 Ok(b) => b,
3078 Err(e) => {
3079 self.state = VarLengthStreamState::Reading;
3080 return Poll::Ready(Some(Err(e)));
3081 }
3082 };
3083
3084 if self.exec.target_properties.is_empty() {
3086 self.state = VarLengthStreamState::Reading;
3087 return Poll::Ready(Some(Ok(base_batch)));
3088 }
3089
3090 let schema = self.schema.clone();
3092 let target_variable = self.exec.target_variable.clone();
3093 let target_properties = self.exec.target_properties.clone();
3094 let target_label_name = self.exec.target_label_name.clone();
3095 let graph_ctx = self.exec.graph_ctx.clone();
3096
3097 let fut = hydrate_vlp_target_properties(
3098 base_batch,
3099 schema,
3100 target_variable,
3101 target_properties,
3102 target_label_name,
3103 graph_ctx,
3104 );
3105
3106 self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3107 }
3109 Poll::Ready(Some(Err(e))) => {
3110 self.state = VarLengthStreamState::Done;
3111 return Poll::Ready(Some(Err(e)));
3112 }
3113 Poll::Ready(None) => {
3114 self.state = VarLengthStreamState::Done;
3115 return Poll::Ready(None);
3116 }
3117 Poll::Pending => {
3118 self.state = VarLengthStreamState::Reading;
3119 return Poll::Pending;
3120 }
3121 }
3122 }
3123 VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3124 Poll::Ready(Ok(batch)) => {
3125 self.state = VarLengthStreamState::Reading;
3126 self.metrics.record_output(batch.num_rows());
3127 return Poll::Ready(Some(Ok(batch)));
3128 }
3129 Poll::Ready(Err(e)) => {
3130 self.state = VarLengthStreamState::Done;
3131 return Poll::Ready(Some(Err(e)));
3132 }
3133 Poll::Pending => {
3134 self.state = VarLengthStreamState::Materializing(fut);
3135 return Poll::Pending;
3136 }
3137 },
3138 VarLengthStreamState::Done => {
3139 return Poll::Ready(None);
3140 }
3141 }
3142 }
3143 }
3144}
3145
3146impl GraphVariableLengthTraverseStream {
3147 fn process_batch_base(
3148 &self,
3149 batch: RecordBatch,
3150 eid_filter: &EidFilter,
3151 vid_filter: &VidFilter,
3152 ) -> DFResult<RecordBatch> {
3153 let source_col = batch
3154 .column_by_name(&self.exec.source_column)
3155 .ok_or_else(|| {
3156 datafusion::error::DataFusionError::Execution(format!(
3157 "Source column '{}' not found",
3158 self.exec.source_column
3159 ))
3160 })?;
3161
3162 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3163 let source_vids: &UInt64Array = &source_vid_cow;
3164
3165 let bound_target_cow = self
3167 .exec
3168 .bound_target_column
3169 .as_ref()
3170 .and_then(|col| batch.column_by_name(col))
3171 .map(|c| column_as_vid_array(c.as_ref()))
3172 .transpose()?;
3173 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3174
3175 let used_edge_arrays: Vec<&UInt64Array> = self
3177 .exec
3178 .used_edge_columns
3179 .iter()
3180 .filter_map(|col| {
3181 batch
3182 .column_by_name(col)?
3183 .as_any()
3184 .downcast_ref::<UInt64Array>()
3185 })
3186 .collect();
3187
3188 let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3190
3191 for (row_idx, source_vid) in source_vids.iter().enumerate() {
3192 let mut emitted_for_row = false;
3193
3194 if let Some(src) = source_vid {
3195 let vid = Vid::from(src);
3196
3197 let used_eids: FxHashSet<u64> = used_edge_arrays
3199 .iter()
3200 .filter_map(|arr| {
3201 if arr.is_null(row_idx) {
3202 None
3203 } else {
3204 Some(arr.value(row_idx))
3205 }
3206 })
3207 .collect();
3208
3209 match &self.exec.output_mode {
3211 VlpOutputMode::EndpointsOnly => {
3212 let endpoints = self
3213 .exec
3214 .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3215 for (target, depth) in endpoints {
3216 if let Some(targets) = expected_targets {
3218 if targets.is_null(row_idx) {
3219 continue;
3220 }
3221 if target.as_u64() != targets.value(row_idx) {
3222 continue;
3223 }
3224 }
3225 expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3226 emitted_for_row = true;
3227 }
3228 }
3229 _ => {
3230 let bfs_results = self
3232 .exec
3233 .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3234 for (target, hop_count, node_path, edge_path) in bfs_results {
3235 if let Some(targets) = expected_targets {
3237 if targets.is_null(row_idx) {
3238 continue;
3239 }
3240 if target.as_u64() != targets.value(row_idx) {
3241 continue;
3242 }
3243 }
3244 expansions.push((row_idx, target, hop_count, node_path, edge_path));
3245 emitted_for_row = true;
3246 }
3247 }
3248 }
3249 }
3250
3251 if self.exec.is_optional && !emitted_for_row {
3252 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3255 }
3256 }
3257
3258 self.build_output_batch(&batch, &expansions)
3259 }
3260
3261 fn build_output_batch(
3262 &self,
3263 input: &RecordBatch,
3264 expansions: &[VarLengthExpansion],
3265 ) -> DFResult<RecordBatch> {
3266 if expansions.is_empty() {
3267 return Ok(RecordBatch::new_empty(self.schema.clone()));
3268 }
3269
3270 let num_rows = expansions.len();
3271
3272 let indices: Vec<u64> = expansions
3274 .iter()
3275 .map(|(idx, _, _, _, _)| *idx as u64)
3276 .collect();
3277 let indices_array = UInt64Array::from(indices);
3278
3279 let mut columns: Vec<ArrayRef> = Vec::new();
3281 for col in input.columns() {
3282 let expanded = take(col.as_ref(), &indices_array, None)?;
3283 columns.push(expanded);
3284 }
3285
3286 let unmatched_rows: Vec<bool> = expansions
3290 .iter()
3291 .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3292 .collect();
3293 let target_vids: Vec<Option<u64>> = expansions
3294 .iter()
3295 .zip(unmatched_rows.iter())
3296 .map(
3297 |((_, vid, _, _, _), unmatched)| {
3298 if *unmatched { None } else { Some(vid.as_u64()) }
3299 },
3300 )
3301 .collect();
3302
3303 let target_vid_name = format!("{}._vid", self.exec.target_variable);
3305 if input.schema().column_with_name(&target_vid_name).is_none() {
3306 columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3307 }
3308
3309 let target_labels_name = format!("{}._labels", self.exec.target_variable);
3311 if input
3312 .schema()
3313 .column_with_name(&target_labels_name)
3314 .is_none()
3315 {
3316 use arrow_array::builder::{ListBuilder, StringBuilder};
3317 let query_ctx = self.exec.graph_ctx.query_context();
3318 let mut labels_builder = ListBuilder::new(StringBuilder::new());
3319 for target_vid in &target_vids {
3320 let Some(vid_u64) = target_vid else {
3321 labels_builder.append(false);
3322 continue;
3323 };
3324 let vid = Vid::from(*vid_u64);
3325 let row_labels: Vec<String> =
3326 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3327 Some(labels) => {
3328 labels
3330 }
3331 None => {
3332 if let Some(ref label_name) = self.exec.target_label_name {
3334 vec![label_name.clone()]
3335 } else {
3336 vec![]
3337 }
3338 }
3339 };
3340 let values = labels_builder.values();
3341 for lbl in &row_labels {
3342 values.append_value(lbl);
3343 }
3344 labels_builder.append(true);
3345 }
3346 columns.push(Arc::new(labels_builder.finish()));
3347 }
3348
3349 for prop_name in &self.exec.target_properties {
3351 let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3352 if input.schema().column_with_name(&full_prop_name).is_none() {
3353 let col_idx = columns.len();
3354 if col_idx < self.schema.fields().len() {
3355 let field = self.schema.field(col_idx);
3356 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3357 }
3358 }
3359 }
3360
3361 let hop_counts: Vec<u64> = expansions
3363 .iter()
3364 .map(|(_, _, hops, _, _)| *hops as u64)
3365 .collect();
3366 columns.push(Arc::new(UInt64Array::from(hop_counts)));
3367
3368 if self.exec.step_variable.is_some() {
3370 let mut edges_builder = new_edge_list_builder();
3371 let query_ctx = self.exec.graph_ctx.query_context();
3372
3373 for (_, _, _, node_path, edge_path) in expansions {
3374 if node_path.is_empty() && edge_path.is_empty() {
3375 edges_builder.append_null();
3377 } else if edge_path.is_empty() {
3378 edges_builder.append(true);
3380 } else {
3381 for (i, eid) in edge_path.iter().enumerate() {
3382 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3383 .unwrap_or_else(|| "UNKNOWN".to_string());
3384 append_edge_to_struct(
3385 edges_builder.values(),
3386 *eid,
3387 &type_name,
3388 node_path[i].as_u64(),
3389 node_path[i + 1].as_u64(),
3390 &query_ctx,
3391 );
3392 }
3393 edges_builder.append(true);
3394 }
3395 }
3396
3397 columns.push(Arc::new(edges_builder.finish()));
3398 }
3399
3400 if let Some(path_var_name) = &self.exec.path_variable {
3405 let existing_path_col_idx = input
3406 .schema()
3407 .column_with_name(path_var_name)
3408 .map(|(idx, _)| idx);
3409 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3411 let existing_path = existing_path_arc
3412 .as_ref()
3413 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3414
3415 let mut nodes_builder = new_node_list_builder();
3416 let mut rels_builder = new_edge_list_builder();
3417 let query_ctx = self.exec.graph_ctx.query_context();
3418 let mut path_validity = Vec::with_capacity(expansions.len());
3419
3420 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3421 if node_path.is_empty() && edge_path.is_empty() {
3422 nodes_builder.append(false);
3423 rels_builder.append(false);
3424 path_validity.push(false);
3425 continue;
3426 }
3427
3428 let skip_first_vlp_node = if let Some(existing) = existing_path {
3430 if !existing.is_null(row_out_idx) {
3431 prepend_existing_path(
3432 existing,
3433 row_out_idx,
3434 &mut nodes_builder,
3435 &mut rels_builder,
3436 &query_ctx,
3437 );
3438 true
3439 } else {
3440 false
3441 }
3442 } else {
3443 false
3444 };
3445
3446 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3448 for vid in &node_path[start_idx..] {
3449 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3450 }
3451 nodes_builder.append(true);
3452
3453 for (i, eid) in edge_path.iter().enumerate() {
3454 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3455 .unwrap_or_else(|| "UNKNOWN".to_string());
3456 append_edge_to_struct(
3457 rels_builder.values(),
3458 *eid,
3459 &type_name,
3460 node_path[i].as_u64(),
3461 node_path[i + 1].as_u64(),
3462 &query_ctx,
3463 );
3464 }
3465 rels_builder.append(true);
3466 path_validity.push(true);
3467 }
3468
3469 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3471 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3472
3473 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3475 let rels_field = Arc::new(Field::new(
3476 "relationships",
3477 rels_array.data_type().clone(),
3478 true,
3479 ));
3480
3481 let path_struct = arrow_array::StructArray::try_new(
3483 vec![nodes_field, rels_field].into(),
3484 vec![nodes_array, rels_array],
3485 Some(arrow::buffer::NullBuffer::from(path_validity)),
3486 )
3487 .map_err(arrow_err)?;
3488
3489 if let Some(idx) = existing_path_col_idx {
3490 columns[idx] = Arc::new(path_struct);
3491 } else {
3492 columns.push(Arc::new(path_struct));
3493 }
3494 }
3495
3496 self.metrics.record_output(num_rows);
3497
3498 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3499 }
3500}
3501
3502impl RecordBatchStream for GraphVariableLengthTraverseStream {
3503 fn schema(&self) -> SchemaRef {
3504 self.schema.clone()
3505 }
3506}
3507
/// Replaces the NULL placeholder columns for the target's properties in
/// `base_batch` with values fetched from the property store.
///
/// With a known target label, property types are resolved from that label's
/// schema; otherwise values are fetched untyped (`LargeBinary`), and the
/// special `_all_props` pseudo-property is materialized as an encoded JSON
/// object merging stored props with any pending L0-buffer props.
async fn hydrate_vlp_target_properties(
    base_batch: RecordBatch,
    schema: SchemaRef,
    target_variable: String,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
) -> DFResult<RecordBatch> {
    // Nothing to do for empty batches or when no properties were requested.
    if base_batch.num_rows() == 0 || target_properties.is_empty() {
        return Ok(base_batch);
    }

    // Locate the target's `_vid` column. The search runs from the END of the
    // schema so the most recently appended `_vid` column wins — presumably
    // the one this operator added (TODO confirm duplicates can actually occur).
    let target_vid_col_name = format!("{}._vid", target_variable);
    let vid_col_idx = schema
        .fields()
        .iter()
        .enumerate()
        .rev()
        .find(|(_, f)| f.name() == &target_vid_col_name)
        .map(|(i, _)| i);

    // No vid column means nothing to hydrate against.
    let Some(vid_col_idx) = vid_col_idx else {
        return Ok(base_batch);
    };

    let vid_col = base_batch.column(vid_col_idx);
    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
    let target_vid_array: &UInt64Array = &target_vid_cow;

    // NULL vids (unmatched optional rows) become the `u64::MAX` sentinel,
    // which simply finds no properties below.
    let target_vids: Vec<Vid> = target_vid_array
        .iter()
        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
        .collect();

    // One hydrated column per requested property, in `target_properties` order.
    let mut property_columns: Vec<ArrayRef> = Vec::new();

    if let Some(ref label_name) = target_label_name {
        // Label known: fetch label-scoped props and resolve concrete types.
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in &target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
            property_columns.push(column);
        }
    } else {
        // No label: fetch the concrete (non-`_all_props`) properties untyped.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in &target_properties {
            if prop_name == "_all_props" {
                use crate::query::df_graph::scan::encode_cypher_value;
                use arrow_array::builder::LargeBinaryBuilder;

                // Merge stored props with pending L0-buffer props per vertex;
                // L0 values overwrite stored ones for the same key.
                let mut builder = LargeBinaryBuilder::new();
                let l0_ctx = graph_ctx.l0_context();
                for vid in &target_vids {
                    let mut merged_props = serde_json::Map::new();
                    if let Some(vid_props) = props_map.get(vid) {
                        for (k, v) in vid_props.iter() {
                            let json_val: serde_json::Value = v.clone().into();
                            merged_props.insert(k.to_string(), json_val);
                        }
                    }
                    for l0 in l0_ctx.iter_l0_buffers() {
                        let guard = l0.read();
                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
                            for (k, v) in l0_props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                merged_props.insert(k.to_string(), json_val);
                            }
                        }
                    }
                    if merged_props.is_empty() {
                        builder.append_null();
                    } else {
                        let json = serde_json::Value::Object(merged_props);
                        // Encoding failures degrade to NULL rather than
                        // failing the whole batch.
                        match encode_cypher_value(&json) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                property_columns.push(Arc::new(builder.finish()));
            } else {
                let column = build_property_column_static(
                    &target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                property_columns.push(column);
            }
        }
    }

    // Splice hydrated columns over their placeholders. Matching is
    // positional: `prop_idx` assumes the schema's target-property columns
    // (after the vid column) appear in `target_properties` order — NOTE
    // (review): confirm this invariant holds for all schema builders.
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    let mut prop_idx = 0;
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let is_target_prop = col_idx > vid_col_idx
            && target_properties
                .iter()
                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
        if is_target_prop && prop_idx < property_columns.len() {
            new_columns.push(property_columns[prop_idx].clone());
            prop_idx += 1;
        } else {
            new_columns.push(base_batch.column(col_idx).clone());
        }
    }

    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
}
3667
/// Physical operator for variable-length traversal where edge types are
/// given by name (`type_names`) rather than pre-resolved ids.
pub struct GraphVariableLengthTraverseMainExec {
    /// Child plan producing the source rows.
    input: Arc<dyn ExecutionPlan>,

    /// Input column holding source vertex ids.
    source_column: String,

    /// Edge type names to traverse (unresolved at plan time).
    type_names: Vec<String>,

    /// Traversal direction (outgoing / incoming / both).
    direction: Direction,

    /// Minimum number of hops for a match.
    min_hops: usize,

    /// Maximum number of hops explored.
    max_hops: usize,

    /// Variable name bound to the traversal target vertex.
    target_variable: String,

    /// Optional variable bound to the list of traversed edges.
    step_variable: Option<String>,

    /// Optional variable bound to the full path struct.
    path_variable: Option<String>,

    /// Target vertex properties to emit as output columns.
    target_properties: Vec<String>,

    /// OPTIONAL MATCH semantics: emit a NULL row when nothing matches.
    is_optional: bool,

    /// Column holding an already-bound target vid to match against, if any.
    bound_target_column: Option<String>,

    /// Optional edge filter expression string (Lance predicate).
    edge_lance_filter: Option<String>,

    /// Edge property equality conditions applied during expansion.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns carrying edge ids already used (trail semantics).
    used_edge_columns: Vec<String>,

    /// Path selection / uniqueness mode for path enumeration.
    path_mode: super::nfa::PathMode,

    /// Output shape: full paths or endpoints only.
    output_mode: super::nfa::VlpOutputMode,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema (input + traversal columns).
    schema: SchemaRef,

    /// Cached DataFusion plan properties.
    properties: PlanProperties,

    /// Execution metrics registry.
    metrics: ExecutionPlanMetricsSet,
}
3742
/// Manual `Debug`: only the traversal-shaping fields are printed; the
/// contexts, schema, plan properties, and metrics are omitted for brevity.
impl fmt::Debug for GraphVariableLengthTraverseMainExec {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphVariableLengthTraverseMainExec")
            .field("source_column", &self.source_column)
            .field("type_names", &self.type_names)
            .field("direction", &self.direction)
            .field("min_hops", &self.min_hops)
            .field("max_hops", &self.max_hops)
            .field("target_variable", &self.target_variable)
            .finish()
    }
}
3755
3756impl GraphVariableLengthTraverseMainExec {
3757 #[expect(clippy::too_many_arguments)]
3759 pub fn new(
3760 input: Arc<dyn ExecutionPlan>,
3761 source_column: impl Into<String>,
3762 type_names: Vec<String>,
3763 direction: Direction,
3764 min_hops: usize,
3765 max_hops: usize,
3766 target_variable: impl Into<String>,
3767 step_variable: Option<String>,
3768 path_variable: Option<String>,
3769 target_properties: Vec<String>,
3770 graph_ctx: Arc<GraphExecutionContext>,
3771 is_optional: bool,
3772 bound_target_column: Option<String>,
3773 edge_lance_filter: Option<String>,
3774 edge_property_conditions: Vec<(String, UniValue)>,
3775 used_edge_columns: Vec<String>,
3776 path_mode: super::nfa::PathMode,
3777 output_mode: super::nfa::VlpOutputMode,
3778 ) -> Self {
3779 let source_column = source_column.into();
3780 let target_variable = target_variable.into();
3781
3782 let schema = Self::build_schema(
3784 input.schema(),
3785 &target_variable,
3786 step_variable.as_deref(),
3787 path_variable.as_deref(),
3788 &target_properties,
3789 );
3790 let properties = compute_plan_properties(schema.clone());
3791
3792 Self {
3793 input,
3794 source_column,
3795 type_names,
3796 direction,
3797 min_hops,
3798 max_hops,
3799 target_variable,
3800 step_variable,
3801 path_variable,
3802 target_properties,
3803 is_optional,
3804 bound_target_column,
3805 edge_lance_filter,
3806 edge_property_conditions,
3807 used_edge_columns,
3808 path_mode,
3809 output_mode,
3810 graph_ctx,
3811 schema,
3812 properties,
3813 metrics: ExecutionPlanMetricsSet::new(),
3814 }
3815 }
3816
3817 fn build_schema(
3819 input_schema: SchemaRef,
3820 target_variable: &str,
3821 step_variable: Option<&str>,
3822 path_variable: Option<&str>,
3823 target_properties: &[String],
3824 ) -> SchemaRef {
3825 let mut fields: Vec<Field> = input_schema
3826 .fields()
3827 .iter()
3828 .map(|f| f.as_ref().clone())
3829 .collect();
3830
3831 let target_vid_name = format!("{}._vid", target_variable);
3833 if input_schema.column_with_name(&target_vid_name).is_none() {
3834 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
3835 }
3836
3837 let target_labels_name = format!("{}._labels", target_variable);
3839 if input_schema.column_with_name(&target_labels_name).is_none() {
3840 fields.push(Field::new(target_labels_name, labels_data_type(), true));
3841 }
3842
3843 fields.push(Field::new("_hop_count", DataType::UInt64, false));
3845
3846 if let Some(step_var) = step_variable {
3849 fields.push(build_edge_list_field(step_var));
3850 }
3851
3852 if let Some(path_var) = path_variable
3854 && input_schema.column_with_name(path_var).is_none()
3855 {
3856 fields.push(build_path_struct_field(path_var));
3857 }
3858
3859 for prop in target_properties {
3862 let prop_name = format!("{}.{}", target_variable, prop);
3863 if input_schema.column_with_name(&prop_name).is_none() {
3864 fields.push(Field::new(prop_name, DataType::LargeBinary, true));
3865 }
3866 }
3867
3868 Arc::new(Schema::new(fields))
3869 }
3870}
3871
3872impl DisplayAs for GraphVariableLengthTraverseMainExec {
3873 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3874 write!(
3875 f,
3876 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3877 self.source_column, self.type_names, self.min_hops, self.max_hops
3878 )
3879 }
3880}
3881
impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseMainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds this node with a replaced child; all other configuration is
    /// cloned from `self`. Requires exactly one child.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
        )))
    }

    /// Starts execution: kicks off the async load of the edge adjacency map
    /// and returns a stream that first awaits the load (`Loading` state)
    /// before processing any input batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // The adjacency map is built lazily inside the stream's first poll,
        // so plan construction itself stays synchronous.
        let graph_ctx = self.graph_ctx.clone();
        let type_names = self.type_names.clone();
        let direction = self.direction;
        let load_fut =
            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };

        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            schema: self.schema.clone(),
            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
3979
/// State machine driving `GraphVariableLengthTraverseMainStream::poll_next`.
enum VarLengthMainStreamState {
    /// Awaiting the async construction of the edge adjacency map; no input
    /// batches are pulled until this completes.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Adjacency map loaded; pulling input batches and expanding each one.
    Processing(EdgeAdjacencyMap),
    /// A batch was expanded and target properties are being hydrated
    /// asynchronously before it can be emitted.
    Materializing {
        // Carried along so the stream can return to `Processing` afterwards.
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Terminal: input exhausted or a fatal error was already returned.
    Done,
}
3994
/// Record-batch stream that performs variable-length path expansion over an
/// in-memory edge adjacency map.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    /// Upstream batches containing the source vid column.
    input: SendableRecordBatchStream,
    /// Name of the input column holding the source vertex ids.
    source_column: String,
    /// Edge type names; joined with `|` when labelling emitted edge structs.
    type_names: Vec<String>,
    direction: Direction,
    /// Inclusive hop range: results with `min_hops <= depth <= max_hops`.
    min_hops: usize,
    max_hops: usize,
    /// Column-name prefix for the emitted `_vid` / `_labels` target columns.
    target_variable: String,
    /// When set, an edge-list column for the traversed steps is emitted.
    step_variable: Option<String>,
    /// When set, a path struct column is emitted (or merged into an existing
    /// input path column of the same name).
    path_variable: Option<String>,
    /// Target-node properties to hydrate after expansion (LargeBinary).
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    /// OPTIONAL MATCH semantics: rows without any expansion are kept with
    /// null target columns instead of being dropped.
    is_optional: bool,
    /// When set, only expansions whose target equals this column's vid on the
    /// same row are emitted.
    bound_target_column: Option<String>,
    // NOTE(review): not read in this stream's visible code path — presumably
    // consumed in a later phase; confirm before removing.
    edge_lance_filter: Option<String>,
    /// Property equality predicates applied to each candidate edge during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns holding eids consumed by earlier pattern steps; those
    /// eids are excluded from reuse during expansion.
    used_edge_columns: Vec<String>,
    // NOTE(review): path_mode / output_mode are not read in this stream's
    // visible code path — verify against the Phase 3 implementation.
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema, fixed at plan time by `build_schema`.
    schema: SchemaRef,
    state: VarLengthMainStreamState,
    metrics: BaselineMetrics,
}
4022
/// One BFS state/result: (current/target vid, hop depth, node path, edge path).
// NOTE(review): structurally identical to the file-level `BfsResult` alias —
// consider unifying the two.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4025
4026impl GraphVariableLengthTraverseMainStream {
4027 fn bfs(
4033 &self,
4034 source: Vid,
4035 adjacency: &EdgeAdjacencyMap,
4036 used_eids: &FxHashSet<u64>,
4037 ) -> Vec<MainBfsResult> {
4038 let mut results = Vec::new();
4039 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4040
4041 queue.push_back((source, 0, vec![source], vec![]));
4042
4043 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4044 if depth >= self.min_hops && depth <= self.max_hops {
4046 results.push((current, depth, node_path.clone(), edge_path.clone()));
4047 }
4048
4049 if depth >= self.max_hops {
4051 continue;
4052 }
4053
4054 if let Some(neighbors) = adjacency.get(¤t) {
4056 let is_undirected = matches!(self.direction, Direction::Both);
4057 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4058
4059 for (neighbor, eid, _edge_type, props) in neighbors {
4060 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4062 continue;
4063 }
4064
4065 if edge_path.contains(eid) {
4067 continue;
4068 }
4069
4070 if used_eids.contains(&eid.as_u64()) {
4073 continue;
4074 }
4075
4076 if !self.edge_property_conditions.is_empty() {
4078 let passes =
4079 self.edge_property_conditions
4080 .iter()
4081 .all(|(name, expected)| {
4082 props.get(name).is_some_and(|actual| actual == expected)
4083 });
4084 if !passes {
4085 continue;
4086 }
4087 }
4088
4089 let mut new_node_path = node_path.clone();
4090 new_node_path.push(*neighbor);
4091 let mut new_edge_path = edge_path.clone();
4092 new_edge_path.push(*eid);
4093 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4094 }
4095 }
4096 }
4097
4098 results
4099 }
4100
4101 fn process_batch(
4103 &self,
4104 batch: RecordBatch,
4105 adjacency: &EdgeAdjacencyMap,
4106 ) -> DFResult<RecordBatch> {
4107 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4108 datafusion::error::DataFusionError::Execution(format!(
4109 "Source column '{}' not found in input batch",
4110 self.source_column
4111 ))
4112 })?;
4113
4114 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4115 let source_vids: &UInt64Array = &source_vid_cow;
4116
4117 let bound_target_cow = self
4119 .bound_target_column
4120 .as_ref()
4121 .and_then(|col| batch.column_by_name(col))
4122 .map(|c| column_as_vid_array(c.as_ref()))
4123 .transpose()?;
4124 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4125
4126 let used_edge_arrays: Vec<&UInt64Array> = self
4128 .used_edge_columns
4129 .iter()
4130 .filter_map(|col| {
4131 batch
4132 .column_by_name(col)?
4133 .as_any()
4134 .downcast_ref::<UInt64Array>()
4135 })
4136 .collect();
4137
4138 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4140
4141 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4142 let mut emitted_for_row = false;
4143
4144 if let Some(source_u64) = source_opt {
4145 let source = Vid::from(source_u64);
4146
4147 let used_eids: FxHashSet<u64> = used_edge_arrays
4149 .iter()
4150 .filter_map(|arr| {
4151 if arr.is_null(row_idx) {
4152 None
4153 } else {
4154 Some(arr.value(row_idx))
4155 }
4156 })
4157 .collect();
4158
4159 let bfs_results = self.bfs(source, adjacency, &used_eids);
4160
4161 for (target, hops, node_path, edge_path) in bfs_results {
4162 if let Some(targets) = expected_targets {
4165 if targets.is_null(row_idx) {
4166 continue;
4167 }
4168 let expected_vid = targets.value(row_idx);
4169 if target.as_u64() != expected_vid {
4170 continue;
4171 }
4172 }
4173
4174 expansions.push((row_idx, target, hops, node_path, edge_path));
4175 emitted_for_row = true;
4176 }
4177 }
4178
4179 if self.is_optional && !emitted_for_row {
4180 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4182 }
4183 }
4184
4185 if expansions.is_empty() {
4186 if self.is_optional {
4187 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4188 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4189 }
4190 return Ok(RecordBatch::new_empty(self.schema.clone()));
4191 }
4192
4193 let num_rows = expansions.len();
4194 self.metrics.record_output(num_rows);
4195
4196 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4198
4199 for col_idx in 0..batch.num_columns() {
4201 let array = batch.column(col_idx);
4202 let indices: Vec<u64> = expansions
4203 .iter()
4204 .map(|(idx, _, _, _, _)| *idx as u64)
4205 .collect();
4206 let take_indices = UInt64Array::from(indices);
4207 let expanded = arrow::compute::take(array, &take_indices, None)?;
4208 columns.push(expanded);
4209 }
4210
4211 let target_vid_name = format!("{}._vid", self.target_variable);
4213 if batch.schema().column_with_name(&target_vid_name).is_none() {
4214 let target_vids: Vec<Option<u64>> = expansions
4215 .iter()
4216 .map(|(_, vid, _, node_path, edge_path)| {
4217 if node_path.is_empty() && edge_path.is_empty() {
4218 None
4219 } else {
4220 Some(vid.as_u64())
4221 }
4222 })
4223 .collect();
4224 columns.push(Arc::new(UInt64Array::from(target_vids)));
4225 }
4226
4227 let target_labels_name = format!("{}._labels", self.target_variable);
4229 if batch
4230 .schema()
4231 .column_with_name(&target_labels_name)
4232 .is_none()
4233 {
4234 use arrow_array::builder::{ListBuilder, StringBuilder};
4235 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4236 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4237 if node_path.is_empty() && edge_path.is_empty() {
4238 labels_builder.append(false);
4239 continue;
4240 }
4241 let mut row_labels: Vec<String> = Vec::new();
4242 let labels =
4243 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4244 for lbl in &labels {
4245 if !row_labels.contains(lbl) {
4246 row_labels.push(lbl.clone());
4247 }
4248 }
4249 let values = labels_builder.values();
4250 for lbl in &row_labels {
4251 values.append_value(lbl);
4252 }
4253 labels_builder.append(true);
4254 }
4255 columns.push(Arc::new(labels_builder.finish()));
4256 }
4257
4258 let hop_counts: Vec<u64> = expansions
4260 .iter()
4261 .map(|(_, _, hops, _, _)| *hops as u64)
4262 .collect();
4263 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4264
4265 if self.step_variable.is_some() {
4267 let mut edges_builder = new_edge_list_builder();
4268 let query_ctx = self.graph_ctx.query_context();
4269 let type_names_str = self.type_names.join("|");
4270
4271 for (_, _, _, node_path, edge_path) in expansions.iter() {
4272 if node_path.is_empty() && edge_path.is_empty() {
4273 edges_builder.append_null();
4274 } else if edge_path.is_empty() {
4275 edges_builder.append(true);
4277 } else {
4278 for (i, eid) in edge_path.iter().enumerate() {
4279 append_edge_to_struct(
4280 edges_builder.values(),
4281 *eid,
4282 &type_names_str,
4283 node_path[i].as_u64(),
4284 node_path[i + 1].as_u64(),
4285 &query_ctx,
4286 );
4287 }
4288 edges_builder.append(true);
4289 }
4290 }
4291
4292 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4293 }
4294
4295 if let Some(path_var_name) = &self.path_variable {
4299 let existing_path_col_idx = batch
4300 .schema()
4301 .column_with_name(path_var_name)
4302 .map(|(idx, _)| idx);
4303 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4304 let existing_path = existing_path_arc
4305 .as_ref()
4306 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4307
4308 let mut nodes_builder = new_node_list_builder();
4309 let mut rels_builder = new_edge_list_builder();
4310 let query_ctx = self.graph_ctx.query_context();
4311 let type_names_str = self.type_names.join("|");
4312 let mut path_validity = Vec::with_capacity(expansions.len());
4313
4314 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4315 if node_path.is_empty() && edge_path.is_empty() {
4316 nodes_builder.append(false);
4317 rels_builder.append(false);
4318 path_validity.push(false);
4319 continue;
4320 }
4321
4322 let skip_first_vlp_node = if let Some(existing) = existing_path {
4324 if !existing.is_null(row_out_idx) {
4325 prepend_existing_path(
4326 existing,
4327 row_out_idx,
4328 &mut nodes_builder,
4329 &mut rels_builder,
4330 &query_ctx,
4331 );
4332 true
4333 } else {
4334 false
4335 }
4336 } else {
4337 false
4338 };
4339
4340 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4342 for vid in &node_path[start_idx..] {
4343 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4344 }
4345 nodes_builder.append(true);
4346
4347 for (i, eid) in edge_path.iter().enumerate() {
4348 append_edge_to_struct(
4349 rels_builder.values(),
4350 *eid,
4351 &type_names_str,
4352 node_path[i].as_u64(),
4353 node_path[i + 1].as_u64(),
4354 &query_ctx,
4355 );
4356 }
4357 rels_builder.append(true);
4358 path_validity.push(true);
4359 }
4360
4361 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4363 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4364
4365 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4367 let rels_field = Arc::new(Field::new(
4368 "relationships",
4369 rels_array.data_type().clone(),
4370 true,
4371 ));
4372
4373 let path_struct = arrow_array::StructArray::try_new(
4375 vec![nodes_field, rels_field].into(),
4376 vec![nodes_array, rels_array],
4377 Some(arrow::buffer::NullBuffer::from(path_validity)),
4378 )
4379 .map_err(arrow_err)?;
4380
4381 if let Some(idx) = existing_path_col_idx {
4382 columns[idx] = Arc::new(path_struct);
4383 } else {
4384 columns.push(Arc::new(path_struct));
4385 }
4386 }
4387
4388 for prop_name in &self.target_properties {
4391 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4392 if batch.schema().column_with_name(&full_prop_name).is_none() {
4393 columns.push(arrow_array::new_null_array(
4394 &DataType::LargeBinary,
4395 num_rows,
4396 ));
4397 }
4398 }
4399
4400 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4401 }
4402}
4403
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the Loading -> Processing -> (Materializing) -> Done state
    /// machine. The state is temporarily swapped out with `Done` so it can be
    /// consumed by value; every branch must restore the correct state before
    /// returning, otherwise the stream would terminate early.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // `Done` is a placeholder here; each match arm reinstates the
            // real follow-up state.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                // Waiting for the adjacency map to finish loading.
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                // Adjacency ready: pull and expand input batches.
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // Batch-level failure is surfaced but the
                                    // stream stays alive for further batches.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // No properties to hydrate: emit directly.
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Kick off async hydration of target properties.
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None,
                                graph_ctx,
                            );

                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                // Awaiting property hydration for an expanded batch.
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4501
4502impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4503 fn schema(&self) -> SchemaRef {
4504 self.schema.clone()
4505 }
4506}
4507
#[cfg(test)]
mod tests {
    use super::*;

    /// Without an edge variable, a single synthetic `__eid_to_<target>`
    /// column is appended after the target vid/labels columns.
    #[test]
    fn test_traverse_schema_without_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema =
            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);

        assert_eq!(output_schema.fields().len(), 4);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
    }

    /// With an edge variable `r`, named `r._eid` / `r._type` columns replace
    /// the synthetic eid column.
    #[test]
    fn test_traverse_schema_with_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_eq!(output_schema.fields().len(), 5);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
    }

    /// Target properties appear as `m.<prop>` columns between the labels and
    /// the edge columns.
    #[test]
    fn test_traverse_schema_with_target_properties() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        assert_eq!(output_schema.fields().len(), 7);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "m.name");
        assert_eq!(output_schema.field(4).name(), "m.age");
        assert_eq!(output_schema.field(5).name(), "r._eid");
        assert_eq!(output_schema.field(6).name(), "r._type");
    }

    /// Variable-length traversal adds `_hop_count` and, when a path variable
    /// is given, a trailing path struct column.
    #[test]
    fn test_variable_length_schema() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_eq!(output_schema.fields().len(), 5);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "b._vid");
        assert_eq!(output_schema.field(2).name(), "b._labels");
        assert_eq!(output_schema.field(3).name(), "_hop_count");
        assert_eq!(output_schema.field(4).name(), "p");
    }

    /// Main-exec variant, no edge variable: same layout as the base exec.
    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        assert_eq!(output_schema.fields().len(), 4);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
    }

    /// Main-exec variant with an edge variable.
    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_eq!(output_schema.fields().len(), 5);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
    }

    /// Edge properties become LargeBinary `r.<prop>` columns after `r._type`.
    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_eq!(output_schema.fields().len(), 7);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
        assert_eq!(output_schema.field(5).name(), "r.weight");
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(output_schema.field(6).name(), "r.since");
        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
    }

    /// Target properties become LargeBinary `m.<prop>` columns after the
    /// edge columns (unlike the base exec, where they precede them).
    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        assert_eq!(output_schema.fields().len(), 7);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
        assert_eq!(output_schema.field(5).name(), "m.name");
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(output_schema.field(6).name(), "m.age");
        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
    }
}