use std::{
future::Future,
sync::Arc,
time::{Duration, SystemTime},
};
use futures::future::join_all;
use tokio::{
pin, select,
time::{sleep, timeout},
};
use tracing::{error, info, level_filters::LevelFilter, warn};
use elfo_utils::time::Instant;
#[cfg(target_os = "linux")]
use crate::{
memory_tracker::{MemoryCheckResult, MemoryTracker},
time::Interval,
};
use crate::{
actor::{Actor, ActorMeta, ActorStartInfo},
actor_status::ActorStatus,
addr::{Addr, GroupNo},
config::SystemConfig,
context::Context,
demux::Demux,
errors::{RequestError, StartError, StartGroupError},
message,
messages::{StartEntrypoint, Terminate, TerminateReason, UpdateConfig},
msg,
object::Object,
scope::{Scope, ScopeGroupShared},
signal::{Signal, SignalKind},
subscription::SubscriptionManager,
topology::{Topology, SYSTEM_INIT_GROUP_NO},
tracing::TraceId,
};
// Name of the implicit system group that owns the init actor.
const INIT_GROUP_NAME: &str = "system.init";

// Local alias: all startup paths in this module fail with `StartError`.
type Result<T, E = StartError> = std::result::Result<T, E>;
async fn start_entrypoints(ctx: &Context, topology: &Topology, is_check_only: bool) -> Result<()> {
let futures = topology
.locals()
.filter(|group| group.is_entrypoint)
.map(|group| async move {
let response = ctx
.request_to(
group.addr,
UpdateConfig {
config: Default::default(),
},
)
.resolve()
.await;
match response {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => Err(StartError::single(group.name.clone(), e.reason)),
Err(RequestError::Ignored) | Err(RequestError::Failed) => Err(StartError::single(
group.name.clone(),
"config cannot be delivered to the entrypoint".into(),
)),
}?;
let response = ctx
.request_to(group.addr, StartEntrypoint { is_check_only })
.resolve()
.await;
match response {
Ok(Ok(())) => Ok(()),
Ok(Err(e)) => {
let group_errors: Vec<StartGroupError> = e
.errors
.into_iter()
.map(|e| StartGroupError {
group: e.group,
reason: e.reason,
})
.collect();
Err(StartError::multiple(group_errors))
}
Err(RequestError::Ignored) | Err(RequestError::Failed) => Err(StartError::single(
group.name,
"starting message cannot be delivered to the entrypoint".into(),
)),
}
});
let errors: Vec<StartGroupError> = futures::future::join_all(futures)
.await
.into_iter()
.filter_map(Result::err)
.flat_map(|e| e.errors.into_iter())
.collect();
if errors.is_empty() {
Ok(())
} else {
Err(StartError::multiple(errors))
}
}
/// Starts the given topology and panics if the system cannot start.
///
/// Use [`try_start`] to handle startup errors instead of panicking.
pub async fn start(topology: Topology) {
    let result = try_start(topology).await;
    result.expect("cannot start")
}
/// Starts the given topology, returning an error instead of panicking.
///
/// Checks message uniqueness first, then runs the system via [`do_start`]
/// with the main `exec` loop as the continuation.
pub async fn try_start(topology: Topology) -> Result<()> {
    check_messages_uniqueness()?;

    #[cfg(feature = "test-util")]
    warn!("elfo is compiled with `test-util` feature, it may affect performance");

    match do_start(topology, false, exec).await {
        ok @ Ok(()) => ok,
        err @ Err(_) => {
            // NOTE(review): the original code delays returning startup errors;
            // presumably to let pending diagnostics flush — confirm intent.
            sleep(Duration::from_millis(500)).await;
            err
        }
    }
}
/// Runs the startup path in check-only mode: entrypoints validate their
/// configuration, then the whole system is terminated.
pub async fn check_only(topology: Topology) -> Result<()> {
    check_messages_uniqueness()?;

    // After a successful check, shut everything down again.
    let and_then = |ctx, topology| terminate(ctx, topology, TerminateReason::Unknown);
    do_start(topology, true, and_then).await
}
/// Verifies that no message type is defined multiple times across the
/// system, converting any duplicates into a `StartError`.
#[instability::unstable]
pub fn check_messages_uniqueness() -> Result<()> {
    match message::check_uniqueness() {
        Ok(()) => Ok(()),
        Err(duplicates) => {
            // One `StartGroupError` per duplicated `(protocol, name)` pair.
            let errors = duplicates
                .into_iter()
                .map(|(protocol, name)| StartGroupError {
                    group: INIT_GROUP_NAME.into(),
                    reason: format!("message `{protocol}/{name}` is defined several times"),
                })
                .collect();
            Err(StartError::multiple(errors))
        }
    }
}
/// Mounts the internal `system.init` actor, starts all entrypoint groups,
/// and then runs `and_then` (the main loop, or a custom continuation in
/// tests) inside the init actor's scope.
///
/// Returns whatever `and_then` produces, or a `StartError` if the
/// entrypoints fail to start.
#[doc(hidden)]
pub async fn do_start<F: Future>(
    topology: Topology,
    is_check_only: bool,
    and_then: impl FnOnce(Context, Topology) -> F,
) -> Result<F::Output> {
    // Trigger instant-clock calibration eagerly (see `instant_clock_calibration`).
    instant_clock_calibration();

    // Reserve an address for the init actor in the dedicated system group.
    let group_no = GroupNo::new(SYSTEM_INIT_GROUP_NO, topology.launch_id()).unwrap();
    let entry = topology.book.vacant_entry(group_no);
    let addr = entry.addr();

    let ctx = Context::new(topology.book.clone(), Demux::default());

    let meta = Arc::new(ActorMeta {
        group: INIT_GROUP_NAME.into(),
        key: "_".into(),
    });

    let actor = Actor::new(
        meta.clone(),
        addr,
        &<_>::default(),
        <_>::default(),
        Arc::new(SubscriptionManager::new(ctx.clone())),
    );

    // Until a real config arrives, logging is limited to the INFO level.
    let scope_shared = ScopeGroupShared::new(topology.node_no(), topology.launch_id(), addr);
    let mut config = SystemConfig::default();
    config.logging.max_level = LevelFilter::INFO;
    scope_shared.configure(&config);

    let scope = Scope::new(TraceId::generate(), addr, meta, Arc::new(scope_shared));
    // Run the start hook inside the actor's scope, then publish the actor
    // in the address book (order matters: `on_start` runs before insertion).
    scope.clone().sync_within(|| actor.on_start());
    entry.insert(Object::new(addr, actor));

    let ctx = ctx
        .with_addr(addr)
        .with_start_info(ActorStartInfo::on_group_mounted());

    let init = async move {
        start_entrypoints(&ctx, &topology, is_check_only).await?;
        Ok(and_then(ctx, topology).await)
    };

    // Everything below runs within the init actor's tracing/logging scope.
    scope.within(init).await
}
// Internal message asking the `system.init` actor to terminate the system,
// carrying the reason forwarded to groups via `Terminate`.
#[message]
struct TerminateSystem(TerminateReason);

// Internal tick driving periodic memory-usage checks (attached on Linux only).
#[message]
struct CheckMemoryUsageTick;

// Deadline (since a group's termination started) after which a closing
// `Terminate` is sent instead of waiting for the polite one to take effect.
const SEND_CLOSING_TERMINATE_AFTER: Duration = Duration::from_secs(25);
// Total deadline after which the group is given up on and skipped.
const STOP_GROUP_TERMINATION_AFTER: Duration = Duration::from_secs(35);
/// Main loop of the `system.init` actor: waits for a termination trigger
/// (OS signal, explicit `TerminateSystem`, or memory-pressure detection on
/// Linux) and then drives graceful shutdown of the whole topology.
async fn exec(mut ctx: Context, topology: Topology) {
    emit_start_time();

    // Map OS signals onto `TerminateSystem` messages in our own mailbox.
    ctx.attach(Signal::new(
        SignalKind::UnixTerminate,
        TerminateSystem(TerminateReason::Signal(SignalKind::UnixTerminate)),
    ));
    ctx.attach(Signal::new(
        SignalKind::UnixInterrupt,
        TerminateSystem(TerminateReason::Signal(SignalKind::UnixInterrupt)),
    ));
    ctx.attach(Signal::new(
        SignalKind::WindowsCtrlC,
        TerminateSystem(TerminateReason::Signal(SignalKind::WindowsCtrlC)),
    ));

    // On Linux, periodically check memory usage so the system can shut down
    // gracefully before the OOM killer intervenes.
    #[cfg(target_os = "linux")]
    let memory_tracker = {
        const MAX_MEMORY_USAGE_RATIO: f64 = 0.9;
        const CHECK_MEMORY_USAGE_INTERVAL: Duration = Duration::from_secs(3);

        match MemoryTracker::new(MAX_MEMORY_USAGE_RATIO) {
            Ok(tracker) => {
                ctx.attach(Interval::new(CheckMemoryUsageTick))
                    .start(CHECK_MEMORY_USAGE_INTERVAL);
                Some(tracker)
            }
            Err(err) => {
                // Best-effort feature: the system keeps running without it.
                warn!(error = %err, "memory tracker is unavailable, disabled");
                None
            }
        }
    };

    let mut oom_prevented = false;
    let mut terminate_reason = TerminateReason::Unknown;

    // Phase 1: wait for a termination trigger.
    while let Some(envelope) = ctx.recv().await {
        msg!(match &envelope {
            TerminateSystem(reason) => {
                terminate_reason = reason.clone();
                break;
            }
        });

        #[cfg(target_os = "linux")]
        if envelope.is::<CheckMemoryUsageTick>() {
            match memory_tracker.as_ref().map(|mt| mt.check()) {
                Some(Ok(MemoryCheckResult::Passed)) | None => {}
                Some(Ok(MemoryCheckResult::Failed(stats))) => {
                    // Report usage as whole percents of total memory.
                    let percents_of_total =
                        |x| ((x as f64) / (stats.total as f64) * 100.).round() as u64;
                    let used = percents_of_total(stats.used);
                    let available = percents_of_total(stats.available);
                    error!(
                        total = stats.total,
                        used_pct = used,
                        available_pct = available,
                        "maximum memory usage is reached, forcibly terminating"
                    );
                    // Trigger termination through our own mailbox so the
                    // normal `TerminateSystem` path above handles it.
                    let _ = ctx.try_send_to(ctx.addr(), TerminateSystem(TerminateReason::Oom));
                    oom_prevented = true;
                }
                Some(Err(err)) => {
                    warn!(error = %err, "memory tracker cannot check memory usage");
                }
            }
        }
    }

    ctx.set_status(ActorStatus::TERMINATING);

    // Phase 2: run the shutdown sequence, but keep draining the mailbox so
    // a repeated `TerminateSystem` can abort waiting for graceful shutdown.
    let termination = terminate(ctx.pruned(), topology, terminate_reason);
    pin!(termination);

    loop {
        select! {
            _ = &mut termination => return,
            Some(envelope) = ctx.recv() => {
                if !envelope.is::<TerminateSystem>() {
                    continue;
                }

                // NOTE(review): after an OOM-triggered shutdown, one extra
                // `TerminateSystem` is swallowed here (presumably the
                // self-sent OOM message); any subsequent one aborts the
                // graceful wait — confirm against the mailbox semantics.
                if oom_prevented {
                    oom_prevented = false;
                } else {
                    return;
                }
            }
        }
    }
}
/// Terminates all local groups in batches, ordered by ascending `stop_order`.
///
/// Groups sharing a `stop_order` are terminated concurrently; the next batch
/// starts only after the previous one completes.
async fn terminate(ctx: Context, topology: Topology, reason: TerminateReason) {
    // Distinct stop orders, ascending.
    let mut orders: Vec<_> = topology.locals().map(|group| group.stop_order).collect();
    orders.sort_unstable();
    orders.dedup();

    for stop_order in orders {
        info!(%stop_order, "terminating groups");
        terminate_groups(&ctx, &topology, stop_order, reason.clone()).await;
    }
}
/// Concurrently terminates all local groups having the given `stop_order`.
///
/// Each group's termination sequence is raced against the group actually
/// finishing, whichever happens first.
async fn terminate_groups(
    ctx: &Context,
    topology: &Topology,
    stop_order: i8,
    reason: TerminateReason,
) {
    // Share the reason by reference; each per-group future clones its own copy.
    let reason = &reason;

    let futures: Vec<_> = topology
        .locals()
        .filter(|group| group.stop_order == stop_order)
        .map(|group| async move {
            let started_at = Instant::now();
            let reason = reason.clone();
            select! {
                _ = terminate_group(ctx, group.addr, group.name.clone(), started_at, reason) => {},
                _ = watch_group(ctx, group.addr, group.name, started_at) => {},
            }
        })
        .collect();

    join_all(futures).await;
}
/// Drives the termination sequence of one group: a polite `Terminate`
/// first, then a closing one, with deadlines measured from `started_at`.
///
/// This future is raced (via `select!`) against `watch_group()` and is
/// cancelled once the group finishes, so the escalation logging below only
/// runs when the corresponding deadline actually passes.
async fn terminate_group(
    ctx: &Context,
    addr: Addr,
    name: String,
    started_at: Instant,
    reason: TerminateReason,
) {
    // Phase 1: polite request; wait out the remainder of the first deadline.
    info!(group = %name, "sending polite Terminate");
    let polite = ctx.send_to(addr, Terminate::default().with_reason(reason));

    match timeout(SEND_CLOSING_TERMINATE_AFTER, polite).await {
        Ok(_) => {
            if let Some(left) = SEND_CLOSING_TERMINATE_AFTER.checked_sub(started_at.elapsed()) {
                sleep(left).await;
            }
        }
        Err(_) => {
            warn!(group = %name, "failed to deliver polite Terminate, some actors are too busy");
        }
    }

    // Phase 2: escalate to a closing `Terminate` with the final deadline.
    warn!(
        message = "actor group hasn't finished yet, sending closing Terminate",
        group = %name,
        elapsed = ?started_at.elapsed(),
    );
    let closing = ctx.send_to(addr, Terminate::closing());

    match timeout(STOP_GROUP_TERMINATION_AFTER, closing).await {
        Ok(_) => {
            if let Some(left) = STOP_GROUP_TERMINATION_AFTER.checked_sub(started_at.elapsed()) {
                sleep(left).await;
            }
        }
        Err(_) => {
            warn!(group = %name, "failed to deliver closing Terminate");
        }
    }

    // Still running after both deadlines: give up on this group.
    error!(
        message = "failed to terminate an actor group, skipped",
        group = %name,
        elapsed = ?started_at.elapsed(),
    );
}
/// Resolves once all actors of the group have finished, logging the fact.
async fn watch_group(ctx: &Context, addr: Addr, name: String, started_at: Instant) {
    ctx.finished(addr).await;

    let elapsed = started_at.elapsed();
    info!(
        message = "actor group finished",
        group = %name,
        elapsed = ?elapsed,
    );
}
// NOTE(review): presumably forces `elfo_utils::time::Instant`'s lazy clock
// calibration to happen at startup rather than on first use in a hot path —
// confirm against `elfo_utils::time`. `black_box` keeps the call from being
// optimized away.
fn instant_clock_calibration() {
    std::hint::black_box(Instant::now());
}
/// Registers the `elfo_start_time_seconds` gauge and sets it to the current
/// unix time in seconds.
fn emit_start_time() {
    metrics::register_gauge!(
        "elfo_start_time_seconds",
        metrics::Unit::Seconds,
        "Start time of the elfo system since unix epoch in seconds",
    );

    // A clock before the unix epoch yields zero instead of panicking.
    let since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH);
    let seconds = since_epoch.map_or(0.0, |d| d.as_secs_f64());

    metrics::gauge!("elfo_start_time_seconds", seconds);
}
#[cfg(test)]
mod tests {
    use super::*;

    use tokio::sync::mpsc;

    use crate::{config::AnyConfig, ActorGroup, TerminationPolicy};

    // Verifies that a signal-initiated shutdown delivers `Terminate` to
    // actor groups with the originating reason attached.
    #[tokio::test]
    async fn terminate() {
        let (tx, mut rx) = mpsc::unbounded_channel();

        // A group that reports the first `Terminate` reason it receives.
        let group = ActorGroup::new()
            .termination_policy(TerminationPolicy::manually())
            .exec(move |mut ctx| {
                let tx = tx.clone();
                async move {
                    while let Some(envelope) = ctx.recv().await {
                        msg!(match envelope {
                            msg @ Terminate => {
                                let _ = tx.send(msg.reason);
                                break;
                            }
                            _ => {}
                        });
                    }
                }
            });

        let topology = Topology::empty();
        let test_terminate = topology.local("test.terminate");
        test_terminate.mount(group);

        do_start(topology, false, |ctx, topology| async move {
            // Queue a signal-flavored termination request to ourselves.
            ctx.try_send_to(
                ctx.addr(),
                TerminateSystem(TerminateReason::Signal(SignalKind::UnixTerminate)),
            )
            .expect("failed to send terminate message");

            // Configure groups so they actually start before shutdown.
            for group in topology.locals() {
                ctx.send_to(group.addr, UpdateConfig::new(AnyConfig::default()))
                    .await
                    .expect("failed to send update config message");
            }

            exec(ctx, topology).await;
        })
        .await
        .expect("cannot start");

        let reason = rx.recv().await.expect("failed to receive terminate reason");
        assert_eq!(reason, TerminateReason::Signal(SignalKind::UnixTerminate));
    }
}