1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// One BFS hit: a vertex id, a `usize`, and the vertex/edge id lists that go with it.
///
/// NOTE(review): presumably `(target vid, hop count, node path, edge path)` —
/// confirm against the code that constructs this tuple (not visible in this chunk).
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);

/// A [`BfsResult`]-shaped tuple with an extra leading `usize`.
///
/// NOTE(review): the leading `usize` looks like the originating input row index,
/// mirroring the `(row_idx, ..)` tuples used by the single-hop traversal below —
/// confirm against the producer.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68fn prepend_existing_path(
75 existing_path: &arrow_array::StructArray,
76 row_idx: usize,
77 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79 query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81 let nodes_list = existing_path
83 .column(0)
84 .as_any()
85 .downcast_ref::<arrow_array::ListArray>()
86 .unwrap();
87 let node_values = nodes_list.value(row_idx);
88 let node_struct = node_values
89 .as_any()
90 .downcast_ref::<arrow_array::StructArray>()
91 .unwrap();
92 let vid_col = node_struct
93 .column(0)
94 .as_any()
95 .downcast_ref::<UInt64Array>()
96 .unwrap();
97 for i in 0..vid_col.len() {
98 append_node_to_struct(
99 nodes_builder.values(),
100 Vid::from(vid_col.value(i)),
101 query_ctx,
102 );
103 }
104
105 let rels_list = existing_path
107 .column(1)
108 .as_any()
109 .downcast_ref::<arrow_array::ListArray>()
110 .unwrap();
111 let edge_values = rels_list.value(row_idx);
112 let edge_struct = edge_values
113 .as_any()
114 .downcast_ref::<arrow_array::StructArray>()
115 .unwrap();
116 let eid_col = edge_struct
117 .column(0)
118 .as_any()
119 .downcast_ref::<UInt64Array>()
120 .unwrap();
121 let type_col = edge_struct
122 .column(1)
123 .as_any()
124 .downcast_ref::<arrow_array::StringArray>()
125 .unwrap();
126 let src_col = edge_struct
127 .column(2)
128 .as_any()
129 .downcast_ref::<UInt64Array>()
130 .unwrap();
131 let dst_col = edge_struct
132 .column(3)
133 .as_any()
134 .downcast_ref::<UInt64Array>()
135 .unwrap();
136 for i in 0..eid_col.len() {
137 append_edge_to_struct(
138 rels_builder.values(),
139 Eid::from(eid_col.value(i)),
140 type_col.value(i),
141 src_col.value(i),
142 dst_col.value(i),
143 query_ctx,
144 );
145 }
146}
147
148fn resolve_edge_property_type(
153 prop: &str,
154 schema_props: Option<
155 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156 >,
157) -> DataType {
158 if prop == "overflow_json" {
159 DataType::LargeBinary
160 } else if prop == "_created_at" || prop == "_updated_at" {
161 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
165 } else {
166 schema_props
167 .and_then(|props| props.get(prop))
168 .map(|meta| meta.r#type.to_arrow())
169 .unwrap_or(DataType::LargeBinary)
170 }
171}
172
173use crate::query::df_graph::common::merged_edge_schema_props;
174
/// Tuple describing one variable-length expansion result.
///
/// NOTE(review): structurally identical to `ExpansionRecord`; presumably
/// `(input row index, target vid, hop count, node path, edge path)` — confirm
/// against the variable-length traversal code (not visible in this chunk).
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
177
/// Physical operator that expands each input row to its graph neighbors.
///
/// For every row of `input`, the VID in `source_column` is looked up in
/// `graph_ctx` for each id in `edge_type_ids`, and one output row is produced
/// per accepted neighbor. The output schema is the input schema followed by
/// the target (and, if requested, edge) columns built in `build_schema`.
pub struct GraphTraverseExec {
    /// Child plan whose batches are expanded.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse; each is queried separately per source vertex.
    edge_type_ids: Vec<u32>,

    /// Traversal direction handed to the neighbor lookup.
    direction: Direction,

    /// Variable name used as the prefix of the target columns
    /// (`{var}._vid`, `{var}._labels`, `{var}.{prop}`).
    target_variable: String,

    /// Edge variable name. When present, `{edge}._eid`, `{edge}._type` and the
    /// edge property columns are emitted; otherwise a single internal
    /// `__eid_to_{target_variable}` column is emitted instead.
    edge_variable: Option<String>,

    /// Edge property names emitted as `{edge_variable}.{prop}` columns.
    edge_properties: Vec<String>,

    /// Target vertex property names emitted as `{target_variable}.{prop}` columns.
    target_properties: Vec<String>,

    /// Label name of the target vertex, if known. Used to resolve property
    /// types for the schema and forwarded to the execution stream.
    target_label_name: Option<String>,

    /// Label id of the target vertex, if known. In the code visible here it is
    /// only stored and forwarded when the plan is rebuilt with new children.
    target_label_id: Option<u16>,

    /// Shared graph execution context (storage, schema, neighbor lookup).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether this traversal is optional: makes the `_vid` / `_eid` columns
    /// nullable and is forwarded to the stream for null-row handling.
    optional: bool,

    /// Variable names belonging to the optional pattern; forwarded to the stream.
    optional_pattern_vars: HashSet<String>,

    /// Name of an input column holding an already-bound target VID, if any;
    /// forwarded to the stream.
    bound_target_column: Option<String>,

    /// Names of input columns holding edge ids that must not be traversed
    /// again; forwarded to the stream.
    used_edge_columns: Vec<String>,

    /// Output schema computed by `build_schema`.
    schema: SchemaRef,

    /// Plan properties computed from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics set from which per-partition baseline metrics are created.
    metrics: ExecutionPlanMetricsSet,
}
260
261impl fmt::Debug for GraphTraverseExec {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.debug_struct("GraphTraverseExec")
264 .field("source_column", &self.source_column)
265 .field("edge_type_ids", &self.edge_type_ids)
266 .field("direction", &self.direction)
267 .field("target_variable", &self.target_variable)
268 .field("edge_variable", &self.edge_variable)
269 .finish()
270 }
271}
272
273impl GraphTraverseExec {
274 #[expect(clippy::too_many_arguments)]
290 pub fn new(
291 input: Arc<dyn ExecutionPlan>,
292 source_column: impl Into<String>,
293 edge_type_ids: Vec<u32>,
294 direction: Direction,
295 target_variable: impl Into<String>,
296 edge_variable: Option<String>,
297 edge_properties: Vec<String>,
298 target_properties: Vec<String>,
299 target_label_name: Option<String>,
300 target_label_id: Option<u16>,
301 graph_ctx: Arc<GraphExecutionContext>,
302 optional: bool,
303 optional_pattern_vars: HashSet<String>,
304 bound_target_column: Option<String>,
305 used_edge_columns: Vec<String>,
306 ) -> Self {
307 let source_column = source_column.into();
308 let target_variable = target_variable.into();
309
310 let uni_schema = graph_ctx.storage().schema_manager().schema();
312 let label_props = target_label_name
313 .as_deref()
314 .and_then(|ln| uni_schema.properties.get(ln));
315 let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
316 let edge_props = if merged_edge_props.is_empty() {
317 None
318 } else {
319 Some(&merged_edge_props)
320 };
321
322 let schema = Self::build_schema(
324 input.schema(),
325 &target_variable,
326 edge_variable.as_deref(),
327 &edge_properties,
328 &target_properties,
329 label_props,
330 edge_props,
331 optional,
332 );
333
334 let properties = compute_plan_properties(schema.clone());
335
336 Self {
337 input,
338 source_column,
339 edge_type_ids,
340 direction,
341 target_variable,
342 edge_variable,
343 edge_properties,
344 target_properties,
345 target_label_name,
346 target_label_id,
347 graph_ctx,
348 optional,
349 optional_pattern_vars,
350 bound_target_column,
351 used_edge_columns,
352 schema,
353 properties,
354 metrics: ExecutionPlanMetricsSet::new(),
355 }
356 }
357
358 #[expect(
360 clippy::too_many_arguments,
361 reason = "Schema construction needs all field metadata"
362 )]
363 fn build_schema(
364 input_schema: SchemaRef,
365 target_variable: &str,
366 edge_variable: Option<&str>,
367 edge_properties: &[String],
368 target_properties: &[String],
369 label_props: Option<
370 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
371 >,
372 edge_props: Option<
373 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
374 >,
375 optional: bool,
376 ) -> SchemaRef {
377 let mut fields: Vec<Field> = input_schema
378 .fields()
379 .iter()
380 .map(|f| f.as_ref().clone())
381 .collect();
382
383 let target_vid_name = format!("{}._vid", target_variable);
385 fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
386
387 fields.push(Field::new(
389 format!("{}._labels", target_variable),
390 labels_data_type(),
391 true,
392 ));
393
394 for prop_name in target_properties {
396 let col_name = format!("{}.{}", target_variable, prop_name);
397 let arrow_type = resolve_property_type(prop_name, label_props);
398 fields.push(Field::new(&col_name, arrow_type, true));
399 }
400
401 if let Some(edge_var) = edge_variable {
403 let edge_id_name = format!("{}._eid", edge_var);
404 fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
405
406 fields.push(Field::new(
408 format!("{}._type", edge_var),
409 DataType::Utf8,
410 true,
411 ));
412
413 for prop_name in edge_properties {
415 let prop_col_name = format!("{}.{}", edge_var, prop_name);
416 let arrow_type = resolve_edge_property_type(prop_name, edge_props);
417 fields.push(Field::new(&prop_col_name, arrow_type, true));
418 }
419 } else {
420 let internal_eid_name = format!("__eid_to_{}", target_variable);
423 fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
424 }
425
426 Arc::new(Schema::new(fields))
427 }
428}
429
430impl DisplayAs for GraphTraverseExec {
431 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432 write!(
433 f,
434 "GraphTraverseExec: {} --[{:?}]--> {}",
435 self.source_column, self.edge_type_ids, self.target_variable
436 )?;
437 if let Some(ref edge_var) = self.edge_variable {
438 write!(f, " as {}", edge_var)?;
439 }
440 Ok(())
441 }
442}
443
444impl ExecutionPlan for GraphTraverseExec {
445 fn name(&self) -> &str {
446 "GraphTraverseExec"
447 }
448
449 fn as_any(&self) -> &dyn Any {
450 self
451 }
452
453 fn schema(&self) -> SchemaRef {
454 self.schema.clone()
455 }
456
457 fn properties(&self) -> &Arc<PlanProperties> {
458 &self.properties
459 }
460
461 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
462 vec![&self.input]
463 }
464
465 fn with_new_children(
466 self: Arc<Self>,
467 children: Vec<Arc<dyn ExecutionPlan>>,
468 ) -> DFResult<Arc<dyn ExecutionPlan>> {
469 if children.len() != 1 {
470 return Err(datafusion::error::DataFusionError::Plan(
471 "GraphTraverseExec requires exactly one child".to_string(),
472 ));
473 }
474
475 Ok(Arc::new(Self::new(
476 children[0].clone(),
477 self.source_column.clone(),
478 self.edge_type_ids.clone(),
479 self.direction,
480 self.target_variable.clone(),
481 self.edge_variable.clone(),
482 self.edge_properties.clone(),
483 self.target_properties.clone(),
484 self.target_label_name.clone(),
485 self.target_label_id,
486 self.graph_ctx.clone(),
487 self.optional,
488 self.optional_pattern_vars.clone(),
489 self.bound_target_column.clone(),
490 self.used_edge_columns.clone(),
491 )))
492 }
493
494 fn execute(
495 &self,
496 partition: usize,
497 context: Arc<TaskContext>,
498 ) -> DFResult<SendableRecordBatchStream> {
499 let input_stream = self.input.execute(partition, context)?;
500
501 let metrics = BaselineMetrics::new(&self.metrics, partition);
502
503 let warm_fut = self
504 .graph_ctx
505 .warming_future(self.edge_type_ids.clone(), self.direction);
506
507 Ok(Box::pin(GraphTraverseStream {
508 input: input_stream,
509 source_column: self.source_column.clone(),
510 edge_type_ids: self.edge_type_ids.clone(),
511 direction: self.direction,
512 target_variable: self.target_variable.clone(),
513 edge_variable: self.edge_variable.clone(),
514 edge_properties: self.edge_properties.clone(),
515 target_properties: self.target_properties.clone(),
516 target_label_name: self.target_label_name.clone(),
517 graph_ctx: self.graph_ctx.clone(),
518 optional: self.optional,
519 optional_pattern_vars: self.optional_pattern_vars.clone(),
520 bound_target_column: self.bound_target_column.clone(),
521 used_edge_columns: self.used_edge_columns.clone(),
522 schema: self.schema.clone(),
523 state: TraverseStreamState::Warming(warm_fut),
524 metrics,
525 }))
526 }
527
528 fn metrics(&self) -> Option<MetricsSet> {
529 Some(self.metrics.clone_inner())
530 }
531}
532
/// State machine driven by `GraphTraverseStream::poll_next`.
enum TraverseStreamState {
    /// Initial state: waiting for the warming future created in `execute`
    /// to finish; on success the stream moves to `Reading`.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for the next batch to expand.
    Reading,
    /// Waiting for the async construction of one output batch
    /// (`build_traverse_output_batch`); returns to `Reading` when it resolves.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state: every further poll yields `None`.
    Done,
}
544
/// Stream returned by `GraphTraverseExec::execute`; expands input batches
/// into traversal result batches one at a time.
struct GraphTraverseStream {
    /// Input stream producing the batches to expand.
    input: SendableRecordBatchStream,

    /// Name of the batch column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse for every source vertex.
    edge_type_ids: Vec<u32>,

    /// Traversal direction; `Direction::Both` additionally enables per-row
    /// de-duplication of edge ids in `expand_neighbors`.
    direction: Direction,

    /// Target variable name (not read by the stream itself).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Edge variable name; decides whether full edge columns or only the
    /// internal edge-id column are produced.
    edge_variable: Option<String>,

    /// Edge property names to materialize.
    edge_properties: Vec<String>,

    /// Target vertex property names to materialize.
    target_properties: Vec<String>,

    /// Target label name: used to filter neighbors by label and as the
    /// fallback label when a vertex's labels cannot be resolved.
    target_label_name: Option<String>,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched input rows must be preserved as null-extended rows.
    optional: bool,

    /// Variables of the optional pattern; their columns are nulled out in
    /// null-extended rows.
    optional_pattern_vars: HashSet<String>,

    /// Column holding an already-bound target VID; when set, only neighbors
    /// equal to that VID are kept.
    bound_target_column: Option<String>,

    /// Columns holding edge ids already used by the row; neighbors reached
    /// through those edges are skipped.
    used_edge_columns: Vec<String>,

    /// Output schema of every produced batch.
    schema: SchemaRef,

    /// Current position in the warming / reading / materializing cycle.
    state: TraverseStreamState,

    /// Baseline metrics (compute time, output rows) for this partition.
    metrics: BaselineMetrics,
}
599
600impl GraphTraverseStream {
601 fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
604 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
605 datafusion::error::DataFusionError::Execution(format!(
606 "Source column '{}' not found",
607 self.source_column
608 ))
609 })?;
610
611 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
612 let source_vids: &UInt64Array = &source_vid_cow;
613
614 let bound_target_cow = self
617 .bound_target_column
618 .as_ref()
619 .and_then(|col| batch.column_by_name(col))
620 .map(|c| column_as_vid_array(c.as_ref()))
621 .transpose()?;
622 let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
623
624 let used_edge_arrays: Vec<&UInt64Array> = self
626 .used_edge_columns
627 .iter()
628 .filter_map(|col| {
629 batch
630 .column_by_name(col)
631 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
632 })
633 .collect();
634
635 let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
636 let is_undirected = matches!(self.direction, Direction::Both);
637
638 for (row_idx, source_vid) in source_vids.iter().enumerate() {
639 let Some(src) = source_vid else {
640 continue;
641 };
642
643 let expected_target = bound_target_vids.map(|arr| {
649 if arr.is_null(row_idx) {
650 None
651 } else {
652 Some(arr.value(row_idx))
653 }
654 });
655
656 let used_eids: HashSet<u64> = used_edge_arrays
658 .iter()
659 .filter_map(|arr| {
660 if arr.is_null(row_idx) {
661 None
662 } else {
663 Some(arr.value(row_idx))
664 }
665 })
666 .collect();
667
668 let vid = Vid::from(src);
669 let mut seen_edges: HashSet<u64> = HashSet::new();
672
673 for &edge_type in &self.edge_type_ids {
674 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
675
676 for (target_vid, eid) in neighbors {
677 let eid_u64 = eid.as_u64();
678
679 if used_eids.contains(&eid_u64) {
681 continue;
682 }
683
684 if is_undirected && !seen_edges.insert(eid_u64) {
686 continue;
687 }
688
689 if let Some(expected_opt) = expected_target {
692 let Some(expected) = expected_opt else {
693 continue;
694 };
695 if target_vid.as_u64() != expected {
696 continue;
697 }
698 }
699
700 if let Some(ref label_name) = self.target_label_name {
703 let query_ctx = self.graph_ctx.query_context();
704 if let Some(vertex_labels) =
705 l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
706 {
707 if !vertex_labels.contains(label_name) {
709 continue;
710 }
711 }
712 }
714
715 expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
716 }
717 }
718 }
719
720 Ok(expanded_rows)
721 }
722}
723
724fn build_target_labels_column(
726 target_vids: &[Vid],
727 target_label_name: &Option<String>,
728 graph_ctx: &GraphExecutionContext,
729) -> ArrayRef {
730 use arrow_array::builder::{ListBuilder, StringBuilder};
731 let mut labels_builder = ListBuilder::new(StringBuilder::new());
732 let query_ctx = graph_ctx.query_context();
733 for vid in target_vids {
734 let row_labels: Vec<String> =
735 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
736 Some(labels) => labels,
737 None => {
738 if let Some(label_name) = target_label_name {
740 vec![label_name.clone()]
741 } else {
742 vec![]
743 }
744 }
745 };
746 let values = labels_builder.values();
747 for lbl in &row_labels {
748 values.append_value(lbl);
749 }
750 labels_builder.append(true);
751 }
752 Arc::new(labels_builder.finish())
753}
754
755async fn build_target_property_columns(
757 target_vids: &[Vid],
758 target_properties: &[String],
759 target_label_name: &Option<String>,
760 graph_ctx: &Arc<GraphExecutionContext>,
761) -> DFResult<Vec<ArrayRef>> {
762 let mut columns = Vec::new();
763
764 if let Some(label_name) = target_label_name {
765 let property_manager = graph_ctx.property_manager();
766 let query_ctx = graph_ctx.query_context();
767
768 let props_map = property_manager
769 .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
770 .await
771 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
772
773 let uni_schema = graph_ctx.storage().schema_manager().schema();
774 let label_props = uni_schema.properties.get(label_name.as_str());
775
776 for prop_name in target_properties {
777 let data_type = resolve_property_type(prop_name, label_props);
778 let column =
779 build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
780 columns.push(column);
781 }
782 } else {
783 let non_internal_props: Vec<&str> = target_properties
785 .iter()
786 .filter(|p| *p != "_all_props")
787 .map(|s| s.as_str())
788 .collect();
789 let property_manager = graph_ctx.property_manager();
790 let query_ctx = graph_ctx.query_context();
791
792 let props_map = if !non_internal_props.is_empty() {
793 property_manager
794 .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
795 .await
796 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
797 } else {
798 std::collections::HashMap::new()
799 };
800
801 for prop_name in target_properties {
802 if prop_name == "_all_props" {
803 columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
804 } else {
805 let column = build_property_column_static(
806 target_vids,
807 &props_map,
808 prop_name,
809 &arrow::datatypes::DataType::LargeBinary,
810 )?;
811 columns.push(column);
812 }
813 }
814 }
815
816 Ok(columns)
817}
818
819fn build_all_props_column(
821 target_vids: &[Vid],
822 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
823 graph_ctx: &Arc<GraphExecutionContext>,
824) -> ArrayRef {
825 use arrow_array::builder::LargeBinaryBuilder;
826
827 let mut builder = LargeBinaryBuilder::new();
828 let l0_ctx = graph_ctx.l0_context();
829 for vid in target_vids {
830 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
832 if let Some(vid_props) = props_map.get(vid) {
833 for (k, v) in vid_props.iter() {
834 merged_props.insert(k.clone(), v.clone());
835 }
836 }
837 for l0 in l0_ctx.iter_l0_buffers() {
838 let guard = l0.read();
839 if let Some(l0_props) = guard.vertex_properties.get(vid) {
840 for (k, v) in l0_props.iter() {
841 merged_props.insert(k.clone(), v.clone());
842 }
843 }
844 }
845 if merged_props.is_empty() {
846 builder.append_null();
847 } else {
848 builder.append_value(uni_common::cypher_value_codec::encode(
849 &uni_common::Value::Map(merged_props),
850 ));
851 }
852 }
853 Arc::new(builder.finish())
854}
855
856async fn build_edge_columns(
858 expansions: &[(usize, Vid, u64, u32)],
859 edge_properties: &[String],
860 edge_type_ids: &[u32],
861 graph_ctx: &Arc<GraphExecutionContext>,
862) -> DFResult<Vec<ArrayRef>> {
863 let mut columns = Vec::new();
864
865 let eids: Vec<Eid> = expansions
866 .iter()
867 .map(|(_, _, eid, _)| Eid::from(*eid))
868 .collect();
869 let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
870 columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
871
872 {
874 let uni_schema = graph_ctx.storage().schema_manager().schema();
875 let mut type_builder = arrow_array::builder::StringBuilder::new();
876 for (_, _, _, edge_type_id) in expansions {
877 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
878 type_builder.append_value(&name);
879 } else {
880 type_builder.append_null();
881 }
882 }
883 columns.push(Arc::new(type_builder.finish()) as ArrayRef);
884 }
885
886 if !edge_properties.is_empty() {
887 let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
888 let property_manager = graph_ctx.property_manager();
889 let query_ctx = graph_ctx.query_context();
890
891 let props_map = property_manager
892 .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
893 .await
894 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
895
896 let uni_schema = graph_ctx.storage().schema_manager().schema();
897 let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
898 let edge_type_props = if merged_edge_props.is_empty() {
899 None
900 } else {
901 Some(&merged_edge_props)
902 };
903
904 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
905
906 for prop_name in edge_properties {
907 if prop_name == "_created_at" || prop_name == "_updated_at" {
913 let mut builder =
914 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
915 let l0_ctx = graph_ctx.l0_context();
916 for eid in &eids {
917 let mut value: Option<i64> = None;
918 for l0 in l0_ctx.iter_l0_buffers() {
919 let guard = l0.read();
920 let opt = if prop_name == "_created_at" {
921 guard.edge_created_at.get(eid).copied()
922 } else {
923 guard.edge_updated_at.get(eid).copied()
924 };
925 if let Some(ts) = opt {
926 value = Some(match value {
927 None => ts,
928 Some(cur) if prop_name == "_created_at" => cur.min(ts),
929 Some(cur) => cur.max(ts),
930 });
931 }
932 }
933 match value {
934 Some(ts) => builder.append_value(ts),
935 None => builder.append_null(),
936 }
937 }
938 columns.push(Arc::new(builder.finish()) as ArrayRef);
939 continue;
940 }
941 let data_type = resolve_edge_property_type(prop_name, edge_type_props);
942 let column =
943 build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
944 columns.push(column);
945 }
946 }
947
948 Ok(columns)
949}
950
951#[expect(
956 clippy::too_many_arguments,
957 reason = "Standalone async fn needs all context passed explicitly"
958)]
959async fn build_traverse_output_batch(
960 input: RecordBatch,
961 expansions: Vec<(usize, Vid, u64, u32)>,
962 schema: SchemaRef,
963 edge_variable: Option<String>,
964 edge_properties: Vec<String>,
965 edge_type_ids: Vec<u32>,
966 target_properties: Vec<String>,
967 target_label_name: Option<String>,
968 graph_ctx: Arc<GraphExecutionContext>,
969 optional: bool,
970 optional_pattern_vars: HashSet<String>,
971) -> DFResult<RecordBatch> {
972 if expansions.is_empty() {
973 if !optional {
974 return Ok(RecordBatch::new_empty(schema));
975 }
976 let unmatched_reps = collect_unmatched_optional_group_rows(
977 &input,
978 &HashSet::new(),
979 &schema,
980 &optional_pattern_vars,
981 )?;
982 if unmatched_reps.is_empty() {
983 return Ok(RecordBatch::new_empty(schema));
984 }
985 return build_optional_null_batch_for_rows_with_optional_vars(
986 &input,
987 &unmatched_reps,
988 &schema,
989 &optional_pattern_vars,
990 );
991 }
992
993 let indices: Vec<u64> = expansions
995 .iter()
996 .map(|(idx, _, _, _)| *idx as u64)
997 .collect();
998 let indices_array = UInt64Array::from(indices);
999 let mut columns: Vec<ArrayRef> = input
1000 .columns()
1001 .iter()
1002 .map(|col| take(col.as_ref(), &indices_array, None))
1003 .collect::<Result<_, _>>()?;
1004
1005 let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1007 let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1008 columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1009
1010 columns.push(build_target_labels_column(
1012 &target_vids,
1013 &target_label_name,
1014 &graph_ctx,
1015 ));
1016
1017 if !target_properties.is_empty() {
1019 let prop_cols = build_target_property_columns(
1020 &target_vids,
1021 &target_properties,
1022 &target_label_name,
1023 &graph_ctx,
1024 )
1025 .await?;
1026 columns.extend(prop_cols);
1027 }
1028
1029 if edge_variable.is_some() {
1031 let edge_cols =
1032 build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1033 columns.extend(edge_cols);
1034 } else {
1035 let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1036 columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1037 }
1038
1039 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1040
1041 if optional {
1043 let matched_indices: HashSet<usize> =
1044 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1045 let unmatched = collect_unmatched_optional_group_rows(
1046 &input,
1047 &matched_indices,
1048 &schema,
1049 &optional_pattern_vars,
1050 )?;
1051
1052 if !unmatched.is_empty() {
1053 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1054 &input,
1055 &unmatched,
1056 &schema,
1057 &optional_pattern_vars,
1058 )?;
1059 let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1060 .map_err(arrow_err)?;
1061 return Ok(combined);
1062 }
1063 }
1064
1065 Ok(expanded_batch)
1066}
1067
1068fn build_optional_null_batch_for_rows(
1071 input: &RecordBatch,
1072 unmatched_indices: &[usize],
1073 schema: &SchemaRef,
1074) -> DFResult<RecordBatch> {
1075 let num_rows = unmatched_indices.len();
1076 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1077 let indices_array = UInt64Array::from(indices);
1078
1079 let mut columns: Vec<ArrayRef> = Vec::new();
1081 for col in input.columns() {
1082 let taken = take(col.as_ref(), &indices_array, None)?;
1083 columns.push(taken);
1084 }
1085 for field in schema.fields().iter().skip(input.num_columns()) {
1087 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1088 }
1089 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1090}
1091
/// Returns `true` when `col_name` belongs to one of the optional variables.
///
/// A column belongs to variable `var` when it is
/// * exactly `var`,
/// * a dotted column of it (`var.` followed by anything, e.g. `var._vid`), or
/// * the internal edge-id column `__eid_to_var`.
///
/// The edge-id form is matched on the exact variable name after the prefix.
/// A suffix comparison would let variable `b` wrongly claim `__eid_to_ab`,
/// which belongs to a different variable `ab`.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }

    // Internal edge-id column: the remainder must be the variable name itself.
    if col_name
        .strip_prefix("__eid_to_")
        .is_some_and(|target| optional_vars.contains(target))
    {
        return true;
    }

    // Dotted column: `<var>.<anything>`.
    optional_vars.iter().any(|var| {
        col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'))
    })
}
1099
1100fn collect_unmatched_optional_group_rows(
1101 input: &RecordBatch,
1102 matched_indices: &HashSet<usize>,
1103 schema: &SchemaRef,
1104 optional_vars: &HashSet<String>,
1105) -> DFResult<Vec<usize>> {
1106 if input.num_rows() == 0 {
1107 return Ok(Vec::new());
1108 }
1109
1110 if optional_vars.is_empty() {
1111 return Ok((0..input.num_rows())
1112 .filter(|idx| !matched_indices.contains(idx))
1113 .collect());
1114 }
1115
1116 let source_vid_indices: Vec<usize> = schema
1117 .fields()
1118 .iter()
1119 .enumerate()
1120 .filter_map(|(idx, field)| {
1121 if idx >= input.num_columns() {
1122 return None;
1123 }
1124 let name = field.name();
1125 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1126 Some(idx)
1127 } else {
1128 None
1129 }
1130 })
1131 .collect();
1132
1133 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1136
1137 for row_idx in 0..input.num_rows() {
1138 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1139 let entry = groups.entry(key.clone());
1140 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1141 group_order.push(key.clone());
1142 }
1143 let matched = matched_indices.contains(&row_idx);
1144 entry
1145 .and_modify(|(_, any_matched)| *any_matched |= matched)
1146 .or_insert((row_idx, matched));
1147 }
1148
1149 Ok(group_order
1150 .into_iter()
1151 .filter_map(|key| {
1152 groups
1153 .get(&key)
1154 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1155 })
1156 .collect())
1157}
1158
1159fn compute_optional_group_key(
1160 batch: &RecordBatch,
1161 row_idx: usize,
1162 source_vid_indices: &[usize],
1163) -> DFResult<Vec<u8>> {
1164 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1165 for &col_idx in source_vid_indices {
1166 let col = batch.column(col_idx);
1167 let vid_cow = column_as_vid_array(col.as_ref())?;
1168 let arr: &UInt64Array = &vid_cow;
1169 if arr.is_null(row_idx) {
1170 key.extend_from_slice(&u64::MAX.to_le_bytes());
1171 } else {
1172 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1173 }
1174 }
1175 Ok(key)
1176}
1177
1178fn build_optional_null_batch_for_rows_with_optional_vars(
1179 input: &RecordBatch,
1180 unmatched_indices: &[usize],
1181 schema: &SchemaRef,
1182 optional_vars: &HashSet<String>,
1183) -> DFResult<RecordBatch> {
1184 if optional_vars.is_empty() {
1185 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1186 }
1187
1188 let num_rows = unmatched_indices.len();
1189 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1190 let indices_array = UInt64Array::from(indices);
1191
1192 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1193 for (col_idx, field) in schema.fields().iter().enumerate() {
1194 if col_idx < input.num_columns() {
1195 if is_optional_column_for_vars(field.name(), optional_vars) {
1196 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1197 } else {
1198 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1199 columns.push(taken);
1200 }
1201 } else {
1202 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1203 }
1204 }
1205
1206 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1207}
1208
/// Drives the warming → reading → materializing cycle described by
/// `TraverseStreamState`, yielding one output batch per input batch.
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Polls the stream.
    ///
    /// Each loop iteration takes the current state out of `self.state`
    /// (leaving `Done` behind) and must store the follow-up state before
    /// returning or looping; any path that returns without storing one leaves
    /// the stream in `Done`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Clone the metrics handle so the timer guard does not borrow `self`.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        // Warming finished: loop around and start reading input.
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // A timeout ends the stream: the state stays `Done` here.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // Neighbor expansion is synchronous.
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // The error is reported, but the stream stays
                                    // readable for the next poll.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Without any property to load, the batch is built
                            // synchronously and returned right away.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Property loading is async: the future takes owned
                            // copies of everything it needs.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            // Loop around so the new future is polled immediately.
                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted.
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1336
1337fn build_traverse_output_batch_sync(
1342 input: &RecordBatch,
1343 expansions: &[(usize, Vid, u64, u32)],
1344 schema: &SchemaRef,
1345 edge_variable: Option<&String>,
1346 graph_ctx: &GraphExecutionContext,
1347 optional: bool,
1348 optional_pattern_vars: &HashSet<String>,
1349) -> DFResult<RecordBatch> {
1350 if expansions.is_empty() {
1351 if !optional {
1352 return Ok(RecordBatch::new_empty(schema.clone()));
1353 }
1354 let unmatched_reps = collect_unmatched_optional_group_rows(
1355 input,
1356 &HashSet::new(),
1357 schema,
1358 optional_pattern_vars,
1359 )?;
1360 if unmatched_reps.is_empty() {
1361 return Ok(RecordBatch::new_empty(schema.clone()));
1362 }
1363 return build_optional_null_batch_for_rows_with_optional_vars(
1364 input,
1365 &unmatched_reps,
1366 schema,
1367 optional_pattern_vars,
1368 );
1369 }
1370
1371 let indices: Vec<u64> = expansions
1372 .iter()
1373 .map(|(idx, _, _, _)| *idx as u64)
1374 .collect();
1375 let indices_array = UInt64Array::from(indices);
1376
1377 let mut columns: Vec<ArrayRef> = Vec::new();
1378 for col in input.columns() {
1379 let expanded = take(col.as_ref(), &indices_array, None)?;
1380 columns.push(expanded);
1381 }
1382
1383 let target_vids: Vec<u64> = expansions
1385 .iter()
1386 .map(|(_, vid, _, _)| vid.as_u64())
1387 .collect();
1388 columns.push(Arc::new(UInt64Array::from(target_vids)));
1389
1390 {
1392 use arrow_array::builder::{ListBuilder, StringBuilder};
1393 let l0_ctx = graph_ctx.l0_context();
1394 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1395 for (_, vid, _, _) in expansions {
1396 let mut row_labels: Vec<String> = Vec::new();
1397 for l0 in l0_ctx.iter_l0_buffers() {
1398 let guard = l0.read();
1399 if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1400 for lbl in l0_labels {
1401 if !row_labels.contains(lbl) {
1402 row_labels.push(lbl.clone());
1403 }
1404 }
1405 }
1406 }
1407 let values = labels_builder.values();
1408 for lbl in &row_labels {
1409 values.append_value(lbl);
1410 }
1411 labels_builder.append(true);
1412 }
1413 columns.push(Arc::new(labels_builder.finish()));
1414 }
1415
1416 if edge_variable.is_some() {
1418 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1419 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1420
1421 let uni_schema = graph_ctx.storage().schema_manager().schema();
1423 let mut type_builder = arrow_array::builder::StringBuilder::new();
1424 for (_, _, _, edge_type_id) in expansions {
1425 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1426 type_builder.append_value(&name);
1427 } else {
1428 type_builder.append_null();
1429 }
1430 }
1431 columns.push(Arc::new(type_builder.finish()));
1432 } else {
1433 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1435 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1436 }
1437
1438 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1439
1440 if optional {
1441 let matched_indices: HashSet<usize> =
1442 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1443 let unmatched = collect_unmatched_optional_group_rows(
1444 input,
1445 &matched_indices,
1446 schema,
1447 optional_pattern_vars,
1448 )?;
1449
1450 if !unmatched.is_empty() {
1451 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1452 input,
1453 &unmatched,
1454 schema,
1455 optional_pattern_vars,
1456 )?;
1457 let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1458 .map_err(|e| {
1459 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1460 })?;
1461 return Ok(combined);
1462 }
1463 }
1464
1465 Ok(expanded_batch)
1466}
1467
1468impl RecordBatchStream for GraphTraverseStream {
1469 fn schema(&self) -> SchemaRef {
1470 self.schema.clone()
1471 }
1472}
1473
/// Adjacency lists keyed by vertex id.
///
/// Each entry is `(neighbor_vid, edge_id, edge_type_name, edge_properties)`.
/// The type name and the properties are reference-counted so one edge can
/// appear in more than one list without being copied.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, Arc<str>, Arc<uni_common::Properties>)>>;
1480
/// Physical plan node for a single-hop traversal that selects edges by type name.
///
/// At execution time the node buffers its input, loads the matching edges into
/// an [`EdgeAdjacencyMap`] and expands every buffered batch against that map
/// (see `GraphTraverseMainStream`).
pub struct GraphTraverseMainExec {
    /// Child plan that produces the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column that holds the source vertex ids.
    source_column: String,

    /// Edge type names to traverse.
    type_names: Vec<String>,

    /// Traversal direction relative to the source vertex.
    direction: Direction,

    /// Variable name of the target vertex; used as the prefix of the
    /// `<target>._vid`, `<target>._labels` and target property columns.
    target_variable: String,

    /// Variable name of the edge, if bound. When set, `<edge>._eid`,
    /// `<edge>._type` and the edge property columns are emitted; otherwise a
    /// single `__eid_to_<target>` column carries the edge id.
    edge_variable: Option<String>,

    /// Edge property names to emit (only used when `edge_variable` is set).
    edge_properties: Vec<String>,

    /// Target vertex property names to emit.
    target_properties: Vec<String>,

    /// Shared execution context giving access to storage and the L0 buffers.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether the traversal is optional: unmatched input rows are then kept
    /// and extended with nulls instead of being dropped.
    optional: bool,

    /// Variables handed to the optional-row helpers when unmatched rows are
    /// collected and null-extended.
    /// NOTE(review): presumably the variables introduced by the optional
    /// pattern; confirm against the helpers.
    optional_pattern_vars: HashSet<String>,

    /// Input column holding an already-bound target vid, if any. When present,
    /// only edges leading to that vertex are kept.
    bound_target_column: Option<String>,

    /// Input columns holding edge ids that are already bound on the row; edges
    /// whose id appears in one of them are skipped.
    used_edge_columns: Vec<String>,

    /// Output schema, computed once in `new`.
    schema: SchemaRef,

    /// Plan properties derived from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics registry for this node.
    metrics: ExecutionPlanMetricsSet,
}
1555
1556impl fmt::Debug for GraphTraverseMainExec {
1557 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1558 f.debug_struct("GraphTraverseMainExec")
1559 .field("type_names", &self.type_names)
1560 .field("direction", &self.direction)
1561 .field("target_variable", &self.target_variable)
1562 .field("edge_variable", &self.edge_variable)
1563 .finish()
1564 }
1565}
1566
1567impl GraphTraverseMainExec {
1568 #[expect(clippy::too_many_arguments)]
1570 pub fn new(
1571 input: Arc<dyn ExecutionPlan>,
1572 source_column: impl Into<String>,
1573 type_names: Vec<String>,
1574 direction: Direction,
1575 target_variable: impl Into<String>,
1576 edge_variable: Option<String>,
1577 edge_properties: Vec<String>,
1578 target_properties: Vec<String>,
1579 graph_ctx: Arc<GraphExecutionContext>,
1580 optional: bool,
1581 optional_pattern_vars: HashSet<String>,
1582 bound_target_column: Option<String>,
1583 used_edge_columns: Vec<String>,
1584 ) -> Self {
1585 let source_column = source_column.into();
1586 let target_variable = target_variable.into();
1587
1588 let schema = Self::build_schema(
1590 &input.schema(),
1591 &target_variable,
1592 &edge_variable,
1593 &edge_properties,
1594 &target_properties,
1595 optional,
1596 );
1597
1598 let properties = compute_plan_properties(schema.clone());
1599
1600 Self {
1601 input,
1602 source_column,
1603 type_names,
1604 direction,
1605 target_variable,
1606 edge_variable,
1607 edge_properties,
1608 target_properties,
1609 graph_ctx,
1610 optional,
1611 optional_pattern_vars,
1612 bound_target_column,
1613 used_edge_columns,
1614 schema,
1615 properties,
1616 metrics: ExecutionPlanMetricsSet::new(),
1617 }
1618 }
1619
1620 fn build_schema(
1622 input_schema: &SchemaRef,
1623 target_variable: &str,
1624 edge_variable: &Option<String>,
1625 edge_properties: &[String],
1626 target_properties: &[String],
1627 optional: bool,
1628 ) -> SchemaRef {
1629 let mut fields: Vec<Field> = input_schema
1630 .fields()
1631 .iter()
1632 .map(|f| f.as_ref().clone())
1633 .collect();
1634
1635 let target_vid_name = format!("{}._vid", target_variable);
1637 if input_schema.column_with_name(&target_vid_name).is_none() {
1638 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1639 }
1640
1641 let target_labels_name = format!("{}._labels", target_variable);
1643 if input_schema.column_with_name(&target_labels_name).is_none() {
1644 fields.push(Field::new(target_labels_name, labels_data_type(), true));
1645 }
1646
1647 if let Some(edge_var) = edge_variable {
1649 fields.push(Field::new(
1650 format!("{}._eid", edge_var),
1651 DataType::UInt64,
1652 optional,
1653 ));
1654
1655 fields.push(Field::new(
1657 format!("{}._type", edge_var),
1658 DataType::Utf8,
1659 true,
1660 ));
1661
1662 for prop in edge_properties {
1666 let col_name = format!("{}.{}", edge_var, prop);
1667 let mut metadata = std::collections::HashMap::new();
1668 metadata.insert("cv_encoded".to_string(), "true".to_string());
1669 fields.push(
1670 Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1671 );
1672 }
1673 } else {
1674 fields.push(Field::new(
1677 format!("__eid_to_{}", target_variable),
1678 DataType::UInt64,
1679 optional,
1680 ));
1681 }
1682
1683 for prop in target_properties {
1685 fields.push(Field::new(
1686 format!("{}.{}", target_variable, prop),
1687 DataType::LargeBinary,
1688 true,
1689 ));
1690 }
1691
1692 Arc::new(Schema::new(fields))
1693 }
1694}
1695
1696impl DisplayAs for GraphTraverseMainExec {
1697 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698 write!(
1699 f,
1700 "GraphTraverseMainExec: types={:?}, direction={:?}",
1701 self.type_names, self.direction
1702 )
1703 }
1704}
1705
1706impl ExecutionPlan for GraphTraverseMainExec {
1707 fn name(&self) -> &str {
1708 "GraphTraverseMainExec"
1709 }
1710
1711 fn as_any(&self) -> &dyn Any {
1712 self
1713 }
1714
1715 fn schema(&self) -> SchemaRef {
1716 self.schema.clone()
1717 }
1718
1719 fn properties(&self) -> &Arc<PlanProperties> {
1720 &self.properties
1721 }
1722
1723 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1724 vec![&self.input]
1725 }
1726
1727 fn with_new_children(
1728 self: Arc<Self>,
1729 children: Vec<Arc<dyn ExecutionPlan>>,
1730 ) -> DFResult<Arc<dyn ExecutionPlan>> {
1731 if children.len() != 1 {
1732 return Err(datafusion::error::DataFusionError::Plan(
1733 "GraphTraverseMainExec expects exactly one child".to_string(),
1734 ));
1735 }
1736
1737 Ok(Arc::new(Self {
1738 input: children[0].clone(),
1739 source_column: self.source_column.clone(),
1740 type_names: self.type_names.clone(),
1741 direction: self.direction,
1742 target_variable: self.target_variable.clone(),
1743 edge_variable: self.edge_variable.clone(),
1744 edge_properties: self.edge_properties.clone(),
1745 target_properties: self.target_properties.clone(),
1746 graph_ctx: self.graph_ctx.clone(),
1747 optional: self.optional,
1748 optional_pattern_vars: self.optional_pattern_vars.clone(),
1749 bound_target_column: self.bound_target_column.clone(),
1750 used_edge_columns: self.used_edge_columns.clone(),
1751 schema: self.schema.clone(),
1752 properties: self.properties.clone(),
1753 metrics: self.metrics.clone(),
1754 }))
1755 }
1756
1757 fn execute(
1758 &self,
1759 partition: usize,
1760 context: Arc<TaskContext>,
1761 ) -> DFResult<SendableRecordBatchStream> {
1762 let input_stream = self.input.execute(partition, context)?;
1763 let metrics = BaselineMetrics::new(&self.metrics, partition);
1764
1765 Ok(Box::pin(GraphTraverseMainStream::new(
1766 input_stream,
1767 self.source_column.clone(),
1768 self.type_names.clone(),
1769 self.direction,
1770 self.target_variable.clone(),
1771 self.edge_variable.clone(),
1772 self.edge_properties.clone(),
1773 self.target_properties.clone(),
1774 self.graph_ctx.clone(),
1775 self.optional,
1776 self.optional_pattern_vars.clone(),
1777 self.bound_target_column.clone(),
1778 self.used_edge_columns.clone(),
1779 self.schema.clone(),
1780 metrics,
1781 )))
1782 }
1783
1784 fn metrics(&self) -> Option<MetricsSet> {
1785 Some(self.metrics.clone_inner())
1786 }
1787}
1788
/// Largest number of distinct source vertex ids for which edge loading is
/// restricted to those sources. Once the buffered input holds more distinct
/// ids, `collect_source_vids` returns `None` and the edges are loaded without
/// an endpoint filter.
const TRAVERSE_PUSHDOWN_MAX_SOURCES: usize = 1_024;
1795
/// Phases of `GraphTraverseMainStream`, visited in declaration order.
enum GraphTraverseMainState {
    /// Draining the input stream into memory before any edge is loaded.
    CollectingInput {
        /// Input stream still being drained.
        input_stream: SendableRecordBatchStream,
        /// Batches received so far.
        buffered: Vec<RecordBatch>,
    },
    /// Waiting for the adjacency map to be built.
    LoadingEdges {
        /// In-flight call that builds the adjacency map.
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        /// Fully buffered input, kept until the edges are available.
        buffered: Vec<RecordBatch>,
    },
    /// Expanding the buffered batches one at a time against the adjacency map.
    Processing {
        /// Loaded edges, keyed by the vertex they are expanded from.
        adjacency: EdgeAdjacencyMap,
        /// Remaining buffered batches.
        buffered: std::vec::IntoIter<RecordBatch>,
    },
    /// Terminal state; the stream only yields `None` from here on.
    Done,
}
1822
/// Stream that buffers its input, loads the matching edges once and then
/// expands each buffered batch against them.
struct GraphTraverseMainStream {
    /// Name of the input column that holds the source vertex ids.
    source_column: String,

    /// Variable name of the target vertex (prefix of the target columns).
    target_variable: String,

    /// Variable name of the edge, if bound; controls which edge columns are emitted.
    edge_variable: Option<String>,

    /// Edge property names to emit when `edge_variable` is set.
    edge_properties: Vec<String>,

    /// Target vertex property names to emit.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout check).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched input rows are kept and extended with nulls.
    optional: bool,

    /// Variables handed to the optional-row helpers.
    optional_pattern_vars: HashSet<String>,

    /// Input column with an already-bound target vid, if any.
    bound_target_column: Option<String>,

    /// Input columns with edge ids that must not be traversed again for the row.
    used_edge_columns: Vec<String>,

    /// Schema of the batches this stream yields.
    schema: SchemaRef,

    /// Edge type names to load.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Current phase of the stream.
    state: GraphTraverseMainState,

    /// Per-partition metrics (compute time, output rows).
    metrics: BaselineMetrics,
}
1870
impl GraphTraverseMainStream {
    /// Creates the stream in the `CollectingInput` phase with an empty buffer.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            type_names,
            direction,
            state: GraphTraverseMainState::CollectingInput {
                input_stream,
                buffered: Vec::new(),
            },
            metrics,
        }
    }

    /// Collects the distinct non-null source vertex ids of the buffered batches.
    ///
    /// Returns `None` when a batch lacks the source column, when the column
    /// cannot be read as a vid array, or when more than
    /// `TRAVERSE_PUSHDOWN_MAX_SOURCES` distinct ids are found.
    fn collect_source_vids(&self, buffered: &[RecordBatch]) -> Option<HashSet<Vid>> {
        let mut vids: HashSet<Vid> = HashSet::new();
        for batch in buffered {
            let col = batch.column_by_name(&self.source_column)?;
            let arr = column_as_vid_array(col.as_ref()).ok()?;
            for i in 0..arr.len() {
                if !arr.is_null(i) {
                    vids.insert(Vid::from(arr.value(i)));
                    // Give up as soon as the cap is exceeded.
                    if vids.len() > TRAVERSE_PUSHDOWN_MAX_SOURCES {
                        return None;
                    }
                }
            }
        }
        Some(vids)
    }

    /// Expands one input batch against the preloaded adjacency map.
    ///
    /// Produces one output row per `(input row, neighbor edge)` pair that
    /// passes the used-edge and bound-target filters. Columns are appended in
    /// the same order as `GraphTraverseMainExec::build_schema` declares them.
    /// For optional traversals, unmatched rows are appended with nulls.
    ///
    /// # Errors
    /// Fails when the source column is missing or cannot be read as a vid
    /// array, and propagates Arrow errors from `take`, `RecordBatch::try_new`
    /// and `concat_batches` as well as errors from the optional-row helpers.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Bound-target filter: only active when the column is configured AND
        // present in this batch; a configured but missing column is ignored.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Used-edge columns that are missing or are not `UInt64Array` are
        // silently skipped.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // (input row, target vid, edge id, edge type name, edge properties)
        type Expansion = (usize, Vid, Eid, Arc<str>, Arc<uni_common::Properties>);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            // Rows with a null source vid produce no expansions.
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Edge ids already bound on this row.
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        // Skip edges that are already bound on this row.
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        if let Some(targets) = expected_targets {
                            // A null bound target matches nothing.
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            Arc::clone(edge_type),
                            Arc::clone(props),
                        ));
                    }
                }
            }
        }

        // NOTE(review): this early return null-extends EVERY input row through
        // `build_optional_null_batch_for_rows`, whereas the optional path at the
        // end of the function goes through `collect_unmatched_optional_group_rows`
        // and the `_with_optional_vars` builder. Confirm the difference is intended.
        if expansions.is_empty() && self.optional {
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Input rows with at least one expansion; only needed for optional traversals.
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Replicate every input column once per expansion.
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Target vid column, unless the input already has one of that name.
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Target labels column, unless the input already has one. Labels are
        // the de-duplicated union of what the L0 buffers hold for the vertex.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                // Always a non-null (possibly empty) list.
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        if self.edge_variable.is_some() {
            // `<edge>._eid`
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            // `<edge>._type`
            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            // One encoded column per requested edge property.
            for prop_name in &self.edge_properties {
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // `_all_props` encodes the whole property map; an empty map
                    // becomes null.
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let map: HashMap<String, uni_common::Value> =
                                props.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
                            builder.append_value(uni_common::cypher_value_codec::encode(
                                &uni_common::Value::Map(map),
                            ));
                        }
                    }
                } else {
                    // Missing properties and explicit nulls both become null.
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                builder.append_value(uni_common::cypher_value_codec::encode(val));
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // No edge variable: only the internal `__eid_to_<target>` column.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        // Target property columns, read from the L0 buffers.
        {
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        // Merge across buffers; a buffer visited later overwrites
                        // keys set by an earlier one.
                        let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    merged_props.insert(k.clone(), v.clone());
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            builder.append_value(uni_common::cypher_value_codec::encode(
                                &uni_common::Value::Map(merged_props),
                            ));
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        // The first buffer (in iteration order) holding a non-null
                        // value wins.
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                builder.append_value(uni_common::cypher_value_codec::encode(val));
                                found = true;
                                break;
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch =
            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;

        if self.optional {
            // Append null-extended rows for the inputs reported as unmatched.
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
        } else {
            Ok(matched_batch)
        }
    }
}
2239
2240impl GraphExecutionContext {
2241 fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2252 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2253 return;
2254 };
2255 let guard = tx_l0.read();
2256 let Some(read_set) = &guard.occ_read_set else {
2257 return;
2258 };
2259 let mut rs = read_set.lock();
2260 for (src, neighbors) in adjacency {
2261 rs.vertices.insert(*src);
2262 for (nbr, eid, _type, _props) in neighbors {
2263 rs.vertices.insert(*nbr);
2264 rs.edges.insert(*eid);
2265 }
2266 }
2267 }
2268}
2269
2270async fn build_edge_adjacency_map(
2281 graph_ctx: &GraphExecutionContext,
2282 type_names: &[String],
2283 direction: Direction,
2284 source_vids: Option<HashSet<Vid>>,
2285) -> DFResult<EdgeAdjacencyMap> {
2286 let storage = graph_ctx.storage();
2287 let l0_ctx = graph_ctx.l0_context();
2288
2289 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2292 let endpoint_vids: Option<Vec<Vid>> = source_vids.as_ref().map(|s| s.iter().copied().collect());
2293 let endpoint_filter = endpoint_vids.as_deref().map(|vids| {
2294 let side = match direction {
2295 Direction::Outgoing => uni_store::storage::EndpointSide::Src,
2296 Direction::Incoming => uni_store::storage::EndpointSide::Dst,
2297 Direction::Both => uni_store::storage::EndpointSide::Either,
2298 };
2299 (side, vids)
2300 });
2301 let edges_with_type = storage
2302 .find_edges_by_type_names(&type_refs, endpoint_filter)
2303 .await
2304 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2305
2306 let edge_in_scope = |src: Vid, dst: Vid| -> bool {
2310 match &source_vids {
2311 None => true,
2312 Some(set) => match direction {
2313 Direction::Outgoing => set.contains(&src),
2314 Direction::Incoming => set.contains(&dst),
2315 Direction::Both => set.contains(&src) || set.contains(&dst),
2316 },
2317 }
2318 };
2319
2320 let mut edges: Vec<(
2322 uni_common::Eid,
2323 uni_common::Vid,
2324 uni_common::Vid,
2325 String,
2326 uni_common::Properties,
2327 )> = edges_with_type.into_iter().collect();
2328
2329 for l0 in l0_ctx.iter_l0_buffers() {
2331 let l0_guard = l0.read();
2332
2333 for type_name in type_names {
2334 let l0_eids = l0_guard.eids_for_type(type_name);
2335
2336 for &eid in &l0_eids {
2338 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2339 let src_vid = edge_ref.src_vid;
2340 let dst_vid = edge_ref.dst_vid;
2341 if !edge_in_scope(src_vid, dst_vid) {
2342 continue;
2343 }
2344
2345 let props = l0_guard
2347 .edge_properties
2348 .get(&eid)
2349 .cloned()
2350 .unwrap_or_default();
2351
2352 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2353 }
2354 }
2355 }
2356 }
2357
2358 let mut seen_eids = HashSet::new();
2360 let mut unique_edges = Vec::new();
2361 for edge in edges.into_iter().rev() {
2362 if seen_eids.insert(edge.0) {
2363 unique_edges.push(edge);
2364 }
2365 }
2366 unique_edges.reverse();
2367
2368 let mut tombstoned_eids = HashSet::new();
2370 for l0 in l0_ctx.iter_l0_buffers() {
2371 let l0_guard = l0.read();
2372 for eid in l0_guard.tombstones.keys() {
2373 tombstoned_eids.insert(*eid);
2374 }
2375 }
2376 if !tombstoned_eids.is_empty() {
2377 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2378 }
2379
2380 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2383
2384 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2385 let edge_type: Arc<str> = edge_type.into();
2386 let props = Arc::new(props);
2387 match direction {
2388 Direction::Outgoing => {
2389 adjacency
2390 .entry(src_vid)
2391 .or_default()
2392 .push((dst_vid, eid, edge_type, props));
2393 }
2394 Direction::Incoming => {
2395 adjacency
2396 .entry(dst_vid)
2397 .or_default()
2398 .push((src_vid, eid, edge_type, props));
2399 }
2400 Direction::Both => {
2401 adjacency.entry(src_vid).or_default().push((
2402 dst_vid,
2403 eid,
2404 Arc::clone(&edge_type),
2405 Arc::clone(&props),
2406 ));
2407 adjacency
2408 .entry(dst_vid)
2409 .or_default()
2410 .push((src_vid, eid, edge_type, props));
2411 }
2412 }
2413 }
2414
2415 graph_ctx.record_edge_adjacency(&adjacency);
2423
2424 Ok(adjacency)
2425}
2426
2427impl Stream for GraphTraverseMainStream {
2428 type Item = DFResult<RecordBatch>;
2429
2430 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2431 let metrics = self.metrics.clone();
2432 let _timer = metrics.elapsed_compute().timer();
2433 loop {
2434 let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2435
2436 match state {
2437 GraphTraverseMainState::CollectingInput {
2438 mut input_stream,
2439 mut buffered,
2440 } => match input_stream.poll_next_unpin(cx) {
2441 Poll::Ready(Some(Ok(batch))) => {
2442 buffered.push(batch);
2443 self.state = GraphTraverseMainState::CollectingInput {
2444 input_stream,
2445 buffered,
2446 };
2447 }
2449 Poll::Ready(Some(Err(e))) => {
2450 self.state = GraphTraverseMainState::Done;
2451 return Poll::Ready(Some(Err(e)));
2452 }
2453 Poll::Ready(None) => {
2454 let source_vids = self.collect_source_vids(&buffered);
2457 let loading_ctx = self.graph_ctx.clone();
2458 let loading_types = self.type_names.clone();
2459 let direction = self.direction;
2460 let fut = async move {
2461 build_edge_adjacency_map(
2462 &loading_ctx,
2463 &loading_types,
2464 direction,
2465 source_vids,
2466 )
2467 .await
2468 };
2469 self.state = GraphTraverseMainState::LoadingEdges {
2470 future: Box::pin(fut),
2471 buffered,
2472 };
2473 }
2475 Poll::Pending => {
2476 self.state = GraphTraverseMainState::CollectingInput {
2477 input_stream,
2478 buffered,
2479 };
2480 return Poll::Pending;
2481 }
2482 },
2483 GraphTraverseMainState::LoadingEdges {
2484 mut future,
2485 buffered,
2486 } => match future.as_mut().poll(cx) {
2487 Poll::Ready(Ok(adjacency)) => {
2488 self.state = GraphTraverseMainState::Processing {
2490 adjacency,
2491 buffered: buffered.into_iter(),
2492 };
2493 }
2495 Poll::Ready(Err(e)) => {
2496 self.state = GraphTraverseMainState::Done;
2497 return Poll::Ready(Some(Err(e)));
2498 }
2499 Poll::Pending => {
2500 self.state = GraphTraverseMainState::LoadingEdges { future, buffered };
2501 return Poll::Pending;
2502 }
2503 },
2504 GraphTraverseMainState::Processing {
2505 adjacency,
2506 mut buffered,
2507 } => {
2508 if let Err(e) = self.graph_ctx.check_timeout() {
2510 return Poll::Ready(Some(Err(
2511 datafusion::error::DataFusionError::Execution(e.to_string()),
2512 )));
2513 }
2514
2515 match buffered.next() {
2516 Some(batch) => {
2517 let result = self.expand_batch(&batch, &adjacency);
2519
2520 self.state = GraphTraverseMainState::Processing {
2521 adjacency,
2522 buffered,
2523 };
2524
2525 if let Ok(ref r) = result {
2526 self.metrics.record_output(r.num_rows());
2527 }
2528 return Poll::Ready(Some(result));
2529 }
2530 None => {
2531 self.state = GraphTraverseMainState::Done;
2532 return Poll::Ready(None);
2533 }
2534 }
2535 }
2536 GraphTraverseMainState::Done => {
2537 return Poll::Ready(None);
2538 }
2539 }
2540 }
2541 }
2542}
2543
2544impl RecordBatchStream for GraphTraverseMainStream {
2545 fn schema(&self) -> SchemaRef {
2546 self.schema.clone()
2547 }
2548}
2549
/// Physical plan node for a variable-length (multi-hop) traversal.
///
/// The hop pattern is represented by `nfa`, which is either supplied by the
/// caller or derived from the edge types, direction and hop bounds in `new`.
pub struct GraphVariableLengthTraverseExec {
    /// Child plan that produces the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the source column.
    /// NOTE(review): presumably the input column holding the start vertex ids; confirm.
    source_column: String,

    /// Edge type ids to traverse; used to build the default NFA.
    edge_type_ids: Vec<u32>,

    /// Traversal direction; used to build the default NFA.
    direction: Direction,

    /// Lower hop bound; used to build the default NFA.
    min_hops: usize,

    /// Upper hop bound; used to build the default NFA.
    max_hops: usize,

    /// Variable name of the target vertex.
    target_variable: String,

    /// Optional step variable, forwarded to schema construction.
    step_variable: Option<String>,

    /// Optional path variable, forwarded to schema construction.
    path_variable: Option<String>,

    /// Target vertex property names, forwarded to schema construction.
    target_properties: Vec<String>,

    /// Label of the target vertex, if known; used in `new` to look up that
    /// label's property metadata for schema construction.
    target_label_name: Option<String>,

    /// Whether the traversal is optional.
    is_optional: bool,

    /// Input column with an already-bound target vid, if any.
    /// NOTE(review): semantics presumably match the single-hop operator; confirm.
    bound_target_column: Option<String>,

    /// Optional filter expression on edges, kept as a string.
    /// NOTE(review): presumably pushed down to the Lance edge scan; confirm.
    edge_lance_filter: Option<String>,

    /// `(property name, value)` conditions on edges.
    /// NOTE(review): presumably equality checks applied during expansion; confirm.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns with edge ids that are already bound on the row.
    used_edge_columns: Vec<String>,

    /// Path mode, as defined in the `nfa` module.
    path_mode: super::nfa::PathMode,

    /// Output mode, as defined in the `nfa` module.
    output_mode: super::nfa::VlpOutputMode,

    /// Automaton describing the accepted paths: the caller-supplied one, or
    /// `PathNfa::from_vlp(edge_type_ids, direction, min_hops, max_hops)`.
    nfa: Arc<PathNfa>,

    /// Shared execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema, computed once in `new`.
    schema: SchemaRef,

    /// Plan properties derived from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics registry for this node.
    metrics: ExecutionPlanMetricsSet,
}
2641
2642impl fmt::Debug for GraphVariableLengthTraverseExec {
2643 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2644 f.debug_struct("GraphVariableLengthTraverseExec")
2645 .field("source_column", &self.source_column)
2646 .field("edge_type_ids", &self.edge_type_ids)
2647 .field("direction", &self.direction)
2648 .field("min_hops", &self.min_hops)
2649 .field("max_hops", &self.max_hops)
2650 .field("target_variable", &self.target_variable)
2651 .finish()
2652 }
2653}
2654
2655impl GraphVariableLengthTraverseExec {
2656 #[expect(clippy::too_many_arguments)]
2662 pub fn new(
2663 input: Arc<dyn ExecutionPlan>,
2664 source_column: impl Into<String>,
2665 edge_type_ids: Vec<u32>,
2666 direction: Direction,
2667 min_hops: usize,
2668 max_hops: usize,
2669 target_variable: impl Into<String>,
2670 step_variable: Option<String>,
2671 path_variable: Option<String>,
2672 target_properties: Vec<String>,
2673 target_label_name: Option<String>,
2674 graph_ctx: Arc<GraphExecutionContext>,
2675 is_optional: bool,
2676 bound_target_column: Option<String>,
2677 edge_lance_filter: Option<String>,
2678 edge_property_conditions: Vec<(String, UniValue)>,
2679 used_edge_columns: Vec<String>,
2680 path_mode: super::nfa::PathMode,
2681 output_mode: super::nfa::VlpOutputMode,
2682 qpp_nfa: Option<PathNfa>,
2683 ) -> Self {
2684 let source_column = source_column.into();
2685 let target_variable = target_variable.into();
2686
2687 let uni_schema = graph_ctx.storage().schema_manager().schema();
2689 let label_props = target_label_name
2690 .as_deref()
2691 .and_then(|ln| uni_schema.properties.get(ln));
2692
2693 let schema = Self::build_schema(
2695 input.schema(),
2696 &target_variable,
2697 step_variable.as_deref(),
2698 path_variable.as_deref(),
2699 &target_properties,
2700 label_props,
2701 );
2702 let properties = compute_plan_properties(schema.clone());
2703
2704 let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2706 PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2707 }));
2708
2709 Self {
2710 input,
2711 source_column,
2712 edge_type_ids,
2713 direction,
2714 min_hops,
2715 max_hops,
2716 target_variable,
2717 step_variable,
2718 path_variable,
2719 target_properties,
2720 target_label_name,
2721 is_optional,
2722 bound_target_column,
2723 edge_lance_filter,
2724 edge_property_conditions,
2725 used_edge_columns,
2726 path_mode,
2727 output_mode,
2728 nfa,
2729 graph_ctx,
2730 schema,
2731 properties,
2732 metrics: ExecutionPlanMetricsSet::new(),
2733 }
2734 }
2735
2736 fn build_schema(
2738 input_schema: SchemaRef,
2739 target_variable: &str,
2740 step_variable: Option<&str>,
2741 path_variable: Option<&str>,
2742 target_properties: &[String],
2743 label_props: Option<
2744 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2745 >,
2746 ) -> SchemaRef {
2747 let mut fields: Vec<Field> = input_schema
2748 .fields()
2749 .iter()
2750 .map(|f| f.as_ref().clone())
2751 .collect();
2752
2753 let target_vid_name = format!("{}._vid", target_variable);
2755 if input_schema.column_with_name(&target_vid_name).is_none() {
2756 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2757 }
2758
2759 let target_labels_name = format!("{}._labels", target_variable);
2761 if input_schema.column_with_name(&target_labels_name).is_none() {
2762 fields.push(Field::new(target_labels_name, labels_data_type(), true));
2763 }
2764
2765 for prop_name in target_properties {
2767 let col_name = format!("{}.{}", target_variable, prop_name);
2768 if input_schema.column_with_name(&col_name).is_none() {
2769 let arrow_type = resolve_property_type(prop_name, label_props);
2770 fields.push(Field::new(&col_name, arrow_type, true));
2771 }
2772 }
2773
2774 fields.push(Field::new("_hop_count", DataType::UInt64, false));
2776
2777 if let Some(step_var) = step_variable {
2779 fields.push(build_edge_list_field(step_var));
2780 }
2781
2782 if let Some(path_var) = path_variable
2784 && input_schema.column_with_name(path_var).is_none()
2785 {
2786 fields.push(build_path_struct_field(path_var));
2787 }
2788
2789 Arc::new(Schema::new(fields))
2790 }
2791}
2792
2793impl DisplayAs for GraphVariableLengthTraverseExec {
2794 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2795 write!(
2796 f,
2797 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2798 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2799 )
2800 }
2801}
2802
2803impl ExecutionPlan for GraphVariableLengthTraverseExec {
2804 fn name(&self) -> &str {
2805 "GraphVariableLengthTraverseExec"
2806 }
2807
2808 fn as_any(&self) -> &dyn Any {
2809 self
2810 }
2811
2812 fn schema(&self) -> SchemaRef {
2813 self.schema.clone()
2814 }
2815
2816 fn properties(&self) -> &Arc<PlanProperties> {
2817 &self.properties
2818 }
2819
2820 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2821 vec![&self.input]
2822 }
2823
2824 fn with_new_children(
2825 self: Arc<Self>,
2826 children: Vec<Arc<dyn ExecutionPlan>>,
2827 ) -> DFResult<Arc<dyn ExecutionPlan>> {
2828 if children.len() != 1 {
2829 return Err(datafusion::error::DataFusionError::Plan(
2830 "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2831 ));
2832 }
2833
2834 Ok(Arc::new(Self::new(
2836 children[0].clone(),
2837 self.source_column.clone(),
2838 self.edge_type_ids.clone(),
2839 self.direction,
2840 self.min_hops,
2841 self.max_hops,
2842 self.target_variable.clone(),
2843 self.step_variable.clone(),
2844 self.path_variable.clone(),
2845 self.target_properties.clone(),
2846 self.target_label_name.clone(),
2847 self.graph_ctx.clone(),
2848 self.is_optional,
2849 self.bound_target_column.clone(),
2850 self.edge_lance_filter.clone(),
2851 self.edge_property_conditions.clone(),
2852 self.used_edge_columns.clone(),
2853 self.path_mode.clone(),
2854 self.output_mode.clone(),
2855 Some((*self.nfa).clone()),
2856 )))
2857 }
2858
2859 fn execute(
2860 &self,
2861 partition: usize,
2862 context: Arc<TaskContext>,
2863 ) -> DFResult<SendableRecordBatchStream> {
2864 let input_stream = self.input.execute(partition, context)?;
2865
2866 let metrics = BaselineMetrics::new(&self.metrics, partition);
2867
2868 let warm_fut = self
2869 .graph_ctx
2870 .warming_future(self.edge_type_ids.clone(), self.direction);
2871
2872 Ok(Box::pin(GraphVariableLengthTraverseStream {
2873 input: input_stream,
2874 exec: Arc::new(self.clone_for_stream()),
2875 schema: self.schema.clone(),
2876 state: VarLengthStreamState::Warming(warm_fut),
2877 metrics,
2878 }))
2879 }
2880
2881 fn metrics(&self) -> Option<MetricsSet> {
2882 Some(self.metrics.clone_inner())
2883 }
2884}
2885
2886impl GraphVariableLengthTraverseExec {
2887 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2889 GraphVariableLengthTraverseExecData {
2890 source_column: self.source_column.clone(),
2891 edge_type_ids: self.edge_type_ids.clone(),
2892 direction: self.direction,
2893 min_hops: self.min_hops,
2894 max_hops: self.max_hops,
2895 target_variable: self.target_variable.clone(),
2896 step_variable: self.step_variable.clone(),
2897 path_variable: self.path_variable.clone(),
2898 target_properties: self.target_properties.clone(),
2899 target_label_name: self.target_label_name.clone(),
2900 is_optional: self.is_optional,
2901 bound_target_column: self.bound_target_column.clone(),
2902 edge_lance_filter: self.edge_lance_filter.clone(),
2903 edge_property_conditions: self.edge_property_conditions.clone(),
2904 used_edge_columns: self.used_edge_columns.clone(),
2905 path_mode: self.path_mode.clone(),
2906 output_mode: self.output_mode.clone(),
2907 nfa: self.nfa.clone(),
2908 graph_ctx: self.graph_ctx.clone(),
2909 }
2910 }
2911}
2912
2913#[expect(
2915 dead_code,
2916 reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
2917)]
2918struct GraphVariableLengthTraverseExecData {
2919 source_column: String,
2920 edge_type_ids: Vec<u32>,
2921 direction: Direction,
2922 min_hops: usize,
2923 max_hops: usize,
2924 target_variable: String,
2925 step_variable: Option<String>,
2926 path_variable: Option<String>,
2927 target_properties: Vec<String>,
2928 target_label_name: Option<String>,
2929 is_optional: bool,
2930 bound_target_column: Option<String>,
2931 #[expect(dead_code, reason = "Used in Phase 3 warming")]
2932 edge_lance_filter: Option<String>,
2933 edge_property_conditions: Vec<(String, UniValue)>,
2935 used_edge_columns: Vec<String>,
2936 path_mode: super::nfa::PathMode,
2937 output_mode: super::nfa::VlpOutputMode,
2938 nfa: Arc<PathNfa>,
2939 graph_ctx: Arc<GraphExecutionContext>,
2940}
2941
/// BFS stops expanding once the next frontier holds more than this many
/// `(vertex, NFA state)` entries.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// BFS stops expanding once the predecessor DAG pool holds more than this
/// many entries.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2946
2947impl GraphVariableLengthTraverseExecData {
2948 fn check_target_label(&self, vid: Vid) -> bool {
2950 if let Some(ref label_name) = self.target_label_name {
2951 let query_ctx = self.graph_ctx.query_context();
2952 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2953 Some(labels) => labels.contains(label_name),
2954 None => true, }
2956 } else {
2957 true
2958 }
2959 }
2960
2961 fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
2963 match constraint {
2964 super::nfa::VertexConstraint::Label(label_name) => {
2965 let query_ctx = self.graph_ctx.query_context();
2966 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2967 Some(labels) => labels.contains(label_name),
2968 None => true, }
2970 }
2971 }
2972 }
2973
2974 fn expand_neighbors(
2977 &self,
2978 vid: Vid,
2979 state: NfaStateId,
2980 eid_filter: &EidFilter,
2981 used_eids: &FxHashSet<u64>,
2982 ) -> Vec<(Vid, Eid, NfaStateId)> {
2983 let is_undirected = matches!(self.direction, Direction::Both);
2984 let mut results = Vec::new();
2985
2986 for transition in self.nfa.transitions_from(state) {
2987 let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
2988
2989 for &etype in &transition.edge_type_ids {
2990 for (neighbor, eid) in
2991 self.graph_ctx
2992 .get_neighbors(vid, etype, transition.direction)
2993 {
2994 if is_undirected && !seen_edges.insert(eid.as_u64()) {
2996 continue;
2997 }
2998
2999 if !eid_filter.contains(eid) {
3001 continue;
3002 }
3003
3004 if !self.edge_property_conditions.is_empty() {
3006 let query_ctx = self.graph_ctx.query_context();
3007 let passes = if let Some(props) =
3008 l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
3009 {
3010 self.edge_property_conditions
3011 .iter()
3012 .all(|(name, expected)| {
3013 props.get(name).is_some_and(|actual| actual == expected)
3014 })
3015 } else {
3016 true
3020 };
3021 if !passes {
3022 continue;
3023 }
3024 }
3025
3026 if used_eids.contains(&eid.as_u64()) {
3028 continue;
3029 }
3030
3031 if let Some(constraint) = self.nfa.state_constraint(transition.to)
3033 && !self.check_state_constraint(neighbor, constraint)
3034 {
3035 continue;
3036 }
3037
3038 results.push((neighbor, eid, transition.to));
3039 }
3040 }
3041 }
3042
3043 results
3044 }
3045
3046 fn bfs_with_dag(
3051 &self,
3052 source: Vid,
3053 eid_filter: &EidFilter,
3054 used_eids: &FxHashSet<u64>,
3055 vid_filter: &VidFilter,
3056 ) -> Vec<BfsResult> {
3057 let nfa = &self.nfa;
3058 let selector = PathSelector::All;
3059 let mut dag = PredecessorDag::new(selector);
3060 let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
3061
3062 if nfa.is_accepting(nfa.start_state())
3064 && self.check_target_label(source)
3065 && vid_filter.contains(source)
3066 {
3067 accepting.push((source, nfa.start_state(), 0));
3068 }
3069
3070 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3072 let mut depth: u32 = 0;
3073
3074 while !frontier.is_empty() && depth < self.max_hops as u32 {
3075 depth += 1;
3076 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3077 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3078
3079 for &(vid, state) in &frontier {
3080 for (neighbor, eid, dst_state) in
3081 self.expand_neighbors(vid, state, eid_filter, used_eids)
3082 {
3083 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3085
3086 if seen_at_depth.insert((neighbor, dst_state)) {
3088 next_frontier.push((neighbor, dst_state));
3089
3090 if nfa.is_accepting(dst_state)
3092 && self.check_target_label(neighbor)
3093 && vid_filter.contains(neighbor)
3094 {
3095 accepting.push((neighbor, dst_state, depth));
3096 }
3097 }
3098 }
3099 }
3100
3101 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3103 break;
3104 }
3105
3106 frontier = next_frontier;
3107 }
3108
3109 let mut results: Vec<BfsResult> = Vec::new();
3111 for &(target, state, depth) in &accepting {
3112 dag.enumerate_paths(
3113 source,
3114 target,
3115 state,
3116 depth,
3117 depth,
3118 &self.path_mode,
3119 &mut |nodes, edges| {
3120 results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3121 std::ops::ControlFlow::Continue(())
3122 },
3123 );
3124 }
3125
3126 results
3127 }
3128
3129 fn bfs_endpoints_only(
3134 &self,
3135 source: Vid,
3136 eid_filter: &EidFilter,
3137 used_eids: &FxHashSet<u64>,
3138 vid_filter: &VidFilter,
3139 ) -> Vec<(Vid, u32)> {
3140 let nfa = &self.nfa;
3141 let selector = PathSelector::Any; let mut dag = PredecessorDag::new(selector);
3143 let mut results: Vec<(Vid, u32)> = Vec::new();
3144
3145 if nfa.is_accepting(nfa.start_state())
3147 && self.check_target_label(source)
3148 && vid_filter.contains(source)
3149 {
3150 results.push((source, 0));
3151 }
3152
3153 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3155 let mut depth: u32 = 0;
3156
3157 while !frontier.is_empty() && depth < self.max_hops as u32 {
3158 depth += 1;
3159 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3160 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3161
3162 for &(vid, state) in &frontier {
3163 for (neighbor, eid, dst_state) in
3164 self.expand_neighbors(vid, state, eid_filter, used_eids)
3165 {
3166 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3167
3168 if seen_at_depth.insert((neighbor, dst_state)) {
3169 next_frontier.push((neighbor, dst_state));
3170
3171 if nfa.is_accepting(dst_state)
3173 && self.check_target_label(neighbor)
3174 && vid_filter.contains(neighbor)
3175 && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3176 {
3177 results.push((neighbor, depth));
3178 }
3179 }
3180 }
3181 }
3182
3183 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3184 break;
3185 }
3186
3187 frontier = next_frontier;
3188 }
3189
3190 results
3191 }
3192}
3193
3194enum VarLengthStreamState {
3196 Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
3198 Reading,
3200 Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
3202 Done,
3204}
3205
3206struct GraphVariableLengthTraverseStream {
3208 input: SendableRecordBatchStream,
3209 exec: Arc<GraphVariableLengthTraverseExecData>,
3210 schema: SchemaRef,
3211 state: VarLengthStreamState,
3212 metrics: BaselineMetrics,
3213}
3214
3215impl Stream for GraphVariableLengthTraverseStream {
3216 type Item = DFResult<RecordBatch>;
3217
3218 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3219 let metrics = self.metrics.clone();
3220 let _timer = metrics.elapsed_compute().timer();
3221 loop {
3222 let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3223
3224 match state {
3225 VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3226 Poll::Ready(Ok(())) => {
3227 self.state = VarLengthStreamState::Reading;
3228 }
3230 Poll::Ready(Err(e)) => {
3231 self.state = VarLengthStreamState::Done;
3232 return Poll::Ready(Some(Err(e)));
3233 }
3234 Poll::Pending => {
3235 self.state = VarLengthStreamState::Warming(fut);
3236 return Poll::Pending;
3237 }
3238 },
3239 VarLengthStreamState::Reading => {
3240 if let Err(e) = self.exec.graph_ctx.check_timeout() {
3242 return Poll::Ready(Some(Err(
3243 datafusion::error::DataFusionError::Execution(e.to_string()),
3244 )));
3245 }
3246
3247 match self.input.poll_next_unpin(cx) {
3248 Poll::Ready(Some(Ok(batch))) => {
3249 let eid_filter = EidFilter::AllAllowed;
3252 let vid_filter = VidFilter::AllAllowed;
3253 let base_result =
3254 self.process_batch_base(batch, &eid_filter, &vid_filter);
3255 let base_batch = match base_result {
3256 Ok(b) => b,
3257 Err(e) => {
3258 self.state = VarLengthStreamState::Reading;
3259 return Poll::Ready(Some(Err(e)));
3260 }
3261 };
3262
3263 if self.exec.target_properties.is_empty() {
3265 self.state = VarLengthStreamState::Reading;
3266 return Poll::Ready(Some(Ok(base_batch)));
3267 }
3268
3269 let schema = self.schema.clone();
3271 let target_variable = self.exec.target_variable.clone();
3272 let target_properties = self.exec.target_properties.clone();
3273 let target_label_name = self.exec.target_label_name.clone();
3274 let graph_ctx = self.exec.graph_ctx.clone();
3275
3276 let fut = hydrate_vlp_target_properties(
3277 base_batch,
3278 schema,
3279 target_variable,
3280 target_properties,
3281 target_label_name,
3282 graph_ctx,
3283 );
3284
3285 self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3286 }
3288 Poll::Ready(Some(Err(e))) => {
3289 self.state = VarLengthStreamState::Done;
3290 return Poll::Ready(Some(Err(e)));
3291 }
3292 Poll::Ready(None) => {
3293 self.state = VarLengthStreamState::Done;
3294 return Poll::Ready(None);
3295 }
3296 Poll::Pending => {
3297 self.state = VarLengthStreamState::Reading;
3298 return Poll::Pending;
3299 }
3300 }
3301 }
3302 VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3303 Poll::Ready(Ok(batch)) => {
3304 self.state = VarLengthStreamState::Reading;
3305 self.metrics.record_output(batch.num_rows());
3306 return Poll::Ready(Some(Ok(batch)));
3307 }
3308 Poll::Ready(Err(e)) => {
3309 self.state = VarLengthStreamState::Done;
3310 return Poll::Ready(Some(Err(e)));
3311 }
3312 Poll::Pending => {
3313 self.state = VarLengthStreamState::Materializing(fut);
3314 return Poll::Pending;
3315 }
3316 },
3317 VarLengthStreamState::Done => {
3318 return Poll::Ready(None);
3319 }
3320 }
3321 }
3322 }
3323}
3324
3325impl GraphVariableLengthTraverseStream {
3326 fn process_batch_base(
3327 &self,
3328 batch: RecordBatch,
3329 eid_filter: &EidFilter,
3330 vid_filter: &VidFilter,
3331 ) -> DFResult<RecordBatch> {
3332 let source_col = batch
3333 .column_by_name(&self.exec.source_column)
3334 .ok_or_else(|| {
3335 datafusion::error::DataFusionError::Execution(format!(
3336 "Source column '{}' not found",
3337 self.exec.source_column
3338 ))
3339 })?;
3340
3341 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3342 let source_vids: &UInt64Array = &source_vid_cow;
3343
3344 let bound_target_cow = self
3346 .exec
3347 .bound_target_column
3348 .as_ref()
3349 .and_then(|col| batch.column_by_name(col))
3350 .map(|c| column_as_vid_array(c.as_ref()))
3351 .transpose()?;
3352 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3353
3354 let used_edge_arrays: Vec<&UInt64Array> = self
3356 .exec
3357 .used_edge_columns
3358 .iter()
3359 .filter_map(|col| {
3360 batch
3361 .column_by_name(col)?
3362 .as_any()
3363 .downcast_ref::<UInt64Array>()
3364 })
3365 .collect();
3366
3367 let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3369
3370 for (row_idx, source_vid) in source_vids.iter().enumerate() {
3371 let mut emitted_for_row = false;
3372
3373 if let Some(src) = source_vid {
3374 let vid = Vid::from(src);
3375
3376 let used_eids: FxHashSet<u64> = used_edge_arrays
3378 .iter()
3379 .filter_map(|arr| {
3380 if arr.is_null(row_idx) {
3381 None
3382 } else {
3383 Some(arr.value(row_idx))
3384 }
3385 })
3386 .collect();
3387
3388 match &self.exec.output_mode {
3390 VlpOutputMode::EndpointsOnly => {
3391 let endpoints = self
3392 .exec
3393 .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3394 for (target, depth) in endpoints {
3395 if let Some(targets) = expected_targets {
3397 if targets.is_null(row_idx) {
3398 continue;
3399 }
3400 if target.as_u64() != targets.value(row_idx) {
3401 continue;
3402 }
3403 }
3404 expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3405 emitted_for_row = true;
3406 }
3407 }
3408 _ => {
3409 let bfs_results = self
3411 .exec
3412 .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3413 for (target, hop_count, node_path, edge_path) in bfs_results {
3414 if let Some(targets) = expected_targets {
3416 if targets.is_null(row_idx) {
3417 continue;
3418 }
3419 if target.as_u64() != targets.value(row_idx) {
3420 continue;
3421 }
3422 }
3423 expansions.push((row_idx, target, hop_count, node_path, edge_path));
3424 emitted_for_row = true;
3425 }
3426 }
3427 }
3428 }
3429
3430 if self.exec.is_optional && !emitted_for_row {
3431 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3434 }
3435 }
3436
3437 self.build_output_batch(&batch, &expansions)
3438 }
3439
3440 fn build_output_batch(
3441 &self,
3442 input: &RecordBatch,
3443 expansions: &[VarLengthExpansion],
3444 ) -> DFResult<RecordBatch> {
3445 if expansions.is_empty() {
3446 return Ok(RecordBatch::new_empty(self.schema.clone()));
3447 }
3448
3449 let num_rows = expansions.len();
3450
3451 let indices: Vec<u64> = expansions
3453 .iter()
3454 .map(|(idx, _, _, _, _)| *idx as u64)
3455 .collect();
3456 let indices_array = UInt64Array::from(indices);
3457
3458 let mut columns: Vec<ArrayRef> = Vec::new();
3460 for col in input.columns() {
3461 let expanded = take(col.as_ref(), &indices_array, None)?;
3462 columns.push(expanded);
3463 }
3464
3465 let unmatched_rows: Vec<bool> = expansions
3469 .iter()
3470 .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
3471 .collect();
3472 let target_vids: Vec<Option<u64>> = expansions
3473 .iter()
3474 .zip(unmatched_rows.iter())
3475 .map(
3476 |((_, vid, _, _, _), unmatched)| {
3477 if *unmatched { None } else { Some(vid.as_u64()) }
3478 },
3479 )
3480 .collect();
3481
3482 let target_vid_name = format!("{}._vid", self.exec.target_variable);
3484 if input.schema().column_with_name(&target_vid_name).is_none() {
3485 columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
3486 }
3487
3488 let target_labels_name = format!("{}._labels", self.exec.target_variable);
3490 if input
3491 .schema()
3492 .column_with_name(&target_labels_name)
3493 .is_none()
3494 {
3495 use arrow_array::builder::{ListBuilder, StringBuilder};
3496 let query_ctx = self.exec.graph_ctx.query_context();
3497 let mut labels_builder = ListBuilder::new(StringBuilder::new());
3498 for target_vid in &target_vids {
3499 let Some(vid_u64) = target_vid else {
3500 labels_builder.append(false);
3501 continue;
3502 };
3503 let vid = Vid::from(*vid_u64);
3504 let row_labels: Vec<String> =
3505 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
3506 Some(labels) => {
3507 labels
3509 }
3510 None => {
3511 if let Some(ref label_name) = self.exec.target_label_name {
3513 vec![label_name.clone()]
3514 } else {
3515 vec![]
3516 }
3517 }
3518 };
3519 let values = labels_builder.values();
3520 for lbl in &row_labels {
3521 values.append_value(lbl);
3522 }
3523 labels_builder.append(true);
3524 }
3525 columns.push(Arc::new(labels_builder.finish()));
3526 }
3527
3528 for prop_name in &self.exec.target_properties {
3530 let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
3531 if input.schema().column_with_name(&full_prop_name).is_none() {
3532 let col_idx = columns.len();
3533 if col_idx < self.schema.fields().len() {
3534 let field = self.schema.field(col_idx);
3535 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
3536 }
3537 }
3538 }
3539
3540 let hop_counts: Vec<u64> = expansions
3542 .iter()
3543 .map(|(_, _, hops, _, _)| *hops as u64)
3544 .collect();
3545 columns.push(Arc::new(UInt64Array::from(hop_counts)));
3546
3547 if self.exec.step_variable.is_some() {
3549 let mut edges_builder = new_edge_list_builder();
3550 let query_ctx = self.exec.graph_ctx.query_context();
3551
3552 for (_, _, _, node_path, edge_path) in expansions {
3553 if node_path.is_empty() && edge_path.is_empty() {
3554 edges_builder.append_null();
3556 } else if edge_path.is_empty() {
3557 edges_builder.append(true);
3559 } else {
3560 for (i, eid) in edge_path.iter().enumerate() {
3561 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3562 .unwrap_or_else(|| "UNKNOWN".to_string());
3563 let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3568 *eid,
3569 node_path[i],
3570 node_path[i + 1],
3571 &self.exec.edge_type_ids,
3572 );
3573 append_edge_to_struct(
3574 edges_builder.values(),
3575 *eid,
3576 &type_name,
3577 src,
3578 dst,
3579 &query_ctx,
3580 );
3581 }
3582 edges_builder.append(true);
3583 }
3584 }
3585
3586 columns.push(Arc::new(edges_builder.finish()));
3587 }
3588
3589 if let Some(path_var_name) = &self.exec.path_variable {
3594 let existing_path_col_idx = input
3595 .schema()
3596 .column_with_name(path_var_name)
3597 .map(|(idx, _)| idx);
3598 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
3600 let existing_path = existing_path_arc
3601 .as_ref()
3602 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
3603
3604 let mut nodes_builder = new_node_list_builder();
3605 let mut rels_builder = new_edge_list_builder();
3606 let query_ctx = self.exec.graph_ctx.query_context();
3607 let mut path_validity = Vec::with_capacity(expansions.len());
3608
3609 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
3610 if node_path.is_empty() && edge_path.is_empty() {
3611 nodes_builder.append(false);
3612 rels_builder.append(false);
3613 path_validity.push(false);
3614 continue;
3615 }
3616
3617 let skip_first_vlp_node = if let Some(existing) = existing_path {
3619 if !existing.is_null(row_out_idx) {
3620 prepend_existing_path(
3621 existing,
3622 row_out_idx,
3623 &mut nodes_builder,
3624 &mut rels_builder,
3625 &query_ctx,
3626 );
3627 true
3628 } else {
3629 false
3630 }
3631 } else {
3632 false
3633 };
3634
3635 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
3637 for vid in &node_path[start_idx..] {
3638 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
3639 }
3640 nodes_builder.append(true);
3641
3642 for (i, eid) in edge_path.iter().enumerate() {
3643 let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
3644 .unwrap_or_else(|| "UNKNOWN".to_string());
3645 let (src, dst) = self.exec.graph_ctx.resolve_stored_edge_endpoints(
3650 *eid,
3651 node_path[i],
3652 node_path[i + 1],
3653 &self.exec.edge_type_ids,
3654 );
3655 append_edge_to_struct(
3656 rels_builder.values(),
3657 *eid,
3658 &type_name,
3659 src,
3660 dst,
3661 &query_ctx,
3662 );
3663 }
3664 rels_builder.append(true);
3665 path_validity.push(true);
3666 }
3667
3668 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
3670 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
3671
3672 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
3674 let rels_field = Arc::new(Field::new(
3675 "relationships",
3676 rels_array.data_type().clone(),
3677 true,
3678 ));
3679
3680 let path_struct = arrow_array::StructArray::try_new(
3682 vec![nodes_field, rels_field].into(),
3683 vec![nodes_array, rels_array],
3684 Some(arrow::buffer::NullBuffer::from(path_validity)),
3685 )
3686 .map_err(arrow_err)?;
3687
3688 if let Some(idx) = existing_path_col_idx {
3689 columns[idx] = Arc::new(path_struct);
3690 } else {
3691 columns.push(Arc::new(path_struct));
3692 }
3693 }
3694
3695 self.metrics.record_output(num_rows);
3696
3697 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
3698 }
3699}
3700
3701impl RecordBatchStream for GraphVariableLengthTraverseStream {
3702 fn schema(&self) -> SchemaRef {
3703 self.schema.clone()
3704 }
3705}
3706
3707async fn hydrate_vlp_target_properties(
3712 base_batch: RecordBatch,
3713 schema: SchemaRef,
3714 target_variable: String,
3715 target_properties: Vec<String>,
3716 target_label_name: Option<String>,
3717 graph_ctx: Arc<GraphExecutionContext>,
3718) -> DFResult<RecordBatch> {
3719 if base_batch.num_rows() == 0 || target_properties.is_empty() {
3720 return Ok(base_batch);
3721 }
3722
3723 let target_vid_col_name = format!("{}._vid", target_variable);
3730 let vid_col_idx = schema
3731 .fields()
3732 .iter()
3733 .enumerate()
3734 .rev()
3735 .find(|(_, f)| f.name() == &target_vid_col_name)
3736 .map(|(i, _)| i);
3737
3738 let Some(vid_col_idx) = vid_col_idx else {
3739 return Ok(base_batch);
3740 };
3741
3742 let vid_col = base_batch.column(vid_col_idx);
3743 let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3744 let target_vid_array: &UInt64Array = &target_vid_cow;
3745
3746 let target_vids: Vec<Vid> = target_vid_array
3747 .iter()
3748 .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3751 .collect();
3752
3753 let mut property_columns: Vec<ArrayRef> = Vec::new();
3755
3756 if let Some(ref label_name) = target_label_name {
3757 let property_manager = graph_ctx.property_manager();
3758 let query_ctx = graph_ctx.query_context();
3759
3760 let props_map = property_manager
3761 .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3762 .await
3763 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3764
3765 let uni_schema = graph_ctx.storage().schema_manager().schema();
3766 let label_props = uni_schema.properties.get(label_name.as_str());
3767
3768 for prop_name in &target_properties {
3769 let data_type = resolve_property_type(prop_name, label_props);
3770 let column =
3771 build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3772 property_columns.push(column);
3773 }
3774 } else {
3775 let non_internal_props: Vec<&str> = target_properties
3778 .iter()
3779 .filter(|p| *p != "_all_props")
3780 .map(|s| s.as_str())
3781 .collect();
3782 let property_manager = graph_ctx.property_manager();
3783 let query_ctx = graph_ctx.query_context();
3784
3785 let props_map = if !non_internal_props.is_empty() {
3786 property_manager
3787 .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3788 .await
3789 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3790 } else {
3791 std::collections::HashMap::new()
3792 };
3793
3794 for prop_name in &target_properties {
3795 if prop_name == "_all_props" {
3796 use arrow_array::builder::LargeBinaryBuilder;
3799
3800 let mut builder = LargeBinaryBuilder::new();
3801 let l0_ctx = graph_ctx.l0_context();
3802 for vid in &target_vids {
3803 let mut merged_props: HashMap<String, uni_common::Value> = HashMap::new();
3804 if let Some(vid_props) = props_map.get(vid) {
3806 for (k, v) in vid_props.iter() {
3807 merged_props.insert(k.clone(), v.clone());
3808 }
3809 }
3810 for l0 in l0_ctx.iter_l0_buffers() {
3812 let guard = l0.read();
3813 if let Some(l0_props) = guard.vertex_properties.get(vid) {
3814 for (k, v) in l0_props.iter() {
3815 merged_props.insert(k.clone(), v.clone());
3816 }
3817 }
3818 }
3819 if merged_props.is_empty() {
3820 builder.append_null();
3821 } else {
3822 builder.append_value(uni_common::cypher_value_codec::encode(
3823 &uni_common::Value::Map(merged_props),
3824 ));
3825 }
3826 }
3827 property_columns.push(Arc::new(builder.finish()));
3828 } else {
3829 let column = build_property_column_static(
3830 &target_vids,
3831 &props_map,
3832 prop_name,
3833 &arrow::datatypes::DataType::LargeBinary,
3834 )?;
3835 property_columns.push(column);
3836 }
3837 }
3838 }
3839
3840 let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3846 let mut prop_idx = 0;
3847 for (col_idx, field) in schema.fields().iter().enumerate() {
3848 let is_target_prop = col_idx > vid_col_idx
3849 && target_properties
3850 .iter()
3851 .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3852 if is_target_prop && prop_idx < property_columns.len() {
3853 new_columns.push(property_columns[prop_idx].clone());
3854 prop_idx += 1;
3855 } else {
3856 new_columns.push(base_batch.column(col_idx).clone());
3857 }
3858 }
3859
3860 RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3861}
3862
3863pub struct GraphVariableLengthTraverseMainExec {
3873 input: Arc<dyn ExecutionPlan>,
3875
3876 source_column: String,
3878
3879 type_names: Vec<String>,
3881
3882 direction: Direction,
3884
3885 min_hops: usize,
3887
3888 max_hops: usize,
3890
3891 target_variable: String,
3893
3894 step_variable: Option<String>,
3896
3897 path_variable: Option<String>,
3899
3900 target_properties: Vec<String>,
3902
3903 is_optional: bool,
3905
3906 bound_target_column: Option<String>,
3908
3909 edge_lance_filter: Option<String>,
3911
3912 edge_property_conditions: Vec<(String, UniValue)>,
3915
3916 used_edge_columns: Vec<String>,
3918
3919 path_mode: super::nfa::PathMode,
3921
3922 output_mode: super::nfa::VlpOutputMode,
3924
3925 graph_ctx: Arc<GraphExecutionContext>,
3927
3928 schema: SchemaRef,
3930
3931 properties: Arc<PlanProperties>,
3933
3934 metrics: ExecutionPlanMetricsSet,
3936}
3937
3938impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3939 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3940 f.debug_struct("GraphVariableLengthTraverseMainExec")
3941 .field("source_column", &self.source_column)
3942 .field("type_names", &self.type_names)
3943 .field("direction", &self.direction)
3944 .field("min_hops", &self.min_hops)
3945 .field("max_hops", &self.max_hops)
3946 .field("target_variable", &self.target_variable)
3947 .finish()
3948 }
3949}
3950
3951impl GraphVariableLengthTraverseMainExec {
3952 #[expect(clippy::too_many_arguments)]
3954 pub fn new(
3955 input: Arc<dyn ExecutionPlan>,
3956 source_column: impl Into<String>,
3957 type_names: Vec<String>,
3958 direction: Direction,
3959 min_hops: usize,
3960 max_hops: usize,
3961 target_variable: impl Into<String>,
3962 step_variable: Option<String>,
3963 path_variable: Option<String>,
3964 target_properties: Vec<String>,
3965 graph_ctx: Arc<GraphExecutionContext>,
3966 is_optional: bool,
3967 bound_target_column: Option<String>,
3968 edge_lance_filter: Option<String>,
3969 edge_property_conditions: Vec<(String, UniValue)>,
3970 used_edge_columns: Vec<String>,
3971 path_mode: super::nfa::PathMode,
3972 output_mode: super::nfa::VlpOutputMode,
3973 ) -> Self {
3974 let source_column = source_column.into();
3975 let target_variable = target_variable.into();
3976
3977 let schema = Self::build_schema(
3979 input.schema(),
3980 &target_variable,
3981 step_variable.as_deref(),
3982 path_variable.as_deref(),
3983 &target_properties,
3984 );
3985 let properties = compute_plan_properties(schema.clone());
3986
3987 Self {
3988 input,
3989 source_column,
3990 type_names,
3991 direction,
3992 min_hops,
3993 max_hops,
3994 target_variable,
3995 step_variable,
3996 path_variable,
3997 target_properties,
3998 is_optional,
3999 bound_target_column,
4000 edge_lance_filter,
4001 edge_property_conditions,
4002 used_edge_columns,
4003 path_mode,
4004 output_mode,
4005 graph_ctx,
4006 schema,
4007 properties,
4008 metrics: ExecutionPlanMetricsSet::new(),
4009 }
4010 }
4011
4012 fn build_schema(
4014 input_schema: SchemaRef,
4015 target_variable: &str,
4016 step_variable: Option<&str>,
4017 path_variable: Option<&str>,
4018 target_properties: &[String],
4019 ) -> SchemaRef {
4020 let mut fields: Vec<Field> = input_schema
4021 .fields()
4022 .iter()
4023 .map(|f| f.as_ref().clone())
4024 .collect();
4025
4026 let target_vid_name = format!("{}._vid", target_variable);
4028 if input_schema.column_with_name(&target_vid_name).is_none() {
4029 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
4030 }
4031
4032 let target_labels_name = format!("{}._labels", target_variable);
4034 if input_schema.column_with_name(&target_labels_name).is_none() {
4035 fields.push(Field::new(target_labels_name, labels_data_type(), true));
4036 }
4037
4038 fields.push(Field::new("_hop_count", DataType::UInt64, false));
4040
4041 if let Some(step_var) = step_variable {
4044 fields.push(build_edge_list_field(step_var));
4045 }
4046
4047 if let Some(path_var) = path_variable
4049 && input_schema.column_with_name(path_var).is_none()
4050 {
4051 fields.push(build_path_struct_field(path_var));
4052 }
4053
4054 for prop in target_properties {
4057 let prop_name = format!("{}.{}", target_variable, prop);
4058 if input_schema.column_with_name(&prop_name).is_none() {
4059 fields.push(Field::new(prop_name, DataType::LargeBinary, true));
4060 }
4061 }
4062
4063 Arc::new(Schema::new(fields))
4064 }
4065}
4066
4067impl DisplayAs for GraphVariableLengthTraverseMainExec {
4068 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
4069 write!(
4070 f,
4071 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
4072 self.source_column, self.type_names, self.min_hops, self.max_hops
4073 )
4074 }
4075}
4076
4077impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
4078 fn name(&self) -> &str {
4079 "GraphVariableLengthTraverseMainExec"
4080 }
4081
4082 fn as_any(&self) -> &dyn Any {
4083 self
4084 }
4085
4086 fn schema(&self) -> SchemaRef {
4087 self.schema.clone()
4088 }
4089
4090 fn properties(&self) -> &Arc<PlanProperties> {
4091 &self.properties
4092 }
4093
4094 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
4095 vec![&self.input]
4096 }
4097
4098 fn with_new_children(
4099 self: Arc<Self>,
4100 children: Vec<Arc<dyn ExecutionPlan>>,
4101 ) -> DFResult<Arc<dyn ExecutionPlan>> {
4102 if children.len() != 1 {
4103 return Err(datafusion::error::DataFusionError::Plan(
4104 "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
4105 ));
4106 }
4107
4108 Ok(Arc::new(Self::new(
4109 children[0].clone(),
4110 self.source_column.clone(),
4111 self.type_names.clone(),
4112 self.direction,
4113 self.min_hops,
4114 self.max_hops,
4115 self.target_variable.clone(),
4116 self.step_variable.clone(),
4117 self.path_variable.clone(),
4118 self.target_properties.clone(),
4119 self.graph_ctx.clone(),
4120 self.is_optional,
4121 self.bound_target_column.clone(),
4122 self.edge_lance_filter.clone(),
4123 self.edge_property_conditions.clone(),
4124 self.used_edge_columns.clone(),
4125 self.path_mode.clone(),
4126 self.output_mode.clone(),
4127 )))
4128 }
4129
4130 fn execute(
4131 &self,
4132 partition: usize,
4133 context: Arc<TaskContext>,
4134 ) -> DFResult<SendableRecordBatchStream> {
4135 let input_stream = self.input.execute(partition, context)?;
4136 let metrics = BaselineMetrics::new(&self.metrics, partition);
4137
4138 let graph_ctx = self.graph_ctx.clone();
4140 let type_names = self.type_names.clone();
4141 let direction = self.direction;
4142 let load_fut = async move {
4143 build_edge_adjacency_map(&graph_ctx, &type_names, direction, None).await
4145 };
4146
4147 Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4148 input: input_stream,
4149 source_column: self.source_column.clone(),
4150 type_names: self.type_names.clone(),
4151 direction: self.direction,
4152 min_hops: self.min_hops,
4153 max_hops: self.max_hops,
4154 target_variable: self.target_variable.clone(),
4155 step_variable: self.step_variable.clone(),
4156 path_variable: self.path_variable.clone(),
4157 target_properties: self.target_properties.clone(),
4158 graph_ctx: self.graph_ctx.clone(),
4159 is_optional: self.is_optional,
4160 bound_target_column: self.bound_target_column.clone(),
4161 edge_lance_filter: self.edge_lance_filter.clone(),
4162 edge_property_conditions: self.edge_property_conditions.clone(),
4163 used_edge_columns: self.used_edge_columns.clone(),
4164 path_mode: self.path_mode.clone(),
4165 output_mode: self.output_mode.clone(),
4166 schema: self.schema.clone(),
4167 state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4168 metrics,
4169 }))
4170 }
4171
4172 fn metrics(&self) -> Option<MetricsSet> {
4173 Some(self.metrics.clone_inner())
4174 }
4175}
4176
4177enum VarLengthMainStreamState {
4179 Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
4181 Processing(EdgeAdjacencyMap),
4183 Materializing {
4185 adjacency: EdgeAdjacencyMap,
4186 fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
4187 },
4188 Done,
4190}
4191
4192#[expect(dead_code, reason = "VLP fields used in Phase 3")]
4194struct GraphVariableLengthTraverseMainStream {
4195 input: SendableRecordBatchStream,
4196 source_column: String,
4197 type_names: Vec<String>,
4198 direction: Direction,
4199 min_hops: usize,
4200 max_hops: usize,
4201 target_variable: String,
4202 step_variable: Option<String>,
4204 path_variable: Option<String>,
4205 target_properties: Vec<String>,
4206 graph_ctx: Arc<GraphExecutionContext>,
4207 is_optional: bool,
4208 bound_target_column: Option<String>,
4209 edge_lance_filter: Option<String>,
4210 edge_property_conditions: Vec<(String, UniValue)>,
4212 used_edge_columns: Vec<String>,
4213 path_mode: super::nfa::PathMode,
4214 output_mode: super::nfa::VlpOutputMode,
4215 schema: SchemaRef,
4216 state: VarLengthMainStreamState,
4217 metrics: BaselineMetrics,
4218}
4219
4220type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4222
4223impl GraphVariableLengthTraverseMainStream {
4224 fn bfs(
4230 &self,
4231 source: Vid,
4232 adjacency: &EdgeAdjacencyMap,
4233 used_eids: &FxHashSet<u64>,
4234 ) -> Vec<MainBfsResult> {
4235 let mut results = Vec::new();
4236 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4237
4238 queue.push_back((source, 0, vec![source], vec![]));
4239
4240 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4241 if depth >= self.min_hops && depth <= self.max_hops {
4243 results.push((current, depth, node_path.clone(), edge_path.clone()));
4244 }
4245
4246 if depth >= self.max_hops {
4248 continue;
4249 }
4250
4251 if let Some(neighbors) = adjacency.get(¤t) {
4253 let is_undirected = matches!(self.direction, Direction::Both);
4254 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4255
4256 for (neighbor, eid, _edge_type, props) in neighbors {
4257 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4259 continue;
4260 }
4261
4262 if edge_path.contains(eid) {
4264 continue;
4265 }
4266
4267 if used_eids.contains(&eid.as_u64()) {
4270 continue;
4271 }
4272
4273 if !self.edge_property_conditions.is_empty() {
4275 let passes =
4276 self.edge_property_conditions
4277 .iter()
4278 .all(|(name, expected)| {
4279 props.get(name).is_some_and(|actual| actual == expected)
4280 });
4281 if !passes {
4282 continue;
4283 }
4284 }
4285
4286 let mut new_node_path = node_path.clone();
4287 new_node_path.push(*neighbor);
4288 let mut new_edge_path = edge_path.clone();
4289 new_edge_path.push(*eid);
4290 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4291 }
4292 }
4293 }
4294
4295 results
4296 }
4297
4298 fn process_batch(
4300 &self,
4301 batch: RecordBatch,
4302 adjacency: &EdgeAdjacencyMap,
4303 ) -> DFResult<RecordBatch> {
4304 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4305 datafusion::error::DataFusionError::Execution(format!(
4306 "Source column '{}' not found in input batch",
4307 self.source_column
4308 ))
4309 })?;
4310
4311 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4312 let source_vids: &UInt64Array = &source_vid_cow;
4313
4314 let bound_target_cow = self
4316 .bound_target_column
4317 .as_ref()
4318 .and_then(|col| batch.column_by_name(col))
4319 .map(|c| column_as_vid_array(c.as_ref()))
4320 .transpose()?;
4321 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4322
4323 let used_edge_arrays: Vec<&UInt64Array> = self
4325 .used_edge_columns
4326 .iter()
4327 .filter_map(|col| {
4328 batch
4329 .column_by_name(col)?
4330 .as_any()
4331 .downcast_ref::<UInt64Array>()
4332 })
4333 .collect();
4334
4335 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4337
4338 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4339 let mut emitted_for_row = false;
4340
4341 if let Some(source_u64) = source_opt {
4342 let source = Vid::from(source_u64);
4343
4344 let used_eids: FxHashSet<u64> = used_edge_arrays
4346 .iter()
4347 .filter_map(|arr| {
4348 if arr.is_null(row_idx) {
4349 None
4350 } else {
4351 Some(arr.value(row_idx))
4352 }
4353 })
4354 .collect();
4355
4356 let bfs_results = self.bfs(source, adjacency, &used_eids);
4357
4358 for (target, hops, node_path, edge_path) in bfs_results {
4359 if let Some(targets) = expected_targets {
4362 if targets.is_null(row_idx) {
4363 continue;
4364 }
4365 let expected_vid = targets.value(row_idx);
4366 if target.as_u64() != expected_vid {
4367 continue;
4368 }
4369 }
4370
4371 expansions.push((row_idx, target, hops, node_path, edge_path));
4372 emitted_for_row = true;
4373 }
4374 }
4375
4376 if self.is_optional && !emitted_for_row {
4377 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4379 }
4380 }
4381
4382 if expansions.is_empty() {
4383 if self.is_optional {
4384 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4385 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4386 }
4387 return Ok(RecordBatch::new_empty(self.schema.clone()));
4388 }
4389
4390 let num_rows = expansions.len();
4391 self.metrics.record_output(num_rows);
4392
4393 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4395
4396 let edge_type_ids: Vec<u32> = {
4400 let uni_schema = self.graph_ctx.storage().schema_manager().schema();
4401 self.type_names
4402 .iter()
4403 .filter_map(|name| uni_schema.edge_type_id_by_name(name))
4404 .collect()
4405 };
4406
4407 for col_idx in 0..batch.num_columns() {
4409 let array = batch.column(col_idx);
4410 let indices: Vec<u64> = expansions
4411 .iter()
4412 .map(|(idx, _, _, _, _)| *idx as u64)
4413 .collect();
4414 let take_indices = UInt64Array::from(indices);
4415 let expanded = arrow::compute::take(array, &take_indices, None)?;
4416 columns.push(expanded);
4417 }
4418
4419 let target_vid_name = format!("{}._vid", self.target_variable);
4421 if batch.schema().column_with_name(&target_vid_name).is_none() {
4422 let target_vids: Vec<Option<u64>> = expansions
4423 .iter()
4424 .map(|(_, vid, _, node_path, edge_path)| {
4425 if node_path.is_empty() && edge_path.is_empty() {
4426 None
4427 } else {
4428 Some(vid.as_u64())
4429 }
4430 })
4431 .collect();
4432 columns.push(Arc::new(UInt64Array::from(target_vids)));
4433 }
4434
4435 let target_labels_name = format!("{}._labels", self.target_variable);
4437 if batch
4438 .schema()
4439 .column_with_name(&target_labels_name)
4440 .is_none()
4441 {
4442 use arrow_array::builder::{ListBuilder, StringBuilder};
4443 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4444 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4445 if node_path.is_empty() && edge_path.is_empty() {
4446 labels_builder.append(false);
4447 continue;
4448 }
4449 let mut row_labels: Vec<String> = Vec::new();
4450 let labels =
4451 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4452 for lbl in &labels {
4453 if !row_labels.contains(lbl) {
4454 row_labels.push(lbl.clone());
4455 }
4456 }
4457 let values = labels_builder.values();
4458 for lbl in &row_labels {
4459 values.append_value(lbl);
4460 }
4461 labels_builder.append(true);
4462 }
4463 columns.push(Arc::new(labels_builder.finish()));
4464 }
4465
4466 let hop_counts: Vec<u64> = expansions
4468 .iter()
4469 .map(|(_, _, hops, _, _)| *hops as u64)
4470 .collect();
4471 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4472
4473 if self.step_variable.is_some() {
4475 let mut edges_builder = new_edge_list_builder();
4476 let query_ctx = self.graph_ctx.query_context();
4477 let type_names_str = self.type_names.join("|");
4478
4479 for (_, _, _, node_path, edge_path) in expansions.iter() {
4480 if node_path.is_empty() && edge_path.is_empty() {
4481 edges_builder.append_null();
4482 } else if edge_path.is_empty() {
4483 edges_builder.append(true);
4485 } else {
4486 for (i, eid) in edge_path.iter().enumerate() {
4487 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4492 *eid,
4493 node_path[i],
4494 node_path[i + 1],
4495 &edge_type_ids,
4496 );
4497 append_edge_to_struct(
4498 edges_builder.values(),
4499 *eid,
4500 &type_names_str,
4501 src,
4502 dst,
4503 &query_ctx,
4504 );
4505 }
4506 edges_builder.append(true);
4507 }
4508 }
4509
4510 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4511 }
4512
4513 if let Some(path_var_name) = &self.path_variable {
4517 let existing_path_col_idx = batch
4518 .schema()
4519 .column_with_name(path_var_name)
4520 .map(|(idx, _)| idx);
4521 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4522 let existing_path = existing_path_arc
4523 .as_ref()
4524 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4525
4526 let mut nodes_builder = new_node_list_builder();
4527 let mut rels_builder = new_edge_list_builder();
4528 let query_ctx = self.graph_ctx.query_context();
4529 let type_names_str = self.type_names.join("|");
4530 let mut path_validity = Vec::with_capacity(expansions.len());
4531
4532 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4533 if node_path.is_empty() && edge_path.is_empty() {
4534 nodes_builder.append(false);
4535 rels_builder.append(false);
4536 path_validity.push(false);
4537 continue;
4538 }
4539
4540 let skip_first_vlp_node = if let Some(existing) = existing_path {
4542 if !existing.is_null(row_out_idx) {
4543 prepend_existing_path(
4544 existing,
4545 row_out_idx,
4546 &mut nodes_builder,
4547 &mut rels_builder,
4548 &query_ctx,
4549 );
4550 true
4551 } else {
4552 false
4553 }
4554 } else {
4555 false
4556 };
4557
4558 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4560 for vid in &node_path[start_idx..] {
4561 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4562 }
4563 nodes_builder.append(true);
4564
4565 for (i, eid) in edge_path.iter().enumerate() {
4566 let (src, dst) = self.graph_ctx.resolve_stored_edge_endpoints(
4571 *eid,
4572 node_path[i],
4573 node_path[i + 1],
4574 &edge_type_ids,
4575 );
4576 append_edge_to_struct(
4577 rels_builder.values(),
4578 *eid,
4579 &type_names_str,
4580 src,
4581 dst,
4582 &query_ctx,
4583 );
4584 }
4585 rels_builder.append(true);
4586 path_validity.push(true);
4587 }
4588
4589 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4591 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4592
4593 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4595 let rels_field = Arc::new(Field::new(
4596 "relationships",
4597 rels_array.data_type().clone(),
4598 true,
4599 ));
4600
4601 let path_struct = arrow_array::StructArray::try_new(
4603 vec![nodes_field, rels_field].into(),
4604 vec![nodes_array, rels_array],
4605 Some(arrow::buffer::NullBuffer::from(path_validity)),
4606 )
4607 .map_err(arrow_err)?;
4608
4609 if let Some(idx) = existing_path_col_idx {
4610 columns[idx] = Arc::new(path_struct);
4611 } else {
4612 columns.push(Arc::new(path_struct));
4613 }
4614 }
4615
4616 for prop_name in &self.target_properties {
4619 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4620 if batch.schema().column_with_name(&full_prop_name).is_none() {
4621 columns.push(arrow_array::new_null_array(
4622 &DataType::LargeBinary,
4623 num_rows,
4624 ));
4625 }
4626 }
4627
4628 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4629 }
4630}
4631
4632impl Stream for GraphVariableLengthTraverseMainStream {
4633 type Item = DFResult<RecordBatch>;
4634
4635 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4636 let metrics = self.metrics.clone();
4637 let _timer = metrics.elapsed_compute().timer();
4638 loop {
4639 let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4640
4641 match state {
4642 VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4643 Poll::Ready(Ok(adjacency)) => {
4644 self.state = VarLengthMainStreamState::Processing(adjacency);
4645 }
4647 Poll::Ready(Err(e)) => {
4648 self.state = VarLengthMainStreamState::Done;
4649 return Poll::Ready(Some(Err(e)));
4650 }
4651 Poll::Pending => {
4652 self.state = VarLengthMainStreamState::Loading(fut);
4653 return Poll::Pending;
4654 }
4655 },
4656 VarLengthMainStreamState::Processing(adjacency) => {
4657 match self.input.poll_next_unpin(cx) {
4658 Poll::Ready(Some(Ok(batch))) => {
4659 let base_batch = match self.process_batch(batch, &adjacency) {
4660 Ok(b) => b,
4661 Err(e) => {
4662 self.state = VarLengthMainStreamState::Processing(adjacency);
4663 return Poll::Ready(Some(Err(e)));
4664 }
4665 };
4666
4667 if self.target_properties.is_empty() {
4669 self.state = VarLengthMainStreamState::Processing(adjacency);
4670 return Poll::Ready(Some(Ok(base_batch)));
4671 }
4672
4673 let schema = self.schema.clone();
4675 let target_variable = self.target_variable.clone();
4676 let target_properties = self.target_properties.clone();
4677 let graph_ctx = self.graph_ctx.clone();
4678
4679 let fut = hydrate_vlp_target_properties(
4680 base_batch,
4681 schema,
4682 target_variable,
4683 target_properties,
4684 None, graph_ctx,
4686 );
4687
4688 self.state = VarLengthMainStreamState::Materializing {
4689 adjacency,
4690 fut: Box::pin(fut),
4691 };
4692 }
4694 Poll::Ready(Some(Err(e))) => {
4695 self.state = VarLengthMainStreamState::Done;
4696 return Poll::Ready(Some(Err(e)));
4697 }
4698 Poll::Ready(None) => {
4699 self.state = VarLengthMainStreamState::Done;
4700 return Poll::Ready(None);
4701 }
4702 Poll::Pending => {
4703 self.state = VarLengthMainStreamState::Processing(adjacency);
4704 return Poll::Pending;
4705 }
4706 }
4707 }
4708 VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4709 match fut.as_mut().poll(cx) {
4710 Poll::Ready(Ok(batch)) => {
4711 self.state = VarLengthMainStreamState::Processing(adjacency);
4712 return Poll::Ready(Some(Ok(batch)));
4713 }
4714 Poll::Ready(Err(e)) => {
4715 self.state = VarLengthMainStreamState::Done;
4716 return Poll::Ready(Some(Err(e)));
4717 }
4718 Poll::Pending => {
4719 self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4720 return Poll::Pending;
4721 }
4722 }
4723 }
4724 VarLengthMainStreamState::Done => {
4725 return Poll::Ready(None);
4726 }
4727 }
4728 }
4729 }
4730}
4731
4732impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4733 fn schema(&self) -> SchemaRef {
4734 self.schema.clone()
4735 }
4736}
4737
#[cfg(test)]
mod tests {
    use super::*;

    /// Input schema used by every test: one non-nullable `a._vid` column.
    fn source_only_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// Asserts that `schema` consists of exactly `expected` field names, in order.
    fn assert_field_names(schema: &Schema, expected: &[&str]) {
        assert_eq!(schema.fields().len(), expected.len());
        for (idx, name) in expected.iter().enumerate() {
            assert_eq!(schema.field(idx).name(), *name);
        }
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let input_schema = source_only_schema();

        let output_schema =
            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);

        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let input_schema = source_only_schema();

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let input_schema = source_only_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        assert_field_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type",
            ],
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let input_schema = source_only_schema();

        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_field_names(
            &output_schema,
            &["a._vid", "b._vid", "b._labels", "_hop_count", "p"],
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = source_only_schema();

        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "__eid_to_m"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = source_only_schema();

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_field_names(
            &output_schema,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = source_only_schema();
        let edge_props = vec!["weight".to_string(), "since".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_field_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since",
            ],
        );
        // Edge property columns are carried as encoded binary values.
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = source_only_schema();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        assert_field_names(
            &output_schema,
            &[
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age",
            ],
        );
        // Target property columns are carried as encoded binary values.
        assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
        assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
    }
}