#![allow(dead_code)]
use std::collections::HashMap;
use reifydb_abi::{
data::column::ColumnTypeCode, flow::diff::DiffType, operator::capabilities::CAPABILITY_ALL_STANDARD,
};
use reifydb_core::{
common::CommitVersion,
interface::{
catalog::flow::FlowNodeId,
change::{Change, Diff, Diffs},
},
value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns},
};
use reifydb_sdk::{
error::Result,
operator::{
FFIOperator, FFIOperatorMetadata,
builder::{ColumnsBuilder, CommittedColumn},
change::{BorrowedChange, BorrowedColumns},
column::OperatorColumn,
context::OperatorContext,
},
testing::harness::TestHarnessBuilder,
};
use reifydb_type::{
fragment::Fragment,
value::{Value, datetime::DateTime, row_number::RowNumber},
};
/// Field-less test operator whose `FFIOperator::apply` re-emits each input diff
/// (insert, update, remove) through the builder obtained from the operator context.
pub struct PassthroughOperator;
/// Static metadata advertised by the pass-through test operator.
impl FFIOperatorMetadata for PassthroughOperator {
    // Identity.
    const NAME: &'static str = "ffi_round_trip_passthrough";
    const VERSION: &'static str = "1.0.0";
    const DESCRIPTION: &'static str = "echoes every input diff back via ctx.builder";

    // ABI level and capability mask.
    const API: u32 = 1;
    const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;

    // No input or output columns are declared.
    // NOTE(review): presumably an empty list means "no fixed schema" — confirm against the SDK.
    const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
    const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
}
impl FFIOperator for PassthroughOperator {
fn new(_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
Ok(Self)
}
fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
let mut builder = ctx.builder();
for diff in input.diffs() {
match diff.kind() {
DiffType::Insert => {
let post = diff.post();
let (cols, names) = byte_clone_columns(&mut builder, &post)?;
let names_ref: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
let row_numbers: Vec<RowNumber> =
post.row_numbers().iter().copied().map(RowNumber).collect();
builder.emit_insert(&cols, &names_ref, &row_numbers)?;
}
DiffType::Update => {
let pre = diff.pre();
let post = diff.post();
let (pre_cols, pre_names) = byte_clone_columns(&mut builder, &pre)?;
let (post_cols, post_names) = byte_clone_columns(&mut builder, &post)?;
let pre_names_ref: Vec<&str> = pre_names.iter().map(|s| s.as_str()).collect();
let post_names_ref: Vec<&str> = post_names.iter().map(|s| s.as_str()).collect();
let pre_row_numbers: Vec<RowNumber> =
pre.row_numbers().iter().copied().map(RowNumber).collect();
let post_row_numbers: Vec<RowNumber> =
post.row_numbers().iter().copied().map(RowNumber).collect();
builder.emit_update(
&pre_cols,
&pre_names_ref,
pre.row_count(),
&pre_row_numbers,
&post_cols,
&post_names_ref,
post.row_count(),
&post_row_numbers,
)?;
}
DiffType::Remove => {
let pre = diff.pre();
let (cols, names) = byte_clone_columns(&mut builder, &pre)?;
let names_ref: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
let row_numbers: Vec<RowNumber> =
pre.row_numbers().iter().copied().map(RowNumber).collect();
builder.emit_remove(&cols, &names_ref, &row_numbers)?;
}
}
}
Ok(())
}
fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
Ok(())
}
}
/// Clones every column of `cols` into `builder` and commits it.
///
/// For each column yielded by `cols.columns()` this acquires a builder column with the
/// source type code, copies the raw data bytes, copies the offsets for the type codes
/// listed in the `matches!` below, copies the defined-bitvec when it is non-empty, and
/// finally commits the column with `cols.row_count()` rows.
///
/// Returns the committed column handles and the column names, both in iteration order.
///
/// # Errors
/// Propagates any error returned by `acquire`, `grow`, or `commit`.
fn byte_clone_columns(
builder: &mut ColumnsBuilder<'_>,
cols: &BorrowedColumns<'_>,
) -> Result<(Vec<CommittedColumn>, Vec<String>)> {
let row_count = cols.row_count();
let mut committed: Vec<CommittedColumn> = Vec::new();
let mut names: Vec<String> = Vec::new();
for col in cols.columns() {
let type_code = col.type_code();
let data_bytes = col.data_bytes();
// The second argument is clamped to at least 1 even for an empty input.
// NOTE(review): presumably this is an initial row capacity — confirm against the SDK.
let active = builder.acquire(type_code, row_count.max(1))?;
// Request room for whichever is larger: the source byte length or the row count.
// NOTE(review): assumes `grow` takes a size that covers `data_bytes.len()` bytes — confirm.
active.grow(data_bytes.len().max(row_count))?;
// The data pointer is read only after `grow`.
let dst = active.data_ptr();
if !dst.is_null() && !data_bytes.is_empty() {
// SAFETY (to be confirmed): relies on `grow` above having made `dst` valid for at
// least `data_bytes.len()` bytes and on source and destination not overlapping.
unsafe {
core::ptr::copy_nonoverlapping(data_bytes.as_ptr(), dst, data_bytes.len());
}
}
// Only these type codes are treated as carrying an offsets buffer.
if matches!(
type_code,
ColumnTypeCode::Utf8
| ColumnTypeCode::Blob | ColumnTypeCode::Int
| ColumnTypeCode::Uint | ColumnTypeCode::Decimal
| ColumnTypeCode::Any | ColumnTypeCode::DictionaryId
) {
let off = col.offsets();
let dst_off = active.offsets_ptr();
if !dst_off.is_null() && !off.is_empty() {
// SAFETY (to be confirmed): nothing in this function sizes the offsets buffer
// explicitly; this assumes the builder column can hold `off.len()` entries.
unsafe {
core::ptr::copy_nonoverlapping(off.as_ptr(), dst_off, off.len());
}
}
}
// An empty defined-bitvec is skipped entirely; the destination pointer is not even queried.
let bitvec = col.defined_bitvec();
if !bitvec.is_empty() {
let dst_bv = active.bitvec_ptr();
if !dst_bv.is_null() {
// SAFETY (to be confirmed): assumes the builder's bitvec buffer is at least
// `bitvec.len()` elements long for the acquired/grown column.
unsafe {
core::ptr::copy_nonoverlapping(bitvec.as_ptr(), dst_bv, bitvec.len());
}
}
}
// Commit with the source row count (not the clamped value used for `acquire`).
let c = active.commit(row_count)?;
committed.push(c);
names.push(col.name().to_string());
}
Ok((committed, names))
}
/// Sends `input` through a [`PassthroughOperator`] test harness and returns the
/// column that comes back.
///
/// The column is wrapped as a single insert diff named `name`, with row numbers
/// `1..=len` and `DateTime::default()` for both system timestamp vectors.
///
/// # Panics
/// Panics if the harness cannot be built, if `apply` fails, or if the output does
/// not consist of exactly one diff holding exactly one column.
pub fn round_trip_column(name: &str, input: ColumnBuffer) -> ColumnBuffer {
    let rows = input.len();
    let stamp = DateTime::default();

    // `1..=0` is already empty, so a zero-length input yields no row numbers.
    let row_numbers: Vec<RowNumber> = (1..=rows as u64).map(RowNumber).collect();
    let created_at: Vec<DateTime> = vec![stamp; rows];
    let updated_at = created_at.clone();

    let named = vec![ColumnWithName::new(Fragment::internal(name), input)];
    let columns = Columns::with_system_columns(named, row_numbers, updated_at, created_at);

    let mut diffs: Diffs = Diffs::new();
    diffs.push(Diff::insert(columns));
    let change = Change::from_flow(FlowNodeId(1), CommitVersion(1), diffs, stamp);

    let mut harness = TestHarnessBuilder::<PassthroughOperator>::new()
        .with_node_id(FlowNodeId(1))
        .build()
        .expect("build harness");
    let output = harness.apply(change).expect("apply");
    assert_eq!(output.diffs.len(), 1, "expected exactly one output diff");

    // Whatever kind of diff came back, inspect its surviving side.
    let out_columns = match &output.diffs[0] {
        Diff::Insert { post } | Diff::Update { post, .. } => post,
        Diff::Remove { pre } => pre,
    };
    assert_eq!(out_columns.columns.len(), 1, "expected exactly one output column");
    out_columns.columns[0].clone()
}
/// Asserts that `actual` has the same type, length, and per-row values as `expected`.
///
/// Values are compared with `values_match`; `label` prefixes every failure message.
///
/// # Panics
/// Panics on the first type, row-count, or row-value mismatch.
pub fn assert_column_eq(label: &str, expected: &ColumnBuffer, actual: &ColumnBuffer) {
    let expected_type = expected.get_type();
    let actual_type = actual.get_type();
    assert_eq!(
        expected_type, actual_type,
        "{}: type mismatch: expected {:?}, got {:?}",
        label, expected_type, actual_type
    );

    let expected_len = expected.len();
    let actual_len = actual.len();
    assert_eq!(
        expected_len, actual_len,
        "{}: row count mismatch: expected {}, got {}",
        label, expected_len, actual_len
    );

    // Lengths are equal at this point, so zipping drops nothing.
    for (row, (want, got)) in expected.iter().zip(actual.iter()).enumerate() {
        assert!(
            values_match(&want, &got),
            "{}: row {}: expected {:?}, got {:?}",
            label,
            row,
            want,
            got
        );
    }
}
fn values_match(a: &Value, b: &Value) -> bool {
use Value::*;
match (a, b) {
(Float4(av), Float4(bv)) => {
let af: f32 = (*av).into();
let bf: f32 = (*bv).into();
(af.is_nan() && bf.is_nan()) || af.to_bits() == bf.to_bits()
}
(Float8(av), Float8(bv)) => {
let af: f64 = (*av).into();
let bf: f64 = (*bv).into();
(af.is_nan() && bf.is_nan()) || af.to_bits() == bf.to_bits()
}
_ => a == b,
}
}