// reifydb_sub_flow/testing/mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4pub mod harness;
5
6use harness::NativeOperatorHarness;
7use reifydb_abi::flow::diff::DiffType;
8use reifydb_core::{interface::change::Change, value::column::columns::Columns};
9use reifydb_sdk::{
10	operator::{FFIOperatorAdapter, OperatorLogic, OperatorMetadata},
11	testing::harness::FFIOperatorHarness,
12};
13use reifydb_value::value::{Value, row_number::RowNumber};
14
15#[derive(Debug, PartialEq)]
16struct ColumnsRender {
17	names: Vec<String>,
18	row_numbers: Vec<RowNumber>,
19	rows: Vec<Vec<Value>>,
20}
21
22#[derive(Debug, PartialEq)]
23struct DiffRender {
24	kind: DiffType,
25	pre: Option<ColumnsRender>,
26	post: Option<ColumnsRender>,
27}
28
29fn render_columns(cols: &Columns) -> ColumnsRender {
30	ColumnsRender {
31		names: (0..cols.len()).map(|i| cols.name_at(i).text().to_string()).collect(),
32		row_numbers: cols.row_numbers.iter().copied().collect(),
33		rows: (0..cols.row_count()).map(|r| cols.row(r)).collect(),
34	}
35}
36
37fn render_change(change: &Change) -> Vec<DiffRender> {
38	change.diffs
39		.iter()
40		.map(|d| DiffRender {
41			kind: d.kind(),
42			pre: d.pre().map(render_columns),
43			post: d.post().map(render_columns),
44		})
45		.collect()
46}
47
48fn run_ffi<C>(config: &[(&str, Value)], inputs: &[Change]) -> Vec<Change>
49where
50	C: OperatorLogic + OperatorMetadata + 'static,
51{
52	let mut harness = FFIOperatorHarness::<FFIOperatorAdapter<C>>::builder()
53		.with_config(config.iter().cloned())
54		.build()
55		.expect("ffi harness build");
56	inputs.iter().map(|input| harness.apply(input.clone()).expect("ffi apply")).collect()
57}
58
59fn run_native<C>(config: &[(&str, Value)], inputs: &[Change]) -> Vec<Change>
60where
61	C: OperatorLogic + OperatorMetadata + 'static,
62{
63	let mut harness = NativeOperatorHarness::<C>::builder()
64		.with_config(config.iter().cloned())
65		.build()
66		.expect("native harness build");
67	inputs.iter().map(|input| harness.apply(input.clone()).expect("native apply")).collect()
68}
69
70pub fn assert_backend_parity<C>(config: Vec<(&str, Value)>, scenarios: &[(&str, Vec<Change>)])
71where
72	C: OperatorLogic + OperatorMetadata + 'static,
73{
74	for (name, inputs) in scenarios {
75		let ffi = run_ffi::<C>(&config, inputs);
76		let native = run_native::<C>(&config, inputs);
77
78		assert_eq!(
79			ffi.len(),
80			native.len(),
81			"scenario '{name}': ffi emitted {} outputs, native emitted {}",
82			ffi.len(),
83			native.len()
84		);
85
86		for (i, (f, n)) in ffi.iter().zip(native.iter()).enumerate() {
87			assert_eq!(
88				render_change(f),
89				render_change(n),
90				"scenario '{name}' apply #{i}: ffi vs native emitted-output mismatch"
91			);
92		}
93	}
94}