1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::errors::chain::{ExactTypeDisplayChainExt, StdErrorDisplayChainExt};
10use mm1_common::errors::error_of::ErrorOf;
11use mm1_common::futures::timeout::FutureTimeoutExt;
12use mm1_common::log;
13use mm1_common::types::{AnyError, Never};
14use mm1_core::context::{
15 Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Ping,
16 PingErrorKind, Quit, RecvErrorKind, SendErrorKind, Start, Stop, Watching,
17};
18use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
19use mm1_core::tracing::{TraceId, WithTraceIdExt};
20use mm1_proto_system as sys;
21use mm1_runnable::local::BoxedRunnable;
22use rand::RngCore;
23use tokio::sync::oneshot;
24use tokio::time::Instant;
25use tracing::{trace, warn};
26
27use crate::config::EffectiveActorConfig;
28use crate::registry::{self, MessageWithPermit, MessageWithoutPermit};
29use crate::runtime::container;
30use crate::runtime::context::{ActorContext, ForkEntry, SubnetContext};
31use crate::runtime::rt_api::RtApi;
32use crate::runtime::sys_call::SysCall;
33use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
34
35impl Quit for ActorContext {
36 async fn quit_ok(&mut self) -> Never {
37 self.call.invoke(SysCall::Exit(Ok(()))).await;
38 std::future::pending().await
39 }
40
41 async fn quit_err<E>(&mut self, reason: E) -> Never
42 where
43 E: std::error::Error + Send + Sync + 'static,
44 {
45 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
46 std::future::pending().await
47 }
48}
49
50impl Fork for ActorContext {
51 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
52 let Self {
53 fork_address: this_address,
54 call,
55 subnet_context,
56 ..
57 } = self;
58
59 let fork_lease = {
60 let mut subnet_context_locked = subnet_context
61 .try_lock()
62 .expect("could not lock subnet_context");
63 let SubnetContext {
64 subnet_pool,
65 fork_entries,
66 ..
67 } = &mut *subnet_context_locked;
68 let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
69 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
70 })?;
71 let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
72 assert!(should_be_none.is_none());
73 fork_lease
74 };
75 let fork_address = fork_lease.address;
76 trace!(parent = %this_address, child = %fork_address, "forking");
77
78 call.invoke(SysCall::ForkAdded(fork_address)).await;
79
80 let context = Self {
81 subnet_context: subnet_context.clone(),
82 fork_address,
83 fork_lease: Some(fork_lease),
84 ack_to: None,
85 call: call.clone(),
86 };
87
88 Ok(context)
89 }
90
91 async fn run<F, Fut>(self, fun: F)
92 where
93 F: FnOnce(Self) -> Fut,
94 F: Send + 'static,
95 Fut: Future + Send + 'static,
96 {
97 let call = self.call.clone();
98 let fut = fun(self)
99 .map(|_| ())
100 .with_trace_id(TraceId::current())
101 .boxed();
102 call.invoke(SysCall::Spawn(fut)).await;
103 }
104}
105
106impl Now for ActorContext {
107 type Instant = Instant;
108
109 fn now(&self) -> Self::Instant {
110 Instant::now()
111 }
112}
113
114impl Start<BoxedRunnable<Self>> for ActorContext {
115 fn spawn(
116 &mut self,
117 runnable: BoxedRunnable<Self>,
118 link: bool,
119 ) -> impl Future<Output = Result<Address, ErrorOf<sys::SpawnErrorKind>>> + Send {
120 do_spawn(self, runnable, None, link.then_some(self.fork_address))
121 }
122
123 fn start(
124 &mut self,
125 runnable: BoxedRunnable<Self>,
126 link: bool,
127 start_timeout: Duration,
128 ) -> impl Future<Output = Result<Address, ErrorOf<sys::StartErrorKind>>> + Send {
129 do_start(self, runnable, link, start_timeout)
130 }
131}
132
133impl Stop for ActorContext {
134 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
135 let this = self.fork_address;
136 let out = do_exit(self, this, peer).is_ok();
137 std::future::ready(out)
138 }
139
140 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
141 let out = do_kill(self, peer).is_ok();
142 std::future::ready(out)
143 }
144}
145
146impl Linking for ActorContext {
147 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
148 let this = self.fork_address;
149 do_link(self, this, peer)
150 }
151
152 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
153 let this = self.fork_address;
154 do_unlink(self, this, peer)
155 }
156
157 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
158 do_set_trap_exit(self, enable)
159 }
160}
161
162impl Watching for ActorContext {
163 fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
164 do_watch(self, peer)
165 }
166
167 fn unwatch(
168 &mut self,
169 watch_ref: mm1_proto_system::WatchRef,
170 ) -> impl Future<Output = ()> + Send {
171 do_unwatch(self, watch_ref)
172 }
173}
174
175impl InitDone for ActorContext {
176 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
177 do_init_done(self, address)
178 }
179}
180
181impl Messaging for ActorContext {
182 fn address(&self) -> Address {
183 self.fork_address
184 }
185
186 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
187 let ActorContext {
188 fork_address,
189 subnet_context,
190 ..
191 } = self;
192
193 loop {
194 let (inbound_envelope_opt, subnet_notify, fork_notify) = {
195 let mut subnet_context_locked = subnet_context
196 .try_lock()
197 .expect("could not lock subnet_context");
198 let SubnetContext {
199 rt_api,
200 subnet_notify,
201 rx_priority,
202 rx_regular,
203 fork_entries,
204 bound_subnets,
205 ..
206 } = &mut *subnet_context_locked;
207
208 process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)?;
209
210 let this_fork_entry = fork_entries
211 .get_mut(fork_address)
212 .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
213 let inbound_envelope_opt = if let Some(priority_message) =
214 this_fork_entry.inbox_priority.pop_front()
215 {
216 Some(priority_message.message)
217 } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
218 Some(regular_message.message)
219 } else {
220 None
221 };
222 (
223 inbound_envelope_opt,
224 subnet_notify.clone(),
225 this_fork_entry.fork_notifiy.clone(),
226 )
227 };
228
229 if let Some(inbound_envelope) = inbound_envelope_opt {
230 break Ok(inbound_envelope)
231 } else {
232 let subnet_notified = subnet_notify.notified();
233 let fork_notified = fork_notify.notified();
234
235 tokio::select! {
236 _ = subnet_notified => (),
237 _ = fork_notified => (),
238 }
239 }
240 }
241 }
242
243 async fn close(&mut self) {
244 todo!()
247 }
248
249 fn send(
250 &mut self,
251 envelope: Envelope,
252 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
253 std::future::ready(do_send(self, envelope))
254 }
255
256 fn forward(
257 &mut self,
258 to: Address,
259 envelope: Envelope,
260 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
261 std::future::ready(do_forward(self, to, envelope))
262 }
263}
264
265impl Bind<NetAddress> for ActorContext {
266 async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
267 use std::collections::btree_map::Entry::*;
268
269 let BindArgs {
270 bind_to,
271 inbox_size,
272 } = args;
273 let address_range = AddressRange::from(bind_to);
274
275 log::debug!(%bind_to, %inbox_size, "binding");
276
277 let Self {
278 fork_address,
279 subnet_context,
280 ..
281 } = self;
282
283 {
284 let mut subnet_context_locked = subnet_context
285 .try_lock()
286 .expect("could not lock subnet_context");
287
288 let SubnetContext {
289 rt_api,
290 subnet_mailbox_tx,
291 bound_subnets,
292 ..
293 } = &mut *subnet_context_locked;
294
295 let bound_subnet_entry = match bound_subnets.entry(address_range) {
296 Vacant(v) => v,
297 Occupied(o) => {
298 let previously_bound_to = NetAddress::from(*o.key());
299 return Err(ErrorOf::new(
300 BindErrorKind::Conflict,
301 format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
302 ))
303 },
304 };
305 let subnet_lease = Lease::trusted(bind_to);
306
307 let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
308 BindErrorKind::Closed,
309 "the actor subnet is probably unregistered",
310 ))?;
311 let bound_subnet_node =
312 registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);
313
314 rt_api
315 .registry()
316 .register(bind_to, bound_subnet_node)
317 .map_err(|_| {
318 ErrorOf::new(
319 BindErrorKind::Conflict,
320 "could not register the subnet-node",
321 )
322 })?;
323
324 bound_subnet_entry.insert(*fork_address);
325 }
326
327 log::info!(%bind_to, %inbox_size, "bound");
328
329 Ok(())
330 }
331}
332
333impl Ping for ActorContext {
334 async fn ping(
335 &mut self,
336 address: Address,
337 timeout: Duration,
338 ) -> Result<Duration, ErrorOf<PingErrorKind>> {
339 let ping_id = rand::rng().next_u64();
340 let now = Instant::now();
341 let deadline = now.checked_add(timeout).unwrap_or(now);
342
343 let Self {
344 fork_address,
345 subnet_context,
346 ..
347 } = self;
348
349 let (subnet_notify, fork_notify) = {
350 let mut subnet_context_locked = subnet_context
351 .try_lock()
352 .expect("could not lock subnet_context");
353 let SubnetContext {
354 rt_api,
355 fork_entries,
356 subnet_notify,
357 ..
358 } = &mut *subnet_context_locked;
359 let ForkEntry { fork_notifiy, .. } = fork_entries
360 .get_mut(fork_address)
361 .expect("fork_entry missing");
362
363 let ping_msg = sys::Ping {
364 reply_to: Some(*fork_address),
365 id: ping_id,
366 };
367
368 let ping_header = EnvelopeHeader::to_address(address).with_priority(true);
369 let ping_envelope = Envelope::new(ping_header, ping_msg).into_erased();
370 rt_api
371 .send_to(address, true, ping_envelope)
372 .map_err(|e| ErrorOf::new(PingErrorKind::Send, e.to_string()))?;
373
374 (subnet_notify.clone(), fork_notifiy.clone())
375 };
376
377 loop {
378 let timeout = tokio::time::sleep_until(deadline);
379 tokio::select! {
380 _ = timeout => { return Err(ErrorOf::new(PingErrorKind::Timeout, "timeout elapsed")) },
381 _ = subnet_notify.notified() => (),
382 _ = fork_notify.notified() => (),
383 };
384
385 let mut subnet_context_locked = subnet_context
386 .try_lock()
387 .expect("could not lock subnet_context");
388 let SubnetContext {
389 rt_api,
390 fork_entries,
391 bound_subnets,
392 rx_priority,
393 rx_regular,
394 ..
395 } = &mut *subnet_context_locked;
396
397 process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)
398 .map_err(|e| ErrorOf::new(PingErrorKind::Recv, e.to_string()))?;
399
400 if fork_entries
401 .get(fork_address)
402 .expect("fork_entry_missing")
403 .last_ping_received
404 == Some(ping_id)
405 {
406 break
407 }
408 }
409 Ok(now.elapsed())
410 }
411}
412
413fn process_inlets(
414 rt_api: &RtApi,
415 bound_subnets: &BTreeMap<AddressRange, Address>,
416 rx_priority: &mut kanal::Receiver<MessageWithoutPermit<Envelope>>,
417 rx_regular: &mut kanal::Receiver<MessageWithPermit<Envelope>>,
418 fork_entries: &mut HashMap<Address, ForkEntry>,
419) -> Result<(), ErrorOf<RecvErrorKind>> {
420 let mut notified_forks = HashSet::new();
421 while let Some(message) = rx_priority
422 .try_recv_realtime()
423 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
424 {
425 let message_to = bound_subnets
426 .get(&AddressRange::from(message.to))
427 .copied()
428 .unwrap_or(message.to);
429 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
430 warn!(dst = %message_to, "no such fork");
431 continue
432 };
433 let should_notify = if let Some(ping_message) = message.message.peek::<sys::Ping>() {
434 let sys::Ping { reply_to, id } = *ping_message;
435 if let Some(reply_to) = reply_to {
436 trace!(%id, %reply_to, "received a ping request");
437 let pong_header = EnvelopeHeader::to_address(reply_to).with_priority(true);
438 let pong_envelope =
439 Envelope::new(pong_header, sys::Ping { reply_to: None, id }).into_erased();
440 rt_api
441 .send_to(reply_to, true, pong_envelope)
442 .inspect_err(
443 |e| warn!(reason = %e.as_display_chain(), "can't send ping-response"),
444 )
445 .ok();
446 trace!(%id, %reply_to, "send a ping response");
447 false
448 } else {
449 trace!(%id, "received a ping response");
450 fork_entry.last_ping_received = Some(id);
451 true
452 }
453 } else {
454 fork_entry.inbox_priority.push_back(message);
455 true
456 };
457
458 if should_notify && notified_forks.insert(message_to) {
459 trace!(dst = %message_to, "notifying fork");
460 fork_entry.fork_notifiy.notify_one();
461 }
462 }
463
464 while let Some(message) = rx_regular
465 .try_recv_realtime()
466 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
467 {
468 let message_to = bound_subnets
469 .get(&AddressRange::from(message.to))
470 .copied()
471 .unwrap_or(message.to);
472 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
473 warn!(dst = %message_to, "no such fork");
474 continue
475 };
476 fork_entry.inbox_regular.push_back(message);
477 trace!(dst = %message_to, "subnet received regular message");
478 if notified_forks.insert(message_to) {
479 trace!(dst = %message_to, "notifying fork");
480 fork_entry.fork_notifiy.notify_one();
481 }
482 }
483
484 Ok(())
485}
486
487async fn do_start(
488 context: &mut ActorContext,
489 runnable: BoxedRunnable<ActorContext>,
490 link: bool,
491 timeout: Duration,
492) -> Result<Address, ErrorOf<sys::StartErrorKind>> {
493 let this_address = context.fork_address;
494
495 let mut fork = context
496 .fork()
497 .await
498 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
499
500 let fork_address = fork.fork_address;
501 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
502 .await
503 .map_err(|e| e.map_kind(sys::StartErrorKind::Spawn))?;
504
505 let envelope = match fork.recv().timeout(timeout).await {
506 Err(_elapsed) => {
507 do_kill(context, spawned_address)
508 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
509
510 return Err(ErrorOf::new(
513 sys::StartErrorKind::Timeout,
514 "no init-ack within timeout",
515 ))
516 },
517 Ok(recv_result) => {
518 recv_result
519 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?
520 },
521 };
522
523 dispatch!(match envelope {
524 sys::InitAck { address } => {
525 if link {
526 do_link(context, this_address, address).await;
527 }
528 Ok(address)
529 },
530
531 sys::Exited { .. } => {
532 Err(ErrorOf::new(
533 sys::StartErrorKind::Exited,
534 "exited before init-ack",
535 ))
536 },
537
538 unexpected @ _ => {
539 Err(ErrorOf::new(
540 sys::StartErrorKind::InternalError,
541 format!("unexpected message: {unexpected:?}"),
542 ))
543 },
544 })
545}
546
547async fn do_spawn(
548 context: &mut ActorContext,
549 runnable: BoxedRunnable<ActorContext>,
550 ack_to: Option<Address>,
551 link_to: impl IntoIterator<Item = Address>,
552) -> Result<Address, ErrorOf<sys::SpawnErrorKind>> {
553 let ActorContext { subnet_context, .. } = context;
554
555 let (actor_key, rt_config, rt_api, tx_actor_failure) = {
556 let subnet_context_locked = subnet_context
557 .try_lock()
558 .expect("could not lock subnet_context");
559 let SubnetContext {
560 rt_api,
561 rt_config,
562 actor_key,
563 tx_actor_failure,
564 ..
565 } = &*subnet_context_locked;
566
567 (
568 actor_key.child(runnable.func_name()),
569 rt_config.clone(),
570 rt_api.clone(),
571 tx_actor_failure.clone(),
572 )
573 };
574
575 let actor_config = rt_config.actor_config(&actor_key);
576 let execute_on = rt_api.choose_executor(actor_config.runtime_key());
577
578 trace!(?ack_to, "starting");
579
580 let subnet_lease = rt_api
581 .request_address(actor_config.netmask())
582 .await
583 .inspect_err(|e| log::error!("lease-error: {}", e))
584 .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::ResourceConstraint, e.to_string()))?;
585
586 trace!(subnet_lease = %subnet_lease.net_address(), "subnet leased");
587
588 let rt_api = rt_api.clone();
589 let rt_config = rt_config.clone();
590 let container = container::Container::create(
591 container::ContainerArgs {
592 ack_to,
593 link_to: link_to.into_iter().collect(),
595 actor_key,
596 trace_id: TraceId::current(),
597
598 subnet_lease,
599 rt_api,
600 rt_config,
601 },
602 runnable,
603 tx_actor_failure.clone(),
604 )
605 .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::InternalError, e.to_string()))?;
606 let actor_address = container.actor_address();
607
608 trace!(spawned_address = %actor_address, "about to run spawned actor");
609
610 let tx_actor_failure = tx_actor_failure.clone();
611 let _join_handle = execute_on.spawn(async move {
613 match container.run().await {
614 Ok(Ok(())) => (),
615 Ok(Err(actor_failure)) => {
616 let _ = tx_actor_failure.send((actor_address, actor_failure));
617 },
618 Err(container_failure) => {
619 let report = AnyError::from(container_failure);
620 log::error!(
621 err = %report.as_display_chain(), %actor_address,
622 "actor container failure"
623 );
624 },
625 }
626 });
627
628 Ok(actor_address)
629}
630
631fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
632 let ActorContext { subnet_context, .. } = context;
633 let subnet_context_locked = subnet_context
634 .try_lock()
635 .expect("could not lock subnet_context");
636 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
637
638 rt_api.sys_send(
639 peer,
640 SysMsg::Link(SysLink::Exit {
641 sender: this,
642 receiver: peer,
643 reason: ExitReason::Terminate,
644 }),
645 )
646}
647
648fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
649 let ActorContext { subnet_context, .. } = context;
650 let subnet_context_locked = subnet_context
651 .try_lock()
652 .expect("could not lock subnet_context");
653 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
654
655 rt_api.sys_send(peer, SysMsg::Kill)
656}
657
658async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
659 let ActorContext { call, .. } = context;
660
661 call.invoke(SysCall::Link {
662 sender: this,
663 receiver: peer,
664 })
665 .await
666}
667
668async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
669 let ActorContext { call, .. } = context;
670 call.invoke(SysCall::Unlink {
671 sender: this,
672 receiver: peer,
673 })
674 .await
675}
676
677async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
678 let ActorContext { call, .. } = context;
679 call.invoke(SysCall::TrapExit(enable)).await;
680}
681
682async fn do_watch(context: &mut ActorContext, peer: Address) -> sys::WatchRef {
683 let ActorContext {
684 fork_address: this,
685 call,
686 ..
687 } = context;
688 let (reply_tx, reply_rx) = oneshot::channel();
689 call.invoke(SysCall::Watch {
690 sender: *this,
691 receiver: peer,
692 reply_tx,
693 })
694 .await;
695 reply_rx.await.expect("sys-call remained unanswered")
696}
697
698async fn do_unwatch(context: &mut ActorContext, watch_ref: sys::WatchRef) {
699 let ActorContext {
700 fork_address: this,
701 call,
702 ..
703 } = context;
704 call.invoke(SysCall::Unwatch {
705 sender: *this,
706 watch_ref,
707 })
708 .await;
709}
710
711async fn do_init_done(context: &mut ActorContext, address: Address) {
712 let ActorContext {
713 ack_to,
714 subnet_context,
715 ..
716 } = context;
717 let message = sys::InitAck { address };
718 let Some(ack_to_address) = ack_to.take() else {
719 return;
720 };
721 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
722 let subnet_context_locked = subnet_context
723 .try_lock()
724 .expect("could not lock subnet_context");
725 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
726 let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
727}
728
729fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
730 trace!(envelope = ?outbound, "sending");
731 let ActorContext { subnet_context, .. } = context;
732 let subnet_context_locked = subnet_context
733 .try_lock()
734 .expect("could not lock subnet_context");
735 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
736 rt_api
737 .send_to(outbound.header().to, outbound.header().priority, outbound)
738 .map_err(|k| ErrorOf::new(k, ""))
739}
740
741fn do_forward(
742 context: &mut ActorContext,
743 to: Address,
744 outbound: Envelope,
745) -> Result<(), ErrorOf<SendErrorKind>> {
746 trace!(forward_to = %to, envelope = ?outbound, "forwarding");
747 let ActorContext { subnet_context, .. } = context;
748 let subnet_context_locked = subnet_context
749 .try_lock()
750 .expect("could not lock subnet_context");
751 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
752 rt_api
753 .send_to(to, outbound.header().priority, outbound)
754 .map_err(|k| ErrorOf::new(k, ""))
755}