1use std::any::Any;
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display, Formatter};
17use std::hash::Hash;
18use std::sync::Arc;
19
20use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt32Array, UInt64Array};
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
23use datafusion::arrow::compute::{cast, filter, filter_record_batch, take};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema::Schema as UniSchema;
30use uni_cypher::ast::{
31 Direction as AstDirection, Expr, NodePattern, Pattern, PatternElement, RelationshipPattern,
32};
33use uni_store::QueryContext;
34use uni_store::storage::direction::Direction;
35
36use super::GraphExecutionContext;
37use crate::query::df_graph::common::{build_path_struct_field, column_as_vid_array};
38use crate::query::df_graph::scan::build_property_column_static;
39
/// One hop of a pattern-comprehension traversal: a relationship pattern together with the
/// node pattern it leads to, resolved against the graph schema.
#[derive(Debug, Clone)]
pub struct TraversalStep {
    /// Edge type ids followed by this hop. Every known type when the relationship pattern
    /// named no types; only the names that resolved when it did (possibly empty, in which
    /// case the hop yields no matches).
    pub edge_type_ids: Vec<u32>,
    /// Traversal direction; `Direction::Both` is treated as undirected and each edge is
    /// used at most once per source vertex.
    pub direction: Direction,
    /// Variable bound to the target node, if the node pattern named one. Exposed to the
    /// inner batch as the `<var>._vid` column.
    pub target_variable: Option<String>,
    /// First label of the target node pattern; candidate targets whose resolved labels do
    /// not contain it are dropped during expansion.
    pub target_label_name: Option<String>,
    /// Variable bound to the relationship, if named. Exposed as the `<var>._eid` column.
    pub edge_variable: Option<String>,
}
54
/// Physical expression evaluating a Cypher pattern comprehension
/// (`[(a)-[r]->(b) WHERE ... | expr]`) for every row of an input batch.
///
/// For each input row the pattern is expanded from an anchor vertex. Every match becomes a
/// row of an "inner" batch, which the predicate filters and `map_expr` projects. The
/// projected values are regrouped into one `LargeList` per input row.
#[derive(Debug)]
pub struct PatternComprehensionExecExpr {
    /// Graph access: adjacency warming, neighbor lookup, label resolution, property loading.
    graph_ctx: Arc<GraphExecutionContext>,
    /// Input column holding the anchor vertex ids (`<var>._vid`). `evaluate` falls back to
    /// the bare `<var>` column when this one is absent.
    anchor_column: String,
    /// Hops expanded from the anchor, in pattern order.
    traversal_steps: Vec<TraversalStep>,
    /// Path variable; when set, a path struct column is appended to the inner batch.
    path_variable: Option<String>,
    /// Optional WHERE clause, evaluated against the inner batch.
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Projection evaluated against the (filtered) inner batch; yields the list items.
    map_expr: Arc<dyn PhysicalExpr>,
    /// Schema of the outer input batches (stored; not read by `evaluate`).
    input_schema: Arc<Schema>,
    /// Schema of the per-match inner batch; `evaluate` must build columns in this order.
    inner_schema: Arc<Schema>,
    /// Declared element type of the produced list.
    output_item_type: DataType,
    /// Target-node variable -> property names loaded as `LargeBinary` inner columns.
    needed_vertex_props: HashMap<String, Vec<String>>,
    /// Relationship variable -> property names loaded as `LargeBinary` inner columns.
    needed_edge_props: HashMap<String, Vec<String>>,
}
82
83impl Clone for PatternComprehensionExecExpr {
84 fn clone(&self) -> Self {
85 Self {
86 graph_ctx: self.graph_ctx.clone(),
87 anchor_column: self.anchor_column.clone(),
88 traversal_steps: self.traversal_steps.clone(),
89 path_variable: self.path_variable.clone(),
90 predicate: self.predicate.clone(),
91 map_expr: self.map_expr.clone(),
92 input_schema: self.input_schema.clone(),
93 inner_schema: self.inner_schema.clone(),
94 output_item_type: self.output_item_type.clone(),
95 needed_vertex_props: self.needed_vertex_props.clone(),
96 needed_edge_props: self.needed_edge_props.clone(),
97 }
98 }
99}
100
101impl PatternComprehensionExecExpr {
102 #[expect(clippy::too_many_arguments, reason = "Constructor for complex expr")]
103 pub fn new(
104 graph_ctx: Arc<GraphExecutionContext>,
105 anchor_column: String,
106 traversal_steps: Vec<TraversalStep>,
107 path_variable: Option<String>,
108 predicate: Option<Arc<dyn PhysicalExpr>>,
109 map_expr: Arc<dyn PhysicalExpr>,
110 input_schema: Arc<Schema>,
111 inner_schema: Arc<Schema>,
112 output_item_type: DataType,
113 needed_vertex_props: HashMap<String, Vec<String>>,
114 needed_edge_props: HashMap<String, Vec<String>>,
115 ) -> Self {
116 Self {
117 graph_ctx,
118 anchor_column,
119 traversal_steps,
120 path_variable,
121 predicate,
122 map_expr,
123 input_schema,
124 inner_schema,
125 output_item_type,
126 needed_vertex_props,
127 needed_edge_props,
128 }
129 }
130}
131
132impl Display for PatternComprehensionExecExpr {
133 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
134 write!(
135 f,
136 "PatternComprehension(anchor={}, steps={})",
137 self.anchor_column,
138 self.traversal_steps.len()
139 )
140 }
141}
142
143impl PartialEq for PatternComprehensionExecExpr {
144 fn eq(&self, other: &Self) -> bool {
145 self.anchor_column == other.anchor_column
146 && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
147 && Arc::ptr_eq(&self.map_expr, &other.map_expr)
148 && match (&self.predicate, &other.predicate) {
149 (Some(a), Some(b)) => Arc::ptr_eq(a, b),
150 (None, None) => true,
151 _ => false,
152 }
153 }
154}
155
156impl Eq for PatternComprehensionExecExpr {}
157
158impl Hash for PatternComprehensionExecExpr {
159 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
160 self.anchor_column.hash(state);
161 self.output_item_type.hash(state);
162 }
163}
164
165impl PartialEq<dyn Any> for PatternComprehensionExecExpr {
166 fn eq(&self, other: &dyn Any) -> bool {
167 other
168 .downcast_ref::<Self>()
169 .map(|x| self == x)
170 .unwrap_or(false)
171 }
172}
173
174impl PhysicalExpr for PatternComprehensionExecExpr {
175 fn as_any(&self) -> &dyn Any {
176 self
177 }
178
179 fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
180 Ok(DataType::LargeList(Arc::new(Field::new(
181 "item",
182 self.output_item_type.clone(),
183 true,
184 ))))
185 }
186
187 fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
188 Ok(true)
189 }
190
191 fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
192 let num_rows = batch.num_rows();
193
194 let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
196 col
197 } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
198 batch.column_by_name(var_name).ok_or_else(|| {
199 DataFusionError::Execution(format!(
200 "Anchor column '{}' not found in batch schema: {:?}",
201 self.anchor_column,
202 batch
203 .schema()
204 .fields()
205 .iter()
206 .map(|f| f.name().as_str())
207 .collect::<Vec<_>>()
208 ))
209 })?
210 } else {
211 return Err(DataFusionError::Execution(format!(
212 "Anchor column '{}' not found in batch schema: {:?}",
213 self.anchor_column,
214 batch
215 .schema()
216 .fields()
217 .iter()
218 .map(|f| f.name().as_str())
219 .collect::<Vec<_>>()
220 )));
221 };
222 let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
223 let anchor_vids: &UInt64Array = &anchor_vid_cow;
224
225 for step in &self.traversal_steps {
228 log::debug!(
229 "PatternComprehension: warming CSR for edge_type_ids={:?}, direction={:?}",
230 step.edge_type_ids,
231 step.direction
232 );
233 std::thread::scope(|s| {
234 s.spawn(|| {
235 let rt = tokio::runtime::Builder::new_current_thread()
236 .enable_all()
237 .build()
238 .map_err(|e| {
239 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
240 })?;
241 rt.block_on(
242 self.graph_ctx
243 .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
244 )
245 .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
246 })
247 .join()
248 .unwrap_or_else(|_| {
249 Err(DataFusionError::Execution(
250 "CSR warming thread panicked".to_string(),
251 ))
252 })
253 })?;
254 }
255
256 log::debug!(
257 "PatternComprehension: expanding {} anchor VIDs, steps={}",
258 anchor_vids.len(),
259 self.traversal_steps.len()
260 );
261
262 let expansion = self.expand_pattern(anchor_vids)?;
265
266 log::debug!(
267 "PatternComprehension: expansion produced {} rows",
268 expansion.row_indices.len()
269 );
270
271 if expansion.row_indices.is_empty() {
273 return self.build_empty_list_result(num_rows);
274 }
275
276 let indices_array = UInt32Array::from(expansion.row_indices.clone());
278 let mut inner_columns: Vec<ArrayRef> = Vec::new();
279
280 for col in batch.columns() {
282 inner_columns.push(take(col, &indices_array, None)?);
283 }
284
285 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
287 if let Some(ref _target_var) = step.target_variable {
288 inner_columns.push(Arc::new(UInt64Array::from(
289 expansion.step_target_vids[step_idx].clone(),
290 )));
291 }
292 if let Some(ref _edge_var) = step.edge_variable {
293 inner_columns.push(Arc::new(UInt64Array::from(
294 expansion.step_edge_ids[step_idx].clone(),
295 )));
296 }
297 }
298
299 let query_ctx = self.graph_ctx.query_context();
301 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
302 if let Some(ref target_var) = step.target_variable
304 && let Some(props) = self.needed_vertex_props.get(target_var)
305 {
306 let vids: Vec<Vid> = expansion.step_target_vids[step_idx]
307 .iter()
308 .map(|v| Vid::from(*v))
309 .collect();
310
311 let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
312
313 let props_map = std::thread::scope(|s| {
314 s.spawn(|| {
315 let rt = tokio::runtime::Builder::new_current_thread()
316 .enable_all()
317 .build()
318 .map_err(|e| {
319 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
320 })?;
321 rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
322 &vids,
323 &prop_refs,
324 Some(&query_ctx),
325 ))
326 .map_err(|e| {
327 DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
328 })
329 })
330 .join()
331 .unwrap_or_else(|_| {
332 Err(DataFusionError::Execution(
333 "Vertex prop load thread panicked".to_string(),
334 ))
335 })
336 })?;
337
338 for prop in props {
339 let col = build_property_column_static(
340 &vids,
341 &props_map,
342 prop,
343 &DataType::LargeBinary,
344 )?;
345 inner_columns.push(col);
346 }
347 }
348
349 if let Some(ref edge_var) = step.edge_variable
351 && let Some(props) = self.needed_edge_props.get(edge_var)
352 {
353 let eids: Vec<Eid> = expansion.step_edge_ids[step_idx]
354 .iter()
355 .map(|e| Eid::from(*e))
356 .collect();
357
358 let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
359
360 let props_map = std::thread::scope(|s| {
361 s.spawn(|| {
362 let rt = tokio::runtime::Builder::new_current_thread()
363 .enable_all()
364 .build()
365 .map_err(|e| {
366 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
367 })?;
368 rt.block_on(self.graph_ctx.property_manager().get_batch_edge_props(
369 &eids,
370 &prop_refs,
371 Some(&query_ctx),
372 ))
373 .map_err(|e| {
374 DataFusionError::Execution(format!("Edge prop load failed: {e}"))
375 })
376 })
377 .join()
378 .unwrap_or_else(|_| {
379 Err(DataFusionError::Execution(
380 "Edge prop load thread panicked".to_string(),
381 ))
382 })
383 })?;
384
385 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
387 for prop in props {
388 let col = build_property_column_static(
389 &vid_keys,
390 &props_map,
391 prop,
392 &DataType::LargeBinary,
393 )?;
394 inner_columns.push(col);
395 }
396 }
397 }
398
399 if self.path_variable.is_some() {
401 let path_col = self.build_path_column(&expansion, anchor_vids, &query_ctx)?;
402 inner_columns.push(path_col);
403 }
404
405 let inner_batch = RecordBatch::try_new(self.inner_schema.clone(), inner_columns)?;
406
407 let (filtered_batch, filtered_indices) = if let Some(pred) = &self.predicate {
409 let mask = pred
410 .evaluate(&inner_batch)?
411 .into_array(inner_batch.num_rows())?;
412 let mask = cast(&mask, &DataType::Boolean)?;
413 let boolean_mask = mask
414 .as_any()
415 .downcast_ref::<BooleanArray>()
416 .ok_or_else(|| {
417 DataFusionError::Execution(
418 "Pattern comprehension predicate did not produce BooleanArray".to_string(),
419 )
420 })?;
421
422 let filtered_batch = filter_record_batch(&inner_batch, boolean_mask)?;
423 let indices_array_ref: ArrayRef = Arc::new(indices_array.clone());
424 let filtered_idx = filter(&indices_array_ref, boolean_mask)?;
425 let filtered_idx = filtered_idx
426 .as_any()
427 .downcast_ref::<UInt32Array>()
428 .unwrap()
429 .clone();
430
431 (filtered_batch, filtered_idx)
432 } else {
433 (inner_batch, indices_array.clone())
434 };
435
436 let mapped_val = self.map_expr.evaluate(&filtered_batch)?;
438 let mapped_array = mapped_val.into_array(filtered_batch.num_rows())?;
439
440 let new_offsets = {
442 let mut offsets = Vec::with_capacity(num_rows + 1);
443 offsets.push(0i64);
444
445 let indices_slice = filtered_indices.values();
446 let mut pos = 0;
447 let mut current_len: i64 = 0;
448
449 for row_idx in 0..num_rows {
450 while pos < indices_slice.len() && indices_slice[pos] as usize == row_idx {
451 pos += 1;
452 current_len += 1;
453 }
454 offsets.push(current_len);
455 }
456 OffsetBuffer::new(ScalarBuffer::from(offsets))
457 };
458
459 let new_field = Arc::new(Field::new("item", mapped_array.data_type().clone(), true));
460 let new_list = datafusion::arrow::array::LargeListArray::new(
461 new_field,
462 new_offsets,
463 mapped_array,
464 None,
465 );
466
467 Ok(ColumnarValue::Array(Arc::new(new_list)))
468 }
469
470 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
471 vec![]
474 }
475
476 fn with_new_children(
477 self: Arc<Self>,
478 children: Vec<Arc<dyn PhysicalExpr>>,
479 ) -> DFResult<Arc<dyn PhysicalExpr>> {
480 if !children.is_empty() {
481 return Err(DataFusionError::Internal(
482 "PatternComprehension has no children".to_string(),
483 ));
484 }
485 Ok(self)
486 }
487
488 fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
489 write!(f, "PatternComprehension({})", self.anchor_column)
490 }
491}
492
/// Column-oriented result of expanding the pattern: entry `j` of every vector (and of every
/// inner vector of the per-step fields) describes the same match.
struct PatternExpansion {
    /// For each match, the index of the outer input row it originates from.
    row_indices: Vec<u32>,
    /// For each match, the anchor vertex id it started from.
    anchor_vids: Vec<u64>,
    /// `[step][match]`: target vertex id reached by that hop.
    step_target_vids: Vec<Vec<u64>>,
    /// `[step][match]`: edge id traversed by that hop.
    step_edge_ids: Vec<Vec<u64>>,
    /// `[step][match]`: edge type id of the edge traversed by that hop.
    step_edge_type_ids: Vec<Vec<u32>>,
}
506
507impl PatternComprehensionExecExpr {
508 fn expand_pattern(&self, anchor_vids: &UInt64Array) -> DFResult<PatternExpansion> {
513 let mut frontier_row_indices: Vec<u32> = Vec::new();
515 let mut frontier_vids: Vec<u64> = Vec::new();
516
517 for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
518 if let Some(vid_u64) = vid_opt {
519 frontier_row_indices.push(row_idx as u32);
520 frontier_vids.push(vid_u64);
521 }
522 }
523
524 let mut result_row_indices: Vec<u32> = frontier_row_indices.clone();
526 let mut result_anchor_vids: Vec<u64> = frontier_vids.clone();
528
529 let mut accumulated_target_vids: Vec<Vec<u64>> = Vec::new();
531 let mut accumulated_edge_ids: Vec<Vec<u64>> = Vec::new();
532 let mut accumulated_edge_type_ids: Vec<Vec<u32>> = Vec::new();
533
534 for step in &self.traversal_steps {
535 let is_undirected = step.direction == Direction::Both;
536 let query_ctx = self.graph_ctx.query_context();
537
538 let mut new_row_indices: Vec<u32> = Vec::new();
539 let mut new_anchor_vids: Vec<u64> = Vec::new();
540 let mut new_target_vids: Vec<u64> = Vec::new();
541 let mut new_edge_ids: Vec<u64> = Vec::new();
542 let mut new_edge_type_ids: Vec<u32> = Vec::new();
543 let num_prev_cols = accumulated_target_vids.len();
545 let mut new_accumulated_targets: Vec<Vec<u64>> = vec![Vec::new(); num_prev_cols];
546 let mut new_accumulated_edges: Vec<Vec<u64>> =
547 vec![Vec::new(); accumulated_edge_ids.len()];
548 let mut new_accumulated_edge_types: Vec<Vec<u32>> =
549 vec![Vec::new(); accumulated_edge_type_ids.len()];
550
551 for (i, &src_vid_u64) in frontier_vids.iter().enumerate() {
552 let vid = Vid::from(src_vid_u64);
553 let outer_row = result_row_indices[i];
554 let anchor_vid = result_anchor_vids[i];
555
556 let mut seen_edges: HashSet<u64> = HashSet::new();
557
558 for &edge_type in &step.edge_type_ids {
559 let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, step.direction);
560
561 for (target_vid, eid) in neighbors {
562 let eid_u64 = eid.as_u64();
563
564 if is_undirected && !seen_edges.insert(eid_u64) {
566 continue;
567 }
568
569 if let Some(ref label_name) = step.target_label_name
573 && let Some(vertex_labels) =
574 self.graph_ctx.resolve_vertex_labels(target_vid, &query_ctx)
575 && !vertex_labels.contains(label_name)
576 {
577 continue;
578 }
579
580 new_row_indices.push(outer_row);
581 new_anchor_vids.push(anchor_vid);
582 new_target_vids.push(target_vid.as_u64());
583 new_edge_ids.push(eid_u64);
584 new_edge_type_ids.push(edge_type);
585
586 for (col_idx, col) in accumulated_target_vids.iter().enumerate() {
588 new_accumulated_targets[col_idx].push(col[i]);
589 }
590 for (col_idx, col) in accumulated_edge_ids.iter().enumerate() {
591 new_accumulated_edges[col_idx].push(col[i]);
592 }
593 for (col_idx, col) in accumulated_edge_type_ids.iter().enumerate() {
594 new_accumulated_edge_types[col_idx].push(col[i]);
595 }
596 }
597 }
598 }
599
600 frontier_vids.clone_from(&new_target_vids);
602 result_row_indices = new_row_indices;
603 result_anchor_vids = new_anchor_vids;
604
605 new_accumulated_targets.push(new_target_vids);
607 new_accumulated_edges.push(new_edge_ids);
608 new_accumulated_edge_types.push(new_edge_type_ids);
609 accumulated_target_vids = new_accumulated_targets;
610 accumulated_edge_ids = new_accumulated_edges;
611 accumulated_edge_type_ids = new_accumulated_edge_types;
612 }
613
614 Ok(PatternExpansion {
615 row_indices: result_row_indices,
616 anchor_vids: result_anchor_vids,
617 step_target_vids: accumulated_target_vids,
618 step_edge_ids: accumulated_edge_ids,
619 step_edge_type_ids: accumulated_edge_type_ids,
620 })
621 }
622
623 fn build_empty_list_result(&self, num_rows: usize) -> DFResult<ColumnarValue> {
625 let offsets: Vec<i64> = vec![0; num_rows + 1];
626 let empty_values: ArrayRef = arrow_array::new_empty_array(&self.output_item_type);
627 let field = Arc::new(Field::new("item", self.output_item_type.clone(), true));
628 let list = datafusion::arrow::array::LargeListArray::new(
629 field,
630 OffsetBuffer::new(ScalarBuffer::from(offsets)),
631 empty_values,
632 None,
633 );
634 Ok(ColumnarValue::Array(Arc::new(list)))
635 }
636
637 fn build_path_column(
643 &self,
644 expansion: &PatternExpansion,
645 _anchor_vids: &UInt64Array,
646 query_ctx: &QueryContext,
647 ) -> DFResult<ArrayRef> {
648 use arrow_array::builder::{
649 LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
650 };
651
652 let num_expanded = expansion.row_indices.len();
653
654 let node_struct_fields: Vec<Arc<Field>> =
655 crate::query::df_graph::common::node_struct_fields()
656 .iter()
657 .cloned()
658 .collect();
659 let edge_struct_fields: Vec<Arc<Field>> =
660 crate::query::df_graph::common::edge_struct_fields()
661 .iter()
662 .cloned()
663 .collect();
664
665 let mut nodes_builder = ListBuilder::new(StructBuilder::new(
666 node_struct_fields,
667 vec![
668 Box::new(UInt64Builder::new()),
669 Box::new(ListBuilder::new(StringBuilder::new())),
670 Box::new(LargeBinaryBuilder::new()),
671 ],
672 ));
673
674 let mut rels_builder = ListBuilder::new(StructBuilder::new(
675 edge_struct_fields,
676 vec![
677 Box::new(UInt64Builder::new()),
678 Box::new(StringBuilder::new()),
679 Box::new(UInt64Builder::new()),
680 Box::new(UInt64Builder::new()),
681 Box::new(LargeBinaryBuilder::new()),
682 ],
683 ));
684
685 let uni_schema = self.graph_ctx.storage().schema_manager().schema();
686 let num_steps = self.traversal_steps.len();
687
688 for row_idx in 0..num_expanded {
689 let anchor_vid_u64 = expansion.anchor_vids[row_idx];
691 let anchor_vid = Vid::from(anchor_vid_u64);
692
693 super::common::append_node_to_struct(nodes_builder.values(), anchor_vid, query_ctx);
695
696 for step_idx in 0..num_steps {
698 let target_vid = Vid::from(expansion.step_target_vids[step_idx][row_idx]);
699 super::common::append_node_to_struct(nodes_builder.values(), target_vid, query_ctx);
700 }
701 nodes_builder.append(true);
702
703 for step_idx in 0..num_steps {
705 let eid = Eid::from(expansion.step_edge_ids[step_idx][row_idx]);
706 let edge_type_id = expansion.step_edge_type_ids[step_idx][row_idx];
707 let edge_type_name = uni_schema
708 .edge_type_name_by_id_unified(edge_type_id)
709 .unwrap_or_default();
710
711 let src_vid = if step_idx == 0 {
714 anchor_vid_u64
715 } else {
716 expansion.step_target_vids[step_idx - 1][row_idx]
717 };
718 let dst_vid = expansion.step_target_vids[step_idx][row_idx];
719
720 let (stored_src, stored_dst) = self.graph_ctx.resolve_stored_edge_endpoints(
725 eid,
726 Vid::from(src_vid),
727 Vid::from(dst_vid),
728 &[edge_type_id],
729 );
730 super::common::append_edge_to_struct(
731 rels_builder.values(),
732 eid,
733 &edge_type_name,
734 stored_src,
735 stored_dst,
736 query_ctx,
737 );
738 }
739 rels_builder.append(true);
740 }
741
742 let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
743 let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;
744
745 let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
746 let rels_field = Arc::new(Field::new(
747 "relationships",
748 rels_array.data_type().clone(),
749 true,
750 ));
751
752 let path_struct = arrow_array::StructArray::try_new(
753 vec![nodes_field, rels_field].into(),
754 vec![nodes_array, rels_array],
755 None,
756 )
757 .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
758
759 Ok(Arc::new(path_struct))
760 }
761}
762
763pub fn analyze_pattern(
770 pattern: &Pattern,
771 input_schema: &Schema,
772 uni_schema: &UniSchema,
773) -> anyhow::Result<(String, Vec<TraversalStep>)> {
774 if pattern.paths.is_empty() {
775 return Err(anyhow::anyhow!(
776 "Pattern comprehension requires at least one path"
777 ));
778 }
779
780 let path = &pattern.paths[0];
781 let elements = &path.elements;
782
783 if elements.is_empty() {
784 return Err(anyhow::anyhow!(
785 "Pattern comprehension path has no elements"
786 ));
787 }
788
789 let (anchor_idx, anchor_var) = find_anchor_node(elements, input_schema)?;
791
792 let anchor_column = format!("{}._vid", anchor_var);
793
794 let steps = build_traversal_steps(elements, anchor_idx, uni_schema)?;
796
797 Ok((anchor_column, steps))
798}
799
800fn find_anchor_node(
805 elements: &[PatternElement],
806 input_schema: &Schema,
807) -> anyhow::Result<(usize, String)> {
808 for (idx, elem) in elements.iter().enumerate() {
809 if let PatternElement::Node(node) = elem
810 && let Some(ref var) = node.variable
811 {
812 let vid_col = format!("{}._vid", var);
813 if input_schema.column_with_name(&vid_col).is_some() {
814 return Ok((idx, var.clone()));
815 }
816 }
817 }
818
819 Err(anyhow::anyhow!(
820 "No anchor node found in pattern comprehension. \
821 None of the pattern variables have a corresponding `_vid` column in the input schema. \
822 Schema fields: {:?}",
823 input_schema
824 .fields()
825 .iter()
826 .map(|f| f.name().as_str())
827 .collect::<Vec<_>>()
828 ))
829}
830
831fn build_traversal_steps(
836 elements: &[PatternElement],
837 anchor_idx: usize,
838 uni_schema: &UniSchema,
839) -> anyhow::Result<Vec<TraversalStep>> {
840 let mut steps = Vec::new();
841
842 let mut i = anchor_idx + 1;
844 while i + 1 < elements.len() {
845 let rel_elem = &elements[i];
846 let target_elem = &elements[i + 1];
847
848 let PatternElement::Relationship(rel) = rel_elem else {
849 return Err(anyhow::anyhow!(
850 "Expected relationship at pattern index {}, got {:?}",
851 i,
852 rel_elem
853 ));
854 };
855
856 let PatternElement::Node(target_node) = target_elem else {
857 return Err(anyhow::anyhow!(
858 "Expected node at pattern index {}, got {:?}",
859 i + 1,
860 target_elem
861 ));
862 };
863
864 let step = build_step_from_rel_and_node(rel, target_node, uni_schema)?;
865 steps.push(step);
866
867 i += 2;
868 }
869
870 if steps.is_empty() {
871 return Err(anyhow::anyhow!(
872 "Pattern comprehension has no traversal steps after anchor"
873 ));
874 }
875
876 Ok(steps)
877}
878
879fn build_step_from_rel_and_node(
881 rel: &RelationshipPattern,
882 target_node: &NodePattern,
883 uni_schema: &UniSchema,
884) -> anyhow::Result<TraversalStep> {
885 let edge_type_ids = if rel.types.is_empty() {
887 uni_schema.all_edge_type_ids()
889 } else {
890 rel.types
891 .iter()
892 .filter_map(|t| resolve_edge_type_id_unified(uni_schema, t))
893 .collect()
894 };
895
896 if edge_type_ids.is_empty() && !rel.types.is_empty() {
897 return Ok(TraversalStep {
900 edge_type_ids: vec![],
901 direction: convert_direction(&rel.direction),
902 target_variable: target_node.variable.clone(),
903 target_label_name: target_node.labels.first().cloned(),
904 edge_variable: rel.variable.clone(),
905 });
906 }
907
908 let direction = convert_direction(&rel.direction);
909 let target_label_name = target_node.labels.first().cloned();
910
911 Ok(TraversalStep {
912 edge_type_ids,
913 direction,
914 target_variable: target_node.variable.clone(),
915 target_label_name,
916 edge_variable: rel.variable.clone(),
917 })
918}
919
920fn resolve_edge_type_id_unified(uni_schema: &UniSchema, type_name: &str) -> Option<u32> {
923 uni_schema.edge_type_id_unified_case_insensitive(type_name)
924}
925
926fn convert_direction(ast_dir: &AstDirection) -> Direction {
928 match ast_dir {
929 AstDirection::Outgoing => Direction::Outgoing,
930 AstDirection::Incoming => Direction::Incoming,
931 AstDirection::Both => Direction::Both,
932 }
933}
934
935pub fn collect_inner_properties(
941 where_clause: Option<&Expr>,
942 map_expr: &Expr,
943 steps: &[TraversalStep],
944) -> (HashMap<String, Vec<String>>, HashMap<String, Vec<String>>) {
945 let mut vertex_props: HashMap<String, Vec<String>> = HashMap::new();
946 let mut edge_props: HashMap<String, Vec<String>> = HashMap::new();
947
948 let node_vars: HashSet<String> = steps
950 .iter()
951 .filter_map(|s| s.target_variable.clone())
952 .collect();
953 let edge_vars: HashSet<String> = steps
954 .iter()
955 .filter_map(|s| s.edge_variable.clone())
956 .collect();
957
958 let mut exprs_to_visit: Vec<&Expr> = vec![map_expr];
960 if let Some(w) = where_clause {
961 exprs_to_visit.push(w);
962 }
963
964 while let Some(expr) = exprs_to_visit.pop() {
965 match expr {
966 Expr::Property(base, prop) => {
967 if let Expr::Variable(var) = base.as_ref() {
968 if node_vars.contains(var) {
969 vertex_props
970 .entry(var.clone())
971 .or_default()
972 .push(prop.clone());
973 } else if edge_vars.contains(var) {
974 edge_props
975 .entry(var.clone())
976 .or_default()
977 .push(prop.clone());
978 }
979 }
980 exprs_to_visit.push(base);
982 }
983 Expr::BinaryOp { left, right, .. } => {
984 exprs_to_visit.push(left);
985 exprs_to_visit.push(right);
986 }
987 Expr::UnaryOp { expr: inner, .. } => {
988 exprs_to_visit.push(inner);
989 }
990 Expr::FunctionCall { args, .. } => {
991 for arg in args {
992 exprs_to_visit.push(arg);
993 }
994 }
995 Expr::Case {
996 when_then,
997 else_expr,
998 ..
999 } => {
1000 for (w, t) in when_then {
1001 exprs_to_visit.push(w);
1002 exprs_to_visit.push(t);
1003 }
1004 if let Some(e) = else_expr {
1005 exprs_to_visit.push(e);
1006 }
1007 }
1008 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1009 exprs_to_visit.push(inner);
1010 }
1011 Expr::List(items) => {
1012 for item in items {
1013 exprs_to_visit.push(item);
1014 }
1015 }
1016 Expr::Map(entries) => {
1017 for (_, v) in entries {
1018 exprs_to_visit.push(v);
1019 }
1020 }
1021 Expr::In { expr: l, list: r } => {
1022 exprs_to_visit.push(l);
1023 exprs_to_visit.push(r);
1024 }
1025 _ => {}
1026 }
1027 }
1028
1029 for props in vertex_props.values_mut() {
1031 props.sort();
1032 props.dedup();
1033 }
1034 for props in edge_props.values_mut() {
1035 props.sort();
1036 props.dedup();
1037 }
1038
1039 (vertex_props, edge_props)
1040}
1041
1042pub fn build_inner_schema(
1048 input_schema: &Schema,
1049 steps: &[TraversalStep],
1050 vertex_props: &HashMap<String, Vec<String>>,
1051 edge_props: &HashMap<String, Vec<String>>,
1052 path_variable: Option<&str>,
1053) -> Schema {
1054 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
1055
1056 for step in steps {
1057 if let Some(ref target_var) = step.target_variable {
1059 fields.push(Arc::new(Field::new(
1060 format!("{}._vid", target_var),
1061 DataType::UInt64,
1062 true,
1063 )));
1064 }
1065
1066 if let Some(ref edge_var) = step.edge_variable {
1068 fields.push(Arc::new(Field::new(
1069 format!("{}._eid", edge_var),
1070 DataType::UInt64,
1071 true,
1072 )));
1073 }
1074 }
1075
1076 for step in steps {
1078 if let Some(ref target_var) = step.target_variable
1079 && let Some(props) = vertex_props.get(target_var)
1080 {
1081 for prop in props {
1082 fields.push(Arc::new(Field::new(
1083 format!("{}.{}", target_var, prop),
1084 DataType::LargeBinary,
1085 true,
1086 )));
1087 }
1088 }
1089 }
1090
1091 for step in steps {
1093 if let Some(ref edge_var) = step.edge_variable
1094 && let Some(props) = edge_props.get(edge_var)
1095 {
1096 for prop in props {
1097 fields.push(Arc::new(Field::new(
1098 format!("{}.{}", edge_var, prop),
1099 DataType::LargeBinary,
1100 true,
1101 )));
1102 }
1103 }
1104 }
1105
1106 if let Some(path_var) = path_variable {
1108 fields.push(Arc::new(build_path_struct_field(path_var)));
1109 }
1110
1111 Schema::new(fields)
1112}