grafeo_core/execution/operators/
apply.rs1use grafeo_common::types::{LogicalType, Value};
12
13use super::{DataChunk, Operator, OperatorResult};
14use crate::execution::vector::ValueVector;
15
16pub struct ApplyOperator {
22 outer: Box<dyn Operator>,
23 inner: Box<dyn Operator>,
24 state: ApplyState,
26}
27
28enum ApplyState {
29 Init,
31 Processing {
34 outer_chunk: DataChunk,
35 outer_row: usize,
36 output: Vec<Vec<Value>>,
38 },
39 Done,
41}
42
43impl ApplyOperator {
44 pub fn new(outer: Box<dyn Operator>, inner: Box<dyn Operator>) -> Self {
46 Self {
47 outer,
48 inner,
49 state: ApplyState::Init,
50 }
51 }
52
53 fn extract_row(chunk: &DataChunk, row: usize) -> Vec<Value> {
55 let mut values = Vec::with_capacity(chunk.num_columns());
56 for col_idx in 0..chunk.num_columns() {
57 let val = chunk
58 .column(col_idx)
59 .and_then(|col| col.get_value(row))
60 .unwrap_or(Value::Null);
61 values.push(val);
62 }
63 values
64 }
65
66 fn build_chunk(rows: &[Vec<Value>]) -> DataChunk {
68 if rows.is_empty() {
69 return DataChunk::empty();
70 }
71 let num_cols = rows[0].len();
72 let mut columns: Vec<ValueVector> = (0..num_cols)
73 .map(|_| ValueVector::with_capacity(LogicalType::Any, rows.len()))
74 .collect();
75
76 for row in rows {
77 for (col_idx, val) in row.iter().enumerate() {
78 if col_idx < columns.len() {
79 columns[col_idx].push_value(val.clone());
80 }
81 }
82 }
83 DataChunk::new(columns)
84 }
85}
86
87impl Operator for ApplyOperator {
88 fn next(&mut self) -> OperatorResult {
89 loop {
90 match &mut self.state {
91 ApplyState::Init => match self.outer.next()? {
92 Some(chunk) => {
93 self.state = ApplyState::Processing {
94 outer_chunk: chunk,
95 outer_row: 0,
96 output: Vec::new(),
97 };
98 }
99 None => {
100 self.state = ApplyState::Done;
101 return Ok(None);
102 }
103 },
104 ApplyState::Processing {
105 outer_chunk,
106 outer_row,
107 output,
108 } => {
109 let selected: Vec<usize> = outer_chunk.selected_indices().collect();
110 while *outer_row < selected.len() {
111 let row = selected[*outer_row];
112 let outer_values = Self::extract_row(outer_chunk, row);
113
114 self.inner.reset();
116 while let Some(inner_chunk) = self.inner.next()? {
117 for inner_row in inner_chunk.selected_indices() {
118 let inner_values = Self::extract_row(&inner_chunk, inner_row);
119 let mut combined = outer_values.clone();
120 combined.extend(inner_values);
121 output.push(combined);
122 }
123 }
124
125 *outer_row += 1;
126
127 if output.len() >= 1024 {
129 let chunk = Self::build_chunk(output);
130 output.clear();
131 return Ok(Some(chunk));
132 }
133 }
134
135 if !output.is_empty() {
137 let chunk = Self::build_chunk(output);
138 output.clear();
139 self.state = ApplyState::Init;
140 return Ok(Some(chunk));
141 }
142
143 self.state = ApplyState::Init;
145 }
146 ApplyState::Done => return Ok(None),
147 }
148 }
149 }
150
151 fn reset(&mut self) {
152 self.outer.reset();
153 self.inner.reset();
154 self.state = ApplyState::Init;
155 }
156
157 fn name(&self) -> &'static str {
158 "Apply"
159 }
160}