reifydb_sub_flow/testing/
mod.rs1pub mod harness;
5
6use harness::NativeOperatorHarness;
7use reifydb_abi::flow::diff::DiffType;
8use reifydb_core::{interface::change::Change, value::column::columns::Columns};
9use reifydb_sdk::{
10 operator::{FFIOperatorAdapter, OperatorLogic, OperatorMetadata},
11 testing::harness::FFIOperatorHarness,
12};
13use reifydb_value::value::{Value, row_number::RowNumber};
14
15#[derive(Debug, PartialEq)]
16struct ColumnsRender {
17 names: Vec<String>,
18 row_numbers: Vec<RowNumber>,
19 rows: Vec<Vec<Value>>,
20}
21
22#[derive(Debug, PartialEq)]
23struct DiffRender {
24 kind: DiffType,
25 pre: Option<ColumnsRender>,
26 post: Option<ColumnsRender>,
27}
28
29fn render_columns(cols: &Columns) -> ColumnsRender {
30 ColumnsRender {
31 names: (0..cols.len()).map(|i| cols.name_at(i).text().to_string()).collect(),
32 row_numbers: cols.row_numbers.iter().copied().collect(),
33 rows: (0..cols.row_count()).map(|r| cols.row(r)).collect(),
34 }
35}
36
37fn render_change(change: &Change) -> Vec<DiffRender> {
38 change.diffs
39 .iter()
40 .map(|d| DiffRender {
41 kind: d.kind(),
42 pre: d.pre().map(render_columns),
43 post: d.post().map(render_columns),
44 })
45 .collect()
46}
47
48fn run_ffi<C>(config: &[(&str, Value)], inputs: &[Change]) -> Vec<Change>
49where
50 C: OperatorLogic + OperatorMetadata + 'static,
51{
52 let mut harness = FFIOperatorHarness::<FFIOperatorAdapter<C>>::builder()
53 .with_config(config.iter().cloned())
54 .build()
55 .expect("ffi harness build");
56 inputs.iter().map(|input| harness.apply(input.clone()).expect("ffi apply")).collect()
57}
58
59fn run_native<C>(config: &[(&str, Value)], inputs: &[Change]) -> Vec<Change>
60where
61 C: OperatorLogic + OperatorMetadata + 'static,
62{
63 let mut harness = NativeOperatorHarness::<C>::builder()
64 .with_config(config.iter().cloned())
65 .build()
66 .expect("native harness build");
67 inputs.iter().map(|input| harness.apply(input.clone()).expect("native apply")).collect()
68}
69
70pub fn assert_backend_parity<C>(config: Vec<(&str, Value)>, scenarios: &[(&str, Vec<Change>)])
71where
72 C: OperatorLogic + OperatorMetadata + 'static,
73{
74 for (name, inputs) in scenarios {
75 let ffi = run_ffi::<C>(&config, inputs);
76 let native = run_native::<C>(&config, inputs);
77
78 assert_eq!(
79 ffi.len(),
80 native.len(),
81 "scenario '{name}': ffi emitted {} outputs, native emitted {}",
82 ffi.len(),
83 native.len()
84 );
85
86 for (i, (f, n)) in ffi.iter().zip(native.iter()).enumerate() {
87 assert_eq!(
88 render_change(f),
89 render_change(n),
90 "scenario '{name}' apply #{i}: ffi vs native emitted-output mismatch"
91 );
92 }
93 }
94}