1use std::collections::hash_map::Entry::*;
2use std::sync::Arc;
3
4use futures::{FutureExt, StreamExt};
5use mm1_address::address::Address;
6use mm1_common::log;
7use tokio::sync::{Mutex, mpsc};
8
9use super::MainActorOutcome;
10use crate::rt::actor_fn::ActorFn;
11use crate::rt::{
12 ActorEntry, ActorTaskOutcome, AddressLease, Event, EventKind, ForkTaskOutcome, RuntimeError,
13 RuntimeShared, TaskKey, TestContext, TestRuntime,
14};
15
16impl<R> TestRuntime<R> {
17 pub fn new() -> Self {
18 let (queries_tx, queries_rx) = mpsc::unbounded_channel();
19 let shared = Arc::new(Mutex::new(RuntimeShared {
20 queries_rx,
21 tasks: Default::default(),
22 entries: Default::default(),
23 }));
24 Self { queries_tx, shared }
25 }
26
27 pub fn new_context(
28 &self,
29 task_key: TaskKey,
30 address_lease: Option<AddressLease>,
31 ) -> TestContext<R> {
32 let queries_tx = self.queries_tx.clone();
33 TestContext {
34 task_key,
35 address_lease,
36 queries_tx,
37 }
38 }
39
40 pub async fn add_actor<A>(
41 &self,
42 actor_address: Address,
43 address_lease: Option<AddressLease>,
44 actor: A,
45 ) -> Result<&Self, RuntimeError>
46 where
47 R: Send + 'static,
48 A: for<'a> ActorFn<'a, R>,
49 {
50 let task_key = TaskKey::actor(actor_address);
51 let context = self.new_context(task_key, None);
52 let fut = async move {
53 let mut context = context;
54 let _ = actor.run(&mut context).await;
55 };
56 self.add_task(task_key, address_lease, fut).await?;
57 Ok(self)
58 }
59
60 pub async fn with_actor<A>(
61 self,
62 actor_address: Address,
63 address_lease: Option<AddressLease>,
64 actor: A,
65 ) -> Result<Self, RuntimeError>
66 where
67 R: Send + 'static,
68 A: for<'a> ActorFn<'a, R>,
69 {
70 self.add_actor(actor_address, address_lease, actor).await?;
71 Ok(self)
72 }
73
74 pub async fn add_task<F>(
75 &self,
76 task_key: TaskKey,
77 address_lease: Option<AddressLease>,
78 fut: F,
79 ) -> Result<(), RuntimeError>
80 where
81 F: Future<Output = ()> + Send + 'static,
82 {
83 self.shared
84 .lock()
85 .await
86 .add_task(task_key, address_lease, fut)
87 .await
88 }
89
90 pub async fn expect_next_event(&self) -> Event<R, EventKind<R>> {
91 self.next_event()
92 .await
93 .expect("error fetching next_event")
94 .expect("no more events")
95 }
96
97 pub async fn next_event(&self) -> Result<Option<Event<R, EventKind<R>>>, RuntimeError> {
98 let runtime = self.shared.clone();
99 let mut shared = self.shared.lock().await;
100 let RuntimeShared {
101 queries_rx, tasks, ..
102 } = &mut *shared;
103
104 let next_query = queries_rx.recv();
105 let task_done = tasks.next();
106
107 let event_opt = tokio::select! {
108 done = task_done => {
109 if let Some(done) = done {
110 let event = Event {runtime, kind: EventKind::Done(done)};
111 Some(event)
112 } else {
113 None
114 }},
115 query = next_query => {
116 if let Some(query) = query {
117 let event = Event {runtime, kind: EventKind::Query(query)};
118 Some(event)
119 } else {
120 None
121 }},
122 };
123
124 Ok(event_opt)
125 }
126}
127
128impl<R> Default for TestRuntime<R> {
129 fn default() -> Self {
130 Self::new()
131 }
132}
133
134impl<R> RuntimeShared<R> {
135 pub(crate) async fn add_task<F>(
136 &mut self,
137 task_key: TaskKey,
138 address_lease: Option<AddressLease>,
139 fut: F,
140 ) -> Result<(), RuntimeError>
141 where
142 F: Future<Output = ()> + Send + 'static,
143 {
144 match (
145 self.entries.entry(task_key.actor),
146 task_key.actor == task_key.context,
147 ) {
148 (Vacant(_), false) => Err(RuntimeError::NoActor(task_key.actor)),
149 (Vacant(vacant), true) => {
150 log::trace!("adding main actor [key: {:?}]", task_key);
151
152 let entry = ActorEntry {
153 forks: Default::default(),
154 actor_canceled: Default::default(),
155 actor_done: Default::default(),
156 };
157 let actor_canceled = entry.actor_canceled.clone();
158 let actor_done = entry.actor_done.clone();
159 let fut = async move {
160 tokio::select! {
161 _actor_canceled = actor_canceled.notified() => log::trace!("actor canceled [key: {:?}]", task_key),
162 _actor_completed = fut => log::trace!("actor completed [key: {:?}]", task_key),
163 }
164 actor_done.notify_waiters();
165 ActorTaskOutcome::Main(MainActorOutcome {
166 address: task_key.actor,
167 address_lease,
168 })
169 }
170 .boxed();
171
172 vacant.insert(entry);
173 self.tasks.push(fut);
174
175 Ok(())
176 },
177 (Occupied(_), true) => Err(RuntimeError::Duplicate(task_key.actor)),
178 (Occupied(occupied), false) => {
179 log::trace!("adding fork task [key: {:?}]", task_key);
180 let entry = occupied.into_mut();
181 let true = entry.forks.insert(task_key.context) else {
182 return Err(RuntimeError::Duplicate(task_key.context))
183 };
184 let actor_done = entry.actor_done.clone();
185 let fut = async move {
186 tokio::select! {
187 _fork_done = fut => log::trace!("fork task done [key: {:?}]", task_key),
188 _actor_done = actor_done.notified() => log::trace!("main actor notified completion [key: {:?}]", task_key),
189 };
190 ActorTaskOutcome::Fork(ForkTaskOutcome { task_key , address_lease, })
191 }
192 .boxed();
193 self.tasks.push(fut);
194
195 Ok(())
196 },
197 }
198 }
199}