1use std::{collections::HashMap, ffi::c_void, marker::PhantomData, ops::Index, ptr};
5
6use ptr::null;
7use reifydb_abi::context::context::ContextFFI;
8use reifydb_core::{
9 common::CommitVersion,
10 encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
11 interface::{
12 catalog::flow::FlowNodeId,
13 change::{Change, ChangeOrigin},
14 },
15 key::EncodableKey,
16 row::Row,
17 value::column::columns::Columns,
18};
19use reifydb_type::{
20 util::cowvec::CowVec,
21 value::{Value, row_number::RowNumber},
22};
23
24use crate::{
25 error::Result,
26 ffi::arena::Arena,
27 operator::{FFIOperator, FFIOperatorMetadata, change::BorrowedChange, context::OperatorContext},
28 testing::{
29 builders::TestChangeBuilder,
30 callbacks::create_test_callbacks,
31 context::TestContext,
32 registry::{TestBuilderRegistry, into_diffs, with_registry},
33 state::TestStateStore,
34 },
35};
36
37pub struct OperatorTestHarness<T: FFIOperator> {
46 operator: T,
47 context: Box<TestContext>, ffi_context: Box<ContextFFI>,
49 config: HashMap<String, Value>,
50 node_id: FlowNodeId,
51 history: Vec<Change>,
52 builder_registry: TestBuilderRegistry,
57 input_arena: Arena,
60}
61
62impl<T: FFIOperator> OperatorTestHarness<T> {
63 pub fn builder() -> TestHarnessBuilder<T> {
65 TestHarnessBuilder::new()
66 }
67
68 pub fn apply(&mut self, input: Change) -> Result<Change> {
76 let version = input.version;
77 let changed_at = input.changed_at;
78 let origin = input.origin.clone();
79 self.input_arena.clear();
81 let ffi_change = self.input_arena.marshal_change(&input);
82 let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
83
84 let result: Result<()> = with_registry(&self.builder_registry, || {
85 let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
86 let borrowed = unsafe { BorrowedChange::from_raw(&ffi_change as *const _) };
87 self.operator.apply(&mut op_ctx, borrowed)?;
88 self.operator.flush_state(&mut op_ctx)
93 });
94 drop(input);
97 result?;
98
99 let emitted = self.builder_registry.drain_diffs();
100 let diffs = into_diffs(emitted);
101 let output = match origin {
102 ChangeOrigin::Flow(node) => Change::from_flow(node, version, diffs, changed_at),
103 ChangeOrigin::Shape(_) => Change::from_flow(self.node_id, version, diffs, changed_at),
104 };
105 self.history.push(output.clone());
106 Ok(output)
107 }
108
109 pub fn insert(&mut self, row: Row) -> &mut Self {
117 let change = TestChangeBuilder::new().insert(row).build();
118 self.apply(change).expect("insert failed");
119 self
120 }
121
122 pub fn update(&mut self, pre: Row, post: Row) -> &mut Self {
124 let change = TestChangeBuilder::new().update(pre, post).build();
125 self.apply(change).expect("update failed");
126 self
127 }
128
129 pub fn remove(&mut self, row: Row) -> &mut Self {
131 let change = TestChangeBuilder::new().remove(row).build();
132 self.apply(change).expect("remove failed");
133 self
134 }
135
136 pub fn history_len(&self) -> usize {
138 self.history.len()
139 }
140
141 pub fn last_change(&self) -> Option<&Change> {
143 self.history.last()
144 }
145
146 pub fn clear_history(&mut self) {
148 self.history.clear();
149 }
150
151 pub fn pull(&mut self, row_numbers: &[RowNumber]) -> Result<Columns> {
155 let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
156 let result: Result<()> = with_registry(&self.builder_registry, || {
157 let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
158 self.operator.pull(&mut op_ctx, row_numbers)?;
159 self.operator.flush_state(&mut op_ctx)
160 });
161 result?;
162
163 let mut emitted = self.builder_registry.drain_diffs();
164 let cols = if let Some(first) = emitted.drain(..).next() {
165 first.post.or(first.pre).unwrap_or_else(Columns::empty)
166 } else {
167 Columns::empty()
168 };
169 Ok(cols)
170 }
171
172 pub fn version(&self) -> CommitVersion {
174 (*self.context).version()
175 }
176
177 pub fn set_version(&mut self, version: CommitVersion) {
179 (*self.context).set_version(version);
180 }
181
182 pub fn state(&self) -> TestStateStore {
184 let store = self.context.state_store();
185 let data = store.lock().unwrap();
186 let mut result = TestStateStore::new();
187 for (k, v) in data.iter() {
188 result.set(k.clone(), v.clone());
189 }
190 result
191 }
192
193 pub fn assert_state<K>(&self, key: K, expected: Value)
195 where
196 K: EncodableKey,
197 {
198 let encoded_key = key.encode();
199 let store = self.state();
200 let shape = RowShape::testing(&[expected.get_type()]);
201
202 store.assert_value(&encoded_key, &[expected], &shape);
203 }
204
205 pub fn logs(&self) -> Vec<String> {
207 (*self.context).logs()
208 }
209
210 pub fn clear_logs(&self) {
212 (*self.context).clear_logs()
213 }
214
215 pub fn snapshot_state(&self) -> HashMap<EncodedKey, EncodedRow> {
217 self.state().snapshot()
218 }
219
220 pub fn restore_state(&mut self, snapshot: HashMap<EncodedKey, EncodedRow>) {
222 (*self.context).clear_state();
223 for (k, v) in snapshot {
224 (*self.context).set_state(k, v.0.to_vec());
225 }
226 }
227
228 pub fn reset(&mut self) -> Result<()> {
230 (*self.context).clear_state();
231 (*self.context).clear_logs();
232 (*self.context).set_version(CommitVersion(1));
233 self.history.clear();
234
235 self.operator = T::new(self.node_id, &self.config)?;
237 Ok(())
238 }
239
240 pub fn create_operator_context(&mut self) -> OperatorContext {
253 OperatorContext::new(&mut *self.ffi_context as *mut ContextFFI)
254 }
255
256 pub fn operator(&self) -> &T {
258 &self.operator
259 }
260
261 pub fn operator_mut(&mut self) -> &mut T {
263 &mut self.operator
264 }
265
266 pub fn node_id(&self) -> FlowNodeId {
268 self.node_id
269 }
270}
271
272impl<T: FFIOperator> Index<usize> for OperatorTestHarness<T> {
276 type Output = Change;
277
278 fn index(&self, index: usize) -> &Self::Output {
279 &self.history[index]
280 }
281}
282
283pub struct TestHarnessBuilder<T: FFIOperator> {
285 config: HashMap<String, Value>,
286 node_id: FlowNodeId,
287 version: CommitVersion,
288 initial_state: HashMap<EncodedKey, EncodedRow>,
289 _phantom: PhantomData<T>,
290}
291
292impl<T: FFIOperator> Default for TestHarnessBuilder<T> {
293 fn default() -> Self {
294 Self::new()
295 }
296}
297
298impl<T: FFIOperator> TestHarnessBuilder<T> {
299 pub fn new() -> Self {
301 Self {
302 config: HashMap::new(),
303 node_id: FlowNodeId(1),
304 version: CommitVersion(1),
305 initial_state: HashMap::new(),
306 _phantom: PhantomData,
307 }
308 }
309
310 pub fn with_config<I, K>(mut self, config: I) -> Self
312 where
313 I: IntoIterator<Item = (K, Value)>,
314 K: Into<String>,
315 {
316 self.config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
317 self
318 }
319
320 pub fn add_config(mut self, key: impl Into<String>, value: Value) -> Self {
322 self.config.insert(key.into(), value);
323 self
324 }
325
326 pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
328 self.node_id = node_id;
329 self
330 }
331
332 pub fn with_version(mut self, version: CommitVersion) -> Self {
334 self.version = version;
335 self
336 }
337
338 pub fn with_initial_state<K>(mut self, key: K, value: Vec<u8>) -> Self
340 where
341 K: EncodableKey,
342 {
343 self.initial_state.insert(key.encode(), EncodedRow(CowVec::new(value)));
344 self
345 }
346
347 pub fn build(self) -> Result<OperatorTestHarness<T>> {
349 let context = Box::new(TestContext::new(self.version));
351
352 for (k, v) in self.initial_state {
354 context.set_state(k, v.0.to_vec());
355 }
356
357 let ffi_context = Box::new(ContextFFI {
360 txn_ptr: &*context as *const TestContext as *mut c_void,
361 executor_ptr: null(),
362 operator_id: self.node_id.0,
363 clock_now_nanos: 0,
364 callbacks: create_test_callbacks(),
365 });
366
367 let operator = T::new(self.node_id, &self.config)?;
369
370 Ok(OperatorTestHarness {
371 operator,
372 context,
373 ffi_context,
374 config: self.config,
375 node_id: self.node_id,
376 history: Vec::new(),
377 builder_registry: TestBuilderRegistry::new(),
378 input_arena: Arena::new(),
379 })
380 }
381}
382
383pub struct TestMetadataHarness;
385
386impl TestMetadataHarness {
387 pub fn assert_name<T: FFIOperatorMetadata>(expected: &str) {
389 assert_eq!(T::NAME, expected, "Operator name mismatch. Expected: {}, Actual: {}", expected, T::NAME);
390 }
391
392 pub fn assert_api<T: FFIOperatorMetadata>(expected: u32) {
394 assert_eq!(
395 T::API,
396 expected,
397 "Operator API version mismatch. Expected: {}, Actual: {}",
398 expected,
399 T::API
400 );
401 }
402
403 pub fn assert_version<T: FFIOperatorMetadata>(expected: &str) {
405 assert_eq!(
406 T::VERSION,
407 expected,
408 "Operator version mismatch. Expected: {}, Actual: {}",
409 expected,
410 T::VERSION
411 );
412 }
413}
414
415#[cfg(test)]
416pub mod tests {
417 use reifydb_abi::{
418 callbacks::builder::EmitDiffKind, data::column::ColumnTypeCode, flow::diff::DiffType,
419 operator::capabilities::CAPABILITY_ALL_STANDARD,
420 };
421 use reifydb_core::{
422 common::CommitVersion,
423 encoded::{key::IntoEncodedKey, shape::RowShape},
424 interface::catalog::flow::FlowNodeId,
425 };
426 use reifydb_type::value::{row_number::RowNumber, r#type::Type};
427
428 use super::{super::helpers::encode_key, *};
429 use crate::{
430 operator::{
431 FFIOperator, FFIOperatorMetadata,
432 builder::{ColumnsBuilder, CommittedColumn},
433 change::{BorrowedChange, BorrowedColumns},
434 column::OperatorColumn,
435 context::OperatorContext,
436 },
437 testing::builders::{TestChangeBuilder, TestRowBuilder},
438 };
439
440 struct TestOperator {
442 _node_id: FlowNodeId,
443 _config: HashMap<String, Value>,
444 }
445
446 impl FFIOperatorMetadata for TestOperator {
447 const NAME: &'static str = "test_operator";
448 const API: u32 = 1;
449 const VERSION: &'static str = "1.0.0";
450 const DESCRIPTION: &'static str = "Simple pass-through test operator";
451 const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
452 const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
453 const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
454 }
455
456 impl FFIOperator for TestOperator {
457 fn new(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self> {
458 Ok(Self {
459 _node_id: operator_id,
460 _config: config.clone(),
461 })
462 }
463
464 fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
465 forward_diffs_passthrough(ctx, &input)
467 }
468
469 fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
470 Ok(())
471 }
472 }
473
474 struct StatefulTestOperator;
476
477 impl FFIOperatorMetadata for StatefulTestOperator {
478 const NAME: &'static str = "stateful_test_operator";
479 const API: u32 = 1;
480 const VERSION: &'static str = "1.0.0";
481 const DESCRIPTION: &'static str = "Stateful test operator that stores values";
482 const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
483 const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
484 const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
485 }
486
487 impl FFIOperator for StatefulTestOperator {
488 fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
489 Ok(Self)
490 }
491
492 fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
493 for diff in input.diffs() {
498 let post = match diff.kind() {
499 DiffType::Insert | DiffType::Update => Some(diff.post()),
500 DiffType::Remove => None,
501 };
502 if let Some(columns) = post {
503 let row_numbers = columns.row_numbers();
504 let first_int8 = columns
505 .columns()
506 .next()
507 .and_then(|c| unsafe { c.as_slice::<i64>() })
508 .and_then(|s| s.first().copied());
509 if let (Some(&rn), Some(v)) = (row_numbers.first(), first_int8) {
510 let row_key = format!("row_{}", rn);
511 let shape = RowShape::testing(&[Type::Int8]);
512 let mut encoded = shape.allocate();
513 shape.set_values(&mut encoded, &[Value::Int8(v)]);
514 ctx.state().set(&row_key.into_encoded_key(), &encoded)?;
515 }
516 }
517 }
518 forward_diffs_passthrough(ctx, &input)
519 }
520
521 fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
522 Ok(())
523 }
524 }
525
526 fn forward_diffs_passthrough(ctx: &mut OperatorContext, input: &BorrowedChange<'_>) -> Result<()> {
531 let mut builder = ctx.builder();
532 for diff in input.diffs() {
533 match diff.kind() {
534 DiffType::Insert => {
535 let (cols, names) = clone_columns(&mut builder, diff.post())?;
536 let post: Vec<CommittedColumn> = cols;
537 let post_names: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
538 let row_numbers: Vec<RowNumber> =
539 diff.post().row_numbers().iter().copied().map(RowNumber).collect();
540 let _ = post; builder.emit_insert(&post, &post_names, &row_numbers)?;
542 }
543 DiffType::Update => {
544 let (pre_cols, pre_names) = clone_columns(&mut builder, diff.pre())?;
545 let (post_cols, post_names) = clone_columns(&mut builder, diff.post())?;
546 let pre_names: Vec<&str> = pre_names.iter().map(|s| s.as_str()).collect();
547 let post_names: Vec<&str> = post_names.iter().map(|s| s.as_str()).collect();
548 let pre_row_count = diff.pre().row_count();
549 let post_row_count = diff.post().row_count();
550 let pre_row_numbers: Vec<RowNumber> =
551 diff.pre().row_numbers().iter().copied().map(RowNumber).collect();
552 let post_row_numbers: Vec<RowNumber> =
553 diff.post().row_numbers().iter().copied().map(RowNumber).collect();
554 builder.emit_update(
555 &pre_cols,
556 &pre_names,
557 pre_row_count,
558 &pre_row_numbers,
559 &post_cols,
560 &post_names,
561 post_row_count,
562 &post_row_numbers,
563 )?;
564 }
565 DiffType::Remove => {
566 let (cols, names) = clone_columns(&mut builder, diff.pre())?;
567 let names: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
568 let row_numbers: Vec<RowNumber> =
569 diff.pre().row_numbers().iter().copied().map(RowNumber).collect();
570 builder.emit_remove(&cols, &names, &row_numbers)?;
571 }
572 }
573 }
574 let _ = EmitDiffKind::Insert;
576 Ok(())
577 }
578
579 fn clone_columns(
582 builder: &mut ColumnsBuilder<'_>,
583 cols: BorrowedColumns<'_>,
584 ) -> Result<(Vec<CommittedColumn>, Vec<String>)> {
585 let row_count = cols.row_count();
586 let mut committed: Vec<CommittedColumn> = Vec::new();
587 let mut names: Vec<String> = Vec::new();
588 for col in cols.columns() {
589 let type_code = col.type_code();
590 let bytes = col.data_bytes();
591 let active = builder.acquire(type_code, row_count.max(1))?;
592 active.grow(bytes.len().max(row_count))?;
593 let dst = active.data_ptr();
594 if !dst.is_null() && !bytes.is_empty() {
595 unsafe {
596 core::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, bytes.len());
597 }
598 }
599 if matches!(type_code, ColumnTypeCode::Utf8 | ColumnTypeCode::Blob) {
601 let off = col.offsets();
602 let dst_off = active.offsets_ptr();
603 if !dst_off.is_null() && !off.is_empty() {
604 unsafe {
605 core::ptr::copy_nonoverlapping(off.as_ptr(), dst_off, off.len());
606 }
607 }
608 }
609 let c = active.commit(row_count)?;
610 committed.push(c);
611 names.push(col.name().to_string());
612 }
613 Ok((committed, names))
614 }
615
616 #[test]
617 fn test_operator_metadata() {
618 TestMetadataHarness::assert_name::<TestOperator>("test_operator");
619 TestMetadataHarness::assert_api::<TestOperator>(1);
620 TestMetadataHarness::assert_version::<TestOperator>("1.0.0");
621 }
622
623 #[test]
624 fn test_harness_builder() {
625 let result = TestHarnessBuilder::<TestOperator>::new()
626 .with_node_id(FlowNodeId(42))
627 .with_version(CommitVersion(10))
628 .add_config("key", Value::Utf8("value".into()))
629 .build();
630
631 assert!(result.is_ok());
632
633 let harness = result.unwrap();
634 assert_eq!(harness.node_id, 42);
635 assert_eq!(harness.version(), 10);
636 }
637
638 #[test]
639 fn test_harness_with_stateful_operator() {
640 let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
642 .with_node_id(FlowNodeId(1))
643 .build()
644 .expect("Failed to build harness");
645
646 let input = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(42i64)]).build();
648
649 let output = harness.apply(input).expect("Apply failed");
651
652 assert_eq!(output.diffs.len(), 1);
654
655 let state = harness.state();
657 let shape = RowShape::testing(&[Type::Int8]);
658 let key = encode_key("row_1");
659
660 state.assert_value(&key, &[Value::Int8(42i64)], &shape);
662 }
663
664 #[test]
665 fn test_harness_history_index() {
666 let mut harness = TestHarnessBuilder::<StatefulTestOperator>::new()
667 .with_node_id(FlowNodeId(1))
668 .build()
669 .expect("Failed to build harness");
670
671 assert_eq!(harness.history_len(), 0);
673 assert!(harness.last_change().is_none());
674
675 let input_a = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(1i64)]).build();
677 harness.apply(input_a).expect("apply a failed");
678 assert_eq!(harness.history_len(), 1);
679
680 let input_b = TestChangeBuilder::new().insert_row(2, vec![Value::Int8(2i64)]).build();
681 harness.apply(input_b).expect("apply b failed");
682 assert_eq!(harness.history_len(), 2);
683
684 assert_eq!(harness[0].diffs.len(), 1);
686 assert_eq!(harness[1].diffs.len(), 1);
687
688 harness.insert(TestRowBuilder::new(3).add_value(Value::Int8(3i64)).build());
690 assert_eq!(harness.history_len(), 3);
691
692 assert!(harness.last_change().is_some());
694
695 let state_count_before = harness.state().len();
697 harness.clear_history();
698 assert_eq!(harness.history_len(), 0);
699 assert!(harness.last_change().is_none());
700 assert_eq!(harness.state().len(), state_count_before);
701 }
702
703 #[test]
704 fn test_harness_multiple_operations() {
705 let mut harness =
706 TestHarnessBuilder::<StatefulTestOperator>::new().build().expect("Failed to build harness");
707
708 let input1 = TestChangeBuilder::new()
710 .insert_row(1, vec![Value::Int8(10i64)])
711 .insert_row(2, vec![Value::Int8(20i64)])
712 .build();
713
714 harness.apply(input1).expect("First apply failed");
715
716 let state = harness.state();
717 assert_eq!(state.len(), 2);
718
719 let input2 = TestChangeBuilder::new().insert_row(RowNumber(3), vec![Value::Int8(30i64)]).build();
721
722 harness.apply(input2).expect("Second apply failed");
723
724 let state = harness.state();
726 let shape = RowShape::testing(&[Type::Int8]);
727
728 state.assert_value(&encode_key("row_1"), &[Value::Int8(10i64)], &shape);
729 state.assert_value(&encode_key("row_2"), &[Value::Int8(20i64)], &shape);
730 state.assert_value(&encode_key("row_3"), &[Value::Int8(30i64)], &shape);
731
732 assert_eq!(state.len(), 3);
734 }
735}