//! The [`TestActor`] handle: spawns a test actor and drives it through control queries.

use std::future::Future;
use std::sync::Arc;
use std::time::Duration;

use agner_actors::system_error::SysSpawnError;
use agner_actors::{ActorID, Event, Exit, SpawnOpts, System};
use tokio::sync::{mpsc, oneshot, Mutex};

use crate::exited::Exited;
use crate::query::{ExitRq, InitAckRq, NextEventRq, Query, SetLinkRq, SetTrapExitRq};
use crate::TestActorRegistry;

/// Handle to a test actor: bundles the actor's ID with the channels and shared
/// state needed to drive it and to observe its exit.
///
/// Note: `#[derive(Debug, Clone)]` places `M: Debug` / `M: Clone` bounds on the
/// generated impls, so the handle is only `Debug` / `Clone` for such `M`.
#[derive(Debug, Clone)]
pub struct TestActor<M> {
    /// Actor system used to deliver messages to the actor (see `post_message`).
    pub(crate) system: System,
    /// ID of the actor this handle refers to.
    pub(crate) actor_id: ActorID,
    /// Shared exit state; `wait` passes it to `crate::exited::wait`.
    pub(crate) exited: Arc<Mutex<Exited>>,
    /// Sending half of the control channel on which `Query<M>` requests are submitted.
    pub(crate) ctl_tx: mpsc::UnboundedSender<Query<M>>,
}

impl<M> TestActor<M> {
    /// Builds the arguments for `crate::behaviour::run` without spawning anything.
    ///
    /// Creates a fresh init-ack channel and a fresh control channel and packs
    /// their actor-side ends (plus a control sender and `registry`) into
    /// `crate::behaviour::Args`.
    ///
    /// Returns the arguments together with a future that resolves to the value
    /// received on the init-ack channel, or to `None` if the sending half is
    /// dropped without sending.
    pub fn prepare_args(
        registry: TestActorRegistry,
    ) -> (crate::behaviour::Args<M>, impl Future<Output = Option<ActorID>>) {
        let (init_ack_tx, init_ack_rx) = oneshot::channel();
        let (ctl_tx, ctl_rx) = mpsc::unbounded_channel();
        let args = crate::behaviour::Args::<M> { init_ack_tx, ctl_rx, ctl_tx, registry };

        (args, async move { init_ack_rx.await.ok() })
    }

    /// Spawns a test actor on `system` and returns the handle registered for it.
    ///
    /// The actor runs `crate::behaviour::run::<M>` with arguments produced by
    /// [`Self::prepare_args`]. After spawning, this waits for the init-ack future
    /// to resolve (its outcome is deliberately ignored) and then looks the new
    /// actor up in `registry`.
    ///
    /// # Errors
    ///
    /// Returns the `SysSpawnError` reported by `system.spawn`.
    ///
    /// # Panics
    ///
    /// Panics if the registry lookup for the spawned actor fails.
    pub async fn start(
        registry: TestActorRegistry,
        system: System,
        spawn_opts: SpawnOpts,
    ) -> Result<Self, SysSpawnError>
    where
        M: Send + Sync + Unpin + 'static,
    {
        // Reuse the shared wiring instead of duplicating the channel setup; the
        // registry is cloned because it is still needed for the lookup below.
        let (args, init_ack) = Self::prepare_args(registry.clone());
        let actor_id = system
            .spawn(crate::behaviour::run::<M>, args, spawn_opts)
            .await?;
        // Only completion matters here: the lookup below is what yields the handle.
        let _ = init_ack.await;

        Ok(registry
            .lookup(actor_id)
            .await
            .expect("Failed to lookup the actor in the registry"))
    }

    /// Awaits `crate::exited::wait` on the shared exit state and returns the
    /// resulting [`Exit`].
    pub async fn wait(&self) -> Exit {
        crate::exited::wait(self.exited.as_ref()).await
    }

    /// Returns the actor system this handle uses.
    pub fn system(&self) -> &System {
        &self.system
    }

    /// Returns the ID of the actor behind this handle.
    pub fn actor_id(&self) -> ActorID {
        self.actor_id
    }
}

impl<M> TestActor<M> {
    pub async fn post_message(&self, message: M)
    where
        M: Send + Sync + Unpin + 'static,
    {
        self.system.send(self.actor_id, message).await
    }

    pub async fn init_ack(&self, value: Option<ActorID>) {
        let (reply_on_drop, done) = oneshot::channel();
        assert!(self.ctl_tx.send(InitAckRq { value, reply_on_drop }.into()).is_ok());
        let _ = done.await;
    }

    pub async fn exit(&self, reason: Exit) {
        let (reply_on_drop, done) = oneshot::channel();
        assert!(self.ctl_tx.send(ExitRq { reason, reply_on_drop }.into()).is_ok());
        let _ = done.await;
    }

    pub async fn set_trap_exit(&self, set_to: bool) {
        let (reply_on_drop, done) = oneshot::channel();
        assert!(self.ctl_tx.send(SetTrapExitRq { set_to, reply_on_drop }.into()).is_ok());
        let _ = done.await;
    }

    pub async fn next_event(&self, timeout: Duration) -> Option<Event<M>> {
        let (reply_to, done) = oneshot::channel();
        assert!(self.ctl_tx.send(NextEventRq { timeout, reply_to }.into()).is_ok());
        done.await.ok()
    }

    pub async fn set_link(&self, actor: ActorID, link: bool) {
        let (reply_on_drop, done) = oneshot::channel();
        assert!(self.ctl_tx.send(SetLinkRq { actor, link, reply_on_drop }.into()).is_ok());
        let _ = done.await;
    }
}