reifydb_flow_operator_sdk/testing/
harness.rs1use std::{collections::HashMap, ffi::c_void, marker::PhantomData};
5
6use reifydb_core::{
7 CommitVersion, Row,
8 interface::FlowNodeId,
9 value::encoded::{EncodedKey, EncodedValues},
10};
11use reifydb_flow_operator_abi::FFIContext;
12use reifydb_type::{RowNumber, Value};
13
14use super::{TestContext, TestStateStore, callbacks};
15use crate::{FFIOperator, FFIOperatorMetadata, FlowChange, OperatorContext, Result};
16
17pub struct OperatorTestHarness<T: FFIOperator> {
26 operator: T,
27 context: Box<TestContext>, ffi_context: Box<FFIContext>,
29 config: HashMap<String, Value>,
30 node_id: FlowNodeId,
31}
32
33impl<T: FFIOperator> OperatorTestHarness<T> {
34 pub fn builder() -> TestHarnessBuilder<T> {
36 TestHarnessBuilder::new()
37 }
38
39 pub fn apply(&mut self, input: FlowChange) -> Result<FlowChange> {
41 let mut ctx = self.create_operator_context();
42 self.operator.apply(&mut ctx, input)
43 }
44
45 pub fn get_rows(&mut self, row_numbers: &[RowNumber]) -> Result<Vec<Option<Row>>> {
47 let mut ctx = self.create_operator_context();
48 self.operator.get_rows(&mut ctx, row_numbers)
49 }
50
51 pub fn version(&self) -> CommitVersion {
53 (*self.context).version()
54 }
55
56 pub fn set_version(&mut self, version: CommitVersion) {
58 (*self.context).set_version(version);
59 }
60
61 pub fn state(&self) -> TestStateStore {
63 let store = self.context.state_store();
64 let data = store.lock().unwrap();
65 let mut result = TestStateStore::new();
66 for (k, v) in data.iter() {
67 result.set(k.clone(), v.clone());
68 }
69 result
70 }
71
72 pub fn assert_state<K>(&self, key: K, expected: Value)
74 where
75 K: reifydb_core::key::EncodableKey,
76 {
77 use reifydb_core::value::encoded::EncodedValuesLayout;
78 let encoded_key = key.encode();
79 let store = self.state();
80 let layout = EncodedValuesLayout::new(&[expected.get_type()]);
81
82 store.assert_value(&encoded_key, &[expected], &layout);
83 }
84
85 pub fn logs(&self) -> Vec<String> {
87 (*self.context).logs()
88 }
89
90 pub fn clear_logs(&self) {
92 (*self.context).clear_logs()
93 }
94
95 pub fn snapshot_state(&self) -> HashMap<EncodedKey, EncodedValues> {
97 self.state().snapshot()
98 }
99
100 pub fn restore_state(&mut self, snapshot: HashMap<EncodedKey, EncodedValues>) {
102 (*self.context).clear_state();
103 for (k, v) in snapshot {
104 (*self.context).set_state(k, v.0.to_vec());
105 }
106 }
107
108 pub fn reset(&mut self) -> Result<()> {
110 (*self.context).clear_state();
111 (*self.context).clear_logs();
112 (*self.context).set_version(CommitVersion(1));
113
114 self.operator = T::new(self.node_id, &self.config)?;
116 Ok(())
117 }
118
119 pub fn create_operator_context(&mut self) -> OperatorContext {
132 OperatorContext::new(&mut *self.ffi_context as *mut FFIContext)
133 }
134
135 pub fn operator(&self) -> &T {
137 &self.operator
138 }
139
140 pub fn operator_mut(&mut self) -> &mut T {
142 &mut self.operator
143 }
144
145 pub fn node_id(&self) -> FlowNodeId {
147 self.node_id
148 }
149}
150
151pub struct TestHarnessBuilder<T: FFIOperator> {
153 config: HashMap<String, Value>,
154 node_id: FlowNodeId,
155 version: CommitVersion,
156 initial_state: HashMap<EncodedKey, EncodedValues>,
157 _phantom: PhantomData<T>,
158}
159
160impl<T: FFIOperator> TestHarnessBuilder<T> {
161 pub fn new() -> Self {
163 Self {
164 config: HashMap::new(),
165 node_id: FlowNodeId(1),
166 version: CommitVersion(1),
167 initial_state: HashMap::new(),
168 _phantom: PhantomData,
169 }
170 }
171
172 pub fn with_config<I, K>(mut self, config: I) -> Self
174 where
175 I: IntoIterator<Item = (K, Value)>,
176 K: Into<String>,
177 {
178 self.config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
179 self
180 }
181
182 pub fn add_config(mut self, key: impl Into<String>, value: Value) -> Self {
184 self.config.insert(key.into(), value);
185 self
186 }
187
188 pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
190 self.node_id = node_id;
191 self
192 }
193
194 pub fn with_version(mut self, version: CommitVersion) -> Self {
196 self.version = version;
197 self
198 }
199
200 pub fn with_initial_state<K>(mut self, key: K, value: Vec<u8>) -> Self
202 where
203 K: reifydb_core::key::EncodableKey,
204 {
205 use reifydb_core::CowVec;
206 self.initial_state.insert(key.encode(), EncodedValues(CowVec::new(value)));
207 self
208 }
209
210 pub fn build(self) -> Result<OperatorTestHarness<T>> {
212 let context = Box::new(TestContext::new(self.version));
214
215 for (k, v) in self.initial_state {
217 context.set_state(k, v.0.to_vec());
218 }
219
220 let ffi_context = Box::new(FFIContext {
223 txn_ptr: &*context as *const TestContext as *mut c_void,
224 operator_id: self.node_id.0,
225 callbacks: callbacks::create_test_callbacks(),
226 });
227
228 let operator = T::new(self.node_id, &self.config)?;
230
231 Ok(OperatorTestHarness {
232 operator,
233 context,
234 ffi_context,
235 config: self.config,
236 node_id: self.node_id,
237 })
238 }
239}
240
241pub struct TestMetadataHarness;
243
244impl TestMetadataHarness {
245 pub fn assert_name<T: FFIOperatorMetadata>(expected: &str) {
247 assert_eq!(T::NAME, expected, "Operator name mismatch. Expected: {}, Actual: {}", expected, T::NAME);
248 }
249
250 pub fn assert_api<T: FFIOperatorMetadata>(expected: u32) {
252 assert_eq!(
253 T::API,
254 expected,
255 "Operator API version mismatch. Expected: {}, Actual: {}",
256 expected,
257 T::API
258 );
259 }
260
261 pub fn assert_version<T: FFIOperatorMetadata>(expected: &str) {
263 assert_eq!(
264 T::VERSION,
265 expected,
266 "Operator version mismatch. Expected: {}, Actual: {}",
267 expected,
268 T::VERSION
269 );
270 }
271}
272
273#[cfg(test)]
274mod tests {
275 use reifydb_core::value::encoded::{EncodedValuesLayout, IntoEncodedKey};
276 use reifydb_type::Type;
277
278 use super::{super::helpers::encode_key, *};
279 use crate::testing::TestFlowChangeBuilder;
280
281 struct TestOperator {
283 _node_id: FlowNodeId,
284 _config: HashMap<String, Value>,
285 }
286
287 impl FFIOperatorMetadata for TestOperator {
288 const NAME: &'static str = "test_operator";
289 const API: u32 = 1;
290 const VERSION: &'static str = "1.0.0";
291 const DESCRIPTION: &'static str = "Simple pass-through test operator";
292 const INPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
293 const OUTPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
294 const CAPABILITIES: u32 = crate::prelude::CAPABILITY_ALL_STANDARD;
295 }
296
297 impl FFIOperator for TestOperator {
298 fn new(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self> {
299 Ok(Self {
300 _node_id: operator_id,
301 _config: config.clone(),
302 })
303 }
304
305 fn apply(&mut self, _ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
306 Ok(input)
308 }
309
310 fn get_rows(
311 &mut self,
312 _ctx: &mut OperatorContext,
313 _row_numbers: &[RowNumber],
314 ) -> Result<Vec<Option<Row>>> {
315 Ok(vec![])
316 }
317 }
318
319 struct StatefulTestOperator;
321
322 impl FFIOperatorMetadata for StatefulTestOperator {
323 const NAME: &'static str = "stateful_test_operator";
324 const API: u32 = 1;
325 const VERSION: &'static str = "1.0.0";
326 const DESCRIPTION: &'static str = "Stateful test operator that stores values";
327 const INPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
328 const OUTPUT_COLUMNS: &'static [crate::OperatorColumnDef] = &[];
329 const CAPABILITIES: u32 = crate::prelude::CAPABILITY_ALL_STANDARD;
330 }
331
332 impl FFIOperator for StatefulTestOperator {
333 fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
334 Ok(Self)
335 }
336
337 fn apply(&mut self, ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
338 let mut state = ctx.state();
339
340 for diff in &input.diffs {
341 let post_row = match diff {
342 crate::FlowDiff::Insert {
343 post,
344 } => Some(post),
345 crate::FlowDiff::Update {
346 post,
347 ..
348 } => Some(post),
349 crate::FlowDiff::Remove {
350 ..
351 } => unreachable!(),
352 };
353
354 if let Some(row) = post_row {
355 let row_key = format!("row_{}", row.number.0);
356
357 let first_value = row.layout.get_value_by_idx(&row.encoded, 0);
358
359 let layout = EncodedValuesLayout::new(&[Type::Int8]);
361 let mut encoded = layout.allocate();
362 layout.set_values(&mut encoded, &[first_value]);
363
364 state.set(&row_key.into_encoded_key(), &encoded)?;
365 }
366 }
367
368 Ok(input)
369 }
370
371 fn get_rows(
372 &mut self,
373 _ctx: &mut OperatorContext,
374 _row_numbers: &[RowNumber],
375 ) -> Result<Vec<Option<Row>>> {
376 Ok(vec![])
377 }
378 }
379
/// The metadata assertions accept the constants declared by `TestOperator`.
#[test]
fn test_operator_metadata() {
    type Subject = TestOperator;

    TestMetadataHarness::assert_name::<Subject>("test_operator");
    TestMetadataHarness::assert_api::<Subject>(1);
    TestMetadataHarness::assert_version::<Subject>("1.0.0");
}
386
/// A harness built with an explicit node id, version and config entry
/// reports that node id and version.
#[test]
fn test_harness_builder() {
    let builder = TestHarnessBuilder::<TestOperator>::new()
        .with_node_id(FlowNodeId(42))
        .with_version(CommitVersion(10))
        .add_config("key", Value::Utf8("value".into()));
    let result = builder.build();

    assert!(result.is_ok());

    let harness = result.unwrap();
    assert_eq!(harness.node_id(), 42);
    assert_eq!(harness.version(), 10);
}
401
/// Applying a single insert passes the diff through and records the row's
/// first value under `row_1`.
#[test]
fn test_harness_with_stateful_operator() {
    let builder = OperatorTestHarness::<StatefulTestOperator>::builder().with_node_id(FlowNodeId(1));
    let mut harness = builder.build().expect("Failed to build harness");

    let change = TestFlowChangeBuilder::new().insert_row(1, vec![Value::Int8(42i64)]).build();
    let output = harness.apply(change).expect("Apply failed");

    // The operator returns its input unchanged.
    assert_eq!(output.diffs.len(), 1);

    let single_int8 = EncodedValuesLayout::new(&[Type::Int8]);
    let expected_key = encode_key("row_1");
    harness.state().assert_value(&expected_key, &[Value::Int8(42i64)], &single_int8);
}
427
/// State accumulates across several `apply` calls on the same harness.
#[test]
fn test_harness_multiple_operations() {
    let mut harness =
        OperatorTestHarness::<StatefulTestOperator>::builder().build().expect("Failed to build harness");

    // First change: two inserted rows.
    let first_batch = TestFlowChangeBuilder::new()
        .insert_row(1, vec![Value::Int8(10i64)])
        .insert_row(2, vec![Value::Int8(20i64)])
        .build();
    harness.apply(first_batch).expect("First apply failed");
    assert_eq!(harness.state().len(), 2);

    // Second change: one more inserted row.
    let second_batch = TestFlowChangeBuilder::new().insert_row(RowNumber(3), vec![Value::Int8(30i64)]).build();
    harness.apply(second_batch).expect("Second apply failed");

    let state = harness.state();
    let layout = EncodedValuesLayout::new(&[Type::Int8]);
    for (key, value) in [("row_1", 10i64), ("row_2", 20i64), ("row_3", 30i64)] {
        state.assert_value(&encode_key(key), &[Value::Int8(value)], &layout);
    }

    assert_eq!(state.len(), 3);
}
460}