mm1_node/runtime/system/
protocol_system.rs1use mm1_common::errors::error_of::ErrorOf;
2use mm1_core::context::Call;
3use mm1_core::envelope::{Envelope, EnvelopeInfo};
4use mm1_proto_system::{
5 Exit, InitAck, Kill, Link, SpawnErrorKind, SpawnRequest, SpawnResponse, TrapExit, Unlink,
6 Unwatch, Watch, WatchRef,
7};
8use tokio::sync::oneshot;
9use tracing::trace;
10
11use crate::runtime::config::EffectiveActorConfig;
12use crate::runtime::container;
13use crate::runtime::context::ActorContext;
14use crate::runtime::sys_call::SysCall;
15use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
16use crate::runtime::system::Local;
17
18impl Call<Local, SpawnRequest<Local>> for ActorContext {
19 type Outcome = SpawnResponse;
20
21 async fn call(&mut self, _to: Local, message: SpawnRequest<Local>) -> Self::Outcome {
22 let SpawnRequest {
23 runnable,
24 ack_to,
25 link_to,
26 } = message;
27
28 let actor_key = self.actor_key.child(runnable.func_name());
29 let actor_config = self.rt_config.actor_config(&actor_key);
30 let execute_on = self.rt_api.choose_executor(actor_config.runtime_key());
31
32 trace!("starting [ack-to: {:?}; link-to: {:?}]", ack_to, link_to);
33
34 let subnet_lease = self
35 .rt_api
36 .request_address(actor_config.netmask())
37 .await
38 .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
39
40 trace!("subnet-lease: {}", subnet_lease.net_address());
41
42 let rt_api = self.rt_api.clone();
43 let rt_config = self.rt_config.clone();
44 let container = container::Container::create(
45 container::ContainerArgs {
46 ack_to,
47 link_to,
48 actor_key,
49
50 subnet_lease,
51 rt_api,
52 rt_config,
53 },
54 runnable,
55 )
56 .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
57 let actor_address = container.actor_address();
58
59 trace!("actor-address: {}", actor_address);
60
61 let _join_handle = execute_on.spawn(container.run());
63
64 Ok(actor_address)
65 }
66}
67
68impl Call<Local, Kill> for ActorContext {
69 type Outcome = bool;
70
71 async fn call(&mut self, _to: Local, message: Kill) -> Self::Outcome {
72 let Kill { peer: address } = message;
73
74 self.rt_api.sys_send(address, SysMsg::Kill).is_ok()
75 }
76}
77
78impl Call<Local, InitAck> for ActorContext {
79 type Outcome = ();
80
81 async fn call(&mut self, _to: Local, message: InitAck) -> Self::Outcome {
82 let Some(ack_to_address) = self.ack_to.take() else {
83 return;
84 };
85 let envelope = Envelope::new(EnvelopeInfo::new(ack_to_address), message);
86 let _ = self
87 .rt_api
88 .send(ack_to_address, true, envelope.into_erased());
89 }
90}
91
92impl Call<Local, TrapExit> for ActorContext {
93 type Outcome = ();
94
95 async fn call(&mut self, _to: Local, message: TrapExit) -> Self::Outcome {
96 let TrapExit { enable } = message;
97
98 self.call.invoke(SysCall::TrapExit(enable)).await;
99 }
100}
101
102impl Call<Local, Link> for ActorContext {
103 type Outcome = ();
104
105 async fn call(&mut self, _to: Local, message: Link) -> Self::Outcome {
106 let Link { peer } = message;
107
108 self.call
109 .invoke(SysCall::Link {
110 sender: self.actor_address,
111 receiver: peer,
112 })
113 .await;
114 }
115}
116
117impl Call<Local, Unlink> for ActorContext {
118 type Outcome = ();
119
120 async fn call(&mut self, _to: Local, message: Unlink) -> Self::Outcome {
121 let Unlink { peer } = message;
122
123 self.call
124 .invoke(SysCall::Unlink {
125 sender: self.actor_address,
126 receiver: peer,
127 })
128 .await;
129 }
130}
131
132impl Call<Local, Exit> for ActorContext {
133 type Outcome = bool;
134
135 async fn call(&mut self, _to: Local, msg: Exit) -> Self::Outcome {
136 let Exit { peer } = msg;
137 self.rt_api
138 .sys_send(
139 peer,
140 SysMsg::Link(SysLink::Exit {
141 sender: self.actor_address,
142 receiver: peer,
143 reason: ExitReason::Terminate,
144 }),
145 )
146 .is_ok()
147 }
148}
149
150impl Call<Local, Watch> for ActorContext {
151 type Outcome = WatchRef;
152
153 async fn call(&mut self, _to: Local, msg: Watch) -> Self::Outcome {
154 let Watch { peer } = msg;
155 let (reply_tx, reply_rx) = oneshot::channel();
156 self.call
157 .invoke(SysCall::Watch {
158 sender: self.actor_address,
159 receiver: peer,
160 reply_tx,
161 })
162 .await;
163 reply_rx.await.expect("sys-call remained unanswered")
164 }
165}
166
167impl Call<Local, Unwatch> for ActorContext {
168 type Outcome = ();
169
170 async fn call(&mut self, _to: Local, msg: Unwatch) -> Self::Outcome {
171 let Unwatch { watch_ref } = msg;
172 self.call
173 .invoke(SysCall::Unwatch {
174 sender: self.actor_address,
175 watch_ref,
176 })
177 .await;
178 }
179}