1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
62type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
64
65type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68fn prepend_existing_path(
75 existing_path: &arrow_array::StructArray,
76 row_idx: usize,
77 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79 query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81 let nodes_list = existing_path
83 .column(0)
84 .as_any()
85 .downcast_ref::<arrow_array::ListArray>()
86 .unwrap();
87 let node_values = nodes_list.value(row_idx);
88 let node_struct = node_values
89 .as_any()
90 .downcast_ref::<arrow_array::StructArray>()
91 .unwrap();
92 let vid_col = node_struct
93 .column(0)
94 .as_any()
95 .downcast_ref::<UInt64Array>()
96 .unwrap();
97 for i in 0..vid_col.len() {
98 append_node_to_struct(
99 nodes_builder.values(),
100 Vid::from(vid_col.value(i)),
101 query_ctx,
102 );
103 }
104
105 let rels_list = existing_path
107 .column(1)
108 .as_any()
109 .downcast_ref::<arrow_array::ListArray>()
110 .unwrap();
111 let edge_values = rels_list.value(row_idx);
112 let edge_struct = edge_values
113 .as_any()
114 .downcast_ref::<arrow_array::StructArray>()
115 .unwrap();
116 let eid_col = edge_struct
117 .column(0)
118 .as_any()
119 .downcast_ref::<UInt64Array>()
120 .unwrap();
121 let type_col = edge_struct
122 .column(1)
123 .as_any()
124 .downcast_ref::<arrow_array::StringArray>()
125 .unwrap();
126 let src_col = edge_struct
127 .column(2)
128 .as_any()
129 .downcast_ref::<UInt64Array>()
130 .unwrap();
131 let dst_col = edge_struct
132 .column(3)
133 .as_any()
134 .downcast_ref::<UInt64Array>()
135 .unwrap();
136 for i in 0..eid_col.len() {
137 append_edge_to_struct(
138 rels_builder.values(),
139 Eid::from(eid_col.value(i)),
140 type_col.value(i),
141 src_col.value(i),
142 dst_col.value(i),
143 query_ctx,
144 );
145 }
146}
147
148fn resolve_edge_property_type(
153 prop: &str,
154 schema_props: Option<
155 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156 >,
157) -> DataType {
158 if prop == "overflow_json" {
159 DataType::LargeBinary
160 } else if prop == "_created_at" || prop == "_updated_at" {
161 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
165 } else {
166 schema_props
167 .and_then(|props| props.get(prop))
168 .map(|meta| meta.r#type.to_arrow())
169 .unwrap_or(DataType::LargeBinary)
170 }
171}
172
173use crate::query::df_graph::common::merged_edge_schema_props;
174
175type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
177
178pub struct GraphTraverseExec {
203 input: Arc<dyn ExecutionPlan>,
205
206 source_column: String,
208
209 edge_type_ids: Vec<u32>,
211
212 direction: Direction,
214
215 target_variable: String,
217
218 edge_variable: Option<String>,
220
221 edge_properties: Vec<String>,
223
224 target_properties: Vec<String>,
226
227 target_label_name: Option<String>,
229
230 target_label_id: Option<u16>,
232
233 graph_ctx: Arc<GraphExecutionContext>,
235
236 optional: bool,
238
239 optional_pattern_vars: HashSet<String>,
242
243 bound_target_column: Option<String>,
246
247 used_edge_columns: Vec<String>,
250
251 schema: SchemaRef,
253
254 properties: Arc<PlanProperties>,
256
257 metrics: ExecutionPlanMetricsSet,
259}
260
261impl fmt::Debug for GraphTraverseExec {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.debug_struct("GraphTraverseExec")
264 .field("source_column", &self.source_column)
265 .field("edge_type_ids", &self.edge_type_ids)
266 .field("direction", &self.direction)
267 .field("target_variable", &self.target_variable)
268 .field("edge_variable", &self.edge_variable)
269 .finish()
270 }
271}
272
273impl GraphTraverseExec {
274 #[expect(clippy::too_many_arguments)]
290 pub fn new(
291 input: Arc<dyn ExecutionPlan>,
292 source_column: impl Into<String>,
293 edge_type_ids: Vec<u32>,
294 direction: Direction,
295 target_variable: impl Into<String>,
296 edge_variable: Option<String>,
297 edge_properties: Vec<String>,
298 target_properties: Vec<String>,
299 target_label_name: Option<String>,
300 target_label_id: Option<u16>,
301 graph_ctx: Arc<GraphExecutionContext>,
302 optional: bool,
303 optional_pattern_vars: HashSet<String>,
304 bound_target_column: Option<String>,
305 used_edge_columns: Vec<String>,
306 ) -> Self {
307 let source_column = source_column.into();
308 let target_variable = target_variable.into();
309
310 let uni_schema = graph_ctx.storage().schema_manager().schema();
312 let label_props = target_label_name
313 .as_deref()
314 .and_then(|ln| uni_schema.properties.get(ln));
315 let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
316 let edge_props = if merged_edge_props.is_empty() {
317 None
318 } else {
319 Some(&merged_edge_props)
320 };
321
322 let schema = Self::build_schema(
324 input.schema(),
325 &target_variable,
326 edge_variable.as_deref(),
327 &edge_properties,
328 &target_properties,
329 label_props,
330 edge_props,
331 optional,
332 );
333
334 let properties = compute_plan_properties(schema.clone());
335
336 Self {
337 input,
338 source_column,
339 edge_type_ids,
340 direction,
341 target_variable,
342 edge_variable,
343 edge_properties,
344 target_properties,
345 target_label_name,
346 target_label_id,
347 graph_ctx,
348 optional,
349 optional_pattern_vars,
350 bound_target_column,
351 used_edge_columns,
352 schema,
353 properties,
354 metrics: ExecutionPlanMetricsSet::new(),
355 }
356 }
357
358 #[expect(
360 clippy::too_many_arguments,
361 reason = "Schema construction needs all field metadata"
362 )]
363 fn build_schema(
364 input_schema: SchemaRef,
365 target_variable: &str,
366 edge_variable: Option<&str>,
367 edge_properties: &[String],
368 target_properties: &[String],
369 label_props: Option<
370 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
371 >,
372 edge_props: Option<
373 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
374 >,
375 optional: bool,
376 ) -> SchemaRef {
377 let mut fields: Vec<Field> = input_schema
378 .fields()
379 .iter()
380 .map(|f| f.as_ref().clone())
381 .collect();
382
383 let target_vid_name = format!("{}._vid", target_variable);
385 fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
386
387 fields.push(Field::new(
389 format!("{}._labels", target_variable),
390 labels_data_type(),
391 true,
392 ));
393
394 for prop_name in target_properties {
396 let col_name = format!("{}.{}", target_variable, prop_name);
397 let arrow_type = resolve_property_type(prop_name, label_props);
398 fields.push(Field::new(&col_name, arrow_type, true));
399 }
400
401 if let Some(edge_var) = edge_variable {
403 let edge_id_name = format!("{}._eid", edge_var);
404 fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
405
406 fields.push(Field::new(
408 format!("{}._type", edge_var),
409 DataType::Utf8,
410 true,
411 ));
412
413 for prop_name in edge_properties {
415 let prop_col_name = format!("{}.{}", edge_var, prop_name);
416 let arrow_type = resolve_edge_property_type(prop_name, edge_props);
417 fields.push(Field::new(&prop_col_name, arrow_type, true));
418 }
419 } else {
420 let internal_eid_name = format!("__eid_to_{}", target_variable);
423 fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
424 }
425
426 Arc::new(Schema::new(fields))
427 }
428}
429
430impl DisplayAs for GraphTraverseExec {
431 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432 write!(
433 f,
434 "GraphTraverseExec: {} --[{:?}]--> {}",
435 self.source_column, self.edge_type_ids, self.target_variable
436 )?;
437 if let Some(ref edge_var) = self.edge_variable {
438 write!(f, " as {}", edge_var)?;
439 }
440 Ok(())
441 }
442}
443
444impl ExecutionPlan for GraphTraverseExec {
445 fn name(&self) -> &str {
446 "GraphTraverseExec"
447 }
448
449 fn as_any(&self) -> &dyn Any {
450 self
451 }
452
453 fn schema(&self) -> SchemaRef {
454 self.schema.clone()
455 }
456
457 fn properties(&self) -> &Arc<PlanProperties> {
458 &self.properties
459 }
460
461 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
462 vec![&self.input]
463 }
464
465 fn with_new_children(
466 self: Arc<Self>,
467 children: Vec<Arc<dyn ExecutionPlan>>,
468 ) -> DFResult<Arc<dyn ExecutionPlan>> {
469 if children.len() != 1 {
470 return Err(datafusion::error::DataFusionError::Plan(
471 "GraphTraverseExec requires exactly one child".to_string(),
472 ));
473 }
474
475 Ok(Arc::new(Self::new(
476 children[0].clone(),
477 self.source_column.clone(),
478 self.edge_type_ids.clone(),
479 self.direction,
480 self.target_variable.clone(),
481 self.edge_variable.clone(),
482 self.edge_properties.clone(),
483 self.target_properties.clone(),
484 self.target_label_name.clone(),
485 self.target_label_id,
486 self.graph_ctx.clone(),
487 self.optional,
488 self.optional_pattern_vars.clone(),
489 self.bound_target_column.clone(),
490 self.used_edge_columns.clone(),
491 )))
492 }
493
494 fn execute(
495 &self,
496 partition: usize,
497 context: Arc<TaskContext>,
498 ) -> DFResult<SendableRecordBatchStream> {
499 let input_stream = self.input.execute(partition, context)?;
500
501 let metrics = BaselineMetrics::new(&self.metrics, partition);
502
503 let warm_fut = self
504 .graph_ctx
505 .warming_future(self.edge_type_ids.clone(), self.direction);
506
507 Ok(Box::pin(GraphTraverseStream {
508 input: input_stream,
509 source_column: self.source_column.clone(),
510 edge_type_ids: self.edge_type_ids.clone(),
511 direction: self.direction,
512 target_variable: self.target_variable.clone(),
513 edge_variable: self.edge_variable.clone(),
514 edge_properties: self.edge_properties.clone(),
515 target_properties: self.target_properties.clone(),
516 target_label_name: self.target_label_name.clone(),
517 graph_ctx: self.graph_ctx.clone(),
518 optional: self.optional,
519 optional_pattern_vars: self.optional_pattern_vars.clone(),
520 bound_target_column: self.bound_target_column.clone(),
521 used_edge_columns: self.used_edge_columns.clone(),
522 schema: self.schema.clone(),
523 state: TraverseStreamState::Warming(warm_fut),
524 metrics,
525 }))
526 }
527
528 fn metrics(&self) -> Option<MetricsSet> {
529 Some(self.metrics.clone_inner())
530 }
531}
532
533enum TraverseStreamState {
535 Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
537 Reading,
539 Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
541 Done,
543}
544
545struct GraphTraverseStream {
547 input: SendableRecordBatchStream,
549
550 source_column: String,
552
553 edge_type_ids: Vec<u32>,
555
556 direction: Direction,
558
559 #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
561 target_variable: String,
562
563 edge_variable: Option<String>,
565
566 edge_properties: Vec<String>,
568
569 target_properties: Vec<String>,
571
572 target_label_name: Option<String>,
574
575 graph_ctx: Arc<GraphExecutionContext>,
577
578 optional: bool,
580
581 optional_pattern_vars: HashSet<String>,
583
584 bound_target_column: Option<String>,
586
587 used_edge_columns: Vec<String>,
589
590 schema: SchemaRef,
592
593 state: TraverseStreamState,
595
596 metrics: BaselineMetrics,
598}
599
600impl GraphTraverseStream {
601 fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
604 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
605 datafusion::error::DataFusionError::Execution(format!(
606 "Source column '{}' not found",
607 self.source_column
608 ))
609 })?;
610
611 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
612 let source_vids: &UInt64Array = &source_vid_cow;
613
614 let bound_target_cow = self
617 .bound_target_column
618 .as_ref()
619 .and_then(|col| batch.column_by_name(col))
620 .map(|c| column_as_vid_array(c.as_ref()))
621 .transpose()?;
622 let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
623
624 let used_edge_arrays: Vec<&UInt64Array> = self
626 .used_edge_columns
627 .iter()
628 .filter_map(|col| {
629 batch
630 .column_by_name(col)
631 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
632 })
633 .collect();
634
635 let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
636 let is_undirected = matches!(self.direction, Direction::Both);
637
638 for (row_idx, source_vid) in source_vids.iter().enumerate() {
639 let Some(src) = source_vid else {
640 continue;
641 };
642
643 let expected_target = bound_target_vids.map(|arr| {
649 if arr.is_null(row_idx) {
650 None
651 } else {
652 Some(arr.value(row_idx))
653 }
654 });
655
656 let used_eids: HashSet<u64> = used_edge_arrays
658 .iter()
659 .filter_map(|arr| {
660 if arr.is_null(row_idx) {
661 None
662 } else {
663 Some(arr.value(row_idx))
664 }
665 })
666 .collect();
667
668 let vid = Vid::from(src);
669 let mut seen_edges: HashSet<u64> = HashSet::new();
672
673 for &edge_type in &self.edge_type_ids {
674 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
675
676 for (target_vid, eid) in neighbors {
677 let eid_u64 = eid.as_u64();
678
679 if used_eids.contains(&eid_u64) {
681 continue;
682 }
683
684 if is_undirected && !seen_edges.insert(eid_u64) {
686 continue;
687 }
688
689 if let Some(expected_opt) = expected_target {
692 let Some(expected) = expected_opt else {
693 continue;
694 };
695 if target_vid.as_u64() != expected {
696 continue;
697 }
698 }
699
700 if let Some(ref label_name) = self.target_label_name {
703 let query_ctx = self.graph_ctx.query_context();
704 if let Some(vertex_labels) =
705 l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
706 {
707 if !vertex_labels.contains(label_name) {
709 continue;
710 }
711 }
712 }
714
715 expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
716 }
717 }
718 }
719
720 Ok(expanded_rows)
721 }
722}
723
724fn build_target_labels_column(
726 target_vids: &[Vid],
727 target_label_name: &Option<String>,
728 graph_ctx: &GraphExecutionContext,
729) -> ArrayRef {
730 use arrow_array::builder::{ListBuilder, StringBuilder};
731 let mut labels_builder = ListBuilder::new(StringBuilder::new());
732 let query_ctx = graph_ctx.query_context();
733 for vid in target_vids {
734 let row_labels: Vec<String> =
735 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
736 Some(labels) => labels,
737 None => {
738 if let Some(label_name) = target_label_name {
740 vec![label_name.clone()]
741 } else {
742 vec![]
743 }
744 }
745 };
746 let values = labels_builder.values();
747 for lbl in &row_labels {
748 values.append_value(lbl);
749 }
750 labels_builder.append(true);
751 }
752 Arc::new(labels_builder.finish())
753}
754
755async fn build_target_property_columns(
757 target_vids: &[Vid],
758 target_properties: &[String],
759 target_label_name: &Option<String>,
760 graph_ctx: &Arc<GraphExecutionContext>,
761) -> DFResult<Vec<ArrayRef>> {
762 let mut columns = Vec::new();
763
764 if let Some(label_name) = target_label_name {
765 let property_manager = graph_ctx.property_manager();
766 let query_ctx = graph_ctx.query_context();
767
768 let props_map = property_manager
769 .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
770 .await
771 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
772
773 let uni_schema = graph_ctx.storage().schema_manager().schema();
774 let label_props = uni_schema.properties.get(label_name.as_str());
775
776 for prop_name in target_properties {
777 let data_type = resolve_property_type(prop_name, label_props);
778 let column =
779 build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
780 columns.push(column);
781 }
782 } else {
783 let non_internal_props: Vec<&str> = target_properties
785 .iter()
786 .filter(|p| *p != "_all_props")
787 .map(|s| s.as_str())
788 .collect();
789 let property_manager = graph_ctx.property_manager();
790 let query_ctx = graph_ctx.query_context();
791
792 let props_map = if !non_internal_props.is_empty() {
793 property_manager
794 .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
795 .await
796 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
797 } else {
798 std::collections::HashMap::new()
799 };
800
801 for prop_name in target_properties {
802 if prop_name == "_all_props" {
803 columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
804 } else {
805 let column = build_property_column_static(
806 target_vids,
807 &props_map,
808 prop_name,
809 &arrow::datatypes::DataType::LargeBinary,
810 )?;
811 columns.push(column);
812 }
813 }
814 }
815
816 Ok(columns)
817}
818
819fn build_all_props_column(
821 target_vids: &[Vid],
822 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
823 graph_ctx: &Arc<GraphExecutionContext>,
824) -> ArrayRef {
825 use arrow_array::builder::LargeBinaryBuilder;
826
827 let mut builder = LargeBinaryBuilder::new();
828 let l0_ctx = graph_ctx.l0_context();
829 for vid in target_vids {
830 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
832 if let Some(vid_props) = props_map.get(vid) {
833 for (k, v) in vid_props.iter() {
834 merged_props.insert(k.clone(), v.clone());
835 }
836 }
837 for l0 in l0_ctx.iter_l0_buffers() {
838 let guard = l0.read();
839 if let Some(l0_props) = guard.vertex_properties.get(vid) {
840 for (k, v) in l0_props.iter() {
841 merged_props.insert(k.clone(), v.clone());
842 }
843 }
844 }
845 if merged_props.is_empty() {
846 builder.append_null();
847 } else {
848 builder.append_value(uni_common::cypher_value_codec::encode(
849 &uni_common::Value::Map(merged_props),
850 ));
851 }
852 }
853 Arc::new(builder.finish())
854}
855
856async fn build_edge_columns(
858 expansions: &[(usize, Vid, u64, u32)],
859 edge_properties: &[String],
860 edge_type_ids: &[u32],
861 graph_ctx: &Arc<GraphExecutionContext>,
862) -> DFResult<Vec<ArrayRef>> {
863 let mut columns = Vec::new();
864
865 let eids: Vec<Eid> = expansions
866 .iter()
867 .map(|(_, _, eid, _)| Eid::from(*eid))
868 .collect();
869 let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
870 columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
871
872 {
874 let uni_schema = graph_ctx.storage().schema_manager().schema();
875 let mut type_builder = arrow_array::builder::StringBuilder::new();
876 for (_, _, _, edge_type_id) in expansions {
877 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
878 type_builder.append_value(&name);
879 } else {
880 type_builder.append_null();
881 }
882 }
883 columns.push(Arc::new(type_builder.finish()) as ArrayRef);
884 }
885
886 if !edge_properties.is_empty() {
887 let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
888 let property_manager = graph_ctx.property_manager();
889 let query_ctx = graph_ctx.query_context();
890
891 let props_map = property_manager
892 .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
893 .await
894 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
895
896 let uni_schema = graph_ctx.storage().schema_manager().schema();
897 let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
898 let edge_type_props = if merged_edge_props.is_empty() {
899 None
900 } else {
901 Some(&merged_edge_props)
902 };
903
904 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
905
906 for prop_name in edge_properties {
907 if prop_name == "_created_at" || prop_name == "_updated_at" {
913 let mut builder =
914 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
915 let l0_ctx = graph_ctx.l0_context();
916 for eid in &eids {
917 let mut value: Option<i64> = None;
918 for l0 in l0_ctx.iter_l0_buffers() {
919 let guard = l0.read();
920 let opt = if prop_name == "_created_at" {
921 guard.edge_created_at.get(eid).copied()
922 } else {
923 guard.edge_updated_at.get(eid).copied()
924 };
925 if let Some(ts) = opt {
926 value = Some(match value {
927 None => ts,
928 Some(cur) if prop_name == "_created_at" => cur.min(ts),
929 Some(cur) => cur.max(ts),
930 });
931 }
932 }
933 match value {
934 Some(ts) => builder.append_value(ts),
935 None => builder.append_null(),
936 }
937 }
938 columns.push(Arc::new(builder.finish()) as ArrayRef);
939 continue;
940 }
941 let data_type = resolve_edge_property_type(prop_name, edge_type_props);
942 let column =
943 build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
944 columns.push(column);
945 }
946 }
947
948 Ok(columns)
949}
950
951#[expect(
956 clippy::too_many_arguments,
957 reason = "Standalone async fn needs all context passed explicitly"
958)]
959async fn build_traverse_output_batch(
960 input: RecordBatch,
961 expansions: Vec<(usize, Vid, u64, u32)>,
962 schema: SchemaRef,
963 edge_variable: Option<String>,
964 edge_properties: Vec<String>,
965 edge_type_ids: Vec<u32>,
966 target_properties: Vec<String>,
967 target_label_name: Option<String>,
968 graph_ctx: Arc<GraphExecutionContext>,
969 optional: bool,
970 optional_pattern_vars: HashSet<String>,
971) -> DFResult<RecordBatch> {
972 if expansions.is_empty() {
973 if !optional {
974 return Ok(RecordBatch::new_empty(schema));
975 }
976 let unmatched_reps = collect_unmatched_optional_group_rows(
977 &input,
978 &HashSet::new(),
979 &schema,
980 &optional_pattern_vars,
981 )?;
982 if unmatched_reps.is_empty() {
983 return Ok(RecordBatch::new_empty(schema));
984 }
985 return build_optional_null_batch_for_rows_with_optional_vars(
986 &input,
987 &unmatched_reps,
988 &schema,
989 &optional_pattern_vars,
990 );
991 }
992
993 let indices: Vec<u64> = expansions
995 .iter()
996 .map(|(idx, _, _, _)| *idx as u64)
997 .collect();
998 let indices_array = UInt64Array::from(indices);
999 let mut columns: Vec<ArrayRef> = input
1000 .columns()
1001 .iter()
1002 .map(|col| take(col.as_ref(), &indices_array, None))
1003 .collect::<Result<_, _>>()?;
1004
1005 let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1007 let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1008 columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1009
1010 columns.push(build_target_labels_column(
1012 &target_vids,
1013 &target_label_name,
1014 &graph_ctx,
1015 ));
1016
1017 if !target_properties.is_empty() {
1019 let prop_cols = build_target_property_columns(
1020 &target_vids,
1021 &target_properties,
1022 &target_label_name,
1023 &graph_ctx,
1024 )
1025 .await?;
1026 columns.extend(prop_cols);
1027 }
1028
1029 if edge_variable.is_some() {
1031 let edge_cols =
1032 build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1033 columns.extend(edge_cols);
1034 } else {
1035 let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1036 columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1037 }
1038
1039 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1040
1041 if optional {
1043 let matched_indices: HashSet<usize> =
1044 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1045 let unmatched = collect_unmatched_optional_group_rows(
1046 &input,
1047 &matched_indices,
1048 &schema,
1049 &optional_pattern_vars,
1050 )?;
1051
1052 if !unmatched.is_empty() {
1053 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1054 &input,
1055 &unmatched,
1056 &schema,
1057 &optional_pattern_vars,
1058 )?;
1059 let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1060 .map_err(arrow_err)?;
1061 return Ok(combined);
1062 }
1063 }
1064
1065 Ok(expanded_batch)
1066}
1067
1068fn build_optional_null_batch_for_rows(
1071 input: &RecordBatch,
1072 unmatched_indices: &[usize],
1073 schema: &SchemaRef,
1074) -> DFResult<RecordBatch> {
1075 let num_rows = unmatched_indices.len();
1076 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1077 let indices_array = UInt64Array::from(indices);
1078
1079 let mut columns: Vec<ArrayRef> = Vec::new();
1081 for col in input.columns() {
1082 let taken = take(col.as_ref(), &indices_array, None)?;
1083 columns.push(taken);
1084 }
1085 for field in schema.fields().iter().skip(input.num_columns()) {
1087 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1088 }
1089 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1090}
1091
1092fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
1093 optional_vars.contains(col_name)
1094 || optional_vars.iter().any(|var| {
1095 (col_name.starts_with(var.as_str()) && col_name[var.len()..].starts_with('.'))
1096 || (col_name.starts_with("__eid_to_") && col_name.ends_with(var.as_str()))
1097 })
1098}
1099
1100fn collect_unmatched_optional_group_rows(
1101 input: &RecordBatch,
1102 matched_indices: &HashSet<usize>,
1103 schema: &SchemaRef,
1104 optional_vars: &HashSet<String>,
1105) -> DFResult<Vec<usize>> {
1106 if input.num_rows() == 0 {
1107 return Ok(Vec::new());
1108 }
1109
1110 if optional_vars.is_empty() {
1111 return Ok((0..input.num_rows())
1112 .filter(|idx| !matched_indices.contains(idx))
1113 .collect());
1114 }
1115
1116 let source_vid_indices: Vec<usize> = schema
1117 .fields()
1118 .iter()
1119 .enumerate()
1120 .filter_map(|(idx, field)| {
1121 if idx >= input.num_columns() {
1122 return None;
1123 }
1124 let name = field.name();
1125 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1126 Some(idx)
1127 } else {
1128 None
1129 }
1130 })
1131 .collect();
1132
1133 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1136
1137 for row_idx in 0..input.num_rows() {
1138 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1139 let entry = groups.entry(key.clone());
1140 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1141 group_order.push(key.clone());
1142 }
1143 let matched = matched_indices.contains(&row_idx);
1144 entry
1145 .and_modify(|(_, any_matched)| *any_matched |= matched)
1146 .or_insert((row_idx, matched));
1147 }
1148
1149 Ok(group_order
1150 .into_iter()
1151 .filter_map(|key| {
1152 groups
1153 .get(&key)
1154 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1155 })
1156 .collect())
1157}
1158
1159fn compute_optional_group_key(
1160 batch: &RecordBatch,
1161 row_idx: usize,
1162 source_vid_indices: &[usize],
1163) -> DFResult<Vec<u8>> {
1164 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1165 for &col_idx in source_vid_indices {
1166 let col = batch.column(col_idx);
1167 let vid_cow = column_as_vid_array(col.as_ref())?;
1168 let arr: &UInt64Array = &vid_cow;
1169 if arr.is_null(row_idx) {
1170 key.extend_from_slice(&u64::MAX.to_le_bytes());
1171 } else {
1172 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1173 }
1174 }
1175 Ok(key)
1176}
1177
1178fn build_optional_null_batch_for_rows_with_optional_vars(
1179 input: &RecordBatch,
1180 unmatched_indices: &[usize],
1181 schema: &SchemaRef,
1182 optional_vars: &HashSet<String>,
1183) -> DFResult<RecordBatch> {
1184 if optional_vars.is_empty() {
1185 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1186 }
1187
1188 let num_rows = unmatched_indices.len();
1189 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1190 let indices_array = UInt64Array::from(indices);
1191
1192 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1193 for (col_idx, field) in schema.fields().iter().enumerate() {
1194 if col_idx < input.num_columns() {
1195 if is_optional_column_for_vars(field.name(), optional_vars) {
1196 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1197 } else {
1198 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1199 columns.push(taken);
1200 }
1201 } else {
1202 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1203 }
1204 }
1205
1206 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1207}
1208
1209impl Stream for GraphTraverseStream {
1210 type Item = DFResult<RecordBatch>;
1211
1212 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1213 let metrics = self.metrics.clone();
1214 let _timer = metrics.elapsed_compute().timer();
1215 loop {
1216 let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);
1217
1218 match state {
1219 TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
1220 Poll::Ready(Ok(())) => {
1221 self.state = TraverseStreamState::Reading;
1222 }
1224 Poll::Ready(Err(e)) => {
1225 self.state = TraverseStreamState::Done;
1226 return Poll::Ready(Some(Err(e)));
1227 }
1228 Poll::Pending => {
1229 self.state = TraverseStreamState::Warming(fut);
1230 return Poll::Pending;
1231 }
1232 },
1233 TraverseStreamState::Reading => {
1234 if let Err(e) = self.graph_ctx.check_timeout() {
1236 return Poll::Ready(Some(Err(
1237 datafusion::error::DataFusionError::Execution(e.to_string()),
1238 )));
1239 }
1240
1241 match self.input.poll_next_unpin(cx) {
1242 Poll::Ready(Some(Ok(batch))) => {
1243 let expansions = match self.expand_neighbors(&batch) {
1245 Ok(exp) => exp,
1246 Err(e) => {
1247 self.state = TraverseStreamState::Reading;
1248 return Poll::Ready(Some(Err(e)));
1249 }
1250 };
1251
1252 if self.target_properties.is_empty() && self.edge_properties.is_empty()
1254 {
1255 let result = build_traverse_output_batch_sync(
1256 &batch,
1257 &expansions,
1258 &self.schema,
1259 self.edge_variable.as_ref(),
1260 &self.graph_ctx,
1261 self.optional,
1262 &self.optional_pattern_vars,
1263 );
1264 self.state = TraverseStreamState::Reading;
1265 if let Ok(ref r) = result {
1266 self.metrics.record_output(r.num_rows());
1267 }
1268 return Poll::Ready(Some(result));
1269 }
1270
1271 let schema = self.schema.clone();
1273 let edge_variable = self.edge_variable.clone();
1274 let edge_properties = self.edge_properties.clone();
1275 let edge_type_ids = self.edge_type_ids.clone();
1276 let target_properties = self.target_properties.clone();
1277 let target_label_name = self.target_label_name.clone();
1278 let graph_ctx = self.graph_ctx.clone();
1279
1280 let optional = self.optional;
1281 let optional_pattern_vars = self.optional_pattern_vars.clone();
1282
1283 let fut = build_traverse_output_batch(
1284 batch,
1285 expansions,
1286 schema,
1287 edge_variable,
1288 edge_properties,
1289 edge_type_ids,
1290 target_properties,
1291 target_label_name,
1292 graph_ctx,
1293 optional,
1294 optional_pattern_vars,
1295 );
1296
1297 self.state = TraverseStreamState::Materializing(Box::pin(fut));
1298 }
1300 Poll::Ready(Some(Err(e))) => {
1301 self.state = TraverseStreamState::Done;
1302 return Poll::Ready(Some(Err(e)));
1303 }
1304 Poll::Ready(None) => {
1305 self.state = TraverseStreamState::Done;
1306 return Poll::Ready(None);
1307 }
1308 Poll::Pending => {
1309 self.state = TraverseStreamState::Reading;
1310 return Poll::Pending;
1311 }
1312 }
1313 }
1314 TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
1315 Poll::Ready(Ok(batch)) => {
1316 self.state = TraverseStreamState::Reading;
1317 self.metrics.record_output(batch.num_rows());
1318 return Poll::Ready(Some(Ok(batch)));
1319 }
1320 Poll::Ready(Err(e)) => {
1321 self.state = TraverseStreamState::Done;
1322 return Poll::Ready(Some(Err(e)));
1323 }
1324 Poll::Pending => {
1325 self.state = TraverseStreamState::Materializing(fut);
1326 return Poll::Pending;
1327 }
1328 },
1329 TraverseStreamState::Done => {
1330 return Poll::Ready(None);
1331 }
1332 }
1333 }
1334 }
1335}
1336
1337fn build_traverse_output_batch_sync(
1342 input: &RecordBatch,
1343 expansions: &[(usize, Vid, u64, u32)],
1344 schema: &SchemaRef,
1345 edge_variable: Option<&String>,
1346 graph_ctx: &GraphExecutionContext,
1347 optional: bool,
1348 optional_pattern_vars: &HashSet<String>,
1349) -> DFResult<RecordBatch> {
1350 if expansions.is_empty() {
1351 if !optional {
1352 return Ok(RecordBatch::new_empty(schema.clone()));
1353 }
1354 let unmatched_reps = collect_unmatched_optional_group_rows(
1355 input,
1356 &HashSet::new(),
1357 schema,
1358 optional_pattern_vars,
1359 )?;
1360 if unmatched_reps.is_empty() {
1361 return Ok(RecordBatch::new_empty(schema.clone()));
1362 }
1363 return build_optional_null_batch_for_rows_with_optional_vars(
1364 input,
1365 &unmatched_reps,
1366 schema,
1367 optional_pattern_vars,
1368 );
1369 }
1370
1371 let indices: Vec<u64> = expansions
1372 .iter()
1373 .map(|(idx, _, _, _)| *idx as u64)
1374 .collect();
1375 let indices_array = UInt64Array::from(indices);
1376
1377 let mut columns: Vec<ArrayRef> = Vec::new();
1378 for col in input.columns() {
1379 let expanded = take(col.as_ref(), &indices_array, None)?;
1380 columns.push(expanded);
1381 }
1382
1383 let target_vids: Vec<u64> = expansions
1385 .iter()
1386 .map(|(_, vid, _, _)| vid.as_u64())
1387 .collect();
1388 columns.push(Arc::new(UInt64Array::from(target_vids)));
1389
1390 {
1392 use arrow_array::builder::{ListBuilder, StringBuilder};
1393 let l0_ctx = graph_ctx.l0_context();
1394 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1395 for (_, vid, _, _) in expansions {
1396 let mut row_labels: Vec<String> = Vec::new();
1397 for l0 in l0_ctx.iter_l0_buffers() {
1398 let guard = l0.read();
1399 if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1400 for lbl in l0_labels {
1401 if !row_labels.contains(lbl) {
1402 row_labels.push(lbl.clone());
1403 }
1404 }
1405 }
1406 }
1407 let values = labels_builder.values();
1408 for lbl in &row_labels {
1409 values.append_value(lbl);
1410 }
1411 labels_builder.append(true);
1412 }
1413 columns.push(Arc::new(labels_builder.finish()));
1414 }
1415
1416 if edge_variable.is_some() {
1418 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1419 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1420
1421 let uni_schema = graph_ctx.storage().schema_manager().schema();
1423 let mut type_builder = arrow_array::builder::StringBuilder::new();
1424 for (_, _, _, edge_type_id) in expansions {
1425 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1426 type_builder.append_value(&name);
1427 } else {
1428 type_builder.append_null();
1429 }
1430 }
1431 columns.push(Arc::new(type_builder.finish()));
1432 } else {
1433 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1435 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1436 }
1437
1438 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1439
1440 if optional {
1441 let matched_indices: HashSet<usize> =
1442 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1443 let unmatched = collect_unmatched_optional_group_rows(
1444 input,
1445 &matched_indices,
1446 schema,
1447 optional_pattern_vars,
1448 )?;
1449
1450 if !unmatched.is_empty() {
1451 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1452 input,
1453 &unmatched,
1454 schema,
1455 optional_pattern_vars,
1456 )?;
1457 let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1458 .map_err(|e| {
1459 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1460 })?;
1461 return Ok(combined);
1462 }
1463 }
1464
1465 Ok(expanded_batch)
1466}
1467
1468impl RecordBatchStream for GraphTraverseStream {
1469 fn schema(&self) -> SchemaRef {
1470 self.schema.clone()
1471 }
1472}
1473
1474type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1480
1481pub struct GraphTraverseMainExec {
1504 input: Arc<dyn ExecutionPlan>,
1506
1507 source_column: String,
1509
1510 type_names: Vec<String>,
1513
1514 direction: Direction,
1516
1517 target_variable: String,
1519
1520 edge_variable: Option<String>,
1522
1523 edge_properties: Vec<String>,
1525
1526 target_properties: Vec<String>,
1528
1529 graph_ctx: Arc<GraphExecutionContext>,
1531
1532 optional: bool,
1534
1535 optional_pattern_vars: HashSet<String>,
1537
1538 bound_target_column: Option<String>,
1541
1542 used_edge_columns: Vec<String>,
1545
1546 schema: SchemaRef,
1548
1549 properties: Arc<PlanProperties>,
1551
1552 metrics: ExecutionPlanMetricsSet,
1554}
1555
1556impl fmt::Debug for GraphTraverseMainExec {
1557 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1558 f.debug_struct("GraphTraverseMainExec")
1559 .field("type_names", &self.type_names)
1560 .field("direction", &self.direction)
1561 .field("target_variable", &self.target_variable)
1562 .field("edge_variable", &self.edge_variable)
1563 .finish()
1564 }
1565}
1566
1567impl GraphTraverseMainExec {
1568 #[expect(clippy::too_many_arguments)]
1570 pub fn new(
1571 input: Arc<dyn ExecutionPlan>,
1572 source_column: impl Into<String>,
1573 type_names: Vec<String>,
1574 direction: Direction,
1575 target_variable: impl Into<String>,
1576 edge_variable: Option<String>,
1577 edge_properties: Vec<String>,
1578 target_properties: Vec<String>,
1579 graph_ctx: Arc<GraphExecutionContext>,
1580 optional: bool,
1581 optional_pattern_vars: HashSet<String>,
1582 bound_target_column: Option<String>,
1583 used_edge_columns: Vec<String>,
1584 ) -> Self {
1585 let source_column = source_column.into();
1586 let target_variable = target_variable.into();
1587
1588 let schema = Self::build_schema(
1590 &input.schema(),
1591 &target_variable,
1592 &edge_variable,
1593 &edge_properties,
1594 &target_properties,
1595 optional,
1596 );
1597
1598 let properties = compute_plan_properties(schema.clone());
1599
1600 Self {
1601 input,
1602 source_column,
1603 type_names,
1604 direction,
1605 target_variable,
1606 edge_variable,
1607 edge_properties,
1608 target_properties,
1609 graph_ctx,
1610 optional,
1611 optional_pattern_vars,
1612 bound_target_column,
1613 used_edge_columns,
1614 schema,
1615 properties,
1616 metrics: ExecutionPlanMetricsSet::new(),
1617 }
1618 }
1619
1620 fn build_schema(
1622 input_schema: &SchemaRef,
1623 target_variable: &str,
1624 edge_variable: &Option<String>,
1625 edge_properties: &[String],
1626 target_properties: &[String],
1627 optional: bool,
1628 ) -> SchemaRef {
1629 let mut fields: Vec<Field> = input_schema
1630 .fields()
1631 .iter()
1632 .map(|f| f.as_ref().clone())
1633 .collect();
1634
1635 let target_vid_name = format!("{}._vid", target_variable);
1637 if input_schema.column_with_name(&target_vid_name).is_none() {
1638 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1639 }
1640
1641 let target_labels_name = format!("{}._labels", target_variable);
1643 if input_schema.column_with_name(&target_labels_name).is_none() {
1644 fields.push(Field::new(target_labels_name, labels_data_type(), true));
1645 }
1646
1647 if let Some(edge_var) = edge_variable {
1649 fields.push(Field::new(
1650 format!("{}._eid", edge_var),
1651 DataType::UInt64,
1652 optional,
1653 ));
1654
1655 fields.push(Field::new(
1657 format!("{}._type", edge_var),
1658 DataType::Utf8,
1659 true,
1660 ));
1661
1662 for prop in edge_properties {
1666 let col_name = format!("{}.{}", edge_var, prop);
1667 let mut metadata = std::collections::HashMap::new();
1668 metadata.insert("cv_encoded".to_string(), "true".to_string());
1669 fields.push(
1670 Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1671 );
1672 }
1673 } else {
1674 fields.push(Field::new(
1677 format!("__eid_to_{}", target_variable),
1678 DataType::UInt64,
1679 optional,
1680 ));
1681 }
1682
1683 for prop in target_properties {
1685 fields.push(Field::new(
1686 format!("{}.{}", target_variable, prop),
1687 DataType::LargeBinary,
1688 true,
1689 ));
1690 }
1691
1692 Arc::new(Schema::new(fields))
1693 }
1694}
1695
1696impl DisplayAs for GraphTraverseMainExec {
1697 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698 write!(
1699 f,
1700 "GraphTraverseMainExec: types={:?}, direction={:?}",
1701 self.type_names, self.direction
1702 )
1703 }
1704}
1705
1706impl ExecutionPlan for GraphTraverseMainExec {
1707 fn name(&self) -> &str {
1708 "GraphTraverseMainExec"
1709 }
1710
1711 fn as_any(&self) -> &dyn Any {
1712 self
1713 }
1714
1715 fn schema(&self) -> SchemaRef {
1716 self.schema.clone()
1717 }
1718
1719 fn properties(&self) -> &Arc<PlanProperties> {
1720 &self.properties
1721 }
1722
1723 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1724 vec![&self.input]
1725 }
1726
1727 fn with_new_children(
1728 self: Arc<Self>,
1729 children: Vec<Arc<dyn ExecutionPlan>>,
1730 ) -> DFResult<Arc<dyn ExecutionPlan>> {
1731 if children.len() != 1 {
1732 return Err(datafusion::error::DataFusionError::Plan(
1733 "GraphTraverseMainExec expects exactly one child".to_string(),
1734 ));
1735 }
1736
1737 Ok(Arc::new(Self {
1738 input: children[0].clone(),
1739 source_column: self.source_column.clone(),
1740 type_names: self.type_names.clone(),
1741 direction: self.direction,
1742 target_variable: self.target_variable.clone(),
1743 edge_variable: self.edge_variable.clone(),
1744 edge_properties: self.edge_properties.clone(),
1745 target_properties: self.target_properties.clone(),
1746 graph_ctx: self.graph_ctx.clone(),
1747 optional: self.optional,
1748 optional_pattern_vars: self.optional_pattern_vars.clone(),
1749 bound_target_column: self.bound_target_column.clone(),
1750 used_edge_columns: self.used_edge_columns.clone(),
1751 schema: self.schema.clone(),
1752 properties: self.properties.clone(),
1753 metrics: self.metrics.clone(),
1754 }))
1755 }
1756
1757 fn execute(
1758 &self,
1759 partition: usize,
1760 context: Arc<TaskContext>,
1761 ) -> DFResult<SendableRecordBatchStream> {
1762 let input_stream = self.input.execute(partition, context)?;
1763 let metrics = BaselineMetrics::new(&self.metrics, partition);
1764
1765 Ok(Box::pin(GraphTraverseMainStream::new(
1766 input_stream,
1767 self.source_column.clone(),
1768 self.type_names.clone(),
1769 self.direction,
1770 self.target_variable.clone(),
1771 self.edge_variable.clone(),
1772 self.edge_properties.clone(),
1773 self.target_properties.clone(),
1774 self.graph_ctx.clone(),
1775 self.optional,
1776 self.optional_pattern_vars.clone(),
1777 self.bound_target_column.clone(),
1778 self.used_edge_columns.clone(),
1779 self.schema.clone(),
1780 metrics,
1781 )))
1782 }
1783
1784 fn metrics(&self) -> Option<MetricsSet> {
1785 Some(self.metrics.clone_inner())
1786 }
1787}
1788
1789const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1_024;
1795
1796enum GraphTraverseMainState {
1804 CollectingInput {
1806 input_stream: SendableRecordBatchStream,
1807 buffered: Vec<RecordBatch>,
1808 },
1809 LoadingEdges {
1811 future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
1812 buffered: Vec<RecordBatch>,
1813 },
1814 Processing {
1816 adjacency: EdgeAdjacencyMap,
1817 buffered: std::vec::IntoIter<RecordBatch>,
1818 },
1819 Done,
1821}
1822
1823struct GraphTraverseMainStream {
1825 source_column: String,
1827
1828 target_variable: String,
1830
1831 edge_variable: Option<String>,
1833
1834 edge_properties: Vec<String>,
1836
1837 target_properties: Vec<String>,
1839
1840 graph_ctx: Arc<GraphExecutionContext>,
1842
1843 optional: bool,
1845
1846 optional_pattern_vars: HashSet<String>,
1848
1849 bound_target_column: Option<String>,
1851
1852 used_edge_columns: Vec<String>,
1854
1855 schema: SchemaRef,
1857
1858 type_names: Vec<String>,
1860
1861 direction: Direction,
1863
1864 state: GraphTraverseMainState,
1866
1867 metrics: BaselineMetrics,
1869}
1870
1871impl GraphTraverseMainStream {
1872 #[expect(clippy::too_many_arguments)]
1874 fn new(
1875 input_stream: SendableRecordBatchStream,
1876 source_column: String,
1877 type_names: Vec<String>,
1878 direction: Direction,
1879 target_variable: String,
1880 edge_variable: Option<String>,
1881 edge_properties: Vec<String>,
1882 target_properties: Vec<String>,
1883 graph_ctx: Arc<GraphExecutionContext>,
1884 optional: bool,
1885 optional_pattern_vars: HashSet<String>,
1886 bound_target_column: Option<String>,
1887 used_edge_columns: Vec<String>,
1888 schema: SchemaRef,
1889 metrics: BaselineMetrics,
1890 ) -> Self {
1891 Self {
1894 source_column,
1895 target_variable,
1896 edge_variable,
1897 edge_properties,
1898 target_properties,
1899 graph_ctx,
1900 optional,
1901 optional_pattern_vars,
1902 bound_target_column,
1903 used_edge_columns,
1904 schema,
1905 type_names,
1906 direction,
1907 state: GraphTraverseMainState::CollectingInput {
1908 input_stream,
1909 buffered: Vec::new(),
1910 },
1911 metrics,
1912 }
1913 }
1914
1915 fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
1920 let mut vids: HashSet<Vid> = HashSet::new();
1921 for batch in buffered {
1922 let col = batch.column_by_name(&self.source_column)?;
1923 let arr = column_as_vid_array(col.as_ref()).ok()?;
1924 for i in 0..arr.len() {
1925 if !arr.is_null(i) {
1926 vids.insert(Vid::from(arr.value(i)));
1927 if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
1928 return None;
1929 }
1930 }
1931 }
1932 }
1933 Some(vids)
1934 }
1935
1936 fn expand_batch(
1938 &self,
1939 input: &RecordBatch,
1940 adjacency: &EdgeAdjacencyMap,
1941 ) -> DFResult<RecordBatch> {
1942 let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1944 datafusion::error::DataFusionError::Execution(format!(
1945 "Source column {} not found",
1946 self.source_column
1947 ))
1948 })?;
1949
1950 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1951 let source_vids: &UInt64Array = &source_vid_cow;
1952
1953 let bound_target_cow = self
1955 .bound_target_column
1956 .as_ref()
1957 .and_then(|col| input.column_by_name(col))
1958 .map(|c| column_as_vid_array(c.as_ref()))
1959 .transpose()?;
1960 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1961
1962 let used_edge_arrays: Vec<&UInt64Array> = self
1964 .used_edge_columns
1965 .iter()
1966 .filter_map(|col| {
1967 input
1968 .column_by_name(col)
1969 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1970 })
1971 .collect();
1972
1973 type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
1976 let mut expansions: Vec<Expansion> = Vec::new();
1977
1978 for (row_idx, src_u64) in source_vids.iter().enumerate() {
1979 if let Some(src_u64) = src_u64 {
1980 let src_vid = Vid::from(src_u64);
1981
1982 let used_eids: HashSet<u64> = used_edge_arrays
1984 .iter()
1985 .filter_map(|arr| {
1986 if arr.is_null(row_idx) {
1987 None
1988 } else {
1989 Some(arr.value(row_idx))
1990 }
1991 })
1992 .collect();
1993
1994 if let Some(neighbors) = adjacency.get(&src_vid) {
1995 for (target_vid, eid, edge_type, props) in neighbors {
1996 if used_eids.contains(&eid.as_u64()) {
1998 continue;
1999 }
2000
2001 if let Some(targets) = expected_targets {
2004 if targets.is_null(row_idx) {
2005 continue;
2006 }
2007 let expected_vid = targets.value(row_idx);
2008 if target_vid.as_u64() != expected_vid {
2009 continue;
2010 }
2011 }
2012
2013 expansions.push((
2014 row_idx,
2015 *target_vid,
2016 *eid,
2017 Arc::clone(edge_type),
2018 Arc::clone(props),
2019 ));
2020 }
2021 }
2022 }
2023 }
2024
2025 if expansions.is_empty() && self.optional {
2027 let all_indices: Vec<usize> = (0..input.num_rows()).collect();
2029 return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
2030 }
2031
2032 if expansions.is_empty() {
2033 return Ok(RecordBatch::new_empty(self.schema.clone()));
2035 }
2036
2037 let matched_rows: HashSet<usize> = if self.optional {
2039 expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
2040 } else {
2041 HashSet::new()
2042 };
2043
2044 let mut columns: Vec<ArrayRef> = Vec::new();
2046 let indices: Vec<u64> = expansions
2047 .iter()
2048 .map(|(idx, _, _, _, _)| *idx as u64)
2049 .collect();
2050 let indices_array = UInt64Array::from(indices);
2051
2052 for col in input.columns() {
2053 let expanded = take(col.as_ref(), &indices_array, None)?;
2054 columns.push(expanded);
2055 }
2056
2057 let target_vid_name = format!("{}._vid", self.target_variable);
2059 let target_vids: Vec<u64> = expansions
2060 .iter()
2061 .map(|(_, vid, _, _, _)| vid.as_u64())
2062 .collect();
2063 if input.schema().column_with_name(&target_vid_name).is_none() {
2064 columns.push(Arc::new(UInt64Array::from(target_vids)));
2065 }
2066
2067 let target_labels_name = format!("{}._labels", self.target_variable);
2069 if input
2070 .schema()
2071 .column_with_name(&target_labels_name)
2072 .is_none()
2073 {
2074 use arrow_array::builder::{ListBuilder, StringBuilder};
2075 let l0_ctx = self.graph_ctx.l0_context();
2076 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2077 for (_, target_vid, _, _, _) in &expansions {
2078 let mut row_labels: Vec<String> = Vec::new();
2079 for l0 in l0_ctx.iter_l0_buffers() {
2080 let guard = l0.read();
2081 if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
2082 for lbl in l0_labels {
2083 if !row_labels.contains(lbl) {
2084 row_labels.push(lbl.clone());
2085 }
2086 }
2087 }
2088 }
2089 let values = labels_builder.values();
2090 for lbl in &row_labels {
2091 values.append_value(lbl);
2092 }
2093 labels_builder.append(true);
2094 }
2095 columns.push(Arc::new(labels_builder.finish()));
2096 }
2097
2098 if self.edge_variable.is_some() {
2101 let eids: Vec<u64> = expansions
2103 .iter()
2104 .map(|(_, _, eid, _, _)| eid.as_u64())
2105 .collect();
2106 columns.push(Arc::new(UInt64Array::from(eids)));
2107
2108 {
2110 let mut type_builder = arrow_array::builder::StringBuilder::new();
2111 for (_, _, _, edge_type, _) in &expansions {
2112 type_builder.append_value(edge_type);
2113 }
2114 columns.push(Arc::new(type_builder.finish()));
2115 }
2116
2117 for prop_name in &self.edge_properties {
2119 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2120 if prop_name == "_all_props" {
2121 for (_, _, _, _, props) in &expansions {
2124 if props.is_empty() {
2125 builder.append_null();
2126 } else {
2127 let map: HashMap<String, uni_common::Value> =
2128 props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2129 builder.append_value(uni_common::cypher_value_codec::encode(
2130 &uni_common::Value::Map(map),
2131 ));
2132 }
2133 }
2134 } else {
2135 for (_, _, _, _, props) in &expansions {
2137 match props.get(prop_name) {
2138 Some(uni_common::Value::Null) | None => builder.append_null(),
2139 Some(val) => {
2140 builder.append_value(uni_common::cypher_value_codec::encode(val));
2141 }
2142 }
2143 }
2144 }
2145 columns.push(Arc::new(builder.finish()));
2146 }
2147 } else {
2148 let eids: Vec<u64> = expansions
2149 .iter()
2150 .map(|(_, _, eid, _, _)| eid.as_u64())
2151 .collect();
2152 columns.push(Arc::new(UInt64Array::from(eids)));
2153 }
2154
2155 {
2157 let l0_ctx = self.graph_ctx.l0_context();
2158
2159 for prop_name in &self.target_properties {
2160 if prop_name == "_all_props" {
2161 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2163 for (_, target_vid, _, _, _) in &expansions {
2164 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
2166 for l0 in l0_ctx.iter_l0_buffers() {
2167 let guard = l0.read();
2168 if let Some(props) = guard.vertex_properties.get(target_vid) {
2169 for (k, v) in props.iter() {
2170 merged_props.insert(k.clone(), v.clone());
2171 }
2172 }
2173 }
2174 if merged_props.is_empty() {
2175 builder.append_null();
2176 } else {
2177 builder.append_value(uni_common::cypher_value_codec::encode(
2178 &uni_common::Value::Map(merged_props),
2179 ));
2180 }
2181 }
2182 columns.push(Arc::new(builder.finish()));
2183 } else {
2184 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2186 for (_, target_vid, _, _, _) in &expansions {
2187 let mut found = false;
2188 for l0 in l0_ctx.iter_l0_buffers() {
2189 let guard = l0.read();
2190 if let Some(props) = guard.vertex_properties.get(target_vid)
2191 && let Some(val) = props.get(prop_name.as_str())
2192 && !val.is_null()
2193 {
2194 builder.append_value(uni_common::cypher_value_codec::encode(val));
2195 found = true;
2196 break;
2197 }
2198 }
2199 if !found {
2200 builder.append_null();
2201 }
2202 }
2203 columns.push(Arc::new(builder.finish()));
2204 }
2205 }
2206 }
2207
2208 let matched_batch =
2209 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2210
2211 if self.optional {
2213 let unmatched = collect_unmatched_optional_group_rows(
2214 input,
2215 &matched_rows,
2216 &self.schema,
2217 &self.optional_pattern_vars,
2218 )?;
2219
2220 if unmatched.is_empty() {
2221 return Ok(matched_batch);
2222 }
2223
2224 let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2225 input,
2226 &unmatched,
2227 &self.schema,
2228 &self.optional_pattern_vars,
2229 )?;
2230
2231 use arrow::compute::concat_batches;
2233 concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2234 } else {
2235 Ok(matched_batch)
2236 }
2237 }
2238}
2239
2240impl GraphExecutionContext {
2241 fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2252 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2253 return;
2254 };
2255 let guard = tx_l0.read();
2256 let Some(read_set) = &guard.occ_read_set else {
2257 return;
2258 };
2259 let mut rs = read_set.lock();
2260 for (src, neighbors) in adjacency {
2261 rs.vertices.insert(*src);
2262 for (nbr, eid, _type, _props) in neighbors {
2263 rs.vertices.insert(*nbr);
2264 rs.edges.insert(*eid);
2265 }
2266 }
2267 }
2268}
2269
2270async fn build_edge_adjacency_map(
2281 graph_ctx: &GraphExecutionContext,
2282 type_names: &[String],
2283 direction: Direction,
2284 source_vids: Option<HashSet<Vid>>,
2285) -> DFResult<EdgeAdjacencyMap> {
2286 let storage = graph_ctx.storage();
2287 let l0_ctx = graph_ctx.l0_context();
2288
2289 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2292 let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2293 let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2294 let side = match direction {
2295 Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2296 Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2297 Direction::Both => uni_store::storage::EndpointSide::Either,
2298 };
2299 (side, vids)
2300 });
2301 let edges_with_type = storage
2302 .find_edges_by_type_names(&type_refs, endpoint_filter)
2303 .await
2304 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2305
2306 let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2310 match &source_vids {
2311 None => true,
2312 Some(set) => match direction {
2313 Direction::Outgoing => set.contains(&src),
2314 Direction::Incoming => set.contains(&dst),
2315 Direction::Both => set.contains(&src) || set.contains(&dst),
2316 },
2317 }
2318 };
2319
2320 let mut edges: Vec<(
2322 uni_common::Eid,
2323 uni_common::Vid,
2324 uni_common::Vid,
2325 String,
2326 uni_common::Properties,
2327 )> = edges_with_type.into_iter().collect();
2328
2329 for l0 in l0_ctx.iter_l0_buffers() {
2331 let l0_guard = l0.read();
2332
2333 for type_name in type_names {
2334 let l0_eids = l0_guard.eids_for_type(type_name);
2335
2336 for &eid in &l0_eids {
2338 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2339 let src_vid = edge_ref.src_vid;
2340 let dst_vid = edge_ref.dst_vid;
2341 if !edge_in_scope(src_vid, dst_vid) {
2342 continue;
2343 }
2344
2345 let props = l0_guard
2347 .edge_properties
2348 .get(&eid)
2349 .cloned()
2350 .unwrap_or_default();
2351
2352 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2353 }
2354 }
2355 }
2356 }
2357
2358 let mut seen_eids = HashSet::new();
2360 let mut unique_edges = Vec::new();
2361 for edge in edges.into_iter().rev() {
2362 if seen_eids.insert(edge.0) {
2363 unique_edges.push(edge);
2364 }
2365 }
2366 unique_edges.reverse();
2367
2368 let mut tombstoned_eids = HashSet::new();
2370 for l0 in l0_ctx.iter_l0_buffers() {
2371 let l0_guard = l0.read();
2372 for eid in l0_guard.tombstones.keys() {
2373 tombstoned_eids.insert(*eid);
2374 }
2375 }
2376 if !tombstoned_eids.is_empty() {
2377 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2378 }
2379
2380 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2383
2384 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2385 let edge_type: Arc<str> = edge_type.into();
2386 let props = Arc::new(props);
2387 match direction {
2388 Direction::Outgoing => {
2389 adjacency
2390 .entry(src_vid)
2391 .or_default()
2392 .push((dst_vid, eid, edge_type, props));
2393 }
2394 Direction::Incoming => {
2395 adjacency
2396 .entry(dst_vid)
2397 .or_default()
2398 .push((src_vid, eid, edge_type, props));
2399 }
2400 Direction::Both => {
2401 adjacency.entry(src_vid).or_default().push((
2402 dst_vid,
2403 eid,
2404 Arc::clone(&edge_type),
2405 Arc::clone(&props),
2406 ));
2407 adjacency
2408 .entry(dst_vid)
2409 .or_default()
2410 .push((src_vid, eid, edge_type, props));
2411 }
2412 }
2413 }
2414
2415 graph_ctx.record_edge_adjacency(&adjacency);
2423
2424 Ok(adjacency)
2425}
2426
2427async fn build_edge_property_filter(
2442 graph_ctx: &Arc<GraphExecutionContext>,
2443 edge_type_ids: &[u32],
2444 direction: Direction,
2445 conditions: &[(String, UniValue)],
2446) -> DFResult<EidFilter> {
2447 if conditions.is_empty() {
2448 return Ok(EidFilter::AllAllowed);
2449 }
2450
2451 let uni_schema = graph_ctx.storage().schema_manager().schema();
2452 let type_names: Vec<String> = edge_type_ids
2453 .iter()
2454 .filter_map(|id| uni_schema.edge_type_name_by_id_unified(*id))
2455 .collect();
2456 if type_names.is_empty() {
2457 return Ok(EidFilter::AllAllowed);
2458 }
2459
2460 let adjacency = build_edge_adjacency_map(graph_ctx, &type_names, direction, None).await?;
2464
2465 let mut passing: Vec<u64> = Vec::new();
2466 let mut max_eid: u64 = 0;
2467 let mut seen: FxHashSet<u64> = FxHashSet::default();
2468 for edges in adjacency.values() {
2469 for (_neighbor, eid, _etype, props) in edges {
2470 let raw = eid.as_u64();
2471 if !seen.insert(raw) {
2474 continue;
2475 }
2476 max_eid = max_eid.max(raw);
2477 let passes = conditions
2478 .iter()
2479 .all(|(name, expected)| props.get(name).is_some_and(|actual| actual == expected));
2480 if passes {
2481 passing.push(raw);
2482 }
2483 }
2484 }
2485
2486 Ok(EidFilter::from_eids(passing, max_eid as usize + 1))
2487}
2488
2489impl Stream for GraphTraverseMainStream {
2490 type Item = DFResult<RecordBatch>;
2491
2492 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2493 let metrics = self.metrics.clone();
2494 let _timer = metrics.elapsed_compute().timer();
2495 loop {
2496 let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2497
2498 match state {
2499 GraphTraverseMainState::CollectingInput {
2500 mut input_stream,
2501 mut buffered,
2502 } => match input_stream.poll_next_unpin(cx) {
2503 Poll::Ready(Some(Ok(batch))) => {
2504 buffered.push(batch);
2505 self.state = GraphTraverseMainState::CollectingInput {
2506 input_stream,
2507 buffered,
2508 };
2509 }
2511 Poll::Ready(Some(Err(e))) => {
2512 self.state = GraphTraverseMainState::Done;
2513 return Poll::Ready(Some(Err(e)));
2514 }
2515 Poll::Ready(None) => {
2516 let source_vids = self.collect_source_vids(&buffered);
2519 let loading_ctx = self.graph_ctx.clone();
2520 let loading_types = self.type_names.clone();
2521 let direction = self.direction;
2522 let fut = async move {
2523 build_edge_adjacency_map(
2524 &loading_ctx,
2525 &loading_types,
2526 direction,
2527 source_vids,
2528 )
2529 .await
2530 };
2531 self.state = GraphTraverseMainState::LoadingEdges {
2532 future: Box::pin(fut),
2533 buffered,
2534 };
2535 }
2537 Poll::Pending => {
2538 self.state = GraphTraverseMainState::CollectingInput {
2539 input_stream,
2540 buffered,
2541 };
2542 return Poll::Pending;
2543 }
2544 },
2545 GraphTraverseMainState::LoadingEdges {
2546 mut future,
2547 buffered,
2548 } => match future.as_mut().poll(cx) {
2549 Poll::Ready(Ok(adjacency)) => {
2550 self.state = GraphTraverseMainState::Processing {
2552 adjacency,
2553 buffered: buffered.into_iter(),
2554 };
2555 }
2557 Poll::Ready(Err(e)) => {
2558 self.state = GraphTraverseMainState::Done;
2559 return Poll::Ready(Some(Err(e)));
2560 }
2561 Poll::Pending => {
2562 self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
2563 return Poll::Pending;
2564 }
2565 },
2566 GraphTraverseMainState::Processing {
2567 adjacency,
2568 mut buffered,
2569 } => {
2570 if let Err(e) = self.graph_ctx.check_timeout() {
2572 return Poll::Ready(Some(Err(
2573 datafusion::error::DataFusionError::Execution(e.to_string()),
2574 )));
2575 }
2576
2577 match buffered.next() {
2578 Some(batch) => {
2579 let result = self.expand_batch(&batch, &adjacency);
2581
2582 self.state = GraphTraverseMainState::Processing {
2583 adjacency,
2584 buffered,
2585 };
2586
2587 if let Ok(ref r) = result {
2588 self.metrics.record_output(r.num_rows());
2589 }
2590 return Poll::Ready(Some(result));
2591 }
2592 None => {
2593 self.state = GraphTraverseMainState::Done;
2594 return Poll::Ready(None);
2595 }
2596 }
2597 }
2598 GraphTraverseMainState::Done => {
2599 return Poll::Ready(None);
2600 }
2601 }
2602 }
2603 }
2604}
2605
2606impl RecordBatchStream for GraphTraverseMainStream {
2607 fn schema(&self) -> SchemaRef {
2608 self.schema.clone()
2609 }
2610}
2611
2612pub struct GraphVariableLengthTraverseExec {
2633 input: Arc<dyn ExecutionPlan>,
2635
2636 source_column: String,
2638
2639 edge_type_ids: Vec<u32>,
2641
2642 direction: Direction,
2644
2645 min_hops: usize,
2647
2648 max_hops: usize,
2650
2651 target_variable: String,
2653
2654 step_variable: Option<String>,
2656
2657 path_variable: Option<String>,
2659
2660 target_properties: Vec<String>,
2662
2663 target_label_name: Option<String>,
2665
2666 is_optional: bool,
2668
2669 bound_target_column: Option<String>,
2671
2672 edge_lance_filter: Option<String>,
2674
2675 edge_property_conditions: Vec<(String, UniValue)>,
2678
2679 used_edge_columns: Vec<String>,
2681
2682 path_mode: super::nfa::PathMode,
2684
2685 output_mode: super::nfa::VlpOutputMode,
2687
2688 nfa: Arc<PathNfa>,
2690
2691 graph_ctx: Arc<GraphExecutionContext>,
2693
2694 schema: SchemaRef,
2696
2697 properties: Arc<PlanProperties>,
2699
2700 metrics: ExecutionPlanMetricsSet,
2702}
2703
2704impl fmt::Debug for GraphVariableLengthTraverseExec {
2705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2706 f.debug_struct("GraphVariableLengthTraverseExec")
2707 .field("source_column", &self.source_column)
2708 .field("edge_type_ids", &self.edge_type_ids)
2709 .field("direction", &self.direction)
2710 .field("min_hops", &self.min_hops)
2711 .field("max_hops", &self.max_hops)
2712 .field("target_variable", &self.target_variable)
2713 .finish()
2714 }
2715}
2716
2717impl GraphVariableLengthTraverseExec {
2718 #[expect(clippy::too_many_arguments)]
2724 pub fn new(
2725 input: Arc<dyn ExecutionPlan>,
2726 source_column: impl Into<String>,
2727 edge_type_ids: Vec<u32>,
2728 direction: Direction,
2729 min_hops: usize,
2730 max_hops: usize,
2731 target_variable: impl Into<String>,
2732 step_variable: Option<String>,
2733 path_variable: Option<String>,
2734 target_properties: Vec<String>,
2735 target_label_name: Option<String>,
2736 graph_ctx: Arc<GraphExecutionContext>,
2737 is_optional: bool,
2738 bound_target_column: Option<String>,
2739 edge_lance_filter: Option<String>,
2740 edge_property_conditions: Vec<(String, UniValue)>,
2741 used_edge_columns: Vec<String>,
2742 path_mode: super::nfa::PathMode,
2743 output_mode: super::nfa::VlpOutputMode,
2744 qpp_nfa: Option<PathNfa>,
2745 ) -> Self {
2746 let source_column = source_column.into();
2747 let target_variable = target_variable.into();
2748
2749 let uni_schema = graph_ctx.storage().schema_manager().schema();
2751 let label_props = target_label_name
2752 .as_deref()
2753 .and_then(|ln| uni_schema.properties.get(ln));
2754
2755 let schema = Self::build_schema(
2757 input.schema(),
2758 &target_variable,
2759 step_variable.as_deref(),
2760 path_variable.as_deref(),
2761 &target_properties,
2762 label_props,
2763 );
2764 let properties = compute_plan_properties(schema.clone());
2765
2766 let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2768 PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2769 }));
2770
2771 Self {
2772 input,
2773 source_column,
2774 edge_type_ids,
2775 direction,
2776 min_hops,
2777 max_hops,
2778 target_variable,
2779 step_variable,
2780 path_variable,
2781 target_properties,
2782 target_label_name,
2783 is_optional,
2784 bound_target_column,
2785 edge_lance_filter,
2786 edge_property_conditions,
2787 used_edge_columns,
2788 path_mode,
2789 output_mode,
2790 nfa,
2791 graph_ctx,
2792 schema,
2793 properties,
2794 metrics: ExecutionPlanMetricsSet::new(),
2795 }
2796 }
2797
2798 fn build_schema(
2800 input_schema: SchemaRef,
2801 target_variable: &str,
2802 step_variable: Option<&str>,
2803 path_variable: Option<&str>,
2804 target_properties: &[String],
2805 label_props: Option<
2806 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2807 >,
2808 ) -> SchemaRef {
2809 let mut fields: Vec<Field> = input_schema
2810 .fields()
2811 .iter()
2812 .map(|f| f.as_ref().clone())
2813 .collect();
2814
2815 let target_vid_name = format!("{}._vid", target_variable);
2817 if input_schema.column_with_name(&target_vid_name).is_none() {
2818 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2819 }
2820
2821 let target_labels_name = format!("{}._labels", target_variable);
2823 if input_schema.column_with_name(&target_labels_name).is_none() {
2824 fields.push(Field::new(target_labels_name, labels_data_type(), true));
2825 }
2826
2827 for prop_name in target_properties {
2829 let col_name = format!("{}.{}", target_variable, prop_name);
2830 if input_schema.column_with_name(&col_name).is_none() {
2831 let arrow_type = resolve_property_type(prop_name, label_props);
2832 fields.push(Field::new(&col_name, arrow_type, true));
2833 }
2834 }
2835
2836 fields.push(Field::new("_hop_count", DataType::UInt64, false));
2838
2839 if let Some(step_var) = step_variable {
2841 fields.push(build_edge_list_field(step_var));
2842 }
2843
2844 if let Some(path_var) = path_variable
2846 && input_schema.column_with_name(path_var).is_none()
2847 {
2848 fields.push(build_path_struct_field(path_var));
2849 }
2850
2851 Arc::new(Schema::new(fields))
2852 }
2853}
2854
2855impl DisplayAs for GraphVariableLengthTraverseExec {
2856 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2857 write!(
2858 f,
2859 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2860 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2861 )
2862 }
2863}
2864
2865impl ExecutionPlan for GraphVariableLengthTraverseExec {
2866 fn name(&self) -> &str {
2867 "GraphVariableLengthTraverseExec"
2868 }
2869
2870 fn as_any(&self) -> &dyn Any {
2871 self
2872 }
2873
2874 fn schema(&self) -> SchemaRef {
2875 self.schema.clone()
2876 }
2877
2878 fn properties(&self) -> &Arc<PlanProperties> {
2879 &self.properties
2880 }
2881
2882 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2883 vec![&self.input]
2884 }
2885
2886 fn with_new_children(
2887 self: Arc<Self>,
2888 children: Vec<Arc<dyn ExecutionPlan>>,
2889 ) -> DFResult<Arc<dyn ExecutionPlan>> {
2890 if children.len() != 1 {
2891 return Err(datafusion::error::DataFusionError::Plan(
2892 "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2893 ));
2894 }
2895
2896 Ok(Arc::new(Self::new(
2898 children[0].clone(),
2899 self.source_column.clone(),
2900 self.edge_type_ids.clone(),
2901 self.direction,
2902 self.min_hops,
2903 self.max_hops,
2904 self.target_variable.clone(),
2905 self.step_variable.clone(),
2906 self.path_variable.clone(),
2907 self.target_properties.clone(),
2908 self.target_label_name.clone(),
2909 self.graph_ctx.clone(),
2910 self.is_optional,
2911 self.bound_target_column.clone(),
2912 self.edge_lance_filter.clone(),
2913 self.edge_property_conditions.clone(),
2914 self.used_edge_columns.clone(),
2915 self.path_mode.clone(),
2916 self.output_mode.clone(),
2917 Some((*self.nfa).clone()),
2918 )))
2919 }
2920
2921 fn execute(
2922 &self,
2923 partition: usize,
2924 context: Arc<TaskContext>,
2925 ) -> DFResult<SendableRecordBatchStream> {
2926 let input_stream = self.input.execute(partition, context)?;
2927
2928 let metrics = BaselineMetrics::new(&self.metrics, partition);
2929
2930 let graph_ctx = self.graph_ctx.clone();
2935 let edge_type_ids = self.edge_type_ids.clone();
2936 let direction = self.direction;
2937 let edge_property_conditions = self.edge_property_conditions.clone();
2938 let warm_fut: Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>> =
2939 Box::pin(async move {
2940 graph_ctx
2941 .ensure_adjacency_warmed(&edge_type_ids, direction)
2942 .await
2943 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2944 build_edge_property_filter(
2945 &graph_ctx,
2946 &edge_type_ids,
2947 direction,
2948 &edge_property_conditions,
2949 )
2950 .await
2951 });
2952
2953 Ok(Box::pin(GraphVariableLengthTraverseStream {
2954 input: input_stream,
2955 exec: Arc::new(self.clone_for_stream()),
2956 schema: self.schema.clone(),
2957 state: VarLengthStreamState::Warming(warm_fut),
2958 edge_property_filter: EidFilter::AllAllowed,
2959 metrics,
2960 }))
2961 }
2962
2963 fn metrics(&self) -> Option<MetricsSet> {
2964 Some(self.metrics.clone_inner())
2965 }
2966}
2967
2968impl GraphVariableLengthTraverseExec {
2969 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2971 GraphVariableLengthTraverseExecData {
2972 source_column: self.source_column.clone(),
2973 edge_type_ids: self.edge_type_ids.clone(),
2974 direction: self.direction,
2975 min_hops: self.min_hops,
2976 max_hops: self.max_hops,
2977 target_variable: self.target_variable.clone(),
2978 step_variable: self.step_variable.clone(),
2979 path_variable: self.path_variable.clone(),
2980 target_properties: self.target_properties.clone(),
2981 target_label_name: self.target_label_name.clone(),
2982 is_optional: self.is_optional,
2983 bound_target_column: self.bound_target_column.clone(),
2984 edge_lance_filter: self.edge_lance_filter.clone(),
2985 edge_property_conditions: self.edge_property_conditions.clone(),
2986 used_edge_columns: self.used_edge_columns.clone(),
2987 path_mode: self.path_mode.clone(),
2988 output_mode: self.output_mode.clone(),
2989 nfa: self.nfa.clone(),
2990 graph_ctx: self.graph_ctx.clone(),
2991 }
2992 }
2993}
2994
2995#[expect(
2997 dead_code,
2998 reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
2999)]
3000struct GraphVariableLengthTraverseExecData {
3001 source_column: String,
3002 edge_type_ids: Vec<u32>,
3003 direction: Direction,
3004 min_hops: usize,
3005 max_hops: usize,
3006 target_variable: String,
3007 step_variable: Option<String>,
3008 path_variable: Option<String>,
3009 target_properties: Vec<String>,
3010 target_label_name: Option<String>,
3011 is_optional: bool,
3012 bound_target_column: Option<String>,
3013 #[expect(dead_code, reason = "Used in Phase 3 warming")]
3014 edge_lance_filter: Option<String>,
3015 edge_property_conditions: Vec<(String, UniValue)>,
3017 used_edge_columns: Vec<String>,
3018 path_mode: super::nfa::PathMode,
3019 output_mode: super::nfa::VlpOutputMode,
3020 nfa: Arc<PathNfa>,
3021 graph_ctx: Arc<GraphExecutionContext>,
3022}
3023
3024const MAX_FRONTIER_SIZE: usize = 500_000;
3026const MAX_PRED_POOL_SIZE: usize = 2_000_000;
3028
3029impl GraphVariableLengthTraverseExecData {
3030 fn check_target_label(&self, vid: Vid) -> bool {
3032 if let Some(ref label_name) = self.target_label_name {
3033 let query_ctx = self.graph_ctx.query_context();
3034 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3035 Some(labels) => labels.contains(label_name),
3036 None => true, }
3038 } else {
3039 true
3040 }
3041 }
3042
3043 fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
3045 match constraint {
3046 super::nfa::VertexConstraint::Label(label_name) => {
3047 let query_ctx = self.graph_ctx.query_context();
3048 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3049 Some(labels) => labels.contains(label_name),
3050 None => true, }
3052 }
3053 }
3054 }
3055
3056 fn expand_neighbors(
3059 &self,
3060 vid: Vid,
3061 state: NfaStateId,
3062 eid_filter: &EidFilter,
3063 used_eids: &FxHashSet<u64>,
3064 ) -> Vec<(Vid, Eid, NfaStateId)> {
3065 let is_undirected = matches!(self.direction, Direction::Both);
3066 let mut results = Vec::new();
3067
3068 for transition in self.nfa.transitions_from(state) {
3069 let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
3070
3071 for &etype in &transition.edge_type_ids {
3072 for (neighbor, eid) in
3073 self.graph_ctx
3074 .get_neighbors(vid, etype, transition.direction)
3075 {
3076 if is_undirected && !seen_edges.insert(eid.as_u64()) {
3078 continue;
3079 }
3080
3081 if !eid_filter.contains(eid) {
3089 continue;
3090 }
3091
3092 if used_eids.contains(&eid.as_u64()) {
3094 continue;
3095 }
3096
3097 if let Some(constraint) = self.nfa.state_constraint(transition.to)
3099 && !self.check_state_constraint(neighbor, constraint)
3100 {
3101 continue;
3102 }
3103
3104 results.push((neighbor, eid, transition.to));
3105 }
3106 }
3107 }
3108
3109 results
3110 }
3111
3112 fn bfs_with_dag(
3117 &self,
3118 source: Vid,
3119 eid_filter: &EidFilter,
3120 used_eids: &FxHashSet<u64>,
3121 vid_filter: &VidFilter,
3122 ) -> Vec<BfsResult> {
3123 let nfa = &self.nfa;
3124 let selector = PathSelector::All;
3125 let mut dag = PredecessorDag::new(selector);
3126 let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
3127
3128 if nfa.is_accepting(nfa.start_state())
3130 && self.check_target_label(source)
3131 && vid_filter.contains(source)
3132 {
3133 accepting.push((source, nfa.start_state(), 0));
3134 }
3135
3136 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3138 let mut depth: u32 = 0;
3139
3140 while !frontier.is_empty() && depth < self.max_hops as u32 {
3141 depth += 1;
3142 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3143 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3144
3145 for &(vid, state) in &frontier {
3146 for (neighbor, eid, dst_state) in
3147 self.expand_neighbors(vid, state, eid_filter, used_eids)
3148 {
3149 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3151
3152 if seen_at_depth.insert((neighbor, dst_state)) {
3154 next_frontier.push((neighbor, dst_state));
3155
3156 if nfa.is_accepting(dst_state)
3158 && self.check_target_label(neighbor)
3159 && vid_filter.contains(neighbor)
3160 {
3161 accepting.push((neighbor, dst_state, depth));
3162 }
3163 }
3164 }
3165 }
3166
3167 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3169 break;
3170 }
3171
3172 frontier = next_frontier;
3173 }
3174
3175 let mut results: Vec<BfsResult> = Vec::new();
3177 for &(target, state, depth) in &accepting {
3178 dag.enumerate_paths(
3179 source,
3180 target,
3181 state,
3182 depth,
3183 depth,
3184 &self.path_mode,
3185 &mut |nodes, edges| {
3186 results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3187 std::ops::ControlFlow::Continue(())
3188 },
3189 );
3190 }
3191
3192 results
3193 }
3194
3195 fn bfs_endpoints_only(
3200 &self,
3201 source: Vid,
3202 eid_filter: &EidFilter,
3203 used_eids: &FxHashSet<u64>,
3204 vid_filter: &VidFilter,
3205 ) -> Vec<(Vid, u32)> {
3206 let nfa = &self.nfa;
3207 let selector = PathSelector::Any; let mut dag = PredecessorDag::new(selector);
3209 let mut results: Vec<(Vid, u32)> = Vec::new();
3210
3211 if nfa.is_accepting(nfa.start_state())
3213 && self.check_target_label(source)
3214 && vid_filter.contains(source)
3215 {
3216 results.push((source, 0));
3217 }
3218
3219 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3221 let mut depth: u32 = 0;
3222
3223 while !frontier.is_empty() && depth < self.max_hops as u32 {
3224 depth += 1;
3225 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3226 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3227
3228 for &(vid, state) in &frontier {
3229 for (neighbor, eid, dst_state) in
3230 self.expand_neighbors(vid, state, eid_filter, used_eids)
3231 {
3232 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3233
3234 if seen_at_depth.insert((neighbor, dst_state)) {
3235 next_frontier.push((neighbor, dst_state));
3236
3237 if nfa.is_accepting(dst_state)
3239 && self.check_target_label(neighbor)
3240 && vid_filter.contains(neighbor)
3241 && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3242 {
3243 results.push((neighbor, depth));
3244 }
3245 }
3246 }
3247 }
3248
3249 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3250 break;
3251 }
3252
3253 frontier = next_frontier;
3254 }
3255
3256 results
3257 }
3258}
3259
3260enum VarLengthStreamState {
3262 Warming(Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>>),
3267 Reading,
3269 Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
3271 Done,
3273}
3274
3275struct GraphVariableLengthTraverseStream {
3277 input: SendableRecordBatchStream,
3278 exec: Arc<GraphVariableLengthTraverseExecData>,
3279 schema: SchemaRef,
3280 state: VarLengthStreamState,
3281 edge_property_filter: EidFilter,
3285 metrics: BaselineMetrics,
3286}
3287
3288impl Stream for GraphVariableLengthTraverseStream {
3289 type Item = DFResult<RecordBatch>;
3290
3291 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3292 let metrics = self.metrics.clone();
3293 let _timer = metrics.elapsed_compute().timer();
3294 loop {
3295 let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3296
3297 match state {
3298 VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3299 Poll::Ready(Ok(filter)) => {
3300 self.edge_property_filter = filter;
3301 self.state = VarLengthStreamState::Reading;
3302 }
3304 Poll::Ready(Err(e)) => {
3305 self.state = VarLengthStreamState::Done;
3306 return Poll::Ready(Some(Err(e)));
3307 }
3308 Poll::Pending => {
3309 self.state = VarLengthStreamState::Warming(fut);
3310 return Poll::Pending;
3311 }
3312 },
3313 VarLengthStreamState::Reading => {
3314 if let Err(e) = self.exec.graph_ctx.check_timeout() {
3316 return Poll::Ready(Some(Err(
3317 datafusion::error::DataFusionError::Execution(e.to_string()),
3318 )));
3319 }
3320
3321 match self.input.poll_next_unpin(cx) {
3322 Poll::Ready(Some(Ok(batch))) => {
3323 let vid_filter = VidFilter::AllAllowed;
3328 let base_result = self.process_batch_base(
3329 batch,
3330 &self.edge_property_filter,
3331 &vid_filter,
3332 );
3333 let base_batch = match base_result {
3334 Ok(b) => b,
3335 Err(e) => {
3336 self.state = VarLengthStreamState::Reading;
3337 return Poll::Ready(Some(Err(e)));
3338 }
3339 };
3340
3341 if self.exec.target_properties.is_empty() {
3343 self.state = VarLengthStreamState::Reading;
3344 return Poll::Ready(Some(Ok(base_batch)));
3345 }
3346
3347 let schema = self.schema.clone();
3349 let target_variable = self.exec.target_variable.clone();
3350 let target_properties = self.exec.target_properties.clone();
3351 let target_label_name = self.exec.target_label_name.clone();
3352 let graph_ctx = self.exec.graph_ctx.clone();
3353
3354 let fut = hydrate_vlp_target_properties(
3355 base_batch,
3356 schema,
3357 target_variable,
3358 target_properties,
3359 target_label_name,
3360 graph_ctx,
3361 );
3362
3363 self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3364 }
3366 Poll::Ready(Some(Err(e))) => {
3367 self.state = VarLengthStreamState::Done;
3368 return Poll::Ready(Some(Err(e)));
3369 }
3370 Poll::Ready(None) => {
3371 self.state = VarLengthStreamState::Done;
3372 return Poll::Ready(None);
3373 }
3374 Poll::Pending => {
3375 self.state = VarLengthStreamState::Reading;
3376 return Poll::Pending;
3377 }
3378 }
3379 }
3380 VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3381 Poll::Ready(Ok(batch)) => {
3382 self.state = VarLengthStreamState::Reading;
3383 self.metrics.record_output(batch.num_rows());
3384 return Poll::Ready(Some(Ok(batch)));
3385 }
3386 Poll::Ready(Err(e)) => {
3387 self.state = VarLengthStreamState::Done;
3388 return Poll::Ready(Some(Err(e)));
3389 }
3390 Poll::Pending => {
3391 self.state = VarLengthStreamState::Materializing(fut);
3392 return Poll::Pending;
3393 }
3394 },
3395 VarLengthStreamState::Done => {
3396 return Poll::Ready(None);
3397 }
3398 }
3399 }
3400 }
3401}
3402
3403impl GraphVariableLengthTraverseStream {
3404 fn process_batch_base(
3405 &self,
3406 batch: RecordBatch,
3407 eid_filter: &EidFilter,
3408 vid_filter: &VidFilter,
3409 ) -> DFResult<RecordBatch> {
3410 let source_col = batch
3411 .column_by_name(&self.exec.source_column)
3412 .ok_or_else(|| {
3413 datafusion::error::DataFusionError::Execution(format!(
3414 "Source column '{}' not found",
3415 self.exec.source_column
3416 ))
3417 })?;
3418
3419 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3420 let source_vids: &UInt64Array = &source_vid_cow;
3421
3422 let bound_target_cow = self
3424 .exec
3425 .bound_target_column
3426 .as_ref()
3427 .and_then(|col| batch.column_by_name(col))
3428 .map(|c| column_as_vid_array(c.as_ref()))
3429 .transpose()?;
3430 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3431
3432 let used_edge_arrays: Vec<&UInt64Array> = self
3434 .exec
3435 .used_edge_columns
3436 .iter()
3437 .filter_map(|col| {
3438 batch
3439 .column_by_name(col)?
3440 .as_any()
3441 .downcast_ref::<UInt64Array>()
3442 })
3443 .collect();
3444
3445 let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3447
3448 for (row_idx, source_vid) in source_vids.iter().enumerate() {
3449 let mut emitted_for_row = false;
3450
3451 if let Some(src) = source_vid {
3452 let vid = Vid::from(src);
3453
3454 let used_eids: FxHashSet<u64> = used_edge_arrays
3456 .iter()
3457 .filter_map(|arr| {
3458 if arr.is_null(row_idx) {
3459 None
3460 } else {
3461 Some(arr.value(row_idx))
3462 }
3463 })
3464 .collect();
3465
3466 match &self.exec.output_mode {
3468 VlpOutputMode::EndpointsOnly => {
3469 let endpoints = self
3470 .exec
3471 .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3472 for (target, depth) in endpoints {
3473 if let Some(targets) = expected_targets {
3475 if targets.is_null(row_idx) {
3476 continue;
3477 }
3478 if target.as_u64() != targets.value(row_idx) {
3479 continue;
3480 }
3481 }
3482 expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3483 emitted_for_row = true;
3484 }
3485 }
3486 _ => {
3487 let bfs_results = self
3489 .exec
3490 .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3491 for (target, hop_count, node_path, edge_path) in bfs_results {
3492 if let Some(targets) = expected_targets {
3494 if targets.is_null(row_idx) {
3495 continue;
3496 }
3497 if target.as_u64() != targets.value(row_idx) {
3498 continue;
3499 }
3500 }
3501 expansions.push((row_idx, target, hop_count, node_path, edge_path));
3502 emitted_for_row = true;
3503 }
3504 }
3505 }
3506 }
3507
3508 if self.exec.is_optional && !emitted_for_row {
3509 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3512 }
3513 }
3514
3515 self.build_output_batch(&batch, &expansions)
3516 }
3517
3518 fn build_output_batch(
3519 &self,
3520 input: &RecordBatch,
3521 expansions: &[VarLengthExpansion],
3522 ) -> DFResult<RecordBatch> {
3523 if expansions.is_empty() {
3524 return Ok(RecordBatch::new_empty(self.schema.clone()));
3525 }
3526
3527 let num_rows = expansions.len();
3528
3529 let indices: Vec<u64> = expansions
3531 .iter()
3532 .map(|(idx, _, _, _, _)| *idx as u64)
3533 .collect();
3534 let indices_array = UInt64Array::from(indices);
3535
3536 let mut columns: Vec<ArrayRef> = Vec::new();
3538 for col in input.columns() {
3539 let expanded = take(col.as_ref(), &indices_array, None)?;
3540 columns.push(expanded);
3541 }
3542
3543 let unmatched_rows: Vec<bool> = expansions
3547 .iter()
3548 .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3549 .collect();
3550 let target_vids: Vec<Option<u64>> = expansions
3551 .iter()
3552 .zip(unmatched_rows.iter())
3553 .map(
3554 |((_, vid, _, _, _), unmatched)| {
3555 if *unmatched { None } else { Some(vid.as_u64()) }
3556 },
3557 )
3558 .collect();
3559
3560 let target_vid_name = format!("{}._vid", self.exec.target_variable);
3562 if input.schema().column_with_name(&target_vid_name).is_none() {
3563 columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3564 }
3565
3566 let target_labels_name = format!("{}._labels", self.exec.target_variable);
3568 if input
3569 .schema()
3570 .column_with_name(&target_labels_name)
3571 .is_none()
3572 {
3573 use arrow_array::builder::{ListBuilder, StringBuilder};
3574 let query_ctx = self.exec.graph_ctx.query_context();
3575 let mut labels_builder = ListBuilder::new(StringBuilder::new());
3576 for target_vid in &target_vids {
3577 let Some(vid_u64) = target_vid else {
3578 labels_builder.append(false);
3579 continue;
3580 };
3581 let vid = Vid::from(*vid_u64);
3582 let row_labels: Vec<String> =
3583 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3584 Some(labels) => {
3585 labels
3587 }
3588 None => {
3589 if let Some(ref label_name) = self.exec.target_label_name {
3591 vec![label_name.clone()]
3592 } else {
3593 vec![]
3594 }
3595 }
3596 };
3597 let values = labels_builder.values();
3598 for lbl in &row_labels {
3599 values.append_value(lbl);
3600 }
3601 labels_builder.append(true);
3602 }
3603 columns.push(Arc::new(labels_builder.finish()));
3604 }
3605
3606 for prop_name in &self.exec.target_properties {
3608 let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3609 if input.schema().column_with_name(&full_prop_name).is_none() {
3610 let col_idx = columns.len();
3611 if col_idx < self.schema.fields().len() {
3612 let field = self.schema.field(col_idx);
3613 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3614 }
3615 }
3616 }
3617
3618 let hop_counts: Vec<u64> = expansions
3620 .iter()
3621 .map(|(_, _, hops, _, _)| *hops as u64)
3622 .collect();
3623 columns.push(Arc::new(UInt64Array::from(hop_counts)));
3624
3625 if self.exec.step_variable.is_some() {
3627 let mut edges_builder = new_edge_list_builder();
3628 let query_ctx = self.exec.graph_ctx.query_context();
3629
3630 for (_, _, _, node_path, edge_path) in expansions {
3631 if node_path.is_empty() && edge_path.is_empty() {
3632 edges_builder.append_null();
3634 } else if edge_path.is_empty() {
3635 edges_builder.append(true);
3637 } else {
3638 for (i, eid) in edge_path.iter().enumerate() {
3639 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3640 .unwrap_or_else(|| "UNKNOWN".to_string());
3641 let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3646 *eid,
3647 node_path[i],
3648 node_path[i + 1],
3649 &self.exec.edge_type_ids,
3650 );
3651 append_edge_to_struct(
3652 edges_builder.values(),
3653 *eid,
3654 &type_name,
3655 src,
3656 dst,
3657 &query_ctx,
3658 );
3659 }
3660 edges_builder.append(true);
3661 }
3662 }
3663
3664 columns.push(Arc::new(edges_builder.finish()));
3665 }
3666
3667 if let Some(path_var_name) = &self.exec.path_variable {
3672 let existing_path_col_idx = input
3673 .schema()
3674 .column_with_name(path_var_name)
3675 .map(|(idx, _)| idx);
3676 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3678 let existing_path = existing_path_arc
3679 .as_ref()
3680 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3681
3682 let mut nodes_builder = new_node_list_builder();
3683 let mut rels_builder = new_edge_list_builder();
3684 let query_ctx = self.exec.graph_ctx.query_context();
3685 let mut path_validity = Vec::with_capacity(expansions.len());
3686
3687 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3688 if node_path.is_empty() && edge_path.is_empty() {
3689 nodes_builder.append(false);
3690 rels_builder.append(false);
3691 path_validity.push(false);
3692 continue;
3693 }
3694
3695 let skip_first_vlp_node = if let Some(existing) = existing_path {
3697 if !existing.is_null(row_out_idx) {
3698 prepend_existing_path(
3699 existing,
3700 row_out_idx,
3701 &mut nodes_builder,
3702 &mut rels_builder,
3703 &query_ctx,
3704 );
3705 true
3706 } else {
3707 false
3708 }
3709 } else {
3710 false
3711 };
3712
3713 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3715 for vid in &node_path[start_idx..] {
3716 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3717 }
3718 nodes_builder.append(true);
3719
3720 for (i, eid) in edge_path.iter().enumerate() {
3721 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3722 .unwrap_or_else(|| "UNKNOWN".to_string());
3723 let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3728 *eid,
3729 node_path[i],
3730 node_path[i + 1],
3731 &self.exec.edge_type_ids,
3732 );
3733 append_edge_to_struct(
3734 rels_builder.values(),
3735 *eid,
3736 &type_name,
3737 src,
3738 dst,
3739 &query_ctx,
3740 );
3741 }
3742 rels_builder.append(true);
3743 path_validity.push(true);
3744 }
3745
3746 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3748 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3749
3750 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3752 let rels_field = Arc::new(Field::new(
3753 "relationships",
3754 rels_array.data_type().clone(),
3755 true,
3756 ));
3757
3758 let path_struct = arrow_array::StructArray::try_new(
3760 vec![nodes_field, rels_field].into(),
3761 vec![nodes_array, rels_array],
3762 Some(arrow::buffer::NullBuffer::from(path_validity)),
3763 )
3764 .map_err(arrow_err)?;
3765
3766 if let Some(idx) = existing_path_col_idx {
3767 columns[idx] = Arc::new(path_struct);
3768 } else {
3769 columns.push(Arc::new(path_struct));
3770 }
3771 }
3772
3773 self.metrics.record_output(num_rows);
3774
3775 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3776 }
3777}
3778
3779impl RecordBatchStream for GraphVariableLengthTraverseStream {
3780 fn schema(&self) -> SchemaRef {
3781 self.schema.clone()
3782 }
3783}
3784
3785async fn hydrate_vlp_target_properties(
3790 base_batch: RecordBatch,
3791 schema: SchemaRef,
3792 target_variable: String,
3793 target_properties: Vec<String>,
3794 target_label_name: Option<String>,
3795 graph_ctx: Arc<GraphExecutionContext>,
3796) -> DFResult<RecordBatch> {
3797 if base_batch.num_rows() == 0 || target_properties.is_empty() {
3798 return Ok(base_batch);
3799 }
3800
3801 let target_vid_col_name = format!("{}._vid", target_variable);
3808 let vid_col_idx = schema
3809 .fields()
3810 .iter()
3811 .enumerate()
3812 .rev()
3813 .find(|(_, f)| f.name() == &target_vid_col_name)
3814 .map(|(i, _)| i);
3815
3816 let Some(vid_col_idx) = vid_col_idx else {
3817 return Ok(base_batch);
3818 };
3819
3820 let vid_col = base_batch.column(vid_col_idx);
3821 let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3822 let target_vid_array: &UInt64Array = &target_vid_cow;
3823
3824 let target_vids: Vec<Vid> = target_vid_array
3825 .iter()
3826 .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3829 .collect();
3830
3831 let mut property_columns: Vec<ArrayRef> = Vec::new();
3833
3834 if let Some(ref label_name) = target_label_name {
3835 let property_manager = graph_ctx.property_manager();
3836 let query_ctx = graph_ctx.query_context();
3837
3838 let props_map = property_manager
3839 .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3840 .await
3841 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3842
3843 let uni_schema = graph_ctx.storage().schema_manager().schema();
3844 let label_props = uni_schema.properties.get(label_name.as_str());
3845
3846 for prop_name in &target_properties {
3847 let data_type = resolve_property_type(prop_name, label_props);
3848 let column =
3849 build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3850 property_columns.push(column);
3851 }
3852 } else {
3853 let non_internal_props: Vec<&str> = target_properties
3856 .iter()
3857 .filter(|p| *p != "_all_props")
3858 .map(|s| s.as_str())
3859 .collect();
3860 let property_manager = graph_ctx.property_manager();
3861 let query_ctx = graph_ctx.query_context();
3862
3863 let props_map = if !non_internal_props.is_empty() {
3864 property_manager
3865 .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3866 .await
3867 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3868 } else {
3869 std::collections::HashMap::new()
3870 };
3871
3872 for prop_name in &target_properties {
3873 if prop_name == "_all_props" {
3874 use arrow_array::builder::LargeBinaryBuilder;
3877
3878 let mut builder = LargeBinaryBuilder::new();
3879 let l0_ctx = graph_ctx.l0_context();
3880 for vid in &target_vids {
3881 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3882 if let Some(vid_props) = props_map.get(vid) {
3884 for (k, v) in vid_props.iter() {
3885 merged_props.insert(k.clone(), v.clone());
3886 }
3887 }
3888 for l0 in l0_ctx.iter_l0_buffers() {
3890 let guard = l0.read();
3891 if let Some(l0_props) = guard.vertex_properties.get(vid) {
3892 for (k, v) in l0_props.iter() {
3893 merged_props.insert(k.clone(), v.clone());
3894 }
3895 }
3896 }
3897 if merged_props.is_empty() {
3898 builder.append_null();
3899 } else {
3900 builder.append_value(uni_common::cypher_value_codec::encode(
3901 &uni_common::Value::Map(merged_props),
3902 ));
3903 }
3904 }
3905 property_columns.push(Arc::new(builder.finish()));
3906 } else {
3907 let column = build_property_column_static(
3908 &target_vids,
3909 &props_map,
3910 prop_name,
3911 &arrow::datatypes::DataType::LargeBinary,
3912 )?;
3913 property_columns.push(column);
3914 }
3915 }
3916 }
3917
3918 let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3924 let mut prop_idx = 0;
3925 for (col_idx, field) in schema.fields().iter().enumerate() {
3926 let is_target_prop = col_idx > vid_col_idx
3927 && target_properties
3928 .iter()
3929 .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3930 if is_target_prop && prop_idx < property_columns.len() {
3931 new_columns.push(property_columns[prop_idx].clone());
3932 prop_idx += 1;
3933 } else {
3934 new_columns.push(base_batch.column(col_idx).clone());
3935 }
3936 }
3937
3938 RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3939}
3940
3941pub struct GraphVariableLengthTraverseMainExec {
3951 input: Arc<dyn ExecutionPlan>,
3953
3954 source_column: String,
3956
3957 type_names: Vec<String>,
3959
3960 direction: Direction,
3962
3963 min_hops: usize,
3965
3966 max_hops: usize,
3968
3969 target_variable: String,
3971
3972 step_variable: Option<String>,
3974
3975 path_variable: Option<String>,
3977
3978 target_properties: Vec<String>,
3980
3981 is_optional: bool,
3983
3984 bound_target_column: Option<String>,
3986
3987 edge_lance_filter: Option<String>,
3989
3990 edge_property_conditions: Vec<(String, UniValue)>,
3993
3994 used_edge_columns: Vec<String>,
3996
3997 path_mode: super::nfa::PathMode,
3999
4000 output_mode: super::nfa::VlpOutputMode,
4002
4003 graph_ctx: Arc<GraphExecutionContext>,
4005
4006 schema: SchemaRef,
4008
4009 properties: Arc<PlanProperties>,
4011
4012 metrics: ExecutionPlanMetricsSet,
4014}
4015
4016impl fmt::Debug for GraphVariableLengthTraverseMainExec {
4017 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4018 f.debug_struct("GraphVariableLengthTraverseMainExec")
4019 .field("source_column", &self.source_column)
4020 .field("type_names", &self.type_names)
4021 .field("direction", &self.direction)
4022 .field("min_hops", &self.min_hops)
4023 .field("max_hops", &self.max_hops)
4024 .field("target_variable", &self.target_variable)
4025 .finish()
4026 }
4027}
4028
4029impl GraphVariableLengthTraverseMainExec {
4030 #[expect(clippy::too_many_arguments)]
4032 pub fn new(
4033 input: Arc<dyn ExecutionPlan>,
4034 source_column: impl Into<String>,
4035 type_names: Vec<String>,
4036 direction: Direction,
4037 min_hops: usize,
4038 max_hops: usize,
4039 target_variable: impl Into<String>,
4040 step_variable: Option<String>,
4041 path_variable: Option<String>,
4042 target_properties: Vec<String>,
4043 graph_ctx: Arc<GraphExecutionContext>,
4044 is_optional: bool,
4045 bound_target_column: Option<String>,
4046 edge_lance_filter: Option<String>,
4047 edge_property_conditions: Vec<(String, UniValue)>,
4048 used_edge_columns: Vec<String>,
4049 path_mode: super::nfa::PathMode,
4050 output_mode: super::nfa::VlpOutputMode,
4051 ) -> Self {
4052 let source_column = source_column.into();
4053 let target_variable = target_variable.into();
4054
4055 let schema = Self::build_schema(
4057 input.schema(),
4058 &target_variable,
4059 step_variable.as_deref(),
4060 path_variable.as_deref(),
4061 &target_properties,
4062 );
4063 let properties = compute_plan_properties(schema.clone());
4064
4065 Self {
4066 input,
4067 source_column,
4068 type_names,
4069 direction,
4070 min_hops,
4071 max_hops,
4072 target_variable,
4073 step_variable,
4074 path_variable,
4075 target_properties,
4076 is_optional,
4077 bound_target_column,
4078 edge_lance_filter,
4079 edge_property_conditions,
4080 used_edge_columns,
4081 path_mode,
4082 output_mode,
4083 graph_ctx,
4084 schema,
4085 properties,
4086 metrics: ExecutionPlanMetricsSet::new(),
4087 }
4088 }
4089
4090 fn build_schema(
4092 input_schema: SchemaRef,
4093 target_variable: &str,
4094 step_variable: Option<&str>,
4095 path_variable: Option<&str>,
4096 target_properties: &[String],
4097 ) -> SchemaRef {
4098 let mut fields: Vec<Field> = input_schema
4099 .fields()
4100 .iter()
4101 .map(|f| f.as_ref().clone())
4102 .collect();
4103
4104 let target_vid_name = format!("{}._vid", target_variable);
4106 if input_schema.column_with_name(&target_vid_name).is_none() {
4107 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4108 }
4109
4110 let target_labels_name = format!("{}._labels", target_variable);
4112 if input_schema.column_with_name(&target_labels_name).is_none() {
4113 fields.push(Field::new(target_labels_name, labels_data_type(), true));
4114 }
4115
4116 fields.push(Field::new("_hop_count", DataType::UInt64, false));
4118
4119 if let Some(step_var) = step_variable {
4122 fields.push(build_edge_list_field(step_var));
4123 }
4124
4125 if let Some(path_var) = path_variable
4127 && input_schema.column_with_name(path_var).is_none()
4128 {
4129 fields.push(build_path_struct_field(path_var));
4130 }
4131
4132 for prop in target_properties {
4135 let prop_name = format!("{}.{}", target_variable, prop);
4136 if input_schema.column_with_name(&prop_name).is_none() {
4137 fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4138 }
4139 }
4140
4141 Arc::new(Schema::new(fields))
4142 }
4143}
4144
4145impl DisplayAs for GraphVariableLengthTraverseMainExec {
4146 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4147 write!(
4148 f,
4149 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4150 self.source_column, self.type_names, self.min_hops, self.max_hops
4151 )
4152 }
4153}
4154
4155impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4156 fn name(&self) -> &str {
4157 "GraphVariableLengthTraverseMainExec"
4158 }
4159
4160 fn as_any(&self) -> &dyn Any {
4161 self
4162 }
4163
4164 fn schema(&self) -> SchemaRef {
4165 self.schema.clone()
4166 }
4167
4168 fn properties(&self) -> &Arc<PlanProperties> {
4169 &self.properties
4170 }
4171
4172 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4173 vec![&self.input]
4174 }
4175
4176 fn with_new_children(
4177 self: Arc<Self>,
4178 children: Vec<Arc<dyn ExecutionPlan>>,
4179 ) -> DFResult<Arc<dyn ExecutionPlan>> {
4180 if children.len() != 1 {
4181 return Err(datafusion::error::DataFusionError::Plan(
4182 "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4183 ));
4184 }
4185
4186 Ok(Arc::new(Self::new(
4187 children[0].clone(),
4188 self.source_column.clone(),
4189 self.type_names.clone(),
4190 self.direction,
4191 self.min_hops,
4192 self.max_hops,
4193 self.target_variable.clone(),
4194 self.step_variable.clone(),
4195 self.path_variable.clone(),
4196 self.target_properties.clone(),
4197 self.graph_ctx.clone(),
4198 self.is_optional,
4199 self.bound_target_column.clone(),
4200 self.edge_lance_filter.clone(),
4201 self.edge_property_conditions.clone(),
4202 self.used_edge_columns.clone(),
4203 self.path_mode.clone(),
4204 self.output_mode.clone(),
4205 )))
4206 }
4207
4208 fn execute(
4209 &self,
4210 partition: usize,
4211 context: Arc<TaskContext>,
4212 ) -> DFResult<SendableRecordBatchStream> {
4213 let input_stream = self.input.execute(partition, context)?;
4214 let metrics = BaselineMetrics::new(&self.metrics, partition);
4215
4216 let graph_ctx = self.graph_ctx.clone();
4218 let type_names = self.type_names.clone();
4219 let direction = self.direction;
4220 let load_fut = async move {
4221 build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4223 };
4224
4225 Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4226 input: input_stream,
4227 source_column: self.source_column.clone(),
4228 type_names: self.type_names.clone(),
4229 direction: self.direction,
4230 min_hops: self.min_hops,
4231 max_hops: self.max_hops,
4232 target_variable: self.target_variable.clone(),
4233 step_variable: self.step_variable.clone(),
4234 path_variable: self.path_variable.clone(),
4235 target_properties: self.target_properties.clone(),
4236 graph_ctx: self.graph_ctx.clone(),
4237 is_optional: self.is_optional,
4238 bound_target_column: self.bound_target_column.clone(),
4239 edge_lance_filter: self.edge_lance_filter.clone(),
4240 edge_property_conditions: self.edge_property_conditions.clone(),
4241 used_edge_columns: self.used_edge_columns.clone(),
4242 path_mode: self.path_mode.clone(),
4243 output_mode: self.output_mode.clone(),
4244 schema: self.schema.clone(),
4245 state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4246 metrics,
4247 }))
4248 }
4249
4250 fn metrics(&self) -> Option<MetricsSet> {
4251 Some(self.metrics.clone_inner())
4252 }
4253}
4254
4255enum VarLengthMainStreamState {
4257 Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
4259 Processing(EdgeAdjacencyMap),
4261 Materializing {
4263 adjacency: EdgeAdjacencyMap,
4264 fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
4265 },
4266 Done,
4268}
4269
4270#[expect(dead_code, reason = "VLP fields used in Phase 3")]
4272struct GraphVariableLengthTraverseMainStream {
4273 input: SendableRecordBatchStream,
4274 source_column: String,
4275 type_names: Vec<String>,
4276 direction: Direction,
4277 min_hops: usize,
4278 max_hops: usize,
4279 target_variable: String,
4280 step_variable: Option<String>,
4282 path_variable: Option<String>,
4283 target_properties: Vec<String>,
4284 graph_ctx: Arc<GraphExecutionContext>,
4285 is_optional: bool,
4286 bound_target_column: Option<String>,
4287 edge_lance_filter: Option<String>,
4288 edge_property_conditions: Vec<(String, UniValue)>,
4290 used_edge_columns: Vec<String>,
4291 path_mode: super::nfa::PathMode,
4292 output_mode: super::nfa::VlpOutputMode,
4293 schema: SchemaRef,
4294 state: VarLengthMainStreamState,
4295 metrics: BaselineMetrics,
4296}
4297
4298type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4300
4301impl GraphVariableLengthTraverseMainStream {
4302 fn bfs(
4308 &self,
4309 source: Vid,
4310 adjacency: &EdgeAdjacencyMap,
4311 used_eids: &FxHashSet<u64>,
4312 ) -> Vec<MainBfsResult> {
4313 let mut results = Vec::new();
4314 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4315
4316 queue.push_back((source, 0, vec![source], vec![]));
4317
4318 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4319 if depth >= self.min_hops && depth <= self.max_hops {
4321 results.push((current, depth, node_path.clone(), edge_path.clone()));
4322 }
4323
4324 if depth >= self.max_hops {
4326 continue;
4327 }
4328
4329 if let Some(neighbors) = adjacency.get(¤t) {
4331 let is_undirected = matches!(self.direction, Direction::Both);
4332 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4333
4334 for (neighbor, eid, _edge_type, props) in neighbors {
4335 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4337 continue;
4338 }
4339
4340 if edge_path.contains(eid) {
4342 continue;
4343 }
4344
4345 if used_eids.contains(&eid.as_u64()) {
4348 continue;
4349 }
4350
4351 if !self.edge_property_conditions.is_empty() {
4353 let passes =
4354 self.edge_property_conditions
4355 .iter()
4356 .all(|(name, expected)| {
4357 props.get(name).is_some_and(|actual| actual == expected)
4358 });
4359 if !passes {
4360 continue;
4361 }
4362 }
4363
4364 let mut new_node_path = node_path.clone();
4365 new_node_path.push(*neighbor);
4366 let mut new_edge_path = edge_path.clone();
4367 new_edge_path.push(*eid);
4368 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4369 }
4370 }
4371 }
4372
4373 results
4374 }
4375
4376 fn process_batch(
4378 &self,
4379 batch: RecordBatch,
4380 adjacency: &EdgeAdjacencyMap,
4381 ) -> DFResult<RecordBatch> {
4382 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4383 datafusion::error::DataFusionError::Execution(format!(
4384 "Source column '{}' not found in input batch",
4385 self.source_column
4386 ))
4387 })?;
4388
4389 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4390 let source_vids: &UInt64Array = &source_vid_cow;
4391
4392 let bound_target_cow = self
4394 .bound_target_column
4395 .as_ref()
4396 .and_then(|col| batch.column_by_name(col))
4397 .map(|c| column_as_vid_array(c.as_ref()))
4398 .transpose()?;
4399 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4400
4401 let used_edge_arrays: Vec<&UInt64Array> = self
4403 .used_edge_columns
4404 .iter()
4405 .filter_map(|col| {
4406 batch
4407 .column_by_name(col)?
4408 .as_any()
4409 .downcast_ref::<UInt64Array>()
4410 })
4411 .collect();
4412
4413 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4415
4416 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4417 let mut emitted_for_row = false;
4418
4419 if let Some(source_u64) = source_opt {
4420 let source = Vid::from(source_u64);
4421
4422 let used_eids: FxHashSet<u64> = used_edge_arrays
4424 .iter()
4425 .filter_map(|arr| {
4426 if arr.is_null(row_idx) {
4427 None
4428 } else {
4429 Some(arr.value(row_idx))
4430 }
4431 })
4432 .collect();
4433
4434 let bfs_results = self.bfs(source, adjacency, &used_eids);
4435
4436 for (target, hops, node_path, edge_path) in bfs_results {
4437 if let Some(targets) = expected_targets {
4440 if targets.is_null(row_idx) {
4441 continue;
4442 }
4443 let expected_vid = targets.value(row_idx);
4444 if target.as_u64() != expected_vid {
4445 continue;
4446 }
4447 }
4448
4449 expansions.push((row_idx, target, hops, node_path, edge_path));
4450 emitted_for_row = true;
4451 }
4452 }
4453
4454 if self.is_optional && !emitted_for_row {
4455 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4457 }
4458 }
4459
4460 if expansions.is_empty() {
4461 if self.is_optional {
4462 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4463 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4464 }
4465 return Ok(RecordBatch::new_empty(self.schema.clone()));
4466 }
4467
4468 let num_rows = expansions.len();
4469 self.metrics.record_output(num_rows);
4470
4471 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4473
4474 let edge_type_ids: Vec<u32> = {
4478 let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4479 self.type_names
4480 .iter()
4481 .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4482 .collect()
4483 };
4484
4485 for col_idx in 0..batch.num_columns() {
4487 let array = batch.column(col_idx);
4488 let indices: Vec<u64> = expansions
4489 .iter()
4490 .map(|(idx, _, _, _, _)| *idx as u64)
4491 .collect();
4492 let take_indices = UInt64Array::from(indices);
4493 let expanded = arrow::compute::take(array, &take_indices, None)?;
4494 columns.push(expanded);
4495 }
4496
4497 let target_vid_name = format!("{}._vid", self.target_variable);
4499 if batch.schema().column_with_name(&target_vid_name).is_none() {
4500 let target_vids: Vec<Option<u64>> = expansions
4501 .iter()
4502 .map(|(_, vid, _, node_path, edge_path)| {
4503 if node_path.is_empty() && edge_path.is_empty() {
4504 None
4505 } else {
4506 Some(vid.as_u64())
4507 }
4508 })
4509 .collect();
4510 columns.push(Arc::new(UInt64Array::from(target_vids)));
4511 }
4512
4513 let target_labels_name = format!("{}._labels", self.target_variable);
4515 if batch
4516 .schema()
4517 .column_with_name(&target_labels_name)
4518 .is_none()
4519 {
4520 use arrow_array::builder::{ListBuilder, StringBuilder};
4521 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4522 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4523 if node_path.is_empty() && edge_path.is_empty() {
4524 labels_builder.append(false);
4525 continue;
4526 }
4527 let mut row_labels: Vec<String> = Vec::new();
4528 let labels =
4529 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4530 for lbl in &labels {
4531 if !row_labels.contains(lbl) {
4532 row_labels.push(lbl.clone());
4533 }
4534 }
4535 let values = labels_builder.values();
4536 for lbl in &row_labels {
4537 values.append_value(lbl);
4538 }
4539 labels_builder.append(true);
4540 }
4541 columns.push(Arc::new(labels_builder.finish()));
4542 }
4543
4544 let hop_counts: Vec<u64> = expansions
4546 .iter()
4547 .map(|(_, _, hops, _, _)| *hops as u64)
4548 .collect();
4549 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4550
4551 if self.step_variable.is_some() {
4553 let mut edges_builder = new_edge_list_builder();
4554 let query_ctx = self.graph_ctx.query_context();
4555 let type_names_str = self.type_names.join("|");
4556
4557 for (_, _, _, node_path, edge_path) in expansions.iter() {
4558 if node_path.is_empty() && edge_path.is_empty() {
4559 edges_builder.append_null();
4560 } else if edge_path.is_empty() {
4561 edges_builder.append(true);
4563 } else {
4564 for (i, eid) in edge_path.iter().enumerate() {
4565 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4570 *eid,
4571 node_path[i],
4572 node_path[i + 1],
4573 &edge_type_ids,
4574 );
4575 append_edge_to_struct(
4576 edges_builder.values(),
4577 *eid,
4578 &type_names_str,
4579 src,
4580 dst,
4581 &query_ctx,
4582 );
4583 }
4584 edges_builder.append(true);
4585 }
4586 }
4587
4588 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4589 }
4590
4591 if let Some(path_var_name) = &self.path_variable {
4595 let existing_path_col_idx = batch
4596 .schema()
4597 .column_with_name(path_var_name)
4598 .map(|(idx, _)| idx);
4599 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4600 let existing_path = existing_path_arc
4601 .as_ref()
4602 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4603
4604 let mut nodes_builder = new_node_list_builder();
4605 let mut rels_builder = new_edge_list_builder();
4606 let query_ctx = self.graph_ctx.query_context();
4607 let type_names_str = self.type_names.join("|");
4608 let mut path_validity = Vec::with_capacity(expansions.len());
4609
4610 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4611 if node_path.is_empty() && edge_path.is_empty() {
4612 nodes_builder.append(false);
4613 rels_builder.append(false);
4614 path_validity.push(false);
4615 continue;
4616 }
4617
4618 let skip_first_vlp_node = if let Some(existing) = existing_path {
4620 if !existing.is_null(row_out_idx) {
4621 prepend_existing_path(
4622 existing,
4623 row_out_idx,
4624 &mut nodes_builder,
4625 &mut rels_builder,
4626 &query_ctx,
4627 );
4628 true
4629 } else {
4630 false
4631 }
4632 } else {
4633 false
4634 };
4635
4636 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4638 for vid in &node_path[start_idx..] {
4639 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4640 }
4641 nodes_builder.append(true);
4642
4643 for (i, eid) in edge_path.iter().enumerate() {
4644 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4649 *eid,
4650 node_path[i],
4651 node_path[i + 1],
4652 &edge_type_ids,
4653 );
4654 append_edge_to_struct(
4655 rels_builder.values(),
4656 *eid,
4657 &type_names_str,
4658 src,
4659 dst,
4660 &query_ctx,
4661 );
4662 }
4663 rels_builder.append(true);
4664 path_validity.push(true);
4665 }
4666
4667 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4669 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4670
4671 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4673 let rels_field = Arc::new(Field::new(
4674 "relationships",
4675 rels_array.data_type().clone(),
4676 true,
4677 ));
4678
4679 let path_struct = arrow_array::StructArray::try_new(
4681 vec![nodes_field, rels_field].into(),
4682 vec![nodes_array, rels_array],
4683 Some(arrow::buffer::NullBuffer::from(path_validity)),
4684 )
4685 .map_err(arrow_err)?;
4686
4687 if let Some(idx) = existing_path_col_idx {
4688 columns[idx] = Arc::new(path_struct);
4689 } else {
4690 columns.push(Arc::new(path_struct));
4691 }
4692 }
4693
4694 for prop_name in &self.target_properties {
4697 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4698 if batch.schema().column_with_name(&full_prop_name).is_none() {
4699 columns.push(arrow_array::new_null_array(
4700 &DataType::LargeBinary,
4701 num_rows,
4702 ));
4703 }
4704 }
4705
4706 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4707 }
4708}
4709
4710impl Stream for GraphVariableLengthTraverseMainStream {
4711 type Item = DFResult<RecordBatch>;
4712
4713 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4714 let metrics = self.metrics.clone();
4715 let _timer = metrics.elapsed_compute().timer();
4716 loop {
4717 let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4718
4719 match state {
4720 VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4721 Poll::Ready(Ok(adjacency)) => {
4722 self.state = VarLengthMainStreamState::Processing(adjacency);
4723 }
4725 Poll::Ready(Err(e)) => {
4726 self.state = VarLengthMainStreamState::Done;
4727 return Poll::Ready(Some(Err(e)));
4728 }
4729 Poll::Pending => {
4730 self.state = VarLengthMainStreamState::Loading(fut);
4731 return Poll::Pending;
4732 }
4733 },
4734 VarLengthMainStreamState::Processing(adjacency) => {
4735 match self.input.poll_next_unpin(cx) {
4736 Poll::Ready(Some(Ok(batch))) => {
4737 let base_batch = match self.process_batch(batch, &adjacency) {
4738 Ok(b) => b,
4739 Err(e) => {
4740 self.state = VarLengthMainStreamState::Processing(adjacency);
4741 return Poll::Ready(Some(Err(e)));
4742 }
4743 };
4744
4745 if self.target_properties.is_empty() {
4747 self.state = VarLengthMainStreamState::Processing(adjacency);
4748 return Poll::Ready(Some(Ok(base_batch)));
4749 }
4750
4751 let schema = self.schema.clone();
4753 let target_variable = self.target_variable.clone();
4754 let target_properties = self.target_properties.clone();
4755 let graph_ctx = self.graph_ctx.clone();
4756
4757 let fut = hydrate_vlp_target_properties(
4758 base_batch,
4759 schema,
4760 target_variable,
4761 target_properties,
4762 None, graph_ctx,
4764 );
4765
4766 self.state = VarLengthMainStreamState::Materializing {
4767 adjacency,
4768 fut: Box::pin(fut),
4769 };
4770 }
4772 Poll::Ready(Some(Err(e))) => {
4773 self.state = VarLengthMainStreamState::Done;
4774 return Poll::Ready(Some(Err(e)));
4775 }
4776 Poll::Ready(None) => {
4777 self.state = VarLengthMainStreamState::Done;
4778 return Poll::Ready(None);
4779 }
4780 Poll::Pending => {
4781 self.state = VarLengthMainStreamState::Processing(adjacency);
4782 return Poll::Pending;
4783 }
4784 }
4785 }
4786 VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4787 match fut.as_mut().poll(cx) {
4788 Poll::Ready(Ok(batch)) => {
4789 self.state = VarLengthMainStreamState::Processing(adjacency);
4790 return Poll::Ready(Some(Ok(batch)));
4791 }
4792 Poll::Ready(Err(e)) => {
4793 self.state = VarLengthMainStreamState::Done;
4794 return Poll::Ready(Some(Err(e)));
4795 }
4796 Poll::Pending => {
4797 self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4798 return Poll::Pending;
4799 }
4800 }
4801 }
4802 VarLengthMainStreamState::Done => {
4803 return Poll::Ready(None);
4804 }
4805 }
4806 }
4807 }
4808}
4809
4810impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4811 fn schema(&self) -> SchemaRef {
4812 self.schema.clone()
4813 }
4814}
4815
4816#[cfg(test)]
4817mod tests {
4818 use super::*;
4819
4820 #[test]
4821 fn test_traverse_schema_without_edge() {
4822 let input_schema = Arc::new(Schema::new(vec![Field::new(
4823 "a._vid",
4824 DataType::UInt64,
4825 false,
4826 )]));
4827
4828 let output_schema =
4829 GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);
4830
4831 assert_eq!(output_schema.fields().len(), 4);
4833 assert_eq!(output_schema.field(0).name(), "a._vid");
4834 assert_eq!(output_schema.field(1).name(), "m._vid");
4835 assert_eq!(output_schema.field(2).name(), "m._labels");
4836 assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4837 }
4838
4839 #[test]
4840 fn test_traverse_schema_with_edge() {
4841 let input_schema = Arc::new(Schema::new(vec![Field::new(
4842 "a._vid",
4843 DataType::UInt64,
4844 false,
4845 )]));
4846
4847 let output_schema = GraphTraverseExec::build_schema(
4848 input_schema,
4849 "m",
4850 Some("r"),
4851 &[],
4852 &[],
4853 None,
4854 None,
4855 false,
4856 );
4857
4858 assert_eq!(output_schema.fields().len(), 5);
4860 assert_eq!(output_schema.field(0).name(), "a._vid");
4861 assert_eq!(output_schema.field(1).name(), "m._vid");
4862 assert_eq!(output_schema.field(2).name(), "m._labels");
4863 assert_eq!(output_schema.field(3).name(), "r._eid");
4864 assert_eq!(output_schema.field(4).name(), "r._type");
4865 }
4866
4867 #[test]
4868 fn test_traverse_schema_with_target_properties() {
4869 let input_schema = Arc::new(Schema::new(vec![Field::new(
4870 "a._vid",
4871 DataType::UInt64,
4872 false,
4873 )]));
4874
4875 let target_props = vec!["name".to_string(), "age".to_string()];
4876 let output_schema = GraphTraverseExec::build_schema(
4877 input_schema,
4878 "m",
4879 Some("r"),
4880 &[],
4881 &target_props,
4882 None,
4883 None,
4884 false,
4885 );
4886
4887 assert_eq!(output_schema.fields().len(), 7);
4889 assert_eq!(output_schema.field(0).name(), "a._vid");
4890 assert_eq!(output_schema.field(1).name(), "m._vid");
4891 assert_eq!(output_schema.field(2).name(), "m._labels");
4892 assert_eq!(output_schema.field(3).name(), "m.name");
4893 assert_eq!(output_schema.field(4).name(), "m.age");
4894 assert_eq!(output_schema.field(5).name(), "r._eid");
4895 assert_eq!(output_schema.field(6).name(), "r._type");
4896 }
4897
4898 #[test]
4899 fn test_variable_length_schema() {
4900 let input_schema = Arc::new(Schema::new(vec![Field::new(
4901 "a._vid",
4902 DataType::UInt64,
4903 false,
4904 )]));
4905
4906 let output_schema = GraphVariableLengthTraverseExec::build_schema(
4907 input_schema,
4908 "b",
4909 None,
4910 Some("p"),
4911 &[],
4912 None,
4913 );
4914
4915 assert_eq!(output_schema.fields().len(), 5);
4916 assert_eq!(output_schema.field(0).name(), "a._vid");
4917 assert_eq!(output_schema.field(1).name(), "b._vid");
4918 assert_eq!(output_schema.field(2).name(), "b._labels");
4919 assert_eq!(output_schema.field(3).name(), "_hop_count");
4920 assert_eq!(output_schema.field(4).name(), "p");
4921 }
4922
4923 #[test]
4924 fn test_traverse_main_schema_without_edge() {
4925 let input_schema = Arc::new(Schema::new(vec![Field::new(
4926 "a._vid",
4927 DataType::UInt64,
4928 false,
4929 )]));
4930
4931 let output_schema =
4932 GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);
4933
4934 assert_eq!(output_schema.fields().len(), 4);
4936 assert_eq!(output_schema.field(0).name(), "a._vid");
4937 assert_eq!(output_schema.field(1).name(), "m._vid");
4938 assert_eq!(output_schema.field(2).name(), "m._labels");
4939 assert_eq!(output_schema.field(3).name(), "__eid_to_m");
4940 }
4941
4942 #[test]
4943 fn test_traverse_main_schema_with_edge() {
4944 let input_schema = Arc::new(Schema::new(vec![Field::new(
4945 "a._vid",
4946 DataType::UInt64,
4947 false,
4948 )]));
4949
4950 let output_schema = GraphTraverseMainExec::build_schema(
4951 &input_schema,
4952 "m",
4953 &Some("r".to_string()),
4954 &[],
4955 &[],
4956 false,
4957 );
4958
4959 assert_eq!(output_schema.fields().len(), 5);
4961 assert_eq!(output_schema.field(0).name(), "a._vid");
4962 assert_eq!(output_schema.field(1).name(), "m._vid");
4963 assert_eq!(output_schema.field(2).name(), "m._labels");
4964 assert_eq!(output_schema.field(3).name(), "r._eid");
4965 assert_eq!(output_schema.field(4).name(), "r._type");
4966 }
4967
4968 #[test]
4969 fn test_traverse_main_schema_with_edge_properties() {
4970 let input_schema = Arc::new(Schema::new(vec![Field::new(
4971 "a._vid",
4972 DataType::UInt64,
4973 false,
4974 )]));
4975
4976 let edge_props = vec!["weight".to_string(), "since".to_string()];
4977 let output_schema = GraphTraverseMainExec::build_schema(
4978 &input_schema,
4979 "m",
4980 &Some("r".to_string()),
4981 &edge_props,
4982 &[],
4983 false,
4984 );
4985
4986 assert_eq!(output_schema.fields().len(), 7);
4988 assert_eq!(output_schema.field(0).name(), "a._vid");
4989 assert_eq!(output_schema.field(1).name(), "m._vid");
4990 assert_eq!(output_schema.field(2).name(), "m._labels");
4991 assert_eq!(output_schema.field(3).name(), "r._eid");
4992 assert_eq!(output_schema.field(4).name(), "r._type");
4993 assert_eq!(output_schema.field(5).name(), "r.weight");
4994 assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
4995 assert_eq!(output_schema.field(6).name(), "r.since");
4996 assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
4997 }
4998
4999 #[test]
5000 fn test_traverse_main_schema_with_target_properties() {
5001 let input_schema = Arc::new(Schema::new(vec![Field::new(
5002 "a._vid",
5003 DataType::UInt64,
5004 false,
5005 )]));
5006
5007 let target_props = vec!["name".to_string(), "age".to_string()];
5008 let output_schema = GraphTraverseMainExec::build_schema(
5009 &input_schema,
5010 "m",
5011 &Some("r".to_string()),
5012 &[],
5013 &target_props,
5014 false,
5015 );
5016
5017 assert_eq!(output_schema.fields().len(), 7);
5019 assert_eq!(output_schema.field(0).name(), "a._vid");
5020 assert_eq!(output_schema.field(1).name(), "m._vid");
5021 assert_eq!(output_schema.field(2).name(), "m._labels");
5022 assert_eq!(output_schema.field(3).name(), "r._eid");
5023 assert_eq!(output_schema.field(4).name(), "r._type");
5024 assert_eq!(output_schema.field(5).name(), "m.name");
5025 assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
5026 assert_eq!(output_schema.field(6).name(), "m.age");
5027 assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
5028 }
5029}