// mm1_test_rt/rt.rs

1use std::collections::{HashMap, HashSet};
2use std::sync::Arc;
3
4use futures::future::BoxFuture;
5use futures::stream::FuturesUnordered;
6use mm1_address::address::Address;
7use mm1_address::pool::Lease as AddressLease;
8use mm1_address::subnet::NetAddress;
9use tokio::sync::{Mutex, Notify, mpsc};
10
11mod actor_fn;
12mod context;
13pub mod event;
14pub mod query;
15mod runtime;
16
17#[derive(Debug, thiserror::Error)]
18pub enum RuntimeError {
19    #[error("no actor: {}", _0)]
20    NoActor(Address),
21    #[error("duplicate: {}", _0)]
22    Duplicate(Address),
23}
24
25#[derive(Debug, Clone)]
26pub struct TestRuntime<R> {
27    queries_tx: mpsc::UnboundedSender<Query<R>>,
28    shared:     Arc<Mutex<RuntimeShared<R>>>,
29}
30
31#[derive(derive_more::Debug)]
32pub struct Event<R, K> {
33    #[debug(skip)]
34    runtime: Arc<Mutex<RuntimeShared<R>>>,
35
36    pub kind: K,
37}
38
39#[derive(Debug)]
40pub enum EventKind<R> {
41    Done(ActorTaskOutcome),
42    Query(Query<R>),
43}
44
45#[derive(derive_more::Debug)]
46pub struct TestContext<R> {
47    task_key:      TaskKey,
48    queries_tx:    mpsc::UnboundedSender<Query<R>>,
49    address_lease: Option<AddressLease>,
50}
51
52#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
53pub struct TaskKey {
54    pub actor:   Address,
55    pub context: Address,
56}
57
58#[derive(Debug, derive_more::From)]
59pub enum Query<R> {
60    InitDone(query::InitDone),
61    Spawn(query::Spawn<R>),
62    Start(query::Start<R>),
63    Recv(query::Recv),
64    RecvClose(query::RecvClose),
65    Fork(query::Fork<R>),
66    ForkRun(query::ForkRun),
67    Quit(query::Quit),
68    Tell(query::Tell),
69    Watch(query::Watch),
70    Unwatch(query::Unwatch),
71    Link(query::Link),
72    Unlink(query::Unlink),
73    SetTrapExit(query::SetTrapExit),
74    Exit(query::Exit),
75    Kill(query::Kill),
76    BindNetAddress(query::Bind<NetAddress>),
77}
78
79#[derive(derive_more::Debug)]
80struct RuntimeShared<R> {
81    queries_rx: mpsc::UnboundedReceiver<Query<R>>,
82    tasks:      FuturesUnordered<BoxFuture<'static, ActorTaskOutcome>>,
83    entries:    HashMap<Address, ActorEntry>,
84}
85
86#[derive(derive_more::Debug)]
87struct ActorEntry {
88    forks: HashSet<Address>,
89
90    #[debug(skip)]
91    actor_canceled: Arc<Notify>,
92
93    #[debug(skip)]
94    actor_done: Arc<Notify>,
95}
96
97#[derive(Debug)]
98pub enum ActorTaskOutcome {
99    Main(MainActorOutcome),
100    Fork(ForkTaskOutcome),
101}
102
103#[derive(Debug)]
104pub struct MainActorOutcome {
105    pub address:       Address,
106    pub address_lease: Option<AddressLease>,
107}
108
109#[derive(Debug)]
110pub struct ForkTaskOutcome {
111    pub task_key:      TaskKey,
112    pub address_lease: Option<AddressLease>,
113}