1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
36 build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
37 new_edge_list_builder, new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// Tuple alias for `(Vid, usize, Vec<Vid>, Vec<Eid>)`.
///
/// NOTE(review): judging by the name this is a single BFS hit — presumably
/// (reached vertex, hop count, node path, edge path). The producer is not
/// visible from here; confirm before relying on the element meanings.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
64
/// Tuple alias for `(usize, Vid, usize, Vec<Vid>, Vec<Eid>)`.
///
/// NOTE(review): presumably (input row index, reached vertex, hop count,
/// node path, edge path) — the code that fills it is not visible here;
/// confirm against the producer.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68fn prepend_existing_path(
75 existing_path: &arrow_array::StructArray,
76 row_idx: usize,
77 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79 query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81 let nodes_list = existing_path
83 .column(0)
84 .as_any()
85 .downcast_ref::<arrow_array::ListArray>()
86 .unwrap();
87 let node_values = nodes_list.value(row_idx);
88 let node_struct = node_values
89 .as_any()
90 .downcast_ref::<arrow_array::StructArray>()
91 .unwrap();
92 let vid_col = node_struct
93 .column(0)
94 .as_any()
95 .downcast_ref::<UInt64Array>()
96 .unwrap();
97 for i in 0..vid_col.len() {
98 append_node_to_struct(
99 nodes_builder.values(),
100 Vid::from(vid_col.value(i)),
101 query_ctx,
102 );
103 }
104
105 let rels_list = existing_path
107 .column(1)
108 .as_any()
109 .downcast_ref::<arrow_array::ListArray>()
110 .unwrap();
111 let edge_values = rels_list.value(row_idx);
112 let edge_struct = edge_values
113 .as_any()
114 .downcast_ref::<arrow_array::StructArray>()
115 .unwrap();
116 let eid_col = edge_struct
117 .column(0)
118 .as_any()
119 .downcast_ref::<UInt64Array>()
120 .unwrap();
121 let type_col = edge_struct
122 .column(1)
123 .as_any()
124 .downcast_ref::<arrow_array::StringArray>()
125 .unwrap();
126 let src_col = edge_struct
127 .column(2)
128 .as_any()
129 .downcast_ref::<UInt64Array>()
130 .unwrap();
131 let dst_col = edge_struct
132 .column(3)
133 .as_any()
134 .downcast_ref::<UInt64Array>()
135 .unwrap();
136 for i in 0..eid_col.len() {
137 append_edge_to_struct(
138 rels_builder.values(),
139 Eid::from(eid_col.value(i)),
140 type_col.value(i),
141 src_col.value(i),
142 dst_col.value(i),
143 query_ctx,
144 );
145 }
146}
147
148fn resolve_edge_property_type(
153 prop: &str,
154 schema_props: Option<
155 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156 >,
157) -> DataType {
158 if prop == "overflow_json" {
159 DataType::LargeBinary
160 } else if prop == "_created_at" || prop == "_updated_at" {
161 DataType::Timestamp(arrow_schema::TimeUnit::Nanosecond, Some("UTC".into()))
165 } else {
166 schema_props
167 .and_then(|props| props.get(prop))
168 .map(|meta| meta.r#type.to_arrow())
169 .unwrap_or(DataType::LargeBinary)
170 }
171}
172
173use crate::query::df_graph::common::merged_edge_schema_props;
174
/// Tuple alias for `(usize, Vid, usize, Vec<Vid>, Vec<Eid>)` — structurally
/// the same tuple as `ExpansionRecord`.
///
/// NOTE(review): presumably one variable-length expansion result (input row
/// index, reached vertex, hop count, node path, edge path); the producer is
/// not visible here — confirm.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
177
/// Physical plan node that expands every input row along graph edges.
///
/// For each row of the child plan the source vertex id is read from
/// `source_column`, its neighbours are looked up through `graph_ctx`, and one
/// output row is produced per surviving (row, neighbour, edge) combination.
/// The output schema is the input schema followed by target-vertex columns
/// and edge columns (see `build_schema`).
pub struct GraphTraverseExec {
    /// Child plan whose batches are expanded.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to follow; neighbours are looked up once per type.
    edge_type_ids: Vec<u32>,

    /// Direction passed to the neighbour lookup. With `Direction::Both`,
    /// an edge id is emitted at most once per source row.
    direction: Direction,

    /// Variable name used as prefix for the target columns
    /// (`{var}._vid`, `{var}._labels`, `{var}.{prop}`).
    target_variable: String,

    /// Edge variable name. When `Some`, `{edge}._eid`, `{edge}._type` and the
    /// edge property columns are emitted; when `None`, a single internal
    /// `__eid_to_{target}` column carries the edge id instead.
    edge_variable: Option<String>,

    /// Edge property names materialised as `{edge}.{prop}` columns.
    edge_properties: Vec<String>,

    /// Target vertex property names materialised as `{target}.{prop}` columns.
    target_properties: Vec<String>,

    /// Label the target vertex must carry; also used to resolve target
    /// property types from the schema.
    target_label_name: Option<String>,

    /// Numeric id of the target label. Stored and forwarded when the node is
    /// rebuilt; not otherwise read in the code visible here.
    target_label_id: Option<u16>,

    /// Shared execution context (storage, property manager, neighbour lookup).
    graph_ctx: Arc<GraphExecutionContext>,

    /// When true, input rows without any match are still emitted with nulls
    /// in the added columns, and the id columns are declared nullable.
    optional: bool,

    /// Variables belonging to the optional pattern; their columns are nulled
    /// out and ignored for grouping when emitting unmatched rows.
    optional_pattern_vars: HashSet<String>,

    /// Input column holding an already-bound target vertex id. When the
    /// column is present, only neighbours equal to the row's value are kept
    /// (a null value keeps none).
    bound_target_column: Option<String>,

    /// Input columns holding edge ids already used by the row; neighbours
    /// reached through one of those edges are skipped.
    used_edge_columns: Vec<String>,

    /// Output schema computed by `build_schema`.
    schema: SchemaRef,

    /// Plan properties derived from `schema`.
    properties: Arc<PlanProperties>,

    /// Metrics set shared by all partitions of this node.
    metrics: ExecutionPlanMetricsSet,
}
260
261impl fmt::Debug for GraphTraverseExec {
262 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
263 f.debug_struct("GraphTraverseExec")
264 .field("source_column", &self.source_column)
265 .field("edge_type_ids", &self.edge_type_ids)
266 .field("direction", &self.direction)
267 .field("target_variable", &self.target_variable)
268 .field("edge_variable", &self.edge_variable)
269 .finish()
270 }
271}
272
273impl GraphTraverseExec {
274 #[expect(clippy::too_many_arguments)]
290 pub fn new(
291 input: Arc<dyn ExecutionPlan>,
292 source_column: impl Into<String>,
293 edge_type_ids: Vec<u32>,
294 direction: Direction,
295 target_variable: impl Into<String>,
296 edge_variable: Option<String>,
297 edge_properties: Vec<String>,
298 target_properties: Vec<String>,
299 target_label_name: Option<String>,
300 target_label_id: Option<u16>,
301 graph_ctx: Arc<GraphExecutionContext>,
302 optional: bool,
303 optional_pattern_vars: HashSet<String>,
304 bound_target_column: Option<String>,
305 used_edge_columns: Vec<String>,
306 ) -> Self {
307 let source_column = source_column.into();
308 let target_variable = target_variable.into();
309
310 let uni_schema = graph_ctx.storage().schema_manager().schema();
312 let label_props = target_label_name
313 .as_deref()
314 .and_then(|ln| uni_schema.properties.get(ln));
315 let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
316 let edge_props = if merged_edge_props.is_empty() {
317 None
318 } else {
319 Some(&merged_edge_props)
320 };
321
322 let schema = Self::build_schema(
324 input.schema(),
325 &target_variable,
326 edge_variable.as_deref(),
327 &edge_properties,
328 &target_properties,
329 label_props,
330 edge_props,
331 optional,
332 );
333
334 let properties = compute_plan_properties(schema.clone());
335
336 Self {
337 input,
338 source_column,
339 edge_type_ids,
340 direction,
341 target_variable,
342 edge_variable,
343 edge_properties,
344 target_properties,
345 target_label_name,
346 target_label_id,
347 graph_ctx,
348 optional,
349 optional_pattern_vars,
350 bound_target_column,
351 used_edge_columns,
352 schema,
353 properties,
354 metrics: ExecutionPlanMetricsSet::new(),
355 }
356 }
357
358 #[expect(
360 clippy::too_many_arguments,
361 reason = "Schema construction needs all field metadata"
362 )]
363 fn build_schema(
364 input_schema: SchemaRef,
365 target_variable: &str,
366 edge_variable: Option<&str>,
367 edge_properties: &[String],
368 target_properties: &[String],
369 label_props: Option<
370 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
371 >,
372 edge_props: Option<
373 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
374 >,
375 optional: bool,
376 ) -> SchemaRef {
377 let mut fields: Vec<Field> = input_schema
378 .fields()
379 .iter()
380 .map(|f| f.as_ref().clone())
381 .collect();
382
383 let target_vid_name = format!("{}._vid", target_variable);
385 fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
386
387 fields.push(Field::new(
389 format!("{}._labels", target_variable),
390 labels_data_type(),
391 true,
392 ));
393
394 for prop_name in target_properties {
396 let col_name = format!("{}.{}", target_variable, prop_name);
397 let arrow_type = resolve_property_type(prop_name, label_props);
398 fields.push(Field::new(&col_name, arrow_type, true));
399 }
400
401 if let Some(edge_var) = edge_variable {
403 let edge_id_name = format!("{}._eid", edge_var);
404 fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
405
406 fields.push(Field::new(
408 format!("{}._type", edge_var),
409 DataType::Utf8,
410 true,
411 ));
412
413 for prop_name in edge_properties {
415 let prop_col_name = format!("{}.{}", edge_var, prop_name);
416 let arrow_type = resolve_edge_property_type(prop_name, edge_props);
417 fields.push(Field::new(&prop_col_name, arrow_type, true));
418 }
419 } else {
420 let internal_eid_name = format!("__eid_to_{}", target_variable);
423 fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
424 }
425
426 Arc::new(Schema::new(fields))
427 }
428}
429
430impl DisplayAs for GraphTraverseExec {
431 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
432 write!(
433 f,
434 "GraphTraverseExec: {} --[{:?}]--> {}",
435 self.source_column, self.edge_type_ids, self.target_variable
436 )?;
437 if let Some(ref edge_var) = self.edge_variable {
438 write!(f, " as {}", edge_var)?;
439 }
440 Ok(())
441 }
442}
443
444impl ExecutionPlan for GraphTraverseExec {
445 fn name(&self) -> &str {
446 "GraphTraverseExec"
447 }
448
449 fn as_any(&self) -> &dyn Any {
450 self
451 }
452
453 fn schema(&self) -> SchemaRef {
454 self.schema.clone()
455 }
456
457 fn properties(&self) -> &Arc<PlanProperties> {
458 &self.properties
459 }
460
461 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
462 vec![&self.input]
463 }
464
465 fn with_new_children(
466 self: Arc<Self>,
467 children: Vec<Arc<dyn ExecutionPlan>>,
468 ) -> DFResult<Arc<dyn ExecutionPlan>> {
469 if children.len() != 1 {
470 return Err(datafusion::error::DataFusionError::Plan(
471 "GraphTraverseExec requires exactly one child".to_string(),
472 ));
473 }
474
475 Ok(Arc::new(Self::new(
476 children[0].clone(),
477 self.source_column.clone(),
478 self.edge_type_ids.clone(),
479 self.direction,
480 self.target_variable.clone(),
481 self.edge_variable.clone(),
482 self.edge_properties.clone(),
483 self.target_properties.clone(),
484 self.target_label_name.clone(),
485 self.target_label_id,
486 self.graph_ctx.clone(),
487 self.optional,
488 self.optional_pattern_vars.clone(),
489 self.bound_target_column.clone(),
490 self.used_edge_columns.clone(),
491 )))
492 }
493
494 fn execute(
495 &self,
496 partition: usize,
497 context: Arc<TaskContext>,
498 ) -> DFResult<SendableRecordBatchStream> {
499 let input_stream = self.input.execute(partition, context)?;
500
501 let metrics = BaselineMetrics::new(&self.metrics, partition);
502
503 let warm_fut = self
504 .graph_ctx
505 .warming_future(self.edge_type_ids.clone(), self.direction);
506
507 Ok(Box::pin(GraphTraverseStream {
508 input: input_stream,
509 source_column: self.source_column.clone(),
510 edge_type_ids: self.edge_type_ids.clone(),
511 direction: self.direction,
512 target_variable: self.target_variable.clone(),
513 edge_variable: self.edge_variable.clone(),
514 edge_properties: self.edge_properties.clone(),
515 target_properties: self.target_properties.clone(),
516 target_label_name: self.target_label_name.clone(),
517 graph_ctx: self.graph_ctx.clone(),
518 optional: self.optional,
519 optional_pattern_vars: self.optional_pattern_vars.clone(),
520 bound_target_column: self.bound_target_column.clone(),
521 used_edge_columns: self.used_edge_columns.clone(),
522 schema: self.schema.clone(),
523 state: TraverseStreamState::Warming(warm_fut),
524 metrics,
525 }))
526 }
527
528 fn metrics(&self) -> Option<MetricsSet> {
529 Some(self.metrics.clone_inner())
530 }
531}
532
/// State machine driving `GraphTraverseStream::poll_next`.
enum TraverseStreamState {
    /// Waiting for the warming future created in `execute` to complete.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Ready to pull the next batch from the input stream.
    Reading,
    /// Waiting for the asynchronous construction of one output batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state; every further poll yields `None`.
    Done,
}
544
/// Per-partition stream created by `GraphTraverseExec::execute`.
///
/// The configuration fields are copies of the corresponding
/// `GraphTraverseExec` fields.
struct GraphTraverseStream {
    /// Input stream providing the rows to expand.
    input: SendableRecordBatchStream,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to follow.
    edge_type_ids: Vec<u32>,

    /// Direction passed to the neighbour lookup.
    direction: Direction,

    /// Target variable name (not read by the stream itself).
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// Edge variable name; decides which edge columns are produced.
    edge_variable: Option<String>,

    /// Edge property names to materialise.
    edge_properties: Vec<String>,

    /// Target vertex property names to materialise.
    target_properties: Vec<String>,

    /// Label the target vertex must carry, if any.
    target_label_name: Option<String>,

    /// Shared execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Whether unmatched input rows are emitted with nulls.
    optional: bool,

    /// Variables belonging to the optional pattern.
    optional_pattern_vars: HashSet<String>,

    /// Input column holding an already-bound target vertex id, if any.
    bound_target_column: Option<String>,

    /// Input columns holding edge ids that must not be traversed again.
    used_edge_columns: Vec<String>,

    /// Schema of the batches this stream emits.
    schema: SchemaRef,

    /// Current position in the poll state machine.
    state: TraverseStreamState,

    /// Baseline metrics (elapsed compute time, output rows) for this partition.
    metrics: BaselineMetrics,
}
599
600impl GraphTraverseStream {
601 fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
604 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
605 datafusion::error::DataFusionError::Execution(format!(
606 "Source column '{}' not found",
607 self.source_column
608 ))
609 })?;
610
611 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
612 let source_vids: &UInt64Array = &source_vid_cow;
613
614 let bound_target_cow = self
617 .bound_target_column
618 .as_ref()
619 .and_then(|col| batch.column_by_name(col))
620 .map(|c| column_as_vid_array(c.as_ref()))
621 .transpose()?;
622 let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
623
624 let used_edge_arrays: Vec<&UInt64Array> = self
626 .used_edge_columns
627 .iter()
628 .filter_map(|col| {
629 batch
630 .column_by_name(col)
631 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
632 })
633 .collect();
634
635 let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
636 let is_undirected = matches!(self.direction, Direction::Both);
637
638 for (row_idx, source_vid) in source_vids.iter().enumerate() {
639 let Some(src) = source_vid else {
640 continue;
641 };
642
643 let expected_target = bound_target_vids.map(|arr| {
649 if arr.is_null(row_idx) {
650 None
651 } else {
652 Some(arr.value(row_idx))
653 }
654 });
655
656 let used_eids: HashSet<u64> = used_edge_arrays
658 .iter()
659 .filter_map(|arr| {
660 if arr.is_null(row_idx) {
661 None
662 } else {
663 Some(arr.value(row_idx))
664 }
665 })
666 .collect();
667
668 let vid = Vid::from(src);
669 let mut seen_edges: HashSet<u64> = HashSet::new();
672
673 for &edge_type in &self.edge_type_ids {
674 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
675
676 for (target_vid, eid) in neighbors {
677 let eid_u64 = eid.as_u64();
678
679 if used_eids.contains(&eid_u64) {
681 continue;
682 }
683
684 if is_undirected && !seen_edges.insert(eid_u64) {
686 continue;
687 }
688
689 if let Some(expected_opt) = expected_target {
692 let Some(expected) = expected_opt else {
693 continue;
694 };
695 if target_vid.as_u64() != expected {
696 continue;
697 }
698 }
699
700 if let Some(ref label_name) = self.target_label_name {
703 let query_ctx = self.graph_ctx.query_context();
704 if let Some(vertex_labels) =
705 l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
706 {
707 if !vertex_labels.contains(label_name) {
709 continue;
710 }
711 }
712 }
714
715 expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
716 }
717 }
718 }
719
720 Ok(expanded_rows)
721 }
722}
723
724fn build_target_labels_column(
726 target_vids: &[Vid],
727 target_label_name: &Option<String>,
728 graph_ctx: &GraphExecutionContext,
729) -> ArrayRef {
730 use arrow_array::builder::{ListBuilder, StringBuilder};
731 let mut labels_builder = ListBuilder::new(StringBuilder::new());
732 let query_ctx = graph_ctx.query_context();
733 for vid in target_vids {
734 let row_labels: Vec<String> =
735 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
736 Some(labels) => labels,
737 None => {
738 if let Some(label_name) = target_label_name {
740 vec![label_name.clone()]
741 } else {
742 vec![]
743 }
744 }
745 };
746 let values = labels_builder.values();
747 for lbl in &row_labels {
748 values.append_value(lbl);
749 }
750 labels_builder.append(true);
751 }
752 Arc::new(labels_builder.finish())
753}
754
755async fn build_target_property_columns(
757 target_vids: &[Vid],
758 target_properties: &[String],
759 target_label_name: &Option<String>,
760 graph_ctx: &Arc<GraphExecutionContext>,
761) -> DFResult<Vec<ArrayRef>> {
762 let mut columns = Vec::new();
763
764 if let Some(label_name) = target_label_name {
765 let property_manager = graph_ctx.property_manager();
766 let query_ctx = graph_ctx.query_context();
767
768 let props_map = property_manager
769 .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
770 .await
771 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
772
773 let uni_schema = graph_ctx.storage().schema_manager().schema();
774 let label_props = uni_schema.properties.get(label_name.as_str());
775
776 for prop_name in target_properties {
777 let data_type = resolve_property_type(prop_name, label_props);
778 let column =
779 build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
780 columns.push(column);
781 }
782 } else {
783 let non_internal_props: Vec<&str> = target_properties
785 .iter()
786 .filter(|p| *p != "_all_props")
787 .map(|s| s.as_str())
788 .collect();
789 let property_manager = graph_ctx.property_manager();
790 let query_ctx = graph_ctx.query_context();
791
792 let props_map = if !non_internal_props.is_empty() {
793 property_manager
794 .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
795 .await
796 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
797 } else {
798 std::collections::HashMap::new()
799 };
800
801 for prop_name in target_properties {
802 if prop_name == "_all_props" {
803 columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
804 } else {
805 let column = build_property_column_static(
806 target_vids,
807 &props_map,
808 prop_name,
809 &arrow::datatypes::DataType::LargeBinary,
810 )?;
811 columns.push(column);
812 }
813 }
814 }
815
816 Ok(columns)
817}
818
819fn build_all_props_column(
821 target_vids: &[Vid],
822 props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
823 graph_ctx: &Arc<GraphExecutionContext>,
824) -> ArrayRef {
825 use crate::query::df_graph::scan::encode_cypher_value;
826 use arrow_array::builder::LargeBinaryBuilder;
827
828 let mut builder = LargeBinaryBuilder::new();
829 let l0_ctx = graph_ctx.l0_context();
830 for vid in target_vids {
831 let mut merged_props = serde_json::Map::new();
832 if let Some(vid_props) = props_map.get(vid) {
833 for (k, v) in vid_props.iter() {
834 let json_val: serde_json::Value = v.clone().into();
835 merged_props.insert(k.to_string(), json_val);
836 }
837 }
838 for l0 in l0_ctx.iter_l0_buffers() {
839 let guard = l0.read();
840 if let Some(l0_props) = guard.vertex_properties.get(vid) {
841 for (k, v) in l0_props.iter() {
842 let json_val: serde_json::Value = v.clone().into();
843 merged_props.insert(k.to_string(), json_val);
844 }
845 }
846 }
847 if merged_props.is_empty() {
848 builder.append_null();
849 } else {
850 let json = serde_json::Value::Object(merged_props);
851 match encode_cypher_value(&json) {
852 Ok(bytes) => builder.append_value(bytes),
853 Err(_) => builder.append_null(),
854 }
855 }
856 }
857 Arc::new(builder.finish())
858}
859
860async fn build_edge_columns(
862 expansions: &[(usize, Vid, u64, u32)],
863 edge_properties: &[String],
864 edge_type_ids: &[u32],
865 graph_ctx: &Arc<GraphExecutionContext>,
866) -> DFResult<Vec<ArrayRef>> {
867 let mut columns = Vec::new();
868
869 let eids: Vec<Eid> = expansions
870 .iter()
871 .map(|(_, _, eid, _)| Eid::from(*eid))
872 .collect();
873 let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
874 columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
875
876 {
878 let uni_schema = graph_ctx.storage().schema_manager().schema();
879 let mut type_builder = arrow_array::builder::StringBuilder::new();
880 for (_, _, _, edge_type_id) in expansions {
881 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
882 type_builder.append_value(&name);
883 } else {
884 type_builder.append_null();
885 }
886 }
887 columns.push(Arc::new(type_builder.finish()) as ArrayRef);
888 }
889
890 if !edge_properties.is_empty() {
891 let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
892 let property_manager = graph_ctx.property_manager();
893 let query_ctx = graph_ctx.query_context();
894
895 let props_map = property_manager
896 .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
897 .await
898 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
899
900 let uni_schema = graph_ctx.storage().schema_manager().schema();
901 let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
902 let edge_type_props = if merged_edge_props.is_empty() {
903 None
904 } else {
905 Some(&merged_edge_props)
906 };
907
908 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
909
910 for prop_name in edge_properties {
911 if prop_name == "_created_at" || prop_name == "_updated_at" {
917 let mut builder =
918 arrow_array::builder::TimestampNanosecondBuilder::new().with_timezone("UTC");
919 let l0_ctx = graph_ctx.l0_context();
920 for eid in &eids {
921 let mut value: Option<i64> = None;
922 for l0 in l0_ctx.iter_l0_buffers() {
923 let guard = l0.read();
924 let opt = if prop_name == "_created_at" {
925 guard.edge_created_at.get(eid).copied()
926 } else {
927 guard.edge_updated_at.get(eid).copied()
928 };
929 if let Some(ts) = opt {
930 value = Some(match value {
931 None => ts,
932 Some(cur) if prop_name == "_created_at" => cur.min(ts),
933 Some(cur) => cur.max(ts),
934 });
935 }
936 }
937 match value {
938 Some(ts) => builder.append_value(ts),
939 None => builder.append_null(),
940 }
941 }
942 columns.push(Arc::new(builder.finish()) as ArrayRef);
943 continue;
944 }
945 let data_type = resolve_edge_property_type(prop_name, edge_type_props);
946 let column =
947 build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
948 columns.push(column);
949 }
950 }
951
952 Ok(columns)
953}
954
955#[expect(
960 clippy::too_many_arguments,
961 reason = "Standalone async fn needs all context passed explicitly"
962)]
963async fn build_traverse_output_batch(
964 input: RecordBatch,
965 expansions: Vec<(usize, Vid, u64, u32)>,
966 schema: SchemaRef,
967 edge_variable: Option<String>,
968 edge_properties: Vec<String>,
969 edge_type_ids: Vec<u32>,
970 target_properties: Vec<String>,
971 target_label_name: Option<String>,
972 graph_ctx: Arc<GraphExecutionContext>,
973 optional: bool,
974 optional_pattern_vars: HashSet<String>,
975) -> DFResult<RecordBatch> {
976 if expansions.is_empty() {
977 if !optional {
978 return Ok(RecordBatch::new_empty(schema));
979 }
980 let unmatched_reps = collect_unmatched_optional_group_rows(
981 &input,
982 &HashSet::new(),
983 &schema,
984 &optional_pattern_vars,
985 )?;
986 if unmatched_reps.is_empty() {
987 return Ok(RecordBatch::new_empty(schema));
988 }
989 return build_optional_null_batch_for_rows_with_optional_vars(
990 &input,
991 &unmatched_reps,
992 &schema,
993 &optional_pattern_vars,
994 );
995 }
996
997 let indices: Vec<u64> = expansions
999 .iter()
1000 .map(|(idx, _, _, _)| *idx as u64)
1001 .collect();
1002 let indices_array = UInt64Array::from(indices);
1003 let mut columns: Vec<ArrayRef> = input
1004 .columns()
1005 .iter()
1006 .map(|col| take(col.as_ref(), &indices_array, None))
1007 .collect::<Result<_, _>>()?;
1008
1009 let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
1011 let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
1012 columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
1013
1014 columns.push(build_target_labels_column(
1016 &target_vids,
1017 &target_label_name,
1018 &graph_ctx,
1019 ));
1020
1021 if !target_properties.is_empty() {
1023 let prop_cols = build_target_property_columns(
1024 &target_vids,
1025 &target_properties,
1026 &target_label_name,
1027 &graph_ctx,
1028 )
1029 .await?;
1030 columns.extend(prop_cols);
1031 }
1032
1033 if edge_variable.is_some() {
1035 let edge_cols =
1036 build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
1037 columns.extend(edge_cols);
1038 } else {
1039 let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1040 columns.push(Arc::new(UInt64Array::from(eid_u64s)));
1041 }
1042
1043 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1044
1045 if optional {
1047 let matched_indices: HashSet<usize> =
1048 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1049 let unmatched = collect_unmatched_optional_group_rows(
1050 &input,
1051 &matched_indices,
1052 &schema,
1053 &optional_pattern_vars,
1054 )?;
1055
1056 if !unmatched.is_empty() {
1057 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1058 &input,
1059 &unmatched,
1060 &schema,
1061 &optional_pattern_vars,
1062 )?;
1063 let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
1064 .map_err(arrow_err)?;
1065 return Ok(combined);
1066 }
1067 }
1068
1069 Ok(expanded_batch)
1070}
1071
1072fn build_optional_null_batch_for_rows(
1075 input: &RecordBatch,
1076 unmatched_indices: &[usize],
1077 schema: &SchemaRef,
1078) -> DFResult<RecordBatch> {
1079 let num_rows = unmatched_indices.len();
1080 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1081 let indices_array = UInt64Array::from(indices);
1082
1083 let mut columns: Vec<ArrayRef> = Vec::new();
1085 for col in input.columns() {
1086 let taken = take(col.as_ref(), &indices_array, None)?;
1087 columns.push(taken);
1088 }
1089 for field in schema.fields().iter().skip(input.num_columns()) {
1091 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1092 }
1093 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1094}
1095
/// Returns whether `col_name` belongs to one of the optional pattern
/// variables.
///
/// A column belongs to variable `v` when it is
/// * named exactly `v`,
/// * named `v.<anything>` (for example `v._vid` or `v.name`), or
/// * the internal edge-id column `__eid_to_v`.
///
/// The internal column is matched exactly rather than by suffix, so variable
/// `b` does not claim `__eid_to_ab`, which belongs to variable `ab`.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    if optional_vars.contains(col_name) {
        return true;
    }

    // `__eid_to_{target}`: the remainder must be exactly an optional variable.
    if col_name
        .strip_prefix("__eid_to_")
        .is_some_and(|target| optional_vars.contains(target))
    {
        return true;
    }

    // `{var}.{suffix}`: the variable name must be followed directly by a dot.
    optional_vars.iter().any(|var| {
        col_name
            .strip_prefix(var.as_str())
            .is_some_and(|rest| rest.starts_with('.'))
    })
}
1103
1104fn collect_unmatched_optional_group_rows(
1105 input: &RecordBatch,
1106 matched_indices: &HashSet<usize>,
1107 schema: &SchemaRef,
1108 optional_vars: &HashSet<String>,
1109) -> DFResult<Vec<usize>> {
1110 if input.num_rows() == 0 {
1111 return Ok(Vec::new());
1112 }
1113
1114 if optional_vars.is_empty() {
1115 return Ok((0..input.num_rows())
1116 .filter(|idx| !matched_indices.contains(idx))
1117 .collect());
1118 }
1119
1120 let source_vid_indices: Vec<usize> = schema
1121 .fields()
1122 .iter()
1123 .enumerate()
1124 .filter_map(|(idx, field)| {
1125 if idx >= input.num_columns() {
1126 return None;
1127 }
1128 let name = field.name();
1129 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1130 Some(idx)
1131 } else {
1132 None
1133 }
1134 })
1135 .collect();
1136
1137 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1140
1141 for row_idx in 0..input.num_rows() {
1142 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1143 let entry = groups.entry(key.clone());
1144 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1145 group_order.push(key.clone());
1146 }
1147 let matched = matched_indices.contains(&row_idx);
1148 entry
1149 .and_modify(|(_, any_matched)| *any_matched |= matched)
1150 .or_insert((row_idx, matched));
1151 }
1152
1153 Ok(group_order
1154 .into_iter()
1155 .filter_map(|key| {
1156 groups
1157 .get(&key)
1158 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1159 })
1160 .collect())
1161}
1162
1163fn compute_optional_group_key(
1164 batch: &RecordBatch,
1165 row_idx: usize,
1166 source_vid_indices: &[usize],
1167) -> DFResult<Vec<u8>> {
1168 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1169 for &col_idx in source_vid_indices {
1170 let col = batch.column(col_idx);
1171 let vid_cow = column_as_vid_array(col.as_ref())?;
1172 let arr: &UInt64Array = &vid_cow;
1173 if arr.is_null(row_idx) {
1174 key.extend_from_slice(&u64::MAX.to_le_bytes());
1175 } else {
1176 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1177 }
1178 }
1179 Ok(key)
1180}
1181
1182fn build_optional_null_batch_for_rows_with_optional_vars(
1183 input: &RecordBatch,
1184 unmatched_indices: &[usize],
1185 schema: &SchemaRef,
1186 optional_vars: &HashSet<String>,
1187) -> DFResult<RecordBatch> {
1188 if optional_vars.is_empty() {
1189 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1190 }
1191
1192 let num_rows = unmatched_indices.len();
1193 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1194 let indices_array = UInt64Array::from(indices);
1195
1196 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1197 for (col_idx, field) in schema.fields().iter().enumerate() {
1198 if col_idx < input.num_columns() {
1199 if is_optional_column_for_vars(field.name(), optional_vars) {
1200 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1201 } else {
1202 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1203 columns.push(taken);
1204 }
1205 } else {
1206 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1207 }
1208 }
1209
1210 RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
1211}
1212
/// Drives the traversal as a state machine:
/// `Warming` -> `Reading` <-> `Materializing` -> `Done`.
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Polls for the next output batch.
    ///
    /// Every loop iteration takes the current state out of `self.state`
    /// (leaving `Done` behind), so each branch must store the follow-up state
    /// explicitly; a branch that returns without doing so ends the stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // The metrics handle is cloned so the timer guard does not borrow
        // `self`; the guard lives until this call returns.
        let metrics = self.metrics.clone();
        let _timer = metrics.elapsed_compute().timer();
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        // Warm-up finished: loop around and start reading.
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // The state is not restored here, so a timeout leaves the
                    // stream in `Done` after reporting the error.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // Unlike an upstream error, this leaves the
                                    // stream readable for the next poll.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // No property columns requested: the batch is built
                            // synchronously, without entering `Materializing`.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // The future must own everything it uses, so the
                            // configuration is cloned into it.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            // No return: the loop polls the new future right away.
                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted.
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so the next poll resumes it.
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1340
1341fn build_traverse_output_batch_sync(
1346 input: &RecordBatch,
1347 expansions: &[(usize, Vid, u64, u32)],
1348 schema: &SchemaRef,
1349 edge_variable: Option<&String>,
1350 graph_ctx: &GraphExecutionContext,
1351 optional: bool,
1352 optional_pattern_vars: &HashSet<String>,
1353) -> DFResult<RecordBatch> {
1354 if expansions.is_empty() {
1355 if !optional {
1356 return Ok(RecordBatch::new_empty(schema.clone()));
1357 }
1358 let unmatched_reps = collect_unmatched_optional_group_rows(
1359 input,
1360 &HashSet::new(),
1361 schema,
1362 optional_pattern_vars,
1363 )?;
1364 if unmatched_reps.is_empty() {
1365 return Ok(RecordBatch::new_empty(schema.clone()));
1366 }
1367 return build_optional_null_batch_for_rows_with_optional_vars(
1368 input,
1369 &unmatched_reps,
1370 schema,
1371 optional_pattern_vars,
1372 );
1373 }
1374
1375 let indices: Vec<u64> = expansions
1376 .iter()
1377 .map(|(idx, _, _, _)| *idx as u64)
1378 .collect();
1379 let indices_array = UInt64Array::from(indices);
1380
1381 let mut columns: Vec<ArrayRef> = Vec::new();
1382 for col in input.columns() {
1383 let expanded = take(col.as_ref(), &indices_array, None)?;
1384 columns.push(expanded);
1385 }
1386
1387 let target_vids: Vec<u64> = expansions
1389 .iter()
1390 .map(|(_, vid, _, _)| vid.as_u64())
1391 .collect();
1392 columns.push(Arc::new(UInt64Array::from(target_vids)));
1393
1394 {
1396 use arrow_array::builder::{ListBuilder, StringBuilder};
1397 let l0_ctx = graph_ctx.l0_context();
1398 let mut labels_builder = ListBuilder::new(StringBuilder::new());
1399 for (_, vid, _, _) in expansions {
1400 let mut row_labels: Vec<String> = Vec::new();
1401 for l0 in l0_ctx.iter_l0_buffers() {
1402 let guard = l0.read();
1403 if let Some(l0_labels) = guard.vertex_labels.get(vid) {
1404 for lbl in l0_labels {
1405 if !row_labels.contains(lbl) {
1406 row_labels.push(lbl.clone());
1407 }
1408 }
1409 }
1410 }
1411 let values = labels_builder.values();
1412 for lbl in &row_labels {
1413 values.append_value(lbl);
1414 }
1415 labels_builder.append(true);
1416 }
1417 columns.push(Arc::new(labels_builder.finish()));
1418 }
1419
1420 if edge_variable.is_some() {
1422 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1423 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1424
1425 let uni_schema = graph_ctx.storage().schema_manager().schema();
1427 let mut type_builder = arrow_array::builder::StringBuilder::new();
1428 for (_, _, _, edge_type_id) in expansions {
1429 if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
1430 type_builder.append_value(&name);
1431 } else {
1432 type_builder.append_null();
1433 }
1434 }
1435 columns.push(Arc::new(type_builder.finish()));
1436 } else {
1437 let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
1439 columns.push(Arc::new(UInt64Array::from(edge_ids)));
1440 }
1441
1442 let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
1443
1444 if optional {
1445 let matched_indices: HashSet<usize> =
1446 expansions.iter().map(|(idx, _, _, _)| *idx).collect();
1447 let unmatched = collect_unmatched_optional_group_rows(
1448 input,
1449 &matched_indices,
1450 schema,
1451 optional_pattern_vars,
1452 )?;
1453
1454 if !unmatched.is_empty() {
1455 let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
1456 input,
1457 &unmatched,
1458 schema,
1459 optional_pattern_vars,
1460 )?;
1461 let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
1462 .map_err(|e| {
1463 datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
1464 })?;
1465 return Ok(combined);
1466 }
1467 }
1468
1469 Ok(expanded_batch)
1470}
1471
1472impl RecordBatchStream for GraphTraverseStream {
1473 fn schema(&self) -> SchemaRef {
1474 self.schema.clone()
1475 }
1476}
1477
/// Adjacency list keyed by the vertex a traversal step starts from. Each
/// entry is `(neighbor vertex, edge id, edge type name, edge properties)`,
/// as filled in by `build_edge_adjacency_map`.
1478type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1480
/// Physical plan node for a single-hop traversal over edges selected by type
/// name. `execute` wraps the child stream in a `GraphTraverseMainStream`,
/// which first loads an `EdgeAdjacencyMap` for `type_names`/`direction` and
/// then expands each input batch against it.
1481pub struct GraphTraverseMainExec {
/// Child plan supplying the rows to expand.
1504 input: Arc<dyn ExecutionPlan>,
1506
/// Name of the input column holding the source vertex ids.
1507 source_column: String,
1509
/// Edge type names whose edges are loaded into the adjacency map.
1510 type_names: Vec<String>,
1513
/// Selects which edge endpoint(s) key the adjacency map.
1514 direction: Direction,
1516
/// Prefix of the target columns: `{var}._vid`, `{var}._labels`, `{var}.{prop}`.
1517 target_variable: String,
1519
/// When set, `{var}._eid`, `{var}._type` and the edge property columns are
/// emitted; otherwise a single `__eid_to_{target}` column carries the edge id.
1520 edge_variable: Option<String>,
1522
/// Edge property names emitted as `{edge_var}.{prop}` (only with an edge variable).
1523 edge_properties: Vec<String>,
1525
/// Target vertex property names emitted as `{target}.{prop}`.
1526 target_properties: Vec<String>,
1528
/// Shared execution context (storage, L0 buffers, timeout check).
1529 graph_ctx: Arc<GraphExecutionContext>,
1531
/// When true, the edge id columns are nullable and input rows without a
/// match are emitted null-extended instead of being dropped.
1532 optional: bool,
1534
/// Passed through to the unmatched-row helpers.
/// NOTE(review): presumably the variables bound by the enclosing optional
/// pattern; confirm against `collect_unmatched_optional_group_rows`.
1535 optional_pattern_vars: HashSet<String>,
1537
/// Optional input column of target vertex ids; when present, only neighbors
/// equal to that row's value are kept.
1538 bound_target_column: Option<String>,
1541
/// Input columns holding edge ids already bound in the row; an edge whose id
/// appears in any of them is not traversed again.
1542 used_edge_columns: Vec<String>,
1545
/// Output schema produced by `build_schema`.
1546 schema: SchemaRef,
1548
/// Plan properties derived from `schema` via `compute_plan_properties`.
1549 properties: Arc<PlanProperties>,
1551
/// Metrics registry; `execute` creates per-partition `BaselineMetrics` from it.
1552 metrics: ExecutionPlanMetricsSet,
1554}
1555
1556impl fmt::Debug for GraphTraverseMainExec {
1557 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1558 f.debug_struct("GraphTraverseMainExec")
1559 .field("type_names", &self.type_names)
1560 .field("direction", &self.direction)
1561 .field("target_variable", &self.target_variable)
1562 .field("edge_variable", &self.edge_variable)
1563 .finish()
1564 }
1565}
1566
1567impl GraphTraverseMainExec {
1568 #[expect(clippy::too_many_arguments)]
1570 pub fn new(
1571 input: Arc<dyn ExecutionPlan>,
1572 source_column: impl Into<String>,
1573 type_names: Vec<String>,
1574 direction: Direction,
1575 target_variable: impl Into<String>,
1576 edge_variable: Option<String>,
1577 edge_properties: Vec<String>,
1578 target_properties: Vec<String>,
1579 graph_ctx: Arc<GraphExecutionContext>,
1580 optional: bool,
1581 optional_pattern_vars: HashSet<String>,
1582 bound_target_column: Option<String>,
1583 used_edge_columns: Vec<String>,
1584 ) -> Self {
1585 let source_column = source_column.into();
1586 let target_variable = target_variable.into();
1587
1588 let schema = Self::build_schema(
1590 &input.schema(),
1591 &target_variable,
1592 &edge_variable,
1593 &edge_properties,
1594 &target_properties,
1595 optional,
1596 );
1597
1598 let properties = compute_plan_properties(schema.clone());
1599
1600 Self {
1601 input,
1602 source_column,
1603 type_names,
1604 direction,
1605 target_variable,
1606 edge_variable,
1607 edge_properties,
1608 target_properties,
1609 graph_ctx,
1610 optional,
1611 optional_pattern_vars,
1612 bound_target_column,
1613 used_edge_columns,
1614 schema,
1615 properties,
1616 metrics: ExecutionPlanMetricsSet::new(),
1617 }
1618 }
1619
1620 fn build_schema(
1622 input_schema: &SchemaRef,
1623 target_variable: &str,
1624 edge_variable: &Option<String>,
1625 edge_properties: &[String],
1626 target_properties: &[String],
1627 optional: bool,
1628 ) -> SchemaRef {
1629 let mut fields: Vec<Field> = input_schema
1630 .fields()
1631 .iter()
1632 .map(|f| f.as_ref().clone())
1633 .collect();
1634
1635 let target_vid_name = format!("{}._vid", target_variable);
1637 if input_schema.column_with_name(&target_vid_name).is_none() {
1638 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
1639 }
1640
1641 let target_labels_name = format!("{}._labels", target_variable);
1643 if input_schema.column_with_name(&target_labels_name).is_none() {
1644 fields.push(Field::new(target_labels_name, labels_data_type(), true));
1645 }
1646
1647 if let Some(edge_var) = edge_variable {
1649 fields.push(Field::new(
1650 format!("{}._eid", edge_var),
1651 DataType::UInt64,
1652 optional,
1653 ));
1654
1655 fields.push(Field::new(
1657 format!("{}._type", edge_var),
1658 DataType::Utf8,
1659 true,
1660 ));
1661
1662 for prop in edge_properties {
1666 let col_name = format!("{}.{}", edge_var, prop);
1667 let mut metadata = std::collections::HashMap::new();
1668 metadata.insert("cv_encoded".to_string(), "true".to_string());
1669 fields.push(
1670 Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
1671 );
1672 }
1673 } else {
1674 fields.push(Field::new(
1677 format!("__eid_to_{}", target_variable),
1678 DataType::UInt64,
1679 optional,
1680 ));
1681 }
1682
1683 for prop in target_properties {
1685 fields.push(Field::new(
1686 format!("{}.{}", target_variable, prop),
1687 DataType::LargeBinary,
1688 true,
1689 ));
1690 }
1691
1692 Arc::new(Schema::new(fields))
1693 }
1694}
1695
1696impl DisplayAs for GraphTraverseMainExec {
1697 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1698 write!(
1699 f,
1700 "GraphTraverseMainExec: types={:?}, direction={:?}",
1701 self.type_names, self.direction
1702 )
1703 }
1704}
1705
1706impl ExecutionPlan for GraphTraverseMainExec {
1707 fn name(&self) -> &str {
1708 "GraphTraverseMainExec"
1709 }
1710
1711 fn as_any(&self) -> &dyn Any {
1712 self
1713 }
1714
1715 fn schema(&self) -> SchemaRef {
1716 self.schema.clone()
1717 }
1718
1719 fn properties(&self) -> &Arc<PlanProperties> {
1720 &self.properties
1721 }
1722
1723 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1724 vec![&self.input]
1725 }
1726
1727 fn with_new_children(
1728 self: Arc<Self>,
1729 children: Vec<Arc<dyn ExecutionPlan>>,
1730 ) -> DFResult<Arc<dyn ExecutionPlan>> {
1731 if children.len() != 1 {
1732 return Err(datafusion::error::DataFusionError::Plan(
1733 "GraphTraverseMainExec expects exactly one child".to_string(),
1734 ));
1735 }
1736
1737 Ok(Arc::new(Self {
1738 input: children[0].clone(),
1739 source_column: self.source_column.clone(),
1740 type_names: self.type_names.clone(),
1741 direction: self.direction,
1742 target_variable: self.target_variable.clone(),
1743 edge_variable: self.edge_variable.clone(),
1744 edge_properties: self.edge_properties.clone(),
1745 target_properties: self.target_properties.clone(),
1746 graph_ctx: self.graph_ctx.clone(),
1747 optional: self.optional,
1748 optional_pattern_vars: self.optional_pattern_vars.clone(),
1749 bound_target_column: self.bound_target_column.clone(),
1750 used_edge_columns: self.used_edge_columns.clone(),
1751 schema: self.schema.clone(),
1752 properties: self.properties.clone(),
1753 metrics: self.metrics.clone(),
1754 }))
1755 }
1756
1757 fn execute(
1758 &self,
1759 partition: usize,
1760 context: Arc<TaskContext>,
1761 ) -> DFResult<SendableRecordBatchStream> {
1762 let input_stream = self.input.execute(partition, context)?;
1763 let metrics = BaselineMetrics::new(&self.metrics, partition);
1764
1765 Ok(Box::pin(GraphTraverseMainStream::new(
1766 input_stream,
1767 self.source_column.clone(),
1768 self.type_names.clone(),
1769 self.direction,
1770 self.target_variable.clone(),
1771 self.edge_variable.clone(),
1772 self.edge_properties.clone(),
1773 self.target_properties.clone(),
1774 self.graph_ctx.clone(),
1775 self.optional,
1776 self.optional_pattern_vars.clone(),
1777 self.bound_target_column.clone(),
1778 self.used_edge_columns.clone(),
1779 self.schema.clone(),
1780 metrics,
1781 )))
1782 }
1783
1784 fn metrics(&self) -> Option<MetricsSet> {
1785 Some(self.metrics.clone_inner())
1786 }
1787}
1788
/// States of `GraphTraverseMainStream`, advanced by its `poll_next`.
1789enum GraphTraverseMainState {
/// The adjacency map is still being built; the input stream is held,
/// unpolled, until the future resolves.
1791 LoadingEdges {
/// Future resolving to the adjacency map (or the error that ends the stream).
1793 future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
/// Child stream to read once loading has finished.
1794 input_stream: SendableRecordBatchStream,
1795 },
/// The adjacency map is ready; each input batch is expanded against it.
1796 Processing {
/// Edges loaded for this stream, keyed by the vertex to expand from.
1798 adjacency: EdgeAdjacencyMap,
/// Child stream supplying the batches to expand.
1799 input_stream: SendableRecordBatchStream,
1800 },
/// Terminal state: `poll_next` returns `None`.
1801 Done,
1803}
1804
/// Stream behind `GraphTraverseMainExec`: loads the adjacency map once, then
/// turns every input batch into an expanded output batch.
1805struct GraphTraverseMainStream {
/// Name of the input column holding the source vertex ids.
1807 source_column: String,
1809
/// Prefix used to look up `{target}._vid` / `{target}._labels` in the input.
1810 target_variable: String,
1812
/// When set, the edge type and edge property columns are emitted after the
/// edge id column.
1813 edge_variable: Option<String>,
1815
/// Edge property names to emit; `_all_props` emits all properties as one value.
1816 edge_properties: Vec<String>,
1818
/// Target vertex property names to emit, read from the L0 buffers;
/// `_all_props` emits the merged properties as one value.
1819 target_properties: Vec<String>,
1821
/// Shared execution context (L0 buffers, timeout check).
1822 graph_ctx: Arc<GraphExecutionContext>,
1824
/// When true, unmatched input rows are emitted null-extended.
1825 optional: bool,
1827
/// Passed through to the unmatched-row helpers.
/// NOTE(review): exact semantics live in those helpers; confirm there.
1828 optional_pattern_vars: HashSet<String>,
1830
/// Optional input column of target vertex ids that a neighbor must equal.
1831 bound_target_column: Option<String>,
1833
/// Input columns holding edge ids that must not be traversed again for the row.
1834 used_edge_columns: Vec<String>,
1836
/// Schema of every emitted batch.
1837 schema: SchemaRef,
1839
/// Current state of the stream's state machine.
1840 state: GraphTraverseMainState,
1842
/// Per-partition metrics (compute time, output rows).
1843 metrics: BaselineMetrics,
1845}
1846
1847impl GraphTraverseMainStream {
1848 #[expect(clippy::too_many_arguments)]
1850 fn new(
1851 input_stream: SendableRecordBatchStream,
1852 source_column: String,
1853 type_names: Vec<String>,
1854 direction: Direction,
1855 target_variable: String,
1856 edge_variable: Option<String>,
1857 edge_properties: Vec<String>,
1858 target_properties: Vec<String>,
1859 graph_ctx: Arc<GraphExecutionContext>,
1860 optional: bool,
1861 optional_pattern_vars: HashSet<String>,
1862 bound_target_column: Option<String>,
1863 used_edge_columns: Vec<String>,
1864 schema: SchemaRef,
1865 metrics: BaselineMetrics,
1866 ) -> Self {
1867 let loading_ctx = graph_ctx.clone();
1869 let loading_types = type_names.clone();
1870 let fut =
1871 async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };
1872
1873 Self {
1874 source_column,
1875 target_variable,
1876 edge_variable,
1877 edge_properties,
1878 target_properties,
1879 graph_ctx,
1880 optional,
1881 optional_pattern_vars,
1882 bound_target_column,
1883 used_edge_columns,
1884 schema,
1885 state: GraphTraverseMainState::LoadingEdges {
1886 future: Box::pin(fut),
1887 input_stream,
1888 },
1889 metrics,
1890 }
1891 }
1892
1893 fn expand_batch(
1895 &self,
1896 input: &RecordBatch,
1897 adjacency: &EdgeAdjacencyMap,
1898 ) -> DFResult<RecordBatch> {
1899 let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
1901 datafusion::error::DataFusionError::Execution(format!(
1902 "Source column {} not found",
1903 self.source_column
1904 ))
1905 })?;
1906
1907 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
1908 let source_vids: &UInt64Array = &source_vid_cow;
1909
1910 let bound_target_cow = self
1912 .bound_target_column
1913 .as_ref()
1914 .and_then(|col| input.column_by_name(col))
1915 .map(|c| column_as_vid_array(c.as_ref()))
1916 .transpose()?;
1917 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
1918
1919 let used_edge_arrays: Vec<&UInt64Array> = self
1921 .used_edge_columns
1922 .iter()
1923 .filter_map(|col| {
1924 input
1925 .column_by_name(col)
1926 .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
1927 })
1928 .collect();
1929
1930 type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
1932 let mut expansions: Vec<Expansion> = Vec::new();
1933
1934 for (row_idx, src_u64) in source_vids.iter().enumerate() {
1935 if let Some(src_u64) = src_u64 {
1936 let src_vid = Vid::from(src_u64);
1937
1938 let used_eids: HashSet<u64> = used_edge_arrays
1940 .iter()
1941 .filter_map(|arr| {
1942 if arr.is_null(row_idx) {
1943 None
1944 } else {
1945 Some(arr.value(row_idx))
1946 }
1947 })
1948 .collect();
1949
1950 if let Some(neighbors) = adjacency.get(&src_vid) {
1951 for (target_vid, eid, edge_type, props) in neighbors {
1952 if used_eids.contains(&eid.as_u64()) {
1954 continue;
1955 }
1956
1957 if let Some(targets) = expected_targets {
1960 if targets.is_null(row_idx) {
1961 continue;
1962 }
1963 let expected_vid = targets.value(row_idx);
1964 if target_vid.as_u64() != expected_vid {
1965 continue;
1966 }
1967 }
1968
1969 expansions.push((
1970 row_idx,
1971 *target_vid,
1972 *eid,
1973 edge_type.clone(),
1974 props.clone(),
1975 ));
1976 }
1977 }
1978 }
1979 }
1980
1981 if expansions.is_empty() && self.optional {
1983 let all_indices: Vec<usize> = (0..input.num_rows()).collect();
1985 return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
1986 }
1987
1988 if expansions.is_empty() {
1989 return Ok(RecordBatch::new_empty(self.schema.clone()));
1991 }
1992
1993 let matched_rows: HashSet<usize> = if self.optional {
1995 expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
1996 } else {
1997 HashSet::new()
1998 };
1999
2000 let mut columns: Vec<ArrayRef> = Vec::new();
2002 let indices: Vec<u64> = expansions
2003 .iter()
2004 .map(|(idx, _, _, _, _)| *idx as u64)
2005 .collect();
2006 let indices_array = UInt64Array::from(indices);
2007
2008 for col in input.columns() {
2009 let expanded = take(col.as_ref(), &indices_array, None)?;
2010 columns.push(expanded);
2011 }
2012
2013 let target_vid_name = format!("{}._vid", self.target_variable);
2015 let target_vids: Vec<u64> = expansions
2016 .iter()
2017 .map(|(_, vid, _, _, _)| vid.as_u64())
2018 .collect();
2019 if input.schema().column_with_name(&target_vid_name).is_none() {
2020 columns.push(Arc::new(UInt64Array::from(target_vids)));
2021 }
2022
2023 let target_labels_name = format!("{}._labels", self.target_variable);
2025 if input
2026 .schema()
2027 .column_with_name(&target_labels_name)
2028 .is_none()
2029 {
2030 use arrow_array::builder::{ListBuilder, StringBuilder};
2031 let l0_ctx = self.graph_ctx.l0_context();
2032 let mut labels_builder = ListBuilder::new(StringBuilder::new());
2033 for (_, target_vid, _, _, _) in &expansions {
2034 let mut row_labels: Vec<String> = Vec::new();
2035 for l0 in l0_ctx.iter_l0_buffers() {
2036 let guard = l0.read();
2037 if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
2038 for lbl in l0_labels {
2039 if !row_labels.contains(lbl) {
2040 row_labels.push(lbl.clone());
2041 }
2042 }
2043 }
2044 }
2045 let values = labels_builder.values();
2046 for lbl in &row_labels {
2047 values.append_value(lbl);
2048 }
2049 labels_builder.append(true);
2050 }
2051 columns.push(Arc::new(labels_builder.finish()));
2052 }
2053
2054 if self.edge_variable.is_some() {
2057 let eids: Vec<u64> = expansions
2059 .iter()
2060 .map(|(_, _, eid, _, _)| eid.as_u64())
2061 .collect();
2062 columns.push(Arc::new(UInt64Array::from(eids)));
2063
2064 {
2066 let mut type_builder = arrow_array::builder::StringBuilder::new();
2067 for (_, _, _, edge_type, _) in &expansions {
2068 type_builder.append_value(edge_type);
2069 }
2070 columns.push(Arc::new(type_builder.finish()));
2071 }
2072
2073 for prop_name in &self.edge_properties {
2075 use crate::query::df_graph::scan::encode_cypher_value;
2076 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2077 if prop_name == "_all_props" {
2078 for (_, _, _, _, props) in &expansions {
2080 if props.is_empty() {
2081 builder.append_null();
2082 } else {
2083 let mut json_map = serde_json::Map::new();
2084 for (k, v) in props.iter() {
2085 let json_val: serde_json::Value = v.clone().into();
2086 json_map.insert(k.clone(), json_val);
2087 }
2088 let json = serde_json::Value::Object(json_map);
2089 match encode_cypher_value(&json) {
2090 Ok(bytes) => builder.append_value(bytes),
2091 Err(_) => builder.append_null(),
2092 }
2093 }
2094 }
2095 } else {
2096 for (_, _, _, _, props) in &expansions {
2098 match props.get(prop_name) {
2099 Some(uni_common::Value::Null) | None => builder.append_null(),
2100 Some(val) => {
2101 let json_val: serde_json::Value = val.clone().into();
2102 match encode_cypher_value(&json_val) {
2103 Ok(bytes) => builder.append_value(bytes),
2104 Err(_) => builder.append_null(),
2105 }
2106 }
2107 }
2108 }
2109 }
2110 columns.push(Arc::new(builder.finish()));
2111 }
2112 } else {
2113 let eids: Vec<u64> = expansions
2114 .iter()
2115 .map(|(_, _, eid, _, _)| eid.as_u64())
2116 .collect();
2117 columns.push(Arc::new(UInt64Array::from(eids)));
2118 }
2119
2120 {
2122 use crate::query::df_graph::scan::encode_cypher_value;
2123 let l0_ctx = self.graph_ctx.l0_context();
2124
2125 for prop_name in &self.target_properties {
2126 if prop_name == "_all_props" {
2127 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2129 for (_, target_vid, _, _, _) in &expansions {
2130 let mut merged_props = serde_json::Map::new();
2131 for l0 in l0_ctx.iter_l0_buffers() {
2132 let guard = l0.read();
2133 if let Some(props) = guard.vertex_properties.get(target_vid) {
2134 for (k, v) in props.iter() {
2135 let json_val: serde_json::Value = v.clone().into();
2136 merged_props.insert(k.to_string(), json_val);
2137 }
2138 }
2139 }
2140 if merged_props.is_empty() {
2141 builder.append_null();
2142 } else {
2143 let json = serde_json::Value::Object(merged_props);
2144 match encode_cypher_value(&json) {
2145 Ok(bytes) => builder.append_value(bytes),
2146 Err(_) => builder.append_null(),
2147 }
2148 }
2149 }
2150 columns.push(Arc::new(builder.finish()));
2151 } else {
2152 let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
2154 for (_, target_vid, _, _, _) in &expansions {
2155 let mut found = false;
2156 for l0 in l0_ctx.iter_l0_buffers() {
2157 let guard = l0.read();
2158 if let Some(props) = guard.vertex_properties.get(target_vid)
2159 && let Some(val) = props.get(prop_name.as_str())
2160 && !val.is_null()
2161 {
2162 let json_val: serde_json::Value = val.clone().into();
2163 if let Ok(bytes) = encode_cypher_value(&json_val) {
2164 builder.append_value(bytes);
2165 found = true;
2166 break;
2167 }
2168 }
2169 }
2170 if !found {
2171 builder.append_null();
2172 }
2173 }
2174 columns.push(Arc::new(builder.finish()));
2175 }
2176 }
2177 }
2178
2179 let matched_batch =
2180 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
2181
2182 if self.optional {
2184 let unmatched = collect_unmatched_optional_group_rows(
2185 input,
2186 &matched_rows,
2187 &self.schema,
2188 &self.optional_pattern_vars,
2189 )?;
2190
2191 if unmatched.is_empty() {
2192 return Ok(matched_batch);
2193 }
2194
2195 let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
2196 input,
2197 &unmatched,
2198 &self.schema,
2199 &self.optional_pattern_vars,
2200 )?;
2201
2202 use arrow::compute::concat_batches;
2204 concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
2205 } else {
2206 Ok(matched_batch)
2207 }
2208 }
2209}
2210
2211impl GraphExecutionContext {
2212 fn record_edge_adjacency(&self, adjacency: &EdgeAdjacencyMap) {
2223 let Some(tx_l0) = &self.l0_context.transaction_l0 else {
2224 return;
2225 };
2226 let guard = tx_l0.read();
2227 let Some(read_set) = &guard.occ_read_set else {
2228 return;
2229 };
2230 let mut rs = read_set.lock();
2231 for (src, neighbors) in adjacency {
2232 rs.vertices.insert(*src);
2233 for (nbr, eid, _type, _props) in neighbors {
2234 rs.vertices.insert(*nbr);
2235 rs.edges.insert(*eid);
2236 }
2237 }
2238 }
2239}
2240
2241async fn build_edge_adjacency_map(
2247 graph_ctx: &GraphExecutionContext,
2248 type_names: &[String],
2249 direction: Direction,
2250) -> DFResult<EdgeAdjacencyMap> {
2251 let storage = graph_ctx.storage();
2252 let l0_ctx = graph_ctx.l0_context();
2253
2254 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2256 let edges_with_type = storage
2257 .find_edges_by_type_names(&type_refs)
2258 .await
2259 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2260
2261 let mut edges: Vec<(
2263 uni_common::Eid,
2264 uni_common::Vid,
2265 uni_common::Vid,
2266 String,
2267 uni_common::Properties,
2268 )> = edges_with_type.into_iter().collect();
2269
2270 for l0 in l0_ctx.iter_l0_buffers() {
2272 let l0_guard = l0.read();
2273
2274 for type_name in type_names {
2275 let l0_eids = l0_guard.eids_for_type(type_name);
2276
2277 for &eid in &l0_eids {
2279 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2280 let src_vid = edge_ref.src_vid;
2281 let dst_vid = edge_ref.dst_vid;
2282
2283 let props = l0_guard
2285 .edge_properties
2286 .get(&eid)
2287 .cloned()
2288 .unwrap_or_default();
2289
2290 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2291 }
2292 }
2293 }
2294 }
2295
2296 let mut seen_eids = HashSet::new();
2298 let mut unique_edges = Vec::new();
2299 for edge in edges.into_iter().rev() {
2300 if seen_eids.insert(edge.0) {
2301 unique_edges.push(edge);
2302 }
2303 }
2304 unique_edges.reverse();
2305
2306 let mut tombstoned_eids = HashSet::new();
2308 for l0 in l0_ctx.iter_l0_buffers() {
2309 let l0_guard = l0.read();
2310 for eid in l0_guard.tombstones.keys() {
2311 tombstoned_eids.insert(*eid);
2312 }
2313 }
2314 if !tombstoned_eids.is_empty() {
2315 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2316 }
2317
2318 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2320
2321 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2322 match direction {
2323 Direction::Outgoing => {
2324 adjacency
2325 .entry(src_vid)
2326 .or_default()
2327 .push((dst_vid, eid, edge_type, props));
2328 }
2329 Direction::Incoming => {
2330 adjacency
2331 .entry(dst_vid)
2332 .or_default()
2333 .push((src_vid, eid, edge_type, props));
2334 }
2335 Direction::Both => {
2336 adjacency.entry(src_vid).or_default().push((
2337 dst_vid,
2338 eid,
2339 edge_type.clone(),
2340 props.clone(),
2341 ));
2342 adjacency
2343 .entry(dst_vid)
2344 .or_default()
2345 .push((src_vid, eid, edge_type, props));
2346 }
2347 }
2348 }
2349
2350 graph_ctx.record_edge_adjacency(&adjacency);
2353
2354 Ok(adjacency)
2355}
2356
2357impl Stream for GraphTraverseMainStream {
2358 type Item = DFResult<RecordBatch>;
2359
2360 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
2361 let metrics = self.metrics.clone();
2362 let _timer = metrics.elapsed_compute().timer();
2363 loop {
2364 let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
2365
2366 match state {
2367 GraphTraverseMainState::LoadingEdges {
2368 mut future,
2369 input_stream,
2370 } => match future.as_mut().poll(cx) {
2371 Poll::Ready(Ok(adjacency)) => {
2372 self.state = GraphTraverseMainState::Processing {
2374 adjacency,
2375 input_stream,
2376 };
2377 }
2379 Poll::Ready(Err(e)) => {
2380 self.state = GraphTraverseMainState::Done;
2381 return Poll::Ready(Some(Err(e)));
2382 }
2383 Poll::Pending => {
2384 self.state = GraphTraverseMainState::LoadingEdges {
2385 future,
2386 input_stream,
2387 };
2388 return Poll::Pending;
2389 }
2390 },
2391 GraphTraverseMainState::Processing {
2392 adjacency,
2393 mut input_stream,
2394 } => {
2395 if let Err(e) = self.graph_ctx.check_timeout() {
2397 return Poll::Ready(Some(Err(
2398 datafusion::error::DataFusionError::Execution(e.to_string()),
2399 )));
2400 }
2401
2402 match input_stream.poll_next_unpin(cx) {
2403 Poll::Ready(Some(Ok(batch))) => {
2404 let result = self.expand_batch(&batch, &adjacency);
2406
2407 self.state = GraphTraverseMainState::Processing {
2408 adjacency,
2409 input_stream,
2410 };
2411
2412 if let Ok(ref r) = result {
2413 self.metrics.record_output(r.num_rows());
2414 }
2415 return Poll::Ready(Some(result));
2416 }
2417 Poll::Ready(Some(Err(e))) => {
2418 self.state = GraphTraverseMainState::Done;
2419 return Poll::Ready(Some(Err(e)));
2420 }
2421 Poll::Ready(None) => {
2422 self.state = GraphTraverseMainState::Done;
2423 return Poll::Ready(None);
2424 }
2425 Poll::Pending => {
2426 self.state = GraphTraverseMainState::Processing {
2427 adjacency,
2428 input_stream,
2429 };
2430 return Poll::Pending;
2431 }
2432 }
2433 }
2434 GraphTraverseMainState::Done => {
2435 return Poll::Ready(None);
2436 }
2437 }
2438 }
2439 }
2440}
2441
2442impl RecordBatchStream for GraphTraverseMainStream {
2443 fn schema(&self) -> SchemaRef {
2444 self.schema.clone()
2445 }
2446}
2447
/// Physical plan node for a variable-length traversal between `min_hops` and
/// `max_hops` edges. The traversal itself is implemented outside the part of
/// the file documented here; the field notes below state only what the
/// constructor and schema builder show.
2448pub struct GraphVariableLengthTraverseExec {
/// Child plan supplying the rows to expand.
2469 input: Arc<dyn ExecutionPlan>,
2471
/// Name of the input column the traversal starts from (shown in the plan display).
2472 source_column: String,
2474
/// Edge type ids handed to `PathNfa::from_vlp` when no explicit NFA is given.
2475 edge_type_ids: Vec<u32>,
2477
/// Traversal direction handed to `PathNfa::from_vlp`.
2478 direction: Direction,
2480
/// Lower hop bound handed to `PathNfa::from_vlp`.
2481 min_hops: usize,
2483
/// Upper hop bound handed to `PathNfa::from_vlp`.
2484 max_hops: usize,
2486
/// Prefix of the target columns: `{var}._vid`, `{var}._labels`, `{var}.{prop}`.
2487 target_variable: String,
2489
/// When set, an edge-list column built by `build_edge_list_field` is emitted.
2490 step_variable: Option<String>,
2492
/// When set and not already an input column, a path struct column built by
/// `build_path_struct_field` is emitted.
2493 path_variable: Option<String>,
2495
/// Target vertex property names emitted as `{target}.{prop}` unless the
/// input already has that column.
2496 target_properties: Vec<String>,
2498
/// Label whose property metadata is used to resolve target column types.
2499 target_label_name: Option<String>,
2501
/// NOTE(review): presumably selects optional-match behavior; not used in
/// the visible part of the file.
2502 is_optional: bool,
2504
/// NOTE(review): presumably an input column constraining the end vertex,
/// as in `GraphTraverseMainExec`; confirm in the stream implementation.
2505 bound_target_column: Option<String>,
2507
/// NOTE(review): filter expression for edges; how it is applied is not
/// visible here.
2508 edge_lance_filter: Option<String>,
2510
/// NOTE(review): `(property name, value)` pairs, presumably equality
/// conditions on edge properties; confirm at the use site.
2511 edge_property_conditions: Vec<(String, UniValue)>,
2514
/// NOTE(review): presumably input columns of already-bound edge ids, as in
/// `GraphTraverseMainExec`; confirm at the use site.
2515 used_edge_columns: Vec<String>,
2517
/// Path mode forwarded from the constructor; semantics defined in `super::nfa`.
2518 path_mode: super::nfa::PathMode,
2520
/// Output mode forwarded from the constructor; semantics defined in `super::nfa`.
2521 output_mode: super::nfa::VlpOutputMode,
2523
/// The NFA passed to the constructor, or one built by `PathNfa::from_vlp`
/// from the type ids, direction and hop bounds.
2524 nfa: Arc<PathNfa>,
2526
/// Shared execution context; the constructor reads the storage schema from it.
2527 graph_ctx: Arc<GraphExecutionContext>,
2529
/// Output schema produced by `build_schema`.
2530 schema: SchemaRef,
2532
/// Plan properties derived from `schema` via `compute_plan_properties`.
2533 properties: Arc<PlanProperties>,
2535
/// Metrics registry for this plan node.
2536 metrics: ExecutionPlanMetricsSet,
2538}
2539
2540impl fmt::Debug for GraphVariableLengthTraverseExec {
2541 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2542 f.debug_struct("GraphVariableLengthTraverseExec")
2543 .field("source_column", &self.source_column)
2544 .field("edge_type_ids", &self.edge_type_ids)
2545 .field("direction", &self.direction)
2546 .field("min_hops", &self.min_hops)
2547 .field("max_hops", &self.max_hops)
2548 .field("target_variable", &self.target_variable)
2549 .finish()
2550 }
2551}
2552
2553impl GraphVariableLengthTraverseExec {
2554 #[expect(clippy::too_many_arguments)]
2560 pub fn new(
2561 input: Arc<dyn ExecutionPlan>,
2562 source_column: impl Into<String>,
2563 edge_type_ids: Vec<u32>,
2564 direction: Direction,
2565 min_hops: usize,
2566 max_hops: usize,
2567 target_variable: impl Into<String>,
2568 step_variable: Option<String>,
2569 path_variable: Option<String>,
2570 target_properties: Vec<String>,
2571 target_label_name: Option<String>,
2572 graph_ctx: Arc<GraphExecutionContext>,
2573 is_optional: bool,
2574 bound_target_column: Option<String>,
2575 edge_lance_filter: Option<String>,
2576 edge_property_conditions: Vec<(String, UniValue)>,
2577 used_edge_columns: Vec<String>,
2578 path_mode: super::nfa::PathMode,
2579 output_mode: super::nfa::VlpOutputMode,
2580 qpp_nfa: Option<PathNfa>,
2581 ) -> Self {
2582 let source_column = source_column.into();
2583 let target_variable = target_variable.into();
2584
2585 let uni_schema = graph_ctx.storage().schema_manager().schema();
2587 let label_props = target_label_name
2588 .as_deref()
2589 .and_then(|ln| uni_schema.properties.get(ln));
2590
2591 let schema = Self::build_schema(
2593 input.schema(),
2594 &target_variable,
2595 step_variable.as_deref(),
2596 path_variable.as_deref(),
2597 &target_properties,
2598 label_props,
2599 );
2600 let properties = compute_plan_properties(schema.clone());
2601
2602 let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
2604 PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
2605 }));
2606
2607 Self {
2608 input,
2609 source_column,
2610 edge_type_ids,
2611 direction,
2612 min_hops,
2613 max_hops,
2614 target_variable,
2615 step_variable,
2616 path_variable,
2617 target_properties,
2618 target_label_name,
2619 is_optional,
2620 bound_target_column,
2621 edge_lance_filter,
2622 edge_property_conditions,
2623 used_edge_columns,
2624 path_mode,
2625 output_mode,
2626 nfa,
2627 graph_ctx,
2628 schema,
2629 properties,
2630 metrics: ExecutionPlanMetricsSet::new(),
2631 }
2632 }
2633
2634 fn build_schema(
2636 input_schema: SchemaRef,
2637 target_variable: &str,
2638 step_variable: Option<&str>,
2639 path_variable: Option<&str>,
2640 target_properties: &[String],
2641 label_props: Option<
2642 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
2643 >,
2644 ) -> SchemaRef {
2645 let mut fields: Vec<Field> = input_schema
2646 .fields()
2647 .iter()
2648 .map(|f| f.as_ref().clone())
2649 .collect();
2650
2651 let target_vid_name = format!("{}._vid", target_variable);
2653 if input_schema.column_with_name(&target_vid_name).is_none() {
2654 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
2655 }
2656
2657 let target_labels_name = format!("{}._labels", target_variable);
2659 if input_schema.column_with_name(&target_labels_name).is_none() {
2660 fields.push(Field::new(target_labels_name, labels_data_type(), true));
2661 }
2662
2663 for prop_name in target_properties {
2665 let col_name = format!("{}.{}", target_variable, prop_name);
2666 if input_schema.column_with_name(&col_name).is_none() {
2667 let arrow_type = resolve_property_type(prop_name, label_props);
2668 fields.push(Field::new(&col_name, arrow_type, true));
2669 }
2670 }
2671
2672 fields.push(Field::new("_hop_count", DataType::UInt64, false));
2674
2675 if let Some(step_var) = step_variable {
2677 fields.push(build_edge_list_field(step_var));
2678 }
2679
2680 if let Some(path_var) = path_variable
2682 && input_schema.column_with_name(path_var).is_none()
2683 {
2684 fields.push(build_path_struct_field(path_var));
2685 }
2686
2687 Arc::new(Schema::new(fields))
2688 }
2689}
2690
2691impl DisplayAs for GraphVariableLengthTraverseExec {
2692 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2693 write!(
2694 f,
2695 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2696 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2697 )
2698 }
2699}
2700
2701impl ExecutionPlan for GraphVariableLengthTraverseExec {
2702 fn name(&self) -> &str {
2703 "GraphVariableLengthTraverseExec"
2704 }
2705
2706 fn as_any(&self) -> &dyn Any {
2707 self
2708 }
2709
2710 fn schema(&self) -> SchemaRef {
2711 self.schema.clone()
2712 }
2713
2714 fn properties(&self) -> &Arc<PlanProperties> {
2715 &self.properties
2716 }
2717
2718 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
2719 vec![&self.input]
2720 }
2721
2722 fn with_new_children(
2723 self: Arc<Self>,
2724 children: Vec<Arc<dyn ExecutionPlan>>,
2725 ) -> DFResult<Arc<dyn ExecutionPlan>> {
2726 if children.len() != 1 {
2727 return Err(datafusion::error::DataFusionError::Plan(
2728 "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
2729 ));
2730 }
2731
2732 Ok(Arc::new(Self::new(
2734 children[0].clone(),
2735 self.source_column.clone(),
2736 self.edge_type_ids.clone(),
2737 self.direction,
2738 self.min_hops,
2739 self.max_hops,
2740 self.target_variable.clone(),
2741 self.step_variable.clone(),
2742 self.path_variable.clone(),
2743 self.target_properties.clone(),
2744 self.target_label_name.clone(),
2745 self.graph_ctx.clone(),
2746 self.is_optional,
2747 self.bound_target_column.clone(),
2748 self.edge_lance_filter.clone(),
2749 self.edge_property_conditions.clone(),
2750 self.used_edge_columns.clone(),
2751 self.path_mode.clone(),
2752 self.output_mode.clone(),
2753 Some((*self.nfa).clone()),
2754 )))
2755 }
2756
2757 fn execute(
2758 &self,
2759 partition: usize,
2760 context: Arc<TaskContext>,
2761 ) -> DFResult<SendableRecordBatchStream> {
2762 let input_stream = self.input.execute(partition, context)?;
2763
2764 let metrics = BaselineMetrics::new(&self.metrics, partition);
2765
2766 let warm_fut = self
2767 .graph_ctx
2768 .warming_future(self.edge_type_ids.clone(), self.direction);
2769
2770 Ok(Box::pin(GraphVariableLengthTraverseStream {
2771 input: input_stream,
2772 exec: Arc::new(self.clone_for_stream()),
2773 schema: self.schema.clone(),
2774 state: VarLengthStreamState::Warming(warm_fut),
2775 metrics,
2776 }))
2777 }
2778
2779 fn metrics(&self) -> Option<MetricsSet> {
2780 Some(self.metrics.clone_inner())
2781 }
2782}
2783
2784impl GraphVariableLengthTraverseExec {
2785 fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
2787 GraphVariableLengthTraverseExecData {
2788 source_column: self.source_column.clone(),
2789 edge_type_ids: self.edge_type_ids.clone(),
2790 direction: self.direction,
2791 min_hops: self.min_hops,
2792 max_hops: self.max_hops,
2793 target_variable: self.target_variable.clone(),
2794 step_variable: self.step_variable.clone(),
2795 path_variable: self.path_variable.clone(),
2796 target_properties: self.target_properties.clone(),
2797 target_label_name: self.target_label_name.clone(),
2798 is_optional: self.is_optional,
2799 bound_target_column: self.bound_target_column.clone(),
2800 edge_lance_filter: self.edge_lance_filter.clone(),
2801 edge_property_conditions: self.edge_property_conditions.clone(),
2802 used_edge_columns: self.used_edge_columns.clone(),
2803 path_mode: self.path_mode.clone(),
2804 output_mode: self.output_mode.clone(),
2805 nfa: self.nfa.clone(),
2806 graph_ctx: self.graph_ctx.clone(),
2807 }
2808 }
2809}
2810
/// Traversal settings owned by a running stream: a copy of the operator's
/// configuration plus shared handles to the NFA and the graph context.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    /// Input column from which the source vertex ids are read.
    source_column: String,
    /// Edge type ids of the traversal.
    edge_type_ids: Vec<u32>,
    /// Traversal direction; `Direction::Both` turns on per-transition edge
    /// de-duplication during neighbor expansion.
    direction: Direction,
    /// Lower hop bound.
    min_hops: usize,
    /// Upper hop bound; the BFS loops stop once this depth is reached.
    max_hops: usize,
    /// Variable name used as prefix for the target columns
    /// (`{target}._vid`, `{target}._labels`, `{target}.{prop}`).
    target_variable: String,
    /// When set, an edge-list column is emitted for every output row.
    step_variable: Option<String>,
    /// When set, a path struct column is emitted (or an existing one extended).
    path_variable: Option<String>,
    /// Target properties to expose as columns and hydrate after expansion.
    target_properties: Vec<String>,
    /// Optional label the reached target vertices must carry.
    target_label_name: Option<String>,
    /// When true, input rows without any match still produce one output row
    /// with a null target.
    is_optional: bool,
    /// Optional input column holding the only target vid accepted per row.
    bound_target_column: Option<String>,
    /// Edge filter expression; not read by the code in this struct's impl.
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// `(property, value)` equality conditions checked on each candidate edge.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns whose per-row edge ids are excluded from the expansion.
    used_edge_columns: Vec<String>,
    /// Path mode handed to the predecessor DAG when enumerating paths.
    path_mode: super::nfa::PathMode,
    /// Selects between the endpoints-only BFS and the full path BFS.
    output_mode: super::nfa::VlpOutputMode,
    /// Automaton driving the traversal (transitions, accepting states,
    /// per-state vertex constraints).
    nfa: Arc<PathNfa>,
    /// Shared graph execution context (neighbors, query context, timeout).
    graph_ctx: Arc<GraphExecutionContext>,
}
2839
/// Frontier cap: the BFS loops stop expanding as soon as a freshly built
/// frontier holds more than this many `(vertex, NFA state)` entries.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Predecessor-pool cap: the BFS loops stop expanding as soon as the
/// predecessor DAG's `pool_len()` exceeds this value.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2844
2845impl GraphVariableLengthTraverseExecData {
2846 fn check_target_label(&self, vid: Vid) -> bool {
2848 if let Some(ref label_name) = self.target_label_name {
2849 let query_ctx = self.graph_ctx.query_context();
2850 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2851 Some(labels) => labels.contains(label_name),
2852 None => true, }
2854 } else {
2855 true
2856 }
2857 }
2858
2859 fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
2861 match constraint {
2862 super::nfa::VertexConstraint::Label(label_name) => {
2863 let query_ctx = self.graph_ctx.query_context();
2864 match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
2865 Some(labels) => labels.contains(label_name),
2866 None => true, }
2868 }
2869 }
2870 }
2871
2872 fn expand_neighbors(
2875 &self,
2876 vid: Vid,
2877 state: NfaStateId,
2878 eid_filter: &EidFilter,
2879 used_eids: &FxHashSet<u64>,
2880 ) -> Vec<(Vid, Eid, NfaStateId)> {
2881 let is_undirected = matches!(self.direction, Direction::Both);
2882 let mut results = Vec::new();
2883
2884 for transition in self.nfa.transitions_from(state) {
2885 let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
2886
2887 for &etype in &transition.edge_type_ids {
2888 for (neighbor, eid) in
2889 self.graph_ctx
2890 .get_neighbors(vid, etype, transition.direction)
2891 {
2892 if is_undirected && !seen_edges.insert(eid.as_u64()) {
2894 continue;
2895 }
2896
2897 if !eid_filter.contains(eid) {
2899 continue;
2900 }
2901
2902 if !self.edge_property_conditions.is_empty() {
2904 let query_ctx = self.graph_ctx.query_context();
2905 let passes = if let Some(props) =
2906 l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
2907 {
2908 self.edge_property_conditions
2909 .iter()
2910 .all(|(name, expected)| {
2911 props.get(name).is_some_and(|actual| actual == expected)
2912 })
2913 } else {
2914 true
2918 };
2919 if !passes {
2920 continue;
2921 }
2922 }
2923
2924 if used_eids.contains(&eid.as_u64()) {
2926 continue;
2927 }
2928
2929 if let Some(constraint) = self.nfa.state_constraint(transition.to)
2931 && !self.check_state_constraint(neighbor, constraint)
2932 {
2933 continue;
2934 }
2935
2936 results.push((neighbor, eid, transition.to));
2937 }
2938 }
2939 }
2940
2941 results
2942 }
2943
2944 fn bfs_with_dag(
2949 &self,
2950 source: Vid,
2951 eid_filter: &EidFilter,
2952 used_eids: &FxHashSet<u64>,
2953 vid_filter: &VidFilter,
2954 ) -> Vec<BfsResult> {
2955 let nfa = &self.nfa;
2956 let selector = PathSelector::All;
2957 let mut dag = PredecessorDag::new(selector);
2958 let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
2959
2960 if nfa.is_accepting(nfa.start_state())
2962 && self.check_target_label(source)
2963 && vid_filter.contains(source)
2964 {
2965 accepting.push((source, nfa.start_state(), 0));
2966 }
2967
2968 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
2970 let mut depth: u32 = 0;
2971
2972 while !frontier.is_empty() && depth < self.max_hops as u32 {
2973 depth += 1;
2974 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
2975 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
2976
2977 for &(vid, state) in &frontier {
2978 for (neighbor, eid, dst_state) in
2979 self.expand_neighbors(vid, state, eid_filter, used_eids)
2980 {
2981 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
2983
2984 if seen_at_depth.insert((neighbor, dst_state)) {
2986 next_frontier.push((neighbor, dst_state));
2987
2988 if nfa.is_accepting(dst_state)
2990 && self.check_target_label(neighbor)
2991 && vid_filter.contains(neighbor)
2992 {
2993 accepting.push((neighbor, dst_state, depth));
2994 }
2995 }
2996 }
2997 }
2998
2999 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3001 break;
3002 }
3003
3004 frontier = next_frontier;
3005 }
3006
3007 let mut results: Vec<BfsResult> = Vec::new();
3009 for &(target, state, depth) in &accepting {
3010 dag.enumerate_paths(
3011 source,
3012 target,
3013 state,
3014 depth,
3015 depth,
3016 &self.path_mode,
3017 &mut |nodes, edges| {
3018 results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
3019 std::ops::ControlFlow::Continue(())
3020 },
3021 );
3022 }
3023
3024 results
3025 }
3026
3027 fn bfs_endpoints_only(
3032 &self,
3033 source: Vid,
3034 eid_filter: &EidFilter,
3035 used_eids: &FxHashSet<u64>,
3036 vid_filter: &VidFilter,
3037 ) -> Vec<(Vid, u32)> {
3038 let nfa = &self.nfa;
3039 let selector = PathSelector::Any; let mut dag = PredecessorDag::new(selector);
3041 let mut results: Vec<(Vid, u32)> = Vec::new();
3042
3043 if nfa.is_accepting(nfa.start_state())
3045 && self.check_target_label(source)
3046 && vid_filter.contains(source)
3047 {
3048 results.push((source, 0));
3049 }
3050
3051 let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
3053 let mut depth: u32 = 0;
3054
3055 while !frontier.is_empty() && depth < self.max_hops as u32 {
3056 depth += 1;
3057 let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
3058 let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
3059
3060 for &(vid, state) in &frontier {
3061 for (neighbor, eid, dst_state) in
3062 self.expand_neighbors(vid, state, eid_filter, used_eids)
3063 {
3064 dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
3065
3066 if seen_at_depth.insert((neighbor, dst_state)) {
3067 next_frontier.push((neighbor, dst_state));
3068
3069 if nfa.is_accepting(dst_state)
3071 && self.check_target_label(neighbor)
3072 && vid_filter.contains(neighbor)
3073 && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
3074 {
3075 results.push((neighbor, depth));
3076 }
3077 }
3078 }
3079 }
3080
3081 if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
3082 break;
3083 }
3084
3085 frontier = next_frontier;
3086 }
3087
3088 results
3089 }
3090}
3091
/// State machine driven by `GraphVariableLengthTraverseStream::poll_next`.
enum VarLengthStreamState {
    /// Waiting for the warming future; on success the stream moves to `Reading`.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Polling the input stream for the next batch to expand.
    Reading,
    /// Waiting for the future that fills in target property columns of an
    /// already expanded batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state: the stream yields `None` from here on.
    Done,
}
3103
/// Record batch stream that expands each input batch through the
/// variable-length traversal.
struct GraphVariableLengthTraverseStream {
    /// Upstream batches providing the source vertices.
    input: SendableRecordBatchStream,
    /// Traversal settings and shared handles used for the expansion.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Schema of the batches this stream emits.
    schema: SchemaRef,
    /// Current position in the warming / reading / materializing cycle.
    state: VarLengthStreamState,
    /// Baseline metrics (compute time, output rows) for this partition.
    metrics: BaselineMetrics,
}
3112
3113impl Stream for GraphVariableLengthTraverseStream {
3114 type Item = DFResult<RecordBatch>;
3115
3116 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3117 let metrics = self.metrics.clone();
3118 let _timer = metrics.elapsed_compute().timer();
3119 loop {
3120 let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
3121
3122 match state {
3123 VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
3124 Poll::Ready(Ok(())) => {
3125 self.state = VarLengthStreamState::Reading;
3126 }
3128 Poll::Ready(Err(e)) => {
3129 self.state = VarLengthStreamState::Done;
3130 return Poll::Ready(Some(Err(e)));
3131 }
3132 Poll::Pending => {
3133 self.state = VarLengthStreamState::Warming(fut);
3134 return Poll::Pending;
3135 }
3136 },
3137 VarLengthStreamState::Reading => {
3138 if let Err(e) = self.exec.graph_ctx.check_timeout() {
3140 return Poll::Ready(Some(Err(
3141 datafusion::error::DataFusionError::Execution(e.to_string()),
3142 )));
3143 }
3144
3145 match self.input.poll_next_unpin(cx) {
3146 Poll::Ready(Some(Ok(batch))) => {
3147 let eid_filter = EidFilter::AllAllowed;
3150 let vid_filter = VidFilter::AllAllowed;
3151 let base_result =
3152 self.process_batch_base(batch, &eid_filter, &vid_filter);
3153 let base_batch = match base_result {
3154 Ok(b) => b,
3155 Err(e) => {
3156 self.state = VarLengthStreamState::Reading;
3157 return Poll::Ready(Some(Err(e)));
3158 }
3159 };
3160
3161 if self.exec.target_properties.is_empty() {
3163 self.state = VarLengthStreamState::Reading;
3164 return Poll::Ready(Some(Ok(base_batch)));
3165 }
3166
3167 let schema = self.schema.clone();
3169 let target_variable = self.exec.target_variable.clone();
3170 let target_properties = self.exec.target_properties.clone();
3171 let target_label_name = self.exec.target_label_name.clone();
3172 let graph_ctx = self.exec.graph_ctx.clone();
3173
3174 let fut = hydrate_vlp_target_properties(
3175 base_batch,
3176 schema,
3177 target_variable,
3178 target_properties,
3179 target_label_name,
3180 graph_ctx,
3181 );
3182
3183 self.state = VarLengthStreamState::Materializing(Box::pin(fut));
3184 }
3186 Poll::Ready(Some(Err(e))) => {
3187 self.state = VarLengthStreamState::Done;
3188 return Poll::Ready(Some(Err(e)));
3189 }
3190 Poll::Ready(None) => {
3191 self.state = VarLengthStreamState::Done;
3192 return Poll::Ready(None);
3193 }
3194 Poll::Pending => {
3195 self.state = VarLengthStreamState::Reading;
3196 return Poll::Pending;
3197 }
3198 }
3199 }
3200 VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
3201 Poll::Ready(Ok(batch)) => {
3202 self.state = VarLengthStreamState::Reading;
3203 self.metrics.record_output(batch.num_rows());
3204 return Poll::Ready(Some(Ok(batch)));
3205 }
3206 Poll::Ready(Err(e)) => {
3207 self.state = VarLengthStreamState::Done;
3208 return Poll::Ready(Some(Err(e)));
3209 }
3210 Poll::Pending => {
3211 self.state = VarLengthStreamState::Materializing(fut);
3212 return Poll::Pending;
3213 }
3214 },
3215 VarLengthStreamState::Done => {
3216 return Poll::Ready(None);
3217 }
3218 }
3219 }
3220 }
3221}
3222
3223impl GraphVariableLengthTraverseStream {
3224 fn process_batch_base(
3225 &self,
3226 batch: RecordBatch,
3227 eid_filter: &EidFilter,
3228 vid_filter: &VidFilter,
3229 ) -> DFResult<RecordBatch> {
3230 let source_col = batch
3231 .column_by_name(&self.exec.source_column)
3232 .ok_or_else(|| {
3233 datafusion::error::DataFusionError::Execution(format!(
3234 "Source column '{}' not found",
3235 self.exec.source_column
3236 ))
3237 })?;
3238
3239 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
3240 let source_vids: &UInt64Array = &source_vid_cow;
3241
3242 let bound_target_cow = self
3244 .exec
3245 .bound_target_column
3246 .as_ref()
3247 .and_then(|col| batch.column_by_name(col))
3248 .map(|c| column_as_vid_array(c.as_ref()))
3249 .transpose()?;
3250 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
3251
3252 let used_edge_arrays: Vec<&UInt64Array> = self
3254 .exec
3255 .used_edge_columns
3256 .iter()
3257 .filter_map(|col| {
3258 batch
3259 .column_by_name(col)?
3260 .as_any()
3261 .downcast_ref::<UInt64Array>()
3262 })
3263 .collect();
3264
3265 let mut expansions: Vec<VarLengthExpansion> = Vec::new();
3267
3268 for (row_idx, source_vid) in source_vids.iter().enumerate() {
3269 let mut emitted_for_row = false;
3270
3271 if let Some(src) = source_vid {
3272 let vid = Vid::from(src);
3273
3274 let used_eids: FxHashSet<u64> = used_edge_arrays
3276 .iter()
3277 .filter_map(|arr| {
3278 if arr.is_null(row_idx) {
3279 None
3280 } else {
3281 Some(arr.value(row_idx))
3282 }
3283 })
3284 .collect();
3285
3286 match &self.exec.output_mode {
3288 VlpOutputMode::EndpointsOnly => {
3289 let endpoints = self
3290 .exec
3291 .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
3292 for (target, depth) in endpoints {
3293 if let Some(targets) = expected_targets {
3295 if targets.is_null(row_idx) {
3296 continue;
3297 }
3298 if target.as_u64() != targets.value(row_idx) {
3299 continue;
3300 }
3301 }
3302 expansions.push((row_idx, target, depth as usize, vec![], vec![]));
3303 emitted_for_row = true;
3304 }
3305 }
3306 _ => {
3307 let bfs_results = self
3309 .exec
3310 .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
3311 for (target, hop_count, node_path, edge_path) in bfs_results {
3312 if let Some(targets) = expected_targets {
3314 if targets.is_null(row_idx) {
3315 continue;
3316 }
3317 if target.as_u64() != targets.value(row_idx) {
3318 continue;
3319 }
3320 }
3321 expansions.push((row_idx, target, hop_count, node_path, edge_path));
3322 emitted_for_row = true;
3323 }
3324 }
3325 }
3326 }
3327
3328 if self.exec.is_optional && !emitted_for_row {
3329 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
3332 }
3333 }
3334
3335 self.build_output_batch(&batch, &expansions)
3336 }
3337
/// Turns the collected expansions into an output record batch.
///
/// Each expansion is `(input row index, target vid, hop count, node path,
/// edge path)`. Columns are appended in the same order in which the output
/// schema was built: replicated input columns, target vid, target labels,
/// null placeholders for target properties, hop count, the optional step
/// edge list and the optional path struct.
///
/// # Errors
/// Propagates Arrow errors from `take`, from assembling the path struct and
/// from the final `RecordBatch::try_new`.
fn build_output_batch(
    &self,
    input: &RecordBatch,
    expansions: &[VarLengthExpansion],
) -> DFResult<RecordBatch> {
    // No expansions at all: emit an empty batch with the output schema.
    if expansions.is_empty() {
        return Ok(RecordBatch::new_empty(self.schema.clone()));
    }

    let num_rows = expansions.len();

    // Input row index for every output row, used to replicate input columns.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);

    let mut columns: Vec<ArrayRef> = Vec::new();
    for col in input.columns() {
        let expanded = take(col.as_ref(), &indices_array, None)?;
        columns.push(expanded);
    }

    // The `u64::MAX` vid marks a row without a match; its target vid
    // becomes null.
    let unmatched_rows: Vec<bool> = expansions
        .iter()
        .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
        .collect();
    let target_vids: Vec<Option<u64>> = expansions
        .iter()
        .zip(unmatched_rows.iter())
        .map(
            |((_, vid, _, _, _), unmatched)| {
                if *unmatched { None } else { Some(vid.as_u64()) }
            },
        )
        .collect();

    // Target vid column, unless the input already carries one of that name.
    let target_vid_name = format!("{}._vid", self.exec.target_variable);
    if input.schema().column_with_name(&target_vid_name).is_none() {
        columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
    }

    // Target labels column, unless the input already carries one of that name.
    let target_labels_name = format!("{}._labels", self.exec.target_variable);
    if input
        .schema()
        .column_with_name(&target_labels_name)
        .is_none()
    {
        use arrow_array::builder::{ListBuilder, StringBuilder};
        let query_ctx = self.exec.graph_ctx.query_context();
        let mut labels_builder = ListBuilder::new(StringBuilder::new());
        for target_vid in &target_vids {
            // Unmatched row: null label list.
            let Some(vid_u64) = target_vid else {
                labels_builder.append(false);
                continue;
            };
            let vid = Vid::from(*vid_u64);
            let row_labels: Vec<String> =
                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                    Some(labels) => {
                        labels
                    }
                    None => {
                        // Lookup returned nothing: fall back to the
                        // configured target label, or to an empty list.
                        if let Some(ref label_name) = self.exec.target_label_name {
                            vec![label_name.clone()]
                        } else {
                            vec![]
                        }
                    }
                };
            let values = labels_builder.values();
            for lbl in &row_labels {
                values.append_value(lbl);
            }
            labels_builder.append(true);
        }
        columns.push(Arc::new(labels_builder.finish()));
    }

    // Null placeholder per target property column not present in the input;
    // its type comes from the schema field at the current column position.
    for prop_name in &self.exec.target_properties {
        let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
        if input.schema().column_with_name(&full_prop_name).is_none() {
            let col_idx = columns.len();
            if col_idx < self.schema.fields().len() {
                let field = self.schema.field(col_idx);
                columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
            }
        }
    }

    let hop_counts: Vec<u64> = expansions
        .iter()
        .map(|(_, _, hops, _, _)| *hops as u64)
        .collect();
    columns.push(Arc::new(UInt64Array::from(hop_counts)));

    // Step variable: one edge list per output row.
    if self.exec.step_variable.is_some() {
        let mut edges_builder = new_edge_list_builder();
        let query_ctx = self.exec.graph_ctx.query_context();

        for (_, _, _, node_path, edge_path) in expansions {
            if node_path.is_empty() && edge_path.is_empty() {
                // No path information recorded for this row: null list.
                edges_builder.append_null();
            } else if edge_path.is_empty() {
                // Nodes but no edges: empty, non-null list.
                edges_builder.append(true);
            } else {
                // NOTE(review): indexing assumes node_path holds one more
                // entry than edge_path — confirm against the path enumerator.
                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        edges_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                edges_builder.append(true);
            }
        }

        columns.push(Arc::new(edges_builder.finish()));
    }

    // Path variable: build the path struct, extending an existing path
    // column of the same name when the input has one.
    if let Some(path_var_name) = &self.exec.path_variable {
        let existing_path_col_idx = input
            .schema()
            .column_with_name(path_var_name)
            .map(|(idx, _)| idx);
        // Taken from `columns`, so it is already aligned with output rows.
        let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
        let existing_path = existing_path_arc
            .as_ref()
            .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

        let mut nodes_builder = new_node_list_builder();
        let mut rels_builder = new_edge_list_builder();
        let query_ctx = self.exec.graph_ctx.query_context();
        let mut path_validity = Vec::with_capacity(expansions.len());

        for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
            // No path information recorded for this row: null path.
            if node_path.is_empty() && edge_path.is_empty() {
                nodes_builder.append(false);
                rels_builder.append(false);
                path_validity.push(false);
                continue;
            }

            // When a non-null existing path is prepended, the first node of
            // the new segment is skipped.
            let skip_first_vlp_node = if let Some(existing) = existing_path {
                if !existing.is_null(row_out_idx) {
                    prepend_existing_path(
                        existing,
                        row_out_idx,
                        &mut nodes_builder,
                        &mut rels_builder,
                        &query_ctx,
                    );
                    true
                } else {
                    false
                }
            } else {
                false
            };

            let start_idx = if skip_first_vlp_node { 1 } else { 0 };
            for vid in &node_path[start_idx..] {
                append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
            }
            nodes_builder.append(true);

            for (i, eid) in edge_path.iter().enumerate() {
                let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                    .unwrap_or_else(|| "UNKNOWN".to_string());
                append_edge_to_struct(
                    rels_builder.values(),
                    *eid,
                    &type_name,
                    node_path[i].as_u64(),
                    node_path[i + 1].as_u64(),
                    &query_ctx,
                );
            }
            rels_builder.append(true);
            path_validity.push(true);
        }

        let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
        let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

        let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
        let rels_field = Arc::new(Field::new(
            "relationships",
            rels_array.data_type().clone(),
            true,
        ));

        let path_struct = arrow_array::StructArray::try_new(
            vec![nodes_field, rels_field].into(),
            vec![nodes_array, rels_array],
            Some(arrow::buffer::NullBuffer::from(path_validity)),
        )
        .map_err(arrow_err)?;

        // Replace the existing path column in place, or append a new one.
        if let Some(idx) = existing_path_col_idx {
            columns[idx] = Arc::new(path_struct);
        } else {
            columns.push(Arc::new(path_struct));
        }
    }

    self.metrics.record_output(num_rows);

    RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
}
3577}
3578
3579impl RecordBatchStream for GraphVariableLengthTraverseStream {
3580 fn schema(&self) -> SchemaRef {
3581 self.schema.clone()
3582 }
3583}
3584
3585async fn hydrate_vlp_target_properties(
3590 base_batch: RecordBatch,
3591 schema: SchemaRef,
3592 target_variable: String,
3593 target_properties: Vec<String>,
3594 target_label_name: Option<String>,
3595 graph_ctx: Arc<GraphExecutionContext>,
3596) -> DFResult<RecordBatch> {
3597 if base_batch.num_rows() == 0 || target_properties.is_empty() {
3598 return Ok(base_batch);
3599 }
3600
3601 let target_vid_col_name = format!("{}._vid", target_variable);
3608 let vid_col_idx = schema
3609 .fields()
3610 .iter()
3611 .enumerate()
3612 .rev()
3613 .find(|(_, f)| f.name() == &target_vid_col_name)
3614 .map(|(i, _)| i);
3615
3616 let Some(vid_col_idx) = vid_col_idx else {
3617 return Ok(base_batch);
3618 };
3619
3620 let vid_col = base_batch.column(vid_col_idx);
3621 let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
3622 let target_vid_array: &UInt64Array = &target_vid_cow;
3623
3624 let target_vids: Vec<Vid> = target_vid_array
3625 .iter()
3626 .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
3629 .collect();
3630
3631 let mut property_columns: Vec<ArrayRef> = Vec::new();
3633
3634 if let Some(ref label_name) = target_label_name {
3635 let property_manager = graph_ctx.property_manager();
3636 let query_ctx = graph_ctx.query_context();
3637
3638 let props_map = property_manager
3639 .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
3640 .await
3641 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
3642
3643 let uni_schema = graph_ctx.storage().schema_manager().schema();
3644 let label_props = uni_schema.properties.get(label_name.as_str());
3645
3646 for prop_name in &target_properties {
3647 let data_type = resolve_property_type(prop_name, label_props);
3648 let column =
3649 build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
3650 property_columns.push(column);
3651 }
3652 } else {
3653 let non_internal_props: Vec<&str> = target_properties
3656 .iter()
3657 .filter(|p| *p != "_all_props")
3658 .map(|s| s.as_str())
3659 .collect();
3660 let property_manager = graph_ctx.property_manager();
3661 let query_ctx = graph_ctx.query_context();
3662
3663 let props_map = if !non_internal_props.is_empty() {
3664 property_manager
3665 .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
3666 .await
3667 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
3668 } else {
3669 std::collections::HashMap::new()
3670 };
3671
3672 for prop_name in &target_properties {
3673 if prop_name == "_all_props" {
3674 use crate::query::df_graph::scan::encode_cypher_value;
3676 use arrow_array::builder::LargeBinaryBuilder;
3677
3678 let mut builder = LargeBinaryBuilder::new();
3679 let l0_ctx = graph_ctx.l0_context();
3680 for vid in &target_vids {
3681 let mut merged_props = serde_json::Map::new();
3682 if let Some(vid_props) = props_map.get(vid) {
3684 for (k, v) in vid_props.iter() {
3685 let json_val: serde_json::Value = v.clone().into();
3686 merged_props.insert(k.to_string(), json_val);
3687 }
3688 }
3689 for l0 in l0_ctx.iter_l0_buffers() {
3691 let guard = l0.read();
3692 if let Some(l0_props) = guard.vertex_properties.get(vid) {
3693 for (k, v) in l0_props.iter() {
3694 let json_val: serde_json::Value = v.clone().into();
3695 merged_props.insert(k.to_string(), json_val);
3696 }
3697 }
3698 }
3699 if merged_props.is_empty() {
3700 builder.append_null();
3701 } else {
3702 let json = serde_json::Value::Object(merged_props);
3703 match encode_cypher_value(&json) {
3704 Ok(bytes) => builder.append_value(bytes),
3705 Err(_) => builder.append_null(),
3706 }
3707 }
3708 }
3709 property_columns.push(Arc::new(builder.finish()));
3710 } else {
3711 let column = build_property_column_static(
3712 &target_vids,
3713 &props_map,
3714 prop_name,
3715 &arrow::datatypes::DataType::LargeBinary,
3716 )?;
3717 property_columns.push(column);
3718 }
3719 }
3720 }
3721
3722 let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
3728 let mut prop_idx = 0;
3729 for (col_idx, field) in schema.fields().iter().enumerate() {
3730 let is_target_prop = col_idx > vid_col_idx
3731 && target_properties
3732 .iter()
3733 .any(|p| *field.name() == format!("{}.{}", target_variable, p));
3734 if is_target_prop && prop_idx < property_columns.len() {
3735 new_columns.push(property_columns[prop_idx].clone());
3736 prop_idx += 1;
3737 } else {
3738 new_columns.push(base_batch.column(col_idx).clone());
3739 }
3740 }
3741
3742 RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
3743}
3744
/// Variable-length traversal operator configured with edge type *names*
/// (`type_names`) rather than edge type ids.
///
/// NOTE(review): the execution logic of this operator is outside the
/// visible part of the file; field notes below describe only what the
/// constructor, `Debug` impl and schema builder show.
pub struct GraphVariableLengthTraverseMainExec {
    /// Child plan; its schema forms the prefix of this operator's schema.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the source column.
    source_column: String,

    /// Edge type names of the traversal.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Lower hop bound.
    min_hops: usize,

    /// Upper hop bound.
    max_hops: usize,

    /// Variable name used as prefix for the target columns
    /// (`{target}._vid`, `{target}._labels`, `{target}.{prop}`).
    target_variable: String,

    /// When set, an edge-list field is added to the schema.
    step_variable: Option<String>,

    /// When set and not already an input column, a path struct field is
    /// added to the schema.
    path_variable: Option<String>,

    /// Target properties that get their own schema columns.
    target_properties: Vec<String>,

    /// Optional-match flag as passed to the constructor.
    is_optional: bool,

    /// Optional name of a column holding an already bound target.
    bound_target_column: Option<String>,

    /// Optional edge filter expression as passed to the constructor.
    edge_lance_filter: Option<String>,

    /// `(property, value)` edge conditions as passed to the constructor.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Names of used-edge columns as passed to the constructor.
    used_edge_columns: Vec<String>,

    /// Path mode as passed to the constructor.
    path_mode: super::nfa::PathMode,

    /// Output mode as passed to the constructor.
    output_mode: super::nfa::VlpOutputMode,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema, computed once in the constructor.
    schema: SchemaRef,

    /// Plan properties derived from `schema` in the constructor.
    properties: Arc<PlanProperties>,

    /// Metrics set, created empty in the constructor.
    metrics: ExecutionPlanMetricsSet,
}
3819
3820impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3821 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3822 f.debug_struct("GraphVariableLengthTraverseMainExec")
3823 .field("source_column", &self.source_column)
3824 .field("type_names", &self.type_names)
3825 .field("direction", &self.direction)
3826 .field("min_hops", &self.min_hops)
3827 .field("max_hops", &self.max_hops)
3828 .field("target_variable", &self.target_variable)
3829 .finish()
3830 }
3831}
3832
3833impl GraphVariableLengthTraverseMainExec {
3834 #[expect(clippy::too_many_arguments)]
3836 pub fn new(
3837 input: Arc<dyn ExecutionPlan>,
3838 source_column: impl Into<String>,
3839 type_names: Vec<String>,
3840 direction: Direction,
3841 min_hops: usize,
3842 max_hops: usize,
3843 target_variable: impl Into<String>,
3844 step_variable: Option<String>,
3845 path_variable: Option<String>,
3846 target_properties: Vec<String>,
3847 graph_ctx: Arc<GraphExecutionContext>,
3848 is_optional: bool,
3849 bound_target_column: Option<String>,
3850 edge_lance_filter: Option<String>,
3851 edge_property_conditions: Vec<(String, UniValue)>,
3852 used_edge_columns: Vec<String>,
3853 path_mode: super::nfa::PathMode,
3854 output_mode: super::nfa::VlpOutputMode,
3855 ) -> Self {
3856 let source_column = source_column.into();
3857 let target_variable = target_variable.into();
3858
3859 let schema = Self::build_schema(
3861 input.schema(),
3862 &target_variable,
3863 step_variable.as_deref(),
3864 path_variable.as_deref(),
3865 &target_properties,
3866 );
3867 let properties = compute_plan_properties(schema.clone());
3868
3869 Self {
3870 input,
3871 source_column,
3872 type_names,
3873 direction,
3874 min_hops,
3875 max_hops,
3876 target_variable,
3877 step_variable,
3878 path_variable,
3879 target_properties,
3880 is_optional,
3881 bound_target_column,
3882 edge_lance_filter,
3883 edge_property_conditions,
3884 used_edge_columns,
3885 path_mode,
3886 output_mode,
3887 graph_ctx,
3888 schema,
3889 properties,
3890 metrics: ExecutionPlanMetricsSet::new(),
3891 }
3892 }
3893
3894 fn build_schema(
3896 input_schema: SchemaRef,
3897 target_variable: &str,
3898 step_variable: Option<&str>,
3899 path_variable: Option<&str>,
3900 target_properties: &[String],
3901 ) -> SchemaRef {
3902 let mut fields: Vec<Field> = input_schema
3903 .fields()
3904 .iter()
3905 .map(|f| f.as_ref().clone())
3906 .collect();
3907
3908 let target_vid_name = format!("{}._vid", target_variable);
3910 if input_schema.column_with_name(&target_vid_name).is_none() {
3911 fields.push(Field::new(target_vid_name, DataType::UInt64, true));
3912 }
3913
3914 let target_labels_name = format!("{}._labels", target_variable);
3916 if input_schema.column_with_name(&target_labels_name).is_none() {
3917 fields.push(Field::new(target_labels_name, labels_data_type(), true));
3918 }
3919
3920 fields.push(Field::new("_hop_count", DataType::UInt64, false));
3922
3923 if let Some(step_var) = step_variable {
3926 fields.push(build_edge_list_field(step_var));
3927 }
3928
3929 if let Some(path_var) = path_variable
3931 && input_schema.column_with_name(path_var).is_none()
3932 {
3933 fields.push(build_path_struct_field(path_var));
3934 }
3935
3936 for prop in target_properties {
3939 let prop_name = format!("{}.{}", target_variable, prop);
3940 if input_schema.column_with_name(&prop_name).is_none() {
3941 fields.push(Field::new(prop_name, DataType::LargeBinary, true));
3942 }
3943 }
3944
3945 Arc::new(Schema::new(fields))
3946 }
3947}
3948
3949impl DisplayAs for GraphVariableLengthTraverseMainExec {
3950 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3951 write!(
3952 f,
3953 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3954 self.source_column, self.type_names, self.min_hops, self.max_hops
3955 )
3956 }
3957}
3958
3959impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
3960 fn name(&self) -> &str {
3961 "GraphVariableLengthTraverseMainExec"
3962 }
3963
3964 fn as_any(&self) -> &dyn Any {
3965 self
3966 }
3967
3968 fn schema(&self) -> SchemaRef {
3969 self.schema.clone()
3970 }
3971
3972 fn properties(&self) -> &Arc<PlanProperties> {
3973 &self.properties
3974 }
3975
3976 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
3977 vec![&self.input]
3978 }
3979
3980 fn with_new_children(
3981 self: Arc<Self>,
3982 children: Vec<Arc<dyn ExecutionPlan>>,
3983 ) -> DFResult<Arc<dyn ExecutionPlan>> {
3984 if children.len() != 1 {
3985 return Err(datafusion::error::DataFusionError::Plan(
3986 "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
3987 ));
3988 }
3989
3990 Ok(Arc::new(Self::new(
3991 children[0].clone(),
3992 self.source_column.clone(),
3993 self.type_names.clone(),
3994 self.direction,
3995 self.min_hops,
3996 self.max_hops,
3997 self.target_variable.clone(),
3998 self.step_variable.clone(),
3999 self.path_variable.clone(),
4000 self.target_properties.clone(),
4001 self.graph_ctx.clone(),
4002 self.is_optional,
4003 self.bound_target_column.clone(),
4004 self.edge_lance_filter.clone(),
4005 self.edge_property_conditions.clone(),
4006 self.used_edge_columns.clone(),
4007 self.path_mode.clone(),
4008 self.output_mode.clone(),
4009 )))
4010 }
4011
4012 fn execute(
4013 &self,
4014 partition: usize,
4015 context: Arc<TaskContext>,
4016 ) -> DFResult<SendableRecordBatchStream> {
4017 let input_stream = self.input.execute(partition, context)?;
4018 let metrics = BaselineMetrics::new(&self.metrics, partition);
4019
4020 let graph_ctx = self.graph_ctx.clone();
4022 let type_names = self.type_names.clone();
4023 let direction = self.direction;
4024 let load_fut =
4025 async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };
4026
4027 Ok(Box::pin(GraphVariableLengthTraverseMainStream {
4028 input: input_stream,
4029 source_column: self.source_column.clone(),
4030 type_names: self.type_names.clone(),
4031 direction: self.direction,
4032 min_hops: self.min_hops,
4033 max_hops: self.max_hops,
4034 target_variable: self.target_variable.clone(),
4035 step_variable: self.step_variable.clone(),
4036 path_variable: self.path_variable.clone(),
4037 target_properties: self.target_properties.clone(),
4038 graph_ctx: self.graph_ctx.clone(),
4039 is_optional: self.is_optional,
4040 bound_target_column: self.bound_target_column.clone(),
4041 edge_lance_filter: self.edge_lance_filter.clone(),
4042 edge_property_conditions: self.edge_property_conditions.clone(),
4043 used_edge_columns: self.used_edge_columns.clone(),
4044 path_mode: self.path_mode.clone(),
4045 output_mode: self.output_mode.clone(),
4046 schema: self.schema.clone(),
4047 state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
4048 metrics,
4049 }))
4050 }
4051
4052 fn metrics(&self) -> Option<MetricsSet> {
4053 Some(self.metrics.clone_inner())
4054 }
4055}
4056
/// State machine driven by `GraphVariableLengthTraverseMainStream::poll_next`.
///
/// Transitions as implemented there:
/// `Loading -> Processing <-> Materializing`, with any state moving to `Done`
/// on end of input or on a loading / input / materialization error.
enum VarLengthMainStreamState {
    /// Waiting for the future that builds the edge adjacency map.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Adjacency map is available; input batches are pulled and expanded.
    Processing(EdgeAdjacencyMap),
    /// An expanded batch is being post-processed asynchronously (target
    /// property hydration) before it is emitted.
    Materializing {
        /// Adjacency map carried along so `Processing` can resume afterwards.
        adjacency: EdgeAdjacencyMap,
        /// Future resolving to the batch that will be emitted.
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Terminal state: polling yields `None`.
    Done,
}
4071
/// Record-batch stream produced by `GraphVariableLengthTraverseMainExec::execute`.
///
/// Each input batch is expanded by a breadth-first traversal over an
/// in-memory edge adjacency map; see `process_batch` for the output layout.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    /// Upstream stream supplying the rows to expand.
    input: SendableRecordBatchStream,
    /// Name of the input column holding the source vertex ids.
    source_column: String,
    /// Edge type names; joined with `|` when edges are written to the output.
    type_names: Vec<String>,
    /// Traversal direction; `Direction::Both` enables per-vertex edge
    /// de-duplication in `bfs`.
    direction: Direction,
    /// Inclusive lower bound on the hop count of emitted paths.
    min_hops: usize,
    /// Inclusive upper bound on the hop count; paths are not extended past it.
    max_hops: usize,
    /// Variable name used as the prefix of the `._vid`, `._labels` and
    /// property output columns.
    target_variable: String,
    /// When set, an edge-list column is emitted for the traversed edges.
    step_variable: Option<String>,
    /// When set, a path struct column is emitted, or an existing column of
    /// that name is extended with the traversed nodes and edges.
    path_variable: Option<String>,
    /// Target vertex properties to output; `process_batch` emits null
    /// placeholders which are then filled in by the hydration step.
    target_properties: Vec<String>,
    /// Graph execution context; supplies the query context used for label,
    /// node and edge lookups.
    graph_ctx: Arc<GraphExecutionContext>,
    /// When true, an input row with no expansion still yields one output row
    /// whose target columns are null.
    is_optional: bool,
    /// Optional input column holding an already-bound target vertex id; only
    /// traversal results ending at that vertex are kept.
    bound_target_column: Option<String>,
    /// Not read by the code visible in this section.
    // NOTE(review): presumably a storage-level edge filter applied elsewhere — confirm.
    edge_lance_filter: Option<String>,
    /// `(property name, expected value)` pairs; an edge is traversed only if
    /// every named property is present and equal to the expected value.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns holding edge ids that must not be traversed for a row.
    used_edge_columns: Vec<String>,
    /// Not read by the code visible in this section.
    path_mode: super::nfa::PathMode,
    /// Not read by the code visible in this section.
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema of every emitted batch.
    schema: SchemaRef,
    /// Current state of the polling state machine.
    state: VarLengthMainStreamState,
    /// Baseline metrics (elapsed compute time, output row count).
    metrics: BaselineMetrics,
}
4099
/// One traversal result produced by `GraphVariableLengthTraverseMainStream::bfs`:
/// `(end vertex, hop count, vertices visited from the source to the end vertex,
/// edges traversed in order)`.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4102
4103impl GraphVariableLengthTraverseMainStream {
4104 fn bfs(
4110 &self,
4111 source: Vid,
4112 adjacency: &EdgeAdjacencyMap,
4113 used_eids: &FxHashSet<u64>,
4114 ) -> Vec<MainBfsResult> {
4115 let mut results = Vec::new();
4116 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4117
4118 queue.push_back((source, 0, vec![source], vec![]));
4119
4120 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4121 if depth >= self.min_hops && depth <= self.max_hops {
4123 results.push((current, depth, node_path.clone(), edge_path.clone()));
4124 }
4125
4126 if depth >= self.max_hops {
4128 continue;
4129 }
4130
4131 if let Some(neighbors) = adjacency.get(¤t) {
4133 let is_undirected = matches!(self.direction, Direction::Both);
4134 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4135
4136 for (neighbor, eid, _edge_type, props) in neighbors {
4137 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4139 continue;
4140 }
4141
4142 if edge_path.contains(eid) {
4144 continue;
4145 }
4146
4147 if used_eids.contains(&eid.as_u64()) {
4150 continue;
4151 }
4152
4153 if !self.edge_property_conditions.is_empty() {
4155 let passes =
4156 self.edge_property_conditions
4157 .iter()
4158 .all(|(name, expected)| {
4159 props.get(name).is_some_and(|actual| actual == expected)
4160 });
4161 if !passes {
4162 continue;
4163 }
4164 }
4165
4166 let mut new_node_path = node_path.clone();
4167 new_node_path.push(*neighbor);
4168 let mut new_edge_path = edge_path.clone();
4169 new_edge_path.push(*eid);
4170 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4171 }
4172 }
4173 }
4174
4175 results
4176 }
4177
4178 fn process_batch(
4180 &self,
4181 batch: RecordBatch,
4182 adjacency: &EdgeAdjacencyMap,
4183 ) -> DFResult<RecordBatch> {
4184 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4185 datafusion::error::DataFusionError::Execution(format!(
4186 "Source column '{}' not found in input batch",
4187 self.source_column
4188 ))
4189 })?;
4190
4191 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4192 let source_vids: &UInt64Array = &source_vid_cow;
4193
4194 let bound_target_cow = self
4196 .bound_target_column
4197 .as_ref()
4198 .and_then(|col| batch.column_by_name(col))
4199 .map(|c| column_as_vid_array(c.as_ref()))
4200 .transpose()?;
4201 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4202
4203 let used_edge_arrays: Vec<&UInt64Array> = self
4205 .used_edge_columns
4206 .iter()
4207 .filter_map(|col| {
4208 batch
4209 .column_by_name(col)?
4210 .as_any()
4211 .downcast_ref::<UInt64Array>()
4212 })
4213 .collect();
4214
4215 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4217
4218 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4219 let mut emitted_for_row = false;
4220
4221 if let Some(source_u64) = source_opt {
4222 let source = Vid::from(source_u64);
4223
4224 let used_eids: FxHashSet<u64> = used_edge_arrays
4226 .iter()
4227 .filter_map(|arr| {
4228 if arr.is_null(row_idx) {
4229 None
4230 } else {
4231 Some(arr.value(row_idx))
4232 }
4233 })
4234 .collect();
4235
4236 let bfs_results = self.bfs(source, adjacency, &used_eids);
4237
4238 for (target, hops, node_path, edge_path) in bfs_results {
4239 if let Some(targets) = expected_targets {
4242 if targets.is_null(row_idx) {
4243 continue;
4244 }
4245 let expected_vid = targets.value(row_idx);
4246 if target.as_u64() != expected_vid {
4247 continue;
4248 }
4249 }
4250
4251 expansions.push((row_idx, target, hops, node_path, edge_path));
4252 emitted_for_row = true;
4253 }
4254 }
4255
4256 if self.is_optional && !emitted_for_row {
4257 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4259 }
4260 }
4261
4262 if expansions.is_empty() {
4263 if self.is_optional {
4264 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4265 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4266 }
4267 return Ok(RecordBatch::new_empty(self.schema.clone()));
4268 }
4269
4270 let num_rows = expansions.len();
4271 self.metrics.record_output(num_rows);
4272
4273 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4275
4276 for col_idx in 0..batch.num_columns() {
4278 let array = batch.column(col_idx);
4279 let indices: Vec<u64> = expansions
4280 .iter()
4281 .map(|(idx, _, _, _, _)| *idx as u64)
4282 .collect();
4283 let take_indices = UInt64Array::from(indices);
4284 let expanded = arrow::compute::take(array, &take_indices, None)?;
4285 columns.push(expanded);
4286 }
4287
4288 let target_vid_name = format!("{}._vid", self.target_variable);
4290 if batch.schema().column_with_name(&target_vid_name).is_none() {
4291 let target_vids: Vec<Option<u64>> = expansions
4292 .iter()
4293 .map(|(_, vid, _, node_path, edge_path)| {
4294 if node_path.is_empty() && edge_path.is_empty() {
4295 None
4296 } else {
4297 Some(vid.as_u64())
4298 }
4299 })
4300 .collect();
4301 columns.push(Arc::new(UInt64Array::from(target_vids)));
4302 }
4303
4304 let target_labels_name = format!("{}._labels", self.target_variable);
4306 if batch
4307 .schema()
4308 .column_with_name(&target_labels_name)
4309 .is_none()
4310 {
4311 use arrow_array::builder::{ListBuilder, StringBuilder};
4312 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4313 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4314 if node_path.is_empty() && edge_path.is_empty() {
4315 labels_builder.append(false);
4316 continue;
4317 }
4318 let mut row_labels: Vec<String> = Vec::new();
4319 let labels =
4320 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4321 for lbl in &labels {
4322 if !row_labels.contains(lbl) {
4323 row_labels.push(lbl.clone());
4324 }
4325 }
4326 let values = labels_builder.values();
4327 for lbl in &row_labels {
4328 values.append_value(lbl);
4329 }
4330 labels_builder.append(true);
4331 }
4332 columns.push(Arc::new(labels_builder.finish()));
4333 }
4334
4335 let hop_counts: Vec<u64> = expansions
4337 .iter()
4338 .map(|(_, _, hops, _, _)| *hops as u64)
4339 .collect();
4340 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4341
4342 if self.step_variable.is_some() {
4344 let mut edges_builder = new_edge_list_builder();
4345 let query_ctx = self.graph_ctx.query_context();
4346 let type_names_str = self.type_names.join("|");
4347
4348 for (_, _, _, node_path, edge_path) in expansions.iter() {
4349 if node_path.is_empty() && edge_path.is_empty() {
4350 edges_builder.append_null();
4351 } else if edge_path.is_empty() {
4352 edges_builder.append(true);
4354 } else {
4355 for (i, eid) in edge_path.iter().enumerate() {
4356 append_edge_to_struct(
4357 edges_builder.values(),
4358 *eid,
4359 &type_names_str,
4360 node_path[i].as_u64(),
4361 node_path[i + 1].as_u64(),
4362 &query_ctx,
4363 );
4364 }
4365 edges_builder.append(true);
4366 }
4367 }
4368
4369 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4370 }
4371
4372 if let Some(path_var_name) = &self.path_variable {
4376 let existing_path_col_idx = batch
4377 .schema()
4378 .column_with_name(path_var_name)
4379 .map(|(idx, _)| idx);
4380 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4381 let existing_path = existing_path_arc
4382 .as_ref()
4383 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4384
4385 let mut nodes_builder = new_node_list_builder();
4386 let mut rels_builder = new_edge_list_builder();
4387 let query_ctx = self.graph_ctx.query_context();
4388 let type_names_str = self.type_names.join("|");
4389 let mut path_validity = Vec::with_capacity(expansions.len());
4390
4391 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4392 if node_path.is_empty() && edge_path.is_empty() {
4393 nodes_builder.append(false);
4394 rels_builder.append(false);
4395 path_validity.push(false);
4396 continue;
4397 }
4398
4399 let skip_first_vlp_node = if let Some(existing) = existing_path {
4401 if !existing.is_null(row_out_idx) {
4402 prepend_existing_path(
4403 existing,
4404 row_out_idx,
4405 &mut nodes_builder,
4406 &mut rels_builder,
4407 &query_ctx,
4408 );
4409 true
4410 } else {
4411 false
4412 }
4413 } else {
4414 false
4415 };
4416
4417 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4419 for vid in &node_path[start_idx..] {
4420 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4421 }
4422 nodes_builder.append(true);
4423
4424 for (i, eid) in edge_path.iter().enumerate() {
4425 append_edge_to_struct(
4426 rels_builder.values(),
4427 *eid,
4428 &type_names_str,
4429 node_path[i].as_u64(),
4430 node_path[i + 1].as_u64(),
4431 &query_ctx,
4432 );
4433 }
4434 rels_builder.append(true);
4435 path_validity.push(true);
4436 }
4437
4438 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4440 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4441
4442 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4444 let rels_field = Arc::new(Field::new(
4445 "relationships",
4446 rels_array.data_type().clone(),
4447 true,
4448 ));
4449
4450 let path_struct = arrow_array::StructArray::try_new(
4452 vec![nodes_field, rels_field].into(),
4453 vec![nodes_array, rels_array],
4454 Some(arrow::buffer::NullBuffer::from(path_validity)),
4455 )
4456 .map_err(arrow_err)?;
4457
4458 if let Some(idx) = existing_path_col_idx {
4459 columns[idx] = Arc::new(path_struct);
4460 } else {
4461 columns.push(Arc::new(path_struct));
4462 }
4463 }
4464
4465 for prop_name in &self.target_properties {
4468 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4469 if batch.schema().column_with_name(&full_prop_name).is_none() {
4470 columns.push(arrow_array::new_null_array(
4471 &DataType::LargeBinary,
4472 num_rows,
4473 ));
4474 }
4475 }
4476
4477 RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
4478 }
4479}
4480
4481impl Stream for GraphVariableLengthTraverseMainStream {
4482 type Item = DFResult<RecordBatch>;
4483
4484 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
4485 let metrics = self.metrics.clone();
4486 let _timer = metrics.elapsed_compute().timer();
4487 loop {
4488 let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
4489
4490 match state {
4491 VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
4492 Poll::Ready(Ok(adjacency)) => {
4493 self.state = VarLengthMainStreamState::Processing(adjacency);
4494 }
4496 Poll::Ready(Err(e)) => {
4497 self.state = VarLengthMainStreamState::Done;
4498 return Poll::Ready(Some(Err(e)));
4499 }
4500 Poll::Pending => {
4501 self.state = VarLengthMainStreamState::Loading(fut);
4502 return Poll::Pending;
4503 }
4504 },
4505 VarLengthMainStreamState::Processing(adjacency) => {
4506 match self.input.poll_next_unpin(cx) {
4507 Poll::Ready(Some(Ok(batch))) => {
4508 let base_batch = match self.process_batch(batch, &adjacency) {
4509 Ok(b) => b,
4510 Err(e) => {
4511 self.state = VarLengthMainStreamState::Processing(adjacency);
4512 return Poll::Ready(Some(Err(e)));
4513 }
4514 };
4515
4516 if self.target_properties.is_empty() {
4518 self.state = VarLengthMainStreamState::Processing(adjacency);
4519 return Poll::Ready(Some(Ok(base_batch)));
4520 }
4521
4522 let schema = self.schema.clone();
4524 let target_variable = self.target_variable.clone();
4525 let target_properties = self.target_properties.clone();
4526 let graph_ctx = self.graph_ctx.clone();
4527
4528 let fut = hydrate_vlp_target_properties(
4529 base_batch,
4530 schema,
4531 target_variable,
4532 target_properties,
4533 None, graph_ctx,
4535 );
4536
4537 self.state = VarLengthMainStreamState::Materializing {
4538 adjacency,
4539 fut: Box::pin(fut),
4540 };
4541 }
4543 Poll::Ready(Some(Err(e))) => {
4544 self.state = VarLengthMainStreamState::Done;
4545 return Poll::Ready(Some(Err(e)));
4546 }
4547 Poll::Ready(None) => {
4548 self.state = VarLengthMainStreamState::Done;
4549 return Poll::Ready(None);
4550 }
4551 Poll::Pending => {
4552 self.state = VarLengthMainStreamState::Processing(adjacency);
4553 return Poll::Pending;
4554 }
4555 }
4556 }
4557 VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
4558 match fut.as_mut().poll(cx) {
4559 Poll::Ready(Ok(batch)) => {
4560 self.state = VarLengthMainStreamState::Processing(adjacency);
4561 return Poll::Ready(Some(Ok(batch)));
4562 }
4563 Poll::Ready(Err(e)) => {
4564 self.state = VarLengthMainStreamState::Done;
4565 return Poll::Ready(Some(Err(e)));
4566 }
4567 Poll::Pending => {
4568 self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
4569 return Poll::Pending;
4570 }
4571 }
4572 }
4573 VarLengthMainStreamState::Done => {
4574 return Poll::Ready(None);
4575 }
4576 }
4577 }
4578 }
4579}
4580
4581impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4582 fn schema(&self) -> SchemaRef {
4583 self.schema.clone()
4584 }
4585}
4586
#[cfg(test)]
mod tests {
    use super::*;

    /// Input schema shared by every test: a single non-nullable `a._vid` column.
    fn vid_only_input() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// Field names of `schema` in declaration order. Comparing this list
    /// checks both the number of fields and the name at every position.
    fn field_names(schema: &Schema) -> Vec<&str> {
        schema.fields().iter().map(|f| f.name().as_str()).collect()
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let input_schema = vid_only_input();

        let output_schema =
            GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);

        assert_eq!(
            field_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "__eid_to_m"]
        );
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let input_schema = vid_only_input();

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );

        assert_eq!(
            field_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"]
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let input_schema = vid_only_input();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseExec::build_schema(
            input_schema,
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );

        assert_eq!(
            field_names(&output_schema),
            [
                "a._vid",
                "m._vid",
                "m._labels",
                "m.name",
                "m.age",
                "r._eid",
                "r._type",
            ]
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let input_schema = vid_only_input();

        let output_schema = GraphVariableLengthTraverseExec::build_schema(
            input_schema,
            "b",
            None,
            Some("p"),
            &[],
            None,
        );

        assert_eq!(
            field_names(&output_schema),
            ["a._vid", "b._vid", "b._labels", "_hop_count", "p"]
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let input_schema = vid_only_input();

        let output_schema =
            GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);

        assert_eq!(
            field_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "__eid_to_m"]
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let input_schema = vid_only_input();

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );

        assert_eq!(
            field_names(&output_schema),
            ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"]
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let input_schema = vid_only_input();
        let edge_props = vec!["weight".to_string(), "since".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );

        assert_eq!(
            field_names(&output_schema),
            [
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "r.weight",
                "r.since",
            ]
        );
        // Edge property columns are typed as `LargeBinary`.
        for idx in [5, 6] {
            assert_eq!(output_schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let input_schema = vid_only_input();
        let target_props = vec!["name".to_string(), "age".to_string()];

        let output_schema = GraphTraverseMainExec::build_schema(
            &input_schema,
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );

        assert_eq!(
            field_names(&output_schema),
            [
                "a._vid",
                "m._vid",
                "m._labels",
                "r._eid",
                "r._type",
                "m.name",
                "m.age",
            ]
        );
        // Target property columns are typed as `LargeBinary`.
        for idx in [5, 6] {
            assert_eq!(output_schema.field(idx).data_type(), &DataType::LargeBinary);
        }
    }
}