1use std::{collections::HashMap, ffi::c_void, marker::PhantomData, ops::Index, ptr};
5
6use ptr::null;
7use reifydb_abi::context::context::ContextFFI;
8use reifydb_core::{
9 common::CommitVersion,
10 encoded::{key::EncodedKey, row::EncodedRow, shape::RowShape},
11 interface::{
12 catalog::flow::FlowNodeId,
13 change::{Change, ChangeOrigin},
14 },
15 key::EncodableKey,
16 row::Row,
17 value::column::columns::Columns,
18};
19use reifydb_type::{
20 util::cowvec::CowVec,
21 value::{Value, row_number::RowNumber},
22};
23
24use crate::{
25 error::Result,
26 ffi::arena::Arena,
27 operator::{FFIOperator, FFIOperatorMetadata, change::BorrowedChange, context::OperatorContext},
28 testing::{
29 builders::TestChangeBuilder,
30 callbacks::create_test_callbacks,
31 context::TestContext,
32 registry::{TestBuilderRegistry, into_diffs, with_registry},
33 state::TestStateStore,
34 },
35};
36
37pub struct OperatorTestHarness<T: FFIOperator> {
38 operator: T,
39 context: Box<TestContext>,
40 ffi_context: Box<ContextFFI>,
41 config: HashMap<String, Value>,
42 node_id: FlowNodeId,
43 history: Vec<Change>,
44
45 builder_registry: TestBuilderRegistry,
46
47 input_arena: Arena,
48}
49
50impl<T: FFIOperator> OperatorTestHarness<T> {
51 pub fn builder() -> TestHarnessBuilder<T> {
52 TestHarnessBuilder::new()
53 }
54
55 pub fn apply(&mut self, input: Change) -> Result<Change> {
56 let version = input.version;
57 let changed_at = input.changed_at;
58 let origin = input.origin.clone();
59
60 self.input_arena.clear();
61 let ffi_change = self.input_arena.marshal_change(&input);
62 let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
63
64 let result: Result<()> = with_registry(&self.builder_registry, || {
65 let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
66 let borrowed = unsafe { BorrowedChange::from_raw(&ffi_change as *const _) };
67 self.operator.apply(&mut op_ctx, borrowed)?;
68
69 self.operator.flush_state(&mut op_ctx)
70 });
71
72 drop(input);
73 result?;
74
75 let emitted = self.builder_registry.drain_diffs();
76 let diffs = into_diffs(emitted);
77 let output = match origin {
78 ChangeOrigin::Flow(node) => Change::from_flow(node, version, diffs, changed_at),
79 ChangeOrigin::Shape(_) => Change::from_flow(self.node_id, version, diffs, changed_at),
80 };
81 self.history.push(output.clone());
82 Ok(output)
83 }
84
85 pub fn insert(&mut self, row: Row) -> &mut Self {
86 let change = TestChangeBuilder::new().insert(row).build();
87 self.apply(change).expect("insert failed");
88 self
89 }
90
91 pub fn update(&mut self, pre: Row, post: Row) -> &mut Self {
92 let change = TestChangeBuilder::new().update(pre, post).build();
93 self.apply(change).expect("update failed");
94 self
95 }
96
97 pub fn remove(&mut self, row: Row) -> &mut Self {
98 let change = TestChangeBuilder::new().remove(row).build();
99 self.apply(change).expect("remove failed");
100 self
101 }
102
103 pub fn history_len(&self) -> usize {
104 self.history.len()
105 }
106
107 pub fn last_change(&self) -> Option<&Change> {
108 self.history.last()
109 }
110
111 pub fn clear_history(&mut self) {
112 self.history.clear();
113 }
114
115 pub fn pull(&mut self, row_numbers: &[RowNumber]) -> Result<Columns> {
116 let ffi_ctx_ptr = &mut *self.ffi_context as *mut ContextFFI;
117 let result: Result<()> = with_registry(&self.builder_registry, || {
118 let mut op_ctx = OperatorContext::new(ffi_ctx_ptr);
119 self.operator.pull(&mut op_ctx, row_numbers)?;
120 self.operator.flush_state(&mut op_ctx)
121 });
122 result?;
123
124 let mut emitted = self.builder_registry.drain_diffs();
125 let cols = if let Some(first) = emitted.drain(..).next() {
126 first.post.or(first.pre).unwrap_or_else(Columns::empty)
127 } else {
128 Columns::empty()
129 };
130 Ok(cols)
131 }
132
133 pub fn version(&self) -> CommitVersion {
134 (*self.context).version()
135 }
136
137 pub fn set_version(&mut self, version: CommitVersion) {
138 (*self.context).set_version(version);
139 }
140
141 pub fn state(&self) -> TestStateStore {
142 let store = self.context.state_store();
143 let data = store.lock().unwrap();
144 let mut result = TestStateStore::new();
145 for (k, v) in data.iter() {
146 result.set(k.clone(), v.clone());
147 }
148 result
149 }
150
151 pub fn assert_state<K>(&self, key: K, expected: Value)
152 where
153 K: EncodableKey,
154 {
155 let encoded_key = key.encode();
156 let store = self.state();
157 let shape = RowShape::testing(&[expected.get_type()]);
158
159 store.assert_value(&encoded_key, &[expected], &shape);
160 }
161
162 pub fn logs(&self) -> Vec<String> {
163 (*self.context).logs()
164 }
165
166 pub fn clear_logs(&self) {
167 (*self.context).clear_logs()
168 }
169
170 pub fn snapshot_state(&self) -> HashMap<EncodedKey, EncodedRow> {
171 self.state().snapshot()
172 }
173
174 pub fn restore_state(&mut self, snapshot: HashMap<EncodedKey, EncodedRow>) {
175 (*self.context).clear_state();
176 for (k, v) in snapshot {
177 (*self.context).set_state(k, v.0.to_vec());
178 }
179 }
180
181 pub fn reset(&mut self) -> Result<()> {
182 (*self.context).clear_state();
183 (*self.context).clear_logs();
184 (*self.context).set_version(CommitVersion(1));
185 self.history.clear();
186
187 self.operator = T::new(self.node_id, &self.config)?;
188 Ok(())
189 }
190
191 pub fn create_operator_context(&mut self) -> OperatorContext {
192 OperatorContext::new(&mut *self.ffi_context as *mut ContextFFI)
193 }
194
195 pub fn operator(&self) -> &T {
196 &self.operator
197 }
198
199 pub fn operator_mut(&mut self) -> &mut T {
200 &mut self.operator
201 }
202
203 pub fn node_id(&self) -> FlowNodeId {
204 self.node_id
205 }
206}
207
208impl<T: FFIOperator> Index<usize> for OperatorTestHarness<T> {
209 type Output = Change;
210
211 fn index(&self, index: usize) -> &Self::Output {
212 &self.history[index]
213 }
214}
215
216pub struct TestHarnessBuilder<T: FFIOperator> {
217 config: HashMap<String, Value>,
218 node_id: FlowNodeId,
219 version: CommitVersion,
220 initial_state: HashMap<EncodedKey, EncodedRow>,
221 _phantom: PhantomData<T>,
222}
223
224impl<T: FFIOperator> Default for TestHarnessBuilder<T> {
225 fn default() -> Self {
226 Self::new()
227 }
228}
229
230impl<T: FFIOperator> TestHarnessBuilder<T> {
231 pub fn new() -> Self {
232 Self {
233 config: HashMap::new(),
234 node_id: FlowNodeId(1),
235 version: CommitVersion(1),
236 initial_state: HashMap::new(),
237 _phantom: PhantomData,
238 }
239 }
240
241 pub fn with_config<I, K>(mut self, config: I) -> Self
242 where
243 I: IntoIterator<Item = (K, Value)>,
244 K: Into<String>,
245 {
246 self.config = config.into_iter().map(|(k, v)| (k.into(), v)).collect();
247 self
248 }
249
250 pub fn add_config(mut self, key: impl Into<String>, value: Value) -> Self {
251 self.config.insert(key.into(), value);
252 self
253 }
254
255 pub fn with_node_id(mut self, node_id: FlowNodeId) -> Self {
256 self.node_id = node_id;
257 self
258 }
259
260 pub fn with_version(mut self, version: CommitVersion) -> Self {
261 self.version = version;
262 self
263 }
264
265 pub fn with_initial_state<K>(mut self, key: K, value: Vec<u8>) -> Self
266 where
267 K: EncodableKey,
268 {
269 self.initial_state.insert(key.encode(), EncodedRow(CowVec::new(value)));
270 self
271 }
272
273 pub fn build(self) -> Result<OperatorTestHarness<T>> {
274 let context = Box::new(TestContext::new(self.version));
275
276 for (k, v) in self.initial_state {
277 context.set_state(k, v.0.to_vec());
278 }
279
280 let ffi_context = Box::new(ContextFFI {
281 txn_ptr: &*context as *const TestContext as *mut c_void,
282 executor_ptr: null(),
283 operator_id: self.node_id.0,
284 clock_now_nanos: 0,
285 callbacks: create_test_callbacks(),
286 });
287
288 let operator = T::new(self.node_id, &self.config)?;
289
290 Ok(OperatorTestHarness {
291 operator,
292 context,
293 ffi_context,
294 config: self.config,
295 node_id: self.node_id,
296 history: Vec::new(),
297 builder_registry: TestBuilderRegistry::new(),
298 input_arena: Arena::new(),
299 })
300 }
301}
302
303pub struct TestMetadataHarness;
304
305impl TestMetadataHarness {
306 pub fn assert_name<T: FFIOperatorMetadata>(expected: &str) {
307 assert_eq!(T::NAME, expected, "Operator name mismatch. Expected: {}, Actual: {}", expected, T::NAME);
308 }
309
310 pub fn assert_api<T: FFIOperatorMetadata>(expected: u32) {
311 assert_eq!(
312 T::API,
313 expected,
314 "Operator API version mismatch. Expected: {}, Actual: {}",
315 expected,
316 T::API
317 );
318 }
319
320 pub fn assert_version<T: FFIOperatorMetadata>(expected: &str) {
321 assert_eq!(
322 T::VERSION,
323 expected,
324 "Operator version mismatch. Expected: {}, Actual: {}",
325 expected,
326 T::VERSION
327 );
328 }
329}
330
331#[cfg(test)]
332pub mod tests {
333 use reifydb_abi::{
334 callbacks::builder::EmitDiffKind, data::column::ColumnTypeCode, flow::diff::DiffType,
335 operator::capabilities::CAPABILITY_ALL_STANDARD,
336 };
337 use reifydb_core::{common::CommitVersion, encoded::key::IntoEncodedKey, interface::catalog::flow::FlowNodeId};
338 use reifydb_type::value::row_number::RowNumber;
339
340 use super::{super::helpers::encode_key, *};
341 use crate::{
342 operator::{
343 FFIOperator, FFIOperatorMetadata,
344 builder::{ColumnsBuilder, CommittedColumn},
345 change::{BorrowedChange, BorrowedColumns},
346 column::operator::OperatorColumn,
347 context::OperatorContext,
348 },
349 testing::builders::{TestChangeBuilder, TestRowBuilder},
350 };
351
352 struct TestOperator {
354 _node_id: FlowNodeId,
355 _config: HashMap<String, Value>,
356 }
357
358 impl FFIOperatorMetadata for TestOperator {
359 const NAME: &'static str = "test_operator";
360 const API: u32 = 1;
361 const VERSION: &'static str = "1.0.0";
362 const DESCRIPTION: &'static str = "Simple pass-through test operator";
363 const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
364 const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
365 const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
366 }
367
368 impl FFIOperator for TestOperator {
369 fn new(operator_id: FlowNodeId, config: &HashMap<String, Value>) -> Result<Self> {
370 Ok(Self {
371 _node_id: operator_id,
372 _config: config.clone(),
373 })
374 }
375
376 fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
377 forward_diffs_passthrough(ctx, &input)
379 }
380
381 fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
382 Ok(())
383 }
384 }
385
386 struct StatefulTestOperator;
388
389 impl FFIOperatorMetadata for StatefulTestOperator {
390 const NAME: &'static str = "stateful_test_operator";
391 const API: u32 = 1;
392 const VERSION: &'static str = "1.0.0";
393 const DESCRIPTION: &'static str = "Stateful test operator that stores values";
394 const INPUT_COLUMNS: &'static [OperatorColumn] = &[];
395 const OUTPUT_COLUMNS: &'static [OperatorColumn] = &[];
396 const CAPABILITIES: u32 = CAPABILITY_ALL_STANDARD;
397 }
398
399 impl FFIOperator for StatefulTestOperator {
400 fn new(_operator_id: FlowNodeId, _config: &HashMap<String, Value>) -> Result<Self> {
401 Ok(Self)
402 }
403
404 fn apply(&mut self, ctx: &mut OperatorContext, input: BorrowedChange<'_>) -> Result<()> {
405 for diff in input.diffs() {
410 let post = match diff.kind() {
411 DiffType::Insert | DiffType::Update => Some(diff.post()),
412 DiffType::Remove => None,
413 };
414 if let Some(columns) = post {
415 let row_numbers = columns.row_numbers();
416 let first_int8 = columns
417 .columns()
418 .next()
419 .and_then(|c| unsafe { c.as_slice::<i64>() })
420 .and_then(|s| s.first().copied());
421 if let (Some(&rn), Some(v)) = (row_numbers.first(), first_int8) {
422 let row_key = format!("row_{}", rn);
423 ctx.state().set::<i64>(&row_key.into_encoded_key(), &v)?;
424 }
425 }
426 }
427 forward_diffs_passthrough(ctx, &input)
428 }
429
430 fn pull(&mut self, _ctx: &mut OperatorContext, _row_numbers: &[RowNumber]) -> Result<()> {
431 Ok(())
432 }
433 }
434
435 fn forward_diffs_passthrough(ctx: &mut OperatorContext, input: &BorrowedChange<'_>) -> Result<()> {
440 let mut builder = ctx.builder();
441 for diff in input.diffs() {
442 match diff.kind() {
443 DiffType::Insert => {
444 let (cols, names) = clone_columns(&mut builder, diff.post())?;
445 let post: Vec<CommittedColumn> = cols;
446 let post_names: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
447 let row_numbers: Vec<RowNumber> =
448 diff.post().row_numbers().iter().copied().map(RowNumber).collect();
449 let _ = post; builder.emit_insert(&post, &post_names, &row_numbers)?;
451 }
452 DiffType::Update => {
453 let (pre_cols, pre_names) = clone_columns(&mut builder, diff.pre())?;
454 let (post_cols, post_names) = clone_columns(&mut builder, diff.post())?;
455 let pre_names: Vec<&str> = pre_names.iter().map(|s| s.as_str()).collect();
456 let post_names: Vec<&str> = post_names.iter().map(|s| s.as_str()).collect();
457 let pre_row_count = diff.pre().row_count();
458 let post_row_count = diff.post().row_count();
459 let pre_row_numbers: Vec<RowNumber> =
460 diff.pre().row_numbers().iter().copied().map(RowNumber).collect();
461 let post_row_numbers: Vec<RowNumber> =
462 diff.post().row_numbers().iter().copied().map(RowNumber).collect();
463 builder.emit_update(
464 &pre_cols,
465 &pre_names,
466 pre_row_count,
467 &pre_row_numbers,
468 &post_cols,
469 &post_names,
470 post_row_count,
471 &post_row_numbers,
472 )?;
473 }
474 DiffType::Remove => {
475 let (cols, names) = clone_columns(&mut builder, diff.pre())?;
476 let names: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
477 let row_numbers: Vec<RowNumber> =
478 diff.pre().row_numbers().iter().copied().map(RowNumber).collect();
479 builder.emit_remove(&cols, &names, &row_numbers)?;
480 }
481 }
482 }
483 let _ = EmitDiffKind::Insert;
485 Ok(())
486 }
487
488 fn clone_columns(
491 builder: &mut ColumnsBuilder<'_>,
492 cols: BorrowedColumns<'_>,
493 ) -> Result<(Vec<CommittedColumn>, Vec<String>)> {
494 let row_count = cols.row_count();
495 let mut committed: Vec<CommittedColumn> = Vec::new();
496 let mut names: Vec<String> = Vec::new();
497 for col in cols.columns() {
498 let type_code = col.type_code();
499 let bytes = col.data_bytes();
500 let active = builder.acquire(type_code, row_count.max(1))?;
501 active.grow(bytes.len().max(row_count))?;
502 let dst = active.data_ptr();
503 if !dst.is_null() && !bytes.is_empty() {
504 unsafe {
505 core::ptr::copy_nonoverlapping(bytes.as_ptr(), dst, bytes.len());
506 }
507 }
508 if matches!(type_code, ColumnTypeCode::Utf8 | ColumnTypeCode::Blob) {
510 let off = col.offsets();
511 let dst_off = active.offsets_ptr();
512 if !dst_off.is_null() && !off.is_empty() {
513 unsafe {
514 core::ptr::copy_nonoverlapping(off.as_ptr(), dst_off, off.len());
515 }
516 }
517 }
518 let c = active.commit(row_count)?;
519 committed.push(c);
520 names.push(col.name().to_string());
521 }
522 Ok((committed, names))
523 }
524
/// The metadata assertions accept the constants declared by `TestOperator`.
#[test]
fn test_operator_metadata() {
    type Subject = TestOperator;

    TestMetadataHarness::assert_name::<Subject>("test_operator");
    TestMetadataHarness::assert_api::<Subject>(1);
    TestMetadataHarness::assert_version::<Subject>("1.0.0");
}
531
/// Builder settings end up in the built harness.
#[test]
fn test_harness_builder() {
    let builder = TestHarnessBuilder::<TestOperator>::new()
        .with_node_id(FlowNodeId(42))
        .with_version(CommitVersion(10))
        .add_config("key", Value::Utf8("value".into()));
    let result = builder.build();

    assert!(result.is_ok());

    let harness = result.unwrap();
    assert_eq!(harness.node_id, 42);
    assert_eq!(harness.version(), 10);
}
546
/// A single insert is forwarded and its value lands in the operator state.
#[test]
fn test_harness_with_stateful_operator() {
    let mut harness = OperatorTestHarness::<StatefulTestOperator>::builder()
        .with_node_id(FlowNodeId(1))
        .build()
        .expect("Failed to build harness");

    let change = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(42_i64)]).build();
    let forwarded = harness.apply(change).expect("Apply failed");

    // Exactly one diff comes back out.
    assert_eq!(forwarded.diffs.len(), 1);

    // The operator stored the inserted value under `row_1`.
    let stored = harness.state();
    stored.assert_typed_value::<i64>(&encode_key("row_1"), &42_i64);
}
570
/// History grows with every apply, is indexable, and can be cleared without
/// touching the operator state.
#[test]
fn test_harness_history_index() {
    let mut harness = OperatorTestHarness::<StatefulTestOperator>::builder()
        .with_node_id(FlowNodeId(1))
        .build()
        .expect("Failed to build harness");

    // A fresh harness has no history.
    assert_eq!(harness.history_len(), 0);
    assert!(harness.last_change().is_none());

    let first = TestChangeBuilder::new().insert_row(1, vec![Value::Int8(1_i64)]).build();
    harness.apply(first).expect("apply a failed");
    assert_eq!(harness.history_len(), 1);

    let second = TestChangeBuilder::new().insert_row(2, vec![Value::Int8(2_i64)]).build();
    harness.apply(second).expect("apply b failed");
    assert_eq!(harness.history_len(), 2);

    // Each recorded change is reachable by position.
    assert_eq!(harness[0].diffs.len(), 1);
    assert_eq!(harness[1].diffs.len(), 1);

    // The convenience helper records into the same history.
    let third_row = TestRowBuilder::new(3).add_value(Value::Int8(3_i64)).build();
    harness.insert(third_row);
    assert_eq!(harness.history_len(), 3);

    assert!(harness.last_change().is_some());

    // Clearing the history must leave the stored state alone.
    let state_count_before = harness.state().len();
    harness.clear_history();
    assert_eq!(harness.history_len(), 0);
    assert!(harness.last_change().is_none());
    assert_eq!(harness.state().len(), state_count_before);
}
609
/// State accumulates across several applies.
#[test]
fn test_harness_multiple_operations() {
    let mut harness = OperatorTestHarness::<StatefulTestOperator>::builder()
        .build()
        .expect("Failed to build harness");

    // First change carries two inserted rows.
    let first_batch = TestChangeBuilder::new()
        .insert_row(1, vec![Value::Int8(10_i64)])
        .insert_row(2, vec![Value::Int8(20_i64)])
        .build();
    harness.apply(first_batch).expect("First apply failed");

    let after_first = harness.state();
    assert_eq!(after_first.len(), 2);

    // Second change adds a third row.
    let second_batch = TestChangeBuilder::new()
        .insert_row(RowNumber(3), vec![Value::Int8(30_i64)])
        .build();
    harness.apply(second_batch).expect("Second apply failed");

    let after_second = harness.state();
    for (key, expected) in [("row_1", 10_i64), ("row_2", 20_i64), ("row_3", 30_i64)] {
        after_second.assert_typed_value::<i64>(&encode_key(key), &expected);
    }

    assert_eq!(after_second.len(), 3);
}
640}