mm1_node/runtime/
context.rs1use std::collections::{BTreeMap, HashMap, VecDeque};
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_address::pool::{Lease, Pool};
7use mm1_address::subnet::NetAddress;
8use mm1_common::log;
9use mm1_common::types::AnyError;
10use mm1_core::envelope::Envelope;
11use mm1_core::tap::MessageTap;
12use mm1_core::tracing::TraceId;
13use tokio::sync::{Notify, mpsc};
14
15use crate::actor_key::ActorKey;
16use crate::config::{Mm1NodeConfig, Valid};
17use crate::registry::{MessageWithPermit, MessageWithoutPermit, WeakSubnetMailboxTx};
18use crate::runtime::rt_api::RtApi;
19use crate::runtime::sys_call;
20use crate::runtime::sys_msg::SysMsg;
21
22mod impl_context_api;
23
24pub struct ActorContext {
25 pub(crate) fork_address: Address,
26 pub(crate) fork_lease: Option<Lease>,
27 pub(crate) ack_to: Option<Address>,
28 pub(crate) call: sys_call::Tx,
29
30 pub(crate) subnet_context: Arc<spin::lock_api::Mutex<SubnetContext>>,
31}
32
33#[derive(Default)]
34pub(crate) struct ForkEntry {
35 fork_notifiy: Arc<Notify>,
36 inbox_priority: VecDeque<MessageWithoutPermit<Envelope>>,
37 inbox_regular: VecDeque<MessageWithPermit<Envelope>>,
38 last_ping_received: Option<u64>,
39}
40
41pub(crate) struct SubnetContext {
42 pub(crate) rt_api: RtApi,
43 pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
44
45 pub(crate) actor_key: ActorKey,
46 pub(crate) subnet_pool: Pool,
47 pub(crate) subnet_address: NetAddress,
48
49 pub(crate) subnet_mailbox_tx: WeakSubnetMailboxTx<(TraceId, SysMsg), Envelope>,
50
51 pub(crate) subnet_notify: Arc<Notify>,
52 pub(crate) rx_priority: kanal::Receiver<MessageWithoutPermit<Envelope>>,
53 pub(crate) rx_regular: kanal::Receiver<MessageWithPermit<Envelope>>,
54
55 pub(crate) fork_entries: HashMap<Address, ForkEntry>,
56 pub(crate) bound_subnets: BTreeMap<AddressRange, Address>,
57 pub(crate) message_tap: Arc<dyn MessageTap>,
58
59 pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
61}
62
63impl Drop for ActorContext {
64 fn drop(&mut self) {
65 let Self {
66 fork_address,
67 subnet_context,
68 ..
69 } = self;
70 let mut subnet_context_locked = subnet_context
71 .try_lock()
72 .expect("could not lock subnet_context");
73 let SubnetContext {
74 rt_api,
75 subnet_address,
76 subnet_mailbox_tx,
77 fork_entries,
78 bound_subnets,
79 ..
80 } = &mut *subnet_context_locked;
81
82 let registry = rt_api.registry();
83
84 let mut unbound_subnets = vec![];
85 for (address_range, bound_by) in bound_subnets.iter() {
86 if bound_by == fork_address {
87 unbound_subnets.push(*address_range);
88 }
89 }
90
91 for address_range in unbound_subnets {
92 bound_subnets.remove(&address_range);
93 if !registry.unregister(address_range.into()) {
94 log::error!(%address_range, "could not unregister bound subnet");
95 }
96 }
97 if fork_entries.remove(fork_address).is_none() {
98 log::error!(
99 fork = %fork_address,
100 "nothing actually removed from fork_entries"
101 );
102 }
103
104 let fork_lease_opt = self.fork_lease.take();
105 let subnet_system_tx_opt = subnet_mailbox_tx.tx_system.upgrade();
106
107 if let Some((fork_lease, system_tx)) = fork_lease_opt.zip(subnet_system_tx_opt) {
108 let fork_done = SysMsg::ForkDone(fork_lease);
109 let _ = system_tx.send((TraceId::current(), fork_done));
110 }
111
112 if fork_entries.is_empty() {
113 let _ = registry.unregister(*subnet_address);
114 }
115 }
116}