mm1_node/runtime/
context.rs1use std::collections::BTreeMap;
2use std::sync::{Arc, Weak};
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_common::log;
7use mm1_common::types::AnyError;
8use mm1_core::envelope::Envelope;
9use tokio::sync::mpsc;
10
11use crate::actor_key::ActorKey;
12use crate::config::{Mm1NodeConfig, Valid};
13use crate::registry::{ActorNode, MessageWithPermit, NetworkNode};
14use crate::runtime::rt_api::RtApi;
15use crate::runtime::sys_call;
16use crate::runtime::sys_msg::SysMsg;
17
18mod impl_context_api;
19
20pub struct ActorContext {
21 pub(crate) rt_api: RtApi,
22 pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
23
24 pub(crate) actor_key: ActorKey,
25 pub(crate) address: Address,
26 pub(crate) ack_to: Option<Address>,
27 pub(crate) actor_node: Weak<ActorNode<SysMsg, Envelope>>,
28 pub(crate) network_nodes: BTreeMap<AddressRange, Weak<NetworkNode<SysMsg, Envelope>>>,
29
30 pub(crate) rx_priority: mpsc::UnboundedReceiver<Envelope>,
31 pub(crate) rx_regular: mpsc::UnboundedReceiver<MessageWithPermit<Envelope>>,
32 pub(crate) tx_system_weak: mpsc::WeakUnboundedSender<SysMsg>,
33 pub(crate) tx_priority_weak: mpsc::WeakUnboundedSender<Envelope>,
34 pub(crate) tx_regular_weak: mpsc::WeakUnboundedSender<MessageWithPermit<Envelope>>,
35 pub(crate) call: sys_call::Tx,
36
37 pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
38}
39
40impl Drop for ActorContext {
41 fn drop(&mut self) {
42 for (address_range, net_node) in std::mem::take(&mut self.network_nodes) {
43 if let Some(_net_node) = net_node.upgrade() {
44 let registry = self.rt_api.registry();
45 if !registry.unregister(address_range.into()) {
46 log::error!("could not unregister {address_range}");
47 }
48 }
49 }
50
51 if let Some(actor_node) = self.actor_node.upgrade() {
52 let fork_lease = actor_node
53 .unregister(self.address)
54 .expect("already unregistered?");
55 let _ = actor_node.tx_system.send(SysMsg::ForkDone(fork_lease));
56 }
57 }
58}