1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{
42 build_property_column_static, property_field, resolve_property_type,
43};
44use arrow::compute::take;
45use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
46use arrow_schema::{DataType, Field, Schema, SchemaRef};
47use datafusion::common::Result as DFResult;
48use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
49use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
50use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
51use futures::{Stream, StreamExt};
52use fxhash::FxHashSet;
53use std::any::Any;
54use std::collections::{HashMap, HashSet, VecDeque};
55use std::fmt;
56use std::pin::Pin;
57use std::sync::Arc;
58use std::task::{Context, Poll};
59use uni_common::Value as UniValue;
60use uni_common::core::id::{Eid, Vid};
61use uni_store::runtime::l0_visibility;
62use uni_store::storage::direction::Direction;
63
/// Result tuple of a breadth-first expansion: `(Vid, usize, Vec<Vid>, Vec<Eid>)`.
///
/// NOTE(review): not used in the visible part of this file; the element
/// meaning is inferred from the types only — presumably (reached vid, hop
/// count, node path, edge path). TODO confirm at the use site.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// Per-row expansion record: `(usize, Vid, usize, Vec<Vid>, Vec<Eid>)`.
///
/// NOTE(review): presumably (input row index, reached vid, hop count, node
/// path, edge path) — verify against the code that constructs it.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
69
70fn prepend_existing_path(
77 existing_path: &arrow_array::StructArray,
78 row_idx: usize,
79 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
80 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
81 query_ctx: &uni_store::runtime::context::QueryContext,
82) {
83 let nodes_list = existing_path
85 .column(0)
86 .as_any()
87 .downcast_ref::<arrow_array::ListArray>()
88 .unwrap();
89 let node_values = nodes_list.value(row_idx);
90 let node_struct = node_values
91 .as_any()
92 .downcast_ref::<arrow_array::StructArray>()
93 .unwrap();
94 let vid_col = node_struct
95 .column(0)
96 .as_any()
97 .downcast_ref::<UInt64Array>()
98 .unwrap();
99 for i in 0..vid_col.len() {
100 append_node_to_struct(
101 nodes_builder.values(),
102 Vid::from(vid_col.value(i)),
103 query_ctx,
104 );
105 }
106
107 let rels_list = existing_path
109 .column(1)
110 .as_any()
111 .downcast_ref::<arrow_array::ListArray>()
112 .unwrap();
113 let edge_values = rels_list.value(row_idx);
114 let edge_struct = edge_values
115 .as_any()
116 .downcast_ref::<arrow_array::StructArray>()
117 .unwrap();
118 let eid_col = edge_struct
119 .column(0)
120 .as_any()
121 .downcast_ref::<UInt64Array>()
122 .unwrap();
123 let type_col = edge_struct
124 .column(1)
125 .as_any()
126 .downcast_ref::<arrow_array::StringArray>()
127 .unwrap();
128 let src_col = edge_struct
129 .column(2)
130 .as_any()
131 .downcast_ref::<UInt64Array>()
132 .unwrap();
133 let dst_col = edge_struct
134 .column(3)
135 .as_any()
136 .downcast_ref::<UInt64Array>()
137 .unwrap();
138 for i in 0..eid_col.len() {
139 append_edge_to_struct(
140 rels_builder.values(),
141 Eid::from(eid_col.value(i)),
142 type_col.value(i),
143 src_col.value(i),
144 dst_col.value(i),
145 query_ctx,
146 );
147 }
148}
149
150fn resolve_edge_property_type(
155 prop: &str,
156 schema_props: Option<
157 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
158 >,
159) -> DataType {
160 if prop == "overflow_json" {
161 DataType::LargeBinary
162 } else if prop == "_created_at" || prop == "_updated_at" {
163 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
167 } else {
168 schema_props
169 .and_then(|props| props.get(prop))
170 .map(|meta| meta.r#type.to_arrow())
171 .unwrap_or(DataType::LargeBinary)
172 }
173}
174
175use crate::query::df_graph::common::merged_edge_schema_props;
176
/// Variable-length expansion record: `(usize, Vid, usize, Vec<Vid>, Vec<Eid>)`,
/// the same shape as `ExpansionRecord`.
///
/// NOTE(review): not used in the visible part of this file; presumably
/// (input row index, reached vid, hop count, node path, edge path) — confirm
/// at the use site and check whether the duplicate alias is still needed.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
179
/// Physical operator that expands each input row by one graph hop: for every
/// source vertex id in `source_column` it emits one row per accepted
/// neighbor, appending target-vertex columns and either edge-variable
/// columns or an internal edge-id column.
///
/// With `optional` set, input rows that produce no expansion are kept with
/// the new columns null-padded.
pub struct GraphTraverseExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse; neighbors are fetched for each of them.
    edge_type_ids: Vec<u32>,

    /// Direction passed to neighbor lookups and graph warming.
    direction: Direction,

    /// Target variable name; prefixes the target output columns.
    target_variable: String,

    /// Variable bound to the traversed edge, if any. When `None`, an internal
    /// `__eid_to_<target>` column is emitted instead of edge columns.
    edge_variable: Option<String>,

    /// Edge properties materialized as `<edge_var>.<prop>` columns.
    edge_properties: Vec<String>,

    /// Target properties materialized as `<target>.<prop>` columns.
    target_properties: Vec<String>,

    /// Label the target vertex must carry, if the pattern specifies one.
    target_label_name: Option<String>,

    /// Numeric id of the target label.
    /// NOTE(review): only stored and passed through in the visible code.
    target_label_id: Option<u16>,

    /// Shared graph/storage context used for lookups and property fetches.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched input rows are preserved with null-padded outputs.
    optional: bool,

    /// Variables introduced by the optional pattern: their columns are nulled
    /// in null-padded rows, and the remaining `._vid` columns define the row
    /// groups used to decide which rows are unmatched.
    optional_pattern_vars: HashSet<String>,

    /// Input column holding an already-bound target vid; when present, only
    /// neighbors equal to that vid are accepted.
    bound_target_column: Option<String>,

    /// Input columns holding edge ids already used by the row; those edges
    /// are not traversed again.
    used_edge_columns: Vec<String>,

    /// Output schema: input columns followed by target and edge columns.
    schema: SchemaRef,

    /// Plan properties computed from `schema`.
    properties: Arc<PlanProperties>,

    /// Execution metrics for this operator.
    metrics: ExecutionPlanMetricsSet,
}
262
263impl fmt::Debug for GraphTraverseExec {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 f.debug_struct("GraphTraverseExec")
266 .field("source_column", &self.source_column)
267 .field("edge_type_ids", &self.edge_type_ids)
268 .field("direction", &self.direction)
269 .field("target_variable", &self.target_variable)
270 .field("edge_variable", &self.edge_variable)
271 .finish()
272 }
273}
274
275impl GraphTraverseExec {
276 #[expect(clippy::too_many_arguments)]
292 pub fn new(
293 input: Arc<dyn ExecutionPlan>,
294 source_column: impl Into<String>,
295 edge_type_ids: Vec<u32>,
296 direction: Direction,
297 target_variable: impl Into<String>,
298 edge_variable: Option<String>,
299 edge_properties: Vec<String>,
300 target_properties: Vec<String>,
301 target_label_name: Option<String>,
302 target_label_id: Option<u16>,
303 graph_ctx: Arc<GraphExecutionContext>,
304 optional: bool,
305 optional_pattern_vars: HashSet<String>,
306 bound_target_column: Option<String>,
307 used_edge_columns: Vec<String>,
308 ) -> Self {
309 let source_column = source_column.into();
310 let target_variable = target_variable.into();
311
312 let uni_schema = graph_ctx.storage().schema_manager().schema();
314 let label_props = target_label_name
315 .as_deref()
316 .and_then(|ln| uni_schema.properties.get(ln));
317 let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
318 let edge_props = if merged_edge_props.is_empty() {
319 None
320 } else {
321 Some(&merged_edge_props)
322 };
323
324 let schema = Self::build_schema(
326 input.schema(),
327 &target_variable,
328 edge_variable.as_deref(),
329 &edge_properties,
330 &target_properties,
331 label_props,
332 edge_props,
333 optional,
334 );
335
336 let properties = compute_plan_properties(schema.clone());
337
338 Self {
339 input,
340 source_column,
341 edge_type_ids,
342 direction,
343 target_variable,
344 edge_variable,
345 edge_properties,
346 target_properties,
347 target_label_name,
348 target_label_id,
349 graph_ctx,
350 optional,
351 optional_pattern_vars,
352 bound_target_column,
353 used_edge_columns,
354 schema,
355 properties,
356 metrics: ExecutionPlanMetricsSet::new(),
357 }
358 }
359
360 #[expect(
362 clippy::too_many_arguments,
363 reason = "Schema construction needs all field metadata"
364 )]
365 fn build_schema(
366 input_schema: SchemaRef,
367 target_variable: &str,
368 edge_variable: Option<&str>,
369 edge_properties: &[String],
370 target_properties: &[String],
371 label_props: Option<
372 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
373 >,
374 edge_props: Option<
375 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
376 >,
377 optional: bool,
378 ) -> SchemaRef {
379 let mut fields: Vec<Field> = input_schema
380 .fields()
381 .iter()
382 .map(|f| f.as_ref().clone())
383 .collect();
384
385 let target_vid_name = format!("{}._vid", target_variable);
387 fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
388
389 fields.push(Field::new(
391 format!("{}._labels", target_variable),
392 labels_data_type(),
393 true,
394 ));
395
396 for prop_name in target_properties {
398 let col_name = format!("{}.{}", target_variable, prop_name);
399 let arrow_type = resolve_property_type(prop_name, label_props);
400 let uni_type = label_props
401 .and_then(|p| p.get(prop_name))
402 .map(|m| &m.r#type);
403 fields.push(property_field(&col_name, arrow_type, uni_type));
404 }
405
406 if let Some(edge_var) = edge_variable {
408 let edge_id_name = format!("{}._eid", edge_var);
409 fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
410
411 fields.push(Field::new(
413 format!("{}._type", edge_var),
414 DataType::Utf8,
415 true,
416 ));
417
418 for prop_name in edge_properties {
420 let prop_col_name = format!("{}.{}", edge_var, prop_name);
421 let arrow_type = resolve_edge_property_type(prop_name, edge_props);
422 let uni_type = edge_props.and_then(|p| p.get(prop_name)).map(|m| &m.r#type);
423 fields.push(property_field(&prop_col_name, arrow_type, uni_type));
424 }
425 } else {
426 let internal_eid_name = format!("__eid_to_{}", target_variable);
429 fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
430 }
431
432 Arc::new(Schema::new(fields))
433 }
434}
435
436impl DisplayAs for GraphTraverseExec {
437 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
438 write!(
439 f,
440 "GraphTraverseExec: {} --[{:?}]--> {}",
441 self.source_column, self.edge_type_ids, self.target_variable
442 )?;
443 if let Some(ref edge_var) = self.edge_variable {
444 write!(f, " as {}", edge_var)?;
445 }
446 Ok(())
447 }
448}
449
450impl ExecutionPlan for GraphTraverseExec {
451 fn name(&self) -> &str {
452 "GraphTraverseExec"
453 }
454
455 fn as_any(&self) -> &dyn Any {
456 self
457 }
458
459 fn schema(&self) -> SchemaRef {
460 self.schema.clone()
461 }
462
463 fn properties(&self) -> &Arc<PlanProperties> {
464 &self.properties
465 }
466
467 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
468 vec![&self.input]
469 }
470
471 fn with_new_children(
472 self: Arc<Self>,
473 children: Vec<Arc<dyn ExecutionPlan>>,
474 ) -> DFResult<Arc<dyn ExecutionPlan>> {
475 if children.len() != 1 {
476 return Err(datafusion::error::DataFusionError::Plan(
477 "GraphTraverseExec requires exactly one child".to_string(),
478 ));
479 }
480
481 Ok(Arc::new(Self::new(
482 children[0].clone(),
483 self.source_column.clone(),
484 self.edge_type_ids.clone(),
485 self.direction,
486 self.target_variable.clone(),
487 self.edge_variable.clone(),
488 self.edge_properties.clone(),
489 self.target_properties.clone(),
490 self.target_label_name.clone(),
491 self.target_label_id,
492 self.graph_ctx.clone(),
493 self.optional,
494 self.optional_pattern_vars.clone(),
495 self.bound_target_column.clone(),
496 self.used_edge_columns.clone(),
497 )))
498 }
499
500 fn execute(
501 &self,
502 partition: usize,
503 context: Arc<TaskContext>,
504 ) -> DFResult<SendableRecordBatchStream> {
505 let input_stream = self.input.execute(partition, context)?;
506
507 let metrics = BaselineMetrics::new(&self.metrics, partition);
508
509 let warm_fut = self
510 .graph_ctx
511 .warming_future(self.edge_type_ids.clone(), self.direction);
512
513 Ok(Box::pin(GraphTraverseStream {
514 input: input_stream,
515 source_column: self.source_column.clone(),
516 edge_type_ids: self.edge_type_ids.clone(),
517 direction: self.direction,
518 target_variable: self.target_variable.clone(),
519 edge_variable: self.edge_variable.clone(),
520 edge_properties: self.edge_properties.clone(),
521 target_properties: self.target_properties.clone(),
522 target_label_name: self.target_label_name.clone(),
523 graph_ctx: self.graph_ctx.clone(),
524 optional: self.optional,
525 optional_pattern_vars: self.optional_pattern_vars.clone(),
526 bound_target_column: self.bound_target_column.clone(),
527 used_edge_columns: self.used_edge_columns.clone(),
528 schema: self.schema.clone(),
529 state: TraverseStreamState::Warming(warm_fut),
530 metrics,
531 }))
532 }
533
534 fn metrics(&self) -> Option<MetricsSet> {
535 Some(self.metrics.clone_inner())
536 }
537}
538
/// State machine driving `GraphTraverseStream::poll_next`.
enum TraverseStreamState {
    /// Awaiting the graph context's warming future before reading any input.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Ready to pull the next batch from the input stream.
    Reading,
    /// Awaiting the asynchronous construction of an output batch; used when
    /// target or edge properties must be fetched.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state: the stream yields `None`.
    Done,
}
550
/// Stream returned by `GraphTraverseExec::execute`. It expands each input
/// batch and yields the resulting output batches.
struct GraphTraverseStream {
    /// Input batches to expand.
    input: SendableRecordBatchStream,

    /// Input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse.
    edge_type_ids: Vec<u32>,

    /// Direction passed to neighbor lookups.
    direction: Direction,

    /// Target variable name (not read by the stream itself).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Edge variable, if bound; selects edge columns vs. the internal edge-id column.
    edge_variable: Option<String>,

    /// Edge properties to materialize.
    edge_properties: Vec<String>,

    /// Target properties to materialize.
    target_properties: Vec<String>,

    /// Required target label, if any.
    target_label_name: Option<String>,

    /// Shared graph/storage context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched rows are null-padded instead of dropped.
    optional: bool,

    /// Variables of the optional pattern (see `GraphTraverseExec`).
    optional_pattern_vars: HashSet<String>,

    /// Input column with an already-bound target vid, if any.
    bound_target_column: Option<String>,

    /// Input columns with edge ids that must not be traversed again.
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Current position in the warm / read / materialize state machine.
    state: TraverseStreamState,

    /// Baseline metrics (output rows, elapsed compute).
    metrics: BaselineMetrics,
}
605
606impl GraphTraverseStream {
607 fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
610 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
611 datafusion::error::DataFusionError::Execution(format!(
612 "Source column '{}' not found",
613 self.source_column
614 ))
615 })?;
616
617 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
618 let source_vids: &UInt64Array = &source_vid_cow;
619
620 let bound_target_cow = self
623 .bound_target_column
624 .as_ref()
625 .and_then(|col| batch.column_by_name(col))
626 .map(|c| column_as_vid_array(c.as_ref()))
627 .transpose()?;
628 let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
629
630 let used_edge_arrays: Vec<&UInt64Array> = self
632 .used_edge_columns
633 .iter()
634 .filter_map(|col| {
635 batch
636 .column_by_name(col)
637 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
638 })
639 .collect();
640
641 let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
642 let is_undirected = matches!(self.direction, Direction::Both);
643
644 for (row_idx, source_vid) in source_vids.iter().enumerate() {
645 let Some(src) = source_vid else {
646 continue;
647 };
648
649 let expected_target = bound_target_vids.map(|arr| {
655 if arr.is_null(row_idx) {
656 None
657 } else {
658 Some(arr.value(row_idx))
659 }
660 });
661
662 let used_eids: HashSet<u64> = used_edge_arrays
664 .iter()
665 .filter_map(|arr| {
666 if arr.is_null(row_idx) {
667 None
668 } else {
669 Some(arr.value(row_idx))
670 }
671 })
672 .collect();
673
674 let vid = Vid::from(src);
675 let mut seen_edges: HashSet<u64> = HashSet::new();
678
679 for &edge_type in &self.edge_type_ids {
680 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
681
682 for (target_vid, eid) in neighbors {
683 let eid_u64 = eid.as_u64();
684
685 if used_eids.contains(&eid_u64) {
687 continue;
688 }
689
690 if is_undirected && !seen_edges.insert(eid_u64) {
692 continue;
693 }
694
695 if let Some(expected_opt) = expected_target {
698 let Some(expected) = expected_opt else {
699 continue;
700 };
701 if target_vid.as_u64() != expected {
702 continue;
703 }
704 }
705
706 if let Some(ref label_name) = self.target_label_name {
711 let query_ctx = self.graph_ctx.query_context();
712 if let Some(vertex_labels) =
713 self.graph_ctx.resolve_vertex_labels(target_vid, &query_ctx)
714 {
715 if !vertex_labels.contains(label_name) {
717 continue;
718 }
719 }
720 }
722
723 expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
724 }
725 }
726 }
727
728 Ok(expanded_rows)
729 }
730}
731
732fn build_target_labels_column(
739 target_vids: &[Vid],
740 target_label_name: &Option<String>,
741 graph_ctx: &GraphExecutionContext,
742) -> ArrayRef {
743 use arrow_array::builder::{ListBuilder, StringBuilder};
744 let mut labels_builder = ListBuilder::new(StringBuilder::new());
745 let query_ctx = graph_ctx.query_context();
746 for vid in target_vids {
747 let row_labels: Vec<String> = match graph_ctx.resolve_vertex_labels(*vid, &query_ctx) {
748 Some(labels) => labels,
749 None => {
750 if let Some(label_name) = target_label_name {
753 vec![label_name.clone()]
754 } else {
755 vec![]
756 }
757 }
758 };
759 let values = labels_builder.values();
760 for lbl in &row_labels {
761 values.append_value(lbl);
762 }
763 labels_builder.append(true);
764 }
765 Arc::new(labels_builder.finish())
766}
767
768async fn build_target_property_columns(
770 target_vids: &[Vid],
771 target_properties: &[String],
772 target_label_name: &Option<String>,
773 graph_ctx: &Arc<GraphExecutionContext>,
774) -> DFResult<Vec<ArrayRef>> {
775 let mut columns = Vec::new();
776
777 if let Some(label_name) = target_label_name {
778 let property_manager = graph_ctx.property_manager();
779 let query_ctx = graph_ctx.query_context();
780
781 let props_map = property_manager
782 .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
783 .await
784 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
785
786 let uni_schema = graph_ctx.storage().schema_manager().schema();
787 let label_props = uni_schema.properties.get(label_name.as_str());
788
789 for prop_name in target_properties {
790 let data_type = resolve_property_type(prop_name, label_props);
791 let column =
792 build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
793 columns.push(column);
794 }
795 } else {
796 let non_internal_props: Vec<&str> = target_properties
798 .iter()
799 .filter(|p| *p != "_all_props")
800 .map(|s| s.as_str())
801 .collect();
802 let property_manager = graph_ctx.property_manager();
803 let query_ctx = graph_ctx.query_context();
804
805 let props_map = if !non_internal_props.is_empty() {
806 property_manager
807 .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
808 .await
809 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
810 } else {
811 std::collections::HashMap::new()
812 };
813
814 for prop_name in target_properties {
815 if prop_name == "_all_props" {
816 columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
817 } else {
818 let column = build_property_column_static(
819 target_vids,
820 &props_map,
821 prop_name,
822 &arrow::datatypes::DataType::LargeBinary,
823 )?;
824 columns.push(column);
825 }
826 }
827 }
828
829 Ok(columns)
830}
831
832fn build_all_props_column(
834 target_vids: &[Vid],
835 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
836 graph_ctx: &Arc<GraphExecutionContext>,
837) -> ArrayRef {
838 use arrow_array::builder::LargeBinaryBuilder;
839
840 let mut builder = LargeBinaryBuilder::new();
841 let l0_ctx = graph_ctx.l0_context();
842 for vid in target_vids {
843 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
845 if let Some(vid_props) = props_map.get(vid) {
846 for (k, v) in vid_props.iter() {
847 merged_props.insert(k.clone(), v.clone());
848 }
849 }
850 for l0 in l0_ctx.iter_l0_buffers() {
851 let guard = l0.read();
852 if let Some(l0_props) = guard.vertex_properties.get(vid) {
853 for (k, v) in l0_props.iter() {
854 merged_props.insert(k.clone(), v.clone());
855 }
856 }
857 }
858 if merged_props.is_empty() {
859 builder.append_null();
860 } else {
861 builder.append_value(uni_common::cypher_value_codec::encode(
862 &uni_common::Value::Map(merged_props),
863 ));
864 }
865 }
866 Arc::new(builder.finish())
867}
868
869async fn build_edge_columns(
871 expansions: &[(usize, Vid, u64, u32)],
872 edge_properties: &[String],
873 edge_type_ids: &[u32],
874 graph_ctx: &Arc<GraphExecutionContext>,
875) -> DFResult<Vec<ArrayRef>> {
876 let mut columns = Vec::new();
877
878 let eids: Vec<Eid> = expansions
879 .iter()
880 .map(|(_, _, eid, _)| Eid::from(*eid))
881 .collect();
882 let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
883 columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
884
885 {
887 let uni_schema = graph_ctx.storage().schema_manager().schema();
888 let mut type_builder = arrow_array::builder::StringBuilder::new();
889 for (_, _, _, edge_type_id) in expansions {
890 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
891 type_builder.append_value(&name);
892 } else {
893 type_builder.append_null();
894 }
895 }
896 columns.push(Arc::new(type_builder.finish()) as ArrayRef);
897 }
898
899 if !edge_properties.is_empty() {
900 let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
901 let property_manager = graph_ctx.property_manager();
902 let query_ctx = graph_ctx.query_context();
903
904 let props_map = property_manager
905 .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
906 .await
907 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
908
909 let uni_schema = graph_ctx.storage().schema_manager().schema();
910 let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
911 let edge_type_props = if merged_edge_props.is_empty() {
912 None
913 } else {
914 Some(&merged_edge_props)
915 };
916
917 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
918
919 for prop_name in edge_properties {
920 if prop_name == "_created_at" || prop_name == "_updated_at" {
926 let mut builder =
927 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
928 let l0_ctx = graph_ctx.l0_context();
929 for eid in &eids {
930 let mut value: Option<i64> = None;
931 for l0 in l0_ctx.iter_l0_buffers() {
932 let guard = l0.read();
933 let opt = if prop_name == "_created_at" {
934 guard.edge_created_at.get(eid).copied()
935 } else {
936 guard.edge_updated_at.get(eid).copied()
937 };
938 if let Some(ts) = opt {
939 value = Some(match value {
940 None => ts,
941 Some(cur) if prop_name == "_created_at" => cur.min(ts),
942 Some(cur) => cur.max(ts),
943 });
944 }
945 }
946 match value {
947 Some(ts) => builder.append_value(ts),
948 None => builder.append_null(),
949 }
950 }
951 columns.push(Arc::new(builder.finish()) as ArrayRef);
952 continue;
953 }
954 let data_type = resolve_edge_property_type(prop_name, edge_type_props);
955 let column =
956 build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
957 columns.push(column);
958 }
959 }
960
961 Ok(columns)
962}
963
964#[expect(
969 clippy::too_many_arguments,
970 reason = "Standalone async fn needs all context passed explicitly"
971)]
972async fn build_traverse_output_batch(
973 input: RecordBatch,
974 expansions: Vec<(usize, Vid, u64, u32)>,
975 schema: SchemaRef,
976 edge_variable: Option<String>,
977 edge_properties: Vec<String>,
978 edge_type_ids: Vec<u32>,
979 target_properties: Vec<String>,
980 target_label_name: Option<String>,
981 graph_ctx: Arc<GraphExecutionContext>,
982 optional: bool,
983 optional_pattern_vars: HashSet<String>,
984) -> DFResult<RecordBatch> {
985 if expansions.is_empty() {
986 if !optional {
987 return Ok(RecordBatch::new_empty(schema));
988 }
989 let unmatched_reps = collect_unmatched_optional_group_rows(
990 &input,
991 &HashSet::new(),
992 &schema,
993 &optional_pattern_vars,
994 )?;
995 if unmatched_reps.is_empty() {
996 return Ok(RecordBatch::new_empty(schema));
997 }
998 return build_optional_null_batch_for_rows_with_optional_vars(
999 &input,
1000 &unmatched_reps,
1001 &schema,
1002 &optional_pattern_vars,
1003 );
1004 }
1005
1006 let indices: Vec<u64> = expansions
1008 .iter()
1009 .map(|(idx, _, _, _)| *idx as u64)
1010 .collect();
1011 let indices_array = UInt64Array::from(indices);
1012 let mut columns: Vec<ArrayRef> = input
1013 .columns()
1014 .iter()
1015 .map(|col| take(col.as_ref(), &indices_array, None))
1016 .collect::<Result<_, _>>()?;
1017
1018 let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1020 let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1021 columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1022
1023 columns.push(build_target_labels_column(
1025 &target_vids,
1026 &target_label_name,
1027 &graph_ctx,
1028 ));
1029
1030 if !target_properties.is_empty() {
1032 let prop_cols = build_target_property_columns(
1033 &target_vids,
1034 &target_properties,
1035 &target_label_name,
1036 &graph_ctx,
1037 )
1038 .await?;
1039 columns.extend(prop_cols);
1040 }
1041
1042 if edge_variable.is_some() {
1044 let edge_cols =
1045 build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1046 columns.extend(edge_cols);
1047 } else {
1048 let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1049 columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1050 }
1051
1052 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1053
1054 if optional {
1056 let matched_indices: HashSet<usize> =
1057 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1058 let unmatched = collect_unmatched_optional_group_rows(
1059 &input,
1060 &matched_indices,
1061 &schema,
1062 &optional_pattern_vars,
1063 )?;
1064
1065 if !unmatched.is_empty() {
1066 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1067 &input,
1068 &unmatched,
1069 &schema,
1070 &optional_pattern_vars,
1071 )?;
1072 let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1073 .map_err(arrow_err)?;
1074 return Ok(combined);
1075 }
1076 }
1077
1078 Ok(expanded_batch)
1079}
1080
1081fn build_optional_null_batch_for_rows(
1084 input: &RecordBatch,
1085 unmatched_indices: &[usize],
1086 schema: &SchemaRef,
1087) -> DFResult<RecordBatch> {
1088 let num_rows = unmatched_indices.len();
1089 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1090 let indices_array = UInt64Array::from(indices);
1091
1092 let mut columns: Vec<ArrayRef> = Vec::new();
1094 for col in input.columns() {
1095 let taken = take(col.as_ref(), &indices_array, None)?;
1096 columns.push(taken);
1097 }
1098 for field in schema.fields().iter().skip(input.num_columns()) {
1100 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1101 }
1102 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1103}
1104
/// Returns `true` if column `col_name` belongs to one of `optional_vars`.
///
/// A column belongs to variable `v` when it is named exactly `v`, when it is
/// a property/internal column `v.<field>`, or when it is the internal edge-id
/// column `__eid_to_v` emitted for an anonymous edge leading to `v`.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }

    // The target after `__eid_to_` must match exactly. A suffix match would
    // wrongly claim e.g. `__eid_to_ab` for the variable `b`.
    if col_name
        .strip_prefix("__eid_to_")
        .is_some_and(|target| optional_vars.contains(target))
    {
        return true;
    }

    optional_vars.iter().any(|var| {
        col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'))
    })
}
1112
1113fn collect_unmatched_optional_group_rows(
1114 input: &RecordBatch,
1115 matched_indices: &HashSet<usize>,
1116 schema: &SchemaRef,
1117 optional_vars: &HashSet<String>,
1118) -> DFResult<Vec<usize>> {
1119 if input.num_rows() == 0 {
1120 return Ok(Vec::new());
1121 }
1122
1123 if optional_vars.is_empty() {
1124 return Ok((0..input.num_rows())
1125 .filter(|idx| !matched_indices.contains(idx))
1126 .collect());
1127 }
1128
1129 let source_vid_indices: Vec<usize> = schema
1130 .fields()
1131 .iter()
1132 .enumerate()
1133 .filter_map(|(idx, field)| {
1134 if idx >= input.num_columns() {
1135 return None;
1136 }
1137 let name = field.name();
1138 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1139 Some(idx)
1140 } else {
1141 None
1142 }
1143 })
1144 .collect();
1145
1146 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1149
1150 for row_idx in 0..input.num_rows() {
1151 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1152 let entry = groups.entry(key.clone());
1153 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1154 group_order.push(key.clone());
1155 }
1156 let matched = matched_indices.contains(&row_idx);
1157 entry
1158 .and_modify(|(_, any_matched)| *any_matched |= matched)
1159 .or_insert((row_idx, matched));
1160 }
1161
1162 Ok(group_order
1163 .into_iter()
1164 .filter_map(|key| {
1165 groups
1166 .get(&key)
1167 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1168 })
1169 .collect())
1170}
1171
1172fn compute_optional_group_key(
1173 batch: &RecordBatch,
1174 row_idx: usize,
1175 source_vid_indices: &[usize],
1176) -> DFResult<Vec<u8>> {
1177 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1178 for &col_idx in source_vid_indices {
1179 let col = batch.column(col_idx);
1180 let vid_cow = column_as_vid_array(col.as_ref())?;
1181 let arr: &UInt64Array = &vid_cow;
1182 if arr.is_null(row_idx) {
1183 key.extend_from_slice(&u64::MAX.to_le_bytes());
1184 } else {
1185 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1186 }
1187 }
1188 Ok(key)
1189}
1190
1191fn build_optional_null_batch_for_rows_with_optional_vars(
1192 input: &RecordBatch,
1193 unmatched_indices: &[usize],
1194 schema: &SchemaRef,
1195 optional_vars: &HashSet<String>,
1196) -> DFResult<RecordBatch> {
1197 if optional_vars.is_empty() {
1198 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1199 }
1200
1201 let num_rows = unmatched_indices.len();
1202 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1203 let indices_array = UInt64Array::from(indices);
1204
1205 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1206 for (col_idx, field) in schema.fields().iter().enumerate() {
1207 if col_idx < input.num_columns() {
1208 if is_optional_column_for_vars(field.name(), optional_vars) {
1209 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1210 } else {
1211 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1212 columns.push(taken);
1213 }
1214 } else {
1215 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1216 }
1217 }
1218
1219 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1220}
1221
/// Drives the traversal. The stream first awaits the graph context's warming
/// future, then for each input batch computes the expansions and emits the
/// output batch. The batch is built synchronously when no target or edge
/// properties are requested, and through an async materialization future
/// otherwise.
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the metrics handle so the timer guard does not borrow `self`.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take the current state out, leaving `Done`. Each branch below
            // either installs the next state or returns with `Done` in place.
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // A timeout leaves the state at `Done`, ending the stream.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // NOTE(review): unlike the other error paths, this one
                                    // keeps the stream in `Reading` — confirm intended.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Fast path: nothing has to be fetched asynchronously.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Slow path: move owned copies into a 'static future.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            // Loop again so the new future is polled immediately.
                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1349
1350fn build_traverse_output_batch_sync(
1355 input: &RecordBatch,
1356 expansions: &[(usize, Vid, u64, u32)],
1357 schema: &SchemaRef,
1358 edge_variable: Option<&String>,
1359 graph_ctx: &GraphExecutionContext,
1360 optional: bool,
1361 optional_pattern_vars: &HashSet<String>,
1362) -> DFResult<RecordBatch> {
1363 if expansions.is_empty() {
1364 if !optional {
1365 return Ok(RecordBatch::new_empty(schema.clone()));
1366 }
1367 let unmatched_reps = collect_unmatched_optional_group_rows(
1368 input,
1369 &HashSet::new(),
1370 schema,
1371 optional_pattern_vars,
1372 )?;
1373 if unmatched_reps.is_empty() {
1374 return Ok(RecordBatch::new_empty(schema.clone()));
1375 }
1376 return build_optional_null_batch_for_rows_with_optional_vars(
1377 input,
1378 &unmatched_reps,
1379 schema,
1380 optional_pattern_vars,
1381 );
1382 }
1383
1384 let indices: Vec<u64> = expansions
1385 .iter()
1386 .map(|(idx, _, _, _)| *idx as u64)
1387 .collect();
1388 let indices_array = UInt64Array::from(indices);
1389
1390 let mut columns: Vec<ArrayRef> = Vec::new();
1391 for col in input.columns() {
1392 let expanded = take(col.as_ref(), &indices_array, None)?;
1393 columns.push(expanded);
1394 }
1395
1396 let target_vids: Vec<u64> = expansions
1398 .iter()
1399 .map(|(_, vid, _, _)| vid.as_u64())
1400 .collect();
1401 columns.push(Arc::new(UInt64Array::from(target_vids)));
1402
1403 {
1405 use arrow_array::builder::{ListBuilder, StringBuilder};
1406 let l0_ctx = graph_ctx.l0_context();
1407 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1408 for (_, vid, _, _) in expansions {
1409 let mut row_labels: Vec<String> = Vec::new();
1410 for l0 in l0_ctx.iter_l0_buffers() {
1411 let guard = l0.read();
1412 if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1413 for lbl in l0_labels {
1414 if !row_labels.contains(lbl) {
1415 row_labels.push(lbl.clone());
1416 }
1417 }
1418 }
1419 }
1420 let values = labels_builder.values();
1421 for lbl in &row_labels {
1422 values.append_value(lbl);
1423 }
1424 labels_builder.append(true);
1425 }
1426 columns.push(Arc::new(labels_builder.finish()));
1427 }
1428
1429 if edge_variable.is_some() {
1431 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1432 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1433
1434 let uni_schema = graph_ctx.storage().schema_manager().schema();
1436 let mut type_builder = arrow_array::builder::StringBuilder::new();
1437 for (_, _, _, edge_type_id) in expansions {
1438 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1439 type_builder.append_value(&name);
1440 } else {
1441 type_builder.append_null();
1442 }
1443 }
1444 columns.push(Arc::new(type_builder.finish()));
1445 } else {
1446 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1448 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1449 }
1450
1451 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1452
1453 if optional {
1454 let matched_indices: HashSet<usize> =
1455 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1456 let unmatched = collect_unmatched_optional_group_rows(
1457 input,
1458 &matched_indices,
1459 schema,
1460 optional_pattern_vars,
1461 )?;
1462
1463 if !unmatched.is_empty() {
1464 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1465 input,
1466 &unmatched,
1467 schema,
1468 optional_pattern_vars,
1469 )?;
1470 let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1471 .map_err(|e| {
1472 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1473 })?;
1474 return Ok(combined);
1475 }
1476 }
1477
1478 Ok(expanded_batch)
1479}
1480
1481impl RecordBatchStream for GraphTraverseStream {
1482 fn schema(&self) -> SchemaRef {
1483 self.schema.clone()
1484 }
1485}
1486
/// Adjacency lists keyed by the vertex a traversal expands from.
///
/// Each entry is `(neighbor_vid, edge_id, edge_type_name, edge_properties)`. The type name and
/// properties are `Arc`-shared so that one edge can be listed under both endpoints (as
/// `build_edge_adjacency_map` does for `Direction::Both`) without copying.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1493
/// Physical operator for a single-hop traversal that loads the adjacency of the requested
/// edge types once and expands every input batch against it.
///
/// Its stream (`GraphTraverseMainStream`) first buffers the whole input. It then builds an
/// `EdgeAdjacencyMap`, pushing the source VIDs down as an endpoint filter when there are at
/// most `TRAVERSE_PUSHDOWN_MAX_SOURCES` of them. Finally it emits one output batch per
/// buffered input batch.
pub struct GraphTraverseMainExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Input column holding the source vertex IDs.
    source_column: String,

    /// Edge type names to traverse.
    type_names: Vec<String>,

    /// Traversal direction relative to the source vertex.
    direction: Direction,

    /// Variable bound to the target vertex; used as the prefix of the target columns.
    target_variable: String,

    /// Variable bound to the traversed edge. When set, `<edge>._eid`, `<edge>._type` and the
    /// edge property columns are emitted; otherwise a hidden `__eid_to_<target>` column is.
    edge_variable: Option<String>,

    /// Edge properties to materialize; `_all_props` produces a map of every property.
    edge_properties: Vec<String>,

    /// Target vertex properties to materialize; `_all_props` produces a map of every property.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, query timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL traversal: input rows without a match are null-extended instead of dropped.
    optional: bool,

    /// Variables of the enclosing optional pattern, passed to the null-extension helpers
    /// (presumably so earlier optional hops are nulled together — confirm).
    optional_pattern_vars: HashSet<String>,

    /// Input column holding an already-bound target VID; only neighbors equal to it are kept.
    bound_target_column: Option<String>,

    /// Input columns holding edge IDs already used by the row; those edges are skipped.
    used_edge_columns: Vec<String>,

    /// Output schema produced by `build_schema`.
    schema: SchemaRef,

    /// Plan properties computed from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics registered by each executed partition.
    metrics: ExecutionPlanMetricsSet,
}
1568
1569impl fmt::Debug for GraphTraverseMainExec {
1570 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1571 f.debug_struct("GraphTraverseMainExec")
1572 .field("type_names", &self.type_names)
1573 .field("direction", &self.direction)
1574 .field("target_variable", &self.target_variable)
1575 .field("edge_variable", &self.edge_variable)
1576 .finish()
1577 }
1578}
1579
1580impl GraphTraverseMainExec {
1581 #[expect(clippy::too_many_arguments)]
1583 pub fn new(
1584 input: Arc<dyn ExecutionPlan>,
1585 source_column: impl Into<String>,
1586 type_names: Vec<String>,
1587 direction: Direction,
1588 target_variable: impl Into<String>,
1589 edge_variable: Option<String>,
1590 edge_properties: Vec<String>,
1591 target_properties: Vec<String>,
1592 graph_ctx: Arc<GraphExecutionContext>,
1593 optional: bool,
1594 optional_pattern_vars: HashSet<String>,
1595 bound_target_column: Option<String>,
1596 used_edge_columns: Vec<String>,
1597 ) -> Self {
1598 let source_column = source_column.into();
1599 let target_variable = target_variable.into();
1600
1601 let schema = Self::build_schema(
1603 &input.schema(),
1604 &target_variable,
1605 &edge_variable,
1606 &edge_properties,
1607 &target_properties,
1608 optional,
1609 );
1610
1611 let properties = compute_plan_properties(schema.clone());
1612
1613 Self {
1614 input,
1615 source_column,
1616 type_names,
1617 direction,
1618 target_variable,
1619 edge_variable,
1620 edge_properties,
1621 target_properties,
1622 graph_ctx,
1623 optional,
1624 optional_pattern_vars,
1625 bound_target_column,
1626 used_edge_columns,
1627 schema,
1628 properties,
1629 metrics: ExecutionPlanMetricsSet::new(),
1630 }
1631 }
1632
1633 fn build_schema(
1635 input_schema: &SchemaRef,
1636 target_variable: &str,
1637 edge_variable: &Option<String>,
1638 edge_properties: &[String],
1639 target_properties: &[String],
1640 optional: bool,
1641 ) -> SchemaRef {
1642 let mut fields: Vec<Field> = input_schema
1643 .fields()
1644 .iter()
1645 .map(|f| f.as_ref().clone())
1646 .collect();
1647
1648 let target_vid_name = format!("{}._vid", target_variable);
1650 if input_schema.column_with_name(&target_vid_name).is_none() {
1651 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1652 }
1653
1654 let target_labels_name = format!("{}._labels", target_variable);
1656 if input_schema.column_with_name(&target_labels_name).is_none() {
1657 fields.push(Field::new(target_labels_name, labels_data_type(), true));
1658 }
1659
1660 if let Some(edge_var) = edge_variable {
1662 fields.push(Field::new(
1663 format!("{}._eid", edge_var),
1664 DataType::UInt64,
1665 optional,
1666 ));
1667
1668 fields.push(Field::new(
1670 format!("{}._type", edge_var),
1671 DataType::Utf8,
1672 true,
1673 ));
1674
1675 for prop in edge_properties {
1679 let col_name = format!("{}.{}", edge_var, prop);
1680 let mut metadata = std::collections::HashMap::new();
1681 metadata.insert("cv_encoded".to_string(), "true".to_string());
1682 fields.push(
1683 Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1684 );
1685 }
1686 } else {
1687 fields.push(Field::new(
1690 format!("__eid_to_{}", target_variable),
1691 DataType::UInt64,
1692 optional,
1693 ));
1694 }
1695
1696 for prop in target_properties {
1698 fields.push(Field::new(
1699 format!("{}.{}", target_variable, prop),
1700 DataType::LargeBinary,
1701 true,
1702 ));
1703 }
1704
1705 Arc::new(Schema::new(fields))
1706 }
1707}
1708
1709impl DisplayAs for GraphTraverseMainExec {
1710 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1711 write!(
1712 f,
1713 "GraphTraverseMainExec: types={:?}, direction={:?}",
1714 self.type_names, self.direction
1715 )
1716 }
1717}
1718
1719impl ExecutionPlan for GraphTraverseMainExec {
1720 fn name(&self) -> &str {
1721 "GraphTraverseMainExec"
1722 }
1723
1724 fn as_any(&self) -> &dyn Any {
1725 self
1726 }
1727
1728 fn schema(&self) -> SchemaRef {
1729 self.schema.clone()
1730 }
1731
1732 fn properties(&self) -> &Arc<PlanProperties> {
1733 &self.properties
1734 }
1735
1736 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1737 vec![&self.input]
1738 }
1739
1740 fn with_new_children(
1741 self: Arc<Self>,
1742 children: Vec<Arc<dyn ExecutionPlan>>,
1743 ) -> DFResult<Arc<dyn ExecutionPlan>> {
1744 if children.len() != 1 {
1745 return Err(datafusion::error::DataFusionError::Plan(
1746 "GraphTraverseMainExec expects exactly one child".to_string(),
1747 ));
1748 }
1749
1750 Ok(Arc::new(Self {
1751 input: children[0].clone(),
1752 source_column: self.source_column.clone(),
1753 type_names: self.type_names.clone(),
1754 direction: self.direction,
1755 target_variable: self.target_variable.clone(),
1756 edge_variable: self.edge_variable.clone(),
1757 edge_properties: self.edge_properties.clone(),
1758 target_properties: self.target_properties.clone(),
1759 graph_ctx: self.graph_ctx.clone(),
1760 optional: self.optional,
1761 optional_pattern_vars: self.optional_pattern_vars.clone(),
1762 bound_target_column: self.bound_target_column.clone(),
1763 used_edge_columns: self.used_edge_columns.clone(),
1764 schema: self.schema.clone(),
1765 properties: self.properties.clone(),
1766 metrics: self.metrics.clone(),
1767 }))
1768 }
1769
1770 fn execute(
1771 &self,
1772 partition: usize,
1773 context: Arc<TaskContext>,
1774 ) -> DFResult<SendableRecordBatchStream> {
1775 let input_stream = self.input.execute(partition, context)?;
1776 let metrics = BaselineMetrics::new(&self.metrics, partition);
1777
1778 Ok(Box::pin(GraphTraverseMainStream::new(
1779 input_stream,
1780 self.source_column.clone(),
1781 self.type_names.clone(),
1782 self.direction,
1783 self.target_variable.clone(),
1784 self.edge_variable.clone(),
1785 self.edge_properties.clone(),
1786 self.target_properties.clone(),
1787 self.graph_ctx.clone(),
1788 self.optional,
1789 self.optional_pattern_vars.clone(),
1790 self.bound_target_column.clone(),
1791 self.used_edge_columns.clone(),
1792 self.schema.clone(),
1793 metrics,
1794 )))
1795 }
1796
1797 fn metrics(&self) -> Option<MetricsSet> {
1798 Some(self.metrics.clone_inner())
1799 }
1800}
1801
/// Upper bound on the number of distinct source VIDs pushed down to storage as an endpoint
/// filter when loading edges.
///
/// `GraphTraverseMainStream::collect_source_vids` returns `None` once more than this many
/// distinct sources are seen. In that case `build_edge_adjacency_map` loads every edge of the
/// requested types.
const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1_024;
1808
/// States of `GraphTraverseMainStream`.
enum GraphTraverseMainState {
    /// Draining the input stream into `buffered`.
    CollectingInput {
        input_stream: SendableRecordBatchStream,
        buffered: Vec<RecordBatch>,
    },
    /// Waiting for the adjacency map to be built; the buffered input is held meanwhile.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        buffered: Vec<RecordBatch>,
    },
    /// Expanding the buffered batches, one per poll, against the loaded adjacency.
    Processing {
        adjacency: EdgeAdjacencyMap,
        buffered: std::vec::IntoIter<RecordBatch>,
    },
    /// Terminal state; the stream yields `None`.
    Done,
}
1835
/// Stream produced by `GraphTraverseMainExec`.
///
/// It buffers all input, loads the edge adjacency once, then expands the buffered batches
/// one at a time.
struct GraphTraverseMainStream {
    /// Input column holding the source vertex IDs.
    source_column: String,

    /// Variable bound to the target vertex (prefix of the target columns).
    target_variable: String,

    /// Variable bound to the traversed edge, if any.
    edge_variable: Option<String>,

    /// Edge properties to materialize; `_all_props` produces a map of every property.
    edge_properties: Vec<String>,

    /// Target properties to materialize from the L0 buffers; `_all_props` produces a map.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, query timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched input rows are null-extended.
    optional: bool,

    /// Variables of the enclosing optional pattern, passed to the null-extension helpers.
    optional_pattern_vars: HashSet<String>,

    /// Input column with an already-bound target VID that neighbors must equal.
    bound_target_column: Option<String>,

    /// Input columns with edge IDs already used by the row; those edges are skipped.
    used_edge_columns: Vec<String>,

    /// Output schema.
    schema: SchemaRef,

    /// Edge type names to load.
    type_names: Vec<String>,

    /// Traversal direction used to key the adjacency map.
    direction: Direction,

    /// Current state of the state machine driven by `poll_next`.
    state: GraphTraverseMainState,

    /// Per-partition output and compute-time metrics.
    metrics: BaselineMetrics,
}
1883
1884impl GraphTraverseMainStream {
1885 #[expect(clippy::too_many_arguments)]
1887 fn new(
1888 input_stream: SendableRecordBatchStream,
1889 source_column: String,
1890 type_names: Vec<String>,
1891 direction: Direction,
1892 target_variable: String,
1893 edge_variable: Option<String>,
1894 edge_properties: Vec<String>,
1895 target_properties: Vec<String>,
1896 graph_ctx: Arc<GraphExecutionContext>,
1897 optional: bool,
1898 optional_pattern_vars: HashSet<String>,
1899 bound_target_column: Option<String>,
1900 used_edge_columns: Vec<String>,
1901 schema: SchemaRef,
1902 metrics: BaselineMetrics,
1903 ) -> Self {
1904 Self {
1907 source_column,
1908 target_variable,
1909 edge_variable,
1910 edge_properties,
1911 target_properties,
1912 graph_ctx,
1913 optional,
1914 optional_pattern_vars,
1915 bound_target_column,
1916 used_edge_columns,
1917 schema,
1918 type_names,
1919 direction,
1920 state: GraphTraverseMainState::CollectingInput {
1921 input_stream,
1922 buffered: Vec::new(),
1923 },
1924 metrics,
1925 }
1926 }
1927
1928 fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
1933 let mut vids: HashSet<Vid> = HashSet::new();
1934 for batch in buffered {
1935 let col = batch.column_by_name(&self.source_column)?;
1936 let arr = column_as_vid_array(col.as_ref()).ok()?;
1937 for i in 0..arr.len() {
1938 if !arr.is_null(i) {
1939 vids.insert(Vid::from(arr.value(i)));
1940 if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
1941 return None;
1942 }
1943 }
1944 }
1945 }
1946 Some(vids)
1947 }
1948
1949 fn expand_batch(
1951 &self,
1952 input: &RecordBatch,
1953 adjacency: &EdgeAdjacencyMap,
1954 ) -> DFResult<RecordBatch> {
1955 let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1957 datafusion::error::DataFusionError::Execution(format!(
1958 "Source column {} not found",
1959 self.source_column
1960 ))
1961 })?;
1962
1963 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1964 let source_vids: &UInt64Array = &source_vid_cow;
1965
1966 let bound_target_cow = self
1968 .bound_target_column
1969 .as_ref()
1970 .and_then(|col| input.column_by_name(col))
1971 .map(|c| column_as_vid_array(c.as_ref()))
1972 .transpose()?;
1973 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1974
1975 let used_edge_arrays: Vec<&UInt64Array> = self
1977 .used_edge_columns
1978 .iter()
1979 .filter_map(|col| {
1980 input
1981 .column_by_name(col)
1982 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1983 })
1984 .collect();
1985
1986 type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
1989 let mut expansions: Vec<Expansion> = Vec::new();
1990
1991 for (row_idx, src_u64) in source_vids.iter().enumerate() {
1992 if let Some(src_u64) = src_u64 {
1993 let src_vid = Vid::from(src_u64);
1994
1995 let used_eids: HashSet<u64> = used_edge_arrays
1997 .iter()
1998 .filter_map(|arr| {
1999 if arr.is_null(row_idx) {
2000 None
2001 } else {
2002 Some(arr.value(row_idx))
2003 }
2004 })
2005 .collect();
2006
2007 if let Some(neighbors) = adjacency.get(&src_vid) {
2008 for (target_vid, eid, edge_type, props) in neighbors {
2009 if used_eids.contains(&eid.as_u64()) {
2011 continue;
2012 }
2013
2014 if let Some(targets) = expected_targets {
2017 if targets.is_null(row_idx) {
2018 continue;
2019 }
2020 let expected_vid = targets.value(row_idx);
2021 if target_vid.as_u64() != expected_vid {
2022 continue;
2023 }
2024 }
2025
2026 expansions.push((
2027 row_idx,
2028 *target_vid,
2029 *eid,
2030 Arc::clone(edge_type),
2031 Arc::clone(props),
2032 ));
2033 }
2034 }
2035 }
2036 }
2037
2038 if expansions.is_empty() && self.optional {
2040 let all_indices: Vec<usize> = (0..input.num_rows()).collect();
2042 return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
2043 }
2044
2045 if expansions.is_empty() {
2046 return Ok(RecordBatch::new_empty(self.schema.clone()));
2048 }
2049
2050 let matched_rows: HashSet<usize> = if self.optional {
2052 expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
2053 } else {
2054 HashSet::new()
2055 };
2056
2057 let mut columns: Vec<ArrayRef> = Vec::new();
2059 let indices: Vec<u64> = expansions
2060 .iter()
2061 .map(|(idx, _, _, _, _)| *idx as u64)
2062 .collect();
2063 let indices_array = UInt64Array::from(indices);
2064
2065 for col in input.columns() {
2066 let expanded = take(col.as_ref(), &indices_array, None)?;
2067 columns.push(expanded);
2068 }
2069
2070 let target_vid_name = format!("{}._vid", self.target_variable);
2072 let target_vids: Vec<u64> = expansions
2073 .iter()
2074 .map(|(_, vid, _, _, _)| vid.as_u64())
2075 .collect();
2076 if input.schema().column_with_name(&target_vid_name).is_none() {
2077 columns.push(Arc::new(UInt64Array::from(target_vids)));
2078 }
2079
2080 let target_labels_name = format!("{}._labels", self.target_variable);
2082 if input
2083 .schema()
2084 .column_with_name(&target_labels_name)
2085 .is_none()
2086 {
2087 use arrow_array::builder::{ListBuilder, StringBuilder};
2088 let query_ctx = self.graph_ctx.query_context();
2094 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2095 for (_, target_vid, _, _, _) in &expansions {
2096 let row_labels = self
2097 .graph_ctx
2098 .resolve_vertex_labels(*target_vid, &query_ctx)
2099 .unwrap_or_default();
2100 let values = labels_builder.values();
2101 for lbl in &row_labels {
2102 values.append_value(lbl);
2103 }
2104 labels_builder.append(true);
2105 }
2106 columns.push(Arc::new(labels_builder.finish()));
2107 }
2108
2109 if self.edge_variable.is_some() {
2112 let eids: Vec<u64> = expansions
2114 .iter()
2115 .map(|(_, _, eid, _, _)| eid.as_u64())
2116 .collect();
2117 columns.push(Arc::new(UInt64Array::from(eids)));
2118
2119 {
2121 let mut type_builder = arrow_array::builder::StringBuilder::new();
2122 for (_, _, _, edge_type, _) in &expansions {
2123 type_builder.append_value(edge_type);
2124 }
2125 columns.push(Arc::new(type_builder.finish()));
2126 }
2127
2128 for prop_name in &self.edge_properties {
2130 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2131 if prop_name == "_all_props" {
2132 for (_, _, _, _, props) in &expansions {
2135 if props.is_empty() {
2136 builder.append_null();
2137 } else {
2138 let map: HashMap<String, uni_common::Value> =
2139 props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
2140 builder.append_value(uni_common::cypher_value_codec::encode(
2141 &uni_common::Value::Map(map),
2142 ));
2143 }
2144 }
2145 } else {
2146 for (_, _, _, _, props) in &expansions {
2148 match props.get(prop_name) {
2149 Some(uni_common::Value::Null) | None => builder.append_null(),
2150 Some(val) => {
2151 builder.append_value(uni_common::cypher_value_codec::encode(val));
2152 }
2153 }
2154 }
2155 }
2156 columns.push(Arc::new(builder.finish()));
2157 }
2158 } else {
2159 let eids: Vec<u64> = expansions
2160 .iter()
2161 .map(|(_, _, eid, _, _)| eid.as_u64())
2162 .collect();
2163 columns.push(Arc::new(UInt64Array::from(eids)));
2164 }
2165
2166 {
2168 let l0_ctx = self.graph_ctx.l0_context();
2169
2170 for prop_name in &self.target_properties {
2171 if prop_name == "_all_props" {
2172 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2174 for (_, target_vid, _, _, _) in &expansions {
2175 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
2177 for l0 in l0_ctx.iter_l0_buffers() {
2178 let guard = l0.read();
2179 if let Some(props) = guard.vertex_properties.get(target_vid) {
2180 for (k, v) in props.iter() {
2181 merged_props.insert(k.clone(), v.clone());
2182 }
2183 }
2184 }
2185 if merged_props.is_empty() {
2186 builder.append_null();
2187 } else {
2188 builder.append_value(uni_common::cypher_value_codec::encode(
2189 &uni_common::Value::Map(merged_props),
2190 ));
2191 }
2192 }
2193 columns.push(Arc::new(builder.finish()));
2194 } else {
2195 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2197 for (_, target_vid, _, _, _) in &expansions {
2198 let mut found = false;
2199 for l0 in l0_ctx.iter_l0_buffers() {
2200 let guard = l0.read();
2201 if let Some(props) = guard.vertex_properties.get(target_vid)
2202 && let Some(val) = props.get(prop_name.as_str())
2203 && !val.is_null()
2204 {
2205 builder.append_value(uni_common::cypher_value_codec::encode(val));
2206 found = true;
2207 break;
2208 }
2209 }
2210 if !found {
2211 builder.append_null();
2212 }
2213 }
2214 columns.push(Arc::new(builder.finish()));
2215 }
2216 }
2217 }
2218
2219 let matched_batch =
2220 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2221
2222 if self.optional {
2224 let unmatched = collect_unmatched_optional_group_rows(
2225 input,
2226 &matched_rows,
2227 &self.schema,
2228 &self.optional_pattern_vars,
2229 )?;
2230
2231 if unmatched.is_empty() {
2232 return Ok(matched_batch);
2233 }
2234
2235 let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2236 input,
2237 &unmatched,
2238 &self.schema,
2239 &self.optional_pattern_vars,
2240 )?;
2241
2242 use arrow::compute::concat_batches;
2244 concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2245 } else {
2246 Ok(matched_batch)
2247 }
2248 }
2249}
2250
2251impl GraphExecutionContext {
2252 fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2263 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2264 return;
2265 };
2266 let guard = tx_l0.read();
2267 let Some(read_set) = &guard.occ_read_set else {
2268 return;
2269 };
2270 let mut rs = read_set.lock();
2271 for (src, neighbors) in adjacency {
2272 rs.vertices.insert(*src);
2273 for (nbr, eid, _type, _props) in neighbors {
2274 rs.vertices.insert(*nbr);
2275 rs.edges.insert(*eid);
2276 }
2277 }
2278 }
2279}
2280
2281async fn build_edge_adjacency_map(
2292 graph_ctx: &GraphExecutionContext,
2293 type_names: &[String],
2294 direction: Direction,
2295 source_vids: Option<HashSet<Vid>>,
2296) -> DFResult<EdgeAdjacencyMap> {
2297 let storage = graph_ctx.storage();
2298 let l0_ctx = graph_ctx.l0_context();
2299
2300 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2303 let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2304 let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2305 let side = match direction {
2306 Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2307 Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2308 Direction::Both => uni_store::storage::EndpointSide::Either,
2309 };
2310 (side, vids)
2311 });
2312 let edges_with_type = storage
2313 .find_edges_by_type_names(&type_refs, endpoint_filter)
2314 .await
2315 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2316
2317 let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2321 match &source_vids {
2322 None => true,
2323 Some(set) => match direction {
2324 Direction::Outgoing => set.contains(&src),
2325 Direction::Incoming => set.contains(&dst),
2326 Direction::Both => set.contains(&src) || set.contains(&dst),
2327 },
2328 }
2329 };
2330
2331 let mut edges: Vec<(
2333 uni_common::Eid,
2334 uni_common::Vid,
2335 uni_common::Vid,
2336 String,
2337 uni_common::Properties,
2338 )> = edges_with_type.into_iter().collect();
2339
2340 for l0 in l0_ctx.iter_l0_buffers() {
2342 let l0_guard = l0.read();
2343
2344 for type_name in type_names {
2345 let l0_eids = l0_guard.eids_for_type(type_name);
2346
2347 for &eid in &l0_eids {
2349 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2350 let src_vid = edge_ref.src_vid;
2351 let dst_vid = edge_ref.dst_vid;
2352 if !edge_in_scope(src_vid, dst_vid) {
2353 continue;
2354 }
2355
2356 let props = l0_guard
2358 .edge_properties
2359 .get(&eid)
2360 .cloned()
2361 .unwrap_or_default();
2362
2363 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2364 }
2365 }
2366 }
2367 }
2368
2369 let mut seen_eids = HashSet::new();
2371 let mut unique_edges = Vec::new();
2372 for edge in edges.into_iter().rev() {
2373 if seen_eids.insert(edge.0) {
2374 unique_edges.push(edge);
2375 }
2376 }
2377 unique_edges.reverse();
2378
2379 let mut tombstoned_eids = HashSet::new();
2381 for l0 in l0_ctx.iter_l0_buffers() {
2382 let l0_guard = l0.read();
2383 for eid in l0_guard.tombstones.keys() {
2384 tombstoned_eids.insert(*eid);
2385 }
2386 }
2387 if !tombstoned_eids.is_empty() {
2388 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2389 }
2390
2391 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2394
2395 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2396 let edge_type: Arc<str> = edge_type.into();
2397 let props = Arc::new(props);
2398 match direction {
2399 Direction::Outgoing => {
2400 adjacency
2401 .entry(src_vid)
2402 .or_default()
2403 .push((dst_vid, eid, edge_type, props));
2404 }
2405 Direction::Incoming => {
2406 adjacency
2407 .entry(dst_vid)
2408 .or_default()
2409 .push((src_vid, eid, edge_type, props));
2410 }
2411 Direction::Both => {
2412 adjacency.entry(src_vid).or_default().push((
2413 dst_vid,
2414 eid,
2415 Arc::clone(&edge_type),
2416 Arc::clone(&props),
2417 ));
2418 adjacency
2419 .entry(dst_vid)
2420 .or_default()
2421 .push((src_vid, eid, edge_type, props));
2422 }
2423 }
2424 }
2425
2426 graph_ctx.record_edge_adjacency(&adjacency);
2434
2435 Ok(adjacency)
2436}
2437
2438async fn build_edge_property_filter(
2453 graph_ctx: &Arc<GraphExecutionContext>,
2454 edge_type_ids: &[u32],
2455 direction: Direction,
2456 conditions: &[(String, UniValue)],
2457) -> DFResult<EidFilter> {
2458 if conditions.is_empty() {
2459 return Ok(EidFilter::AllAllowed);
2460 }
2461
2462 let uni_schema = graph_ctx.storage().schema_manager().schema();
2463 let type_names: Vec<String> = edge_type_ids
2464 .iter()
2465 .filter_map(|id| uni_schema.edge_type_name_by_id_unified(*id))
2466 .collect();
2467 if type_names.is_empty() {
2468 return Ok(EidFilter::AllAllowed);
2469 }
2470
2471 let adjacency = build_edge_adjacency_map(graph_ctx, &type_names, direction, None).await?;
2475
2476 let mut passing: Vec<u64> = Vec::new();
2477 let mut max_eid: u64 = 0;
2478 let mut seen: FxHashSet<u64> = FxHashSet::default();
2479 for edges in adjacency.values() {
2480 for (_neighbor, eid, _etype, props) in edges {
2481 let raw = eid.as_u64();
2482 if !seen.insert(raw) {
2485 continue;
2486 }
2487 max_eid = max_eid.max(raw);
2488 let passes = conditions
2489 .iter()
2490 .all(|(name, expected)| props.get(name).is_some_and(|actual| actual == expected));
2491 if passes {
2492 passing.push(raw);
2493 }
2494 }
2495 }
2496
2497 Ok(EidFilter::from_eids(passing, max_eid as usize + 1))
2498}
2499
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the buffer → load → expand state machine.
    ///
    /// * `CollectingInput`: buffers every input batch. Nothing is emitted until the input is
    ///   exhausted, so memory grows with the input size.
    /// * `LoadingEdges`: awaits `build_edge_adjacency_map`. Source VIDs are pushed down when
    ///   `collect_source_vids` returns them.
    /// * `Processing`: expands one buffered batch per poll via `expand_batch`.
    /// * `Done`: terminal; yields `None`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state. Arms that continue store the next state
            // back; arms that return without doing so leave the stream `Done`.
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::CollectingInput {
                    mut input_stream,
                    mut buffered,
                } => match input_stream.poll_next_unpin(cx) {
                    Poll::Ready(Some(Ok(batch))) => {
                        buffered.push(batch);
                        self.state = GraphTraverseMainState::CollectingInput {
                            input_stream,
                            buffered,
                        };
                    }
                    Poll::Ready(Some(Err(e))) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Ready(None) => {
                        // Input exhausted: start loading edges, restricted to the observed
                        // sources when there are few enough of them.
                        let source_vids = self.collect_source_vids(&buffered);
                        let loading_ctx = self.graph_ctx.clone();
                        let loading_types = self.type_names.clone();
                        let direction = self.direction;
                        let fut = async move {
                            build_edge_adjacency_map(
                                &loading_ctx,
                                &loading_types,
                                direction,
                                source_vids,
                            )
                            .await
                        };
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future: Box::pin(fut),
                            buffered,
                        };
                    }
                    Poll::Pending => {
                        self.state = GraphTraverseMainState::CollectingInput {
                            input_stream,
                            buffered,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    buffered,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            buffered: buffered.into_iter(),
                        };
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut buffered,
                } => {
                    // A timeout returns without restoring the state: the stream ends (`Done`)
                    // and the adjacency and remaining batches are dropped.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match buffered.next() {
                        Some(batch) => {
                            let result = self.expand_batch(&batch, &adjacency);

                            // The state is restored even when `result` is an error, so a
                            // caller that keeps polling continues with the next batch.
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                buffered,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        None => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2616
2617impl RecordBatchStream for GraphTraverseMainStream {
2618 fn schema(&self) -> SchemaRef {
2619 self.schema.clone()
2620 }
2621}
2622
/// Physical operator for variable-length (multi-hop) graph traversals.
///
/// NOTE(review): several field notes below are inferred from names and from the `new`
/// signature; confirm against the execution code.
pub struct GraphVariableLengthTraverseExec {
    /// Child plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Input column holding the start vertex IDs (presumably, as in `GraphTraverseMainExec`).
    source_column: String,

    /// Edge type IDs that may be traversed.
    edge_type_ids: Vec<u32>,

    /// Traversal direction.
    direction: Direction,

    /// Lower hop bound (inclusivity not established here — confirm).
    min_hops: usize,

    /// Upper hop bound (inclusivity not established here — confirm).
    max_hops: usize,

    /// Variable bound to the end vertex; passed to `build_schema` in `new`.
    target_variable: String,

    /// Optional variable bound to the traversed steps; passed to `build_schema` in `new`.
    step_variable: Option<String>,

    /// Optional path variable; passed to `build_schema` in `new`.
    path_variable: Option<String>,

    /// Target properties to materialize.
    target_properties: Vec<String>,

    /// Target label; `new` looks up its schema property definitions for the output schema.
    target_label_name: Option<String>,

    /// Whether unmatched input rows are kept (presumably null-extended — confirm).
    is_optional: bool,

    /// Input column with an already-bound end vertex (presumably constrains the target).
    bound_target_column: Option<String>,

    /// Optional edge filter string (presumably a Lance predicate pushed to storage — confirm).
    edge_lance_filter: Option<String>,

    /// `(property, value)` equality conditions on edges; these have the shape accepted by
    /// `build_edge_property_filter`.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns of edge IDs already bound in the row (presumably excluded from paths).
    used_edge_columns: Vec<String>,

    /// Path semantics mode.
    path_mode: super::nfa::PathMode,

    /// What the traversal emits per match.
    output_mode: super::nfa::VlpOutputMode,

    /// Path automaton; `new` receives an optional pre-built `qpp_nfa`.
    nfa: Arc<PathNfa>,

    /// Shared execution context (storage, L0 buffers, timeouts).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema.
    schema: SchemaRef,

    /// Plan properties computed from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics registered by each executed partition.
    metrics: ExecutionPlanMetricsSet,
}
2714
2715impl fmt::Debug for GraphVariableLengthTraverseExec {
2716 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2717 f.debug_struct("GraphVariableLengthTraverseExec")
2718 .field("source_column", &self.source_column)
2719 .field("edge_type_ids", &self.edge_type_ids)
2720 .field("direction", &self.direction)
2721 .field("min_hops", &self.min_hops)
2722 .field("max_hops", &self.max_hops)
2723 .field("target_variable", &self.target_variable)
2724 .finish()
2725 }
2726}
2727
2728impl GraphVariableLengthTraverseExec {
2729 #[expect(clippy::too_many_arguments)]
2735 pub fn new(
2736 input: Arc<dyn ExecutionPlan>,
2737 source_column: impl Into<String>,
2738 edge_type_ids: Vec<u32>,
2739 direction: Direction,
2740 min_hops: usize,
2741 max_hops: usize,
2742 target_variable: impl Into<String>,
2743 step_variable: Option<String>,
2744 path_variable: Option<String>,
2745 target_properties: Vec<String>,
2746 target_label_name: Option<String>,
2747 graph_ctx: Arc<GraphExecutionContext>,
2748 is_optional: bool,
2749 bound_target_column: Option<String>,
2750 edge_lance_filter: Option<String>,
2751 edge_property_conditions: Vec<(String, UniValue)>,
2752 used_edge_columns: Vec<String>,
2753 path_mode: super::nfa::PathMode,
2754 output_mode: super::nfa::VlpOutputMode,
2755 qpp_nfa: Option<PathNfa>,
2756 ) -> Self {
2757 let source_column = source_column.into();
2758 let target_variable = target_variable.into();
2759
2760 let uni_schema = graph_ctx.storage().schema_manager().schema();
2762 let label_props = target_label_name
2763 .as_deref()
2764 .and_then(|ln| uni_schema.properties.get(ln));
2765
2766 let schema = Self::build_schema(
2768 input.schema(),
2769 &target_variable,
2770 step_variable.as_deref(),
2771 path_variable.as_deref(),
2772 &target_properties,
2773 label_props,
2774 );
2775 let properties = compute_plan_properties(schema.clone());
2776
2777 let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2779 PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2780 }));
2781
2782 Self {
2783 input,
2784 source_column,
2785 edge_type_ids,
2786 direction,
2787 min_hops,
2788 max_hops,
2789 target_variable,
2790 step_variable,
2791 path_variable,
2792 target_properties,
2793 target_label_name,
2794 is_optional,
2795 bound_target_column,
2796 edge_lance_filter,
2797 edge_property_conditions,
2798 used_edge_columns,
2799 path_mode,
2800 output_mode,
2801 nfa,
2802 graph_ctx,
2803 schema,
2804 properties,
2805 metrics: ExecutionPlanMetricsSet::new(),
2806 }
2807 }
2808
2809 fn build_schema(
2811 input_schema: SchemaRef,
2812 target_variable: &str,
2813 step_variable: Option<&str>,
2814 path_variable: Option<&str>,
2815 target_properties: &[String],
2816 label_props: Option<
2817 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2818 >,
2819 ) -> SchemaRef {
2820 let mut fields: Vec<Field> = input_schema
2821 .fields()
2822 .iter()
2823 .map(|f| f.as_ref().clone())
2824 .collect();
2825
2826 let target_vid_name = format!("{}._vid", target_variable);
2828 if input_schema.column_with_name(&target_vid_name).is_none() {
2829 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2830 }
2831
2832 let target_labels_name = format!("{}._labels", target_variable);
2834 if input_schema.column_with_name(&target_labels_name).is_none() {
2835 fields.push(Field::new(target_labels_name, labels_data_type(), true));
2836 }
2837
2838 for prop_name in target_properties {
2840 let col_name = format!("{}.{}", target_variable, prop_name);
2841 if input_schema.column_with_name(&col_name).is_none() {
2842 let arrow_type = resolve_property_type(prop_name, label_props);
2843 let uni_type = label_props
2844 .and_then(|p| p.get(prop_name))
2845 .map(|m| &m.r#type);
2846 fields.push(property_field(&col_name, arrow_type, uni_type));
2847 }
2848 }
2849
2850 fields.push(Field::new("_hop_count", DataType::UInt64, false));
2852
2853 if let Some(step_var) = step_variable {
2855 fields.push(build_edge_list_field(step_var));
2856 }
2857
2858 if let Some(path_var) = path_variable
2860 && input_schema.column_with_name(path_var).is_none()
2861 {
2862 fields.push(build_path_struct_field(path_var));
2863 }
2864
2865 Arc::new(Schema::new(fields))
2866 }
2867}
2868
2869impl DisplayAs for GraphVariableLengthTraverseExec {
2870 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2871 write!(
2872 f,
2873 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2874 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2875 )
2876 }
2877}
2878
2879impl ExecutionPlan for GraphVariableLengthTraverseExec {
2880 fn name(&self) -> &str {
2881 "GraphVariableLengthTraverseExec"
2882 }
2883
2884 fn as_any(&self) -> &dyn Any {
2885 self
2886 }
2887
2888 fn schema(&self) -> SchemaRef {
2889 self.schema.clone()
2890 }
2891
2892 fn properties(&self) -> &Arc<PlanProperties> {
2893 &self.properties
2894 }
2895
2896 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2897 vec![&self.input]
2898 }
2899
2900 fn with_new_children(
2901 self: Arc<Self>,
2902 children: Vec<Arc<dyn ExecutionPlan>>,
2903 ) -> DFResult<Arc<dyn ExecutionPlan>> {
2904 if children.len() != 1 {
2905 return Err(datafusion::error::DataFusionError::Plan(
2906 "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2907 ));
2908 }
2909
2910 Ok(Arc::new(Self::new(
2912 children[0].clone(),
2913 self.source_column.clone(),
2914 self.edge_type_ids.clone(),
2915 self.direction,
2916 self.min_hops,
2917 self.max_hops,
2918 self.target_variable.clone(),
2919 self.step_variable.clone(),
2920 self.path_variable.clone(),
2921 self.target_properties.clone(),
2922 self.target_label_name.clone(),
2923 self.graph_ctx.clone(),
2924 self.is_optional,
2925 self.bound_target_column.clone(),
2926 self.edge_lance_filter.clone(),
2927 self.edge_property_conditions.clone(),
2928 self.used_edge_columns.clone(),
2929 self.path_mode.clone(),
2930 self.output_mode.clone(),
2931 Some((*self.nfa).clone()),
2932 )))
2933 }
2934
2935 fn execute(
2936 &self,
2937 partition: usize,
2938 context: Arc<TaskContext>,
2939 ) -> DFResult<SendableRecordBatchStream> {
2940 let input_stream = self.input.execute(partition, context)?;
2941
2942 let metrics = BaselineMetrics::new(&self.metrics, partition);
2943
2944 let graph_ctx = self.graph_ctx.clone();
2949 let edge_type_ids = self.edge_type_ids.clone();
2950 let direction = self.direction;
2951 let edge_property_conditions = self.edge_property_conditions.clone();
2952 let warm_fut: Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>> =
2953 Box::pin(async move {
2954 graph_ctx
2955 .ensure_adjacency_warmed(&edge_type_ids, direction)
2956 .await
2957 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2958 build_edge_property_filter(
2959 &graph_ctx,
2960 &edge_type_ids,
2961 direction,
2962 &edge_property_conditions,
2963 )
2964 .await
2965 });
2966
2967 Ok(Box::pin(GraphVariableLengthTraverseStream {
2968 input: input_stream,
2969 exec: Arc::new(self.clone_for_stream()),
2970 schema: self.schema.clone(),
2971 state: VarLengthStreamState::Warming(warm_fut),
2972 edge_property_filter: EidFilter::AllAllowed,
2973 metrics,
2974 }))
2975 }
2976
2977 fn metrics(&self) -> Option<MetricsSet> {
2978 Some(self.metrics.clone_inner())
2979 }
2980}
2981
2982impl GraphVariableLengthTraverseExec {
2983 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2985 GraphVariableLengthTraverseExecData {
2986 source_column: self.source_column.clone(),
2987 edge_type_ids: self.edge_type_ids.clone(),
2988 direction: self.direction,
2989 min_hops: self.min_hops,
2990 max_hops: self.max_hops,
2991 target_variable: self.target_variable.clone(),
2992 step_variable: self.step_variable.clone(),
2993 path_variable: self.path_variable.clone(),
2994 target_properties: self.target_properties.clone(),
2995 target_label_name: self.target_label_name.clone(),
2996 is_optional: self.is_optional,
2997 bound_target_column: self.bound_target_column.clone(),
2998 edge_lance_filter: self.edge_lance_filter.clone(),
2999 edge_property_conditions: self.edge_property_conditions.clone(),
3000 used_edge_columns: self.used_edge_columns.clone(),
3001 path_mode: self.path_mode.clone(),
3002 output_mode: self.output_mode.clone(),
3003 nfa: self.nfa.clone(),
3004 graph_ctx: self.graph_ctx.clone(),
3005 }
3006 }
3007}
3008
/// Immutable traversal configuration shared (via `Arc`) by a running
/// `GraphVariableLengthTraverseStream`; a field-for-field copy of the plan's settings.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    // Input column with the source vertex ids.
    source_column: String,
    // Edge type ids; also passed to endpoint resolution when emitting edge structs.
    edge_type_ids: Vec<u32>,
    // Plan direction; `Direction::Both` enables per-transition edge de-duplication.
    direction: Direction,
    // Not read directly by the visible traversal code; hop bounds are presumably
    // enforced by `nfa` (NOTE(review): confirm).
    min_hops: usize,
    // BFS depth cap.
    max_hops: usize,
    target_variable: String,
    step_variable: Option<String>,
    path_variable: Option<String>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    is_optional: bool,
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    // Already applied while warming (see the plan's `execute`); not read by the stream itself.
    edge_property_conditions: Vec<(String, UniValue)>,
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    nfa: Arc<PathNfa>,
    graph_ctx: Arc<GraphExecutionContext>,
}
3037
/// Once a BFS level's next frontier holds more than this many `(vertex, NFA state)` entries,
/// the traversal stops descending further; results collected so far are kept.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Once the predecessor DAG's pool (`PredecessorDag::pool_len`) exceeds this size, the BFS
/// likewise stops descending; results collected so far are kept.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
3042
3043impl GraphVariableLengthTraverseExecData {
3044 fn check_target_label(&self, vid: Vid) -> bool {
3046 if let Some(ref label_name) = self.target_label_name {
3047 let query_ctx = self.graph_ctx.query_context();
3048 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3049 Some(labels) => labels.contains(label_name),
3050 None => true, }
3052 } else {
3053 true
3054 }
3055 }
3056
3057 fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
3059 match constraint {
3060 super::nfa::VertexConstraint::Label(label_name) => {
3061 let query_ctx = self.graph_ctx.query_context();
3062 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3063 Some(labels) => labels.contains(label_name),
3064 None => true, }
3066 }
3067 }
3068 }
3069
3070 fn expand_neighbors(
3073 &self,
3074 vid: Vid,
3075 state: NfaStateId,
3076 eid_filter: &EidFilter,
3077 used_eids: &FxHashSet<u64>,
3078 ) -> Vec<(Vid, Eid, NfaStateId)> {
3079 let is_undirected = matches!(self.direction, Direction::Both);
3080 let mut results = Vec::new();
3081
3082 for transition in self.nfa.transitions_from(state) {
3083 let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
3084
3085 for &etype in &transition.edge_type_ids {
3086 for (neighbor, eid) in
3087 self.graph_ctx
3088 .get_neighbors(vid, etype, transition.direction)
3089 {
3090 if is_undirected && !seen_edges.insert(eid.as_u64()) {
3092 continue;
3093 }
3094
3095 if !eid_filter.contains(eid) {
3103 continue;
3104 }
3105
3106 if used_eids.contains(&eid.as_u64()) {
3108 continue;
3109 }
3110
3111 if let Some(constraint) = self.nfa.state_constraint(transition.to)
3113 && !self.check_state_constraint(neighbor, constraint)
3114 {
3115 continue;
3116 }
3117
3118 results.push((neighbor, eid, transition.to));
3119 }
3120 }
3121 }
3122
3123 results
3124 }
3125
3126 fn bfs_with_dag(
3131 &self,
3132 source: Vid,
3133 eid_filter: &EidFilter,
3134 used_eids: &FxHashSet<u64>,
3135 vid_filter: &VidFilter,
3136 ) -> Vec<BfsResult> {
3137 let nfa = &self.nfa;
3138 let selector = PathSelector::All;
3139 let mut dag = PredecessorDag::new(selector);
3140 let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
3141
3142 if nfa.is_accepting(nfa.start_state())
3144 && self.check_target_label(source)
3145 && vid_filter.contains(source)
3146 {
3147 accepting.push((source, nfa.start_state(), 0));
3148 }
3149
3150 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3152 let mut depth: u32 = 0;
3153
3154 while !frontier.is_empty() && depth < self.max_hops as u32 {
3155 depth += 1;
3156 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3157 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3158
3159 for &(vid, state) in &frontier {
3160 for (neighbor, eid, dst_state) in
3161 self.expand_neighbors(vid, state, eid_filter, used_eids)
3162 {
3163 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3165
3166 if seen_at_depth.insert((neighbor, dst_state)) {
3168 next_frontier.push((neighbor, dst_state));
3169
3170 if nfa.is_accepting(dst_state)
3172 && self.check_target_label(neighbor)
3173 && vid_filter.contains(neighbor)
3174 {
3175 accepting.push((neighbor, dst_state, depth));
3176 }
3177 }
3178 }
3179 }
3180
3181 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3183 break;
3184 }
3185
3186 frontier = next_frontier;
3187 }
3188
3189 let mut results: Vec<BfsResult> = Vec::new();
3191 for &(target, state, depth) in &accepting {
3192 dag.enumerate_paths(
3193 source,
3194 target,
3195 state,
3196 depth,
3197 depth,
3198 &self.path_mode,
3199 &mut |nodes, edges| {
3200 results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3201 std::ops::ControlFlow::Continue(())
3202 },
3203 );
3204 }
3205
3206 results
3207 }
3208
3209 fn bfs_endpoints_only(
3214 &self,
3215 source: Vid,
3216 eid_filter: &EidFilter,
3217 used_eids: &FxHashSet<u64>,
3218 vid_filter: &VidFilter,
3219 ) -> Vec<(Vid, u32)> {
3220 let nfa = &self.nfa;
3221 let selector = PathSelector::Any; let mut dag = PredecessorDag::new(selector);
3223 let mut results: Vec<(Vid, u32)> = Vec::new();
3224
3225 if nfa.is_accepting(nfa.start_state())
3227 && self.check_target_label(source)
3228 && vid_filter.contains(source)
3229 {
3230 results.push((source, 0));
3231 }
3232
3233 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3235 let mut depth: u32 = 0;
3236
3237 while !frontier.is_empty() && depth < self.max_hops as u32 {
3238 depth += 1;
3239 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3240 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3241
3242 for &(vid, state) in &frontier {
3243 for (neighbor, eid, dst_state) in
3244 self.expand_neighbors(vid, state, eid_filter, used_eids)
3245 {
3246 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3247
3248 if seen_at_depth.insert((neighbor, dst_state)) {
3249 next_frontier.push((neighbor, dst_state));
3250
3251 if nfa.is_accepting(dst_state)
3253 && self.check_target_label(neighbor)
3254 && vid_filter.contains(neighbor)
3255 && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3256 {
3257 results.push((neighbor, depth));
3258 }
3259 }
3260 }
3261 }
3262
3263 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3264 break;
3265 }
3266
3267 frontier = next_frontier;
3268 }
3269
3270 results
3271 }
3272}
3273
/// States of `GraphVariableLengthTraverseStream`'s poll loop.
enum VarLengthStreamState {
    /// Awaiting adjacency warm-up and construction of the edge-property filter.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<EidFilter>> + Send>>),
    /// Pulling the next input batch and expanding it.
    Reading,
    /// Awaiting asynchronous hydration of target properties for an expanded batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state; further polls yield `None`.
    Done,
}
3288
/// Output stream of `GraphVariableLengthTraverseExec` for one partition.
struct GraphVariableLengthTraverseStream {
    /// Child stream supplying the rows to expand.
    input: SendableRecordBatchStream,
    /// Traversal configuration snapshot.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema (same as the plan's).
    schema: SchemaRef,
    /// Current poll-loop state.
    state: VarLengthStreamState,
    /// Edge filter produced by the warm-up future; `AllAllowed` until warm-up completes.
    edge_property_filter: EidFilter,
    /// Baseline metrics (elapsed compute, output rows) for this partition.
    metrics: BaselineMetrics,
}
3301
3302impl Stream for GraphVariableLengthTraverseStream {
3303 type Item = DFResult<RecordBatch>;
3304
3305 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3306 let metrics = self.metrics.clone();
3307 let _timer = metrics.elapsed_compute().timer();
3308 loop {
3309 let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3310
3311 match state {
3312 VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3313 Poll::Ready(Ok(filter)) => {
3314 self.edge_property_filter = filter;
3315 self.state = VarLengthStreamState::Reading;
3316 }
3318 Poll::Ready(Err(e)) => {
3319 self.state = VarLengthStreamState::Done;
3320 return Poll::Ready(Some(Err(e)));
3321 }
3322 Poll::Pending => {
3323 self.state = VarLengthStreamState::Warming(fut);
3324 return Poll::Pending;
3325 }
3326 },
3327 VarLengthStreamState::Reading => {
3328 if let Err(e) = self.exec.graph_ctx.check_timeout() {
3330 return Poll::Ready(Some(Err(
3331 datafusion::error::DataFusionError::Execution(e.to_string()),
3332 )));
3333 }
3334
3335 match self.input.poll_next_unpin(cx) {
3336 Poll::Ready(Some(Ok(batch))) => {
3337 let vid_filter = VidFilter::AllAllowed;
3342 let base_result = self.process_batch_base(
3343 batch,
3344 &self.edge_property_filter,
3345 &vid_filter,
3346 );
3347 let base_batch = match base_result {
3348 Ok(b) => b,
3349 Err(e) => {
3350 self.state = VarLengthStreamState::Reading;
3351 return Poll::Ready(Some(Err(e)));
3352 }
3353 };
3354
3355 if self.exec.target_properties.is_empty() {
3357 self.state = VarLengthStreamState::Reading;
3358 return Poll::Ready(Some(Ok(base_batch)));
3359 }
3360
3361 let schema = self.schema.clone();
3363 let target_variable = self.exec.target_variable.clone();
3364 let target_properties = self.exec.target_properties.clone();
3365 let target_label_name = self.exec.target_label_name.clone();
3366 let graph_ctx = self.exec.graph_ctx.clone();
3367
3368 let fut = hydrate_vlp_target_properties(
3369 base_batch,
3370 schema,
3371 target_variable,
3372 target_properties,
3373 target_label_name,
3374 graph_ctx,
3375 );
3376
3377 self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3378 }
3380 Poll::Ready(Some(Err(e))) => {
3381 self.state = VarLengthStreamState::Done;
3382 return Poll::Ready(Some(Err(e)));
3383 }
3384 Poll::Ready(None) => {
3385 self.state = VarLengthStreamState::Done;
3386 return Poll::Ready(None);
3387 }
3388 Poll::Pending => {
3389 self.state = VarLengthStreamState::Reading;
3390 return Poll::Pending;
3391 }
3392 }
3393 }
3394 VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3395 Poll::Ready(Ok(batch)) => {
3396 self.state = VarLengthStreamState::Reading;
3397 self.metrics.record_output(batch.num_rows());
3398 return Poll::Ready(Some(Ok(batch)));
3399 }
3400 Poll::Ready(Err(e)) => {
3401 self.state = VarLengthStreamState::Done;
3402 return Poll::Ready(Some(Err(e)));
3403 }
3404 Poll::Pending => {
3405 self.state = VarLengthStreamState::Materializing(fut);
3406 return Poll::Pending;
3407 }
3408 },
3409 VarLengthStreamState::Done => {
3410 return Poll::Ready(None);
3411 }
3412 }
3413 }
3414 }
3415}
3416
3417impl GraphVariableLengthTraverseStream {
3418 fn process_batch_base(
3419 &self,
3420 batch: RecordBatch,
3421 eid_filter: &EidFilter,
3422 vid_filter: &VidFilter,
3423 ) -> DFResult<RecordBatch> {
3424 let source_col = batch
3425 .column_by_name(&self.exec.source_column)
3426 .ok_or_else(|| {
3427 datafusion::error::DataFusionError::Execution(format!(
3428 "Source column '{}' not found",
3429 self.exec.source_column
3430 ))
3431 })?;
3432
3433 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3434 let source_vids: &UInt64Array = &source_vid_cow;
3435
3436 let bound_target_cow = self
3438 .exec
3439 .bound_target_column
3440 .as_ref()
3441 .and_then(|col| batch.column_by_name(col))
3442 .map(|c| column_as_vid_array(c.as_ref()))
3443 .transpose()?;
3444 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3445
3446 let used_edge_arrays: Vec<&UInt64Array> = self
3448 .exec
3449 .used_edge_columns
3450 .iter()
3451 .filter_map(|col| {
3452 batch
3453 .column_by_name(col)?
3454 .as_any()
3455 .downcast_ref::<UInt64Array>()
3456 })
3457 .collect();
3458
3459 let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3461
3462 for (row_idx, source_vid) in source_vids.iter().enumerate() {
3463 let mut emitted_for_row = false;
3464
3465 if let Some(src) = source_vid {
3466 let vid = Vid::from(src);
3467
3468 let used_eids: FxHashSet<u64> = used_edge_arrays
3470 .iter()
3471 .filter_map(|arr| {
3472 if arr.is_null(row_idx) {
3473 None
3474 } else {
3475 Some(arr.value(row_idx))
3476 }
3477 })
3478 .collect();
3479
3480 match &self.exec.output_mode {
3482 VlpOutputMode::EndpointsOnly => {
3483 let endpoints = self
3484 .exec
3485 .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3486 for (target, depth) in endpoints {
3487 if let Some(targets) = expected_targets {
3489 if targets.is_null(row_idx) {
3490 continue;
3491 }
3492 if target.as_u64() != targets.value(row_idx) {
3493 continue;
3494 }
3495 }
3496 expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3497 emitted_for_row = true;
3498 }
3499 }
3500 _ => {
3501 let bfs_results = self
3503 .exec
3504 .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3505 for (target, hop_count, node_path, edge_path) in bfs_results {
3506 if let Some(targets) = expected_targets {
3508 if targets.is_null(row_idx) {
3509 continue;
3510 }
3511 if target.as_u64() != targets.value(row_idx) {
3512 continue;
3513 }
3514 }
3515 expansions.push((row_idx, target, hop_count, node_path, edge_path));
3516 emitted_for_row = true;
3517 }
3518 }
3519 }
3520 }
3521
3522 if self.exec.is_optional && !emitted_for_row {
3523 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3526 }
3527 }
3528
3529 self.build_output_batch(&batch, &expansions)
3530 }
3531
3532 fn build_output_batch(
3533 &self,
3534 input: &RecordBatch,
3535 expansions: &[VarLengthExpansion],
3536 ) -> DFResult<RecordBatch> {
3537 if expansions.is_empty() {
3538 return Ok(RecordBatch::new_empty(self.schema.clone()));
3539 }
3540
3541 let num_rows = expansions.len();
3542
3543 let indices: Vec<u64> = expansions
3545 .iter()
3546 .map(|(idx, _, _, _, _)| *idx as u64)
3547 .collect();
3548 let indices_array = UInt64Array::from(indices);
3549
3550 let mut columns: Vec<ArrayRef> = Vec::new();
3552 for col in input.columns() {
3553 let expanded = take(col.as_ref(), &indices_array, None)?;
3554 columns.push(expanded);
3555 }
3556
3557 let unmatched_rows: Vec<bool> = expansions
3561 .iter()
3562 .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3563 .collect();
3564 let target_vids: Vec<Option<u64>> = expansions
3565 .iter()
3566 .zip(unmatched_rows.iter())
3567 .map(
3568 |((_, vid, _, _, _), unmatched)| {
3569 if *unmatched { None } else { Some(vid.as_u64()) }
3570 },
3571 )
3572 .collect();
3573
3574 let target_vid_name = format!("{}._vid", self.exec.target_variable);
3576 if input.schema().column_with_name(&target_vid_name).is_none() {
3577 columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3578 }
3579
3580 let target_labels_name = format!("{}._labels", self.exec.target_variable);
3582 if input
3583 .schema()
3584 .column_with_name(&target_labels_name)
3585 .is_none()
3586 {
3587 use arrow_array::builder::{ListBuilder, StringBuilder};
3588 let query_ctx = self.exec.graph_ctx.query_context();
3589 let mut labels_builder = ListBuilder::new(StringBuilder::new());
3590 for target_vid in &target_vids {
3591 let Some(vid_u64) = target_vid else {
3592 labels_builder.append(false);
3593 continue;
3594 };
3595 let vid = Vid::from(*vid_u64);
3596 let row_labels: Vec<String> =
3597 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3598 Some(labels) => {
3599 labels
3601 }
3602 None => {
3603 if let Some(ref label_name) = self.exec.target_label_name {
3605 vec![label_name.clone()]
3606 } else {
3607 vec![]
3608 }
3609 }
3610 };
3611 let values = labels_builder.values();
3612 for lbl in &row_labels {
3613 values.append_value(lbl);
3614 }
3615 labels_builder.append(true);
3616 }
3617 columns.push(Arc::new(labels_builder.finish()));
3618 }
3619
3620 for prop_name in &self.exec.target_properties {
3622 let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3623 if input.schema().column_with_name(&full_prop_name).is_none() {
3624 let col_idx = columns.len();
3625 if col_idx < self.schema.fields().len() {
3626 let field = self.schema.field(col_idx);
3627 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3628 }
3629 }
3630 }
3631
3632 let hop_counts: Vec<u64> = expansions
3634 .iter()
3635 .map(|(_, _, hops, _, _)| *hops as u64)
3636 .collect();
3637 columns.push(Arc::new(UInt64Array::from(hop_counts)));
3638
3639 if self.exec.step_variable.is_some() {
3641 let mut edges_builder = new_edge_list_builder();
3642 let query_ctx = self.exec.graph_ctx.query_context();
3643
3644 for (_, _, _, node_path, edge_path) in expansions {
3645 if node_path.is_empty() && edge_path.is_empty() {
3646 edges_builder.append_null();
3648 } else if edge_path.is_empty() {
3649 edges_builder.append(true);
3651 } else {
3652 for (i, eid) in edge_path.iter().enumerate() {
3653 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3654 .unwrap_or_else(|| "UNKNOWN".to_string());
3655 let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3660 *eid,
3661 node_path[i],
3662 node_path[i + 1],
3663 &self.exec.edge_type_ids,
3664 );
3665 append_edge_to_struct(
3666 edges_builder.values(),
3667 *eid,
3668 &type_name,
3669 src,
3670 dst,
3671 &query_ctx,
3672 );
3673 }
3674 edges_builder.append(true);
3675 }
3676 }
3677
3678 columns.push(Arc::new(edges_builder.finish()));
3679 }
3680
3681 if let Some(path_var_name) = &self.exec.path_variable {
3686 let existing_path_col_idx = input
3687 .schema()
3688 .column_with_name(path_var_name)
3689 .map(|(idx, _)| idx);
3690 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3692 let existing_path = existing_path_arc
3693 .as_ref()
3694 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3695
3696 let mut nodes_builder = new_node_list_builder();
3697 let mut rels_builder = new_edge_list_builder();
3698 let query_ctx = self.exec.graph_ctx.query_context();
3699 let mut path_validity = Vec::with_capacity(expansions.len());
3700
3701 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3702 if node_path.is_empty() && edge_path.is_empty() {
3703 nodes_builder.append(false);
3704 rels_builder.append(false);
3705 path_validity.push(false);
3706 continue;
3707 }
3708
3709 let skip_first_vlp_node = if let Some(existing) = existing_path {
3711 if !existing.is_null(row_out_idx) {
3712 prepend_existing_path(
3713 existing,
3714 row_out_idx,
3715 &mut nodes_builder,
3716 &mut rels_builder,
3717 &query_ctx,
3718 );
3719 true
3720 } else {
3721 false
3722 }
3723 } else {
3724 false
3725 };
3726
3727 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3729 for vid in &node_path[start_idx..] {
3730 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3731 }
3732 nodes_builder.append(true);
3733
3734 for (i, eid) in edge_path.iter().enumerate() {
3735 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3736 .unwrap_or_else(|| "UNKNOWN".to_string());
3737 let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3742 *eid,
3743 node_path[i],
3744 node_path[i + 1],
3745 &self.exec.edge_type_ids,
3746 );
3747 append_edge_to_struct(
3748 rels_builder.values(),
3749 *eid,
3750 &type_name,
3751 src,
3752 dst,
3753 &query_ctx,
3754 );
3755 }
3756 rels_builder.append(true);
3757 path_validity.push(true);
3758 }
3759
3760 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3762 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3763
3764 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3766 let rels_field = Arc::new(Field::new(
3767 "relationships",
3768 rels_array.data_type().clone(),
3769 true,
3770 ));
3771
3772 let path_struct = arrow_array::StructArray::try_new(
3774 vec![nodes_field, rels_field].into(),
3775 vec![nodes_array, rels_array],
3776 Some(arrow::buffer::NullBuffer::from(path_validity)),
3777 )
3778 .map_err(arrow_err)?;
3779
3780 if let Some(idx) = existing_path_col_idx {
3781 columns[idx] = Arc::new(path_struct);
3782 } else {
3783 columns.push(Arc::new(path_struct));
3784 }
3785 }
3786
3787 self.metrics.record_output(num_rows);
3788
3789 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3790 }
3791}
3792
3793impl RecordBatchStream for GraphVariableLengthTraverseStream {
3794 fn schema(&self) -> SchemaRef {
3795 self.schema.clone()
3796 }
3797}
3798
3799async fn hydrate_vlp_target_properties(
3804 base_batch: RecordBatch,
3805 schema: SchemaRef,
3806 target_variable: String,
3807 target_properties: Vec<String>,
3808 target_label_name: Option<String>,
3809 graph_ctx: Arc<GraphExecutionContext>,
3810) -> DFResult<RecordBatch> {
3811 if base_batch.num_rows() == 0 || target_properties.is_empty() {
3812 return Ok(base_batch);
3813 }
3814
3815 let target_vid_col_name = format!("{}._vid", target_variable);
3822 let vid_col_idx = schema
3823 .fields()
3824 .iter()
3825 .enumerate()
3826 .rev()
3827 .find(|(_, f)| f.name() == &target_vid_col_name)
3828 .map(|(i, _)| i);
3829
3830 let Some(vid_col_idx) = vid_col_idx else {
3831 return Ok(base_batch);
3832 };
3833
3834 let vid_col = base_batch.column(vid_col_idx);
3835 let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3836 let target_vid_array: &UInt64Array = &target_vid_cow;
3837
3838 let target_vids: Vec<Vid> = target_vid_array
3839 .iter()
3840 .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3843 .collect();
3844
3845 let mut property_columns: Vec<ArrayRef> = Vec::new();
3847
3848 if let Some(ref label_name) = target_label_name {
3849 let property_manager = graph_ctx.property_manager();
3850 let query_ctx = graph_ctx.query_context();
3851
3852 let props_map = property_manager
3853 .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3854 .await
3855 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3856
3857 let uni_schema = graph_ctx.storage().schema_manager().schema();
3858 let label_props = uni_schema.properties.get(label_name.as_str());
3859
3860 for prop_name in &target_properties {
3861 let data_type = resolve_property_type(prop_name, label_props);
3862 let column =
3863 build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3864 property_columns.push(column);
3865 }
3866 } else {
3867 let non_internal_props: Vec<&str> = target_properties
3870 .iter()
3871 .filter(|p| *p != "_all_props")
3872 .map(|s| s.as_str())
3873 .collect();
3874 let property_manager = graph_ctx.property_manager();
3875 let query_ctx = graph_ctx.query_context();
3876
3877 let props_map = if !non_internal_props.is_empty() {
3878 property_manager
3879 .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3880 .await
3881 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3882 } else {
3883 std::collections::HashMap::new()
3884 };
3885
3886 for prop_name in &target_properties {
3887 if prop_name == "_all_props" {
3888 use arrow_array::builder::LargeBinaryBuilder;
3891
3892 let mut builder = LargeBinaryBuilder::new();
3893 let l0_ctx = graph_ctx.l0_context();
3894 for vid in &target_vids {
3895 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3896 if let Some(vid_props) = props_map.get(vid) {
3898 for (k, v) in vid_props.iter() {
3899 merged_props.insert(k.clone(), v.clone());
3900 }
3901 }
3902 for l0 in l0_ctx.iter_l0_buffers() {
3904 let guard = l0.read();
3905 if let Some(l0_props) = guard.vertex_properties.get(vid) {
3906 for (k, v) in l0_props.iter() {
3907 merged_props.insert(k.clone(), v.clone());
3908 }
3909 }
3910 }
3911 if merged_props.is_empty() {
3912 builder.append_null();
3913 } else {
3914 builder.append_value(uni_common::cypher_value_codec::encode(
3915 &uni_common::Value::Map(merged_props),
3916 ));
3917 }
3918 }
3919 property_columns.push(Arc::new(builder.finish()));
3920 } else {
3921 let column = build_property_column_static(
3922 &target_vids,
3923 &props_map,
3924 prop_name,
3925 &arrow::datatypes::DataType::LargeBinary,
3926 )?;
3927 property_columns.push(column);
3928 }
3929 }
3930 }
3931
3932 let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3938 let mut prop_idx = 0;
3939 for (col_idx, field) in schema.fields().iter().enumerate() {
3940 let is_target_prop = col_idx > vid_col_idx
3941 && target_properties
3942 .iter()
3943 .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3944 if is_target_prop && prop_idx < property_columns.len() {
3945 new_columns.push(property_columns[prop_idx].clone());
3946 prop_idx += 1;
3947 } else {
3948 new_columns.push(base_batch.column(col_idx).clone());
3949 }
3950 }
3951
3952 RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3953}
3954
/// Physical operator that expands each input row along variable-length relationship
/// paths (`(src)-[:T*min..max]->(target)`), emitting one output row per qualifying path.
pub struct GraphVariableLengthTraverseMainExec {
    /// Child plan whose batches carry the source vertex column.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding each row's source vertex id.
    source_column: String,

    /// Edge type names to traverse; used to build the adjacency map and, joined with
    /// `"|"`, as the type label written into step/path edge structs.
    type_names: Vec<String>,

    /// Traversal direction; `Direction::Both` enables per-vertex edge-id de-duplication.
    direction: Direction,

    /// Minimum path length (inclusive) for a path to be emitted.
    min_hops: usize,

    /// Maximum path length (inclusive); expansion stops at this depth.
    max_hops: usize,

    /// Variable name of the path's end vertex; its output columns are prefixed
    /// `"{target_variable}."`.
    target_variable: String,

    /// When set, an edge-list column with the relationships of each path is emitted.
    step_variable: Option<String>,

    /// When set, a path struct column (`nodes`, `relationships`) is emitted, replacing an
    /// input column of the same name if one exists.
    path_variable: Option<String>,

    /// Target vertex properties to emit as `"{target_variable}.{prop}"` columns.
    target_properties: Vec<String>,

    /// OPTIONAL MATCH semantics: a row with no qualifying path still produces one output
    /// row with null traversal columns.
    is_optional: bool,

    /// Input column holding an already-bound target vertex id; when set, only paths that
    /// end at that vertex are kept.
    bound_target_column: Option<String>,

    /// NOTE(review): not read by `execute` in this section (the adjacency map is built
    /// with `None` as its last argument); only carried into the stream — confirm intent.
    edge_lance_filter: Option<String>,

    /// `(property, value)` equality conditions every traversed edge must satisfy.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns holding edge ids already bound in the same row; those edges are
    /// excluded from traversal.
    used_edge_columns: Vec<String>,

    /// Path mode carried into the stream; not read by the traversal code in this section.
    path_mode: super::nfa::PathMode,

    /// Output mode carried into the stream; not read by the traversal code in this section.
    output_mode: super::nfa::VlpOutputMode,

    /// Graph execution context (storage, schema, L0 buffers, query context).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema produced by `build_schema`.
    schema: SchemaRef,

    /// Plan properties derived from `schema` via `compute_plan_properties`.
    properties: Arc<PlanProperties>,

    /// Per-partition execution metrics.
    metrics: ExecutionPlanMetricsSet,
}
4029
4030impl fmt::Debug for GraphVariableLengthTraverseMainExec {
4031 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4032 f.debug_struct("GraphVariableLengthTraverseMainExec")
4033 .field("source_column", &self.source_column)
4034 .field("type_names", &self.type_names)
4035 .field("direction", &self.direction)
4036 .field("min_hops", &self.min_hops)
4037 .field("max_hops", &self.max_hops)
4038 .field("target_variable", &self.target_variable)
4039 .finish()
4040 }
4041}
4042
4043impl GraphVariableLengthTraverseMainExec {
4044 #[expect(clippy::too_many_arguments)]
4046 pub fn new(
4047 input: Arc<dyn ExecutionPlan>,
4048 source_column: impl Into<String>,
4049 type_names: Vec<String>,
4050 direction: Direction,
4051 min_hops: usize,
4052 max_hops: usize,
4053 target_variable: impl Into<String>,
4054 step_variable: Option<String>,
4055 path_variable: Option<String>,
4056 target_properties: Vec<String>,
4057 graph_ctx: Arc<GraphExecutionContext>,
4058 is_optional: bool,
4059 bound_target_column: Option<String>,
4060 edge_lance_filter: Option<String>,
4061 edge_property_conditions: Vec<(String, UniValue)>,
4062 used_edge_columns: Vec<String>,
4063 path_mode: super::nfa::PathMode,
4064 output_mode: super::nfa::VlpOutputMode,
4065 ) -> Self {
4066 let source_column = source_column.into();
4067 let target_variable = target_variable.into();
4068
4069 let schema = Self::build_schema(
4071 input.schema(),
4072 &target_variable,
4073 step_variable.as_deref(),
4074 path_variable.as_deref(),
4075 &target_properties,
4076 );
4077 let properties = compute_plan_properties(schema.clone());
4078
4079 Self {
4080 input,
4081 source_column,
4082 type_names,
4083 direction,
4084 min_hops,
4085 max_hops,
4086 target_variable,
4087 step_variable,
4088 path_variable,
4089 target_properties,
4090 is_optional,
4091 bound_target_column,
4092 edge_lance_filter,
4093 edge_property_conditions,
4094 used_edge_columns,
4095 path_mode,
4096 output_mode,
4097 graph_ctx,
4098 schema,
4099 properties,
4100 metrics: ExecutionPlanMetricsSet::new(),
4101 }
4102 }
4103
4104 fn build_schema(
4106 input_schema: SchemaRef,
4107 target_variable: &str,
4108 step_variable: Option<&str>,
4109 path_variable: Option<&str>,
4110 target_properties: &[String],
4111 ) -> SchemaRef {
4112 let mut fields: Vec<Field> = input_schema
4113 .fields()
4114 .iter()
4115 .map(|f| f.as_ref().clone())
4116 .collect();
4117
4118 let target_vid_name = format!("{}._vid", target_variable);
4120 if input_schema.column_with_name(&target_vid_name).is_none() {
4121 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4122 }
4123
4124 let target_labels_name = format!("{}._labels", target_variable);
4126 if input_schema.column_with_name(&target_labels_name).is_none() {
4127 fields.push(Field::new(target_labels_name, labels_data_type(), true));
4128 }
4129
4130 fields.push(Field::new("_hop_count", DataType::UInt64, false));
4132
4133 if let Some(step_var) = step_variable {
4136 fields.push(build_edge_list_field(step_var));
4137 }
4138
4139 if let Some(path_var) = path_variable
4141 && input_schema.column_with_name(path_var).is_none()
4142 {
4143 fields.push(build_path_struct_field(path_var));
4144 }
4145
4146 for prop in target_properties {
4149 let prop_name = format!("{}.{}", target_variable, prop);
4150 if input_schema.column_with_name(&prop_name).is_none() {
4151 fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4152 }
4153 }
4154
4155 Arc::new(Schema::new(fields))
4156 }
4157}
4158
4159impl DisplayAs for GraphVariableLengthTraverseMainExec {
4160 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4161 write!(
4162 f,
4163 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4164 self.source_column, self.type_names, self.min_hops, self.max_hops
4165 )
4166 }
4167}
4168
4169impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4170 fn name(&self) -> &str {
4171 "GraphVariableLengthTraverseMainExec"
4172 }
4173
4174 fn as_any(&self) -> &dyn Any {
4175 self
4176 }
4177
4178 fn schema(&self) -> SchemaRef {
4179 self.schema.clone()
4180 }
4181
4182 fn properties(&self) -> &Arc<PlanProperties> {
4183 &self.properties
4184 }
4185
4186 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4187 vec![&self.input]
4188 }
4189
4190 fn with_new_children(
4191 self: Arc<Self>,
4192 children: Vec<Arc<dyn ExecutionPlan>>,
4193 ) -> DFResult<Arc<dyn ExecutionPlan>> {
4194 if children.len() != 1 {
4195 return Err(datafusion::error::DataFusionError::Plan(
4196 "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4197 ));
4198 }
4199
4200 Ok(Arc::new(Self::new(
4201 children[0].clone(),
4202 self.source_column.clone(),
4203 self.type_names.clone(),
4204 self.direction,
4205 self.min_hops,
4206 self.max_hops,
4207 self.target_variable.clone(),
4208 self.step_variable.clone(),
4209 self.path_variable.clone(),
4210 self.target_properties.clone(),
4211 self.graph_ctx.clone(),
4212 self.is_optional,
4213 self.bound_target_column.clone(),
4214 self.edge_lance_filter.clone(),
4215 self.edge_property_conditions.clone(),
4216 self.used_edge_columns.clone(),
4217 self.path_mode.clone(),
4218 self.output_mode.clone(),
4219 )))
4220 }
4221
4222 fn execute(
4223 &self,
4224 partition: usize,
4225 context: Arc<TaskContext>,
4226 ) -> DFResult<SendableRecordBatchStream> {
4227 let input_stream = self.input.execute(partition, context)?;
4228 let metrics = BaselineMetrics::new(&self.metrics, partition);
4229
4230 let graph_ctx = self.graph_ctx.clone();
4232 let type_names = self.type_names.clone();
4233 let direction = self.direction;
4234 let load_fut = async move {
4235 build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4237 };
4238
4239 Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4240 input: input_stream,
4241 source_column: self.source_column.clone(),
4242 type_names: self.type_names.clone(),
4243 direction: self.direction,
4244 min_hops: self.min_hops,
4245 max_hops: self.max_hops,
4246 target_variable: self.target_variable.clone(),
4247 step_variable: self.step_variable.clone(),
4248 path_variable: self.path_variable.clone(),
4249 target_properties: self.target_properties.clone(),
4250 graph_ctx: self.graph_ctx.clone(),
4251 is_optional: self.is_optional,
4252 bound_target_column: self.bound_target_column.clone(),
4253 edge_lance_filter: self.edge_lance_filter.clone(),
4254 edge_property_conditions: self.edge_property_conditions.clone(),
4255 used_edge_columns: self.used_edge_columns.clone(),
4256 path_mode: self.path_mode.clone(),
4257 output_mode: self.output_mode.clone(),
4258 schema: self.schema.clone(),
4259 state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4260 metrics,
4261 }))
4262 }
4263
4264 fn metrics(&self) -> Option<MetricsSet> {
4265 Some(self.metrics.clone_inner())
4266 }
4267}
4268
/// State machine driving `GraphVariableLengthTraverseMainStream::poll_next`.
enum VarLengthMainStreamState {
    /// Waiting for the edge adjacency map to be built.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Adjacency is loaded; pulling and expanding input batches.
    Processing(EdgeAdjacencyMap),
    /// Waiting for target-property hydration of one expanded batch. The adjacency map is
    /// kept so processing can resume afterwards.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Terminal state: input exhausted or a fatal error was returned.
    Done,
}
4283
/// Per-partition output stream of `GraphVariableLengthTraverseMainExec`.
///
/// Apart from `input`, `state` and `metrics`, every field is copied from the exec node in
/// `execute`; see that struct for field meanings. Some fields (e.g. `edge_lance_filter`,
/// `path_mode`, `output_mode`) are not read yet, hence the `dead_code` expectation.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    // Child stream producing the batches to expand.
    input: SendableRecordBatchStream,
    source_column: String,
    type_names: Vec<String>,
    direction: Direction,
    min_hops: usize,
    max_hops: usize,
    target_variable: String,
    step_variable: Option<String>,
    path_variable: Option<String>,
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    is_optional: bool,
    bound_target_column: Option<String>,
    edge_lance_filter: Option<String>,
    edge_property_conditions: Vec<(String, UniValue)>,
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    schema: SchemaRef,
    // Current position in the loading / processing / hydration state machine.
    state: VarLengthMainStreamState,
    metrics: BaselineMetrics,
}
4311
/// One BFS path: `(end_vertex, hop_count, node_path, edge_path)`, where `node_path` starts
/// at the source vertex and has one more entry than `edge_path`.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4314
4315impl GraphVariableLengthTraverseMainStream {
4316 fn bfs(
4322 &self,
4323 source: Vid,
4324 adjacency: &EdgeAdjacencyMap,
4325 used_eids: &FxHashSet<u64>,
4326 ) -> Vec<MainBfsResult> {
4327 let mut results = Vec::new();
4328 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4329
4330 queue.push_back((source, 0, vec![source], vec![]));
4331
4332 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4333 if depth >= self.min_hops && depth <= self.max_hops {
4335 results.push((current, depth, node_path.clone(), edge_path.clone()));
4336 }
4337
4338 if depth >= self.max_hops {
4340 continue;
4341 }
4342
4343 if let Some(neighbors) = adjacency.get(¤t) {
4345 let is_undirected = matches!(self.direction, Direction::Both);
4346 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4347
4348 for (neighbor, eid, _edge_type, props) in neighbors {
4349 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4351 continue;
4352 }
4353
4354 if edge_path.contains(eid) {
4356 continue;
4357 }
4358
4359 if used_eids.contains(&eid.as_u64()) {
4362 continue;
4363 }
4364
4365 if !self.edge_property_conditions.is_empty() {
4367 let passes =
4368 self.edge_property_conditions
4369 .iter()
4370 .all(|(name, expected)| {
4371 props.get(name).is_some_and(|actual| actual == expected)
4372 });
4373 if !passes {
4374 continue;
4375 }
4376 }
4377
4378 let mut new_node_path = node_path.clone();
4379 new_node_path.push(*neighbor);
4380 let mut new_edge_path = edge_path.clone();
4381 new_edge_path.push(*eid);
4382 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4383 }
4384 }
4385 }
4386
4387 results
4388 }
4389
4390 fn process_batch(
4392 &self,
4393 batch: RecordBatch,
4394 adjacency: &EdgeAdjacencyMap,
4395 ) -> DFResult<RecordBatch> {
4396 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4397 datafusion::error::DataFusionError::Execution(format!(
4398 "Source column '{}' not found in input batch",
4399 self.source_column
4400 ))
4401 })?;
4402
4403 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4404 let source_vids: &UInt64Array = &source_vid_cow;
4405
4406 let bound_target_cow = self
4408 .bound_target_column
4409 .as_ref()
4410 .and_then(|col| batch.column_by_name(col))
4411 .map(|c| column_as_vid_array(c.as_ref()))
4412 .transpose()?;
4413 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4414
4415 let used_edge_arrays: Vec<&UInt64Array> = self
4417 .used_edge_columns
4418 .iter()
4419 .filter_map(|col| {
4420 batch
4421 .column_by_name(col)?
4422 .as_any()
4423 .downcast_ref::<UInt64Array>()
4424 })
4425 .collect();
4426
4427 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4429
4430 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4431 let mut emitted_for_row = false;
4432
4433 if let Some(source_u64) = source_opt {
4434 let source = Vid::from(source_u64);
4435
4436 let used_eids: FxHashSet<u64> = used_edge_arrays
4438 .iter()
4439 .filter_map(|arr| {
4440 if arr.is_null(row_idx) {
4441 None
4442 } else {
4443 Some(arr.value(row_idx))
4444 }
4445 })
4446 .collect();
4447
4448 let bfs_results = self.bfs(source, adjacency, &used_eids);
4449
4450 for (target, hops, node_path, edge_path) in bfs_results {
4451 if let Some(targets) = expected_targets {
4454 if targets.is_null(row_idx) {
4455 continue;
4456 }
4457 let expected_vid = targets.value(row_idx);
4458 if target.as_u64() != expected_vid {
4459 continue;
4460 }
4461 }
4462
4463 expansions.push((row_idx, target, hops, node_path, edge_path));
4464 emitted_for_row = true;
4465 }
4466 }
4467
4468 if self.is_optional && !emitted_for_row {
4469 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4471 }
4472 }
4473
4474 if expansions.is_empty() {
4475 if self.is_optional {
4476 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4477 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4478 }
4479 return Ok(RecordBatch::new_empty(self.schema.clone()));
4480 }
4481
4482 let num_rows = expansions.len();
4483 self.metrics.record_output(num_rows);
4484
4485 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4487
4488 let edge_type_ids: Vec<u32> = {
4492 let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4493 self.type_names
4494 .iter()
4495 .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4496 .collect()
4497 };
4498
4499 for col_idx in 0..batch.num_columns() {
4501 let array = batch.column(col_idx);
4502 let indices: Vec<u64> = expansions
4503 .iter()
4504 .map(|(idx, _, _, _, _)| *idx as u64)
4505 .collect();
4506 let take_indices = UInt64Array::from(indices);
4507 let expanded = arrow::compute::take(array, &take_indices, None)?;
4508 columns.push(expanded);
4509 }
4510
4511 let target_vid_name = format!("{}._vid", self.target_variable);
4513 if batch.schema().column_with_name(&target_vid_name).is_none() {
4514 let target_vids: Vec<Option<u64>> = expansions
4515 .iter()
4516 .map(|(_, vid, _, node_path, edge_path)| {
4517 if node_path.is_empty() && edge_path.is_empty() {
4518 None
4519 } else {
4520 Some(vid.as_u64())
4521 }
4522 })
4523 .collect();
4524 columns.push(Arc::new(UInt64Array::from(target_vids)));
4525 }
4526
4527 let target_labels_name = format!("{}._labels", self.target_variable);
4529 if batch
4530 .schema()
4531 .column_with_name(&target_labels_name)
4532 .is_none()
4533 {
4534 use arrow_array::builder::{ListBuilder, StringBuilder};
4535 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4536 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4537 if node_path.is_empty() && edge_path.is_empty() {
4538 labels_builder.append(false);
4539 continue;
4540 }
4541 let mut row_labels: Vec<String> = Vec::new();
4542 let labels =
4543 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4544 for lbl in &labels {
4545 if !row_labels.contains(lbl) {
4546 row_labels.push(lbl.clone());
4547 }
4548 }
4549 let values = labels_builder.values();
4550 for lbl in &row_labels {
4551 values.append_value(lbl);
4552 }
4553 labels_builder.append(true);
4554 }
4555 columns.push(Arc::new(labels_builder.finish()));
4556 }
4557
4558 let hop_counts: Vec<u64> = expansions
4560 .iter()
4561 .map(|(_, _, hops, _, _)| *hops as u64)
4562 .collect();
4563 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4564
4565 if self.step_variable.is_some() {
4567 let mut edges_builder = new_edge_list_builder();
4568 let query_ctx = self.graph_ctx.query_context();
4569 let type_names_str = self.type_names.join("|");
4570
4571 for (_, _, _, node_path, edge_path) in expansions.iter() {
4572 if node_path.is_empty() && edge_path.is_empty() {
4573 edges_builder.append_null();
4574 } else if edge_path.is_empty() {
4575 edges_builder.append(true);
4577 } else {
4578 for (i, eid) in edge_path.iter().enumerate() {
4579 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4584 *eid,
4585 node_path[i],
4586 node_path[i + 1],
4587 &edge_type_ids,
4588 );
4589 append_edge_to_struct(
4590 edges_builder.values(),
4591 *eid,
4592 &type_names_str,
4593 src,
4594 dst,
4595 &query_ctx,
4596 );
4597 }
4598 edges_builder.append(true);
4599 }
4600 }
4601
4602 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4603 }
4604
4605 if let Some(path_var_name) = &self.path_variable {
4609 let existing_path_col_idx = batch
4610 .schema()
4611 .column_with_name(path_var_name)
4612 .map(|(idx, _)| idx);
4613 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4614 let existing_path = existing_path_arc
4615 .as_ref()
4616 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4617
4618 let mut nodes_builder = new_node_list_builder();
4619 let mut rels_builder = new_edge_list_builder();
4620 let query_ctx = self.graph_ctx.query_context();
4621 let type_names_str = self.type_names.join("|");
4622 let mut path_validity = Vec::with_capacity(expansions.len());
4623
4624 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4625 if node_path.is_empty() && edge_path.is_empty() {
4626 nodes_builder.append(false);
4627 rels_builder.append(false);
4628 path_validity.push(false);
4629 continue;
4630 }
4631
4632 let skip_first_vlp_node = if let Some(existing) = existing_path {
4634 if !existing.is_null(row_out_idx) {
4635 prepend_existing_path(
4636 existing,
4637 row_out_idx,
4638 &mut nodes_builder,
4639 &mut rels_builder,
4640 &query_ctx,
4641 );
4642 true
4643 } else {
4644 false
4645 }
4646 } else {
4647 false
4648 };
4649
4650 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4652 for vid in &node_path[start_idx..] {
4653 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4654 }
4655 nodes_builder.append(true);
4656
4657 for (i, eid) in edge_path.iter().enumerate() {
4658 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4663 *eid,
4664 node_path[i],
4665 node_path[i + 1],
4666 &edge_type_ids,
4667 );
4668 append_edge_to_struct(
4669 rels_builder.values(),
4670 *eid,
4671 &type_names_str,
4672 src,
4673 dst,
4674 &query_ctx,
4675 );
4676 }
4677 rels_builder.append(true);
4678 path_validity.push(true);
4679 }
4680
4681 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4683 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4684
4685 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4687 let rels_field = Arc::new(Field::new(
4688 "relationships",
4689 rels_array.data_type().clone(),
4690 true,
4691 ));
4692
4693 let path_struct = arrow_array::StructArray::try_new(
4695 vec![nodes_field, rels_field].into(),
4696 vec![nodes_array, rels_array],
4697 Some(arrow::buffer::NullBuffer::from(path_validity)),
4698 )
4699 .map_err(arrow_err)?;
4700
4701 if let Some(idx) = existing_path_col_idx {
4702 columns[idx] = Arc::new(path_struct);
4703 } else {
4704 columns.push(Arc::new(path_struct));
4705 }
4706 }
4707
4708 for prop_name in &self.target_properties {
4711 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4712 if batch.schema().column_with_name(&full_prop_name).is_none() {
4713 columns.push(arrow_array::new_null_array(
4714 &DataType::LargeBinary,
4715 num_rows,
4716 ));
4717 }
4718 }
4719
4720 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4721 }
4722}
4723
/// Drives `VarLengthMainStreamState`: load adjacency once, then expand each input batch,
/// optionally awaiting target-property hydration before yielding it.
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the metrics handle so the timer guard does not borrow `self`.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            // Take ownership of the current state; every arm must store the next state
            // (or leave `Done`) before returning or looping.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                    }
                    // Adjacency failed to load: report once and terminate.
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                // Per-batch failure is reported, but the stream stays in
                                // `Processing` so later polls continue with the next batch.
                                Err(e) => {
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // No target properties requested: emit the expanded batch as-is.
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            // NOTE(review): the `None` argument presumably means "no target
                            // label known" — confirm against hydrate_vlp_target_properties.
                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, graph_ctx,
                            );

                            // Loop around so the hydration future is polled immediately.
                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                        }
                        // Input error: report once and terminate.
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        // Hydrated batch ready: yield it and go back to reading input.
                        Poll::Ready(Ok(batch)) => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        // Hydration failure terminates the stream.
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4823
4824impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4825 fn schema(&self) -> SchemaRef {
4826 self.schema.clone()
4827 }
4828}
4829
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_traverse_schema_without_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema =
            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);

        assert_eq!(output_schema.fields().len(), 4);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_eq!(output_schema.fields().len(), 5);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        assert_eq!(output_schema.fields().len(), 7);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "m.name");
        assert_eq!(output_schema.field(4).name(), "m.age");
        assert_eq!(output_schema.field(5).name(), "r._eid");
        assert_eq!(output_schema.field(6).name(), "r._type");
    }

    #[test]
    fn test_variable_length_schema() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_eq!(output_schema.fields().len(), 5);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "b._vid");
        assert_eq!(output_schema.field(2).name(), "b._labels");
        assert_eq!(output_schema.field(3).name(), "_hop_count");
        assert_eq!(output_schema.field(4).name(), "p");
    }

    /// Column order of the main VLP operator: input, target vid/labels, hop count, path,
    /// then target property placeholders.
    #[test]
    fn test_variable_length_main_schema() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let target_props = vec!["name".to_string()];
        let output_schema = GraphVariableLengthTraverseMainExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &target_props,
        );

        assert_eq!(output_schema.fields().len(), 6);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "b._vid");
        assert_eq!(output_schema.field(2).name(), "b._labels");
        assert_eq!(output_schema.field(3).name(), "_hop_count");
        assert!(!output_schema.field(3).is_nullable());
        assert_eq!(output_schema.field(4).name(), "p");
        assert_eq!(output_schema.field(5).name(), "b.name");
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
    }

    /// Columns already supplied by the input (bound target, existing path, already
    /// projected properties) must not be duplicated in the output schema.
    #[test]
    fn test_variable_length_main_schema_reuses_bound_columns() {
        let input_schema = Arc::new(Schema::new(vec![
            Field::new("a._vid", DataType::UInt64, false),
            Field::new("b._vid", DataType::UInt64, true),
            Field::new("b._labels", labels_data_type(), true),
            build_path_struct_field("p"),
            Field::new("b.name", DataType::LargeBinary, true),
        ]));

        let target_props = vec!["name".to_string()];
        let output_schema = GraphVariableLengthTraverseMainExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &target_props,
        );

        assert_eq!(output_schema.fields().len(), 6);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "b._vid");
        assert_eq!(output_schema.field(2).name(), "b._labels");
        assert_eq!(output_schema.field(3).name(), "p");
        assert_eq!(output_schema.field(4).name(), "b.name");
        assert_eq!(output_schema.field(5).name(), "_hop_count");
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        assert_eq!(output_schema.fields().len(), 4);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "__eid_to_m");
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_eq!(output_schema.fields().len(), 5);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_eq!(output_schema.fields().len(), 7);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
        assert_eq!(output_schema.field(5).name(), "r.weight");
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(output_schema.field(6).name(), "r.since");
        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]));

        let target_props = vec!["name".to_string(), "age".to_string()];
        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        assert_eq!(output_schema.fields().len(), 7);
        assert_eq!(output_schema.field(0).name(), "a._vid");
        assert_eq!(output_schema.field(1).name(), "m._vid");
        assert_eq!(output_schema.field(2).name(), "m._labels");
        assert_eq!(output_schema.field(3).name(), "r._eid");
        assert_eq!(output_schema.field(4).name(), "r._type");
        assert_eq!(output_schema.field(5).name(), "m.name");
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(output_schema.field(6).name(), "m.age");
        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
    }
}