1use std::{fmt::Debug, future::Future, marker::PhantomData, sync::Arc};
2
3use futures::future::BoxFuture;
4
5use crate::{
6 addr::{NodeLaunchId, NodeNo},
7 config::Config,
8 context::Context,
9 envelope::Envelope,
10 exec::{Exec, ExecResult},
11 object::{GroupHandle, GroupVisitor, Object},
12 restarting::RestartPolicy,
13 routers::Router,
14 runtime::RuntimeManager,
15 supervisor::Supervisor,
16};
17
18#[derive(Debug)]
19pub struct ActorGroup<R, C> {
20 restart_policy: RestartPolicy,
21 termination_policy: TerminationPolicy,
22 stop_order: i8,
23 router: R,
24 _config: PhantomData<C>,
25}
26
27impl ActorGroup<(), ()> {
28 #[allow(clippy::new_without_default)]
29 pub fn new() -> Self {
30 Self {
31 restart_policy: RestartPolicy::default(),
32 termination_policy: TerminationPolicy::default(),
33 router: (),
34 stop_order: 0,
35 _config: PhantomData,
36 }
37 }
38}
39
40impl<R, C> ActorGroup<R, C> {
41 pub fn config<C1: Config>(self) -> ActorGroup<R, C1> {
42 ActorGroup {
43 restart_policy: self.restart_policy,
44 termination_policy: self.termination_policy,
45 router: self.router,
46 stop_order: self.stop_order,
47 _config: PhantomData,
48 }
49 }
50
51 pub fn restart_policy(mut self, policy: RestartPolicy) -> Self {
55 self.restart_policy = policy;
56 self
57 }
58
59 pub fn termination_policy(mut self, policy: TerminationPolicy) -> Self {
63 self.termination_policy = policy;
64 self
65 }
66
67 pub fn router<R1: Router<C>>(self, router: R1) -> ActorGroup<R1, C> {
69 ActorGroup {
70 restart_policy: self.restart_policy,
71 termination_policy: self.termination_policy,
72 router,
73 stop_order: self.stop_order,
74 _config: self._config,
75 }
76 }
77
78 #[instability::unstable]
91 pub fn stop_order(mut self, stop_order: i8) -> Self {
92 self.stop_order = stop_order;
93 self
94 }
95
96 pub fn exec<X, O, ER>(self, exec: X) -> Blueprint
102 where
103 R: Router<C>,
104 X: Fn(Context<C, R::Key>) -> O + Send + Sync + 'static,
105 O: Future<Output = ER> + Send + 'static,
106 ER: ExecResult,
107 C: Config,
108 {
109 let mount = move |ctx: Context,
110 node_no: NodeNo,
111 node_launch_id: NodeLaunchId,
112 name: String,
113 rt_manager: RuntimeManager| {
114 let addr = ctx.group();
115 let sv = Arc::new(Supervisor::new(
116 ctx,
117 node_no,
118 node_launch_id,
119 name,
120 exec,
121 self.router,
122 self.restart_policy,
123 self.termination_policy,
124 rt_manager,
125 ));
126
127 Object::new(addr, Box::new(Handle(sv)) as Box<dyn GroupHandle>)
128 };
129
130 Blueprint {
131 mount: Box::new(mount),
132 stop_order: self.stop_order,
133 }
134 }
135}
136
137struct Handle<R: Router<C>, C, X>(Arc<Supervisor<R, C, X>>);
138
139impl<R, C, X> GroupHandle for Handle<R, C, X>
140where
141 R: Router<C>,
142 X: Exec<Context<C, R::Key>>,
143 <X::Output as Future>::Output: ExecResult,
144 C: Config,
145{
146 fn handle(&self, envelope: Envelope, visitor: &mut dyn GroupVisitor) {
147 self.0.handle(envelope, visitor)
148 }
149
150 fn finished(&self) -> BoxFuture<'static, ()> {
151 self.0.finished()
152 }
153}
154
155pub struct Blueprint {
156 pub(crate) mount:
157 Box<dyn FnOnce(Context, NodeNo, NodeLaunchId, String, RuntimeManager) -> Object>,
158 pub(crate) stop_order: i8,
159}
160
161#[derive(Debug, Clone)]
163pub struct TerminationPolicy {
164 pub(crate) stop_spawning: bool,
165 pub(crate) close_mailbox: bool,
166}
167
168impl Default for TerminationPolicy {
169 fn default() -> Self {
170 Self::closing()
171 }
172}
173
174impl TerminationPolicy {
175 pub fn closing() -> Self {
182 Self {
183 stop_spawning: true,
184 close_mailbox: true,
185 }
186 }
187
188 pub fn manually() -> Self {
193 Self {
194 stop_spawning: true,
195 close_mailbox: false,
196 }
197 }
198
199 }