1use super::filter::{ExpressionPredicate, FilterExpression};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::lpg::{Edge, LpgStore, Node};
7use grafeo_common::types::{LogicalType, PropertyKey, Value};
8use std::collections::{BTreeMap, HashMap};
9use std::sync::Arc;
10
/// A single output-column definition evaluated by [`ProjectOperator`].
///
/// Each variant produces exactly one output column. Variants other than
/// `Column` and `Constant` need the operator to have been built with a graph
/// store; without one, evaluation fails with an execution error.
pub enum ProjectExpr {
    /// Copies values from the input column at the given index.
    Column(usize),
    /// Emits a clone of the given value for every selected input row.
    Constant(Value),
    /// Reads a named property from the entity held in an input column.
    ///
    /// The row is treated as a node id first, then as an edge id, then as a
    /// map value keyed by the property name; anything else yields null.
    PropertyAccess {
        /// Index of the input column holding the node, edge, or map.
        column: usize,
        /// Name of the property to read.
        property: String,
    },
    /// Emits the type of the edge referenced by an input column as a string,
    /// or null when the row holds no edge id or the store has no type for it.
    EdgeType {
        /// Index of the input column holding edge ids.
        column: usize,
    },
    /// Evaluates an arbitrary expression per row; rows for which evaluation
    /// produces no value yield null.
    Expression {
        /// The expression to evaluate.
        expr: FilterExpression,
        /// Passed to the expression evaluator together with `expr`;
        /// presumably maps variable names to input column indices.
        variable_columns: HashMap<String, usize>,
    },
    /// Expands a node id into a map containing `_id`, `_labels`, and the
    /// node's properties; yields null when the node cannot be found.
    NodeResolve {
        /// Index of the input column holding node ids.
        column: usize,
    },
    /// Expands an edge id into a map containing `_id`, `_type`, `_source`,
    /// `_target`, and the edge's properties; yields null when the edge cannot
    /// be found.
    EdgeResolve {
        /// Index of the input column holding edge ids.
        column: usize,
    },
}
47
/// Operator that turns each chunk produced by its child into a new chunk
/// whose columns are defined by a list of [`ProjectExpr`]s.
pub struct ProjectOperator {
    /// Upstream operator that supplies the input chunks.
    child: Box<dyn Operator>,
    /// One expression per output column, in output order.
    projections: Vec<ProjectExpr>,
    /// Logical type of each output column; the constructors assert that this
    /// has the same length as `projections`.
    output_types: Vec<LogicalType>,
    /// Graph store used by projections that look up nodes or edges; `None`
    /// when the operator was created without one.
    store: Option<Arc<LpgStore>>,
}
59
60impl ProjectOperator {
61 pub fn new(
63 child: Box<dyn Operator>,
64 projections: Vec<ProjectExpr>,
65 output_types: Vec<LogicalType>,
66 ) -> Self {
67 assert_eq!(projections.len(), output_types.len());
68 Self {
69 child,
70 projections,
71 output_types,
72 store: None,
73 }
74 }
75
76 pub fn with_store(
78 child: Box<dyn Operator>,
79 projections: Vec<ProjectExpr>,
80 output_types: Vec<LogicalType>,
81 store: Arc<LpgStore>,
82 ) -> Self {
83 assert_eq!(projections.len(), output_types.len());
84 Self {
85 child,
86 projections,
87 output_types,
88 store: Some(store),
89 }
90 }
91
92 pub fn select_columns(
94 child: Box<dyn Operator>,
95 columns: Vec<usize>,
96 types: Vec<LogicalType>,
97 ) -> Self {
98 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
99 Self::new(child, projections, types)
100 }
101}
102
103impl Operator for ProjectOperator {
104 fn next(&mut self) -> OperatorResult {
105 let Some(input) = self.child.next()? else {
107 return Ok(None);
108 };
109
110 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
112
113 for (i, proj) in self.projections.iter().enumerate() {
115 match proj {
116 ProjectExpr::Column(col_idx) => {
117 let input_col = input.column(*col_idx).ok_or_else(|| {
119 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
120 })?;
121
122 let output_col = output.column_mut(i).unwrap();
123
124 for row in input.selected_indices() {
126 if let Some(value) = input_col.get_value(row) {
127 output_col.push_value(value);
128 }
129 }
130 }
131 ProjectExpr::Constant(value) => {
132 let output_col = output.column_mut(i).unwrap();
134 for _ in input.selected_indices() {
135 output_col.push_value(value.clone());
136 }
137 }
138 ProjectExpr::PropertyAccess { column, property } => {
139 let input_col = input
141 .column(*column)
142 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
143
144 let output_col = output.column_mut(i).unwrap();
145
146 let store = self.store.as_ref().ok_or_else(|| {
147 OperatorError::Execution("Store required for property access".to_string())
148 })?;
149
150 let prop_key = PropertyKey::new(property);
152 for row in input.selected_indices() {
153 let value = if let Some(node_id) = input_col.get_node_id(row) {
155 store
156 .get_node(node_id)
157 .and_then(|node| node.get_property(property).cloned())
158 .unwrap_or(Value::Null)
159 } else if let Some(edge_id) = input_col.get_edge_id(row) {
160 store
161 .get_edge(edge_id)
162 .and_then(|edge| edge.get_property(property).cloned())
163 .unwrap_or(Value::Null)
164 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
165 map.get(&prop_key).cloned().unwrap_or(Value::Null)
166 } else {
167 Value::Null
168 };
169 output_col.push_value(value);
170 }
171 }
172 ProjectExpr::EdgeType { column } => {
173 let input_col = input
175 .column(*column)
176 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
177
178 let output_col = output.column_mut(i).unwrap();
179
180 let store = self.store.as_ref().ok_or_else(|| {
181 OperatorError::Execution("Store required for edge type access".to_string())
182 })?;
183
184 for row in input.selected_indices() {
185 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
186 store.edge_type(edge_id).map_or(Value::Null, Value::String)
187 } else {
188 Value::Null
189 };
190 output_col.push_value(value);
191 }
192 }
193 ProjectExpr::Expression {
194 expr,
195 variable_columns,
196 } => {
197 let output_col = output.column_mut(i).unwrap();
198
199 let store = self.store.as_ref().ok_or_else(|| {
200 OperatorError::Execution(
201 "Store required for expression evaluation".to_string(),
202 )
203 })?;
204
205 let evaluator = ExpressionPredicate::new(
207 expr.clone(),
208 variable_columns.clone(),
209 Arc::clone(store),
210 );
211
212 for row in input.selected_indices() {
213 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
214 output_col.push_value(value);
215 }
216 }
217 ProjectExpr::NodeResolve { column } => {
218 let input_col = input
219 .column(*column)
220 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
221
222 let output_col = output.column_mut(i).unwrap();
223
224 let store = self.store.as_ref().ok_or_else(|| {
225 OperatorError::Execution("Store required for node resolution".to_string())
226 })?;
227
228 for row in input.selected_indices() {
229 let value = if let Some(node_id) = input_col.get_node_id(row) {
230 store
231 .get_node(node_id)
232 .map_or(Value::Null, |n| node_to_map(&n))
233 } else {
234 Value::Null
235 };
236 output_col.push_value(value);
237 }
238 }
239 ProjectExpr::EdgeResolve { column } => {
240 let input_col = input
241 .column(*column)
242 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
243
244 let output_col = output.column_mut(i).unwrap();
245
246 let store = self.store.as_ref().ok_or_else(|| {
247 OperatorError::Execution("Store required for edge resolution".to_string())
248 })?;
249
250 for row in input.selected_indices() {
251 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
252 store
253 .get_edge(edge_id)
254 .map_or(Value::Null, |e| edge_to_map(&e))
255 } else {
256 Value::Null
257 };
258 output_col.push_value(value);
259 }
260 }
261 }
262 }
263
264 output.set_count(input.row_count());
265 Ok(Some(output))
266 }
267
268 fn reset(&mut self) {
269 self.child.reset();
270 }
271
272 fn name(&self) -> &'static str {
273 "Project"
274 }
275}
276
277fn node_to_map(node: &Node) -> Value {
282 let mut map = BTreeMap::new();
283 map.insert(
284 PropertyKey::new("_id"),
285 Value::Int64(node.id.as_u64() as i64),
286 );
287 let labels: Vec<Value> = node
288 .labels
289 .iter()
290 .map(|l| Value::String(l.clone()))
291 .collect();
292 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
293 for (key, value) in &node.properties {
294 map.insert(key.clone(), value.clone());
295 }
296 Value::Map(Arc::new(map))
297}
298
299fn edge_to_map(edge: &Edge) -> Value {
304 let mut map = BTreeMap::new();
305 map.insert(
306 PropertyKey::new("_id"),
307 Value::Int64(edge.id.as_u64() as i64),
308 );
309 map.insert(
310 PropertyKey::new("_type"),
311 Value::String(edge.edge_type.clone()),
312 );
313 map.insert(
314 PropertyKey::new("_source"),
315 Value::Int64(edge.src.as_u64() as i64),
316 );
317 map.insert(
318 PropertyKey::new("_target"),
319 Value::Int64(edge.dst.as_u64() as i64),
320 );
321 for (key, value) in &edge.properties {
322 map.insert(key.clone(), value.clone());
323 }
324 Value::Map(Arc::new(map))
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use crate::execution::chunk::DataChunkBuilder;
    use grafeo_common::types::Value;

    /// Test double that hands out a fixed list of chunks, one per `next` call.
    struct MockScanOperator {
        chunks: Vec<DataChunk>,
        position: usize,
    }

    impl Operator for MockScanOperator {
        fn next(&mut self) -> OperatorResult {
            let Some(slot) = self.chunks.get_mut(self.position) else {
                return Ok(None);
            };
            // Move the chunk out, leaving an empty placeholder behind.
            let chunk = std::mem::replace(slot, DataChunk::empty());
            self.position += 1;
            Ok(Some(chunk))
        }

        fn reset(&mut self) {
            self.position = 0;
        }

        fn name(&self) -> &'static str {
            "MockScan"
        }
    }

    /// Boxes a mock scan that yields `chunks` in order, starting from the first.
    fn scan_over(chunks: Vec<DataChunk>) -> Box<dyn Operator> {
        Box::new(MockScanOperator {
            chunks,
            position: 0,
        })
    }

    /// Builds a chunk with a single `Int64` column holding `values`, one per row.
    fn int64_chunk(values: &[i64]) -> DataChunk {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
        for &value in values {
            builder.column_mut(0).unwrap().push_int64(value);
            builder.advance_row();
        }
        builder.finish()
    }

    #[test]
    fn test_project_select_columns() {
        let mut builder =
            DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);

        for (first, text, last) in [(1, "hello", 100), (2, "world", 200)] {
            builder.column_mut(0).unwrap().push_int64(first);
            builder.column_mut(1).unwrap().push_string(text);
            builder.column_mut(2).unwrap().push_int64(last);
            builder.advance_row();
        }

        // Pick the third and first columns, in that order.
        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![2, 0],
            vec![LogicalType::Int64, LogicalType::Int64],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        assert_eq!(result.row_count(), 2);

        assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
        assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
    }

    #[test]
    fn test_project_constant() {
        let projections = vec![
            ProjectExpr::Column(0),
            ProjectExpr::Constant(Value::String("constant".into())),
        ];
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1, 2])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();

        assert_eq!(result.column_count(), 2);
        // The constant is repeated for every input row.
        assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
        assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
    }

    #[test]
    fn test_project_empty_input() {
        let mut project = ProjectOperator::select_columns(
            scan_over(Vec::new()),
            vec![0],
            vec![LogicalType::Int64],
        );

        assert!(project.next().unwrap().is_none());
    }

    #[test]
    fn test_project_column_not_found() {
        // The input has a single column, so index 5 cannot be resolved.
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            vec![ProjectExpr::Column(5)],
            vec![LogicalType::Int64],
        );

        let result = project.next();
        assert!(result.is_err(), "Should fail with ColumnNotFound");
    }

    #[test]
    fn test_project_multiple_constants() {
        let projections = vec![
            ProjectExpr::Constant(Value::Int64(42)),
            ProjectExpr::Constant(Value::String("fixed".into())),
            ProjectExpr::Constant(Value::Bool(true)),
        ];
        let mut project = ProjectOperator::new(
            scan_over(vec![int64_chunk(&[1])]),
            projections,
            vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 3);
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
        assert_eq!(
            result.column(2).unwrap().get_value(0),
            Some(Value::Bool(true))
        );
    }

    #[test]
    fn test_project_identity() {
        let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
        builder.column_mut(0).unwrap().push_int64(10);
        builder.column_mut(1).unwrap().push_string("test");
        builder.advance_row();

        // Select every column in its original position.
        let mut project = ProjectOperator::select_columns(
            scan_over(vec![builder.finish()]),
            vec![0, 1],
            vec![LogicalType::Int64, LogicalType::String],
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
        assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
    }

    #[test]
    fn test_project_name() {
        let project = ProjectOperator::select_columns(
            scan_over(Vec::new()),
            vec![0],
            vec![LogicalType::Int64],
        );
        assert_eq!(project.name(), "Project");
    }

    #[test]
    fn test_project_node_resolve() {
        let store = LpgStore::new();
        let node_id = store.create_node(&["Person"]);
        store.set_node_property(node_id, "name", Value::String("Alice".into()));
        store.set_node_property(node_id, "age", Value::Int64(30));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder.column_mut(0).unwrap().push_node_id(node_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column_count(), 1);

        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };

        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(node_id.as_u64() as i64))
        );
        assert!(map.get(&PropertyKey::new("_labels")).is_some());
        assert_eq!(
            map.get(&PropertyKey::new("name")),
            Some(&Value::String("Alice".into()))
        );
        assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
    }

    #[test]
    fn test_project_edge_resolve() {
        let store = LpgStore::new();
        let src = store.create_node(&["Person"]);
        let dst = store.create_node(&["Company"]);
        let edge_id = store.create_edge(src, dst, "WORKS_AT");
        store.set_edge_property(edge_id, "since", Value::Int64(2020));

        let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
        builder.column_mut(0).unwrap().push_edge_id(edge_id);
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::EdgeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        let value = result.column(0).unwrap().get_value(0).unwrap();
        let Value::Map(map) = &value else {
            panic!("Expected Value::Map, got {:?}", value);
        };

        assert_eq!(
            map.get(&PropertyKey::new("_id")),
            Some(&Value::Int64(edge_id.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_type")),
            Some(&Value::String("WORKS_AT".into()))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_source")),
            Some(&Value::Int64(src.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("_target")),
            Some(&Value::Int64(dst.as_u64() as i64))
        );
        assert_eq!(
            map.get(&PropertyKey::new("since")),
            Some(&Value::Int64(2020))
        );
    }

    #[test]
    fn test_project_resolve_missing_entity() {
        use grafeo_common::types::NodeId;

        // The store is empty, so this id refers to no existing node.
        let store = LpgStore::new();

        let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
        builder
            .column_mut(0)
            .unwrap()
            .push_node_id(NodeId::new(999));
        builder.advance_row();

        let mut project = ProjectOperator::with_store(
            scan_over(vec![builder.finish()]),
            vec![ProjectExpr::NodeResolve { column: 0 }],
            vec![LogicalType::Any],
            Arc::new(store),
        );

        let result = project.next().unwrap().unwrap();
        assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
    }
}