Skip to main content

reifydb_sdk/testing/
harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, ffi::c_void, marker::PhantomData, ops::Index, ptr};
5
6use ptr::null;
7use reifydb_abi::context::context::ContextFFI;
8use reifydb_core::{
9	common::CommitVersion,
10	encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
11	interface::{
12		catalog::flow::FlowNodeId,
13		change::{Change, ChangeOrigin},
14	},
15	key::EncodableKey,
16	row::Row,
17	value::column::columns::Columns,
18};
19use reifydb_type::{
20	util::cowvec::CowVec,
21	value::{Value, row_number::RowNumber},
22};
23
24use crate::{
25	error::Result,
26	ffi::arena::Arena,
27	operator::{FFIOperator, FFIOperatorMetadata, change::BorrowedChange, context::OperatorContext},
28	testing::{
29		builders::TestChangeBuilder,
30		callbacks::create_test_callbacks,
31		context::TestContext,
32		registry::{TestBuilderRegistry, into_diffs, with_registry},
33		state::TestStateStore,
34	},
35};
36
37pub struct OperatorTestHarness<T: FFIOperator> {
38	operator: T,
39	context: Box<TestContext>,
40	ffi_context: Box<ContextFFI>,
41	config: HashMap<String, Value>,
42	node_id: FlowNodeId,
43	history: Vec<Change>,
44
45	builder_registry: TestBuilderRegistry,
46
47	input_arena: Arena,
48}
49
50impl<T: FFIOperator> OperatorTestHarness<T> {
51	pub fn builder() -> TestHarnessBuilder<T> {
52		TestHarnessBuilder::new()
53	}
54
55	pub fn apply(&mut self, input: Change) -> Result<Change> {
56		let version = input.version;
57		let changed_at = input.changed_at;
58		let origin = input.origin.clone();
59
60		self.input_arena.clear();
61		let ffi_change = self.input_arena.marshal_change(&input);
62		let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
63
64		let result: Result<()> = with_registry(&self.builder_registry, || {
65			let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
66			let borrowed = unsafe { BorrowedChange::from_raw(&ffi_change as *const _) };
67			self.operator.apply(&mut op_ctx, borrowed)?;
68
69			self.operator.flush_state(&mut op_ctx)
70		});
71
72		drop(input);
73		result?;
74
75		let emitted = self.builder_registry.drain_diffs();
76		let diffs = into_diffs(emitted);
77		let output = match origin {
78			ChangeOrigin::Flow(node) => Change::from_flow(node, version, diffs, changed_at),
79			ChangeOrigin::Shape(_) => Change::from_flow(self.node_id, version, diffs, changed_at),
80		};
81		self.history.push(output.clone());
82		Ok(output)
83	}
84
85	pub fn insert(&mut self, row: Row) -> &mut Self {
86		let change = TestChangeBuilder::new().insert(row).build();
87		self.apply(change).expect("insert failed");
88		self
89	}
90
91	pub fn update(&mut self, pre: Row, post: Row) -> &mut Self {
92		let change = TestChangeBuilder::new().update(pre, post).build();
93		self.apply(change).expect("update failed");
94		self
95	}
96
97	pub fn remove(&mut self, row: Row) -> &mut Self {
98		let change = TestChangeBuilder::new().remove(row).build();
99		self.apply(change).expect("remove failed");
100		self
101	}
102
103	pub fn history_len(&self) -> usize {
104		self.history.len()
105	}
106
107	pub fn last_change(&self) -> Option<&Change> {
108		self.history.last()
109	}
110
111	pub fn clear_history(&mut self) {
112		self.history.clear();
113	}
114
115	pub fn pull(&mut self, row_numbers: &[RowNumber]) -> Result<Columns> {
116		let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
117		let result: Result<()> = with_registry(&self.builder_registry, || {
118			let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
119			self.operator.pull(&mut op_ctx, row_numbers)?;
120			self.operator.flush_state(&mut op_ctx)
121		});
122		result?;
123
124		let mut emitted = self.builder_registry.drain_diffs();
125		let cols = if let Some(first) = emitted.drain(..).next() {
126			first.post.or(first.pre).unwrap_or_else(Columns::empty)
127		} else {
128			Columns::empty()
129		};
130		Ok(cols)
131	}
132
133	pub fn version(&self) -> CommitVersion {
134		(*self.context).version()
135	}
136
137	pub fn set_version(&mut self, version: CommitVersion) {
138		(*self.context).set_version(version);
139	}
140
141	pub fn state(&self) -> TestStateStore {
142		let store = self.context.state_store();
143		let data = store.lock().unwrap();
144		let mut result = TestStateStore::new();
145		for (k, v) in data.iter() {
146			result.set(k.clone(), v.clone());
147		}
148		result
149	}
150
151	pub fn assert_state<K>(&self, key: K, expected: Value)
152	where
153		K: EncodableKey,
154	{
155		let encoded_key = key.encode();
156		let store = self.state();
157		let shape = RowShape::testing(&[expected.get_type()]);
158
159		store.assert_value(&encoded_key, &[expected], &shape);
160	}
161
162	pub fn logs(&self) -> Vec<String> {
163		(*self.context).logs()
164	}
165
166	pub fn clear_logs(&self) {
167		(*self.context).clear_logs()
168	}
169
170	pub fn snapshot_state(&self) -> HashMap<EncodedKey, EncodedRow> {
171		self.state().snapshot()
172	}
173
174	pub fn restore_state(&mut self, snapshot: HashMap<EncodedKey, EncodedRow>) {
175		(*self.context).clear_state();
176		for (k, v) in snapshot {
177			(*self.context).set_state(k, v.0.to_vec());
178		}
179	}
180
181	pub fn reset(&mut self) -> Result<()> {
182		(*self.context).clear_state();
183		(*self.context).clear_logs();
184		(*self.context).set_version(CommitVersion(1));
185		self.history.clear();
186
187		self.operator = T::new(self.node_id, &self.config)?;
188		Ok(())
189	}
190
191	pub fn create_operator_context(&mut self) -> OperatorContext {
192		OperatorContext::new(&mut *self.ffi_context as *mut ContextFFI)
193	}
194
195	pub fn operator(&self) -> &T {
196		&self.operator
197	}
198
199	pub fn operator_mut(&mut self) -> &mut T {
200		&mut self.operator
201	}
202
203	pub fn node_id(&self) -> FlowNodeId {
204		self.node_id
205	}
206}
207
208impl<T: FFIOperator> Index<usize> for OperatorTestHarness<T> {
209	type Output = Change;
210
211	fn index(&self, index: usize) -> &Self::Output {
212		&self.history[index]
213	}
214}
215
216pub struct TestHarnessBuilder<T: FFIOperator> {
217	config: HashMap<String, Value>,
218	node_id: FlowNodeId,
219	version: CommitVersion,
220	initial_state: HashMap<EncodedKey, EncodedRow>,
221	_phantom: PhantomData<T>,
222}
223
224impl<T: FFIOperator> Default for TestHarnessBuilder<T> {
225	fn default() -> Self {
226		Self::new()
227	}
228}
229
230impl<T: FFIOperator> TestHarnessBuilder<T> {
231	pub fn new() -> Self {
232		Self {
233			config: HashMap::new(),
234			node_id: FlowNodeId(1),
235			version: CommitVersion(1),
236			initial_state: HashMap::new(),
237			_phantom: PhantomData,
238		}
239	}
240
241	pub fn with_config<I, K>(mut self, config: I) -> Self
242	where
243		I: IntoIterator<Item = (K, Value)>,
244		K: Into<String>,
245	{
246		self.config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
247		self
248	}
249
250	pub fn add_config(mut self, key: impl Into<String>, value: Value) -> Self {
251		self.config.insert(key.into(), value);
252		self
253	}
254
255	pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
256		self.node_id = node_id;
257		self
258	}
259
260	pub fn with_version(mut self, version: CommitVersion) -> Self {
261		self.version = version;
262		self
263	}
264
265	pub fn with_initial_state<K>(mut self, key: K, value: Vec<u8>) -> Self
266	where
267		K: EncodableKey,
268	{
269		self.initial_state.insert(key.encode(), EncodedRow(CowVec::new(value)));
270		self
271	}
272
273	pub fn build(self) -> Result<OperatorTestHarness<T>> {
274		let context = Box::new(TestContext::new(self.version));
275
276		for (k, v) in self.initial_state {
277			context.set_state(k, v.0.to_vec());
278		}
279
280		let ffi_context = Box::new(ContextFFI {
281			txn_ptr: &*context as *const TestContext as *mut c_void,
282			executor_ptr: null(),
283			operator_id: self.node_id.0,
284			clock_now_nanos: 0,
285			callbacks: create_test_callbacks(),
286		});
287
288		let operator = T::new(self.node_id, &self.config)?;
289
290		Ok(OperatorTestHarness {
291			operator,
292			context,
293			ffi_context,
294			config: self.config,
295			node_id: self.node_id,
296			history: Vec::new(),
297			builder_registry: TestBuilderRegistry::new(),
298			input_arena: Arena::new(),
299		})
300	}
301}
302
303pub struct TestMetadataHarness;
304
305impl TestMetadataHarness {
306	pub fn assert_name<T: FFIOperatorMetadata>(expected: &str) {
307		assert_eq!(T::NAME, expected, "Operator name mismatch. Expected: {}, Actual: {}", expected, T::NAME);
308	}
309
310	pub fn assert_api<T: FFIOperatorMetadata>(expected: u32) {
311		assert_eq!(
312			T::API,
313			expected,
314			"Operator API version mismatch. Expected: {}, Actual: {}",
315			expected,
316			T::API
317		);
318	}
319
320	pub fn assert_version<T: FFIOperatorMetadata>(expected: &str) {
321		assert_eq!(
322			T::VERSION,
323			expected,
324			"Operator version mismatch. Expected: {}, Actual: {}",
325			expected,
326			T::VERSION
327		);
328	}
329}
330
#[cfg(test)]
pub mod tests {
	use reifydb_abi::{
		data::column::ColumnTypeCode, flow::diff::DiffType, operator::capabilities::CAPABILITY_ALL_STANDARD,
	};
	use reifydb_core::{common::CommitVersion, encoded::key::IntoEncodedKey, interface::catalog::flow::FlowNodeId};
	use reifydb_type::value::row_number::RowNumber;

	use super::{super::helpers::encode_key, *};
	use crate::{
		operator::{
			FFIOperator, FFIOperatorMetadata,
			builder::{ColumnsBuilder, CommittedColumn},
			change::{BorrowedChange, BorrowedColumns},
			column::operator::OperatorColumn,
			context::OperatorContext,
		},
		testing::builders::{TestChangeBuilder, TestRowBuilder},
	};

	// Simple pass-through operator for basic tests
	struct TestOperator {
		_node_id: FlowNodeId,
		_config: HashMap<String, Value>,
	}

	impl FFIOperatorMetadata for TestOperator {
		const NAME: &'static str = "test_operator";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Simple pass-through test operator";
		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for TestOperator {
		fn new(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self {
				_node_id: operator_id,
				_config: config.clone(),
			})
		}

		fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
			// Pass-through: forward each input diff via the builder.
			forward_diffs_passthrough(ctx, &input)
		}

		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
			Ok(())
		}
	}

	// Stateful operator that stores values from flow changes
	struct StatefulTestOperator;

	impl FFIOperatorMetadata for StatefulTestOperator {
		const NAME: &'static str = "stateful_test_operator";
		const API: u32 = 1;
		const VERSION: &'static str = "1.0.0";
		const DESCRIPTION: &'static str = "Stateful test operator that stores values";
		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
	}

	impl FFIOperator for StatefulTestOperator {
		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
			Ok(Self)
		}

		fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
			// Stash the first i64 value of the first post column into
			// operator state, keyed by the first row number. Then forward
			// the diffs unchanged via the builder so callers can still
			// inspect the apply output.
			for diff in input.diffs() {
				let columns = match diff.kind() {
					DiffType::Insert | DiffType::Update => diff.post(),
					// Removals carry no post image; nothing to record.
					DiffType::Remove => continue,
				};

				let row_numbers = columns.row_numbers();
				let first_int8 = columns
					.columns()
					.next()
					// SAFETY: NOTE(review): relies on `as_slice::<i64>`
					// only being valid for a column that stores i64
					// values; these tests only feed `Value::Int8`.
					.and_then(|c| unsafe { c.as_slice::<i64>() })
					.and_then(|s| s.first().copied());
				if let (Some(&rn), Some(v)) = (row_numbers.first(), first_int8) {
					let row_key = format!("row_{}", rn);
					ctx.state().set::<i64>(&row_key.into_encoded_key(), &v)?;
				}
			}
			forward_diffs_passthrough(ctx, &input)
		}

		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
			Ok(())
		}
	}

	/// Borrows every owned column name as `&str`, the form the builder's
	/// emit calls take.
	fn name_refs(names: &[String]) -> Vec<&str> {
		names.iter().map(String::as_str).collect()
	}

	/// Copies the raw row numbers of `cols` into owned `RowNumber`s.
	fn owned_row_numbers(cols: BorrowedColumns<'_>) -> Vec<RowNumber> {
		cols.row_numbers().iter().copied().map(RowNumber).collect()
	}

	/// Helper used by both test operators: read each input diff and emit
	/// it back unchanged via `ctx.builder()`. This keeps the harness's
	/// `apply` returning a `Change` that mirrors the input.
	fn forward_diffs_passthrough(ctx: &mut OperatorContext, input: &BorrowedChange<'_>) -> Result<()> {
		let mut builder = ctx.builder();
		for diff in input.diffs() {
			match diff.kind() {
				DiffType::Insert => {
					let (post_cols, post_names) = clone_columns(&mut builder, diff.post())?;
					let post_names = name_refs(&post_names);
					let row_numbers = owned_row_numbers(diff.post());
					builder.emit_insert(&post_cols, &post_names, &row_numbers)?;
				}
				DiffType::Update => {
					let (pre_cols, pre_names) = clone_columns(&mut builder, diff.pre())?;
					let (post_cols, post_names) = clone_columns(&mut builder, diff.post())?;
					let pre_names = name_refs(&pre_names);
					let post_names = name_refs(&post_names);
					let pre_row_count = diff.pre().row_count();
					let post_row_count = diff.post().row_count();
					let pre_row_numbers = owned_row_numbers(diff.pre());
					let post_row_numbers = owned_row_numbers(diff.post());
					builder.emit_update(
						&pre_cols,
						&pre_names,
						pre_row_count,
						&pre_row_numbers,
						&post_cols,
						&post_names,
						post_row_count,
						&post_row_numbers,
					)?;
				}
				DiffType::Remove => {
					let (pre_cols, pre_names) = clone_columns(&mut builder, diff.pre())?;
					let pre_names = name_refs(&pre_names);
					let row_numbers = owned_row_numbers(diff.pre());
					builder.emit_remove(&pre_cols, &pre_names, &row_numbers)?;
				}
			}
		}
		Ok(())
	}

	/// Acquire matching builders for each column in `cols`, copy bytes +
	/// offsets across, commit, and return the committed handles + names.
	fn clone_columns(
		builder: &mut ColumnsBuilder<'_>,
		cols: BorrowedColumns<'_>,
	) -> Result<(Vec<CommittedColumn>, Vec<String>)> {
		let row_count = cols.row_count();
		let mut committed: Vec<CommittedColumn> = Vec::new();
		let mut names: Vec<String> = Vec::new();
		for col in cols.columns() {
			let type_code = col.type_code();
			let bytes = col.data_bytes();
			// Request at least one row so empty inputs still get a buffer.
			let active = builder.acquire(type_code, row_count.max(1))?;
			active.grow(bytes.len().max(row_count))?;
			let dst = active.data_ptr();
			if !dst.is_null() && !bytes.is_empty() {
				// SAFETY: `dst` is non-null and `bytes` is a live slice.
				// NOTE(review): assumes the `grow` call above left room
				// for at least `bytes.len()` bytes behind `dst` and that
				// the two buffers do not overlap - confirm against the
				// builder implementation.
				unsafe {
					core::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, bytes.len());
				}
			}
			// For var-len types, copy offsets too.
			if matches!(type_code, ColumnTypeCode::Utf8 | ColumnTypeCode::Blob) {
				let off = col.offsets();
				let dst_off = active.offsets_ptr();
				if !dst_off.is_null() && !off.is_empty() {
					// SAFETY: `dst_off` is non-null and `off` is a live
					// slice. NOTE(review): assumes the acquired builder's
					// offsets buffer holds at least `off.len()` entries -
					// confirm against the builder implementation.
					unsafe {
						core::ptr::copy_nonoverlapping(off.as_ptr(), dst_off, off.len());
					}
				}
			}
			committed.push(active.commit(row_count)?);
			names.push(col.name().to_string());
		}
		Ok((committed, names))
	}

	#[test]
	fn test_operator_metadata() {
		TestMetadataHarness::assert_name::<TestOperator>("test_operator");
		TestMetadataHarness::assert_api::<TestOperator>(1);
		TestMetadataHarness::assert_version::<TestOperator>("1.0.0");
	}

	#[test]
	fn test_harness_builder() {
		let harness = TestHarnessBuilder::<TestOperator>::new()
			.with_node_id(FlowNodeId(42))
			.with_version(CommitVersion(10))
			.add_config("key", Value::Utf8("value".into()))
			.build()
			.expect("Failed to build harness");

		assert_eq!(harness.node_id, 42);
		assert_eq!(harness.version(), 10);
	}

	#[test]
	fn test_harness_with_stateful_operator() {
		// Build harness with stateful operator
		let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// Create a flow change with an insert
		let input = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(42i64)]).build();

		// Apply the flow change - operator should store the value in state
		let output = harness.apply(input).expect("Apply failed");

		// Verify output has the expected diff
		assert_eq!(output.diffs.len(), 1);

		// Verify the operator stored state correctly via FFI callbacks.
		// State is wrapped in the canonical operator_state row + postcard
		// payload, so assertions go through the typed accessor.
		let state = harness.state();
		state.assert_typed_value::<i64>(&encode_key("row_1"), &42i64);
	}

	#[test]
	fn test_harness_history_index() {
		let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
			.with_node_id(FlowNodeId(1))
			.build()
			.expect("Failed to build harness");

		// History starts empty
		assert_eq!(harness.history_len(), 0);
		assert!(harness.last_change().is_none());

		// Each apply() call records a Change
		let input_a = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(1i64)]).build();
		harness.apply(input_a).expect("apply a failed");
		assert_eq!(harness.history_len(), 1);

		let input_b = TestChangeBuilder::new().insert_row(2, vec![Value::Int8(2i64)]).build();
		harness.apply(input_b).expect("apply b failed");
		assert_eq!(harness.history_len(), 2);

		// Index returns the i-th recorded Change
		assert_eq!(harness[0].diffs.len(), 1);
		assert_eq!(harness[1].diffs.len(), 1);

		// Chainable insert also records
		harness.insert(TestRowBuilder::new(3).add_value(Value::Int8(3i64)).build());
		assert_eq!(harness.history_len(), 3);

		// last_change returns the most recent
		assert!(harness.last_change().is_some());

		// clear_history resets without affecting state
		let state_count_before = harness.state().len();
		harness.clear_history();
		assert_eq!(harness.history_len(), 0);
		assert!(harness.last_change().is_none());
		assert_eq!(harness.state().len(), state_count_before);
	}

	#[test]
	fn test_harness_multiple_operations() {
		let mut harness =
			TestHarnessBuilder::<StatefulTestOperator>::new().build().expect("Failed to build harness");

		// Insert multiple rows
		let input1 = TestChangeBuilder::new()
			.insert_row(1, vec![Value::Int8(10i64)])
			.insert_row(2, vec![Value::Int8(20i64)])
			.build();

		harness.apply(input1).expect("First apply failed");

		let state = harness.state();
		assert_eq!(state.len(), 2);

		// Insert another row
		let input2 = TestChangeBuilder::new().insert_row(RowNumber(3), vec![Value::Int8(30i64)]).build();

		harness.apply(input2).expect("Second apply failed");

		// Verify all three values were stored
		let state = harness.state();
		state.assert_typed_value::<i64>(&encode_key("row_1"), &10i64);
		state.assert_typed_value::<i64>(&encode_key("row_2"), &20i64);
		state.assert_typed_value::<i64>(&encode_key("row_3"), &30i64);

		// Verify total state count
		assert_eq!(state.len(), 3);
	}
}