Skip to main content

reifydb_runtime/actor/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Testing utilities for actors.
5//!
6//! This module provides a [`TestHarness`] for synchronous actor testing
7//! without spawning actual tasks or threads.
8//!
9//! # Example
10//!
11//! ```
12//! use reifydb_runtime::actor::{
13//! 	context::Context,
14//! 	testing::TestHarness,
15//! 	traits::{Actor, Directive},
16//! };
17//!
18//! struct Counter;
19//!
20//! impl Actor for Counter {
21//! 	type State = i64;
22//! 	type Message = i64;
23//!
24//! 	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
25//! 		0
26//! 	}
27//!
28//! 	fn handle(
29//! 		&self,
30//! 		state: &mut Self::State,
31//! 		msg: Self::Message,
32//! 		_ctx: &Context<Self::Message>,
33//! 	) -> Directive {
34//! 		*state += msg;
35//! 		Directive::Continue
36//! 	}
37//! }
38//!
39//! let mut harness = TestHarness::new(Counter);
40//! harness.send(5);
41//! harness.send(3);
42//! harness.process_all();
43//!
44//! assert_eq!(*harness.state(), 8);
45//! ```
46
47use std::{collections::VecDeque, marker::PhantomData};
48
49#[cfg(not(reifydb_single_threaded))]
50use crossbeam_channel::unbounded;
51
52#[cfg(not(reifydb_single_threaded))]
53use crate::actor::mailbox::ActorRef;
54#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
55use crate::actor::mailbox::create_actor_ref;
56#[cfg(reifydb_target = "dst")]
57use crate::actor::mailbox::create_dst_mailbox;
58#[cfg(reifydb_target = "dst")]
59use crate::context::clock::MockClock;
60use crate::{
61	actor::{
62		context::{CancellationToken, Context},
63		system::ActorSystem,
64		traits::{Actor, Directive},
65	},
66	context::clock::Clock,
67	pool::{PoolConfig, Pools},
68};
69
70/// Test harness for synchronous actor testing.
71///
72/// This harness allows testing actors without spawning tasks:
73/// - Messages are queued in a local VecDeque
74/// - Processing is explicit via `process_one()` or `process_all()`
75/// - State is directly accessible for assertions
76pub struct TestHarness<A: Actor> {
77	actor: A,
78	state: A::State,
79	mailbox: VecDeque<A::Message>,
80	ctx: TestContext<A::Message>,
81}
82
83impl<A: Actor> TestHarness<A> {
84	/// Create a new test harness for the given actor.
85	pub fn new(actor: A) -> Self {
86		let ctx = TestContext::new();
87		let state = actor.init(&ctx.to_context());
88
89		Self {
90			actor,
91			state,
92			mailbox: VecDeque::new(),
93			ctx,
94		}
95	}
96
97	/// Create a new test harness with a pre-initialized state.
98	///
99	/// This is useful when you want to test specific state transitions
100	/// without going through the init process.
101	pub fn with_state(actor: A, state: A::State) -> Self {
102		let ctx = TestContext::new();
103
104		Self {
105			actor,
106			state,
107			mailbox: VecDeque::new(),
108			ctx,
109		}
110	}
111
112	/// Send a message to the actor's mailbox.
113	///
114	/// The message will be queued and processed when `process_one()`
115	/// or `process_all()` is called.
116	pub fn send(&mut self, msg: A::Message) {
117		self.mailbox.push_back(msg);
118	}
119
120	/// Process a single message from the mailbox.
121	///
122	/// Returns `Some(flow)` if a message was processed,
123	/// or `None` if the mailbox was empty.
124	pub fn process_one(&mut self) -> Option<Directive> {
125		let msg = self.mailbox.pop_front()?;
126		let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
127		Some(flow)
128	}
129
130	/// Process all messages in the mailbox.
131	///
132	/// Returns a Vec of all Directive values returned by handle().
133	/// Processing stops early if any handler returns `Directive::Stop`.
134	pub fn process_all(&mut self) -> Vec<Directive> {
135		let mut flows = Vec::new();
136
137		while let Some(flow) = self.process_one() {
138			flows.push(flow);
139			if flow == Directive::Stop {
140				break;
141			}
142		}
143
144		flows
145	}
146
147	/// Process messages until the mailbox is empty or a condition is met.
148	///
149	/// Returns the flows from all processed messages.
150	pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
151	where
152		F: FnMut(&A::State) -> bool,
153	{
154		let mut flows = Vec::new();
155
156		while !self.mailbox.is_empty() {
157			if condition(&self.state) {
158				break;
159			}
160
161			if let Some(flow) = self.process_one() {
162				flows.push(flow);
163				if flow == Directive::Stop {
164					break;
165				}
166			}
167		}
168
169		flows
170	}
171
172	/// Call the actor's idle hook.
173	///
174	/// This is useful for testing background work behavior.
175	pub fn idle(&mut self) -> Directive {
176		self.actor.idle(&self.ctx.to_context())
177	}
178
179	/// Call the actor's post_stop hook.
180	pub fn post_stop(&mut self) {
181		self.actor.post_stop();
182	}
183
184	/// Get a reference to the actor's state.
185	pub fn state(&self) -> &A::State {
186		&self.state
187	}
188
189	/// Get a mutable reference to the actor's state.
190	pub fn state_mut(&mut self) -> &mut A::State {
191		&mut self.state
192	}
193
194	/// Check if the mailbox is empty.
195	pub fn is_empty(&self) -> bool {
196		self.mailbox.is_empty()
197	}
198
199	/// Get the number of messages in the mailbox.
200	pub fn mailbox_len(&self) -> usize {
201		self.mailbox.len()
202	}
203
204	/// Signal cancellation.
205	pub fn cancel(&mut self) {
206		self.ctx.cancel();
207	}
208
209	/// Check if cancelled.
210	pub fn is_cancelled(&self) -> bool {
211		self.ctx.is_cancelled()
212	}
213}
214
215/// Test context that doesn't require a real runtime.
216struct TestContext<M> {
217	cancel: CancellationToken,
218	_marker: PhantomData<M>,
219}
220
221impl<M: Send + 'static> TestContext<M> {
222	fn new() -> Self {
223		Self {
224			cancel: CancellationToken::new(),
225			_marker: PhantomData,
226		}
227	}
228
229	fn cancel(&self) {
230		self.cancel.cancel();
231	}
232
233	fn is_cancelled(&self) -> bool {
234		self.cancel.is_cancelled()
235	}
236
237	/// Convert to a Context.
238	///
239	/// Note: The ActorRef in this context is not usable for sending
240	/// messages in tests. Use `harness.send()` instead.
241	fn to_context(&self) -> Context<M> {
242		// Create a dummy actor ref using platform-specific implementation
243		#[cfg(not(reifydb_single_threaded))]
244		let actor_ref = {
245			let (tx, _rx) = unbounded();
246			ActorRef::new(tx)
247		};
248
249		#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
250		let actor_ref = create_actor_ref();
251
252		#[cfg(reifydb_target = "dst")]
253		let actor_ref = {
254			let (actor_ref, _queue) = create_dst_mailbox();
255			actor_ref
256		};
257
258		// Create an actor system for testing
259		let pools = Pools::new(PoolConfig::default());
260
261		#[cfg(reifydb_target = "dst")]
262		let clock = Clock::Mock(MockClock::new(0));
263		#[cfg(not(reifydb_target = "dst"))]
264		let clock = Clock::Real;
265
266		let system = ActorSystem::new(pools, clock);
267
268		Context::new(actor_ref, system, self.cancel.clone())
269	}
270}
271
272#[cfg(test)]
273mod tests {
274	use super::*;
275
276	struct CounterActor;
277
278	impl Actor for CounterActor {
279		type State = i64;
280		type Message = CounterMessage;
281
282		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
283			0
284		}
285
286		fn handle(
287			&self,
288			state: &mut Self::State,
289			msg: Self::Message,
290			_ctx: &Context<Self::Message>,
291		) -> Directive {
292			match msg {
293				CounterMessage::Inc => *state += 1,
294				CounterMessage::Dec => *state -= 1,
295				CounterMessage::Set(v) => *state = v,
296				CounterMessage::Stop => return Directive::Stop,
297			}
298			Directive::Continue
299		}
300
301		fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
302			Directive::Park
303		}
304	}
305
306	#[derive(Debug)]
307	enum CounterMessage {
308		Inc,
309		Dec,
310		Set(i64),
311		Stop,
312	}
313
314	#[test]
315	fn test_harness_basic() {
316		let mut harness = TestHarness::new(CounterActor);
317
318		harness.send(CounterMessage::Inc);
319		harness.send(CounterMessage::Inc);
320		harness.send(CounterMessage::Inc);
321
322		assert_eq!(harness.mailbox_len(), 3);
323
324		let flows = harness.process_all();
325
326		assert_eq!(flows.len(), 3);
327		assert!(flows.iter().all(|f| *f == Directive::Continue));
328		assert_eq!(*harness.state(), 3);
329	}
330
331	#[test]
332	fn test_harness_stops_on_stop() {
333		let mut harness = TestHarness::new(CounterActor);
334
335		harness.send(CounterMessage::Inc);
336		harness.send(CounterMessage::Stop);
337		harness.send(CounterMessage::Inc); // Should not be processed
338
339		let flows = harness.process_all();
340
341		assert_eq!(flows.len(), 2);
342		assert_eq!(flows[1], Directive::Stop);
343		assert_eq!(*harness.state(), 1);
344		assert_eq!(harness.mailbox_len(), 1); // One message left
345	}
346
347	#[test]
348	fn test_harness_process_one() {
349		let mut harness = TestHarness::new(CounterActor);
350
351		harness.send(CounterMessage::Set(42));
352		harness.send(CounterMessage::Inc);
353
354		assert_eq!(harness.process_one(), Some(Directive::Continue));
355		assert_eq!(*harness.state(), 42);
356
357		assert_eq!(harness.process_one(), Some(Directive::Continue));
358		assert_eq!(*harness.state(), 43);
359
360		assert_eq!(harness.process_one(), None);
361	}
362
363	#[test]
364	fn test_harness_idle() {
365		let mut harness = TestHarness::new(CounterActor);
366
367		let flow = harness.idle();
368		assert_eq!(flow, Directive::Park);
369	}
370}