1use std::future::Future;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::subnet::NetMask;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::futures::timeout::FutureTimeoutExt;
9use mm1_common::types::Never;
10use mm1_core::context::{
11 Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit, RecvErrorKind, SendErrorKind,
12 Start, Stop, Watching,
13};
14use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
15use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
16use tokio::sync::oneshot;
17use tokio::time::Instant;
18use tracing::trace;
19
20use crate::runtime::config::EffectiveActorConfig;
21use crate::runtime::context::ActorContext;
22use crate::runtime::runnable::BoxedRunnable;
23use crate::runtime::sys_call::SysCall;
24use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
25use crate::runtime::{container, mq};
26
27impl Quit for ActorContext {
28 async fn quit_ok(&mut self) -> Never {
29 self.call.invoke(SysCall::Exit(Ok(()))).await;
30 std::future::pending().await
31 }
32
33 async fn quit_err<E>(&mut self, reason: E) -> Never
34 where
35 E: std::error::Error + Send + Sync + 'static,
36 {
37 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
38 std::future::pending().await
39 }
40}
41
42impl Fork for ActorContext {
43 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
44 let address_lease = self
45 .subnet_pool
46 .lease(NetMask::M_64)
47 .map_err(|lease_error| {
48 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
49 })?;
50 let actor_address = address_lease.address;
51
52 let (tx_priority, rx_priority) = mq::unbounded();
53 let (tx_regular, rx_regular) = mq::bounded(
54 self.rt_config
55 .actor_config(&self.actor_key)
56 .fork_inbox_size(),
57 );
58 let tx_system_weak = self.tx_system_weak.clone();
59 let tx_system = tx_system_weak
60 .upgrade()
61 .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "tx_system_weak.upgrade"))?;
62
63 self.call
64 .invoke(SysCall::ForkAdded(
65 address_lease.address,
66 tx_priority.downgrade(),
67 ))
68 .await;
69
70 let () = self
71 .rt_api
72 .register(address_lease, tx_system, tx_priority, tx_regular);
73
74 let subnet_pool = self.subnet_pool.clone();
75 let rt_api = self.rt_api.clone();
76 let rt_config = self.rt_config.clone();
77
78 let actor_key = self.actor_key.clone();
79
80 let context = Self {
81 rt_api,
82 rt_config,
83 actor_address,
84 rx_priority,
85 rx_regular,
86 tx_system_weak,
87 call: self.call.clone(),
88 actor_key,
89 subnet_pool,
90 ack_to: None,
91 unregister_on_drop: true,
92 };
93
94 Ok(context)
95 }
96
97 async fn run<F, Fut>(self, fun: F)
98 where
99 F: FnOnce(Self) -> Fut,
100 F: Send + 'static,
101 Fut: std::future::Future + Send + 'static,
102 {
103 let call = self.call.clone();
104 let fut = fun(self).map(|_| ()).boxed();
105 call.invoke(SysCall::Spawn(fut)).await;
106 }
107}
108
109impl Now for ActorContext {
110 type Instant = Instant;
111
112 fn now(&self) -> Self::Instant {
113 Instant::now()
114 }
115}
116
117impl Start<BoxedRunnable<Self>> for ActorContext {
118 fn spawn(
119 &mut self,
120 runnable: BoxedRunnable<Self>,
121 link: bool,
122 ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
123 do_spawn(self, runnable, None, link.then_some(self.actor_address))
124 }
125
126 fn start(
127 &mut self,
128 runnable: BoxedRunnable<Self>,
129 link: bool,
130 start_timeout: Duration,
131 ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
132 do_start(self, runnable, link, start_timeout)
133 }
134}
135
136impl Stop for ActorContext {
137 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
138 let this = self.actor_address;
139 let out = do_exit(self, this, peer).is_ok();
140 std::future::ready(out)
141 }
142
143 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
144 let out = do_kill(self, peer).is_ok();
145 std::future::ready(out)
146 }
147}
148
149impl Linking for ActorContext {
150 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
151 let this = self.actor_address;
152 do_link(self, this, peer)
153 }
154
155 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
156 let this = self.actor_address;
157 do_unlink(self, this, peer)
158 }
159
160 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
161 do_set_trap_exit(self, enable)
162 }
163}
164
165impl Watching for ActorContext {
166 fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
167 do_watch(self, peer)
168 }
169
170 fn unwatch(
171 &mut self,
172 watch_ref: mm1_proto_system::WatchRef,
173 ) -> impl Future<Output = ()> + Send {
174 do_unwatch(self, watch_ref)
175 }
176}
177
178impl InitDone for ActorContext {
179 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
180 do_init_done(self, address)
181 }
182}
183
184impl Messaging for ActorContext {
185 fn address(&self) -> Address {
186 self.actor_address
187 }
188
189 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
190 let (priority, inbound_opt) = tokio::select! {
191 biased;
192
193 inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
194 inbound_opt = self.rx_regular.recv() => (false, inbound_opt),
195 };
196
197 trace!(
198 "received [priority: {}; inbound: {:?}]",
199 priority, inbound_opt
200 );
201
202 inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
203 }
204
205 async fn close(&mut self) {
206 self.rx_regular.close();
207 self.rx_priority.close();
208 }
209
210 fn send(
211 &mut self,
212 envelope: Envelope,
213 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
214 std::future::ready(do_send(self, envelope))
215 }
216}
217
218async fn do_start(
219 context: &mut ActorContext,
220 runnable: BoxedRunnable<ActorContext>,
221 link: bool,
222 timeout: Duration,
223) -> Result<Address, ErrorOf<StartErrorKind>> {
224 let this_address = context.actor_address;
225
226 let mut fork = context
227 .fork()
228 .await
229 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
230
231 let fork_address = fork.actor_address;
232 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
233 .await
234 .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
235
236 let envelope = match fork.recv().timeout(timeout).await {
237 Err(_elapsed) => {
238 do_kill(context, spawned_address)
239 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
240
241 return Err(ErrorOf::new(
244 StartErrorKind::Timeout,
245 "no init-ack within timeout",
246 ))
247 },
248 Ok(recv_result) => {
249 recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
250 },
251 };
252
253 dispatch!(match envelope {
254 mm1_proto_system::InitAck { address } => {
255 if link {
256 do_link(context, this_address, address).await;
257 }
258 Ok(address)
259 },
260
261 mm1_proto_system::Exited { .. } => {
262 Err(ErrorOf::new(
263 StartErrorKind::Exited,
264 "exited before init-ack",
265 ))
266 },
267
268 unexpected @ _ => {
269 Err(ErrorOf::new(
270 StartErrorKind::InternalError,
271 format!("unexpected message: {:?}", unexpected),
272 ))
273 },
274 })
275}
276
277async fn do_spawn(
278 context: &mut ActorContext,
279 runnable: BoxedRunnable<ActorContext>,
280 ack_to: Option<Address>,
281 link_to: impl IntoIterator<Item = Address>,
282) -> Result<Address, ErrorOf<SpawnErrorKind>> {
283 let actor_key = context.actor_key.child(runnable.func_name());
284 let actor_config = context.rt_config.actor_config(&actor_key);
285 let execute_on = context.rt_api.choose_executor(actor_config.runtime_key());
286
287 trace!("starting [ack-to: {:?}]", ack_to);
288
289 let subnet_lease = context
290 .rt_api
291 .request_address(actor_config.netmask())
292 .await
293 .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
294
295 trace!("subnet-lease: {}", subnet_lease.net_address());
296
297 let rt_api = context.rt_api.clone();
298 let rt_config = context.rt_config.clone();
299 let container = container::Container::create(
300 container::ContainerArgs {
301 ack_to,
302 link_to: link_to.into_iter().collect(),
304 actor_key,
305
306 subnet_lease,
307 rt_api,
308 rt_config,
309 },
310 runnable,
311 )
312 .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
313 let actor_address = container.actor_address();
314
315 trace!("actor-address: {}", actor_address);
316
317 let _join_handle = execute_on.spawn(container.run());
319
320 Ok(actor_address)
321}
322
323fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
324 context.rt_api.sys_send(
325 peer,
326 SysMsg::Link(SysLink::Exit {
327 sender: this,
328 receiver: peer,
329 reason: ExitReason::Terminate,
330 }),
331 )
332}
333
334fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
335 context.rt_api.sys_send(peer, SysMsg::Kill)
336}
337
338async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
339 context
340 .call
341 .invoke(SysCall::Link {
342 sender: this,
343 receiver: peer,
344 })
345 .await
346}
347
348async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
349 context
350 .call
351 .invoke(SysCall::Unlink {
352 sender: this,
353 receiver: peer,
354 })
355 .await
356}
357
358async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
359 context.call.invoke(SysCall::TrapExit(enable)).await;
360}
361
362async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
363 let this = context.actor_address;
364 let (reply_tx, reply_rx) = oneshot::channel();
365 context
366 .call
367 .invoke(SysCall::Watch {
368 sender: this,
369 receiver: peer,
370 reply_tx,
371 })
372 .await;
373 reply_rx.await.expect("sys-call remained unanswered")
374}
375
376async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
377 let this = context.actor_address;
378 context
379 .call
380 .invoke(SysCall::Unwatch {
381 sender: this,
382 watch_ref,
383 })
384 .await;
385}
386
387async fn do_init_done(context: &mut ActorContext, address: Address) {
388 let message = InitAck { address };
389 let Some(ack_to_address) = context.ack_to.take() else {
390 return;
391 };
392 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
393 let _ = context.rt_api.send(true, envelope.into_erased());
394}
395
396fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
397 trace!("sending [outbound: {:?}]", outbound);
398 context
399 .rt_api
400 .send(false, outbound)
401 .map_err(|k| ErrorOf::new(k, ""))
402}