elfo_core/
group.rs

1use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc};
2
3use crate::{
4    config::Config,
5    context::Context,
6    exec::ExecResult,
7    object::{Group, Object},
8    routers::Router,
9    runtime::RuntimeManager,
10    supervisor::Supervisor,
11};
12
/// A description of an actor group that is assembled step by step.
///
/// The consuming methods on this type (`config`, `restart_policy`,
/// `termination_policy`, `router`) each return an updated group, and `exec`
/// turns the finished description into a `Schema`.
///
/// `R` is the router type and `C` is the config type. Both are `()` for a
/// value that has just been created by `ActorGroup::new`.
#[derive(Debug)]
pub struct ActorGroup<R, C> {
    // Passed to the supervisor when `exec` builds it.
    restart_policy: RestartPolicy,
    // Passed to the supervisor when `exec` builds it.
    termination_policy: TerminationPolicy,
    // The router value; replaced (together with its type) by `router`.
    router: R,
    // Zero-sized marker: the config type is tracked at the type level only.
    _config: PhantomData<C>,
}
20
21impl ActorGroup<(), ()> {
22    #[allow(clippy::new_without_default)]
23    pub fn new() -> Self {
24        Self {
25            restart_policy: RestartPolicy::default(),
26            termination_policy: TerminationPolicy::default(),
27            router: (),
28            _config: PhantomData,
29        }
30    }
31}
32
33impl<R, C> ActorGroup<R, C> {
34    pub fn config<C1: Config>(self) -> ActorGroup<R, C1> {
35        ActorGroup {
36            restart_policy: self.restart_policy,
37            termination_policy: self.termination_policy,
38            router: self.router,
39            _config: PhantomData,
40        }
41    }
42
43    /// The behaviour on actor termination.
44    /// `RestartPolicy::on_failures` is used by default.
45    pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
46        self.restart_policy = policy;
47        self
48    }
49
50    /// The behaviour on the `Terminate` message.
51    /// `TerminationPolicy::closing` is used by default.
52    pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self {
53        self.termination_policy = policy;
54        self
55    }
56
57    pub fn router<R1: Router<C>>(self, router: R1) -> ActorGroup<R1, C> {
58        ActorGroup {
59            restart_policy: self.restart_policy,
60            termination_policy: self.termination_policy,
61            router,
62            _config: self._config,
63        }
64    }
65
66    pub fn exec<X, O, ER>(self, exec: X) -> Schema
67    where
68        R: Router<C>,
69        X: Fn(Context<C, R::Key>) -> O + Send + Sync + 'static,
70        O: Future<Output = ER> + Send + 'static,
71        ER: ExecResult,
72        C: Config,
73    {
74        let run = move |ctx: Context, name: String, rt_manager: RuntimeManager| {
75            let addr = ctx.addr();
76            let sv = Arc::new(Supervisor::new(
77                ctx,
78                name,
79                exec,
80                self.router,
81                self.restart_policy,
82                self.termination_policy,
83                rt_manager,
84            ));
85            let sv1 = sv.clone();
86            let router = Box::new(move |envelope| sv.handle(envelope));
87            let finished = Box::new(move || sv1.finished());
88            Object::new(addr, Group::new(router, finished))
89        };
90
91        Schema { run: Box::new(run) }
92    }
93}
94
/// The result of `ActorGroup::exec`: a group description that is ready to be
/// turned into an `Object`.
pub struct Schema {
    // One-shot closure that receives a context, a `String` (passed on to the
    // supervisor as its name) and a runtime manager, and returns the `Object`
    // for the group.
    pub(crate) run: Box<dyn FnOnce(Context, String, RuntimeManager) -> Object>,
}
98
/// Describes how a group reacts to the `Terminate` message.
#[derive(Debug, Clone)]
pub struct TerminationPolicy {
    pub(crate) stop_spawning: bool,
    pub(crate) close_mailbox: bool,
}

impl Default for TerminationPolicy {
    /// Returns the same policy as `TerminationPolicy::closing`.
    fn default() -> Self {
        Self::closing()
    }
}

impl TerminationPolicy {
    /// On `Terminate`:
    /// * A supervisor stops spawning new actors.
    /// * New messages are no longer accepted.
    /// * Mailboxes are closed.
    ///
    /// This is the default behaviour.
    pub fn closing() -> Self {
        Self::stop_spawning_and(true)
    }

    /// On `Terminate`:
    /// * A supervisor stops spawning new actors.
    /// * The `Terminate` message can be handled by actors manually.
    /// * Mailboxes keep receiving messages (use `Context::close()` to stop it).
    pub fn manually() -> Self {
        Self::stop_spawning_and(false)
    }

    // TODO: add `stop_spawning`?

    /// Builds a policy that always stops spawning and closes mailboxes only
    /// when `close_mailbox` is `true`.
    fn stop_spawning_and(close_mailbox: bool) -> Self {
        Self {
            stop_spawning: true,
            close_mailbox,
        }
    }
}
139
/// Describes the behaviour on actor termination.
#[derive(Debug, Clone)]
pub struct RestartPolicy {
    pub(crate) mode: RestartMode,
}

impl Default for RestartPolicy {
    /// Returns the same policy as `RestartPolicy::on_failures`.
    fn default() -> Self {
        Self::on_failures()
    }
}

/// The restart mode stored inside a `RestartPolicy`.
#[derive(Debug, Clone)]
pub(crate) enum RestartMode {
    Always,
    OnFailures,
    Never,
}

impl RestartPolicy {
    /// Creates a policy whose mode is `RestartMode::Always`.
    pub fn always() -> Self {
        Self::with_mode(RestartMode::Always)
    }

    /// Creates a policy whose mode is `RestartMode::OnFailures`.
    ///
    /// This is the default policy.
    pub fn on_failures() -> Self {
        Self::with_mode(RestartMode::OnFailures)
    }

    /// Creates a policy whose mode is `RestartMode::Never`.
    pub fn never() -> Self {
        Self::with_mode(RestartMode::Never)
    }

    /// Wraps the given mode into a policy.
    fn with_mode(mode: RestartMode) -> Self {
        Self { mode }
    }
}