mm1_node/runtime/context/impl_context_api.rs
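//! Implementations of the `mm1_core` context traits (`Quit`, `Fork`, `Now`,
//! `Start`, `Stop`, `Linking`, `Watching`, `InitDone`, `Messaging`, `Bind`,
//! `Ping`) for `ActorContext`.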

use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;

use futures::FutureExt;
use mm1_address::address::Address;
use mm1_address::address_range::AddressRange;
use mm1_address::pool::Lease;
use mm1_address::subnet::{NetAddress, NetMask};
use mm1_common::errors::chain::{ExactTypeDisplayChainExt, StdErrorDisplayChainExt};
use mm1_common::errors::error_of::ErrorOf;
use mm1_common::futures::timeout::FutureTimeoutExt;
use mm1_common::log;
use mm1_common::types::{AnyError, Never};
use mm1_core::context::{
    Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Ping,
    PingErrorKind, Quit, RecvErrorKind, SendErrorKind, Start, Stop, Watching,
};
use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
use mm1_core::tap;
use mm1_core::tracing::{TraceId, WithTraceIdExt};
use mm1_proto_system as sys;
use mm1_runnable::local::BoxedRunnable;
use rand::RngCore;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::{trace, warn};

use crate::config::EffectiveActorConfig;
use crate::registry::{self, MessageWithPermit, MessageWithoutPermit};
use crate::runtime::container;
use crate::runtime::context::{ActorContext, ForkEntry, SubnetContext};
use crate::runtime::rt_api::RtApi;
use crate::runtime::sys_call::SysCall;
use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};

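// Quitting is a sys-call: the exit reason is handed to the runtime via
// `SysCall::Exit`, after which the calling future parks on `pending()` (the
// surrounding container is expected to tear the task down once the exit is
// processed).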
impl Quit for ActorContext {
    async fn quit_ok(&mut self) -> Never {
        self.call.invoke(SysCall::Exit(Ok(()))).await;
        std::future::pending().await
    }

    async fn quit_err<E>(&mut self, reason: E) -> Never
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
        std::future::pending().await
    }
}

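// A fork is a sibling context on the same subnet: it takes its own lease from
// the subnet pool (`NetMask::MAX`, i.e. presumably a single address) and gets
// its own `ForkEntry`, so inbound messages can be routed to it independently
// of the main context.
//
// Rough usage sketch (hypothetical `ctx: ActorContext`):
//
//     let fork = ctx.fork().await?;
//     fork.run(|mut fork_ctx| async move {
//         let _envelope = fork_ctx.recv().await;
//     })
//     .await;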
impl Fork for ActorContext {
    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
        let Self {
            fork_address: this_address,
            call,
            subnet_context,
            ..
        } = self;

        let fork_lease = {
            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");
            let SubnetContext {
                subnet_pool,
                fork_entries,
                ..
            } = &mut *subnet_context_locked;
            let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
                ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
            })?;
            let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
            assert!(should_be_none.is_none());
            fork_lease
        };
        let fork_address = fork_lease.address;
        trace!(parent = %this_address, child = %fork_address, "forking");

        call.invoke(SysCall::ForkAdded(fork_address)).await;

        let context = Self {
            subnet_context: subnet_context.clone(),
            fork_address,
            fork_lease: Some(fork_lease),
            ack_to: None,
            call: call.clone(),
        };

        Ok(context)
    }

    async fn run<F, Fut>(self, fun: F)
    where
        F: FnOnce(Self) -> Fut,
        F: Send + 'static,
        Fut: Future + Send + 'static,
    {
        let call = self.call.clone();
        let fut = fun(self)
            .map(|_| ())
            .with_trace_id(TraceId::current())
            .boxed();
        call.invoke(SysCall::Spawn(fut)).await;
    }
}

impl Now for ActorContext {
    type Instant = Instant;

    fn now(&self) -> Self::Instant {
        Instant::now()
    }
}

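// `spawn` is fire-and-forget (optionally linking the child to this fork),
// whereas `start` also waits for the child's init-ack within `start_timeout`;
// see `do_spawn` and `do_start` below.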
impl Start<BoxedRunnable<Self>> for ActorContext {
    fn spawn(
        &mut self,
        runnable: BoxedRunnable<Self>,
        link: bool,
    ) -> impl Future<Output = Result<Address, ErrorOf<sys::SpawnErrorKind>>> + Send {
        do_spawn(self, runnable, None, link.then_some(self.fork_address))
    }

    fn start(
        &mut self,
        runnable: BoxedRunnable<Self>,
        link: bool,
        start_timeout: Duration,
    ) -> impl Future<Output = Result<Address, ErrorOf<sys::StartErrorKind>>> + Send {
        do_start(self, runnable, link, start_timeout)
    }
}

impl Stop for ActorContext {
    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
        let this = self.fork_address;
        let out = do_exit(self, this, peer).is_ok();
        std::future::ready(out)
    }

    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
        let out = do_kill(self, peer).is_ok();
        std::future::ready(out)
    }
}

impl Linking for ActorContext {
    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
        let this = self.fork_address;
        do_link(self, this, peer)
    }

    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
        let this = self.fork_address;
        do_unlink(self, this, peer)
    }

    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
        do_set_trap_exit(self, enable)
    }
}

impl Watching for ActorContext {
    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
        do_watch(self, peer)
    }

    fn unwatch(
        &mut self,
        watch_ref: mm1_proto_system::WatchRef,
    ) -> impl Future<Output = ()> + Send {
        do_unwatch(self, watch_ref)
    }
}

impl InitDone for ActorContext {
    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
        do_init_done(self, address)
    }
}

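// Receiving is two-staged: under the subnet lock, `process_inlets` drains the
// subnet-wide priority/regular channels into per-fork inboxes, then this
// fork's inboxes are polled; if both are empty, the call awaits either the
// subnet-wide or the fork-local notifier and retries.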
impl Messaging for ActorContext {
    fn address(&self) -> Address {
        self.fork_address
    }

    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
        let ActorContext {
            fork_address,
            subnet_context,
            ..
        } = self;

        loop {
            let (inbound_envelope_opt, subnet_notify, fork_notify) = {
                let mut subnet_context_locked = subnet_context
                    .try_lock()
                    .expect("could not lock subnet_context");
                let SubnetContext {
                    rt_api,
                    actor_key,
                    subnet_address,
                    subnet_notify,
                    rx_priority,
                    rx_regular,
                    fork_entries,
                    bound_subnets,
                    message_tap,
                    ..
                } = &mut *subnet_context_locked;

                process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)?;

                let this_fork_entry = fork_entries
                    .get_mut(fork_address)
                    .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
                let inbound_envelope_opt = if let Some(priority_message) =
                    this_fork_entry.inbox_priority.pop_front()
                {
                    Some(priority_message.message)
                } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
                    Some(regular_message.message)
                } else {
                    None
                };
                if let Some(envelope) = inbound_envelope_opt.as_ref() {
                    message_tap.on_recv(tap::OnRecv {
                        recv_by_addr: *fork_address,
                        recv_by_net: *subnet_address,
                        recv_by_key: actor_key,
                        envelope,
                    });
                }
                (
                    inbound_envelope_opt,
                    subnet_notify.clone(),
                    this_fork_entry.fork_notifiy.clone(),
                )
            };

            if let Some(inbound_envelope) = inbound_envelope_opt {
                break Ok(inbound_envelope)
            } else {
                let subnet_notified = subnet_notify.notified();
                let fork_notified = fork_notify.notified();

                tokio::select! {
                    _ = subnet_notified => (),
                    _ = fork_notified => (),
                }
            }
        }
    }

    async fn close(&mut self) {
        // self.rx_regular.close();
        // self.rx_priority.close();
        todo!()
    }

    fn send(
        &mut self,
        envelope: Envelope,
    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
        std::future::ready(do_send(self, envelope))
    }

    fn forward(
        &mut self,
        to: Address,
        envelope: Envelope,
    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
        std::future::ready(do_forward(self, to, envelope))
    }
}

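// Binding registers an additional `NetAddress` in the registry, backed by the
// same subnet mailbox, and records in `bound_subnets` which fork messages for
// that range should be delivered to (see `process_inlets`).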
impl Bind<NetAddress> for ActorContext {
    async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
        use std::collections::btree_map::Entry::*;

        let BindArgs {
            bind_to,
            inbox_size,
        } = args;
        let address_range = AddressRange::from(bind_to);

        log::debug!(%bind_to, %inbox_size, "binding");

        let Self {
            fork_address,
            subnet_context,
            ..
        } = self;

        {
            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");

            let SubnetContext {
                rt_api,
                subnet_mailbox_tx,
                bound_subnets,
                ..
            } = &mut *subnet_context_locked;

            let bound_subnet_entry = match bound_subnets.entry(address_range) {
                Vacant(v) => v,
                Occupied(o) => {
                    let previously_bound_to = NetAddress::from(*o.key());
                    return Err(ErrorOf::new(
                        BindErrorKind::Conflict,
                        format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
                    ))
                },
            };
            let subnet_lease = Lease::trusted(bind_to);

            let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
                BindErrorKind::Closed,
                "the actor subnet is probably unregistered",
            ))?;
            let bound_subnet_node =
                registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);

            rt_api
                .registry()
                .register(bind_to, bound_subnet_node)
                .map_err(|_| {
                    ErrorOf::new(
                        BindErrorKind::Conflict,
                        "could not register the subnet-node",
                    )
                })?;

            bound_subnet_entry.insert(*fork_address);
        }

        log::info!(%bind_to, %inbox_size, "bound");

        Ok(())
    }
}

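// Ping sends a priority `sys::Ping { reply_to: Some(..), id }` straight
// through the runtime API, then keeps draining the inlets until the matching
// pong is observed (`last_ping_received == Some(ping_id)`) or the deadline
// elapses.
//
// Rough usage sketch (hypothetical `peer` address):
//
//     let rtt = ctx.ping(peer, Duration::from_secs(1)).await?;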
impl Ping for ActorContext {
    async fn ping(
        &mut self,
        address: Address,
        timeout: Duration,
    ) -> Result<Duration, ErrorOf<PingErrorKind>> {
        let ping_id = rand::rng().next_u64();
        let now = Instant::now();
        let deadline = now.checked_add(timeout).unwrap_or(now);

        let Self {
            fork_address,
            subnet_context,
            ..
        } = self;

        let (subnet_notify, fork_notify) = {
            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");
            let SubnetContext {
                rt_api,
                fork_entries,
                subnet_notify,
                ..
            } = &mut *subnet_context_locked;
            let ForkEntry { fork_notifiy, .. } = fork_entries
                .get_mut(fork_address)
                .expect("fork_entry missing");

            let ping_msg = sys::Ping {
                reply_to: Some(*fork_address),
                id:       ping_id,
            };

            let ping_header = EnvelopeHeader::to_address(address).with_priority(true);
            let ping_envelope = Envelope::new(ping_header, ping_msg).into_erased();
            rt_api
                .send_to(address, true, ping_envelope)
                .map_err(|e| ErrorOf::new(PingErrorKind::Send, e.to_string()))?;

            (subnet_notify.clone(), fork_notifiy.clone())
        };

        loop {
            let timeout = tokio::time::sleep_until(deadline);
            tokio::select! {
                _ = timeout => { return Err(ErrorOf::new(PingErrorKind::Timeout, "timeout elapsed")) },
                _ = subnet_notify.notified() => (),
                _ = fork_notify.notified() => (),
            };

            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");
            let SubnetContext {
                rt_api,
                fork_entries,
                bound_subnets,
                rx_priority,
                rx_regular,
                ..
            } = &mut *subnet_context_locked;

            process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)
                .map_err(|e| ErrorOf::new(PingErrorKind::Recv, e.to_string()))?;

            if fork_entries
                .get(fork_address)
                .expect("fork_entry missing")
                .last_ping_received
                == Some(ping_id)
            {
                break
            }
        }
        Ok(now.elapsed())
    }
}

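/// Drains both subnet inlets (priority first, then regular) into the per-fork
/// inboxes, resolving bound subnets to the fork that owns them. Ping requests
/// are answered inline with a priority pong instead of being enqueued; ping
/// responses only update `last_ping_received`. Every fork that had something
/// enqueued is notified at most once per call.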
fn process_inlets(
    rt_api: &RtApi,
    bound_subnets: &BTreeMap<AddressRange, Address>,
    rx_priority: &mut kanal::Receiver<MessageWithoutPermit<Envelope>>,
    rx_regular: &mut kanal::Receiver<MessageWithPermit<Envelope>>,
    fork_entries: &mut HashMap<Address, ForkEntry>,
) -> Result<(), ErrorOf<RecvErrorKind>> {
    let mut notified_forks = HashSet::new();
    while let Some(message) = rx_priority
        .try_recv_realtime()
        .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
    {
        let message_to = bound_subnets
            .get(&AddressRange::from(message.to))
            .copied()
            .unwrap_or(message.to);
        let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
            warn!(dst = %message_to, "no such fork");
            continue
        };
        let should_notify = if let Some(ping_message) = message.message.peek::<sys::Ping>() {
            let sys::Ping { reply_to, id } = *ping_message;
            if let Some(reply_to) = reply_to {
                trace!(%id, %reply_to, "received a ping request");
                let pong_header = EnvelopeHeader::to_address(reply_to).with_priority(true);
                let pong_envelope =
                    Envelope::new(pong_header, sys::Ping { reply_to: None, id }).into_erased();
                rt_api
                    .send_to(reply_to, true, pong_envelope)
                    .inspect_err(
                        |e| warn!(reason = %e.as_display_chain(), "can't send ping-response"),
                    )
                    .ok();
                trace!(%id, %reply_to, "sent a ping response");
                false
            } else {
                trace!(%id, "received a ping response");
                fork_entry.last_ping_received = Some(id);
                true
            }
        } else {
            fork_entry.inbox_priority.push_back(message);
            true
        };

        if should_notify && notified_forks.insert(message_to) {
            trace!(dst = %message_to, "notifying fork");
            fork_entry.fork_notifiy.notify_one();
        }
    }

    while let Some(message) = rx_regular
        .try_recv_realtime()
        .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
    {
        let message_to = bound_subnets
            .get(&AddressRange::from(message.to))
            .copied()
            .unwrap_or(message.to);
        let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
            warn!(dst = %message_to, "no such fork");
            continue
        };
        fork_entry.inbox_regular.push_back(message);
        trace!(dst = %message_to, "subnet received regular message");
        if notified_forks.insert(message_to) {
            trace!(dst = %message_to, "notifying fork");
            fork_entry.fork_notifiy.notify_one();
        }
    }

    Ok(())
}

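/// Backs `Start::start`: forks a temporary context, spawns the runnable with
/// `ack_to` pointing at that fork, and waits up to `timeout` for an `InitAck`
/// (or an early `Exited`). On timeout the freshly spawned actor is killed and
/// `StartErrorKind::Timeout` is returned.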
async fn do_start(
    context: &mut ActorContext,
    runnable: BoxedRunnable<ActorContext>,
    link: bool,
    timeout: Duration,
) -> Result<Address, ErrorOf<sys::StartErrorKind>> {
    let this_address = context.fork_address;

    let mut fork = context
        .fork()
        .await
        .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;

    let fork_address = fork.fork_address;
    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
        .await
        .map_err(|e| e.map_kind(sys::StartErrorKind::Spawn))?;

    let envelope = match fork.recv().timeout(timeout).await {
        Err(_elapsed) => {
            do_kill(context, spawned_address)
                .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;

            // TODO: should we ensure termination with a `system::Watch`?

            return Err(ErrorOf::new(
                sys::StartErrorKind::Timeout,
                "no init-ack within timeout",
            ))
        },
        Ok(recv_result) => {
            recv_result
                .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?
        },
    };

    dispatch!(match envelope {
        sys::InitAck { address } => {
            if link {
                do_link(context, this_address, address).await;
            }
            Ok(address)
        },

        sys::Exited { .. } => {
            Err(ErrorOf::new(
                sys::StartErrorKind::Exited,
                "exited before init-ack",
            ))
        },

        unexpected @ _ => {
            Err(ErrorOf::new(
                sys::StartErrorKind::InternalError,
                format!("unexpected message: {unexpected:?}"),
            ))
        },
    })
}

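/// Backs `Start::spawn`: derives the child's actor-key and config, leases a
/// subnet for it, builds a `container::Container`, and runs it on the executor
/// chosen for its runtime key. Actor failures are reported through
/// `tx_actor_failure`; container failures are only logged.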
async fn do_spawn(
    context: &mut ActorContext,
    runnable: BoxedRunnable<ActorContext>,
    ack_to: Option<Address>,
    link_to: impl IntoIterator<Item = Address>,
) -> Result<Address, ErrorOf<sys::SpawnErrorKind>> {
    let ActorContext { subnet_context, .. } = context;

    let (actor_key, rt_config, rt_api, tx_actor_failure) = {
        let subnet_context_locked = subnet_context
            .try_lock()
            .expect("could not lock subnet_context");
        let SubnetContext {
            rt_api,
            rt_config,
            actor_key,
            tx_actor_failure,
            ..
        } = &*subnet_context_locked;

        (
            actor_key.child(runnable.func_name()),
            rt_config.clone(),
            rt_api.clone(),
            tx_actor_failure.clone(),
        )
    };

    let actor_config = rt_config.actor_config(&actor_key);
    let execute_on = rt_api.choose_executor(actor_config.runtime_key());
    let message_tap = rt_api.message_tap(actor_config.message_tap_key());

    trace!(?ack_to, "starting");

    let subnet_lease = rt_api
        .request_address(actor_config.netmask())
        .await
        .inspect_err(|e| log::error!("lease-error: {}", e))
        .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::ResourceConstraint, e.to_string()))?;

    trace!(subnet_lease = %subnet_lease.net_address(), "subnet leased");

    let rt_api = rt_api.clone();
    let rt_config = rt_config.clone();
    let container = container::Container::create(
        container::ContainerArgs {
            ack_to,
            // FIXME: can we make it IntoIterator too?
            link_to: link_to.into_iter().collect(),
            actor_key,
            trace_id: TraceId::current(),

            subnet_lease,
            rt_api,
            rt_config,
            message_tap,
            tx_actor_failure: tx_actor_failure.clone(),
        },
        runnable,
    )
    .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::InternalError, e.to_string()))?;
    let actor_address = container.actor_address();

    trace!(spawned_address = %actor_address, "about to run spawned actor");

    let tx_actor_failure = tx_actor_failure.clone();
    // TODO: maybe keep it somewhere too?
    let _join_handle = execute_on.spawn(async move {
        match container.run().await {
            Ok(Ok(())) => (),
            Ok(Err(actor_failure)) => {
                let _ = tx_actor_failure.send((actor_address, actor_failure));
            },
            Err(container_failure) => {
                let report = AnyError::from(container_failure);
                log::error!(
                    err = %report.as_display_chain(), %actor_address,
                    "actor container failure"
                );
            },
        }
    });

    Ok(actor_address)
}

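// Exit and kill are best-effort system messages sent directly through the
// runtime API (no sys-call round-trip); the caller only learns whether the
// message could be sent, not whether the peer actually terminated.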
fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;

    rt_api.sys_send(
        peer,
        SysMsg::Link(SysLink::Exit {
            sender:   this,
            receiver: peer,
            reason:   ExitReason::Terminate,
        }),
    )
}

fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;

    rt_api.sys_send(peer, SysMsg::Kill)
}

async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
    let ActorContext { call, .. } = context;

    call.invoke(SysCall::Link {
        sender:   this,
        receiver: peer,
    })
    .await
}

async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
    let ActorContext { call, .. } = context;
    call.invoke(SysCall::Unlink {
        sender:   this,
        receiver: peer,
    })
    .await
}

async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
    let ActorContext { call, .. } = context;
    call.invoke(SysCall::TrapExit(enable)).await;
}

async fn do_watch(context: &mut ActorContext, peer: Address) -> sys::WatchRef {
    let ActorContext {
        fork_address: this,
        call,
        ..
    } = context;
    let (reply_tx, reply_rx) = oneshot::channel();
    call.invoke(SysCall::Watch {
        sender: *this,
        receiver: peer,
        reply_tx,
    })
    .await;
    reply_rx.await.expect("sys-call remained unanswered")
}

async fn do_unwatch(context: &mut ActorContext, watch_ref: sys::WatchRef) {
    let ActorContext {
        fork_address: this,
        call,
        ..
    } = context;
    call.invoke(SysCall::Unwatch {
        sender: *this,
        watch_ref,
    })
    .await;
}

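// Delivers the one-shot `InitAck` to whoever started this actor (if anyone);
// `ack_to` is consumed, so repeated `init_done` calls become no-ops.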
async fn do_init_done(context: &mut ActorContext, address: Address) {
    let ActorContext {
        ack_to,
        subnet_context,
        ..
    } = context;
    let message = sys::InitAck { address };
    let Some(ack_to_address) = ack_to.take() else {
        return;
    };
    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
    let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
}

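// Sending first reports the envelope to the message tap, then decrements the
// TTL (failing with `TtlExhausted` if it is already zero) and hands the
// envelope to the runtime API for routing.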
fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
    let sender = context.fork_address;
    let to = outbound.header().to;
    {
        let subnet_context_locked = context
            .subnet_context
            .try_lock()
            .expect("could not lock subnet_context");
        let SubnetContext {
            subnet_address,
            actor_key,
            message_tap,
            ..
        } = &*subnet_context_locked;
        message_tap.on_send(tap::OnSend {
            sent_by_addr: sender,
            sent_by_net:  *subnet_address,
            sent_by_key:  actor_key,
            sent_to_addr: to,
            envelope:     &outbound,
        });
    }

    let (message, empty_envelope) = outbound.take();
    let mut header: EnvelopeHeader = empty_envelope.into();

    let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
        warn!(
            envelope_header = ?header,
            "TTL exhausted, dropping message"
        );
        ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
    })?;
    header.ttl = new_ttl;

    let outbound = Envelope::new(header, message);

    trace!(envelope = ?outbound, "sending");
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
    rt_api
        .send_to(outbound.header().to, outbound.header().priority, outbound)
        .map_err(|k| ErrorOf::new(k, ""))
}

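// Forwarding mirrors `do_send`, but routes the envelope to an explicit `to`
// address instead of the address recorded in its header.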
fn do_forward(
    context: &mut ActorContext,
    to: Address,
    outbound: Envelope,
) -> Result<(), ErrorOf<SendErrorKind>> {
    let sender = context.fork_address;
    {
        let subnet_context_locked = context
            .subnet_context
            .try_lock()
            .expect("could not lock subnet_context");
        let SubnetContext {
            subnet_address,
            actor_key,
            message_tap,
            ..
        } = &*subnet_context_locked;
        message_tap.on_send(tap::OnSend {
            sent_by_addr: sender,
            sent_by_net:  *subnet_address,
            sent_by_key:  actor_key,
            sent_to_addr: to,
            envelope:     &outbound,
        });
    }

    let (message, empty_envelope) = outbound.take();
    let mut header: EnvelopeHeader = empty_envelope.into();

    let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
        warn!(
            forward_to = %to,
            envelope_header = ?header,
            "TTL exhausted, dropping message"
        );
        ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
    })?;
    header.ttl = new_ttl;

    let outbound = Envelope::new(header, message);

    trace!(forward_to = %to, envelope = ?outbound, "forwarding");
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
    rt_api
        .send_to(to, outbound.header().priority, outbound)
        .map_err(|k| ErrorOf::new(k, ""))
}