reifydb_sub_flow/operator/
append.rs1use std::sync::Arc;
5
6use reifydb_core::{
7 encoded::key::EncodedKey,
8 interface::{
9 catalog::flow::FlowNodeId,
10 change::{Change, ChangeOrigin, Diff},
11 },
12 internal,
13 util::encoding::keycode::serializer::KeySerializer,
14 value::column::columns::Columns,
15};
16use reifydb_type::{Result, error::Error, value::row_number::RowNumber};
17
18use crate::{
19 operator::{Operator, Operators, stateful::row::RowNumberProvider},
20 transaction::FlowTransaction,
21};
22
23pub struct AppendOperator {
26 node: FlowNodeId,
27 parents: Vec<Arc<Operators>>,
29 input_nodes: Vec<FlowNodeId>,
31 row_number_provider: RowNumberProvider,
33}
34
35impl AppendOperator {
36 pub fn new(node: FlowNodeId, parents: Vec<Arc<Operators>>, input_nodes: Vec<FlowNodeId>) -> Self {
37 debug_assert_eq!(parents.len(), input_nodes.len());
38 debug_assert!(parents.len() >= 2, "Append requires at least 2 inputs");
39
40 Self {
41 node,
42 parents,
43 input_nodes,
44 row_number_provider: RowNumberProvider::new(node),
45 }
46 }
47
48 fn determine_parent_index(&self, change: &Change) -> Option<usize> {
50 match &change.origin {
51 ChangeOrigin::Flow(from_node) => self.input_nodes.iter().position(|n| n == from_node),
52 ChangeOrigin::Shape(_) => None,
53 }
54 }
55
56 fn make_composite_key(parent_index: u8, source_row: RowNumber) -> EncodedKey {
58 let mut serializer = KeySerializer::new();
59 serializer.extend_u8(parent_index);
60 serializer.extend_u64(source_row.0);
61 EncodedKey::new(serializer.finish())
62 }
63
64 fn parse_composite_key(key_bytes: &[u8]) -> Option<(usize, RowNumber)> {
67 if key_bytes.len() < 9 {
68 return None;
69 }
70 let parent_index = !key_bytes[0];
72 let source_row = u64::from_be_bytes([
74 !key_bytes[1],
75 !key_bytes[2],
76 !key_bytes[3],
77 !key_bytes[4],
78 !key_bytes[5],
79 !key_bytes[6],
80 !key_bytes[7],
81 !key_bytes[8],
82 ]);
83 Some((parent_index as usize, RowNumber(source_row)))
84 }
85}
86
87impl Operator for AppendOperator {
88 fn id(&self) -> FlowNodeId {
89 self.node
90 }
91
92 fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
93 let parent_index = self.determine_parent_index(&change).ok_or_else(|| {
94 Error(Box::new(internal!("Append received change from unknown node: {:?}", change.origin)))
95 })?;
96
97 let mut result_diffs = Vec::with_capacity(change.diffs.len());
98
99 for diff in change.diffs {
100 match diff {
101 Diff::Insert {
102 post,
103 } => {
104 if let Some(d) = self.translate_append_insert(txn, parent_index, post)? {
105 result_diffs.push(d);
106 }
107 }
108 Diff::Update {
109 pre,
110 post,
111 } => {
112 if let Some(d) = self.translate_append_update(txn, parent_index, pre, post)? {
113 result_diffs.push(d);
114 }
115 }
116 Diff::Remove {
117 pre,
118 } => {
119 if let Some(d) = self.translate_append_remove(txn, parent_index, pre)? {
120 result_diffs.push(d);
121 }
122 }
123 }
124 }
125
126 Ok(Change::from_flow(self.node, change.version, result_diffs, change.changed_at))
127 }
128
129 fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
130 let mut found_columns: Vec<Columns> = Vec::new();
131
132 for &row_number in rows {
133 let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
135 continue;
136 };
137
138 let Some((parent_index, source_row_number)) = Self::parse_composite_key(key.as_ref()) else {
139 continue;
140 };
141
142 if parent_index >= self.parents.len() {
143 continue;
144 }
145
146 let parent_cols = self.parents[parent_index].pull(txn, &[source_row_number])?;
147
148 if !parent_cols.is_empty() {
149 let updated = parent_cols.with_row_numbers(vec![row_number]);
151 found_columns.push(updated);
152 }
153 }
154
155 if found_columns.is_empty() {
157 self.parents[0].pull(txn, &[])
158 } else if found_columns.len() == 1 {
159 Ok(found_columns.remove(0))
160 } else {
161 let mut result = found_columns.remove(0);
162 for cols in found_columns {
163 result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
164 for (i, col) in cols.columns.into_iter().enumerate() {
165 result.columns.make_mut()[i]
166 .extend(col)
167 .expect("shape mismatch in append pull");
168 }
169 }
170 Ok(result)
171 }
172 }
173}
174
175impl AppendOperator {
176 #[inline]
177 fn translate_row_numbers(
178 &self,
179 txn: &mut FlowTransaction,
180 parent_index: usize,
181 source: &Columns,
182 ) -> Result<Vec<RowNumber>> {
183 let row_count = source.row_count();
184 let mut output_row_numbers = Vec::with_capacity(row_count);
185 for row_idx in 0..row_count {
186 let source_row_number = source.row_numbers[row_idx];
187 let composite_key = Self::make_composite_key(parent_index as u8, source_row_number);
188 let (output_row_number, _) =
189 self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
190 output_row_numbers.push(output_row_number);
191 }
192 Ok(output_row_numbers)
193 }
194
195 #[inline]
196 fn translate_append_insert(
197 &self,
198 txn: &mut FlowTransaction,
199 parent_index: usize,
200 post: Arc<Columns>,
201 ) -> Result<Option<Diff>> {
202 if post.row_count() == 0 {
203 return Ok(None);
204 }
205 let output_row_numbers = self.translate_row_numbers(txn, parent_index, &post)?;
206 let output = Arc::unwrap_or_clone(post).with_row_numbers(output_row_numbers);
207 Ok(Some(Diff::insert(output)))
208 }
209
210 #[inline]
211 fn translate_append_update(
212 &self,
213 txn: &mut FlowTransaction,
214 parent_index: usize,
215 pre: Arc<Columns>,
216 post: Arc<Columns>,
217 ) -> Result<Option<Diff>> {
218 if post.row_count() == 0 {
219 return Ok(None);
220 }
221 let output_row_numbers = self.translate_row_numbers(txn, parent_index, &pre)?;
222 let pre_output = Arc::unwrap_or_clone(pre).with_row_numbers(output_row_numbers.clone());
223 let post_output = Arc::unwrap_or_clone(post).with_row_numbers(output_row_numbers);
224 Ok(Some(Diff::update(pre_output, post_output)))
225 }
226
227 #[inline]
228 fn translate_append_remove(
229 &self,
230 txn: &mut FlowTransaction,
231 parent_index: usize,
232 pre: Arc<Columns>,
233 ) -> Result<Option<Diff>> {
234 if pre.row_count() == 0 {
235 return Ok(None);
236 }
237 let output_row_numbers = self.translate_row_numbers(txn, parent_index, &pre)?;
238 let output = Arc::unwrap_or_clone(pre).with_row_numbers(output_row_numbers);
239 Ok(Some(Diff::remove(output)))
240 }
241}