use std::{future::Future, sync::Arc, time::Duration};

use futures::{future::join_all, TryFutureExt};
use tokio::{
    pin, select,
    time::{sleep, timeout, Instant},
};
use tracing::{error, info, level_filters::LevelFilter, warn};

use crate::{self as elfo};
use elfo_macros::{message, msg_raw as msg};

use crate::{
    actor::{Actor, ActorMeta, ActorStatus},
    addr::Addr,
    config::SystemConfig,
    context::Context,
    demux::Demux,
    errors::{RequestError, StartError},
    memory_tracker::MemoryTracker,
    message,
    messages::{Ping, Terminate, UpdateConfig},
    object::Object,
    scope::{Scope, ScopeGroupShared},
    signal::{Signal, SignalKind},
    subscription::SubscriptionManager,
    time::Interval,
    topology::Topology,
    tracing::TraceId,
};

type Result<T, E = StartError> = std::result::Result<T, E>;

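/// Sends the initial `UpdateConfig` to every entrypoint group and fails
/// if any initial config cannot be delivered or is rejected.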
async fn start_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| {
            let config = Default::default();
            ctx.request_to(group.addr, UpdateConfig { config })
                .resolve()
                .or_else(|err| async move {
                    match err {
                        RequestError::Ignored => Ok(Ok(())),
                        _ => Err(StartError::Other(
                            "initial messages cannot be delivered".into(),
                        )),
                    }
                })
        });

    let error_count = futures::future::try_join_all(futures)
        .await?
        .into_iter()
        .filter_map(Result::err)
        .count();

    if error_count == 0 {
        Ok(())
    } else {
        Err(StartError::InvalidConfig)
    }
}

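/// Pings every entrypoint group and waits for responses, i.e. waits
/// until all entrypoints are initialized and able to handle messages.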
async fn wait_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| ctx.request_to(group.addr, Ping).resolve());

    futures::future::try_join_all(futures)
        .await
        .map_err(|_| StartError::Other("entrypoint cannot start".into()))?;

    Ok(())
}

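/// Starts the system: runs all actor groups and waits until the system
/// terminates (e.g. on SIGTERM or Ctrl-C).
///
/// Panics if the system cannot start; see [`try_start`] for a fallible
/// variant.
///
/// A minimal usage sketch (assumes the `elfo` facade crate and that actor
/// groups are mounted elsewhere):
/// ```ignore
/// #[tokio::main]
/// async fn main() {
///     let topology = elfo::Topology::empty();
///     // ... mount actor groups and mark entrypoints here ...
///     elfo::start(topology).await;
/// }
/// ```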
pub async fn start(topology: Topology) {
    try_start(topology).await.expect("cannot start")
}

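/// The fallible version of [`start`]: returns `StartError` instead of
/// panicking if the system cannot start.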
pub async fn try_start(topology: Topology) -> Result<()> {
    let res = do_start(topology, termination).await;

    if res.is_err() {
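        // Give logging and telemetry a chance to flush the remaining
        // records before the process exits (assumed purpose of the delay).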
        sleep(Duration::from_millis(500)).await;
    }

    res
}

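// Also usable by other launchers (e.g. test helpers) to run the system
// with a custom main future instead of `termination`.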
#[doc(hidden)]
pub async fn do_start<F: Future>(
    topology: Topology,
    f: impl FnOnce(Context, Topology) -> F,
) -> Result<F::Output> {
    message::init();

    let entry = topology.book.vacant_entry();
    let addr = entry.addr();
    let ctx = Context::new(topology.book.clone(), Demux::default()).with_addr(addr);

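    // Register a service actor (`system.init`) that owns the starting
    // and termination process.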
    let meta = Arc::new(ActorMeta {
        group: "system.init".into(),
        key: "_".into(),
    });

    let actor = Actor::new(
        meta.clone(),
        addr,
        Default::default(),
        Arc::new(SubscriptionManager::new(ctx.clone())),
    );

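    // Real configs aren't loaded yet, so run this scope with the default
    // system config and INFO-level logging until they arrive.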
    let scope_shared = ScopeGroupShared::new(addr);
    let mut config = SystemConfig::default();
    config.logging.max_level = LevelFilter::INFO;
    scope_shared.configure(&config);

    let scope = Scope::new(TraceId::generate(), addr, meta, Arc::new(scope_shared));
    scope.clone().sync_within(|| actor.on_start());
    entry.insert(Object::new(addr, actor));

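    // Deliver initial configs and wait until entrypoints are ready before
    // handing control to the provided future.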
    let f = async move {
        start_entrypoints(&ctx, &topology).await?;
        wait_entrypoints(&ctx, &topology).await?;
        Ok(f(ctx, topology).await)
    };
    scope.within(f).await
}

#[message(elfo = crate)]
struct TerminateSystem;

#[message(elfo = crate)]
struct CheckMemoryUsageTick;

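// Termination timeline for each group (as encoded by the constants below):
// t = 0s  — a polite `Terminate` is sent;
// t = 30s — a closing `Terminate` is sent;
// t = 45s — the group is given up on and an error is logged.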
const SEND_CLOSING_TERMINATE_AFTER: Duration = Duration::from_secs(30);
const STOP_GROUP_TERMINATION_AFTER: Duration = Duration::from_secs(45);
const MAX_MEMORY_USAGE_RATIO: f64 = 0.9;
const CHECK_MEMORY_USAGE_INTERVAL: Duration = Duration::from_secs(7);

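/// The default main future: waits for a termination request (SIGTERM or
/// Ctrl-C) or excessive memory usage, then terminates all actor groups.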
async fn termination(ctx: Context, topology: Topology) {
    let term_signal = Signal::new(SignalKind::Terminate, || TerminateSystem);
    let ctrl_c_signal = Signal::new(SignalKind::CtrlC, || TerminateSystem);
    let memory_usage_interval = Interval::new(|| CheckMemoryUsageTick);

    let mut ctx = ctx
        .with(&term_signal)
        .with(&ctrl_c_signal)
        .with(&memory_usage_interval);

    let memory_tracker = match MemoryTracker::new(MAX_MEMORY_USAGE_RATIO) {
        Ok(tracker) => {
            memory_usage_interval.set_period(CHECK_MEMORY_USAGE_INTERVAL);
            Some(tracker)
        }
        Err(err) => {
            warn!(error = %err, "memory tracker is unavailable, memory checks are disabled");
            None
        }
    };

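    // Wait for a termination request, periodically checking memory usage.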
    while let Some(envelope) = ctx.recv().await {
        msg!(match envelope {
            TerminateSystem => break,
            CheckMemoryUsageTick => {
                match memory_tracker.as_ref().map(|mt| mt.check()) {
                    Some(Ok(true)) | None => {}
                    Some(Ok(false)) => {
                        error!("maximum memory usage is reached, forcibly terminating");
                        let _ = ctx.try_send_to(ctx.addr(), TerminateSystem);
                    }
                    Some(Err(err)) => {
                        error!(error = %err, "memory tracker cannot check memory usage");
                    }
                }
            }
        });
    }

    ctx.set_status(ActorStatus::TERMINATING);

    let termination = do_termination(ctx.pruned(), topology);
    pin!(termination);

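    // Drive the graceful termination, but keep watching the mailbox:
    // a second termination request makes this future return immediately,
    // abandoning the graceful path.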
    loop {
        select! {
            _ = &mut termination => return,
            Some(envelope) = ctx.recv() => {
                if envelope.is::<TerminateSystem>() {
                    return;
                }
            }
        }
    }
}

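// Terminates user groups before system ones, so that system services
// (logging, telemetry, etc.) can keep working while user actors stop.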
async fn do_termination(ctx: Context, topology: Topology) {
    info!("terminating user actor groups");
    terminate_groups(&ctx, &topology, true).await;
    info!("terminating system actor groups");
    terminate_groups(&ctx, &topology, false).await;
    info!("system terminated");
}

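// Terminates either user groups (`user == true`) or system ones
// (`user == false`) in parallel; the XOR in the filter below selects
// groups whose `system.` prefix matches the requested kind.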
async fn terminate_groups(ctx: &Context, topology: &Topology, user: bool) {
    let futures = topology
        .actor_groups()
        .filter(|group| user ^ group.name.starts_with("system."))
        .map(|group| async move {
            select! {
                _ = terminate_group(ctx, group.addr, group.name.clone()) => {},
                _ = watch_group(ctx, group.addr, group.name) => {},
            }
        })
        .collect::<Vec<_>>();

    join_all(futures).await;
}

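// Escalates termination of one group step by step; completes only if the
// group fails to stop in time (the caller races it against `watch_group`).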
async fn terminate_group(ctx: &Context, addr: Addr, name: String) {
    let start_time = Instant::now();

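    // Phase 1: a polite `Terminate`, allowing actors to decide when to stop.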
    info!(group = %name, "sending polite Terminate");
    let fut = ctx.send_to(addr, Terminate::default());

    if timeout(SEND_CLOSING_TERMINATE_AFTER, fut).await.is_ok() {
        let elapsed = start_time.elapsed();
        if let Some(delta) = SEND_CLOSING_TERMINATE_AFTER.checked_sub(elapsed) {
            sleep(delta).await;
        }
    } else {
        warn!(
            group = %name,
            "failed to deliver polite Terminate, some actors are too busy"
        );
    }

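    // Phase 2: a closing `Terminate`; unlike the polite one, actors are
    // not expected to postpone it.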
    warn!(
        group = %name,
        "actor group hasn't finished yet, sending closing Terminate"
    );
    let fut = ctx.send_to(addr, Terminate::closing());

    if timeout(STOP_GROUP_TERMINATION_AFTER, fut).await.is_ok() {
        let elapsed = start_time.elapsed();
        if let Some(delta) = STOP_GROUP_TERMINATION_AFTER.checked_sub(elapsed) {
            sleep(delta).await;
        }
    } else {
        warn!(
            group = %name,
            "failed to deliver closing Terminate"
        );
    }

    error!(group = %name, "failed to terminate an actor group");
}

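// Resolves once all actors in the group have finished.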
async fn watch_group(ctx: &Context, addr: Addr, name: String) {
    ctx.finished(addr).await;
    info!(group = %name, "actor group finished");
}