// elfo-core: src/init.rs — node initialization, entrypoint startup, and graceful termination.

1use std::{
2    future::Future,
3    sync::Arc,
4    time::{Duration, SystemTime},
5};
6
7use futures::future::join_all;
8use tokio::{
9    pin, select,
10    time::{sleep, timeout},
11};
12use tracing::{error, info, level_filters::LevelFilter, warn};
13
14use elfo_utils::time::Instant;
15
16#[cfg(target_os = "linux")]
17use crate::{
18    memory_tracker::{MemoryCheckResult, MemoryTracker},
19    time::Interval,
20};
21
22use crate::{
23    actor::{Actor, ActorMeta, ActorStartInfo},
24    actor_status::ActorStatus,
25    addr::{Addr, GroupNo},
26    config::SystemConfig,
27    context::Context,
28    demux::Demux,
29    errors::{RequestError, StartError, StartGroupError},
30    message,
31    messages::{StartEntrypoint, Terminate, TerminateReason, UpdateConfig},
32    msg,
33    object::Object,
34    scope::{Scope, ScopeGroupShared},
35    signal::{Signal, SignalKind},
36    subscription::SubscriptionManager,
37    topology::{Topology, SYSTEM_INIT_GROUP_NO},
38    tracing::TraceId,
39};
40
41const INIT_GROUP_NAME: &str = "system.init";
42
43type Result<T, E = StartError> = std::result::Result<T, E>;
44
45async fn start_entrypoints(ctx: &Context, topology: &Topology, is_check_only: bool) -> Result<()> {
46    let futures = topology
47        .locals()
48        .filter(|group| group.is_entrypoint)
49        .map(|group| async move {
50            let response = ctx
51                .request_to(
52                    group.addr,
53                    UpdateConfig {
54                        config: Default::default(),
55                    },
56                )
57                .resolve()
58                .await;
59            match response {
60                Ok(Ok(())) => Ok(()),
61                Ok(Err(e)) => Err(StartError::single(group.name.clone(), e.reason)),
62                Err(RequestError::Ignored) | Err(RequestError::Failed) => Err(StartError::single(
63                    group.name.clone(),
64                    "config cannot be delivered to the entrypoint".into(),
65                )),
66            }?;
67
68            let response = ctx
69                .request_to(group.addr, StartEntrypoint { is_check_only })
70                .resolve()
71                .await;
72            match response {
73                Ok(Ok(())) => Ok(()),
74                Ok(Err(e)) => {
75                    let group_errors: Vec<StartGroupError> = e
76                        .errors
77                        .into_iter()
78                        .map(|e| StartGroupError {
79                            group: e.group,
80                            reason: e.reason,
81                        })
82                        .collect();
83                    Err(StartError::multiple(group_errors))
84                }
85                Err(RequestError::Ignored) | Err(RequestError::Failed) => Err(StartError::single(
86                    group.name,
87                    "starting message cannot be delivered to the entrypoint".into(),
88                )),
89            }
90        });
91
92    let errors: Vec<StartGroupError> = futures::future::join_all(futures)
93        .await
94        .into_iter()
95        .filter_map(Result::err)
96        .flat_map(|e| e.errors.into_iter())
97        .collect();
98
99    if errors.is_empty() {
100        Ok(())
101    } else {
102        Err(StartError::multiple(errors))
103    }
104}
105
106/// Starts a node with the provided topology.
107///
108/// # Panics
109///
110/// Panics if the system cannot initialize.
111/// Usually, it happens because of an invalid config.
112pub async fn start(topology: Topology) {
113    try_start(topology).await.expect("cannot start")
114}
115
116/// The same as `start()`, but returns an error rather than panics.
117pub async fn try_start(topology: Topology) -> Result<()> {
118    check_messages_uniqueness()?;
119
120    #[cfg(feature = "test-util")]
121    warn!("elfo is compiled with `test-util` feature, it may affect performance");
122
123    let res = do_start(topology, false, exec).await;
124
125    if res.is_err() {
126        // XXX: give enough time to the logger.
127        sleep(Duration::from_millis(500)).await;
128    }
129
130    res
131}
132
133/// Starts node in "check only" mode. Entrypoints are started, then the system
134/// is immediately gracefully terminated.
135pub async fn check_only(topology: Topology) -> Result<()> {
136    check_messages_uniqueness()?;
137
138    // The logger is not supposed to be initialized in this mode, so we do not wait
139    // for it before exiting.
140    do_start(topology, true, |ctx, topology| {
141        terminate(ctx, topology, TerminateReason::Unknown)
142    })
143    .await
144}
145
146/// Checks that all messages are unique by `(protocol, name)` pair.
147/// If there are duplicates, returns an error.
148///
149/// It's called automatically by `(try_)start()` and `check_only()`,
150/// but still provided in order to being called manually in service tests.
151#[instability::unstable]
152pub fn check_messages_uniqueness() -> Result<()> {
153    message::check_uniqueness().map_err(|duplicates| {
154        let errors = duplicates
155            .into_iter()
156            .map(|(protocol, name)| StartGroupError {
157                group: INIT_GROUP_NAME.into(),
158                reason: format!("message `{protocol}/{name}` is defined several times"),
159            })
160            .collect();
161
162        StartError::multiple(errors)
163    })
164}
165
166#[doc(hidden)]
167pub async fn do_start<F: Future>(
168    topology: Topology,
169    is_check_only: bool,
170    and_then: impl FnOnce(Context, Topology) -> F,
171) -> Result<F::Output> {
172    instant_clock_calibration();
173
174    let group_no = GroupNo::new(SYSTEM_INIT_GROUP_NO, topology.launch_id()).unwrap();
175    let entry = topology.book.vacant_entry(group_no);
176    let addr = entry.addr();
177    let ctx = Context::new(topology.book.clone(), Demux::default());
178
179    let meta = Arc::new(ActorMeta {
180        group: INIT_GROUP_NAME.into(),
181        key: "_".into(), // Just like `Singleton`.
182    });
183
184    // XXX: create a real group.
185    let actor = Actor::new(
186        meta.clone(),
187        addr,
188        &<_>::default(),
189        <_>::default(),
190        Arc::new(SubscriptionManager::new(ctx.clone())),
191    );
192
193    let scope_shared = ScopeGroupShared::new(topology.node_no(), topology.launch_id(), addr);
194    let mut config = SystemConfig::default();
195    config.logging.max_level = LevelFilter::INFO;
196    scope_shared.configure(&config);
197
198    let scope = Scope::new(TraceId::generate(), addr, meta, Arc::new(scope_shared));
199    scope.clone().sync_within(|| actor.on_start()); // need to emit initial metrics
200    entry.insert(Object::new(addr, actor));
201
202    // It must be called after `entry.insert()`.
203    let ctx = ctx
204        .with_addr(addr)
205        .with_start_info(ActorStartInfo::on_group_mounted());
206
207    let init = async move {
208        start_entrypoints(&ctx, &topology, is_check_only).await?;
209        Ok(and_then(ctx, topology).await)
210    };
211    scope.within(init).await
212}
213
214#[message]
215struct TerminateSystem(TerminateReason);
216
217#[message]
218struct CheckMemoryUsageTick;
219
// TODO: make these values configurable.
/// Deadline (measured from the start of a group's termination) after which
/// the polite `Terminate` is escalated to `Terminate::closing`.
const SEND_CLOSING_TERMINATE_AFTER: Duration = Duration::from_secs(25);
/// Deadline (measured from the start of a group's termination) after which
/// the group is given up on and skipped.
const STOP_GROUP_TERMINATION_AFTER: Duration = Duration::from_secs(35);
223
224async fn exec(mut ctx: Context, topology: Topology) {
225    emit_start_time();
226
227    ctx.attach(Signal::new(
228        SignalKind::UnixTerminate,
229        TerminateSystem(TerminateReason::Signal(SignalKind::UnixTerminate)),
230    ));
231    ctx.attach(Signal::new(
232        SignalKind::UnixInterrupt,
233        TerminateSystem(TerminateReason::Signal(SignalKind::UnixInterrupt)),
234    ));
235    ctx.attach(Signal::new(
236        SignalKind::WindowsCtrlC,
237        TerminateSystem(TerminateReason::Signal(SignalKind::WindowsCtrlC)),
238    ));
239
240    #[cfg(target_os = "linux")]
241    let memory_tracker = {
242        const MAX_MEMORY_USAGE_RATIO: f64 = 0.9;
243        const CHECK_MEMORY_USAGE_INTERVAL: Duration = Duration::from_secs(3);
244
245        match MemoryTracker::new(MAX_MEMORY_USAGE_RATIO) {
246            Ok(tracker) => {
247                ctx.attach(Interval::new(CheckMemoryUsageTick))
248                    .start(CHECK_MEMORY_USAGE_INTERVAL);
249                Some(tracker)
250            }
251            Err(err) => {
252                warn!(error = %err, "memory tracker is unavailable, disabled");
253                None
254            }
255        }
256    };
257
258    let mut oom_prevented = false;
259    let mut terminate_reason = TerminateReason::Unknown;
260
261    while let Some(envelope) = ctx.recv().await {
262        msg!(match &envelope {
263            TerminateSystem(reason) => {
264                terminate_reason = reason.clone();
265                break;
266            }
267        });
268
269        #[cfg(target_os = "linux")]
270        if envelope.is::<CheckMemoryUsageTick>() {
271            match memory_tracker.as_ref().map(|mt| mt.check()) {
272                Some(Ok(MemoryCheckResult::Passed)) | None => {}
273                Some(Ok(MemoryCheckResult::Failed(stats))) => {
274                    let percents_of_total =
275                        |x| ((x as f64) / (stats.total as f64) * 100.).round() as u64;
276                    let used = percents_of_total(stats.used);
277                    let available = percents_of_total(stats.available);
278                    error!(
279                        total = stats.total,
280                        used_pct = used,
281                        available_pct = available,
282                        "maximum memory usage is reached, forcibly terminating"
283                    );
284
285                    let _ = ctx.try_send_to(ctx.addr(), TerminateSystem(TerminateReason::Oom));
286                    oom_prevented = true;
287                }
288                Some(Err(err)) => {
289                    warn!(error = %err, "memory tracker cannot check memory usage");
290                }
291            }
292        }
293    }
294
295    ctx.set_status(ActorStatus::TERMINATING);
296
297    let termination = terminate(ctx.pruned(), topology, terminate_reason);
298    pin!(termination);
299
300    loop {
301        select! {
302            _ = &mut termination => return,
303            Some(envelope) = ctx.recv() => {
304                if !envelope.is::<TerminateSystem>() {
305                    continue;
306                }
307
308                if oom_prevented {
309                    // Skip the first signal after OOM prevented.
310                    oom_prevented = false;
311                } else {
312                    // `Ctrl-C` has been pressed again. Terminate immediately.
313                    // TODO: `Terminate::closing` on second `Ctrl-C`
314                    return;
315                }
316            }
317        }
318    }
319}
320
321async fn terminate(ctx: Context, topology: Topology, reason: TerminateReason) {
322    let mut stop_order_list = topology
323        .locals()
324        .map(|group| group.stop_order)
325        .collect::<Vec<_>>();
326
327    stop_order_list.sort_unstable();
328    stop_order_list.dedup();
329
330    for stop_order in stop_order_list {
331        info!(%stop_order, "terminating groups");
332        terminate_groups(&ctx, &topology, stop_order, reason.clone()).await;
333    }
334}
335
336async fn terminate_groups(
337    ctx: &Context,
338    topology: &Topology,
339    stop_order: i8,
340    reason: TerminateReason,
341) {
342    let futures = topology
343        .locals()
344        .filter(|group| group.stop_order == stop_order)
345        .zip(core::iter::repeat(reason))
346        .map(|(group, reason)| async move {
347            let started_at = Instant::now();
348            select! {
349                _ = terminate_group(ctx, group.addr, group.name.clone(), started_at, reason) => {},
350                _ = watch_group(ctx, group.addr, group.name, started_at) => {},
351            }
352        })
353        .collect::<Vec<_>>();
354
355    join_all(futures).await;
356}
357
358async fn terminate_group(
359    ctx: &Context,
360    addr: Addr,
361    name: String,
362    started_at: Instant,
363    reason: TerminateReason,
364) {
365    // Terminate::default
366
367    info!(group = %name, "sending polite Terminate");
368    let fut = ctx.send_to(addr, Terminate::default().with_reason(reason));
369
370    if timeout(SEND_CLOSING_TERMINATE_AFTER, fut).await.is_ok() {
371        let elapsed = started_at.elapsed();
372        if let Some(delta) = SEND_CLOSING_TERMINATE_AFTER.checked_sub(elapsed) {
373            sleep(delta).await;
374        }
375    } else {
376        warn!(group = %name, "failed to deliver polite Terminate, some actors are too busy");
377    }
378
379    // Terminate::closing
380
381    warn!(
382        message = "actor group hasn't finished yet, sending closing Terminate",
383        group = %name,
384        elapsed = ?started_at.elapsed(),
385    );
386    let fut = ctx.send_to(addr, Terminate::closing());
387
388    if timeout(STOP_GROUP_TERMINATION_AFTER, fut).await.is_ok() {
389        let elapsed = started_at.elapsed();
390        if let Some(delta) = STOP_GROUP_TERMINATION_AFTER.checked_sub(elapsed) {
391            sleep(delta).await;
392        }
393    } else {
394        warn!(group = %name, "failed to deliver closing Terminate");
395    }
396
397    error!(
398        message = "failed to terminate an actor group, skipped",
399        group = %name,
400        elapsed = ?started_at.elapsed(),
401    );
402}
403
404async fn watch_group(ctx: &Context, addr: Addr, name: String, started_at: Instant) {
405    ctx.finished(addr).await;
406
407    info!(
408        message = "actor group finished",
409        group = %name,
410        elapsed = ?started_at.elapsed(),
411    );
412}
413
414fn instant_clock_calibration() {
415    // Perform the clock calibration if needed.
416    std::hint::black_box(Instant::now());
417}
418
419fn emit_start_time() {
420    metrics::register_gauge!(
421        "elfo_start_time_seconds",
422        metrics::Unit::Seconds,
423        "Start time of the elfo system since unix epoch in seconds",
424    );
425
426    let unix_time = SystemTime::now()
427        .duration_since(SystemTime::UNIX_EPOCH)
428        .unwrap_or_default()
429        .as_secs_f64();
430
431    metrics::gauge!("elfo_start_time_seconds", unix_time);
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::sync::mpsc;

    use crate::{config::AnyConfig, ActorGroup, TerminationPolicy};

    // Checks that the reason carried by `TerminateSystem` is forwarded to
    // actors as the `reason` of the polite `Terminate`.
    #[tokio::test]
    async fn terminate() {
        let (reason_tx, mut reason_rx) = mpsc::unbounded_channel();

        let group = ActorGroup::new()
            .termination_policy(TerminationPolicy::manually())
            .exec(move |mut actor_ctx| {
                let reason_tx = reason_tx.clone();
                async move {
                    while let Some(envelope) = actor_ctx.recv().await {
                        msg!(match envelope {
                            msg @ Terminate => {
                                let _ = reason_tx.send(msg.reason);
                                break;
                            }
                            _ => {}
                        });
                    }
                }
            });

        let topology = Topology::empty();
        topology.local("test.terminate").mount(group);

        do_start(topology, false, |ctx, topology| async move {
            ctx.try_send_to(
                ctx.addr(),
                TerminateSystem(TerminateReason::Signal(SignalKind::UnixTerminate)),
            )
            .expect("failed to send terminate message");

            for group in topology.locals() {
                ctx.send_to(group.addr, UpdateConfig::new(AnyConfig::default()))
                    .await
                    .expect("failed to send update config message");
            }

            exec(ctx, topology).await;
        })
        .await
        .expect("cannot start");

        let reason = reason_rx.recv().await.expect("failed to receive terminate reason");
        assert_eq!(reason, TerminateReason::Signal(SignalKind::UnixTerminate));
    }
}
486}