mm1_node/runtime/system/
protocol_system.rs

1use mm1_common::errors::error_of::ErrorOf;
2use mm1_core::context::Call;
3use mm1_core::envelope::{Envelope, EnvelopeInfo};
4use mm1_proto_system::{
5    Exit, InitAck, Kill, Link, SpawnErrorKind, SpawnRequest, SpawnResponse, TrapExit, Unlink,
6    Unwatch, Watch, WatchRef,
7};
8use tokio::sync::oneshot;
9use tracing::trace;
10
11use crate::runtime::config::EffectiveActorConfig;
12use crate::runtime::container;
13use crate::runtime::context::ActorContext;
14use crate::runtime::sys_call::SysCall;
15use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
16use crate::runtime::system::Local;
17
18impl Call<Local, SpawnRequest<Local>> for ActorContext {
19    type Outcome = SpawnResponse;
20
21    async fn call(&mut self, _to: Local, message: SpawnRequest<Local>) -> Self::Outcome {
22        let SpawnRequest {
23            runnable,
24            ack_to,
25            link_to,
26        } = message;
27
28        let actor_key = self.actor_key.child(runnable.func_name());
29        let actor_config = self.rt_config.actor_config(&actor_key);
30        let execute_on = self.rt_api.choose_executor(actor_config.runtime_key());
31
32        trace!("starting [ack-to: {:?}; link-to: {:?}]", ack_to, link_to);
33
34        let subnet_lease = self
35            .rt_api
36            .request_address(actor_config.netmask())
37            .await
38            .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
39
40        trace!("subnet-lease: {}", subnet_lease.net_address());
41
42        let rt_api = self.rt_api.clone();
43        let rt_config = self.rt_config.clone();
44        let container = container::Container::create(
45            container::ContainerArgs {
46                ack_to,
47                link_to,
48                actor_key,
49
50                subnet_lease,
51                rt_api,
52                rt_config,
53            },
54            runnable,
55        )
56        .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
57        let actor_address = container.actor_address();
58
59        trace!("actor-address: {}", actor_address);
60
61        // TODO: maybe keep it somewhere too?
62        let _join_handle = execute_on.spawn(container.run());
63
64        Ok(actor_address)
65    }
66}
67
68impl Call<Local, Kill> for ActorContext {
69    type Outcome = bool;
70
71    async fn call(&mut self, _to: Local, message: Kill) -> Self::Outcome {
72        let Kill { peer: address } = message;
73
74        self.rt_api.sys_send(address, SysMsg::Kill).is_ok()
75    }
76}
77
78impl Call<Local, InitAck> for ActorContext {
79    type Outcome = ();
80
81    async fn call(&mut self, _to: Local, message: InitAck) -> Self::Outcome {
82        let Some(ack_to_address) = self.ack_to.take() else {
83            return;
84        };
85        let envelope = Envelope::new(EnvelopeInfo::new(ack_to_address), message);
86        let _ = self
87            .rt_api
88            .send(ack_to_address, true, envelope.into_erased());
89    }
90}
91
92impl Call<Local, TrapExit> for ActorContext {
93    type Outcome = ();
94
95    async fn call(&mut self, _to: Local, message: TrapExit) -> Self::Outcome {
96        let TrapExit { enable } = message;
97
98        self.call.invoke(SysCall::TrapExit(enable)).await;
99    }
100}
101
102impl Call<Local, Link> for ActorContext {
103    type Outcome = ();
104
105    async fn call(&mut self, _to: Local, message: Link) -> Self::Outcome {
106        let Link { peer } = message;
107
108        self.call
109            .invoke(SysCall::Link {
110                sender:   self.actor_address,
111                receiver: peer,
112            })
113            .await;
114    }
115}
116
117impl Call<Local, Unlink> for ActorContext {
118    type Outcome = ();
119
120    async fn call(&mut self, _to: Local, message: Unlink) -> Self::Outcome {
121        let Unlink { peer } = message;
122
123        self.call
124            .invoke(SysCall::Unlink {
125                sender:   self.actor_address,
126                receiver: peer,
127            })
128            .await;
129    }
130}
131
132impl Call<Local, Exit> for ActorContext {
133    type Outcome = bool;
134
135    async fn call(&mut self, _to: Local, msg: Exit) -> Self::Outcome {
136        let Exit { peer } = msg;
137        self.rt_api
138            .sys_send(
139                peer,
140                SysMsg::Link(SysLink::Exit {
141                    sender:   self.actor_address,
142                    receiver: peer,
143                    reason:   ExitReason::Terminate,
144                }),
145            )
146            .is_ok()
147    }
148}
149
150impl Call<Local, Watch> for ActorContext {
151    type Outcome = WatchRef;
152
153    async fn call(&mut self, _to: Local, msg: Watch) -> Self::Outcome {
154        let Watch { peer } = msg;
155        let (reply_tx, reply_rx) = oneshot::channel();
156        self.call
157            .invoke(SysCall::Watch {
158                sender: self.actor_address,
159                receiver: peer,
160                reply_tx,
161            })
162            .await;
163        reply_rx.await.expect("sys-call remained unanswered")
164    }
165}
166
167impl Call<Local, Unwatch> for ActorContext {
168    type Outcome = ();
169
170    async fn call(&mut self, _to: Local, msg: Unwatch) -> Self::Outcome {
171        let Unwatch { watch_ref } = msg;
172        self.call
173            .invoke(SysCall::Unwatch {
174                sender: self.actor_address,
175                watch_ref,
176            })
177            .await;
178    }
179}