1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::errors::chain::{ExactTypeDisplayChainExt, StdErrorDisplayChainExt};
10use mm1_common::errors::error_of::ErrorOf;
11use mm1_common::futures::timeout::FutureTimeoutExt;
12use mm1_common::log;
13use mm1_common::types::{AnyError, Never};
14use mm1_core::context::{
15 Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Ping,
16 PingErrorKind, Quit, RecvErrorKind, SendErrorKind, Start, Stop, Watching,
17};
18use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
19use mm1_core::tracing::{TraceId, WithTraceIdExt};
20use mm1_proto_system as sys;
21use mm1_runnable::local::BoxedRunnable;
22use rand::RngCore;
23use tokio::sync::oneshot;
24use tokio::time::Instant;
25use tracing::{trace, warn};
26
27use crate::config::EffectiveActorConfig;
28use crate::registry::{self, MessageWithPermit, MessageWithoutPermit};
29use crate::runtime::container;
30use crate::runtime::context::{ActorContext, ForkEntry, SubnetContext};
31use crate::runtime::rt_api::RtApi;
32use crate::runtime::sys_call::SysCall;
33use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
34
35impl Quit for ActorContext {
36 async fn quit_ok(&mut self) -> Never {
37 self.call.invoke(SysCall::Exit(Ok(()))).await;
38 std::future::pending().await
39 }
40
41 async fn quit_err<E>(&mut self, reason: E) -> Never
42 where
43 E: std::error::Error + Send + Sync + 'static,
44 {
45 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
46 std::future::pending().await
47 }
48}
49
50impl Fork for ActorContext {
51 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
52 let Self {
53 fork_address: this_address,
54 call,
55 subnet_context,
56 ..
57 } = self;
58
59 let fork_lease = {
60 let mut subnet_context_locked = subnet_context
61 .try_lock()
62 .expect("could not lock subnet_context");
63 let SubnetContext {
64 subnet_pool,
65 fork_entries,
66 ..
67 } = &mut *subnet_context_locked;
68 let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
69 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
70 })?;
71 let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
72 assert!(should_be_none.is_none());
73 fork_lease
74 };
75 let fork_address = fork_lease.address;
76 trace!(parent = %this_address, child = %fork_address, "forking");
77
78 call.invoke(SysCall::ForkAdded(fork_address)).await;
79
80 let context = Self {
81 subnet_context: subnet_context.clone(),
82 fork_address,
83 fork_lease: Some(fork_lease),
84 ack_to: None,
85 call: call.clone(),
86 };
87
88 Ok(context)
89 }
90
91 async fn run<F, Fut>(self, fun: F)
92 where
93 F: FnOnce(Self) -> Fut,
94 F: Send + 'static,
95 Fut: Future + Send + 'static,
96 {
97 let call = self.call.clone();
98 let fut = fun(self)
99 .map(|_| ())
100 .with_trace_id(TraceId::current())
101 .boxed();
102 call.invoke(SysCall::Spawn(fut)).await;
103 }
104}
105
106impl Now for ActorContext {
107 type Instant = Instant;
108
109 fn now(&self) -> Self::Instant {
110 Instant::now()
111 }
112}
113
114impl Start<BoxedRunnable<Self>> for ActorContext {
115 fn spawn(
116 &mut self,
117 runnable: BoxedRunnable<Self>,
118 link: bool,
119 ) -> impl Future<Output = Result<Address, ErrorOf<sys::SpawnErrorKind>>> + Send {
120 do_spawn(self, runnable, None, link.then_some(self.fork_address))
121 }
122
123 fn start(
124 &mut self,
125 runnable: BoxedRunnable<Self>,
126 link: bool,
127 start_timeout: Duration,
128 ) -> impl Future<Output = Result<Address, ErrorOf<sys::StartErrorKind>>> + Send {
129 do_start(self, runnable, link, start_timeout)
130 }
131}
132
133impl Stop for ActorContext {
134 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
135 let this = self.fork_address;
136 let out = do_exit(self, this, peer).is_ok();
137 std::future::ready(out)
138 }
139
140 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
141 let out = do_kill(self, peer).is_ok();
142 std::future::ready(out)
143 }
144}
145
146impl Linking for ActorContext {
147 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
148 let this = self.fork_address;
149 do_link(self, this, peer)
150 }
151
152 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
153 let this = self.fork_address;
154 do_unlink(self, this, peer)
155 }
156
157 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
158 do_set_trap_exit(self, enable)
159 }
160}
161
162impl Watching for ActorContext {
163 fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
164 do_watch(self, peer)
165 }
166
167 fn unwatch(
168 &mut self,
169 watch_ref: mm1_proto_system::WatchRef,
170 ) -> impl Future<Output = ()> + Send {
171 do_unwatch(self, watch_ref)
172 }
173}
174
175impl InitDone for ActorContext {
176 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
177 do_init_done(self, address)
178 }
179}
180
181impl Messaging for ActorContext {
182 fn address(&self) -> Address {
183 self.fork_address
184 }
185
186 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
187 let ActorContext {
188 fork_address,
189 subnet_context,
190 ..
191 } = self;
192
193 loop {
194 let (inbound_envelope_opt, subnet_notify, fork_notify) = {
195 let mut subnet_context_locked = subnet_context
196 .try_lock()
197 .expect("could not lock subnet_context");
198 let SubnetContext {
199 rt_api,
200 subnet_notify,
201 rx_priority,
202 rx_regular,
203 fork_entries,
204 bound_subnets,
205 message_tap,
206 ..
207 } = &mut *subnet_context_locked;
208
209 process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)?;
210
211 let this_fork_entry = fork_entries
212 .get_mut(fork_address)
213 .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
214 let inbound_envelope_opt = if let Some(priority_message) =
215 this_fork_entry.inbox_priority.pop_front()
216 {
217 Some(priority_message.message)
218 } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
219 Some(regular_message.message)
220 } else {
221 None
222 };
223 if let Some(inbound_envelope) = inbound_envelope_opt.as_ref() {
224 message_tap.on_recv(*fork_address, inbound_envelope);
225 }
226 (
227 inbound_envelope_opt,
228 subnet_notify.clone(),
229 this_fork_entry.fork_notifiy.clone(),
230 )
231 };
232
233 if let Some(inbound_envelope) = inbound_envelope_opt {
234 break Ok(inbound_envelope)
235 } else {
236 let subnet_notified = subnet_notify.notified();
237 let fork_notified = fork_notify.notified();
238
239 tokio::select! {
240 _ = subnet_notified => (),
241 _ = fork_notified => (),
242 }
243 }
244 }
245 }
246
247 async fn close(&mut self) {
248 todo!()
251 }
252
253 fn send(
254 &mut self,
255 envelope: Envelope,
256 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
257 std::future::ready(do_send(self, envelope))
258 }
259
260 fn forward(
261 &mut self,
262 to: Address,
263 envelope: Envelope,
264 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
265 std::future::ready(do_forward(self, to, envelope))
266 }
267}
268
269impl Bind<NetAddress> for ActorContext {
270 async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
271 use std::collections::btree_map::Entry::*;
272
273 let BindArgs {
274 bind_to,
275 inbox_size,
276 } = args;
277 let address_range = AddressRange::from(bind_to);
278
279 log::debug!(%bind_to, %inbox_size, "binding");
280
281 let Self {
282 fork_address,
283 subnet_context,
284 ..
285 } = self;
286
287 {
288 let mut subnet_context_locked = subnet_context
289 .try_lock()
290 .expect("could not lock subnet_context");
291
292 let SubnetContext {
293 rt_api,
294 subnet_mailbox_tx,
295 bound_subnets,
296 ..
297 } = &mut *subnet_context_locked;
298
299 let bound_subnet_entry = match bound_subnets.entry(address_range) {
300 Vacant(v) => v,
301 Occupied(o) => {
302 let previously_bound_to = NetAddress::from(*o.key());
303 return Err(ErrorOf::new(
304 BindErrorKind::Conflict,
305 format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
306 ))
307 },
308 };
309 let subnet_lease = Lease::trusted(bind_to);
310
311 let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
312 BindErrorKind::Closed,
313 "the actor subnet is probably unregistered",
314 ))?;
315 let bound_subnet_node =
316 registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);
317
318 rt_api
319 .registry()
320 .register(bind_to, bound_subnet_node)
321 .map_err(|_| {
322 ErrorOf::new(
323 BindErrorKind::Conflict,
324 "could not register the subnet-node",
325 )
326 })?;
327
328 bound_subnet_entry.insert(*fork_address);
329 }
330
331 log::info!(%bind_to, %inbox_size, "bound");
332
333 Ok(())
334 }
335}
336
337impl Ping for ActorContext {
338 async fn ping(
339 &mut self,
340 address: Address,
341 timeout: Duration,
342 ) -> Result<Duration, ErrorOf<PingErrorKind>> {
343 let ping_id = rand::rng().next_u64();
344 let now = Instant::now();
345 let deadline = now.checked_add(timeout).unwrap_or(now);
346
347 let Self {
348 fork_address,
349 subnet_context,
350 ..
351 } = self;
352
353 let (subnet_notify, fork_notify) = {
354 let mut subnet_context_locked = subnet_context
355 .try_lock()
356 .expect("could not lock subnet_context");
357 let SubnetContext {
358 rt_api,
359 fork_entries,
360 subnet_notify,
361 ..
362 } = &mut *subnet_context_locked;
363 let ForkEntry { fork_notifiy, .. } = fork_entries
364 .get_mut(fork_address)
365 .expect("fork_entry missing");
366
367 let ping_msg = sys::Ping {
368 reply_to: Some(*fork_address),
369 id: ping_id,
370 };
371
372 let ping_header = EnvelopeHeader::to_address(address).with_priority(true);
373 let ping_envelope = Envelope::new(ping_header, ping_msg).into_erased();
374 rt_api
375 .send_to(address, true, ping_envelope)
376 .map_err(|e| ErrorOf::new(PingErrorKind::Send, e.to_string()))?;
377
378 (subnet_notify.clone(), fork_notifiy.clone())
379 };
380
381 loop {
382 let timeout = tokio::time::sleep_until(deadline);
383 tokio::select! {
384 _ = timeout => { return Err(ErrorOf::new(PingErrorKind::Timeout, "timeout elapsed")) },
385 _ = subnet_notify.notified() => (),
386 _ = fork_notify.notified() => (),
387 };
388
389 let mut subnet_context_locked = subnet_context
390 .try_lock()
391 .expect("could not lock subnet_context");
392 let SubnetContext {
393 rt_api,
394 fork_entries,
395 bound_subnets,
396 rx_priority,
397 rx_regular,
398 ..
399 } = &mut *subnet_context_locked;
400
401 process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)
402 .map_err(|e| ErrorOf::new(PingErrorKind::Recv, e.to_string()))?;
403
404 if fork_entries
405 .get(fork_address)
406 .expect("fork_entry_missing")
407 .last_ping_received
408 == Some(ping_id)
409 {
410 break
411 }
412 }
413 Ok(now.elapsed())
414 }
415}
416
417fn process_inlets(
418 rt_api: &RtApi,
419 bound_subnets: &BTreeMap<AddressRange, Address>,
420 rx_priority: &mut kanal::Receiver<MessageWithoutPermit<Envelope>>,
421 rx_regular: &mut kanal::Receiver<MessageWithPermit<Envelope>>,
422 fork_entries: &mut HashMap<Address, ForkEntry>,
423) -> Result<(), ErrorOf<RecvErrorKind>> {
424 let mut notified_forks = HashSet::new();
425 while let Some(message) = rx_priority
426 .try_recv_realtime()
427 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
428 {
429 let message_to = bound_subnets
430 .get(&AddressRange::from(message.to))
431 .copied()
432 .unwrap_or(message.to);
433 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
434 warn!(dst = %message_to, "no such fork");
435 continue
436 };
437 let should_notify = if let Some(ping_message) = message.message.peek::<sys::Ping>() {
438 let sys::Ping { reply_to, id } = *ping_message;
439 if let Some(reply_to) = reply_to {
440 trace!(%id, %reply_to, "received a ping request");
441 let pong_header = EnvelopeHeader::to_address(reply_to).with_priority(true);
442 let pong_envelope =
443 Envelope::new(pong_header, sys::Ping { reply_to: None, id }).into_erased();
444 rt_api
445 .send_to(reply_to, true, pong_envelope)
446 .inspect_err(
447 |e| warn!(reason = %e.as_display_chain(), "can't send ping-response"),
448 )
449 .ok();
450 trace!(%id, %reply_to, "send a ping response");
451 false
452 } else {
453 trace!(%id, "received a ping response");
454 fork_entry.last_ping_received = Some(id);
455 true
456 }
457 } else {
458 fork_entry.inbox_priority.push_back(message);
459 true
460 };
461
462 if should_notify && notified_forks.insert(message_to) {
463 trace!(dst = %message_to, "notifying fork");
464 fork_entry.fork_notifiy.notify_one();
465 }
466 }
467
468 while let Some(message) = rx_regular
469 .try_recv_realtime()
470 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
471 {
472 let message_to = bound_subnets
473 .get(&AddressRange::from(message.to))
474 .copied()
475 .unwrap_or(message.to);
476 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
477 warn!(dst = %message_to, "no such fork");
478 continue
479 };
480 fork_entry.inbox_regular.push_back(message);
481 trace!(dst = %message_to, "subnet received regular message");
482 if notified_forks.insert(message_to) {
483 trace!(dst = %message_to, "notifying fork");
484 fork_entry.fork_notifiy.notify_one();
485 }
486 }
487
488 Ok(())
489}
490
491async fn do_start(
492 context: &mut ActorContext,
493 runnable: BoxedRunnable<ActorContext>,
494 link: bool,
495 timeout: Duration,
496) -> Result<Address, ErrorOf<sys::StartErrorKind>> {
497 let this_address = context.fork_address;
498
499 let mut fork = context
500 .fork()
501 .await
502 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
503
504 let fork_address = fork.fork_address;
505 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
506 .await
507 .map_err(|e| e.map_kind(sys::StartErrorKind::Spawn))?;
508
509 let envelope = match fork.recv().timeout(timeout).await {
510 Err(_elapsed) => {
511 do_kill(context, spawned_address)
512 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
513
514 return Err(ErrorOf::new(
517 sys::StartErrorKind::Timeout,
518 "no init-ack within timeout",
519 ))
520 },
521 Ok(recv_result) => {
522 recv_result
523 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?
524 },
525 };
526
527 dispatch!(match envelope {
528 sys::InitAck { address } => {
529 if link {
530 do_link(context, this_address, address).await;
531 }
532 Ok(address)
533 },
534
535 sys::Exited { .. } => {
536 Err(ErrorOf::new(
537 sys::StartErrorKind::Exited,
538 "exited before init-ack",
539 ))
540 },
541
542 unexpected @ _ => {
543 Err(ErrorOf::new(
544 sys::StartErrorKind::InternalError,
545 format!("unexpected message: {unexpected:?}"),
546 ))
547 },
548 })
549}
550
551async fn do_spawn(
552 context: &mut ActorContext,
553 runnable: BoxedRunnable<ActorContext>,
554 ack_to: Option<Address>,
555 link_to: impl IntoIterator<Item = Address>,
556) -> Result<Address, ErrorOf<sys::SpawnErrorKind>> {
557 let ActorContext { subnet_context, .. } = context;
558
559 let (actor_key, rt_config, rt_api, tx_actor_failure) = {
560 let subnet_context_locked = subnet_context
561 .try_lock()
562 .expect("could not lock subnet_context");
563 let SubnetContext {
564 rt_api,
565 rt_config,
566 actor_key,
567 tx_actor_failure,
568 ..
569 } = &*subnet_context_locked;
570
571 (
572 actor_key.child(runnable.func_name()),
573 rt_config.clone(),
574 rt_api.clone(),
575 tx_actor_failure.clone(),
576 )
577 };
578
579 let actor_config = rt_config.actor_config(&actor_key);
580 let execute_on = rt_api.choose_executor(actor_config.runtime_key());
581 let message_tap = rt_api.message_tap(actor_config.message_tap_key());
582
583 trace!(?ack_to, "starting");
584
585 let subnet_lease = rt_api
586 .request_address(actor_config.netmask())
587 .await
588 .inspect_err(|e| log::error!("lease-error: {}", e))
589 .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::ResourceConstraint, e.to_string()))?;
590
591 trace!(subnet_lease = %subnet_lease.net_address(), "subnet leased");
592
593 let rt_api = rt_api.clone();
594 let rt_config = rt_config.clone();
595 let container = container::Container::create(
596 container::ContainerArgs {
597 ack_to,
598 link_to: link_to.into_iter().collect(),
600 actor_key,
601 trace_id: TraceId::current(),
602
603 subnet_lease,
604 rt_api,
605 rt_config,
606 message_tap,
607 tx_actor_failure: tx_actor_failure.clone(),
608 },
609 runnable,
610 )
611 .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::InternalError, e.to_string()))?;
612 let actor_address = container.actor_address();
613
614 trace!(spawned_address = %actor_address, "about to run spawned actor");
615
616 let tx_actor_failure = tx_actor_failure.clone();
617 let _join_handle = execute_on.spawn(async move {
619 match container.run().await {
620 Ok(Ok(())) => (),
621 Ok(Err(actor_failure)) => {
622 let _ = tx_actor_failure.send((actor_address, actor_failure));
623 },
624 Err(container_failure) => {
625 let report = AnyError::from(container_failure);
626 log::error!(
627 err = %report.as_display_chain(), %actor_address,
628 "actor container failure"
629 );
630 },
631 }
632 });
633
634 Ok(actor_address)
635}
636
637fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
638 let ActorContext { subnet_context, .. } = context;
639 let subnet_context_locked = subnet_context
640 .try_lock()
641 .expect("could not lock subnet_context");
642 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
643
644 rt_api.sys_send(
645 peer,
646 SysMsg::Link(SysLink::Exit {
647 sender: this,
648 receiver: peer,
649 reason: ExitReason::Terminate,
650 }),
651 )
652}
653
654fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
655 let ActorContext { subnet_context, .. } = context;
656 let subnet_context_locked = subnet_context
657 .try_lock()
658 .expect("could not lock subnet_context");
659 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
660
661 rt_api.sys_send(peer, SysMsg::Kill)
662}
663
664async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
665 let ActorContext { call, .. } = context;
666
667 call.invoke(SysCall::Link {
668 sender: this,
669 receiver: peer,
670 })
671 .await
672}
673
674async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
675 let ActorContext { call, .. } = context;
676 call.invoke(SysCall::Unlink {
677 sender: this,
678 receiver: peer,
679 })
680 .await
681}
682
683async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
684 let ActorContext { call, .. } = context;
685 call.invoke(SysCall::TrapExit(enable)).await;
686}
687
688async fn do_watch(context: &mut ActorContext, peer: Address) -> sys::WatchRef {
689 let ActorContext {
690 fork_address: this,
691 call,
692 ..
693 } = context;
694 let (reply_tx, reply_rx) = oneshot::channel();
695 call.invoke(SysCall::Watch {
696 sender: *this,
697 receiver: peer,
698 reply_tx,
699 })
700 .await;
701 reply_rx.await.expect("sys-call remained unanswered")
702}
703
704async fn do_unwatch(context: &mut ActorContext, watch_ref: sys::WatchRef) {
705 let ActorContext {
706 fork_address: this,
707 call,
708 ..
709 } = context;
710 call.invoke(SysCall::Unwatch {
711 sender: *this,
712 watch_ref,
713 })
714 .await;
715}
716
717async fn do_init_done(context: &mut ActorContext, address: Address) {
718 let ActorContext {
719 ack_to,
720 subnet_context,
721 ..
722 } = context;
723 let message = sys::InitAck { address };
724 let Some(ack_to_address) = ack_to.take() else {
725 return;
726 };
727 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
728 let subnet_context_locked = subnet_context
729 .try_lock()
730 .expect("could not lock subnet_context");
731 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
732 let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
733}
734
735fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
736 let sender = context.fork_address;
737 let to = outbound.header().to;
738 {
739 let subnet_context_locked = context
740 .subnet_context
741 .try_lock()
742 .expect("could not lock subnet_context");
743 let SubnetContext { message_tap, .. } = &*subnet_context_locked;
744 message_tap.on_send(sender, to, &outbound);
745 }
746
747 let (message, empty_envelope) = outbound.take();
748 let mut header: EnvelopeHeader = empty_envelope.into();
749
750 let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
751 warn!(
752 envelope_header = ?header,
753 "TTL exhausted, dropping message"
754 );
755 ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
756 })?;
757 header.ttl = new_ttl;
758
759 let outbound = Envelope::new(header, message);
760
761 trace!(envelope = ?outbound, "sending");
762 let ActorContext { subnet_context, .. } = context;
763 let subnet_context_locked = subnet_context
764 .try_lock()
765 .expect("could not lock subnet_context");
766 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
767 rt_api
768 .send_to(outbound.header().to, outbound.header().priority, outbound)
769 .map_err(|k| ErrorOf::new(k, ""))
770}
771
772fn do_forward(
773 context: &mut ActorContext,
774 to: Address,
775 outbound: Envelope,
776) -> Result<(), ErrorOf<SendErrorKind>> {
777 let sender = context.fork_address;
778 {
779 let subnet_context_locked = context
780 .subnet_context
781 .try_lock()
782 .expect("could not lock subnet_context");
783 let SubnetContext { message_tap, .. } = &*subnet_context_locked;
784 message_tap.on_send(sender, to, &outbound);
785 }
786
787 let (message, empty_envelope) = outbound.take();
788 let mut header: EnvelopeHeader = empty_envelope.into();
789
790 let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
791 warn!(
792 forward_to = %to,
793 envelope_header = ?header,
794 "TTL exhausted, dropping message"
795 );
796 ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
797 })?;
798 header.ttl = new_ttl;
799
800 let outbound = Envelope::new(header, message);
801
802 trace!(forward_to = %to, envelope = ?outbound, "forwarding");
803 let ActorContext { subnet_context, .. } = context;
804 let subnet_context_locked = subnet_context
805 .try_lock()
806 .expect("could not lock subnet_context");
807 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
808 rt_api
809 .send_to(to, outbound.header().priority, outbound)
810 .map_err(|k| ErrorOf::new(k, ""))
811}