reifydb_runtime/actor/
testing.rs1use std::{collections::VecDeque, marker::PhantomData};
45
46#[cfg(reifydb_target = "native")]
47use crate::actor::mailbox::ActorRef;
48use crate::{
49 SharedRuntimeConfig,
50 actor::{
51 context::{CancellationToken, Context},
52 system::ActorSystem,
53 traits::{Actor, Directive},
54 },
55};
56
57pub struct TestHarness<A: Actor> {
64 actor: A,
65 state: A::State,
66 mailbox: VecDeque<A::Message>,
67 ctx: TestContext<A::Message>,
68}
69
70impl<A: Actor> TestHarness<A> {
71 pub fn new(actor: A) -> Self {
73 let ctx = TestContext::new();
74 let state = actor.init(&ctx.to_context());
75
76 Self {
77 actor,
78 state,
79 mailbox: VecDeque::new(),
80 ctx,
81 }
82 }
83
84 pub fn with_state(actor: A, state: A::State) -> Self {
89 let ctx = TestContext::new();
90
91 Self {
92 actor,
93 state,
94 mailbox: VecDeque::new(),
95 ctx,
96 }
97 }
98
99 pub fn send(&mut self, msg: A::Message) {
104 self.mailbox.push_back(msg);
105 }
106
107 pub fn process_one(&mut self) -> Option<Directive> {
112 let msg = self.mailbox.pop_front()?;
113 let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
114 Some(flow)
115 }
116
117 pub fn process_all(&mut self) -> Vec<Directive> {
122 let mut flows = Vec::new();
123
124 while let Some(flow) = self.process_one() {
125 flows.push(flow);
126 if flow == Directive::Stop {
127 break;
128 }
129 }
130
131 flows
132 }
133
134 pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
138 where
139 F: FnMut(&A::State) -> bool,
140 {
141 let mut flows = Vec::new();
142
143 while !self.mailbox.is_empty() {
144 if condition(&self.state) {
145 break;
146 }
147
148 if let Some(flow) = self.process_one() {
149 flows.push(flow);
150 if flow == Directive::Stop {
151 break;
152 }
153 }
154 }
155
156 flows
157 }
158
159 pub fn idle(&mut self) -> Directive {
163 self.actor.idle(&self.ctx.to_context())
164 }
165
166 pub fn post_stop(&mut self) {
168 self.actor.post_stop();
169 }
170
171 pub fn state(&self) -> &A::State {
173 &self.state
174 }
175
176 pub fn state_mut(&mut self) -> &mut A::State {
178 &mut self.state
179 }
180
181 pub fn is_empty(&self) -> bool {
183 self.mailbox.is_empty()
184 }
185
186 pub fn mailbox_len(&self) -> usize {
188 self.mailbox.len()
189 }
190
191 pub fn cancel(&mut self) {
193 self.ctx.cancel();
194 }
195
196 pub fn is_cancelled(&self) -> bool {
198 self.ctx.is_cancelled()
199 }
200}
201
202struct TestContext<M> {
204 cancel: CancellationToken,
205 _marker: PhantomData<M>,
206}
207
208impl<M: Send + 'static> TestContext<M> {
209 fn new() -> Self {
210 Self {
211 cancel: CancellationToken::new(),
212 _marker: PhantomData,
213 }
214 }
215
216 fn cancel(&self) {
217 self.cancel.cancel();
218 }
219
220 fn is_cancelled(&self) -> bool {
221 self.cancel.is_cancelled()
222 }
223
224 fn to_context(&self) -> Context<M> {
229 #[cfg(reifydb_target = "native")]
231 let actor_ref = {
232 let (tx, _rx) = crossbeam_channel::unbounded();
233 ActorRef::new(tx)
234 };
235
236 #[cfg(reifydb_target = "wasm")]
237 let actor_ref = crate::actor::mailbox::create_actor_ref();
238
239 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
241
242 Context::new(actor_ref, system, self.cancel.clone())
243 }
244}
245
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages understood by [`CounterActor`].
    #[derive(Debug)]
    enum CounterMsg {
        Inc,
        Dec,
        Set(i64),
        Stop,
    }

    /// Minimal actor whose whole state is one signed counter.
    struct CounterActor;

    impl Actor for CounterActor {
        type State = i64;
        type Message = CounterMsg;

        fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
            0
        }

        fn handle(
            &self,
            state: &mut Self::State,
            msg: Self::Message,
            _ctx: &Context<Self::Message>,
        ) -> Directive {
            // `Stop` leaves the counter untouched; every other message
            // yields the counter's next value.
            let next = match msg {
                CounterMsg::Inc => *state + 1,
                CounterMsg::Dec => *state - 1,
                CounterMsg::Set(v) => v,
                CounterMsg::Stop => return Directive::Stop,
            };
            *state = next;
            Directive::Continue
        }

        fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
            Directive::Park
        }
    }

    /// Three increments are queued, all are processed, and the state ends at 3.
    #[test]
    fn test_harness_basic() {
        let mut harness = TestHarness::new(CounterActor);

        for _ in 0..3 {
            harness.send(CounterMsg::Inc);
        }

        assert_eq!(harness.mailbox_len(), 3);

        let flows = harness.process_all();

        assert_eq!(flows.len(), 3);
        assert!(flows.iter().all(|f| *f == Directive::Continue));
        assert_eq!(*harness.state(), 3);
    }

    /// `process_all` halts at `Stop` and leaves the trailing message queued.
    #[test]
    fn test_harness_stops_on_stop() {
        let mut harness = TestHarness::new(CounterActor);

        for msg in [CounterMsg::Inc, CounterMsg::Stop, CounterMsg::Inc] {
            harness.send(msg);
        }

        let flows = harness.process_all();

        assert_eq!(flows.len(), 2);
        assert_eq!(flows[1], Directive::Stop);
        assert_eq!(*harness.state(), 1);
        // The increment queued behind `Stop` was never delivered.
        assert_eq!(harness.mailbox_len(), 1);
    }

    /// `process_one` delivers exactly one message per call, in FIFO order.
    #[test]
    fn test_harness_process_one() {
        let mut harness = TestHarness::new(CounterActor);

        harness.send(CounterMsg::Set(42));
        harness.send(CounterMsg::Inc);

        let first = harness.process_one();
        assert_eq!(first, Some(Directive::Continue));
        assert_eq!(*harness.state(), 42);

        let second = harness.process_one();
        assert_eq!(second, Some(Directive::Continue));
        assert_eq!(*harness.state(), 43);

        // Mailbox is drained, so there is nothing left to deliver.
        assert_eq!(harness.process_one(), None);
    }

    /// `idle` forwards to the actor's `idle` callback.
    #[test]
    fn test_harness_idle() {
        let mut harness = TestHarness::new(CounterActor);

        assert_eq!(harness.idle(), Directive::Park);
    }
}