// elfo_core/group.rs

1use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc};
2
3use futures::future::BoxFuture;
4
5use crate::{
6    addr::{NodeLaunchId, NodeNo},
7    config::Config,
8    context::Context,
9    envelope::Envelope,
10    exec::{Exec, ExecResult},
11    object::{GroupHandle, GroupVisitor, Object},
12    restarting::RestartPolicy,
13    routers::Router,
14    runtime::RuntimeManager,
15    supervisor::Supervisor,
16};
17
18#[derive(Debug)]
19pub struct ActorGroup<R, C> {
20    restart_policy: RestartPolicy,
21    termination_policy: TerminationPolicy,
22    stop_order: i8,
23    router: R,
24    _config: PhantomData<C>,
25}
26
27impl ActorGroup<(), ()> {
28    #[allow(clippy::new_without_default)]
29    pub fn new() -> Self {
30        Self {
31            restart_policy: RestartPolicy::default(),
32            termination_policy: TerminationPolicy::default(),
33            router: (),
34            stop_order: 0,
35            _config: PhantomData,
36        }
37    }
38}
39
40impl<R, C> ActorGroup<R, C> {
41    pub fn config<C1: Config>(self) -> ActorGroup<R, C1> {
42        ActorGroup {
43            restart_policy: self.restart_policy,
44            termination_policy: self.termination_policy,
45            router: self.router,
46            stop_order: self.stop_order,
47            _config: PhantomData,
48        }
49    }
50
51    /// The behaviour on actor termination.
52    ///
53    /// `RestartPolicy::never` is used by default.
54    pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
55        self.restart_policy = policy;
56        self
57    }
58
59    /// The behaviour on the `Terminate` message.
60    ///
61    /// `TerminationPolicy::closing` is used by default.
62    pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self {
63        self.termination_policy = policy;
64        self
65    }
66
67    /// Installs a router.
68    pub fn router<R1: Router<C>>(self, router: R1) -> ActorGroup<R1, C> {
69        ActorGroup {
70            restart_policy: self.restart_policy,
71            termination_policy: self.termination_policy,
72            router,
73            stop_order: self.stop_order,
74            _config: self._config,
75        }
76    }
77
78    /// Specifies the order of stopping among other groups.
79    ///
80    /// Actors in groups with lower values are stopped first.
81    /// Actors in groups with higher values start stopping when all actors in
82    /// groups with lower values are stopped or timeout is reached.
83    /// It also means that a lot of different values of this parameter among
84    /// groups can lead to longer shutdown time of the node. In some
85    /// environment (e.g. systemd) there is a hard limit on the shutdown
86    /// time, thus the node can be forcibly terminated (by SIGKILL) before all
87    /// actors are stopped gracefully.
88    ///
89    /// `0` by default.
90    #[instability::unstable]
91    pub fn stop_order(mut self, stop_order: i8) -> Self {
92        self.stop_order = stop_order;
93        self
94    }
95
96    /// Builds the group with the specified executor function.
97    ///
98    /// The provided closure must return a future resolving to
99    /// `()`, `!` or `Result<(), E>`, where `E` should be convertible to
100    /// `Box<dyn Error>`, so it works with `anyhow::Error`, `eyre::Report` etc.
101    pub fn exec<X, O, ER>(self, exec: X) -> Blueprint
102    where
103        R: Router<C>,
104        X: Fn(Context<C, R::Key>) -> O + Send + Sync + 'static,
105        O: Future<Output = ER> + Send + 'static,
106        ER: ExecResult,
107        C: Config,
108    {
109        let mount = move |ctx: Context,
110                          node_no: NodeNo,
111                          node_launch_id: NodeLaunchId,
112                          name: String,
113                          rt_manager: RuntimeManager| {
114            let addr = ctx.group();
115            let sv = Arc::new(Supervisor::new(
116                ctx,
117                node_no,
118                node_launch_id,
119                name,
120                exec,
121                self.router,
122                self.restart_policy,
123                self.termination_policy,
124                rt_manager,
125            ));
126
127            Object::new(addr, Box::new(Handle(sv)) as Box<dyn GroupHandle>)
128        };
129
130        Blueprint {
131            mount: Box::new(mount),
132            stop_order: self.stop_order,
133        }
134    }
135}
136
137struct Handle<R: Router<C>, C, X>(Arc<Supervisor<R, C, X>>);
138
139impl<R, C, X> GroupHandle for Handle<R, C, X>
140where
141    R: Router<C>,
142    X: Exec<Context<C, R::Key>>,
143    <X::Output as Future>::Output: ExecResult,
144    C: Config,
145{
146    fn handle(&self, envelope: Envelope, visitor: &mut dyn GroupVisitor) {
147        self.0.handle(envelope, visitor)
148    }
149
150    fn finished(&self) -> BoxFuture<'static, ()> {
151        self.0.finished()
152    }
153}
154
155pub struct Blueprint {
156    pub(crate) mount:
157        Box<dyn FnOnce(Context, NodeNo, NodeLaunchId, String, RuntimeManager) -> Object>,
158    pub(crate) stop_order: i8,
159}
160
/// Describes how a group reacts to the `Terminate` message.
#[derive(Debug, Clone)]
pub struct TerminationPolicy {
    /// Whether the supervisor stops spawning new actors on `Terminate`.
    pub(crate) stop_spawning: bool,
    /// Whether mailboxes are closed on `Terminate`.
    pub(crate) close_mailbox: bool,
}

impl Default for TerminationPolicy {
    /// Same as [`TerminationPolicy::closing`].
    fn default() -> Self {
        TerminationPolicy::closing()
    }
}

impl TerminationPolicy {
    /// On `Terminate`:
    /// * A supervisor stops spawning new actors.
    /// * New messages are no longer accepted.
    /// * Mailboxes are closed.
    ///
    /// This behaviour is used by default.
    pub fn closing() -> Self {
        let (stop_spawning, close_mailbox) = (true, true);

        Self {
            stop_spawning,
            close_mailbox,
        }
    }

    /// On `Terminate`:
    /// * A supervisor stops spawning new actors.
    /// * The `Terminate` message can be handled by actors manually.
    /// * Mailboxes keep receiving messages (use `Context::close()` to stop
    ///   it).
    pub fn manually() -> Self {
        // Differs from `closing()` only in that mailboxes stay open.
        Self {
            close_mailbox: false,
            ..Self::closing()
        }
    }

    // TODO: add `stop_spawning`?
}