mm1_node/runtime/
context.rs

1use std::collections::{BTreeMap, HashMap, VecDeque};
2use std::sync::Arc;
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_address::pool::{Lease, Pool};
7use mm1_address::subnet::NetAddress;
8use mm1_common::log;
9use mm1_common::types::AnyError;
10use mm1_core::envelope::Envelope;
11use mm1_core::tracing::TraceId;
12use tokio::sync::{Notify, mpsc};
13
14use crate::actor_key::ActorKey;
15use crate::config::{Mm1NodeConfig, Valid};
16use crate::registry::{MessageWithPermit, MessageWithoutPermit, WeakSubnetMailboxTx};
17use crate::runtime::rt_api::RtApi;
18use crate::runtime::sys_call;
19use crate::runtime::sys_msg::SysMsg;
20
21mod impl_context_api;
22
23pub struct ActorContext {
24    pub(crate) fork_address: Address,
25    pub(crate) fork_lease:   Option<Lease>,
26    pub(crate) ack_to:       Option<Address>,
27    pub(crate) call:         sys_call::Tx,
28
29    pub(crate) subnet_context: Arc<spin::lock_api::Mutex<SubnetContext>>,
30}
31
32#[derive(Default)]
33pub(crate) struct ForkEntry {
34    fork_notifiy:   Arc<Notify>,
35    inbox_priority: VecDeque<MessageWithoutPermit<Envelope>>,
36    inbox_regular:  VecDeque<MessageWithPermit<Envelope>>,
37}
38
39pub(crate) struct SubnetContext {
40    pub(crate) rt_api:    RtApi,
41    pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
42
43    pub(crate) actor_key:      ActorKey,
44    pub(crate) subnet_pool:    Pool,
45    pub(crate) subnet_address: NetAddress,
46
47    pub(crate) subnet_mailbox_tx: WeakSubnetMailboxTx<(TraceId, SysMsg), Envelope>,
48
49    pub(crate) subnet_notify: Arc<Notify>,
50    pub(crate) rx_priority:   kanal::Receiver<MessageWithoutPermit<Envelope>>,
51    pub(crate) rx_regular:    kanal::Receiver<MessageWithPermit<Envelope>>,
52
53    pub(crate) fork_entries:  HashMap<Address, ForkEntry>,
54    pub(crate) bound_subnets: BTreeMap<AddressRange, Address>,
55
56    // TODO: send the NetAddress instead of Address here
57    pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
58}
59
60impl Drop for ActorContext {
61    fn drop(&mut self) {
62        let Self {
63            fork_address,
64            subnet_context,
65            ..
66        } = self;
67        let mut subnet_context_locked = subnet_context
68            .try_lock()
69            .expect("could not lock subnet_context");
70        let SubnetContext {
71            rt_api,
72            subnet_address,
73            subnet_mailbox_tx,
74            fork_entries,
75            bound_subnets,
76            ..
77        } = &mut *subnet_context_locked;
78
79        let registry = rt_api.registry();
80
81        let mut unbound_subnets = vec![];
82        for (address_range, bound_by) in bound_subnets.iter() {
83            if bound_by == fork_address {
84                unbound_subnets.push(*address_range);
85            }
86        }
87
88        for address_range in unbound_subnets {
89            bound_subnets.remove(&address_range);
90            if !registry.unregister(address_range.into()) {
91                log::error!("could not unregister {address_range} [bound subnet]");
92            }
93        }
94        if fork_entries.remove(fork_address).is_none() {
95            log::error!(
96                "nothing actually removed from fork_entries [fork: {}]",
97                fork_address
98            );
99        }
100
101        let fork_lease_opt = self.fork_lease.take();
102        let subnet_system_tx_opt = subnet_mailbox_tx.tx_system.upgrade();
103
104        if let Some((fork_lease, system_tx)) = fork_lease_opt.zip(subnet_system_tx_opt) {
105            let fork_done = SysMsg::ForkDone(fork_lease);
106            let _ = system_tx.send((TraceId::current(), fork_done));
107        }
108
109        if fork_entries.is_empty() {
110            let _ = registry.unregister(*subnet_address);
111        }
112    }
113}