1use std::future::Future;
2use std::sync::Arc;
3use std::time::Duration;
4
5use agner_actors::system_error::SysSpawnError;
6use agner_actors::{ActorID, Event, Exit, SpawnOpts, System};
7use tokio::sync::{mpsc, oneshot, Mutex};
8
9use crate::exited::Exited;
10use crate::query::{ExitRq, InitAckRq, NextEventRq, Query, SetLinkRq, SetTrapExitRq};
11use crate::TestActorRegistry;
12
13#[derive(Debug, Clone)]
14pub struct TestActor<M> {
15 pub(crate) system: System,
16 pub(crate) actor_id: ActorID,
17 pub(crate) exited: Arc<Mutex<Exited>>,
18 pub(crate) ctl_tx: mpsc::UnboundedSender<Query<M>>,
19}
20
21impl<M> TestActor<M> {
22 pub fn prepare_args(
23 registry: TestActorRegistry,
24 ) -> (crate::behaviour::Args<M>, impl Future<Output = Option<ActorID>>) {
25 let (init_ack_tx, init_ack_rx) = oneshot::channel();
26 let (ctl_tx, ctl_rx) = mpsc::unbounded_channel();
27 let args = crate::behaviour::Args::<M> { init_ack_tx, ctl_rx, ctl_tx, registry };
28
29 (args, async move { init_ack_rx.await.ok() })
30 }
31
32 pub async fn start(
33 registry: TestActorRegistry,
34 system: System,
35 spawn_opts: SpawnOpts,
36 ) -> Result<Self, SysSpawnError>
37 where
38 M: Send + Sync + Unpin + 'static,
39 {
40 let (init_ack_tx, init_ack_rx) = oneshot::channel();
41 let (ctl_tx, ctl_rx) = mpsc::unbounded_channel();
42 let actor_id = system
43 .spawn(
44 crate::behaviour::run::<M>,
45 crate::behaviour::Args {
46 init_ack_tx,
47 ctl_rx,
48 ctl_tx: ctl_tx.to_owned(),
49 registry: registry.to_owned(),
50 },
51 spawn_opts,
52 )
53 .await?;
54 let _ = init_ack_rx.await;
55
56 Ok(registry
57 .lookup(actor_id)
58 .await
59 .expect("Failed to lookup the actor in the registry"))
60 }
61
62 pub async fn wait(&self) -> Exit {
63 crate::exited::wait(self.exited.as_ref()).await
64 }
65
66 pub fn system(&self) -> &System {
67 &self.system
68 }
69
70 pub fn actor_id(&self) -> ActorID {
71 self.actor_id
72 }
73}
74
75impl<M> TestActor<M> {
76 pub async fn post_message(&self, message: M)
77 where
78 M: Send + Sync + Unpin + 'static,
79 {
80 self.system.send(self.actor_id, message).await
81 }
82
83 pub async fn init_ack(&self, value: Option<ActorID>) {
84 let (reply_on_drop, done) = oneshot::channel();
85 assert!(self.ctl_tx.send(InitAckRq { value, reply_on_drop }.into()).is_ok());
86 let _ = done.await;
87 }
88
89 pub async fn exit(&self, reason: Exit) {
90 let (reply_on_drop, done) = oneshot::channel();
91 assert!(self.ctl_tx.send(ExitRq { reason, reply_on_drop }.into()).is_ok());
92 let _ = done.await;
93 }
94
95 pub async fn set_trap_exit(&self, set_to: bool) {
96 let (reply_on_drop, done) = oneshot::channel();
97 assert!(self.ctl_tx.send(SetTrapExitRq { set_to, reply_on_drop }.into()).is_ok());
98 let _ = done.await;
99 }
100
101 pub async fn next_event(&self, timeout: Duration) -> Option<Event<M>> {
102 let (reply_to, done) = oneshot::channel();
103 assert!(self.ctl_tx.send(NextEventRq { timeout, reply_to }.into()).is_ok());
104 done.await.ok()
105 }
106
107 pub async fn set_link(&self, actor: ActorID, link: bool) {
108 let (reply_on_drop, done) = oneshot::channel();
109 assert!(self.ctl_tx.send(SetLinkRq { actor, link, reply_on_drop }.into()).is_ok());
110 let _ = done.await;
111 }
112}