Skip to main content

reifydb_sdk/testing/
harness.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::HashMap, ffi::c_void, marker::PhantomData, ops::Index, ptr};
5
6use ptr::null;
7use reifydb_abi::context::context::ContextFFI;
8use reifydb_core::{
9	common::CommitVersion,
10	encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
11	interface::{
12		catalog::flow::FlowNodeId,
13		change::{Change, ChangeOrigin},
14	},
15	key::EncodableKey,
16	row::Row,
17	value::column::columns::Columns,
18};
19use reifydb_type::{
20	util::cowvec::CowVec,
21	value::{Value, row_number::RowNumber},
22};
23
24use crate::{
25	error::Result,
26	ffi::arena::Arena,
27	operator::{FFIOperator, FFIOperatorMetadata, change::BorrowedChange, context::OperatorContext},
28	testing::{
29		builders::TestChangeBuilder,
30		callbacks::create_test_callbacks,
31		context::TestContext,
32		registry::{TestBuilderRegistry, into_diffs, with_registry},
33		state::TestStateStore,
34	},
35};
36
37/// Test harness for FFI operators
38///
39/// This harness provides a complete testing environment for FFI operators with:
40/// - Mock FFI context with test-specific callbacks
41/// - State management via TestContext
42/// - Version tracking
43/// - Log capture (to stderr for now)
44/// - Full support for apply() and pull() driven through the zero-copy ABI
45pub struct OperatorTestHarness<T: FFIOperator> {
46	operator: T,
47	context: Box<TestContext>, // Boxed for stable address (pointed to by ffi_context)
48	ffi_context: Box<ContextFFI>,
49	config: HashMap<String, Value>,
50	node_id: FlowNodeId,
51	history: Vec<Change>,
52	/// Test-side mirror of the host's BuilderRegistry. Captures whatever
53	/// the operator emits via `ctx.builder()` during `apply` / `pull` /
54	/// `tick` so the harness can synthesise an output `Change` for the
55	/// caller to inspect.
56	builder_registry: TestBuilderRegistry,
57	/// Arena used to marshal input `Change` -> `ChangeFFI` so the
58	/// operator can read it as a `BorrowedChange`.
59	input_arena: Arena,
60}
61
62impl<T: FFIOperator> OperatorTestHarness<T> {
63	/// Create a new test harness builder
64	pub fn builder() -> TestHarnessBuilder<T> {
65		TestHarnessBuilder::new()
66	}
67
68	/// Apply a flow change to the operator via the zero-copy ABI.
69	///
70	/// Marshals the input as a `ChangeFFI` borrow, drives the operator,
71	/// and assembles an output `Change` from whatever the operator emitted
72	/// via `ctx.builder()`. The result is also appended to the harness
73	/// history so it can be inspected via `harness[i]`, `last_change()`,
74	/// or `history_len()`.
75	pub fn apply(&mut self, input: Change) -> Result<Change> {
76		let version = input.version;
77		let changed_at = input.changed_at;
78		let origin = input.origin.clone();
79		// Reset arena for this call (cheap; bumpalo reset).
80		self.input_arena.clear();
81		let ffi_change = self.input_arena.marshal_change(&input);
82		let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
83
84		let result: Result<()> = with_registry(&self.builder_registry, || {
85			let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
86			let borrowed = unsafe { BorrowedChange::from_raw(&ffi_change as *const _) };
87			self.operator.apply(&mut op_ctx, borrowed)?;
88			// Mirror the production txn-commit lifecycle: drain dirty StateCache
89			// entries before the call returns. Without this step, operators
90			// that buffer state through StateCache silently drop their in-flight
91			// state across snapshot/restore cycles.
92			self.operator.flush_state(&mut op_ctx)
93		});
94		// Drop the input arena's outstanding scaffolding before doing
95		// anything else (input pointers are now invalid).
96		drop(input);
97		result?;
98
99		let emitted = self.builder_registry.drain_diffs();
100		let diffs = into_diffs(emitted);
101		let output = match origin {
102			ChangeOrigin::Flow(node) => Change::from_flow(node, version, diffs, changed_at),
103			ChangeOrigin::Shape(_) => Change::from_flow(self.node_id, version, diffs, changed_at),
104		};
105		self.history.push(output.clone());
106		Ok(output)
107	}
108
109	/// Chainable Insert: applies immediately, records output in history, panics on error.
110	///
111	/// Use for seeding state and/or inspecting emissions:
112	/// ```ignore
113	/// harness.insert(row1).insert(row2);
114	/// assert_eq!(harness[0].diffs.len(), 1);
115	/// ```
116	pub fn insert(&mut self, row: Row) -> &mut Self {
117		let change = TestChangeBuilder::new().insert(row).build();
118		self.apply(change).expect("insert failed");
119		self
120	}
121
122	/// Chainable Update: applies immediately, records output in history, panics on error.
123	pub fn update(&mut self, pre: Row, post: Row) -> &mut Self {
124		let change = TestChangeBuilder::new().update(pre, post).build();
125		self.apply(change).expect("update failed");
126		self
127	}
128
129	/// Chainable Remove: applies immediately, records output in history, panics on error.
130	pub fn remove(&mut self, row: Row) -> &mut Self {
131		let change = TestChangeBuilder::new().remove(row).build();
132		self.apply(change).expect("remove failed");
133		self
134	}
135
136	/// Number of changes recorded in the history so far.
137	pub fn history_len(&self) -> usize {
138		self.history.len()
139	}
140
141	/// Reference to the most recent change, or `None` if the history is empty.
142	pub fn last_change(&self) -> Option<&Change> {
143		self.history.last()
144	}
145
146	/// Clear the recorded history without affecting operator state.
147	pub fn clear_history(&mut self) {
148		self.history.clear();
149	}
150
151	/// Pull rows by their row numbers. The operator emits its result via
152	/// `ctx.builder()` as a single Insert-shaped diff; we read its `post`
153	/// columns as the return value.
154	pub fn pull(&mut self, row_numbers: &[RowNumber]) -> Result<Columns> {
155		let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
156		let result: Result<()> = with_registry(&self.builder_registry, || {
157			let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
158			self.operator.pull(&mut op_ctx, row_numbers)?;
159			self.operator.flush_state(&mut op_ctx)
160		});
161		result?;
162
163		let mut emitted = self.builder_registry.drain_diffs();
164		let cols = if let Some(first) = emitted.drain(..).next() {
165			first.post.or(first.pre).unwrap_or_else(Columns::empty)
166		} else {
167			Columns::empty()
168		};
169		Ok(cols)
170	}
171
172	/// Get the current version
173	pub fn version(&self) -> CommitVersion {
174		(*self.context).version()
175	}
176
177	/// Set the current version
178	pub fn set_version(&mut self, version: CommitVersion) {
179		(*self.context).set_version(version);
180	}
181
182	/// Get access to the state store for assertions
183	pub fn state(&self) -> TestStateStore {
184		let store = self.context.state_store();
185		let data = store.lock().unwrap();
186		let mut result = TestStateStore::new();
187		for (k, v) in data.iter() {
188			result.set(k.clone(), v.clone());
189		}
190		result
191	}
192
193	/// Assert that a state key exists with the given value
194	pub fn assert_state<K>(&self, key: K, expected: Value)
195	where
196		K: EncodableKey,
197	{
198		let encoded_key = key.encode();
199		let store = self.state();
200		let shape = RowShape::testing(&[expected.get_type()]);
201
202		store.assert_value(&encoded_key, &[expected], &shape);
203	}
204
205	/// Get captured log messages
206	pub fn logs(&self) -> Vec<String> {
207		(*self.context).logs()
208	}
209
210	/// Clear captured log messages
211	pub fn clear_logs(&self) {
212		(*self.context).clear_logs()
213	}
214
215	/// Take a snapshot of the current state
216	pub fn snapshot_state(&self) -> HashMap<EncodedKey, EncodedRow> {
217		self.state().snapshot()
218	}
219
220	/// Restore state from a snapshot
221	pub fn restore_state(&mut self, snapshot: HashMap<EncodedKey, EncodedRow>) {
222		(*self.context).clear_state();
223		for (k, v) in snapshot {
224			(*self.context).set_state(k, v.0.to_vec());
225		}
226	}
227
228	/// Reset the harness to initial state
229	pub fn reset(&mut self) -> Result<()> {
230		(*self.context).clear_state();
231		(*self.context).clear_logs();
232		(*self.context).set_version(CommitVersion(1));
233		self.history.clear();
234
235		// Recreate the operator
236		self.operator = T::new(self.node_id, &self.config)?;
237		Ok(())
238	}
239
240	/// Create an operator context for direct access
241	///
242	/// This is useful for testing components that need an OperatorContext
243	/// without going through the apply() or pull() methods.
244	///
245	/// # Example
246	///
247	/// ```ignore
248	/// let mut harness = TestHarnessBuilder::<MyOperator>::new().build()?;
249	/// let mut ctx = harness.create_operator_context();
250	/// let (row_num, is_new) = ctx.get_or_create_row_number(harness.operator(), &key)?;
251	/// ```
252	pub fn create_operator_context(&mut self) -> OperatorContext {
253		OperatorContext::new(&mut *self.ffi_context as *mut ContextFFI)
254	}
255
256	/// Get a reference to the operator
257	pub fn operator(&self) -> &T {
258		&self.operator
259	}
260
261	/// Get a mutable reference to the operator
262	pub fn operator_mut(&mut self) -> &mut T {
263		&mut self.operator
264	}
265
266	/// Get the node ID
267	pub fn node_id(&self) -> FlowNodeId {
268		self.node_id
269	}
270}
271
272/// Index into the harness history - `harness[i]` returns the i-th recorded `Change`.
273///
274/// Panics if `i` is out of bounds.
275impl<T: FFIOperator> Index<usize> for OperatorTestHarness<T> {
276	type Output = Change;
277
278	fn index(&self, index: usize) -> &Self::Output {
279		&self.history[index]
280	}
281}
282
283/// Builder for OperatorTestHarness
284pub struct TestHarnessBuilder<T: FFIOperator> {
285	config: HashMap<String, Value>,
286	node_id: FlowNodeId,
287	version: CommitVersion,
288	initial_state: HashMap<EncodedKey, EncodedRow>,
289	_phantom: PhantomData<T>,
290}
291
292impl<T: FFIOperator> Default for TestHarnessBuilder<T> {
293	fn default() -> Self {
294		Self::new()
295	}
296}
297
298impl<T: FFIOperator> TestHarnessBuilder<T> {
299	/// Create a new builder
300	pub fn new() -> Self {
301		Self {
302			config: HashMap::new(),
303			node_id: FlowNodeId(1),
304			version: CommitVersion(1),
305			initial_state: HashMap::new(),
306			_phantom: PhantomData,
307		}
308	}
309
310	/// Set the operator configuration
311	pub fn with_config<I, K>(mut self, config: I) -> Self
312	where
313		I: IntoIterator<Item = (K, Value)>,
314		K: Into<String>,
315	{
316		self.config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
317		self
318	}
319
320	/// Add a single config value
321	pub fn add_config(mut self, key: impl Into<String>, value: Value) -> Self {
322		self.config.insert(key.into(), value);
323		self
324	}
325
326	/// Set the node ID
327	pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
328		self.node_id = node_id;
329		self
330	}
331
332	/// Set the initial version
333	pub fn with_version(mut self, version: CommitVersion) -> Self {
334		self.version = version;
335		self
336	}
337
338	/// Set initial state
339	pub fn with_initial_state<K>(mut self, key: K, value: Vec<u8>) -> Self
340	where
341		K: EncodableKey,
342	{
343		self.initial_state.insert(key.encode(), EncodedRow(CowVec::new(value)));
344		self
345	}
346
347	/// Build the test harness
348	pub fn build(self) -> Result<OperatorTestHarness<T>> {
349		// Create TestContext in a Box for stable address
350		let context = Box::new(TestContext::new(self.version));
351
352		// Set initial state
353		for (k, v) in self.initial_state {
354			context.set_state(k, v.0.to_vec());
355		}
356
357		// Create FFI context with test callbacks
358		// The txn_ptr points to the TestContext
359		let ffi_context = Box::new(ContextFFI {
360			txn_ptr: &*context as *const TestContext as *mut c_void,
361			executor_ptr: null(),
362			operator_id: self.node_id.0,
363			clock_now_nanos: 0,
364			callbacks: create_test_callbacks(),
365		});
366
367		// Create the operator
368		let operator = T::new(self.node_id, &self.config)?;
369
370		Ok(OperatorTestHarness {
371			operator,
372			context,
373			ffi_context,
374			config: self.config,
375			node_id: self.node_id,
376			history: Vec::new(),
377			builder_registry: TestBuilderRegistry::new(),
378			input_arena: Arena::new(),
379		})
380	}
381}
382
383/// Helper for testing operators with metadata
384pub struct TestMetadataHarness;
385
386impl TestMetadataHarness {
387	/// Assert an operator has the expected name
388	pub fn assert_name<T: FFIOperatorMetadata>(expected: &str) {
389		assert_eq!(T::NAME, expected, "Operator name mismatch. Expected: {}, Actual: {}", expected, T::NAME);
390	}
391
392	/// Assert an operator has the expected API version
393	pub fn assert_api<T: FFIOperatorMetadata>(expected: u32) {
394		assert_eq!(
395			T::API,
396			expected,
397			"Operator API version mismatch. Expected: {}, Actual: {}",
398			expected,
399			T::API
400		);
401	}
402
403	/// Assert an operator has the expected semantic version
404	pub fn assert_version<T: FFIOperatorMetadata>(expected: &str) {
405		assert_eq!(
406			T::VERSION,
407			expected,
408			"Operator version mismatch. Expected: {}, Actual: {}",
409			expected,
410			T::VERSION
411		);
412	}
413}
414
415#[cfg(test)]
416pub mod tests {
417	use reifydb_abi::{
418		callbacks::builder::EmitDiffKind, data::column::ColumnTypeCode, flow::diff::DiffType,
419		operator::capabilities::CAPABILITY_ALL_STANDARD,
420	};
421	use reifydb_core::{
422		common::CommitVersion,
423		encoded::{key::IntoEncodedKey, shape::RowShape},
424		interface::catalog::flow::FlowNodeId,
425	};
426	use reifydb_type::value::{row_number::RowNumber, r#type::Type};
427
428	use super::{super::helpers::encode_key, *};
429	use crate::{
430		operator::{
431			FFIOperator, FFIOperatorMetadata,
432			builder::{ColumnsBuilder, CommittedColumn},
433			change::{BorrowedChange, BorrowedColumns},
434			column::OperatorColumn,
435			context::OperatorContext,
436		},
437		testing::builders::{TestChangeBuilder, TestRowBuilder},
438	};
439
440	// Simple pass-through operator for basic tests
441	struct TestOperator {
442		_node_id: FlowNodeId,
443		_config: HashMap<String, Value>,
444	}
445
446	impl FFIOperatorMetadata for TestOperator {
447		const NAME: &'static str = "test_operator";
448		const API: u32 = 1;
449		const VERSION: &'static str = "1.0.0";
450		const DESCRIPTION: &'static str = "Simple pass-through test operator";
451		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
452		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
453		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
454	}
455
456	impl FFIOperator for TestOperator {
457		fn new(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self> {
458			Ok(Self {
459				_node_id: operator_id,
460				_config: config.clone(),
461			})
462		}
463
464		fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
465			// Pass-through: forward each input diff via the builder.
466			forward_diffs_passthrough(ctx, &input)
467		}
468
469		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
470			Ok(())
471		}
472	}
473
474	// Stateful operator that stores values from flow changes
475	struct StatefulTestOperator;
476
477	impl FFIOperatorMetadata for StatefulTestOperator {
478		const NAME: &'static str = "stateful_test_operator";
479		const API: u32 = 1;
480		const VERSION: &'static str = "1.0.0";
481		const DESCRIPTION: &'static str = "Stateful test operator that stores values";
482		const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
483		const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
484		const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
485	}
486
487	impl FFIOperator for StatefulTestOperator {
488		fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
489			Ok(Self)
490		}
491
492		fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
493			// Stash the post-row's first int8 value into operator
494			// state, keyed by the row number. Then forward the
495			// diffs unchanged via the builder so callers can still
496			// inspect the apply output.
497			for diff in input.diffs() {
498				let post = match diff.kind() {
499					DiffType::Insert | DiffType::Update => Some(diff.post()),
500					DiffType::Remove => None,
501				};
502				if let Some(columns) = post {
503					let row_numbers = columns.row_numbers();
504					let first_int8 = columns
505						.columns()
506						.next()
507						.and_then(|c| unsafe { c.as_slice::<i64>() })
508						.and_then(|s| s.first().copied());
509					if let (Some(&rn), Some(v)) = (row_numbers.first(), first_int8) {
510						let row_key = format!("row_{}", rn);
511						let shape = RowShape::testing(&[Type::Int8]);
512						let mut encoded = shape.allocate();
513						shape.set_values(&mut encoded, &[Value::Int8(v)]);
514						ctx.state().set(&row_key.into_encoded_key(), &encoded)?;
515					}
516				}
517			}
518			forward_diffs_passthrough(ctx, &input)
519		}
520
521		fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
522			Ok(())
523		}
524	}
525
526	/// Helper used by both test operators: read each input diff and emit
527	/// it back unchanged via `ctx.builder()`. This keeps the harness's
528	/// `apply` returning a `Change` that mirrors the input - same shape
529	/// the legacy `Ok(input)` pass-through produced.
530	fn forward_diffs_passthrough(ctx: &mut OperatorContext, input: &BorrowedChange<'_>) -> Result<()> {
531		let mut builder = ctx.builder();
532		for diff in input.diffs() {
533			match diff.kind() {
534				DiffType::Insert => {
535					let (cols, names) = clone_columns(&mut builder, diff.post())?;
536					let post: Vec<CommittedColumn> = cols;
537					let post_names: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
538					let row_numbers: Vec<RowNumber> =
539						diff.post().row_numbers().iter().copied().map(RowNumber).collect();
540					let _ = post; // satisfy borrow checker if unused
541					builder.emit_insert(&post, &post_names, &row_numbers)?;
542				}
543				DiffType::Update => {
544					let (pre_cols, pre_names) = clone_columns(&mut builder, diff.pre())?;
545					let (post_cols, post_names) = clone_columns(&mut builder, diff.post())?;
546					let pre_names: Vec<&str> = pre_names.iter().map(|s| s.as_str()).collect();
547					let post_names: Vec<&str> = post_names.iter().map(|s| s.as_str()).collect();
548					let pre_row_count = diff.pre().row_count();
549					let post_row_count = diff.post().row_count();
550					let pre_row_numbers: Vec<RowNumber> =
551						diff.pre().row_numbers().iter().copied().map(RowNumber).collect();
552					let post_row_numbers: Vec<RowNumber> =
553						diff.post().row_numbers().iter().copied().map(RowNumber).collect();
554					builder.emit_update(
555						&pre_cols,
556						&pre_names,
557						pre_row_count,
558						&pre_row_numbers,
559						&post_cols,
560						&post_names,
561						post_row_count,
562						&post_row_numbers,
563					)?;
564				}
565				DiffType::Remove => {
566					let (cols, names) = clone_columns(&mut builder, diff.pre())?;
567					let names: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
568					let row_numbers: Vec<RowNumber> =
569						diff.pre().row_numbers().iter().copied().map(RowNumber).collect();
570					builder.emit_remove(&cols, &names, &row_numbers)?;
571				}
572			}
573		}
574		// Suppress emit-kind-not-used warning by silencing the import.
575		let _ = EmitDiffKind::Insert;
576		Ok(())
577	}
578
579	/// Acquire matching builders for each column in `cols`, copy bytes +
580	/// offsets across, commit, and return the committed handles + names.
581	fn clone_columns(
582		builder: &mut ColumnsBuilder<'_>,
583		cols: BorrowedColumns<'_>,
584	) -> Result<(Vec<CommittedColumn>, Vec<String>)> {
585		let row_count = cols.row_count();
586		let mut committed: Vec<CommittedColumn> = Vec::new();
587		let mut names: Vec<String> = Vec::new();
588		for col in cols.columns() {
589			let type_code = col.type_code();
590			let bytes = col.data_bytes();
591			let active = builder.acquire(type_code, row_count.max(1))?;
592			active.grow(bytes.len().max(row_count))?;
593			let dst = active.data_ptr();
594			if !dst.is_null() && !bytes.is_empty() {
595				unsafe {
596					core::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, bytes.len());
597				}
598			}
599			// For var-len types, copy offsets too.
600			if matches!(type_code, ColumnTypeCode::Utf8 | ColumnTypeCode::Blob) {
601				let off = col.offsets();
602				let dst_off = active.offsets_ptr();
603				if !dst_off.is_null() && !off.is_empty() {
604					unsafe {
605						core::ptr::copy_nonoverlapping(off.as_ptr(), dst_off, off.len());
606					}
607				}
608			}
609			let c = active.commit(row_count)?;
610			committed.push(c);
611			names.push(col.name().to_string());
612		}
613		Ok((committed, names))
614	}
615
616	#[test]
617	fn test_operator_metadata() {
618		TestMetadataHarness::assert_name::<TestOperator>("test_operator");
619		TestMetadataHarness::assert_api::<TestOperator>(1);
620		TestMetadataHarness::assert_version::<TestOperator>("1.0.0");
621	}
622
623	#[test]
624	fn test_harness_builder() {
625		let result = TestHarnessBuilder::<TestOperator>::new()
626			.with_node_id(FlowNodeId(42))
627			.with_version(CommitVersion(10))
628			.add_config("key", Value::Utf8("value".into()))
629			.build();
630
631		assert!(result.is_ok());
632
633		let harness = result.unwrap();
634		assert_eq!(harness.node_id, 42);
635		assert_eq!(harness.version(), 10);
636	}
637
638	#[test]
639	fn test_harness_with_stateful_operator() {
640		// Build harness with stateful operator
641		let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
642			.with_node_id(FlowNodeId(1))
643			.build()
644			.expect("Failed to build harness");
645
646		// Create a flow change with an insert
647		let input = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(42i64)]).build();
648
649		// Apply the flow change - operator should store the value in state
650		let output = harness.apply(input).expect("Apply failed");
651
652		// Verify output has the expected diff
653		assert_eq!(output.diffs.len(), 1);
654
655		// Verify the operator stored state correctly via FFI callbacks
656		let state = harness.state();
657		let shape = RowShape::testing(&[Type::Int8]);
658		let key = encode_key("row_1");
659
660		// Assert the state was set through the FFI bridge
661		state.assert_value(&key, &[Value::Int8(42i64)], &shape);
662	}
663
664	#[test]
665	fn test_harness_history_index() {
666		let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
667			.with_node_id(FlowNodeId(1))
668			.build()
669			.expect("Failed to build harness");
670
671		// History starts empty
672		assert_eq!(harness.history_len(), 0);
673		assert!(harness.last_change().is_none());
674
675		// Each apply() call records a Change
676		let input_a = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(1i64)]).build();
677		harness.apply(input_a).expect("apply a failed");
678		assert_eq!(harness.history_len(), 1);
679
680		let input_b = TestChangeBuilder::new().insert_row(2, vec![Value::Int8(2i64)]).build();
681		harness.apply(input_b).expect("apply b failed");
682		assert_eq!(harness.history_len(), 2);
683
684		// Index returns the i-th recorded Change
685		assert_eq!(harness[0].diffs.len(), 1);
686		assert_eq!(harness[1].diffs.len(), 1);
687
688		// Chainable insert also records
689		harness.insert(TestRowBuilder::new(3).add_value(Value::Int8(3i64)).build());
690		assert_eq!(harness.history_len(), 3);
691
692		// last_change returns the most recent
693		assert!(harness.last_change().is_some());
694
695		// clear_history resets without affecting state
696		let state_count_before = harness.state().len();
697		harness.clear_history();
698		assert_eq!(harness.history_len(), 0);
699		assert!(harness.last_change().is_none());
700		assert_eq!(harness.state().len(), state_count_before);
701	}
702
703	#[test]
704	fn test_harness_multiple_operations() {
705		let mut harness =
706			TestHarnessBuilder::<StatefulTestOperator>::new().build().expect("Failed to build harness");
707
708		// Insert multiple rows
709		let input1 = TestChangeBuilder::new()
710			.insert_row(1, vec![Value::Int8(10i64)])
711			.insert_row(2, vec![Value::Int8(20i64)])
712			.build();
713
714		harness.apply(input1).expect("First apply failed");
715
716		let state = harness.state();
717		assert_eq!(state.len(), 2);
718
719		// Insert another row
720		let input2 = TestChangeBuilder::new().insert_row(RowNumber(3), vec![Value::Int8(30i64)]).build();
721
722		harness.apply(input2).expect("Second apply failed");
723
724		// Verify all three values were stored
725		let state = harness.state();
726		let shape = RowShape::testing(&[Type::Int8]);
727
728		state.assert_value(&encode_key("row_1"), &[Value::Int8(10i64)], &shape);
729		state.assert_value(&encode_key("row_2"), &[Value::Int8(20i64)], &shape);
730		state.assert_value(&encode_key("row_3"), &[Value::Int8(30i64)], &shape);
731
732		// Verify total state count
733		assert_eq!(state.len(), 3);
734	}
735}