1use std::collections::{BTreeMap, HashMap, HashSet};
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::{NetAddress, NetMask};
9use mm1_common::errors::chain::{ExactTypeDisplayChainExt, StdErrorDisplayChainExt};
10use mm1_common::errors::error_of::ErrorOf;
11use mm1_common::futures::timeout::FutureTimeoutExt;
12use mm1_common::log;
13use mm1_common::types::{AnyError, Never};
14use mm1_core::context::{
15 Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Ping,
16 PingErrorKind, Quit, RecvErrorKind, SendErrorKind, Start, Stop, Watching,
17};
18use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
19use mm1_core::tap;
20use mm1_core::tracing::{TraceId, WithTraceIdExt};
21use mm1_proto_system as sys;
22use mm1_runnable::local::BoxedRunnable;
23use rand::RngCore;
24use tokio::sync::oneshot;
25use tokio::time::Instant;
26use tracing::{trace, warn};
27
28use crate::config::EffectiveActorConfig;
29use crate::registry::{self, MessageWithPermit, MessageWithoutPermit};
30use crate::runtime::container;
31use crate::runtime::context::{ActorContext, ForkEntry, SubnetContext};
32use crate::runtime::rt_api::RtApi;
33use crate::runtime::sys_call::SysCall;
34use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
35
36impl Quit for ActorContext {
37 async fn quit_ok(&mut self) -> Never {
38 self.call.invoke(SysCall::Exit(Ok(()))).await;
39 std::future::pending().await
40 }
41
42 async fn quit_err<E>(&mut self, reason: E) -> Never
43 where
44 E: std::error::Error + Send + Sync + 'static,
45 {
46 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
47 std::future::pending().await
48 }
49}
50
51impl Fork for ActorContext {
52 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
53 let Self {
54 fork_address: this_address,
55 call,
56 subnet_context,
57 ..
58 } = self;
59
60 let fork_lease = {
61 let mut subnet_context_locked = subnet_context
62 .try_lock()
63 .expect("could not lock subnet_context");
64 let SubnetContext {
65 subnet_pool,
66 fork_entries,
67 ..
68 } = &mut *subnet_context_locked;
69 let fork_lease = subnet_pool.lease(NetMask::MAX).map_err(|lease_error| {
70 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
71 })?;
72 let should_be_none = fork_entries.insert(fork_lease.address, Default::default());
73 assert!(should_be_none.is_none());
74 fork_lease
75 };
76 let fork_address = fork_lease.address;
77 trace!(parent = %this_address, child = %fork_address, "forking");
78
79 call.invoke(SysCall::ForkAdded(fork_address)).await;
80
81 let context = Self {
82 subnet_context: subnet_context.clone(),
83 fork_address,
84 fork_lease: Some(fork_lease),
85 ack_to: None,
86 call: call.clone(),
87 };
88
89 Ok(context)
90 }
91
92 async fn run<F, Fut>(self, fun: F)
93 where
94 F: FnOnce(Self) -> Fut,
95 F: Send + 'static,
96 Fut: Future + Send + 'static,
97 {
98 let call = self.call.clone();
99 let fut = fun(self)
100 .map(|_| ())
101 .with_trace_id(TraceId::current())
102 .boxed();
103 call.invoke(SysCall::Spawn(fut)).await;
104 }
105}
106
107impl Now for ActorContext {
108 type Instant = Instant;
109
110 fn now(&self) -> Self::Instant {
111 Instant::now()
112 }
113}
114
115impl Start<BoxedRunnable<Self>> for ActorContext {
116 fn spawn(
117 &mut self,
118 runnable: BoxedRunnable<Self>,
119 link: bool,
120 ) -> impl Future<Output = Result<Address, ErrorOf<sys::SpawnErrorKind>>> + Send {
121 do_spawn(self, runnable, None, link.then_some(self.fork_address))
122 }
123
124 fn start(
125 &mut self,
126 runnable: BoxedRunnable<Self>,
127 link: bool,
128 start_timeout: Duration,
129 ) -> impl Future<Output = Result<Address, ErrorOf<sys::StartErrorKind>>> + Send {
130 do_start(self, runnable, link, start_timeout)
131 }
132}
133
134impl Stop for ActorContext {
135 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
136 let this = self.fork_address;
137 let out = do_exit(self, this, peer).is_ok();
138 std::future::ready(out)
139 }
140
141 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
142 let out = do_kill(self, peer).is_ok();
143 std::future::ready(out)
144 }
145}
146
147impl Linking for ActorContext {
148 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
149 let this = self.fork_address;
150 do_link(self, this, peer)
151 }
152
153 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
154 let this = self.fork_address;
155 do_unlink(self, this, peer)
156 }
157
158 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
159 do_set_trap_exit(self, enable)
160 }
161}
162
/// Watch management; both operations delegate to the `do_watch` / `do_unwatch` helpers.
impl Watching for ActorContext {
    /// Starts watching `peer` and resolves to the watch reference returned by the sys-call.
    fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
        do_watch(self, peer)
    }

    /// Cancels the watch identified by `watch_ref`.
    fn unwatch(
        &mut self,
        watch_ref: mm1_proto_system::WatchRef,
    ) -> impl Future<Output = ()> + Send {
        do_unwatch(self, watch_ref)
    }
}
175
/// Init-ack reporting; delegates to `do_init_done`.
impl InitDone for ActorContext {
    /// Reports `address` in an `InitAck` to the address stored in `ack_to`, if any.
    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
        do_init_done(self, address)
    }
}
181
182impl Messaging for ActorContext {
183 fn address(&self) -> Address {
184 self.fork_address
185 }
186
187 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
188 let ActorContext {
189 fork_address,
190 subnet_context,
191 ..
192 } = self;
193
194 loop {
195 let (inbound_envelope_opt, subnet_notify, fork_notify) = {
196 let mut subnet_context_locked = subnet_context
197 .try_lock()
198 .expect("could not lock subnet_context");
199 let SubnetContext {
200 rt_api,
201 actor_key,
202 subnet_address,
203 subnet_notify,
204 rx_priority,
205 rx_regular,
206 fork_entries,
207 bound_subnets,
208 message_tap,
209 ..
210 } = &mut *subnet_context_locked;
211
212 process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)?;
213
214 let this_fork_entry = fork_entries
215 .get_mut(fork_address)
216 .unwrap_or_else(|| panic!("no fork-entry for {}", fork_address));
217 let inbound_envelope_opt = if let Some(priority_message) =
218 this_fork_entry.inbox_priority.pop_front()
219 {
220 Some(priority_message.message)
221 } else if let Some(regular_message) = this_fork_entry.inbox_regular.pop_front() {
222 Some(regular_message.message)
223 } else {
224 None
225 };
226 if let Some(envelope) = inbound_envelope_opt.as_ref() {
227 message_tap.on_recv(tap::OnRecv {
228 recv_by_addr: *fork_address,
229 recv_by_net: *subnet_address,
230 recv_by_key: actor_key,
231 envelope,
232 });
233 }
234 (
235 inbound_envelope_opt,
236 subnet_notify.clone(),
237 this_fork_entry.fork_notifiy.clone(),
238 )
239 };
240
241 if let Some(inbound_envelope) = inbound_envelope_opt {
242 break Ok(inbound_envelope)
243 } else {
244 let subnet_notified = subnet_notify.notified();
245 let fork_notified = fork_notify.notified();
246
247 tokio::select! {
248 _ = subnet_notified => (),
249 _ = fork_notified => (),
250 }
251 }
252 }
253 }
254
255 async fn close(&mut self) {
256 todo!()
259 }
260
261 fn send(
262 &mut self,
263 envelope: Envelope,
264 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
265 std::future::ready(do_send(self, envelope))
266 }
267
268 fn forward(
269 &mut self,
270 to: Address,
271 envelope: Envelope,
272 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
273 std::future::ready(do_forward(self, to, envelope))
274 }
275}
276
277impl Bind<NetAddress> for ActorContext {
278 async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
279 use std::collections::btree_map::Entry::*;
280
281 let BindArgs {
282 bind_to,
283 inbox_size,
284 } = args;
285 let address_range = AddressRange::from(bind_to);
286
287 log::debug!(%bind_to, %inbox_size, "binding");
288
289 let Self {
290 fork_address,
291 subnet_context,
292 ..
293 } = self;
294
295 {
296 let mut subnet_context_locked = subnet_context
297 .try_lock()
298 .expect("could not lock subnet_context");
299
300 let SubnetContext {
301 rt_api,
302 subnet_mailbox_tx,
303 bound_subnets,
304 ..
305 } = &mut *subnet_context_locked;
306
307 let bound_subnet_entry = match bound_subnets.entry(address_range) {
308 Vacant(v) => v,
309 Occupied(o) => {
310 let previously_bound_to = NetAddress::from(*o.key());
311 return Err(ErrorOf::new(
312 BindErrorKind::Conflict,
313 format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
314 ))
315 },
316 };
317 let subnet_lease = Lease::trusted(bind_to);
318
319 let subnet_mailbox_tx = subnet_mailbox_tx.upgrade().ok_or(ErrorOf::new(
320 BindErrorKind::Closed,
321 "the actor subnet is probably unregistered",
322 ))?;
323 let bound_subnet_node =
324 registry::Node::new(subnet_lease, inbox_size, subnet_mailbox_tx);
325
326 rt_api
327 .registry()
328 .register(bind_to, bound_subnet_node)
329 .map_err(|_| {
330 ErrorOf::new(
331 BindErrorKind::Conflict,
332 "could not register the subnet-node",
333 )
334 })?;
335
336 bound_subnet_entry.insert(*fork_address);
337 }
338
339 log::info!(%bind_to, %inbox_size, "bound");
340
341 Ok(())
342 }
343}
344
345impl Ping for ActorContext {
346 async fn ping(
347 &mut self,
348 address: Address,
349 timeout: Duration,
350 ) -> Result<Duration, ErrorOf<PingErrorKind>> {
351 let ping_id = rand::rng().next_u64();
352 let now = Instant::now();
353 let deadline = now.checked_add(timeout).unwrap_or(now);
354
355 let Self {
356 fork_address,
357 subnet_context,
358 ..
359 } = self;
360
361 let (subnet_notify, fork_notify) = {
362 let mut subnet_context_locked = subnet_context
363 .try_lock()
364 .expect("could not lock subnet_context");
365 let SubnetContext {
366 rt_api,
367 fork_entries,
368 subnet_notify,
369 ..
370 } = &mut *subnet_context_locked;
371 let ForkEntry { fork_notifiy, .. } = fork_entries
372 .get_mut(fork_address)
373 .expect("fork_entry missing");
374
375 let ping_msg = sys::Ping {
376 reply_to: Some(*fork_address),
377 id: ping_id,
378 };
379
380 let ping_header = EnvelopeHeader::to_address(address).with_priority(true);
381 let ping_envelope = Envelope::new(ping_header, ping_msg).into_erased();
382 rt_api
383 .send_to(address, true, ping_envelope)
384 .map_err(|e| ErrorOf::new(PingErrorKind::Send, e.to_string()))?;
385
386 (subnet_notify.clone(), fork_notifiy.clone())
387 };
388
389 loop {
390 let timeout = tokio::time::sleep_until(deadline);
391 tokio::select! {
392 _ = timeout => { return Err(ErrorOf::new(PingErrorKind::Timeout, "timeout elapsed")) },
393 _ = subnet_notify.notified() => (),
394 _ = fork_notify.notified() => (),
395 };
396
397 let mut subnet_context_locked = subnet_context
398 .try_lock()
399 .expect("could not lock subnet_context");
400 let SubnetContext {
401 rt_api,
402 fork_entries,
403 bound_subnets,
404 rx_priority,
405 rx_regular,
406 ..
407 } = &mut *subnet_context_locked;
408
409 process_inlets(rt_api, bound_subnets, rx_priority, rx_regular, fork_entries)
410 .map_err(|e| ErrorOf::new(PingErrorKind::Recv, e.to_string()))?;
411
412 if fork_entries
413 .get(fork_address)
414 .expect("fork_entry_missing")
415 .last_ping_received
416 == Some(ping_id)
417 {
418 break
419 }
420 }
421 Ok(now.elapsed())
422 }
423}
424
425fn process_inlets(
426 rt_api: &RtApi,
427 bound_subnets: &BTreeMap<AddressRange, Address>,
428 rx_priority: &mut kanal::Receiver<MessageWithoutPermit<Envelope>>,
429 rx_regular: &mut kanal::Receiver<MessageWithPermit<Envelope>>,
430 fork_entries: &mut HashMap<Address, ForkEntry>,
431) -> Result<(), ErrorOf<RecvErrorKind>> {
432 let mut notified_forks = HashSet::new();
433 while let Some(message) = rx_priority
434 .try_recv_realtime()
435 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
436 {
437 let message_to = bound_subnets
438 .get(&AddressRange::from(message.to))
439 .copied()
440 .unwrap_or(message.to);
441 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
442 warn!(dst = %message_to, "no such fork");
443 continue
444 };
445 let should_notify = if let Some(ping_message) = message.message.peek::<sys::Ping>() {
446 let sys::Ping { reply_to, id } = *ping_message;
447 if let Some(reply_to) = reply_to {
448 trace!(%id, %reply_to, "received a ping request");
449 let pong_header = EnvelopeHeader::to_address(reply_to).with_priority(true);
450 let pong_envelope =
451 Envelope::new(pong_header, sys::Ping { reply_to: None, id }).into_erased();
452 rt_api
453 .send_to(reply_to, true, pong_envelope)
454 .inspect_err(
455 |e| warn!(reason = %e.as_display_chain(), "can't send ping-response"),
456 )
457 .ok();
458 trace!(%id, %reply_to, "send a ping response");
459 false
460 } else {
461 trace!(%id, "received a ping response");
462 fork_entry.last_ping_received = Some(id);
463 true
464 }
465 } else {
466 fork_entry.inbox_priority.push_back(message);
467 true
468 };
469
470 if should_notify && notified_forks.insert(message_to) {
471 trace!(dst = %message_to, "notifying fork");
472 fork_entry.fork_notifiy.notify_one();
473 }
474 }
475
476 while let Some(message) = rx_regular
477 .try_recv_realtime()
478 .map_err(|e| ErrorOf::new(RecvErrorKind::Closed, e.to_string()))?
479 {
480 let message_to = bound_subnets
481 .get(&AddressRange::from(message.to))
482 .copied()
483 .unwrap_or(message.to);
484 let Some(fork_entry) = fork_entries.get_mut(&message_to) else {
485 warn!(dst = %message_to, "no such fork");
486 continue
487 };
488 fork_entry.inbox_regular.push_back(message);
489 trace!(dst = %message_to, "subnet received regular message");
490 if notified_forks.insert(message_to) {
491 trace!(dst = %message_to, "notifying fork");
492 fork_entry.fork_notifiy.notify_one();
493 }
494 }
495
496 Ok(())
497}
498
499async fn do_start(
500 context: &mut ActorContext,
501 runnable: BoxedRunnable<ActorContext>,
502 link: bool,
503 timeout: Duration,
504) -> Result<Address, ErrorOf<sys::StartErrorKind>> {
505 let this_address = context.fork_address;
506
507 let mut fork = context
508 .fork()
509 .await
510 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
511
512 let fork_address = fork.fork_address;
513 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
514 .await
515 .map_err(|e| e.map_kind(sys::StartErrorKind::Spawn))?;
516
517 let envelope = match fork.recv().timeout(timeout).await {
518 Err(_elapsed) => {
519 do_kill(context, spawned_address)
520 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?;
521
522 return Err(ErrorOf::new(
525 sys::StartErrorKind::Timeout,
526 "no init-ack within timeout",
527 ))
528 },
529 Ok(recv_result) => {
530 recv_result
531 .map_err(|e| ErrorOf::new(sys::StartErrorKind::InternalError, e.to_string()))?
532 },
533 };
534
535 dispatch!(match envelope {
536 sys::InitAck { address } => {
537 if link {
538 do_link(context, this_address, address).await;
539 }
540 Ok(address)
541 },
542
543 sys::Exited { .. } => {
544 Err(ErrorOf::new(
545 sys::StartErrorKind::Exited,
546 "exited before init-ack",
547 ))
548 },
549
550 unexpected @ _ => {
551 Err(ErrorOf::new(
552 sys::StartErrorKind::InternalError,
553 format!("unexpected message: {unexpected:?}"),
554 ))
555 },
556 })
557}
558
559async fn do_spawn(
560 context: &mut ActorContext,
561 runnable: BoxedRunnable<ActorContext>,
562 ack_to: Option<Address>,
563 link_to: impl IntoIterator<Item = Address>,
564) -> Result<Address, ErrorOf<sys::SpawnErrorKind>> {
565 let ActorContext { subnet_context, .. } = context;
566
567 let (actor_key, rt_config, rt_api, tx_actor_failure) = {
568 let subnet_context_locked = subnet_context
569 .try_lock()
570 .expect("could not lock subnet_context");
571 let SubnetContext {
572 rt_api,
573 rt_config,
574 actor_key,
575 tx_actor_failure,
576 ..
577 } = &*subnet_context_locked;
578
579 (
580 actor_key.child(runnable.func_name()),
581 rt_config.clone(),
582 rt_api.clone(),
583 tx_actor_failure.clone(),
584 )
585 };
586
587 let actor_config = rt_config.actor_config(&actor_key);
588 let execute_on = rt_api.choose_executor(actor_config.runtime_key());
589 let message_tap = rt_api.message_tap(actor_config.message_tap_key());
590
591 trace!(?ack_to, "starting");
592
593 let subnet_lease = rt_api
594 .request_address(actor_config.netmask())
595 .await
596 .inspect_err(|e| log::error!("lease-error: {}", e))
597 .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::ResourceConstraint, e.to_string()))?;
598
599 trace!(subnet_lease = %subnet_lease.net_address(), "subnet leased");
600
601 let rt_api = rt_api.clone();
602 let rt_config = rt_config.clone();
603 let container = container::Container::create(
604 container::ContainerArgs {
605 ack_to,
606 link_to: link_to.into_iter().collect(),
608 actor_key,
609 trace_id: TraceId::current(),
610
611 subnet_lease,
612 rt_api,
613 rt_config,
614 message_tap,
615 tx_actor_failure: tx_actor_failure.clone(),
616 },
617 runnable,
618 )
619 .map_err(|e| ErrorOf::new(sys::SpawnErrorKind::InternalError, e.to_string()))?;
620 let actor_address = container.actor_address();
621
622 trace!(spawned_address = %actor_address, "about to run spawned actor");
623
624 let tx_actor_failure = tx_actor_failure.clone();
625 let _join_handle = execute_on.spawn(async move {
627 match container.run().await {
628 Ok(Ok(())) => (),
629 Ok(Err(actor_failure)) => {
630 let _ = tx_actor_failure.send((actor_address, actor_failure));
631 },
632 Err(container_failure) => {
633 let report = AnyError::from(container_failure);
634 log::error!(
635 err = %report.as_display_chain(), %actor_address,
636 "actor container failure"
637 );
638 },
639 }
640 });
641
642 Ok(actor_address)
643}
644
645fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
646 let ActorContext { subnet_context, .. } = context;
647 let subnet_context_locked = subnet_context
648 .try_lock()
649 .expect("could not lock subnet_context");
650 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
651
652 rt_api.sys_send(
653 peer,
654 SysMsg::Link(SysLink::Exit {
655 sender: this,
656 receiver: peer,
657 reason: ExitReason::Terminate,
658 }),
659 )
660}
661
662fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
663 let ActorContext { subnet_context, .. } = context;
664 let subnet_context_locked = subnet_context
665 .try_lock()
666 .expect("could not lock subnet_context");
667 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
668
669 rt_api.sys_send(peer, SysMsg::Kill)
670}
671
672async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
673 let ActorContext { call, .. } = context;
674
675 call.invoke(SysCall::Link {
676 sender: this,
677 receiver: peer,
678 })
679 .await
680}
681
682async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
683 let ActorContext { call, .. } = context;
684 call.invoke(SysCall::Unlink {
685 sender: this,
686 receiver: peer,
687 })
688 .await
689}
690
691async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
692 let ActorContext { call, .. } = context;
693 call.invoke(SysCall::TrapExit(enable)).await;
694}
695
696async fn do_watch(context: &mut ActorContext, peer: Address) -> sys::WatchRef {
697 let ActorContext {
698 fork_address: this,
699 call,
700 ..
701 } = context;
702 let (reply_tx, reply_rx) = oneshot::channel();
703 call.invoke(SysCall::Watch {
704 sender: *this,
705 receiver: peer,
706 reply_tx,
707 })
708 .await;
709 reply_rx.await.expect("sys-call remained unanswered")
710}
711
712async fn do_unwatch(context: &mut ActorContext, watch_ref: sys::WatchRef) {
713 let ActorContext {
714 fork_address: this,
715 call,
716 ..
717 } = context;
718 call.invoke(SysCall::Unwatch {
719 sender: *this,
720 watch_ref,
721 })
722 .await;
723}
724
725async fn do_init_done(context: &mut ActorContext, address: Address) {
726 let ActorContext {
727 ack_to,
728 subnet_context,
729 ..
730 } = context;
731 let message = sys::InitAck { address };
732 let Some(ack_to_address) = ack_to.take() else {
733 return;
734 };
735 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
736 let subnet_context_locked = subnet_context
737 .try_lock()
738 .expect("could not lock subnet_context");
739 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
740 let _ = rt_api.send_to(envelope.header().to, true, envelope.into_erased());
741}
742
743fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
744 let sender = context.fork_address;
745 let to = outbound.header().to;
746 {
747 let subnet_context_locked = context
748 .subnet_context
749 .try_lock()
750 .expect("could not lock subnet_context");
751 let SubnetContext {
752 subnet_address,
753 actor_key,
754 message_tap,
755 ..
756 } = &*subnet_context_locked;
757 message_tap.on_send(tap::OnSend {
758 sent_by_addr: sender,
759 sent_by_net: *subnet_address,
760 sent_by_key: actor_key,
761 sent_to_addr: to,
762 envelope: &outbound,
763 });
764 }
765
766 let (message, empty_envelope) = outbound.take();
767 let mut header: EnvelopeHeader = empty_envelope.into();
768
769 let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
770 warn!(
771 envelope_header = ?header,
772 "TTL exhausted, dropping message"
773 );
774 ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
775 })?;
776 header.ttl = new_ttl;
777
778 let outbound = Envelope::new(header, message);
779
780 trace!(envelope = ?outbound, "sending");
781 let ActorContext { subnet_context, .. } = context;
782 let subnet_context_locked = subnet_context
783 .try_lock()
784 .expect("could not lock subnet_context");
785 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
786 rt_api
787 .send_to(outbound.header().to, outbound.header().priority, outbound)
788 .map_err(|k| ErrorOf::new(k, ""))
789}
790
791fn do_forward(
792 context: &mut ActorContext,
793 to: Address,
794 outbound: Envelope,
795) -> Result<(), ErrorOf<SendErrorKind>> {
796 let sender = context.fork_address;
797 {
798 let subnet_context_locked = context
799 .subnet_context
800 .try_lock()
801 .expect("could not lock subnet_context");
802 let SubnetContext {
803 subnet_address,
804 actor_key,
805 message_tap,
806 ..
807 } = &*subnet_context_locked;
808 message_tap.on_send(tap::OnSend {
809 sent_by_addr: sender,
810 sent_by_net: *subnet_address,
811 sent_by_key: actor_key,
812 sent_to_addr: to,
813 envelope: &outbound,
814 });
815 }
816
817 let (message, empty_envelope) = outbound.take();
818 let mut header: EnvelopeHeader = empty_envelope.into();
819
820 let new_ttl = header.ttl.checked_sub(1).ok_or_else(|| {
821 warn!(
822 forward_to = %to,
823 envelope_header = ?header,
824 "TTL exhausted, dropping message"
825 );
826 ErrorOf::new(SendErrorKind::TtlExhausted, "TTL exhausted")
827 })?;
828 header.ttl = new_ttl;
829
830 let outbound = Envelope::new(header, message);
831
832 trace!(forward_to = %to, envelope = ?outbound, "forwarding");
833 let ActorContext { subnet_context, .. } = context;
834 let subnet_context_locked = subnet_context
835 .try_lock()
836 .expect("could not lock subnet_context");
837 let SubnetContext { rt_api, .. } = &*subnet_context_locked;
838 rt_api
839 .send_to(to, outbound.header().priority, outbound)
840 .map_err(|k| ErrorOf::new(k, ""))
841}