1use std::sync::Arc;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::errors::error_of::ErrorOf;
10use mm1_common::futures::timeout::FutureTimeoutExt;
11use mm1_common::log;
12use mm1_common::types::{AnyError, Never};
13use mm1_core::context::{
14 Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit,
15 RecvErrorKind, SendErrorKind, Start, Stop, Watching,
16};
17use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
18use mm1_core::tracing::{TraceId, WithTraceIdExt};
19use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
20use mm1_runnable::local::BoxedRunnable;
21use tokio::sync::{mpsc, oneshot};
22use tokio::time::Instant;
23use tracing::trace;
24
25use crate::config::EffectiveActorConfig;
26use crate::registry::{ForkEntry, NetworkNode};
27use crate::runtime::container;
28use crate::runtime::context::ActorContext;
29use crate::runtime::sys_call::SysCall;
30use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
31
32impl Quit for ActorContext {
33 async fn quit_ok(&mut self) -> Never {
34 self.call.invoke(SysCall::Exit(Ok(()))).await;
35 std::future::pending().await
36 }
37
38 async fn quit_err<E>(&mut self, reason: E) -> Never
39 where
40 E: std::error::Error + Send + Sync + 'static,
41 {
42 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
43 std::future::pending().await
44 }
45}
46
47impl Fork for ActorContext {
48 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
49 let actor_node = self
50 .actor_node
51 .upgrade()
52 .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "actor_node.upgrade"))?;
53 let fork_lease = actor_node.lease_address().map_err(|lease_error| {
54 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
55 })?;
56 let fork_address = fork_lease.address;
57
58 let (tx_priority, rx_priority) = mpsc::unbounded_channel();
59 let (tx_regular, rx_regular) = mpsc::unbounded_channel();
60 let tx_system_weak = self.tx_system_weak.clone();
61
62 self.call
63 .invoke(SysCall::ForkAdded(
64 fork_lease.address,
65 tx_priority.downgrade(),
66 ))
67 .await;
68
69 let tx_priority_weak = tx_priority.downgrade();
70 let tx_regular_weak = tx_regular.downgrade();
71 let fork_entry = ForkEntry::new(fork_lease, tx_priority, tx_regular);
72
73 let registered = actor_node.register(fork_address, fork_entry);
74 assert!(registered);
75
76 let rt_api = self.rt_api.clone();
77 let rt_config = self.rt_config.clone();
78
79 let actor_key = self.actor_key.clone();
80
81 let context = Self {
82 rt_api,
83 rt_config,
84
85 address: fork_address,
86 actor_key,
87 ack_to: None,
88 actor_node: self.actor_node.clone(),
89 network_nodes: Default::default(),
90
91 rx_priority,
92 rx_regular,
93 tx_system_weak,
94 tx_priority_weak,
95 tx_regular_weak,
96 call: self.call.clone(),
97
98 tx_actor_failure: self.tx_actor_failure.clone(),
99 };
100
101 Ok(context)
102 }
103
104 async fn run<F, Fut>(self, fun: F)
105 where
106 F: FnOnce(Self) -> Fut,
107 F: Send + 'static,
108 Fut: Future + Send + 'static,
109 {
110 let call = self.call.clone();
111 let fut = fun(self)
112 .map(|_| ())
113 .with_trace_id(TraceId::current())
114 .boxed();
115 call.invoke(SysCall::Spawn(fut)).await;
116 }
117}
118
119impl Now for ActorContext {
120 type Instant = Instant;
121
122 fn now(&self) -> Self::Instant {
123 Instant::now()
124 }
125}
126
127impl Start<BoxedRunnable<Self>> for ActorContext {
128 fn spawn(
129 &mut self,
130 runnable: BoxedRunnable<Self>,
131 link: bool,
132 ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
133 do_spawn(self, runnable, None, link.then_some(self.address))
134 }
135
136 fn start(
137 &mut self,
138 runnable: BoxedRunnable<Self>,
139 link: bool,
140 start_timeout: Duration,
141 ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
142 do_start(self, runnable, link, start_timeout)
143 }
144}
145
146impl Stop for ActorContext {
147 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
148 let this = self.address;
149 let out = do_exit(self, this, peer).is_ok();
150 std::future::ready(out)
151 }
152
153 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
154 let out = do_kill(self, peer).is_ok();
155 std::future::ready(out)
156 }
157}
158
159impl Linking for ActorContext {
160 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
161 let this = self.address;
162 do_link(self, this, peer)
163 }
164
165 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
166 let this = self.address;
167 do_unlink(self, this, peer)
168 }
169
170 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
171 do_set_trap_exit(self, enable)
172 }
173}
174
175impl Watching for ActorContext {
176 fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
177 do_watch(self, peer)
178 }
179
180 fn unwatch(
181 &mut self,
182 watch_ref: mm1_proto_system::WatchRef,
183 ) -> impl Future<Output = ()> + Send {
184 do_unwatch(self, watch_ref)
185 }
186}
187
188impl InitDone for ActorContext {
189 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
190 do_init_done(self, address)
191 }
192}
193
194impl Messaging for ActorContext {
195 fn address(&self) -> Address {
196 self.address
197 }
198
199 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
200 let (priority, inbound_opt) = tokio::select! {
201 biased;
202
203 inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
204 inbound_opt = self.rx_regular.recv() => (false, inbound_opt.map(|m| m.message)),
205 };
206
207 trace!(
208 "received [priority: {}; inbound: {:?}; via {}]",
209 priority, inbound_opt, self.address
210 );
211
212 inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
213 }
214
215 async fn close(&mut self) {
216 self.rx_regular.close();
217 self.rx_priority.close();
218 }
219
220 fn send(
221 &mut self,
222 envelope: Envelope,
223 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
224 std::future::ready(do_send(self, envelope))
225 }
226
227 fn forward(
228 &mut self,
229 to: Address,
230 envelope: Envelope,
231 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
232 std::future::ready(do_forward(self, to, envelope))
233 }
234}
235
236impl Bind<NetAddress> for ActorContext {
237 async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
238 use std::collections::btree_map::Entry::*;
239
240 let BindArgs {
241 bind_to,
242 inbox_size,
243 } = args;
244
245 log::debug!("binding [to: {}; inbox-size: {}]", bind_to, inbox_size);
246
247 let rt_api = self.rt_api.clone();
248 let registry = rt_api.registry();
249
250 let address_range = AddressRange::from(bind_to);
251
252 let network_nodes_entry = match self.network_nodes.entry(address_range) {
253 Occupied(o) => {
254 let previously_bound_to = NetAddress::from(*o.key());
255 return Err(ErrorOf::new(
256 BindErrorKind::Conflict,
257 format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
258 ))
259 },
260 Vacant(v) => v,
261 };
262
263 let subnet_lease = Lease::trusted(bind_to);
264
265 let (tx_system, _rx_system) = mpsc::unbounded_channel();
266
267 let tx_priority = self.tx_priority_weak.upgrade().ok_or_else(|| {
268 ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
269 })?;
270 let tx_regular = self.tx_regular_weak.upgrade().ok_or_else(|| {
271 ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
272 })?;
273
274 let network_node = Arc::new(NetworkNode::new(
275 subnet_lease,
276 inbox_size,
277 tx_system,
278 tx_priority,
279 tx_regular,
280 ));
281
282 if !registry.register(bind_to, network_node.clone()) {
283 log::warn!("couldn't bind [to: {}; reason: conflict]", bind_to);
284 return Err(ErrorOf::new(
285 BindErrorKind::Conflict,
286 "probably address in use",
287 ))
288 }
289
290 let network_node = Arc::downgrade(&network_node);
291 network_nodes_entry.insert(network_node);
292
293 log::info!("bound [to: {}; inbox-size: {}]", bind_to, inbox_size);
294
295 Ok(())
296 }
297}
298
299async fn do_start(
300 context: &mut ActorContext,
301 runnable: BoxedRunnable<ActorContext>,
302 link: bool,
303 timeout: Duration,
304) -> Result<Address, ErrorOf<StartErrorKind>> {
305 let this_address = context.address;
306
307 let mut fork = context
308 .fork()
309 .await
310 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
311
312 let fork_address = fork.address;
313 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
314 .await
315 .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
316
317 let envelope = match fork.recv().timeout(timeout).await {
318 Err(_elapsed) => {
319 do_kill(context, spawned_address)
320 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
321
322 return Err(ErrorOf::new(
325 StartErrorKind::Timeout,
326 "no init-ack within timeout",
327 ))
328 },
329 Ok(recv_result) => {
330 recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
331 },
332 };
333
334 dispatch!(match envelope {
335 mm1_proto_system::InitAck { address } => {
336 if link {
337 do_link(context, this_address, address).await;
338 }
339 Ok(address)
340 },
341
342 mm1_proto_system::Exited { .. } => {
343 Err(ErrorOf::new(
344 StartErrorKind::Exited,
345 "exited before init-ack",
346 ))
347 },
348
349 unexpected @ _ => {
350 Err(ErrorOf::new(
351 StartErrorKind::InternalError,
352 format!("unexpected message: {unexpected:?}"),
353 ))
354 },
355 })
356}
357
358async fn do_spawn(
359 context: &mut ActorContext,
360 runnable: BoxedRunnable<ActorContext>,
361 ack_to: Option<Address>,
362 link_to: impl IntoIterator<Item = Address>,
363) -> Result<Address, ErrorOf<SpawnErrorKind>> {
364 let actor_key = context.actor_key.child(runnable.func_name());
365 let actor_config = context.rt_config.actor_config(&actor_key);
366 let execute_on = context.rt_api.choose_executor(actor_config.runtime_key());
367
368 trace!("starting [ack-to: {:?}]", ack_to);
369
370 let subnet_lease = context
371 .rt_api
372 .request_address(actor_config.netmask())
373 .await
374 .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
375
376 let rt_api = context.rt_api.clone();
377 let rt_config = context.rt_config.clone();
378 let container = container::Container::create(
379 container::ContainerArgs {
380 ack_to,
381 link_to: link_to.into_iter().collect(),
383 actor_key,
384 trace_id: TraceId::current(),
385
386 subnet_lease,
387 rt_api,
388 rt_config,
389 },
390 runnable,
391 context.tx_actor_failure.clone(),
392 )
393 .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
394 let actor_address = container.actor_address();
395
396 trace!("actor-address: {}", actor_address);
397
398 let tx_actor_failure = context.tx_actor_failure.clone();
399 let _join_handle = execute_on.spawn(async move {
401 match container.run().await {
402 Ok(Ok(())) => (),
403 Ok(Err(actor_failure)) => {
404 let _ = tx_actor_failure.send((actor_address, actor_failure));
405 },
406 Err(container_failure) => {
407 let report = AnyError::from(container_failure);
408 mm1_common::log::error!(
409 "actor container failure [addr: {}]: {}",
410 actor_address,
411 report
412 .chain()
413 .map(|e| e.to_string())
414 .collect::<Vec<_>>()
415 .join(" <- ")
416 );
417 },
418 }
419 });
420
421 Ok(actor_address)
422}
423
424fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
425 context.rt_api.sys_send(
426 peer,
427 SysMsg::Link(SysLink::Exit {
428 sender: this,
429 receiver: peer,
430 reason: ExitReason::Terminate,
431 }),
432 )
433}
434
435fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
436 context.rt_api.sys_send(peer, SysMsg::Kill)
437}
438
439async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
440 context
441 .call
442 .invoke(SysCall::Link {
443 sender: this,
444 receiver: peer,
445 })
446 .await
447}
448
449async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
450 context
451 .call
452 .invoke(SysCall::Unlink {
453 sender: this,
454 receiver: peer,
455 })
456 .await
457}
458
459async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
460 context.call.invoke(SysCall::TrapExit(enable)).await;
461}
462
463async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
464 let this = context.address;
465 let (reply_tx, reply_rx) = oneshot::channel();
466 context
467 .call
468 .invoke(SysCall::Watch {
469 sender: this,
470 receiver: peer,
471 reply_tx,
472 })
473 .await;
474 reply_rx.await.expect("sys-call remained unanswered")
475}
476
477async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
478 let this = context.address;
479 context
480 .call
481 .invoke(SysCall::Unwatch {
482 sender: this,
483 watch_ref,
484 })
485 .await;
486}
487
488async fn do_init_done(context: &mut ActorContext, address: Address) {
489 let message = InitAck { address };
490 let Some(ack_to_address) = context.ack_to.take() else {
491 return;
492 };
493 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
494 let _ = context
495 .rt_api
496 .send_to(envelope.header().to, true, envelope.into_erased());
497}
498
499fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
500 trace!("sending [outbound: {:?}]", outbound);
501 context
502 .rt_api
503 .send_to(outbound.header().to, false, outbound)
504 .map_err(|k| ErrorOf::new(k, ""))
505}
506
507fn do_forward(
508 context: &mut ActorContext,
509 to: Address,
510 outbound: Envelope,
511) -> Result<(), ErrorOf<SendErrorKind>> {
512 trace!("forwarding [to: {}, outbound: {:?}]", to, outbound);
513 context
514 .rt_api
515 .send_to(to, false, outbound)
516 .map_err(|k| ErrorOf::new(k, ""))
517}