Skip to main content

reifydb_sub_flow/operator/
append.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::key::EncodedKey,
8	interface::{
9		catalog::flow::FlowNodeId,
10		change::{Change, ChangeOrigin, Diff},
11	},
12	internal,
13	util::encoding::keycode::serializer::KeySerializer,
14	value::column::columns::Columns,
15};
16use reifydb_type::{Result, error::Error, value::row_number::RowNumber};
17
18use crate::{
19	operator::{Operator, Operators, stateful::row::RowNumberProvider},
20	transaction::FlowTransaction,
21};
22
/// APPEND operator that appends N input flows (N >= 2) with identical schemas
/// into a single output flow. Keeps all rows including duplicates.
///
/// Every row flowing through is identified by a composite key
/// `(parent index, source row number)`. That key is mapped to an output row number
/// by `row_number_provider`, so rows coming from different inputs never clash in the
/// output row-number space.
pub struct AppendOperator {
	/// This operator's own flow node id. It is used as the origin of emitted changes
	/// and to scope the row number provider.
	node: FlowNodeId,
	/// Parent operators indexed by their position (0..N)
	parents: Vec<Arc<Operators>>,
	/// Input node IDs for matching ChangeOrigin; `input_nodes[i]` belongs to `parents[i]`.
	input_nodes: Vec<FlowNodeId>,
	/// Row number provider for stable output row numbers
	row_number_provider: RowNumberProvider,
}
34
35impl AppendOperator {
36	pub fn new(node: FlowNodeId, parents: Vec<Arc<Operators>>, input_nodes: Vec<FlowNodeId>) -> Self {
37		debug_assert_eq!(parents.len(), input_nodes.len());
38		debug_assert!(parents.len() >= 2, "Append requires at least 2 inputs");
39
40		Self {
41			node,
42			parents,
43			input_nodes,
44			row_number_provider: RowNumberProvider::new(node),
45		}
46	}
47
48	/// Find which parent index a change originated from
49	fn determine_parent_index(&self, change: &Change) -> Option<usize> {
50		match &change.origin {
51			ChangeOrigin::Flow(from_node) => self.input_nodes.iter().position(|n| n == from_node),
52			ChangeOrigin::Schema(_) => None,
53		}
54	}
55
56	/// Create composite key: [parent_index: u8][source_row_number: u64]
57	fn make_composite_key(parent_index: u8, source_row: RowNumber) -> EncodedKey {
58		let mut serializer = KeySerializer::new();
59		serializer.extend_u8(parent_index);
60		serializer.extend_u64(source_row.0);
61		EncodedKey::new(serializer.finish())
62	}
63
64	/// Parse composite key to extract (parent_index, source_row_number)
65	/// Key format after keycode encoding: [!parent_index: 1 byte][!source_row_number: 8 bytes]
66	fn parse_composite_key(key_bytes: &[u8]) -> Option<(usize, RowNumber)> {
67		if key_bytes.len() < 9 {
68			return None;
69		}
70		// Decode parent_index (u8 with bitwise NOT)
71		let parent_index = !key_bytes[0];
72		// Decode source_row_number (u64 big-endian with bitwise NOT)
73		let source_row = u64::from_be_bytes([
74			!key_bytes[1],
75			!key_bytes[2],
76			!key_bytes[3],
77			!key_bytes[4],
78			!key_bytes[5],
79			!key_bytes[6],
80			!key_bytes[7],
81			!key_bytes[8],
82		]);
83		Some((parent_index as usize, RowNumber(source_row)))
84	}
85}
86
87impl Operator for AppendOperator {
88	fn id(&self) -> FlowNodeId {
89		self.node
90	}
91
92	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
93		let parent_index = self.determine_parent_index(&change).ok_or_else(|| {
94			Error(internal!("Append received change from unknown node: {:?}", change.origin))
95		})?;
96
97		let mut result_diffs = Vec::with_capacity(change.diffs.len());
98
99		for diff in change.diffs {
100			match diff {
101				Diff::Insert {
102					post,
103				} => {
104					let row_count = post.row_count();
105					if row_count == 0 {
106						continue;
107					}
108
109					let mut output_row_numbers = Vec::with_capacity(row_count);
110					for row_idx in 0..row_count {
111						let source_row_number = post.row_numbers[row_idx];
112						let composite_key =
113							Self::make_composite_key(parent_index as u8, source_row_number);
114						let (output_row_number, _is_new) = self
115							.row_number_provider
116							.get_or_create_row_number(txn, &composite_key)?;
117
118						output_row_numbers.push(output_row_number);
119					}
120
121					let output = Columns::with_row_numbers(
122						post.columns.as_ref().to_vec(),
123						output_row_numbers,
124					);
125
126					result_diffs.push(Diff::Insert {
127						post: output,
128					});
129				}
130				Diff::Update {
131					pre,
132					post,
133				} => {
134					let row_count = post.row_count();
135					if row_count == 0 {
136						continue;
137					}
138
139					let mut output_row_numbers = Vec::with_capacity(row_count);
140					for row_idx in 0..row_count {
141						let source_row_number = pre.row_numbers[row_idx];
142						let composite_key =
143							Self::make_composite_key(parent_index as u8, source_row_number);
144						let (output_row_number, _) = self
145							.row_number_provider
146							.get_or_create_row_number(txn, &composite_key)?;
147						output_row_numbers.push(output_row_number);
148					}
149
150					let pre_output = Columns::with_row_numbers(
151						pre.columns.as_ref().to_vec(),
152						output_row_numbers.clone(),
153					);
154					let post_output = Columns::with_row_numbers(
155						post.columns.as_ref().to_vec(),
156						output_row_numbers,
157					);
158
159					result_diffs.push(Diff::Update {
160						pre: pre_output,
161						post: post_output,
162					});
163				}
164				Diff::Remove {
165					pre,
166				} => {
167					let row_count = pre.row_count();
168					if row_count == 0 {
169						continue;
170					}
171
172					let mut output_row_numbers = Vec::with_capacity(row_count);
173					for row_idx in 0..row_count {
174						let source_row_number = pre.row_numbers[row_idx];
175						let composite_key =
176							Self::make_composite_key(parent_index as u8, source_row_number);
177						let (output_row_number, _) = self
178							.row_number_provider
179							.get_or_create_row_number(txn, &composite_key)?;
180						output_row_numbers.push(output_row_number);
181					}
182
183					let output = Columns::with_row_numbers(
184						pre.columns.as_ref().to_vec(),
185						output_row_numbers,
186					);
187
188					result_diffs.push(Diff::Remove {
189						pre: output,
190					});
191				}
192			}
193		}
194
195		Ok(Change::from_flow(self.node, change.version, result_diffs))
196	}
197
198	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
199		let mut found_columns: Vec<Columns> = Vec::new();
200
201		for &row_number in rows {
202			// Reverse lookup: output row number -> composite key
203			let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
204				continue;
205			};
206
207			let Some((parent_index, source_row_number)) = Self::parse_composite_key(key.as_ref()) else {
208				continue;
209			};
210
211			if parent_index >= self.parents.len() {
212				continue;
213			}
214
215			let parent_cols = self.parents[parent_index].pull(txn, &[source_row_number])?;
216
217			if !parent_cols.is_empty() {
218				// Replace row number with append output row number
219				let updated = Columns::with_row_numbers(
220					parent_cols.columns.as_ref().to_vec(),
221					vec![row_number],
222				);
223				found_columns.push(updated);
224			}
225		}
226
227		// Combine found rows
228		if found_columns.is_empty() {
229			self.parents[0].pull(txn, &[])
230		} else if found_columns.len() == 1 {
231			Ok(found_columns.remove(0))
232		} else {
233			let mut result = found_columns.remove(0);
234			for cols in found_columns {
235				result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
236				for (i, col) in cols.columns.into_iter().enumerate() {
237					result.columns.make_mut()[i]
238						.extend(col)
239						.expect("schema mismatch in append pull");
240				}
241			}
242			Ok(result)
243		}
244	}
245}