mm1_node/runtime/
context.rs1use std::collections::BTreeMap;
2use std::sync::{Arc, Weak};
3
4use mm1_address::address::Address;
5use mm1_address::address_range::AddressRange;
6use mm1_common::log;
7use mm1_common::types::AnyError;
8use mm1_core::envelope::Envelope;
9use mm1_core::tracing::TraceId;
10use tokio::sync::mpsc;
11
12use crate::actor_key::ActorKey;
13use crate::config::{Mm1NodeConfig, Valid};
14use crate::registry::{ActorNode, MessageWithPermit};
15use crate::runtime::rt_api::RtApi;
16use crate::runtime::sys_call;
17use crate::runtime::sys_msg::SysMsg;
18
19mod impl_context_api;
20
21pub struct ActorContext {
22 pub(crate) rt_api: RtApi,
23 pub(crate) rt_config: Arc<Valid<Mm1NodeConfig>>,
24
25 pub(crate) actor_key: ActorKey,
26 pub(crate) address: Address,
27 pub(crate) ack_to: Option<Address>,
28 pub(crate) actor_node: Weak<ActorNode<(TraceId, SysMsg), Envelope>>,
29 pub(crate) network_nodes: BTreeMap<AddressRange, Weak<NetworkNode>>,
30
31 pub(crate) rx_priority: mpsc::UnboundedReceiver<Envelope>,
32 pub(crate) rx_regular: mpsc::UnboundedReceiver<MessageWithPermit<Envelope>>,
33 pub(crate) tx_system_weak: mpsc::WeakUnboundedSender<(TraceId, SysMsg)>,
34 pub(crate) tx_priority_weak: mpsc::WeakUnboundedSender<Envelope>,
35 pub(crate) tx_regular_weak: mpsc::WeakUnboundedSender<MessageWithPermit<Envelope>>,
36 pub(crate) call: sys_call::Tx,
37
38 pub(crate) tx_actor_failure: mpsc::UnboundedSender<(Address, AnyError)>,
39}
40
41type NetworkNode = crate::registry::NetworkNode<(TraceId, SysMsg), Envelope>;
42
43impl Drop for ActorContext {
44 fn drop(&mut self) {
45 for (address_range, net_node) in std::mem::take(&mut self.network_nodes) {
46 if let Some(_net_node) = net_node.upgrade() {
47 let registry = self.rt_api.registry();
48 if !registry.unregister(address_range.into()) {
49 log::error!("could not unregister {address_range}");
50 }
51 }
52 }
53
54 if let Some(actor_node) = self.actor_node.upgrade() {
55 let fork_lease = actor_node
56 .unregister(self.address)
57 .expect("already unregistered?");
58 let fork_done = SysMsg::ForkDone(fork_lease);
59 let _ = actor_node.tx_system.send((TraceId::current(), fork_done));
60 }
61 }
62}