mm1_node/runtime/
context.rs1use std::sync::Arc;
2
3use futures::FutureExt;
4use mm1_address::address::Address;
5use mm1_address::pool::Pool as SubnetPool;
6use mm1_address::subnet::NetMask;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::log::trace;
9use mm1_common::types::Never;
10use mm1_core::context::{Call, Fork, ForkErrorKind, Quit, Recv, RecvErrorKind, TellErrorKind};
11use mm1_core::envelope::{Envelope, EnvelopeInfo};
12use mm1_core::message::AnyMessage;
13
14use super::config::{EffectiveActorConfig, Mm1Config};
15use crate::runtime::actor_key::ActorKey;
16use crate::runtime::mq;
17use crate::runtime::rt_api::RtApi;
18use crate::runtime::sys_call::{self, SysCall};
19use crate::runtime::sys_msg::SysMsg;
20
21pub struct ActorContext {
22 pub(crate) rt_api: RtApi,
23 pub(crate) rt_config: Arc<Mm1Config>,
24 pub(crate) actor_address: Address,
25 pub(crate) rx_priority: mq::UbRx<Envelope>,
26 pub(crate) rx_regular: mq::Rx<Envelope>,
27 pub(crate) tx_system_weak: mq::UbTxWeak<SysMsg>,
28 pub(crate) call: sys_call::Tx,
29 pub(crate) subnet_pool: SubnetPool,
30 pub(crate) actor_key: ActorKey,
31 pub(crate) ack_to: Option<Address>,
32 pub(crate) unregister_on_drop: bool,
33}
34
35impl Drop for ActorContext {
36 fn drop(&mut self) {
37 if self.unregister_on_drop {
38 if let Some((address_lease, tx_system)) = self.rt_api.unregister(self.actor_address) {
39 let _ = tx_system.send(SysMsg::ForkDone(address_lease));
40 }
41 }
42 }
43}
44
45impl Call<Address, AnyMessage> for ActorContext {
46 type Outcome = Result<(), ErrorOf<TellErrorKind>>;
47
48 async fn call(&mut self, to: Address, msg: AnyMessage) -> Self::Outcome {
49 let info = EnvelopeInfo::new(to);
50 let outbound = Envelope::new(info, msg);
51 trace!("sending [outbound: {:?}]", outbound);
52 self.rt_api
53 .send(to, false, outbound)
54 .map_err(|k| ErrorOf::new(k, ""))
55 }
56}
57
58impl Quit for ActorContext {
59 async fn quit_ok(&mut self) -> Never {
60 self.call.invoke(SysCall::Exit(Ok(()))).await;
61 std::future::pending().await
62 }
63
64 async fn quit_err<E>(&mut self, reason: E) -> Never
65 where
66 E: std::error::Error + Send + Sync + 'static,
67 {
68 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
69 std::future::pending().await
70 }
71}
72
73impl Fork for ActorContext {
74 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
75 let address_lease = self
76 .subnet_pool
77 .lease(NetMask::M_64)
78 .map_err(|lease_error| {
79 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
80 })?;
81 let actor_address = address_lease.address;
82
83 let (tx_priority, rx_priority) = mq::unbounded();
84 let (tx_regular, rx_regular) = mq::bounded(
85 self.rt_config
86 .actor_config(&self.actor_key)
87 .fork_inbox_size(),
88 );
89 let tx_system_weak = self.tx_system_weak.clone();
90 let tx_system = tx_system_weak
91 .upgrade()
92 .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "tx_system_weak.upgrade"))?;
93
94 self.call
95 .invoke(SysCall::ForkAdded(
96 address_lease.address,
97 tx_priority.downgrade(),
98 ))
99 .await;
100
101 let () = self
102 .rt_api
103 .register(address_lease, tx_system, tx_priority, tx_regular);
104
105 let subnet_pool = self.subnet_pool.clone();
106 let rt_api = self.rt_api.clone();
107 let rt_config = self.rt_config.clone();
108
109 let actor_key = self.actor_key.clone();
110
111 let context = Self {
112 rt_api,
113 rt_config,
114 actor_address,
115 rx_priority,
116 rx_regular,
117 tx_system_weak,
118 call: self.call.clone(),
119 actor_key,
120 subnet_pool,
121 ack_to: None,
122 unregister_on_drop: true,
123 };
124
125 Ok(context)
126 }
127
128 async fn run<F, Fut>(self, fun: F)
129 where
130 F: FnOnce(Self) -> Fut,
131 F: Send + 'static,
132 Fut: std::future::Future + Send + 'static,
133 {
134 let call = self.call.clone();
135 let fut = fun(self).map(|_| ()).boxed();
136 call.invoke(SysCall::Spawn(fut)).await;
137 }
138}
139
140impl Recv for ActorContext {
141 fn address(&self) -> Address {
142 self.actor_address
143 }
144
145 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
146 let (priority, inbound_opt) = tokio::select! {
147 biased;
148
149 inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
150 inbound_opt = self.rx_regular.recv() => (false, inbound_opt),
151 };
152
153 trace!(
154 "received [priority: {}; inbound: {:?}]",
155 priority,
156 inbound_opt
157 );
158
159 inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
160 }
161
162 async fn close(&mut self) {
163 self.rx_regular.close();
164 self.rx_priority.close();
165 }
166}