reifydb_runtime/actor/
testing.rs1use std::{collections::VecDeque, marker::PhantomData};
45
46#[cfg(reifydb_target = "native")]
47use crossbeam_channel::unbounded;
48
49#[cfg(reifydb_target = "native")]
50use crate::actor::mailbox::ActorRef;
51#[cfg(reifydb_target = "wasm")]
52use crate::actor::mailbox::create_actor_ref;
53use crate::{
54 SharedRuntimeConfig,
55 actor::{
56 context::{CancellationToken, Context},
57 system::ActorSystem,
58 traits::{Actor, Directive},
59 },
60};
61
62pub struct TestHarness<A: Actor> {
69 actor: A,
70 state: A::State,
71 mailbox: VecDeque<A::Message>,
72 ctx: TestContext<A::Message>,
73}
74
75impl<A: Actor> TestHarness<A> {
76 pub fn new(actor: A) -> Self {
78 let ctx = TestContext::new();
79 let state = actor.init(&ctx.to_context());
80
81 Self {
82 actor,
83 state,
84 mailbox: VecDeque::new(),
85 ctx,
86 }
87 }
88
89 pub fn with_state(actor: A, state: A::State) -> Self {
94 let ctx = TestContext::new();
95
96 Self {
97 actor,
98 state,
99 mailbox: VecDeque::new(),
100 ctx,
101 }
102 }
103
104 pub fn send(&mut self, msg: A::Message) {
109 self.mailbox.push_back(msg);
110 }
111
112 pub fn process_one(&mut self) -> Option<Directive> {
117 let msg = self.mailbox.pop_front()?;
118 let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
119 Some(flow)
120 }
121
122 pub fn process_all(&mut self) -> Vec<Directive> {
127 let mut flows = Vec::new();
128
129 while let Some(flow) = self.process_one() {
130 flows.push(flow);
131 if flow == Directive::Stop {
132 break;
133 }
134 }
135
136 flows
137 }
138
139 pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
143 where
144 F: FnMut(&A::State) -> bool,
145 {
146 let mut flows = Vec::new();
147
148 while !self.mailbox.is_empty() {
149 if condition(&self.state) {
150 break;
151 }
152
153 if let Some(flow) = self.process_one() {
154 flows.push(flow);
155 if flow == Directive::Stop {
156 break;
157 }
158 }
159 }
160
161 flows
162 }
163
164 pub fn idle(&mut self) -> Directive {
168 self.actor.idle(&self.ctx.to_context())
169 }
170
171 pub fn post_stop(&mut self) {
173 self.actor.post_stop();
174 }
175
176 pub fn state(&self) -> &A::State {
178 &self.state
179 }
180
181 pub fn state_mut(&mut self) -> &mut A::State {
183 &mut self.state
184 }
185
186 pub fn is_empty(&self) -> bool {
188 self.mailbox.is_empty()
189 }
190
191 pub fn mailbox_len(&self) -> usize {
193 self.mailbox.len()
194 }
195
196 pub fn cancel(&mut self) {
198 self.ctx.cancel();
199 }
200
201 pub fn is_cancelled(&self) -> bool {
203 self.ctx.is_cancelled()
204 }
205}
206
207struct TestContext<M> {
209 cancel: CancellationToken,
210 _marker: PhantomData<M>,
211}
212
213impl<M: Send + 'static> TestContext<M> {
214 fn new() -> Self {
215 Self {
216 cancel: CancellationToken::new(),
217 _marker: PhantomData,
218 }
219 }
220
221 fn cancel(&self) {
222 self.cancel.cancel();
223 }
224
225 fn is_cancelled(&self) -> bool {
226 self.cancel.is_cancelled()
227 }
228
229 fn to_context(&self) -> Context<M> {
234 #[cfg(reifydb_target = "native")]
236 let actor_ref = {
237 let (tx, _rx) = unbounded();
238 ActorRef::new(tx)
239 };
240
241 #[cfg(reifydb_target = "wasm")]
242 let actor_ref = create_actor_ref();
243
244 let system = ActorSystem::new(SharedRuntimeConfig::default().actor_system_config());
246
247 Context::new(actor_ref, system, self.cancel.clone())
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Messages understood by [`CounterActor`].
    #[derive(Debug)]
    enum CounterMsg {
        Inc,
        Dec,
        Set(i64),
        Stop,
    }

    /// Actor whose entire state is one signed counter.
    struct CounterActor;

    impl Actor for CounterActor {
        type State = i64;
        type Message = CounterMsg;

        /// The counter starts at zero.
        fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
            0
        }

        /// Applies `msg` to the counter; `Stop` leaves the counter untouched
        /// and asks the caller to stop.
        fn handle(
            &self,
            state: &mut Self::State,
            msg: Self::Message,
            _ctx: &Context<Self::Message>,
        ) -> Directive {
            let updated = match msg {
                CounterMsg::Stop => return Directive::Stop,
                CounterMsg::Inc => *state + 1,
                CounterMsg::Dec => *state - 1,
                CounterMsg::Set(value) => value,
            };
            *state = updated;
            Directive::Continue
        }

        /// With nothing to do, the actor asks to be parked.
        fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
            Directive::Park
        }
    }

    #[test]
    fn test_harness_basic() {
        let mut harness = TestHarness::new(CounterActor);

        for _ in 0..3 {
            harness.send(CounterMsg::Inc);
        }
        assert_eq!(harness.mailbox_len(), 3);

        let directives = harness.process_all();

        assert_eq!(directives.len(), 3);
        assert!(directives.iter().all(|d| *d == Directive::Continue));
        assert_eq!(*harness.state(), 3);
    }

    #[test]
    fn test_harness_stops_on_stop() {
        let mut harness = TestHarness::new(CounterActor);

        harness.send(CounterMsg::Inc);
        harness.send(CounterMsg::Stop);
        // Queued behind the stop, so it must stay undelivered.
        harness.send(CounterMsg::Inc);

        let directives = harness.process_all();

        assert_eq!(directives.len(), 2);
        assert_eq!(directives[1], Directive::Stop);
        assert_eq!(*harness.state(), 1);
        // The trailing `Inc` is still waiting in the mailbox.
        assert_eq!(harness.mailbox_len(), 1);
    }

    #[test]
    fn test_harness_process_one() {
        let mut harness = TestHarness::new(CounterActor);

        harness.send(CounterMsg::Set(42));
        harness.send(CounterMsg::Inc);

        let first = harness.process_one();
        assert_eq!(first, Some(Directive::Continue));
        assert_eq!(*harness.state(), 42);

        let second = harness.process_one();
        assert_eq!(second, Some(Directive::Continue));
        assert_eq!(*harness.state(), 43);

        // Mailbox drained: nothing left to deliver.
        assert_eq!(harness.process_one(), None);
    }

    #[test]
    fn test_harness_idle() {
        let mut harness = TestHarness::new(CounterActor);

        assert_eq!(harness.idle(), Directive::Park);
    }
}