mm1_node/runtime/context/
impl_context_api.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::errors::error_of::ErrorOf;
10use mm1_common::futures::timeout::FutureTimeoutExt;
11use mm1_common::log;
12use mm1_common::types::{AnyError, Never};
13use mm1_core::context::{
14    Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit,
15    RecvErrorKind, SendErrorKind, Start, Stop, Watching,
16};
17use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
18use mm1_core::tracing::{TraceId, WithTraceIdExt};
19use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
20use mm1_runnable::local::BoxedRunnable;
21use tokio::sync::{mpsc, oneshot};
22use tokio::time::Instant;
23use tracing::trace;
24
25use crate::config::EffectiveActorConfig;
26use crate::registry::{ForkEntry, NetworkNode};
27use crate::runtime::container;
28use crate::runtime::context::ActorContext;
29use crate::runtime::sys_call::SysCall;
30use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
31
32impl Quit for ActorContext {
33    async fn quit_ok(&mut self) -> Never {
34        self.call.invoke(SysCall::Exit(Ok(()))).await;
35        std::future::pending().await
36    }
37
38    async fn quit_err<E>(&mut self, reason: E) -> Never
39    where
40        E: std::error::Error + Send + Sync + 'static,
41    {
42        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
43        std::future::pending().await
44    }
45}
46
47impl Fork for ActorContext {
48    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
49        let actor_node = self
50            .actor_node
51            .upgrade()
52            .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "actor_node.upgrade"))?;
53        let fork_lease = actor_node.lease_address().map_err(|lease_error| {
54            ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
55        })?;
56        let fork_address = fork_lease.address;
57
58        let (tx_priority, rx_priority) = mpsc::unbounded_channel();
59        let (tx_regular, rx_regular) = mpsc::unbounded_channel();
60        let tx_system_weak = self.tx_system_weak.clone();
61
62        self.call
63            .invoke(SysCall::ForkAdded(
64                fork_lease.address,
65                tx_priority.downgrade(),
66            ))
67            .await;
68
69        let tx_priority_weak = tx_priority.downgrade();
70        let tx_regular_weak = tx_regular.downgrade();
71        let fork_entry = ForkEntry::new(fork_lease, tx_priority, tx_regular);
72
73        let registered = actor_node.register(fork_address, fork_entry);
74        assert!(registered);
75
76        let rt_api = self.rt_api.clone();
77        let rt_config = self.rt_config.clone();
78
79        let actor_key = self.actor_key.clone();
80
81        let context = Self {
82            rt_api,
83            rt_config,
84
85            address: fork_address,
86            actor_key,
87            ack_to: None,
88            actor_node: self.actor_node.clone(),
89            network_nodes: Default::default(),
90
91            rx_priority,
92            rx_regular,
93            tx_system_weak,
94            tx_priority_weak,
95            tx_regular_weak,
96            call: self.call.clone(),
97
98            tx_actor_failure: self.tx_actor_failure.clone(),
99        };
100
101        Ok(context)
102    }
103
104    async fn run<F, Fut>(self, fun: F)
105    where
106        F: FnOnce(Self) -> Fut,
107        F: Send + 'static,
108        Fut: Future + Send + 'static,
109    {
110        let call = self.call.clone();
111        let fut = fun(self)
112            .map(|_| ())
113            .with_trace_id(TraceId::current())
114            .boxed();
115        call.invoke(SysCall::Spawn(fut)).await;
116    }
117}
118
119impl Now for ActorContext {
120    type Instant = Instant;
121
122    fn now(&self) -> Self::Instant {
123        Instant::now()
124    }
125}
126
127impl Start<BoxedRunnable<Self>> for ActorContext {
128    fn spawn(
129        &mut self,
130        runnable: BoxedRunnable<Self>,
131        link: bool,
132    ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
133        do_spawn(self, runnable, None, link.then_some(self.address))
134    }
135
136    fn start(
137        &mut self,
138        runnable: BoxedRunnable<Self>,
139        link: bool,
140        start_timeout: Duration,
141    ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
142        do_start(self, runnable, link, start_timeout)
143    }
144}
145
146impl Stop for ActorContext {
147    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
148        let this = self.address;
149        let out = do_exit(self, this, peer).is_ok();
150        std::future::ready(out)
151    }
152
153    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
154        let out = do_kill(self, peer).is_ok();
155        std::future::ready(out)
156    }
157}
158
159impl Linking for ActorContext {
160    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
161        let this = self.address;
162        do_link(self, this, peer)
163    }
164
165    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
166        let this = self.address;
167        do_unlink(self, this, peer)
168    }
169
170    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
171        do_set_trap_exit(self, enable)
172    }
173}
174
175impl Watching for ActorContext {
176    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
177        do_watch(self, peer)
178    }
179
180    fn unwatch(
181        &mut self,
182        watch_ref: mm1_proto_system::WatchRef,
183    ) -> impl Future<Output = ()> + Send {
184        do_unwatch(self, watch_ref)
185    }
186}
187
188impl InitDone for ActorContext {
189    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
190        do_init_done(self, address)
191    }
192}
193
194impl Messaging for ActorContext {
195    fn address(&self) -> Address {
196        self.address
197    }
198
199    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
200        let (priority, inbound_opt) = tokio::select! {
201            biased;
202
203            inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
204            inbound_opt = self.rx_regular.recv() => (false, inbound_opt.map(|m| m.message)),
205        };
206
207        trace!(
208            "received [priority: {}; inbound: {:?}; via {}]",
209            priority, inbound_opt, self.address
210        );
211
212        inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
213    }
214
215    async fn close(&mut self) {
216        self.rx_regular.close();
217        self.rx_priority.close();
218    }
219
220    fn send(
221        &mut self,
222        envelope: Envelope,
223    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
224        std::future::ready(do_send(self, envelope))
225    }
226
227    fn forward(
228        &mut self,
229        to: Address,
230        envelope: Envelope,
231    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
232        std::future::ready(do_forward(self, to, envelope))
233    }
234}
235
236impl Bind<NetAddress> for ActorContext {
237    async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
238        use std::collections::btree_map::Entry::*;
239
240        let BindArgs {
241            bind_to,
242            inbox_size,
243        } = args;
244
245        log::debug!("binding [to: {}; inbox-size: {}]", bind_to, inbox_size);
246
247        let rt_api = self.rt_api.clone();
248        let registry = rt_api.registry();
249
250        let address_range = AddressRange::from(bind_to);
251
252        let network_nodes_entry = match self.network_nodes.entry(address_range) {
253            Occupied(o) => {
254                let previously_bound_to = NetAddress::from(*o.key());
255                return Err(ErrorOf::new(
256                    BindErrorKind::Conflict,
257                    format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
258                ))
259            },
260            Vacant(v) => v,
261        };
262
263        let subnet_lease = Lease::trusted(bind_to);
264
265        let (tx_system, _rx_system) = mpsc::unbounded_channel();
266
267        let tx_priority = self.tx_priority_weak.upgrade().ok_or_else(|| {
268            ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
269        })?;
270        let tx_regular = self.tx_regular_weak.upgrade().ok_or_else(|| {
271            ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
272        })?;
273
274        let network_node = Arc::new(NetworkNode::new(
275            subnet_lease,
276            inbox_size,
277            tx_system,
278            tx_priority,
279            tx_regular,
280        ));
281
282        if !registry.register(bind_to, network_node.clone()) {
283            log::warn!("couldn't bind [to: {}; reason: conflict]", bind_to);
284            return Err(ErrorOf::new(
285                BindErrorKind::Conflict,
286                "probably address in use",
287            ))
288        }
289
290        let network_node = Arc::downgrade(&network_node);
291        network_nodes_entry.insert(network_node);
292
293        log::info!("bound [to: {}; inbox-size: {}]", bind_to, inbox_size);
294
295        Ok(())
296    }
297}
298
299async fn do_start(
300    context: &mut ActorContext,
301    runnable: BoxedRunnable<ActorContext>,
302    link: bool,
303    timeout: Duration,
304) -> Result<Address, ErrorOf<StartErrorKind>> {
305    let this_address = context.address;
306
307    let mut fork = context
308        .fork()
309        .await
310        .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
311
312    let fork_address = fork.address;
313    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
314        .await
315        .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
316
317    let envelope = match fork.recv().timeout(timeout).await {
318        Err(_elapsed) => {
319            do_kill(context, spawned_address)
320                .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
321
322            // TODO: should we ensure termination with a `system::Watch`?
323
324            return Err(ErrorOf::new(
325                StartErrorKind::Timeout,
326                "no init-ack within timeout",
327            ))
328        },
329        Ok(recv_result) => {
330            recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
331        },
332    };
333
334    dispatch!(match envelope {
335        mm1_proto_system::InitAck { address } => {
336            if link {
337                do_link(context, this_address, address).await;
338            }
339            Ok(address)
340        },
341
342        mm1_proto_system::Exited { .. } => {
343            Err(ErrorOf::new(
344                StartErrorKind::Exited,
345                "exited before init-ack",
346            ))
347        },
348
349        unexpected @ _ => {
350            Err(ErrorOf::new(
351                StartErrorKind::InternalError,
352                format!("unexpected message: {unexpected:?}"),
353            ))
354        },
355    })
356}
357
358async fn do_spawn(
359    context: &mut ActorContext,
360    runnable: BoxedRunnable<ActorContext>,
361    ack_to: Option<Address>,
362    link_to: impl IntoIterator<Item = Address>,
363) -> Result<Address, ErrorOf<SpawnErrorKind>> {
364    let actor_key = context.actor_key.child(runnable.func_name());
365    let actor_config = context.rt_config.actor_config(&actor_key);
366    let execute_on = context.rt_api.choose_executor(actor_config.runtime_key());
367
368    trace!("starting [ack-to: {:?}]", ack_to);
369
370    let subnet_lease = context
371        .rt_api
372        .request_address(actor_config.netmask())
373        .await
374        .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
375
376    let rt_api = context.rt_api.clone();
377    let rt_config = context.rt_config.clone();
378    let container = container::Container::create(
379        container::ContainerArgs {
380            ack_to,
381            // FIXME: can we make it IntoIterator too?
382            link_to: link_to.into_iter().collect(),
383            actor_key,
384            trace_id: TraceId::current(),
385
386            subnet_lease,
387            rt_api,
388            rt_config,
389        },
390        runnable,
391        context.tx_actor_failure.clone(),
392    )
393    .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
394    let actor_address = container.actor_address();
395
396    trace!("actor-address: {}", actor_address);
397
398    let tx_actor_failure = context.tx_actor_failure.clone();
399    // TODO: maybe keep it somewhere too?
400    let _join_handle = execute_on.spawn(async move {
401        match container.run().await {
402            Ok(Ok(())) => (),
403            Ok(Err(actor_failure)) => {
404                let _ = tx_actor_failure.send((actor_address, actor_failure));
405            },
406            Err(container_failure) => {
407                let report = AnyError::from(container_failure);
408                mm1_common::log::error!(
409                    "actor container failure [addr: {}]: {}",
410                    actor_address,
411                    report
412                        .chain()
413                        .map(|e| e.to_string())
414                        .collect::<Vec<_>>()
415                        .join(" <- ")
416                );
417            },
418        }
419    });
420
421    Ok(actor_address)
422}
423
424fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
425    context.rt_api.sys_send(
426        peer,
427        SysMsg::Link(SysLink::Exit {
428            sender:   this,
429            receiver: peer,
430            reason:   ExitReason::Terminate,
431        }),
432    )
433}
434
435fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
436    context.rt_api.sys_send(peer, SysMsg::Kill)
437}
438
439async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
440    context
441        .call
442        .invoke(SysCall::Link {
443            sender:   this,
444            receiver: peer,
445        })
446        .await
447}
448
449async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
450    context
451        .call
452        .invoke(SysCall::Unlink {
453            sender:   this,
454            receiver: peer,
455        })
456        .await
457}
458
459async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
460    context.call.invoke(SysCall::TrapExit(enable)).await;
461}
462
463async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
464    let this = context.address;
465    let (reply_tx, reply_rx) = oneshot::channel();
466    context
467        .call
468        .invoke(SysCall::Watch {
469            sender: this,
470            receiver: peer,
471            reply_tx,
472        })
473        .await;
474    reply_rx.await.expect("sys-call remained unanswered")
475}
476
477async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
478    let this = context.address;
479    context
480        .call
481        .invoke(SysCall::Unwatch {
482            sender: this,
483            watch_ref,
484        })
485        .await;
486}
487
488async fn do_init_done(context: &mut ActorContext, address: Address) {
489    let message = InitAck { address };
490    let Some(ack_to_address) = context.ack_to.take() else {
491        return;
492    };
493    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
494    let _ = context
495        .rt_api
496        .send_to(envelope.header().to, true, envelope.into_erased());
497}
498
499fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
500    trace!("sending [outbound: {:?}]", outbound);
501    context
502        .rt_api
503        .send_to(outbound.header().to, false, outbound)
504        .map_err(|k| ErrorOf::new(k, ""))
505}
506
507fn do_forward(
508    context: &mut ActorContext,
509    to: Address,
510    outbound: Envelope,
511) -> Result<(), ErrorOf<SendErrorKind>> {
512    trace!("forwarding [to: {}, outbound: {:?}]", to, outbound);
513    context
514        .rt_api
515        .send_to(to, false, outbound)
516        .map_err(|k| ErrorOf::new(k, ""))
517}