mm1_node/runtime/
context.rs1use std::collections::{BTreeMap, HashMap, VecDeque};
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_address::pool::{Lease, Pool};
7use mm1_address::subnet::NetAddress;
8use mm1_common::log;
9use mm1_common::types::AnyError;
10use mm1_core::envelope::Envelope;
11use mm1_core::tracing::TraceId;
12use tokio::sync::{Notify, mpsc};
13
14use crate::actor_key::ActorKey;
15use crate::config::{Mm1NodeConfig, Valid};
16use crate::registry::{MessageWithPermit, MessageWithoutPermit, WeakSubnetMailboxTx};
17use crate::runtime::rt_api::RtApi;
18use crate::runtime::sys_call;
19use crate::runtime::sys_msg::SysMsg;
20
21mod impl_context_api;
22
23pub struct ActorContext {
24 pub(crate) fork_address: Address,
25 pub(crate) fork_lease: Option<Lease>,
26 pub(crate) ack_to: Option<Address>,
27 pub(crate) call: sys_call::Tx,
28
29 pub(crate) subnet_context: Arc<spin::lock_api::Mutex<SubnetContext>>,
30}
31
32#[derive(Default)]
33pub(crate) struct ForkEntry {
34 fork_notifiy: Arc<Notify>,
35 inbox_priority: VecDeque<MessageWithoutPermit<Envelope>>,
36 inbox_regular: VecDeque<MessageWithPermit<Envelope>>,
37}
38
39pub(crate) struct SubnetContext {
40 pub(crate) rt_api: RtApi,
41 pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
42
43 pub(crate) actor_key: ActorKey,
44 pub(crate) subnet_pool: Pool,
45 pub(crate) subnet_address: NetAddress,
46
47 pub(crate) subnet_mailbox_tx: WeakSubnetMailboxTx<(TraceId, SysMsg), Envelope>,
48
49 pub(crate) subnet_notify: Arc<Notify>,
50 pub(crate) rx_priority: kanal::Receiver<MessageWithoutPermit<Envelope>>,
51 pub(crate) rx_regular: kanal::Receiver<MessageWithPermit<Envelope>>,
52
53 pub(crate) fork_entries: HashMap<Address, ForkEntry>,
54 pub(crate) bound_subnets: BTreeMap<AddressRange, Address>,
55
56 pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
58}
59
60impl Drop for ActorContext {
61 fn drop(&mut self) {
62 let Self {
63 fork_address,
64 subnet_context,
65 ..
66 } = self;
67 let mut subnet_context_locked = subnet_context
68 .try_lock()
69 .expect("could not lock subnet_context");
70 let SubnetContext {
71 rt_api,
72 subnet_address,
73 subnet_mailbox_tx,
74 fork_entries,
75 bound_subnets,
76 ..
77 } = &mut *subnet_context_locked;
78
79 let registry = rt_api.registry();
80
81 let mut unbound_subnets = vec![];
82 for (address_range, bound_by) in bound_subnets.iter() {
83 if bound_by == fork_address {
84 unbound_subnets.push(*address_range);
85 }
86 }
87
88 for address_range in unbound_subnets {
89 bound_subnets.remove(&address_range);
90 if !registry.unregister(address_range.into()) {
91 log::error!("could not unregister {address_range} [bound subnet]");
92 }
93 }
94 if fork_entries.remove(fork_address).is_none() {
95 log::error!(
96 "nothing actually removed from fork_entries [fork: {}]",
97 fork_address
98 );
99 }
100
101 let fork_lease_opt = self.fork_lease.take();
102 let subnet_system_tx_opt = subnet_mailbox_tx.tx_system.upgrade();
103
104 if let Some((fork_lease, system_tx)) = fork_lease_opt.zip(subnet_system_tx_opt) {
105 let fork_done = SysMsg::ForkDone(fork_lease);
106 let _ = system_tx.send((TraceId::current(), fork_done));
107 }
108
109 if fork_entries.is_empty() {
110 let _ = registry.unregister(*subnet_address);
111 }
112 }
113}