1use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc};
2
3use crate::{
4 config::Config,
5 context::Context,
6 exec::ExecResult,
7 object::{Group, Object},
8 routers::Router,
9 runtime::RuntimeManager,
10 supervisor::Supervisor,
11};
12
/// Builder-style description of a group of actors.
///
/// `R` is the type of the stored router and `C` is a type-level marker for the
/// group's config: no value of `C` is kept, only `PhantomData<C>`.
/// A value starts as `ActorGroup<(), ()>` (see `new`) and is consumed by `exec`.
#[derive(Debug)]
pub struct ActorGroup<R, C> {
    /// Restart policy that `exec` hands over to the supervisor.
    restart_policy: RestartPolicy,
    /// Termination policy that `exec` hands over to the supervisor.
    termination_policy: TerminationPolicy,
    /// Router that `exec` hands over to the supervisor.
    router: R,
    /// Zero-sized marker that only records the config type `C`.
    _config: PhantomData<C>,
}
20
21impl ActorGroup<(), ()> {
22 #[allow(clippy::new_without_default)]
23 pub fn new() -> Self {
24 Self {
25 restart_policy: RestartPolicy::default(),
26 termination_policy: TerminationPolicy::default(),
27 router: (),
28 _config: PhantomData,
29 }
30 }
31}
32
33impl<R, C> ActorGroup<R, C> {
34 pub fn config<C1: Config>(self) -> ActorGroup<R, C1> {
35 ActorGroup {
36 restart_policy: self.restart_policy,
37 termination_policy: self.termination_policy,
38 router: self.router,
39 _config: PhantomData,
40 }
41 }
42
43 pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
46 self.restart_policy = policy;
47 self
48 }
49
50 pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self {
53 self.termination_policy = policy;
54 self
55 }
56
57 pub fn router<R1: Router<C>>(self, router: R1) -> ActorGroup<R1, C> {
58 ActorGroup {
59 restart_policy: self.restart_policy,
60 termination_policy: self.termination_policy,
61 router,
62 _config: self._config,
63 }
64 }
65
66 pub fn exec<X, O, ER>(self, exec: X) -> Schema
67 where
68 R: Router<C>,
69 X: Fn(Context<C, R::Key>) -> O + Send + Sync + 'static,
70 O: Future<Output = ER> + Send + 'static,
71 ER: ExecResult,
72 C: Config,
73 {
74 let run = move |ctx: Context, name: String, rt_manager: RuntimeManager| {
75 let addr = ctx.addr();
76 let sv = Arc::new(Supervisor::new(
77 ctx,
78 name,
79 exec,
80 self.router,
81 self.restart_policy,
82 self.termination_policy,
83 rt_manager,
84 ));
85 let sv1 = sv.clone();
86 let router = Box::new(move |envelope| sv.handle(envelope));
87 let finished = Box::new(move || sv1.finished());
88 Object::new(addr, Group::new(router, finished))
89 };
90
91 Schema { run: Box::new(run) }
92 }
93}
94
/// Type-erased result of `ActorGroup::exec`.
///
/// It wraps a boxed closure that receives a `Context`, a `String` (passed as the
/// supervisor's name by `exec`) and a `RuntimeManager`, and returns an `Object`.
pub struct Schema {
    /// One-shot constructor (`FnOnce`), accessible only inside this crate.
    pub(crate) run: Box<dyn FnOnce(Context, String, RuntimeManager) -> Object>,
}
98
/// Termination settings of an actor group, stored as two crate-visible flags.
///
/// NOTE(review): the flag names suggest they control whether the supervisor
/// keeps spawning actors and whether mailboxes get closed on termination;
/// confirm against the supervisor code that reads them.
#[derive(Debug, Clone)]
pub struct TerminationPolicy {
    pub(crate) stop_spawning: bool,
    pub(crate) close_mailbox: bool,
}

impl Default for TerminationPolicy {
    /// The default policy is [`TerminationPolicy::closing`].
    fn default() -> Self {
        TerminationPolicy::closing()
    }
}

impl TerminationPolicy {
    /// Policy with both `stop_spawning` and `close_mailbox` enabled.
    ///
    /// This is the value returned by `Default`.
    pub fn closing() -> Self {
        let (stop_spawning, close_mailbox) = (true, true);
        Self {
            stop_spawning,
            close_mailbox,
        }
    }

    /// Policy with `stop_spawning` enabled and `close_mailbox` disabled.
    pub fn manually() -> Self {
        // Same as `closing()` except that the mailbox flag is turned off.
        Self {
            close_mailbox: false,
            ..Self::closing()
        }
    }
}
139
/// The restart modes a [`RestartPolicy`] can carry.
///
/// NOTE(review): how each mode is interpreted is decided by the code that
/// reads `RestartPolicy::mode` (not shown here); confirm there.
#[derive(Debug, Clone)]
pub(crate) enum RestartMode {
    Always,
    OnFailures,
    Never,
}

/// Restart settings of an actor group: a thin public wrapper around a
/// crate-private [`RestartMode`].
#[derive(Debug, Clone)]
pub struct RestartPolicy {
    pub(crate) mode: RestartMode,
}

impl RestartPolicy {
    /// Wraps `mode` into a policy; shared by the public constructors below.
    fn with_mode(mode: RestartMode) -> Self {
        Self { mode }
    }

    /// Policy carrying `RestartMode::Always`.
    pub fn always() -> Self {
        Self::with_mode(RestartMode::Always)
    }

    /// Policy carrying `RestartMode::OnFailures`.
    ///
    /// This is the value returned by `Default`.
    pub fn on_failures() -> Self {
        Self::with_mode(RestartMode::OnFailures)
    }

    /// Policy carrying `RestartMode::Never`.
    pub fn never() -> Self {
        Self::with_mode(RestartMode::Never)
    }
}

impl Default for RestartPolicy {
    /// The default policy is [`RestartPolicy::on_failures`].
    fn default() -> Self {
        RestartPolicy::on_failures()
    }
}