reifydb_runtime/actor/
testing.rs1use std::{collections::VecDeque, marker::PhantomData};
5
6#[cfg(not(reifydb_single_threaded))]
7use crossbeam_channel::unbounded;
8
9#[cfg(not(reifydb_single_threaded))]
10use crate::actor::mailbox::ActorRef;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use crate::actor::mailbox::create_actor_ref;
13#[cfg(reifydb_target = "dst")]
14use crate::actor::mailbox::create_dst_mailbox;
15#[cfg(reifydb_target = "dst")]
16use crate::context::clock::MockClock;
17use crate::{
18 actor::{
19 context::{CancellationToken, Context},
20 system::ActorSystem,
21 traits::{Actor, Directive},
22 },
23 context::clock::Clock,
24 pool::{PoolConfig, Pools},
25};
26
27pub struct TestHarness<A: Actor> {
34 actor: A,
35 state: A::State,
36 mailbox: VecDeque<A::Message>,
37 ctx: TestContext<A::Message>,
38}
39
40impl<A: Actor> TestHarness<A> {
41 pub fn new(actor: A) -> Self {
43 let ctx = TestContext::new();
44 let state = actor.init(&ctx.to_context());
45
46 Self {
47 actor,
48 state,
49 mailbox: VecDeque::new(),
50 ctx,
51 }
52 }
53
54 pub fn with_state(actor: A, state: A::State) -> Self {
59 let ctx = TestContext::new();
60
61 Self {
62 actor,
63 state,
64 mailbox: VecDeque::new(),
65 ctx,
66 }
67 }
68
69 pub fn send(&mut self, msg: A::Message) {
74 self.mailbox.push_back(msg);
75 }
76
77 pub fn process_one(&mut self) -> Option<Directive> {
82 let msg = self.mailbox.pop_front()?;
83 let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
84 Some(flow)
85 }
86
87 pub fn process_all(&mut self) -> Vec<Directive> {
92 let mut flows = Vec::new();
93
94 while let Some(flow) = self.process_one() {
95 flows.push(flow);
96 if flow == Directive::Stop {
97 break;
98 }
99 }
100
101 flows
102 }
103
104 pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
108 where
109 F: FnMut(&A::State) -> bool,
110 {
111 let mut flows = Vec::new();
112
113 while !self.mailbox.is_empty() {
114 if condition(&self.state) {
115 break;
116 }
117
118 if let Some(flow) = self.process_one() {
119 flows.push(flow);
120 if flow == Directive::Stop {
121 break;
122 }
123 }
124 }
125
126 flows
127 }
128
129 pub fn idle(&mut self) -> Directive {
133 self.actor.idle(&self.ctx.to_context())
134 }
135
136 pub fn post_stop(&mut self) {
138 self.actor.post_stop();
139 }
140
141 pub fn state(&self) -> &A::State {
143 &self.state
144 }
145
146 pub fn state_mut(&mut self) -> &mut A::State {
148 &mut self.state
149 }
150
151 pub fn is_empty(&self) -> bool {
153 self.mailbox.is_empty()
154 }
155
156 pub fn mailbox_len(&self) -> usize {
158 self.mailbox.len()
159 }
160
161 pub fn cancel(&mut self) {
163 self.ctx.cancel();
164 }
165
166 pub fn is_cancelled(&self) -> bool {
168 self.ctx.is_cancelled()
169 }
170}
171
172struct TestContext<M> {
174 cancel: CancellationToken,
175 _marker: PhantomData<M>,
176}
177
178impl<M: Send + 'static> TestContext<M> {
179 fn new() -> Self {
180 Self {
181 cancel: CancellationToken::new(),
182 _marker: PhantomData,
183 }
184 }
185
186 fn cancel(&self) {
187 self.cancel.cancel();
188 }
189
190 fn is_cancelled(&self) -> bool {
191 self.cancel.is_cancelled()
192 }
193
194 fn to_context(&self) -> Context<M> {
199 #[cfg(not(reifydb_single_threaded))]
201 let actor_ref = {
202 let (tx, _rx) = unbounded();
203 ActorRef::new(tx)
204 };
205
206 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
207 let actor_ref = create_actor_ref();
208
209 #[cfg(reifydb_target = "dst")]
210 let actor_ref = {
211 let (actor_ref, _queue) = create_dst_mailbox();
212 actor_ref
213 };
214
215 let pools = Pools::new(PoolConfig::default());
217
218 #[cfg(reifydb_target = "dst")]
219 let clock = Clock::Mock(MockClock::new(0));
220 #[cfg(not(reifydb_target = "dst"))]
221 let clock = Clock::Real;
222
223 let system = ActorSystem::new(pools, clock);
224
225 Context::new(actor_ref, system, self.cancel.clone())
226 }
227}
228
229#[cfg(test)]
230mod tests {
231 use super::*;
232
233 struct CounterActor;
234
235 impl Actor for CounterActor {
236 type State = i64;
237 type Message = CounterMessage;
238
239 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
240 0
241 }
242
243 fn handle(
244 &self,
245 state: &mut Self::State,
246 msg: Self::Message,
247 _ctx: &Context<Self::Message>,
248 ) -> Directive {
249 match msg {
250 CounterMessage::Inc => *state += 1,
251 CounterMessage::Dec => *state -= 1,
252 CounterMessage::Set(v) => *state = v,
253 CounterMessage::Stop => return Directive::Stop,
254 }
255 Directive::Continue
256 }
257
258 fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
259 Directive::Park
260 }
261 }
262
263 #[derive(Debug)]
264 enum CounterMessage {
265 Inc,
266 Dec,
267 Set(i64),
268 Stop,
269 }
270
271 #[test]
272 fn test_harness_basic() {
273 let mut harness = TestHarness::new(CounterActor);
274
275 harness.send(CounterMessage::Inc);
276 harness.send(CounterMessage::Inc);
277 harness.send(CounterMessage::Inc);
278
279 assert_eq!(harness.mailbox_len(), 3);
280
281 let flows = harness.process_all();
282
283 assert_eq!(flows.len(), 3);
284 assert!(flows.iter().all(|f| *f == Directive::Continue));
285 assert_eq!(*harness.state(), 3);
286 }
287
288 #[test]
289 fn test_harness_stops_on_stop() {
290 let mut harness = TestHarness::new(CounterActor);
291
292 harness.send(CounterMessage::Inc);
293 harness.send(CounterMessage::Stop);
294 harness.send(CounterMessage::Inc); let flows = harness.process_all();
297
298 assert_eq!(flows.len(), 2);
299 assert_eq!(flows[1], Directive::Stop);
300 assert_eq!(*harness.state(), 1);
301 assert_eq!(harness.mailbox_len(), 1); }
303
304 #[test]
305 fn test_harness_process_one() {
306 let mut harness = TestHarness::new(CounterActor);
307
308 harness.send(CounterMessage::Set(42));
309 harness.send(CounterMessage::Inc);
310
311 assert_eq!(harness.process_one(), Some(Directive::Continue));
312 assert_eq!(*harness.state(), 42);
313
314 assert_eq!(harness.process_one(), Some(Directive::Continue));
315 assert_eq!(*harness.state(), 43);
316
317 assert_eq!(harness.process_one(), None);
318 }
319
320 #[test]
321 fn test_harness_idle() {
322 let mut harness = TestHarness::new(CounterActor);
323
324 let flow = harness.idle();
325 assert_eq!(flow, Directive::Park);
326 }
327}