mm1_node/runtime/
context.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Weak};
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_common::log;
7use mm1_common::types::AnyError;
8use mm1_core::envelope::Envelope;
9use tokio::sync::mpsc;
10
11use crate::actor_key::ActorKey;
12use crate::config::{Mm1NodeConfig, Valid};
13use crate::registry::{ActorNode, MessageWithPermit, NetworkNode};
14use crate::runtime::rt_api::RtApi;
15use crate::runtime::sys_call;
16use crate::runtime::sys_msg::SysMsg;
17
18mod impl_context_api;
19
20pub struct ActorContext {
21    pub(crate) rt_api:    RtApi,
22    pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
23
24    pub(crate) actor_key:     ActorKey,
25    pub(crate) address:       Address,
26    pub(crate) ack_to:        Option<Address>,
27    pub(crate) actor_node:    Weak<ActorNode<SysMsg, Envelope>>,
28    pub(crate) network_nodes: BTreeMap<AddressRange, Weak<NetworkNode<SysMsg, Envelope>>>,
29
30    pub(crate) rx_priority:      mpsc::UnboundedReceiver<Envelope>,
31    pub(crate) rx_regular:       mpsc::UnboundedReceiver<MessageWithPermit<Envelope>>,
32    pub(crate) tx_system_weak:   mpsc::WeakUnboundedSender<SysMsg>,
33    pub(crate) tx_priority_weak: mpsc::WeakUnboundedSender<Envelope>,
34    pub(crate) tx_regular_weak:  mpsc::WeakUnboundedSender<MessageWithPermit<Envelope>>,
35    pub(crate) call:             sys_call::Tx,
36
37    pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
38}
39
40impl Drop for ActorContext {
41    fn drop(&mut self) {
42        for (address_range, net_node) in std::mem::take(&mut self.network_nodes) {
43            if let Some(_net_node) = net_node.upgrade() {
44                let registry = self.rt_api.registry();
45                if !registry.unregister(address_range.into()) {
46                    log::error!("could not unregister {address_range}");
47                }
48            }
49        }
50
51        if let Some(actor_node) = self.actor_node.upgrade() {
52            let fork_lease = actor_node
53                .unregister(self.address)
54                .expect("already unregistered?");
55            let _ = actor_node.tx_system.send(SysMsg::ForkDone(fork_lease));
56        }
57    }
58}