use std::sync::Arc;
use reifydb_abi::operator::capabilities::CAPABILITY_ALL_STANDARD;
use reifydb_core::{
encoded::key::EncodedKey,
interface::{
catalog::flow::FlowNodeId,
change::{Change, ChangeOrigin, Diff},
},
internal,
util::encoding::keycode::{deserializer::KeyDeserializer, serializer::KeySerializer},
value::column::columns::Columns,
};
use reifydb_type::{Result, error::Error, value::row_number::RowNumber};
use crate::{
operator::{Operator, Operators, stateful::row::RowNumberProvider},
transaction::FlowTransaction,
};
/// Flow operator that forwards the changes of two or more parent inputs as a
/// single output stream.
///
/// Each incoming row is identified by a composite key built from the index of
/// the input it came from and its row number in that input; the key is mapped
/// to an output row number through `row_number_provider`.
pub struct AppendOperator {
/// Id of this operator's own flow node; returned by `id()` and used as the
/// origin of the changes this operator emits.
node: FlowNodeId,
/// Upstream operators, indexed by parent index when rows are pulled.
parents: Vec<Arc<Operators>>,
/// Flow node ids of the inputs; the position of a node id in this list is the
/// parent index used in composite keys and to index `parents`.
input_nodes: Vec<FlowNodeId>,
/// Maps composite keys to output row numbers and output row numbers back to
/// their composite keys.
row_number_provider: RowNumberProvider,
}
impl AppendOperator {
    /// Creates an append operator for `node`.
    ///
    /// `parents` and `input_nodes` are parallel lists: a change originating
    /// from `input_nodes[i]` is attributed to parent index `i`, and rows with
    /// that parent index are pulled from `parents[i]`.
    ///
    /// # Panics
    ///
    /// In debug builds, panics when the two lists differ in length, when fewer
    /// than 2 inputs are given, or when more than 256 inputs are given.
    pub fn new(node: FlowNodeId, parents: Vec<Arc<Operators>>, input_nodes: Vec<FlowNodeId>) -> Self {
        debug_assert_eq!(parents.len(), input_nodes.len());
        debug_assert!(parents.len() >= 2, "Append requires at least 2 inputs");
        // The composite key stores the parent index in a single byte, so an
        // index above `u8::MAX` cannot be represented and would end up sharing
        // keys with another input.
        debug_assert!(
            parents.len() <= usize::from(u8::MAX) + 1,
            "Append supports at most 256 inputs"
        );
        Self {
            node,
            parents,
            input_nodes,
            row_number_provider: RowNumberProvider::new(node),
        }
    }

    /// Returns the parent index of the input that produced `origin`.
    ///
    /// Only flow origins can match; the result is the position of the
    /// originating node in `input_nodes`, or `None` when the node is not one of
    /// the inputs or the origin is a shape.
    fn parent_index_for_origin(&self, origin: &ChangeOrigin) -> Option<usize> {
        match origin {
            ChangeOrigin::Flow(from_node) => self.input_nodes.iter().position(|n| n == from_node),
            ChangeOrigin::Shape(_) => None,
        }
    }

    /// Builds the composite key for a row: the parent index as a `u8`
    /// followed by the source row number as a `u64`.
    fn make_composite_key(parent_index: u8, source_row: RowNumber) -> EncodedKey {
        let mut serializer = KeySerializer::new();
        serializer.extend_u8(parent_index);
        serializer.extend_u64(source_row.0);
        serializer.finish()
    }

    /// Decodes a key written by `make_composite_key` back into
    /// `(parent index, source row number)`.
    ///
    /// Returns `None` when the key is empty or either component cannot be read.
    fn parse_composite_key(key_bytes: &[u8]) -> Option<(usize, RowNumber)> {
        if key_bytes.is_empty() {
            return None;
        }
        let mut de = KeyDeserializer::from_bytes(key_bytes);
        let parent_index = de.read_u8().ok()?;
        let source_row = de.read_u64().ok()?;
        Some((usize::from(parent_index), RowNumber(source_row)))
    }
}
impl Operator for AppendOperator {
fn id(&self) -> FlowNodeId {
self.node
}
fn capabilities(&self) -> u32 {
CAPABILITY_ALL_STANDARD
}
fn apply(&self, txn: &mut FlowTransaction, change: Change) -> Result<Change> {
let parent_origin = change.origin.clone();
let mut result_diffs = Vec::with_capacity(change.diffs.len());
for diff in change.diffs {
let diff_origin = diff.origin().cloned().unwrap_or_else(|| parent_origin.clone());
let parent_index = self.parent_index_for_origin(&diff_origin).ok_or_else(|| {
Error(Box::new(internal!("Append received diff from unknown node: {:?}", diff_origin)))
})?;
match diff {
Diff::Insert {
post,
..
} => {
if let Some(d) = self.translate_append_insert(txn, parent_index, post)? {
result_diffs.push(d);
}
}
Diff::Update {
pre,
post,
..
} => {
if let Some(d) = self.translate_append_update(txn, parent_index, pre, post)? {
result_diffs.push(d);
}
}
Diff::Remove {
pre,
..
} => {
if let Some(d) = self.translate_append_remove(txn, parent_index, pre)? {
result_diffs.push(d);
}
}
}
}
Ok(Change::from_flow(self.node, change.version, result_diffs, change.changed_at))
}
fn pull(&self, txn: &mut FlowTransaction, rows: &[RowNumber]) -> Result<Columns> {
let mut found_columns: Vec<Columns> = Vec::new();
for &row_number in rows {
let Some(key) = self.row_number_provider.get_key_for_row_number(txn, row_number)? else {
continue;
};
let Some((parent_index, source_row_number)) = Self::parse_composite_key(key.as_ref()) else {
continue;
};
if parent_index >= self.parents.len() {
continue;
}
let parent_cols = self.parents[parent_index].pull(txn, &[source_row_number])?;
if !parent_cols.is_empty() {
let updated = parent_cols.with_row_numbers(vec![row_number]);
found_columns.push(updated);
}
}
if found_columns.is_empty() {
self.parents[0].pull(txn, &[])
} else if found_columns.len() == 1 {
Ok(found_columns.remove(0))
} else {
let mut result = found_columns.remove(0);
for cols in found_columns {
result.row_numbers.make_mut().extend(cols.row_numbers.iter().copied());
for (i, col) in cols.columns.into_iter().enumerate() {
result.columns.make_mut()[i]
.extend(col)
.expect("shape mismatch in append pull");
}
}
Ok(result)
}
}
}
impl AppendOperator {
    /// Maps every row of `source` to its output row number.
    ///
    /// For each row, a composite key is built from `parent_index` and the
    /// row's number in `source`, and the provider either returns the row
    /// number already registered for that key or creates one. The result has
    /// one entry per row of `source`, in row order.
    ///
    /// `parent_index` is narrowed to `u8` for the key, so values above 255
    /// wrap around; callers must keep it within that range.
    #[inline]
    fn translate_row_numbers(
        &self,
        txn: &mut FlowTransaction,
        parent_index: usize,
        source: &Columns,
    ) -> Result<Vec<RowNumber>> {
        let parent_byte = parent_index as u8;
        (0..source.row_count())
            .map(|row_idx| {
                let key = Self::make_composite_key(parent_byte, source.row_numbers[row_idx]);
                self.row_number_provider
                    .get_or_create_row_number(txn, &key)
                    .map(|(output_row_number, _)| output_row_number)
            })
            .collect()
    }

    /// Turns inserted rows from input `parent_index` into an insert diff keyed
    /// by output row numbers; yields `None` when `post` has no rows.
    #[inline]
    fn translate_append_insert(
        &self,
        txn: &mut FlowTransaction,
        parent_index: usize,
        post: Arc<Columns>,
    ) -> Result<Option<Diff>> {
        match post.row_count() {
            0 => Ok(None),
            _ => {
                let mapped = self.translate_row_numbers(txn, parent_index, &post)?;
                let relabelled = Arc::unwrap_or_clone(post).with_row_numbers(mapped);
                Ok(Some(Diff::insert(relabelled)))
            }
        }
    }

    /// Turns updated rows from input `parent_index` into an update diff in
    /// which both the old and the new image carry the same output row numbers;
    /// yields `None` when `post` has no rows.
    ///
    /// NOTE(review): emptiness is checked on `post` while the row numbers are
    /// derived from `pre` and then applied to both images. This presumably
    /// relies on `pre` and `post` describing the same rows; confirm.
    #[inline]
    fn translate_append_update(
        &self,
        txn: &mut FlowTransaction,
        parent_index: usize,
        pre: Arc<Columns>,
        post: Arc<Columns>,
    ) -> Result<Option<Diff>> {
        match post.row_count() {
            0 => Ok(None),
            _ => {
                let mapped = self.translate_row_numbers(txn, parent_index, &pre)?;
                let old_image = Arc::unwrap_or_clone(pre).with_row_numbers(mapped.clone());
                let new_image = Arc::unwrap_or_clone(post).with_row_numbers(mapped);
                Ok(Some(Diff::update(old_image, new_image)))
            }
        }
    }

    /// Turns removed rows from input `parent_index` into a remove diff keyed
    /// by output row numbers; yields `None` when `pre` has no rows.
    ///
    /// NOTE(review): the lookup goes through `get_or_create_row_number`, so a
    /// removed row that was never seen gets a fresh mapping, and no mapping is
    /// released here. Confirm this is intended.
    #[inline]
    fn translate_append_remove(
        &self,
        txn: &mut FlowTransaction,
        parent_index: usize,
        pre: Arc<Columns>,
    ) -> Result<Option<Diff>> {
        match pre.row_count() {
            0 => Ok(None),
            _ => {
                let mapped = self.translate_row_numbers(txn, parent_index, &pre)?;
                let relabelled = Arc::unwrap_or_clone(pre).with_row_numbers(mapped);
                Ok(Some(Diff::remove(relabelled)))
            }
        }
    }
}