mm1_node/runtime/context/
impl_context_api.rs

1use std::collections::HashSet;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::errors::error_of::ErrorOf;
10use mm1_common::futures::timeout::FutureTimeoutExt;
11use mm1_common::log;
12use mm1_common::types::{AnyError, Never};
13use mm1_core::context::{
14    Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit,
15    RecvErrorKind, SendErrorKind, Start, Stop, Watching,
16};
17use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
18use mm1_core::tracing::{TraceId, WithTraceIdExt};
19use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
20use mm1_runnable::local::BoxedRunnable;
21use tokio::sync::oneshot;
22use tokio::time::Instant;
23use tracing::{trace, warn};
24
25use crate::config::EffectiveActorConfig;
26use crate::registry;
27use crate::runtime::container;
28use crate::runtime::context::{ActorContext, SubnetContext};
29use crate::runtime::sys_call::SysCall;
30use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
31
32impl Quit for ActorContext {
33    async fn quit_ok(&mut self) -> Never {
34        self.call.invoke(SysCall::Exit(Ok(()))).await;
35        std::future::pending().await
36    }
37
38    async fn quit_err<E>(&mut self, reason: E) -> Never
39    where
40        E: std::error::Error + Send + Sync + 'static,
41    {
42        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
43        std::future::pending().await
44    }
45}
46
47impl Fork for ActorContext {
48    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
49        let Self {
50            fork_address: this_address,
51            call,
52            subnet_context,
53            ..
54        } = self;
55
56        let fork_lease = {
57            let mut subnet_context_locked = subnet_context
58                .try_lock()
59                .expect("could not lock subnet_context");
60            let SubnetContext {
61                subnet_pool,
62                fork_entries,
63                ..
64            } = &mut *subnet_context_locked;
65            let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
66                ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
67            })?;
68            let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
69            assert!(should_be_none.is_none());
70            fork_lease
71        };
72        let fork_address = fork_lease.address;
73        trace!("forking {} -> {}", this_address, fork_address);
74
75        call.invoke(SysCall::ForkAdded(fork_address)).await;
76
77        let context = Self {
78            subnet_context: subnet_context.clone(),
79            fork_address,
80            fork_lease: Some(fork_lease),
81            ack_to: None,
82            call: call.clone(),
83        };
84
85        Ok(context)
86    }
87
88    async fn run<F, Fut>(self, fun: F)
89    where
90        F: FnOnce(Self) -> Fut,
91        F: Send + 'static,
92        Fut: Future + Send + 'static,
93    {
94        let call = self.call.clone();
95        let fut = fun(self)
96            .map(|_| ())
97            .with_trace_id(TraceId::current())
98            .boxed();
99        call.invoke(SysCall::Spawn(fut)).await;
100    }
101}
102
103impl Now for ActorContext {
104    type Instant = Instant;
105
106    fn now(&self) -> Self::Instant {
107        Instant::now()
108    }
109}
110
111impl Start<BoxedRunnable<Self>> for ActorContext {
112    fn spawn(
113        &mut self,
114        runnable: BoxedRunnable<Self>,
115        link: bool,
116    ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
117        do_spawn(self, runnable, None, link.then_some(self.fork_address))
118    }
119
120    fn start(
121        &mut self,
122        runnable: BoxedRunnable<Self>,
123        link: bool,
124        start_timeout: Duration,
125    ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
126        do_start(self, runnable, link, start_timeout)
127    }
128}
129
130impl Stop for ActorContext {
131    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
132        let this = self.fork_address;
133        let out = do_exit(self, this, peer).is_ok();
134        std::future::ready(out)
135    }
136
137    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
138        let out = do_kill(self, peer).is_ok();
139        std::future::ready(out)
140    }
141}
142
143impl Linking for ActorContext {
144    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
145        let this = self.fork_address;
146        do_link(self, this, peer)
147    }
148
149    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
150        let this = self.fork_address;
151        do_unlink(self, this, peer)
152    }
153
154    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
155        do_set_trap_exit(self, enable)
156    }
157}
158
159impl Watching for ActorContext {
160    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
161        do_watch(self, peer)
162    }
163
164    fn unwatch(
165        &mut self,
166        watch_ref: mm1_proto_system::WatchRef,
167    ) -> impl Future<Output = ()> + Send {
168        do_unwatch(self, watch_ref)
169    }
170}
171
172impl InitDone for ActorContext {
173    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
174        do_init_done(self, address)
175    }
176}
177
178impl Messaging for ActorContext {
179    fn address(&self) -> Address {
180        self.fork_address
181    }
182
183    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
184        let ActorContext {
185            fork_address,
186            subnet_context,
187            ..
188        } = self;
189
190        loop {
191            let (inbound_envelope_opt, subnet_notify, fork_notify) = {
192                let mut subnet_context_locked = subnet_context
193                    .try_lock()
194                    .expect("could not lock subnet_context");
195                let SubnetContext {
196                    subnet_notify,
197                    rx_priority,
198                    rx_regular,
199                    fork_entries,
200                    bound_subnets,
201                    ..
202                } = &mut *subnet_context_locked;
203
204                let mut notified_forks = HashSet::new();
205                while let Some(message) = rx_priority
206                    .try_recv_realtime()
207                    .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
208                {
209                    let message_to = bound_subnets
210                        .get(&AddressRange::from(message.to))
211                        .copied()
212                        .unwrap_or(message.to);
213                    let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
214                        warn!("no such fork [dst: {}]", message_to);
215                        continue
216                    };
217                    fork_entry.inbox_priority.push_back(message);
218                    trace!("subnet received priority message [dst: {}]", message_to);
219                    if notified_forks.insert(message_to) {
220                        trace!("notifying fork [dst: {}]", message_to);
221                        fork_entry.fork_notifiy.notify_one();
222                    }
223                }
224                while let Some(message) = rx_regular
225                    .try_recv_realtime()
226                    .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
227                {
228                    let message_to = bound_subnets
229                        .get(&AddressRange::from(message.to))
230                        .copied()
231                        .unwrap_or(message.to);
232                    let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
233                        warn!("no such fork [dst: {}]", message_to);
234                        continue
235                    };
236                    fork_entry.inbox_regular.push_back(message);
237                    trace!("subnet received regular message [dst: {}]", message_to);
238                    if notified_forks.insert(message_to) {
239                        trace!("notifying fork [dst: {}]", message_to);
240                        fork_entry.fork_notifiy.notify_one();
241                    }
242                }
243
244                let this_fork_entry = fork_entries
245                    .get_mut(fork_address)
246                    .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
247                let inbound_envelope_opt = if let Some(priority_message) =
248                    this_fork_entry.inbox_priority.pop_front()
249                {
250                    Some(priority_message.message)
251                } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
252                    Some(regular_message.message)
253                } else {
254                    None
255                };
256                (
257                    inbound_envelope_opt,
258                    subnet_notify.clone(),
259                    this_fork_entry.fork_notifiy.clone(),
260                )
261            };
262
263            if let Some(inbound_envelope) = inbound_envelope_opt {
264                break Ok(inbound_envelope)
265            } else {
266                let subnet_notified = subnet_notify.notified();
267                let fork_notified = fork_notify.notified();
268
269                tokio::select! {
270                    _ = subnet_notified => (),
271                    _ = fork_notified => (),
272                }
273            }
274        }
275    }
276
277    async fn close(&mut self) {
278        // self.rx_regular.close();
279        // self.rx_priority.close();
280        todo!()
281    }
282
283    fn send(
284        &mut self,
285        envelope: Envelope,
286    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
287        std::future::ready(do_send(self, envelope))
288    }
289
290    fn forward(
291        &mut self,
292        to: Address,
293        envelope: Envelope,
294    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
295        std::future::ready(do_forward(self, to, envelope))
296    }
297}
298
299impl Bind<NetAddress> for ActorContext {
300    async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
301        use std::collections::btree_map::Entry::*;
302
303        let BindArgs {
304            bind_to,
305            inbox_size,
306        } = args;
307        let address_range = AddressRange::from(bind_to);
308
309        log::debug!("binding [to: {}; inbox-size: {}]", bind_to, inbox_size);
310
311        let Self {
312            fork_address,
313            subnet_context,
314            ..
315        } = self;
316
317        {
318            let mut subnet_context_locked = subnet_context
319                .try_lock()
320                .expect("could not lock subnet_context");
321
322            let SubnetContext {
323                rt_api,
324                subnet_mailbox_tx,
325                bound_subnets,
326                ..
327            } = &mut *subnet_context_locked;
328
329            let bound_subnet_entry = match bound_subnets.entry(address_range) {
330                Vacant(v) => v,
331                Occupied(o) => {
332                    let previously_bound_to = NetAddress::from(*o.key());
333                    return Err(ErrorOf::new(
334                        BindErrorKind::Conflict,
335                        format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
336                    ))
337                },
338            };
339            let subnet_lease = Lease::trusted(bind_to);
340
341            let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
342                BindErrorKind::Closed,
343                "the actor subnet is probably unregistered",
344            ))?;
345            let bound_subnet_node =
346                registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);
347
348            rt_api
349                .registry()
350                .register(bind_to, bound_subnet_node)
351                .map_err(|_| {
352                    ErrorOf::new(
353                        BindErrorKind::Conflict,
354                        "could not register the subnet-node",
355                    )
356                })?;
357
358            bound_subnet_entry.insert(*fork_address);
359        }
360
361        log::info!("bound [to: {}; inbox-size: {}]", bind_to, inbox_size);
362
363        Ok(())
364    }
365}
366
367async fn do_start(
368    context: &mut ActorContext,
369    runnable: BoxedRunnable<ActorContext>,
370    link: bool,
371    timeout: Duration,
372) -> Result<Address, ErrorOf<StartErrorKind>> {
373    let this_address = context.fork_address;
374
375    let mut fork = context
376        .fork()
377        .await
378        .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
379
380    let fork_address = fork.fork_address;
381    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
382        .await
383        .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
384
385    let envelope = match fork.recv().timeout(timeout).await {
386        Err(_elapsed) => {
387            do_kill(context, spawned_address)
388                .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
389
390            // TODO: should we ensure termination with a `system::Watch`?
391
392            return Err(ErrorOf::new(
393                StartErrorKind::Timeout,
394                "no init-ack within timeout",
395            ))
396        },
397        Ok(recv_result) => {
398            recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
399        },
400    };
401
402    dispatch!(match envelope {
403        mm1_proto_system::InitAck { address } => {
404            if link {
405                do_link(context, this_address, address).await;
406            }
407            Ok(address)
408        },
409
410        mm1_proto_system::Exited { .. } => {
411            Err(ErrorOf::new(
412                StartErrorKind::Exited,
413                "exited before init-ack",
414            ))
415        },
416
417        unexpected @ _ => {
418            Err(ErrorOf::new(
419                StartErrorKind::InternalError,
420                format!("unexpected message: {unexpected:?}"),
421            ))
422        },
423    })
424}
425
426async fn do_spawn(
427    context: &mut ActorContext,
428    runnable: BoxedRunnable<ActorContext>,
429    ack_to: Option<Address>,
430    link_to: impl IntoIterator<Item = Address>,
431) -> Result<Address, ErrorOf<SpawnErrorKind>> {
432    let ActorContext { subnet_context, .. } = context;
433
434    let (actor_key, rt_config, rt_api, tx_actor_failure) = {
435        let subnet_context_locked = subnet_context
436            .try_lock()
437            .expect("could not lock subnet_context");
438        let SubnetContext {
439            rt_api,
440            rt_config,
441            actor_key,
442            tx_actor_failure,
443            ..
444        } = &*subnet_context_locked;
445
446        (
447            actor_key.child(runnable.func_name()),
448            rt_config.clone(),
449            rt_api.clone(),
450            tx_actor_failure.clone(),
451        )
452    };
453
454    let actor_config = rt_config.actor_config(&actor_key);
455    let execute_on = rt_api.choose_executor(actor_config.runtime_key());
456
457    trace!("starting [ack-to: {:?}]", ack_to);
458
459    let subnet_lease = rt_api
460        .request_address(actor_config.netmask())
461        .await
462        .inspect_err(|e| log::error!("lease-error: {}", e))
463        .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
464
465    trace!("starting [subnet-lease: {}]", subnet_lease.net_address());
466
467    let rt_api = rt_api.clone();
468    let rt_config = rt_config.clone();
469    let container = container::Container::create(
470        container::ContainerArgs {
471            ack_to,
472            // FIXME: can we make it IntoIterator too?
473            link_to: link_to.into_iter().collect(),
474            actor_key,
475            trace_id: TraceId::current(),
476
477            subnet_lease,
478            rt_api,
479            rt_config,
480        },
481        runnable,
482        tx_actor_failure.clone(),
483    )
484    .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
485    let actor_address = container.actor_address();
486
487    trace!("actor-address: {}", actor_address);
488
489    let tx_actor_failure = tx_actor_failure.clone();
490    // TODO: maybe keep it somewhere too?
491    let _join_handle = execute_on.spawn(async move {
492        match container.run().await {
493            Ok(Ok(())) => (),
494            Ok(Err(actor_failure)) => {
495                let _ = tx_actor_failure.send((actor_address, actor_failure));
496            },
497            Err(container_failure) => {
498                let report = AnyError::from(container_failure);
499                mm1_common::log::error!(
500                    "actor container failure [addr: {}]: {}",
501                    actor_address,
502                    report
503                        .chain()
504                        .map(|e| e.to_string())
505                        .collect::<Vec<_>>()
506                        .join(" <- ")
507                );
508            },
509        }
510    });
511
512    Ok(actor_address)
513}
514
515fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
516    let ActorContext { subnet_context, .. } = context;
517    let subnet_context_locked = subnet_context
518        .try_lock()
519        .expect("could not lock subnet_context");
520    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
521
522    rt_api.sys_send(
523        peer,
524        SysMsg::Link(SysLink::Exit {
525            sender:   this,
526            receiver: peer,
527            reason:   ExitReason::Terminate,
528        }),
529    )
530}
531
532fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
533    let ActorContext { subnet_context, .. } = context;
534    let subnet_context_locked = subnet_context
535        .try_lock()
536        .expect("could not lock subnet_context");
537    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
538
539    rt_api.sys_send(peer, SysMsg::Kill)
540}
541
542async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
543    let ActorContext { call, .. } = context;
544
545    call.invoke(SysCall::Link {
546        sender:   this,
547        receiver: peer,
548    })
549    .await
550}
551
552async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
553    let ActorContext { call, .. } = context;
554    call.invoke(SysCall::Unlink {
555        sender:   this,
556        receiver: peer,
557    })
558    .await
559}
560
561async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
562    let ActorContext { call, .. } = context;
563    call.invoke(SysCall::TrapExit(enable)).await;
564}
565
566async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
567    let ActorContext {
568        fork_address: this,
569        call,
570        ..
571    } = context;
572    let (reply_tx, reply_rx) = oneshot::channel();
573    call.invoke(SysCall::Watch {
574        sender: *this,
575        receiver: peer,
576        reply_tx,
577    })
578    .await;
579    reply_rx.await.expect("sys-call remained unanswered")
580}
581
582async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
583    let ActorContext {
584        fork_address: this,
585        call,
586        ..
587    } = context;
588    call.invoke(SysCall::Unwatch {
589        sender: *this,
590        watch_ref,
591    })
592    .await;
593}
594
595async fn do_init_done(context: &mut ActorContext, address: Address) {
596    let ActorContext {
597        ack_to,
598        subnet_context,
599        ..
600    } = context;
601    let message = InitAck { address };
602    let Some(ack_to_address) = ack_to.take() else {
603        return;
604    };
605    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
606    let subnet_context_locked = subnet_context
607        .try_lock()
608        .expect("could not lock subnet_context");
609    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
610    let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
611}
612
613fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
614    trace!("sending [outbound: {:?}]", outbound);
615    let ActorContext { subnet_context, .. } = context;
616    let subnet_context_locked = subnet_context
617        .try_lock()
618        .expect("could not lock subnet_context");
619    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
620    rt_api
621        .send_to(outbound.header().to, outbound.header().priority, outbound)
622        .map_err(|k| ErrorOf::new(k, ""))
623}
624
625fn do_forward(
626    context: &mut ActorContext,
627    to: Address,
628    outbound: Envelope,
629) -> Result<(), ErrorOf<SendErrorKind>> {
630    trace!("forwarding [to: {}, outbound: {:?}]", to, outbound);
631    let ActorContext { subnet_context, .. } = context;
632    let subnet_context_locked = subnet_context
633        .try_lock()
634        .expect("could not lock subnet_context");
635    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
636    rt_api
637        .send_to(to, outbound.header().priority, outbound)
638        .map_err(|k| ErrorOf::new(k, ""))
639}