Skip to main content

mm1_test_rt/rt/
runtime.rs

1use std::collections::hash_map::Entry::*;
2use std::sync::Arc;
3
4use futures::{FutureExt, StreamExt};
5use mm1_address::address::Address;
6use mm1_common::log;
7use tokio::sync::{Mutex, mpsc};
8
9use super::MainActorOutcome;
10use crate::rt::actor_fn::ActorFn;
11use crate::rt::{
12    ActorEntry, ActorTaskOutcome, AddressLease, Event, EventKind, ForkTaskOutcome, RuntimeError,
13    RuntimeShared, TaskKey, TestContext, TestRuntime,
14};
15
16impl<R> TestRuntime<R> {
17    pub fn new() -> Self {
18        let (queries_tx, queries_rx) = mpsc::unbounded_channel();
19        let shared = Arc::new(Mutex::new(RuntimeShared {
20            queries_rx,
21            tasks: Default::default(),
22            entries: Default::default(),
23        }));
24        Self { queries_tx, shared }
25    }
26
27    pub fn new_context(
28        &self,
29        task_key: TaskKey,
30        address_lease: Option<AddressLease>,
31    ) -> TestContext<R> {
32        let queries_tx = self.queries_tx.clone();
33        TestContext {
34            task_key,
35            address_lease,
36            queries_tx,
37        }
38    }
39
40    pub async fn add_actor<A>(
41        &self,
42        actor_address: Address,
43        address_lease: Option<AddressLease>,
44        actor: A,
45    ) -> Result<&Self, RuntimeError>
46    where
47        R: Send + 'static,
48        A: for<'a> ActorFn<'a, R>,
49    {
50        let task_key = TaskKey::actor(actor_address);
51        let context = self.new_context(task_key, None);
52        let fut = async move {
53            let mut context = context;
54            let _ = actor.run(&mut context).await;
55        };
56        self.add_task(task_key, address_lease, fut).await?;
57        Ok(self)
58    }
59
60    pub async fn with_actor<A>(
61        self,
62        actor_address: Address,
63        address_lease: Option<AddressLease>,
64        actor: A,
65    ) -> Result<Self, RuntimeError>
66    where
67        R: Send + 'static,
68        A: for<'a> ActorFn<'a, R>,
69    {
70        self.add_actor(actor_address, address_lease, actor).await?;
71        Ok(self)
72    }
73
74    pub async fn add_task<F>(
75        &self,
76        task_key: TaskKey,
77        address_lease: Option<AddressLease>,
78        fut: F,
79    ) -> Result<(), RuntimeError>
80    where
81        F: Future<Output = ()> + Send + 'static,
82    {
83        self.shared
84            .lock()
85            .await
86            .add_task(task_key, address_lease, fut)
87            .await
88    }
89
90    pub async fn expect_next_event(&self) -> Event<R, EventKind<R>> {
91        self.next_event()
92            .await
93            .expect("error fetching next_event")
94            .expect("no more events")
95    }
96
97    pub async fn next_event(&self) -> Result<Option<Event<R, EventKind<R>>>, RuntimeError> {
98        let runtime = self.shared.clone();
99        let mut shared = self.shared.lock().await;
100        let RuntimeShared {
101            queries_rx, tasks, ..
102        } = &mut *shared;
103
104        let next_query = queries_rx.recv();
105        let task_done = tasks.next();
106
107        let event_opt = tokio::select! {
108            done = task_done => {
109                if let Some(done) = done {
110                    let event = Event {runtime, kind: EventKind::Done(done)};
111                    Some(event)
112                } else {
113                    None
114                }},
115            query = next_query => {
116                if let Some(query) = query {
117                    let event = Event {runtime, kind: EventKind::Query(query)};
118                    Some(event)
119                } else {
120                    None
121                }},
122        };
123
124        Ok(event_opt)
125    }
126}
127
128impl<R> Default for TestRuntime<R> {
129    fn default() -> Self {
130        Self::new()
131    }
132}
133
134impl<R> RuntimeShared<R> {
135    pub(crate) async fn add_task<F>(
136        &mut self,
137        task_key: TaskKey,
138        address_lease: Option<AddressLease>,
139        fut: F,
140    ) -> Result<(), RuntimeError>
141    where
142        F: Future<Output = ()> + Send + 'static,
143    {
144        match (
145            self.entries.entry(task_key.actor),
146            task_key.actor == task_key.context,
147        ) {
148            (Vacant(_), false) => Err(RuntimeError::NoActor(task_key.actor)),
149            (Vacant(vacant), true) => {
150                log::trace!("adding main actor [key: {:?}]", task_key);
151
152                let entry = ActorEntry {
153                    forks:          Default::default(),
154                    actor_canceled: Default::default(),
155                    actor_done:     Default::default(),
156                };
157                let actor_canceled = entry.actor_canceled.clone();
158                let actor_done = entry.actor_done.clone();
159                let fut = async move {
160                    tokio::select! {
161                        _actor_canceled = actor_canceled.notified() => log::trace!("actor canceled [key: {:?}]", task_key),
162                        _actor_completed = fut => log::trace!("actor completed [key: {:?}]", task_key),
163                    }
164                    actor_done.notify_waiters();
165                    ActorTaskOutcome::Main(MainActorOutcome {
166                        address: task_key.actor,
167                        address_lease,
168                    })
169                }
170                .boxed();
171
172                vacant.insert(entry);
173                self.tasks.push(fut);
174
175                Ok(())
176            },
177            (Occupied(_), true) => Err(RuntimeError::Duplicate(task_key.actor)),
178            (Occupied(occupied), false) => {
179                log::trace!("adding fork task [key: {:?}]", task_key);
180                let entry = occupied.into_mut();
181                let true = entry.forks.insert(task_key.context) else {
182                    return Err(RuntimeError::Duplicate(task_key.context))
183                };
184                let actor_done = entry.actor_done.clone();
185                let fut = async move {
186                    tokio::select! {
187                        _fork_done = fut => log::trace!("fork task done [key: {:?}]", task_key),
188                        _actor_done = actor_done.notified() => log::trace!("main actor notified completion [key: {:?}]", task_key),
189                    };
190                    ActorTaskOutcome::Fork(ForkTaskOutcome { task_key , address_lease, })
191                }
192                .boxed();
193                self.tasks.push(fut);
194
195                Ok(())
196            },
197        }
198    }
199}