1use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
/// One output-column expression evaluated by [`ProjectOperator`].
///
/// When the operator runs, each variant pushes one value per selected input row.
pub enum ProjectExpr {
    /// Copies the input column at this index.
    Column(usize),
    /// Emits this value for every row.
    Constant(Value),
    /// Reads `property` from the entity referenced in `column`.
    ///
    /// A node id is tried first. If that node lacks the property, an edge id
    /// in the same row is tried. Rows with neither a node id nor an edge id
    /// fall back to looking the key up in a `Value::Map` cell. Anything else
    /// yields `Value::Null`. Requires a graph store.
    PropertyAccess {
        /// Input column holding the node id, edge id, or map value.
        column: usize,
        /// Name of the property to read.
        property: String,
    },
    /// Yields the type name of the edge referenced in `column`, or
    /// `Value::Null` when the row has no edge id or the store has no type for
    /// it. Requires a graph store.
    EdgeType {
        /// Input column holding the edge id.
        column: usize,
    },
    /// Evaluates `expr` row by row through `ExpressionPredicate`. Rows for
    /// which evaluation produces no value yield `Value::Null`. Requires a graph
    /// store.
    Expression {
        /// The expression to evaluate.
        expr: FilterExpression,
        /// Name-to-column-index mapping handed to `ExpressionPredicate`.
        /// Presumably this is how `expr` resolves its variables; verify.
        variable_columns: HashMap<String, usize>,
    },
    /// Replaces the node id in `column` with a `Value::Map` holding `_id`,
    /// `_labels`, and the node's properties. Yields `Value::Null` when the
    /// node is not found. Requires a graph store.
    NodeResolve {
        /// Input column holding the node id.
        column: usize,
    },
    /// Replaces the edge id in `column` with a `Value::Map` holding `_id`,
    /// `_type`, `_source`, `_target`, and the edge's properties. Yields
    /// `Value::Null` when the edge is not found. Requires a graph store.
    EdgeResolve {
        /// Input column holding the edge id.
        column: usize,
    },
}
48
/// Physical operator that evaluates a list of [`ProjectExpr`]s over every
/// chunk produced by its child, producing one output column per expression.
pub struct ProjectOperator {
    /// Upstream operator supplying input chunks.
    child: Box<dyn Operator>,
    /// One expression per output column, in output order.
    projections: Vec<ProjectExpr>,
    /// Logical type of each output column. The constructors assert it has the
    /// same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store used by the property-access, edge-type, expression, and
    /// resolve projections. It is `None` when the operator is built with
    /// `new` or `select_columns`.
    store: Option<Arc<dyn GraphStore>>,
}
60
61impl ProjectOperator {
62 pub fn new(
64 child: Box<dyn Operator>,
65 projections: Vec<ProjectExpr>,
66 output_types: Vec<LogicalType>,
67 ) -> Self {
68 assert_eq!(projections.len(), output_types.len());
69 Self {
70 child,
71 projections,
72 output_types,
73 store: None,
74 }
75 }
76
77 pub fn with_store(
79 child: Box<dyn Operator>,
80 projections: Vec<ProjectExpr>,
81 output_types: Vec<LogicalType>,
82 store: Arc<dyn GraphStore>,
83 ) -> Self {
84 assert_eq!(projections.len(), output_types.len());
85 Self {
86 child,
87 projections,
88 output_types,
89 store: Some(store),
90 }
91 }
92
93 pub fn select_columns(
95 child: Box<dyn Operator>,
96 columns: Vec<usize>,
97 types: Vec<LogicalType>,
98 ) -> Self {
99 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
100 Self::new(child, projections, types)
101 }
102}
103
104impl Operator for ProjectOperator {
105 fn next(&mut self) -> OperatorResult {
106 let Some(input) = self.child.next()? else {
108 return Ok(None);
109 };
110
111 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
113
114 for (i, proj) in self.projections.iter().enumerate() {
116 match proj {
117 ProjectExpr::Column(col_idx) => {
118 let input_col = input.column(*col_idx).ok_or_else(|| {
120 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
121 })?;
122
123 let output_col = output
124 .column_mut(i)
125 .expect("column exists: index matches projection schema");
126
127 for row in input.selected_indices() {
129 if let Some(value) = input_col.get_value(row) {
130 output_col.push_value(value);
131 }
132 }
133 }
134 ProjectExpr::Constant(value) => {
135 let output_col = output
137 .column_mut(i)
138 .expect("column exists: index matches projection schema");
139 for _ in input.selected_indices() {
140 output_col.push_value(value.clone());
141 }
142 }
143 ProjectExpr::PropertyAccess { column, property } => {
144 let input_col = input
146 .column(*column)
147 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
148
149 let output_col = output
150 .column_mut(i)
151 .expect("column exists: index matches projection schema");
152
153 let store = self.store.as_ref().ok_or_else(|| {
154 OperatorError::Execution("Store required for property access".to_string())
155 })?;
156
157 let prop_key = PropertyKey::new(property);
164 for row in input.selected_indices() {
165 let value = if let Some(node_id) = input_col.get_node_id(row) {
166 if let Some(prop) = store
167 .get_node(node_id)
168 .and_then(|node| node.get_property(property).cloned())
169 {
170 prop
171 } else if let Some(edge_id) = input_col.get_edge_id(row) {
172 store
175 .get_edge(edge_id)
176 .and_then(|edge| edge.get_property(property).cloned())
177 .unwrap_or(Value::Null)
178 } else {
179 Value::Null
180 }
181 } else if let Some(edge_id) = input_col.get_edge_id(row) {
182 store
183 .get_edge(edge_id)
184 .and_then(|edge| edge.get_property(property).cloned())
185 .unwrap_or(Value::Null)
186 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
187 map.get(&prop_key).cloned().unwrap_or(Value::Null)
188 } else {
189 Value::Null
190 };
191 output_col.push_value(value);
192 }
193 }
194 ProjectExpr::EdgeType { column } => {
195 let input_col = input
197 .column(*column)
198 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
199
200 let output_col = output
201 .column_mut(i)
202 .expect("column exists: index matches projection schema");
203
204 let store = self.store.as_ref().ok_or_else(|| {
205 OperatorError::Execution("Store required for edge type access".to_string())
206 })?;
207
208 for row in input.selected_indices() {
209 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
210 store.edge_type(edge_id).map_or(Value::Null, Value::String)
211 } else {
212 Value::Null
213 };
214 output_col.push_value(value);
215 }
216 }
217 ProjectExpr::Expression {
218 expr,
219 variable_columns,
220 } => {
221 let output_col = output
222 .column_mut(i)
223 .expect("column exists: index matches projection schema");
224
225 let store = self.store.as_ref().ok_or_else(|| {
226 OperatorError::Execution(
227 "Store required for expression evaluation".to_string(),
228 )
229 })?;
230
231 let evaluator = ExpressionPredicate::new(
233 expr.clone(),
234 variable_columns.clone(),
235 Arc::clone(store),
236 );
237
238 for row in input.selected_indices() {
239 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
240 output_col.push_value(value);
241 }
242 }
243 ProjectExpr::NodeResolve { column } => {
244 let input_col = input
245 .column(*column)
246 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
247
248 let output_col = output
249 .column_mut(i)
250 .expect("column exists: index matches projection schema");
251
252 let store = self.store.as_ref().ok_or_else(|| {
253 OperatorError::Execution("Store required for node resolution".to_string())
254 })?;
255
256 for row in input.selected_indices() {
257 let value = if let Some(node_id) = input_col.get_node_id(row) {
258 store
259 .get_node(node_id)
260 .map_or(Value::Null, |n| node_to_map(&n))
261 } else {
262 Value::Null
263 };
264 output_col.push_value(value);
265 }
266 }
267 ProjectExpr::EdgeResolve { column } => {
268 let input_col = input
269 .column(*column)
270 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
271
272 let output_col = output
273 .column_mut(i)
274 .expect("column exists: index matches projection schema");
275
276 let store = self.store.as_ref().ok_or_else(|| {
277 OperatorError::Execution("Store required for edge resolution".to_string())
278 })?;
279
280 for row in input.selected_indices() {
281 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
282 store
283 .get_edge(edge_id)
284 .map_or(Value::Null, |e| edge_to_map(&e))
285 } else {
286 Value::Null
287 };
288 output_col.push_value(value);
289 }
290 }
291 }
292 }
293
294 output.set_count(input.row_count());
295 Ok(Some(output))
296 }
297
298 fn reset(&mut self) {
299 self.child.reset();
300 }
301
302 fn name(&self) -> &'static str {
303 "Project"
304 }
305}
306
307fn node_to_map(node: &Node) -> Value {
312 let mut map = BTreeMap::new();
313 map.insert(
314 PropertyKey::new("_id"),
315 Value::Int64(node.id.as_u64() as i64),
316 );
317 let labels: Vec<Value> = node
318 .labels
319 .iter()
320 .map(|l| Value::String(l.clone()))
321 .collect();
322 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
323 for (key, value) in &node.properties {
324 map.insert(key.clone(), value.clone());
325 }
326 Value::Map(Arc::new(map))
327}
328
329fn edge_to_map(edge: &Edge) -> Value {
334 let mut map = BTreeMap::new();
335 map.insert(
336 PropertyKey::new("_id"),
337 Value::Int64(edge.id.as_u64() as i64),
338 );
339 map.insert(
340 PropertyKey::new("_type"),
341 Value::String(edge.edge_type.clone()),
342 );
343 map.insert(
344 PropertyKey::new("_source"),
345 Value::Int64(edge.src.as_u64() as i64),
346 );
347 map.insert(
348 PropertyKey::new("_target"),
349 Value::Int64(edge.dst.as_u64() as i64),
350 );
351 for (key, value) in &edge.properties {
352 map.insert(key.clone(), value.clone());
353 }
354 Value::Map(Arc::new(map))
355}
356
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that yields a fixed list of chunks, one per `next` call.
    ///
    /// Each chunk is moved out when returned and replaced with
    /// `DataChunk::empty()`. As a result, `reset` only rewinds `position`, and a
    /// second pass yields empty chunks rather than the original data.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Wraps `chunks` in a boxed mock scan positioned at the first chunk.
    fn mock_scan(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        let chunk = builder.finish();

        let mut project = ProjectOperator::select_columns(
            mock_scan(vec![chunk]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Output column 0 is input column 2; output column 1 is input column 0.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(200));
        assert_eq!(result.column(1).unwrap().get_int64(1), Some(2));
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            mock_scan(vec![chunk]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(mock_scan(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            mock_scan(vec![chunk]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            mock_scan(vec![chunk]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::select_columns(
            mock_scan(vec![chunk]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(mock_scan(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![chunk]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(node_id.as_u64() as i64))
            );
            assert!(map.get(&PropertyKey::new("_labels")).is_some());
            assert_eq!(
                map.get(&PropertyKey::new("name")),
                Some(&Value::String("Alix".into()))
            );
            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![chunk]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(edge_id.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_type")),
                Some(&Value::String("WORKS_AT".into()))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_source")),
                Some(&Value::Int64(src.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_target")),
                Some(&Value::Int64(dst.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("since")),
                Some(&Value::Int64(2020))
            );
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![chunk]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }

    #[test]
    fn test_project_property_access_on_node() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![chunk]),
            vec![
                ProjectExpr::PropertyAccess {
                    column: 0,
                    property: "name".to_string(),
                },
                ProjectExpr::PropertyAccess {
                    column: 0,
                    property: "missing".to_string(),
                },
            ],
            vec![LogicalType::Any, LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.row_count(), 1);
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::String("Alix".into()))
        );
        // An absent property yields Null rather than an error.
        assert_eq!(result.column(1).unwrap().get_value(0), Some(Value::Null));
    }

    #[test]
    fn test_project_property_access_requires_store() {
        use grafeo_common::types::NodeId;

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(NodeId::new(1));
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::new(
            mock_scan(vec![chunk]),
            vec![ProjectExpr::PropertyAccess {
                column: 0,
                property: "name".to_string(),
            }],
            vec![LogicalType::Any],
        );

        assert!(
            project.next().is_err(),
            "Property access without a store should fail"
        );
    }

    #[test]
    fn test_project_edge_type() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();
        let chunk = builder.finish();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![chunk]),
            vec![ProjectExpr::EdgeType { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::String("WORKS_AT".into()))
        );
    }
}