agner_test_actor/
behaviour.rs

1use std::sync::Arc;
2
3use agner_actors::{ActorID, Context};
4use agner_init_ack::ContextInitAckExt;
5use agner_utils::future_timeout_ext::FutureTimeoutExt;
6
7use tokio::sync::{mpsc, oneshot, Mutex};
8
9use crate::exited::Exited;
10use crate::query::{ExitRq, InitAckRq, NextEventRq, Query, SetLinkRq, SetTrapExitRq};
11use crate::registry::TestActorEntry;
12use crate::TestActorRegistry;
13
/// Start-up arguments consumed by [`run`].
pub struct Args<M> {
    /// Registry into which `run` inserts an entry for this actor, keyed by its actor ID.
    pub registry: TestActorRegistry,
    /// One-shot channel on which `run` sends the actor's ID after the registry entry is inserted.
    pub init_ack_tx: oneshot::Sender<ActorID>,
    /// Receiving half of the control channel; `run` handles every [`Query`] read from it.
    pub ctl_rx: mpsc::UnboundedReceiver<Query<M>>,
    /// Control-query sender that `run` stores (boxed) in the registry entry.
    /// NOTE(review): presumably the sending half paired with `ctl_rx` — confirm at the call site.
    pub ctl_tx: mpsc::UnboundedSender<Query<M>>,
}
20
21pub async fn run<M>(context: &mut Context<M>, args: Args<M>)
22where
23    M: Send + Sync + Unpin + 'static,
24{
25    let Args { init_ack_tx, mut ctl_rx, ctl_tx, registry } = args;
26
27    let exited = context.system().wait(context.actor_id());
28
29    let entry = TestActorEntry {
30        system: context.system(),
31        ctl_tx: Box::new(ctl_tx),
32        exited: Arc::new(Mutex::new(Exited::Waiting(Box::pin(exited)))),
33    };
34    registry.0.write().await.insert(context.actor_id(), entry);
35
36    let _ = init_ack_tx.send(context.actor_id());
37
38    while let Some(query) = ctl_rx.recv().await {
39        match query {
40            Query::Exit(ExitRq { reason, .. }) => {
41                tracing::trace!("[{}] exitting: {}", context.actor_id(), reason);
42                context.exit(reason).await;
43            },
44            Query::SetTrapExit(SetTrapExitRq { set_to, .. }) => {
45                tracing::trace!("[{}] setting trap_exit={}", context.actor_id(), set_to);
46                context.trap_exit(set_to).await;
47            },
48            Query::NextEvent(NextEventRq { timeout, reply_to }) => {
49                tracing::trace!(
50                    "[{}] fetching next-event (timeout: {:?})",
51                    context.actor_id(),
52                    timeout
53                );
54                if let Ok(event) = context.next_event().timeout(timeout).await {
55                    tracing::trace!("[{}] received next-event", context.actor_id());
56                    let _ = reply_to.send(event);
57                }
58            },
59            Query::SetLink(SetLinkRq { actor, link, .. }) =>
60                if link {
61                    context.link(actor).await;
62                } else {
63                    context.unlink(actor).await;
64                },
65            Query::InitAck(InitAckRq { value, .. }) => {
66                context.init_ack_ok(value);
67            },
68        }
69    }
70}