Skip to main content

reifydb_runtime/actor/
testing.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::{collections::VecDeque, marker::PhantomData};
5
6#[cfg(not(reifydb_single_threaded))]
7use crossbeam_channel::unbounded;
8
9#[cfg(not(reifydb_single_threaded))]
10use crate::actor::mailbox::ActorRef;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use crate::actor::mailbox::create_actor_ref;
13#[cfg(reifydb_target = "dst")]
14use crate::actor::mailbox::create_dst_mailbox;
15#[cfg(reifydb_target = "dst")]
16use crate::context::clock::MockClock;
17use crate::{
18	actor::{
19		context::{CancellationToken, Context},
20		system::ActorSystem,
21		traits::{Actor, Directive},
22	},
23	context::clock::Clock,
24	pool::{PoolConfig, Pools},
25};
26
27pub struct TestHarness<A: Actor> {
28	actor: A,
29	state: A::State,
30	mailbox: VecDeque<A::Message>,
31	ctx: TestContext<A::Message>,
32}
33
34impl<A: Actor> TestHarness<A> {
35	pub fn new(actor: A) -> Self {
36		let ctx = TestContext::new();
37		let state = actor.init(&ctx.to_context());
38
39		Self {
40			actor,
41			state,
42			mailbox: VecDeque::new(),
43			ctx,
44		}
45	}
46
47	pub fn with_state(actor: A, state: A::State) -> Self {
48		let ctx = TestContext::new();
49
50		Self {
51			actor,
52			state,
53			mailbox: VecDeque::new(),
54			ctx,
55		}
56	}
57
58	pub fn send(&mut self, msg: A::Message) {
59		self.mailbox.push_back(msg);
60	}
61
62	pub fn process_one(&mut self) -> Option<Directive> {
63		let msg = self.mailbox.pop_front()?;
64		let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
65		Some(flow)
66	}
67
68	pub fn process_all(&mut self) -> Vec<Directive> {
69		let mut flows = Vec::new();
70
71		while let Some(flow) = self.process_one() {
72			flows.push(flow);
73			if flow == Directive::Stop {
74				break;
75			}
76		}
77
78		flows
79	}
80
81	pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
82	where
83		F: FnMut(&A::State) -> bool,
84	{
85		let mut flows = Vec::new();
86
87		while !self.mailbox.is_empty() {
88			if condition(&self.state) {
89				break;
90			}
91
92			if let Some(flow) = self.process_one() {
93				flows.push(flow);
94				if flow == Directive::Stop {
95					break;
96				}
97			}
98		}
99
100		flows
101	}
102
103	pub fn idle(&mut self) -> Directive {
104		self.actor.idle(&self.ctx.to_context())
105	}
106
107	pub fn post_stop(&mut self) {
108		self.actor.post_stop();
109	}
110
111	pub fn state(&self) -> &A::State {
112		&self.state
113	}
114
115	pub fn state_mut(&mut self) -> &mut A::State {
116		&mut self.state
117	}
118
119	pub fn is_empty(&self) -> bool {
120		self.mailbox.is_empty()
121	}
122
123	pub fn mailbox_len(&self) -> usize {
124		self.mailbox.len()
125	}
126
127	pub fn cancel(&mut self) {
128		self.ctx.cancel();
129	}
130
131	pub fn is_cancelled(&self) -> bool {
132		self.ctx.is_cancelled()
133	}
134}
135
136struct TestContext<M> {
137	cancel: CancellationToken,
138	_marker: PhantomData<M>,
139}
140
141impl<M: Send + 'static> TestContext<M> {
142	fn new() -> Self {
143		Self {
144			cancel: CancellationToken::new(),
145			_marker: PhantomData,
146		}
147	}
148
149	fn cancel(&self) {
150		self.cancel.cancel();
151	}
152
153	fn is_cancelled(&self) -> bool {
154		self.cancel.is_cancelled()
155	}
156
157	fn to_context(&self) -> Context<M> {
158		#[cfg(not(reifydb_single_threaded))]
159		let actor_ref = {
160			let (tx, _rx) = unbounded();
161			ActorRef::new(tx)
162		};
163
164		#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
165		let actor_ref = create_actor_ref();
166
167		#[cfg(reifydb_target = "dst")]
168		let actor_ref = {
169			let (actor_ref, _queue) = create_dst_mailbox();
170			actor_ref
171		};
172
173		let pools = Pools::new(PoolConfig::default());
174
175		#[cfg(reifydb_target = "dst")]
176		let clock = Clock::Mock(MockClock::new(0));
177		#[cfg(not(reifydb_target = "dst"))]
178		let clock = Clock::Real;
179
180		let system = ActorSystem::new(pools, clock);
181
182		Context::new(actor_ref, system, self.cancel.clone())
183	}
184}
185
186#[cfg(test)]
187mod tests {
188	use super::*;
189
190	struct CounterActor;
191
192	impl Actor for CounterActor {
193		type State = i64;
194		type Message = CounterMessage;
195
196		fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
197			0
198		}
199
200		fn handle(
201			&self,
202			state: &mut Self::State,
203			msg: Self::Message,
204			_ctx: &Context<Self::Message>,
205		) -> Directive {
206			match msg {
207				CounterMessage::Inc => *state += 1,
208				CounterMessage::Dec => *state -= 1,
209				CounterMessage::Set(v) => *state = v,
210				CounterMessage::Stop => return Directive::Stop,
211			}
212			Directive::Continue
213		}
214
215		fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
216			Directive::Park
217		}
218	}
219
220	#[derive(Debug)]
221	enum CounterMessage {
222		Inc,
223		Dec,
224		Set(i64),
225		Stop,
226	}
227
228	#[test]
229	fn test_harness_basic() {
230		let mut harness = TestHarness::new(CounterActor);
231
232		harness.send(CounterMessage::Inc);
233		harness.send(CounterMessage::Inc);
234		harness.send(CounterMessage::Inc);
235
236		assert_eq!(harness.mailbox_len(), 3);
237
238		let flows = harness.process_all();
239
240		assert_eq!(flows.len(), 3);
241		assert!(flows.iter().all(|f| *f == Directive::Continue));
242		assert_eq!(*harness.state(), 3);
243	}
244
245	#[test]
246	fn test_harness_stops_on_stop() {
247		let mut harness = TestHarness::new(CounterActor);
248
249		harness.send(CounterMessage::Inc);
250		harness.send(CounterMessage::Stop);
251		harness.send(CounterMessage::Inc); // Should not be processed
252
253		let flows = harness.process_all();
254
255		assert_eq!(flows.len(), 2);
256		assert_eq!(flows[1], Directive::Stop);
257		assert_eq!(*harness.state(), 1);
258		assert_eq!(harness.mailbox_len(), 1); // One message left
259	}
260
261	#[test]
262	fn test_harness_process_one() {
263		let mut harness = TestHarness::new(CounterActor);
264
265		harness.send(CounterMessage::Set(42));
266		harness.send(CounterMessage::Inc);
267
268		assert_eq!(harness.process_one(), Some(Directive::Continue));
269		assert_eq!(*harness.state(), 42);
270
271		assert_eq!(harness.process_one(), Some(Directive::Continue));
272		assert_eq!(*harness.state(), 43);
273
274		assert_eq!(harness.process_one(), None);
275	}
276
277	#[test]
278	fn test_harness_idle() {
279		let mut harness = TestHarness::new(CounterActor);
280
281		let flow = harness.idle();
282		assert_eq!(flow, Directive::Park);
283	}
284}