1use std::sync::Arc;
2use std::time::Duration;
3
4use futures::FutureExt;
5use mm1_address::address::Address;
6use mm1_address::address_range::AddressRange;
7use mm1_address::pool::Lease;
8use mm1_address::subnet::NetAddress;
9use mm1_common::errors::error_of::ErrorOf;
10use mm1_common::futures::timeout::FutureTimeoutExt;
11use mm1_common::log;
12use mm1_common::types::{AnyError, Never};
13use mm1_core::context::{
14 Bind, BindArgs, BindErrorKind, Fork, ForkErrorKind, InitDone, Linking, Messaging, Now, Quit,
15 RecvErrorKind, SendErrorKind, Start, Stop, Watching,
16};
17use mm1_core::envelope::{Envelope, EnvelopeHeader, dispatch};
18use mm1_proto_system::{InitAck, SpawnErrorKind, StartErrorKind, WatchRef};
19use mm1_runnable::local::BoxedRunnable;
20use tokio::sync::{mpsc, oneshot};
21use tokio::time::Instant;
22use tracing::trace;
23
24use crate::config::EffectiveActorConfig;
25use crate::registry::{ForkEntry, NetworkNode};
26use crate::runtime::container;
27use crate::runtime::context::ActorContext;
28use crate::runtime::sys_call::SysCall;
29use crate::runtime::sys_msg::{ExitReason, SysLink, SysMsg};
30
31impl Quit for ActorContext {
32 async fn quit_ok(&mut self) -> Never {
33 self.call.invoke(SysCall::Exit(Ok(()))).await;
34 std::future::pending().await
35 }
36
37 async fn quit_err<E>(&mut self, reason: E) -> Never
38 where
39 E: std::error::Error + Send + Sync + 'static,
40 {
41 self.call.invoke(SysCall::Exit(Err(reason.into()))).await;
42 std::future::pending().await
43 }
44}
45
46impl Fork for ActorContext {
47 async fn fork(&mut self) -> Result<Self, ErrorOf<ForkErrorKind>> {
48 let actor_node = self
49 .actor_node
50 .upgrade()
51 .ok_or_else(|| ErrorOf::new(ForkErrorKind::InternalError, "actor_node.upgrade"))?;
52 let fork_lease = actor_node.lease_address().map_err(|lease_error| {
53 ErrorOf::new(ForkErrorKind::ResourceConstraint, lease_error.to_string())
54 })?;
55 let fork_address = fork_lease.address;
56
57 let (tx_priority, rx_priority) = mpsc::unbounded_channel();
58 let (tx_regular, rx_regular) = mpsc::unbounded_channel();
59 let tx_system_weak = self.tx_system_weak.clone();
60
61 self.call
62 .invoke(SysCall::ForkAdded(
63 fork_lease.address,
64 tx_priority.downgrade(),
65 ))
66 .await;
67
68 let tx_priority_weak = tx_priority.downgrade();
69 let tx_regular_weak = tx_regular.downgrade();
70 let fork_entry = ForkEntry::new(fork_lease, tx_priority, tx_regular);
71
72 let registered = actor_node.register(fork_address, fork_entry);
73 assert!(registered);
74
75 let rt_api = self.rt_api.clone();
76 let rt_config = self.rt_config.clone();
77
78 let actor_key = self.actor_key.clone();
79
80 let context = Self {
81 rt_api,
82 rt_config,
83
84 address: fork_address,
85 actor_key,
86 ack_to: None,
87 actor_node: self.actor_node.clone(),
88 network_nodes: Default::default(),
89
90 rx_priority,
91 rx_regular,
92 tx_system_weak,
93 tx_priority_weak,
94 tx_regular_weak,
95 call: self.call.clone(),
96
97 tx_actor_failure: self.tx_actor_failure.clone(),
98 };
99
100 Ok(context)
101 }
102
103 async fn run<F, Fut>(self, fun: F)
104 where
105 F: FnOnce(Self) -> Fut,
106 F: Send + 'static,
107 Fut: Future + Send + 'static,
108 {
109 let call = self.call.clone();
110 let fut = fun(self).map(|_| ()).boxed();
111 call.invoke(SysCall::Spawn(fut)).await;
112 }
113}
114
115impl Now for ActorContext {
116 type Instant = Instant;
117
118 fn now(&self) -> Self::Instant {
119 Instant::now()
120 }
121}
122
123impl Start<BoxedRunnable<Self>> for ActorContext {
124 fn spawn(
125 &mut self,
126 runnable: BoxedRunnable<Self>,
127 link: bool,
128 ) -> impl Future<Output = Result<Address, ErrorOf<SpawnErrorKind>>> + Send {
129 do_spawn(self, runnable, None, link.then_some(self.address))
130 }
131
132 fn start(
133 &mut self,
134 runnable: BoxedRunnable<Self>,
135 link: bool,
136 start_timeout: Duration,
137 ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
138 do_start(self, runnable, link, start_timeout)
139 }
140}
141
142impl Stop for ActorContext {
143 fn exit(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
144 let this = self.address;
145 let out = do_exit(self, this, peer).is_ok();
146 std::future::ready(out)
147 }
148
149 fn kill(&mut self, peer: Address) -> impl Future<Output = bool> + Send {
150 let out = do_kill(self, peer).is_ok();
151 std::future::ready(out)
152 }
153}
154
155impl Linking for ActorContext {
156 fn link(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
157 let this = self.address;
158 do_link(self, this, peer)
159 }
160
161 fn unlink(&mut self, peer: Address) -> impl Future<Output = ()> + Send {
162 let this = self.address;
163 do_unlink(self, this, peer)
164 }
165
166 fn set_trap_exit(&mut self, enable: bool) -> impl Future<Output = ()> + Send {
167 do_set_trap_exit(self, enable)
168 }
169}
170
171impl Watching for ActorContext {
172 fn watch(&mut self, peer: Address) -> impl Future<Output = mm1_proto_system::WatchRef> + Send {
173 do_watch(self, peer)
174 }
175
176 fn unwatch(
177 &mut self,
178 watch_ref: mm1_proto_system::WatchRef,
179 ) -> impl Future<Output = ()> + Send {
180 do_unwatch(self, watch_ref)
181 }
182}
183
184impl InitDone for ActorContext {
185 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
186 do_init_done(self, address)
187 }
188}
189
190impl Messaging for ActorContext {
191 fn address(&self) -> Address {
192 self.address
193 }
194
195 async fn recv(&mut self) -> Result<Envelope, ErrorOf<RecvErrorKind>> {
196 let (priority, inbound_opt) = tokio::select! {
197 biased;
198
199 inbound_opt = self.rx_priority.recv() => (true, inbound_opt),
200 inbound_opt = self.rx_regular.recv() => (false, inbound_opt.map(|m| m.message)),
201 };
202
203 trace!(
204 "received [priority: {}; inbound: {:?}; via {}]",
205 priority, inbound_opt, self.address
206 );
207
208 inbound_opt.ok_or(ErrorOf::new(RecvErrorKind::Closed, "closed"))
209 }
210
211 async fn close(&mut self) {
212 self.rx_regular.close();
213 self.rx_priority.close();
214 }
215
216 fn send(
217 &mut self,
218 envelope: Envelope,
219 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
220 std::future::ready(do_send(self, envelope))
221 }
222
223 fn forward(
224 &mut self,
225 to: Address,
226 envelope: Envelope,
227 ) -> impl Future<Output = Result<(), ErrorOf<SendErrorKind>>> + Send {
228 std::future::ready(do_forward(self, to, envelope))
229 }
230}
231
232impl Bind<NetAddress> for ActorContext {
233 async fn bind(&mut self, args: BindArgs<NetAddress>) -> Result<(), ErrorOf<BindErrorKind>> {
234 use std::collections::btree_map::Entry::*;
235
236 let BindArgs {
237 bind_to,
238 inbox_size,
239 } = args;
240
241 log::debug!("binding [to: {}; inbox-size: {}]", bind_to, inbox_size);
242
243 let rt_api = self.rt_api.clone();
244 let registry = rt_api.registry();
245
246 let address_range = AddressRange::from(bind_to);
247
248 let network_nodes_entry = match self.network_nodes.entry(address_range) {
249 Occupied(o) => {
250 let previously_bound_to = NetAddress::from(*o.key());
251 return Err(ErrorOf::new(
252 BindErrorKind::Conflict,
253 format!("conflict [requested: {bind_to}; existing: {previously_bound_to}]"),
254 ))
255 },
256 Vacant(v) => v,
257 };
258
259 let subnet_lease = Lease::trusted(bind_to);
260
261 let (tx_system, _rx_system) = mpsc::unbounded_channel();
262
263 let tx_priority = self.tx_priority_weak.upgrade().ok_or_else(|| {
264 ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
265 })?;
266 let tx_regular = self.tx_regular_weak.upgrade().ok_or_else(|| {
267 ErrorOf::new(BindErrorKind::Closed, "tx_priority_weak.upgrade failed")
268 })?;
269
270 let network_node = Arc::new(NetworkNode::new(
271 subnet_lease,
272 inbox_size,
273 tx_system,
274 tx_priority,
275 tx_regular,
276 ));
277
278 if !registry.register(bind_to, network_node.clone()) {
279 log::warn!("couldn't bind [to: {}; reason: conflict]", bind_to);
280 return Err(ErrorOf::new(
281 BindErrorKind::Conflict,
282 "probably address in use",
283 ))
284 }
285
286 let network_node = Arc::downgrade(&network_node);
287 network_nodes_entry.insert(network_node);
288
289 log::info!("bound [to: {}; inbox-size: {}]", bind_to, inbox_size);
290
291 Ok(())
292 }
293}
294
295async fn do_start(
296 context: &mut ActorContext,
297 runnable: BoxedRunnable<ActorContext>,
298 link: bool,
299 timeout: Duration,
300) -> Result<Address, ErrorOf<StartErrorKind>> {
301 let this_address = context.address;
302
303 let mut fork = context
304 .fork()
305 .await
306 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
307
308 let fork_address = fork.address;
309 let spawned_address = do_spawn(&mut fork, runnable, Some(fork_address), Some(fork_address))
310 .await
311 .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
312
313 let envelope = match fork.recv().timeout(timeout).await {
314 Err(_elapsed) => {
315 do_kill(context, spawned_address)
316 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
317
318 return Err(ErrorOf::new(
321 StartErrorKind::Timeout,
322 "no init-ack within timeout",
323 ))
324 },
325 Ok(recv_result) => {
326 recv_result.map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
327 },
328 };
329
330 dispatch!(match envelope {
331 mm1_proto_system::InitAck { address } => {
332 if link {
333 do_link(context, this_address, address).await;
334 }
335 Ok(address)
336 },
337
338 mm1_proto_system::Exited { .. } => {
339 Err(ErrorOf::new(
340 StartErrorKind::Exited,
341 "exited before init-ack",
342 ))
343 },
344
345 unexpected @ _ => {
346 Err(ErrorOf::new(
347 StartErrorKind::InternalError,
348 format!("unexpected message: {unexpected:?}"),
349 ))
350 },
351 })
352}
353
354async fn do_spawn(
355 context: &mut ActorContext,
356 runnable: BoxedRunnable<ActorContext>,
357 ack_to: Option<Address>,
358 link_to: impl IntoIterator<Item = Address>,
359) -> Result<Address, ErrorOf<SpawnErrorKind>> {
360 let actor_key = context.actor_key.child(runnable.func_name());
361 let actor_config = context.rt_config.actor_config(&actor_key);
362 let execute_on = context.rt_api.choose_executor(actor_config.runtime_key());
363
364 trace!("starting [ack-to: {:?}]", ack_to);
365
366 let subnet_lease = context
367 .rt_api
368 .request_address(actor_config.netmask())
369 .await
370 .map_err(|e| ErrorOf::new(SpawnErrorKind::ResourceConstraint, e.to_string()))?;
371
372 let rt_api = context.rt_api.clone();
373 let rt_config = context.rt_config.clone();
374 let container = container::Container::create(
375 container::ContainerArgs {
376 ack_to,
377 link_to: link_to.into_iter().collect(),
379 actor_key,
380
381 subnet_lease,
382 rt_api,
383 rt_config,
384 },
385 runnable,
386 context.tx_actor_failure.clone(),
387 )
388 .map_err(|e| ErrorOf::new(SpawnErrorKind::InternalError, e.to_string()))?;
389 let actor_address = container.actor_address();
390
391 trace!("actor-address: {}", actor_address);
392
393 let tx_actor_failure = context.tx_actor_failure.clone();
394 let _join_handle = execute_on.spawn(async move {
396 match container.run().await {
397 Ok(Ok(())) => (),
398 Ok(Err(actor_failure)) => {
399 let _ = tx_actor_failure.send((actor_address, actor_failure));
400 },
401 Err(container_failure) => {
402 let report = AnyError::from(container_failure);
403 mm1_common::log::error!(
404 "actor container failure [addr: {}]: {}",
405 actor_address,
406 report
407 .chain()
408 .map(|e| e.to_string())
409 .collect::<Vec<_>>()
410 .join(" <- ")
411 );
412 },
413 }
414 });
415
416 Ok(actor_address)
417}
418
419fn do_exit(context: &mut ActorContext, this: Address, peer: Address) -> Result<(), SendErrorKind> {
420 context.rt_api.sys_send(
421 peer,
422 SysMsg::Link(SysLink::Exit {
423 sender: this,
424 receiver: peer,
425 reason: ExitReason::Terminate,
426 }),
427 )
428}
429
430fn do_kill(context: &mut ActorContext, peer: Address) -> Result<(), SendErrorKind> {
431 context.rt_api.sys_send(peer, SysMsg::Kill)
432}
433
434async fn do_link(context: &mut ActorContext, this: Address, peer: Address) {
435 context
436 .call
437 .invoke(SysCall::Link {
438 sender: this,
439 receiver: peer,
440 })
441 .await
442}
443
444async fn do_unlink(context: &mut ActorContext, this: Address, peer: Address) {
445 context
446 .call
447 .invoke(SysCall::Unlink {
448 sender: this,
449 receiver: peer,
450 })
451 .await
452}
453
454async fn do_set_trap_exit(context: &mut ActorContext, enable: bool) {
455 context.call.invoke(SysCall::TrapExit(enable)).await;
456}
457
458async fn do_watch(context: &mut ActorContext, peer: Address) -> WatchRef {
459 let this = context.address;
460 let (reply_tx, reply_rx) = oneshot::channel();
461 context
462 .call
463 .invoke(SysCall::Watch {
464 sender: this,
465 receiver: peer,
466 reply_tx,
467 })
468 .await;
469 reply_rx.await.expect("sys-call remained unanswered")
470}
471
472async fn do_unwatch(context: &mut ActorContext, watch_ref: WatchRef) {
473 let this = context.address;
474 context
475 .call
476 .invoke(SysCall::Unwatch {
477 sender: this,
478 watch_ref,
479 })
480 .await;
481}
482
483async fn do_init_done(context: &mut ActorContext, address: Address) {
484 let message = InitAck { address };
485 let Some(ack_to_address) = context.ack_to.take() else {
486 return;
487 };
488 let envelope = Envelope::new(EnvelopeHeader::to_address(ack_to_address), message);
489 let _ = context
490 .rt_api
491 .send_to(envelope.header().to, true, envelope.into_erased());
492}
493
494fn do_send(context: &mut ActorContext, outbound: Envelope) -> Result<(), ErrorOf<SendErrorKind>> {
495 trace!("sending [outbound: {:?}]", outbound);
496 context
497 .rt_api
498 .send_to(outbound.header().to, false, outbound)
499 .map_err(|k| ErrorOf::new(k, ""))
500}
501
502fn do_forward(
503 context: &mut ActorContext,
504 to: Address,
505 outbound: Envelope,
506) -> Result<(), ErrorOf<SendErrorKind>> {
507 trace!("forwarding [to: {}, outbound: {:?}]", to, outbound);
508 context
509 .rt_api
510 .send_to(to, false, outbound)
511 .map_err(|k| ErrorOf::new(k, ""))
512}