mm1_core/context/
start.rs

1use std::future::Future;
2use std::time::Duration;
3
4use mm1_address::address::Address;
5use mm1_common::errors::error_of::ErrorOf;
6use mm1_common::futures::timeout::FutureTimeoutExt;
7use mm1_proc_macros::dispatch;
8use mm1_proto_system as system;
9use mm1_proto_system::{Runnable, StartErrorKind, System};
10
11use crate::context::call::Call;
12use crate::context::fork::Fork;
13use crate::context::recv::Recv;
14use crate::context::tell::Tell;
15
16pub trait Start<Sys>:
17    Tell
18    + Recv
19    + Fork
20    + Call<Sys, system::SpawnRequest<Sys>, Outcome = system::SpawnResponse>
21    + Call<Sys, system::Kill, Outcome = bool>
22    + Call<Sys, system::Link, Outcome = ()>
23where
24    Sys: System,
25{
26    fn spawn(
27        &mut self,
28        runnable: Sys::Runnable,
29        link: bool,
30    ) -> impl Future<Output = system::SpawnResponse> + Send {
31        async move {
32            let run_at = runnable.run_at();
33            let link_to = if link { vec![self.address()] } else { vec![] };
34            let spawn_request = system::SpawnRequest {
35                runnable,
36                ack_to: None,
37                link_to,
38            };
39            self.call(run_at, spawn_request).await
40        }
41    }
42
43    fn start(
44        &mut self,
45        runnable: Sys::Runnable,
46        link: bool,
47        start_timeout: Duration,
48    ) -> impl Future<Output = Result<Address, ErrorOf<StartErrorKind>>> + Send {
49        async move {
50            let run_at = runnable.run_at();
51
52            let mut fork = self
53                .fork()
54                .await
55                .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?;
56            let link_to = if link { vec![fork.address()] } else { vec![] };
57
58            let spawn_request = system::SpawnRequest {
59                runnable,
60                ack_to: Some(fork.address()),
61                link_to,
62            };
63            let spawned_address = fork
64                .call(run_at, spawn_request)
65                .await
66                .map_err(|e| e.map_kind(StartErrorKind::Spawn))?;
67
68            let envelope = match fork.recv().timeout(start_timeout).await {
69                Err(_elapsed) => {
70                    self.call(
71                        run_at,
72                        system::Kill {
73                            peer: spawned_address,
74                        },
75                    )
76                    .await;
77
78                    // TODO: should we ensure termination with a `system::Watch`?
79
80                    return Err(ErrorOf::new(
81                        StartErrorKind::Timeout,
82                        "no init-ack within timeout",
83                    ))
84                },
85                Ok(recv_result) => {
86                    recv_result
87                        .map_err(|e| ErrorOf::new(StartErrorKind::InternalError, e.to_string()))?
88                },
89            };
90
91            dispatch!(match envelope {
92                system::InitAck { address } => {
93                    if link {
94                        self.call(run_at, system::Link { peer: address }).await;
95                    }
96                    Ok(address)
97                },
98
99                system::Exited { .. } => {
100                    Err(ErrorOf::new(
101                        StartErrorKind::Exited,
102                        "exited before init-ack",
103                    ))
104                },
105
106                unexpected @ _ => {
107                    Err(ErrorOf::new(
108                        StartErrorKind::InternalError,
109                        format!("unexpected message: {:?}", unexpected),
110                    ))
111                },
112            })
113        }
114    }
115}
116pub trait InitDone<Sys>: Call<Sys, system::InitAck, Outcome = ()>
117where
118    Sys: System + Default,
119{
120    fn init_done(&mut self, address: Address) -> impl Future<Output = ()> + Send {
121        async move { self.call(Sys::default(), system::InitAck { address }).await }
122    }
123}
124
125impl<Sys, T> Start<Sys> for T
126where
127    T: Tell
128        + Recv
129        + Fork
130        + Call<Sys, system::SpawnRequest<Sys>, Outcome = system::SpawnResponse>
131        + Call<Sys, system::Kill, Outcome = bool>
132        + Call<Sys, system::Link, Outcome = ()>,
133    Sys: System,
134{
135}
136
137impl<Sys, T> InitDone<Sys> for T
138where
139    T: Call<Sys, system::InitAck, Outcome = ()>,
140    Sys: System + Default,
141{
142}