Skip to main content

mm1_test_rt/rt/
event.rs

1use std::fmt;
2use std::ops::Deref;
3
4use mm1_address::address::Address;
5use mm1_address::subnet::NetAddress;
6use mm1_common::errors::error_of::ErrorOf;
7use mm1_core::context::{BindErrorKind, ForkErrorKind, RecvErrorKind, SendErrorKind};
8use mm1_core::envelope::{Envelope, EnvelopeHeader};
9use mm1_proto_system::{SpawnErrorKind, StartErrorKind, WatchRef};
10use tokio::sync::oneshot;
11
12use super::{RuntimeError, TestContext};
13use crate::rt::{
14    ActorTaskOutcome, Event, EventKind, ForkTaskOutcome, MainActorOutcome, Query, query,
15};
16
17pub trait EventResolve<Outcome> {
18    fn resolve(self, outcome: Outcome);
19}
20
21pub trait EventResolveResult<T, E> {
22    fn resolve_ok(self, outcome_ok: T);
23    fn resolve_err(self, outcome_err: E);
24}
25
26pub trait EventResolveErrorOfKind<K> {
27    fn resolve_err_kind(self, outcome_err_kind: K);
28}
29
30impl<R, E> Deref for Event<R, E> {
31    type Target = E;
32
33    fn deref(&self) -> &Self::Target {
34        &self.kind
35    }
36}
37
38impl<R, K, O> EventResolve<O> for Event<R, K>
39where
40    K: Into<oneshot::Sender<O>>,
41{
42    fn resolve(self, outcome: O) {
43        let Self { kind, .. } = self;
44        let sender: oneshot::Sender<O> = kind.into();
45        let _ = sender.send(outcome);
46    }
47}
48impl<T, O, E> EventResolveResult<O, E> for T
49where
50    T: EventResolve<Result<O, E>>,
51{
52    fn resolve_ok(self, outcome_ok: O) {
53        self.resolve(Ok(outcome_ok));
54    }
55
56    fn resolve_err(self, outcome_err: E) {
57        self.resolve(Err(outcome_err));
58    }
59}
60
61impl<R, K> Event<R, K> {
62    pub fn convert<K1>(self) -> Result<Event<R, K1>, Self>
63    where
64        Event<R, K1>: TryFrom<Event<R, K>, Error = Self>,
65    {
66        self.try_into()
67    }
68
69    pub fn expect<K1>(self) -> Event<R, K1>
70    where
71        Event<R, K1>: TryFrom<Event<R, K>, Error = Self>,
72        K: fmt::Debug,
73    {
74        self.convert::<K1>()
75            .map_err(|actual| {
76                format!(
77                    "expected: {}; actual: {:?}",
78                    std::any::type_name::<K1>(),
79                    actual,
80                )
81            })
82            .unwrap()
83    }
84}
85
86impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, ActorTaskOutcome> {
87    type Error = Event<R, EventKind<R>>;
88
89    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
90        let Event { runtime, kind } = value;
91        match kind {
92            EventKind::Done(kind) => Ok(Event { runtime, kind }),
93            kind => Err(Event { runtime, kind }),
94        }
95    }
96}
97
98impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, MainActorOutcome> {
99    type Error = Event<R, EventKind<R>>;
100
101    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
102        let Event { runtime, kind } = value;
103        match kind {
104            EventKind::Done(ActorTaskOutcome::Main(kind)) => Ok(Event { runtime, kind }),
105            kind => Err(Event { runtime, kind }),
106        }
107    }
108}
109
110impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, ForkTaskOutcome> {
111    type Error = Event<R, EventKind<R>>;
112
113    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
114        let Event { runtime, kind } = value;
115        match kind {
116            EventKind::Done(ActorTaskOutcome::Fork(kind)) => Ok(Event { runtime, kind }),
117            kind => Err(Event { runtime, kind }),
118        }
119    }
120}
121
122impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, Query<R>> {
123    type Error = Event<R, EventKind<R>>;
124
125    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
126        let Event { runtime, kind } = value;
127        match kind {
128            EventKind::Query(kind) => Ok(Event { runtime, kind }),
129            kind => Err(Event { runtime, kind }),
130        }
131    }
132}
133
134impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Spawn<R>> {
135    type Error = Event<R, EventKind<R>>;
136
137    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
138        let Event { runtime, kind } = value;
139        match kind {
140            EventKind::Query(Query::Spawn(kind)) => Ok(Event { runtime, kind }),
141            kind => Err(Event { runtime, kind }),
142        }
143    }
144}
145impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Start<R>> {
146    type Error = Event<R, EventKind<R>>;
147
148    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
149        let Event { runtime, kind } = value;
150        match kind {
151            EventKind::Query(Query::Start(kind)) => Ok(Event { runtime, kind }),
152            kind => Err(Event { runtime, kind }),
153        }
154    }
155}
156
157impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Bind<NetAddress>> {
158    type Error = Event<R, EventKind<R>>;
159
160    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
161        let Event { runtime, kind } = value;
162        match kind {
163            EventKind::Query(Query::BindNetAddress(kind)) => Ok(Event { runtime, kind }),
164            kind => Err(Event { runtime, kind }),
165        }
166    }
167}
168
169impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Recv> {
170    type Error = Event<R, EventKind<R>>;
171
172    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
173        let Event { runtime, kind } = value;
174        match kind {
175            EventKind::Query(Query::Recv(kind)) => Ok(Event { runtime, kind }),
176            kind => Err(Event { runtime, kind }),
177        }
178    }
179}
180
181impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::RecvClose> {
182    type Error = Event<R, EventKind<R>>;
183
184    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
185        let Event { runtime, kind } = value;
186        match kind {
187            EventKind::Query(Query::RecvClose(kind)) => Ok(Event { runtime, kind }),
188            kind => Err(Event { runtime, kind }),
189        }
190    }
191}
192
193impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Fork<R>> {
194    type Error = Event<R, EventKind<R>>;
195
196    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
197        let Event { runtime, kind } = value;
198        match kind {
199            EventKind::Query(Query::Fork(kind)) => Ok(Event { runtime, kind }),
200            kind => Err(Event { runtime, kind }),
201        }
202    }
203}
204
205impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::ForkRun> {
206    type Error = Event<R, EventKind<R>>;
207
208    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
209        let Event { runtime, kind } = value;
210        match kind {
211            EventKind::Query(Query::ForkRun(kind)) => Ok(Event { runtime, kind }),
212            kind => Err(Event { runtime, kind }),
213        }
214    }
215}
216
217impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Quit> {
218    type Error = Event<R, EventKind<R>>;
219
220    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
221        let Event { runtime, kind } = value;
222        match kind {
223            EventKind::Query(Query::Quit(kind)) => Ok(Event { runtime, kind }),
224            kind => Err(Event { runtime, kind }),
225        }
226    }
227}
228
229impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Tell> {
230    type Error = Event<R, EventKind<R>>;
231
232    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
233        let Event { runtime, kind } = value;
234        match kind {
235            EventKind::Query(Query::Tell(kind)) => Ok(Event { runtime, kind }),
236            kind => Err(Event { runtime, kind }),
237        }
238    }
239}
240
241impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Watch> {
242    type Error = Event<R, EventKind<R>>;
243
244    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
245        let Event { runtime, kind } = value;
246        match kind {
247            EventKind::Query(Query::Watch(kind)) => Ok(Event { runtime, kind }),
248            kind => Err(Event { runtime, kind }),
249        }
250    }
251}
252
253impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Unwatch> {
254    type Error = Event<R, EventKind<R>>;
255
256    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
257        let Event { runtime, kind } = value;
258        match kind {
259            EventKind::Query(Query::Unwatch(kind)) => Ok(Event { runtime, kind }),
260            kind => Err(Event { runtime, kind }),
261        }
262    }
263}
264
265impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Link> {
266    type Error = Event<R, EventKind<R>>;
267
268    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
269        let Event { runtime, kind } = value;
270        match kind {
271            EventKind::Query(Query::Link(kind)) => Ok(Event { runtime, kind }),
272            kind => Err(Event { runtime, kind }),
273        }
274    }
275}
276
277impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Unlink> {
278    type Error = Event<R, EventKind<R>>;
279
280    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
281        let Event { runtime, kind } = value;
282        match kind {
283            EventKind::Query(Query::Unlink(kind)) => Ok(Event { runtime, kind }),
284            kind => Err(Event { runtime, kind }),
285        }
286    }
287}
288
289impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::SetTrapExit> {
290    type Error = Event<R, EventKind<R>>;
291
292    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
293        let Event { runtime, kind } = value;
294        match kind {
295            EventKind::Query(Query::SetTrapExit(kind)) => Ok(Event { runtime, kind }),
296            kind => Err(Event { runtime, kind }),
297        }
298    }
299}
300
301impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::InitDone> {
302    type Error = Event<R, EventKind<R>>;
303
304    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
305        let Event { runtime, kind } = value;
306        match kind {
307            EventKind::Query(Query::InitDone(kind)) => Ok(Event { runtime, kind }),
308            kind => Err(Event { runtime, kind }),
309        }
310    }
311}
312
313impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Exit> {
314    type Error = Event<R, EventKind<R>>;
315
316    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
317        let Event { runtime, kind } = value;
318        match kind {
319            EventKind::Query(Query::Exit(kind)) => Ok(Event { runtime, kind }),
320            kind => Err(Event { runtime, kind }),
321        }
322    }
323}
324
325impl<R> TryFrom<Event<R, EventKind<R>>> for Event<R, query::Kill> {
326    type Error = Event<R, EventKind<R>>;
327
328    fn try_from(value: Event<R, EventKind<R>>) -> Result<Self, Self::Error> {
329        let Event { runtime, kind } = value;
330        match kind {
331            EventKind::Query(Query::Kill(kind)) => Ok(Event { runtime, kind }),
332            kind => Err(Event { runtime, kind }),
333        }
334    }
335}
336
337impl<R> Event<R, query::Spawn<R>> {
338    pub fn take_runnable(self) -> (R, Event<R, query::Spawn<()>>) {
339        let Self { runtime, kind } = self;
340        let query::Spawn {
341            task_key,
342            runnable,
343            link,
344            outcome_tx,
345        } = kind;
346        let kind = query::Spawn {
347            task_key,
348            runnable: (),
349            link,
350            outcome_tx,
351        };
352        let event = Event { runtime, kind };
353        (runnable, event)
354    }
355}
356
357impl<R> Event<R, query::Tell> {
358    pub fn take_envelope(&mut self) -> Envelope {
359        let info = EnvelopeHeader::to_address(self.kind.envelope.header().to);
360        std::mem::replace(
361            &mut self.kind.envelope,
362            Envelope::new(info, ()).into_erased(),
363        )
364    }
365}
366
367impl<R> Event<R, query::Start<R>> {
368    pub fn take_runnable(self) -> (R, Event<R, query::Start<()>>) {
369        let Self { runtime, kind } = self;
370        let query::Start {
371            task_key,
372            runnable,
373            link,
374            start_timeout,
375            outcome_tx,
376        } = kind;
377        let kind = query::Start {
378            task_key,
379            runnable: (),
380            link,
381            start_timeout,
382            outcome_tx,
383        };
384        let event = Event { runtime, kind };
385        (runnable, event)
386    }
387}
388
389impl<R> Event<R, query::ForkRun> {
390    pub fn resolve(self) -> Event<R, query::PendingTask> {
391        let Self { runtime, kind } = self;
392        let query::ForkRun {
393            task_key,
394            address_lease,
395            task_fut,
396            outcome_tx,
397        } = kind;
398        let _ = outcome_tx.send(());
399
400        let kind = query::PendingTask {
401            task_key,
402            address_lease,
403            task_fut,
404        };
405        Event { runtime, kind }
406    }
407}
408
409impl<R> Event<R, query::PendingTask> {
410    pub async fn run(self) -> Result<(), RuntimeError> {
411        let Self { runtime, kind } = self;
412        let query::PendingTask {
413            task_key,
414            address_lease,
415            task_fut,
416        } = kind;
417        let mut runtime = runtime.lock().await;
418        runtime.add_task(task_key, address_lease, task_fut).await?;
419        Ok(())
420    }
421}
422
423impl<R> Event<R, query::Quit> {
424    pub async fn stop_tasks(self) -> Result<(), RuntimeError> {
425        let Self { runtime, kind } = self;
426        let runtime = runtime.lock().await;
427        let actor_entry = runtime
428            .entries
429            .get(&kind.task_key.actor)
430            .ok_or(RuntimeError::NoActor(kind.task_key.actor))?;
431        actor_entry.actor_canceled.notify_waiters();
432        Ok(())
433    }
434}
435
436impl<R> Event<R, MainActorOutcome> {
437    pub async fn remove_actor_entry(self) -> Result<(), RuntimeError> {
438        let Self { runtime, kind } = self;
439        let _ = runtime
440            .lock()
441            .await
442            .entries
443            .remove(&kind.address)
444            .ok_or(RuntimeError::NoActor(kind.address))?;
445        Ok(())
446    }
447}
448
449impl<R> From<query::Spawn<R>> for oneshot::Sender<Result<Address, ErrorOf<SpawnErrorKind>>> {
450    fn from(value: query::Spawn<R>) -> Self {
451        value.outcome_tx
452    }
453}
454
455impl<R> From<query::Start<R>> for oneshot::Sender<Result<Address, ErrorOf<StartErrorKind>>> {
456    fn from(value: query::Start<R>) -> Self {
457        value.outcome_tx
458    }
459}
460
461impl<A> From<query::Bind<A>> for oneshot::Sender<Result<(), ErrorOf<BindErrorKind>>> {
462    fn from(value: query::Bind<A>) -> Self {
463        value.outcome_tx
464    }
465}
466
467impl From<query::Recv> for oneshot::Sender<Result<Envelope, ErrorOf<RecvErrorKind>>> {
468    fn from(value: query::Recv) -> Self {
469        value.outcome_tx
470    }
471}
472impl From<query::RecvClose> for oneshot::Sender<()> {
473    fn from(value: query::RecvClose) -> Self {
474        value.outcome_tx
475    }
476}
477
478impl<R> From<query::Fork<R>> for oneshot::Sender<Result<TestContext<R>, ErrorOf<ForkErrorKind>>> {
479    fn from(value: query::Fork<R>) -> Self {
480        value.outcome_tx
481    }
482}
483
484impl From<query::Tell> for oneshot::Sender<Result<(), ErrorOf<SendErrorKind>>> {
485    fn from(value: query::Tell) -> Self {
486        value.outcome_tx
487    }
488}
489
490impl From<query::Watch> for oneshot::Sender<WatchRef> {
491    fn from(value: query::Watch) -> Self {
492        value.outcome_tx
493    }
494}
495impl From<query::Unwatch> for oneshot::Sender<()> {
496    fn from(value: query::Unwatch) -> Self {
497        value.outcome_tx
498    }
499}
500
501impl From<query::Link> for oneshot::Sender<()> {
502    fn from(value: query::Link) -> Self {
503        value.outcome_tx
504    }
505}
506
507impl From<query::Unlink> for oneshot::Sender<()> {
508    fn from(value: query::Unlink) -> Self {
509        value.outcome_tx
510    }
511}
512
513impl From<query::SetTrapExit> for oneshot::Sender<()> {
514    fn from(value: query::SetTrapExit) -> Self {
515        value.outcome_tx
516    }
517}
518
519impl From<query::InitDone> for oneshot::Sender<()> {
520    fn from(value: query::InitDone) -> Self {
521        value.outcome_tx
522    }
523}
524
525impl From<query::Exit> for oneshot::Sender<bool> {
526    fn from(value: query::Exit) -> Self {
527        value.outcome_tx
528    }
529}
530
531impl From<query::Kill> for oneshot::Sender<bool> {
532    fn from(value: query::Kill) -> Self {
533        value.outcome_tx
534    }
535}