reifydb_flow_operator_sdk/testing/
harness.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::{collections::HashMap, ffi::c_void, marker::PhantomData};
5
6use reifydb_core::{
7	CommitVersion, Row,
8	interface::FlowNodeId,
9	value::encoded::{EncodedKey, EncodedValues},
10};
11use reifydb_flow_operator_abi::FFIContext;
12use reifydb_type::{RowNumber, Value};
13
14use super::{TestContext, TestStateStore, callbacks};
15use crate::{FFIOperator, FFIOperatorMetadata, FlowChange, OperatorContext, Result};
16
17/// Test harness for FFI operators
18///
19/// This harness provides a complete testing environment for FFI operators with:
20/// - Mock FFI context with test-specific callbacks
21/// - State management via TestContext
22/// - Version tracking
23/// - Log capture (to stderr for now)
24/// - Full support for apply() and get_rows()
25pub struct OperatorTestHarness<T: FFIOperator> {
26	operator: T,
27	context: Box<TestContext>, // Boxed for stable address (pointed to by ffi_context)
28	ffi_context: Box<FFIContext>,
29	config: HashMap<String, Value>,
30	node_id: FlowNodeId,
31}
32
33impl<T: FFIOperator> OperatorTestHarness<T> {
34	/// Create a new test harness builder
35	pub fn builder() -> TestHarnessBuilder<T> {
36		TestHarnessBuilder::new()
37	}
38
39	/// Apply a flow change to the operator
40	pub fn apply(&mut self, input: FlowChange) -> Result<FlowChange> {
41		let mut ctx = self.create_operator_context();
42		self.operator.apply(&mut ctx, input)
43	}
44
45	/// Get rows by their row numbers
46	pub fn get_rows(&mut self, row_numbers: &[RowNumber]) -> Result<Vec<Option<Row>>> {
47		let mut ctx = self.create_operator_context();
48		self.operator.get_rows(&mut ctx, row_numbers)
49	}
50
51	/// Get the current version
52	pub fn version(&self) -> CommitVersion {
53		(*self.context).version()
54	}
55
56	/// Set the current version
57	pub fn set_version(&mut self, version: CommitVersion) {
58		(*self.context).set_version(version);
59	}
60
61	/// Get access to the state store for assertions
62	pub fn state(&self) -> TestStateStore {
63		let store = self.context.state_store();
64		let data = store.lock().unwrap();
65		let mut result = TestStateStore::new();
66		for (k, v) in data.iter() {
67			result.set(k.clone(), v.clone());
68		}
69		result
70	}
71
72	/// Assert that a state key exists with the given value
73	pub fn assert_state<K>(&self, key: K, expected: Value)
74	where
75		K: reifydb_core::key::EncodableKey,
76	{
77		use reifydb_core::value::encoded::EncodedValuesLayout;
78		let encoded_key = key.encode();
79		let store = self.state();
80		let layout = EncodedValuesLayout::new(&[expected.get_type()]);
81
82		store.assert_value(&encoded_key, &[expected], &layout);
83	}
84
85	/// Get captured log messages
86	pub fn logs(&self) -> Vec<String> {
87		(*self.context).logs()
88	}
89
90	/// Clear captured log messages
91	pub fn clear_logs(&self) {
92		(*self.context).clear_logs()
93	}
94
95	/// Take a snapshot of the current state
96	pub fn snapshot_state(&self) -> HashMap<EncodedKey, EncodedValues> {
97		self.state().snapshot()
98	}
99
100	/// Restore state from a snapshot
101	pub fn restore_state(&mut self, snapshot: HashMap<EncodedKey, EncodedValues>) {
102		(*self.context).clear_state();
103		for (k, v) in snapshot {
104			(*self.context).set_state(k, v.0.to_vec());
105		}
106	}
107
108	/// Reset the harness to initial state
109	pub fn reset(&mut self) -> Result<()> {
110		(*self.context).clear_state();
111		(*self.context).clear_logs();
112		(*self.context).set_version(CommitVersion(1));
113
114		// Recreate the operator
115		self.operator = T::new(self.node_id, &self.config)?;
116		Ok(())
117	}
118
119	/// Create an operator context for direct access
120	///
121	/// This is useful for testing components that need an OperatorContext
122	/// without going through the apply() or get_rows() methods.
123	///
124	/// # Example
125	///
126	/// ```ignore
127	/// let mut harness = TestHarnessBuilder::<MyOperator>::new().build()?;
128	/// let mut ctx = harness.create_operator_context();
129	/// let (row_num, is_new) = ctx.get_or_create_row_number(harness.operator(), &key)?;
130	/// ```
131	pub fn create_operator_context(&mut self) -> OperatorContext {
132		OperatorContext::new(&mut *self.ffi_context as *mut FFIContext)
133	}
134
135	/// Get a reference to the operator
136	pub fn operator(&self) -> &T {
137		&self.operator
138	}
139
140	/// Get a mutable reference to the operator
141	pub fn operator_mut(&mut self) -> &mut T {
142		&mut self.operator
143	}
144
145	/// Get the node ID
146	pub fn node_id(&self) -> FlowNodeId {
147		self.node_id
148	}
149}
150
151/// Builder for OperatorTestHarness
152pub struct TestHarnessBuilder<T: FFIOperator> {
153	config: HashMap<String, Value>,
154	node_id: FlowNodeId,
155	version: CommitVersion,
156	initial_state: HashMap<EncodedKey, EncodedValues>,
157	_phantom: PhantomData<T>,
158}
159
160impl<T: FFIOperator> TestHarnessBuilder<T> {
161	/// Create a new builder
162	pub fn new() -> Self {
163		Self {
164			config: HashMap::new(),
165			node_id: FlowNodeId(1),
166			version: CommitVersion(1),
167			initial_state: HashMap::new(),
168			_phantom: PhantomData,
169		}
170	}
171
172	/// Set the operator configuration
173	pub fn with_config<I, K>(mut self, config: I) -> Self
174	where
175		I: IntoIterator<Item = (K, Value)>,
176		K: Into<String>,
177	{
178		self.config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
179		self
180	}
181
182	/// Add a single config value
183	pub fn add_config(mut self, key: impl Into<String>, value: Value) -> Self {
184		self.config.insert(key.into(), value);
185		self
186	}
187
188	/// Set the node ID
189	pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
190		self.node_id = node_id;
191		self
192	}
193
194	/// Set the initial version
195	pub fn with_version(mut self, version: CommitVersion) -> Self {
196		self.version = version;
197		self
198	}
199
200	/// Set initial state
201	pub fn with_initial_state<K>(mut self, key: K, value: Vec<u8>) -> Self
202	where
203		K: reifydb_core::key::EncodableKey,
204	{
205		use reifydb_core::CowVec;
206		self.initial_state.insert(key.encode(), EncodedValues(CowVec::new(value)));
207		self
208	}
209
210	/// Build the test harness
211	pub fn build(self) -> Result<OperatorTestHarness<T>> {
212		// Create TestContext in a Box for stable address
213		let context = Box::new(TestContext::new(self.version));
214
215		// Set initial state
216		for (k, v) in self.initial_state {
217			context.set_state(k, v.0.to_vec());
218		}
219
220		// Create FFI context with test callbacks
221		// The txn_ptr points to the TestContext
222		let ffi_context = Box::new(FFIContext {
223			txn_ptr: &*context as *const TestContext as *mut c_void,
224			operator_id: self.node_id.0,
225			callbacks: callbacks::create_test_callbacks(),
226		});
227
228		// Create the operator
229		let operator = T::new(self.node_id, &self.config)?;
230
231		Ok(OperatorTestHarness {
232			operator,
233			context,
234			ffi_context,
235			config: self.config,
236			node_id: self.node_id,
237		})
238	}
239}
240
241/// Helper for testing operators with metadata
242pub struct TestMetadataHarness;
243
244impl TestMetadataHarness {
245	/// Assert an operator has the expected name
246	pub fn assert_name<T: FFIOperatorMetadata>(expected: &str) {
247		assert_eq!(T::NAME, expected, "Operator name mismatch. Expected: {}, Actual: {}", expected, T::NAME);
248	}
249
250	/// Assert an operator has the expected API version
251	pub fn assert_api<T: FFIOperatorMetadata>(expected: u32) {
252		assert_eq!(
253			T::API,
254			expected,
255			"Operator API version mismatch. Expected: {}, Actual: {}",
256			expected,
257			T::API
258		);
259	}
260
261	/// Assert an operator has the expected semantic version
262	pub fn assert_version<T: FFIOperatorMetadata>(expected: &str) {
263		assert_eq!(
264			T::VERSION,
265			expected,
266			"Operator version mismatch. Expected: {}, Actual: {}",
267			expected,
268			T::VERSION
269		);
270	}
271}
272
#[cfg(test)]
mod tests {
	use reifydb_core::value::encoded::{EncodedValuesLayout, IntoEncodedKey};
	use reifydb_type::Type;

	use super::{super::helpers::encode_key, *};
	use crate::{FlowDiff, OperatorColumnDef, testing::TestFlowChangeBuilder};

	/// Pass-through operator: `apply` returns its input untouched and
	/// `get_rows` always yields an empty list.
	struct TestOperator {
		_node_id: FlowNodeId,
		_config: HashMap<String, Value>,
	}

	impl FFIOperatorMetadata for TestOperator {
		const NAME: &'static str = "test_operator";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Simple pass-through test operator";
		const INPUT_COLUMNS: &'static [OperatorColumnDef] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumnDef] = &[];
		const CAPABILITIES: u32 = crate::prelude::CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for TestOperator {
		fn new(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self> {
			let operator = Self {
				_node_id: operator_id,
				_config: config.clone(),
			};
			Ok(operator)
		}

		fn apply(&mut self, _ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
			// Nothing to do: hand the change straight back.
			Ok(input)
		}

		fn get_rows(
			&mut self,
			_ctx: &mut OperatorContext,
			_row_numbers: &[RowNumber],
		) -> Result<Vec<Option<Row>>> {
			Ok(Vec::new())
		}
	}

	/// Operator that records the first column of every inserted/updated row
	/// in state under the key `row_<number>`.
	struct StatefulTestOperator;

	impl FFIOperatorMetadata for StatefulTestOperator {
		const NAME: &'static str = "stateful_test_operator";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Stateful test operator that stores values";
		const INPUT_COLUMNS: &'static [OperatorColumnDef] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumnDef] = &[];
		const CAPABILITIES: u32 = crate::prelude::CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for StatefulTestOperator {
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		fn apply(&mut self, ctx: &mut OperatorContext, input: FlowChange) -> Result<FlowChange> {
			let mut state = ctx.state();

			for diff in &input.diffs {
				// Only inserts and updates carry a post-image; these tests never send removes.
				let row = match diff {
					FlowDiff::Insert {
						post,
					}
					| FlowDiff::Update {
						post,
						..
					} => post,
					FlowDiff::Remove {
						..
					} => unreachable!(),
				};

				let state_key = format!("row_{}", row.number.0);
				let first_column = row.layout.get_value_by_idx(&row.encoded, 0);

				// Re-encode the first column with a single Int8 column layout and persist it.
				let layout = EncodedValuesLayout::new(&[Type::Int8]);
				let mut encoded = layout.allocate();
				layout.set_values(&mut encoded, &[first_column]);

				state.set(&state_key.into_encoded_key(), &encoded)?;
			}

			Ok(input)
		}

		fn get_rows(
			&mut self,
			_ctx: &mut OperatorContext,
			_row_numbers: &[RowNumber],
		) -> Result<Vec<Option<Row>>> {
			Ok(Vec::new())
		}
	}

	#[test]
	fn test_operator_metadata() {
		TestMetadataHarness::assert_name::<TestOperator>("test_operator");
		TestMetadataHarness::assert_api::<TestOperator>(1);
		TestMetadataHarness::assert_version::<TestOperator>("1.0.0");
	}

	#[test]
	fn test_harness_builder() {
		let built = TestHarnessBuilder::<TestOperator>::new()
			.with_node_id(FlowNodeId(42))
			.with_version(CommitVersion(10))
			.add_config("key", Value::Utf8("value".into()))
			.build();

		assert!(built.is_ok());

		let harness = built.unwrap();
		assert_eq!(harness.node_id, 42);
		assert_eq!(harness.version(), 10);
	}

	#[test]
	fn test_harness_with_stateful_operator() {
		let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// A single insert; the operator is expected to persist its first column.
		let change = TestFlowChangeBuilder::new().insert_row(1, vec![Value::Int8(42i64)]).build();
		let output = harness.apply(change).expect("Apply failed");

		// The change passes through unchanged in size.
		assert_eq!(output.diffs.len(), 1);

		// The value must have reached the state store through the FFI callbacks.
		let layout = EncodedValuesLayout::new(&[Type::Int8]);
		let key = encode_key("row_1");
		let state = harness.state();
		state.assert_value(&key, &[Value::Int8(42i64)], &layout);
	}

	#[test]
	fn test_harness_multiple_operations() {
		let mut harness =
			TestHarnessBuilder::<StatefulTestOperator>::new().build().expect("Failed to build harness");

		// First batch: two rows in one change.
		let first_batch = TestFlowChangeBuilder::new()
			.insert_row(1, vec![Value::Int8(10i64)])
			.insert_row(2, vec![Value::Int8(20i64)])
			.build();
		harness.apply(first_batch).expect("First apply failed");

		let state = harness.state();
		assert_eq!(state.len(), 2);

		// Second batch: one more row.
		let second_batch =
			TestFlowChangeBuilder::new().insert_row(RowNumber(3), vec![Value::Int8(30i64)]).build();
		harness.apply(second_batch).expect("Second apply failed");

		// All three values must now be present.
		let state = harness.state();
		let layout = EncodedValuesLayout::new(&[Type::Int8]);

		state.assert_value(&encode_key("row_1"), &[Value::Int8(10i64)], &layout);
		state.assert_value(&encode_key("row_2"), &[Value::Int8(20i64)], &layout);
		state.assert_value(&encode_key("row_3"), &[Value::Int8(30i64)], &layout);

		assert_eq!(state.len(), 3);
	}
}