Skip to main content

reifydb_sub_flow/operator/
append.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::sync::Arc;
5
6use reifydb_core::{
7	encoded::key::EncodedKey,
8	interface::{
9		catalog::flow::FlowNodeId,
10		change::{Change, ChangeOrigin, Diff},
11	},
12	internal,
13	util::encoding::keycode::serializer::KeySerializer,
14	value::column::columns::Columns,
15};
16use reifydb_type::{Result, error::Error, value::row_number::RowNumber};
17
18use crate::{
19	operator::{Operator, Operators, stateful::row::RowNumberProvider},
20	transaction::FlowTransaction,
21};
22
23/// APPEND operator that appends N input flows (N >= 2) with identical shapes
24/// into a single output flow. Keeps all rows including duplicates.
25pub struct AppendOperator {
26	node: FlowNodeId,
27	/// Parent operators indexed by their position (0..N)
28	parents: Vec<Arc<Operators>>,
29	/// Input node IDs for matching ChangeOrigin
30	input_nodes: Vec<FlowNodeId>,
31	/// Row number provider for stable output row numbers
32	row_number_provider: RowNumberProvider,
33}
34
35impl AppendOperator {
36	pub fn new(node: FlowNodeId, parents: Vec<Arc<Operators>>, input_nodes: Vec<FlowNodeId>) -> Self {
37		debug_assert_eq!(parents.len(), input_nodes.len());
38		debug_assert!(parents.len() >= 2, "Append requires at least 2 inputs");
39
40		Self {
41			node,
42			parents,
43			input_nodes,
44			row_number_provider: RowNumberProvider::new(node),
45		}
46	}
47
48	/// Find which parent index a change originated from
49	fn determine_parent_index(&self, change: &Change) -> Option<usize> {
50		match &change.origin {
51			ChangeOrigin::Flow(from_node) => self.input_nodes.iter().position(|n| n == from_node),
52			ChangeOrigin::Shape(_) => None,
53		}
54	}
55
56	/// Create composite key: [parent_index: u8][source_row_number: u64]
57	fn make_composite_key(parent_index: u8, source_row: RowNumber) -> EncodedKey {
58		let mut serializer = KeySerializer::new();
59		serializer.extend_u8(parent_index);
60		serializer.extend_u64(source_row.0);
61		EncodedKey::new(serializer.finish())
62	}
63
64	/// Parse composite key to extract (parent_index, source_row_number)
65	/// Key format after keycode encoding: [!parent_index: 1 byte][!source_row_number: 8 bytes]
66	fn parse_composite_key(key_bytes: &[u8]) -> Option<(usize, RowNumber)> {
67		if key_bytes.len() < 9 {
68			return None;
69		}
70		// Decode parent_index (u8 with bitwise NOT)
71		let parent_index = !key_bytes[0];
72		// Decode source_row_number (u64 big-endian with bitwise NOT)
73		let source_row = u64::from_be_bytes([
74			!key_bytes[1],
75			!key_bytes[2],
76			!key_bytes[3],
77			!key_bytes[4],
78			!key_bytes[5],
79			!key_bytes[6],
80			!key_bytes[7],
81			!key_bytes[8],
82		]);
83		Some((parent_index as usize, RowNumber(source_row)))
84	}
85}
86
87impl Operator for AppendOperator {
88	fn id(&self) -> FlowNodeId {
89		self.node
90	}
91
92	fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
93		let parent_index = self.determine_parent_index(&change).ok_or_else(|| {
94			Error(Box::new(internal!("Append received change from unknown node: {:?}", change.origin)))
95		})?;
96
97		let mut result_diffs = Vec::with_capacity(change.diffs.len());
98
99		for diff in change.diffs {
100			match diff {
101				Diff::Insert {
102					post,
103				} => {
104					if let Some(d) = self.translate_append_insert(txn, parent_index, post)? {
105						result_diffs.push(d);
106					}
107				}
108				Diff::Update {
109					pre,
110					post,
111				} => {
112					if let Some(d) = self.translate_append_update(txn, parent_index, pre, post)? {
113						result_diffs.push(d);
114					}
115				}
116				Diff::Remove {
117					pre,
118				} => {
119					if let Some(d) = self.translate_append_remove(txn, parent_index, pre)? {
120						result_diffs.push(d);
121					}
122				}
123			}
124		}
125
126		Ok(Change::from_flow(self.node, change.version, result_diffs, change.changed_at))
127	}
128
129	fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
130		let mut found_columns: Vec<Columns> = Vec::new();
131
132		for &row_number in rows {
133			// Reverse lookup: output row number -> composite key
134			let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
135				continue;
136			};
137
138			let Some((parent_index, source_row_number)) = Self::parse_composite_key(key.as_ref()) else {
139				continue;
140			};
141
142			if parent_index >= self.parents.len() {
143				continue;
144			}
145
146			let parent_cols = self.parents[parent_index].pull(txn, &[source_row_number])?;
147
148			if !parent_cols.is_empty() {
149				// Replace row number with append output row number
150				let updated = parent_cols.with_row_numbers(vec![row_number]);
151				found_columns.push(updated);
152			}
153		}
154
155		// Combine found rows
156		if found_columns.is_empty() {
157			self.parents[0].pull(txn, &[])
158		} else if found_columns.len() == 1 {
159			Ok(found_columns.remove(0))
160		} else {
161			let mut result = found_columns.remove(0);
162			for cols in found_columns {
163				result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
164				for (i, col) in cols.columns.into_iter().enumerate() {
165					result.columns.make_mut()[i]
166						.extend(col)
167						.expect("shape mismatch in append pull");
168				}
169			}
170			Ok(result)
171		}
172	}
173}
174
175impl AppendOperator {
176	#[inline]
177	fn translate_row_numbers(
178		&self,
179		txn: &mut FlowTransaction,
180		parent_index: usize,
181		source: &Columns,
182	) -> Result<Vec<RowNumber>> {
183		let row_count = source.row_count();
184		let mut output_row_numbers = Vec::with_capacity(row_count);
185		for row_idx in 0..row_count {
186			let source_row_number = source.row_numbers[row_idx];
187			let composite_key = Self::make_composite_key(parent_index as u8, source_row_number);
188			let (output_row_number, _) =
189				self.row_number_provider.get_or_create_row_number(txn, &composite_key)?;
190			output_row_numbers.push(output_row_number);
191		}
192		Ok(output_row_numbers)
193	}
194
195	#[inline]
196	fn translate_append_insert(
197		&self,
198		txn: &mut FlowTransaction,
199		parent_index: usize,
200		post: Arc<Columns>,
201	) -> Result<Option<Diff>> {
202		if post.row_count() == 0 {
203			return Ok(None);
204		}
205		let output_row_numbers = self.translate_row_numbers(txn, parent_index, &post)?;
206		let output = Arc::unwrap_or_clone(post).with_row_numbers(output_row_numbers);
207		Ok(Some(Diff::insert(output)))
208	}
209
210	#[inline]
211	fn translate_append_update(
212		&self,
213		txn: &mut FlowTransaction,
214		parent_index: usize,
215		pre: Arc<Columns>,
216		post: Arc<Columns>,
217	) -> Result<Option<Diff>> {
218		if post.row_count() == 0 {
219			return Ok(None);
220		}
221		let output_row_numbers = self.translate_row_numbers(txn, parent_index, &pre)?;
222		let pre_output = Arc::unwrap_or_clone(pre).with_row_numbers(output_row_numbers.clone());
223		let post_output = Arc::unwrap_or_clone(post).with_row_numbers(output_row_numbers);
224		Ok(Some(Diff::update(pre_output, post_output)))
225	}
226
227	#[inline]
228	fn translate_append_remove(
229		&self,
230		txn: &mut FlowTransaction,
231		parent_index: usize,
232		pre: Arc<Columns>,
233	) -> Result<Option<Diff>> {
234		if pre.row_count() == 0 {
235			return Ok(None);
236		}
237		let output_row_numbers = self.translate_row_numbers(txn, parent_index, &pre)?;
238		let output = Arc::unwrap_or_clone(pre).with_row_numbers(output_row_numbers);
239		Ok(Some(Diff::remove(output)))
240	}
241}