use crate::query::df_graph::GraphExecutionContext;
use crate::query::df_graph::bitmap::{EidFilter, VidFilter};
use crate::query::df_graph::common::{
append_edge_to_struct, append_node_to_struct, arrow_err, build_edge_list_field,
build_path_struct_field, column_as_vid_array, compute_plan_properties, labels_data_type,
new_edge_list_builder, new_node_list_builder,
};
use crate::query::df_graph::nfa::{NfaStateId, PathNfa, PathSelector, VlpOutputMode};
use crate::query::df_graph::pred_dag::PredecessorDag;
use crate::query::df_graph::scan::{build_property_column_static, resolve_property_type};
use arrow::compute::take;
use arrow_array::{Array, ArrayRef, RecordBatch, UInt64Array};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use datafusion::common::Result as DFResult;
use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
use futures::{Stream, StreamExt};
use fxhash::FxHashSet;
use std::any::Any;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use uni_common::Value as UniValue;
use uni_common::core::id::{Eid, Vid};
use uni_store::runtime::l0_visibility;
use uni_store::storage::direction::Direction;
// NOTE(review): neither alias is referenced in this part of the file; the
// tuple-field meanings below are inferred from the names — confirm against
// their actual users elsewhere in the file.
/// Presumably (target vid, depth, path nodes, path edges) from one BFS walk.
type BfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
/// Presumably (input row index, target vid, depth, path nodes, path edges).
type ExpansionRecord = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
/// Copies the node and relationship entries of an already-materialized path
/// (one row of a path struct column) into fresh list builders, so that a
/// longer path can then be appended after them.
///
/// Layout expectations (established by the downcasts below — the function
/// panics via `unwrap` if they do not hold):
/// - `existing_path.column(0)`: List<Struct> of nodes, node struct column 0
///   read as a u64 vid;
/// - `existing_path.column(1)`: List<Struct> of edges, edge struct columns
///   read as (eid u64, type utf8, src u64, dst u64).
///
/// Every node/edge is re-resolved through `query_ctx` when appended, so the
/// rebuilt entries reflect current visibility.
fn prepend_existing_path(
    existing_path: &arrow_array::StructArray,
    row_idx: usize,
    nodes_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
    rels_builder: &mut arrow_array::builder::ListBuilder<arrow_array::builder::StructBuilder>,
    query_ctx: &uni_store::runtime::context::QueryContext,
) {
    // Node list for this row.
    let nodes_list = existing_path
        .column(0)
        .as_any()
        .downcast_ref::<arrow_array::ListArray>()
        .unwrap();
    let node_values = nodes_list.value(row_idx);
    let node_struct = node_values
        .as_any()
        .downcast_ref::<arrow_array::StructArray>()
        .unwrap();
    let vid_col = node_struct
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();
    // Re-append every node of the existing path, preserving order.
    for i in 0..vid_col.len() {
        append_node_to_struct(
            nodes_builder.values(),
            Vid::from(vid_col.value(i)),
            query_ctx,
        );
    }
    // Relationship list for this row, same unwrap-on-mismatch policy.
    let rels_list = existing_path
        .column(1)
        .as_any()
        .downcast_ref::<arrow_array::ListArray>()
        .unwrap();
    let edge_values = rels_list.value(row_idx);
    let edge_struct = edge_values
        .as_any()
        .downcast_ref::<arrow_array::StructArray>()
        .unwrap();
    let eid_col = edge_struct
        .column(0)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();
    let type_col = edge_struct
        .column(1)
        .as_any()
        .downcast_ref::<arrow_array::StringArray>()
        .unwrap();
    let src_col = edge_struct
        .column(2)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();
    let dst_col = edge_struct
        .column(3)
        .as_any()
        .downcast_ref::<UInt64Array>()
        .unwrap();
    // Re-append every edge of the existing path, preserving order.
    for i in 0..eid_col.len() {
        append_edge_to_struct(
            rels_builder.values(),
            Eid::from(eid_col.value(i)),
            type_col.value(i),
            src_col.value(i),
            dst_col.value(i),
            query_ctx,
        );
    }
}
/// Resolves the arrow type for an edge property column.
///
/// The internal `overflow_json` blob is always raw bytes regardless of the
/// schema; every other property is looked up in the merged edge schema and
/// falls back to `LargeBinary` when unknown.
fn resolve_edge_property_type(
    prop: &str,
    schema_props: Option<
        &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
    >,
) -> DataType {
    match prop {
        "overflow_json" => DataType::LargeBinary,
        name => schema_props
            .and_then(|props| props.get(name))
            .map_or(DataType::LargeBinary, |meta| meta.r#type.to_arrow()),
    }
}
use crate::query::df_graph::common::merged_edge_schema_props;
type VarLengthExpansion = (usize, Vid, usize, Vec<Vid>, Vec<Eid>);
/// Physical operator for one traversal step: for every source vid in the
/// input it emits one output row per qualifying (edge, target) pair.
/// Output schema = input columns + target vid/labels/properties + edge
/// columns (see `build_schema`).
pub struct GraphTraverseExec {
    input: Arc<dyn ExecutionPlan>,
    // Name of the input column holding the source vids.
    source_column: String,
    // Edge type ids to expand along.
    edge_type_ids: Vec<u32>,
    direction: Direction,
    // Pattern variable bound to the traversal target; prefixes output columns.
    target_variable: String,
    // Pattern variable for the edge, if the query names it.
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    target_properties: Vec<String>,
    // Optional label constraint on the target vertex; the id is carried along
    // but is not consulted in this part of the file.
    target_label_name: Option<String>,
    target_label_id: Option<u16>,
    graph_ctx: Arc<GraphExecutionContext>,
    // True for OPTIONAL MATCH semantics: unmatched rows survive with NULLs.
    optional: bool,
    optional_pattern_vars: HashSet<String>,
    // If set, per row only edges landing on this column's vid survive.
    bound_target_column: Option<String>,
    // Columns holding eids already consumed earlier in the pattern; those
    // edges are excluded from expansion (no edge reuse).
    used_edge_columns: Vec<String>,
    schema: SchemaRef,
    properties: PlanProperties,
    metrics: ExecutionPlanMetricsSet,
}
impl fmt::Debug for GraphTraverseExec {
    // Manual Debug: only the traversal-defining fields are printed; schema,
    // plan properties and metrics are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphTraverseExec")
            .field("source_column", &self.source_column)
            .field("edge_type_ids", &self.edge_type_ids)
            .field("direction", &self.direction)
            .field("target_variable", &self.target_variable)
            .field("edge_variable", &self.edge_variable)
            .finish()
    }
}
impl GraphTraverseExec {
    /// Creates the operator and precomputes its output schema.
    ///
    /// Target-property arrow types are resolved from the target label's
    /// schema entry (when a label is known); edge-property types from the
    /// merged schema of all traversed edge types.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        target_label_id: Option<u16>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        let merged_edge_props = merged_edge_schema_props(&uni_schema, &edge_type_ids);
        // An empty merge is treated as "no schema info" so property lookups
        // fall back to LargeBinary in resolve_edge_property_type.
        let edge_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            edge_variable.as_deref(),
            &edge_properties,
            &target_properties,
            label_props,
            edge_props,
            optional,
        );
        let properties = compute_plan_properties(schema.clone());
        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            target_label_name,
            target_label_id,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
    /// Output schema: all input columns, then `<target>._vid` (nullable iff
    /// `optional`), `<target>._labels`, one column per requested target
    /// property, and finally either the named edge's `_eid`/`_type`/property
    /// columns or an internal `__eid_to_<target>` column for anonymous edges.
    ///
    /// The append order here must match the order in which columns are pushed
    /// by `build_traverse_output_batch` / `build_traverse_output_batch_sync`.
    #[expect(
        clippy::too_many_arguments,
        reason = "Schema construction needs all field metadata"
    )]
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        edge_variable: Option<&str>,
        edge_properties: &[String],
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        edge_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        let target_vid_name = format!("{}._vid", target_variable);
        fields.push(Field::new(&target_vid_name, DataType::UInt64, optional));
        fields.push(Field::new(
            format!("{}._labels", target_variable),
            labels_data_type(),
            true,
        ));
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            let arrow_type = resolve_property_type(prop_name, label_props);
            fields.push(Field::new(&col_name, arrow_type, true));
        }
        if let Some(edge_var) = edge_variable {
            let edge_id_name = format!("{}._eid", edge_var);
            fields.push(Field::new(&edge_id_name, DataType::UInt64, optional));
            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));
            for prop_name in edge_properties {
                let prop_col_name = format!("{}.{}", edge_var, prop_name);
                let arrow_type = resolve_edge_property_type(prop_name, edge_props);
                fields.push(Field::new(&prop_col_name, arrow_type, true));
            }
        } else {
            // Anonymous edge: the eid is still materialized, presumably so a
            // later step can exclude it via used_edge_columns — confirm.
            let internal_eid_name = format!("__eid_to_{}", target_variable);
            fields.push(Field::new(&internal_eid_name, DataType::UInt64, optional));
        }
        Arc::new(Schema::new(fields))
    }
}
impl DisplayAs for GraphTraverseExec {
    /// One-line plan rendering:
    /// `GraphTraverseExec: <src> --[type ids]--> <target>[ as <edge>]`.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "GraphTraverseExec: {} --[{:?}]--> {}",
            self.source_column, self.edge_type_ids, self.target_variable
        )?;
        match self.edge_variable.as_ref() {
            Some(edge_var) => write!(f, " as {}", edge_var),
            None => Ok(()),
        }
    }
}
impl ExecutionPlan for GraphTraverseExec {
    fn name(&self) -> &str {
        "GraphTraverseExec"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
    /// Rebuilds the node with a replaced input; all other configuration is
    /// copied. Errors unless exactly one child is supplied.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseExec requires exactly one child".to_string(),
            ));
        }
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.target_label_id,
            self.graph_ctx.clone(),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
        )))
    }
    /// Starts the traversal stream. The stream first awaits a warming future
    /// obtained from the graph context for the traversed edge types before
    /// reading any input (see `TraverseStreamState::Warming`).
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);
        Ok(Box::pin(GraphTraverseStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            graph_ctx: self.graph_ctx.clone(),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: self.schema.clone(),
            state: TraverseStreamState::Warming(warm_fut),
            metrics,
        }))
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
/// State machine driven by `GraphTraverseStream::poll_next`.
enum TraverseStreamState {
    /// Awaiting cache warm-up for the traversed edge types; no input is read
    /// until this future resolves.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Ready to poll the input stream for the next batch.
    Reading,
    /// Awaiting the async batch materialization (property fetches).
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Input exhausted or a fatal error was observed.
    Done,
}
/// Streaming executor for `GraphTraverseExec`; mirrors the operator's
/// configuration plus the per-execution state machine and metrics.
struct GraphTraverseStream {
    input: SendableRecordBatchStream,
    source_column: String,
    edge_type_ids: Vec<u32>,
    direction: Direction,
    #[expect(dead_code, reason = "Retained for debug logging and diagnostics")]
    target_variable: String,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
    bound_target_column: Option<String>,
    used_edge_columns: Vec<String>,
    // Precomputed output schema; every emitted batch must match it.
    schema: SchemaRef,
    state: TraverseStreamState,
    metrics: BaselineMetrics,
}
impl GraphTraverseStream {
    /// Expands every source vid of `batch` along the configured edge types
    /// and direction, returning `(input row index, target vid, eid, edge
    /// type id)` tuples for each surviving neighbor.
    ///
    /// Per-row filters, in order:
    /// - rows with a NULL source vid are skipped entirely;
    /// - edges whose eid appears in one of `used_edge_columns` for this row
    ///   are dropped (no edge reuse within a pattern);
    /// - for `Direction::Both`, each eid is emitted at most once per source
    ///   row (an undirected edge is otherwise visible from both endpoints);
    /// - if `bound_target_column` is set, only edges landing on that row's
    ///   bound vid survive; a NULL bound vid matches nothing;
    /// - if `target_label_name` is set and label visibility is available for
    ///   the target, targets lacking that label are dropped. Targets with no
    ///   visibility info pass through unfiltered.
    fn expand_neighbors(&self, batch: &RecordBatch) -> DFResult<Vec<(usize, Vid, u64, u32)>> {
        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column '{}' not found",
                self.source_column
            ))
        })?;
        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;
        // Already-bound target vids, when the pattern closes a cycle.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let bound_target_vids: Option<&UInt64Array> = bound_target_cow.as_deref();
        // Columns carrying eids consumed by earlier traversal steps; columns
        // that are missing or not u64 are silently ignored here.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();
        let mut expanded_rows: Vec<(usize, Vid, u64, u32)> = Vec::new();
        let is_undirected = matches!(self.direction, Direction::Both);
        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            let Some(src) = source_vid else {
                continue;
            };
            // Some(None) = bound column exists but is NULL for this row,
            // which later rejects every candidate.
            let expected_target = bound_target_vids.map(|arr| {
                if arr.is_null(row_idx) {
                    None
                } else {
                    Some(arr.value(row_idx))
                }
            });
            let used_eids: HashSet<u64> = used_edge_arrays
                .iter()
                .filter_map(|arr| {
                    if arr.is_null(row_idx) {
                        None
                    } else {
                        Some(arr.value(row_idx))
                    }
                })
                .collect();
            let vid = Vid::from(src);
            // Per-source dedup of undirected edges.
            let mut seen_edges: HashSet<u64> = HashSet::new();
            for &edge_type in &self.edge_type_ids {
                let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, self.direction);
                for (target_vid, eid) in neighbors {
                    let eid_u64 = eid.as_u64();
                    if used_eids.contains(&eid_u64) {
                        continue;
                    }
                    if is_undirected && !seen_edges.insert(eid_u64) {
                        continue;
                    }
                    if let Some(expected_opt) = expected_target {
                        let Some(expected) = expected_opt else {
                            continue;
                        };
                        if target_vid.as_u64() != expected {
                            continue;
                        }
                    }
                    if let Some(ref label_name) = self.target_label_name {
                        let query_ctx = self.graph_ctx.query_context();
                        if let Some(vertex_labels) =
                            l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
                        {
                            if !vertex_labels.contains(label_name) {
                                continue;
                            }
                        }
                    }
                    expanded_rows.push((row_idx, target_vid, eid_u64, edge_type));
                }
            }
        }
        Ok(expanded_rows)
    }
}
/// Builds the `<target>._labels` list column for a batch of target vids.
///
/// Labels come from L0 visibility when available; otherwise the traversal's
/// statically-known target label (if any) is used as a one-element fallback,
/// and rows with no information at all get an empty list.
fn build_target_labels_column(
    target_vids: &[Vid],
    target_label_name: &Option<String>,
    graph_ctx: &GraphExecutionContext,
) -> ArrayRef {
    use arrow_array::builder::{ListBuilder, StringBuilder};
    let query_ctx = graph_ctx.query_context();
    let mut builder = ListBuilder::new(StringBuilder::new());
    for &vid in target_vids {
        // Fallback yields zero or one statically-known label.
        let labels = l0_visibility::get_vertex_labels_optional(vid, &query_ctx)
            .unwrap_or_else(|| target_label_name.iter().cloned().collect());
        let inner = builder.values();
        for label in &labels {
            inner.append_value(label);
        }
        builder.append(true);
    }
    Arc::new(builder.finish())
}
/// Fetches and encodes the requested target-vertex property columns.
///
/// With a known target label, values are batch-fetched for that label and
/// each column uses the arrow type resolved from the label's schema entry.
/// Without a label, values are fetched by name and encoded as LargeBinary;
/// the sentinel property name `_all_props` instead yields a JSON blob of
/// every property (see `build_all_props_column`).
async fn build_target_property_columns(
    target_vids: &[Vid],
    target_properties: &[String],
    target_label_name: &Option<String>,
    graph_ctx: &Arc<GraphExecutionContext>,
) -> DFResult<Vec<ArrayRef>> {
    let mut columns = Vec::new();
    if let Some(label_name) = target_label_name {
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();
        let props_map = property_manager
            .get_batch_vertex_props_for_label(target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());
        for prop_name in target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(target_vids, &props_map, prop_name, &data_type)?;
            columns.push(column);
        }
    } else {
        // `_all_props` is internal and must not be sent to the store fetch.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();
        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };
        for prop_name in target_properties {
            if prop_name == "_all_props" {
                columns.push(build_all_props_column(target_vids, &props_map, graph_ctx));
            } else {
                // No schema info without a label: fall back to binary encoding.
                let column = build_property_column_static(
                    target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                columns.push(column);
            }
        }
    }
    Ok(columns)
}
/// Encodes, per target vid, the union of its fetched properties and any
/// L0-buffer overlay properties as one binary-encoded JSON object column.
///
/// L0 values are inserted after the fetched ones, so on a key clash the L0
/// (uncommitted) value wins. Rows with no properties — or whose encoding
/// fails — are NULL.
fn build_all_props_column(
    target_vids: &[Vid],
    props_map: &HashMap<Vid, HashMap<String, uni_common::Value>>,
    graph_ctx: &Arc<GraphExecutionContext>,
) -> ArrayRef {
    use crate::query::df_graph::scan::encode_cypher_value;
    use arrow_array::builder::LargeBinaryBuilder;
    let mut builder = LargeBinaryBuilder::new();
    let l0_ctx = graph_ctx.l0_context();
    for vid in target_vids {
        let mut merged_props = serde_json::Map::new();
        // Base layer: properties fetched from the store.
        if let Some(vid_props) = props_map.get(vid) {
            for (k, v) in vid_props.iter() {
                let json_val: serde_json::Value = v.clone().into();
                merged_props.insert(k.to_string(), json_val);
            }
        }
        // Overlay: every L0 buffer's pending properties for this vertex.
        for l0 in l0_ctx.iter_l0_buffers() {
            let guard = l0.read();
            if let Some(l0_props) = guard.vertex_properties.get(vid) {
                for (k, v) in l0_props.iter() {
                    let json_val: serde_json::Value = v.clone().into();
                    merged_props.insert(k.to_string(), json_val);
                }
            }
        }
        if merged_props.is_empty() {
            builder.append_null();
        } else {
            let json = serde_json::Value::Object(merged_props);
            // Encoding failures degrade to NULL rather than failing the batch.
            match encode_cypher_value(&json) {
                Ok(bytes) => builder.append_value(bytes),
                Err(_) => builder.append_null(),
            }
        }
    }
    Arc::new(builder.finish())
}
/// Builds the named-edge output columns: `_eid`, `_type`, then one column per
/// requested edge property. Column order must match `build_schema`.
async fn build_edge_columns(
    expansions: &[(usize, Vid, u64, u32)],
    edge_properties: &[String],
    edge_type_ids: &[u32],
    graph_ctx: &Arc<GraphExecutionContext>,
) -> DFResult<Vec<ArrayRef>> {
    let mut columns = Vec::new();
    let eids: Vec<Eid> = expansions
        .iter()
        .map(|(_, _, eid, _)| Eid::from(*eid))
        .collect();
    let eid_u64s: Vec<u64> = eids.iter().map(|e| e.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(eid_u64s)) as ArrayRef);
    {
        // `_type`: edge type id resolved back to its name; NULL for ids the
        // schema cannot resolve.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()) as ArrayRef);
    }
    if !edge_properties.is_empty() {
        let prop_name_refs: Vec<&str> = edge_properties.iter().map(|s| s.as_str()).collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();
        let props_map = property_manager
            .get_batch_edge_props(&eids, &prop_name_refs, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let merged_edge_props = merged_edge_schema_props(&uni_schema, edge_type_ids);
        let edge_type_props = if merged_edge_props.is_empty() {
            None
        } else {
            Some(&merged_edge_props)
        };
        // build_property_column_static is keyed by Vid, so the eids are
        // re-wrapped as Vid keys purely to reuse it for edge properties.
        let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
        for prop_name in edge_properties {
            let data_type = resolve_edge_property_type(prop_name, edge_type_props);
            let column =
                build_property_column_static(&vid_keys, &props_map, prop_name, &data_type)?;
            columns.push(column);
        }
    }
    Ok(columns)
}
/// Materializes one output batch for the async (property-fetching) path.
///
/// Column append order mirrors `GraphTraverseExec::build_schema`: input
/// columns fanned out per expansion, target vid, target labels, target
/// property columns, then either the named-edge columns or the internal eid.
/// With `optional`, NULL-padded rows are appended for input groups that
/// produced no expansion.
#[expect(
    clippy::too_many_arguments,
    reason = "Standalone async fn needs all context passed explicitly"
)]
async fn build_traverse_output_batch(
    input: RecordBatch,
    expansions: Vec<(usize, Vid, u64, u32)>,
    schema: SchemaRef,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    edge_type_ids: Vec<u32>,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        // Non-optional traversal with no matches produces no rows at all.
        if !optional {
            return Ok(RecordBatch::new_empty(schema));
        }
        // OPTIONAL MATCH: one NULL-padded row per fully-unmatched group.
        let unmatched_reps = collect_unmatched_optional_group_rows(
            &input,
            &HashSet::new(),
            &schema,
            &optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            &input,
            &unmatched_reps,
            &schema,
            &optional_pattern_vars,
        );
    }
    // Fan the input rows out: one output row per expansion tuple.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &indices_array, None))
        .collect::<Result<_, _>>()?;
    let target_vids: Vec<Vid> = expansions.iter().map(|(_, vid, _, _)| *vid).collect();
    let target_vid_u64s: Vec<u64> = target_vids.iter().map(|v| v.as_u64()).collect();
    columns.push(Arc::new(UInt64Array::from(target_vid_u64s)));
    columns.push(build_target_labels_column(
        &target_vids,
        &target_label_name,
        &graph_ctx,
    ));
    if !target_properties.is_empty() {
        let prop_cols = build_target_property_columns(
            &target_vids,
            &target_properties,
            &target_label_name,
            &graph_ctx,
        )
        .await?;
        columns.extend(prop_cols);
    }
    if edge_variable.is_some() {
        let edge_cols =
            build_edge_columns(&expansions, &edge_properties, &edge_type_ids, &graph_ctx).await?;
        columns.extend(edge_cols);
    } else {
        // Anonymous edge: only the internal `__eid_to_<target>` column.
        let eid_u64s: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(eid_u64s)));
    }
    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    if optional {
        // Append NULL-padded rows for input groups with no expansion at all.
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            &input,
            &matched_indices,
            &schema,
            &optional_pattern_vars,
        )?;
        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                &input,
                &unmatched,
                &schema,
                &optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(&schema, [&expanded_batch, &null_batch])
                .map_err(arrow_err)?;
            return Ok(combined);
        }
    }
    Ok(expanded_batch)
}
/// Builds a batch for unmatched input rows: the input's own columns are
/// gathered via `take`, and every schema column beyond the input's width is
/// filled with NULLs.
fn build_optional_null_batch_for_rows(
    input: &RecordBatch,
    unmatched_indices: &[usize],
    schema: &SchemaRef,
) -> DFResult<RecordBatch> {
    let row_count = unmatched_indices.len();
    let gather: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
    let gather = UInt64Array::from(gather);
    // Carry the existing input columns over for the selected rows.
    let mut columns = input
        .columns()
        .iter()
        .map(|col| take(col.as_ref(), &gather, None))
        .collect::<Result<Vec<ArrayRef>, _>>()?;
    // Everything this operator would have produced is NULL for these rows.
    columns.extend(
        schema
            .fields()
            .iter()
            .skip(input.num_columns())
            .map(|field| arrow_array::new_null_array(field.data_type(), row_count)),
    );
    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
}
/// Returns true when `col_name` belongs to one of the optional-pattern
/// variables: the variable column itself, one of its property columns
/// (`<var>.<prop>`), or the internal edge-id column `__eid_to_<var>`.
///
/// The internal-eid check requires the entire suffix after `__eid_to_` to
/// equal the variable name. A bare `ends_with(var)` would also claim columns
/// of other variables that merely end in the same characters (e.g.
/// `__eid_to_nn` for variable `n`) — `build_schema` generates the column as
/// exactly `__eid_to_<target>`, so an exact match is the correct test.
fn is_optional_column_for_vars(col_name: &str, optional_vars: &HashSet<String>) -> bool {
    optional_vars.contains(col_name)
        || optional_vars.iter().any(|var| {
            col_name
                .strip_prefix(var.as_str())
                .is_some_and(|rest| rest.starts_with('.'))
                || col_name
                    .strip_prefix("__eid_to_")
                    .is_some_and(|rest| rest == var.as_str())
        })
}
/// Groups input rows by the vids of their non-optional `<var>._vid` columns
/// and returns, for each group in which *no* row matched, the index of the
/// group's first row (in first-seen order). These representatives later get
/// NULL-padded output rows under OPTIONAL MATCH semantics.
///
/// With no optional-var metadata, every unmatched row is its own group.
fn collect_unmatched_optional_group_rows(
    input: &RecordBatch,
    matched_indices: &HashSet<usize>,
    schema: &SchemaRef,
    optional_vars: &HashSet<String>,
) -> DFResult<Vec<usize>> {
    if input.num_rows() == 0 {
        return Ok(Vec::new());
    }
    if optional_vars.is_empty() {
        return Ok((0..input.num_rows())
            .filter(|idx| !matched_indices.contains(idx))
            .collect());
    }
    // Key columns: every non-optional `<var>._vid` column present in the
    // input (schema fields past the input's width are this operator's own
    // output and are excluded).
    let source_vid_indices: Vec<usize> = schema
        .fields()
        .iter()
        .enumerate()
        .take(input.num_columns())
        .filter_map(|(idx, field)| {
            let name = field.name();
            (!is_optional_column_for_vars(name, optional_vars) && name.ends_with("._vid"))
                .then_some(idx)
        })
        .collect();
    use std::collections::hash_map::Entry;
    // Per group: (first row index, whether any row in the group matched).
    let mut groups: HashMap<Vec<u8>, (usize, bool)> = HashMap::new();
    let mut group_order: Vec<Vec<u8>> = Vec::new();
    for row_idx in 0..input.num_rows() {
        let key = compute_optional_group_key(input, row_idx, &source_vid_indices)?;
        let matched = matched_indices.contains(&row_idx);
        match groups.entry(key) {
            Entry::Vacant(vacant) => {
                // First sighting of this group: remember its insertion order.
                group_order.push(vacant.key().clone());
                vacant.insert((row_idx, matched));
            }
            Entry::Occupied(mut occupied) => {
                occupied.get_mut().1 |= matched;
            }
        }
    }
    // Emit the first-row index of every fully-unmatched group, in order.
    Ok(group_order
        .into_iter()
        .filter_map(|key| {
            groups
                .get(&key)
                .and_then(|&(first_idx, any_matched)| (!any_matched).then_some(first_idx))
        })
        .collect())
}
/// Serializes the vids of the given key columns for one row into a byte key
/// used to group rows for OPTIONAL MATCH handling.
///
/// NULL vids are encoded as `u64::MAX`. NOTE(review): a genuine vid of
/// `u64::MAX` would collide with the NULL sentinel — confirm vids never
/// reach that value.
fn compute_optional_group_key(
    batch: &RecordBatch,
    row_idx: usize,
    source_vid_indices: &[usize],
) -> DFResult<Vec<u8>> {
    let mut key_bytes = Vec::with_capacity(source_vid_indices.len() * std::mem::size_of::<u64>());
    for &col_idx in source_vid_indices {
        let vid_cow = column_as_vid_array(batch.column(col_idx).as_ref())?;
        let vids: &UInt64Array = &vid_cow;
        let encoded = if vids.is_null(row_idx) {
            u64::MAX
        } else {
            vids.value(row_idx)
        };
        key_bytes.extend_from_slice(&encoded.to_le_bytes());
    }
    Ok(key_bytes)
}
/// Like `build_optional_null_batch_for_rows`, but additionally NULLs every
/// input column that belongs to an optional pattern variable, not just the
/// columns this operator appends.
fn build_optional_null_batch_for_rows_with_optional_vars(
    input: &RecordBatch,
    unmatched_indices: &[usize],
    schema: &SchemaRef,
    optional_vars: &HashSet<String>,
) -> DFResult<RecordBatch> {
    // Without optional-var metadata only the appended columns are NULLed.
    if optional_vars.is_empty() {
        return build_optional_null_batch_for_rows(input, unmatched_indices, schema);
    }
    let row_count = unmatched_indices.len();
    let gather: Vec<u64> = unmatched_indices.iter().map(|&idx| idx as u64).collect();
    let gather = UInt64Array::from(gather);
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let carried_over = col_idx < input.num_columns()
            && !is_optional_column_for_vars(field.name(), optional_vars);
        if carried_over {
            // Non-optional input column: gather the unmatched rows as-is.
            columns.push(take(input.column(col_idx).as_ref(), &gather, None)?);
        } else {
            // Optional-pattern column, or a column appended by this operator.
            columns.push(arrow_array::new_null_array(field.data_type(), row_count));
        }
    }
    RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)
}
impl Stream for GraphTraverseStream {
    type Item = DFResult<RecordBatch>;
    /// Drives the Warming -> Reading -> Materializing -> Done state machine.
    ///
    /// The state is moved out of `self` (temporarily replaced with `Done`) so
    /// each arm can own its boxed future; every arm that continues the stream
    /// must write a state back before returning, otherwise the stream stays
    /// `Done`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, TraverseStreamState::Done);
            match state {
                // Wait for edge-cache warming before touching the input.
                TraverseStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = TraverseStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Reading => {
                    // Query timeout: state is left as Done (set by the
                    // mem::replace above), so the stream terminates here.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let expansions = match self.expand_neighbors(&batch) {
                                Ok(exp) => exp,
                                Err(e) => {
                                    // NOTE(review): unlike input errors, an
                                    // expansion error leaves the stream in
                                    // Reading so it can be polled again —
                                    // confirm this is intentional.
                                    self.state = TraverseStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };
                            // Fast path: no property fetches needed, so the
                            // batch is built synchronously.
                            if self.target_properties.is_empty() && self.edge_properties.is_empty()
                            {
                                let result = build_traverse_output_batch_sync(
                                    &batch,
                                    &expansions,
                                    &self.schema,
                                    self.edge_variable.as_ref(),
                                    &self.graph_ctx,
                                    self.optional,
                                    &self.optional_pattern_vars,
                                );
                                self.state = TraverseStreamState::Reading;
                                if let Ok(ref r) = result {
                                    self.metrics.record_output(r.num_rows());
                                }
                                return Poll::Ready(Some(result));
                            }
                            // Slow path: clone everything the async builder
                            // needs and park the work in a boxed future.
                            let schema = self.schema.clone();
                            let edge_variable = self.edge_variable.clone();
                            let edge_properties = self.edge_properties.clone();
                            let edge_type_ids = self.edge_type_ids.clone();
                            let target_properties = self.target_properties.clone();
                            let target_label_name = self.target_label_name.clone();
                            let graph_ctx = self.graph_ctx.clone();
                            let optional = self.optional;
                            let optional_pattern_vars = self.optional_pattern_vars.clone();
                            let fut = build_traverse_output_batch(
                                batch,
                                expansions,
                                schema,
                                edge_variable,
                                edge_properties,
                                edge_type_ids,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                                optional,
                                optional_pattern_vars,
                            );
                            self.state = TraverseStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = TraverseStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = TraverseStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                // Batch materialization in flight; emit it when ready and go
                // back to reading input.
                TraverseStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = TraverseStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = TraverseStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = TraverseStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                TraverseStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
/// Synchronous fast-path batch builder, used when neither target nor edge
/// properties were requested (no async property fetches needed).
///
/// Column append order mirrors `GraphTraverseExec::build_schema`: input
/// columns, target vid, target labels, then the eid column (plus `_type`
/// when the edge is named).
fn build_traverse_output_batch_sync(
    input: &RecordBatch,
    expansions: &[(usize, Vid, u64, u32)],
    schema: &SchemaRef,
    edge_variable: Option<&String>,
    graph_ctx: &GraphExecutionContext,
    optional: bool,
    optional_pattern_vars: &HashSet<String>,
) -> DFResult<RecordBatch> {
    if expansions.is_empty() {
        // Non-optional traversal with no matches produces no rows at all.
        if !optional {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        // OPTIONAL MATCH: one NULL-padded row per fully-unmatched group.
        let unmatched_reps = collect_unmatched_optional_group_rows(
            input,
            &HashSet::new(),
            schema,
            optional_pattern_vars,
        )?;
        if unmatched_reps.is_empty() {
            return Ok(RecordBatch::new_empty(schema.clone()));
        }
        return build_optional_null_batch_for_rows_with_optional_vars(
            input,
            &unmatched_reps,
            schema,
            optional_pattern_vars,
        );
    }
    // Fan the input rows out: one output row per expansion tuple.
    let indices: Vec<u64> = expansions
        .iter()
        .map(|(idx, _, _, _)| *idx as u64)
        .collect();
    let indices_array = UInt64Array::from(indices);
    let mut columns: Vec<ArrayRef> = Vec::new();
    for col in input.columns() {
        let expanded = take(col.as_ref(), &indices_array, None)?;
        columns.push(expanded);
    }
    let target_vids: Vec<u64> = expansions
        .iter()
        .map(|(_, vid, _, _)| vid.as_u64())
        .collect();
    columns.push(Arc::new(UInt64Array::from(target_vids)));
    {
        // Target labels collected from the L0 buffers only.
        // NOTE(review): the async path (build_target_labels_column) consults
        // l0_visibility instead — confirm this divergence is intentional.
        use arrow_array::builder::{ListBuilder, StringBuilder};
        let l0_ctx = graph_ctx.l0_context();
        let mut labels_builder = ListBuilder::new(StringBuilder::new());
        for (_, vid, _, _) in expansions {
            let mut row_labels: Vec<String> = Vec::new();
            for l0 in l0_ctx.iter_l0_buffers() {
                let guard = l0.read();
                if let Some(l0_labels) = guard.vertex_labels.get(vid) {
                    for lbl in l0_labels {
                        if !row_labels.contains(lbl) {
                            row_labels.push(lbl.clone());
                        }
                    }
                }
            }
            let values = labels_builder.values();
            for lbl in &row_labels {
                values.append_value(lbl);
            }
            labels_builder.append(true);
        }
        columns.push(Arc::new(labels_builder.finish()));
    }
    // Either `<edge>._eid` + `<edge>._type`, or only the internal
    // `__eid_to_<target>` column — both branches start with the eid values.
    if edge_variable.is_some() {
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let mut type_builder = arrow_array::builder::StringBuilder::new();
        for (_, _, _, edge_type_id) in expansions {
            if let Some(name) = uni_schema.edge_type_name_by_id_unified(*edge_type_id) {
                type_builder.append_value(&name);
            } else {
                type_builder.append_null();
            }
        }
        columns.push(Arc::new(type_builder.finish()));
    } else {
        let edge_ids: Vec<u64> = expansions.iter().map(|(_, _, eid, _)| *eid).collect();
        columns.push(Arc::new(UInt64Array::from(edge_ids)));
    }
    let expanded_batch = RecordBatch::try_new(schema.clone(), columns).map_err(arrow_err)?;
    if optional {
        // Append NULL-padded rows for input groups with no expansion at all.
        let matched_indices: HashSet<usize> =
            expansions.iter().map(|(idx, _, _, _)| *idx).collect();
        let unmatched = collect_unmatched_optional_group_rows(
            input,
            &matched_indices,
            schema,
            optional_pattern_vars,
        )?;
        if !unmatched.is_empty() {
            let null_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                schema,
                optional_pattern_vars,
            )?;
            let combined = arrow::compute::concat_batches(schema, [&expanded_batch, &null_batch])
                .map_err(|e| {
                    datafusion::error::DataFusionError::ArrowError(Box::new(e), None)
                })?;
            return Ok(combined);
        }
    }
    Ok(expanded_batch)
}
impl RecordBatchStream for GraphTraverseStream {
    // The output schema is fixed at plan time; hand out the cached Arc.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}
// Adjacency map: source vid -> (neighbor vid, eid, edge type name, edge
// properties). NOTE(review): not referenced in this part of the file —
// confirm its users.
type EdgeAdjacencyMap = HashMap<Vid, Vec<(Vid, Eid, String, uni_common::Properties)>>;
/// Traversal operator variant that carries edge type *names* rather than
/// resolved type ids (compare `GraphTraverseExec`, which uses ids).
pub struct GraphTraverseMainExec {
    input: Arc<dyn ExecutionPlan>,
    // Input column holding the source vids.
    source_column: String,
    // Edge type names to traverse (unresolved, unlike edge_type_ids above).
    type_names: Vec<String>,
    direction: Direction,
    // Pattern variable bound to the traversal target; prefixes output columns.
    target_variable: String,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    // True for OPTIONAL MATCH semantics.
    optional: bool,
    optional_pattern_vars: HashSet<String>,
    bound_target_column: Option<String>,
    used_edge_columns: Vec<String>,
    schema: SchemaRef,
    properties: PlanProperties,
    metrics: ExecutionPlanMetricsSet,
}
impl fmt::Debug for GraphTraverseMainExec {
    // Manual Debug: only the traversal-defining fields are printed; schema,
    // plan properties and metrics are intentionally omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("GraphTraverseMainExec")
            .field("type_names", &self.type_names)
            .field("direction", &self.direction)
            .field("target_variable", &self.target_variable)
            .field("edge_variable", &self.edge_variable)
            .finish()
    }
}
impl GraphTraverseMainExec {
    /// Build a single-hop traversal operator.
    ///
    /// `source_column` names the input column holding source vids;
    /// `bound_target_column`, when present, restricts expansion to rows whose
    /// target vid matches that column; `used_edge_columns` lists eid columns
    /// whose edges must not be reused (relationship uniqueness).
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: impl Into<String>,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();
        // Output schema is fixed up front; `expand_batch` must push columns
        // in exactly this order.
        let schema = Self::build_schema(
            &input.schema(),
            &target_variable,
            &edge_variable,
            &edge_properties,
            &target_properties,
            optional,
        );
        let properties = compute_plan_properties(schema.clone());
        Self {
            input,
            source_column,
            type_names,
            direction,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Output schema: input columns, then target vid/labels (only if the
    /// input does not already carry them), then edge columns, then requested
    /// target properties.
    fn build_schema(
        input_schema: &SchemaRef,
        target_variable: &str,
        edge_variable: &Option<String>,
        edge_properties: &[String],
        target_properties: &[String],
        optional: bool,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }
        if let Some(edge_var) = edge_variable {
            // Named edge: eid (nullable only under OPTIONAL), type name, and
            // the requested edge properties (CV-encoded binary).
            fields.push(Field::new(
                format!("{}._eid", edge_var),
                DataType::UInt64,
                optional,
            ));
            fields.push(Field::new(
                format!("{}._type", edge_var),
                DataType::Utf8,
                true,
            ));
            for prop in edge_properties {
                let col_name = format!("{}.{}", edge_var, prop);
                let mut metadata = std::collections::HashMap::new();
                // Marks the column as carrying encoded Cypher values.
                metadata.insert("cv_encoded".to_string(), "true".to_string());
                fields.push(
                    Field::new(&col_name, DataType::LargeBinary, true).with_metadata(metadata),
                );
            }
        } else {
            // Anonymous edge: internal eid column used only for
            // relationship-uniqueness tracking downstream.
            fields.push(Field::new(
                format!("__eid_to_{}", target_variable),
                DataType::UInt64,
                optional,
            ));
        }
        for prop in target_properties {
            fields.push(Field::new(
                format!("{}.{}", target_variable, prop),
                DataType::LargeBinary,
                true,
            ));
        }
        Arc::new(Schema::new(fields))
    }
}
/// One-line rendering for EXPLAIN output.
impl DisplayAs for GraphTraverseMainExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "GraphTraverseMainExec: types={:?}, direction={:?}",
            self.type_names, self.direction
        ))
    }
}
impl ExecutionPlan for GraphTraverseMainExec {
    fn name(&self) -> &str {
        "GraphTraverseMainExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Rebuild the node around a replacement child; every configuration
    /// field is carried over and schema/properties are reused unchanged.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Exactly one child is expected; anything else is a planner bug.
        let [child] = children.as_slice() else {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphTraverseMainExec expects exactly one child".to_string(),
            ));
        };
        Ok(Arc::new(Self {
            input: Arc::clone(child),
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            target_variable: self.target_variable.clone(),
            edge_variable: self.edge_variable.clone(),
            edge_properties: self.edge_properties.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: Arc::clone(&self.graph_ctx),
            optional: self.optional,
            optional_pattern_vars: self.optional_pattern_vars.clone(),
            bound_target_column: self.bound_target_column.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            schema: Arc::clone(&self.schema),
            properties: self.properties.clone(),
            metrics: self.metrics.clone(),
        }))
    }

    /// Execute one partition: wrap the child's stream in the traversal
    /// stream, which first loads the edge adjacency map asynchronously.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let upstream = self.input.execute(partition, context)?;
        let baseline = BaselineMetrics::new(&self.metrics, partition);
        Ok(Box::pin(GraphTraverseMainStream::new(
            upstream,
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.target_variable.clone(),
            self.edge_variable.clone(),
            self.edge_properties.clone(),
            self.target_properties.clone(),
            Arc::clone(&self.graph_ctx),
            self.optional,
            self.optional_pattern_vars.clone(),
            self.bound_target_column.clone(),
            self.used_edge_columns.clone(),
            Arc::clone(&self.schema),
            baseline,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
/// Stream phases: first the adjacency map is built asynchronously, then
/// input batches are expanded against it.
enum GraphTraverseMainState {
    /// Awaiting the one-shot edge-load future; the input stream is parked.
    LoadingEdges {
        future: Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>,
        input_stream: SendableRecordBatchStream,
    },
    /// Adjacency ready; each polled input batch is expanded synchronously.
    Processing {
        adjacency: EdgeAdjacencyMap,
        input_stream: SendableRecordBatchStream,
    },
    /// Terminal state (also left in place transiently during state swaps).
    Done,
}
/// Streaming executor for `GraphTraverseMainExec`: loads the adjacency map
/// once, then expands input batches one at a time.
struct GraphTraverseMainStream {
    source_column: String,
    target_variable: String,
    edge_variable: Option<String>,
    edge_properties: Vec<String>,
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    optional: bool,
    optional_pattern_vars: HashSet<String>,
    bound_target_column: Option<String>,
    used_edge_columns: Vec<String>,
    schema: SchemaRef,
    /// Current phase; owns the input stream and (while loading) the future.
    state: GraphTraverseMainState,
    metrics: BaselineMetrics,
}
impl GraphTraverseMainStream {
    /// Create the stream; edge loading starts as a boxed future so the
    /// adjacency map is built once per stream, not per batch.
    #[expect(clippy::too_many_arguments)]
    fn new(
        input_stream: SendableRecordBatchStream,
        source_column: String,
        type_names: Vec<String>,
        direction: Direction,
        target_variable: String,
        edge_variable: Option<String>,
        edge_properties: Vec<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        optional: bool,
        optional_pattern_vars: HashSet<String>,
        bound_target_column: Option<String>,
        used_edge_columns: Vec<String>,
        schema: SchemaRef,
        metrics: BaselineMetrics,
    ) -> Self {
        // Clone what the loading future needs so it can be 'static.
        let loading_ctx = graph_ctx.clone();
        let loading_types = type_names.clone();
        let fut =
            async move { build_edge_adjacency_map(&loading_ctx, &loading_types, direction).await };
        Self {
            source_column,
            target_variable,
            edge_variable,
            edge_properties,
            target_properties,
            graph_ctx,
            optional,
            optional_pattern_vars,
            bound_target_column,
            used_edge_columns,
            schema,
            state: GraphTraverseMainState::LoadingEdges {
                future: Box::pin(fut),
                input_stream,
            },
            metrics,
        }
    }

    /// Expand one input batch against the preloaded adjacency map.
    ///
    /// Each input row fans out to one output row per matching edge; the
    /// column push order below must mirror `build_schema` exactly.
    fn expand_batch(
        &self,
        input: &RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        let source_col = input.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column {} not found",
                self.source_column
            ))
        })?;
        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;
        // Optional column pinning the expected target vid per row.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| input.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
        // Columns holding eids consumed earlier in the pattern (relationship
        // uniqueness: an edge may not be traversed twice in one match).
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                input
                    .column_by_name(col)
                    .and_then(|c| c.as_any().downcast_ref::<UInt64Array>())
            })
            .collect();
        // (input row index, target vid, eid, edge type name, edge props)
        type Expansion = (usize, Vid, Eid, String, uni_common::Properties);
        let mut expansions: Vec<Expansion> = Vec::new();
        for (row_idx, src_u64) in source_vids.iter().enumerate() {
            if let Some(src_u64) = src_u64 {
                let src_vid = Vid::from(src_u64);
                let used_eids: HashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();
                if let Some(neighbors) = adjacency.get(&src_vid) {
                    for (target_vid, eid, edge_type, props) in neighbors {
                        if used_eids.contains(&eid.as_u64()) {
                            continue;
                        }
                        if let Some(targets) = expected_targets {
                            // Null bound target means this row cannot match.
                            if targets.is_null(row_idx) {
                                continue;
                            }
                            let expected_vid = targets.value(row_idx);
                            if target_vid.as_u64() != expected_vid {
                                continue;
                            }
                        }
                        expansions.push((
                            row_idx,
                            *target_vid,
                            *eid,
                            edge_type.clone(),
                            props.clone(),
                        ));
                    }
                }
            }
        }
        // OPTIONAL MATCH with no matches at all: every input row survives
        // with nulls in the optional pattern's columns.
        if expansions.is_empty() && self.optional {
            let all_indices: Vec<usize> = (0..input.num_rows()).collect();
            return build_optional_null_batch_for_rows(input, &all_indices, &self.schema);
        }
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        // Only needed for the optional path's unmatched-row computation.
        let matched_rows: HashSet<usize> = if self.optional {
            expansions.iter().map(|(idx, _, _, _, _)| *idx).collect()
        } else {
            HashSet::new()
        };
        let mut columns: Vec<ArrayRef> = Vec::new();
        // Replicate the input columns once per expansion via `take`.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }
        let target_vid_name = format!("{}._vid", self.target_variable);
        let target_vids: Vec<u64> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64())
            .collect();
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }
        let target_labels_name = format!("{}._labels", self.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            // Labels come from the L0 (in-memory) buffers only here;
            // deduplicated while preserving first-seen order.
            let l0_ctx = self.graph_ctx.l0_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, target_vid, _, _, _) in &expansions {
                let mut row_labels: Vec<String> = Vec::new();
                for l0 in l0_ctx.iter_l0_buffers() {
                    let guard = l0.read();
                    if let Some(l0_labels) = guard.vertex_labels.get(target_vid) {
                        for lbl in l0_labels {
                            if !row_labels.contains(lbl) {
                                row_labels.push(lbl.clone());
                            }
                        }
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }
        if self.edge_variable.is_some() {
            // Named edge: emit eid, type name, then each requested property.
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
            {
                let mut type_builder = arrow_array::builder::StringBuilder::new();
                for (_, _, _, edge_type, _) in &expansions {
                    type_builder.append_value(edge_type);
                }
                columns.push(Arc::new(type_builder.finish()));
            }
            for prop_name in &self.edge_properties {
                use crate::query::df_graph::scan::encode_cypher_value;
                let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                if prop_name == "_all_props" {
                    // Whole property map serialized as one JSON object;
                    // encoding failures degrade to null rather than erroring.
                    for (_, _, _, _, props) in &expansions {
                        if props.is_empty() {
                            builder.append_null();
                        } else {
                            let mut json_map = serde_json::Map::new();
                            for (k, v) in props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                json_map.insert(k.clone(), json_val);
                            }
                            let json = serde_json::Value::Object(json_map);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                } else {
                    for (_, _, _, _, props) in &expansions {
                        match props.get(prop_name) {
                            Some(uni_common::Value::Null) | None => builder.append_null(),
                            Some(val) => {
                                let json_val: serde_json::Value = val.clone().into();
                                match encode_cypher_value(&json_val) {
                                    Ok(bytes) => builder.append_value(bytes),
                                    Err(_) => builder.append_null(),
                                }
                            }
                        }
                    }
                }
                columns.push(Arc::new(builder.finish()));
            }
        } else {
            // Anonymous edge: only the eid column (for uniqueness tracking).
            let eids: Vec<u64> = expansions
                .iter()
                .map(|(_, _, eid, _, _)| eid.as_u64())
                .collect();
            columns.push(Arc::new(UInt64Array::from(eids)));
        }
        {
            use crate::query::df_graph::scan::encode_cypher_value;
            // Target-node properties, sourced from the L0 buffers.
            let l0_ctx = self.graph_ctx.l0_context();
            for prop_name in &self.target_properties {
                if prop_name == "_all_props" {
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        // Later L0 buffers overwrite earlier ones on key clash.
                        let mut merged_props = serde_json::Map::new();
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid) {
                                for (k, v) in props.iter() {
                                    let json_val: serde_json::Value = v.clone().into();
                                    merged_props.insert(k.to_string(), json_val);
                                }
                            }
                        }
                        if merged_props.is_empty() {
                            builder.append_null();
                        } else {
                            let json = serde_json::Value::Object(merged_props);
                            match encode_cypher_value(&json) {
                                Ok(bytes) => builder.append_value(bytes),
                                Err(_) => builder.append_null(),
                            }
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                } else {
                    let mut builder = arrow_array::builder::LargeBinaryBuilder::new();
                    for (_, target_vid, _, _, _) in &expansions {
                        let mut found = false;
                        // First non-null value wins across L0 buffers.
                        for l0 in l0_ctx.iter_l0_buffers() {
                            let guard = l0.read();
                            if let Some(props) = guard.vertex_properties.get(target_vid)
                                && let Some(val) = props.get(prop_name.as_str())
                                && !val.is_null()
                            {
                                let json_val: serde_json::Value = val.clone().into();
                                if let Ok(bytes) = encode_cypher_value(&json_val) {
                                    builder.append_value(bytes);
                                    found = true;
                                    break;
                                }
                            }
                        }
                        if !found {
                            builder.append_null();
                        }
                    }
                    columns.push(Arc::new(builder.finish()));
                }
            }
        }
        let matched_batch =
            RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)?;
        if self.optional {
            // Rows that matched nothing are appended back with nulls in the
            // optional pattern's columns.
            let unmatched = collect_unmatched_optional_group_rows(
                input,
                &matched_rows,
                &self.schema,
                &self.optional_pattern_vars,
            )?;
            if unmatched.is_empty() {
                return Ok(matched_batch);
            }
            let unmatched_batch = build_optional_null_batch_for_rows_with_optional_vars(
                input,
                &unmatched,
                &self.schema,
                &self.optional_pattern_vars,
            )?;
            use arrow::compute::concat_batches;
            concat_batches(&self.schema, &[matched_batch, unmatched_batch]).map_err(arrow_err)
        } else {
            Ok(matched_batch)
        }
    }
}
/// Load every edge matching `type_names` into an adjacency map keyed by the
/// traversal's source endpoint (per `direction`).
///
/// Merge order: persistent storage first, then L0 (unflushed) buffers; when
/// the same eid appears more than once the *last* occurrence — i.e. the most
/// recent L0 write — wins. Edges tombstoned in any L0 buffer are dropped.
async fn build_edge_adjacency_map(
    graph_ctx: &GraphExecutionContext,
    type_names: &[String],
    direction: Direction,
) -> DFResult<EdgeAdjacencyMap> {
    let storage = graph_ctx.storage();
    let l0_ctx = graph_ctx.l0_context();
    let type_refs: Vec<&str> = type_names.iter().map(|s| s.as_str()).collect();
    let edges_with_type = storage
        .find_edges_by_type_names(&type_refs)
        .await
        .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
    let mut edges: Vec<(
        uni_common::Eid,
        uni_common::Vid,
        uni_common::Vid,
        String,
        uni_common::Properties,
    )> = edges_with_type.into_iter().collect();
    // Append L0 edges after storage edges so they take precedence in the
    // last-occurrence-wins dedup below.
    for l0 in l0_ctx.iter_l0_buffers() {
        let l0_guard = l0.read();
        for type_name in type_names {
            let l0_eids = l0_guard.eids_for_type(type_name);
            for &eid in &l0_eids {
                if let Some(edge_ref) = l0_guard.graph.edge(eid) {
                    let src_vid = edge_ref.src_vid;
                    let dst_vid = edge_ref.dst_vid;
                    let props = l0_guard
                        .edge_properties
                        .get(&eid)
                        .cloned()
                        .unwrap_or_default();
                    edges.push((eid, src_vid, dst_vid, type_name.clone(), props));
                }
            }
        }
    }
    // Dedup by eid keeping the last occurrence, then restore original order.
    let mut seen_eids = HashSet::new();
    let mut unique_edges = Vec::new();
    for edge in edges.into_iter().rev() {
        if seen_eids.insert(edge.0) {
            unique_edges.push(edge);
        }
    }
    unique_edges.reverse();
    // Drop edges deleted in any L0 buffer.
    let mut tombstoned_eids = HashSet::new();
    for l0 in l0_ctx.iter_l0_buffers() {
        let l0_guard = l0.read();
        for eid in l0_guard.tombstones.keys() {
            tombstoned_eids.insert(*eid);
        }
    }
    if !tombstoned_eids.is_empty() {
        unique_edges.retain(|edge| !tombstoned_eids.contains(&edge.0));
    }
    let mut adjacency: EdgeAdjacencyMap = HashMap::new();
    for (eid, src_vid, dst_vid, edge_type, props) in unique_edges {
        match direction {
            Direction::Outgoing => {
                adjacency
                    .entry(src_vid)
                    .or_default()
                    .push((dst_vid, eid, edge_type, props));
            }
            Direction::Incoming => {
                adjacency
                    .entry(dst_vid)
                    .or_default()
                    .push((src_vid, eid, edge_type, props));
            }
            Direction::Both => {
                adjacency.entry(src_vid).or_default().push((
                    dst_vid,
                    eid,
                    edge_type.clone(),
                    props.clone(),
                ));
                // BUGFIX: a self-loop (src == dst) was previously inserted
                // twice under the same key, so an undirected traversal would
                // yield the same edge twice for a single match. Only insert
                // the reverse entry for distinct endpoints (mirrors the
                // undirected `seen_edges` dedup in `expand_neighbors`).
                if src_vid != dst_vid {
                    adjacency
                        .entry(dst_vid)
                        .or_default()
                        .push((src_vid, eid, edge_type, props));
                }
            }
        }
    }
    Ok(adjacency)
}
impl Stream for GraphTraverseMainStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the two-phase state machine: finish loading the adjacency map,
    /// then expand input batches one at a time.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Take ownership of the state (leaving Done in place) so the
            // owned future/stream can be polled without borrow conflicts.
            let state = std::mem::replace(&mut self.state, GraphTraverseMainState::Done);
            match state {
                GraphTraverseMainState::LoadingEdges {
                    mut future,
                    input_stream,
                } => match future.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        // Loading finished — loop again to poll the input.
                        self.state = GraphTraverseMainState::Processing {
                            adjacency,
                            input_stream,
                        };
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = GraphTraverseMainState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        // Restore the state we took out before yielding.
                        self.state = GraphTraverseMainState::LoadingEdges {
                            future,
                            input_stream,
                        };
                        return Poll::Pending;
                    }
                },
                GraphTraverseMainState::Processing {
                    adjacency,
                    mut input_stream,
                } => {
                    // Abort promptly if the query deadline has passed.
                    if let Err(e) = self.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }
                    match input_stream.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let result = self.expand_batch(&batch, &adjacency);
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            if let Ok(ref r) = result {
                                self.metrics.record_output(r.num_rows());
                            }
                            return Poll::Ready(Some(result));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = GraphTraverseMainState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = GraphTraverseMainState::Processing {
                                adjacency,
                                input_stream,
                            };
                            return Poll::Pending;
                        }
                    }
                }
                GraphTraverseMainState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
impl RecordBatchStream for GraphTraverseMainStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Variable-length (`[*min..max]`) traversal operator, driven by a path NFA
/// so it also supports quantified path patterns.
pub struct GraphVariableLengthTraverseExec {
    input: Arc<dyn ExecutionPlan>,
    /// Input column holding the source vids.
    source_column: String,
    /// Edge type ids allowed on each hop (plain VLP case).
    edge_type_ids: Vec<u32>,
    direction: Direction,
    min_hops: usize,
    max_hops: usize,
    /// Variable bound to the path's end node.
    target_variable: String,
    /// Variable bound to the list of traversed edges, if named.
    step_variable: Option<String>,
    /// Variable bound to the whole path, if named.
    path_variable: Option<String>,
    target_properties: Vec<String>,
    /// Optional label the end node must carry.
    target_label_name: Option<String>,
    is_optional: bool,
    /// When set, only paths ending at this row's bound vid are kept.
    bound_target_column: Option<String>,
    edge_lance_filter: Option<String>,
    /// Property equality conditions every traversed edge must satisfy.
    edge_property_conditions: Vec<(String, UniValue)>,
    /// Eid columns whose edges must not be reused.
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled path automaton driving the BFS expansion.
    nfa: Arc<PathNfa>,
    graph_ctx: Arc<GraphExecutionContext>,
    schema: SchemaRef,
    properties: PlanProperties,
    metrics: ExecutionPlanMetricsSet,
}
/// Compact Debug output: only the traversal parameters, not the child plan.
impl fmt::Debug for GraphVariableLengthTraverseExec {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = fmt.debug_struct("GraphVariableLengthTraverseExec");
        dbg.field("source_column", &self.source_column);
        dbg.field("edge_type_ids", &self.edge_type_ids);
        dbg.field("direction", &self.direction);
        dbg.field("min_hops", &self.min_hops);
        dbg.field("max_hops", &self.max_hops);
        dbg.field("target_variable", &self.target_variable);
        dbg.finish()
    }
}
impl GraphVariableLengthTraverseExec {
    /// Build a variable-length traversal operator.
    ///
    /// When `qpp_nfa` is provided (quantified path patterns) it is used as
    /// the automaton directly; otherwise one is synthesized from the plain
    /// `edge_type_ids`/`direction`/`min_hops`/`max_hops` spec.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        edge_type_ids: Vec<u32>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        target_label_name: Option<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
        qpp_nfa: Option<PathNfa>,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();
        // Property metadata for the target label drives output column types.
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = target_label_name
            .as_deref()
            .and_then(|ln| uni_schema.properties.get(ln));
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
            label_props,
        );
        let properties = compute_plan_properties(schema.clone());
        let nfa = Arc::new(qpp_nfa.unwrap_or_else(|| {
            PathNfa::from_vlp(edge_type_ids.clone(), direction, min_hops, max_hops)
        }));
        Self {
            input,
            source_column,
            edge_type_ids,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            target_label_name,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            nfa,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }

    /// Output schema: input columns, then target vid/labels and requested
    /// target properties (each only if not already present), then
    /// `_hop_count`, the optional step-edge list, and the optional path.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
        label_props: Option<
            &std::collections::HashMap<String, uni_common::core::schema::PropertyMeta>,
        >,
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }
        for prop_name in target_properties {
            let col_name = format!("{}.{}", target_variable, prop_name);
            if input_schema.column_with_name(&col_name).is_none() {
                // Column type comes from label schema metadata when known.
                let arrow_type = resolve_property_type(prop_name, label_props);
                fields.push(Field::new(&col_name, arrow_type, true));
            }
        }
        fields.push(Field::new("_hop_count", DataType::UInt64, false));
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }
        Arc::new(Schema::new(fields))
    }
}
/// One-line rendering for EXPLAIN output.
impl DisplayAs for GraphVariableLengthTraverseExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!(
            "GraphVariableLengthTraverseExec: {} --[{:?}*{}..{}]--> target",
            self.source_column, self.edge_type_ids, self.min_hops, self.max_hops
        ))
    }
}
impl ExecutionPlan for GraphVariableLengthTraverseExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseExec"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
    /// Rebuilds via `Self::new`, which recomputes schema and plan
    /// properties; the existing NFA is passed through as `qpp_nfa` so it is
    /// not re-synthesized.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseExec requires exactly one child".to_string(),
            ));
        }
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.edge_type_ids.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.target_label_name.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
            Some((*self.nfa).clone()),
        )))
    }
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);
        // Kick off neighbor-cache warming; the stream polls this future to
        // completion before reading any input batch.
        let warm_fut = self
            .graph_ctx
            .warming_future(self.edge_type_ids.clone(), self.direction);
        Ok(Box::pin(GraphVariableLengthTraverseStream {
            input: input_stream,
            exec: Arc::new(self.clone_for_stream()),
            schema: self.schema.clone(),
            state: VarLengthStreamState::Warming(warm_fut),
            metrics,
        }))
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
impl GraphVariableLengthTraverseExec {
    /// Snapshot the operator's configuration into a plain data struct that
    /// the stream can own (`Arc` fields are cheap refcount bumps).
    fn clone_for_stream(&self) -> GraphVariableLengthTraverseExecData {
        GraphVariableLengthTraverseExecData {
            source_column: self.source_column.clone(),
            edge_type_ids: self.edge_type_ids.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            target_label_name: self.target_label_name.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            nfa: self.nfa.clone(),
            graph_ctx: self.graph_ctx.clone(),
        }
    }
}
/// Owned snapshot of `GraphVariableLengthTraverseExec`'s configuration,
/// shared with the stream via `Arc`.
#[expect(
    dead_code,
    reason = "Fields accessed via NFA; kept for with_new_children reconstruction"
)]
struct GraphVariableLengthTraverseExecData {
    source_column: String,
    edge_type_ids: Vec<u32>,
    direction: Direction,
    min_hops: usize,
    max_hops: usize,
    target_variable: String,
    step_variable: Option<String>,
    path_variable: Option<String>,
    target_properties: Vec<String>,
    /// Optional label the path's end node must carry.
    target_label_name: Option<String>,
    is_optional: bool,
    bound_target_column: Option<String>,
    #[expect(dead_code, reason = "Used in Phase 3 warming")]
    edge_lance_filter: Option<String>,
    /// Property equality conditions every traversed edge must satisfy.
    edge_property_conditions: Vec<(String, UniValue)>,
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    /// Compiled path automaton driving the BFS expansion.
    nfa: Arc<PathNfa>,
    graph_ctx: Arc<GraphExecutionContext>,
}
/// Hard cap on the BFS frontier size; traversal is truncated past this.
const MAX_FRONTIER_SIZE: usize = 500_000;
/// Hard cap on predecessor-DAG pool entries before BFS truncates.
const MAX_PRED_POOL_SIZE: usize = 2_000_000;
impl GraphVariableLengthTraverseExecData {
    /// True when `vid` passes the optional target-label filter. Vertices
    /// with no visible label info are accepted (optimistic default —
    /// presumably for storage-resident nodes; TODO confirm).
    fn check_target_label(&self, vid: Vid) -> bool {
        if let Some(ref label_name) = self.target_label_name {
            let query_ctx = self.graph_ctx.query_context();
            match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                Some(labels) => labels.contains(label_name),
                None => true,
            }
        } else {
            true
        }
    }

    /// Check a per-NFA-state vertex constraint (currently label-only), with
    /// the same optimistic default as `check_target_label`.
    fn check_state_constraint(&self, vid: Vid, constraint: &super::nfa::VertexConstraint) -> bool {
        match constraint {
            super::nfa::VertexConstraint::Label(label_name) => {
                let query_ctx = self.graph_ctx.query_context();
                match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                    Some(labels) => labels.contains(label_name),
                    None => true,
                }
            }
        }
    }

    /// Expand `(vid, state)` one hop along every NFA transition, applying:
    /// edge visibility filter, per-edge property conditions, relationship
    /// uniqueness (`used_eids`), and the destination state's constraint.
    fn expand_neighbors(
        &self,
        vid: Vid,
        state: NfaStateId,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
    ) -> Vec<(Vid, Eid, NfaStateId)> {
        let is_undirected = matches!(self.direction, Direction::Both);
        let mut results = Vec::new();
        for transition in self.nfa.transitions_from(state) {
            // Undirected expansion can surface the same edge from both
            // endpoints; dedupe per transition.
            let mut seen_edges: FxHashSet<u64> = FxHashSet::default();
            for &etype in &transition.edge_type_ids {
                for (neighbor, eid) in
                    self.graph_ctx
                        .get_neighbors(vid, etype, transition.direction)
                {
                    if is_undirected && !seen_edges.insert(eid.as_u64()) {
                        continue;
                    }
                    if !eid_filter.contains(eid) {
                        continue;
                    }
                    if !self.edge_property_conditions.is_empty() {
                        let query_ctx = self.graph_ctx.query_context();
                        // Edges with no accumulated props pass optimistically.
                        let passes = if let Some(props) =
                            l0_visibility::accumulate_edge_props(eid, Some(&query_ctx))
                        {
                            self.edge_property_conditions
                                .iter()
                                .all(|(name, expected)| {
                                    props.get(name).is_some_and(|actual| actual == expected)
                                })
                        } else {
                            true
                        };
                        if !passes {
                            continue;
                        }
                    }
                    if used_eids.contains(&eid.as_u64()) {
                        continue;
                    }
                    if let Some(constraint) = self.nfa.state_constraint(transition.to)
                        && !self.check_state_constraint(neighbor, constraint)
                    {
                        continue;
                    }
                    results.push((neighbor, eid, transition.to));
                }
            }
        }
        results
    }

    /// BFS recording a predecessor DAG, then enumerate all accepted paths.
    /// Returns (target, hop count, node path, edge path) tuples.
    fn bfs_with_dag(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<BfsResult> {
        let nfa = &self.nfa;
        let selector = PathSelector::All;
        let mut dag = PredecessorDag::new(selector);
        let mut accepting: Vec<(Vid, NfaStateId, u32)> = Vec::new();
        // Zero-hop match: the source itself may be accepted.
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            accepting.push((source, nfa.start_state(), 0));
        }
        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;
        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    // Record every incoming edge so path enumeration can
                    // recover all distinct paths, not just the first.
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                        {
                            accepting.push((neighbor, dst_state, depth));
                        }
                    }
                }
            }
            // Safety valve: truncate runaway traversals.
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }
            frontier = next_frontier;
        }
        let mut results: Vec<BfsResult> = Vec::new();
        for &(target, state, depth) in &accepting {
            dag.enumerate_paths(
                source,
                target,
                state,
                depth,
                depth,
                &self.path_mode,
                &mut |nodes, edges| {
                    results.push((target, depth as usize, nodes.to_vec(), edges.to_vec()));
                    std::ops::ControlFlow::Continue(())
                },
            );
        }
        results
    }

    /// BFS that reports only reachable endpoints as (vid, depth) without
    /// materializing paths; used when paths/steps are not projected.
    fn bfs_endpoints_only(
        &self,
        source: Vid,
        eid_filter: &EidFilter,
        used_eids: &FxHashSet<u64>,
        vid_filter: &VidFilter,
    ) -> Vec<(Vid, u32)> {
        let nfa = &self.nfa;
        // Any single witness path suffices for endpoint-only output.
        let selector = PathSelector::Any;
        let mut dag = PredecessorDag::new(selector);
        let mut results: Vec<(Vid, u32)> = Vec::new();
        if nfa.is_accepting(nfa.start_state())
            && self.check_target_label(source)
            && vid_filter.contains(source)
        {
            results.push((source, 0));
        }
        let mut frontier: Vec<(Vid, NfaStateId)> = vec![(source, nfa.start_state())];
        let mut depth: u32 = 0;
        while !frontier.is_empty() && depth < self.max_hops as u32 {
            depth += 1;
            let mut next_frontier: Vec<(Vid, NfaStateId)> = Vec::new();
            let mut seen_at_depth: FxHashSet<(Vid, NfaStateId)> = FxHashSet::default();
            for &(vid, state) in &frontier {
                for (neighbor, eid, dst_state) in
                    self.expand_neighbors(vid, state, eid_filter, used_eids)
                {
                    dag.add_predecessor(neighbor, dst_state, vid, state, eid, depth);
                    if seen_at_depth.insert((neighbor, dst_state)) {
                        next_frontier.push((neighbor, dst_state));
                        // Endpoint still needs a trail-valid witness path
                        // (no repeated edges) before being accepted.
                        if nfa.is_accepting(dst_state)
                            && self.check_target_label(neighbor)
                            && vid_filter.contains(neighbor)
                            && dag.has_trail_valid_path(source, neighbor, dst_state, depth, depth)
                        {
                            results.push((neighbor, depth));
                        }
                    }
                }
            }
            if next_frontier.len() > MAX_FRONTIER_SIZE || dag.pool_len() > MAX_PRED_POOL_SIZE {
                break;
            }
            frontier = next_frontier;
        }
        results
    }
}
/// Phases of the variable-length traversal stream.
enum VarLengthStreamState {
    /// Pre-warming neighbor caches for the requested edge types/direction.
    Warming(Pin<Box<dyn std::future::Future<Output = DFResult<()>> + Send>>),
    /// Pulling input batches and running BFS expansion synchronously.
    Reading,
    /// Asynchronously hydrating target properties onto an expanded batch.
    Materializing(Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>),
    /// Terminal state (also left in place transiently during state swaps).
    Done,
}
/// Streaming executor for variable-length traversal; drives the
/// `VarLengthStreamState` machine in `poll_next`.
struct GraphVariableLengthTraverseStream {
    /// Upstream batches carrying the source vids.
    input: SendableRecordBatchStream,
    /// Shared snapshot of the operator's configuration.
    exec: Arc<GraphVariableLengthTraverseExecData>,
    schema: SchemaRef,
    state: VarLengthStreamState,
    metrics: BaselineMetrics,
}
impl Stream for GraphVariableLengthTraverseStream {
    type Item = DFResult<RecordBatch>;

    /// Drive the Warming -> Reading -> (Materializing) state machine.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            // Swap the state out (leaving Done) so owned futures can be
            // polled without borrowing `self` twice.
            let state = std::mem::replace(&mut self.state, VarLengthStreamState::Done);
            match state {
                VarLengthStreamState::Warming(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(())) => {
                        self.state = VarLengthStreamState::Reading;
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthStreamState::Warming(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthStreamState::Reading => {
                    // Abort promptly if the query deadline has passed.
                    if let Err(e) = self.exec.graph_ctx.check_timeout() {
                        return Poll::Ready(Some(Err(
                            datafusion::error::DataFusionError::Execution(e.to_string()),
                        )));
                    }
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            // No eid/vid restrictions in this code path.
                            let eid_filter = EidFilter::AllAllowed;
                            let vid_filter = VidFilter::AllAllowed;
                            let base_result =
                                self.process_batch_base(batch, &eid_filter, &vid_filter);
                            let base_batch = match base_result {
                                Ok(b) => b,
                                Err(e) => {
                                    self.state = VarLengthStreamState::Reading;
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };
                            if self.exec.target_properties.is_empty() {
                                // Nothing to hydrate — emit directly.
                                self.state = VarLengthStreamState::Reading;
                                return Poll::Ready(Some(Ok(base_batch)));
                            }
                            // Target properties require async reads; hand
                            // the batch off to the Materializing phase.
                            let schema = self.schema.clone();
                            let target_variable = self.exec.target_variable.clone();
                            let target_properties = self.exec.target_properties.clone();
                            let target_label_name = self.exec.target_label_name.clone();
                            let graph_ctx = self.exec.graph_ctx.clone();
                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                target_label_name,
                                graph_ctx,
                            );
                            self.state = VarLengthStreamState::Materializing(Box::pin(fut));
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = VarLengthStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthStreamState::Reading;
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthStreamState::Materializing(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(batch)) => {
                        self.state = VarLengthStreamState::Reading;
                        self.metrics.record_output(batch.num_rows());
                        return Poll::Ready(Some(Ok(batch)));
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthStreamState::Materializing(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
impl GraphVariableLengthTraverseStream {
    /// Expands one input batch: for every non-null source vid, runs the
    /// variable-length traversal (endpoints-only or DAG-backed, depending on
    /// `output_mode`) and turns each accepted expansion into one output row.
    ///
    /// `eid_filter` / `vid_filter` restrict which edges/vertices the BFS may
    /// visit (callers currently pass `AllAllowed`). When a bound target column
    /// exists, only expansions whose endpoint matches that row's value are
    /// kept. Optional traversals emit one sentinel expansion (target vid
    /// `u64::MAX`, empty paths) for rows with no match so the row survives.
    fn process_batch_base(
        &self,
        batch: RecordBatch,
        eid_filter: &EidFilter,
        vid_filter: &VidFilter,
    ) -> DFResult<RecordBatch> {
        let source_col = batch
            .column_by_name(&self.exec.source_column)
            .ok_or_else(|| {
                datafusion::error::DataFusionError::Execution(format!(
                    "Source column '{}' not found",
                    self.exec.source_column
                ))
            })?;
        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;
        // Optional column pinning the endpoint to an already-bound variable.
        let bound_target_cow = self
            .exec
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
        // Edge ids already consumed by earlier relationship patterns in the
        // same row; the traversal must not reuse them.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .exec
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();
        let mut expansions: Vec<VarLengthExpansion> = Vec::new();
        for (row_idx, source_vid) in source_vids.iter().enumerate() {
            let mut emitted_for_row = false;
            if let Some(src) = source_vid {
                let vid = Vid::from(src);
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();
                match &self.exec.output_mode {
                    // Endpoints-only mode skips path materialization entirely.
                    VlpOutputMode::EndpointsOnly => {
                        let endpoints = self
                            .exec
                            .bfs_endpoints_only(vid, eid_filter, &used_eids, vid_filter);
                        for (target, depth) in endpoints {
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, depth as usize, vec![], vec![]));
                            emitted_for_row = true;
                        }
                    }
                    _ => {
                        // Full path mode: expansions carry node and edge paths.
                        let bfs_results = self
                            .exec
                            .bfs_with_dag(vid, eid_filter, &used_eids, vid_filter);
                        for (target, hop_count, node_path, edge_path) in bfs_results {
                            if let Some(targets) = expected_targets {
                                if targets.is_null(row_idx) {
                                    continue;
                                }
                                if target.as_u64() != targets.value(row_idx) {
                                    continue;
                                }
                            }
                            expansions.push((row_idx, target, hop_count, node_path, edge_path));
                            emitted_for_row = true;
                        }
                    }
                }
            }
            // OPTIONAL MATCH semantics: keep the row with a null placeholder.
            if self.exec.is_optional && !emitted_for_row {
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }
        self.build_output_batch(&batch, &expansions)
    }
    /// Materializes the output batch for one input batch.
    ///
    /// Each expansion becomes one output row: input columns are repeated via
    /// `take`, then target vid, target labels, property placeholders,
    /// `_hop_count`, an optional step-edge list, and an optional path struct
    /// are appended. Records the output row count into the baseline metrics.
    fn build_output_batch(
        &self,
        input: &RecordBatch,
        expansions: &[VarLengthExpansion],
    ) -> DFResult<RecordBatch> {
        if expansions.is_empty() {
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        let num_rows = expansions.len();
        // Repeat each input row once per expansion originating from it.
        let indices: Vec<u64> = expansions
            .iter()
            .map(|(idx, _, _, _, _)| *idx as u64)
            .collect();
        let indices_array = UInt64Array::from(indices);
        let mut columns: Vec<ArrayRef> = Vec::new();
        for col in input.columns() {
            let expanded = take(col.as_ref(), &indices_array, None)?;
            columns.push(expanded);
        }
        // Sentinel target vid u64::MAX marks an optional row with no match.
        let unmatched_rows: Vec<bool> = expansions
            .iter()
            .map(|(_, vid, _, _, _)| vid.as_u64() == u64::MAX)
            .collect();
        let target_vids: Vec<Option<u64>> = expansions
            .iter()
            .zip(unmatched_rows.iter())
            .map(
                |((_, vid, _, _, _), unmatched)| {
                    if *unmatched { None } else { Some(vid.as_u64()) }
                },
            )
            .collect();
        let target_vid_name = format!("{}._vid", self.exec.target_variable);
        if input.schema().column_with_name(&target_vid_name).is_none() {
            columns.push(Arc::new(UInt64Array::from(target_vids.clone())));
        }
        let target_labels_name = format!("{}._labels", self.exec.target_variable);
        if input
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for target_vid in &target_vids {
                let Some(vid_u64) = target_vid else {
                    // Unmatched optional row: null labels list.
                    labels_builder.append(false);
                    continue;
                };
                let vid = Vid::from(*vid_u64);
                let row_labels: Vec<String> =
                    match l0_visibility::get_vertex_labels_optional(vid, &query_ctx) {
                        Some(labels) => {
                            labels
                        }
                        None => {
                            // Fall back to the statically-known label, if any.
                            if let Some(ref label_name) = self.exec.target_label_name {
                                vec![label_name.clone()]
                            } else {
                                vec![]
                            }
                        }
                    };
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }
        // All-null placeholders for target properties; real values are filled
        // in later by `hydrate_vlp_target_properties`.
        // NOTE(review): these placeholders are pushed BEFORE `_hop_count`
        // here — verify this matches the exec's output schema field order.
        for prop_name in &self.exec.target_properties {
            let full_prop_name = format!("{}.{}", self.exec.target_variable, prop_name);
            if input.schema().column_with_name(&full_prop_name).is_none() {
                let col_idx = columns.len();
                if col_idx < self.schema.fields().len() {
                    let field = self.schema.field(col_idx);
                    columns.push(arrow_array::new_null_array(field.data_type(), num_rows));
                }
            }
        }
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));
        if self.exec.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            for (_, _, _, node_path, edge_path) in expansions {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Unmatched optional row: null edge list.
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: valid but empty edge list.
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                            .unwrap_or_else(|| "UNKNOWN".to_string());
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_name,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }
            columns.push(Arc::new(edges_builder.finish()));
        }
        if let Some(path_var_name) = &self.exec.path_variable {
            // When an earlier operator already produced this path column, the
            // new segment is appended to it rather than replacing it.
            let existing_path_col_idx = input
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.exec.graph_ctx.query_context();
            let mut path_validity = Vec::with_capacity(expansions.len());
            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }
                // If a prefix path exists, copy it first and skip this
                // segment's first node (it duplicates the prefix's last node).
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);
                for (i, eid) in edge_path.iter().enumerate() {
                    let type_name = l0_visibility::get_edge_type(*eid, &query_ctx)
                        .unwrap_or_else(|| "UNKNOWN".to_string());
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_name,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }
            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));
            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;
            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }
        self.metrics.record_output(num_rows);
        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
}
impl RecordBatchStream for GraphVariableLengthTraverseStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Fills in the all-null target-property placeholder columns of `base_batch`
/// with real property values fetched asynchronously from storage.
///
/// The target vids are read from the `<target_variable>._vid` column. With a
/// known label, properties are fetched per-label with schema-resolved types;
/// otherwise everything is fetched as `LargeBinary`-encoded values, and the
/// pseudo-property `_all_props` materializes a JSON object merging persisted
/// properties with pending L0 buffer overlays. Returns the batch unchanged
/// when it is empty or no properties were requested.
async fn hydrate_vlp_target_properties(
    base_batch: RecordBatch,
    schema: SchemaRef,
    target_variable: String,
    target_properties: Vec<String>,
    target_label_name: Option<String>,
    graph_ctx: Arc<GraphExecutionContext>,
) -> DFResult<RecordBatch> {
    if base_batch.num_rows() == 0 || target_properties.is_empty() {
        return Ok(base_batch);
    }
    let target_vid_col_name = format!("{}._vid", target_variable);
    // Search from the END so the most recently appended `<var>._vid` column
    // wins if the name appears more than once.
    let vid_col_idx = schema
        .fields()
        .iter()
        .enumerate()
        .rev()
        .find(|(_, f)| f.name() == &target_vid_col_name)
        .map(|(i, _)| i);
    let Some(vid_col_idx) = vid_col_idx else {
        return Ok(base_batch);
    };
    let vid_col = base_batch.column(vid_col_idx);
    let target_vid_cow = column_as_vid_array(vid_col.as_ref())?;
    let target_vid_array: &UInt64Array = &target_vid_cow;
    // Null vids (unmatched optional rows) become the u64::MAX sentinel, which
    // simply yields no properties from the lookups below.
    let target_vids: Vec<Vid> = target_vid_array
        .iter()
        .map(|v| Vid::from(v.unwrap_or(u64::MAX)))
        .collect();
    let mut property_columns: Vec<ArrayRef> = Vec::new();
    if let Some(ref label_name) = target_label_name {
        // Label known: batch-fetch per label and resolve each property's
        // Arrow type from the graph schema.
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();
        let props_map = property_manager
            .get_batch_vertex_props_for_label(&target_vids, label_name, Some(&query_ctx))
            .await
            .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?;
        let uni_schema = graph_ctx.storage().schema_manager().schema();
        let label_props = uni_schema.properties.get(label_name.as_str());
        for prop_name in &target_properties {
            let data_type = resolve_property_type(prop_name, label_props);
            let column =
                build_property_column_static(&target_vids, &props_map, prop_name, &data_type)?;
            property_columns.push(column);
        }
    } else {
        // No label: fetch the concretely-named properties in one batch;
        // `_all_props` is handled separately below.
        let non_internal_props: Vec<&str> = target_properties
            .iter()
            .filter(|p| *p != "_all_props")
            .map(|s| s.as_str())
            .collect();
        let property_manager = graph_ctx.property_manager();
        let query_ctx = graph_ctx.query_context();
        let props_map = if !non_internal_props.is_empty() {
            property_manager
                .get_batch_vertex_props(&target_vids, &non_internal_props, Some(&query_ctx))
                .await
                .map_err(|e| datafusion::error::DataFusionError::Execution(e.to_string()))?
        } else {
            std::collections::HashMap::new()
        };
        for prop_name in &target_properties {
            if prop_name == "_all_props" {
                use crate::query::df_graph::scan::encode_cypher_value;
                use arrow_array::builder::LargeBinaryBuilder;
                let mut builder = LargeBinaryBuilder::new();
                let l0_ctx = graph_ctx.l0_context();
                for vid in &target_vids {
                    let mut merged_props = serde_json::Map::new();
                    if let Some(vid_props) = props_map.get(vid) {
                        for (k, v) in vid_props.iter() {
                            let json_val: serde_json::Value = v.clone().into();
                            merged_props.insert(k.to_string(), json_val);
                        }
                    }
                    // L0 (unflushed) writes override persisted values.
                    for l0 in l0_ctx.iter_l0_buffers() {
                        let guard = l0.read();
                        if let Some(l0_props) = guard.vertex_properties.get(vid) {
                            for (k, v) in l0_props.iter() {
                                let json_val: serde_json::Value = v.clone().into();
                                merged_props.insert(k.to_string(), json_val);
                            }
                        }
                    }
                    if merged_props.is_empty() {
                        builder.append_null();
                    } else {
                        let json = serde_json::Value::Object(merged_props);
                        match encode_cypher_value(&json) {
                            Ok(bytes) => builder.append_value(bytes),
                            // Encoding failure degrades to null rather than
                            // failing the whole batch.
                            Err(_) => builder.append_null(),
                        }
                    }
                }
                property_columns.push(Arc::new(builder.finish()));
            } else {
                let column = build_property_column_static(
                    &target_vids,
                    &props_map,
                    prop_name,
                    &arrow::datatypes::DataType::LargeBinary,
                )?;
                property_columns.push(column);
            }
        }
    }
    // Splice the hydrated columns in by schema position.
    // NOTE(review): matching is positional — it assumes target-property
    // columns appear after the vid column in the same order as
    // `target_properties`; verify against the schema builder.
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(schema.fields().len());
    let mut prop_idx = 0;
    for (col_idx, field) in schema.fields().iter().enumerate() {
        let is_target_prop = col_idx > vid_col_idx
            && target_properties
                .iter()
                .any(|p| *field.name() == format!("{}.{}", target_variable, p));
        if is_target_prop && prop_idx < property_columns.len() {
            new_columns.push(property_columns[prop_idx].clone());
            prop_idx += 1;
        } else {
            new_columns.push(base_batch.column(col_idx).clone());
        }
    }
    RecordBatch::try_new(schema, new_columns).map_err(arrow_err)
}
/// Physical operator for variable-length traversal that preloads the full
/// edge adjacency map for the requested edge types before expanding rows.
pub struct GraphVariableLengthTraverseMainExec {
    input: Arc<dyn ExecutionPlan>,
    // Name of the input column holding the source vids.
    source_column: String,
    // Edge type names to traverse; empty means all types.
    type_names: Vec<String>,
    direction: Direction,
    // Inclusive hop bounds of the `*min..max` pattern.
    min_hops: usize,
    max_hops: usize,
    target_variable: String,
    // Variable bound to the list of traversed edges, if requested.
    step_variable: Option<String>,
    // Variable bound to the full path struct, if requested.
    path_variable: Option<String>,
    target_properties: Vec<String>,
    // OPTIONAL MATCH: rows without a match survive with nulls.
    is_optional: bool,
    // Column pinning the traversal endpoint to an already-bound variable.
    bound_target_column: Option<String>,
    edge_lance_filter: Option<String>,
    // Per-edge equality predicates applied during BFS.
    edge_property_conditions: Vec<(String, UniValue)>,
    // Columns of edge ids already consumed by earlier patterns in the row.
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    graph_ctx: Arc<GraphExecutionContext>,
    schema: SchemaRef,
    properties: PlanProperties,
    metrics: ExecutionPlanMetricsSet,
}
impl fmt::Debug for GraphVariableLengthTraverseMainExec {
    /// Compact debug view listing only the traversal-defining fields.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut dbg = f.debug_struct("GraphVariableLengthTraverseMainExec");
        dbg.field("source_column", &self.source_column);
        dbg.field("type_names", &self.type_names);
        dbg.field("direction", &self.direction);
        dbg.field("min_hops", &self.min_hops);
        dbg.field("max_hops", &self.max_hops);
        dbg.field("target_variable", &self.target_variable);
        dbg.finish()
    }
}
impl GraphVariableLengthTraverseMainExec {
    /// Builds the operator and derives its output schema / plan properties
    /// from the input schema and the requested variables.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        input: Arc<dyn ExecutionPlan>,
        source_column: impl Into<String>,
        type_names: Vec<String>,
        direction: Direction,
        min_hops: usize,
        max_hops: usize,
        target_variable: impl Into<String>,
        step_variable: Option<String>,
        path_variable: Option<String>,
        target_properties: Vec<String>,
        graph_ctx: Arc<GraphExecutionContext>,
        is_optional: bool,
        bound_target_column: Option<String>,
        edge_lance_filter: Option<String>,
        edge_property_conditions: Vec<(String, UniValue)>,
        used_edge_columns: Vec<String>,
        path_mode: super::nfa::PathMode,
        output_mode: super::nfa::VlpOutputMode,
    ) -> Self {
        let source_column = source_column.into();
        let target_variable = target_variable.into();
        let schema = Self::build_schema(
            input.schema(),
            &target_variable,
            step_variable.as_deref(),
            path_variable.as_deref(),
            &target_properties,
        );
        let properties = compute_plan_properties(schema.clone());
        Self {
            input,
            source_column,
            type_names,
            direction,
            min_hops,
            max_hops,
            target_variable,
            step_variable,
            path_variable,
            target_properties,
            is_optional,
            bound_target_column,
            edge_lance_filter,
            edge_property_conditions,
            used_edge_columns,
            path_mode,
            output_mode,
            graph_ctx,
            schema,
            properties,
            metrics: ExecutionPlanMetricsSet::new(),
        }
    }
    /// Output schema: all input columns, plus (unless already present)
    /// `<target>._vid`, `<target>._labels`, always `_hop_count`, then the
    /// optional step-edge list, optional path struct, and one nullable
    /// `LargeBinary` column per requested target property.
    fn build_schema(
        input_schema: SchemaRef,
        target_variable: &str,
        step_variable: Option<&str>,
        path_variable: Option<&str>,
        target_properties: &[String],
    ) -> SchemaRef {
        let mut fields: Vec<Field> = input_schema
            .fields()
            .iter()
            .map(|f| f.as_ref().clone())
            .collect();
        let target_vid_name = format!("{}._vid", target_variable);
        if input_schema.column_with_name(&target_vid_name).is_none() {
            fields.push(Field::new(target_vid_name, DataType::UInt64, true));
        }
        let target_labels_name = format!("{}._labels", target_variable);
        if input_schema.column_with_name(&target_labels_name).is_none() {
            fields.push(Field::new(target_labels_name, labels_data_type(), true));
        }
        fields.push(Field::new("_hop_count", DataType::UInt64, false));
        if let Some(step_var) = step_variable {
            fields.push(build_edge_list_field(step_var));
        }
        // An existing path column is extended in place, not re-declared.
        if let Some(path_var) = path_variable
            && input_schema.column_with_name(path_var).is_none()
        {
            fields.push(build_path_struct_field(path_var));
        }
        for prop in target_properties {
            let prop_name = format!("{}.{}", target_variable, prop);
            if input_schema.column_with_name(&prop_name).is_none() {
                fields.push(Field::new(prop_name, DataType::LargeBinary, true));
            }
        }
        Arc::new(Schema::new(fields))
    }
}
impl DisplayAs for GraphVariableLengthTraverseMainExec {
    /// One-line plan rendering, e.g.
    /// `GraphVariableLengthTraverseMainExec: a._vid --[["KNOWS"]*1..3]--> target`.
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let source = &self.source_column;
        let types = &self.type_names;
        let (lo, hi) = (self.min_hops, self.max_hops);
        write!(
            f,
            "GraphVariableLengthTraverseMainExec: {source} --[{types:?}*{lo}..{hi}]--> target"
        )
    }
}
impl ExecutionPlan for GraphVariableLengthTraverseMainExec {
    fn name(&self) -> &str {
        "GraphVariableLengthTraverseMainExec"
    }
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
    fn properties(&self) -> &PlanProperties {
        &self.properties
    }
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }
    /// Rebuilds the node over a replacement child; all configuration is
    /// cloned from `self`, so the derived schema/properties are recomputed.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if children.len() != 1 {
            return Err(datafusion::error::DataFusionError::Plan(
                "GraphVariableLengthTraverseMainExec requires exactly one child".to_string(),
            ));
        }
        Ok(Arc::new(Self::new(
            children[0].clone(),
            self.source_column.clone(),
            self.type_names.clone(),
            self.direction,
            self.min_hops,
            self.max_hops,
            self.target_variable.clone(),
            self.step_variable.clone(),
            self.path_variable.clone(),
            self.target_properties.clone(),
            self.graph_ctx.clone(),
            self.is_optional,
            self.bound_target_column.clone(),
            self.edge_lance_filter.clone(),
            self.edge_property_conditions.clone(),
            self.used_edge_columns.clone(),
            self.path_mode.clone(),
            self.output_mode.clone(),
        )))
    }
    /// Creates the output stream. The stream starts in a `Loading` state whose
    /// future builds the full edge adjacency map for the configured edge types
    /// and direction; expansion begins only once that load completes.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let input_stream = self.input.execute(partition, context)?;
        let metrics = BaselineMetrics::new(&self.metrics, partition);
        let graph_ctx = self.graph_ctx.clone();
        let type_names = self.type_names.clone();
        let direction = self.direction;
        let load_fut =
            async move { build_edge_adjacency_map(&graph_ctx, &type_names, direction).await };
        Ok(Box::pin(GraphVariableLengthTraverseMainStream {
            input: input_stream,
            source_column: self.source_column.clone(),
            type_names: self.type_names.clone(),
            direction: self.direction,
            min_hops: self.min_hops,
            max_hops: self.max_hops,
            target_variable: self.target_variable.clone(),
            step_variable: self.step_variable.clone(),
            path_variable: self.path_variable.clone(),
            target_properties: self.target_properties.clone(),
            graph_ctx: self.graph_ctx.clone(),
            is_optional: self.is_optional,
            bound_target_column: self.bound_target_column.clone(),
            edge_lance_filter: self.edge_lance_filter.clone(),
            edge_property_conditions: self.edge_property_conditions.clone(),
            used_edge_columns: self.used_edge_columns.clone(),
            path_mode: self.path_mode.clone(),
            output_mode: self.output_mode.clone(),
            schema: self.schema.clone(),
            state: VarLengthMainStreamState::Loading(Box::pin(load_fut)),
            metrics,
        }))
    }
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics.clone_inner())
    }
}
/// State machine for `GraphVariableLengthTraverseMainStream`.
enum VarLengthMainStreamState {
    /// Awaiting the one-time adjacency-map load before reading any input.
    Loading(Pin<Box<dyn std::future::Future<Output = DFResult<EdgeAdjacencyMap>> + Send>>),
    /// Pulling input batches and expanding them against the loaded map.
    Processing(EdgeAdjacencyMap),
    /// Awaiting target-property hydration; the adjacency map is carried along
    /// so processing can resume without reloading it.
    Materializing {
        adjacency: EdgeAdjacencyMap,
        fut: Pin<Box<dyn std::future::Future<Output = DFResult<RecordBatch>> + Send>>,
    },
    /// Terminal state: the stream yields `None` from here on.
    Done,
}
/// Streaming executor for the adjacency-map-based variable-length traversal.
/// Field meanings mirror `GraphVariableLengthTraverseMainExec`.
#[expect(dead_code, reason = "VLP fields used in Phase 3")]
struct GraphVariableLengthTraverseMainStream {
    input: SendableRecordBatchStream,
    source_column: String,
    type_names: Vec<String>,
    direction: Direction,
    min_hops: usize,
    max_hops: usize,
    target_variable: String,
    step_variable: Option<String>,
    path_variable: Option<String>,
    target_properties: Vec<String>,
    graph_ctx: Arc<GraphExecutionContext>,
    is_optional: bool,
    bound_target_column: Option<String>,
    edge_lance_filter: Option<String>,
    edge_property_conditions: Vec<(String, UniValue)>,
    used_edge_columns: Vec<String>,
    path_mode: super::nfa::PathMode,
    output_mode: super::nfa::VlpOutputMode,
    schema: SchemaRef,
    state: VarLengthMainStreamState,
    metrics: BaselineMetrics,
}
/// `(target vid, hop count, node path incl. source, edge path)` emitted by `bfs`.
type MainBfsResult = (Vid, usize, Vec<Vid>, Vec<Eid>);
impl GraphVariableLengthTraverseMainStream {
    /// Breadth-first expansion from `source` over the preloaded adjacency map.
    ///
    /// Returns one `(target, hops, node_path, edge_path)` entry for every
    /// vertex reachable at a depth within `[min_hops, max_hops]`. Edges
    /// already on the current path (trail semantics) and edges listed in
    /// `used_eids` are never reused; `edge_property_conditions` are checked
    /// as per-edge equality predicates.
    fn bfs(
        &self,
        source: Vid,
        adjacency: &EdgeAdjacencyMap,
        used_eids: &FxHashSet<u64>,
    ) -> Vec<MainBfsResult> {
        let mut results = Vec::new();
        let mut queue: VecDeque<MainBfsResult> = VecDeque::new();
        // Depth 0: the path holds only the source vertex, no edges yet.
        queue.push_back((source, 0, vec![source], vec![]));
        while let Some((current, depth, node_path, edge_path)) = queue.pop_front() {
            if depth >= self.min_hops && depth <= self.max_hops {
                results.push((current, depth, node_path.clone(), edge_path.clone()));
            }
            if depth >= self.max_hops {
                continue;
            }
            // Fixed: this lookup was corrupted to `adjacency.get(¤t)`
            // (HTML-entity-mangled `&current`), which does not compile.
            if let Some(neighbors) = adjacency.get(&current) {
                let is_undirected = matches!(self.direction, Direction::Both);
                // Undirected adjacency lists the same physical edge in both
                // directions; expand each edge only once per hop.
                let mut seen_edges_at_hop: HashSet<u64> = HashSet::new();
                for (neighbor, eid, _edge_type, props) in neighbors {
                    if is_undirected && !seen_edges_at_hop.insert(eid.as_u64()) {
                        continue;
                    }
                    if edge_path.contains(eid) {
                        continue;
                    }
                    if used_eids.contains(&eid.as_u64()) {
                        continue;
                    }
                    if !self.edge_property_conditions.is_empty() {
                        let passes =
                            self.edge_property_conditions
                                .iter()
                                .all(|(name, expected)| {
                                    props.get(name).is_some_and(|actual| actual == expected)
                                });
                        if !passes {
                            continue;
                        }
                    }
                    let mut new_node_path = node_path.clone();
                    new_node_path.push(*neighbor);
                    let mut new_edge_path = edge_path.clone();
                    new_edge_path.push(*eid);
                    queue.push_back((*neighbor, depth + 1, new_node_path, new_edge_path));
                }
            }
        }
        results
    }
    /// Expands one input batch against the adjacency map.
    ///
    /// Each BFS hit (optionally restricted to a bound target vid) becomes one
    /// output row: input columns are repeated via `take`, then target vid and
    /// labels, `_hop_count`, an optional step-edge list, an optional path
    /// struct, and all-null target-property placeholders (hydrated later by
    /// `hydrate_vlp_target_properties`) are appended. Optional traversals emit
    /// a single null placeholder row per unmatched input row.
    fn process_batch(
        &self,
        batch: RecordBatch,
        adjacency: &EdgeAdjacencyMap,
    ) -> DFResult<RecordBatch> {
        let source_col = batch.column_by_name(&self.source_column).ok_or_else(|| {
            datafusion::error::DataFusionError::Execution(format!(
                "Source column '{}' not found in input batch",
                self.source_column
            ))
        })?;
        let source_vid_cow = column_as_vid_array(source_col.as_ref())?;
        let source_vids: &UInt64Array = &source_vid_cow;
        // Optional column pinning the endpoint to an already-bound variable.
        let bound_target_cow = self
            .bound_target_column
            .as_ref()
            .and_then(|col| batch.column_by_name(col))
            .map(|c| column_as_vid_array(c.as_ref()))
            .transpose()?;
        let expected_targets: Option<&UInt64Array> = bound_target_cow.as_deref();
        // Edge ids already consumed by earlier patterns in the same row.
        let used_edge_arrays: Vec<&UInt64Array> = self
            .used_edge_columns
            .iter()
            .filter_map(|col| {
                batch
                    .column_by_name(col)?
                    .as_any()
                    .downcast_ref::<UInt64Array>()
            })
            .collect();
        let mut expansions: Vec<ExpansionRecord> = Vec::new();
        for (row_idx, source_opt) in source_vids.iter().enumerate() {
            let mut emitted_for_row = false;
            if let Some(source_u64) = source_opt {
                let source = Vid::from(source_u64);
                let used_eids: FxHashSet<u64> = used_edge_arrays
                    .iter()
                    .filter_map(|arr| {
                        if arr.is_null(row_idx) {
                            None
                        } else {
                            Some(arr.value(row_idx))
                        }
                    })
                    .collect();
                let bfs_results = self.bfs(source, adjacency, &used_eids);
                for (target, hops, node_path, edge_path) in bfs_results {
                    if let Some(targets) = expected_targets {
                        if targets.is_null(row_idx) {
                            continue;
                        }
                        let expected_vid = targets.value(row_idx);
                        if target.as_u64() != expected_vid {
                            continue;
                        }
                    }
                    expansions.push((row_idx, target, hops, node_path, edge_path));
                    emitted_for_row = true;
                }
            }
            // OPTIONAL MATCH: keep the row with a sentinel null placeholder.
            if self.is_optional && !emitted_for_row {
                expansions.push((row_idx, Vid::from(u64::MAX), 0, vec![], vec![]));
            }
        }
        if expansions.is_empty() {
            if self.is_optional {
                let all_indices: Vec<usize> = (0..batch.num_rows()).collect();
                return build_optional_null_batch_for_rows(&batch, &all_indices, &self.schema);
            }
            return Ok(RecordBatch::new_empty(self.schema.clone()));
        }
        let num_rows = expansions.len();
        self.metrics.record_output(num_rows);
        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.schema.fields().len());
        // Repeat each input row once per expansion originating from it.
        for col_idx in 0..batch.num_columns() {
            let array = batch.column(col_idx);
            let indices: Vec<u64> = expansions
                .iter()
                .map(|(idx, _, _, _, _)| *idx as u64)
                .collect();
            let take_indices = UInt64Array::from(indices);
            let expanded = arrow::compute::take(array, &take_indices, None)?;
            columns.push(expanded);
        }
        let target_vid_name = format!("{}._vid", self.target_variable);
        if batch.schema().column_with_name(&target_vid_name).is_none() {
            // Empty paths mark the optional-unmatched sentinel: null vid.
            let target_vids: Vec<Option<u64>> = expansions
                .iter()
                .map(|(_, vid, _, node_path, edge_path)| {
                    if node_path.is_empty() && edge_path.is_empty() {
                        None
                    } else {
                        Some(vid.as_u64())
                    }
                })
                .collect();
            columns.push(Arc::new(UInt64Array::from(target_vids)));
        }
        let target_labels_name = format!("{}._labels", self.target_variable);
        if batch
            .schema()
            .column_with_name(&target_labels_name)
            .is_none()
        {
            use arrow_array::builder::{ListBuilder, StringBuilder};
            let mut labels_builder = ListBuilder::new(StringBuilder::new());
            for (_, vid, _, node_path, edge_path) in expansions.iter() {
                if node_path.is_empty() && edge_path.is_empty() {
                    labels_builder.append(false);
                    continue;
                }
                // Deduplicate labels while preserving their order.
                let mut row_labels: Vec<String> = Vec::new();
                let labels =
                    l0_visibility::get_vertex_labels(*vid, &self.graph_ctx.query_context());
                for lbl in &labels {
                    if !row_labels.contains(lbl) {
                        row_labels.push(lbl.clone());
                    }
                }
                let values = labels_builder.values();
                for lbl in &row_labels {
                    values.append_value(lbl);
                }
                labels_builder.append(true);
            }
            columns.push(Arc::new(labels_builder.finish()));
        }
        let hop_counts: Vec<u64> = expansions
            .iter()
            .map(|(_, _, hops, _, _)| *hops as u64)
            .collect();
        columns.push(Arc::new(UInt64Array::from(hop_counts)));
        if self.step_variable.is_some() {
            let mut edges_builder = new_edge_list_builder();
            let query_ctx = self.graph_ctx.query_context();
            let type_names_str = self.type_names.join("|");
            for (_, _, _, node_path, edge_path) in expansions.iter() {
                if node_path.is_empty() && edge_path.is_empty() {
                    // Unmatched optional row: null edge list.
                    edges_builder.append_null();
                } else if edge_path.is_empty() {
                    // Zero-hop match: valid but empty edge list.
                    edges_builder.append(true);
                } else {
                    for (i, eid) in edge_path.iter().enumerate() {
                        append_edge_to_struct(
                            edges_builder.values(),
                            *eid,
                            &type_names_str,
                            node_path[i].as_u64(),
                            node_path[i + 1].as_u64(),
                            &query_ctx,
                        );
                    }
                    edges_builder.append(true);
                }
            }
            columns.push(Arc::new(edges_builder.finish()) as ArrayRef);
        }
        if let Some(path_var_name) = &self.path_variable {
            // An existing path column is extended in place; otherwise a new
            // path struct column is appended.
            let existing_path_col_idx = batch
                .schema()
                .column_with_name(path_var_name)
                .map(|(idx, _)| idx);
            let existing_path_arc = existing_path_col_idx.map(|idx| columns[idx].clone());
            let existing_path = existing_path_arc
                .as_ref()
                .and_then(|arc| arc.as_any().downcast_ref::<arrow_array::StructArray>());
            let mut nodes_builder = new_node_list_builder();
            let mut rels_builder = new_edge_list_builder();
            let query_ctx = self.graph_ctx.query_context();
            let type_names_str = self.type_names.join("|");
            let mut path_validity = Vec::with_capacity(expansions.len());
            for (row_out_idx, (_, _, _, node_path, edge_path)) in expansions.iter().enumerate() {
                if node_path.is_empty() && edge_path.is_empty() {
                    nodes_builder.append(false);
                    rels_builder.append(false);
                    path_validity.push(false);
                    continue;
                }
                // If a prefix path exists, copy it first and skip this
                // segment's first node (it duplicates the prefix's last node).
                let skip_first_vlp_node = if let Some(existing) = existing_path {
                    if !existing.is_null(row_out_idx) {
                        prepend_existing_path(
                            existing,
                            row_out_idx,
                            &mut nodes_builder,
                            &mut rels_builder,
                            &query_ctx,
                        );
                        true
                    } else {
                        false
                    }
                } else {
                    false
                };
                let start_idx = if skip_first_vlp_node { 1 } else { 0 };
                for vid in &node_path[start_idx..] {
                    append_node_to_struct(nodes_builder.values(), *vid, &query_ctx);
                }
                nodes_builder.append(true);
                for (i, eid) in edge_path.iter().enumerate() {
                    append_edge_to_struct(
                        rels_builder.values(),
                        *eid,
                        &type_names_str,
                        node_path[i].as_u64(),
                        node_path[i + 1].as_u64(),
                        &query_ctx,
                    );
                }
                rels_builder.append(true);
                path_validity.push(true);
            }
            let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
            let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
            let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
            let rels_field = Arc::new(Field::new(
                "relationships",
                rels_array.data_type().clone(),
                true,
            ));
            let path_struct = arrow_array::StructArray::try_new(
                vec![nodes_field, rels_field].into(),
                vec![nodes_array, rels_array],
                Some(arrow::buffer::NullBuffer::from(path_validity)),
            )
            .map_err(arrow_err)?;
            if let Some(idx) = existing_path_col_idx {
                columns[idx] = Arc::new(path_struct);
            } else {
                columns.push(Arc::new(path_struct));
            }
        }
        // All-null placeholders for target properties, hydrated asynchronously
        // after this batch is built.
        for prop_name in &self.target_properties {
            let full_prop_name = format!("{}.{}", self.target_variable, prop_name);
            if batch.schema().column_with_name(&full_prop_name).is_none() {
                columns.push(arrow_array::new_null_array(
                    &DataType::LargeBinary,
                    num_rows,
                ));
            }
        }
        RecordBatch::try_new(self.schema.clone(), columns).map_err(arrow_err)
    }
}
impl Stream for GraphVariableLengthTraverseMainStream {
    type Item = DFResult<RecordBatch>;
    /// Drives the `Loading -> Processing -> Materializing -> Done` machine.
    ///
    /// `Loading` awaits the one-time adjacency-map build; `Processing` pulls
    /// input batches and expands them synchronously via `process_batch`
    /// (which records output metrics); `Materializing` awaits the async
    /// target-property hydration, carrying the adjacency map so processing
    /// can resume without reloading it. The state is swapped out for `Done`
    /// while matching, so every arm must reinstall its follow-up state.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let state = std::mem::replace(&mut self.state, VarLengthMainStreamState::Done);
            match state {
                VarLengthMainStreamState::Loading(mut fut) => match fut.as_mut().poll(cx) {
                    Poll::Ready(Ok(adjacency)) => {
                        self.state = VarLengthMainStreamState::Processing(adjacency);
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = VarLengthMainStreamState::Done;
                        return Poll::Ready(Some(Err(e)));
                    }
                    Poll::Pending => {
                        self.state = VarLengthMainStreamState::Loading(fut);
                        return Poll::Pending;
                    }
                },
                VarLengthMainStreamState::Processing(adjacency) => {
                    match self.input.poll_next_unpin(cx) {
                        Poll::Ready(Some(Ok(batch))) => {
                            let base_batch = match self.process_batch(batch, &adjacency) {
                                Ok(b) => b,
                                Err(e) => {
                                    self.state = VarLengthMainStreamState::Processing(adjacency);
                                    return Poll::Ready(Some(Err(e)));
                                }
                            };
                            // No properties to hydrate: the base batch is final.
                            if self.target_properties.is_empty() {
                                self.state = VarLengthMainStreamState::Processing(adjacency);
                                return Poll::Ready(Some(Ok(base_batch)));
                            }
                            let schema = self.schema.clone();
                            let target_variable = self.target_variable.clone();
                            let target_properties = self.target_properties.clone();
                            let graph_ctx = self.graph_ctx.clone();
                            // No static label is known here, hence `None`.
                            let fut = hydrate_vlp_target_properties(
                                base_batch,
                                schema,
                                target_variable,
                                target_properties,
                                None, graph_ctx,
                            );
                            self.state = VarLengthMainStreamState::Materializing {
                                adjacency,
                                fut: Box::pin(fut),
                            };
                        }
                        Poll::Ready(Some(Err(e))) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Ready(None) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(None);
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Materializing { adjacency, mut fut } => {
                    match fut.as_mut().poll(cx) {
                        Poll::Ready(Ok(batch)) => {
                            self.state = VarLengthMainStreamState::Processing(adjacency);
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Poll::Ready(Err(e)) => {
                            self.state = VarLengthMainStreamState::Done;
                            return Poll::Ready(Some(Err(e)));
                        }
                        Poll::Pending => {
                            self.state = VarLengthMainStreamState::Materializing { adjacency, fut };
                            return Poll::Pending;
                        }
                    }
                }
                VarLengthMainStreamState::Done => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}
impl RecordBatchStream for GraphVariableLengthTraverseMainStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_traverse_schema_without_edge() {
    // One bound source column, no edge variable requested.
    let input_schema = Arc::new(Schema::new(vec![Field::new(
        "a._vid",
        DataType::UInt64,
        false,
    )]));
    let output_schema =
        GraphTraverseExec::build_schema(input_schema, "m", None, &[], &[], None, None, false);
    let expected = ["a._vid", "m._vid", "m._labels", "__eid_to_m"];
    assert_eq!(output_schema.fields().len(), expected.len());
    for (idx, name) in expected.iter().enumerate() {
        assert_eq!(output_schema.field(idx).name(), name);
    }
}
#[test]
fn test_traverse_schema_with_edge() {
    // With an edge variable, `r._eid` / `r._type` are appended.
    let input_schema = Arc::new(Schema::new(vec![Field::new(
        "a._vid",
        DataType::UInt64,
        false,
    )]));
    let output_schema = GraphTraverseExec::build_schema(
        input_schema,
        "m",
        Some("r"),
        &[],
        &[],
        None,
        None,
        false,
    );
    let expected = ["a._vid", "m._vid", "m._labels", "r._eid", "r._type"];
    assert_eq!(output_schema.fields().len(), expected.len());
    for (idx, name) in expected.iter().enumerate() {
        assert_eq!(output_schema.field(idx).name(), name);
    }
}
#[test]
fn test_traverse_schema_with_target_properties() {
    // Target properties are inserted between the labels and edge columns.
    let input_schema = Arc::new(Schema::new(vec![Field::new(
        "a._vid",
        DataType::UInt64,
        false,
    )]));
    let target_props = vec!["name".to_string(), "age".to_string()];
    let output_schema = GraphTraverseExec::build_schema(
        input_schema,
        "m",
        Some("r"),
        &[],
        &target_props,
        None,
        None,
        false,
    );
    let expected = [
        "a._vid", "m._vid", "m._labels", "m.name", "m.age", "r._eid", "r._type",
    ];
    assert_eq!(output_schema.fields().len(), expected.len());
    for (idx, name) in expected.iter().enumerate() {
        assert_eq!(output_schema.field(idx).name(), name);
    }
}
#[test]
fn test_variable_length_schema() {
let input_schema = Arc::new(Schema::new(vec![Field::new(
"a._vid",
DataType::UInt64,
false,
)]));
let output_schema = GraphVariableLengthTraverseExec::build_schema(
input_schema,
"b",
None,
Some("p"),
&[],
None,
);
assert_eq!(output_schema.fields().len(), 5);
assert_eq!(output_schema.field(0).name(), "a._vid");
assert_eq!(output_schema.field(1).name(), "b._vid");
assert_eq!(output_schema.field(2).name(), "b._labels");
assert_eq!(output_schema.field(3).name(), "_hop_count");
assert_eq!(output_schema.field(4).name(), "p");
}
#[test]
fn test_traverse_main_schema_without_edge() {
let input_schema = Arc::new(Schema::new(vec![Field::new(
"a._vid",
DataType::UInt64,
false,
)]));
let output_schema =
GraphTraverseMainExec::build_schema(&input_schema, "m", &None, &[], &[], false);
assert_eq!(output_schema.fields().len(), 4);
assert_eq!(output_schema.field(0).name(), "a._vid");
assert_eq!(output_schema.field(1).name(), "m._vid");
assert_eq!(output_schema.field(2).name(), "m._labels");
assert_eq!(output_schema.field(3).name(), "__eid_to_m");
}
#[test]
fn test_traverse_main_schema_with_edge() {
let input_schema = Arc::new(Schema::new(vec![Field::new(
"a._vid",
DataType::UInt64,
false,
)]));
let output_schema = GraphTraverseMainExec::build_schema(
&input_schema,
"m",
&Some("r".to_string()),
&[],
&[],
false,
);
assert_eq!(output_schema.fields().len(), 5);
assert_eq!(output_schema.field(0).name(), "a._vid");
assert_eq!(output_schema.field(1).name(), "m._vid");
assert_eq!(output_schema.field(2).name(), "m._labels");
assert_eq!(output_schema.field(3).name(), "r._eid");
assert_eq!(output_schema.field(4).name(), "r._type");
}
#[test]
fn test_traverse_main_schema_with_edge_properties() {
let input_schema = Arc::new(Schema::new(vec![Field::new(
"a._vid",
DataType::UInt64,
false,
)]));
let edge_props = vec!["weight".to_string(), "since".to_string()];
let output_schema = GraphTraverseMainExec::build_schema(
&input_schema,
"m",
&Some("r".to_string()),
&edge_props,
&[],
false,
);
assert_eq!(output_schema.fields().len(), 7);
assert_eq!(output_schema.field(0).name(), "a._vid");
assert_eq!(output_schema.field(1).name(), "m._vid");
assert_eq!(output_schema.field(2).name(), "m._labels");
assert_eq!(output_schema.field(3).name(), "r._eid");
assert_eq!(output_schema.field(4).name(), "r._type");
assert_eq!(output_schema.field(5).name(), "r.weight");
assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
assert_eq!(output_schema.field(6).name(), "r.since");
assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
}
#[test]
fn test_traverse_main_schema_with_target_properties() {
let input_schema = Arc::new(Schema::new(vec![Field::new(
"a._vid",
DataType::UInt64,
false,
)]));
let target_props = vec!["name".to_string(), "age".to_string()];
let output_schema = GraphTraverseMainExec::build_schema(
&input_schema,
"m",
&Some("r".to_string()),
&[],
&target_props,
false,
);
assert_eq!(output_schema.fields().len(), 7);
assert_eq!(output_schema.field(0).name(), "a._vid");
assert_eq!(output_schema.field(1).name(), "m._vid");
assert_eq!(output_schema.field(2).name(), "m._labels");
assert_eq!(output_schema.field(3).name(), "r._eid");
assert_eq!(output_schema.field(4).name(), "r._type");
assert_eq!(output_schema.field(5).name(), "m.name");
assert_eq!(output_schema.field(5).data_type(), &DataType::LargeBinary);
assert_eq!(output_schema.field(6).name(), "m.age");
assert_eq!(output_schema.field(6).data_type(), &DataType::LargeBinary);
}
}