Skip to main content

reifydb_runtime/actor/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::VecDeque, marker::PhantomData};
5
6#[cfg(not(reifydb_single_threaded))]
7use crossbeam_channel::unbounded;
8
9#[cfg(not(reifydb_single_threaded))]
10use crate::actor::mailbox::ActorRef;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use crate::actor::mailbox::create_actor_ref;
13#[cfg(reifydb_target = "dst")]
14use crate::actor::mailbox::create_dst_mailbox;
15#[cfg(reifydb_target = "dst")]
16use crate::context::clock::MockClock;
17use crate::{
18	actor::{
19		context::{CancellationToken, Context},
20		system::ActorSystem,
21		traits::{Actor, Directive},
22	},
23	context::clock::Clock,
24	pool::{PoolConfig, Pools},
25};
26
27/// Test harness for synchronous actor testing.
28///
29/// This harness allows testing actors without spawning tasks:
30/// - Messages are queued in a local VecDeque
31/// - Processing is explicit via `process_one()` or `process_all()`
32/// - State is directly accessible for assertions
33pub struct TestHarness<A: Actor> {
34	actor: A,
35	state: A::State,
36	mailbox: VecDeque<A::Message>,
37	ctx: TestContext<A::Message>,
38}
39
40impl<A: Actor> TestHarness<A> {
41	/// Create a new test harness for the given actor.
42	pub fn new(actor: A) -> Self {
43		let ctx = TestContext::new();
44		let state = actor.init(&ctx.to_context());
45
46		Self {
47			actor,
48			state,
49			mailbox: VecDeque::new(),
50			ctx,
51		}
52	}
53
54	/// Create a new test harness with a pre-initialized state.
55	///
56	/// This is useful when you want to test specific state transitions
57	/// without going through the init process.
58	pub fn with_state(actor: A, state: A::State) -> Self {
59		let ctx = TestContext::new();
60
61		Self {
62			actor,
63			state,
64			mailbox: VecDeque::new(),
65			ctx,
66		}
67	}
68
69	/// Send a message to the actor's mailbox.
70	///
71	/// The message will be queued and processed when `process_one()`
72	/// or `process_all()` is called.
73	pub fn send(&mut self, msg: A::Message) {
74		self.mailbox.push_back(msg);
75	}
76
77	/// Process a single message from the mailbox.
78	///
79	/// Returns `Some(flow)` if a message was processed,
80	/// or `None` if the mailbox was empty.
81	pub fn process_one(&mut self) -> Option<Directive> {
82		let msg = self.mailbox.pop_front()?;
83		let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
84		Some(flow)
85	}
86
87	/// Process all messages in the mailbox.
88	///
89	/// Returns a Vec of all Directive values returned by handle().
90	/// Processing stops early if any handler returns `Directive::Stop`.
91	pub fn process_all(&mut self) -> Vec<Directive> {
92		let mut flows = Vec::new();
93
94		while let Some(flow) = self.process_one() {
95			flows.push(flow);
96			if flow == Directive::Stop {
97				break;
98			}
99		}
100
101		flows
102	}
103
104	/// Process messages until the mailbox is empty or a condition is met.
105	///
106	/// Returns the flows from all processed messages.
107	pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
108	where
109		F: FnMut(&A::State) -> bool,
110	{
111		let mut flows = Vec::new();
112
113		while !self.mailbox.is_empty() {
114			if condition(&self.state) {
115				break;
116			}
117
118			if let Some(flow) = self.process_one() {
119				flows.push(flow);
120				if flow == Directive::Stop {
121					break;
122				}
123			}
124		}
125
126		flows
127	}
128
129	/// Call the actor's idle hook.
130	///
131	/// This is useful for testing background work behavior.
132	pub fn idle(&mut self) -> Directive {
133		self.actor.idle(&self.ctx.to_context())
134	}
135
136	/// Call the actor's post_stop hook.
137	pub fn post_stop(&mut self) {
138		self.actor.post_stop();
139	}
140
141	/// Get a reference to the actor's state.
142	pub fn state(&self) -> &A::State {
143		&self.state
144	}
145
146	/// Get a mutable reference to the actor's state.
147	pub fn state_mut(&mut self) -> &mut A::State {
148		&mut self.state
149	}
150
151	/// Check if the mailbox is empty.
152	pub fn is_empty(&self) -> bool {
153		self.mailbox.is_empty()
154	}
155
156	/// Get the number of messages in the mailbox.
157	pub fn mailbox_len(&self) -> usize {
158		self.mailbox.len()
159	}
160
161	/// Signal cancellation.
162	pub fn cancel(&mut self) {
163		self.ctx.cancel();
164	}
165
166	/// Check if cancelled.
167	pub fn is_cancelled(&self) -> bool {
168		self.ctx.is_cancelled()
169	}
170}
171
172/// Test context that doesn't require a real runtime.
173struct TestContext<M> {
174	cancel: CancellationToken,
175	_marker: PhantomData<M>,
176}
177
178impl<M: Send + 'static> TestContext<M> {
179	fn new() -> Self {
180		Self {
181			cancel: CancellationToken::new(),
182			_marker: PhantomData,
183		}
184	}
185
186	fn cancel(&self) {
187		self.cancel.cancel();
188	}
189
190	fn is_cancelled(&self) -> bool {
191		self.cancel.is_cancelled()
192	}
193
194	/// Convert to a Context.
195	///
196	/// Note: The ActorRef in this context is not usable for sending
197	/// messages in tests. Use `harness.send()` instead.
198	fn to_context(&self) -> Context<M> {
199		// Create a dummy actor ref using platform-specific implementation
200		#[cfg(not(reifydb_single_threaded))]
201		let actor_ref = {
202			let (tx, _rx) = unbounded();
203			ActorRef::new(tx)
204		};
205
206		#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
207		let actor_ref = create_actor_ref();
208
209		#[cfg(reifydb_target = "dst")]
210		let actor_ref = {
211			let (actor_ref, _queue) = create_dst_mailbox();
212			actor_ref
213		};
214
215		// Create an actor system for testing
216		let pools = Pools::new(PoolConfig::default());
217
218		#[cfg(reifydb_target = "dst")]
219		let clock = Clock::Mock(MockClock::new(0));
220		#[cfg(not(reifydb_target = "dst"))]
221		let clock = Clock::Real;
222
223		let system = ActorSystem::new(pools, clock);
224
225		Context::new(actor_ref, system, self.cancel.clone())
226	}
227}
228
229#[cfg(test)]
230mod tests {
231	use super::*;
232
233	struct CounterActor;
234
235	impl Actor for CounterActor {
236		type State = i64;
237		type Message = CounterMessage;
238
239		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
240			0
241		}
242
243		fn handle(
244			&self,
245			state: &mut Self::State,
246			msg: Self::Message,
247			_ctx: &Context<Self::Message>,
248		) -> Directive {
249			match msg {
250				CounterMessage::Inc => *state += 1,
251				CounterMessage::Dec => *state -= 1,
252				CounterMessage::Set(v) => *state = v,
253				CounterMessage::Stop => return Directive::Stop,
254			}
255			Directive::Continue
256		}
257
258		fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
259			Directive::Park
260		}
261	}
262
263	#[derive(Debug)]
264	enum CounterMessage {
265		Inc,
266		Dec,
267		Set(i64),
268		Stop,
269	}
270
271	#[test]
272	fn test_harness_basic() {
273		let mut harness = TestHarness::new(CounterActor);
274
275		harness.send(CounterMessage::Inc);
276		harness.send(CounterMessage::Inc);
277		harness.send(CounterMessage::Inc);
278
279		assert_eq!(harness.mailbox_len(), 3);
280
281		let flows = harness.process_all();
282
283		assert_eq!(flows.len(), 3);
284		assert!(flows.iter().all(|f| *f == Directive::Continue));
285		assert_eq!(*harness.state(), 3);
286	}
287
288	#[test]
289	fn test_harness_stops_on_stop() {
290		let mut harness = TestHarness::new(CounterActor);
291
292		harness.send(CounterMessage::Inc);
293		harness.send(CounterMessage::Stop);
294		harness.send(CounterMessage::Inc); // Should not be processed
295
296		let flows = harness.process_all();
297
298		assert_eq!(flows.len(), 2);
299		assert_eq!(flows[1], Directive::Stop);
300		assert_eq!(*harness.state(), 1);
301		assert_eq!(harness.mailbox_len(), 1); // One message left
302	}
303
304	#[test]
305	fn test_harness_process_one() {
306		let mut harness = TestHarness::new(CounterActor);
307
308		harness.send(CounterMessage::Set(42));
309		harness.send(CounterMessage::Inc);
310
311		assert_eq!(harness.process_one(), Some(Directive::Continue));
312		assert_eq!(*harness.state(), 42);
313
314		assert_eq!(harness.process_one(), Some(Directive::Continue));
315		assert_eq!(*harness.state(), 43);
316
317		assert_eq!(harness.process_one(), None);
318	}
319
320	#[test]
321	fn test_harness_idle() {
322		let mut harness = TestHarness::new(CounterActor);
323
324		let flow = harness.idle();
325		assert_eq!(flow, Directive::Park);
326	}
327}