agner_test_actor/
api.rs

1use std::future::Future;
2use std::sync::Arc;
3use std::time::Duration;
4
5use agner_actors::system_error::SysSpawnError;
6use agner_actors::{ActorID, Event, Exit, SpawnOpts, System};
7use tokio::sync::{mpsc, oneshot, Mutex};
8
9use crate::exited::Exited;
10use crate::query::{ExitRq, InitAckRq, NextEventRq, Query, SetLinkRq, SetTrapExitRq};
11use crate::TestActorRegistry;
12
13#[derive(Debug, Clone)]
14pub struct TestActor<M> {
15    pub(crate) system: System,
16    pub(crate) actor_id: ActorID,
17    pub(crate) exited: Arc<Mutex<Exited>>,
18    pub(crate) ctl_tx: mpsc::UnboundedSender<Query<M>>,
19}
20
21impl<M> TestActor<M> {
22    pub fn prepare_args(
23        registry: TestActorRegistry,
24    ) -> (crate::behaviour::Args<M>, impl Future<Output = Option<ActorID>>) {
25        let (init_ack_tx, init_ack_rx) = oneshot::channel();
26        let (ctl_tx, ctl_rx) = mpsc::unbounded_channel();
27        let args = crate::behaviour::Args::<M> { init_ack_tx, ctl_rx, ctl_tx, registry };
28
29        (args, async move { init_ack_rx.await.ok() })
30    }
31
32    pub async fn start(
33        registry: TestActorRegistry,
34        system: System,
35        spawn_opts: SpawnOpts,
36    ) -> Result<Self, SysSpawnError>
37    where
38        M: Send + Sync + Unpin + 'static,
39    {
40        let (init_ack_tx, init_ack_rx) = oneshot::channel();
41        let (ctl_tx, ctl_rx) = mpsc::unbounded_channel();
42        let actor_id = system
43            .spawn(
44                crate::behaviour::run::<M>,
45                crate::behaviour::Args {
46                    init_ack_tx,
47                    ctl_rx,
48                    ctl_tx: ctl_tx.to_owned(),
49                    registry: registry.to_owned(),
50                },
51                spawn_opts,
52            )
53            .await?;
54        let _ = init_ack_rx.await;
55
56        Ok(registry
57            .lookup(actor_id)
58            .await
59            .expect("Failed to lookup the actor in the registry"))
60    }
61
62    pub async fn wait(&self) -> Exit {
63        crate::exited::wait(self.exited.as_ref()).await
64    }
65
66    pub fn system(&self) -> &System {
67        &self.system
68    }
69
70    pub fn actor_id(&self) -> ActorID {
71        self.actor_id
72    }
73}
74
75impl<M> TestActor<M> {
76    pub async fn post_message(&self, message: M)
77    where
78        M: Send + Sync + Unpin + 'static,
79    {
80        self.system.send(self.actor_id, message).await
81    }
82
83    pub async fn init_ack(&self, value: Option<ActorID>) {
84        let (reply_on_drop, done) = oneshot::channel();
85        assert!(self.ctl_tx.send(InitAckRq { value, reply_on_drop }.into()).is_ok());
86        let _ = done.await;
87    }
88
89    pub async fn exit(&self, reason: Exit) {
90        let (reply_on_drop, done) = oneshot::channel();
91        assert!(self.ctl_tx.send(ExitRq { reason, reply_on_drop }.into()).is_ok());
92        let _ = done.await;
93    }
94
95    pub async fn set_trap_exit(&self, set_to: bool) {
96        let (reply_on_drop, done) = oneshot::channel();
97        assert!(self.ctl_tx.send(SetTrapExitRq { set_to, reply_on_drop }.into()).is_ok());
98        let _ = done.await;
99    }
100
101    pub async fn next_event(&self, timeout: Duration) -> Option<Event<M>> {
102        let (reply_to, done) = oneshot::channel();
103        assert!(self.ctl_tx.send(NextEventRq { timeout, reply_to }.into()).is_ok());
104        done.await.ok()
105    }
106
107    pub async fn set_link(&self, actor: ActorID, link: bool) {
108        let (reply_on_drop, done) = oneshot::channel();
109        assert!(self.ctl_tx.send(SetLinkRq { actor, link, reply_on_drop }.into()).is_ok());
110        let _ = done.await;
111    }
112}