mm1_node/runtime/
context.rs

1use std::sync::Arc;
2
3use futures::FutureExt;
4use mm1_address::address::Address;
5use mm1_address::pool::Pool as SubnetPool;
6use mm1_address::subnet::NetMask;
7use mm1_common::errors::error_of::ErrorOf;
8use mm1_common::log::trace;
9use mm1_common::types::Never;
10use mm1_core::context::{Call, Fork, ForkErrorKind, Quit, Recv, RecvErrorKind, TellErrorKind};
11use mm1_core::envelope::{Envelope, EnvelopeInfo};
12use mm1_core::message::AnyMessage;
13
14use super::config::{EffectiveActorConfig, Mm1Config};
15use crate::runtime::actor_key::ActorKey;
16use crate::runtime::mq;
17use crate::runtime::rt_api::RtApi;
18use crate::runtime::sys_call::{self, SysCall};
19use crate::runtime::sys_msg::SysMsg;
20
/// Per-actor runtime handle.
///
/// Bundles the runtime API, configuration, address, inbound queues and
/// system-call channel that the `Call`, `Quit`, `Fork` and `Recv`
/// implementations for this type operate on.
pub struct ActorContext {
    /// Runtime API handle; used to send envelopes and to register/unregister
    /// addresses.
    pub(crate) rt_api:             RtApi,
    /// Shared runtime configuration; `fork` reads the fork inbox size from the
    /// actor config looked up by `actor_key`.
    pub(crate) rt_config:          Arc<Mm1Config>,
    /// Address reported by `Recv::address` and unregistered on drop (when
    /// `unregister_on_drop` is set).
    pub(crate) actor_address:      Address,
    /// Priority inbound queue; `recv` polls it before `rx_regular`.
    pub(crate) rx_priority:        mq::UbRx<Envelope>,
    /// Regular inbound queue; `recv` polls it after `rx_priority`.
    pub(crate) rx_regular:         mq::Rx<Envelope>,
    /// Weak sender for `SysMsg`; `fork` upgrades it to obtain the strong sender
    /// it registers for the new context.
    pub(crate) tx_system_weak:     mq::UbTxWeak<SysMsg>,
    /// Channel used to issue `SysCall`s (`Exit`, `ForkAdded`, `Spawn`).
    pub(crate) call:               sys_call::Tx,
    /// Address pool from which `fork` leases the address of a new context.
    pub(crate) subnet_pool:        SubnetPool,
    /// Key used to look up this actor's configuration in `rt_config`.
    pub(crate) actor_key:          ActorKey,
    /// Not read by the code visible here; `fork` initialises it to `None`.
    /// NOTE(review): presumably the address to acknowledge start-up to — confirm.
    pub(crate) ack_to:             Option<Address>,
    /// When `true`, dropping the context unregisters `actor_address` and
    /// reports `SysMsg::ForkDone`; `fork` sets it to `true`.
    pub(crate) unregister_on_drop: bool,
}
34
35impl Drop for ActorContext {
36    fn drop(&mut self) {
37        if self.unregister_on_drop {
38            if let Some((address_lease, tx_system)) = self.rt_api.unregister(self.actor_address) {
39                let _ = tx_system.send(SysMsg::ForkDone(address_lease));
40            }
41        }
42    }
43}
44
45impl Call<Address, AnyMessage> for ActorContext {
46    type Outcome = Result<(), ErrorOf<TellErrorKind>>;
47
48    async fn call(&mut self, to: Address, msg: AnyMessage) -> Self::Outcome {
49        let info = EnvelopeInfo::new(to);
50        let outbound = Envelope::new(info, msg);
51        trace!("sending [outbound: {:?}]", outbound);
52        self.rt_api
53            .send(to, false, outbound)
54            .map_err(|k| ErrorOf::new(k, ""))
55    }
56}
57
58impl Quit for ActorContext {
59    async fn quit_ok(&mut self) -> Never {
60        self.call.invoke(SysCall::Exit(Ok(()))).await;
61        std::future::pending().await
62    }
63
64    async fn quit_err<E>(&mut self, reason: E) -> Never
65    where
66        E: std::error::Error + Send + Sync + 'static,
67    {
68        self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
69        std::future::pending().await
70    }
71}
72
73impl Fork for ActorContext {
74    async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
75        let address_lease = self
76            .subnet_pool
77            .lease(NetMask::M_64)
78            .map_err(|lease_error| {
79                ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
80            })?;
81        let actor_address = address_lease.address;
82
83        let (tx_priority, rx_priority) = mq::unbounded();
84        let (tx_regular, rx_regular) = mq::bounded(
85            self.rt_config
86                .actor_config(&self.actor_key)
87                .fork_inbox_size(),
88        );
89        let tx_system_weak = self.tx_system_weak.clone();
90        let tx_system = tx_system_weak
91            .upgrade()
92            .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "tx_system_weak.upgrade"))?;
93
94        self.call
95            .invoke(SysCall::ForkAdded(
96                address_lease.address,
97                tx_priority.downgrade(),
98            ))
99            .await;
100
101        let () = self
102            .rt_api
103            .register(address_lease, tx_system, tx_priority, tx_regular);
104
105        let subnet_pool = self.subnet_pool.clone();
106        let rt_api = self.rt_api.clone();
107        let rt_config = self.rt_config.clone();
108
109        let actor_key = self.actor_key.clone();
110
111        let context = Self {
112            rt_api,
113            rt_config,
114            actor_address,
115            rx_priority,
116            rx_regular,
117            tx_system_weak,
118            call: self.call.clone(),
119            actor_key,
120            subnet_pool,
121            ack_to: None,
122            unregister_on_drop: true,
123        };
124
125        Ok(context)
126    }
127
128    async fn run<F, Fut>(self, fun: F)
129    where
130        F: FnOnce(Self) -> Fut,
131        F: Send + 'static,
132        Fut: std::future::Future + Send + 'static,
133    {
134        let call = self.call.clone();
135        let fut = fun(self).map(|_| ()).boxed();
136        call.invoke(SysCall::Spawn(fut)).await;
137    }
138}
139
140impl Recv for ActorContext {
141    fn address(&self) -> Address {
142        self.actor_address
143    }
144
145    async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
146        let (priority, inbound_opt) = tokio::select! {
147            biased;
148
149            inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
150            inbound_opt = self.rx_regular.recv() => (false, inbound_opt),
151        };
152
153        trace!(
154            "received [priority: {}; inbound: {:?}]",
155            priority,
156            inbound_opt
157        );
158
159        inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
160    }
161
162    async fn close(&mut self) {
163        self.rx_regular.close();
164        self.rx_priority.close();
165    }
166}