1use std::any::Any;
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display, Formatter};
17use std::hash::Hash;
18use std::sync::Arc;
19
20use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt32Array, UInt64Array};
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
23use datafusion::arrow::compute::{cast, filter, filter_record_batch, take};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema::Schema as UniSchema;
30use uni_cypher::ast::{
31 Direction as AstDirection, Expr, NodePattern, Pattern, PatternElement, RelationshipPattern,
32};
33use uni_store::QueryContext;
34use uni_store::runtime::l0_visibility;
35use uni_store::storage::direction::Direction;
36
37use super::GraphExecutionContext;
38use crate::query::df_graph::common::{build_path_struct_field, column_as_vid_array};
39use crate::query::df_graph::scan::build_property_column_static;
40
41#[derive(Debug, Clone)]
43pub struct TraversalStep {
44 pub edge_type_ids: Vec<u32>,
46 pub direction: Direction,
48 pub target_variable: Option<String>,
50 pub target_label_name: Option<String>,
52 pub edge_variable: Option<String>,
54}
55
56#[derive(Debug)]
59pub struct PatternComprehensionExecExpr {
60 graph_ctx: Arc<GraphExecutionContext>,
62 anchor_column: String,
64 traversal_steps: Vec<TraversalStep>,
66 path_variable: Option<String>,
68 predicate: Option<Arc<dyn PhysicalExpr>>,
70 map_expr: Arc<dyn PhysicalExpr>,
72 input_schema: Arc<Schema>,
74 inner_schema: Arc<Schema>,
76 output_item_type: DataType,
78 needed_vertex_props: HashMap<String, Vec<String>>,
80 needed_edge_props: HashMap<String, Vec<String>>,
82}
83
84impl Clone for PatternComprehensionExecExpr {
85 fn clone(&self) -> Self {
86 Self {
87 graph_ctx: self.graph_ctx.clone(),
88 anchor_column: self.anchor_column.clone(),
89 traversal_steps: self.traversal_steps.clone(),
90 path_variable: self.path_variable.clone(),
91 predicate: self.predicate.clone(),
92 map_expr: self.map_expr.clone(),
93 input_schema: self.input_schema.clone(),
94 inner_schema: self.inner_schema.clone(),
95 output_item_type: self.output_item_type.clone(),
96 needed_vertex_props: self.needed_vertex_props.clone(),
97 needed_edge_props: self.needed_edge_props.clone(),
98 }
99 }
100}
101
102impl PatternComprehensionExecExpr {
103 #[expect(clippy::too_many_arguments, reason = "Constructor for complex expr")]
104 pub fn new(
105 graph_ctx: Arc<GraphExecutionContext>,
106 anchor_column: String,
107 traversal_steps: Vec<TraversalStep>,
108 path_variable: Option<String>,
109 predicate: Option<Arc<dyn PhysicalExpr>>,
110 map_expr: Arc<dyn PhysicalExpr>,
111 input_schema: Arc<Schema>,
112 inner_schema: Arc<Schema>,
113 output_item_type: DataType,
114 needed_vertex_props: HashMap<String, Vec<String>>,
115 needed_edge_props: HashMap<String, Vec<String>>,
116 ) -> Self {
117 Self {
118 graph_ctx,
119 anchor_column,
120 traversal_steps,
121 path_variable,
122 predicate,
123 map_expr,
124 input_schema,
125 inner_schema,
126 output_item_type,
127 needed_vertex_props,
128 needed_edge_props,
129 }
130 }
131}
132
133impl Display for PatternComprehensionExecExpr {
134 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
135 write!(
136 f,
137 "PatternComprehension(anchor={}, steps={})",
138 self.anchor_column,
139 self.traversal_steps.len()
140 )
141 }
142}
143
144impl PartialEq for PatternComprehensionExecExpr {
145 fn eq(&self, other: &Self) -> bool {
146 self.anchor_column == other.anchor_column
147 && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
148 && Arc::ptr_eq(&self.map_expr, &other.map_expr)
149 && match (&self.predicate, &other.predicate) {
150 (Some(a), Some(b)) => Arc::ptr_eq(a, b),
151 (None, None) => true,
152 _ => false,
153 }
154 }
155}
156
157impl Eq for PatternComprehensionExecExpr {}
158
159impl Hash for PatternComprehensionExecExpr {
160 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
161 self.anchor_column.hash(state);
162 self.output_item_type.hash(state);
163 }
164}
165
166impl PartialEq<dyn Any> for PatternComprehensionExecExpr {
167 fn eq(&self, other: &dyn Any) -> bool {
168 other
169 .downcast_ref::<Self>()
170 .map(|x| self == x)
171 .unwrap_or(false)
172 }
173}
174
175impl PhysicalExpr for PatternComprehensionExecExpr {
176 fn as_any(&self) -> &dyn Any {
177 self
178 }
179
180 fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
181 Ok(DataType::LargeList(Arc::new(Field::new(
182 "item",
183 self.output_item_type.clone(),
184 true,
185 ))))
186 }
187
188 fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
189 Ok(true)
190 }
191
192 fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
193 let num_rows = batch.num_rows();
194
195 let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
197 col
198 } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
199 batch.column_by_name(var_name).ok_or_else(|| {
200 DataFusionError::Execution(format!(
201 "Anchor column '{}' not found in batch schema: {:?}",
202 self.anchor_column,
203 batch
204 .schema()
205 .fields()
206 .iter()
207 .map(|f| f.name().as_str())
208 .collect::<Vec<_>>()
209 ))
210 })?
211 } else {
212 return Err(DataFusionError::Execution(format!(
213 "Anchor column '{}' not found in batch schema: {:?}",
214 self.anchor_column,
215 batch
216 .schema()
217 .fields()
218 .iter()
219 .map(|f| f.name().as_str())
220 .collect::<Vec<_>>()
221 )));
222 };
223 let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
224 let anchor_vids: &UInt64Array = &anchor_vid_cow;
225
226 for step in &self.traversal_steps {
229 log::debug!(
230 "PatternComprehension: warming CSR for edge_type_ids={:?}, direction={:?}",
231 step.edge_type_ids,
232 step.direction
233 );
234 std::thread::scope(|s| {
235 s.spawn(|| {
236 let rt = tokio::runtime::Builder::new_current_thread()
237 .enable_all()
238 .build()
239 .map_err(|e| {
240 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
241 })?;
242 rt.block_on(
243 self.graph_ctx
244 .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
245 )
246 .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
247 })
248 .join()
249 .unwrap_or_else(|_| {
250 Err(DataFusionError::Execution(
251 "CSR warming thread panicked".to_string(),
252 ))
253 })
254 })?;
255 }
256
257 log::debug!(
258 "PatternComprehension: expanding {} anchor VIDs, steps={}",
259 anchor_vids.len(),
260 self.traversal_steps.len()
261 );
262
263 let expansion = self.expand_pattern(anchor_vids)?;
266
267 log::debug!(
268 "PatternComprehension: expansion produced {} rows",
269 expansion.row_indices.len()
270 );
271
272 if expansion.row_indices.is_empty() {
274 return self.build_empty_list_result(num_rows);
275 }
276
277 let indices_array = UInt32Array::from(expansion.row_indices.clone());
279 let mut inner_columns: Vec<ArrayRef> = Vec::new();
280
281 for col in batch.columns() {
283 inner_columns.push(take(col, &indices_array, None)?);
284 }
285
286 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
288 if let Some(ref _target_var) = step.target_variable {
289 inner_columns.push(Arc::new(UInt64Array::from(
290 expansion.step_target_vids[step_idx].clone(),
291 )));
292 }
293 if let Some(ref _edge_var) = step.edge_variable {
294 inner_columns.push(Arc::new(UInt64Array::from(
295 expansion.step_edge_ids[step_idx].clone(),
296 )));
297 }
298 }
299
300 let query_ctx = self.graph_ctx.query_context();
302 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
303 if let Some(ref target_var) = step.target_variable
305 && let Some(props) = self.needed_vertex_props.get(target_var)
306 {
307 let vids: Vec<Vid> = expansion.step_target_vids[step_idx]
308 .iter()
309 .map(|v| Vid::from(*v))
310 .collect();
311
312 let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
313
314 let props_map = std::thread::scope(|s| {
315 s.spawn(|| {
316 let rt = tokio::runtime::Builder::new_current_thread()
317 .enable_all()
318 .build()
319 .map_err(|e| {
320 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
321 })?;
322 rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
323 &vids,
324 &prop_refs,
325 Some(&query_ctx),
326 ))
327 .map_err(|e| {
328 DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
329 })
330 })
331 .join()
332 .unwrap_or_else(|_| {
333 Err(DataFusionError::Execution(
334 "Vertex prop load thread panicked".to_string(),
335 ))
336 })
337 })?;
338
339 for prop in props {
340 let col = build_property_column_static(
341 &vids,
342 &props_map,
343 prop,
344 &DataType::LargeBinary,
345 )?;
346 inner_columns.push(col);
347 }
348 }
349
350 if let Some(ref edge_var) = step.edge_variable
352 && let Some(props) = self.needed_edge_props.get(edge_var)
353 {
354 let eids: Vec<Eid> = expansion.step_edge_ids[step_idx]
355 .iter()
356 .map(|e| Eid::from(*e))
357 .collect();
358
359 let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
360
361 let props_map = std::thread::scope(|s| {
362 s.spawn(|| {
363 let rt = tokio::runtime::Builder::new_current_thread()
364 .enable_all()
365 .build()
366 .map_err(|e| {
367 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
368 })?;
369 rt.block_on(self.graph_ctx.property_manager().get_batch_edge_props(
370 &eids,
371 &prop_refs,
372 Some(&query_ctx),
373 ))
374 .map_err(|e| {
375 DataFusionError::Execution(format!("Edge prop load failed: {e}"))
376 })
377 })
378 .join()
379 .unwrap_or_else(|_| {
380 Err(DataFusionError::Execution(
381 "Edge prop load thread panicked".to_string(),
382 ))
383 })
384 })?;
385
386 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
388 for prop in props {
389 let col = build_property_column_static(
390 &vid_keys,
391 &props_map,
392 prop,
393 &DataType::LargeBinary,
394 )?;
395 inner_columns.push(col);
396 }
397 }
398 }
399
400 if self.path_variable.is_some() {
402 let path_col = self.build_path_column(&expansion, anchor_vids, &query_ctx)?;
403 inner_columns.push(path_col);
404 }
405
406 let inner_batch = RecordBatch::try_new(self.inner_schema.clone(), inner_columns)?;
407
408 let (filtered_batch, filtered_indices) = if let Some(pred) = &self.predicate {
410 let mask = pred
411 .evaluate(&inner_batch)?
412 .into_array(inner_batch.num_rows())?;
413 let mask = cast(&mask, &DataType::Boolean)?;
414 let boolean_mask = mask
415 .as_any()
416 .downcast_ref::<BooleanArray>()
417 .ok_or_else(|| {
418 DataFusionError::Execution(
419 "Pattern comprehension predicate did not produce BooleanArray".to_string(),
420 )
421 })?;
422
423 let filtered_batch = filter_record_batch(&inner_batch, boolean_mask)?;
424 let indices_array_ref: ArrayRef = Arc::new(indices_array.clone());
425 let filtered_idx = filter(&indices_array_ref, boolean_mask)?;
426 let filtered_idx = filtered_idx
427 .as_any()
428 .downcast_ref::<UInt32Array>()
429 .unwrap()
430 .clone();
431
432 (filtered_batch, filtered_idx)
433 } else {
434 (inner_batch, indices_array.clone())
435 };
436
437 let mapped_val = self.map_expr.evaluate(&filtered_batch)?;
439 let mapped_array = mapped_val.into_array(filtered_batch.num_rows())?;
440
441 let new_offsets = {
443 let mut offsets = Vec::with_capacity(num_rows + 1);
444 offsets.push(0i64);
445
446 let indices_slice = filtered_indices.values();
447 let mut pos = 0;
448 let mut current_len: i64 = 0;
449
450 for row_idx in 0..num_rows {
451 while pos < indices_slice.len() && indices_slice[pos] as usize == row_idx {
452 pos += 1;
453 current_len += 1;
454 }
455 offsets.push(current_len);
456 }
457 OffsetBuffer::new(ScalarBuffer::from(offsets))
458 };
459
460 let new_field = Arc::new(Field::new("item", mapped_array.data_type().clone(), true));
461 let new_list = datafusion::arrow::array::LargeListArray::new(
462 new_field,
463 new_offsets,
464 mapped_array,
465 None,
466 );
467
468 Ok(ColumnarValue::Array(Arc::new(new_list)))
469 }
470
471 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
472 vec![]
475 }
476
477 fn with_new_children(
478 self: Arc<Self>,
479 children: Vec<Arc<dyn PhysicalExpr>>,
480 ) -> DFResult<Arc<dyn PhysicalExpr>> {
481 if !children.is_empty() {
482 return Err(DataFusionError::Internal(
483 "PatternComprehension has no children".to_string(),
484 ));
485 }
486 Ok(self)
487 }
488
489 fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490 write!(f, "PatternComprehension({})", self.anchor_column)
491 }
492}
493
/// Column-oriented result of expanding a pattern: index `k` in every vector describes the
/// same match ("inner row").
struct PatternExpansion {
    /// Input-batch row each match originated from.
    row_indices: Vec<u32>,
    /// Anchor vertex ID of each match.
    anchor_vids: Vec<u64>,
    /// Per traversal step: the vertex ID reached by that step, for each match.
    step_target_vids: Vec<Vec<u64>>,
    /// Per traversal step: the edge ID traversed by that step, for each match.
    step_edge_ids: Vec<Vec<u64>>,
    /// Per traversal step: the edge type ID of the traversed edge, for each match.
    step_edge_type_ids: Vec<Vec<u32>>,
}
507
508impl PatternComprehensionExecExpr {
509 fn expand_pattern(&self, anchor_vids: &UInt64Array) -> DFResult<PatternExpansion> {
514 let mut frontier_row_indices: Vec<u32> = Vec::new();
516 let mut frontier_vids: Vec<u64> = Vec::new();
517
518 for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
519 if let Some(vid_u64) = vid_opt {
520 frontier_row_indices.push(row_idx as u32);
521 frontier_vids.push(vid_u64);
522 }
523 }
524
525 let mut result_row_indices: Vec<u32> = frontier_row_indices.clone();
527 let mut result_anchor_vids: Vec<u64> = frontier_vids.clone();
529
530 let mut accumulated_target_vids: Vec<Vec<u64>> = Vec::new();
532 let mut accumulated_edge_ids: Vec<Vec<u64>> = Vec::new();
533 let mut accumulated_edge_type_ids: Vec<Vec<u32>> = Vec::new();
534
535 for step in &self.traversal_steps {
536 let is_undirected = step.direction == Direction::Both;
537 let query_ctx = self.graph_ctx.query_context();
538
539 let mut new_row_indices: Vec<u32> = Vec::new();
540 let mut new_anchor_vids: Vec<u64> = Vec::new();
541 let mut new_target_vids: Vec<u64> = Vec::new();
542 let mut new_edge_ids: Vec<u64> = Vec::new();
543 let mut new_edge_type_ids: Vec<u32> = Vec::new();
544 let num_prev_cols = accumulated_target_vids.len();
546 let mut new_accumulated_targets: Vec<Vec<u64>> = vec![Vec::new(); num_prev_cols];
547 let mut new_accumulated_edges: Vec<Vec<u64>> =
548 vec![Vec::new(); accumulated_edge_ids.len()];
549 let mut new_accumulated_edge_types: Vec<Vec<u32>> =
550 vec![Vec::new(); accumulated_edge_type_ids.len()];
551
552 for (i, &src_vid_u64) in frontier_vids.iter().enumerate() {
553 let vid = Vid::from(src_vid_u64);
554 let outer_row = result_row_indices[i];
555 let anchor_vid = result_anchor_vids[i];
556
557 let mut seen_edges: HashSet<u64> = HashSet::new();
558
559 for &edge_type in &step.edge_type_ids {
560 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
561
562 for (target_vid, eid) in neighbors {
563 let eid_u64 = eid.as_u64();
564
565 if is_undirected && !seen_edges.insert(eid_u64) {
567 continue;
568 }
569
570 if let Some(ref label_name) = step.target_label_name
572 && let Some(vertex_labels) =
573 l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
574 && !vertex_labels.contains(label_name)
575 {
576 continue;
577 }
578
579 new_row_indices.push(outer_row);
580 new_anchor_vids.push(anchor_vid);
581 new_target_vids.push(target_vid.as_u64());
582 new_edge_ids.push(eid_u64);
583 new_edge_type_ids.push(edge_type);
584
585 for (col_idx, col) in accumulated_target_vids.iter().enumerate() {
587 new_accumulated_targets[col_idx].push(col[i]);
588 }
589 for (col_idx, col) in accumulated_edge_ids.iter().enumerate() {
590 new_accumulated_edges[col_idx].push(col[i]);
591 }
592 for (col_idx, col) in accumulated_edge_type_ids.iter().enumerate() {
593 new_accumulated_edge_types[col_idx].push(col[i]);
594 }
595 }
596 }
597 }
598
599 frontier_vids.clone_from(&new_target_vids);
601 result_row_indices = new_row_indices;
602 result_anchor_vids = new_anchor_vids;
603
604 new_accumulated_targets.push(new_target_vids);
606 new_accumulated_edges.push(new_edge_ids);
607 new_accumulated_edge_types.push(new_edge_type_ids);
608 accumulated_target_vids = new_accumulated_targets;
609 accumulated_edge_ids = new_accumulated_edges;
610 accumulated_edge_type_ids = new_accumulated_edge_types;
611 }
612
613 Ok(PatternExpansion {
614 row_indices: result_row_indices,
615 anchor_vids: result_anchor_vids,
616 step_target_vids: accumulated_target_vids,
617 step_edge_ids: accumulated_edge_ids,
618 step_edge_type_ids: accumulated_edge_type_ids,
619 })
620 }
621
622 fn build_empty_list_result(&self, num_rows: usize) -> DFResult<ColumnarValue> {
624 let offsets: Vec<i64> = vec![0; num_rows + 1];
625 let empty_values: ArrayRef = arrow_array::new_empty_array(&self.output_item_type);
626 let field = Arc::new(Field::new("item", self.output_item_type.clone(), true));
627 let list = datafusion::arrow::array::LargeListArray::new(
628 field,
629 OffsetBuffer::new(ScalarBuffer::from(offsets)),
630 empty_values,
631 None,
632 );
633 Ok(ColumnarValue::Array(Arc::new(list)))
634 }
635
636 fn build_path_column(
642 &self,
643 expansion: &PatternExpansion,
644 _anchor_vids: &UInt64Array,
645 query_ctx: &QueryContext,
646 ) -> DFResult<ArrayRef> {
647 use arrow_array::builder::{
648 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
649 };
650
651 let num_expanded = expansion.row_indices.len();
652
653 let node_struct_fields: Vec<Arc<Field>> =
654 crate::query::df_graph::common::node_struct_fields()
655 .iter()
656 .cloned()
657 .collect();
658 let edge_struct_fields: Vec<Arc<Field>> =
659 crate::query::df_graph::common::edge_struct_fields()
660 .iter()
661 .cloned()
662 .collect();
663
664 let mut nodes_builder = ListBuilder::new(StructBuilder::new(
665 node_struct_fields,
666 vec![
667 Box::new(UInt64Builder::new()),
668 Box::new(ListBuilder::new(StringBuilder::new())),
669 Box::new(LargeBinaryBuilder::new()),
670 ],
671 ));
672
673 let mut rels_builder = ListBuilder::new(StructBuilder::new(
674 edge_struct_fields,
675 vec![
676 Box::new(UInt64Builder::new()),
677 Box::new(StringBuilder::new()),
678 Box::new(UInt64Builder::new()),
679 Box::new(UInt64Builder::new()),
680 Box::new(LargeBinaryBuilder::new()),
681 ],
682 ));
683
684 let uni_schema = self.graph_ctx.storage().schema_manager().schema();
685 let num_steps = self.traversal_steps.len();
686
687 for row_idx in 0..num_expanded {
688 let anchor_vid_u64 = expansion.anchor_vids[row_idx];
690 let anchor_vid = Vid::from(anchor_vid_u64);
691
692 super::common::append_node_to_struct(nodes_builder.values(), anchor_vid, query_ctx);
694
695 for step_idx in 0..num_steps {
697 let target_vid = Vid::from(expansion.step_target_vids[step_idx][row_idx]);
698 super::common::append_node_to_struct(nodes_builder.values(), target_vid, query_ctx);
699 }
700 nodes_builder.append(true);
701
702 for step_idx in 0..num_steps {
704 let eid = Eid::from(expansion.step_edge_ids[step_idx][row_idx]);
705 let edge_type_id = expansion.step_edge_type_ids[step_idx][row_idx];
706 let edge_type_name = uni_schema
707 .edge_type_name_by_id_unified(edge_type_id)
708 .unwrap_or_default();
709
710 let src_vid = if step_idx == 0 {
713 anchor_vid_u64
714 } else {
715 expansion.step_target_vids[step_idx - 1][row_idx]
716 };
717 let dst_vid = expansion.step_target_vids[step_idx][row_idx];
718
719 super::common::append_edge_to_struct(
720 rels_builder.values(),
721 eid,
722 &edge_type_name,
723 src_vid,
724 dst_vid,
725 query_ctx,
726 );
727 }
728 rels_builder.append(true);
729 }
730
731 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
732 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
733
734 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
735 let rels_field = Arc::new(Field::new(
736 "relationships",
737 rels_array.data_type().clone(),
738 true,
739 ));
740
741 let path_struct = arrow_array::StructArray::try_new(
742 vec![nodes_field, rels_field].into(),
743 vec![nodes_array, rels_array],
744 None,
745 )
746 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
747
748 Ok(Arc::new(path_struct))
749 }
750}
751
752pub fn analyze_pattern(
759 pattern: &Pattern,
760 input_schema: &Schema,
761 uni_schema: &UniSchema,
762) -> anyhow::Result<(String, Vec<TraversalStep>)> {
763 if pattern.paths.is_empty() {
764 return Err(anyhow::anyhow!(
765 "Pattern comprehension requires at least one path"
766 ));
767 }
768
769 let path = &pattern.paths[0];
770 let elements = &path.elements;
771
772 if elements.is_empty() {
773 return Err(anyhow::anyhow!(
774 "Pattern comprehension path has no elements"
775 ));
776 }
777
778 let (anchor_idx, anchor_var) = find_anchor_node(elements, input_schema)?;
780
781 let anchor_column = format!("{}._vid", anchor_var);
782
783 let steps = build_traversal_steps(elements, anchor_idx, uni_schema)?;
785
786 Ok((anchor_column, steps))
787}
788
789fn find_anchor_node(
794 elements: &[PatternElement],
795 input_schema: &Schema,
796) -> anyhow::Result<(usize, String)> {
797 for (idx, elem) in elements.iter().enumerate() {
798 if let PatternElement::Node(node) = elem
799 && let Some(ref var) = node.variable
800 {
801 let vid_col = format!("{}._vid", var);
802 if input_schema.column_with_name(&vid_col).is_some() {
803 return Ok((idx, var.clone()));
804 }
805 }
806 }
807
808 Err(anyhow::anyhow!(
809 "No anchor node found in pattern comprehension. \
810 None of the pattern variables have a corresponding `_vid` column in the input schema. \
811 Schema fields: {:?}",
812 input_schema
813 .fields()
814 .iter()
815 .map(|f| f.name().as_str())
816 .collect::<Vec<_>>()
817 ))
818}
819
820fn build_traversal_steps(
825 elements: &[PatternElement],
826 anchor_idx: usize,
827 uni_schema: &UniSchema,
828) -> anyhow::Result<Vec<TraversalStep>> {
829 let mut steps = Vec::new();
830
831 let mut i = anchor_idx + 1;
833 while i + 1 < elements.len() {
834 let rel_elem = &elements[i];
835 let target_elem = &elements[i + 1];
836
837 let PatternElement::Relationship(rel) = rel_elem else {
838 return Err(anyhow::anyhow!(
839 "Expected relationship at pattern index {}, got {:?}",
840 i,
841 rel_elem
842 ));
843 };
844
845 let PatternElement::Node(target_node) = target_elem else {
846 return Err(anyhow::anyhow!(
847 "Expected node at pattern index {}, got {:?}",
848 i + 1,
849 target_elem
850 ));
851 };
852
853 let step = build_step_from_rel_and_node(rel, target_node, uni_schema)?;
854 steps.push(step);
855
856 i += 2;
857 }
858
859 if steps.is_empty() {
860 return Err(anyhow::anyhow!(
861 "Pattern comprehension has no traversal steps after anchor"
862 ));
863 }
864
865 Ok(steps)
866}
867
868fn build_step_from_rel_and_node(
870 rel: &RelationshipPattern,
871 target_node: &NodePattern,
872 uni_schema: &UniSchema,
873) -> anyhow::Result<TraversalStep> {
874 let edge_type_ids = if rel.types.is_empty() {
876 uni_schema.all_edge_type_ids()
878 } else {
879 rel.types
880 .iter()
881 .filter_map(|t| resolve_edge_type_id_unified(uni_schema, t))
882 .collect()
883 };
884
885 if edge_type_ids.is_empty() && !rel.types.is_empty() {
886 return Ok(TraversalStep {
889 edge_type_ids: vec![],
890 direction: convert_direction(&rel.direction),
891 target_variable: target_node.variable.clone(),
892 target_label_name: target_node.labels.first().cloned(),
893 edge_variable: rel.variable.clone(),
894 });
895 }
896
897 let direction = convert_direction(&rel.direction);
898 let target_label_name = target_node.labels.first().cloned();
899
900 Ok(TraversalStep {
901 edge_type_ids,
902 direction,
903 target_variable: target_node.variable.clone(),
904 target_label_name,
905 edge_variable: rel.variable.clone(),
906 })
907}
908
909fn resolve_edge_type_id_unified(uni_schema: &UniSchema, type_name: &str) -> Option<u32> {
912 uni_schema.edge_type_id_unified_case_insensitive(type_name)
913}
914
915fn convert_direction(ast_dir: &AstDirection) -> Direction {
917 match ast_dir {
918 AstDirection::Outgoing => Direction::Outgoing,
919 AstDirection::Incoming => Direction::Incoming,
920 AstDirection::Both => Direction::Both,
921 }
922}
923
924pub fn collect_inner_properties(
930 where_clause: Option<&Expr>,
931 map_expr: &Expr,
932 steps: &[TraversalStep],
933) -> (HashMap<String, Vec<String>>, HashMap<String, Vec<String>>) {
934 let mut vertex_props: HashMap<String, Vec<String>> = HashMap::new();
935 let mut edge_props: HashMap<String, Vec<String>> = HashMap::new();
936
937 let node_vars: HashSet<String> = steps
939 .iter()
940 .filter_map(|s| s.target_variable.clone())
941 .collect();
942 let edge_vars: HashSet<String> = steps
943 .iter()
944 .filter_map(|s| s.edge_variable.clone())
945 .collect();
946
947 let mut exprs_to_visit: Vec<&Expr> = vec![map_expr];
949 if let Some(w) = where_clause {
950 exprs_to_visit.push(w);
951 }
952
953 while let Some(expr) = exprs_to_visit.pop() {
954 match expr {
955 Expr::Property(base, prop) => {
956 if let Expr::Variable(var) = base.as_ref() {
957 if node_vars.contains(var) {
958 vertex_props
959 .entry(var.clone())
960 .or_default()
961 .push(prop.clone());
962 } else if edge_vars.contains(var) {
963 edge_props
964 .entry(var.clone())
965 .or_default()
966 .push(prop.clone());
967 }
968 }
969 exprs_to_visit.push(base);
971 }
972 Expr::BinaryOp { left, right, .. } => {
973 exprs_to_visit.push(left);
974 exprs_to_visit.push(right);
975 }
976 Expr::UnaryOp { expr: inner, .. } => {
977 exprs_to_visit.push(inner);
978 }
979 Expr::FunctionCall { args, .. } => {
980 for arg in args {
981 exprs_to_visit.push(arg);
982 }
983 }
984 Expr::Case {
985 when_then,
986 else_expr,
987 ..
988 } => {
989 for (w, t) in when_then {
990 exprs_to_visit.push(w);
991 exprs_to_visit.push(t);
992 }
993 if let Some(e) = else_expr {
994 exprs_to_visit.push(e);
995 }
996 }
997 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
998 exprs_to_visit.push(inner);
999 }
1000 Expr::List(items) => {
1001 for item in items {
1002 exprs_to_visit.push(item);
1003 }
1004 }
1005 Expr::Map(entries) => {
1006 for (_, v) in entries {
1007 exprs_to_visit.push(v);
1008 }
1009 }
1010 Expr::In { expr: l, list: r } => {
1011 exprs_to_visit.push(l);
1012 exprs_to_visit.push(r);
1013 }
1014 _ => {}
1015 }
1016 }
1017
1018 for props in vertex_props.values_mut() {
1020 props.sort();
1021 props.dedup();
1022 }
1023 for props in edge_props.values_mut() {
1024 props.sort();
1025 props.dedup();
1026 }
1027
1028 (vertex_props, edge_props)
1029}
1030
1031pub fn build_inner_schema(
1037 input_schema: &Schema,
1038 steps: &[TraversalStep],
1039 vertex_props: &HashMap<String, Vec<String>>,
1040 edge_props: &HashMap<String, Vec<String>>,
1041 path_variable: Option<&str>,
1042) -> Schema {
1043 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
1044
1045 for step in steps {
1046 if let Some(ref target_var) = step.target_variable {
1048 fields.push(Arc::new(Field::new(
1049 format!("{}._vid", target_var),
1050 DataType::UInt64,
1051 true,
1052 )));
1053 }
1054
1055 if let Some(ref edge_var) = step.edge_variable {
1057 fields.push(Arc::new(Field::new(
1058 format!("{}._eid", edge_var),
1059 DataType::UInt64,
1060 true,
1061 )));
1062 }
1063 }
1064
1065 for step in steps {
1067 if let Some(ref target_var) = step.target_variable
1068 && let Some(props) = vertex_props.get(target_var)
1069 {
1070 for prop in props {
1071 fields.push(Arc::new(Field::new(
1072 format!("{}.{}", target_var, prop),
1073 DataType::LargeBinary,
1074 true,
1075 )));
1076 }
1077 }
1078 }
1079
1080 for step in steps {
1082 if let Some(ref edge_var) = step.edge_variable
1083 && let Some(props) = edge_props.get(edge_var)
1084 {
1085 for prop in props {
1086 fields.push(Arc::new(Field::new(
1087 format!("{}.{}", edge_var, prop),
1088 DataType::LargeBinary,
1089 true,
1090 )));
1091 }
1092 }
1093 }
1094
1095 if let Some(path_var) = path_variable {
1097 fields.push(Arc::new(build_path_struct_field(path_var)));
1098 }
1099
1100 Schema::new(fields)
1101}