1use std::{
2 future::Future,
3 sync::Arc,
4 time::{Duration, SystemTime},
5};
6
7use futures::future::join_all;
8use tokio::{
9 pin, select,
10 time::{sleep, timeout},
11};
12use tracing::{error, info, level_filters::LevelFilter, warn};
13
14use elfo_utils::time::Instant;
15
16#[cfg(target_os = "linux")]
17use crate::{
18 memory_tracker::{MemoryCheckResult, MemoryTracker},
19 time::Interval,
20};
21
22use crate::{
23 actor::{Actor, ActorMeta, ActorStartInfo},
24 actor_status::ActorStatus,
25 addr::{Addr, GroupNo},
26 config::SystemConfig,
27 context::Context,
28 demux::Demux,
29 errors::{RequestError, StartError, StartGroupError},
30 message,
31 messages::{StartEntrypoint, Terminate, TerminateReason, UpdateConfig},
32 msg,
33 object::Object,
34 scope::{Scope, ScopeGroupShared},
35 signal::{Signal, SignalKind},
36 subscription::SubscriptionManager,
37 topology::{Topology, SYSTEM_INIT_GROUP_NO},
38 tracing::TraceId,
39};
40
/// Name of the synthetic group hosting the internal init actor.
const INIT_GROUP_NAME: &str = "system.init";

/// Local alias: start-up routines fail with [`StartError`] by default.
type Result<T, E = StartError> = std::result::Result<T, E>;
44
45async fn start_entrypoints(ctx: &Context, topology: &Topology, is_check_only: bool) -> Result<()> {
46 let futures = topology
47 .locals()
48 .filter(|group| group.is_entrypoint)
49 .map(|group| async move {
50 let response = ctx
51 .request_to(
52 group.addr,
53 UpdateConfig {
54 config: Default::default(),
55 },
56 )
57 .resolve()
58 .await;
59 match response {
60 Ok(Ok(())) => Ok(()),
61 Ok(Err(e)) => Err(StartError::single(group.name.clone(), e.reason)),
62 Err(RequestError::Ignored) | Err(RequestError::Failed) => Err(StartError::single(
63 group.name.clone(),
64 "config cannot be delivered to the entrypoint".into(),
65 )),
66 }?;
67
68 let response = ctx
69 .request_to(group.addr, StartEntrypoint { is_check_only })
70 .resolve()
71 .await;
72 match response {
73 Ok(Ok(())) => Ok(()),
74 Ok(Err(e)) => {
75 let group_errors: Vec<StartGroupError> = e
76 .errors
77 .into_iter()
78 .map(|e| StartGroupError {
79 group: e.group,
80 reason: e.reason,
81 })
82 .collect();
83 Err(StartError::multiple(group_errors))
84 }
85 Err(RequestError::Ignored) | Err(RequestError::Failed) => Err(StartError::single(
86 group.name,
87 "starting message cannot be delivered to the entrypoint".into(),
88 )),
89 }
90 });
91
92 let errors: Vec<StartGroupError> = futures::future::join_all(futures)
93 .await
94 .into_iter()
95 .filter_map(Result::err)
96 .flat_map(|e| e.errors.into_iter())
97 .collect();
98
99 if errors.is_empty() {
100 Ok(())
101 } else {
102 Err(StartError::multiple(errors))
103 }
104}
105
106pub async fn start(topology: Topology) {
113 try_start(topology).await.expect("cannot start")
114}
115
116pub async fn try_start(topology: Topology) -> Result<()> {
118 check_messages_uniqueness()?;
119
120 #[cfg(feature = "test-util")]
121 warn!("elfo is compiled with `test-util` feature, it may affect performance");
122
123 let res = do_start(topology, false, exec).await;
124
125 if res.is_err() {
126 sleep(Duration::from_millis(500)).await;
128 }
129
130 res
131}
132
133pub async fn check_only(topology: Topology) -> Result<()> {
136 check_messages_uniqueness()?;
137
138 do_start(topology, true, |ctx, topology| {
141 terminate(ctx, topology, TerminateReason::Unknown)
142 })
143 .await
144}
145
146#[instability::unstable]
152pub fn check_messages_uniqueness() -> Result<()> {
153 message::check_uniqueness().map_err(|duplicates| {
154 let errors = duplicates
155 .into_iter()
156 .map(|(protocol, name)| StartGroupError {
157 group: INIT_GROUP_NAME.into(),
158 reason: format!("message `{protocol}/{name}` is defined several times"),
159 })
160 .collect();
161
162 StartError::multiple(errors)
163 })
164}
165
/// Mounts the internal `system.init` actor into the topology and runs
/// `and_then` inside its scope once all entrypoints have been started.
///
/// Returns an error if entrypoints cannot be configured or started.
#[doc(hidden)]
pub async fn do_start<F: Future>(
    topology: Topology,
    is_check_only: bool,
    and_then: impl FnOnce(Context, Topology) -> F,
) -> Result<F::Output> {
    // Touch the clock once before start-up (see the function for details).
    instant_clock_calibration();

    // The init actor lives in the dedicated, well-known group number.
    let group_no = GroupNo::new(SYSTEM_INIT_GROUP_NO, topology.launch_id()).unwrap();
    let entry = topology.book.vacant_entry(group_no);
    let addr = entry.addr();
    let ctx = Context::new(topology.book.clone(), Demux::default());

    let meta = Arc::new(ActorMeta {
        group: INIT_GROUP_NAME.into(),
        key: "_".into(),
    });

    let actor = Actor::new(
        meta.clone(),
        addr,
        &<_>::default(),
        <_>::default(),
        Arc::new(SubscriptionManager::new(ctx.clone())),
    );

    // The init actor has no config of its own, so its scope is configured
    // manually with defaults plus INFO-level logging.
    let scope_shared = ScopeGroupShared::new(topology.node_no(), topology.launch_id(), addr);
    let mut config = SystemConfig::default();
    config.logging.max_level = LevelFilter::INFO;
    scope_shared.configure(&config);

    let scope = Scope::new(TraceId::generate(), addr, meta, Arc::new(scope_shared));
    // Run the start hook inside the scope, then make the actor addressable.
    scope.clone().sync_within(|| actor.on_start());
    entry.insert(Object::new(addr, actor));

    let ctx = ctx
        .with_addr(addr)
        .with_start_info(ActorStartInfo::on_group_mounted());

    let init = async move {
        start_entrypoints(&ctx, &topology, is_check_only).await?;
        Ok(and_then(ctx, topology).await)
    };
    scope.within(init).await
}
213
/// Internal message asking the init actor to terminate the whole system,
/// carrying the trigger (signal, OOM, etc.).
#[message]
struct TerminateSystem(TerminateReason);

/// Periodic tick driving the memory usage check (Linux only).
#[message]
struct CheckMemoryUsageTick;

// Per-group shutdown schedule, measured from the start of the group's
// termination: after 25s a closing `Terminate` is sent, after 35s the
// group is given up on and skipped (see `terminate_group`).
const SEND_CLOSING_TERMINATE_AFTER: Duration = Duration::from_secs(25);
const STOP_GROUP_TERMINATION_AFTER: Duration = Duration::from_secs(35);
223
/// Main loop of the init actor: waits for a termination trigger (process
/// signal, explicit `TerminateSystem`, or — on Linux — an OOM-prevention
/// check), then gracefully terminates all local groups.
async fn exec(mut ctx: Context, topology: Topology) {
    emit_start_time();

    // Translate process signals into `TerminateSystem` messages.
    ctx.attach(Signal::new(
        SignalKind::UnixTerminate,
        TerminateSystem(TerminateReason::Signal(SignalKind::UnixTerminate)),
    ));
    ctx.attach(Signal::new(
        SignalKind::UnixInterrupt,
        TerminateSystem(TerminateReason::Signal(SignalKind::UnixInterrupt)),
    ));
    ctx.attach(Signal::new(
        SignalKind::WindowsCtrlC,
        TerminateSystem(TerminateReason::Signal(SignalKind::WindowsCtrlC)),
    ));

    // On Linux, periodically check memory usage so the system can shut down
    // gracefully instead of being killed abruptly when memory runs out.
    #[cfg(target_os = "linux")]
    let memory_tracker = {
        const MAX_MEMORY_USAGE_RATIO: f64 = 0.9;
        const CHECK_MEMORY_USAGE_INTERVAL: Duration = Duration::from_secs(3);

        match MemoryTracker::new(MAX_MEMORY_USAGE_RATIO) {
            Ok(tracker) => {
                ctx.attach(Interval::new(CheckMemoryUsageTick))
                    .start(CHECK_MEMORY_USAGE_INTERVAL);
                Some(tracker)
            }
            Err(err) => {
                // Not fatal: the system simply runs without OOM prevention.
                warn!(error = %err, "memory tracker is unavailable, disabled");
                None
            }
        }
    };

    // Set when termination was self-triggered by the OOM check; consulted
    // below to swallow one extra `TerminateSystem` during shutdown.
    let mut oom_prevented = false;
    let mut terminate_reason = TerminateReason::Unknown;

    while let Some(envelope) = ctx.recv().await {
        msg!(match &envelope {
            TerminateSystem(reason) => {
                terminate_reason = reason.clone();
                break;
            }
        });

        #[cfg(target_os = "linux")]
        if envelope.is::<CheckMemoryUsageTick>() {
            match memory_tracker.as_ref().map(|mt| mt.check()) {
                Some(Ok(MemoryCheckResult::Passed)) | None => {}
                Some(Ok(MemoryCheckResult::Failed(stats))) => {
                    let percents_of_total =
                        |x| ((x as f64) / (stats.total as f64) * 100.).round() as u64;
                    let used = percents_of_total(stats.used);
                    let available = percents_of_total(stats.available);
                    error!(
                        total = stats.total,
                        used_pct = used,
                        available_pct = available,
                        "maximum memory usage is reached, forcibly terminating"
                    );

                    // Terminate via the normal path by messaging ourselves.
                    let _ = ctx.try_send_to(ctx.addr(), TerminateSystem(TerminateReason::Oom));
                    oom_prevented = true;
                }
                Some(Err(err)) => {
                    warn!(error = %err, "memory tracker cannot check memory usage");
                }
            }
        }
    }

    ctx.set_status(ActorStatus::TERMINATING);

    // Run graceful termination, but keep listening: a repeated
    // `TerminateSystem` (e.g. a second Ctrl-C) aborts it immediately.
    let termination = terminate(ctx.pruned(), topology, terminate_reason);
    pin!(termination);

    loop {
        select! {
            _ = &mut termination => return,
            Some(envelope) = ctx.recv() => {
                if !envelope.is::<TerminateSystem>() {
                    continue;
                }

                if oom_prevented {
                    // Swallow exactly one message after an OOM-triggered
                    // shutdown so it isn't mistaken for an operator's abort
                    // request — NOTE(review): confirm the intended scenario.
                    oom_prevented = false;
                } else {
                    return;
                }
            }
        }
    }
}
320
321async fn terminate(ctx: Context, topology: Topology, reason: TerminateReason) {
322 let mut stop_order_list = topology
323 .locals()
324 .map(|group| group.stop_order)
325 .collect::<Vec<_>>();
326
327 stop_order_list.sort_unstable();
328 stop_order_list.dedup();
329
330 for stop_order in stop_order_list {
331 info!(%stop_order, "terminating groups");
332 terminate_groups(&ctx, &topology, stop_order, reason.clone()).await;
333 }
334}
335
336async fn terminate_groups(
337 ctx: &Context,
338 topology: &Topology,
339 stop_order: i8,
340 reason: TerminateReason,
341) {
342 let futures = topology
343 .locals()
344 .filter(|group| group.stop_order == stop_order)
345 .zip(core::iter::repeat(reason))
346 .map(|(group, reason)| async move {
347 let started_at = Instant::now();
348 select! {
349 _ = terminate_group(ctx, group.addr, group.name.clone(), started_at, reason) => {},
350 _ = watch_group(ctx, group.addr, group.name, started_at) => {},
351 }
352 })
353 .collect::<Vec<_>>();
354
355 join_all(futures).await;
356}
357
358async fn terminate_group(
359 ctx: &Context,
360 addr: Addr,
361 name: String,
362 started_at: Instant,
363 reason: TerminateReason,
364) {
365 info!(group = %name, "sending polite Terminate");
368 let fut = ctx.send_to(addr, Terminate::default().with_reason(reason));
369
370 if timeout(SEND_CLOSING_TERMINATE_AFTER, fut).await.is_ok() {
371 let elapsed = started_at.elapsed();
372 if let Some(delta) = SEND_CLOSING_TERMINATE_AFTER.checked_sub(elapsed) {
373 sleep(delta).await;
374 }
375 } else {
376 warn!(group = %name, "failed to deliver polite Terminate, some actors are too busy");
377 }
378
379 warn!(
382 message = "actor group hasn't finished yet, sending closing Terminate",
383 group = %name,
384 elapsed = ?started_at.elapsed(),
385 );
386 let fut = ctx.send_to(addr, Terminate::closing());
387
388 if timeout(STOP_GROUP_TERMINATION_AFTER, fut).await.is_ok() {
389 let elapsed = started_at.elapsed();
390 if let Some(delta) = STOP_GROUP_TERMINATION_AFTER.checked_sub(elapsed) {
391 sleep(delta).await;
392 }
393 } else {
394 warn!(group = %name, "failed to deliver closing Terminate");
395 }
396
397 error!(
398 message = "failed to terminate an actor group, skipped",
399 group = %name,
400 elapsed = ?started_at.elapsed(),
401 );
402}
403
404async fn watch_group(ctx: &Context, addr: Addr, name: String, started_at: Instant) {
405 ctx.finished(addr).await;
406
407 info!(
408 message = "actor group finished",
409 group = %name,
410 elapsed = ?started_at.elapsed(),
411 );
412}
413
/// Touches `Instant::now()` once during start-up — presumably to force any
/// lazy initialization/calibration inside `elfo_utils::time::Instant` to
/// happen eagerly rather than on a hot path; TODO confirm against its impl.
/// `black_box` keeps the call from being optimized away.
fn instant_clock_calibration() {
    std::hint::black_box(Instant::now());
}
418
419fn emit_start_time() {
420 metrics::register_gauge!(
421 "elfo_start_time_seconds",
422 metrics::Unit::Seconds,
423 "Start time of the elfo system since unix epoch in seconds",
424 );
425
426 let unix_time = SystemTime::now()
427 .duration_since(SystemTime::UNIX_EPOCH)
428 .unwrap_or_default()
429 .as_secs_f64();
430
431 metrics::gauge!("elfo_start_time_seconds", unix_time);
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::sync::mpsc;

    use crate::{config::AnyConfig, ActorGroup, TerminationPolicy};

    // Checks that the reason carried by `TerminateSystem` is propagated into
    // the `Terminate` message actually delivered to actors.
    #[tokio::test]
    async fn terminate() {
        let (tx, mut rx) = mpsc::unbounded_channel();

        // An actor that reports the reason of the first `Terminate` it gets.
        let group = ActorGroup::new()
            .termination_policy(TerminationPolicy::manually())
            .exec(move |mut ctx| {
                let tx = tx.clone();
                async move {
                    while let Some(envelope) = ctx.recv().await {
                        msg!(match envelope {
                            msg @ Terminate => {
                                let _ = tx.send(msg.reason);
                                break;
                            }
                            _ => {}
                        });
                    }
                }
            });

        let topology = Topology::empty();
        let test_terminate = topology.local("test.terminate");
        test_terminate.mount(group);

        do_start(topology, false, |ctx, topology| async move {
            // Trigger shutdown as if SIGTERM had been received.
            ctx.try_send_to(
                ctx.addr(),
                TerminateSystem(TerminateReason::Signal(SignalKind::UnixTerminate)),
            )
            .expect("failed to send terminate message");
            // Configure the groups so they start processing messages.
            for group in topology.locals() {
                ctx.send_to(group.addr, UpdateConfig::new(AnyConfig::default()))
                    .await
                    .expect("failed to send update config message");
            }
            exec(ctx, topology).await;
        })
        .await
        .expect("cannot start");

        let reason = rx.recv().await.expect("failed to receive terminate reason");
        assert_eq!(reason, TerminateReason::Signal(SignalKind::UnixTerminate));
    }
}
486}