1use crate::query::df_graph::GraphExecutionContext;
33use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
34use crate::query::df_graph::common::{
35 append_edge_to_struct, append_node_to_struct, build_edge_list_field, build_path_struct_field,
36 column_as_vid_array, compute_plan_properties, labels_data_type, new_edge_list_builder,
37 new_node_list_builder,
38};
39use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
40use crate::query::df_graph::pred_dag::PredecessorDag;
41use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
42use arrow::compute::take;
43use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
44use arrow_schema::{DataType, Field, Schema, SchemaRef};
45use datafusion::common::Result as DFResult;
46use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
47use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
48use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
49use futures::{Stream, StreamExt};
50use fxhash::FxHashSet;
51use std::any::Any;
52use std::collections::{HashMap, HashSet, VecDeque};
53use std::fmt;
54use std::pin::Pin;
55use std::sync::Arc;
56use std::task::{Context, Poll};
57use uni_common::Value as UniValue;
58use uni_common::core::id::{Eid, Vid};
59use uni_store::runtime::l0_visibility;
60use uni_store::storage::direction::Direction;
61
/// Single BFS expansion result — NOTE(review): element meanings inferred from
/// types only (terminal vertex, hop count, node path, edge path); the alias is
/// not used in this chunk — confirm against its usage site.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
64
/// Per-row expansion record — NOTE(review): element meanings inferred from
/// types (likely: input row index, reached vertex, depth, node path, edge
/// path); not used in this chunk — confirm against its usage site.
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
67
68fn prepend_existing_path(
75 existing_path: &arrow_array::StructArray,
76 row_idx: usize,
77 nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
78 rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
79 query_ctx: &uni_store::runtime::context::QueryContext,
80) {
81 let nodes_list = existing_path
83 .column(0)
84 .as_any()
85 .downcast_ref::<arrow_array::ListArray>()
86 .unwrap();
87 let node_values = nodes_list.value(row_idx);
88 let node_struct = node_values
89 .as_any()
90 .downcast_ref::<arrow_array::StructArray>()
91 .unwrap();
92 let vid_col = node_struct
93 .column(0)
94 .as_any()
95 .downcast_ref::<UInt64Array>()
96 .unwrap();
97 for i in 0..vid_col.len() {
98 append_node_to_struct(
99 nodes_builder.values(),
100 Vid::from(vid_col.value(i)),
101 query_ctx,
102 );
103 }
104
105 let rels_list = existing_path
107 .column(1)
108 .as_any()
109 .downcast_ref::<arrow_array::ListArray>()
110 .unwrap();
111 let edge_values = rels_list.value(row_idx);
112 let edge_struct = edge_values
113 .as_any()
114 .downcast_ref::<arrow_array::StructArray>()
115 .unwrap();
116 let eid_col = edge_struct
117 .column(0)
118 .as_any()
119 .downcast_ref::<UInt64Array>()
120 .unwrap();
121 let type_col = edge_struct
122 .column(1)
123 .as_any()
124 .downcast_ref::<arrow_array::StringArray>()
125 .unwrap();
126 let src_col = edge_struct
127 .column(2)
128 .as_any()
129 .downcast_ref::<UInt64Array>()
130 .unwrap();
131 let dst_col = edge_struct
132 .column(3)
133 .as_any()
134 .downcast_ref::<UInt64Array>()
135 .unwrap();
136 for i in 0..eid_col.len() {
137 append_edge_to_struct(
138 rels_builder.values(),
139 Eid::from(eid_col.value(i)),
140 type_col.value(i),
141 src_col.value(i),
142 dst_col.value(i),
143 query_ctx,
144 );
145 }
146}
147
148fn resolve_edge_property_type(
153 prop: &str,
154 schema_props: Option<
155 &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
156 >,
157) -> DataType {
158 if prop == "overflow_json" {
159 DataType::LargeBinary
160 } else {
161 schema_props
162 .and_then(|props| props.get(prop))
163 .map(|meta| meta.r#type.to_arrow())
164 .unwrap_or(DataType::LargeBinary)
165 }
166}
167
168use crate::query::df_graph::common::merged_edge_schema_props;
169
/// Variable-length path expansion record.
/// NOTE(review): identical shape to `ExpansionRecord` above — if the two
/// aliases represent the same concept, consider consolidating them.
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
172
/// Physical operator that expands each input row along graph edges: for every
/// source vertex it emits one output row per qualifying (neighbor, edge)
/// pair, appending target/edge columns to the input schema.
pub struct GraphTraverseExec {
    /// Upstream plan producing the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse (each is expanded independently).
    edge_type_ids: Vec<u32>,

    /// Traversal direction; `Both` triggers per-row edge deduplication.
    direction: Direction,

    /// Variable name used as the prefix of the target output columns
    /// (`{target}._vid`, `{target}._labels`, ...).
    target_variable: String,

    /// When set, edge columns (`{edge}._eid`, `{edge}._type`, properties)
    /// are added to the output; otherwise only an internal eid column.
    edge_variable: Option<String>,

    /// Edge property names materialized as output columns.
    edge_properties: Vec<String>,

    /// Target vertex property names materialized as output columns.
    target_properties: Vec<String>,

    /// Optional label filter for target vertices (by name).
    target_label_name: Option<String>,

    /// Numeric id of the target label — NOTE(review): not read in this
    /// chunk; retained for plan reconstruction via `with_new_children`.
    target_label_id: Option<u16>,

    /// Shared graph execution context (storage, property manager, caches).
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics: unmatched rows are emitted with nulls.
    optional: bool,

    /// Variables belonging to the optional pattern; their columns are
    /// nulled on unmatched rows.
    optional_pattern_vars: HashSet<String>,

    /// When the target variable is already bound, the column holding the
    /// expected target vid; expansion is filtered to that vertex.
    bound_target_column: Option<String>,

    /// Eid columns of edges already used earlier in the pattern; those
    /// edges are skipped to avoid reusing an edge in the same match.
    used_edge_columns: Vec<String>,

    /// Precomputed output schema (input columns + traversal columns).
    schema: SchemaRef,

    /// Cached DataFusion plan properties derived from `schema`.
    properties: PlanProperties,

    /// Execution metrics shared with the produced streams.
    metrics: ExecutionPlanMetricsSet,
}
255
256impl fmt::Debug for GraphTraverseExec {
257 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
258 f.debug_struct("GraphTraverseExec")
259 .field("source_column", &self.source_column)
260 .field("edge_type_ids", &self.edge_type_ids)
261 .field("direction", &self.direction)
262 .field("target_variable", &self.target_variable)
263 .field("edge_variable", &self.edge_variable)
264 .finish()
265 }
266}
267
impl GraphTraverseExec {
    /// Creates the operator and precomputes its output schema and plan
    /// properties from the graph schema metadata.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        target_label_id: Option<u16>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Resolve schema metadata once so property column types can be
        // computed while building the output schema.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
        let edge_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            edge_variable.as_deref(),
            &edge_properties,
            &target_properties,
            label_props,
            edge_props,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            target_label_name,
            target_label_id,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Builds the output schema: every input column, followed by
    /// `{target}._vid`, `{target}._labels`, the requested target property
    /// columns, then either the edge columns (`{edge}._eid`, `{edge}._type`,
    /// edge properties) when an edge variable is bound, or an internal
    /// `__eid_to_{target}` column otherwise. The `_vid`/`_eid` fields are
    /// nullable only when the traversal is OPTIONAL.
    #[expect(
        clippy::too_many_arguments,
        reason = "Schema construction needs all field metadata"
    )]
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        edge_variable: Option<&str>,
        edge_properties: &[String],
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        edge_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        optional: bool,
    ) -> SchemaRef {
        // Start from the input schema; traversal columns are appended.
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        let target_vid_name = format!("{}._vid", target_variable);
        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));

        // Labels are always nullable: they may be unresolvable for a vertex.
        fields.push(Field::new(
            format!("{}._labels", target_variable),
            labels_data_type(),
            true,
        ));

        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            let arrow_type = resolve_property_type(prop_name, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }

        if let Some(edge_var) = edge_variable {
            let edge_id_name = format!("{}._eid", edge_var);
            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));

            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            for prop_name in edge_properties {
                let prop_col_name = format!("{}.{}", edge_var, prop_name);
                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
                fields.push(Field::new(&prop_col_name, arrow_type, true));
            }
        } else {
            // No edge variable: still track the traversed edge id internally
            // so later pattern steps can exclude already-used edges.
            let internal_eid_name = format!("__eid_to_{}", target_variable);
            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
        }

        Arc::new(Schema::new(fields))
    }
}
424
425impl DisplayAs for GraphTraverseExec {
426 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
427 write!(
428 f,
429 "GraphTraverseExec: {} --[{:?}]--> {}",
430 self.source_column, self.edge_type_ids, self.target_variable
431 )?;
432 if let Some(ref edge_var) = self.edge_variable {
433 write!(f, " as {}", edge_var)?;
434 }
435 Ok(())
436 }
437}
438
impl ExecutionPlan for GraphTraverseExec {
    fn name(&self) -> &str {
        "GraphTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds the node with a new child via `Self::new`, which also
    /// recomputes the output schema and plan properties.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.target_label_id,
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
        )))
    }

    /// Starts the traversal stream. The stream first awaits a warming
    /// future for the traversed edge types before reading input batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Kick off adjacency warming for the edge types/direction we will
        // traverse; the stream polls it to completion before reading.
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphTraverseStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            state: TraverseStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
527
/// State machine for [`GraphTraverseStream`]'s `poll_next` loop.
enum TraverseStreamState {
    /// Awaiting the adjacency warming future before any input is read.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Ready to poll the next input batch.
    Reading,
    /// Awaiting the async property-materialization future for one batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Stream exhausted or failed terminally.
    Done,
}
539
/// Streaming executor for [`GraphTraverseExec`]: pulls input batches,
/// expands each row along the configured edge types, and emits the widened
/// batches. Configuration fields mirror the parent plan node.
struct GraphTraverseStream {
    /// Upstream record batch stream.
    input: SendableRecordBatchStream,

    /// Input column holding the source vertex ids.
    source_column: String,

    /// Edge type ids to traverse.
    edge_type_ids: Vec<u32>,

    /// Traversal direction; `Both` deduplicates edges per source row.
    direction: Direction,

    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,

    /// When set, edge columns are materialized in the output.
    edge_variable: Option<String>,

    /// Edge property names to materialize.
    edge_properties: Vec<String>,

    /// Target vertex property names to materialize.
    target_properties: Vec<String>,

    /// Optional label filter for target vertices.
    target_label_name: Option<String>,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics: emit null-padded rows for unmatched input.
    optional: bool,

    /// Variables whose columns are nulled on unmatched optional rows.
    optional_pattern_vars: HashSet<String>,

    /// Column with the expected target vid when the target is pre-bound.
    bound_target_column: Option<String>,

    /// Eid columns whose edges must not be traversed again.
    used_edge_columns: Vec<String>,

    /// Output schema (input columns + traversal columns).
    schema: SchemaRef,

    /// Current poll state; see [`TraverseStreamState`].
    state: TraverseStreamState,

    /// Baseline metrics (output row counts etc.).
    metrics: BaselineMetrics,
}
594
impl GraphTraverseStream {
    /// Expands every row of `batch` along the configured edge types.
    ///
    /// Returns one `(input_row_index, target_vid, eid, edge_type_id)` tuple
    /// per qualifying neighbor. A neighbor qualifies when its edge is not in
    /// the row's used-edge set, is not a duplicate for undirected traversal,
    /// matches the bound target vid (if any), and passes the label filter.
    ///
    /// Errors if the source column is missing or not convertible to vids.
    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column '{}' not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Pre-bound target vids, if the target variable was already matched
        // earlier in the pattern. Missing column => no target constraint.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Columns holding eids of edges already used by earlier pattern
        // steps; columns that are absent or not UInt64 are silently skipped.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
        let is_undirected = matches!(self.direction, Direction::Both);

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            // Null source => nothing to expand for this row.
            let Some(src) = source_vid else {
                continue;
            };

            // Some(None) means the bound target column exists but is null
            // for this row; such rows can never match (see below).
            let expected_target = bound_target_vids.map(|arr| {
                if arr.is_null(row_idx) {
                    None
                } else {
                    Some(arr.value(row_idx))
                }
            });

            let used_eids: HashSet<u64> = used_edge_arrays
                .iter()
                .filter_map(|arr| {
                    if arr.is_null(row_idx) {
                        None
                    } else {
                        Some(arr.value(row_idx))
                    }
                })
                .collect();

            let vid = Vid::from(src);
            // For undirected traversal the same edge can surface from both
            // endpoints; dedupe per source row.
            let mut seen_edges: HashSet<u64> = HashSet::new();

            for &edge_type in &self.edge_type_ids {
                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);

                for (target_vid, eid) in neighbors {
                    let eid_u64 = eid.as_u64();

                    // Trail semantics: never reuse an edge already bound
                    // earlier in the same pattern match.
                    if used_eids.contains(&eid_u64) {
                        continue;
                    }

                    if is_undirected && !seen_edges.insert(eid_u64) {
                        continue;
                    }

                    if let Some(expected_opt) = expected_target {
                        let Some(expected) = expected_opt else {
                            continue;
                        };
                        if target_vid.as_u64() != expected {
                            continue;
                        }
                    }

                    // Label filter. Vertices whose labels cannot be resolved
                    // pass through — presumably filtered downstream; confirm.
                    // NOTE(review): query_context() is fetched per edge here;
                    // if it is not free, hoist it out of the loops.
                    if let Some(ref label_name) = self.target_label_name {
                        let query_ctx = self.graph_ctx.query_context();
                        if let Some(vertex_labels) =
                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
                        {
                            if !vertex_labels.contains(label_name) {
                                continue;
                            }
                        }
                    }

                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
                }
            }
        }

        Ok(expanded_rows)
    }
}
718
719fn build_target_labels_column(
721 target_vids: &[Vid],
722 target_label_name: &Option<String>,
723 graph_ctx: &GraphExecutionContext,
724) -> ArrayRef {
725 use arrow_array::builder::{ListBuilder, StringBuilder};
726 let mut labels_builder = ListBuilder::new(StringBuilder::new());
727 let query_ctx = graph_ctx.query_context();
728 for vid in target_vids {
729 let row_labels: Vec<String> =
730 match l0_visibility::get_vertex_labels_optional(*vid, &query_ctx) {
731 Some(labels) => labels,
732 None => {
733 if let Some(label_name) = target_label_name {
735 vec![label_name.clone()]
736 } else {
737 vec![]
738 }
739 }
740 };
741 let values = labels_builder.values();
742 for lbl in &row_labels {
743 values.append_value(lbl);
744 }
745 labels_builder.append(true);
746 }
747 Arc::new(labels_builder.finish())
748}
749
/// Materializes the requested target-vertex property columns.
///
/// With a known target label, property types are resolved from the label's
/// schema metadata; without one, properties are fetched untyped and emitted
/// as `LargeBinary`, with the pseudo-property `_all_props` expanding to a
/// merged JSON blob per vertex.
async fn build_target_property_columns(
    target_vids: &[Vid],
    target_properties: &[String],
    target_label_name: &Option<String>,
    graph_ctx: &Arc<GraphExecutionContext>,
) -> DFResult<Vec<ArrayRef>> {
    let mut columns = Vec::new();

    if let Some(label_name) = target_label_name {
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        // Label known: one batched fetch of all properties for the label.
        let props_map = property_manager
            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
            columns.push(column);
        }
    } else {
        // No label: fetch only the concrete property names, skipping the
        // `_all_props` pseudo-property which is handled separately below.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in target_properties {
            if prop_name == "_all_props" {
                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
            } else {
                // Untyped path: every plain property is emitted as binary.
                let column = build_property_column_static(
                    target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                columns.push(column);
            }
        }
    }

    Ok(columns)
}
813
/// Builds the `_all_props` column: one encoded JSON object per vertex,
/// merging persisted properties with any pending L0 buffer overrides.
/// Vertices with no properties at all get a null entry.
fn build_all_props_column(
    target_vids: &[Vid],
    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
    graph_ctx: &Arc<GraphExecutionContext>,
) -> ArrayRef {
    use crate::query::df_graph::scan::encode_cypher_value;
    use arrow_array::builder::LargeBinaryBuilder;

    let mut builder = LargeBinaryBuilder::new();
    let l0_ctx = graph_ctx.l0_context();
    for vid in target_vids {
        let mut merged_props = serde_json::Map::new();
        if let Some(vid_props) = props_map.get(vid) {
            for (k, v) in vid_props.iter() {
                let json_val: serde_json::Value = v.clone().into();
                merged_props.insert(k.to_string(), json_val);
            }
        }
        // L0 buffers are applied second so uncommitted writes override the
        // persisted values above.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(vid) {
                for (k, v) in l0_props.iter() {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.to_string(), json_val);
                }
            }
        }
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json = serde_json::Value::Object(merged_props);
            // Encoding failures degrade to null rather than aborting the batch.
            match encode_cypher_value(&json) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
854
/// Builds the edge output columns for a bound edge variable, in schema
/// order: `_eid`, `_type`, then the requested edge property columns.
async fn build_edge_columns(
    expansions: &[(usize, Vid, u64, u32)],
    edge_properties: &[String],
    edge_type_ids: &[u32],
    graph_ctx: &Arc<GraphExecutionContext>,
) -> DFResult<Vec<ArrayRef>> {
    let mut columns = Vec::new();

    let eids: Vec<Eid> = expansions
        .iter()
        .map(|(_, _, eid, _)| Eid::from(*eid))
        .collect();
    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);

    // `_type` column: resolve each edge type id to its name; unknown ids
    // produce null rather than failing the batch.
    {
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
    }

    if !edge_properties.is_empty() {
        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
        let edge_type_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };

        // NOTE(review): eids are reinterpreted as Vids here purely so the
        // vid-keyed `build_property_column_static` helper can be reused —
        // the u64 payload is the lookup key either way; confirm the helper
        // and `props_map` keys agree on this convention.
        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();

        for prop_name in edge_properties {
            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
            let column =
                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
            columns.push(column);
        }
    }

    Ok(columns)
}
915
/// Assembles the output batch for one input batch and its expansions
/// (async path: used when target or edge properties must be fetched).
///
/// Each expansion duplicates its input row (via `take`) and appends the
/// target vid/labels/property columns, then edge columns or the internal
/// eid column. For OPTIONAL traversals, input rows whose optional group
/// produced no expansion are re-emitted with the optional columns nulled.
#[expect(
    clippy::too_many_arguments,
    reason = "Standalone async fn needs all context passed explicitly"
)]
async fn build_traverse_output_batch(
    input: RecordBatch,
    expansions: Vec<(usize, Vid, u64, u32)>,
    schema: SchemaRef,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    edge_type_ids: Vec<u32>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        // Non-optional with no matches: the whole batch is filtered out.
        if !optional {
            return Ok(RecordBatch::new_empty(schema));
        }
        // Optional: emit one null-padded row per unmatched group.
        let unmatched_reps = collect_unmatched_optional_group_rows(
            &input,
            &HashSet::new(),
            &schema,
            &optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            &input,
            &unmatched_reps,
            &schema,
            &optional_pattern_vars,
        );
    }

    // Duplicate each matched input row once per expansion.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices_array, None))
        .collect::<Result<_, _>>()?;

    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));

    columns.push(build_target_labels_column(
        &target_vids,
        &target_label_name,
        &graph_ctx,
    ));

    if !target_properties.is_empty() {
        let prop_cols = build_target_property_columns(
            &target_vids,
            &target_properties,
            &target_label_name,
            &graph_ctx,
        )
        .await?;
        columns.extend(prop_cols);
    }

    if edge_variable.is_some() {
        let edge_cols =
            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
        columns.extend(edge_cols);
    } else {
        // No edge variable: only the internal eid column is appended.
        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

    // Optional: append null-padded rows for groups that matched nothing.
    if optional {
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            &input,
            &matched_indices,
            &schema,
            &optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                &input,
                &unmatched,
                &schema,
                &optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
                .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1033
1034fn build_optional_null_batch_for_rows(
1037 input: &RecordBatch,
1038 unmatched_indices: &[usize],
1039 schema: &SchemaRef,
1040) -> DFResult<RecordBatch> {
1041 let num_rows = unmatched_indices.len();
1042 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1043 let indices_array = UInt64Array::from(indices);
1044
1045 let mut columns: Vec<ArrayRef> = Vec::new();
1047 for col in input.columns() {
1048 let taken = take(col.as_ref(), &indices_array, None)?;
1049 columns.push(taken);
1050 }
1051 for field in schema.fields().iter().skip(input.num_columns()) {
1053 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1054 }
1055 RecordBatch::try_new(schema.clone(), columns)
1056 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1057}
1058
/// Returns true when `col_name` belongs to one of the optional-pattern
/// variables: the variable column itself (`var`), one of its property
/// columns (`var.prop`, `var._vid`, ...), or the operator-internal edge
/// column `__eid_to_var`.
///
/// Fix: the internal-eid check previously used `ends_with(var)`, so a
/// variable like `"b"` wrongly claimed `__eid_to_ab`. `build_schema`
/// always names that column exactly `__eid_to_{var}`, so we require an
/// exact match after the prefix.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    optional_vars.contains(col_name)
        || optional_vars.iter().any(|var| {
            col_name
                .strip_prefix(var.as_str())
                .is_some_and(|rest| rest.starts_with('.'))
                || col_name.strip_prefix("__eid_to_") == Some(var.as_str())
        })
}
1066
1067fn collect_unmatched_optional_group_rows(
1068 input: &RecordBatch,
1069 matched_indices: &HashSet<usize>,
1070 schema: &SchemaRef,
1071 optional_vars: &HashSet<String>,
1072) -> DFResult<Vec<usize>> {
1073 if input.num_rows() == 0 {
1074 return Ok(Vec::new());
1075 }
1076
1077 if optional_vars.is_empty() {
1078 return Ok((0..input.num_rows())
1079 .filter(|idx| !matched_indices.contains(idx))
1080 .collect());
1081 }
1082
1083 let source_vid_indices: Vec<usize> = schema
1084 .fields()
1085 .iter()
1086 .enumerate()
1087 .filter_map(|(idx, field)| {
1088 if idx >= input.num_columns() {
1089 return None;
1090 }
1091 let name = field.name();
1092 if !is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid") {
1093 Some(idx)
1094 } else {
1095 None
1096 }
1097 })
1098 .collect();
1099
1100 let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new(); let mut group_order: Vec<Vec<u8>> = Vec::new();
1103
1104 for row_idx in 0..input.num_rows() {
1105 let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
1106 let entry = groups.entry(key.clone());
1107 if matches!(entry, std::collections::hash_map::Entry::Vacant(_)) {
1108 group_order.push(key.clone());
1109 }
1110 let matched = matched_indices.contains(&row_idx);
1111 entry
1112 .and_modify(|(_, any_matched)| *any_matched |= matched)
1113 .or_insert((row_idx, matched));
1114 }
1115
1116 Ok(group_order
1117 .into_iter()
1118 .filter_map(|key| {
1119 groups
1120 .get(&key)
1121 .and_then(|(first_idx, any_matched)| (!*any_matched).then_some(*first_idx))
1122 })
1123 .collect())
1124}
1125
1126fn compute_optional_group_key(
1127 batch: &RecordBatch,
1128 row_idx: usize,
1129 source_vid_indices: &[usize],
1130) -> DFResult<Vec<u8>> {
1131 let mut key = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
1132 for &col_idx in source_vid_indices {
1133 let col = batch.column(col_idx);
1134 let vid_cow = column_as_vid_array(col.as_ref())?;
1135 let arr: &UInt64Array = &vid_cow;
1136 if arr.is_null(row_idx) {
1137 key.extend_from_slice(&u64::MAX.to_le_bytes());
1138 } else {
1139 key.extend_from_slice(&arr.value(row_idx).to_le_bytes());
1140 }
1141 }
1142 Ok(key)
1143}
1144
1145fn build_optional_null_batch_for_rows_with_optional_vars(
1146 input: &RecordBatch,
1147 unmatched_indices: &[usize],
1148 schema: &SchemaRef,
1149 optional_vars: &HashSet<String>,
1150) -> DFResult<RecordBatch> {
1151 if optional_vars.is_empty() {
1152 return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
1153 }
1154
1155 let num_rows = unmatched_indices.len();
1156 let indices: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
1157 let indices_array = UInt64Array::from(indices);
1158
1159 let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
1160 for (col_idx, field) in schema.fields().iter().enumerate() {
1161 if col_idx < input.num_columns() {
1162 if is_optional_column_for_vars(field.name(), optional_vars) {
1163 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1164 } else {
1165 let taken = take(input.column(col_idx).as_ref(), &indices_array, None)?;
1166 columns.push(taken);
1167 }
1168 } else {
1169 columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
1170 }
1171 }
1172
1173 RecordBatch::try_new(schema.clone(), columns)
1174 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
1175}
1176
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the state machine: Warming -> Reading -> (Materializing ->)
    /// Reading -> ... -> Done. Batches with no property columns requested
    /// take a synchronous fast path and skip the Materializing state.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state (leaving Done as a placeholder) so
            // the owned futures inside can be polled; every branch below
            // restores the correct next state before returning.
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);

            match state {
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Check the query deadline before doing per-batch work.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Fast path: no property fetches needed, so the
                            // output batch can be built synchronously.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }

                            // Slow path: clone everything the async builder
                            // needs and hand off to a Materializing future.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();

                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );

                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
1302
/// Fast-path (no property materialization) builder for the traverse output batch.
///
/// Expands `input` rows according to `expansions` — tuples of
/// `(input row index, target vid, edge id as u64, edge type id)` — and appends
/// the target-vid, target-labels and edge columns.
///
/// IMPORTANT: columns are pushed in the exact order the output `schema` declares
/// its trailing fields (vid, labels, eid, [edge type]); do not reorder.
///
/// When `optional` is set, input rows with no expansion are re-emitted with
/// NULLs in the optional-pattern columns (OPTIONAL MATCH semantics).
fn build_traverse_output_batch_sync(
    input: &RecordBatch,
    expansions: &[(usize, Vid, u64, u32)],
    schema: &SchemaRef,
    edge_variable: Option<&String>,
    graph_ctx: &GraphExecutionContext,
    optional: bool,
    optional_pattern_vars: &HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        // No matches at all: empty batch for a required pattern, or all-NULL
        // rows for an OPTIONAL pattern.
        if !optional {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        // Empty matched set: every optional-group row is unmatched.
        let unmatched_reps = collect_unmatched_optional_group_rows(
            input,
            &HashSet::new(),
            schema,
            optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            input,
            &unmatched_reps,
            schema,
            optional_pattern_vars,
        );
    }

    // One output row per expansion: gather the source row for each expansion
    // via arrow's `take` kernel.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);

    let mut columns: Vec<ArrayRef> = Vec::new();
    for col in input.columns() {
        let expanded = take(col.as_ref(), &indices_array, None)?;
        columns.push(expanded);
    }

    // Target vertex id column.
    let target_vids: Vec<u64> = expansions
        .iter()
        .map(|(_, vid, _, _)| vid.as_u64())
        .collect();
    columns.push(Arc::new(UInt64Array::from(target_vids)));

    {
        // Target labels column, gathered from the unflushed L0 buffers only;
        // duplicates across buffers are suppressed (first occurrence wins).
        // NOTE(review): labels are not read from the main store here — presumably
        // persisted labels are resolved elsewhere; confirm.
        use arrow_array::builder::{ListBuilder, StringBuilder};
        let l0_ctx = graph_ctx.l0_context();
        let mut labels_builder = ListBuilder::new(StringBuilder::new());
        for (_, vid, _, _) in expansions {
            let mut row_labels: Vec<String> = Vec::new();
            for l0 in l0_ctx.iter_l0_buffers() {
                let guard = l0.read();
                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
                    for lbl in l0_labels {
                        if !row_labels.contains(lbl) {
                            row_labels.push(lbl.clone());
                        }
                    }
                }
            }
            let values = labels_builder.values();
            for lbl in &row_labels {
                values.append_value(lbl);
            }
            labels_builder.append(true);
        }
        columns.push(Arc::new(labels_builder.finish()));
    }

    if edge_variable.is_some() {
        // Named edge variable: emit both the eid column and the resolved
        // edge-type-name column (NULL when the type id is unknown).
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()));
    } else {
        // Anonymous edge: only the internal eid column (used for edge
        // uniqueness elsewhere), no type column.
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));
    }

    let expanded_batch = RecordBatch::try_new(schema.clone(), columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

    if optional {
        // Append NULL-padded rows for input rows that produced no expansion.
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            input,
            &matched_indices,
            schema,
            optional_pattern_vars,
        )?;

        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                schema,
                optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
                .map_err(|e| {
                    datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
                })?;
            return Ok(combined);
        }
    }

    Ok(expanded_batch)
}
1434
1435impl RecordBatchStream for GraphTraverseStream {
1436 fn schema(&self) -> SchemaRef {
1437 self.schema.clone()
1438 }
1439}
1440
/// Adjacency map keyed by source vertex id; each entry lists
/// `(neighbor vid, edge id, edge type name, edge properties)`.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
1443
/// Physical plan node that expands each input row along graph edges loaded
/// from the main store plus unflushed L0 buffers (see
/// [`build_edge_adjacency_map`]), producing one output row per
/// `(input row, matching edge)` pair.
pub struct GraphTraverseMainExec {
    /// Child plan supplying the rows to expand.
    input: Arc<dyn ExecutionPlan>,

    /// Name of the input column holding the source vertex ids.
    source_column: String,

    /// Edge type names used to build the adjacency map.
    type_names: Vec<String>,

    /// Traversal direction (outgoing / incoming / both).
    direction: Direction,

    /// Variable name for the target node; prefixes output columns
    /// (`<var>._vid`, `<var>._labels`).
    target_variable: String,

    /// Optional edge variable; when set, `<var>._eid` and `<var>._type`
    /// columns are emitted.
    edge_variable: Option<String>,

    /// Edge property names to materialize ("_all_props" means all, JSON-encoded).
    edge_properties: Vec<String>,

    /// Target node property names to materialize.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics: unmatched rows survive with NULLs.
    optional: bool,

    /// Variables belonging to the optional pattern (nulled for unmatched rows).
    optional_pattern_vars: HashSet<String>,

    /// Column whose vids, when present, constrain the traversal target.
    bound_target_column: Option<String>,

    /// Columns holding edge ids already used earlier in the pattern;
    /// those edges are skipped to enforce edge uniqueness.
    used_edge_columns: Vec<String>,

    /// Output schema; its field order must match the column push order
    /// in `expand_batch`.
    schema: SchemaRef,

    properties: PlanProperties,

    metrics: ExecutionPlanMetricsSet,
}
1518
1519impl fmt::Debug for GraphTraverseMainExec {
1520 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1521 f.debug_struct("GraphTraverseMainExec")
1522 .field("type_names", &self.type_names)
1523 .field("direction", &self.direction)
1524 .field("target_variable", &self.target_variable)
1525 .field("edge_variable", &self.edge_variable)
1526 .finish()
1527 }
1528}
1529
impl GraphTraverseMainExec {
    /// Builds the node and precomputes its output schema and plan properties.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        let schema = Self::build_schema(
            &input.schema(),
            &target_variable,
            &edge_variable,
            &edge_properties,
            &target_properties,
            optional,
        );

        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            type_names,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Derives the output schema: input fields, then target vid/labels,
    /// then the edge columns, then target property columns.
    ///
    /// IMPORTANT: the field order here must exactly mirror the column push
    /// order in `GraphTraverseMainStream::expand_batch`.
    fn build_schema(
        input_schema: &SchemaRef,
        target_variable: &str,
        edge_variable: &Option<String>,
        edge_properties: &[String],
        target_properties: &[String],
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        // Only added when not already bound by an upstream operator.
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        if let Some(edge_var) = edge_variable {
            // `_eid` is nullable only under OPTIONAL MATCH (NULL-padded rows).
            fields.push(Field::new(
                format!("{}._eid", edge_var),
                DataType::UInt64,
                optional,
            ));

            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));

            // Edge property columns carry CV-encoded binary payloads; the
            // metadata flag marks them for downstream decoding.
            for prop in edge_properties {
                let col_name = format!("{}.{}", edge_var, prop);
                let mut metadata = std::collections::HashMap::new();
                metadata.insert("cv_encoded".to_string(), "true".to_string());
                fields.push(
                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
                );
            }
        } else {
            // Anonymous edge: internal eid column, named after the target.
            fields.push(Field::new(
                format!("__eid_to_{}", target_variable),
                DataType::UInt64,
                optional,
            ));
        }

        for prop in target_properties {
            fields.push(Field::new(
                format!("{}.{}", target_variable, prop),
                DataType::LargeBinary,
                true,
            ));
        }

        Arc::new(Schema::new(fields))
    }
}
1658
1659impl DisplayAs for GraphTraverseMainExec {
1660 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1661 write!(
1662 f,
1663 "GraphTraverseMainExec: types={:?}, direction={:?}",
1664 self.type_names, self.direction
1665 )
1666 }
1667}
1668
1669impl ExecutionPlan for GraphTraverseMainExec {
1670 fn name(&self) -> &str {
1671 "GraphTraverseMainExec"
1672 }
1673
1674 fn as_any(&self) -> &dyn Any {
1675 self
1676 }
1677
1678 fn schema(&self) -> SchemaRef {
1679 self.schema.clone()
1680 }
1681
1682 fn properties(&self) -> &PlanProperties {
1683 &self.properties
1684 }
1685
1686 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
1687 vec![&self.input]
1688 }
1689
1690 fn with_new_children(
1691 self: Arc<Self>,
1692 children: Vec<Arc<dyn ExecutionPlan>>,
1693 ) -> DFResult<Arc<dyn ExecutionPlan>> {
1694 if children.len() != 1 {
1695 return Err(datafusion::error::DataFusionError::Plan(
1696 "GraphTraverseMainExec expects exactly one child".to_string(),
1697 ));
1698 }
1699
1700 Ok(Arc::new(Self {
1701 input: children[0].clone(),
1702 source_column: self.source_column.clone(),
1703 type_names: self.type_names.clone(),
1704 direction: self.direction,
1705 target_variable: self.target_variable.clone(),
1706 edge_variable: self.edge_variable.clone(),
1707 edge_properties: self.edge_properties.clone(),
1708 target_properties: self.target_properties.clone(),
1709 graph_ctx: self.graph_ctx.clone(),
1710 optional: self.optional,
1711 optional_pattern_vars: self.optional_pattern_vars.clone(),
1712 bound_target_column: self.bound_target_column.clone(),
1713 used_edge_columns: self.used_edge_columns.clone(),
1714 schema: self.schema.clone(),
1715 properties: self.properties.clone(),
1716 metrics: self.metrics.clone(),
1717 }))
1718 }
1719
1720 fn execute(
1721 &self,
1722 partition: usize,
1723 context: Arc<TaskContext>,
1724 ) -> DFResult<SendableRecordBatchStream> {
1725 let input_stream = self.input.execute(partition, context)?;
1726 let metrics = BaselineMetrics::new(&self.metrics, partition);
1727
1728 Ok(Box::pin(GraphTraverseMainStream::new(
1729 input_stream,
1730 self.source_column.clone(),
1731 self.type_names.clone(),
1732 self.direction,
1733 self.target_variable.clone(),
1734 self.edge_variable.clone(),
1735 self.edge_properties.clone(),
1736 self.target_properties.clone(),
1737 self.graph_ctx.clone(),
1738 self.optional,
1739 self.optional_pattern_vars.clone(),
1740 self.bound_target_column.clone(),
1741 self.used_edge_columns.clone(),
1742 self.schema.clone(),
1743 metrics,
1744 )))
1745 }
1746
1747 fn metrics(&self) -> Option<MetricsSet> {
1748 Some(self.metrics.clone_inner())
1749 }
1750}
1751
/// State machine for [`GraphTraverseMainStream`]: first the full edge
/// adjacency map is built asynchronously, then input batches are expanded
/// against it.
enum GraphTraverseMainState {
    /// Adjacency map still loading; the input stream is parked alongside
    /// the in-flight future.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        input_stream: SendableRecordBatchStream,
    },
    /// Adjacency ready; each incoming batch is expanded synchronously.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Terminal state; also serves as the placeholder while polling
    /// (see the `mem::replace` in `poll_next`).
    Done,
}
1767
/// Streaming executor for [`GraphTraverseMainExec`]: loads the adjacency map
/// once, then expands every input batch row-by-row.
struct GraphTraverseMainStream {
    /// Input column holding source vertex ids.
    source_column: String,

    /// Target node variable; prefixes output vid/labels/property columns.
    target_variable: String,

    /// Optional edge variable; when set, eid and type columns are emitted.
    edge_variable: Option<String>,

    /// Edge properties to materialize ("_all_props" = all, JSON-encoded).
    edge_properties: Vec<String>,

    /// Target node properties to materialize.
    target_properties: Vec<String>,

    /// Shared execution context (storage, L0 buffers, timeout checks).
    graph_ctx: Arc<GraphExecutionContext>,

    /// OPTIONAL MATCH semantics flag.
    optional: bool,

    /// Variables nulled out for unmatched optional rows.
    optional_pattern_vars: HashSet<String>,

    /// Column whose vids constrain the traversal target, if bound upstream.
    bound_target_column: Option<String>,

    /// Columns with already-used edge ids (edge-uniqueness filtering).
    used_edge_columns: Vec<String>,

    /// Output schema; field order mirrors `expand_batch`'s column order.
    schema: SchemaRef,

    /// Current position in the load-then-process state machine.
    state: GraphTraverseMainState,

    /// Baseline metrics; output row counts recorded per emitted batch.
    metrics: BaselineMetrics,
}
1809
impl GraphTraverseMainStream {
    /// Creates the stream, immediately scheduling the adjacency-map load as
    /// the initial state; nothing runs until the stream is polled.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // The future owns its own clones so `Self` can keep the originals.
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };

        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expands one input batch against the preloaded adjacency map.
    ///
    /// Produces one output row per `(input row, matching edge)` pair, filtered
    /// by edge uniqueness (`used_edge_columns`) and, when present, a bound
    /// target vid. Column push order must stay in sync with
    /// `GraphTraverseMainExec::build_schema`.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // When the target was already bound upstream, only edges landing on
        // that exact vid per row are accepted.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Edge-id columns from earlier pattern steps; missing or non-u64
        // columns are silently skipped.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();

        // (input row index, target vid, edge id, edge type name, edge props)
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();

        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);

                // Edges already consumed by this row's earlier pattern steps.
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        // Edge uniqueness within the pattern.
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }

                        // Bound-target filter: NULL bound vid matches nothing.
                        if let Some(targets) = expected_targets {
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }

                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }

        // OPTIONAL MATCH with zero matches: every input row survives with
        // NULLs in the added columns.
        if expansions.is_empty() && self.optional {
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }

        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        // Row indices that matched — only needed for the optional NULL-padding
        // pass at the end.
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };

        // Gather input columns once per expansion via arrow's `take`.
        let mut columns: Vec<ArrayRef> = Vec::new();
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // Target vid column — only when not already present in the input
        // (i.e. not bound upstream; the schema omits it in that case too).
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }

        // Target labels column, sourced from L0 buffers only, deduplicated.
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        if self.edge_variable.is_some() {
            // Named edge variable: eid column, type column, then one
            // CV-encoded binary column per requested edge property.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));

            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }

            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // Whole property map serialized as a JSON object;
                    // encoding failures degrade to NULL.
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    // Single property; explicit Null values become SQL NULL.
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // Anonymous edge: only the internal eid column.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }

        {
            // Target node property columns, read from L0 buffers.
            // NOTE(review): main-store node properties are not consulted here —
            // presumably resolved by a separate scan; confirm.
            use crate::query::df_graph::scan::encode_cypher_value;
            let l0_ctx = self.graph_ctx.l0_context();

            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    // Merge across L0 buffers (later buffers overwrite keys).
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    // First non-null occurrence across L0 buffers wins.
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }

        let matched_batch = RecordBatch::try_new(self.schema.clone(), columns)
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

        if self.optional {
            // Append NULL-padded rows for optional-group rows with no match.
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            if unmatched.is_empty() {
                return Ok(matched_batch);
            }

            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;

            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch])
                .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
        } else {
            Ok(matched_batch)
        }
    }
}
2174
2175async fn build_edge_adjacency_map(
2181 graph_ctx: &GraphExecutionContext,
2182 type_names: &[String],
2183 direction: Direction,
2184) -> DFResult<EdgeAdjacencyMap> {
2185 use uni_store::storage::main_edge::MainEdgeDataset;
2186
2187 let storage = graph_ctx.storage();
2188 let l0_ctx = graph_ctx.l0_context();
2189 let lancedb_store = storage.lancedb_store();
2190
2191 let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
2193 let edges_with_type = MainEdgeDataset::find_edges_by_type_names(lancedb_store, &type_refs)
2194 .await
2195 .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
2196
2197 let mut edges: Vec<(
2199 uni_common::Eid,
2200 uni_common::Vid,
2201 uni_common::Vid,
2202 String,
2203 uni_common::Properties,
2204 )> = edges_with_type.into_iter().collect();
2205
2206 for l0 in l0_ctx.iter_l0_buffers() {
2208 let l0_guard = l0.read();
2209
2210 for type_name in type_names {
2211 let l0_eids = l0_guard.eids_for_type(type_name);
2212
2213 for &eid in &l0_eids {
2215 if let Some(edge_ref) = l0_guard.graph.edge(eid) {
2216 let src_vid = edge_ref.src_vid;
2217 let dst_vid = edge_ref.dst_vid;
2218
2219 let props = l0_guard
2221 .edge_properties
2222 .get(&eid)
2223 .cloned()
2224 .unwrap_or_default();
2225
2226 edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
2227 }
2228 }
2229 }
2230 }
2231
2232 let mut seen_eids = HashSet::new();
2234 let mut unique_edges = Vec::new();
2235 for edge in edges.into_iter().rev() {
2236 if seen_eids.insert(edge.0) {
2237 unique_edges.push(edge);
2238 }
2239 }
2240 unique_edges.reverse();
2241
2242 let mut tombstoned_eids = HashSet::new();
2244 for l0 in l0_ctx.iter_l0_buffers() {
2245 let l0_guard = l0.read();
2246 for eid in l0_guard.tombstones.keys() {
2247 tombstoned_eids.insert(*eid);
2248 }
2249 }
2250 if !tombstoned_eids.is_empty() {
2251 unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
2252 }
2253
2254 let mut adjacency: EdgeAdjacencyMap = HashMap::new();
2256
2257 for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
2258 match direction {
2259 Direction::Outgoing => {
2260 adjacency
2261 .entry(src_vid)
2262 .or_default()
2263 .push((dst_vid, eid, edge_type, props));
2264 }
2265 Direction::Incoming => {
2266 adjacency
2267 .entry(dst_vid)
2268 .or_default()
2269 .push((src_vid, eid, edge_type, props));
2270 }
2271 Direction::Both => {
2272 adjacency.entry(src_vid).or_default().push((
2273 dst_vid,
2274 eid,
2275 edge_type.clone(),
2276 props.clone(),
2277 ));
2278 adjacency
2279 .entry(dst_vid)
2280 .or_default()
2281 .push((src_vid, eid, edge_type, props));
2282 }
2283 }
2284 }
2285
2286 Ok(adjacency)
2287}
2288
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the load-then-process state machine.
    ///
    /// The current state is moved out with `mem::replace` (leaving `Done` as a
    /// placeholder) so its owned values can be used by value; every non-terminal
    /// branch must write a state back before returning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);

            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Adjacency ready: switch to processing and loop to
                        // poll the input immediately.
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Restore state so the future isn't dropped mid-flight.
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Query-timeout check once per poll; state stays `Done`
                    // (set by the replace above) so the stream terminates.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let result = self.expand_batch(&batch, &adjacency);

                            // Restore state before emitting so the next poll
                            // continues processing.
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };

                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
2371
2372impl RecordBatchStream for GraphTraverseMainStream {
2373 fn schema(&self) -> SchemaRef {
2374 self.schema.clone()
2375 }
2376}
2377
/// Physical plan node for variable-length path traversal
/// (`-[:TYPE*min..max]->`), driven by a path NFA.
pub struct GraphVariableLengthTraverseExec {
    /// Child plan supplying rows whose source vids seed the traversal.
    input: Arc<dyn ExecutionPlan>,

    /// Input column holding the source vertex ids.
    source_column: String,

    /// Numeric edge type ids the traversal may follow.
    edge_type_ids: Vec<u32>,

    /// Traversal direction per hop.
    direction: Direction,

    /// Minimum number of hops (inclusive).
    min_hops: usize,

    /// Maximum number of hops (inclusive).
    max_hops: usize,

    /// Variable bound to the path's end node; prefixes output columns.
    target_variable: String,

    /// Optional variable for the edge list of the path (adds a list column).
    step_variable: Option<String>,

    /// Optional path variable (adds a path struct column).
    path_variable: Option<String>,

    /// End-node properties to materialize.
    target_properties: Vec<String>,

    /// Label used to resolve property types for the output schema.
    target_label_name: Option<String>,

    /// OPTIONAL MATCH semantics flag.
    is_optional: bool,

    /// Column whose vids constrain the path's end node, if bound upstream.
    bound_target_column: Option<String>,

    /// Pushed-down edge filter expression — NOTE(review): not used in this
    /// chunk; presumably a LanceDB filter applied during edge loading. Confirm
    /// in the execution stream.
    edge_lance_filter: Option<String>,

    /// Per-edge property equality conditions — applied during traversal
    /// (outside this chunk).
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Columns with edge ids already used earlier in the pattern.
    used_edge_columns: Vec<String>,

    /// Path uniqueness/selection mode (e.g. trail vs. walk semantics —
    /// see `nfa::PathMode`).
    path_mode: super::nfa::PathMode,

    /// Controls which rows/columns the traversal emits (see `VlpOutputMode`).
    output_mode: super::nfa::VlpOutputMode,

    /// NFA describing the admissible paths; either supplied (QPP) or derived
    /// from `edge_type_ids`/`direction`/hop bounds in `new`.
    nfa: Arc<PathNfa>,

    /// Shared execution context (storage, L0 buffers, timeout).
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema; see `build_schema` for the field layout.
    schema: SchemaRef,

    properties: PlanProperties,

    metrics: ExecutionPlanMetricsSet,
}
2469
2470impl fmt::Debug for GraphVariableLengthTraverseExec {
2471 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2472 f.debug_struct("GraphVariableLengthTraverseExec")
2473 .field("source_column", &self.source_column)
2474 .field("edge_type_ids", &self.edge_type_ids)
2475 .field("direction", &self.direction)
2476 .field("min_hops", &self.min_hops)
2477 .field("max_hops", &self.max_hops)
2478 .field("target_variable", &self.target_variable)
2479 .finish()
2480 }
2481}
2482
impl GraphVariableLengthTraverseExec {
    /// Builds the node, derives the output schema, and constructs the path
    /// NFA (falling back to a plain min/max-hop NFA when no QPP NFA is given).
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
        qpp_nfa: Option<PathNfa>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        // Property metadata for the target label, used to give target
        // property columns precise arrow types.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
            label_props,
        );
        let properties = compute_plan_properties(schema.clone());

        // An explicit QPP NFA overrides the simple VLP-derived one.
        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
        }));

        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            target_label_name,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            nfa,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Derives the output schema: input fields, target vid/labels, target
    /// property columns, a `_hop_count` column, then optional edge-list and
    /// path-struct columns. Columns already present in the input are not
    /// duplicated.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            if input_schema.column_with_name(&col_name).is_none() {
                // Type comes from the label's property metadata when known.
                let arrow_type = resolve_property_type(prop_name, label_props);
                fields.push(Field::new(&col_name, arrow_type, true));
            }
        }

        // Path length in hops; always present and non-null.
        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        Arc::new(Schema::new(fields))
    }
}
2620
2621impl DisplayAs for GraphVariableLengthTraverseExec {
2622 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2623 write!(
2624 f,
2625 "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
2626 self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
2627 )
2628 }
2629}
2630
impl ExecutionPlan for GraphVariableLengthTraverseExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuilds the operator over a replacement child plan.
    ///
    /// All traversal configuration is forwarded unchanged; the compiled NFA is
    /// cloned and passed back into `new` so it is not rebuilt from scratch.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
            // Reuse the already-compiled NFA instead of recompiling it.
            Some((*self.nfa).clone()),
        )))
    }

    /// Starts execution for one partition.
    ///
    /// The returned stream begins in the `Warming` state, awaiting the graph
    /// context's warming future for the configured edge types/direction before
    /// any input batch is processed.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;

        let metrics = BaselineMetrics::new(&self.metrics, partition);

        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);

        Ok(Box::pin(GraphVariableLengthTraverseStream {
            input: input_stream,
            // Snapshot of the config so the stream does not borrow `self`.
            exec: Arc::new(self.clone_for_stream()),
            schema: self.schema.clone(),
            state: VarLengthStreamState::Warming(warm_fut),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
2713
impl GraphVariableLengthTraverseExec {
    /// Copies the traversal configuration into a standalone, owned value so
    /// the execution stream can hold it without borrowing the plan node.
    /// Every field is cloned (or `Copy`-moved) verbatim.
    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
        GraphVariableLengthTraverseExecData {
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            nfa: self.nfa.clone(),
            graph_ctx: self.graph_ctx.clone(),
        }
    }
}
2740
/// Owned snapshot of the traversal configuration, shared by the stream via
/// `Arc`. Populated by `GraphVariableLengthTraverseExec::clone_for_stream`.
#[allow(dead_code)] struct GraphVariableLengthTraverseExecData {
    /// Input column holding the source vertex ids to expand from.
    source_column: String,
    /// Edge type ids the traversal may follow.
    edge_type_ids: Vec<u32>,
    /// Traversal direction (`Both` triggers per-transition edge dedup).
    direction: Direction,
    /// Minimum hop count of the variable-length pattern.
    min_hops: usize,
    /// Maximum hop count; bounds the BFS depth.
    max_hops: usize,
    /// Name of the target variable used to derive `_vid`/`_labels` columns.
    target_variable: String,
    /// Optional variable receiving the per-row edge list.
    step_variable: Option<String>,
    /// Optional variable receiving the full path struct.
    path_variable: Option<String>,
    /// Target properties to hydrate after expansion.
    target_properties: Vec<String>,
    /// Optional label the target vertex must carry.
    target_label_name: Option<String>,
    /// When true, unmatched source rows are still emitted (null target).
    is_optional: bool,
    /// Column holding an already-bound target vid to match against, if any.
    bound_target_column: Option<String>,
    /// Pushed-down edge filter expression; currently unused at runtime.
    #[allow(dead_code)] edge_lance_filter: Option<String>,
    /// Equality conditions evaluated against edge properties during expansion.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Input columns whose edge ids must not be reused (trail semantics).
    used_edge_columns: Vec<String>,
    /// Path uniqueness mode (e.g. trail/walk) forwarded to enumeration.
    path_mode: super::nfa::PathMode,
    /// Whether full paths or only endpoints are produced.
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled NFA driving state transitions during BFS.
    nfa: Arc<PathNfa>,
    /// Shared graph execution context (neighbors, visibility, properties).
    graph_ctx: Arc<GraphExecutionContext>,
}
2766
// Safety valve: abort BFS expansion once a frontier exceeds this many
// (vertex, NFA-state) pairs, to bound memory on dense graphs.
const MAX_FRONTIER_SIZE: usize = 500_000;
// Safety valve: cap on the predecessor-DAG node pool, for the same reason.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
2771
impl GraphVariableLengthTraverseExecData {
    /// Returns true when `vid` satisfies the configured target label, or when
    /// no target label was configured.
    ///
    /// NOTE(review): when label information is unavailable (`None`), the
    /// vertex is accepted — presumably the label check is deferred elsewhere;
    /// confirm against the visibility layer.
    fn check_target_label(&self, vid: Vid) -> bool {
        if let Some(ref label_name) = self.target_label_name {
            let query_ctx = self.graph_ctx.query_context();
            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                Some(labels) => labels.contains(label_name),
                None => true, }
        } else {
            true
        }
    }

    /// Evaluates a per-NFA-state vertex constraint against `vid`.
    /// Mirrors `check_target_label`: missing label info passes the check.
    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
        match constraint {
            super::nfa::VertexConstraint::Label(label_name) => {
                let query_ctx = self.graph_ctx.query_context();
                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                    Some(labels) => labels.contains(label_name),
                    None => true, }
            }
        }
    }

    /// Expands all NFA transitions out of (`vid`, `state`) and returns the
    /// surviving `(neighbor, edge, destination state)` triples.
    ///
    /// Filters applied, in order: per-transition edge dedup for undirected
    /// traversal, the eid bitmap filter, edge-property equality conditions,
    /// trail exclusion (`used_eids`), and the destination state's vertex
    /// constraint.
    fn expand_neighbors(
        &self,
        vid: Vid,
        state: NfaStateId,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
    ) -> Vec<(Vid, Eid, NfaStateId)> {
        let is_undirected = matches!(self.direction, Direction::Both);
        let mut results = Vec::new();

        for transition in self.nfa.transitions_from(state) {
            // Undirected expansion can yield the same edge from both ends;
            // dedup within a transition so each edge is followed once.
            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();

            for &etype in &transition.edge_type_ids {
                for (neighbor, eid) in
                    self.graph_ctx
                        .get_neighbors(vid, etype, transition.direction)
                {
                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
                        continue;
                    }

                    if !eid_filter.contains(eid) {
                        continue;
                    }

                    if !self.edge_property_conditions.is_empty() {
                        let query_ctx = self.graph_ctx.query_context();
                        // Edges with no retrievable properties pass the check;
                        // NOTE(review): presumably properties live elsewhere
                        // for such edges — confirm this is intentional.
                        let passes = if let Some(props) =
                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
                        {
                            self.edge_property_conditions
                                .iter()
                                .all(|(name, expected)| {
                                    props.get(name).is_some_and(|actual| actual == expected)
                                })
                        } else {
                            true
                        };
                        if !passes {
                            continue;
                        }
                    }

                    // Trail semantics: never reuse an edge already consumed by
                    // an earlier traversal bound to this input row.
                    if used_eids.contains(&eid.as_u64()) {
                        continue;
                    }

                    // Destination state may impose a vertex (label) constraint.
                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
                        && !self.check_state_constraint(neighbor, constraint)
                    {
                        continue;
                    }

                    results.push((neighbor, eid, transition.to));
                }
            }
        }

        results
    }

    /// Breadth-first expansion from `source`, recording every predecessor
    /// edge in a DAG, then enumerating all accepted paths.
    ///
    /// Returns one `(target, hop_count, node_path, edge_path)` entry per
    /// enumerated path. Expansion stops at `max_hops` or when the
    /// frontier/pool size caps are exceeded (partial results are returned).
    #[allow(clippy::too_many_arguments)]
    fn bfs_with_dag(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<BfsResult> {
        let nfa = &self.nfa;
        let selector = PathSelector::All;
        let mut dag = PredecessorDag::new(selector);
        // Accepting (vertex, state, depth) triples collected during BFS.
        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();

        // Zero-hop match: the source itself may already be accepting
        // (e.g. a pattern whose minimum hop count is 0).
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            accepting.push((source, nfa.start_state(), 0));
        }

        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            // Dedup (vertex, state) pairs within a single depth level.
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    // Record every incoming edge so all paths can later be
                    // reconstructed, even for already-seen vertices.
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                        {
                            accepting.push((neighbor, dst_state, depth));
                        }
                    }
                }
            }

            // Memory safety valve: stop expanding once either cap is hit.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        // Walk the predecessor DAG backwards from each accepting triple and
        // materialize every concrete path at exactly that depth.
        let mut results: Vec<BfsResult> = Vec::new();
        for &(target, state, depth) in &accepting {
            dag.enumerate_paths(
                source,
                target,
                state,
                depth,
                depth,
                &self.path_mode,
                &mut |nodes, edges| {
                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
                    std::ops::ControlFlow::Continue(())
                },
            );
        }

        results
    }

    /// BFS variant for `VlpOutputMode::EndpointsOnly`: returns reachable
    /// `(target, depth)` pairs without materializing paths.
    ///
    /// A predecessor DAG is still maintained so `has_trail_valid_path` can
    /// confirm at least one path to the endpoint respects trail semantics.
    #[allow(clippy::too_many_arguments)]
    fn bfs_endpoints_only(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<(Vid, u32)> {
        let nfa = &self.nfa;
        // Only one witness path per endpoint is needed here.
        let selector = PathSelector::Any; let mut dag = PredecessorDag::new(selector);
        let mut results: Vec<(Vid, u32)> = Vec::new();

        // Zero-hop match: source may itself be an accepted endpoint.
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            results.push((source, 0));
        }

        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;

        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();

            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);

                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));

                        // Emit the endpoint only when a trail-valid path of
                        // exactly this depth exists in the DAG.
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
                        {
                            results.push((neighbor, depth));
                        }
                    }
                }
            }

            // Same memory safety valve as `bfs_with_dag`.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }

            frontier = next_frontier;
        }

        results
    }
}
3020
/// State machine driving `GraphVariableLengthTraverseStream::poll_next`.
enum VarLengthStreamState {
    /// Awaiting the graph context's cache-warming future before reading input.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Pulling the next input batch and running synchronous BFS expansion.
    Reading,
    /// Awaiting async hydration of target properties for an expanded batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Input exhausted or a fatal error was already returned.
    Done,
}
3032
/// Streaming executor for the variable-length traversal operator.
struct GraphVariableLengthTraverseStream {
    /// Upstream record-batch stream feeding source rows.
    input: SendableRecordBatchStream,
    /// Owned snapshot of the operator's traversal configuration.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    /// Output schema, fixed at plan time.
    schema: SchemaRef,
    /// Current position in the Warming → Reading → Materializing cycle.
    state: VarLengthStreamState,
    /// Baseline metrics (output row counts, timings).
    metrics: BaselineMetrics,
}
3041
impl Stream for GraphVariableLengthTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drives the state machine: warm caches, then alternate between reading
    /// input batches (synchronous BFS expansion) and awaiting async property
    /// hydration. The current state is taken out with `mem::replace` (leaving
    /// `Done` as a placeholder) so owned futures can be polled by value; every
    /// branch restores the correct next state before returning.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);

            match state {
                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = VarLengthStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Put the future back so it is re-polled next time.
                        self.state = VarLengthStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthStreamState::Reading => {
                    // Abort promptly if the query deadline has passed.
                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }

                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // No pushed-down bitmap filters in this path.
                            let eid_filter = EidFilter::AllAllowed;
                            let vid_filter = VidFilter::AllAllowed;
                            let base_result =
                                self.process_batch_base(batch, &eid_filter, &vid_filter);
                            let base_batch = match base_result {
                                Ok(b) => b,
                                Err(e) => {
                                    // Per-batch failure: report it but keep
                                    // the stream alive for later batches.
                                    self.state = VarLengthStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // Nothing to hydrate: emit the expanded batch now.
                            if self.exec.target_properties.is_empty() {
                                self.state = VarLengthStreamState::Reading;
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Clone what the async hydration future needs.
                            let schema = self.schema.clone();
                            let target_variable = self.exec.target_variable.clone();
                            let target_properties = self.exec.target_properties.clone();
                            let target_label_name = self.exec.target_label_name.clone();
                            let graph_ctx = self.exec.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                            );

                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = VarLengthStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = VarLengthStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
3149
impl GraphVariableLengthTraverseStream {
    /// Expands one input batch: for each source row, runs a BFS (full paths or
    /// endpoints-only, per the output mode) and records one expansion per
    /// accepted result, then delegates column construction to
    /// `build_output_batch`.
    ///
    /// When a bound target column is present, only results matching that row's
    /// expected target vid are kept. Under `is_optional`, rows that produced
    /// no result still emit a single expansion with a `u64::MAX` sentinel vid
    /// (rendered as null downstream).
    fn process_batch_base(
        &self,
        batch: RecordBatch,
        eid_filter: &EidFilter,
        vid_filter: &VidFilter,
    ) -> DFResult<RecordBatch> {
        let source_col = batch
            .column_by_name(&self.exec.source_column)
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Source column '{}' not found",
                    self.exec.source_column
                ))
            })?;

        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;

        // Optional column of pre-bound target vids this traversal must match.
        let bound_target_cow = self
            .exec
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();

        // Columns of edge ids already consumed by earlier pattern parts
        // (trail semantics). Non-UInt64 columns are silently skipped.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .exec
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();

        let mut expansions: Vec<VarLengthExpansion> = Vec::new();

        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            let mut emitted_for_row = false;

            if let Some(src) = source_vid {
                let vid = Vid::from(src);

                // Edge ids this row must not traverse again.
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();

                match &self.exec.output_mode {
                    VlpOutputMode::EndpointsOnly => {
                        let endpoints = self
                            .exec
                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
                        for (target, depth) in endpoints {
                            // Honor a pre-bound target vid, if present.
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            // Endpoints-only mode carries empty paths.
                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
                            emitted_for_row = true;
                        }
                    }
                    _ => {
                        let bfs_results = self
                            .exec
                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
                        for (target, hop_count, node_path, edge_path) in bfs_results {
                            // Honor a pre-bound target vid, if present.
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
                            emitted_for_row = true;
                        }
                    }
                }
            }

            // OPTIONAL MATCH: keep the row with a null-target sentinel.
            if self.exec.is_optional && !emitted_for_row {
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }

        self.build_output_batch(&batch, &expansions)
    }

    /// Materializes the output batch from the expansion records.
    ///
    /// Input columns are replicated per expansion with Arrow `take`, then the
    /// extra columns are appended in the exact order `build_schema` declared
    /// them: target `_vid`, target `_labels`, null placeholders for target
    /// properties (hydrated later), `_hop_count`, the step edge list, and the
    /// path struct. Column order must stay in lock-step with the schema.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }

        let num_rows = expansions.len();

        // Row indices into the input batch, one per expansion record.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);

        // Replicate every input column according to the expansion fan-out.
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }

        // `u64::MAX` is the sentinel for "no match" (optional rows).
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();

        // Target vid column (skipped if the input already provides it).
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }

        // Target labels column, resolved per row from the visibility layer.
        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                let Some(vid_u64) = target_vid else {
                    // Unmatched optional row → null label list.
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            labels
                        }
                        None => {
                            // Fall back to the configured target label when
                            // the store has no label info for this vertex.
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }

        // Null placeholders for target property columns; real values are
        // filled in later by `hydrate_vlp_target_properties`. The data type
        // is read positionally from the output schema.
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }

        // Mandatory `_hop_count` column.
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));

        // Edge list for the step variable, one entry per traversed edge.
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();

            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // No path at all (unmatched / endpoints-only) → null list.
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match → empty (but non-null) list.
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        // Edge i connects node_path[i] → node_path[i + 1].
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }

            columns.push(Arc::new(edges_builder.finish()));
        }

        // Path struct column: {nodes, relationships}. When the input already
        // carries a path column (multi-segment pattern), this traversal's
        // segment is appended to it and the column is replaced in place.
        if let Some(path_var_name) = &self.exec.path_variable {
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());

            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut path_validity = Vec::with_capacity(expansions.len());

            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    // No path for this row → null path struct.
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }

                // If a prior segment exists, copy it first; its last node is
                // this segment's first node, so skip the duplicate.
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };

                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);

                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }

            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));

            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;

            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }

        self.metrics.record_output(num_rows);

        RecordBatch::try_new(self.schema.clone(), columns)
            .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
    }
}
3506
3507impl RecordBatchStream for GraphVariableLengthTraverseStream {
3508 fn schema(&self) -> SchemaRef {
3509 self.schema.clone()
3510 }
3511}
3512
/// Asynchronously fills the target-property columns of an expanded batch.
///
/// `build_output_batch` emits null placeholders for these columns; this
/// function fetches the real property values for each target vid and splices
/// them into the batch, leaving every other column untouched. Returns the
/// batch unchanged when it is empty, no properties were requested, or the
/// target `_vid` column cannot be located.
async fn hydrate_vlp_target_properties(
    base_batch: RecordBatch,
    schema: SchemaRef,
    target_variable: String,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
) -> DFResult<RecordBatch> {
    if base_batch.num_rows() == 0 || target_properties.is_empty() {
        return Ok(base_batch);
    }

    // Locate the target vid column by name. The reverse scan picks the
    // last occurrence — presumably the one this operator appended, in case
    // an earlier column shares the name; confirm against schema builders.
    let target_vid_col_name = format!("{}._vid", target_variable);
    let vid_col_idx = schema
        .fields()
        .iter()
        .enumerate()
        .rev()
        .find(|(_, f)| f.name() == &target_vid_col_name)
        .map(|(i, _)| i);

    let Some(vid_col_idx) = vid_col_idx else {
        return Ok(base_batch);
    };

    let vid_col = base_batch.column(vid_col_idx);
    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
    let target_vid_array: &UInt64Array = &target_vid_cow;

    // Null vids (unmatched optional rows) map to the u64::MAX sentinel.
    let target_vids: Vec<Vid> = target_vid_array
        .iter()
        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
        .collect();

    // One hydrated column per requested property, in request order.
    let mut property_columns: Vec<ArrayRef> = Vec::new();

    if let Some(ref label_name) = target_label_name {
        // Label known: use the label-scoped batch fetch so columns get
        // their schema-declared Arrow types.
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = property_manager
            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;

        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());

        for prop_name in &target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
            property_columns.push(column);
        }
    } else {
        // No label: fetch named properties generically; `_all_props` is a
        // pseudo-property handled separately below.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();

        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };

        for prop_name in &target_properties {
            if prop_name == "_all_props" {
                use crate::query::df_graph::scan::encode_cypher_value;
                use arrow_array::builder::LargeBinaryBuilder;

                // `_all_props`: merge stored props with any L0 (in-memory)
                // overlays — L0 wins on key conflicts — and encode the merged
                // object as one binary value per row.
                let mut builder = LargeBinaryBuilder::new();
                let l0_ctx = graph_ctx.l0_context();
                for vid in &target_vids {
                    let mut merged_props = serde_json::Map::new();
                    if let Some(vid_props) = props_map.get(vid) {
                        for (k, v) in vid_props.iter() {
                            let json_val: serde_json::Value = v.clone().into();
                            merged_props.insert(k.to_string(), json_val);
                        }
                    }
                    for l0 in l0_ctx.iter_l0_buffers() {
                        let guard = l0.read();
                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
                            for (k, v) in l0_props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                merged_props.insert(k.to_string(), json_val);
                            }
                        }
                    }
                    if merged_props.is_empty() {
                        builder.append_null();
                    } else {
                        let json = serde_json::Value::Object(merged_props);
                        // Encoding failure degrades to null rather than
                        // failing the whole batch.
                        match encode_cypher_value(&json) {
                            Ok(bytes) => builder.append_value(bytes),
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                property_columns.push(Arc::new(builder.finish()));
            } else {
                // Without label metadata the column type defaults to
                // LargeBinary (matches the label-less schema path).
                let column = build_property_column_static(
                    &target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                property_columns.push(column);
            }
        }
    }

    // Splice hydrated columns into their schema positions: a field after the
    // vid column whose name matches `<target>.<prop>` takes the next hydrated
    // column (positional pairing assumes schema order == request order).
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    let mut prop_idx = 0;
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let is_target_prop = col_idx > vid_col_idx
            && target_properties
                .iter()
                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
        if is_target_prop && prop_idx < property_columns.len() {
            new_columns.push(property_columns[prop_idx].clone());
            prop_idx += 1;
        } else {
            new_columns.push(base_batch.column(col_idx).clone());
        }
    }

    RecordBatch::try_new(schema, new_columns)
        .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
}
3673
/// Variable-length traversal operator addressed by edge type *names* rather
/// than resolved type ids (compare `GraphVariableLengthTraverseExec`).
pub struct GraphVariableLengthTraverseMainExec {
    /// Child plan supplying source rows.
    input: Arc<dyn ExecutionPlan>,

    /// Input column holding the source vertex ids.
    source_column: String,

    /// Edge type names the traversal may follow.
    type_names: Vec<String>,

    /// Traversal direction.
    direction: Direction,

    /// Minimum hop count of the variable-length pattern.
    min_hops: usize,

    /// Maximum hop count of the variable-length pattern.
    max_hops: usize,

    /// Name of the target variable (drives `_vid`/`_labels` column names).
    target_variable: String,

    /// Optional variable receiving the per-row edge list.
    step_variable: Option<String>,

    /// Optional variable receiving the full path struct.
    path_variable: Option<String>,

    /// Target properties to expose as output columns.
    target_properties: Vec<String>,

    /// When true, unmatched source rows are still emitted.
    is_optional: bool,

    /// Column holding an already-bound target vid to match against, if any.
    bound_target_column: Option<String>,

    /// Pushed-down edge filter expression, if any.
    edge_lance_filter: Option<String>,

    /// Equality conditions evaluated against edge properties.
    edge_property_conditions: Vec<(String, UniValue)>,

    /// Input columns whose edge ids must not be reused (trail semantics).
    used_edge_columns: Vec<String>,

    /// Path uniqueness mode forwarded to path enumeration.
    path_mode: super::nfa::PathMode,

    /// Whether full paths or only endpoints are produced.
    output_mode: super::nfa::VlpOutputMode,

    /// Shared graph execution context.
    graph_ctx: Arc<GraphExecutionContext>,

    /// Output schema, derived once in `build_schema`.
    schema: SchemaRef,

    /// Cached DataFusion plan properties.
    properties: PlanProperties,

    /// Execution metrics registry.
    metrics: ExecutionPlanMetricsSet,
}
3748
3749impl fmt::Debug for GraphVariableLengthTraverseMainExec {
3750 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3751 f.debug_struct("GraphVariableLengthTraverseMainExec")
3752 .field("source_column", &self.source_column)
3753 .field("type_names", &self.type_names)
3754 .field("direction", &self.direction)
3755 .field("min_hops", &self.min_hops)
3756 .field("max_hops", &self.max_hops)
3757 .field("target_variable", &self.target_variable)
3758 .finish()
3759 }
3760}
3761
impl GraphVariableLengthTraverseMainExec {
    /// Builds the operator, deriving the output schema and plan properties
    /// from the child's schema and the traversal configuration.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();

        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
        );
        let properties = compute_plan_properties(schema.clone());

        Self {
            input,
            source_column,
            type_names,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Derives the output schema: all input fields, then (dedup'd against the
    /// input) the target `_vid`/`_labels` columns, the mandatory `_hop_count`,
    /// the step edge list, the path struct, and finally the target property
    /// columns.
    ///
    /// NOTE(review): property columns here are appended *after* the path
    /// column and always typed LargeBinary (no label metadata available),
    /// unlike `GraphVariableLengthTraverseExec::build_schema`, which inserts
    /// typed property columns before `_hop_count` — confirm downstream
    /// consumers expect this ordering difference.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();

        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }

        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }

        fields.push(Field::new("_hop_count", DataType::UInt64, false));

        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }

        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }

        for prop in target_properties {
            let prop_name = format!("{}.{}", target_variable, prop);
            if input_schema.column_with_name(&prop_name).is_none() {
                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
            }
        }

        Arc::new(Schema::new(fields))
    }
}
3877
3878impl DisplayAs for GraphVariableLengthTraverseMainExec {
3879 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
3880 write!(
3881 f,
3882 "GraphVariableLengthTraverseMainExec: {} --[{:?}*{}..{}]--> target",
3883 self.source_column, self.type_names, self.min_hops, self.max_hops
3884 )
3885 }
3886}
3887
impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseMainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    // Output schema was computed once in the constructor via `build_schema`.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    // Single-child operator: the only child is the upstream input plan.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    // Rebuild this node around a replacement child; every other configuration
    // value is copied verbatim from `self`.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
            ));
        }

        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
        )))
    }

    // Start execution for one partition. The edge adjacency map is loaded
    // lazily: the stream begins in `Loading` with a future that builds the
    // map, and only then starts pulling input batches.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);

        // Clone what the async loader needs so the future is 'static + Send.
        let graph_ctx = self.graph_ctx.clone();
        let type_names = self.type_names.clone();
        let direction = self.direction;
        let load_fut =
            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };

        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            schema: self.schema.clone(),
            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
            metrics,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
3985
/// State machine driven by `GraphVariableLengthTraverseMainStream::poll_next`.
enum VarLengthMainStreamState {
    /// Waiting for the async edge-adjacency-map build to complete.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Adjacency map ready; pulling and expanding input batches.
    Processing(EdgeAdjacencyMap),
    /// A batch was expanded; an async future is hydrating target properties.
    /// The adjacency map is carried along so processing can resume afterwards.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Terminal: input exhausted or a fatal error was returned.
    Done,
}
4000
/// Streaming implementation of the variable-length traversal: loads the edge
/// adjacency map once, then expands every input batch via BFS.
#[allow(dead_code)] // some configuration fields below are currently unread here
struct GraphVariableLengthTraverseMainStream {
    /// Upstream batches supplying the source vertices.
    input: SendableRecordBatchStream,
    /// Name of the input column holding source `_vid`s.
    source_column: String,
    /// Edge type names to traverse (joined with '|' for output edge structs).
    type_names: Vec<String>,
    direction: Direction,
    /// Inclusive BFS depth bounds: results with min_hops <= depth <= max_hops.
    min_hops: usize,
    max_hops: usize,
    /// Variable name used to derive `<var>._vid` / `<var>._labels` columns.
    target_variable: String,
    /// When set, an edge-list column is emitted for the step variable.
    step_variable: Option<String>,
    /// When set, a {nodes, relationships} path struct column is emitted.
    path_variable: Option<String>,
    /// Target properties to hydrate after expansion (LargeBinary columns).
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    /// OPTIONAL MATCH semantics: rows without expansions get null placeholders.
    is_optional: bool,
    /// When set, BFS results must end at this column's vid for the same row.
    bound_target_column: Option<String>,
    edge_lance_filter: Option<String>,
    /// (property name, expected value) equality filters applied per edge.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Columns whose eids are excluded from expansion (edge-uniqueness).
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    /// Output schema; must match the columns built in `process_batch`.
    schema: SchemaRef,
    state: VarLengthMainStreamState,
    metrics: BaselineMetrics,
}
4028
4029type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
4031
4032impl GraphVariableLengthTraverseMainStream {
4033 fn bfs(
4039 &self,
4040 source: Vid,
4041 adjacency: &EdgeAdjacencyMap,
4042 used_eids: &FxHashSet<u64>,
4043 ) -> Vec<MainBfsResult> {
4044 let mut results = Vec::new();
4045 let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
4046
4047 queue.push_back((source, 0, vec![source], vec![]));
4048
4049 while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
4050 if depth >= self.min_hops && depth <= self.max_hops {
4052 results.push((current, depth, node_path.clone(), edge_path.clone()));
4053 }
4054
4055 if depth >= self.max_hops {
4057 continue;
4058 }
4059
4060 if let Some(neighbors) = adjacency.get(¤t) {
4062 let is_undirected = matches!(self.direction, Direction::Both);
4063 let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
4064
4065 for (neighbor, eid, _edge_type, props) in neighbors {
4066 if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
4068 continue;
4069 }
4070
4071 if edge_path.contains(eid) {
4073 continue;
4074 }
4075
4076 if used_eids.contains(&eid.as_u64()) {
4079 continue;
4080 }
4081
4082 if !self.edge_property_conditions.is_empty() {
4084 let passes =
4085 self.edge_property_conditions
4086 .iter()
4087 .all(|(name, expected)| {
4088 props.get(name).is_some_and(|actual| actual == expected)
4089 });
4090 if !passes {
4091 continue;
4092 }
4093 }
4094
4095 let mut new_node_path = node_path.clone();
4096 new_node_path.push(*neighbor);
4097 let mut new_edge_path = edge_path.clone();
4098 new_edge_path.push(*eid);
4099 queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
4100 }
4101 }
4102 }
4103
4104 results
4105 }
4106
4107 fn process_batch(
4109 &self,
4110 batch: RecordBatch,
4111 adjacency: &EdgeAdjacencyMap,
4112 ) -> DFResult<RecordBatch> {
4113 let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
4114 datafusion::error::DataFusionError::Execution(format!(
4115 "Source column '{}' not found in input batch",
4116 self.source_column
4117 ))
4118 })?;
4119
4120 let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
4121 let source_vids: &UInt64Array = &source_vid_cow;
4122
4123 let bound_target_cow = self
4125 .bound_target_column
4126 .as_ref()
4127 .and_then(|col| batch.column_by_name(col))
4128 .map(|c| column_as_vid_array(c.as_ref()))
4129 .transpose()?;
4130 let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
4131
4132 let used_edge_arrays: Vec<&UInt64Array> = self
4134 .used_edge_columns
4135 .iter()
4136 .filter_map(|col| {
4137 batch
4138 .column_by_name(col)?
4139 .as_any()
4140 .downcast_ref::<UInt64Array>()
4141 })
4142 .collect();
4143
4144 let mut expansions: Vec<ExpansionRecord> = Vec::new();
4146
4147 for (row_idx, source_opt) in source_vids.iter().enumerate() {
4148 let mut emitted_for_row = false;
4149
4150 if let Some(source_u64) = source_opt {
4151 let source = Vid::from(source_u64);
4152
4153 let used_eids: FxHashSet<u64> = used_edge_arrays
4155 .iter()
4156 .filter_map(|arr| {
4157 if arr.is_null(row_idx) {
4158 None
4159 } else {
4160 Some(arr.value(row_idx))
4161 }
4162 })
4163 .collect();
4164
4165 let bfs_results = self.bfs(source, adjacency, &used_eids);
4166
4167 for (target, hops, node_path, edge_path) in bfs_results {
4168 if let Some(targets) = expected_targets {
4171 if targets.is_null(row_idx) {
4172 continue;
4173 }
4174 let expected_vid = targets.value(row_idx);
4175 if target.as_u64() != expected_vid {
4176 continue;
4177 }
4178 }
4179
4180 expansions.push((row_idx, target, hops, node_path, edge_path));
4181 emitted_for_row = true;
4182 }
4183 }
4184
4185 if self.is_optional && !emitted_for_row {
4186 expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
4188 }
4189 }
4190
4191 if expansions.is_empty() {
4192 if self.is_optional {
4193 let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
4194 return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
4195 }
4196 return Ok(RecordBatch::new_empty(self.schema.clone()));
4197 }
4198
4199 let num_rows = expansions.len();
4200 self.metrics.record_output(num_rows);
4201
4202 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
4204
4205 for col_idx in 0..batch.num_columns() {
4207 let array = batch.column(col_idx);
4208 let indices: Vec<u64> = expansions
4209 .iter()
4210 .map(|(idx, _, _, _, _)| *idx as u64)
4211 .collect();
4212 let take_indices = UInt64Array::from(indices);
4213 let expanded = arrow::compute::take(array, &take_indices, None)?;
4214 columns.push(expanded);
4215 }
4216
4217 let target_vid_name = format!("{}._vid", self.target_variable);
4219 if batch.schema().column_with_name(&target_vid_name).is_none() {
4220 let target_vids: Vec<Option<u64>> = expansions
4221 .iter()
4222 .map(|(_, vid, _, node_path, edge_path)| {
4223 if node_path.is_empty() && edge_path.is_empty() {
4224 None
4225 } else {
4226 Some(vid.as_u64())
4227 }
4228 })
4229 .collect();
4230 columns.push(Arc::new(UInt64Array::from(target_vids)));
4231 }
4232
4233 let target_labels_name = format!("{}._labels", self.target_variable);
4235 if batch
4236 .schema()
4237 .column_with_name(&target_labels_name)
4238 .is_none()
4239 {
4240 use arrow_array::builder::{ListBuilder, StringBuilder};
4241 let mut labels_builder = ListBuilder::new(StringBuilder::new());
4242 for (_, vid, _, node_path, edge_path) in expansions.iter() {
4243 if node_path.is_empty() && edge_path.is_empty() {
4244 labels_builder.append(false);
4245 continue;
4246 }
4247 let mut row_labels: Vec<String> = Vec::new();
4248 let labels =
4249 l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
4250 for lbl in &labels {
4251 if !row_labels.contains(lbl) {
4252 row_labels.push(lbl.clone());
4253 }
4254 }
4255 let values = labels_builder.values();
4256 for lbl in &row_labels {
4257 values.append_value(lbl);
4258 }
4259 labels_builder.append(true);
4260 }
4261 columns.push(Arc::new(labels_builder.finish()));
4262 }
4263
4264 let hop_counts: Vec<u64> = expansions
4266 .iter()
4267 .map(|(_, _, hops, _, _)| *hops as u64)
4268 .collect();
4269 columns.push(Arc::new(UInt64Array::from(hop_counts)));
4270
4271 if self.step_variable.is_some() {
4273 let mut edges_builder = new_edge_list_builder();
4274 let query_ctx = self.graph_ctx.query_context();
4275 let type_names_str = self.type_names.join("|");
4276
4277 for (_, _, _, node_path, edge_path) in expansions.iter() {
4278 if node_path.is_empty() && edge_path.is_empty() {
4279 edges_builder.append_null();
4280 } else if edge_path.is_empty() {
4281 edges_builder.append(true);
4283 } else {
4284 for (i, eid) in edge_path.iter().enumerate() {
4285 append_edge_to_struct(
4286 edges_builder.values(),
4287 *eid,
4288 &type_names_str,
4289 node_path[i].as_u64(),
4290 node_path[i + 1].as_u64(),
4291 &query_ctx,
4292 );
4293 }
4294 edges_builder.append(true);
4295 }
4296 }
4297
4298 columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
4299 }
4300
4301 if let Some(path_var_name) = &self.path_variable {
4305 let existing_path_col_idx = batch
4306 .schema()
4307 .column_with_name(path_var_name)
4308 .map(|(idx, _)| idx);
4309 let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
4310 let existing_path = existing_path_arc
4311 .as_ref()
4312 .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
4313
4314 let mut nodes_builder = new_node_list_builder();
4315 let mut rels_builder = new_edge_list_builder();
4316 let query_ctx = self.graph_ctx.query_context();
4317 let type_names_str = self.type_names.join("|");
4318 let mut path_validity = Vec::with_capacity(expansions.len());
4319
4320 for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
4321 if node_path.is_empty() && edge_path.is_empty() {
4322 nodes_builder.append(false);
4323 rels_builder.append(false);
4324 path_validity.push(false);
4325 continue;
4326 }
4327
4328 let skip_first_vlp_node = if let Some(existing) = existing_path {
4330 if !existing.is_null(row_out_idx) {
4331 prepend_existing_path(
4332 existing,
4333 row_out_idx,
4334 &mut nodes_builder,
4335 &mut rels_builder,
4336 &query_ctx,
4337 );
4338 true
4339 } else {
4340 false
4341 }
4342 } else {
4343 false
4344 };
4345
4346 let start_idx = if skip_first_vlp_node { 1 } else { 0 };
4348 for vid in &node_path[start_idx..] {
4349 append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
4350 }
4351 nodes_builder.append(true);
4352
4353 for (i, eid) in edge_path.iter().enumerate() {
4354 append_edge_to_struct(
4355 rels_builder.values(),
4356 *eid,
4357 &type_names_str,
4358 node_path[i].as_u64(),
4359 node_path[i + 1].as_u64(),
4360 &query_ctx,
4361 );
4362 }
4363 rels_builder.append(true);
4364 path_validity.push(true);
4365 }
4366
4367 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
4369 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
4370
4371 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
4373 let rels_field = Arc::new(Field::new(
4374 "relationships",
4375 rels_array.data_type().clone(),
4376 true,
4377 ));
4378
4379 let path_struct = arrow_array::StructArray::try_new(
4381 vec![nodes_field, rels_field].into(),
4382 vec![nodes_array, rels_array],
4383 Some(arrow::buffer::NullBuffer::from(path_validity)),
4384 )
4385 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))?;
4386
4387 if let Some(idx) = existing_path_col_idx {
4388 columns[idx] = Arc::new(path_struct);
4389 } else {
4390 columns.push(Arc::new(path_struct));
4391 }
4392 }
4393
4394 for prop_name in &self.target_properties {
4397 let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
4398 if batch.schema().column_with_name(&full_prop_name).is_none() {
4399 columns.push(arrow_array::new_null_array(
4400 &DataType::LargeBinary,
4401 num_rows,
4402 ));
4403 }
4404 }
4405
4406 RecordBatch::try_new(self.schema.clone(), columns)
4407 .map_err(|e| datafusion::error::DataFusionError::ArrowError(Box::new(e), None))
4408 }
4409}
4410
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    // State machine: Loading -> Processing <-> Materializing -> Done.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state, parking `Done` as a placeholder;
            // every branch below must store the correct next state back
            // before returning.
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);

            match state {
                // Waiting for the adjacency map build to finish.
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Loop again to immediately poll the input.
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    // Surface the error but keep the stream
                                    // alive for subsequent batches.
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };

                            // No properties to hydrate: emit directly.
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }

                            // Clone what the hydration future captures.
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();

                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, graph_ctx,
                            );

                            // Hand off to the hydration future; the adjacency
                            // map rides along for when processing resumes.
                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            // Input exhausted: end of stream.
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                // Property hydration in flight for the current batch.
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
4508
4509impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
4510 fn schema(&self) -> SchemaRef {
4511 self.schema.clone()
4512 }
4513}
4514
#[cfg(test)]
mod tests {
    use super::*;

    /// Single-column input schema shared by all tests: `a._vid: UInt64 NOT NULL`.
    fn input_with_vid() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new(
            "a._vid",
            DataType::UInt64,
            false,
        )]))
    }

    /// Assert that `schema` contains exactly `expected` field names, in order.
    fn assert_field_names(schema: &SchemaRef, expected: &[&str]) {
        let actual: Vec<&str> = schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();
        assert_eq!(actual, expected);
    }

    #[test]
    fn test_traverse_schema_without_edge() {
        let output = GraphTraverseExec::build_schema(
            input_with_vid(),
            "m",
            None,
            &[],
            &[],
            None,
            None,
            false,
        );
        assert_field_names(&output, &["a._vid", "m._vid", "m._labels", "__eid_to_m"]);
    }

    #[test]
    fn test_traverse_schema_with_edge() {
        let output = GraphTraverseExec::build_schema(
            input_with_vid(),
            "m",
            Some("r"),
            &[],
            &[],
            None,
            None,
            false,
        );
        assert_field_names(
            &output,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];
        let output = GraphTraverseExec::build_schema(
            input_with_vid(),
            "m",
            Some("r"),
            &[],
            &target_props,
            None,
            None,
            false,
        );
        assert_field_names(
            &output,
            &[
                "a._vid", "m._vid", "m._labels", "m.name", "m.age", "r._eid", "r._type",
            ],
        );
    }

    #[test]
    fn test_variable_length_schema() {
        let output = GraphVariableLengthTraverseExec::build_schema(
            input_with_vid(),
            "b",
            None,
            Some("p"),
            &[],
            None,
        );
        assert_field_names(
            &output,
            &["a._vid", "b._vid", "b._labels", "_hop_count", "p"],
        );
    }

    #[test]
    fn test_traverse_main_schema_without_edge() {
        let output =
            GraphTraverseMainExec::build_schema(&input_with_vid(), "m", &None, &[], &[], false);
        assert_field_names(&output, &["a._vid", "m._vid", "m._labels", "__eid_to_m"]);
    }

    #[test]
    fn test_traverse_main_schema_with_edge() {
        let output = GraphTraverseMainExec::build_schema(
            &input_with_vid(),
            "m",
            &Some("r".to_string()),
            &[],
            &[],
            false,
        );
        assert_field_names(
            &output,
            &["a._vid", "m._vid", "m._labels", "r._eid", "r._type"],
        );
    }

    #[test]
    fn test_traverse_main_schema_with_edge_properties() {
        let edge_props = vec!["weight".to_string(), "since".to_string()];
        let output = GraphTraverseMainExec::build_schema(
            &input_with_vid(),
            "m",
            &Some("r".to_string()),
            &edge_props,
            &[],
            false,
        );
        assert_field_names(
            &output,
            &[
                "a._vid", "m._vid", "m._labels", "r._eid", "r._type", "r.weight", "r.since",
            ],
        );
        // Edge property columns carry raw encoded bytes.
        for idx in [5, 6] {
            assert_eq!(output.field(idx).data_type(), &DataType::LargeBinary);
        }
    }

    #[test]
    fn test_traverse_main_schema_with_target_properties() {
        let target_props = vec!["name".to_string(), "age".to_string()];
        let output = GraphTraverseMainExec::build_schema(
            &input_with_vid(),
            "m",
            &Some("r".to_string()),
            &[],
            &target_props,
            false,
        );
        assert_field_names(
            &output,
            &[
                "a._vid", "m._vid", "m._labels", "r._eid", "r._type", "m.name", "m.age",
            ],
        );
        // Target property columns carry raw encoded bytes.
        for idx in [5, 6] {
            assert_eq!(output.field(idx).data_type(), &DataType::LargeBinary);
        }
    }
}