1use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{LogicalType, PropertyKey, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12pub enum ProjectExpr {
14 Column(usize),
16 Constant(Value),
18 PropertyAccess {
20 column: usize,
22 property: String,
24 },
25 EdgeType {
27 column: usize,
29 },
30 Expression {
32 expr: FilterExpression,
34 variable_columns: HashMap<String, usize>,
36 },
37 NodeResolve {
39 column: usize,
41 },
42 EdgeResolve {
44 column: usize,
46 },
47}
48
49pub struct ProjectOperator {
51 child: Box<dyn Operator>,
53 projections: Vec<ProjectExpr>,
55 output_types: Vec<LogicalType>,
57 store: Option<Arc<dyn GraphStore>>,
59}
60
61impl ProjectOperator {
62 pub fn new(
64 child: Box<dyn Operator>,
65 projections: Vec<ProjectExpr>,
66 output_types: Vec<LogicalType>,
67 ) -> Self {
68 assert_eq!(projections.len(), output_types.len());
69 Self {
70 child,
71 projections,
72 output_types,
73 store: None,
74 }
75 }
76
77 pub fn with_store(
79 child: Box<dyn Operator>,
80 projections: Vec<ProjectExpr>,
81 output_types: Vec<LogicalType>,
82 store: Arc<dyn GraphStore>,
83 ) -> Self {
84 assert_eq!(projections.len(), output_types.len());
85 Self {
86 child,
87 projections,
88 output_types,
89 store: Some(store),
90 }
91 }
92
93 pub fn select_columns(
95 child: Box<dyn Operator>,
96 columns: Vec<usize>,
97 types: Vec<LogicalType>,
98 ) -> Self {
99 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
100 Self::new(child, projections, types)
101 }
102}
103
104impl Operator for ProjectOperator {
105 fn next(&mut self) -> OperatorResult {
106 let Some(input) = self.child.next()? else {
108 return Ok(None);
109 };
110
111 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
113
114 for (i, proj) in self.projections.iter().enumerate() {
116 match proj {
117 ProjectExpr::Column(col_idx) => {
118 let input_col = input.column(*col_idx).ok_or_else(|| {
120 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
121 })?;
122
123 let output_col = output
124 .column_mut(i)
125 .expect("column exists: index matches projection schema");
126
127 for row in input.selected_indices() {
129 if let Some(value) = input_col.get_value(row) {
130 output_col.push_value(value);
131 }
132 }
133 }
134 ProjectExpr::Constant(value) => {
135 let output_col = output
137 .column_mut(i)
138 .expect("column exists: index matches projection schema");
139 for _ in input.selected_indices() {
140 output_col.push_value(value.clone());
141 }
142 }
143 ProjectExpr::PropertyAccess { column, property } => {
144 let input_col = input
146 .column(*column)
147 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
148
149 let output_col = output
150 .column_mut(i)
151 .expect("column exists: index matches projection schema");
152
153 let store = self.store.as_ref().ok_or_else(|| {
154 OperatorError::Execution("Store required for property access".to_string())
155 })?;
156
157 let prop_key = PropertyKey::new(property);
159 for row in input.selected_indices() {
160 let value = if let Some(node_id) = input_col.get_node_id(row) {
162 store
163 .get_node(node_id)
164 .and_then(|node| node.get_property(property).cloned())
165 .unwrap_or(Value::Null)
166 } else if let Some(edge_id) = input_col.get_edge_id(row) {
167 store
168 .get_edge(edge_id)
169 .and_then(|edge| edge.get_property(property).cloned())
170 .unwrap_or(Value::Null)
171 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
172 map.get(&prop_key).cloned().unwrap_or(Value::Null)
173 } else {
174 Value::Null
175 };
176 output_col.push_value(value);
177 }
178 }
179 ProjectExpr::EdgeType { column } => {
180 let input_col = input
182 .column(*column)
183 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
184
185 let output_col = output
186 .column_mut(i)
187 .expect("column exists: index matches projection schema");
188
189 let store = self.store.as_ref().ok_or_else(|| {
190 OperatorError::Execution("Store required for edge type access".to_string())
191 })?;
192
193 for row in input.selected_indices() {
194 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
195 store.edge_type(edge_id).map_or(Value::Null, Value::String)
196 } else {
197 Value::Null
198 };
199 output_col.push_value(value);
200 }
201 }
202 ProjectExpr::Expression {
203 expr,
204 variable_columns,
205 } => {
206 let output_col = output
207 .column_mut(i)
208 .expect("column exists: index matches projection schema");
209
210 let store = self.store.as_ref().ok_or_else(|| {
211 OperatorError::Execution(
212 "Store required for expression evaluation".to_string(),
213 )
214 })?;
215
216 let evaluator = ExpressionPredicate::new(
218 expr.clone(),
219 variable_columns.clone(),
220 Arc::clone(store),
221 );
222
223 for row in input.selected_indices() {
224 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
225 output_col.push_value(value);
226 }
227 }
228 ProjectExpr::NodeResolve { column } => {
229 let input_col = input
230 .column(*column)
231 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
232
233 let output_col = output
234 .column_mut(i)
235 .expect("column exists: index matches projection schema");
236
237 let store = self.store.as_ref().ok_or_else(|| {
238 OperatorError::Execution("Store required for node resolution".to_string())
239 })?;
240
241 for row in input.selected_indices() {
242 let value = if let Some(node_id) = input_col.get_node_id(row) {
243 store
244 .get_node(node_id)
245 .map_or(Value::Null, |n| node_to_map(&n))
246 } else {
247 Value::Null
248 };
249 output_col.push_value(value);
250 }
251 }
252 ProjectExpr::EdgeResolve { column } => {
253 let input_col = input
254 .column(*column)
255 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
256
257 let output_col = output
258 .column_mut(i)
259 .expect("column exists: index matches projection schema");
260
261 let store = self.store.as_ref().ok_or_else(|| {
262 OperatorError::Execution("Store required for edge resolution".to_string())
263 })?;
264
265 for row in input.selected_indices() {
266 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
267 store
268 .get_edge(edge_id)
269 .map_or(Value::Null, |e| edge_to_map(&e))
270 } else {
271 Value::Null
272 };
273 output_col.push_value(value);
274 }
275 }
276 }
277 }
278
279 output.set_count(input.row_count());
280 Ok(Some(output))
281 }
282
283 fn reset(&mut self) {
284 self.child.reset();
285 }
286
287 fn name(&self) -> &'static str {
288 "Project"
289 }
290}
291
292fn node_to_map(node: &Node) -> Value {
297 let mut map = BTreeMap::new();
298 map.insert(
299 PropertyKey::new("_id"),
300 Value::Int64(node.id.as_u64() as i64),
301 );
302 let labels: Vec<Value> = node
303 .labels
304 .iter()
305 .map(|l| Value::String(l.clone()))
306 .collect();
307 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
308 for (key, value) in &node.properties {
309 map.insert(key.clone(), value.clone());
310 }
311 Value::Map(Arc::new(map))
312}
313
314fn edge_to_map(edge: &Edge) -> Value {
319 let mut map = BTreeMap::new();
320 map.insert(
321 PropertyKey::new("_id"),
322 Value::Int64(edge.id.as_u64() as i64),
323 );
324 map.insert(
325 PropertyKey::new("_type"),
326 Value::String(edge.edge_type.clone()),
327 );
328 map.insert(
329 PropertyKey::new("_source"),
330 Value::Int64(edge.src.as_u64() as i64),
331 );
332 map.insert(
333 PropertyKey::new("_target"),
334 Value::Int64(edge.dst.as_u64() as i64),
335 );
336 for (key, value) in &edge.properties {
337 map.insert(key.clone(), value.clone());
338 }
339 Value::Map(Arc::new(map))
340}
341
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use crate::graph::lpg::LpgStore;
    use grafeo_common::types::Value;

    /// Test double that yields a fixed list of chunks once each, then `None`.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            if self.position < self.chunks.len() {
                let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
                self.position += 1;
                Ok(Some(chunk))
            } else {
                Ok(None)
            }
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a [`MockScanOperator`] over `chunks`, starting at the first one.
    fn mock_scan(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        builder.column_mut(0).unwrap().push_int64(1);
        builder.column_mut(1).unwrap().push_string("hello");
        builder.column_mut(2).unwrap().push_int64(100);
        builder.advance_row();

        builder.column_mut(0).unwrap().push_int64(2);
        builder.column_mut(1).unwrap().push_string("world");
        builder.column_mut(2).unwrap().push_int64(200);
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            mock_scan(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        // Output column 0 is input column 2; output column 1 is input column 0.
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(0).unwrap().get_int64(1), Some(200));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
        assert_eq!(result.column(1).unwrap().get_int64(1), Some(2));
    }

    #[test]
    fn test_project_constant() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();
        builder.column_mut(0).unwrap().push_int64(2);
        builder.advance_row();

        let mut project = ProjectOperator::new(
            mock_scan(vec![builder.finish()]),
            vec![
                ProjectExpr::Column(0),
                ProjectExpr::Constant(Value::String("constant".into())),
            ],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project =
            ProjectOperator::select_columns(mock_scan(vec![]), vec![0], vec![LogicalType::Int64]);

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        let mut project = ProjectOperator::new(
            mock_scan(vec![builder.finish()]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        builder.column_mut(0).unwrap().push_int64(1);
        builder.advance_row();

        let mut project = ProjectOperator::new(
            mock_scan(vec![builder.finish()]),
            vec![
                ProjectExpr::Constant(Value::Int64(42)),
                ProjectExpr::Constant(Value::String("fixed".into())),
                ProjectExpr::Constant(Value::Bool(true)),
            ],
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        let mut project = ProjectOperator::select_columns(
            mock_scan(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project =
            ProjectOperator::select_columns(mock_scan(vec![]), vec![0], vec![LogicalType::Int64]);
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_property_access() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![builder.finish()]),
            vec![
                ProjectExpr::PropertyAccess {
                    column: 0,
                    property: "name".to_string(),
                },
                ProjectExpr::PropertyAccess {
                    column: 0,
                    property: "missing".to_string(),
                },
            ],
            vec![LogicalType::Any, LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::String("Alix".into()))
        );
        // A property the node does not have projects to Null.
        assert_eq!(result.column(1).unwrap().get_value(0), Some(Value::Null));
    }

    #[test]
    fn test_project_property_access_requires_store() {
        use grafeo_common::types::NodeId;

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(NodeId::new(1));
        builder.advance_row();

        let mut project = ProjectOperator::new(
            mock_scan(vec![builder.finish()]),
            vec![ProjectExpr::PropertyAccess {
                column: 0,
                property: "name".to_string(),
            }],
            vec![LogicalType::Any],
        );

        assert!(
            project.next().is_err(),
            "Property access without a store should fail"
        );
    }

    #[test]
    fn test_project_edge_type() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![builder.finish()]),
            vec![ProjectExpr::EdgeType { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(
            result.column(0).unwrap().get_value(0),
            Some(Value::String("WORKS_AT".into()))
        );
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new().unwrap();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alix".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(node_id.as_u64() as i64))
            );
            assert!(map.get(&PropertyKey::new("_labels")).is_some());
            assert_eq!(
                map.get(&PropertyKey::new("name")),
                Some(&Value::String("Alix".into()))
            );
            assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new().unwrap();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        if let Value::Map(map) = value {
            assert_eq!(
                map.get(&PropertyKey::new("_id")),
                Some(&Value::Int64(edge_id.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_type")),
                Some(&Value::String("WORKS_AT".into()))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_source")),
                Some(&Value::Int64(src.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("_target")),
                Some(&Value::Int64(dst.as_u64() as i64))
            );
            assert_eq!(
                map.get(&PropertyKey::new("since")),
                Some(&Value::Int64(2020))
            );
        } else {
            panic!("Expected Value::Map, got {:?}", value);
        }
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        let store = LpgStore::new().unwrap();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            mock_scan(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}