1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
use std::sync::Arc;
use agner_actors::{ActorID, Context};
use agner_utils::future_timeout_ext::FutureTimeoutExt;
use tokio::sync::{mpsc, oneshot, Mutex};
use crate::exited::Exited;
use crate::query::{ExitRq, InitAckRq, NextEventRq, Query, SetLinkRq, SetTrapExitRq};
use crate::registry::TestActorEntry;
use crate::TestActorRegistry;
/// Start-up arguments consumed by [`run`].
///
/// `M` is the message type of the actor's `Context<M>`.
pub struct Args<M> {
    /// Registry into which `run` inserts this actor's `TestActorEntry`,
    /// keyed by the actor's own id.
    pub registry: TestActorRegistry,

    /// One-shot channel over which `run` sends the actor's id right after
    /// the registry entry has been inserted.
    pub init_ack_tx: oneshot::Sender<ActorID>,

    /// Incoming control queries; `run` serves them one at a time.
    pub ctl_rx: mpsc::UnboundedReceiver<Query<M>>,

    /// Query sender that `run` stores (boxed) in the registry entry.
    /// NOTE(review): presumably the sending half paired with `ctl_rx` —
    /// confirm at the place where `Args` is constructed.
    pub ctl_tx: mpsc::UnboundedSender<Query<M>>,
}
/// Behaviour of the test actor: register in the registry, then serve control queries.
///
/// Start-up sequence:
/// 1. obtain the future returned by `context.system().wait(..)` for this actor's id and
///    store it, wrapped as `Exited::Waiting`, in a new `TestActorEntry` together with the
///    system handle and `args.ctl_tx`;
/// 2. insert that entry into `args.registry` under this actor's id;
/// 3. send this actor's id over `args.init_ack_tx` (a failed send is ignored).
///
/// Afterwards every [`Query`] received from `args.ctl_rx` is handled in order, until
/// `recv()` yields `None`:
/// - `Exit` — calls `context.exit(reason)`;
/// - `SetTrapExit` — calls `context.trap_exit(set_to)`;
/// - `NextEvent` — waits for `context.next_event()` for at most `timeout`; the event is
///   sent to `reply_to` on success, otherwise `reply_to` is dropped without a reply;
/// - `SetLink` — calls `context.link(actor)` or `context.unlink(actor)` depending on `link`;
/// - `InitAck` — calls `context.init_ack_ok(value)`.
pub async fn run<M>(context: &mut Context<M>, args: Args<M>)
where
    M: Send + Sync + Unpin + 'static,
{
    let Args { init_ack_tx, mut ctl_rx, ctl_tx, registry } = args;

    // The wait-future is created before the entry is published, and is stored in the
    // entry behind `Arc<Mutex<_>>`.
    let exited = context.system().wait(context.actor_id());
    let entry = TestActorEntry {
        system: context.system(),
        ctl_tx: Box::new(ctl_tx),
        exited: Arc::new(Mutex::new(Exited::Waiting(Box::pin(exited)))),
    };
    registry.0.write().await.insert(context.actor_id(), entry);

    // Best effort: the receiving side may already be gone.
    let _ = init_ack_tx.send(context.actor_id());

    while let Some(query) = ctl_rx.recv().await {
        match query {
            Query::Exit(ExitRq { reason, .. }) => {
                log::trace!("[{}] exiting: {}", context.actor_id(), reason);
                context.exit(reason).await;
            },
            Query::SetTrapExit(SetTrapExitRq { set_to, .. }) => {
                log::trace!("[{}] setting trap_exit={}", context.actor_id(), set_to);
                context.trap_exit(set_to).await;
            },
            Query::NextEvent(NextEventRq { timeout, reply_to }) => {
                log::trace!(
                    "[{}] fetching next-event (timeout: {:?})",
                    context.actor_id(),
                    timeout
                );
                match context.next_event().timeout(timeout).await {
                    Ok(event) => {
                        log::trace!("[{}] received next-event", context.actor_id());
                        // Best effort: the requester may have stopped waiting.
                        let _ = reply_to.send(event);
                    },
                    Err(_) => {
                        // `reply_to` is dropped here without a value being sent.
                        log::trace!("[{}] next-event timed out", context.actor_id());
                    },
                }
            },
            Query::SetLink(SetLinkRq { actor, link, .. }) => {
                if link {
                    log::trace!("[{}] linking to {}", context.actor_id(), actor);
                    context.link(actor).await;
                } else {
                    log::trace!("[{}] unlinking from {}", context.actor_id(), actor);
                    context.unlink(actor).await;
                }
            },
            Query::InitAck(InitAckRq { value, .. }) => {
                log::trace!("[{}] acknowledging init", context.actor_id());
                context.init_ack_ok(value);
            },
        }
    }
}