1use std::collections::HashSet;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::errors::error_of::ErrorOf;
10use mm1_common::futures::timeout::FutureTimeoutExt;
11use mm1_common::log;
12use mm1_common::types::{AnyError, Never};
13use mm1_core::context::{
14 Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit,
15 RecvErrorKind, SendErrorKind, Start, Stop, Watching,
16};
17use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
18use mm1_core::tracing::{TraceId, WithTraceIdExt};
19use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
20use mm1_runnable::local::BoxedRunnable;
21use tokio::sync::oneshot;
22use tokio::time::Instant;
23use tracing::{trace, warn};
24
25use crate::config::EffectiveActorConfig;
26use crate::registry;
27use crate::runtime::container;
28use crate::runtime::context::{ActorContext, SubnetContext};
29use crate::runtime::sys_call::SysCall;
30use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
31
32impl Quit for ActorContext {
33 async fn quit_ok(&mut self) -> Never {
34 self.call.invoke(SysCall::Exit(Ok(()))).await;
35 std::future::pending().await
36 }
37
38 async fn quit_err<E>(&mut self, reason: E) -> Never
39 where
40 E: std::error::Error + Send + Sync + 'static,
41 {
42 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
43 std::future::pending().await
44 }
45}
46
47impl Fork for ActorContext {
48 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
49 let Self {
50 fork_address: this_address,
51 call,
52 subnet_context,
53 ..
54 } = self;
55
56 let fork_lease = {
57 let mut subnet_context_locked = subnet_context
58 .try_lock()
59 .expect("could not lock subnet_context");
60 let SubnetContext {
61 subnet_pool,
62 fork_entries,
63 ..
64 } = &mut *subnet_context_locked;
65 let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
66 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
67 })?;
68 let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
69 assert!(should_be_none.is_none());
70 fork_lease
71 };
72 let fork_address = fork_lease.address;
73 trace!("forking {} -> {}", this_address, fork_address);
74
75 call.invoke(SysCall::ForkAdded(fork_address)).await;
76
77 let context = Self {
78 subnet_context: subnet_context.clone(),
79 fork_address,
80 fork_lease: Some(fork_lease),
81 ack_to: None,
82 call: call.clone(),
83 };
84
85 Ok(context)
86 }
87
88 async fn run<F, Fut>(self, fun: F)
89 where
90 F: FnOnce(Self) -> Fut,
91 F: Send + 'static,
92 Fut: Future + Send + 'static,
93 {
94 let call = self.call.clone();
95 let fut = fun(self)
96 .map(|_| ())
97 .with_trace_id(TraceId::current())
98 .boxed();
99 call.invoke(SysCall::Spawn(fut)).await;
100 }
101}
102
103impl Now for ActorContext {
104 type Instant = Instant;
105
106 fn now(&self) -> Self::Instant {
107 Instant::now()
108 }
109}
110
111impl Start<BoxedRunnable<Self>> for ActorContext {
112 fn spawn(
113 &mut self,
114 runnable: BoxedRunnable<Self>,
115 link: bool,
116 ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
117 do_spawn(self, runnable, None, link.then_some(self.fork_address))
118 }
119
120 fn start(
121 &mut self,
122 runnable: BoxedRunnable<Self>,
123 link: bool,
124 start_timeout: Duration,
125 ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
126 do_start(self, runnable, link, start_timeout)
127 }
128}
129
130impl Stop for ActorContext {
131 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
132 let this = self.fork_address;
133 let out = do_exit(self, this, peer).is_ok();
134 std::future::ready(out)
135 }
136
137 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
138 let out = do_kill(self, peer).is_ok();
139 std::future::ready(out)
140 }
141}
142
143impl Linking for ActorContext {
144 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
145 let this = self.fork_address;
146 do_link(self, this, peer)
147 }
148
149 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
150 let this = self.fork_address;
151 do_unlink(self, this, peer)
152 }
153
154 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
155 do_set_trap_exit(self, enable)
156 }
157}
158
159impl Watching for ActorContext {
160 fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
161 do_watch(self, peer)
162 }
163
164 fn unwatch(
165 &mut self,
166 watch_ref: mm1_proto_system::WatchRef,
167 ) -> impl Future<Output = ()> + Send {
168 do_unwatch(self, watch_ref)
169 }
170}
171
172impl InitDone for ActorContext {
173 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
174 do_init_done(self, address)
175 }
176}
177
178impl Messaging for ActorContext {
179 fn address(&self) -> Address {
180 self.fork_address
181 }
182
183 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
184 let ActorContext {
185 fork_address,
186 subnet_context,
187 ..
188 } = self;
189
190 loop {
191 let (inbound_envelope_opt, subnet_notify, fork_notify) = {
192 let mut subnet_context_locked = subnet_context
193 .try_lock()
194 .expect("could not lock subnet_context");
195 let SubnetContext {
196 subnet_notify,
197 rx_priority,
198 rx_regular,
199 fork_entries,
200 bound_subnets,
201 ..
202 } = &mut *subnet_context_locked;
203
204 let mut notified_forks = HashSet::new();
205 while let Some(message) = rx_priority
206 .try_recv_realtime()
207 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
208 {
209 let message_to = bound_subnets
210 .get(&AddressRange::from(message.to))
211 .copied()
212 .unwrap_or(message.to);
213 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
214 warn!("no such fork [dst: {}]", message_to);
215 continue
216 };
217 fork_entry.inbox_priority.push_back(message);
218 trace!("subnet received priority message [dst: {}]", message_to);
219 if notified_forks.insert(message_to) {
220 trace!("notifying fork [dst: {}]", message_to);
221 fork_entry.fork_notifiy.notify_one();
222 }
223 }
224 while let Some(message) = rx_regular
225 .try_recv_realtime()
226 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
227 {
228 let message_to = bound_subnets
229 .get(&AddressRange::from(message.to))
230 .copied()
231 .unwrap_or(message.to);
232 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
233 warn!("no such fork [dst: {}]", message_to);
234 continue
235 };
236 fork_entry.inbox_regular.push_back(message);
237 trace!("subnet received regular message [dst: {}]", message_to);
238 if notified_forks.insert(message_to) {
239 trace!("notifying fork [dst: {}]", message_to);
240 fork_entry.fork_notifiy.notify_one();
241 }
242 }
243
244 let this_fork_entry = fork_entries
245 .get_mut(fork_address)
246 .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
247 let inbound_envelope_opt = if let Some(priority_message) =
248 this_fork_entry.inbox_priority.pop_front()
249 {
250 Some(priority_message.message)
251 } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
252 Some(regular_message.message)
253 } else {
254 None
255 };
256 (
257 inbound_envelope_opt,
258 subnet_notify.clone(),
259 this_fork_entry.fork_notifiy.clone(),
260 )
261 };
262
263 if let Some(inbound_envelope) = inbound_envelope_opt {
264 break Ok(inbound_envelope)
265 } else {
266 let subnet_notified = subnet_notify.notified();
267 let fork_notified = fork_notify.notified();
268
269 tokio::select! {
270 _ = subnet_notified => (),
271 _ = fork_notified => (),
272 }
273 }
274 }
275 }
276
277 async fn close(&mut self) {
278 todo!()
281 }
282
283 fn send(
284 &mut self,
285 envelope: Envelope,
286 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
287 std::future::ready(do_send(self, envelope))
288 }
289
290 fn forward(
291 &mut self,
292 to: Address,
293 envelope: Envelope,
294 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
295 std::future::ready(do_forward(self, to, envelope))
296 }
297}
298
299impl Bind<NetAddress> for ActorContext {
300 async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
301 use std::collections::btree_map::Entry::*;
302
303 let BindArgs {
304 bind_to,
305 inbox_size,
306 } = args;
307 let address_range = AddressRange::from(bind_to);
308
309 log::debug!("binding [to: {}; inbox-size: {}]", bind_to, inbox_size);
310
311 let Self {
312 fork_address,
313 subnet_context,
314 ..
315 } = self;
316
317 {
318 let mut subnet_context_locked = subnet_context
319 .try_lock()
320 .expect("could not lock subnet_context");
321
322 let SubnetContext {
323 rt_api,
324 subnet_mailbox_tx,
325 bound_subnets,
326 ..
327 } = &mut *subnet_context_locked;
328
329 let bound_subnet_entry = match bound_subnets.entry(address_range) {
330 Vacant(v) => v,
331 Occupied(o) => {
332 let previously_bound_to = NetAddress::from(*o.key());
333 return Err(ErrorOf::new(
334 BindErrorKind::Conflict,
335 format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
336 ))
337 },
338 };
339 let subnet_lease = Lease::trusted(bind_to);
340
341 let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
342 BindErrorKind::Closed,
343 "the actor subnet is probably unregistered",
344 ))?;
345 let bound_subnet_node =
346 registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);
347
348 rt_api
349 .registry()
350 .register(bind_to, bound_subnet_node)
351 .map_err(|_| {
352 ErrorOf::new(
353 BindErrorKind::Conflict,
354 "could not register the subnet-node",
355 )
356 })?;
357
358 bound_subnet_entry.insert(*fork_address);
359 }
360
361 log::info!("bound [to: {}; inbox-size: {}]", bind_to, inbox_size);
362
363 Ok(())
364 }
365}
366
367async fn do_start(
368 context: &mut ActorContext,
369 runnable: BoxedRunnable<ActorContext>,
370 link: bool,
371 timeout: Duration,
372) -> Result<Address, ErrorOf<StartErrorKind>> {
373 let this_address = context.fork_address;
374
375 let mut fork = context
376 .fork()
377 .await
378 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
379
380 let fork_address = fork.fork_address;
381 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
382 .await
383 .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
384
385 let envelope = match fork.recv().timeout(timeout).await {
386 Err(_elapsed) => {
387 do_kill(context, spawned_address)
388 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
389
390 return Err(ErrorOf::new(
393 StartErrorKind::Timeout,
394 "no init-ack within timeout",
395 ))
396 },
397 Ok(recv_result) => {
398 recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
399 },
400 };
401
402 dispatch!(match envelope {
403 mm1_proto_system::InitAck { address } => {
404 if link {
405 do_link(context, this_address, address).await;
406 }
407 Ok(address)
408 },
409
410 mm1_proto_system::Exited { .. } => {
411 Err(ErrorOf::new(
412 StartErrorKind::Exited,
413 "exited before init-ack",
414 ))
415 },
416
417 unexpected @ _ => {
418 Err(ErrorOf::new(
419 StartErrorKind::InternalError,
420 format!("unexpected message: {unexpected:?}"),
421 ))
422 },
423 })
424}
425
426async fn do_spawn(
427 context: &mut ActorContext,
428 runnable: BoxedRunnable<ActorContext>,
429 ack_to: Option<Address>,
430 link_to: impl IntoIterator<Item = Address>,
431) -> Result<Address, ErrorOf<SpawnErrorKind>> {
432 let ActorContext { subnet_context, .. } = context;
433
434 let (actor_key, rt_config, rt_api, tx_actor_failure) = {
435 let subnet_context_locked = subnet_context
436 .try_lock()
437 .expect("could not lock subnet_context");
438 let SubnetContext {
439 rt_api,
440 rt_config,
441 actor_key,
442 tx_actor_failure,
443 ..
444 } = &*subnet_context_locked;
445
446 (
447 actor_key.child(runnable.func_name()),
448 rt_config.clone(),
449 rt_api.clone(),
450 tx_actor_failure.clone(),
451 )
452 };
453
454 let actor_config = rt_config.actor_config(&actor_key);
455 let execute_on = rt_api.choose_executor(actor_config.runtime_key());
456
457 trace!("starting [ack-to: {:?}]", ack_to);
458
459 let subnet_lease = rt_api
460 .request_address(actor_config.netmask())
461 .await
462 .inspect_err(|e| log::error!("lease-error: {}", e))
463 .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
464
465 trace!("starting [subnet-lease: {}]", subnet_lease.net_address());
466
467 let rt_api = rt_api.clone();
468 let rt_config = rt_config.clone();
469 let container = container::Container::create(
470 container::ContainerArgs {
471 ack_to,
472 link_to: link_to.into_iter().collect(),
474 actor_key,
475 trace_id: TraceId::current(),
476
477 subnet_lease,
478 rt_api,
479 rt_config,
480 },
481 runnable,
482 tx_actor_failure.clone(),
483 )
484 .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
485 let actor_address = container.actor_address();
486
487 trace!("actor-address: {}", actor_address);
488
489 let tx_actor_failure = tx_actor_failure.clone();
490 let _join_handle = execute_on.spawn(async move {
492 match container.run().await {
493 Ok(Ok(())) => (),
494 Ok(Err(actor_failure)) => {
495 let _ = tx_actor_failure.send((actor_address, actor_failure));
496 },
497 Err(container_failure) => {
498 let report = AnyError::from(container_failure);
499 mm1_common::log::error!(
500 "actor container failure [addr: {}]: {}",
501 actor_address,
502 report
503 .chain()
504 .map(|e| e.to_string())
505 .collect::<Vec<_>>()
506 .join(" <- ")
507 );
508 },
509 }
510 });
511
512 Ok(actor_address)
513}
514
515fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
516 let ActorContext { subnet_context, .. } = context;
517 let subnet_context_locked = subnet_context
518 .try_lock()
519 .expect("could not lock subnet_context");
520 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
521
522 rt_api.sys_send(
523 peer,
524 SysMsg::Link(SysLink::Exit {
525 sender: this,
526 receiver: peer,
527 reason: ExitReason::Terminate,
528 }),
529 )
530}
531
532fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
533 let ActorContext { subnet_context, .. } = context;
534 let subnet_context_locked = subnet_context
535 .try_lock()
536 .expect("could not lock subnet_context");
537 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
538
539 rt_api.sys_send(peer, SysMsg::Kill)
540}
541
542async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
543 let ActorContext { call, .. } = context;
544
545 call.invoke(SysCall::Link {
546 sender: this,
547 receiver: peer,
548 })
549 .await
550}
551
552async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
553 let ActorContext { call, .. } = context;
554 call.invoke(SysCall::Unlink {
555 sender: this,
556 receiver: peer,
557 })
558 .await
559}
560
561async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
562 let ActorContext { call, .. } = context;
563 call.invoke(SysCall::TrapExit(enable)).await;
564}
565
566async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
567 let ActorContext {
568 fork_address: this,
569 call,
570 ..
571 } = context;
572 let (reply_tx, reply_rx) = oneshot::channel();
573 call.invoke(SysCall::Watch {
574 sender: *this,
575 receiver: peer,
576 reply_tx,
577 })
578 .await;
579 reply_rx.await.expect("sys-call remained unanswered")
580}
581
582async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
583 let ActorContext {
584 fork_address: this,
585 call,
586 ..
587 } = context;
588 call.invoke(SysCall::Unwatch {
589 sender: *this,
590 watch_ref,
591 })
592 .await;
593}
594
595async fn do_init_done(context: &mut ActorContext, address: Address) {
596 let ActorContext {
597 ack_to,
598 subnet_context,
599 ..
600 } = context;
601 let message = InitAck { address };
602 let Some(ack_to_address) = ack_to.take() else {
603 return;
604 };
605 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
606 let subnet_context_locked = subnet_context
607 .try_lock()
608 .expect("could not lock subnet_context");
609 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
610 let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
611}
612
613fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
614 trace!("sending [outbound: {:?}]", outbound);
615 let ActorContext { subnet_context, .. } = context;
616 let subnet_context_locked = subnet_context
617 .try_lock()
618 .expect("could not lock subnet_context");
619 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
620 rt_api
621 .send_to(outbound.header().to, outbound.header().priority, outbound)
622 .map_err(|k| ErrorOf::new(k, ""))
623}
624
625fn do_forward(
626 context: &mut ActorContext,
627 to: Address,
628 outbound: Envelope,
629) -> Result<(), ErrorOf<SendErrorKind>> {
630 trace!("forwarding [to: {}, outbound: {:?}]", to, outbound);
631 let ActorContext { subnet_context, .. } = context;
632 let subnet_context_locked = subnet_context
633 .try_lock()
634 .expect("could not lock subnet_context");
635 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
636 rt_api
637 .send_to(to, outbound.header().priority, outbound)
638 .map_err(|k| ErrorOf::new(k, ""))
639}