1use super::filter::{ExpressionPredicate, FilterExpression, SessionContext};
4use super::{Operator, OperatorError, OperatorResult};
5use crate::execution::DataChunk;
6use crate::graph::GraphStore;
7use crate::graph::lpg::{Edge, Node};
8use grafeo_common::types::{EpochId, LogicalType, PropertyKey, TransactionId, Value};
9use std::collections::{BTreeMap, HashMap};
10use std::sync::Arc;
11
12#[non_exhaustive]
14pub enum ProjectExpr {
15 Column(usize),
17 Constant(Value),
19 PropertyAccess {
21 column: usize,
23 property: String,
25 },
26 EdgeType {
28 column: usize,
30 },
31 Expression {
33 expr: FilterExpression,
35 variable_columns: HashMap<String, usize>,
37 },
38 NodeResolve {
40 column: usize,
42 },
43 EdgeResolve {
45 column: usize,
47 },
48 Coalesce {
50 first: usize,
52 second: usize,
54 },
55}
56
57pub struct ProjectOperator {
59 child: Box<dyn Operator>,
61 projections: Vec<ProjectExpr>,
63 output_types: Vec<LogicalType>,
65 store: Option<Arc<dyn GraphStore>>,
67 transaction_id: Option<TransactionId>,
69 viewing_epoch: Option<EpochId>,
71 session_context: SessionContext,
73}
74
75impl ProjectOperator {
76 pub fn new(
82 child: Box<dyn Operator>,
83 projections: Vec<ProjectExpr>,
84 output_types: Vec<LogicalType>,
85 ) -> Self {
86 assert_eq!(projections.len(), output_types.len());
87 Self {
88 child,
89 projections,
90 output_types,
91 store: None,
92 transaction_id: None,
93 viewing_epoch: None,
94 session_context: SessionContext::default(),
95 }
96 }
97
98 pub fn with_store(
104 child: Box<dyn Operator>,
105 projections: Vec<ProjectExpr>,
106 output_types: Vec<LogicalType>,
107 store: Arc<dyn GraphStore>,
108 ) -> Self {
109 assert_eq!(projections.len(), output_types.len());
110 Self {
111 child,
112 projections,
113 output_types,
114 store: Some(store),
115 transaction_id: None,
116 viewing_epoch: None,
117 session_context: SessionContext::default(),
118 }
119 }
120
121 pub fn with_transaction_context(
123 mut self,
124 epoch: EpochId,
125 transaction_id: Option<TransactionId>,
126 ) -> Self {
127 self.viewing_epoch = Some(epoch);
128 self.transaction_id = transaction_id;
129 self
130 }
131
132 pub fn with_session_context(mut self, context: SessionContext) -> Self {
134 self.session_context = context;
135 self
136 }
137
138 pub fn into_parts(self) -> (Box<dyn Operator>, Vec<ProjectExpr>, Vec<LogicalType>) {
140 (self.child, self.projections, self.output_types)
141 }
142
143 pub fn select_columns(
145 child: Box<dyn Operator>,
146 columns: Vec<usize>,
147 types: Vec<LogicalType>,
148 ) -> Self {
149 let projections = columns.into_iter().map(ProjectExpr::Column).collect();
150 Self::new(child, projections, types)
151 }
152}
153
154impl Operator for ProjectOperator {
155 fn next(&mut self) -> OperatorResult {
156 let Some(input) = self.child.next()? else {
158 return Ok(None);
159 };
160
161 let mut output = DataChunk::with_capacity(&self.output_types, input.row_count());
163
164 for (i, proj) in self.projections.iter().enumerate() {
166 match proj {
167 ProjectExpr::Column(col_idx) => {
168 let input_col = input.column(*col_idx).ok_or_else(|| {
170 OperatorError::ColumnNotFound(format!("Column {col_idx}"))
171 })?;
172
173 let output_col = output
174 .column_mut(i)
175 .expect("column exists: index matches projection schema");
176
177 for row in input.selected_indices() {
179 if let Some(value) = input_col.get_value(row) {
180 output_col.push_value(value);
181 }
182 }
183 }
184 ProjectExpr::Constant(value) => {
185 let output_col = output
187 .column_mut(i)
188 .expect("column exists: index matches projection schema");
189 for _ in input.selected_indices() {
190 output_col.push_value(value.clone());
191 }
192 }
193 ProjectExpr::PropertyAccess { column, property } => {
194 let input_col = input
196 .column(*column)
197 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
198
199 let output_col = output
200 .column_mut(i)
201 .expect("column exists: index matches projection schema");
202
203 let store = self.store.as_ref().ok_or_else(|| {
204 OperatorError::Execution("Store required for property access".to_string())
205 })?;
206
207 let prop_key = PropertyKey::new(property);
214 let epoch = self.viewing_epoch;
215 let tx_id = self.transaction_id;
216 for row in input.selected_indices() {
217 let value = if let Some(node_id) = input_col.get_node_id(row) {
218 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
219 store.get_node_versioned(node_id, ep, tx)
220 } else if let Some(ep) = epoch {
221 store.get_node_at_epoch(node_id, ep)
222 } else {
223 store.get_node(node_id)
224 };
225 if let Some(prop) = node.and_then(|n| n.get_property(property).cloned())
226 {
227 prop
228 } else if let Some(edge_id) = input_col.get_edge_id(row) {
229 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
232 store.get_edge_versioned(edge_id, ep, tx)
233 } else if let Some(ep) = epoch {
234 store.get_edge_at_epoch(edge_id, ep)
235 } else {
236 store.get_edge(edge_id)
237 };
238 edge.and_then(|e| e.get_property(property).cloned())
239 .unwrap_or(Value::Null)
240 } else {
241 Value::Null
242 }
243 } else if let Some(edge_id) = input_col.get_edge_id(row) {
244 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
245 store.get_edge_versioned(edge_id, ep, tx)
246 } else if let Some(ep) = epoch {
247 store.get_edge_at_epoch(edge_id, ep)
248 } else {
249 store.get_edge(edge_id)
250 };
251 edge.and_then(|e| e.get_property(property).cloned())
252 .unwrap_or(Value::Null)
253 } else if let Some(Value::Map(map)) = input_col.get_value(row) {
254 map.get(&prop_key).cloned().unwrap_or(Value::Null)
255 } else {
256 Value::Null
257 };
258 output_col.push_value(value);
259 }
260 }
261 ProjectExpr::EdgeType { column } => {
262 let input_col = input
264 .column(*column)
265 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
266
267 let output_col = output
268 .column_mut(i)
269 .expect("column exists: index matches projection schema");
270
271 let store = self.store.as_ref().ok_or_else(|| {
272 OperatorError::Execution("Store required for edge type access".to_string())
273 })?;
274
275 let epoch = self.viewing_epoch;
276 let tx_id = self.transaction_id;
277 for row in input.selected_indices() {
278 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
279 let etype = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
280 store.edge_type_versioned(edge_id, ep, tx)
281 } else {
282 store.edge_type(edge_id)
283 };
284 etype.map_or(Value::Null, Value::String)
285 } else {
286 Value::Null
287 };
288 output_col.push_value(value);
289 }
290 }
291 ProjectExpr::Expression {
292 expr,
293 variable_columns,
294 } => {
295 let output_col = output
296 .column_mut(i)
297 .expect("column exists: index matches projection schema");
298
299 let store = self.store.as_ref().ok_or_else(|| {
300 OperatorError::Execution(
301 "Store required for expression evaluation".to_string(),
302 )
303 })?;
304
305 let mut evaluator = ExpressionPredicate::new(
307 expr.clone(),
308 variable_columns.clone(),
309 Arc::clone(store),
310 )
311 .with_session_context(self.session_context.clone());
312 if let (Some(ep), tx_id) = (self.viewing_epoch, self.transaction_id) {
313 evaluator = evaluator.with_transaction_context(ep, tx_id);
314 }
315
316 for row in input.selected_indices() {
317 let value = evaluator.eval_at(&input, row).unwrap_or(Value::Null);
318 output_col.push_value(value);
319 }
320 }
321 ProjectExpr::NodeResolve { column } => {
322 let input_col = input
323 .column(*column)
324 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
325
326 let output_col = output
327 .column_mut(i)
328 .expect("column exists: index matches projection schema");
329
330 let store = self.store.as_ref().ok_or_else(|| {
331 OperatorError::Execution("Store required for node resolution".to_string())
332 })?;
333
334 let epoch = self.viewing_epoch;
335 let tx_id = self.transaction_id;
336 for row in input.selected_indices() {
337 let value = if let Some(node_id) = input_col.get_node_id(row) {
338 let node = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
339 store.get_node_versioned(node_id, ep, tx)
340 } else if let Some(ep) = epoch {
341 store.get_node_at_epoch(node_id, ep)
342 } else {
343 store.get_node(node_id)
344 };
345 node.map_or(Value::Null, |n| node_to_map(&n))
346 } else {
347 Value::Null
348 };
349 output_col.push_value(value);
350 }
351 }
352 ProjectExpr::EdgeResolve { column } => {
353 let input_col = input
354 .column(*column)
355 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {column}")))?;
356
357 let output_col = output
358 .column_mut(i)
359 .expect("column exists: index matches projection schema");
360
361 let store = self.store.as_ref().ok_or_else(|| {
362 OperatorError::Execution("Store required for edge resolution".to_string())
363 })?;
364
365 let epoch = self.viewing_epoch;
366 let tx_id = self.transaction_id;
367 for row in input.selected_indices() {
368 let value = if let Some(edge_id) = input_col.get_edge_id(row) {
369 let edge = if let (Some(ep), Some(tx)) = (epoch, tx_id) {
370 store.get_edge_versioned(edge_id, ep, tx)
371 } else if let Some(ep) = epoch {
372 store.get_edge_at_epoch(edge_id, ep)
373 } else {
374 store.get_edge(edge_id)
375 };
376 edge.map_or(Value::Null, |e| edge_to_map(&e))
377 } else {
378 Value::Null
379 };
380 output_col.push_value(value);
381 }
382 }
383 ProjectExpr::Coalesce { first, second } => {
384 let first_col = input
385 .column(*first)
386 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {first}")))?;
387 let second_col = input
388 .column(*second)
389 .ok_or_else(|| OperatorError::ColumnNotFound(format!("Column {second}")))?;
390
391 let output_col = output
392 .column_mut(i)
393 .expect("column exists: index matches projection schema");
394
395 for row in input.selected_indices() {
396 let value = match first_col.get_value(row) {
397 Some(Value::Null) | None => {
398 second_col.get_value(row).unwrap_or(Value::Null)
399 }
400 Some(v) => v,
401 };
402 output_col.push_value(value);
403 }
404 }
405 }
406 }
407
408 output.set_count(input.row_count());
409 Ok(Some(output))
410 }
411
412 fn reset(&mut self) {
413 self.child.reset();
414 }
415
416 fn name(&self) -> &'static str {
417 "Project"
418 }
419
420 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
421 self
422 }
423}
424
425fn node_to_map(node: &Node) -> Value {
430 let mut map = BTreeMap::new();
431 #[allow(clippy::cast_possible_wrap)]
433 let node_id_i64 = node.id.as_u64() as i64;
434 map.insert(PropertyKey::new("_id"), Value::Int64(node_id_i64));
435 let labels: Vec<Value> = node
436 .labels
437 .iter()
438 .map(|l| Value::String(l.clone()))
439 .collect();
440 map.insert(PropertyKey::new("_labels"), Value::List(labels.into()));
441 for (key, value) in &node.properties {
442 map.insert(key.clone(), value.clone());
443 }
444 Value::Map(Arc::new(map))
445}
446
447fn edge_to_map(edge: &Edge) -> Value {
452 let mut map = BTreeMap::new();
453 #[allow(clippy::cast_possible_wrap)]
455 let edge_id_i64 = edge.id.as_u64() as i64;
456 #[allow(clippy::cast_possible_wrap)]
458 let src_id_i64 = edge.src.as_u64() as i64;
459 #[allow(clippy::cast_possible_wrap)]
461 let dst_id_i64 = edge.dst.as_u64() as i64;
462 map.insert(PropertyKey::new("_id"), Value::Int64(edge_id_i64));
463 map.insert(
464 PropertyKey::new("_type"),
465 Value::String(edge.edge_type.clone()),
466 );
467 map.insert(PropertyKey::new("_source"), Value::Int64(src_id_i64));
468 map.insert(PropertyKey::new("_target"), Value::Int64(dst_id_i64));
469 for (key, value) in &edge.properties {
470 map.insert(key.clone(), value.clone());
471 }
472 Value::Map(Arc::new(map))
473}
474
475#[cfg(all(test, feature = "lpg"))]
476mod tests {
477 use super::*;
478 use crate::execution::chunk::DataChunkBuilder;
479 use crate::graph::lpg::LpgStore;
480 use grafeo_common::types::Value;
481
482 struct MockScanOperator {
483 chunks: Vec<DataChunk>,
484 position: usize,
485 }
486
487 impl Operator for MockScanOperator {
488 fn next(&mut self) -> OperatorResult {
489 if self.position < self.chunks.len() {
490 let chunk = std::mem::replace(&mut self.chunks[self.position], DataChunk::empty());
491 self.position += 1;
492 Ok(Some(chunk))
493 } else {
494 Ok(None)
495 }
496 }
497
498 fn reset(&mut self) {
499 self.position = 0;
500 }
501
502 fn name(&self) -> &'static str {
503 "MockScan"
504 }
505
506 fn into_any(self: Box<Self>) -> Box<dyn std::any::Any + Send> {
507 self
508 }
509 }
510
511 #[test]
512 fn test_project_select_columns() {
513 let mut builder =
515 DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String, LogicalType::Int64]);
516
517 builder.column_mut(0).unwrap().push_int64(1);
518 builder.column_mut(1).unwrap().push_string("hello");
519 builder.column_mut(2).unwrap().push_int64(100);
520 builder.advance_row();
521
522 builder.column_mut(0).unwrap().push_int64(2);
523 builder.column_mut(1).unwrap().push_string("world");
524 builder.column_mut(2).unwrap().push_int64(200);
525 builder.advance_row();
526
527 let chunk = builder.finish();
528
529 let mock_scan = MockScanOperator {
530 chunks: vec![chunk],
531 position: 0,
532 };
533
534 let mut project = ProjectOperator::select_columns(
536 Box::new(mock_scan),
537 vec![2, 0],
538 vec![LogicalType::Int64, LogicalType::Int64],
539 );
540
541 let result = project.next().unwrap().unwrap();
542
543 assert_eq!(result.column_count(), 2);
544 assert_eq!(result.row_count(), 2);
545
546 assert_eq!(result.column(0).unwrap().get_int64(0), Some(100));
548 assert_eq!(result.column(1).unwrap().get_int64(0), Some(1));
549 }
550
551 #[test]
552 fn test_project_constant() {
553 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
554 builder.column_mut(0).unwrap().push_int64(1);
555 builder.advance_row();
556 builder.column_mut(0).unwrap().push_int64(2);
557 builder.advance_row();
558
559 let chunk = builder.finish();
560
561 let mock_scan = MockScanOperator {
562 chunks: vec![chunk],
563 position: 0,
564 };
565
566 let mut project = ProjectOperator::new(
568 Box::new(mock_scan),
569 vec![
570 ProjectExpr::Column(0),
571 ProjectExpr::Constant(Value::String("constant".into())),
572 ],
573 vec![LogicalType::Int64, LogicalType::String],
574 );
575
576 let result = project.next().unwrap().unwrap();
577
578 assert_eq!(result.column_count(), 2);
579 assert_eq!(result.column(1).unwrap().get_string(0), Some("constant"));
580 assert_eq!(result.column(1).unwrap().get_string(1), Some("constant"));
581 }
582
583 #[test]
584 fn test_project_empty_input() {
585 let mock_scan = MockScanOperator {
586 chunks: vec![],
587 position: 0,
588 };
589
590 let mut project =
591 ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
592
593 assert!(project.next().unwrap().is_none());
594 }
595
596 #[test]
597 fn test_project_column_not_found() {
598 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
599 builder.column_mut(0).unwrap().push_int64(1);
600 builder.advance_row();
601 let chunk = builder.finish();
602
603 let mock_scan = MockScanOperator {
604 chunks: vec![chunk],
605 position: 0,
606 };
607
608 let mut project = ProjectOperator::new(
610 Box::new(mock_scan),
611 vec![ProjectExpr::Column(5)],
612 vec![LogicalType::Int64],
613 );
614
615 let result = project.next();
616 assert!(result.is_err(), "Should fail with ColumnNotFound");
617 }
618
619 #[test]
620 fn test_project_multiple_constants() {
621 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64]);
622 builder.column_mut(0).unwrap().push_int64(1);
623 builder.advance_row();
624 let chunk = builder.finish();
625
626 let mock_scan = MockScanOperator {
627 chunks: vec![chunk],
628 position: 0,
629 };
630
631 let mut project = ProjectOperator::new(
632 Box::new(mock_scan),
633 vec![
634 ProjectExpr::Constant(Value::Int64(42)),
635 ProjectExpr::Constant(Value::String("fixed".into())),
636 ProjectExpr::Constant(Value::Bool(true)),
637 ],
638 vec![LogicalType::Int64, LogicalType::String, LogicalType::Bool],
639 );
640
641 let result = project.next().unwrap().unwrap();
642 assert_eq!(result.column_count(), 3);
643 assert_eq!(result.column(0).unwrap().get_int64(0), Some(42));
644 assert_eq!(result.column(1).unwrap().get_string(0), Some("fixed"));
645 assert_eq!(
646 result.column(2).unwrap().get_value(0),
647 Some(Value::Bool(true))
648 );
649 }
650
651 #[test]
652 fn test_project_identity() {
653 let mut builder = DataChunkBuilder::new(&[LogicalType::Int64, LogicalType::String]);
655 builder.column_mut(0).unwrap().push_int64(10);
656 builder.column_mut(1).unwrap().push_string("test");
657 builder.advance_row();
658 let chunk = builder.finish();
659
660 let mock_scan = MockScanOperator {
661 chunks: vec![chunk],
662 position: 0,
663 };
664
665 let mut project = ProjectOperator::select_columns(
666 Box::new(mock_scan),
667 vec![0, 1],
668 vec![LogicalType::Int64, LogicalType::String],
669 );
670
671 let result = project.next().unwrap().unwrap();
672 assert_eq!(result.column(0).unwrap().get_int64(0), Some(10));
673 assert_eq!(result.column(1).unwrap().get_string(0), Some("test"));
674 }
675
676 #[test]
677 fn test_project_name() {
678 let mock_scan = MockScanOperator {
679 chunks: vec![],
680 position: 0,
681 };
682 let project =
683 ProjectOperator::select_columns(Box::new(mock_scan), vec![0], vec![LogicalType::Int64]);
684 assert_eq!(project.name(), "Project");
685 }
686
687 #[test]
688 #[allow(clippy::cast_possible_wrap)]
690 fn test_project_node_resolve() {
691 let store = LpgStore::new().unwrap();
693 let node_id = store.create_node(&["Person"]);
694 store.set_node_property(node_id, "name", Value::String("Alix".into()));
695 store.set_node_property(node_id, "age", Value::Int64(30));
696
697 let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
699 builder.column_mut(0).unwrap().push_node_id(node_id);
700 builder.advance_row();
701 let chunk = builder.finish();
702
703 let mock_scan = MockScanOperator {
704 chunks: vec![chunk],
705 position: 0,
706 };
707
708 let mut project = ProjectOperator::with_store(
709 Box::new(mock_scan),
710 vec![ProjectExpr::NodeResolve { column: 0 }],
711 vec![LogicalType::Any],
712 Arc::new(store),
713 );
714
715 let result = project.next().unwrap().unwrap();
716 assert_eq!(result.column_count(), 1);
717
718 let value = result.column(0).unwrap().get_value(0).unwrap();
719 if let Value::Map(map) = value {
720 assert_eq!(
721 map.get(&PropertyKey::new("_id")),
722 Some(&Value::Int64(node_id.as_u64() as i64))
723 );
724 assert!(map.get(&PropertyKey::new("_labels")).is_some());
725 assert_eq!(
726 map.get(&PropertyKey::new("name")),
727 Some(&Value::String("Alix".into()))
728 );
729 assert_eq!(map.get(&PropertyKey::new("age")), Some(&Value::Int64(30)));
730 } else {
731 panic!("Expected Value::Map, got {:?}", value);
732 }
733 }
734
735 #[test]
736 #[allow(clippy::cast_possible_wrap)]
738 fn test_project_edge_resolve() {
739 let store = LpgStore::new().unwrap();
740 let src = store.create_node(&["Person"]);
741 let dst = store.create_node(&["Company"]);
742 let edge_id = store.create_edge(src, dst, "WORKS_AT");
743 store.set_edge_property(edge_id, "since", Value::Int64(2020));
744
745 let mut builder = DataChunkBuilder::new(&[LogicalType::Edge]);
747 builder.column_mut(0).unwrap().push_edge_id(edge_id);
748 builder.advance_row();
749 let chunk = builder.finish();
750
751 let mock_scan = MockScanOperator {
752 chunks: vec![chunk],
753 position: 0,
754 };
755
756 let mut project = ProjectOperator::with_store(
757 Box::new(mock_scan),
758 vec![ProjectExpr::EdgeResolve { column: 0 }],
759 vec![LogicalType::Any],
760 Arc::new(store),
761 );
762
763 let result = project.next().unwrap().unwrap();
764 let value = result.column(0).unwrap().get_value(0).unwrap();
765 if let Value::Map(map) = value {
766 assert_eq!(
767 map.get(&PropertyKey::new("_id")),
768 Some(&Value::Int64(edge_id.as_u64() as i64))
769 );
770 assert_eq!(
771 map.get(&PropertyKey::new("_type")),
772 Some(&Value::String("WORKS_AT".into()))
773 );
774 assert_eq!(
775 map.get(&PropertyKey::new("_source")),
776 Some(&Value::Int64(src.as_u64() as i64))
777 );
778 assert_eq!(
779 map.get(&PropertyKey::new("_target")),
780 Some(&Value::Int64(dst.as_u64() as i64))
781 );
782 assert_eq!(
783 map.get(&PropertyKey::new("since")),
784 Some(&Value::Int64(2020))
785 );
786 } else {
787 panic!("Expected Value::Map, got {:?}", value);
788 }
789 }
790
791 #[test]
792 fn test_project_resolve_missing_entity() {
793 use grafeo_common::types::NodeId;
794
795 let store = LpgStore::new().unwrap();
796
797 let mut builder = DataChunkBuilder::new(&[LogicalType::Node]);
799 builder
800 .column_mut(0)
801 .unwrap()
802 .push_node_id(NodeId::new(999));
803 builder.advance_row();
804 let chunk = builder.finish();
805
806 let mock_scan = MockScanOperator {
807 chunks: vec![chunk],
808 position: 0,
809 };
810
811 let mut project = ProjectOperator::with_store(
812 Box::new(mock_scan),
813 vec![ProjectExpr::NodeResolve { column: 0 }],
814 vec![LogicalType::Any],
815 Arc::new(store),
816 );
817
818 let result = project.next().unwrap().unwrap();
819 assert_eq!(result.column(0).unwrap().get_value(0), Some(Value::Null));
820 }
821
822 #[test]
823 fn test_project_into_any() {
824 let mock = MockScanOperator {
825 chunks: vec![],
826 position: 0,
827 };
828 let op = ProjectOperator::select_columns(Box::new(mock), vec![0], vec![LogicalType::Int64]);
829 let any = Box::new(op).into_any();
830 assert!(any.downcast::<ProjectOperator>().is_ok());
831 }
832
833 #[test]
834 fn test_project_into_parts() {
835 let mock = MockScanOperator {
836 chunks: vec![],
837 position: 0,
838 };
839 let op = ProjectOperator::new(
840 Box::new(mock),
841 vec![
842 ProjectExpr::Column(0),
843 ProjectExpr::Constant(Value::Int64(1)),
844 ],
845 vec![LogicalType::Int64, LogicalType::Int64],
846 );
847 let (child, projections, output_types) = op.into_parts();
848 assert_eq!(projections.len(), 2);
849 assert_eq!(output_types.len(), 2);
850 let mut child = child;
852 assert!(child.next().unwrap().is_none());
853 }
854}