// elfo_core/init.rs

use std::{future::Future, sync::Arc, time::Duration};

use futures::{future::join_all, TryFutureExt};
use tokio::{
    pin, select,
    time::{sleep, timeout, Instant},
};
use tracing::{error, info, level_filters::LevelFilter, warn};

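// Presumably, the `message`/`msg!` macros expand to `elfo::…` paths,
// hence the self-alias below so they resolve inside the crate itself.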
use crate::{self as elfo};
use elfo_macros::{message, msg_raw as msg};

use crate::{
    actor::{Actor, ActorMeta, ActorStatus},
    addr::Addr,
    config::SystemConfig,
    context::Context,
    demux::Demux,
    errors::{RequestError, StartError},
    memory_tracker::MemoryTracker,
    message,
    messages::{Ping, Terminate, UpdateConfig},
    object::Object,
    scope::{Scope, ScopeGroupShared},
    signal::{Signal, SignalKind},
    subscription::SubscriptionManager,
    time::Interval,
    topology::Topology,
    tracing::TraceId,
};

type Result<T, E = StartError> = std::result::Result<T, E>;

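/// Sends an initial (default) config to every entrypoint group and waits
/// for delivery. `RequestError::Ignored` is treated as success; any other
/// delivery failure aborts startup, and if any group rejects the config,
/// `StartError::InvalidConfig` is returned.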
async fn start_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| {
            let config = Default::default();
            ctx.request_to(group.addr, UpdateConfig { config })
                .resolve()
                .or_else(|err| async move {
                    match err {
                        RequestError::Ignored => Ok(Ok(())),
                        _ => Err(StartError::Other(
                            "initial messages cannot be delivered".into(),
                        )),
                    }
                })
        });

    let error_count = futures::future::try_join_all(futures)
        .await?
        .into_iter()
        .filter_map(Result::err)
        .count();

    if error_count == 0 {
        Ok(())
    } else {
        Err(StartError::InvalidConfig)
    }
}
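/// Pings every entrypoint group and fails if any of them cannot respond,
/// i.e. hasn't started successfully.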
async fn wait_entrypoints(ctx: &Context, topology: &Topology) -> Result<()> {
    let futures = topology
        .actor_groups()
        .filter(|group| group.is_entrypoint)
        .map(|group| ctx.request_to(group.addr, Ping).resolve());

    futures::future::try_join_all(futures)
        .await
        .map_err(|_| StartError::Other("entrypoint cannot start".into()))?;

    Ok(())
}

/// Starts a node with the provided topology.
///
/// # Panics
///
/// Panics if the system cannot initialize.
/// Usually, this happens because of an invalid config.
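///
/// # Example
///
/// A minimal sketch; it assumes `start` and `Topology` are re-exported at
/// the crate root and uses a hypothetical `make_topology()` helper:
///
/// ```no_run
/// # use elfo_core as elfo;
/// # fn make_topology() -> elfo::Topology { unimplemented!() }
/// #[tokio::main]
/// async fn main() {
///     elfo::start(make_topology()).await;
/// }
/// ```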
pub async fn start(topology: Topology) {
    try_start(topology).await.expect("cannot start")
}

/// The same as `start()`, but returns an error instead of panicking.
pub async fn try_start(topology: Topology) -> Result<()> {
    let res = do_start(topology, termination).await;

    if res.is_err() {
        // XXX: give the logger enough time.
        sleep(Duration::from_millis(500)).await;
    }

    res
}

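/// Bootstraps the node: registers a service actor under `system.init`,
/// pushes initial configs to entrypoints, waits for them to start, and
/// only then runs `f` (`termination` in the normal path) within the
/// `system.init` scope.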
#[doc(hidden)]
pub async fn do_start<F: Future>(
    topology: Topology,
    f: impl FnOnce(Context, Topology) -> F,
) -> Result<F::Output> {
    message::init();

    let entry = topology.book.vacant_entry();
    let addr = entry.addr();
    let ctx = Context::new(topology.book.clone(), Demux::default()).with_addr(addr);

    let meta = Arc::new(ActorMeta {
        group: "system.init".into(),
        key: "_".into(), // Just like `Singleton`.
    });

    // XXX: create a real group.
    let actor = Actor::new(
        meta.clone(),
        addr,
        Default::default(),
        Arc::new(SubscriptionManager::new(ctx.clone())),
    );

    let scope_shared = ScopeGroupShared::new(addr);
    let mut config = SystemConfig::default();
    config.logging.max_level = LevelFilter::INFO;
    scope_shared.configure(&config);

    let scope = Scope::new(TraceId::generate(), addr, meta, Arc::new(scope_shared));
    scope.clone().sync_within(|| actor.on_start()); // need to emit initial metrics
    entry.insert(Object::new(addr, actor));

    let f = async move {
        start_entrypoints(&ctx, &topology).await?;
        wait_entrypoints(&ctx, &topology).await?;
        Ok(f(ctx, topology).await)
    };
    scope.within(f).await
}

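// Internal control messages of the `system.init` actor.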
#[message(elfo = crate)]
struct TerminateSystem;

#[message(elfo = crate)]
struct CheckMemoryUsageTick;

// TODO: make these values configurable.
const SEND_CLOSING_TERMINATE_AFTER: Duration = Duration::from_secs(30);
const STOP_GROUP_TERMINATION_AFTER: Duration = Duration::from_secs(45);
const MAX_MEMORY_USAGE_RATIO: f64 = 0.9;
const CHECK_MEMORY_USAGE_INTERVAL: Duration = Duration::from_secs(7);
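// Per-group termination timeline (see `terminate_group`):
//   t=0s    a polite `Terminate::default()` is sent;
//   t=30s   `Terminate::closing()` is sent (SEND_CLOSING_TERMINATE_AFTER);
//   t=45s   the group is given up on (STOP_GROUP_TERMINATION_AFTER).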
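/// The main loop of the `system.init` actor: sleeps until SIGTERM/Ctrl-C
/// arrives or the memory limit is exceeded, then drives graceful
/// termination. A repeated signal while terminating aborts immediately.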
async fn termination(ctx: Context, topology: Topology) {
    let term_signal = Signal::new(SignalKind::Terminate, || TerminateSystem);
    let ctrl_c_signal = Signal::new(SignalKind::CtrlC, || TerminateSystem);
    let memory_usage_interval = Interval::new(|| CheckMemoryUsageTick);

    let mut ctx = ctx
        .with(&term_signal)
        .with(&ctrl_c_signal)
        .with(&memory_usage_interval);

    let memory_tracker = match MemoryTracker::new(MAX_MEMORY_USAGE_RATIO) {
        Ok(tracker) => {
            memory_usage_interval.set_period(CHECK_MEMORY_USAGE_INTERVAL);
            Some(tracker)
        }
        Err(err) => {
            warn!(error = %err, "memory tracker is unavailable, disabled");
            None
        }
    };

    while let Some(envelope) = ctx.recv().await {
        msg!(match envelope {
            TerminateSystem => break, // TODO: use `Terminate`?
            CheckMemoryUsageTick => {
                match memory_tracker.as_ref().map(|mt| mt.check()) {
                    Some(Ok(true)) | None => {}
                    Some(Ok(false)) => {
                        error!("maximum memory usage is reached, forcibly terminating");
                        let _ = ctx.try_send_to(ctx.addr(), TerminateSystem);
                    }
                    Some(Err(err)) => {
                        error!(error = %err, "memory tracker cannot check memory usage");
                    }
                }
            }
        });
    }

    ctx.set_status(ActorStatus::TERMINATING);

    let termination = do_termination(ctx.pruned(), topology);
    pin!(termination);

    loop {
        select! {
            _ = &mut termination => return,
            Some(envelope) = ctx.recv() => {
                // The signal has been received again: terminate immediately.
                // TODO: send `Terminate::closing` on the second `Ctrl-C` instead.
                if envelope.is::<TerminateSystem>() {
                    return;
                }
            }
        }
    }
}

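/// Terminates all groups in two phases: user groups first, so they can
/// still use `system.*` groups while stopping, then the system groups
/// themselves.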
async fn do_termination(ctx: Context, topology: Topology) {
    info!("terminating user actor groups");
    terminate_groups(&ctx, &topology, true).await;
    info!("terminating system actor groups");
    terminate_groups(&ctx, &topology, false).await;
    info!("system terminated");
}

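/// Concurrently terminates either user groups (`user == true`) or
/// `system.*` groups. For each group, the escalation in `terminate_group`
/// is raced against `watch_group`, so the timeouts are cancelled as soon
/// as the group actually finishes.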
async fn terminate_groups(ctx: &Context, topology: &Topology, user: bool) {
    // TODO: specify order of system groups.
    let futures = topology
        .actor_groups()
        .filter(|group| user ^ group.name.starts_with("system."))
        .map(|group| async move {
            select! {
                _ = terminate_group(ctx, group.addr, group.name.clone()) => {},
                _ = watch_group(ctx, group.addr, group.name) => {},
            }
        })
        .collect::<Vec<_>>();

    join_all(futures).await;
}

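/// Escalating termination of a single group:
/// 1. sends a polite `Terminate::default()` and waits until t=30s;
/// 2. sends `Terminate::closing()` and waits until t=45s;
/// 3. logs an error and gives up.
/// The caller cancels this future once the group finishes.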
async fn terminate_group(ctx: &Context, addr: Addr, name: String) {
    let start_time = Instant::now();

    // Terminate::default

    info!(group = %name, "sending polite Terminate");
    let fut = ctx.send_to(addr, Terminate::default());

    if timeout(SEND_CLOSING_TERMINATE_AFTER, fut).await.is_ok() {
        let elapsed = start_time.elapsed();
        if let Some(delta) = SEND_CLOSING_TERMINATE_AFTER.checked_sub(elapsed) {
            sleep(delta).await;
        }
    } else {
        warn!(
            group = %name,
            "failed to deliver polite Terminate, some actors are too busy"
        );
    }

    // Terminate::closing

    warn!(
        group = %name,
        "actor group hasn't finished yet, sending closing Terminate"
    );
    let fut = ctx.send_to(addr, Terminate::closing());

    if timeout(STOP_GROUP_TERMINATION_AFTER, fut).await.is_ok() {
        let elapsed = start_time.elapsed();
        if let Some(delta) = STOP_GROUP_TERMINATION_AFTER.checked_sub(elapsed) {
            sleep(delta).await;
        }
    } else {
        warn!(
            group = %name,
            "failed to deliver closing Terminate"
        );
    }

    error!(group = %name, "failed to terminate an actor group");
}

async fn watch_group(ctx: &Context, addr: Addr, name: String) {
    ctx.finished(addr).await;
    info!(group = %name, "actor group finished");
}