mm1_node/runtime/
context.rs

1use std::collections::BTreeMap;
2use std::sync::{Arc, Weak};
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_common::log;
7use mm1_common::types::AnyError;
8use mm1_core::envelope::Envelope;
9use mm1_core::tracing::TraceId;
10use tokio::sync::mpsc;
11
12use crate::actor_key::ActorKey;
13use crate::config::{Mm1NodeConfig, Valid};
14use crate::registry::{ActorNode, MessageWithPermit};
15use crate::runtime::rt_api::RtApi;
16use crate::runtime::sys_call;
17use crate::runtime::sys_msg::SysMsg;
18
19mod impl_context_api;
20
/// Per-actor runtime state: handles into the runtime, the actor's identity,
/// and the channel endpoints associated with it.
///
/// Dropping the context performs cleanup (see the `Drop` impl in this file):
/// the address ranges in `network_nodes` are unregistered from the runtime
/// registry and `address` is unregistered from `actor_node`.
pub struct ActorContext {
    /// Runtime API handle; `Drop` uses it to reach the registry.
    pub(crate) rt_api:    RtApi,
    /// Shared, validated node configuration.
    pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,

    /// Key of the actor this context belongs to.
    /// NOTE(review): exact semantics live in `crate::actor_key` — confirm.
    pub(crate) actor_key:     ActorKey,
    /// Address under which this context is registered in `actor_node`;
    /// it is unregistered from there on drop.
    pub(crate) address:       Address,
    /// Optional address to acknowledge to.
    /// NOTE(review): inferred from the name only — confirm against its users.
    pub(crate) ack_to:        Option<Address>,
    /// Weak handle to the actor's node. On drop it is upgraded (if still
    /// alive) to unregister `address` and to send `SysMsg::ForkDone` through
    /// the node's system channel.
    pub(crate) actor_node:    Weak<ActorNode<(TraceId, SysMsg), Envelope>>,
    /// Network nodes keyed by address range. On drop, every range whose node
    /// is still alive is unregistered from the runtime registry.
    pub(crate) network_nodes: BTreeMap<AddressRange, Weak<NetworkNode>>,

    /// Receiving end of the priority envelope channel.
    pub(crate) rx_priority:      mpsc::UnboundedReceiver<Envelope>,
    /// Receiving end of the regular envelope channel; each message arrives
    /// wrapped in `MessageWithPermit`.
    pub(crate) rx_regular:       mpsc::UnboundedReceiver<MessageWithPermit<Envelope>>,
    /// Weak sender for `(TraceId, SysMsg)` system messages.
    pub(crate) tx_system_weak:   mpsc::WeakUnboundedSender<(TraceId, SysMsg)>,
    /// Weak sender counterpart of the priority envelope channel.
    pub(crate) tx_priority_weak: mpsc::WeakUnboundedSender<Envelope>,
    /// Weak sender counterpart of the regular envelope channel.
    pub(crate) tx_regular_weak:  mpsc::WeakUnboundedSender<MessageWithPermit<Envelope>>,
    /// Sending side defined by `sys_call`.
    /// NOTE(review): presumably used to issue system calls to the runtime — confirm.
    pub(crate) call:             sys_call::Tx,

    /// Channel carrying `(Address, AnyError)` pairs.
    /// NOTE(review): by its name, used to report actor failures — confirm.
    pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
}
40
/// Shorthand for the registry's `NetworkNode` instantiated with
/// `(TraceId, SysMsg)` and `Envelope` — the same type parameters that
/// `ActorContext::actor_node` uses for `ActorNode`.
type NetworkNode = crate::registry::NetworkNode<(TraceId, SysMsg), Envelope>;
42
43impl Drop for ActorContext {
44    fn drop(&mut self) {
45        for (address_range, net_node) in std::mem::take(&mut self.network_nodes) {
46            if let Some(_net_node) = net_node.upgrade() {
47                let registry = self.rt_api.registry();
48                if !registry.unregister(address_range.into()) {
49                    log::error!("could not unregister {address_range}");
50                }
51            }
52        }
53
54        if let Some(actor_node) = self.actor_node.upgrade() {
55            let fork_lease = actor_node
56                .unregister(self.address)
57                .expect("already unregistered?");
58            let fork_done = SysMsg::ForkDone(fork_lease);
59            let _ = actor_node.tx_system.send((TraceId::current(), fork_done));
60        }
61    }
62}