reifydb_runtime/actor/
testing.rs1use std::{collections::VecDeque, marker::PhantomData};
48
49#[cfg(not(reifydb_single_threaded))]
50use crossbeam_channel::unbounded;
51
52#[cfg(not(reifydb_single_threaded))]
53use crate::actor::mailbox::ActorRef;
54#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
55use crate::actor::mailbox::create_actor_ref;
56#[cfg(reifydb_target = "dst")]
57use crate::actor::mailbox::create_dst_mailbox;
58#[cfg(reifydb_target = "dst")]
59use crate::context::clock::MockClock;
60use crate::{
61 actor::{
62 context::{CancellationToken, Context},
63 system::ActorSystem,
64 traits::{Actor, Directive},
65 },
66 context::clock::Clock,
67 pool::{PoolConfig, Pools},
68};
69
70pub struct TestHarness<A: Actor> {
77 actor: A,
78 state: A::State,
79 mailbox: VecDeque<A::Message>,
80 ctx: TestContext<A::Message>,
81}
82
83impl<A: Actor> TestHarness<A> {
84 pub fn new(actor: A) -> Self {
86 let ctx = TestContext::new();
87 let state = actor.init(&ctx.to_context());
88
89 Self {
90 actor,
91 state,
92 mailbox: VecDeque::new(),
93 ctx,
94 }
95 }
96
97 pub fn with_state(actor: A, state: A::State) -> Self {
102 let ctx = TestContext::new();
103
104 Self {
105 actor,
106 state,
107 mailbox: VecDeque::new(),
108 ctx,
109 }
110 }
111
112 pub fn send(&mut self, msg: A::Message) {
117 self.mailbox.push_back(msg);
118 }
119
120 pub fn process_one(&mut self) -> Option<Directive> {
125 let msg = self.mailbox.pop_front()?;
126 let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
127 Some(flow)
128 }
129
130 pub fn process_all(&mut self) -> Vec<Directive> {
135 let mut flows = Vec::new();
136
137 while let Some(flow) = self.process_one() {
138 flows.push(flow);
139 if flow == Directive::Stop {
140 break;
141 }
142 }
143
144 flows
145 }
146
147 pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
151 where
152 F: FnMut(&A::State) -> bool,
153 {
154 let mut flows = Vec::new();
155
156 while !self.mailbox.is_empty() {
157 if condition(&self.state) {
158 break;
159 }
160
161 if let Some(flow) = self.process_one() {
162 flows.push(flow);
163 if flow == Directive::Stop {
164 break;
165 }
166 }
167 }
168
169 flows
170 }
171
172 pub fn idle(&mut self) -> Directive {
176 self.actor.idle(&self.ctx.to_context())
177 }
178
179 pub fn post_stop(&mut self) {
181 self.actor.post_stop();
182 }
183
184 pub fn state(&self) -> &A::State {
186 &self.state
187 }
188
189 pub fn state_mut(&mut self) -> &mut A::State {
191 &mut self.state
192 }
193
194 pub fn is_empty(&self) -> bool {
196 self.mailbox.is_empty()
197 }
198
199 pub fn mailbox_len(&self) -> usize {
201 self.mailbox.len()
202 }
203
204 pub fn cancel(&mut self) {
206 self.ctx.cancel();
207 }
208
209 pub fn is_cancelled(&self) -> bool {
211 self.ctx.is_cancelled()
212 }
213}
214
215struct TestContext<M> {
217 cancel: CancellationToken,
218 _marker: PhantomData<M>,
219}
220
221impl<M: Send + 'static> TestContext<M> {
222 fn new() -> Self {
223 Self {
224 cancel: CancellationToken::new(),
225 _marker: PhantomData,
226 }
227 }
228
229 fn cancel(&self) {
230 self.cancel.cancel();
231 }
232
233 fn is_cancelled(&self) -> bool {
234 self.cancel.is_cancelled()
235 }
236
237 fn to_context(&self) -> Context<M> {
242 #[cfg(not(reifydb_single_threaded))]
244 let actor_ref = {
245 let (tx, _rx) = unbounded();
246 ActorRef::new(tx)
247 };
248
249 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
250 let actor_ref = create_actor_ref();
251
252 #[cfg(reifydb_target = "dst")]
253 let actor_ref = {
254 let (actor_ref, _queue) = create_dst_mailbox();
255 actor_ref
256 };
257
258 let pools = Pools::new(PoolConfig::default());
260
261 #[cfg(reifydb_target = "dst")]
262 let clock = Clock::Mock(MockClock::new(0));
263 #[cfg(not(reifydb_target = "dst"))]
264 let clock = Clock::Real;
265
266 let system = ActorSystem::new(pools, clock);
267
268 Context::new(actor_ref, system, self.cancel.clone())
269 }
270}
271
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages understood by [`CounterActor`].
    #[derive(Debug)]
    enum CounterMessage {
        Inc,
        Dec,
        Set(i64),
        Stop,
    }

    /// Toy actor whose state is a single signed counter.
    struct CounterActor;

    impl Actor for CounterActor {
        type State = i64;
        type Message = CounterMessage;

        fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
            0
        }

        fn handle(
            &self,
            state: &mut Self::State,
            msg: Self::Message,
            _ctx: &Context<Self::Message>,
        ) -> Directive {
            // `Stop` leaves the counter untouched; every other message
            // yields the counter's next value.
            let updated = match msg {
                CounterMessage::Stop => return Directive::Stop,
                CounterMessage::Inc => *state + 1,
                CounterMessage::Dec => *state - 1,
                CounterMessage::Set(value) => value,
            };
            *state = updated;
            Directive::Continue
        }

        fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
            Directive::Park
        }
    }

    #[test]
    fn test_harness_basic() {
        let mut harness = TestHarness::new(CounterActor);

        for _ in 0..3 {
            harness.send(CounterMessage::Inc);
        }
        assert_eq!(harness.mailbox_len(), 3);

        let directives = harness.process_all();

        assert_eq!(directives, vec![Directive::Continue; 3]);
        assert_eq!(*harness.state(), 3);
    }

    #[test]
    fn test_harness_stops_on_stop() {
        let mut harness = TestHarness::new(CounterActor);

        harness.send(CounterMessage::Inc);
        harness.send(CounterMessage::Stop);
        // Queued behind `Stop`, so it must never be handled.
        harness.send(CounterMessage::Inc);

        let directives = harness.process_all();

        assert_eq!(directives.len(), 2);
        assert_eq!(directives[1], Directive::Stop);
        assert_eq!(*harness.state(), 1);
        // The trailing `Inc` is still waiting in the mailbox.
        assert_eq!(harness.mailbox_len(), 1);
    }

    #[test]
    fn test_harness_process_one() {
        let mut harness = TestHarness::new(CounterActor);

        harness.send(CounterMessage::Set(42));
        harness.send(CounterMessage::Inc);

        // Each step handles exactly one message, in FIFO order.
        for expected_state in [42_i64, 43] {
            assert_eq!(harness.process_one(), Some(Directive::Continue));
            assert_eq!(*harness.state(), expected_state);
        }

        assert_eq!(harness.process_one(), None);
    }

    #[test]
    fn test_harness_idle() {
        let mut harness = TestHarness::new(CounterActor);

        assert_eq!(harness.idle(), Directive::Park);
    }
}