mm1_core/context/
start.rs1use std::future::Future;
2use std::time::Duration;
3
4use mm1_address::address::Address;
5use mm1_common::errors::error_of::ErrorOf;
6use mm1_common::futures::timeout::FutureTimeoutExt;
7use mm1_proc_macros::dispatch;
8use mm1_proto_system as system;
9use mm1_proto_system::{Runnable, StartErrorKind, System};
10
11use crate::context::call::Call;
12use crate::context::fork::Fork;
13use crate::context::recv::Recv;
14use crate::context::tell::Tell;
15
16pub trait Start<Sys>:
17 Tell
18 + Recv
19 + Fork
20 + Call<Sys, system::SpawnRequest<Sys>, Outcome = system::SpawnResponse>
21 + Call<Sys, system::Kill, Outcome = bool>
22 + Call<Sys, system::Link, Outcome = ()>
23where
24 Sys: System,
25{
26 fn spawn(
27 &mut self,
28 runnable: Sys::Runnable,
29 link: bool,
30 ) -> impl Future<Output = system::SpawnResponse> + Send {
31 async move {
32 let run_at = runnable.run_at();
33 let link_to = if link { vec![self.address()] } else { vec![] };
34 let spawn_request = system::SpawnRequest {
35 runnable,
36 ack_to: None,
37 link_to,
38 };
39 self.call(run_at, spawn_request).await
40 }
41 }
42
43 fn start(
44 &mut self,
45 runnable: Sys::Runnable,
46 link: bool,
47 start_timeout: Duration,
48 ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
49 async move {
50 let run_at = runnable.run_at();
51
52 let mut fork = self
53 .fork()
54 .await
55 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
56 let link_to = if link { vec![fork.address()] } else { vec![] };
57
58 let spawn_request = system::SpawnRequest {
59 runnable,
60 ack_to: Some(fork.address()),
61 link_to,
62 };
63 let spawned_address = fork
64 .call(run_at, spawn_request)
65 .await
66 .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
67
68 let envelope = match fork.recv().timeout(start_timeout).await {
69 Err(_elapsed) => {
70 self.call(
71 run_at,
72 system::Kill {
73 peer: spawned_address,
74 },
75 )
76 .await;
77
78 return Err(ErrorOf::new(
81 StartErrorKind::Timeout,
82 "no init-ack within timeout",
83 ))
84 },
85 Ok(recv_result) => {
86 recv_result
87 .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
88 },
89 };
90
91 dispatch!(match envelope {
92 system::InitAck { address } => {
93 if link {
94 self.call(run_at, system::Link { peer: address }).await;
95 }
96 Ok(address)
97 },
98
99 system::Exited { .. } => {
100 Err(ErrorOf::new(
101 StartErrorKind::Exited,
102 "exited before init-ack",
103 ))
104 },
105
106 unexpected @ _ => {
107 Err(ErrorOf::new(
108 StartErrorKind::InternalError,
109 format!("unexpected message: {:?}", unexpected),
110 ))
111 },
112 })
113 }
114 }
115}
116pub trait InitDone<Sys>: Call<Sys, system::InitAck, Outcome = ()>
117where
118 Sys: System + Default,
119{
120 fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
121 async move { self.call(Sys::default(), system::InitAck { address }).await }
122 }
123}
124
125impl<Sys, T> Start<Sys> for T
126where
127 T: Tell
128 + Recv
129 + Fork
130 + Call<Sys, system::SpawnRequest<Sys>, Outcome = system::SpawnResponse>
131 + Call<Sys, system::Kill, Outcome = bool>
132 + Call<Sys, system::Link, Outcome = ()>,
133 Sys: System,
134{
135}
136
137impl<Sys, T> InitDone<Sys> for T
138where
139 T: Call<Sys, system::InitAck, Outcome = ()>,
140 Sys: System + Default,
141{
142}