mm1_node/runtime/
context.rs1use std::collections::{BTreeMap, HashMap, VecDeque};
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_address::pool::{Lease, Pool};
7use mm1_address::subnet::NetAddress;
8use mm1_common::log;
9use mm1_common::types::AnyError;
10use mm1_core::envelope::Envelope;
11use mm1_core::tracing::TraceId;
12use tokio::sync::{Notify, mpsc};
13
14use crate::actor_key::ActorKey;
15use crate::config::{Mm1NodeConfig, Valid};
16use crate::registry::{MessageWithPermit, MessageWithoutPermit, WeakSubnetMailboxTx};
17use crate::runtime::rt_api::RtApi;
18use crate::runtime::sys_call;
19use crate::runtime::sys_msg::SysMsg;
20
21mod impl_context_api;
22
/// Handle owned by one fork of an actor.
///
/// The per-subnet state lives in a reference-counted, mutex-guarded
/// `SubnetContext`; the `Drop` impl for this type removes this fork's
/// bookkeeping from that shared state.
23pub struct ActorContext {
    /// Address of this fork. It is the key of this fork's entry in
    /// `SubnetContext::fork_entries` and the value matched against
    /// `SubnetContext::bound_subnets` when the context is dropped.
24 pub(crate) fork_address: Address,
    /// Lease held for this fork, if any. On drop it is taken and sent back as
    /// `SysMsg::ForkDone`, provided the subnet's system mailbox is still alive.
25 pub(crate) fork_lease: Option<Lease>,
    // NOTE(review): not used in this part of the file; presumably the address
    // that expects an acknowledgement from this actor — confirm against
    // `impl_context_api`.
26 pub(crate) ack_to: Option<Address>,
    // NOTE(review): presumably the sending side of the runtime sys-call
    // channel; not used in this part of the file — confirm.
27 pub(crate) call: sys_call::Tx,
28
    /// Subnet-wide state behind an `Arc` and a `spin` mutex. The `Drop` impl
    /// acquires it with `try_lock` and panics if that fails.
29 pub(crate) subnet_context: Arc<spin::lock_api::Mutex<SubnetContext>>,
30}
31
/// Per-fork record stored in `SubnetContext::fork_entries`, keyed by the
/// fork's address.
///
/// `Default` yields a fresh notifier, empty inboxes and no recorded ping.
32#[derive(Default)]
33pub(crate) struct ForkEntry {
    // NOTE(review): field name is misspelled ("notifiy"); left as is because
    // code outside this view may reference it. Presumably used to wake the
    // fork when something lands in its inboxes — confirm.
34 fork_notifiy: Arc<Notify>,
    /// Queue of envelopes that carry no permit.
    // NOTE(review): presumably the high-priority inbox of the fork — confirm.
35 inbox_priority: VecDeque<MessageWithoutPermit<Envelope>>,
    /// Queue of envelopes that carry a permit.
    // NOTE(review): presumably the regular (back-pressured) inbox — confirm.
36 inbox_regular: VecDeque<MessageWithPermit<Envelope>>,
    // NOTE(review): meaning and unit of the value are not visible here;
    // presumably an identifier or timestamp of the latest ping — confirm.
37 last_ping_received: Option<u64>,
38}
39
/// State of one subnet, held behind `Arc<Mutex<..>>` by `ActorContext`.
///
/// `Drop for ActorContext` edits `fork_entries` and `bound_subnets`, and
/// unregisters `subnet_address` from the registry once `fork_entries` becomes
/// empty.
40pub(crate) struct SubnetContext {
    /// Runtime API handle; `registry()` on it yields the registry used to
    /// unregister bound subnets and the subnet address.
41 pub(crate) rt_api: RtApi,
    /// Shared, validated node configuration.
42 pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
43
    // NOTE(review): presumably identifies the actor owning this subnet —
    // not used in this part of the file, confirm.
44 pub(crate) actor_key: ActorKey,
    // NOTE(review): presumably the pool fork addresses are leased from —
    // not used in this part of the file, confirm.
45 pub(crate) subnet_pool: Pool,
    /// Address of the subnet itself; unregistered from the registry when the
    /// last fork entry has been removed.
46 pub(crate) subnet_address: NetAddress,
47
    /// Weak handle to the subnet mailbox. Its `tx_system` part is upgraded on
    /// drop of an `ActorContext` to deliver `SysMsg::ForkDone`.
48 pub(crate) subnet_mailbox_tx: WeakSubnetMailboxTx<(TraceId, SysMsg), Envelope>,
49
    // NOTE(review): presumably signalled when the subnet mailbox receives a
    // message — not used in this part of the file, confirm.
50 pub(crate) subnet_notify: Arc<Notify>,
    /// Receiving side for envelopes that carry no permit.
51 pub(crate) rx_priority: kanal::Receiver<MessageWithoutPermit<Envelope>>,
    /// Receiving side for envelopes that carry a permit.
52 pub(crate) rx_regular: kanal::Receiver<MessageWithPermit<Envelope>>,
53
    /// Per-fork records keyed by fork address; an `ActorContext` removes its
    /// own entry when dropped.
54 pub(crate) fork_entries: HashMap<Address, ForkEntry>,
    /// Bound address ranges mapped to the fork address that bound them; on
    /// drop of that fork the ranges are removed and unregistered.
55 pub(crate) bound_subnets: BTreeMap<AddressRange, Address>,
56
    /// Unbounded channel sender carrying `(Address, AnyError)` pairs.
    // NOTE(review): presumably used to report actor failures to the runtime —
    // confirm.
57 pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
59}
60
61impl Drop for ActorContext {
62 fn drop(&mut self) {
63 let Self {
64 fork_address,
65 subnet_context,
66 ..
67 } = self;
68 let mut subnet_context_locked = subnet_context
69 .try_lock()
70 .expect("could not lock subnet_context");
71 let SubnetContext {
72 rt_api,
73 subnet_address,
74 subnet_mailbox_tx,
75 fork_entries,
76 bound_subnets,
77 ..
78 } = &mut *subnet_context_locked;
79
80 let registry = rt_api.registry();
81
82 let mut unbound_subnets = vec![];
83 for (address_range, bound_by) in bound_subnets.iter() {
84 if bound_by == fork_address {
85 unbound_subnets.push(*address_range);
86 }
87 }
88
89 for address_range in unbound_subnets {
90 bound_subnets.remove(&address_range);
91 if !registry.unregister(address_range.into()) {
92 log::error!(%address_range, "could not unregister bound subnet");
93 }
94 }
95 if fork_entries.remove(fork_address).is_none() {
96 log::error!(
97 fork = %fork_address,
98 "nothing actually removed from fork_entries"
99 );
100 }
101
102 let fork_lease_opt = self.fork_lease.take();
103 let subnet_system_tx_opt = subnet_mailbox_tx.tx_system.upgrade();
104
105 if let Some((fork_lease, system_tx)) = fork_lease_opt.zip(subnet_system_tx_opt) {
106 let fork_done = SysMsg::ForkDone(fork_lease);
107 let _ = system_tx.send((TraceId::current(), fork_done));
108 }
109
110 if fork_entries.is_empty() {
111 let _ = registry.unregister(*subnet_address);
112 }
113 }
114}