mm1_node/runtime/context/impl_context_api.rs

use std::collections::{BTreeMap, HashMap, HashSet};
use std::time::Duration;

use futures::FutureExt;
use mm1_address::address::Address;
use mm1_address::address_range::AddressRange;
use mm1_address::pool::Lease;
use mm1_address::subnet::{NetAddress, NetMask};
use mm1_common::errors::chain::{ExactTypeDisplayChainExt, StdErrorDisplayChainExt};
use mm1_common::errors::error_of::ErrorOf;
use mm1_common::futures::timeout::FutureTimeoutExt;
use mm1_common::log;
use mm1_common::types::{AnyError, Never};
use mm1_core::context::{
    Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Ping,
    PingErrorKind, Quit, RecvErrorKind, SendErrorKind, Start, Stop, Watching,
};
use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
use mm1_core::tracing::{TraceId, WithTraceIdExt};
use mm1_proto_system as sys;
use mm1_runnable::local::BoxedRunnable;
use rand::RngCore;
use tokio::sync::oneshot;
use tokio::time::Instant;
use tracing::{trace, warn};

use crate::config::EffectiveActorConfig;
use crate::registry::{self, MessageWithPermit, MessageWithoutPermit};
use crate::runtime::container;
use crate::runtime::context::{ActorContext, ForkEntry, SubnetContext};
use crate::runtime::rt_api::RtApi;
use crate::runtime::sys_call::SysCall;
use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};

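// Termination API: both methods hand an `Exit` sys-call to the runtime and then
// park on a pending future forever; the surrounding container is expected to tear
// the actor task down.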
impl Quit for ActorContext {
    async fn quit_ok(&mut self) -> Never {
        self.call.invoke(SysCall::Exit(Ok(()))).await;
        std::future::pending().await
    }

    async fn quit_err<E>(&mut self, reason: E) -> Never
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
        std::future::pending().await
    }
}

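// Forking: `fork` leases a single address (`NetMask::MAX`) from the actor's subnet
// pool, registers a fresh `ForkEntry` for it, and announces it to the runtime via
// `SysCall::ForkAdded`. `run` moves the forked context into a boxed future that the
// runtime then spawns through `SysCall::Spawn`.
//
// Illustrative caller sketch (hypothetical code, assuming `ctx: ActorContext` and a
// caller error type that `ForkErrorKind` converts into):
//
//     let fork = ctx.fork().await?;
//     fork.run(|mut forked| async move {
//         let _ = forked.recv().await;
//     })
//     .await;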
impl Fork for ActorContext {
    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
        let Self {
            fork_address: this_address,
            call,
            subnet_context,
            ..
        } = self;

        let fork_lease = {
            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");
            let SubnetContext {
                subnet_pool,
                fork_entries,
                ..
            } = &mut *subnet_context_locked;
            let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
                ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
            })?;
            let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
            assert!(should_be_none.is_none());
            fork_lease
        };
        let fork_address = fork_lease.address;
        trace!(parent = %this_address, child = %fork_address, "forking");

        call.invoke(SysCall::ForkAdded(fork_address)).await;

        let context = Self {
            subnet_context: subnet_context.clone(),
            fork_address,
            fork_lease: Some(fork_lease),
            ack_to: None,
            call: call.clone(),
        };

        Ok(context)
    }

    async fn run<F, Fut>(self, fun: F)
    where
        F: FnOnce(Self) -> Fut,
        F: Send + 'static,
        Fut: Future + Send + 'static,
    {
        let call = self.call.clone();
        let fut = fun(self)
            .map(|_| ())
            .with_trace_id(TraceId::current())
            .boxed();
        call.invoke(SysCall::Spawn(fut)).await;
    }
}

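// Clock source for the context: `now` is backed by `tokio::time::Instant`.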
impl Now for ActorContext {
    type Instant = Instant;

    fn now(&self) -> Self::Instant {
        Instant::now()
    }
}

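// Starting actors: `spawn` launches a runnable without waiting for it to come up,
// optionally linking it to this fork; `start` additionally waits for the child's
// `InitAck` within `start_timeout` (see `do_start` below).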
impl Start<BoxedRunnable<Self>> for ActorContext {
    fn spawn(
        &mut self,
        runnable: BoxedRunnable<Self>,
        link: bool,
    ) -> impl Future<Output = Result<Address, ErrorOf<sys::SpawnErrorKind>>> + Send {
        do_spawn(self, runnable, None, link.then_some(self.fork_address))
    }

    fn start(
        &mut self,
        runnable: BoxedRunnable<Self>,
        link: bool,
        start_timeout: Duration,
    ) -> impl Future<Output = Result<Address, ErrorOf<sys::StartErrorKind>>> + Send {
        do_start(self, runnable, link, start_timeout)
    }
}

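// Stopping peers: `exit` asks the peer to terminate via a link-exit system message,
// `kill` sends an unconditional `SysMsg::Kill`; both report whether the system
// message could be sent.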
impl Stop for ActorContext {
    fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
        let this = self.fork_address;
        let out = do_exit(self, this, peer).is_ok();
        std::future::ready(out)
    }

    fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
        let out = do_kill(self, peer).is_ok();
        std::future::ready(out)
    }
}

impl Linking for ActorContext {
    fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
        let this = self.fork_address;
        do_link(self, this, peer)
    }

    fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
        let this = self.fork_address;
        do_unlink(self, this, peer)
    }

    fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
        do_set_trap_exit(self, enable)
    }
}

impl Watching for ActorContext {
    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
        do_watch(self, peer)
    }

    fn unwatch(
        &mut self,
        watch_ref: mm1_proto_system::WatchRef,
    ) -> impl Future<Output = ()> + Send {
        do_unwatch(self, watch_ref)
    }
}

impl InitDone for ActorContext {
    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
        do_init_done(self, address)
    }
}

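// Messaging: `recv` first drains the subnet inlets into the per-fork inboxes
// (`process_inlets`), then takes the next envelope for this fork, preferring the
// priority inbox; if both inboxes are empty it awaits a subnet- or fork-level
// notification and retries. `close` is not implemented yet.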
impl Messaging for ActorContext {
    fn address(&self) -> Address {
        self.fork_address
    }

    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
        let ActorContext {
            fork_address,
            subnet_context,
            ..
        } = self;

        loop {
            let (inbound_envelope_opt, subnet_notify, fork_notify) = {
                let mut subnet_context_locked = subnet_context
                    .try_lock()
                    .expect("could not lock subnet_context");
                let SubnetContext {
                    rt_api,
                    subnet_notify,
                    rx_priority,
                    rx_regular,
                    fork_entries,
                    bound_subnets,
                    ..
                } = &mut *subnet_context_locked;

                process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)?;

                let this_fork_entry = fork_entries
                    .get_mut(fork_address)
                    .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
                let inbound_envelope_opt = if let Some(priority_message) =
                    this_fork_entry.inbox_priority.pop_front()
                {
                    Some(priority_message.message)
                } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
                    Some(regular_message.message)
                } else {
                    None
                };
                (
                    inbound_envelope_opt,
                    subnet_notify.clone(),
                    this_fork_entry.fork_notifiy.clone(),
                )
            };

            if let Some(inbound_envelope) = inbound_envelope_opt {
                break Ok(inbound_envelope)
            } else {
                let subnet_notified = subnet_notify.notified();
                let fork_notified = fork_notify.notified();

                tokio::select! {
                    _ = subnet_notified => (),
                    _ = fork_notified => (),
                }
            }
        }
    }

    async fn close(&mut self) {
        // self.rx_regular.close();
        // self.rx_priority.close();
        todo!()
    }

    fn send(
        &mut self,
        envelope: Envelope,
    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
        std::future::ready(do_send(self, envelope))
    }

    fn forward(
        &mut self,
        to: Address,
        envelope: Envelope,
    ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
        std::future::ready(do_forward(self, to, envelope))
    }
}

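// Binding: registers an additional net-address for this fork. The address range is
// recorded in `bound_subnets`, so inbound traffic for it is routed to this fork, and
// a registry node backed by the subnet mailbox is published under `bind_to`.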
impl Bind<NetAddress> for ActorContext {
    async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
        use std::collections::btree_map::Entry::*;

        let BindArgs {
            bind_to,
            inbox_size,
        } = args;
        let address_range = AddressRange::from(bind_to);

        log::debug!(%bind_to, %inbox_size, "binding");

        let Self {
            fork_address,
            subnet_context,
            ..
        } = self;

        {
            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");

            let SubnetContext {
                rt_api,
                subnet_mailbox_tx,
                bound_subnets,
                ..
            } = &mut *subnet_context_locked;

            let bound_subnet_entry = match bound_subnets.entry(address_range) {
                Vacant(v) => v,
                Occupied(o) => {
                    let previously_bound_to = NetAddress::from(*o.key());
                    return Err(ErrorOf::new(
                        BindErrorKind::Conflict,
                        format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
                    ))
                },
            };
            let subnet_lease = Lease::trusted(bind_to);

            let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
                BindErrorKind::Closed,
                "the actor subnet is probably unregistered",
            ))?;
            let bound_subnet_node =
                registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);

            rt_api
                .registry()
                .register(bind_to, bound_subnet_node)
                .map_err(|_| {
                    ErrorOf::new(
                        BindErrorKind::Conflict,
                        "could not register the subnet-node",
                    )
                })?;

            bound_subnet_entry.insert(*fork_address);
        }

        log::info!(%bind_to, %inbox_size, "bound");

        Ok(())
    }
}

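// Ping: sends a priority `sys::Ping` with a random id and `reply_to` set to this
// fork, then keeps draining the inlets until a ping response carrying the same id
// shows up in `last_ping_received`, or the deadline elapses.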
impl Ping for ActorContext {
    async fn ping(
        &mut self,
        address: Address,
        timeout: Duration,
    ) -> Result<Duration, ErrorOf<PingErrorKind>> {
        let ping_id = rand::rng().next_u64();
        let now = Instant::now();
        let deadline = now.checked_add(timeout).unwrap_or(now);

        let Self {
            fork_address,
            subnet_context,
            ..
        } = self;

        let (subnet_notify, fork_notify) = {
            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");
            let SubnetContext {
                rt_api,
                fork_entries,
                subnet_notify,
                ..
            } = &mut *subnet_context_locked;
            let ForkEntry { fork_notifiy, .. } = fork_entries
                .get_mut(fork_address)
                .expect("fork_entry missing");

            let ping_msg = sys::Ping {
                reply_to: Some(*fork_address),
                id:       ping_id,
            };

            let ping_header = EnvelopeHeader::to_address(address).with_priority(true);
            let ping_envelope = Envelope::new(ping_header, ping_msg).into_erased();
            rt_api
                .send_to(address, true, ping_envelope)
                .map_err(|e| ErrorOf::new(PingErrorKind::Send, e.to_string()))?;

            (subnet_notify.clone(), fork_notifiy.clone())
        };

        loop {
            let timeout = tokio::time::sleep_until(deadline);
            tokio::select! {
                _ = timeout => { return Err(ErrorOf::new(PingErrorKind::Timeout, "timeout elapsed")) },
                _ = subnet_notify.notified() => (),
                _ = fork_notify.notified() => (),
            };

            let mut subnet_context_locked = subnet_context
                .try_lock()
                .expect("could not lock subnet_context");
            let SubnetContext {
                rt_api,
                fork_entries,
                bound_subnets,
                rx_priority,
                rx_regular,
                ..
            } = &mut *subnet_context_locked;

            process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)
                .map_err(|e| ErrorOf::new(PingErrorKind::Recv, e.to_string()))?;

            if fork_entries
                .get(fork_address)
                .expect("fork_entry missing")
                .last_ping_received
                == Some(ping_id)
            {
                break
            }
        }
        Ok(now.elapsed())
    }
}

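// Drains both subnet inlets without blocking and routes every message to the owning
// fork's inbox, resolving bound subnets to their fork first. Ping requests are
// answered inline with a priority pong instead of being enqueued; ping responses
// only mark `last_ping_received` on the fork entry. Each fork that received
// something is notified at most once.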
fn process_inlets(
    rt_api: &RtApi,
    bound_subnets: &BTreeMap<AddressRange, Address>,
    rx_priority: &mut kanal::Receiver<MessageWithoutPermit<Envelope>>,
    rx_regular: &mut kanal::Receiver<MessageWithPermit<Envelope>>,
    fork_entries: &mut HashMap<Address, ForkEntry>,
) -> Result<(), ErrorOf<RecvErrorKind>> {
    let mut notified_forks = HashSet::new();
    while let Some(message) = rx_priority
        .try_recv_realtime()
        .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
    {
        let message_to = bound_subnets
            .get(&AddressRange::from(message.to))
            .copied()
            .unwrap_or(message.to);
        let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
            warn!(dst = %message_to, "no such fork");
            continue
        };
        let should_notify = if let Some(ping_message) = message.message.peek::<sys::Ping>() {
            let sys::Ping { reply_to, id } = *ping_message;
            if let Some(reply_to) = reply_to {
                trace!(%id, %reply_to, "received a ping request");
                let pong_header = EnvelopeHeader::to_address(reply_to).with_priority(true);
                let pong_envelope =
                    Envelope::new(pong_header, sys::Ping { reply_to: None, id }).into_erased();
                rt_api
                    .send_to(reply_to, true, pong_envelope)
                    .inspect_err(
                        |e| warn!(reason = %e.as_display_chain(), "can't send ping-response"),
                    )
                    .ok();
                trace!(%id, %reply_to, "sent a ping response");
                false
            } else {
                trace!(%id, "received a ping response");
                fork_entry.last_ping_received = Some(id);
                true
            }
        } else {
            fork_entry.inbox_priority.push_back(message);
            true
        };

        if should_notify && notified_forks.insert(message_to) {
            trace!(dst = %message_to, "notifying fork");
            fork_entry.fork_notifiy.notify_one();
        }
    }

    while let Some(message) = rx_regular
        .try_recv_realtime()
        .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
    {
        let message_to = bound_subnets
            .get(&AddressRange::from(message.to))
            .copied()
            .unwrap_or(message.to);
        let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
            warn!(dst = %message_to, "no such fork");
            continue
        };
        fork_entry.inbox_regular.push_back(message);
        trace!(dst = %message_to, "subnet received regular message");
        if notified_forks.insert(message_to) {
            trace!(dst = %message_to, "notifying fork");
            fork_entry.fork_notifiy.notify_one();
        }
    }

    Ok(())
}

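// `start` implementation: spawn the child under a temporary fork so the child's
// `InitAck` lands in that fork's inbox, then wait for it within `timeout`. On
// timeout the child is killed; an `Exited` arriving before the ack is reported as
// `StartErrorKind::Exited`.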
async fn do_start(
    context: &mut ActorContext,
    runnable: BoxedRunnable<ActorContext>,
    link: bool,
    timeout: Duration,
) -> Result<Address, ErrorOf<sys::StartErrorKind>> {
    let this_address = context.fork_address;

    let mut fork = context
        .fork()
        .await
        .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;

    let fork_address = fork.fork_address;
    let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
        .await
        .map_err(|e| e.map_kind(sys::StartErrorKind::Spawn))?;

    let envelope = match fork.recv().timeout(timeout).await {
        Err(_elapsed) => {
            do_kill(context, spawned_address)
                .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;

            // TODO: should we ensure termination with a `system::Watch`?

            return Err(ErrorOf::new(
                sys::StartErrorKind::Timeout,
                "no init-ack within timeout",
            ))
        },
        Ok(recv_result) => {
            recv_result
                .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?
        },
    };

    dispatch!(match envelope {
        sys::InitAck { address } => {
            if link {
                do_link(context, this_address, address).await;
            }
            Ok(address)
        },

        sys::Exited { .. } => {
            Err(ErrorOf::new(
                sys::StartErrorKind::Exited,
                "exited before init-ack",
            ))
        },

        unexpected @ _ => {
            Err(ErrorOf::new(
                sys::StartErrorKind::InternalError,
                format!("unexpected message: {unexpected:?}"),
            ))
        },
    })
}

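// `spawn` implementation: derive the child's actor key and effective config, pick an
// executor, lease a subnet from the runtime, build a container around the runnable,
// and run it on the chosen executor. Actor failures are reported through
// `tx_actor_failure`; container failures are only logged.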
async fn do_spawn(
    context: &mut ActorContext,
    runnable: BoxedRunnable<ActorContext>,
    ack_to: Option<Address>,
    link_to: impl IntoIterator<Item = Address>,
) -> Result<Address, ErrorOf<sys::SpawnErrorKind>> {
    let ActorContext { subnet_context, .. } = context;

    let (actor_key, rt_config, rt_api, tx_actor_failure) = {
        let subnet_context_locked = subnet_context
            .try_lock()
            .expect("could not lock subnet_context");
        let SubnetContext {
            rt_api,
            rt_config,
            actor_key,
            tx_actor_failure,
            ..
        } = &*subnet_context_locked;

        (
            actor_key.child(runnable.func_name()),
            rt_config.clone(),
            rt_api.clone(),
            tx_actor_failure.clone(),
        )
    };

    let actor_config = rt_config.actor_config(&actor_key);
    let execute_on = rt_api.choose_executor(actor_config.runtime_key());

    trace!(?ack_to, "starting");

    let subnet_lease = rt_api
        .request_address(actor_config.netmask())
        .await
        .inspect_err(|e| log::error!("lease-error: {}", e))
        .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::ResourceConstraint, e.to_string()))?;

    trace!(subnet_lease = %subnet_lease.net_address(), "subnet leased");

    let rt_api = rt_api.clone();
    let rt_config = rt_config.clone();
    let container = container::Container::create(
        container::ContainerArgs {
            ack_to,
            // FIXME: can we make it IntoIterator too?
            link_to: link_to.into_iter().collect(),
            actor_key,
            trace_id: TraceId::current(),

            subnet_lease,
            rt_api,
            rt_config,
        },
        runnable,
        tx_actor_failure.clone(),
    )
    .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::InternalError, e.to_string()))?;
    let actor_address = container.actor_address();

    trace!(spawned_address = %actor_address, "about to run spawned actor");

    let tx_actor_failure = tx_actor_failure.clone();
    // TODO: maybe keep it somewhere too?
    let _join_handle = execute_on.spawn(async move {
        match container.run().await {
            Ok(Ok(())) => (),
            Ok(Err(actor_failure)) => {
                let _ = tx_actor_failure.send((actor_address, actor_failure));
            },
            Err(container_failure) => {
                let report = AnyError::from(container_failure);
                log::error!(
                    err = %report.as_display_chain(), %actor_address,
                    "actor container failure"
                );
            },
        }
    });

    Ok(actor_address)
}

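// Peer termination helpers: `do_exit` delivers a link-exit with
// `ExitReason::Terminate`, `do_kill` delivers an unconditional `SysMsg::Kill`.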
fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;

    rt_api.sys_send(
        peer,
        SysMsg::Link(SysLink::Exit {
            sender:   this,
            receiver: peer,
            reason:   ExitReason::Terminate,
        }),
    )
}

fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;

    rt_api.sys_send(peer, SysMsg::Kill)
}

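// Link management and trap-exit are plain sys-calls handled by the runtime.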
async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
    let ActorContext { call, .. } = context;

    call.invoke(SysCall::Link {
        sender:   this,
        receiver: peer,
    })
    .await
}

async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
    let ActorContext { call, .. } = context;
    call.invoke(SysCall::Unlink {
        sender:   this,
        receiver: peer,
    })
    .await
}

async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
    let ActorContext { call, .. } = context;
    call.invoke(SysCall::TrapExit(enable)).await;
}

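// Watching: the watch sys-call carries a oneshot sender so the runtime can hand the
// resulting `WatchRef` back to the caller.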
async fn do_watch(context: &mut ActorContext, peer: Address) -> sys::WatchRef {
    let ActorContext {
        fork_address: this,
        call,
        ..
    } = context;
    let (reply_tx, reply_rx) = oneshot::channel();
    call.invoke(SysCall::Watch {
        sender: *this,
        receiver: peer,
        reply_tx,
    })
    .await;
    reply_rx.await.expect("sys-call remained unanswered")
}

async fn do_unwatch(context: &mut ActorContext, watch_ref: sys::WatchRef) {
    let ActorContext {
        fork_address: this,
        call,
        ..
    } = context;
    call.invoke(SysCall::Unwatch {
        sender: *this,
        watch_ref,
    })
    .await;
}

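// Init acknowledgement: if this context was spawned with an `ack_to` address, send a
// single priority `sys::InitAck` there; subsequent calls are no-ops because `ack_to`
// is taken on the first call.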
async fn do_init_done(context: &mut ActorContext, address: Address) {
    let ActorContext {
        ack_to,
        subnet_context,
        ..
    } = context;
    let message = sys::InitAck { address };
    let Some(ack_to_address) = ack_to.take() else {
        return;
    };
    let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
    let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
}

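// Sending goes straight through `RtApi`: `do_send` uses the envelope's own
// destination, `do_forward` overrides the routing destination with `to` while
// keeping the envelope's priority flag.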
fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
    trace!(envelope = ?outbound, "sending");
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
    rt_api
        .send_to(outbound.header().to, outbound.header().priority, outbound)
        .map_err(|k| ErrorOf::new(k, ""))
}

fn do_forward(
    context: &mut ActorContext,
    to: Address,
    outbound: Envelope,
) -> Result<(), ErrorOf<SendErrorKind>> {
    trace!(forward_to = %to, envelope = ?outbound, "forwarding");
    let ActorContext { subnet_context, .. } = context;
    let subnet_context_locked = subnet_context
        .try_lock()
        .expect("could not lock subnet_context");
    let SubnetContext { rt_api, .. } = &*subnet_context_locked;
    rt_api
        .send_to(to, outbound.header().priority, outbound)
        .map_err(|k| ErrorOf::new(k, ""))
}