1use std::any::Any;
15use std::collections::{HashMap, HashSet};
16use std::fmt::{self, Display, Formatter};
17use std::hash::Hash;
18use std::sync::Arc;
19
20use arrow_array::{Array, ArrayRef, BooleanArray, RecordBatch, UInt32Array, UInt64Array};
21use arrow_schema::{DataType, Field, Schema};
22use datafusion::arrow::buffer::{OffsetBuffer, ScalarBuffer};
23use datafusion::arrow::compute::{cast, filter, filter_record_batch, take};
24use datafusion::common::Result as DFResult;
25use datafusion::error::DataFusionError;
26use datafusion::logical_expr::ColumnarValue;
27use datafusion::physical_plan::PhysicalExpr;
28use uni_common::core::id::{Eid, Vid};
29use uni_common::core::schema::Schema as UniSchema;
30use uni_cypher::ast::{
31 Direction as AstDirection, Expr, NodePattern, Pattern, PatternElement, RelationshipPattern,
32};
33use uni_store::QueryContext;
34use uni_store::runtime::l0_visibility;
35use uni_store::storage::direction::Direction;
36
37use super::GraphExecutionContext;
38use crate::query::df_graph::common::{build_path_struct_field, column_as_vid_array};
39use crate::query::df_graph::scan::build_property_column_static;
40
/// One relationship hop of a pattern comprehension: which edges to follow and
/// which variables the hop binds.
#[derive(Debug, Clone)]
pub struct TraversalStep {
    /// Edge type ids to follow; the neighbor lookup is issued once per id.
    /// An empty list makes the hop produce no rows.
    pub edge_type_ids: Vec<u32>,
    /// Direction passed to the neighbor lookup. With `Direction::Both`, an edge
    /// id is emitted at most once per source vertex.
    pub direction: Direction,
    /// Variable bound to the hop's target node; when set, a `{var}._vid`
    /// column is added to the inner schema.
    pub target_variable: Option<String>,
    /// Label required on the target node. Targets whose labels are known and
    /// do not contain it are skipped during expansion.
    pub target_label_name: Option<String>,
    /// Variable bound to the traversed relationship; when set, a `{var}._eid`
    /// column is added to the inner schema.
    pub edge_variable: Option<String>,
}
55
/// Physical expression that evaluates a pattern comprehension: for every input
/// row it expands a graph pattern from an anchor vertex, optionally filters the
/// matches, maps each match through an expression, and returns the results as
/// one `LargeList` value per input row.
#[derive(Debug)]
pub struct PatternComprehensionExecExpr {
    /// Graph access used for adjacency warming, neighbor lookup and property loading.
    graph_ctx: Arc<GraphExecutionContext>,
    /// Name of the batch column holding anchor vertex ids. If it ends in
    /// `._vid` and is missing, the bare variable column is tried instead.
    anchor_column: String,
    /// Hops expanded in order, starting from the anchor.
    traversal_steps: Vec<TraversalStep>,
    /// When set, a path struct column is appended to the inner batch.
    path_variable: Option<String>,
    /// Optional filter evaluated against the inner (expanded) batch.
    predicate: Option<Arc<dyn PhysicalExpr>>,
    /// Expression evaluated against the filtered inner batch to produce list items.
    map_expr: Arc<dyn PhysicalExpr>,
    /// NOTE(review): presumably the schema of the outer batch; it is stored but
    /// not read by the code in this file.
    input_schema: Arc<Schema>,
    /// Schema used to assemble the inner batch; column order must match it.
    inner_schema: Arc<Schema>,
    /// Item type reported by `data_type` and used for the all-empty result.
    output_item_type: DataType,
    /// Target variable name -> vertex properties to load for it.
    needed_vertex_props: HashMap<String, Vec<String>>,
    /// Edge variable name -> edge properties to load for it.
    needed_edge_props: HashMap<String, Vec<String>>,
}
83
84impl Clone for PatternComprehensionExecExpr {
85 fn clone(&self) -> Self {
86 Self {
87 graph_ctx: self.graph_ctx.clone(),
88 anchor_column: self.anchor_column.clone(),
89 traversal_steps: self.traversal_steps.clone(),
90 path_variable: self.path_variable.clone(),
91 predicate: self.predicate.clone(),
92 map_expr: self.map_expr.clone(),
93 input_schema: self.input_schema.clone(),
94 inner_schema: self.inner_schema.clone(),
95 output_item_type: self.output_item_type.clone(),
96 needed_vertex_props: self.needed_vertex_props.clone(),
97 needed_edge_props: self.needed_edge_props.clone(),
98 }
99 }
100}
101
impl PatternComprehensionExecExpr {
    /// Assembles the expression from its already-prepared parts.
    ///
    /// No validation is performed here: every argument is stored as given.
    /// Evaluation builds the inner batch against `inner_schema`, so the column
    /// order implied by `traversal_steps`, `needed_vertex_props`,
    /// `needed_edge_props` and `path_variable` has to agree with it.
    #[expect(clippy::too_many_arguments, reason = "Constructor for complex expr")]
    pub fn new(
        graph_ctx: Arc<GraphExecutionContext>,
        anchor_column: String,
        traversal_steps: Vec<TraversalStep>,
        path_variable: Option<String>,
        predicate: Option<Arc<dyn PhysicalExpr>>,
        map_expr: Arc<dyn PhysicalExpr>,
        input_schema: Arc<Schema>,
        inner_schema: Arc<Schema>,
        output_item_type: DataType,
        needed_vertex_props: HashMap<String, Vec<String>>,
        needed_edge_props: HashMap<String, Vec<String>>,
    ) -> Self {
        Self {
            graph_ctx,
            anchor_column,
            traversal_steps,
            path_variable,
            predicate,
            map_expr,
            input_schema,
            inner_schema,
            output_item_type,
            needed_vertex_props,
            needed_edge_props,
        }
    }
}
132
133impl Display for PatternComprehensionExecExpr {
134 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
135 write!(
136 f,
137 "PatternComprehension(anchor={}, steps={})",
138 self.anchor_column,
139 self.traversal_steps.len()
140 )
141 }
142}
143
144impl PartialEq for PatternComprehensionExecExpr {
145 fn eq(&self, other: &Self) -> bool {
146 self.anchor_column == other.anchor_column
147 && Arc::ptr_eq(&self.graph_ctx, &other.graph_ctx)
148 && Arc::ptr_eq(&self.map_expr, &other.map_expr)
149 && match (&self.predicate, &other.predicate) {
150 (Some(a), Some(b)) => Arc::ptr_eq(a, b),
151 (None, None) => true,
152 _ => false,
153 }
154 }
155}
156
// The `PartialEq` impl compares a string and pointer identities only, so it is
// reflexive, symmetric and transitive; the `Eq` marker is therefore sound.
impl Eq for PatternComprehensionExecExpr {}
158
159impl Hash for PatternComprehensionExecExpr {
160 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
161 self.anchor_column.hash(state);
162 self.output_item_type.hash(state);
163 }
164}
165
166impl PartialEq<dyn Any> for PatternComprehensionExecExpr {
167 fn eq(&self, other: &dyn Any) -> bool {
168 other
169 .downcast_ref::<Self>()
170 .map(|x| self == x)
171 .unwrap_or(false)
172 }
173}
174
175impl PhysicalExpr for PatternComprehensionExecExpr {
176 fn as_any(&self) -> &dyn Any {
177 self
178 }
179
180 fn data_type(&self, _input_schema: &Schema) -> DFResult<DataType> {
181 Ok(DataType::LargeList(Arc::new(Field::new(
182 "item",
183 self.output_item_type.clone(),
184 true,
185 ))))
186 }
187
188 fn nullable(&self, _input_schema: &Schema) -> DFResult<bool> {
189 Ok(true)
190 }
191
192 fn evaluate(&self, batch: &RecordBatch) -> DFResult<ColumnarValue> {
193 let num_rows = batch.num_rows();
194
195 let anchor_col = if let Some(col) = batch.column_by_name(&self.anchor_column) {
197 col
198 } else if let Some(var_name) = self.anchor_column.strip_suffix("._vid") {
199 batch.column_by_name(var_name).ok_or_else(|| {
200 DataFusionError::Execution(format!(
201 "Anchor column '{}' not found in batch schema: {:?}",
202 self.anchor_column,
203 batch
204 .schema()
205 .fields()
206 .iter()
207 .map(|f| f.name().as_str())
208 .collect::<Vec<_>>()
209 ))
210 })?
211 } else {
212 return Err(DataFusionError::Execution(format!(
213 "Anchor column '{}' not found in batch schema: {:?}",
214 self.anchor_column,
215 batch
216 .schema()
217 .fields()
218 .iter()
219 .map(|f| f.name().as_str())
220 .collect::<Vec<_>>()
221 )));
222 };
223 let anchor_vid_cow = column_as_vid_array(anchor_col.as_ref())?;
224 let anchor_vids: &UInt64Array = &anchor_vid_cow;
225
226 for step in &self.traversal_steps {
229 log::debug!(
230 "PatternComprehension: warming CSR for edge_type_ids={:?}, direction={:?}",
231 step.edge_type_ids,
232 step.direction
233 );
234 std::thread::scope(|s| {
235 s.spawn(|| {
236 let rt = tokio::runtime::Builder::new_current_thread()
237 .enable_all()
238 .build()
239 .map_err(|e| {
240 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
241 })?;
242 rt.block_on(
243 self.graph_ctx
244 .ensure_adjacency_warmed(&step.edge_type_ids, step.direction),
245 )
246 .map_err(|e| DataFusionError::Execution(format!("CSR warming failed: {e}")))
247 })
248 .join()
249 .unwrap_or_else(|_| {
250 Err(DataFusionError::Execution(
251 "CSR warming thread panicked".to_string(),
252 ))
253 })
254 })?;
255 }
256
257 log::debug!(
258 "PatternComprehension: expanding {} anchor VIDs, steps={}",
259 anchor_vids.len(),
260 self.traversal_steps.len()
261 );
262
263 let expansion = self.expand_pattern(anchor_vids)?;
266
267 log::debug!(
268 "PatternComprehension: expansion produced {} rows",
269 expansion.row_indices.len()
270 );
271
272 if expansion.row_indices.is_empty() {
274 return self.build_empty_list_result(num_rows);
275 }
276
277 let indices_array = UInt32Array::from(expansion.row_indices.clone());
279 let mut inner_columns: Vec<ArrayRef> = Vec::new();
280
281 for col in batch.columns() {
283 inner_columns.push(take(col, &indices_array, None)?);
284 }
285
286 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
288 if let Some(ref _target_var) = step.target_variable {
289 inner_columns.push(Arc::new(UInt64Array::from(
290 expansion.step_target_vids[step_idx].clone(),
291 )));
292 }
293 if let Some(ref _edge_var) = step.edge_variable {
294 inner_columns.push(Arc::new(UInt64Array::from(
295 expansion.step_edge_ids[step_idx].clone(),
296 )));
297 }
298 }
299
300 let query_ctx = self.graph_ctx.query_context();
302 for (step_idx, step) in self.traversal_steps.iter().enumerate() {
303 if let Some(ref target_var) = step.target_variable
305 && let Some(props) = self.needed_vertex_props.get(target_var)
306 {
307 let vids: Vec<Vid> = expansion.step_target_vids[step_idx]
308 .iter()
309 .map(|v| Vid::from(*v))
310 .collect();
311
312 let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
313
314 let props_map = std::thread::scope(|s| {
315 s.spawn(|| {
316 let rt = tokio::runtime::Builder::new_current_thread()
317 .enable_all()
318 .build()
319 .map_err(|e| {
320 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
321 })?;
322 rt.block_on(self.graph_ctx.property_manager().get_batch_vertex_props(
323 &vids,
324 &prop_refs,
325 Some(&query_ctx),
326 ))
327 .map_err(|e| {
328 DataFusionError::Execution(format!("Vertex prop load failed: {e}"))
329 })
330 })
331 .join()
332 .unwrap_or_else(|_| {
333 Err(DataFusionError::Execution(
334 "Vertex prop load thread panicked".to_string(),
335 ))
336 })
337 })?;
338
339 for prop in props {
340 let col = build_property_column_static(
341 &vids,
342 &props_map,
343 prop,
344 &DataType::LargeBinary,
345 )?;
346 inner_columns.push(col);
347 }
348 }
349
350 if let Some(ref edge_var) = step.edge_variable
352 && let Some(props) = self.needed_edge_props.get(edge_var)
353 {
354 let eids: Vec<Eid> = expansion.step_edge_ids[step_idx]
355 .iter()
356 .map(|e| Eid::from(*e))
357 .collect();
358
359 let prop_refs: Vec<&str> = props.iter().map(|s| s.as_str()).collect();
360
361 let props_map = std::thread::scope(|s| {
362 s.spawn(|| {
363 let rt = tokio::runtime::Builder::new_current_thread()
364 .enable_all()
365 .build()
366 .map_err(|e| {
367 DataFusionError::Execution(format!("Runtime creation failed: {e}"))
368 })?;
369 rt.block_on(self.graph_ctx.property_manager().get_batch_edge_props(
370 &eids,
371 &prop_refs,
372 Some(&query_ctx),
373 ))
374 .map_err(|e| {
375 DataFusionError::Execution(format!("Edge prop load failed: {e}"))
376 })
377 })
378 .join()
379 .unwrap_or_else(|_| {
380 Err(DataFusionError::Execution(
381 "Edge prop load thread panicked".to_string(),
382 ))
383 })
384 })?;
385
386 let vid_keys: Vec<Vid> = eids.iter().map(|e| Vid::from(e.as_u64())).collect();
388 for prop in props {
389 let col = build_property_column_static(
390 &vid_keys,
391 &props_map,
392 prop,
393 &DataType::LargeBinary,
394 )?;
395 inner_columns.push(col);
396 }
397 }
398 }
399
400 if self.path_variable.is_some() {
402 let path_col = self.build_path_column(&expansion, anchor_vids, &query_ctx)?;
403 inner_columns.push(path_col);
404 }
405
406 let inner_batch = RecordBatch::try_new(self.inner_schema.clone(), inner_columns)?;
407
408 let (filtered_batch, filtered_indices) = if let Some(pred) = &self.predicate {
410 let mask = pred
411 .evaluate(&inner_batch)?
412 .into_array(inner_batch.num_rows())?;
413 let mask = cast(&mask, &DataType::Boolean)?;
414 let boolean_mask = mask
415 .as_any()
416 .downcast_ref::<BooleanArray>()
417 .ok_or_else(|| {
418 DataFusionError::Execution(
419 "Pattern comprehension predicate did not produce BooleanArray".to_string(),
420 )
421 })?;
422
423 let filtered_batch = filter_record_batch(&inner_batch, boolean_mask)?;
424 let indices_array_ref: ArrayRef = Arc::new(indices_array.clone());
425 let filtered_idx = filter(&indices_array_ref, boolean_mask)?;
426 let filtered_idx = filtered_idx
427 .as_any()
428 .downcast_ref::<UInt32Array>()
429 .unwrap()
430 .clone();
431
432 (filtered_batch, filtered_idx)
433 } else {
434 (inner_batch, indices_array.clone())
435 };
436
437 let mapped_val = self.map_expr.evaluate(&filtered_batch)?;
439 let mapped_array = mapped_val.into_array(filtered_batch.num_rows())?;
440
441 let new_offsets = {
443 let mut offsets = Vec::with_capacity(num_rows + 1);
444 offsets.push(0i64);
445
446 let indices_slice = filtered_indices.values();
447 let mut pos = 0;
448 let mut current_len: i64 = 0;
449
450 for row_idx in 0..num_rows {
451 while pos < indices_slice.len() && indices_slice[pos] as usize == row_idx {
452 pos += 1;
453 current_len += 1;
454 }
455 offsets.push(current_len);
456 }
457 OffsetBuffer::new(ScalarBuffer::from(offsets))
458 };
459
460 let new_field = Arc::new(Field::new("item", mapped_array.data_type().clone(), true));
461 let new_list = datafusion::arrow::array::LargeListArray::new(
462 new_field,
463 new_offsets,
464 mapped_array,
465 None,
466 );
467
468 Ok(ColumnarValue::Array(Arc::new(new_list)))
469 }
470
471 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
472 vec![]
475 }
476
477 fn with_new_children(
478 self: Arc<Self>,
479 children: Vec<Arc<dyn PhysicalExpr>>,
480 ) -> DFResult<Arc<dyn PhysicalExpr>> {
481 if !children.is_empty() {
482 return Err(DataFusionError::Internal(
483 "PatternComprehension has no children".to_string(),
484 ));
485 }
486 Ok(self)
487 }
488
489 fn fmt_sql(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
490 write!(f, "PatternComprehension({})", self.anchor_column)
491 }
492}
493
/// Result of expanding the pattern: a set of parallel vectors with one entry
/// per matched path ("inner row").
struct PatternExpansion {
    /// Index of the outer batch row each inner row originated from.
    row_indices: Vec<u32>,
    /// Anchor vertex id of each inner row.
    anchor_vids: Vec<u64>,
    /// `step_target_vids[s][r]`: target vertex id reached at step `s` for inner row `r`.
    step_target_vids: Vec<Vec<u64>>,
    /// `step_edge_ids[s][r]`: edge id traversed at step `s` for inner row `r`.
    step_edge_ids: Vec<Vec<u64>>,
    /// `step_edge_type_ids[s][r]`: edge type id used at step `s` for inner row `r`.
    step_edge_type_ids: Vec<Vec<u32>>,
}
507
impl PatternComprehensionExecExpr {
    /// Expands every non-null anchor vertex through all traversal steps.
    ///
    /// The expansion is a breadth-wise join: each step replaces the current
    /// frontier with all (source, edge, target) matches found from it, and the
    /// per-step columns gathered so far are re-materialised so that all
    /// vectors stay parallel (one entry per surviving partial path).
    /// Rows are emitted in frontier order, so `row_indices` is non-decreasing.
    fn expand_pattern(&self, anchor_vids: &UInt64Array) -> DFResult<PatternExpansion> {
        // Initial frontier: one entry per outer row with a non-null anchor.
        let mut frontier_row_indices: Vec<u32> = Vec::new();
        let mut frontier_vids: Vec<u64> = Vec::new();

        for (row_idx, vid_opt) in anchor_vids.iter().enumerate() {
            if let Some(vid_u64) = vid_opt {
                frontier_row_indices.push(row_idx as u32);
                frontier_vids.push(vid_u64);
            }
        }

        // Outer row / anchor vid carried along with each partial path.
        let mut result_row_indices: Vec<u32> = frontier_row_indices.clone();
        let mut result_anchor_vids: Vec<u64> = frontier_vids.clone();

        // One inner vector per completed step, each parallel to the frontier.
        let mut accumulated_target_vids: Vec<Vec<u64>> = Vec::new();
        let mut accumulated_edge_ids: Vec<Vec<u64>> = Vec::new();
        let mut accumulated_edge_type_ids: Vec<Vec<u32>> = Vec::new();

        for step in &self.traversal_steps {
            let is_undirected = step.direction == Direction::Both;
            let query_ctx = self.graph_ctx.query_context();

            let mut new_row_indices: Vec<u32> = Vec::new();
            let mut new_anchor_vids: Vec<u64> = Vec::new();
            let mut new_target_vids: Vec<u64> = Vec::new();
            let mut new_edge_ids: Vec<u64> = Vec::new();
            let mut new_edge_type_ids: Vec<u32> = Vec::new();
            let num_prev_cols = accumulated_target_vids.len();
            // Rebuilt copies of the earlier steps' columns, re-aligned to the
            // rows produced by this step.
            let mut new_accumulated_targets: Vec<Vec<u64>> = vec![Vec::new(); num_prev_cols];
            let mut new_accumulated_edges: Vec<Vec<u64>> =
                vec![Vec::new(); accumulated_edge_ids.len()];
            let mut new_accumulated_edge_types: Vec<Vec<u32>> =
                vec![Vec::new(); accumulated_edge_type_ids.len()];

            for (i, &src_vid_u64) in frontier_vids.iter().enumerate() {
                let vid = Vid::from(src_vid_u64);
                let outer_row = result_row_indices[i];
                let anchor_vid = result_anchor_vids[i];

                // Edge ids already emitted from this source; consulted only
                // for undirected steps.
                let mut seen_edges: HashSet<u64> = HashSet::new();

                for &edge_type in &step.edge_type_ids {
                    let neighbors = self.graph_ctx.get_neighbors(vid, edge_type, step.direction);

                    for (target_vid, eid) in neighbors {
                        let eid_u64 = eid.as_u64();

                        // For `Direction::Both`, emit each edge at most once
                        // per source vertex.
                        if is_undirected && !seen_edges.insert(eid_u64) {
                            continue;
                        }

                        // Label filter: skip only when the target's labels are
                        // available and do not contain the required label; a
                        // target whose labels are unavailable is kept.
                        if let Some(ref label_name) = step.target_label_name
                            && let Some(vertex_labels) =
                                l0_visibility::get_vertex_labels_optional(target_vid, &query_ctx)
                            && !vertex_labels.contains(label_name)
                        {
                            continue;
                        }

                        new_row_indices.push(outer_row);
                        new_anchor_vids.push(anchor_vid);
                        new_target_vids.push(target_vid.as_u64());
                        new_edge_ids.push(eid_u64);
                        new_edge_type_ids.push(edge_type);

                        // Carry the earlier steps' values of partial path `i`
                        // forward onto the newly produced row.
                        for (col_idx, col) in accumulated_target_vids.iter().enumerate() {
                            new_accumulated_targets[col_idx].push(col[i]);
                        }
                        for (col_idx, col) in accumulated_edge_ids.iter().enumerate() {
                            new_accumulated_edges[col_idx].push(col[i]);
                        }
                        for (col_idx, col) in accumulated_edge_type_ids.iter().enumerate() {
                            new_accumulated_edge_types[col_idx].push(col[i]);
                        }
                    }
                }
            }

            // The targets of this step become the sources of the next one.
            frontier_vids.clone_from(&new_target_vids);
            result_row_indices = new_row_indices;
            result_anchor_vids = new_anchor_vids;

            new_accumulated_targets.push(new_target_vids);
            new_accumulated_edges.push(new_edge_ids);
            new_accumulated_edge_types.push(new_edge_type_ids);
            accumulated_target_vids = new_accumulated_targets;
            accumulated_edge_ids = new_accumulated_edges;
            accumulated_edge_type_ids = new_accumulated_edge_types;
        }

        Ok(PatternExpansion {
            row_indices: result_row_indices,
            anchor_vids: result_anchor_vids,
            step_target_vids: accumulated_target_vids,
            step_edge_ids: accumulated_edge_ids,
            step_edge_type_ids: accumulated_edge_type_ids,
        })
    }

    /// Builds the result used when no row matched anything: `num_rows` empty
    /// (non-null) lists over an empty values array of `output_item_type`.
    fn build_empty_list_result(&self, num_rows: usize) -> DFResult<ColumnarValue> {
        // All-zero offsets: every row's list has length 0.
        let offsets: Vec<i64> = vec![0; num_rows + 1];
        let empty_values: ArrayRef = arrow_array::new_empty_array(&self.output_item_type);
        let field = Arc::new(Field::new("item", self.output_item_type.clone(), true));
        let list = datafusion::arrow::array::LargeListArray::new(
            field,
            OffsetBuffer::new(ScalarBuffer::from(offsets)),
            empty_values,
            None,
        );
        Ok(ColumnarValue::Array(Arc::new(list)))
    }

    /// Builds the path column: one struct per inner row with a `nodes` list
    /// (anchor followed by each step's target) and a `relationships` list
    /// (one edge per step).
    ///
    /// `_anchor_vids` is unused; anchor ids are read from `expansion`.
    fn build_path_column(
        &self,
        expansion: &PatternExpansion,
        _anchor_vids: &UInt64Array,
        query_ctx: &QueryContext,
    ) -> DFResult<ArrayRef> {
        use arrow_array::builder::{
            LargeBinaryBuilder, ListBuilder, StringBuilder, StructBuilder, UInt64Builder,
        };

        let num_expanded = expansion.row_indices.len();

        let node_struct_fields: Vec<Arc<Field>> =
            crate::query::df_graph::common::node_struct_fields()
                .iter()
                .cloned()
                .collect();
        let edge_struct_fields: Vec<Arc<Field>> =
            crate::query::df_graph::common::edge_struct_fields()
                .iter()
                .cloned()
                .collect();

        // NOTE(review): the child builders below are assumed to match the
        // field order of `node_struct_fields()` / `edge_struct_fields()`,
        // which are defined outside this file — confirm when changing either.
        let mut nodes_builder = ListBuilder::new(StructBuilder::new(
            node_struct_fields,
            vec![
                Box::new(UInt64Builder::new()),
                Box::new(ListBuilder::new(StringBuilder::new())),
                Box::new(LargeBinaryBuilder::new()),
            ],
        ));

        let mut rels_builder = ListBuilder::new(StructBuilder::new(
            edge_struct_fields,
            vec![
                Box::new(UInt64Builder::new()),
                Box::new(StringBuilder::new()),
                Box::new(UInt64Builder::new()),
                Box::new(UInt64Builder::new()),
                Box::new(LargeBinaryBuilder::new()),
            ],
        ));

        let uni_schema = self.graph_ctx.storage().schema_manager().schema();
        let num_steps = self.traversal_steps.len();

        for row_idx in 0..num_expanded {
            let anchor_vid_u64 = expansion.anchor_vids[row_idx];
            let anchor_vid = Vid::from(anchor_vid_u64);

            // Nodes: the anchor first, then the target of every step in order.
            super::common::append_node_to_struct(nodes_builder.values(), anchor_vid, query_ctx);

            for step_idx in 0..num_steps {
                let target_vid = Vid::from(expansion.step_target_vids[step_idx][row_idx]);
                super::common::append_node_to_struct(nodes_builder.values(), target_vid, query_ctx);
            }
            nodes_builder.append(true);

            for step_idx in 0..num_steps {
                let eid = Eid::from(expansion.step_edge_ids[step_idx][row_idx]);
                let edge_type_id = expansion.step_edge_type_ids[step_idx][row_idx];
                // An unresolvable type id falls back to the default (empty) name.
                let edge_type_name = uni_schema
                    .edge_type_name_by_id_unified(edge_type_id)
                    .unwrap_or_default();

                // Traversal-order endpoints of this hop: the previous step's
                // target (or the anchor for the first hop) and this step's target.
                let src_vid = if step_idx == 0 {
                    anchor_vid_u64
                } else {
                    expansion.step_target_vids[step_idx - 1][row_idx]
                };
                let dst_vid = expansion.step_target_vids[step_idx][row_idx];

                // The traversal may have walked the edge against its stored
                // direction, so ask the graph context for the stored endpoints.
                let (stored_src, stored_dst) = self.graph_ctx.resolve_stored_edge_endpoints(
                    eid,
                    Vid::from(src_vid),
                    Vid::from(dst_vid),
                    &[edge_type_id],
                );
                super::common::append_edge_to_struct(
                    rels_builder.values(),
                    eid,
                    &edge_type_name,
                    stored_src,
                    stored_dst,
                    query_ctx,
                );
            }
            rels_builder.append(true);
        }

        let nodes_array = Arc::new(nodes_builder.finish()) as ArrayRef;
        let rels_array = Arc::new(rels_builder.finish()) as ArrayRef;

        let nodes_field = Arc::new(Field::new("nodes", nodes_array.data_type().clone(), true));
        let rels_field = Arc::new(Field::new(
            "relationships",
            rels_array.data_type().clone(),
            true,
        ));

        let path_struct = arrow_array::StructArray::try_new(
            vec![nodes_field, rels_field].into(),
            vec![nodes_array, rels_array],
            None,
        )
        .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;

        Ok(Arc::new(path_struct))
    }
}
761
762pub fn analyze_pattern(
769 pattern: &Pattern,
770 input_schema: &Schema,
771 uni_schema: &UniSchema,
772) -> anyhow::Result<(String, Vec<TraversalStep>)> {
773 if pattern.paths.is_empty() {
774 return Err(anyhow::anyhow!(
775 "Pattern comprehension requires at least one path"
776 ));
777 }
778
779 let path = &pattern.paths[0];
780 let elements = &path.elements;
781
782 if elements.is_empty() {
783 return Err(anyhow::anyhow!(
784 "Pattern comprehension path has no elements"
785 ));
786 }
787
788 let (anchor_idx, anchor_var) = find_anchor_node(elements, input_schema)?;
790
791 let anchor_column = format!("{}._vid", anchor_var);
792
793 let steps = build_traversal_steps(elements, anchor_idx, uni_schema)?;
795
796 Ok((anchor_column, steps))
797}
798
799fn find_anchor_node(
804 elements: &[PatternElement],
805 input_schema: &Schema,
806) -> anyhow::Result<(usize, String)> {
807 for (idx, elem) in elements.iter().enumerate() {
808 if let PatternElement::Node(node) = elem
809 && let Some(ref var) = node.variable
810 {
811 let vid_col = format!("{}._vid", var);
812 if input_schema.column_with_name(&vid_col).is_some() {
813 return Ok((idx, var.clone()));
814 }
815 }
816 }
817
818 Err(anyhow::anyhow!(
819 "No anchor node found in pattern comprehension. \
820 None of the pattern variables have a corresponding `_vid` column in the input schema. \
821 Schema fields: {:?}",
822 input_schema
823 .fields()
824 .iter()
825 .map(|f| f.name().as_str())
826 .collect::<Vec<_>>()
827 ))
828}
829
830fn build_traversal_steps(
835 elements: &[PatternElement],
836 anchor_idx: usize,
837 uni_schema: &UniSchema,
838) -> anyhow::Result<Vec<TraversalStep>> {
839 let mut steps = Vec::new();
840
841 let mut i = anchor_idx + 1;
843 while i + 1 < elements.len() {
844 let rel_elem = &elements[i];
845 let target_elem = &elements[i + 1];
846
847 let PatternElement::Relationship(rel) = rel_elem else {
848 return Err(anyhow::anyhow!(
849 "Expected relationship at pattern index {}, got {:?}",
850 i,
851 rel_elem
852 ));
853 };
854
855 let PatternElement::Node(target_node) = target_elem else {
856 return Err(anyhow::anyhow!(
857 "Expected node at pattern index {}, got {:?}",
858 i + 1,
859 target_elem
860 ));
861 };
862
863 let step = build_step_from_rel_and_node(rel, target_node, uni_schema)?;
864 steps.push(step);
865
866 i += 2;
867 }
868
869 if steps.is_empty() {
870 return Err(anyhow::anyhow!(
871 "Pattern comprehension has no traversal steps after anchor"
872 ));
873 }
874
875 Ok(steps)
876}
877
878fn build_step_from_rel_and_node(
880 rel: &RelationshipPattern,
881 target_node: &NodePattern,
882 uni_schema: &UniSchema,
883) -> anyhow::Result<TraversalStep> {
884 let edge_type_ids = if rel.types.is_empty() {
886 uni_schema.all_edge_type_ids()
888 } else {
889 rel.types
890 .iter()
891 .filter_map(|t| resolve_edge_type_id_unified(uni_schema, t))
892 .collect()
893 };
894
895 if edge_type_ids.is_empty() && !rel.types.is_empty() {
896 return Ok(TraversalStep {
899 edge_type_ids: vec![],
900 direction: convert_direction(&rel.direction),
901 target_variable: target_node.variable.clone(),
902 target_label_name: target_node.labels.first().cloned(),
903 edge_variable: rel.variable.clone(),
904 });
905 }
906
907 let direction = convert_direction(&rel.direction);
908 let target_label_name = target_node.labels.first().cloned();
909
910 Ok(TraversalStep {
911 edge_type_ids,
912 direction,
913 target_variable: target_node.variable.clone(),
914 target_label_name,
915 edge_variable: rel.variable.clone(),
916 })
917}
918
/// Resolves an edge type name to its id by delegating to the schema's
/// case-insensitive unified lookup; returns `None` when the name is unknown.
fn resolve_edge_type_id_unified(uni_schema: &UniSchema, type_name: &str) -> Option<u32> {
    uni_schema.edge_type_id_unified_case_insensitive(type_name)
}
924
/// Maps the parser's relationship direction onto the storage layer's
/// `Direction`, variant for variant.
fn convert_direction(ast_dir: &AstDirection) -> Direction {
    // Exhaustive on purpose: a new AST variant must be handled explicitly.
    match ast_dir {
        AstDirection::Outgoing => Direction::Outgoing,
        AstDirection::Incoming => Direction::Incoming,
        AstDirection::Both => Direction::Both,
    }
}
933
934pub fn collect_inner_properties(
940 where_clause: Option<&Expr>,
941 map_expr: &Expr,
942 steps: &[TraversalStep],
943) -> (HashMap<String, Vec<String>>, HashMap<String, Vec<String>>) {
944 let mut vertex_props: HashMap<String, Vec<String>> = HashMap::new();
945 let mut edge_props: HashMap<String, Vec<String>> = HashMap::new();
946
947 let node_vars: HashSet<String> = steps
949 .iter()
950 .filter_map(|s| s.target_variable.clone())
951 .collect();
952 let edge_vars: HashSet<String> = steps
953 .iter()
954 .filter_map(|s| s.edge_variable.clone())
955 .collect();
956
957 let mut exprs_to_visit: Vec<&Expr> = vec![map_expr];
959 if let Some(w) = where_clause {
960 exprs_to_visit.push(w);
961 }
962
963 while let Some(expr) = exprs_to_visit.pop() {
964 match expr {
965 Expr::Property(base, prop) => {
966 if let Expr::Variable(var) = base.as_ref() {
967 if node_vars.contains(var) {
968 vertex_props
969 .entry(var.clone())
970 .or_default()
971 .push(prop.clone());
972 } else if edge_vars.contains(var) {
973 edge_props
974 .entry(var.clone())
975 .or_default()
976 .push(prop.clone());
977 }
978 }
979 exprs_to_visit.push(base);
981 }
982 Expr::BinaryOp { left, right, .. } => {
983 exprs_to_visit.push(left);
984 exprs_to_visit.push(right);
985 }
986 Expr::UnaryOp { expr: inner, .. } => {
987 exprs_to_visit.push(inner);
988 }
989 Expr::FunctionCall { args, .. } => {
990 for arg in args {
991 exprs_to_visit.push(arg);
992 }
993 }
994 Expr::Case {
995 when_then,
996 else_expr,
997 ..
998 } => {
999 for (w, t) in when_then {
1000 exprs_to_visit.push(w);
1001 exprs_to_visit.push(t);
1002 }
1003 if let Some(e) = else_expr {
1004 exprs_to_visit.push(e);
1005 }
1006 }
1007 Expr::IsNull(inner) | Expr::IsNotNull(inner) => {
1008 exprs_to_visit.push(inner);
1009 }
1010 Expr::List(items) => {
1011 for item in items {
1012 exprs_to_visit.push(item);
1013 }
1014 }
1015 Expr::Map(entries) => {
1016 for (_, v) in entries {
1017 exprs_to_visit.push(v);
1018 }
1019 }
1020 Expr::In { expr: l, list: r } => {
1021 exprs_to_visit.push(l);
1022 exprs_to_visit.push(r);
1023 }
1024 _ => {}
1025 }
1026 }
1027
1028 for props in vertex_props.values_mut() {
1030 props.sort();
1031 props.dedup();
1032 }
1033 for props in edge_props.values_mut() {
1034 props.sort();
1035 props.dedup();
1036 }
1037
1038 (vertex_props, edge_props)
1039}
1040
1041pub fn build_inner_schema(
1047 input_schema: &Schema,
1048 steps: &[TraversalStep],
1049 vertex_props: &HashMap<String, Vec<String>>,
1050 edge_props: &HashMap<String, Vec<String>>,
1051 path_variable: Option<&str>,
1052) -> Schema {
1053 let mut fields: Vec<Arc<Field>> = input_schema.fields().to_vec();
1054
1055 for step in steps {
1056 if let Some(ref target_var) = step.target_variable {
1058 fields.push(Arc::new(Field::new(
1059 format!("{}._vid", target_var),
1060 DataType::UInt64,
1061 true,
1062 )));
1063 }
1064
1065 if let Some(ref edge_var) = step.edge_variable {
1067 fields.push(Arc::new(Field::new(
1068 format!("{}._eid", edge_var),
1069 DataType::UInt64,
1070 true,
1071 )));
1072 }
1073 }
1074
1075 for step in steps {
1077 if let Some(ref target_var) = step.target_variable
1078 && let Some(props) = vertex_props.get(target_var)
1079 {
1080 for prop in props {
1081 fields.push(Arc::new(Field::new(
1082 format!("{}.{}", target_var, prop),
1083 DataType::LargeBinary,
1084 true,
1085 )));
1086 }
1087 }
1088 }
1089
1090 for step in steps {
1092 if let Some(ref edge_var) = step.edge_variable
1093 && let Some(props) = edge_props.get(edge_var)
1094 {
1095 for prop in props {
1096 fields.push(Arc::new(Field::new(
1097 format!("{}.{}", edge_var, prop),
1098 DataType::LargeBinary,
1099 true,
1100 )));
1101 }
1102 }
1103 }
1104
1105 if let Some(path_var) = path_variable {
1107 fields.push(Arc::new(build_path_struct_field(path_var)));
1108 }
1109
1110 Schema::new(fields)
1111}