1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{
42 build_property_column_static, property_field, resolve_property_type,
43};
44use arrow::compute::take;
45use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
46use arrow_schema::{DataType, Field, Schema, SchemaRef};
47use datafusion::common::Result as DFResult;
48use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
49use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
50use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
51use futures::{Stream, StreamExt};
52use fxhash::FxHashSet;
53use std::any::Any;
54use std::collections::{HashMap, HashSet, VecDeque};
55use std::fmt;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::task::{Context, Poll};
59use uni_common::Value as UniValue;
60use uni_common::core::id::{Eid, Vid};
61use uni_store::runtime::l0_visibility;
62use uni_store::storage::direction::Direction;
63
/// Four-element tuple named for BFS results: a vertex id, a `usize`, a list of
/// vertex ids and a list of edge ids.
///
/// NOTE(review): presumably `(target, hop_count, node_path, edge_path)`; the
/// code that produces and consumes this alias is outside this section — confirm.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
66
/// Five-element expansion tuple: a `usize`, a vertex id, a second `usize`, a
/// list of vertex ids and a list of edge ids.
///
/// NOTE(review): presumably a `BfsResult` prefixed with the originating input
/// row index — the users of this alias are outside this section; confirm.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
69
70fn prepend_existing_path(
77 existing_path: &arrow_array::StructArray,
78 row_idx: usize,
79 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
80 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
81 query_ctx: &uni_store::runtime::context::QueryContext,
82) {
83 let nodes_list = existing_path
85 .column(0)
86 .as_any()
87 .downcast_ref::<arrow_array::ListArray>()
88 .unwrap();
89 let node_values = nodes_list.value(row_idx);
90 let node_struct = node_values
91 .as_any()
92 .downcast_ref::<arrow_array::StructArray>()
93 .unwrap();
94 let vid_col = node_struct
95 .column(0)
96 .as_any()
97 .downcast_ref::<UInt64Array>()
98 .unwrap();
99 for i in 0..vid_col.len() {
100 append_node_to_struct(
101 nodes_builder.values(),
102 Vid::from(vid_col.value(i)),
103 query_ctx,
104 );
105 }
106
107 let rels_list = existing_path
109 .column(1)
110 .as_any()
111 .downcast_ref::<arrow_array::ListArray>()
112 .unwrap();
113 let edge_values = rels_list.value(row_idx);
114 let edge_struct = edge_values
115 .as_any()
116 .downcast_ref::<arrow_array::StructArray>()
117 .unwrap();
118 let eid_col = edge_struct
119 .column(0)
120 .as_any()
121 .downcast_ref::<UInt64Array>()
122 .unwrap();
123 let type_col = edge_struct
124 .column(1)
125 .as_any()
126 .downcast_ref::<arrow_array::StringArray>()
127 .unwrap();
128 let src_col = edge_struct
129 .column(2)
130 .as_any()
131 .downcast_ref::<UInt64Array>()
132 .unwrap();
133 let dst_col = edge_struct
134 .column(3)
135 .as_any()
136 .downcast_ref::<UInt64Array>()
137 .unwrap();
138 for i in 0..eid_col.len() {
139 append_edge_to_struct(
140 rels_builder.values(),
141 Eid::from(eid_col.value(i)),
142 type_col.value(i),
143 src_col.value(i),
144 dst_col.value(i),
145 query_ctx,
146 );
147 }
148}
149
150fn resolve_edge_property_type(
155 prop: &str,
156 schema_props: Option<
157 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
158 >,
159) -> DataType {
160 if prop == "overflow_json" {
161 DataType::LargeBinary
162 } else if prop == "_created_at" || prop == "_updated_at" {
163 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
167 } else {
168 schema_props
169 .and_then(|props| props.get(prop))
170 .map(|meta| meta.r#type.to_arrow())
171 .unwrap_or(DataType::LargeBinary)
172 }
173}
174
175use crate::query::df_graph::common::merged_edge_schema_props;
176
/// Five-element tuple named for variable-length expansions: a `usize`, a vertex
/// id, a second `usize`, a list of vertex ids and a list of edge ids.
///
/// NOTE(review): presumably `(input_row, target, hop_count, node_path,
/// edge_path)` — the users of this alias are outside this section; confirm.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
179
/// DataFusion physical operator that expands every input row along graph edges.
///
/// For each row of the child plan it reads a source vertex id, looks up the
/// neighbours reachable through `edge_type_ids` in `direction`, and emits one
/// output row per surviving `(source row, neighbour, edge)` combination. The
/// output schema is the input schema followed by target-vertex columns and
/// edge columns (see `build_schema`).
pub struct GraphTraverseExec {
    /// Child plan whose batches are expanded.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse; neighbours are fetched once per id.
    edge_type_ids: Vec<u32>,

    /// Traversal direction handed to the neighbour lookup. With
    /// `Direction::Both` an edge id is emitted at most once per source row.
    direction: Direction,

    /// Prefix of the target-vertex output columns (`{target}._vid`,
    /// `{target}._labels`, `{target}.{property}`).
    target_variable: String,

    /// Prefix of the edge output columns. When `None`, a single internal
    /// `__eid_to_{target}` column carries the edge id instead.
    edge_variable: Option<String>,

    /// Edge properties to materialise (only used when `edge_variable` is set).
    edge_properties: Vec<String>,

    /// Target-vertex properties to materialise.
    target_properties: Vec<String>,

    /// Label used to resolve target property types and to filter neighbours
    /// whose labels are known.
    target_label_name: Option<String>,

    /// Numeric id of the target label. Stored and forwarded by
    /// `with_new_children`; not otherwise read in this section.
    target_label_id: Option<u16>,

    /// Shared graph execution context (storage, property manager, L0 buffers).
    graph_ctx: Arc<GraphExecutionContext>,

    /// When `true`, input rows without any match are still emitted with null
    /// target/edge columns, and the id output fields are nullable.
    optional: bool,

    /// Variables belonging to the optional pattern; their input columns are
    /// nulled in unmatched output rows and excluded from the grouping key.
    optional_pattern_vars: HashSet<String>,

    /// Input column holding an already-bound target vertex id. When present in
    /// the batch, only neighbours equal to that id are kept.
    bound_target_column: Option<String>,

    /// Input columns holding edge ids already used by the current row; edges
    /// with those ids are skipped.
    used_edge_columns: Vec<String>,

    /// Output schema, computed once in `new`.
    schema: SchemaRef,

    /// Plan properties derived from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics registry shared by all partitions of this operator.
    metrics: ExecutionPlanMetricsSet,
}
262
263impl fmt::Debug for GraphTraverseExec {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 f.debug_struct("GraphTraverseExec")
266 .field("source_column", &self.source_column)
267 .field("edge_type_ids", &self.edge_type_ids)
268 .field("direction", &self.direction)
269 .field("target_variable", &self.target_variable)
270 .field("edge_variable", &self.edge_variable)
271 .finish()
272 }
273}
274
275impl GraphTraverseExec {
276 #[expect(clippy::too_many_arguments)]
292 pub fn new(
293 input: Arc<dyn ExecutionPlan>,
294 source_column: impl Into<String>,
295 edge_type_ids: Vec<u32>,
296 direction: Direction,
297 target_variable: impl Into<String>,
298 edge_variable: Option<String>,
299 edge_properties: Vec<String>,
300 target_properties: Vec<String>,
301 target_label_name: Option<String>,
302 target_label_id: Option<u16>,
303 graph_ctx: Arc<GraphExecutionContext>,
304 optional: bool,
305 optional_pattern_vars: HashSet<String>,
306 bound_target_column: Option<String>,
307 used_edge_columns: Vec<String>,
308 ) -> Self {
309 let source_column = source_column.into();
310 let target_variable = target_variable.into();
311
312 let uni_schema = graph_ctx.storage().schema_manager().schema();
314 let label_props = target_label_name
315 .as_deref()
316 .and_then(|ln| uni_schema.properties.get(ln));
317 let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
318 let edge_props = if merged_edge_props.is_empty() {
319 None
320 } else {
321 Some(&merged_edge_props)
322 };
323
324 let schema = Self::build_schema(
326 input.schema(),
327 &target_variable,
328 edge_variable.as_deref(),
329 &edge_properties,
330 &target_properties,
331 label_props,
332 edge_props,
333 optional,
334 );
335
336 let properties = compute_plan_properties(schema.clone());
337
338 Self {
339 input,
340 source_column,
341 edge_type_ids,
342 direction,
343 target_variable,
344 edge_variable,
345 edge_properties,
346 target_properties,
347 target_label_name,
348 target_label_id,
349 graph_ctx,
350 optional,
351 optional_pattern_vars,
352 bound_target_column,
353 used_edge_columns,
354 schema,
355 properties,
356 metrics: ExecutionPlanMetricsSet::new(),
357 }
358 }
359
360 #[expect(
362 clippy::too_many_arguments,
363 reason = "Schema construction needs all field metadata"
364 )]
365 fn build_schema(
366 input_schema: SchemaRef,
367 target_variable: &str,
368 edge_variable: Option<&str>,
369 edge_properties: &[String],
370 target_properties: &[String],
371 label_props: Option<
372 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
373 >,
374 edge_props: Option<
375 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
376 >,
377 optional: bool,
378 ) -> SchemaRef {
379 let mut fields: Vec<Field> = input_schema
380 .fields()
381 .iter()
382 .map(|f| f.as_ref().clone())
383 .collect();
384
385 let target_vid_name = format!("{}._vid", target_variable);
387 fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
388
389 fields.push(Field::new(
391 format!("{}._labels", target_variable),
392 labels_data_type(),
393 true,
394 ));
395
396 for prop_name in target_properties {
398 let col_name = format!("{}.{}", target_variable, prop_name);
399 let arrow_type = resolve_property_type(prop_name, label_props);
400 let uni_type = label_props
401 .and_then(|p| p.get(prop_name))
402 .map(|m| &m.r#type);
403 fields.push(property_field(&col_name, arrow_type, uni_type));
404 }
405
406 if let Some(edge_var) = edge_variable {
408 let edge_id_name = format!("{}._eid", edge_var);
409 fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
410
411 fields.push(Field::new(
413 format!("{}._type", edge_var),
414 DataType::Utf8,
415 true,
416 ));
417
418 for prop_name in edge_properties {
420 let prop_col_name = format!("{}.{}", edge_var, prop_name);
421 let arrow_type = resolve_edge_property_type(prop_name, edge_props);
422 let uni_type = edge_props.and_then(|p| p.get(prop_name)).map(|m| &m.r#type);
423 fields.push(property_field(&prop_col_name, arrow_type, uni_type));
424 }
425 } else {
426 let internal_eid_name = format!("__eid_to_{}", target_variable);
429 fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
430 }
431
432 Arc::new(Schema::new(fields))
433 }
434}
435
436impl DisplayAs for GraphTraverseExec {
437 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438 write!(
439 f,
440 "GraphTraverseExec: {} --[{:?}]--> {}",
441 self.source_column, self.edge_type_ids, self.target_variable
442 )?;
443 if let Some(ref edge_var) = self.edge_variable {
444 write!(f, " as {}", edge_var)?;
445 }
446 Ok(())
447 }
448}
449
450impl ExecutionPlan for GraphTraverseExec {
451 fn name(&self) -> &str {
452 "GraphTraverseExec"
453 }
454
455 fn as_any(&self) -> &dyn Any {
456 self
457 }
458
459 fn schema(&self) -> SchemaRef {
460 self.schema.clone()
461 }
462
463 fn properties(&self) -> &Arc<PlanProperties> {
464 &self.properties
465 }
466
467 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
468 vec![&self.input]
469 }
470
471 fn with_new_children(
472 self: Arc<Self>,
473 children: Vec<Arc<dyn ExecutionPlan>>,
474 ) -> DFResult<Arc<dyn ExecutionPlan>> {
475 if children.len() != 1 {
476 return Err(datafusion::error::DataFusionError::Plan(
477 "GraphTraverseExec requires exactly one child".to_string(),
478 ));
479 }
480
481 Ok(Arc::new(Self::new(
482 children[0].clone(),
483 self.source_column.clone(),
484 self.edge_type_ids.clone(),
485 self.direction,
486 self.target_variable.clone(),
487 self.edge_variable.clone(),
488 self.edge_properties.clone(),
489 self.target_properties.clone(),
490 self.target_label_name.clone(),
491 self.target_label_id,
492 self.graph_ctx.clone(),
493 self.optional,
494 self.optional_pattern_vars.clone(),
495 self.bound_target_column.clone(),
496 self.used_edge_columns.clone(),
497 )))
498 }
499
500 fn execute(
501 &self,
502 partition: usize,
503 context: Arc<TaskContext>,
504 ) -> DFResult<SendableRecordBatchStream> {
505 let input_stream = self.input.execute(partition, context)?;
506
507 let metrics = BaselineMetrics::new(&self.metrics, partition);
508
509 let warm_fut = self
510 .graph_ctx
511 .warming_future(self.edge_type_ids.clone(), self.direction);
512
513 Ok(Box::pin(GraphTraverseStream {
514 input: input_stream,
515 source_column: self.source_column.clone(),
516 edge_type_ids: self.edge_type_ids.clone(),
517 direction: self.direction,
518 target_variable: self.target_variable.clone(),
519 edge_variable: self.edge_variable.clone(),
520 edge_properties: self.edge_properties.clone(),
521 target_properties: self.target_properties.clone(),
522 target_label_name: self.target_label_name.clone(),
523 graph_ctx: self.graph_ctx.clone(),
524 optional: self.optional,
525 optional_pattern_vars: self.optional_pattern_vars.clone(),
526 bound_target_column: self.bound_target_column.clone(),
527 used_edge_columns: self.used_edge_columns.clone(),
528 schema: self.schema.clone(),
529 state: TraverseStreamState::Warming(warm_fut),
530 metrics,
531 }))
532 }
533
534 fn metrics(&self) -> Option<MetricsSet> {
535 Some(self.metrics.clone_inner())
536 }
537}
538
/// State machine driven by `GraphTraverseStream::poll_next`.
enum TraverseStreamState {
    /// Waiting for the warming future obtained from the graph context to finish.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Ready to pull the next batch from the input stream.
    Reading,
    /// Waiting for an asynchronously built output batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state: every further poll yields `None`.
    Done,
}
550
/// Per-partition stream created by `GraphTraverseExec::execute`.
///
/// Holds a copy of the operator configuration plus the polling state.
struct GraphTraverseStream {
    /// Stream of input batches from the child plan.
    input: SendableRecordBatchStream,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction handed to the neighbour lookup.
    direction: Direction,

    /// Target variable name (not read by the stream itself).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Edge variable; selects between full edge columns and the internal eid column.
    edge_variable: Option<String>,

    /// Edge properties to materialise.
    edge_properties: Vec<String>,

    /// Target-vertex properties to materialise.
    target_properties: Vec<String>,

    /// Label used to filter neighbours and to load target properties.
    target_label_name: Option<String>,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched input rows are emitted with nulls.
    optional: bool,

    /// Variables of the optional pattern (see `GraphTraverseExec`).
    optional_pattern_vars: HashSet<String>,

    /// Input column with an already-bound target vertex id, if any.
    bound_target_column: Option<String>,

    /// Input columns holding edge ids that must not be reused.
    used_edge_columns: Vec<String>,

    /// Output schema of every emitted batch.
    schema: SchemaRef,

    /// Current position in the polling state machine.
    state: TraverseStreamState,

    /// Baseline metrics (output rows, compute time) for this partition.
    metrics: BaselineMetrics,
}
605
606impl GraphTraverseStream {
607 fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
610 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
611 datafusion::error::DataFusionError::Execution(format!(
612 "Source column '{}' not found",
613 self.source_column
614 ))
615 })?;
616
617 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
618 let source_vids: &UInt64Array = &source_vid_cow;
619
620 let bound_target_cow = self
623 .bound_target_column
624 .as_ref()
625 .and_then(|col| batch.column_by_name(col))
626 .map(|c| column_as_vid_array(c.as_ref()))
627 .transpose()?;
628 let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
629
630 let used_edge_arrays: Vec<&UInt64Array> = self
632 .used_edge_columns
633 .iter()
634 .filter_map(|col| {
635 batch
636 .column_by_name(col)
637 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
638 })
639 .collect();
640
641 let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
642 let is_undirected = matches!(self.direction, Direction::Both);
643
644 for (row_idx, source_vid) in source_vids.iter().enumerate() {
645 let Some(src) = source_vid else {
646 continue;
647 };
648
649 let expected_target = bound_target_vids.map(|arr| {
655 if arr.is_null(row_idx) {
656 None
657 } else {
658 Some(arr.value(row_idx))
659 }
660 });
661
662 let used_eids: HashSet<u64> = used_edge_arrays
664 .iter()
665 .filter_map(|arr| {
666 if arr.is_null(row_idx) {
667 None
668 } else {
669 Some(arr.value(row_idx))
670 }
671 })
672 .collect();
673
674 let vid = Vid::from(src);
675 let mut seen_edges: HashSet<u64> = HashSet::new();
678
679 for &edge_type in &self.edge_type_ids {
680 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
681
682 for (target_vid, eid) in neighbors {
683 let eid_u64 = eid.as_u64();
684
685 if used_eids.contains(&eid_u64) {
687 continue;
688 }
689
690 if is_undirected && !seen_edges.insert(eid_u64) {
692 continue;
693 }
694
695 if let Some(expected_opt) = expected_target {
698 let Some(expected) = expected_opt else {
699 continue;
700 };
701 if target_vid.as_u64() != expected {
702 continue;
703 }
704 }
705
706 if let Some(ref label_name) = self.target_label_name {
709 let query_ctx = self.graph_ctx.query_context();
710 if let Some(vertex_labels) =
711 l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
712 {
713 if !vertex_labels.contains(label_name) {
715 continue;
716 }
717 }
718 }
720
721 expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
722 }
723 }
724 }
725
726 Ok(expanded_rows)
727 }
728}
729
730fn build_target_labels_column(
732 target_vids: &[Vid],
733 target_label_name: &Option<String>,
734 graph_ctx: &GraphExecutionContext,
735) -> ArrayRef {
736 use arrow_array::builder::{ListBuilder, StringBuilder};
737 let mut labels_builder = ListBuilder::new(StringBuilder::new());
738 let query_ctx = graph_ctx.query_context();
739 for vid in target_vids {
740 let row_labels: Vec<String> =
741 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
742 Some(labels) => labels,
743 None => {
744 if let Some(label_name) = target_label_name {
746 vec![label_name.clone()]
747 } else {
748 vec![]
749 }
750 }
751 };
752 let values = labels_builder.values();
753 for lbl in &row_labels {
754 values.append_value(lbl);
755 }
756 labels_builder.append(true);
757 }
758 Arc::new(labels_builder.finish())
759}
760
761async fn build_target_property_columns(
763 target_vids: &[Vid],
764 target_properties: &[String],
765 target_label_name: &Option<String>,
766 graph_ctx: &Arc<GraphExecutionContext>,
767) -> DFResult<Vec<ArrayRef>> {
768 let mut columns = Vec::new();
769
770 if let Some(label_name) = target_label_name {
771 let property_manager = graph_ctx.property_manager();
772 let query_ctx = graph_ctx.query_context();
773
774 let props_map = property_manager
775 .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
776 .await
777 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
778
779 let uni_schema = graph_ctx.storage().schema_manager().schema();
780 let label_props = uni_schema.properties.get(label_name.as_str());
781
782 for prop_name in target_properties {
783 let data_type = resolve_property_type(prop_name, label_props);
784 let column =
785 build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
786 columns.push(column);
787 }
788 } else {
789 let non_internal_props: Vec<&str> = target_properties
791 .iter()
792 .filter(|p| *p != "_all_props")
793 .map(|s| s.as_str())
794 .collect();
795 let property_manager = graph_ctx.property_manager();
796 let query_ctx = graph_ctx.query_context();
797
798 let props_map = if !non_internal_props.is_empty() {
799 property_manager
800 .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
801 .await
802 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
803 } else {
804 std::collections::HashMap::new()
805 };
806
807 for prop_name in target_properties {
808 if prop_name == "_all_props" {
809 columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
810 } else {
811 let column = build_property_column_static(
812 target_vids,
813 &props_map,
814 prop_name,
815 &arrow::datatypes::DataType::LargeBinary,
816 )?;
817 columns.push(column);
818 }
819 }
820 }
821
822 Ok(columns)
823}
824
825fn build_all_props_column(
827 target_vids: &[Vid],
828 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
829 graph_ctx: &Arc<GraphExecutionContext>,
830) -> ArrayRef {
831 use arrow_array::builder::LargeBinaryBuilder;
832
833 let mut builder = LargeBinaryBuilder::new();
834 let l0_ctx = graph_ctx.l0_context();
835 for vid in target_vids {
836 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
838 if let Some(vid_props) = props_map.get(vid) {
839 for (k, v) in vid_props.iter() {
840 merged_props.insert(k.clone(), v.clone());
841 }
842 }
843 for l0 in l0_ctx.iter_l0_buffers() {
844 let guard = l0.read();
845 if let Some(l0_props) = guard.vertex_properties.get(vid) {
846 for (k, v) in l0_props.iter() {
847 merged_props.insert(k.clone(), v.clone());
848 }
849 }
850 }
851 if merged_props.is_empty() {
852 builder.append_null();
853 } else {
854 builder.append_value(uni_common::cypher_value_codec::encode(
855 &uni_common::Value::Map(merged_props),
856 ));
857 }
858 }
859 Arc::new(builder.finish())
860}
861
862async fn build_edge_columns(
864 expansions: &[(usize, Vid, u64, u32)],
865 edge_properties: &[String],
866 edge_type_ids: &[u32],
867 graph_ctx: &Arc<GraphExecutionContext>,
868) -> DFResult<Vec<ArrayRef>> {
869 let mut columns = Vec::new();
870
871 let eids: Vec<Eid> = expansions
872 .iter()
873 .map(|(_, _, eid, _)| Eid::from(*eid))
874 .collect();
875 let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
876 columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
877
878 {
880 let uni_schema = graph_ctx.storage().schema_manager().schema();
881 let mut type_builder = arrow_array::builder::StringBuilder::new();
882 for (_, _, _, edge_type_id) in expansions {
883 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
884 type_builder.append_value(&name);
885 } else {
886 type_builder.append_null();
887 }
888 }
889 columns.push(Arc::new(type_builder.finish()) as ArrayRef);
890 }
891
892 if !edge_properties.is_empty() {
893 let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
894 let property_manager = graph_ctx.property_manager();
895 let query_ctx = graph_ctx.query_context();
896
897 let props_map = property_manager
898 .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
899 .await
900 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
901
902 let uni_schema = graph_ctx.storage().schema_manager().schema();
903 let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
904 let edge_type_props = if merged_edge_props.is_empty() {
905 None
906 } else {
907 Some(&merged_edge_props)
908 };
909
910 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
911
912 for prop_name in edge_properties {
913 if prop_name == "_created_at" || prop_name == "_updated_at" {
919 let mut builder =
920 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
921 let l0_ctx = graph_ctx.l0_context();
922 for eid in &eids {
923 let mut value: Option<i64> = None;
924 for l0 in l0_ctx.iter_l0_buffers() {
925 let guard = l0.read();
926 let opt = if prop_name == "_created_at" {
927 guard.edge_created_at.get(eid).copied()
928 } else {
929 guard.edge_updated_at.get(eid).copied()
930 };
931 if let Some(ts) = opt {
932 value = Some(match value {
933 None => ts,
934 Some(cur) if prop_name == "_created_at" => cur.min(ts),
935 Some(cur) => cur.max(ts),
936 });
937 }
938 }
939 match value {
940 Some(ts) => builder.append_value(ts),
941 None => builder.append_null(),
942 }
943 }
944 columns.push(Arc::new(builder.finish()) as ArrayRef);
945 continue;
946 }
947 let data_type = resolve_edge_property_type(prop_name, edge_type_props);
948 let column =
949 build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
950 columns.push(column);
951 }
952 }
953
954 Ok(columns)
955}
956
957#[expect(
962 clippy::too_many_arguments,
963 reason = "Standalone async fn needs all context passed explicitly"
964)]
965async fn build_traverse_output_batch(
966 input: RecordBatch,
967 expansions: Vec<(usize, Vid, u64, u32)>,
968 schema: SchemaRef,
969 edge_variable: Option<String>,
970 edge_properties: Vec<String>,
971 edge_type_ids: Vec<u32>,
972 target_properties: Vec<String>,
973 target_label_name: Option<String>,
974 graph_ctx: Arc<GraphExecutionContext>,
975 optional: bool,
976 optional_pattern_vars: HashSet<String>,
977) -> DFResult<RecordBatch> {
978 if expansions.is_empty() {
979 if !optional {
980 return Ok(RecordBatch::new_empty(schema));
981 }
982 let unmatched_reps = collect_unmatched_optional_group_rows(
983 &input,
984 &HashSet::new(),
985 &schema,
986 &optional_pattern_vars,
987 )?;
988 if unmatched_reps.is_empty() {
989 return Ok(RecordBatch::new_empty(schema));
990 }
991 return build_optional_null_batch_for_rows_with_optional_vars(
992 &input,
993 &unmatched_reps,
994 &schema,
995 &optional_pattern_vars,
996 );
997 }
998
999 let indices: Vec<u64> = expansions
1001 .iter()
1002 .map(|(idx, _, _, _)| *idx as u64)
1003 .collect();
1004 let indices_array = UInt64Array::from(indices);
1005 let mut columns: Vec<ArrayRef> = input
1006 .columns()
1007 .iter()
1008 .map(|col| take(col.as_ref(), &indices_array, None))
1009 .collect::<Result<_, _>>()?;
1010
1011 let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1013 let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1014 columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1015
1016 columns.push(build_target_labels_column(
1018 &target_vids,
1019 &target_label_name,
1020 &graph_ctx,
1021 ));
1022
1023 if !target_properties.is_empty() {
1025 let prop_cols = build_target_property_columns(
1026 &target_vids,
1027 &target_properties,
1028 &target_label_name,
1029 &graph_ctx,
1030 )
1031 .await?;
1032 columns.extend(prop_cols);
1033 }
1034
1035 if edge_variable.is_some() {
1037 let edge_cols =
1038 build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1039 columns.extend(edge_cols);
1040 } else {
1041 let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1042 columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1043 }
1044
1045 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1046
1047 if optional {
1049 let matched_indices: HashSet<usize> =
1050 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1051 let unmatched = collect_unmatched_optional_group_rows(
1052 &input,
1053 &matched_indices,
1054 &schema,
1055 &optional_pattern_vars,
1056 )?;
1057
1058 if !unmatched.is_empty() {
1059 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1060 &input,
1061 &unmatched,
1062 &schema,
1063 &optional_pattern_vars,
1064 )?;
1065 let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1066 .map_err(arrow_err)?;
1067 return Ok(combined);
1068 }
1069 }
1070
1071 Ok(expanded_batch)
1072}
1073
1074fn build_optional_null_batch_for_rows(
1077 input: &RecordBatch,
1078 unmatched_indices: &[usize],
1079 schema: &SchemaRef,
1080) -> DFResult<RecordBatch> {
1081 let num_rows = unmatched_indices.len();
1082 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1083 let indices_array = UInt64Array::from(indices);
1084
1085 let mut columns: Vec<ArrayRef> = Vec::new();
1087 for col in input.columns() {
1088 let taken = take(col.as_ref(), &indices_array, None)?;
1089 columns.push(taken);
1090 }
1091 for field in schema.fields().iter().skip(input.num_columns()) {
1093 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1094 }
1095 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1096}
1097
/// Returns `true` when `col_name` belongs to one of `optional_vars`.
///
/// A column belongs to variable `v` when it is
/// * exactly `v`,
/// * a member column `v.<something>` (for example `v._vid`), or
/// * the internal edge-id column `__eid_to_v`.
///
/// The internal column is matched on the full target name. A suffix match
/// would wrongly attribute `__eid_to_ab` to a variable named `b`.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }

    // `__eid_to_<target>`: the target must be an optional variable itself.
    if col_name
        .strip_prefix("__eid_to_")
        .is_some_and(|target| optional_vars.contains(target))
    {
        return true;
    }

    // `<var>.<member>`: the variable name must be followed directly by a dot.
    optional_vars.iter().any(|var| {
        col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'))
    })
}
1105
1106fn collect_unmatched_optional_group_rows(
1107 input: &RecordBatch,
1108 matched_indices: &HashSet<usize>,
1109 schema: &SchemaRef,
1110 optional_vars: &HashSet<String>,
1111) -> DFResult<Vec<usize>> {
1112 if input.num_rows() == 0 {
1113 return Ok(Vec::new());
1114 }
1115
1116 if optional_vars.is_empty() {
1117 return Ok((0..input.num_rows())
1118 .filter(|idx| !matched_indices.contains(idx))
1119 .collect());
1120 }
1121
1122 let source_vid_indices: Vec<usize> = schema
1123 .fields()
1124 .iter()
1125 .enumerate()
1126 .filter_map(|(idx, field)| {
1127 if idx >= input.num_columns() {
1128 return None;
1129 }
1130 let name = field.name();
1131 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1132 Some(idx)
1133 } else {
1134 None
1135 }
1136 })
1137 .collect();
1138
1139 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1142
1143 for row_idx in 0..input.num_rows() {
1144 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1145 let entry = groups.entry(key.clone());
1146 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1147 group_order.push(key.clone());
1148 }
1149 let matched = matched_indices.contains(&row_idx);
1150 entry
1151 .and_modify(|(_, any_matched)| *any_matched |= matched)
1152 .or_insert((row_idx, matched));
1153 }
1154
1155 Ok(group_order
1156 .into_iter()
1157 .filter_map(|key| {
1158 groups
1159 .get(&key)
1160 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1161 })
1162 .collect())
1163}
1164
1165fn compute_optional_group_key(
1166 batch: &RecordBatch,
1167 row_idx: usize,
1168 source_vid_indices: &[usize],
1169) -> DFResult<Vec<u8>> {
1170 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1171 for &col_idx in source_vid_indices {
1172 let col = batch.column(col_idx);
1173 let vid_cow = column_as_vid_array(col.as_ref())?;
1174 let arr: &UInt64Array = &vid_cow;
1175 if arr.is_null(row_idx) {
1176 key.extend_from_slice(&u64::MAX.to_le_bytes());
1177 } else {
1178 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1179 }
1180 }
1181 Ok(key)
1182}
1183
1184fn build_optional_null_batch_for_rows_with_optional_vars(
1185 input: &RecordBatch,
1186 unmatched_indices: &[usize],
1187 schema: &SchemaRef,
1188 optional_vars: &HashSet<String>,
1189) -> DFResult<RecordBatch> {
1190 if optional_vars.is_empty() {
1191 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1192 }
1193
1194 let num_rows = unmatched_indices.len();
1195 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1196 let indices_array = UInt64Array::from(indices);
1197
1198 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1199 for (col_idx, field) in schema.fields().iter().enumerate() {
1200 if col_idx < input.num_columns() {
1201 if is_optional_column_for_vars(field.name(), optional_vars) {
1202 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1203 } else {
1204 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1205 columns.push(taken);
1206 }
1207 } else {
1208 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1209 }
1210 }
1211
1212 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1213}
1214
/// Drives the traversal state machine: `Warming` → `Reading` ⇄ `Materializing`
/// → `Done`.
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the metrics handle so the timer does not borrow `self`, which
        // is mutated throughout the loop below.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state, leaving `Done` behind. Any
            // path that returns without storing a state back therefore leaves
            // the stream finished.
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        // Warm-up finished; loop around and start reading input.
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // A timeout ends the stream: the state stays `Done`.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // Unlike the other error paths, the stream
                                    // stays readable after an expansion error.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // No properties requested: the batch is built
                            // synchronously, without a boxed future.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // The future is stored in `self.state`, so it must
                            // own everything it needs: clone the configuration.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            // Loop around to poll the new future immediately.
                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted.
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1342
1343fn build_traverse_output_batch_sync(
1348 input: &RecordBatch,
1349 expansions: &[(usize, Vid, u64, u32)],
1350 schema: &SchemaRef,
1351 edge_variable: Option<&String>,
1352 graph_ctx: &GraphExecutionContext,
1353 optional: bool,
1354 optional_pattern_vars: &HashSet<String>,
1355) -> DFResult<RecordBatch> {
1356 if expansions.is_empty() {
1357 if !optional {
1358 return Ok(RecordBatch::new_empty(schema.clone()));
1359 }
1360 let unmatched_reps = collect_unmatched_optional_group_rows(
1361 input,
1362 &HashSet::new(),
1363 schema,
1364 optional_pattern_vars,
1365 )?;
1366 if unmatched_reps.is_empty() {
1367 return Ok(RecordBatch::new_empty(schema.clone()));
1368 }
1369 return build_optional_null_batch_for_rows_with_optional_vars(
1370 input,
1371 &unmatched_reps,
1372 schema,
1373 optional_pattern_vars,
1374 );
1375 }
1376
1377 let indices: Vec<u64> = expansions
1378 .iter()
1379 .map(|(idx, _, _, _)| *idx as u64)
1380 .collect();
1381 let indices_array = UInt64Array::from(indices);
1382
1383 let mut columns: Vec<ArrayRef> = Vec::new();
1384 for col in input.columns() {
1385 let expanded = take(col.as_ref(), &indices_array, None)?;
1386 columns.push(expanded);
1387 }
1388
1389 let target_vids: Vec<u64> = expansions
1391 .iter()
1392 .map(|(_, vid, _, _)| vid.as_u64())
1393 .collect();
1394 columns.push(Arc::new(UInt64Array::from(target_vids)));
1395
1396 {
1398 use arrow_array::builder::{ListBuilder, StringBuilder};
1399 let l0_ctx = graph_ctx.l0_context();
1400 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1401 for (_, vid, _, _) in expansions {
1402 let mut row_labels: Vec<String> = Vec::new();
1403 for l0 in l0_ctx.iter_l0_buffers() {
1404 let guard = l0.read();
1405 if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1406 for lbl in l0_labels {
1407 if !row_labels.contains(lbl) {
1408 row_labels.push(lbl.clone());
1409 }
1410 }
1411 }
1412 }
1413 let values = labels_builder.values();
1414 for lbl in &row_labels {
1415 values.append_value(lbl);
1416 }
1417 labels_builder.append(true);
1418 }
1419 columns.push(Arc::new(labels_builder.finish()));
1420 }
1421
1422 if edge_variable.is_some() {
1424 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1425 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1426
1427 let uni_schema = graph_ctx.storage().schema_manager().schema();
1429 let mut type_builder = arrow_array::builder::StringBuilder::new();
1430 for (_, _, _, edge_type_id) in expansions {
1431 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1432 type_builder.append_value(&name);
1433 } else {
1434 type_builder.append_null();
1435 }
1436 }
1437 columns.push(Arc::new(type_builder.finish()));
1438 } else {
1439 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1441 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1442 }
1443
1444 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1445
1446 if optional {
1447 let matched_indices: HashSet<usize> =
1448 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1449 let unmatched = collect_unmatched_optional_group_rows(
1450 input,
1451 &matched_indices,
1452 schema,
1453 optional_pattern_vars,
1454 )?;
1455
1456 if !unmatched.is_empty() {
1457 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1458 input,
1459 &unmatched,
1460 schema,
1461 optional_pattern_vars,
1462 )?;
1463 let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1464 .map_err(|e| {
1465 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1466 })?;
1467 return Ok(combined);
1468 }
1469 }
1470
1471 Ok(expanded_batch)
1472}
1473
1474impl RecordBatchStream for GraphTraverseStream {
1475 fn schema(&self) -> SchemaRef {
1476 self.schema.clone()
1477 }
1478}
1479
/// Adjacency index produced by `build_edge_adjacency_map`.
///
/// Each key is a vertex id; each entry in its list is
/// `(neighbor vid, edge id, edge type name, edge properties)`. Which endpoint
/// of an edge becomes the key depends on the traversal direction used to
/// build the map.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1486
/// Physical plan node for a single-hop traversal that buffers its input,
/// loads an edge adjacency map by edge type name, and then expands every
/// buffered batch against that map (see `GraphTraverseMainStream`).
pub struct GraphTraverseMainExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type names handed to the adjacency load.
    type_names: Vec<String>,

    /// Traversal direction relative to the source vertex.
    direction: Direction,

    /// Variable name used to prefix the target columns (`<name>._vid`, `<name>._labels`, ...).
    target_variable: String,

    /// Edge variable name; when set, `<name>._eid`, `<name>._type` and the edge
    /// property columns are emitted, otherwise a single `__eid_to_<target>` column.
    edge_variable: Option<String>,

    /// Edge property names to emit as `<edge>.<prop>` columns (`_all_props` emits the whole map).
    edge_properties: Vec<String>,

    /// Target vertex property names to emit as `<target>.<prop>` columns.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout checks).
    graph_ctx: Arc<GraphExecutionContext>,

    /// When true, input rows without a matching edge still produce output rows
    /// through the optional null-batch helpers.
    optional: bool,

    /// Variable names forwarded to the optional-row helpers.
    // NOTE(review): presumably the variables introduced by the optional pattern — confirm.
    optional_pattern_vars: HashSet<String>,

    /// Optional input column of vertex ids; when present, only edges whose
    /// neighbor equals that row's value are kept.
    bound_target_column: Option<String>,

    /// Input columns of edge ids; an edge whose id appears in any of them for
    /// the current row is skipped during expansion.
    used_edge_columns: Vec<String>,

    /// Output schema computed by `build_schema`.
    schema: SchemaRef,

    /// Plan properties derived from `schema` via `compute_plan_properties`.
    properties: Arc<PlanProperties>,

    /// Metrics set from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
1561
1562impl fmt::Debug for GraphTraverseMainExec {
1563 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1564 f.debug_struct("GraphTraverseMainExec")
1565 .field("type_names", &self.type_names)
1566 .field("direction", &self.direction)
1567 .field("target_variable", &self.target_variable)
1568 .field("edge_variable", &self.edge_variable)
1569 .finish()
1570 }
1571}
1572
1573impl GraphTraverseMainExec {
1574 #[expect(clippy::too_many_arguments)]
1576 pub fn new(
1577 input: Arc<dyn ExecutionPlan>,
1578 source_column: impl Into<String>,
1579 type_names: Vec<String>,
1580 direction: Direction,
1581 target_variable: impl Into<String>,
1582 edge_variable: Option<String>,
1583 edge_properties: Vec<String>,
1584 target_properties: Vec<String>,
1585 graph_ctx: Arc<GraphExecutionContext>,
1586 optional: bool,
1587 optional_pattern_vars: HashSet<String>,
1588 bound_target_column: Option<String>,
1589 used_edge_columns: Vec<String>,
1590 ) -> Self {
1591 let source_column = source_column.into();
1592 let target_variable = target_variable.into();
1593
1594 let schema = Self::build_schema(
1596 &input.schema(),
1597 &target_variable,
1598 &edge_variable,
1599 &edge_properties,
1600 &target_properties,
1601 optional,
1602 );
1603
1604 let properties = compute_plan_properties(schema.clone());
1605
1606 Self {
1607 input,
1608 source_column,
1609 type_names,
1610 direction,
1611 target_variable,
1612 edge_variable,
1613 edge_properties,
1614 target_properties,
1615 graph_ctx,
1616 optional,
1617 optional_pattern_vars,
1618 bound_target_column,
1619 used_edge_columns,
1620 schema,
1621 properties,
1622 metrics: ExecutionPlanMetricsSet::new(),
1623 }
1624 }
1625
1626 fn build_schema(
1628 input_schema: &SchemaRef,
1629 target_variable: &str,
1630 edge_variable: &Option<String>,
1631 edge_properties: &[String],
1632 target_properties: &[String],
1633 optional: bool,
1634 ) -> SchemaRef {
1635 let mut fields: Vec<Field> = input_schema
1636 .fields()
1637 .iter()
1638 .map(|f| f.as_ref().clone())
1639 .collect();
1640
1641 let target_vid_name = format!("{}._vid", target_variable);
1643 if input_schema.column_with_name(&target_vid_name).is_none() {
1644 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1645 }
1646
1647 let target_labels_name = format!("{}._labels", target_variable);
1649 if input_schema.column_with_name(&target_labels_name).is_none() {
1650 fields.push(Field::new(target_labels_name, labels_data_type(), true));
1651 }
1652
1653 if let Some(edge_var) = edge_variable {
1655 fields.push(Field::new(
1656 format!("{}._eid", edge_var),
1657 DataType::UInt64,
1658 optional,
1659 ));
1660
1661 fields.push(Field::new(
1663 format!("{}._type", edge_var),
1664 DataType::Utf8,
1665 true,
1666 ));
1667
1668 for prop in edge_properties {
1672 let col_name = format!("{}.{}", edge_var, prop);
1673 let mut metadata = std::collections::HashMap::new();
1674 metadata.insert("cv_encoded".to_string(), "true".to_string());
1675 fields.push(
1676 Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1677 );
1678 }
1679 } else {
1680 fields.push(Field::new(
1683 format!("__eid_to_{}", target_variable),
1684 DataType::UInt64,
1685 optional,
1686 ));
1687 }
1688
1689 for prop in target_properties {
1691 fields.push(Field::new(
1692 format!("{}.{}", target_variable, prop),
1693 DataType::LargeBinary,
1694 true,
1695 ));
1696 }
1697
1698 Arc::new(Schema::new(fields))
1699 }
1700}
1701
1702impl DisplayAs for GraphTraverseMainExec {
1703 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1704 write!(
1705 f,
1706 "GraphTraverseMainExec: types={:?}, direction={:?}",
1707 self.type_names, self.direction
1708 )
1709 }
1710}
1711
1712impl ExecutionPlan for GraphTraverseMainExec {
1713 fn name(&self) -> &str {
1714 "GraphTraverseMainExec"
1715 }
1716
1717 fn as_any(&self) -> &dyn Any {
1718 self
1719 }
1720
1721 fn schema(&self) -> SchemaRef {
1722 self.schema.clone()
1723 }
1724
1725 fn properties(&self) -> &Arc<PlanProperties> {
1726 &self.properties
1727 }
1728
1729 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1730 vec![&self.input]
1731 }
1732
1733 fn with_new_children(
1734 self: Arc<Self>,
1735 children: Vec<Arc<dyn ExecutionPlan>>,
1736 ) -> DFResult<Arc<dyn ExecutionPlan>> {
1737 if children.len() != 1 {
1738 return Err(datafusion::error::DataFusionError::Plan(
1739 "GraphTraverseMainExec expects exactly one child".to_string(),
1740 ));
1741 }
1742
1743 Ok(Arc::new(Self {
1744 input: children[0].clone(),
1745 source_column: self.source_column.clone(),
1746 type_names: self.type_names.clone(),
1747 direction: self.direction,
1748 target_variable: self.target_variable.clone(),
1749 edge_variable: self.edge_variable.clone(),
1750 edge_properties: self.edge_properties.clone(),
1751 target_properties: self.target_properties.clone(),
1752 graph_ctx: self.graph_ctx.clone(),
1753 optional: self.optional,
1754 optional_pattern_vars: self.optional_pattern_vars.clone(),
1755 bound_target_column: self.bound_target_column.clone(),
1756 used_edge_columns: self.used_edge_columns.clone(),
1757 schema: self.schema.clone(),
1758 properties: self.properties.clone(),
1759 metrics: self.metrics.clone(),
1760 }))
1761 }
1762
1763 fn execute(
1764 &self,
1765 partition: usize,
1766 context: Arc<TaskContext>,
1767 ) -> DFResult<SendableRecordBatchStream> {
1768 let input_stream = self.input.execute(partition, context)?;
1769 let metrics = BaselineMetrics::new(&self.metrics, partition);
1770
1771 Ok(Box::pin(GraphTraverseMainStream::new(
1772 input_stream,
1773 self.source_column.clone(),
1774 self.type_names.clone(),
1775 self.direction,
1776 self.target_variable.clone(),
1777 self.edge_variable.clone(),
1778 self.edge_properties.clone(),
1779 self.target_properties.clone(),
1780 self.graph_ctx.clone(),
1781 self.optional,
1782 self.optional_pattern_vars.clone(),
1783 self.bound_target_column.clone(),
1784 self.used_edge_columns.clone(),
1785 self.schema.clone(),
1786 metrics,
1787 )))
1788 }
1789
1790 fn metrics(&self) -> Option<MetricsSet> {
1791 Some(self.metrics.clone_inner())
1792 }
1793}
1794
/// Upper bound on the number of distinct source vertex ids that
/// `GraphTraverseMainStream::collect_source_vids` will gather. Once the set
/// grows past this value it returns `None`, and the adjacency load then runs
/// without an endpoint filter.
const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1_024;
1801
/// Phases of `GraphTraverseMainStream`, advanced in order by `poll_next`.
enum GraphTraverseMainState {
    /// Draining the child stream; every batch is buffered until the child ends.
    CollectingInput {
        /// Child stream still being polled.
        input_stream: SendableRecordBatchStream,
        /// Batches received so far.
        buffered: Vec<RecordBatch>,
    },
    /// Waiting for the edge adjacency map to be built.
    LoadingEdges {
        /// Pending `build_edge_adjacency_map` call.
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        /// Input batches held until the map is ready.
        buffered: Vec<RecordBatch>,
    },
    /// Expanding the buffered batches one at a time against the loaded map.
    Processing {
        /// Adjacency map shared by all remaining batches.
        adjacency: EdgeAdjacencyMap,
        /// Remaining input batches.
        buffered: std::vec::IntoIter<RecordBatch>,
    },
    /// Terminal state; also the placeholder left behind while a state is being processed.
    Done,
}
1828
/// Stream that buffers the whole input, loads the edge adjacency map once,
/// and then emits one expanded batch per buffered input batch.
struct GraphTraverseMainStream {
    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Variable name used to prefix the target columns.
    target_variable: String,

    /// Edge variable name, if the edge is bound in the query.
    edge_variable: Option<String>,

    /// Edge property names to emit (only used when `edge_variable` is set).
    edge_properties: Vec<String>,

    /// Target vertex property names to emit; values are read from the L0 buffers.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout checks).
    graph_ctx: Arc<GraphExecutionContext>,

    /// When true, input rows without a matching edge still produce output rows.
    optional: bool,

    /// Variable names forwarded to the optional-row helpers.
    // NOTE(review): presumably the variables introduced by the optional pattern — confirm.
    optional_pattern_vars: HashSet<String>,

    /// Optional input column of vertex ids that the neighbor must equal.
    bound_target_column: Option<String>,

    /// Input columns of edge ids that must not be reused for the same row.
    used_edge_columns: Vec<String>,

    /// Output schema of every emitted batch.
    schema: SchemaRef,

    /// Edge type names handed to the adjacency load.
    type_names: Vec<String>,

    /// Traversal direction relative to the source vertex.
    direction: Direction,

    /// Current phase of the stream.
    state: GraphTraverseMainState,

    /// Baseline metrics (compute time, output rows) for this partition.
    metrics: BaselineMetrics,
}
1876
1877impl GraphTraverseMainStream {
1878 #[expect(clippy::too_many_arguments)]
1880 fn new(
1881 input_stream: SendableRecordBatchStream,
1882 source_column: String,
1883 type_names: Vec<String>,
1884 direction: Direction,
1885 target_variable: String,
1886 edge_variable: Option<String>,
1887 edge_properties: Vec<String>,
1888 target_properties: Vec<String>,
1889 graph_ctx: Arc<GraphExecutionContext>,
1890 optional: bool,
1891 optional_pattern_vars: HashSet<String>,
1892 bound_target_column: Option<String>,
1893 used_edge_columns: Vec<String>,
1894 schema: SchemaRef,
1895 metrics: BaselineMetrics,
1896 ) -> Self {
1897 Self {
1900 source_column,
1901 target_variable,
1902 edge_variable,
1903 edge_properties,
1904 target_properties,
1905 graph_ctx,
1906 optional,
1907 optional_pattern_vars,
1908 bound_target_column,
1909 used_edge_columns,
1910 schema,
1911 type_names,
1912 direction,
1913 state: GraphTraverseMainState::CollectingInput {
1914 input_stream,
1915 buffered: Vec::new(),
1916 },
1917 metrics,
1918 }
1919 }
1920
1921 fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
1926 let mut vids: HashSet<Vid> = HashSet::new();
1927 for batch in buffered {
1928 let col = batch.column_by_name(&self.source_column)?;
1929 let arr = column_as_vid_array(col.as_ref()).ok()?;
1930 for i in 0..arr.len() {
1931 if !arr.is_null(i) {
1932 vids.insert(Vid::from(arr.value(i)));
1933 if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
1934 return None;
1935 }
1936 }
1937 }
1938 }
1939 Some(vids)
1940 }
1941
1942 fn expand_batch(
1944 &self,
1945 input: &RecordBatch,
1946 adjacency: &EdgeAdjacencyMap,
1947 ) -> DFResult<RecordBatch> {
1948 let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1950 datafusion::error::DataFusionError::Execution(format!(
1951 "Source column {} not found",
1952 self.source_column
1953 ))
1954 })?;
1955
1956 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1957 let source_vids: &UInt64Array = &source_vid_cow;
1958
1959 let bound_target_cow = self
1961 .bound_target_column
1962 .as_ref()
1963 .and_then(|col| input.column_by_name(col))
1964 .map(|c| column_as_vid_array(c.as_ref()))
1965 .transpose()?;
1966 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1967
1968 let used_edge_arrays: Vec<&UInt64Array> = self
1970 .used_edge_columns
1971 .iter()
1972 .filter_map(|col| {
1973 input
1974 .column_by_name(col)
1975 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1976 })
1977 .collect();
1978
1979 type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
1982 let mut expansions: Vec<Expansion> = Vec::new();
1983
1984 for (row_idx, src_u64) in source_vids.iter().enumerate() {
1985 if let Some(src_u64) = src_u64 {
1986 let src_vid = Vid::from(src_u64);
1987
1988 let used_eids: HashSet<u64> = used_edge_arrays
1990 .iter()
1991 .filter_map(|arr| {
1992 if arr.is_null(row_idx) {
1993 None
1994 } else {
1995 Some(arr.value(row_idx))
1996 }
1997 })
1998 .collect();
1999
2000 if let Some(neighbors) = adjacency.get(&src_vid) {
2001 for (target_vid, eid, edge_type, props) in neighbors {
2002 if used_eids.contains(&eid.as_u64()) {
2004 continue;
2005 }
2006
2007 if let Some(targets) = expected_targets {
2010 if targets.is_null(row_idx) {
2011 continue;
2012 }
2013 let expected_vid = targets.value(row_idx);
2014 if target_vid.as_u64() != expected_vid {
2015 continue;
2016 }
2017 }
2018
2019 expansions.push((
2020 row_idx,
2021 *target_vid,
2022 *eid,
2023 Arc::clone(edge_type),
2024 Arc::clone(props),
2025 ));
2026 }
2027 }
2028 }
2029 }
2030
2031 if expansions.is_empty() && self.optional {
2033 let all_indices: Vec<usize> = (0..input.num_rows()).collect();
2035 return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
2036 }
2037
2038 if expansions.is_empty() {
2039 return Ok(RecordBatch::new_empty(self.schema.clone()));
2041 }
2042
2043 let matched_rows: HashSet<usize> = if self.optional {
2045 expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
2046 } else {
2047 HashSet::new()
2048 };
2049
2050 let mut columns: Vec<ArrayRef> = Vec::new();
2052 let indices: Vec<u64> = expansions
2053 .iter()
2054 .map(|(idx, _, _, _, _)| *idx as u64)
2055 .collect();
2056 let indices_array = UInt64Array::from(indices);
2057
2058 for col in input.columns() {
2059 let expanded = take(col.as_ref(), &indices_array, None)?;
2060 columns.push(expanded);
2061 }
2062
2063 let target_vid_name = format!("{}._vid", self.target_variable);
2065 let target_vids: Vec<u64> = expansions
2066 .iter()
2067 .map(|(_, vid, _, _, _)| vid.as_u64())
2068 .collect();
2069 if input.schema().column_with_name(&target_vid_name).is_none() {
2070 columns.push(Arc::new(UInt64Array::from(target_vids)));
2071 }
2072
2073 let target_labels_name = format!("{}._labels", self.target_variable);
2075 if input
2076 .schema()
2077 .column_with_name(&target_labels_name)
2078 .is_none()
2079 {
2080 use arrow_array::builder::{ListBuilder, StringBuilder};
2081 let l0_ctx = self.graph_ctx.l0_context();
2082 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2083 for (_, target_vid, _, _, _) in &expansions {
2084 let mut row_labels: Vec<String> = Vec::new();
2085 for l0 in l0_ctx.iter_l0_buffers() {
2086 let guard = l0.read();
2087 if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
2088 for lbl in l0_labels {
2089 if !row_labels.contains(lbl) {
2090 row_labels.push(lbl.clone());
2091 }
2092 }
2093 }
2094 }
2095 let values = labels_builder.values();
2096 for lbl in &row_labels {
2097 values.append_value(lbl);
2098 }
2099 labels_builder.append(true);
2100 }
2101 columns.push(Arc::new(labels_builder.finish()));
2102 }
2103
2104 if self.edge_variable.is_some() {
2107 let eids: Vec<u64> = expansions
2109 .iter()
2110 .map(|(_, _, eid, _, _)| eid.as_u64())
2111 .collect();
2112 columns.push(Arc::new(UInt64Array::from(eids)));
2113
2114 {
2116 let mut type_builder = arrow_array::builder::StringBuilder::new();
2117 for (_, _, _, edge_type, _) in &expansions {
2118 type_builder.append_value(edge_type);
2119 }
2120 columns.push(Arc::new(type_builder.finish()));
2121 }
2122
2123 for prop_name in &self.edge_properties {
2125 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2126 if prop_name == "_all_props" {
2127 for (_, _, _, _, props) in &expansions {
2130 if props.is_empty() {
2131 builder.append_null();
2132 } else {
2133 let map: HashMap<String, uni_common::Value> =
2134 props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2135 builder.append_value(uni_common::cypher_value_codec::encode(
2136 &uni_common::Value::Map(map),
2137 ));
2138 }
2139 }
2140 } else {
2141 for (_, _, _, _, props) in &expansions {
2143 match props.get(prop_name) {
2144 Some(uni_common::Value::Null) | None => builder.append_null(),
2145 Some(val) => {
2146 builder.append_value(uni_common::cypher_value_codec::encode(val));
2147 }
2148 }
2149 }
2150 }
2151 columns.push(Arc::new(builder.finish()));
2152 }
2153 } else {
2154 let eids: Vec<u64> = expansions
2155 .iter()
2156 .map(|(_, _, eid, _, _)| eid.as_u64())
2157 .collect();
2158 columns.push(Arc::new(UInt64Array::from(eids)));
2159 }
2160
2161 {
2163 let l0_ctx = self.graph_ctx.l0_context();
2164
2165 for prop_name in &self.target_properties {
2166 if prop_name == "_all_props" {
2167 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2169 for (_, target_vid, _, _, _) in &expansions {
2170 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
2172 for l0 in l0_ctx.iter_l0_buffers() {
2173 let guard = l0.read();
2174 if let Some(props) = guard.vertex_properties.get(target_vid) {
2175 for (k, v) in props.iter() {
2176 merged_props.insert(k.clone(), v.clone());
2177 }
2178 }
2179 }
2180 if merged_props.is_empty() {
2181 builder.append_null();
2182 } else {
2183 builder.append_value(uni_common::cypher_value_codec::encode(
2184 &uni_common::Value::Map(merged_props),
2185 ));
2186 }
2187 }
2188 columns.push(Arc::new(builder.finish()));
2189 } else {
2190 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2192 for (_, target_vid, _, _, _) in &expansions {
2193 let mut found = false;
2194 for l0 in l0_ctx.iter_l0_buffers() {
2195 let guard = l0.read();
2196 if let Some(props) = guard.vertex_properties.get(target_vid)
2197 && let Some(val) = props.get(prop_name.as_str())
2198 && !val.is_null()
2199 {
2200 builder.append_value(uni_common::cypher_value_codec::encode(val));
2201 found = true;
2202 break;
2203 }
2204 }
2205 if !found {
2206 builder.append_null();
2207 }
2208 }
2209 columns.push(Arc::new(builder.finish()));
2210 }
2211 }
2212 }
2213
2214 let matched_batch =
2215 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2216
2217 if self.optional {
2219 let unmatched = collect_unmatched_optional_group_rows(
2220 input,
2221 &matched_rows,
2222 &self.schema,
2223 &self.optional_pattern_vars,
2224 )?;
2225
2226 if unmatched.is_empty() {
2227 return Ok(matched_batch);
2228 }
2229
2230 let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2231 input,
2232 &unmatched,
2233 &self.schema,
2234 &self.optional_pattern_vars,
2235 )?;
2236
2237 use arrow::compute::concat_batches;
2239 concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2240 } else {
2241 Ok(matched_batch)
2242 }
2243 }
2244}
2245
2246impl GraphExecutionContext {
2247 fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2258 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2259 return;
2260 };
2261 let guard = tx_l0.read();
2262 let Some(read_set) = &guard.occ_read_set else {
2263 return;
2264 };
2265 let mut rs = read_set.lock();
2266 for (src, neighbors) in adjacency {
2267 rs.vertices.insert(*src);
2268 for (nbr, eid, _type, _props) in neighbors {
2269 rs.vertices.insert(*nbr);
2270 rs.edges.insert(*eid);
2271 }
2272 }
2273 }
2274}
2275
2276async fn build_edge_adjacency_map(
2287 graph_ctx: &GraphExecutionContext,
2288 type_names: &[String],
2289 direction: Direction,
2290 source_vids: Option<HashSet<Vid>>,
2291) -> DFResult<EdgeAdjacencyMap> {
2292 let storage = graph_ctx.storage();
2293 let l0_ctx = graph_ctx.l0_context();
2294
2295 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2298 let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2299 let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2300 let side = match direction {
2301 Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2302 Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2303 Direction::Both => uni_store::storage::EndpointSide::Either,
2304 };
2305 (side, vids)
2306 });
2307 let edges_with_type = storage
2308 .find_edges_by_type_names(&type_refs, endpoint_filter)
2309 .await
2310 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2311
2312 let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2316 match &source_vids {
2317 None => true,
2318 Some(set) => match direction {
2319 Direction::Outgoing => set.contains(&src),
2320 Direction::Incoming => set.contains(&dst),
2321 Direction::Both => set.contains(&src) || set.contains(&dst),
2322 },
2323 }
2324 };
2325
2326 let mut edges: Vec<(
2328 uni_common::Eid,
2329 uni_common::Vid,
2330 uni_common::Vid,
2331 String,
2332 uni_common::Properties,
2333 )> = edges_with_type.into_iter().collect();
2334
2335 for l0 in l0_ctx.iter_l0_buffers() {
2337 let l0_guard = l0.read();
2338
2339 for type_name in type_names {
2340 let l0_eids = l0_guard.eids_for_type(type_name);
2341
2342 for &eid in &l0_eids {
2344 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2345 let src_vid = edge_ref.src_vid;
2346 let dst_vid = edge_ref.dst_vid;
2347 if !edge_in_scope(src_vid, dst_vid) {
2348 continue;
2349 }
2350
2351 let props = l0_guard
2353 .edge_properties
2354 .get(&eid)
2355 .cloned()
2356 .unwrap_or_default();
2357
2358 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2359 }
2360 }
2361 }
2362 }
2363
2364 let mut seen_eids = HashSet::new();
2366 let mut unique_edges = Vec::new();
2367 for edge in edges.into_iter().rev() {
2368 if seen_eids.insert(edge.0) {
2369 unique_edges.push(edge);
2370 }
2371 }
2372 unique_edges.reverse();
2373
2374 let mut tombstoned_eids = HashSet::new();
2376 for l0 in l0_ctx.iter_l0_buffers() {
2377 let l0_guard = l0.read();
2378 for eid in l0_guard.tombstones.keys() {
2379 tombstoned_eids.insert(*eid);
2380 }
2381 }
2382 if !tombstoned_eids.is_empty() {
2383 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2384 }
2385
2386 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2389
2390 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2391 let edge_type: Arc<str> = edge_type.into();
2392 let props = Arc::new(props);
2393 match direction {
2394 Direction::Outgoing => {
2395 adjacency
2396 .entry(src_vid)
2397 .or_default()
2398 .push((dst_vid, eid, edge_type, props));
2399 }
2400 Direction::Incoming => {
2401 adjacency
2402 .entry(dst_vid)
2403 .or_default()
2404 .push((src_vid, eid, edge_type, props));
2405 }
2406 Direction::Both => {
2407 adjacency.entry(src_vid).or_default().push((
2408 dst_vid,
2409 eid,
2410 Arc::clone(&edge_type),
2411 Arc::clone(&props),
2412 ));
2413 adjacency
2414 .entry(dst_vid)
2415 .or_default()
2416 .push((src_vid, eid, edge_type, props));
2417 }
2418 }
2419 }
2420
2421 graph_ctx.record_edge_adjacency(&adjacency);
2429
2430 Ok(adjacency)
2431}
2432
2433async fn build_edge_property_filter(
2448 graph_ctx: &Arc<GraphExecutionContext>,
2449 edge_type_ids: &[u32],
2450 direction: Direction,
2451 conditions: &[(String, UniValue)],
2452) -> DFResult<EidFilter> {
2453 if conditions.is_empty() {
2454 return Ok(EidFilter::AllAllowed);
2455 }
2456
2457 let uni_schema = graph_ctx.storage().schema_manager().schema();
2458 let type_names: Vec<String> = edge_type_ids
2459 .iter()
2460 .filter_map(|id| uni_schema.edge_type_name_by_id_unified(*id))
2461 .collect();
2462 if type_names.is_empty() {
2463 return Ok(EidFilter::AllAllowed);
2464 }
2465
2466 let adjacency = build_edge_adjacency_map(graph_ctx, &type_names, direction, None).await?;
2470
2471 let mut passing: Vec<u64> = Vec::new();
2472 let mut max_eid: u64 = 0;
2473 let mut seen: FxHashSet<u64> = FxHashSet::default();
2474 for edges in adjacency.values() {
2475 for (_neighbor, eid, _etype, props) in edges {
2476 let raw = eid.as_u64();
2477 if !seen.insert(raw) {
2480 continue;
2481 }
2482 max_eid = max_eid.max(raw);
2483 let passes = conditions
2484 .iter()
2485 .all(|(name, expected)| props.get(name).is_some_and(|actual| actual == expected));
2486 if passes {
2487 passing.push(raw);
2488 }
2489 }
2490 }
2491
2492 Ok(EidFilter::from_eids(passing, max_eid as usize + 1))
2493}
2494
2495impl Stream for GraphTraverseMainStream {
2496 type Item = DFResult<RecordBatch>;
2497
2498 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2499 let metrics = self.metrics.clone();
2500 let _timer = metrics.elapsed_compute().timer();
2501 loop {
2502 let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2503
2504 match state {
2505 GraphTraverseMainState::CollectingInput {
2506 mut input_stream,
2507 mut buffered,
2508 } => match input_stream.poll_next_unpin(cx) {
2509 Poll::Ready(Some(Ok(batch))) => {
2510 buffered.push(batch);
2511 self.state = GraphTraverseMainState::CollectingInput {
2512 input_stream,
2513 buffered,
2514 };
2515 }
2517 Poll::Ready(Some(Err(e))) => {
2518 self.state = GraphTraverseMainState::Done;
2519 return Poll::Ready(Some(Err(e)));
2520 }
2521 Poll::Ready(None) => {
2522 let source_vids = self.collect_source_vids(&buffered);
2525 let loading_ctx = self.graph_ctx.clone();
2526 let loading_types = self.type_names.clone();
2527 let direction = self.direction;
2528 let fut = async move {
2529 build_edge_adjacency_map(
2530 &loading_ctx,
2531 &loading_types,
2532 direction,
2533 source_vids,
2534 )
2535 .await
2536 };
2537 self.state = GraphTraverseMainState::LoadingEdges {
2538 future: Box::pin(fut),
2539 buffered,
2540 };
2541 }
2543 Poll::Pending => {
2544 self.state = GraphTraverseMainState::CollectingInput {
2545 input_stream,
2546 buffered,
2547 };
2548 return Poll::Pending;
2549 }
2550 },
2551 GraphTraverseMainState::LoadingEdges {
2552 mut future,
2553 buffered,
2554 } => match future.as_mut().poll(cx) {
2555 Poll::Ready(Ok(adjacency)) => {
2556 self.state = GraphTraverseMainState::Processing {
2558 adjacency,
2559 buffered: buffered.into_iter(),
2560 };
2561 }
2563 Poll::Ready(Err(e)) => {
2564 self.state = GraphTraverseMainState::Done;
2565 return Poll::Ready(Some(Err(e)));
2566 }
2567 Poll::Pending => {
2568 self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
2569 return Poll::Pending;
2570 }
2571 },
2572 GraphTraverseMainState::Processing {
2573 adjacency,
2574 mut buffered,
2575 } => {
2576 if let Err(e) = self.graph_ctx.check_timeout() {
2578 return Poll::Ready(Some(Err(
2579 datafusion::error::DataFusionError::Execution(e.to_string()),
2580 )));
2581 }
2582
2583 match buffered.next() {
2584 Some(batch) => {
2585 let result = self.expand_batch(&batch, &adjacency);
2587
2588 self.state = GraphTraverseMainState::Processing {
2589 adjacency,
2590 buffered,
2591 };
2592
2593 if let Ok(ref r) = result {
2594 self.metrics.record_output(r.num_rows());
2595 }
2596 return Poll::Ready(Some(result));
2597 }
2598 None => {
2599 self.state = GraphTraverseMainState::Done;
2600 return Poll::Ready(None);
2601 }
2602 }
2603 }
2604 GraphTraverseMainState::Done => {
2605 return Poll::Ready(None);
2606 }
2607 }
2608 }
2609 }
2610}
2611
2612impl RecordBatchStream for GraphTraverseMainStream {
2613 fn schema(&self) -> SchemaRef {
2614 self.schema.clone()
2615 }
2616}
2617
/// Physical plan node for a variable-length traversal.
// NOTE(review): field descriptions below are inferred from names and types;
// the code that consumes them is outside this view — confirm against it.
pub struct GraphVariableLengthTraverseExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to follow.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum number of hops (presumably inclusive — confirm).
    min_hops: usize,

    /// Maximum number of hops (presumably inclusive — confirm).
    max_hops: usize,

    /// Variable name of the target vertex.
    target_variable: String,

    /// Optional variable bound to the traversed step(s).
    step_variable: Option<String>,

    /// Optional variable bound to the whole path.
    path_variable: Option<String>,

    /// Target vertex property names to emit.
    target_properties: Vec<String>,

    /// Optional label name; `new` looks it up in the schema's `properties` map.
    target_label_name: Option<String>,

    /// Whether the traversal is optional.
    is_optional: bool,

    /// Optional input column that constrains the target vertex.
    bound_target_column: Option<String>,

    /// Optional filter expression for edges (presumably pushed to Lance — confirm).
    edge_lance_filter: Option<String>,

    /// `(property name, value)` pairs an edge must satisfy
    /// (presumably equality, as in `build_edge_property_filter` — confirm).
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns holding edge ids that are already in use.
    used_edge_columns: Vec<String>,

    /// Path mode from the `nfa` module.
    path_mode: super::nfa::PathMode,

    /// Output mode from the `nfa` module.
    output_mode: super::nfa::VlpOutputMode,

    /// Shared path NFA.
    nfa: Arc<PathNfa>,

    /// Shared execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema.
    schema: SchemaRef,

    /// Plan properties for this node.
    properties: Arc<PlanProperties>,

    /// Metrics set for this node.
    metrics: ExecutionPlanMetricsSet,
}
2709
2710impl fmt::Debug for GraphVariableLengthTraverseExec {
2711 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2712 f.debug_struct("GraphVariableLengthTraverseExec")
2713 .field("source_column", &self.source_column)
2714 .field("edge_type_ids", &self.edge_type_ids)
2715 .field("direction", &self.direction)
2716 .field("min_hops", &self.min_hops)
2717 .field("max_hops", &self.max_hops)
2718 .field("target_variable", &self.target_variable)
2719 .finish()
2720 }
2721}
2722
2723impl GraphVariableLengthTraverseExec {
2724 #[expect(clippy::too_many_arguments)]
2730 pub fn new(
2731 input: Arc<dyn ExecutionPlan>,
2732 source_column: impl Into<String>,
2733 edge_type_ids: Vec<u32>,
2734 direction: Direction,
2735 min_hops: usize,
2736 max_hops: usize,
2737 target_variable: impl Into<String>,
2738 step_variable: Option<String>,
2739 path_variable: Option<String>,
2740 target_properties: Vec<String>,
2741 target_label_name: Option<String>,
2742 graph_ctx: Arc<GraphExecutionContext>,
2743 is_optional: bool,
2744 bound_target_column: Option<String>,
2745 edge_lance_filter: Option<String>,
2746 edge_property_conditions: Vec<(String, UniValue)>,
2747 used_edge_columns: Vec<String>,
2748 path_mode: super::nfa::PathMode,
2749 output_mode: super::nfa::VlpOutputMode,
2750 qpp_nfa: Option<PathNfa>,
2751 ) -> Self {
2752 let source_column = source_column.into();
2753 let target_variable = target_variable.into();
2754
2755 let uni_schema = graph_ctx.storage().schema_manager().schema();
2757 let label_props = target_label_name
2758 .as_deref()
2759 .and_then(|ln| uni_schema.properties.get(ln));
2760
2761 let schema = Self::build_schema(
2763 input.schema(),
2764 &target_variable,
2765 step_variable.as_deref(),
2766 path_variable.as_deref(),
2767 &target_properties,
2768 label_props,
2769 );
2770 let properties = compute_plan_properties(schema.clone());
2771
2772 let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2774 PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2775 }));
2776
2777 Self {
2778 input,
2779 source_column,
2780 edge_type_ids,
2781 direction,
2782 min_hops,
2783 max_hops,
2784 target_variable,
2785 step_variable,
2786 path_variable,
2787 target_properties,
2788 target_label_name,
2789 is_optional,
2790 bound_target_column,
2791 edge_lance_filter,
2792 edge_property_conditions,
2793 used_edge_columns,
2794 path_mode,
2795 output_mode,
2796 nfa,
2797 graph_ctx,
2798 schema,
2799 properties,
2800 metrics: ExecutionPlanMetricsSet::new(),
2801 }
2802 }
2803
2804 fn build_schema(
2806 input_schema: SchemaRef,
2807 target_variable: &str,
2808 step_variable: Option<&str>,
2809 path_variable: Option<&str>,
2810 target_properties: &[String],
2811 label_props: Option<
2812 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2813 >,
2814 ) -> SchemaRef {
2815 let mut fields: Vec<Field> = input_schema
2816 .fields()
2817 .iter()
2818 .map(|f| f.as_ref().clone())
2819 .collect();
2820
2821 let target_vid_name = format!("{}._vid", target_variable);
2823 if input_schema.column_with_name(&target_vid_name).is_none() {
2824 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2825 }
2826
2827 let target_labels_name = format!("{}._labels", target_variable);
2829 if input_schema.column_with_name(&target_labels_name).is_none() {
2830 fields.push(Field::new(target_labels_name, labels_data_type(), true));
2831 }
2832
2833 for prop_name in target_properties {
2835 let col_name = format!("{}.{}", target_variable, prop_name);
2836 if input_schema.column_with_name(&col_name).is_none() {
2837 let arrow_type = resolve_property_type(prop_name, label_props);
2838 let uni_type = label_props
2839 .and_then(|p| p.get(prop_name))
2840 .map(|m| &m.r#type);
2841 fields.push(property_field(&col_name, arrow_type, uni_type));
2842 }
2843 }
2844
2845 fields.push(Field::new("_hop_count", DataType::UInt64, false));
2847
2848 if let Some(step_var) = step_variable {
2850 fields.push(build_edge_list_field(step_var));
2851 }
2852
2853 if let Some(path_var) = path_variable
2855 && input_schema.column_with_name(path_var).is_none()
2856 {
2857 fields.push(build_path_struct_field(path_var));
2858 }
2859
2860 Arc::new(Schema::new(fields))
2861 }
2862}
2863
2864impl DisplayAs for GraphVariableLengthTraverseExec {
2865 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2866 write!(
2867 f,
2868 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2869 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2870 )
2871 }
2872}
2873
2874impl ExecutionPlan for GraphVariableLengthTraverseExec {
2875 fn name(&self) -> &str {
2876 "GraphVariableLengthTraverseExec"
2877 }
2878
2879 fn as_any(&self) -> &dyn Any {
2880 self
2881 }
2882
2883 fn schema(&self) -> SchemaRef {
2884 self.schema.clone()
2885 }
2886
2887 fn properties(&self) -> &Arc<PlanProperties> {
2888 &self.properties
2889 }
2890
2891 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2892 vec![&self.input]
2893 }
2894
2895 fn with_new_children(
2896 self: Arc<Self>,
2897 children: Vec<Arc<dyn ExecutionPlan>>,
2898 ) -> DFResult<Arc<dyn ExecutionPlan>> {
2899 if children.len() != 1 {
2900 return Err(datafusion::error::DataFusionError::Plan(
2901 "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2902 ));
2903 }
2904
2905 Ok(Arc::new(Self::new(
2907 children[0].clone(),
2908 self.source_column.clone(),
2909 self.edge_type_ids.clone(),
2910 self.direction,
2911 self.min_hops,
2912 self.max_hops,
2913 self.target_variable.clone(),
2914 self.step_variable.clone(),
2915 self.path_variable.clone(),
2916 self.target_properties.clone(),
2917 self.target_label_name.clone(),
2918 self.graph_ctx.clone(),
2919 self.is_optional,
2920 self.bound_target_column.clone(),
2921 self.edge_lance_filter.clone(),
2922 self.edge_property_conditions.clone(),
2923 self.used_edge_columns.clone(),
2924 self.path_mode.clone(),
2925 self.output_mode.clone(),
2926 Some((*self.nfa).clone()),
2927 )))
2928 }
2929
2930 fn execute(
2931 &self,
2932 partition: usize,
2933 context: Arc<TaskContext>,
2934 ) -> DFResult<SendableRecordBatchStream> {
2935 let input_stream = self.input.execute(partition, context)?;
2936
2937 let metrics = BaselineMetrics::new(&self.metrics, partition);
2938
2939 let graph_ctx = self.graph_ctx.clone();
2944 let edge_type_ids = self.edge_type_ids.clone();
2945 let direction = self.direction;
2946 let edge_property_conditions = self.edge_property_conditions.clone();
2947 let warm_fut: Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>> =
2948 Box::pin(async move {
2949 graph_ctx
2950 .ensure_adjacency_warmed(&edge_type_ids, direction)
2951 .await
2952 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2953 build_edge_property_filter(
2954 &graph_ctx,
2955 &edge_type_ids,
2956 direction,
2957 &edge_property_conditions,
2958 )
2959 .await
2960 });
2961
2962 Ok(Box::pin(GraphVariableLengthTraverseStream {
2963 input: input_stream,
2964 exec: Arc::new(self.clone_for_stream()),
2965 schema: self.schema.clone(),
2966 state: VarLengthStreamState::Warming(warm_fut),
2967 edge_property_filter: EidFilter::AllAllowed,
2968 metrics,
2969 }))
2970 }
2971
2972 fn metrics(&self) -> Option<MetricsSet> {
2973 Some(self.metrics.clone_inner())
2974 }
2975}
2976
2977impl GraphVariableLengthTraverseExec {
2978 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2980 GraphVariableLengthTraverseExecData {
2981 source_column: self.source_column.clone(),
2982 edge_type_ids: self.edge_type_ids.clone(),
2983 direction: self.direction,
2984 min_hops: self.min_hops,
2985 max_hops: self.max_hops,
2986 target_variable: self.target_variable.clone(),
2987 step_variable: self.step_variable.clone(),
2988 path_variable: self.path_variable.clone(),
2989 target_properties: self.target_properties.clone(),
2990 target_label_name: self.target_label_name.clone(),
2991 is_optional: self.is_optional,
2992 bound_target_column: self.bound_target_column.clone(),
2993 edge_lance_filter: self.edge_lance_filter.clone(),
2994 edge_property_conditions: self.edge_property_conditions.clone(),
2995 used_edge_columns: self.used_edge_columns.clone(),
2996 path_mode: self.path_mode.clone(),
2997 output_mode: self.output_mode.clone(),
2998 nfa: self.nfa.clone(),
2999 graph_ctx: self.graph_ctx.clone(),
3000 }
3001 }
3002}
3003
/// Per-stream copy of the traversal configuration, produced by
/// `GraphVariableLengthTraverseExec::clone_for_stream` and shared with the
/// stream behind an `Arc`.
///
/// It carries the same-named fields as `GraphVariableLengthTraverseExec`
/// minus the child plan, schema, plan properties and metrics set.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Name of the batch column holding source vertex ids.
    source_column: String,
    /// Edge type ids; also used when resolving stored edge endpoints for output.
    edge_type_ids: Vec<u32>,
    /// Traversal direction; `Direction::Both` enables per-transition edge de-duplication.
    direction: Direction,
    min_hops: usize,
    /// Upper bound on BFS depth.
    max_hops: usize,
    /// Prefix of the target output columns.
    target_variable: String,
    /// When set, an edge-list column is emitted.
    step_variable: Option<String>,
    /// When set, a path struct column is emitted.
    path_variable: Option<String>,
    /// Target properties; hydrated asynchronously after expansion.
    target_properties: Vec<String>,
    /// Label a target must carry to be accepted (when its labels can be looked up).
    target_label_name: Option<String>,
    /// Emit a null-target row for input rows without any expansion.
    is_optional: bool,
    /// Optional column whose value the target must equal.
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Columns holding edge ids already used on the row; those edges are skipped.
    used_edge_columns: Vec<String>,
    /// Path mode forwarded to path enumeration.
    path_mode: super::nfa::PathMode,
    /// Selects the endpoints-only or the path-producing BFS.
    output_mode: super::nfa::VlpOutputMode,
    /// Automaton whose transitions drive neighbor expansion.
    nfa: Arc<PathNfa>,
    graph_ctx: Arc<GraphExecutionContext>,
}
3032
/// BFS stops expanding once the next frontier holds more than this many
/// `(vertex, state)` entries.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// BFS stops expanding once the predecessor DAG's pool (`pool_len()`) grows
/// beyond this many entries.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
3037
3038impl GraphVariableLengthTraverseExecData {
3039 fn check_target_label(&self, vid: Vid) -> bool {
3041 if let Some(ref label_name) = self.target_label_name {
3042 let query_ctx = self.graph_ctx.query_context();
3043 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3044 Some(labels) => labels.contains(label_name),
3045 None => true, }
3047 } else {
3048 true
3049 }
3050 }
3051
3052 fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
3054 match constraint {
3055 super::nfa::VertexConstraint::Label(label_name) => {
3056 let query_ctx = self.graph_ctx.query_context();
3057 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3058 Some(labels) => labels.contains(label_name),
3059 None => true, }
3061 }
3062 }
3063 }
3064
3065 fn expand_neighbors(
3068 &self,
3069 vid: Vid,
3070 state: NfaStateId,
3071 eid_filter: &EidFilter,
3072 used_eids: &FxHashSet<u64>,
3073 ) -> Vec<(Vid, Eid, NfaStateId)> {
3074 let is_undirected = matches!(self.direction, Direction::Both);
3075 let mut results = Vec::new();
3076
3077 for transition in self.nfa.transitions_from(state) {
3078 let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
3079
3080 for &etype in &transition.edge_type_ids {
3081 for (neighbor, eid) in
3082 self.graph_ctx
3083 .get_neighbors(vid, etype, transition.direction)
3084 {
3085 if is_undirected && !seen_edges.insert(eid.as_u64()) {
3087 continue;
3088 }
3089
3090 if !eid_filter.contains(eid) {
3098 continue;
3099 }
3100
3101 if used_eids.contains(&eid.as_u64()) {
3103 continue;
3104 }
3105
3106 if let Some(constraint) = self.nfa.state_constraint(transition.to)
3108 && !self.check_state_constraint(neighbor, constraint)
3109 {
3110 continue;
3111 }
3112
3113 results.push((neighbor, eid, transition.to));
3114 }
3115 }
3116 }
3117
3118 results
3119 }
3120
    /// Level-by-level BFS from `source` over `(vertex, NFA state)` pairs that
    /// records every traversed edge in a `PredecessorDag` and then enumerates
    /// the paths to each accepting `(vertex, state, depth)`.
    ///
    /// Returns one `(target, hop_count, node_path, edge_path)` tuple per
    /// enumerated path. Expansion runs for at most `max_hops` levels and stops
    /// early, without reporting an error, once the next frontier exceeds
    /// `MAX_FRONTIER_SIZE` or the DAG pool exceeds `MAX_PRED_POOL_SIZE`;
    /// accepting entries found up to that point are still enumerated.
    fn bfs_with_dag(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<BfsResult> {
        let nfa = &self.nfa;
        let selector = PathSelector::All;
        let mut dag = PredecessorDag::new(selector);
        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();

        // The source itself is a result at depth 0 when the start state accepts.
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            accepting.push((source, nfa.start_state(), 0));
        }

        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            // Each (vertex, state) pair enters the next frontier once per depth.
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    // Every incoming edge is recorded, even when the pair was
                    // already reached at this depth through another edge.
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                        {
                            accepting.push((neighbor, dst_state, depth));
                        }
                    }
                }
            }

            // Size guard: give up on deeper levels rather than grow without bound.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        let mut results: Vec<BfsResult> = Vec::new();
        for &(target, state, depth) in &accepting {
            // `depth` is passed twice — presumably as both the minimum and the
            // maximum path length to enumerate; confirm against `PredecessorDag`.
            dag.enumerate_paths(
                source,
                target,
                state,
                depth,
                depth,
                &self.path_mode,
                &mut |nodes, edges| {
                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
                    std::ops::ControlFlow::Continue(())
                },
            );
        }

        results
    }
3203
    /// Level-by-level BFS from `source` that reports only `(target, depth)`
    /// pairs instead of materialized paths.
    ///
    /// A reached `(vertex, state)` pair is reported the first time it is seen
    /// at a given depth, provided the state accepts, the vertex passes the
    /// target-label and `vid_filter` checks, and the DAG confirms a valid path
    /// via `has_trail_valid_path`. Expansion runs for at most `max_hops`
    /// levels and stops early once the next frontier exceeds
    /// `MAX_FRONTIER_SIZE` or the DAG pool exceeds `MAX_PRED_POOL_SIZE`.
    fn bfs_endpoints_only(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<(Vid, u32)> {
        let nfa = &self.nfa;
        let selector = PathSelector::Any;
        let mut dag = PredecessorDag::new(selector);
        let mut results: Vec<(Vid, u32)> = Vec::new();

        // The source itself is a result at depth 0 when the start state accepts.
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            results.push((source, 0));
        }

        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            // Each (vertex, state) pair enters the next frontier once per depth.
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // The path check runs against the DAG as built so far,
                        // i.e. at the moment the pair is first reached at this depth.
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
                        {
                            results.push((neighbor, depth));
                        }
                    }
                }
            }

            // Size guard: give up on deeper levels rather than grow without bound.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        results
    }
3267}
3268
/// States of the `GraphVariableLengthTraverseStream` polling state machine.
enum VarLengthStreamState {
    /// Waiting on the future created in `execute`, which warms adjacency and
    /// resolves to the edge-property filter.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>>),
    /// Ready to pull and expand the next input batch.
    Reading,
    /// Waiting on target-property hydration of an already expanded batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state; also the placeholder swapped into the stream while a
    /// poll is inspecting the previous state.
    Done,
}
3283
/// Record batch stream returned by `GraphVariableLengthTraverseExec::execute`.
struct GraphVariableLengthTraverseStream {
    /// Stream of input batches from the child plan.
    input: SendableRecordBatchStream,
    /// Traversal configuration copied from the plan node.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema of the emitted batches.
    schema: SchemaRef,
    /// Current state of the polling state machine.
    state: VarLengthStreamState,
    /// Edge filter used during expansion; starts as `EidFilter::AllAllowed`
    /// and is replaced by the result of the warming future.
    edge_property_filter: EidFilter,
    /// Baseline metrics (elapsed compute time, output rows) for this partition.
    metrics: BaselineMetrics,
}
3296
3297impl Stream for GraphVariableLengthTraverseStream {
3298 type Item = DFResult<RecordBatch>;
3299
3300 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3301 let metrics = self.metrics.clone();
3302 let _timer = metrics.elapsed_compute().timer();
3303 loop {
3304 let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3305
3306 match state {
3307 VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3308 Poll::Ready(Ok(filter)) => {
3309 self.edge_property_filter = filter;
3310 self.state = VarLengthStreamState::Reading;
3311 }
3313 Poll::Ready(Err(e)) => {
3314 self.state = VarLengthStreamState::Done;
3315 return Poll::Ready(Some(Err(e)));
3316 }
3317 Poll::Pending => {
3318 self.state = VarLengthStreamState::Warming(fut);
3319 return Poll::Pending;
3320 }
3321 },
3322 VarLengthStreamState::Reading => {
3323 if let Err(e) = self.exec.graph_ctx.check_timeout() {
3325 return Poll::Ready(Some(Err(
3326 datafusion::error::DataFusionError::Execution(e.to_string()),
3327 )));
3328 }
3329
3330 match self.input.poll_next_unpin(cx) {
3331 Poll::Ready(Some(Ok(batch))) => {
3332 let vid_filter = VidFilter::AllAllowed;
3337 let base_result = self.process_batch_base(
3338 batch,
3339 &self.edge_property_filter,
3340 &vid_filter,
3341 );
3342 let base_batch = match base_result {
3343 Ok(b) => b,
3344 Err(e) => {
3345 self.state = VarLengthStreamState::Reading;
3346 return Poll::Ready(Some(Err(e)));
3347 }
3348 };
3349
3350 if self.exec.target_properties.is_empty() {
3352 self.state = VarLengthStreamState::Reading;
3353 return Poll::Ready(Some(Ok(base_batch)));
3354 }
3355
3356 let schema = self.schema.clone();
3358 let target_variable = self.exec.target_variable.clone();
3359 let target_properties = self.exec.target_properties.clone();
3360 let target_label_name = self.exec.target_label_name.clone();
3361 let graph_ctx = self.exec.graph_ctx.clone();
3362
3363 let fut = hydrate_vlp_target_properties(
3364 base_batch,
3365 schema,
3366 target_variable,
3367 target_properties,
3368 target_label_name,
3369 graph_ctx,
3370 );
3371
3372 self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3373 }
3375 Poll::Ready(Some(Err(e))) => {
3376 self.state = VarLengthStreamState::Done;
3377 return Poll::Ready(Some(Err(e)));
3378 }
3379 Poll::Ready(None) => {
3380 self.state = VarLengthStreamState::Done;
3381 return Poll::Ready(None);
3382 }
3383 Poll::Pending => {
3384 self.state = VarLengthStreamState::Reading;
3385 return Poll::Pending;
3386 }
3387 }
3388 }
3389 VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3390 Poll::Ready(Ok(batch)) => {
3391 self.state = VarLengthStreamState::Reading;
3392 self.metrics.record_output(batch.num_rows());
3393 return Poll::Ready(Some(Ok(batch)));
3394 }
3395 Poll::Ready(Err(e)) => {
3396 self.state = VarLengthStreamState::Done;
3397 return Poll::Ready(Some(Err(e)));
3398 }
3399 Poll::Pending => {
3400 self.state = VarLengthStreamState::Materializing(fut);
3401 return Poll::Pending;
3402 }
3403 },
3404 VarLengthStreamState::Done => {
3405 return Poll::Ready(None);
3406 }
3407 }
3408 }
3409 }
3410}
3411
3412impl GraphVariableLengthTraverseStream {
3413 fn process_batch_base(
3414 &self,
3415 batch: RecordBatch,
3416 eid_filter: &EidFilter,
3417 vid_filter: &VidFilter,
3418 ) -> DFResult<RecordBatch> {
3419 let source_col = batch
3420 .column_by_name(&self.exec.source_column)
3421 .ok_or_else(|| {
3422 datafusion::error::DataFusionError::Execution(format!(
3423 "Source column '{}' not found",
3424 self.exec.source_column
3425 ))
3426 })?;
3427
3428 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3429 let source_vids: &UInt64Array = &source_vid_cow;
3430
3431 let bound_target_cow = self
3433 .exec
3434 .bound_target_column
3435 .as_ref()
3436 .and_then(|col| batch.column_by_name(col))
3437 .map(|c| column_as_vid_array(c.as_ref()))
3438 .transpose()?;
3439 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3440
3441 let used_edge_arrays: Vec<&UInt64Array> = self
3443 .exec
3444 .used_edge_columns
3445 .iter()
3446 .filter_map(|col| {
3447 batch
3448 .column_by_name(col)?
3449 .as_any()
3450 .downcast_ref::<UInt64Array>()
3451 })
3452 .collect();
3453
3454 let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3456
3457 for (row_idx, source_vid) in source_vids.iter().enumerate() {
3458 let mut emitted_for_row = false;
3459
3460 if let Some(src) = source_vid {
3461 let vid = Vid::from(src);
3462
3463 let used_eids: FxHashSet<u64> = used_edge_arrays
3465 .iter()
3466 .filter_map(|arr| {
3467 if arr.is_null(row_idx) {
3468 None
3469 } else {
3470 Some(arr.value(row_idx))
3471 }
3472 })
3473 .collect();
3474
3475 match &self.exec.output_mode {
3477 VlpOutputMode::EndpointsOnly => {
3478 let endpoints = self
3479 .exec
3480 .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3481 for (target, depth) in endpoints {
3482 if let Some(targets) = expected_targets {
3484 if targets.is_null(row_idx) {
3485 continue;
3486 }
3487 if target.as_u64() != targets.value(row_idx) {
3488 continue;
3489 }
3490 }
3491 expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3492 emitted_for_row = true;
3493 }
3494 }
3495 _ => {
3496 let bfs_results = self
3498 .exec
3499 .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3500 for (target, hop_count, node_path, edge_path) in bfs_results {
3501 if let Some(targets) = expected_targets {
3503 if targets.is_null(row_idx) {
3504 continue;
3505 }
3506 if target.as_u64() != targets.value(row_idx) {
3507 continue;
3508 }
3509 }
3510 expansions.push((row_idx, target, hop_count, node_path, edge_path));
3511 emitted_for_row = true;
3512 }
3513 }
3514 }
3515 }
3516
3517 if self.exec.is_optional && !emitted_for_row {
3518 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3521 }
3522 }
3523
3524 self.build_output_batch(&batch, &expansions)
3525 }
3526
    /// Assembles one output batch from the expansion tuples of an input batch.
    ///
    /// Columns are appended in the same order `build_schema` lays out fields:
    /// the input columns (each input row repeated once per expansion), then —
    /// only where the input lacks them — `{target}._vid`, `{target}._labels`
    /// and null placeholders for the target properties, then `_hop_count`, the
    /// optional step edge list and the optional path struct.
    ///
    /// Expansions whose target id is `u64::MAX` are treated as unmatched rows
    /// and get a null target id and null labels. Returns an empty batch when
    /// there are no expansions, and records the produced row count in the
    /// stream metrics otherwise.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // Input row index of every output row, used to replicate input columns.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // `u64::MAX` is the sentinel target pushed for unmatched optional rows.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                // Unmatched rows get a null label list.
                let Some(vid_u64) = target_vid else {
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            labels
                        }
                        None => {
                            // Labels unavailable: fall back to the configured
                            // target label, or an empty list without one.
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Property columns are all-null placeholders typed from the schema
        // field at the position they will occupy; they are filled in later by
        // `hydrate_vlp_target_properties`.
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // No path information at all: null edge list.
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Nodes but no edges (zero-hop path): empty, non-null list.
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        // Edge `i` connects `node_path[i]` and `node_path[i + 1]`
                        // in traversal order; the context resolves the stored
                        // (src, dst) orientation.
                        let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
                            *eid,
                            node_path[i],
                            node_path[i + 1],
                            &self.exec.edge_type_ids,
                        );
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            src,
                            dst,
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        if let Some(path_var_name) = &self.exec.path_variable {
            // If the input already carries a path column with this name, the
            // new segment is appended to it and the column is replaced in place.
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            // `columns[idx]` is the already row-expanded copy, so it is indexed
            // by output row.
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    // No path for this row: null path struct.
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                // After prepending an existing path, the first node of the new
                // segment is skipped — presumably because it equals the last
                // node of the existing path.
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
                        *eid,
                        node_path[i],
                        node_path[i + 1],
                        &self.exec.edge_type_ids,
                    );
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        src,
                        dst,
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;

            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
3786}
3787
3788impl RecordBatchStream for GraphVariableLengthTraverseStream {
3789 fn schema(&self) -> SchemaRef {
3790 self.schema.clone()
3791 }
3792}
3793
3794async fn hydrate_vlp_target_properties(
3799 base_batch: RecordBatch,
3800 schema: SchemaRef,
3801 target_variable: String,
3802 target_properties: Vec<String>,
3803 target_label_name: Option<String>,
3804 graph_ctx: Arc<GraphExecutionContext>,
3805) -> DFResult<RecordBatch> {
3806 if base_batch.num_rows() == 0 || target_properties.is_empty() {
3807 return Ok(base_batch);
3808 }
3809
3810 let target_vid_col_name = format!("{}._vid", target_variable);
3817 let vid_col_idx = schema
3818 .fields()
3819 .iter()
3820 .enumerate()
3821 .rev()
3822 .find(|(_, f)| f.name() == &target_vid_col_name)
3823 .map(|(i, _)| i);
3824
3825 let Some(vid_col_idx) = vid_col_idx else {
3826 return Ok(base_batch);
3827 };
3828
3829 let vid_col = base_batch.column(vid_col_idx);
3830 let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3831 let target_vid_array: &UInt64Array = &target_vid_cow;
3832
3833 let target_vids: Vec<Vid> = target_vid_array
3834 .iter()
3835 .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3838 .collect();
3839
3840 let mut property_columns: Vec<ArrayRef> = Vec::new();
3842
3843 if let Some(ref label_name) = target_label_name {
3844 let property_manager = graph_ctx.property_manager();
3845 let query_ctx = graph_ctx.query_context();
3846
3847 let props_map = property_manager
3848 .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3849 .await
3850 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3851
3852 let uni_schema = graph_ctx.storage().schema_manager().schema();
3853 let label_props = uni_schema.properties.get(label_name.as_str());
3854
3855 for prop_name in &target_properties {
3856 let data_type = resolve_property_type(prop_name, label_props);
3857 let column =
3858 build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3859 property_columns.push(column);
3860 }
3861 } else {
3862 let non_internal_props: Vec<&str> = target_properties
3865 .iter()
3866 .filter(|p| *p != "_all_props")
3867 .map(|s| s.as_str())
3868 .collect();
3869 let property_manager = graph_ctx.property_manager();
3870 let query_ctx = graph_ctx.query_context();
3871
3872 let props_map = if !non_internal_props.is_empty() {
3873 property_manager
3874 .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3875 .await
3876 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3877 } else {
3878 std::collections::HashMap::new()
3879 };
3880
3881 for prop_name in &target_properties {
3882 if prop_name == "_all_props" {
3883 use arrow_array::builder::LargeBinaryBuilder;
3886
3887 let mut builder = LargeBinaryBuilder::new();
3888 let l0_ctx = graph_ctx.l0_context();
3889 for vid in &target_vids {
3890 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3891 if let Some(vid_props) = props_map.get(vid) {
3893 for (k, v) in vid_props.iter() {
3894 merged_props.insert(k.clone(), v.clone());
3895 }
3896 }
3897 for l0 in l0_ctx.iter_l0_buffers() {
3899 let guard = l0.read();
3900 if let Some(l0_props) = guard.vertex_properties.get(vid) {
3901 for (k, v) in l0_props.iter() {
3902 merged_props.insert(k.clone(), v.clone());
3903 }
3904 }
3905 }
3906 if merged_props.is_empty() {
3907 builder.append_null();
3908 } else {
3909 builder.append_value(uni_common::cypher_value_codec::encode(
3910 &uni_common::Value::Map(merged_props),
3911 ));
3912 }
3913 }
3914 property_columns.push(Arc::new(builder.finish()));
3915 } else {
3916 let column = build_property_column_static(
3917 &target_vids,
3918 &props_map,
3919 prop_name,
3920 &arrow::datatypes::DataType::LargeBinary,
3921 )?;
3922 property_columns.push(column);
3923 }
3924 }
3925 }
3926
3927 let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3933 let mut prop_idx = 0;
3934 for (col_idx, field) in schema.fields().iter().enumerate() {
3935 let is_target_prop = col_idx > vid_col_idx
3936 && target_properties
3937 .iter()
3938 .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3939 if is_target_prop && prop_idx < property_columns.len() {
3940 new_columns.push(property_columns[prop_idx].clone());
3941 prop_idx += 1;
3942 } else {
3943 new_columns.push(base_batch.column(col_idx).clone());
3944 }
3945 }
3946
3947 RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3948}
3949
/// Physical plan node that expands every input row into the vertices reachable
/// from that row's source vertex within `min_hops..=max_hops` edges.
///
/// The output schema is the input schema followed by the target `_vid` /
/// `_labels` columns (when not already present), `_hop_count`, an optional
/// edge-list column, an optional path column and the requested target
/// property columns.
pub struct GraphVariableLengthTraverseMainExec {
    /// Child plan whose batches supply the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex IDs.
    source_column: String,

    /// Edge type names used to build the adjacency map; also joined with `|`
    /// to label emitted edge structs.
    type_names: Vec<String>,

    /// Traversal direction. `Direction::Both` additionally de-duplicates edges
    /// per expanded vertex during the search.
    direction: Direction,

    /// Minimum number of hops (inclusive) a result must have.
    min_hops: usize,

    /// Maximum number of hops (inclusive); the search does not expand past it.
    max_hops: usize,

    /// Variable name used as the prefix of the `{var}._vid`, `{var}._labels`
    /// and `{var}.{property}` output columns.
    target_variable: String,

    /// When set, an edge-list column is emitted for this variable.
    step_variable: Option<String>,

    /// When set, a path struct column with this name is emitted (or an
    /// existing column of that name is extended).
    path_variable: Option<String>,

    /// Target vertex properties to expose as `{target_variable}.{property}`.
    target_properties: Vec<String>,

    /// When true, input rows without any match still produce one output row
    /// whose traversal columns are null.
    is_optional: bool,

    /// Optional input column holding the vertex ID the traversal must end at.
    bound_target_column: Option<String>,

    /// Stored and forwarded to the stream; not consulted by the code visible
    /// in this section (the adjacency map is built with `None`).
    edge_lance_filter: Option<String>,

    /// `(property, expected value)` pairs every traversed edge must match.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns holding edge IDs that must not be traversed for that row.
    used_edge_columns: Vec<String>,

    /// Stored and forwarded to the stream; not consulted by the code visible
    /// in this section.
    path_mode: super::nfa::PathMode,

    /// Stored and forwarded to the stream; not consulted by the code visible
    /// in this section.
    output_mode: super::nfa::VlpOutputMode,

    /// Shared graph execution context (storage, property manager, query context).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema computed once in `new`.
    schema: SchemaRef,

    /// Plan properties derived from `schema`.
    properties: Arc<PlanProperties>,

    /// Metric set from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
4024
4025impl fmt::Debug for GraphVariableLengthTraverseMainExec {
4026 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4027 f.debug_struct("GraphVariableLengthTraverseMainExec")
4028 .field("source_column", &self.source_column)
4029 .field("type_names", &self.type_names)
4030 .field("direction", &self.direction)
4031 .field("min_hops", &self.min_hops)
4032 .field("max_hops", &self.max_hops)
4033 .field("target_variable", &self.target_variable)
4034 .finish()
4035 }
4036}
4037
4038impl GraphVariableLengthTraverseMainExec {
4039 #[expect(clippy::too_many_arguments)]
4041 pub fn new(
4042 input: Arc<dyn ExecutionPlan>,
4043 source_column: impl Into<String>,
4044 type_names: Vec<String>,
4045 direction: Direction,
4046 min_hops: usize,
4047 max_hops: usize,
4048 target_variable: impl Into<String>,
4049 step_variable: Option<String>,
4050 path_variable: Option<String>,
4051 target_properties: Vec<String>,
4052 graph_ctx: Arc<GraphExecutionContext>,
4053 is_optional: bool,
4054 bound_target_column: Option<String>,
4055 edge_lance_filter: Option<String>,
4056 edge_property_conditions: Vec<(String, UniValue)>,
4057 used_edge_columns: Vec<String>,
4058 path_mode: super::nfa::PathMode,
4059 output_mode: super::nfa::VlpOutputMode,
4060 ) -> Self {
4061 let source_column = source_column.into();
4062 let target_variable = target_variable.into();
4063
4064 let schema = Self::build_schema(
4066 input.schema(),
4067 &target_variable,
4068 step_variable.as_deref(),
4069 path_variable.as_deref(),
4070 &target_properties,
4071 );
4072 let properties = compute_plan_properties(schema.clone());
4073
4074 Self {
4075 input,
4076 source_column,
4077 type_names,
4078 direction,
4079 min_hops,
4080 max_hops,
4081 target_variable,
4082 step_variable,
4083 path_variable,
4084 target_properties,
4085 is_optional,
4086 bound_target_column,
4087 edge_lance_filter,
4088 edge_property_conditions,
4089 used_edge_columns,
4090 path_mode,
4091 output_mode,
4092 graph_ctx,
4093 schema,
4094 properties,
4095 metrics: ExecutionPlanMetricsSet::new(),
4096 }
4097 }
4098
4099 fn build_schema(
4101 input_schema: SchemaRef,
4102 target_variable: &str,
4103 step_variable: Option<&str>,
4104 path_variable: Option<&str>,
4105 target_properties: &[String],
4106 ) -> SchemaRef {
4107 let mut fields: Vec<Field> = input_schema
4108 .fields()
4109 .iter()
4110 .map(|f| f.as_ref().clone())
4111 .collect();
4112
4113 let target_vid_name = format!("{}._vid", target_variable);
4115 if input_schema.column_with_name(&target_vid_name).is_none() {
4116 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4117 }
4118
4119 let target_labels_name = format!("{}._labels", target_variable);
4121 if input_schema.column_with_name(&target_labels_name).is_none() {
4122 fields.push(Field::new(target_labels_name, labels_data_type(), true));
4123 }
4124
4125 fields.push(Field::new("_hop_count", DataType::UInt64, false));
4127
4128 if let Some(step_var) = step_variable {
4131 fields.push(build_edge_list_field(step_var));
4132 }
4133
4134 if let Some(path_var) = path_variable
4136 && input_schema.column_with_name(path_var).is_none()
4137 {
4138 fields.push(build_path_struct_field(path_var));
4139 }
4140
4141 for prop in target_properties {
4144 let prop_name = format!("{}.{}", target_variable, prop);
4145 if input_schema.column_with_name(&prop_name).is_none() {
4146 fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4147 }
4148 }
4149
4150 Arc::new(Schema::new(fields))
4151 }
4152}
4153
4154impl DisplayAs for GraphVariableLengthTraverseMainExec {
4155 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4156 write!(
4157 f,
4158 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4159 self.source_column, self.type_names, self.min_hops, self.max_hops
4160 )
4161 }
4162}
4163
4164impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4165 fn name(&self) -> &str {
4166 "GraphVariableLengthTraverseMainExec"
4167 }
4168
4169 fn as_any(&self) -> &dyn Any {
4170 self
4171 }
4172
4173 fn schema(&self) -> SchemaRef {
4174 self.schema.clone()
4175 }
4176
4177 fn properties(&self) -> &Arc<PlanProperties> {
4178 &self.properties
4179 }
4180
4181 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4182 vec![&self.input]
4183 }
4184
4185 fn with_new_children(
4186 self: Arc<Self>,
4187 children: Vec<Arc<dyn ExecutionPlan>>,
4188 ) -> DFResult<Arc<dyn ExecutionPlan>> {
4189 if children.len() != 1 {
4190 return Err(datafusion::error::DataFusionError::Plan(
4191 "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4192 ));
4193 }
4194
4195 Ok(Arc::new(Self::new(
4196 children[0].clone(),
4197 self.source_column.clone(),
4198 self.type_names.clone(),
4199 self.direction,
4200 self.min_hops,
4201 self.max_hops,
4202 self.target_variable.clone(),
4203 self.step_variable.clone(),
4204 self.path_variable.clone(),
4205 self.target_properties.clone(),
4206 self.graph_ctx.clone(),
4207 self.is_optional,
4208 self.bound_target_column.clone(),
4209 self.edge_lance_filter.clone(),
4210 self.edge_property_conditions.clone(),
4211 self.used_edge_columns.clone(),
4212 self.path_mode.clone(),
4213 self.output_mode.clone(),
4214 )))
4215 }
4216
4217 fn execute(
4218 &self,
4219 partition: usize,
4220 context: Arc<TaskContext>,
4221 ) -> DFResult<SendableRecordBatchStream> {
4222 let input_stream = self.input.execute(partition, context)?;
4223 let metrics = BaselineMetrics::new(&self.metrics, partition);
4224
4225 let graph_ctx = self.graph_ctx.clone();
4227 let type_names = self.type_names.clone();
4228 let direction = self.direction;
4229 let load_fut = async move {
4230 build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4232 };
4233
4234 Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4235 input: input_stream,
4236 source_column: self.source_column.clone(),
4237 type_names: self.type_names.clone(),
4238 direction: self.direction,
4239 min_hops: self.min_hops,
4240 max_hops: self.max_hops,
4241 target_variable: self.target_variable.clone(),
4242 step_variable: self.step_variable.clone(),
4243 path_variable: self.path_variable.clone(),
4244 target_properties: self.target_properties.clone(),
4245 graph_ctx: self.graph_ctx.clone(),
4246 is_optional: self.is_optional,
4247 bound_target_column: self.bound_target_column.clone(),
4248 edge_lance_filter: self.edge_lance_filter.clone(),
4249 edge_property_conditions: self.edge_property_conditions.clone(),
4250 used_edge_columns: self.used_edge_columns.clone(),
4251 path_mode: self.path_mode.clone(),
4252 output_mode: self.output_mode.clone(),
4253 schema: self.schema.clone(),
4254 state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4255 metrics,
4256 }))
4257 }
4258
4259 fn metrics(&self) -> Option<MetricsSet> {
4260 Some(self.metrics.clone_inner())
4261 }
4262}
4263
/// States of the `GraphVariableLengthTraverseMainStream` polling state machine.
enum VarLengthMainStreamState {
    /// The edge adjacency map is still being built by the boxed future.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// The adjacency map is ready; input batches are pulled and expanded.
    Processing(EdgeAdjacencyMap),
    /// An expanded batch is waiting for its target properties to be hydrated.
    Materializing {
        /// Adjacency map kept here so it can be handed back to `Processing`.
        adjacency: EdgeAdjacencyMap,
        /// Future resolving to the hydrated batch.
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Terminal state: the stream yields `None` from here on.
    Done,
}
4278
/// Stream produced by `GraphVariableLengthTraverseMainExec::execute`.
///
/// It carries a copy of the operator's configuration, the child stream and the
/// current polling state. Several fields are not read yet, hence the
/// `dead_code` expectation below.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    /// Child stream supplying the batches to expand.
    input: SendableRecordBatchStream,
    /// Name of the input column holding the source vertex IDs.
    source_column: String,
    /// Edge type names; joined with `|` to label emitted edge structs.
    type_names: Vec<String>,
    /// Traversal direction; `Direction::Both` enables per-vertex edge de-duplication.
    direction: Direction,
    /// Minimum hop count (inclusive) for a result to be emitted.
    min_hops: usize,
    /// Maximum hop count (inclusive); vertices at this depth are not expanded.
    max_hops: usize,
    /// Prefix of the `{var}._vid`, `{var}._labels` and `{var}.{property}` columns.
    target_variable: String,
    /// When set, an edge-list column is emitted.
    step_variable: Option<String>,
    /// When set, a path struct column with this name is emitted or extended.
    path_variable: Option<String>,
    /// Target properties hydrated after the traversal; empty skips hydration.
    target_properties: Vec<String>,
    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,
    /// When true, unmatched input rows produce one null-extended output row.
    is_optional: bool,
    /// Optional input column with the vertex ID the traversal must end at.
    bound_target_column: Option<String>,
    /// Carried over from the operator; not read by the visible stream code.
    edge_lance_filter: Option<String>,
    /// `(property, expected value)` pairs every traversed edge must match.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns holding edge IDs excluded from the traversal of that row.
    used_edge_columns: Vec<String>,
    /// Carried over from the operator; not read by the visible stream code.
    path_mode: super::nfa::PathMode,
    /// Carried over from the operator; not read by the visible stream code.
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema of every emitted batch.
    schema: SchemaRef,
    /// Current state of the polling state machine.
    state: VarLengthMainStreamState,
    /// Baseline metrics (output rows, elapsed compute) for this partition.
    metrics: BaselineMetrics,
}
4306
/// One traversal result: `(reached vertex, hop count, vertex path, edge path)`.
/// The vertex path starts at the source and has one more entry than the edge path.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4309
4310impl GraphVariableLengthTraverseMainStream {
4311 fn bfs(
4317 &self,
4318 source: Vid,
4319 adjacency: &EdgeAdjacencyMap,
4320 used_eids: &FxHashSet<u64>,
4321 ) -> Vec<MainBfsResult> {
4322 let mut results = Vec::new();
4323 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4324
4325 queue.push_back((source, 0, vec![source], vec![]));
4326
4327 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4328 if depth >= self.min_hops && depth <= self.max_hops {
4330 results.push((current, depth, node_path.clone(), edge_path.clone()));
4331 }
4332
4333 if depth >= self.max_hops {
4335 continue;
4336 }
4337
4338 if let Some(neighbors) = adjacency.get(¤t) {
4340 let is_undirected = matches!(self.direction, Direction::Both);
4341 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4342
4343 for (neighbor, eid, _edge_type, props) in neighbors {
4344 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4346 continue;
4347 }
4348
4349 if edge_path.contains(eid) {
4351 continue;
4352 }
4353
4354 if used_eids.contains(&eid.as_u64()) {
4357 continue;
4358 }
4359
4360 if !self.edge_property_conditions.is_empty() {
4362 let passes =
4363 self.edge_property_conditions
4364 .iter()
4365 .all(|(name, expected)| {
4366 props.get(name).is_some_and(|actual| actual == expected)
4367 });
4368 if !passes {
4369 continue;
4370 }
4371 }
4372
4373 let mut new_node_path = node_path.clone();
4374 new_node_path.push(*neighbor);
4375 let mut new_edge_path = edge_path.clone();
4376 new_edge_path.push(*eid);
4377 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4378 }
4379 }
4380 }
4381
4382 results
4383 }
4384
4385 fn process_batch(
4387 &self,
4388 batch: RecordBatch,
4389 adjacency: &EdgeAdjacencyMap,
4390 ) -> DFResult<RecordBatch> {
4391 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4392 datafusion::error::DataFusionError::Execution(format!(
4393 "Source column '{}' not found in input batch",
4394 self.source_column
4395 ))
4396 })?;
4397
4398 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4399 let source_vids: &UInt64Array = &source_vid_cow;
4400
4401 let bound_target_cow = self
4403 .bound_target_column
4404 .as_ref()
4405 .and_then(|col| batch.column_by_name(col))
4406 .map(|c| column_as_vid_array(c.as_ref()))
4407 .transpose()?;
4408 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4409
4410 let used_edge_arrays: Vec<&UInt64Array> = self
4412 .used_edge_columns
4413 .iter()
4414 .filter_map(|col| {
4415 batch
4416 .column_by_name(col)?
4417 .as_any()
4418 .downcast_ref::<UInt64Array>()
4419 })
4420 .collect();
4421
4422 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4424
4425 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4426 let mut emitted_for_row = false;
4427
4428 if let Some(source_u64) = source_opt {
4429 let source = Vid::from(source_u64);
4430
4431 let used_eids: FxHashSet<u64> = used_edge_arrays
4433 .iter()
4434 .filter_map(|arr| {
4435 if arr.is_null(row_idx) {
4436 None
4437 } else {
4438 Some(arr.value(row_idx))
4439 }
4440 })
4441 .collect();
4442
4443 let bfs_results = self.bfs(source, adjacency, &used_eids);
4444
4445 for (target, hops, node_path, edge_path) in bfs_results {
4446 if let Some(targets) = expected_targets {
4449 if targets.is_null(row_idx) {
4450 continue;
4451 }
4452 let expected_vid = targets.value(row_idx);
4453 if target.as_u64() != expected_vid {
4454 continue;
4455 }
4456 }
4457
4458 expansions.push((row_idx, target, hops, node_path, edge_path));
4459 emitted_for_row = true;
4460 }
4461 }
4462
4463 if self.is_optional && !emitted_for_row {
4464 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4466 }
4467 }
4468
4469 if expansions.is_empty() {
4470 if self.is_optional {
4471 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4472 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4473 }
4474 return Ok(RecordBatch::new_empty(self.schema.clone()));
4475 }
4476
4477 let num_rows = expansions.len();
4478 self.metrics.record_output(num_rows);
4479
4480 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4482
4483 let edge_type_ids: Vec<u32> = {
4487 let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4488 self.type_names
4489 .iter()
4490 .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4491 .collect()
4492 };
4493
4494 for col_idx in 0..batch.num_columns() {
4496 let array = batch.column(col_idx);
4497 let indices: Vec<u64> = expansions
4498 .iter()
4499 .map(|(idx, _, _, _, _)| *idx as u64)
4500 .collect();
4501 let take_indices = UInt64Array::from(indices);
4502 let expanded = arrow::compute::take(array, &take_indices, None)?;
4503 columns.push(expanded);
4504 }
4505
4506 let target_vid_name = format!("{}._vid", self.target_variable);
4508 if batch.schema().column_with_name(&target_vid_name).is_none() {
4509 let target_vids: Vec<Option<u64>> = expansions
4510 .iter()
4511 .map(|(_, vid, _, node_path, edge_path)| {
4512 if node_path.is_empty() && edge_path.is_empty() {
4513 None
4514 } else {
4515 Some(vid.as_u64())
4516 }
4517 })
4518 .collect();
4519 columns.push(Arc::new(UInt64Array::from(target_vids)));
4520 }
4521
4522 let target_labels_name = format!("{}._labels", self.target_variable);
4524 if batch
4525 .schema()
4526 .column_with_name(&target_labels_name)
4527 .is_none()
4528 {
4529 use arrow_array::builder::{ListBuilder, StringBuilder};
4530 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4531 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4532 if node_path.is_empty() && edge_path.is_empty() {
4533 labels_builder.append(false);
4534 continue;
4535 }
4536 let mut row_labels: Vec<String> = Vec::new();
4537 let labels =
4538 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4539 for lbl in &labels {
4540 if !row_labels.contains(lbl) {
4541 row_labels.push(lbl.clone());
4542 }
4543 }
4544 let values = labels_builder.values();
4545 for lbl in &row_labels {
4546 values.append_value(lbl);
4547 }
4548 labels_builder.append(true);
4549 }
4550 columns.push(Arc::new(labels_builder.finish()));
4551 }
4552
4553 let hop_counts: Vec<u64> = expansions
4555 .iter()
4556 .map(|(_, _, hops, _, _)| *hops as u64)
4557 .collect();
4558 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4559
4560 if self.step_variable.is_some() {
4562 let mut edges_builder = new_edge_list_builder();
4563 let query_ctx = self.graph_ctx.query_context();
4564 let type_names_str = self.type_names.join("|");
4565
4566 for (_, _, _, node_path, edge_path) in expansions.iter() {
4567 if node_path.is_empty() && edge_path.is_empty() {
4568 edges_builder.append_null();
4569 } else if edge_path.is_empty() {
4570 edges_builder.append(true);
4572 } else {
4573 for (i, eid) in edge_path.iter().enumerate() {
4574 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4579 *eid,
4580 node_path[i],
4581 node_path[i + 1],
4582 &edge_type_ids,
4583 );
4584 append_edge_to_struct(
4585 edges_builder.values(),
4586 *eid,
4587 &type_names_str,
4588 src,
4589 dst,
4590 &query_ctx,
4591 );
4592 }
4593 edges_builder.append(true);
4594 }
4595 }
4596
4597 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4598 }
4599
4600 if let Some(path_var_name) = &self.path_variable {
4604 let existing_path_col_idx = batch
4605 .schema()
4606 .column_with_name(path_var_name)
4607 .map(|(idx, _)| idx);
4608 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4609 let existing_path = existing_path_arc
4610 .as_ref()
4611 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4612
4613 let mut nodes_builder = new_node_list_builder();
4614 let mut rels_builder = new_edge_list_builder();
4615 let query_ctx = self.graph_ctx.query_context();
4616 let type_names_str = self.type_names.join("|");
4617 let mut path_validity = Vec::with_capacity(expansions.len());
4618
4619 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4620 if node_path.is_empty() && edge_path.is_empty() {
4621 nodes_builder.append(false);
4622 rels_builder.append(false);
4623 path_validity.push(false);
4624 continue;
4625 }
4626
4627 let skip_first_vlp_node = if let Some(existing) = existing_path {
4629 if !existing.is_null(row_out_idx) {
4630 prepend_existing_path(
4631 existing,
4632 row_out_idx,
4633 &mut nodes_builder,
4634 &mut rels_builder,
4635 &query_ctx,
4636 );
4637 true
4638 } else {
4639 false
4640 }
4641 } else {
4642 false
4643 };
4644
4645 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4647 for vid in &node_path[start_idx..] {
4648 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4649 }
4650 nodes_builder.append(true);
4651
4652 for (i, eid) in edge_path.iter().enumerate() {
4653 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4658 *eid,
4659 node_path[i],
4660 node_path[i + 1],
4661 &edge_type_ids,
4662 );
4663 append_edge_to_struct(
4664 rels_builder.values(),
4665 *eid,
4666 &type_names_str,
4667 src,
4668 dst,
4669 &query_ctx,
4670 );
4671 }
4672 rels_builder.append(true);
4673 path_validity.push(true);
4674 }
4675
4676 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4678 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4679
4680 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4682 let rels_field = Arc::new(Field::new(
4683 "relationships",
4684 rels_array.data_type().clone(),
4685 true,
4686 ));
4687
4688 let path_struct = arrow_array::StructArray::try_new(
4690 vec![nodes_field, rels_field].into(),
4691 vec![nodes_array, rels_array],
4692 Some(arrow::buffer::NullBuffer::from(path_validity)),
4693 )
4694 .map_err(arrow_err)?;
4695
4696 if let Some(idx) = existing_path_col_idx {
4697 columns[idx] = Arc::new(path_struct);
4698 } else {
4699 columns.push(Arc::new(path_struct));
4700 }
4701 }
4702
4703 for prop_name in &self.target_properties {
4706 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4707 if batch.schema().column_with_name(&full_prop_name).is_none() {
4708 columns.push(arrow_array::new_null_array(
4709 &DataType::LargeBinary,
4710 num_rows,
4711 ));
4712 }
4713 }
4714
4715 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4716 }
4717}
4718
4719impl Stream for GraphVariableLengthTraverseMainStream {
4720 type Item = DFResult<RecordBatch>;
4721
4722 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4723 let metrics = self.metrics.clone();
4724 let _timer = metrics.elapsed_compute().timer();
4725 loop {
4726 let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4727
4728 match state {
4729 VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4730 Poll::Ready(Ok(adjacency)) => {
4731 self.state = VarLengthMainStreamState::Processing(adjacency);
4732 }
4734 Poll::Ready(Err(e)) => {
4735 self.state = VarLengthMainStreamState::Done;
4736 return Poll::Ready(Some(Err(e)));
4737 }
4738 Poll::Pending => {
4739 self.state = VarLengthMainStreamState::Loading(fut);
4740 return Poll::Pending;
4741 }
4742 },
4743 VarLengthMainStreamState::Processing(adjacency) => {
4744 match self.input.poll_next_unpin(cx) {
4745 Poll::Ready(Some(Ok(batch))) => {
4746 let base_batch = match self.process_batch(batch, &adjacency) {
4747 Ok(b) => b,
4748 Err(e) => {
4749 self.state = VarLengthMainStreamState::Processing(adjacency);
4750 return Poll::Ready(Some(Err(e)));
4751 }
4752 };
4753
4754 if self.target_properties.is_empty() {
4756 self.state = VarLengthMainStreamState::Processing(adjacency);
4757 return Poll::Ready(Some(Ok(base_batch)));
4758 }
4759
4760 let schema = self.schema.clone();
4762 let target_variable = self.target_variable.clone();
4763 let target_properties = self.target_properties.clone();
4764 let graph_ctx = self.graph_ctx.clone();
4765
4766 let fut = hydrate_vlp_target_properties(
4767 base_batch,
4768 schema,
4769 target_variable,
4770 target_properties,
4771 None, graph_ctx,
4773 );
4774
4775 self.state = VarLengthMainStreamState::Materializing {
4776 adjacency,
4777 fut: Box::pin(fut),
4778 };
4779 }
4781 Poll::Ready(Some(Err(e))) => {
4782 self.state = VarLengthMainStreamState::Done;
4783 return Poll::Ready(Some(Err(e)));
4784 }
4785 Poll::Ready(None) => {
4786 self.state = VarLengthMainStreamState::Done;
4787 return Poll::Ready(None);
4788 }
4789 Poll::Pending => {
4790 self.state = VarLengthMainStreamState::Processing(adjacency);
4791 return Poll::Pending;
4792 }
4793 }
4794 }
4795 VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4796 match fut.as_mut().poll(cx) {
4797 Poll::Ready(Ok(batch)) => {
4798 self.state = VarLengthMainStreamState::Processing(adjacency);
4799 return Poll::Ready(Some(Ok(batch)));
4800 }
4801 Poll::Ready(Err(e)) => {
4802 self.state = VarLengthMainStreamState::Done;
4803 return Poll::Ready(Some(Err(e)));
4804 }
4805 Poll::Pending => {
4806 self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4807 return Poll::Pending;
4808 }
4809 }
4810 }
4811 VarLengthMainStreamState::Done => {
4812 return Poll::Ready(None);
4813 }
4814 }
4815 }
4816 }
4817}
4818
4819impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4820 fn schema(&self) -> SchemaRef {
4821 self.schema.clone()
4822 }
4823}
4824
#[cfg(test)]
mod tests {
    use super::*;

    /// Input schema shared by all tests: a single non-nullable `a._vid` column.
    fn source_only_schema() -> SchemaRef {
        let source_vid = Field::new("a._vid", DataType::UInt64, false);
        Arc::new(Schema::new(vec![source_vid]))
    }

    /// Column names of `schema`, in field order.
    fn column_names(schema: &Schema) -> Vec<&str> {
        schema
            .fields()
            .iter()
            .map(|field| field.name().as_str())
            .collect()
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let input_schema = source_only_schema();

        let output_schema =
            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);

        // Without an edge variable an internal edge-id column is emitted.
        assert_eq!(
            column_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "__eid_to_m"]
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let input_schema = source_only_schema();

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_eq!(
            column_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"]
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let input_schema = source_only_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        // Target properties come before the edge columns for this operator.
        assert_eq!(
            column_names(&output_schema),
            [
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type"
            ]
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let input_schema = source_only_schema();

        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_eq!(
            column_names(&output_schema),
            ["a._vid", "b._vid", "b._labels", "_hop_count", "p"]
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = source_only_schema();

        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        assert_eq!(
            column_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "__eid_to_m"]
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = source_only_schema();

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_eq!(
            column_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"]
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = source_only_schema();
        let edge_props = vec!["weight".to_string(), "since".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_eq!(
            column_names(&output_schema),
            [
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since"
            ]
        );
        // Edge property columns are encoded as `LargeBinary`.
        for idx in [5, 6] {
            assert_eq!(output_schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = source_only_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        // Here the target properties follow the edge columns.
        assert_eq!(
            column_names(&output_schema),
            [
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age"
            ]
        );
        // Target property columns are encoded as `LargeBinary`.
        for idx in [5, 6] {
            assert_eq!(output_schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }
}