mm1_node/runtime/context/
impl_context_api.rs

1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::errors::chain::{ExactTypeDisplayChainExt, StdErrorDisplayChainExt};
10use mm1_common::errors::error_of::ErrorOf;
11use mm1_common::futures::timeout::FutureTimeoutExt;
12use mm1_common::log;
13use mm1_common::types::{AnyError, Never};
14use mm1_core::context::{
15    Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Ping,
16    PingErrorKind, Quit, RecvErrorKind, SendErrorKind, Start, Stop, Watching,
17};
18use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
19use mm1_core::tracing::{TraceId, WithTraceIdExt};
20use mm1_proto_system as sys;
21use mm1_runnable::local::BoxedRunnable;
22use rand::RngCore;
23use tokio::sync::oneshot;
24use tokio::time::Instant;
25use tracing::{trace, warn};
26
27use crate::config::EffectiveActorConfig;
28use crate::registry::{self, MessageWithPermit, MessageWithoutPermit};
29use crate::runtime::container;
30use crate::runtime::context::{ActorContext, ForkEntry, SubnetContext};
31use crate::runtime::rt_api::RtApi;
32use crate::runtime::sys_call::SysCall;
33use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
34
35impl Quit for ActorContext {
36    async fn quit_ok(&mut self) -> Never {
37        self.call.invoke(SysCall::Exit(Ok(()))).await;
38        std::future::pending().await
39    }
40
41    async fn quit_err<E>(&mut self, reason: E) -> Never
42    where
43        E: std::error::Error + Send + Sync + 'static,
44    {
45        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
46        std::future::pending().await
47    }
48}
49
50impl Fork for ActorContext {
51    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
52        let Self {
53            fork_address: this_address,
54            call,
55            subnet_context,
56            ..
57        } = self;
58
59        let fork_lease = {
60            let mut subnet_context_locked = subnet_context
61                .try_lock()
62                .expect("could not lock subnet_context");
63            let SubnetContext {
64                subnet_pool,
65                fork_entries,
66                ..
67            } = &mut *subnet_context_locked;
68            let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
69                ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
70            })?;
71            let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
72            assert!(should_be_none.is_none());
73            fork_lease
74        };
75        let fork_address = fork_lease.address;
76        trace!(parent = %this_address, child = %fork_address, "forking");
77
78        call.invoke(SysCall::ForkAdded(fork_address)).await;
79
80        let context = Self {
81            subnet_context: subnet_context.clone(),
82            fork_address,
83            fork_lease: Some(fork_lease),
84            ack_to: None,
85            call: call.clone(),
86        };
87
88        Ok(context)
89    }
90
91    async fn run<F, Fut>(self, fun: F)
92    where
93        F: FnOnce(Self) -> Fut,
94        F: Send + 'static,
95        Fut: Future + Send + 'static,
96    {
97        let call = self.call.clone();
98        let fut = fun(self)
99            .map(|_| ())
100            .with_trace_id(TraceId::current())
101            .boxed();
102        call.invoke(SysCall::Spawn(fut)).await;
103    }
104}
105
106impl Now for ActorContext {
107    type Instant = Instant;
108
109    fn now(&self) -> Self::Instant {
110        Instant::now()
111    }
112}
113
114impl Start<BoxedRunnable<Self>> for ActorContext {
115    fn spawn(
116        &mut self,
117        runnable: BoxedRunnable<Self>,
118        link: bool,
119    ) -> impl Future<Output = Result<Address, ErrorOf<sys::SpawnErrorKind>>> + Send {
120        do_spawn(self, runnable, None, link.then_some(self.fork_address))
121    }
122
123    fn start(
124        &mut self,
125        runnable: BoxedRunnable<Self>,
126        link: bool,
127        start_timeout: Duration,
128    ) -> impl Future<Output = Result<Address, ErrorOf<sys::StartErrorKind>>> + Send {
129        do_start(self, runnable, link, start_timeout)
130    }
131}
132
133impl Stop for ActorContext {
134    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
135        let this = self.fork_address;
136        let out = do_exit(self, this, peer).is_ok();
137        std::future::ready(out)
138    }
139
140    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
141        let out = do_kill(self, peer).is_ok();
142        std::future::ready(out)
143    }
144}
145
146impl Linking for ActorContext {
147    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
148        let this = self.fork_address;
149        do_link(self, this, peer)
150    }
151
152    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
153        let this = self.fork_address;
154        do_unlink(self, this, peer)
155    }
156
157    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
158        do_set_trap_exit(self, enable)
159    }
160}
161
162impl Watching for ActorContext {
163    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
164        do_watch(self, peer)
165    }
166
167    fn unwatch(
168        &mut self,
169        watch_ref: mm1_proto_system::WatchRef,
170    ) -> impl Future<Output = ()> + Send {
171        do_unwatch(self, watch_ref)
172    }
173}
174
175impl InitDone for ActorContext {
176    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
177        do_init_done(self, address)
178    }
179}
180
181impl Messaging for ActorContext {
182    fn address(&self) -> Address {
183        self.fork_address
184    }
185
186    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
187        let ActorContext {
188            fork_address,
189            subnet_context,
190            ..
191        } = self;
192
193        loop {
194            let (inbound_envelope_opt, subnet_notify, fork_notify) = {
195                let mut subnet_context_locked = subnet_context
196                    .try_lock()
197                    .expect("could not lock subnet_context");
198                let SubnetContext {
199                    rt_api,
200                    subnet_notify,
201                    rx_priority,
202                    rx_regular,
203                    fork_entries,
204                    bound_subnets,
205                    message_tap,
206                    ..
207                } = &mut *subnet_context_locked;
208
209                process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)?;
210
211                let this_fork_entry = fork_entries
212                    .get_mut(fork_address)
213                    .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
214                let inbound_envelope_opt = if let Some(priority_message) =
215                    this_fork_entry.inbox_priority.pop_front()
216                {
217                    Some(priority_message.message)
218                } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
219                    Some(regular_message.message)
220                } else {
221                    None
222                };
223                if let Some(inbound_envelope) = inbound_envelope_opt.as_ref() {
224                    message_tap.on_recv(*fork_address, inbound_envelope);
225                }
226                (
227                    inbound_envelope_opt,
228                    subnet_notify.clone(),
229                    this_fork_entry.fork_notifiy.clone(),
230                )
231            };
232
233            if let Some(inbound_envelope) = inbound_envelope_opt {
234                break Ok(inbound_envelope)
235            } else {
236                let subnet_notified = subnet_notify.notified();
237                let fork_notified = fork_notify.notified();
238
239                tokio::select! {
240                    _ = subnet_notified => (),
241                    _ = fork_notified => (),
242                }
243            }
244        }
245    }
246
247    async fn close(&mut self) {
248        // self.rx_regular.close();
249        // self.rx_priority.close();
250        todo!()
251    }
252
253    fn send(
254        &mut self,
255        envelope: Envelope,
256    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
257        std::future::ready(do_send(self, envelope))
258    }
259
260    fn forward(
261        &mut self,
262        to: Address,
263        envelope: Envelope,
264    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
265        std::future::ready(do_forward(self, to, envelope))
266    }
267}
268
269impl Bind<NetAddress> for ActorContext {
270    async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
271        use std::collections::btree_map::Entry::*;
272
273        let BindArgs {
274            bind_to,
275            inbox_size,
276        } = args;
277        let address_range = AddressRange::from(bind_to);
278
279        log::debug!(%bind_to, %inbox_size, "binding");
280
281        let Self {
282            fork_address,
283            subnet_context,
284            ..
285        } = self;
286
287        {
288            let mut subnet_context_locked = subnet_context
289                .try_lock()
290                .expect("could not lock subnet_context");
291
292            let SubnetContext {
293                rt_api,
294                subnet_mailbox_tx,
295                bound_subnets,
296                ..
297            } = &mut *subnet_context_locked;
298
299            let bound_subnet_entry = match bound_subnets.entry(address_range) {
300                Vacant(v) => v,
301                Occupied(o) => {
302                    let previously_bound_to = NetAddress::from(*o.key());
303                    return Err(ErrorOf::new(
304                        BindErrorKind::Conflict,
305                        format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
306                    ))
307                },
308            };
309            let subnet_lease = Lease::trusted(bind_to);
310
311            let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
312                BindErrorKind::Closed,
313                "the actor subnet is probably unregistered",
314            ))?;
315            let bound_subnet_node =
316                registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);
317
318            rt_api
319                .registry()
320                .register(bind_to, bound_subnet_node)
321                .map_err(|_| {
322                    ErrorOf::new(
323                        BindErrorKind::Conflict,
324                        "could not register the subnet-node",
325                    )
326                })?;
327
328            bound_subnet_entry.insert(*fork_address);
329        }
330
331        log::info!(%bind_to, %inbox_size, "bound");
332
333        Ok(())
334    }
335}
336
337impl Ping for ActorContext {
338    async fn ping(
339        &mut self,
340        address: Address,
341        timeout: Duration,
342    ) -> Result<Duration, ErrorOf<PingErrorKind>> {
343        let ping_id = rand::rng().next_u64();
344        let now = Instant::now();
345        let deadline = now.checked_add(timeout).unwrap_or(now);
346
347        let Self {
348            fork_address,
349            subnet_context,
350            ..
351        } = self;
352
353        let (subnet_notify, fork_notify) = {
354            let mut subnet_context_locked = subnet_context
355                .try_lock()
356                .expect("could not lock subnet_context");
357            let SubnetContext {
358                rt_api,
359                fork_entries,
360                subnet_notify,
361                ..
362            } = &mut *subnet_context_locked;
363            let ForkEntry { fork_notifiy, .. } = fork_entries
364                .get_mut(fork_address)
365                .expect("fork_entry missing");
366
367            let ping_msg = sys::Ping {
368                reply_to: Some(*fork_address),
369                id:       ping_id,
370            };
371
372            let ping_header = EnvelopeHeader::to_address(address).with_priority(true);
373            let ping_envelope = Envelope::new(ping_header, ping_msg).into_erased();
374            rt_api
375                .send_to(address, true, ping_envelope)
376                .map_err(|e| ErrorOf::new(PingErrorKind::Send, e.to_string()))?;
377
378            (subnet_notify.clone(), fork_notifiy.clone())
379        };
380
381        loop {
382            let timeout = tokio::time::sleep_until(deadline);
383            tokio::select! {
384                _ = timeout => { return Err(ErrorOf::new(PingErrorKind::Timeout, "timeout elapsed")) },
385                _ = subnet_notify.notified() => (),
386                _ = fork_notify.notified() => (),
387            };
388
389            let mut subnet_context_locked = subnet_context
390                .try_lock()
391                .expect("could not lock subnet_context");
392            let SubnetContext {
393                rt_api,
394                fork_entries,
395                bound_subnets,
396                rx_priority,
397                rx_regular,
398                ..
399            } = &mut *subnet_context_locked;
400
401            process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)
402                .map_err(|e| ErrorOf::new(PingErrorKind::Recv, e.to_string()))?;
403
404            if fork_entries
405                .get(fork_address)
406                .expect("fork_entry_missing")
407                .last_ping_received
408                == Some(ping_id)
409            {
410                break
411            }
412        }
413        Ok(now.elapsed())
414    }
415}
416
417fn process_inlets(
418    rt_api: &RtApi,
419    bound_subnets: &BTreeMap<AddressRange, Address>,
420    rx_priority: &mut kanal::Receiver<MessageWithoutPermit<Envelope>>,
421    rx_regular: &mut kanal::Receiver<MessageWithPermit<Envelope>>,
422    fork_entries: &mut HashMap<Address, ForkEntry>,
423) -> Result<(), ErrorOf<RecvErrorKind>> {
424    let mut notified_forks = HashSet::new();
425    while let Some(message) = rx_priority
426        .try_recv_realtime()
427        .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
428    {
429        let message_to = bound_subnets
430            .get(&AddressRange::from(message.to))
431            .copied()
432            .unwrap_or(message.to);
433        let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
434            warn!(dst = %message_to, "no such fork");
435            continue
436        };
437        let should_notify = if let Some(ping_message) = message.message.peek::<sys::Ping>() {
438            let sys::Ping { reply_to, id } = *ping_message;
439            if let Some(reply_to) = reply_to {
440                trace!(%id, %reply_to, "received a ping request");
441                let pong_header = EnvelopeHeader::to_address(reply_to).with_priority(true);
442                let pong_envelope =
443                    Envelope::new(pong_header, sys::Ping { reply_to: None, id }).into_erased();
444                rt_api
445                    .send_to(reply_to, true, pong_envelope)
446                    .inspect_err(
447                        |e| warn!(reason = %e.as_display_chain(), "can't send ping-response"),
448                    )
449                    .ok();
450                trace!(%id, %reply_to, "send a ping response");
451                false
452            } else {
453                trace!(%id, "received a ping response");
454                fork_entry.last_ping_received = Some(id);
455                true
456            }
457        } else {
458            fork_entry.inbox_priority.push_back(message);
459            true
460        };
461
462        if should_notify && notified_forks.insert(message_to) {
463            trace!(dst = %message_to, "notifying fork");
464            fork_entry.fork_notifiy.notify_one();
465        }
466    }
467
468    while let Some(message) = rx_regular
469        .try_recv_realtime()
470        .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
471    {
472        let message_to = bound_subnets
473            .get(&AddressRange::from(message.to))
474            .copied()
475            .unwrap_or(message.to);
476        let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
477            warn!(dst = %message_to, "no such fork");
478            continue
479        };
480        fork_entry.inbox_regular.push_back(message);
481        trace!(dst = %message_to, "subnet received regular message");
482        if notified_forks.insert(message_to) {
483            trace!(dst = %message_to, "notifying fork");
484            fork_entry.fork_notifiy.notify_one();
485        }
486    }
487
488    Ok(())
489}
490
491async fn do_start(
492    context: &mut ActorContext,
493    runnable: BoxedRunnable<ActorContext>,
494    link: bool,
495    timeout: Duration,
496) -> Result<Address, ErrorOf<sys::StartErrorKind>> {
497    let this_address = context.fork_address;
498
499    let mut fork = context
500        .fork()
501        .await
502        .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
503
504    let fork_address = fork.fork_address;
505    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
506        .await
507        .map_err(|e| e.map_kind(sys::StartErrorKind::Spawn))?;
508
509    let envelope = match fork.recv().timeout(timeout).await {
510        Err(_elapsed) => {
511            do_kill(context, spawned_address)
512                .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
513
514            // TODO: should we ensure termination with a `system::Watch`?
515
516            return Err(ErrorOf::new(
517                sys::StartErrorKind::Timeout,
518                "no init-ack within timeout",
519            ))
520        },
521        Ok(recv_result) => {
522            recv_result
523                .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?
524        },
525    };
526
527    dispatch!(match envelope {
528        sys::InitAck { address } => {
529            if link {
530                do_link(context, this_address, address).await;
531            }
532            Ok(address)
533        },
534
535        sys::Exited { .. } => {
536            Err(ErrorOf::new(
537                sys::StartErrorKind::Exited,
538                "exited before init-ack",
539            ))
540        },
541
542        unexpected @ _ => {
543            Err(ErrorOf::new(
544                sys::StartErrorKind::InternalError,
545                format!("unexpected message: {unexpected:?}"),
546            ))
547        },
548    })
549}
550
551async fn do_spawn(
552    context: &mut ActorContext,
553    runnable: BoxedRunnable<ActorContext>,
554    ack_to: Option<Address>,
555    link_to: impl IntoIterator<Item = Address>,
556) -> Result<Address, ErrorOf<sys::SpawnErrorKind>> {
557    let ActorContext { subnet_context, .. } = context;
558
559    let (actor_key, rt_config, rt_api, tx_actor_failure) = {
560        let subnet_context_locked = subnet_context
561            .try_lock()
562            .expect("could not lock subnet_context");
563        let SubnetContext {
564            rt_api,
565            rt_config,
566            actor_key,
567            tx_actor_failure,
568            ..
569        } = &*subnet_context_locked;
570
571        (
572            actor_key.child(runnable.func_name()),
573            rt_config.clone(),
574            rt_api.clone(),
575            tx_actor_failure.clone(),
576        )
577    };
578
579    let actor_config = rt_config.actor_config(&actor_key);
580    let execute_on = rt_api.choose_executor(actor_config.runtime_key());
581    let message_tap = rt_api.message_tap(actor_config.message_tap_key());
582
583    trace!(?ack_to, "starting");
584
585    let subnet_lease = rt_api
586        .request_address(actor_config.netmask())
587        .await
588        .inspect_err(|e| log::error!("lease-error: {}", e))
589        .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::ResourceConstraint, e.to_string()))?;
590
591    trace!(subnet_lease = %subnet_lease.net_address(), "subnet leased");
592
593    let rt_api = rt_api.clone();
594    let rt_config = rt_config.clone();
595    let container = container::Container::create(
596        container::ContainerArgs {
597            ack_to,
598            // FIXME: can we make it IntoIterator too?
599            link_to: link_to.into_iter().collect(),
600            actor_key,
601            trace_id: TraceId::current(),
602
603            subnet_lease,
604            rt_api,
605            rt_config,
606            message_tap,
607            tx_actor_failure: tx_actor_failure.clone(),
608        },
609        runnable,
610    )
611    .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::InternalError, e.to_string()))?;
612    let actor_address = container.actor_address();
613
614    trace!(spawned_address = %actor_address, "about to run spawned actor");
615
616    let tx_actor_failure = tx_actor_failure.clone();
617    // TODO: maybe keep it somewhere too?
618    let _join_handle = execute_on.spawn(async move {
619        match container.run().await {
620            Ok(Ok(())) => (),
621            Ok(Err(actor_failure)) => {
622                let _ = tx_actor_failure.send((actor_address, actor_failure));
623            },
624            Err(container_failure) => {
625                let report = AnyError::from(container_failure);
626                log::error!(
627                    err = %report.as_display_chain(), %actor_address,
628                    "actor container failure"
629                );
630            },
631        }
632    });
633
634    Ok(actor_address)
635}
636
637fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
638    let ActorContext { subnet_context, .. } = context;
639    let subnet_context_locked = subnet_context
640        .try_lock()
641        .expect("could not lock subnet_context");
642    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
643
644    rt_api.sys_send(
645        peer,
646        SysMsg::Link(SysLink::Exit {
647            sender:   this,
648            receiver: peer,
649            reason:   ExitReason::Terminate,
650        }),
651    )
652}
653
654fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
655    let ActorContext { subnet_context, .. } = context;
656    let subnet_context_locked = subnet_context
657        .try_lock()
658        .expect("could not lock subnet_context");
659    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
660
661    rt_api.sys_send(peer, SysMsg::Kill)
662}
663
664async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
665    let ActorContext { call, .. } = context;
666
667    call.invoke(SysCall::Link {
668        sender:   this,
669        receiver: peer,
670    })
671    .await
672}
673
674async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
675    let ActorContext { call, .. } = context;
676    call.invoke(SysCall::Unlink {
677        sender:   this,
678        receiver: peer,
679    })
680    .await
681}
682
683async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
684    let ActorContext { call, .. } = context;
685    call.invoke(SysCall::TrapExit(enable)).await;
686}
687
688async fn do_watch(context: &mut ActorContext, peer: Address) -> sys::WatchRef {
689    let ActorContext {
690        fork_address: this,
691        call,
692        ..
693    } = context;
694    let (reply_tx, reply_rx) = oneshot::channel();
695    call.invoke(SysCall::Watch {
696        sender: *this,
697        receiver: peer,
698        reply_tx,
699    })
700    .await;
701    reply_rx.await.expect("sys-call remained unanswered")
702}
703
704async fn do_unwatch(context: &mut ActorContext, watch_ref: sys::WatchRef) {
705    let ActorContext {
706        fork_address: this,
707        call,
708        ..
709    } = context;
710    call.invoke(SysCall::Unwatch {
711        sender: *this,
712        watch_ref,
713    })
714    .await;
715}
716
717async fn do_init_done(context: &mut ActorContext, address: Address) {
718    let ActorContext {
719        ack_to,
720        subnet_context,
721        ..
722    } = context;
723    let message = sys::InitAck { address };
724    let Some(ack_to_address) = ack_to.take() else {
725        return;
726    };
727    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
728    let subnet_context_locked = subnet_context
729        .try_lock()
730        .expect("could not lock subnet_context");
731    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
732    let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
733}
734
735fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
736    let sender = context.fork_address;
737    let to = outbound.header().to;
738    {
739        let subnet_context_locked = context
740            .subnet_context
741            .try_lock()
742            .expect("could not lock subnet_context");
743        let SubnetContext { message_tap, .. } = &*subnet_context_locked;
744        message_tap.on_send(sender, to, &outbound);
745    }
746
747    let (message, empty_envelope) = outbound.take();
748    let mut header: EnvelopeHeader = empty_envelope.into();
749
750    let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
751        warn!(
752            envelope_header = ?header,
753            "TTL exhausted, dropping message"
754        );
755        ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
756    })?;
757    header.ttl = new_ttl;
758
759    let outbound = Envelope::new(header, message);
760
761    trace!(envelope = ?outbound, "sending");
762    let ActorContext { subnet_context, .. } = context;
763    let subnet_context_locked = subnet_context
764        .try_lock()
765        .expect("could not lock subnet_context");
766    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
767    rt_api
768        .send_to(outbound.header().to, outbound.header().priority, outbound)
769        .map_err(|k| ErrorOf::new(k, ""))
770}
771
772fn do_forward(
773    context: &mut ActorContext,
774    to: Address,
775    outbound: Envelope,
776) -> Result<(), ErrorOf<SendErrorKind>> {
777    let sender = context.fork_address;
778    {
779        let subnet_context_locked = context
780            .subnet_context
781            .try_lock()
782            .expect("could not lock subnet_context");
783        let SubnetContext { message_tap, .. } = &*subnet_context_locked;
784        message_tap.on_send(sender, to, &outbound);
785    }
786
787    let (message, empty_envelope) = outbound.take();
788    let mut header: EnvelopeHeader = empty_envelope.into();
789
790    let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
791        warn!(
792            forward_to = %to,
793            envelope_header = ?header,
794            "TTL exhausted, dropping message"
795        );
796        ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
797    })?;
798    header.ttl = new_ttl;
799
800    let outbound = Envelope::new(header, message);
801
802    trace!(forward_to = %to, envelope = ?outbound, "forwarding");
803    let ActorContext { subnet_context, .. } = context;
804    let subnet_context_locked = subnet_context
805        .try_lock()
806        .expect("could not lock subnet_context");
807    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
808    rt_api
809        .send_to(to, outbound.header().priority, outbound)
810        .map_err(|k| ErrorOf::new(k, ""))
811}