mm1_node/runtime/context/
impl_context_api.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::errors::error_of::ErrorOf;
10use mm1_common::futures::timeout::FutureTimeoutExt;
11use mm1_common::log;
12use mm1_common::types::{AnyError, Never};
13use mm1_core::context::{
14    Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit,
15    RecvErrorKind, SendErrorKind, Start, Stop, Watching,
16};
17use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
18use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
19use mm1_runnable::local::BoxedRunnable;
20use tokio::sync::{mpsc, oneshot};
21use tokio::time::Instant;
22use tracing::trace;
23
24use crate::config::EffectiveActorConfig;
25use crate::registry::{ForkEntry, NetworkNode};
26use crate::runtime::container;
27use crate::runtime::context::ActorContext;
28use crate::runtime::sys_call::SysCall;
29use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
30
31impl Quit for ActorContext {
32    async fn quit_ok(&mut self) -> Never {
33        self.call.invoke(SysCall::Exit(Ok(()))).await;
34        std::future::pending().await
35    }
36
37    async fn quit_err<E>(&mut self, reason: E) -> Never
38    where
39        E: std::error::Error + Send + Sync + 'static,
40    {
41        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
42        std::future::pending().await
43    }
44}
45
46impl Fork for ActorContext {
47    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
48        let actor_node = self
49            .actor_node
50            .upgrade()
51            .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "actor_node.upgrade"))?;
52        let fork_lease = actor_node.lease_address().map_err(|lease_error| {
53            ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
54        })?;
55        let fork_address = fork_lease.address;
56
57        let (tx_priority, rx_priority) = mpsc::unbounded_channel();
58        let (tx_regular, rx_regular) = mpsc::unbounded_channel();
59        let tx_system_weak = self.tx_system_weak.clone();
60
61        self.call
62            .invoke(SysCall::ForkAdded(
63                fork_lease.address,
64                tx_priority.downgrade(),
65            ))
66            .await;
67
68        let tx_priority_weak = tx_priority.downgrade();
69        let tx_regular_weak = tx_regular.downgrade();
70        let fork_entry = ForkEntry::new(fork_lease, tx_priority, tx_regular);
71
72        let registered = actor_node.register(fork_address, fork_entry);
73        assert!(registered);
74
75        let rt_api = self.rt_api.clone();
76        let rt_config = self.rt_config.clone();
77
78        let actor_key = self.actor_key.clone();
79
80        let context = Self {
81            rt_api,
82            rt_config,
83
84            address: fork_address,
85            actor_key,
86            ack_to: None,
87            actor_node: self.actor_node.clone(),
88            network_nodes: Default::default(),
89
90            rx_priority,
91            rx_regular,
92            tx_system_weak,
93            tx_priority_weak,
94            tx_regular_weak,
95            call: self.call.clone(),
96
97            tx_actor_failure: self.tx_actor_failure.clone(),
98        };
99
100        Ok(context)
101    }
102
103    async fn run<F, Fut>(self, fun: F)
104    where
105        F: FnOnce(Self) -> Fut,
106        F: Send + 'static,
107        Fut: Future + Send + 'static,
108    {
109        let call = self.call.clone();
110        let fut = fun(self).map(|_| ()).boxed();
111        call.invoke(SysCall::Spawn(fut)).await;
112    }
113}
114
115impl Now for ActorContext {
116    type Instant = Instant;
117
118    fn now(&self) -> Self::Instant {
119        Instant::now()
120    }
121}
122
123impl Start<BoxedRunnable<Self>> for ActorContext {
124    fn spawn(
125        &mut self,
126        runnable: BoxedRunnable<Self>,
127        link: bool,
128    ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
129        do_spawn(self, runnable, None, link.then_some(self.address))
130    }
131
132    fn start(
133        &mut self,
134        runnable: BoxedRunnable<Self>,
135        link: bool,
136        start_timeout: Duration,
137    ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
138        do_start(self, runnable, link, start_timeout)
139    }
140}
141
142impl Stop for ActorContext {
143    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
144        let this = self.address;
145        let out = do_exit(self, this, peer).is_ok();
146        std::future::ready(out)
147    }
148
149    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
150        let out = do_kill(self, peer).is_ok();
151        std::future::ready(out)
152    }
153}
154
155impl Linking for ActorContext {
156    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
157        let this = self.address;
158        do_link(self, this, peer)
159    }
160
161    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
162        let this = self.address;
163        do_unlink(self, this, peer)
164    }
165
166    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
167        do_set_trap_exit(self, enable)
168    }
169}
170
171impl Watching for ActorContext {
172    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
173        do_watch(self, peer)
174    }
175
176    fn unwatch(
177        &mut self,
178        watch_ref: mm1_proto_system::WatchRef,
179    ) -> impl Future<Output = ()> + Send {
180        do_unwatch(self, watch_ref)
181    }
182}
183
184impl InitDone for ActorContext {
185    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
186        do_init_done(self, address)
187    }
188}
189
190impl Messaging for ActorContext {
191    fn address(&self) -> Address {
192        self.address
193    }
194
195    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
196        let (priority, inbound_opt) = tokio::select! {
197            biased;
198
199            inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
200            inbound_opt = self.rx_regular.recv() => (false, inbound_opt.map(|m| m.message)),
201        };
202
203        trace!(
204            "received [priority: {}; inbound: {:?}; via {}]",
205            priority, inbound_opt, self.address
206        );
207
208        inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
209    }
210
211    async fn close(&mut self) {
212        self.rx_regular.close();
213        self.rx_priority.close();
214    }
215
216    fn send(
217        &mut self,
218        envelope: Envelope,
219    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
220        std::future::ready(do_send(self, envelope))
221    }
222
223    fn forward(
224        &mut self,
225        to: Address,
226        envelope: Envelope,
227    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
228        std::future::ready(do_forward(self, to, envelope))
229    }
230}
231
232impl Bind<NetAddress> for ActorContext {
233    async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
234        use std::collections::btree_map::Entry::*;
235
236        let BindArgs {
237            bind_to,
238            inbox_size,
239        } = args;
240
241        log::debug!("binding [to: {}; inbox-size: {}]", bind_to, inbox_size);
242
243        let rt_api = self.rt_api.clone();
244        let registry = rt_api.registry();
245
246        let address_range = AddressRange::from(bind_to);
247
248        let network_nodes_entry = match self.network_nodes.entry(address_range) {
249            Occupied(o) => {
250                let previously_bound_to = NetAddress::from(*o.key());
251                return Err(ErrorOf::new(
252                    BindErrorKind::Conflict,
253                    format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
254                ))
255            },
256            Vacant(v) => v,
257        };
258
259        let subnet_lease = Lease::trusted(bind_to);
260
261        let (tx_system, _rx_system) = mpsc::unbounded_channel();
262
263        let tx_priority = self.tx_priority_weak.upgrade().ok_or_else(|| {
264            ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
265        })?;
266        let tx_regular = self.tx_regular_weak.upgrade().ok_or_else(|| {
267            ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
268        })?;
269
270        let network_node = Arc::new(NetworkNode::new(
271            subnet_lease,
272            inbox_size,
273            tx_system,
274            tx_priority,
275            tx_regular,
276        ));
277
278        if !registry.register(bind_to, network_node.clone()) {
279            log::warn!("couldn't bind [to: {}; reason: conflict]", bind_to);
280            return Err(ErrorOf::new(
281                BindErrorKind::Conflict,
282                "probably address in use",
283            ))
284        }
285
286        let network_node = Arc::downgrade(&network_node);
287        network_nodes_entry.insert(network_node);
288
289        log::info!("bound [to: {}; inbox-size: {}]", bind_to, inbox_size);
290
291        Ok(())
292    }
293}
294
295async fn do_start(
296    context: &mut ActorContext,
297    runnable: BoxedRunnable<ActorContext>,
298    link: bool,
299    timeout: Duration,
300) -> Result<Address, ErrorOf<StartErrorKind>> {
301    let this_address = context.address;
302
303    let mut fork = context
304        .fork()
305        .await
306        .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
307
308    let fork_address = fork.address;
309    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
310        .await
311        .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
312
313    let envelope = match fork.recv().timeout(timeout).await {
314        Err(_elapsed) => {
315            do_kill(context, spawned_address)
316                .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
317
318            // TODO: should we ensure termination with a `system::Watch`?
319
320            return Err(ErrorOf::new(
321                StartErrorKind::Timeout,
322                "no init-ack within timeout",
323            ))
324        },
325        Ok(recv_result) => {
326            recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
327        },
328    };
329
330    dispatch!(match envelope {
331        mm1_proto_system::InitAck { address } => {
332            if link {
333                do_link(context, this_address, address).await;
334            }
335            Ok(address)
336        },
337
338        mm1_proto_system::Exited { .. } => {
339            Err(ErrorOf::new(
340                StartErrorKind::Exited,
341                "exited before init-ack",
342            ))
343        },
344
345        unexpected @ _ => {
346            Err(ErrorOf::new(
347                StartErrorKind::InternalError,
348                format!("unexpected message: {unexpected:?}"),
349            ))
350        },
351    })
352}
353
354async fn do_spawn(
355    context: &mut ActorContext,
356    runnable: BoxedRunnable<ActorContext>,
357    ack_to: Option<Address>,
358    link_to: impl IntoIterator<Item = Address>,
359) -> Result<Address, ErrorOf<SpawnErrorKind>> {
360    let actor_key = context.actor_key.child(runnable.func_name());
361    let actor_config = context.rt_config.actor_config(&actor_key);
362    let execute_on = context.rt_api.choose_executor(actor_config.runtime_key());
363
364    trace!("starting [ack-to: {:?}]", ack_to);
365
366    let subnet_lease = context
367        .rt_api
368        .request_address(actor_config.netmask())
369        .await
370        .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
371
372    let rt_api = context.rt_api.clone();
373    let rt_config = context.rt_config.clone();
374    let container = container::Container::create(
375        container::ContainerArgs {
376            ack_to,
377            // FIXME: can we make it IntoIterator too?
378            link_to: link_to.into_iter().collect(),
379            actor_key,
380
381            subnet_lease,
382            rt_api,
383            rt_config,
384        },
385        runnable,
386        context.tx_actor_failure.clone(),
387    )
388    .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
389    let actor_address = container.actor_address();
390
391    trace!("actor-address: {}", actor_address);
392
393    let tx_actor_failure = context.tx_actor_failure.clone();
394    // TODO: maybe keep it somewhere too?
395    let _join_handle = execute_on.spawn(async move {
396        match container.run().await {
397            Ok(Ok(())) => (),
398            Ok(Err(actor_failure)) => {
399                let _ = tx_actor_failure.send((actor_address, actor_failure));
400            },
401            Err(container_failure) => {
402                let report = AnyError::from(container_failure);
403                mm1_common::log::error!(
404                    "actor container failure [addr: {}]: {}",
405                    actor_address,
406                    report
407                        .chain()
408                        .map(|e| e.to_string())
409                        .collect::<Vec<_>>()
410                        .join(" <- ")
411                );
412            },
413        }
414    });
415
416    Ok(actor_address)
417}
418
419fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
420    context.rt_api.sys_send(
421        peer,
422        SysMsg::Link(SysLink::Exit {
423            sender:   this,
424            receiver: peer,
425            reason:   ExitReason::Terminate,
426        }),
427    )
428}
429
430fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
431    context.rt_api.sys_send(peer, SysMsg::Kill)
432}
433
434async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
435    context
436        .call
437        .invoke(SysCall::Link {
438            sender:   this,
439            receiver: peer,
440        })
441        .await
442}
443
444async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
445    context
446        .call
447        .invoke(SysCall::Unlink {
448            sender:   this,
449            receiver: peer,
450        })
451        .await
452}
453
454async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
455    context.call.invoke(SysCall::TrapExit(enable)).await;
456}
457
458async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
459    let this = context.address;
460    let (reply_tx, reply_rx) = oneshot::channel();
461    context
462        .call
463        .invoke(SysCall::Watch {
464            sender: this,
465            receiver: peer,
466            reply_tx,
467        })
468        .await;
469    reply_rx.await.expect("sys-call remained unanswered")
470}
471
472async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
473    let this = context.address;
474    context
475        .call
476        .invoke(SysCall::Unwatch {
477            sender: this,
478            watch_ref,
479        })
480        .await;
481}
482
483async fn do_init_done(context: &mut ActorContext, address: Address) {
484    let message = InitAck { address };
485    let Some(ack_to_address) = context.ack_to.take() else {
486        return;
487    };
488    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
489    let _ = context
490        .rt_api
491        .send_to(envelope.header().to, true, envelope.into_erased());
492}
493
494fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
495    trace!("sending [outbound: {:?}]", outbound);
496    context
497        .rt_api
498        .send_to(outbound.header().to, false, outbound)
499        .map_err(|k| ErrorOf::new(k, ""))
500}
501
502fn do_forward(
503    context: &mut ActorContext,
504    to: Address,
505    outbound: Envelope,
506) -> Result<(), ErrorOf<SendErrorKind>> {
507    trace!("forwarding [to: {}, outbound: {:?}]", to, outbound);
508    context
509        .rt_api
510        .send_to(to, false, outbound)
511        .map_err(|k| ErrorOf::new(k, ""))
512}