reifydb_runtime/actor/
testing.rs1use std::{collections::VecDeque, marker::PhantomData};
5
6#[cfg(not(reifydb_single_threaded))]
7use crossbeam_channel::unbounded;
8
9#[cfg(not(reifydb_single_threaded))]
10use crate::actor::mailbox::ActorRef;
11#[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
12use crate::actor::mailbox::create_actor_ref;
13#[cfg(reifydb_target = "dst")]
14use crate::actor::mailbox::create_dst_mailbox;
15#[cfg(reifydb_target = "dst")]
16use crate::context::clock::MockClock;
17use crate::{
18 actor::{
19 context::{CancellationToken, Context},
20 system::ActorSystem,
21 traits::{Actor, Directive},
22 },
23 context::clock::Clock,
24 pool::{PoolConfig, Pools},
25};
26
27pub struct TestHarness<A: Actor> {
28 actor: A,
29 state: A::State,
30 mailbox: VecDeque<A::Message>,
31 ctx: TestContext<A::Message>,
32}
33
34impl<A: Actor> TestHarness<A> {
35 pub fn new(actor: A) -> Self {
36 let ctx = TestContext::new();
37 let state = actor.init(&ctx.to_context());
38
39 Self {
40 actor,
41 state,
42 mailbox: VecDeque::new(),
43 ctx,
44 }
45 }
46
47 pub fn with_state(actor: A, state: A::State) -> Self {
48 let ctx = TestContext::new();
49
50 Self {
51 actor,
52 state,
53 mailbox: VecDeque::new(),
54 ctx,
55 }
56 }
57
58 pub fn send(&mut self, msg: A::Message) {
59 self.mailbox.push_back(msg);
60 }
61
62 pub fn process_one(&mut self) -> Option<Directive> {
63 let msg = self.mailbox.pop_front()?;
64 let flow = self.actor.handle(&mut self.state, msg, &self.ctx.to_context());
65 Some(flow)
66 }
67
68 pub fn process_all(&mut self) -> Vec<Directive> {
69 let mut flows = Vec::new();
70
71 while let Some(flow) = self.process_one() {
72 flows.push(flow);
73 if flow == Directive::Stop {
74 break;
75 }
76 }
77
78 flows
79 }
80
81 pub fn process_until<F>(&mut self, mut condition: F) -> Vec<Directive>
82 where
83 F: FnMut(&A::State) -> bool,
84 {
85 let mut flows = Vec::new();
86
87 while !self.mailbox.is_empty() {
88 if condition(&self.state) {
89 break;
90 }
91
92 if let Some(flow) = self.process_one() {
93 flows.push(flow);
94 if flow == Directive::Stop {
95 break;
96 }
97 }
98 }
99
100 flows
101 }
102
103 pub fn idle(&mut self) -> Directive {
104 self.actor.idle(&self.ctx.to_context())
105 }
106
107 pub fn post_stop(&mut self) {
108 self.actor.post_stop();
109 }
110
111 pub fn state(&self) -> &A::State {
112 &self.state
113 }
114
115 pub fn state_mut(&mut self) -> &mut A::State {
116 &mut self.state
117 }
118
119 pub fn is_empty(&self) -> bool {
120 self.mailbox.is_empty()
121 }
122
123 pub fn mailbox_len(&self) -> usize {
124 self.mailbox.len()
125 }
126
127 pub fn cancel(&mut self) {
128 self.ctx.cancel();
129 }
130
131 pub fn is_cancelled(&self) -> bool {
132 self.ctx.is_cancelled()
133 }
134}
135
136struct TestContext<M> {
137 cancel: CancellationToken,
138 _marker: PhantomData<M>,
139}
140
141impl<M: Send + 'static> TestContext<M> {
142 fn new() -> Self {
143 Self {
144 cancel: CancellationToken::new(),
145 _marker: PhantomData,
146 }
147 }
148
149 fn cancel(&self) {
150 self.cancel.cancel();
151 }
152
153 fn is_cancelled(&self) -> bool {
154 self.cancel.is_cancelled()
155 }
156
157 fn to_context(&self) -> Context<M> {
158 #[cfg(not(reifydb_single_threaded))]
159 let actor_ref = {
160 let (tx, _rx) = unbounded();
161 ActorRef::new(tx)
162 };
163
164 #[cfg(all(reifydb_single_threaded, not(reifydb_target = "dst")))]
165 let actor_ref = create_actor_ref();
166
167 #[cfg(reifydb_target = "dst")]
168 let actor_ref = {
169 let (actor_ref, _queue) = create_dst_mailbox();
170 actor_ref
171 };
172
173 let pools = Pools::new(PoolConfig::default());
174
175 #[cfg(reifydb_target = "dst")]
176 let clock = Clock::Mock(MockClock::new(0));
177 #[cfg(not(reifydb_target = "dst"))]
178 let clock = Clock::Real;
179
180 let system = ActorSystem::new(pools, clock);
181
182 Context::new(actor_ref, system, self.cancel.clone())
183 }
184}
185
186#[cfg(test)]
187mod tests {
188 use super::*;
189
190 struct CounterActor;
191
192 impl Actor for CounterActor {
193 type State = i64;
194 type Message = CounterMessage;
195
196 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {
197 0
198 }
199
200 fn handle(
201 &self,
202 state: &mut Self::State,
203 msg: Self::Message,
204 _ctx: &Context<Self::Message>,
205 ) -> Directive {
206 match msg {
207 CounterMessage::Inc => *state += 1,
208 CounterMessage::Dec => *state -= 1,
209 CounterMessage::Set(v) => *state = v,
210 CounterMessage::Stop => return Directive::Stop,
211 }
212 Directive::Continue
213 }
214
215 fn idle(&self, _ctx: &Context<Self::Message>) -> Directive {
216 Directive::Park
217 }
218 }
219
220 #[derive(Debug)]
221 enum CounterMessage {
222 Inc,
223 Dec,
224 Set(i64),
225 Stop,
226 }
227
228 #[test]
229 fn test_harness_basic() {
230 let mut harness = TestHarness::new(CounterActor);
231
232 harness.send(CounterMessage::Inc);
233 harness.send(CounterMessage::Inc);
234 harness.send(CounterMessage::Inc);
235
236 assert_eq!(harness.mailbox_len(), 3);
237
238 let flows = harness.process_all();
239
240 assert_eq!(flows.len(), 3);
241 assert!(flows.iter().all(|f| *f == Directive::Continue));
242 assert_eq!(*harness.state(), 3);
243 }
244
245 #[test]
246 fn test_harness_stops_on_stop() {
247 let mut harness = TestHarness::new(CounterActor);
248
249 harness.send(CounterMessage::Inc);
250 harness.send(CounterMessage::Stop);
251 harness.send(CounterMessage::Inc); let flows = harness.process_all();
254
255 assert_eq!(flows.len(), 2);
256 assert_eq!(flows[1], Directive::Stop);
257 assert_eq!(*harness.state(), 1);
258 assert_eq!(harness.mailbox_len(), 1); }
260
261 #[test]
262 fn test_harness_process_one() {
263 let mut harness = TestHarness::new(CounterActor);
264
265 harness.send(CounterMessage::Set(42));
266 harness.send(CounterMessage::Inc);
267
268 assert_eq!(harness.process_one(), Some(Directive::Continue));
269 assert_eq!(*harness.state(), 42);
270
271 assert_eq!(harness.process_one(), Some(Directive::Continue));
272 assert_eq!(*harness.state(), 43);
273
274 assert_eq!(harness.process_one(), None);
275 }
276
277 #[test]
278 fn test_harness_idle() {
279 let mut harness = TestHarness::new(CounterActor);
280
281 let flow = harness.idle();
282 assert_eq!(flow, Directive::Park);
283 }
284}