agner_test_actor/
behaviour.rs1use std::sync::Arc;
2
3use agner_actors::{ActorID, Context};
4use agner_init_ack::ContextInitAckExt;
5use agner_utils::future_timeout_ext::FutureTimeoutExt;
6
7use tokio::sync::{mpsc, oneshot, Mutex};
8
9use crate::exited::Exited;
10use crate::query::{ExitRq, InitAckRq, NextEventRq, Query, SetLinkRq, SetTrapExitRq};
11use crate::registry::TestActorEntry;
12use crate::TestActorRegistry;
13
14pub struct Args<M> {
15 pub registry: TestActorRegistry,
16 pub init_ack_tx: oneshot::Sender<ActorID>,
17 pub ctl_rx: mpsc::UnboundedReceiver<Query<M>>,
18 pub ctl_tx: mpsc::UnboundedSender<Query<M>>,
19}
20
21pub async fn run<M>(context: &mut Context<M>, args: Args<M>)
22where
23 M: Send + Sync + Unpin + 'static,
24{
25 let Args { init_ack_tx, mut ctl_rx, ctl_tx, registry } = args;
26
27 let exited = context.system().wait(context.actor_id());
28
29 let entry = TestActorEntry {
30 system: context.system(),
31 ctl_tx: Box::new(ctl_tx),
32 exited: Arc::new(Mutex::new(Exited::Waiting(Box::pin(exited)))),
33 };
34 registry.0.write().await.insert(context.actor_id(), entry);
35
36 let _ = init_ack_tx.send(context.actor_id());
37
38 while let Some(query) = ctl_rx.recv().await {
39 match query {
40 Query::Exit(ExitRq { reason, .. }) => {
41 tracing::trace!("[{}] exitting: {}", context.actor_id(), reason);
42 context.exit(reason).await;
43 },
44 Query::SetTrapExit(SetTrapExitRq { set_to, .. }) => {
45 tracing::trace!("[{}] setting trap_exit={}", context.actor_id(), set_to);
46 context.trap_exit(set_to).await;
47 },
48 Query::NextEvent(NextEventRq { timeout, reply_to }) => {
49 tracing::trace!(
50 "[{}] fetching next-event (timeout: {:?})",
51 context.actor_id(),
52 timeout
53 );
54 if let Ok(event) = context.next_event().timeout(timeout).await {
55 tracing::trace!("[{}] received next-event", context.actor_id());
56 let _ = reply_to.send(event);
57 }
58 },
59 Query::SetLink(SetLinkRq { actor, link, .. }) =>
60 if link {
61 context.link(actor).await;
62 } else {
63 context.unlink(actor).await;
64 },
65 Query::InitAck(InitAckRq { value, .. }) => {
66 context.init_ack_ok(value);
67 },
68 }
69 }
70}