use std::env::consts::ARCH;
use std::os::unix::process::parent_id;
use std::sync::Arc;
use std::time::Instant;
use std::{fmt, io, process};
use heph::actor_ref::{ActorGroup, Delivery};
use log::{as_debug, as_display, debug, error, info, trace};
use mio::event::Event;
use mio::{Events, Interest, Poll, Registry, Token};
use mio_signals::{SignalSet, Signals};
use crate::setup::{host_id, host_info, Uuid};
use crate::shared::waker;
use crate::thread_waker::ThreadWaker;
use crate::trace;
use crate::{
self as rt, cpu_usage, shared, worker, Signal, SyncWorker, SYNC_WORKER_ID_END,
SYNC_WORKER_ID_START,
};
/// Token under which the process signal handler (`Signals`) is registered with
/// the coordinator's `Poll` (see `setup_signals`).
///
/// `Coordinator::run` matches this token before the (sync) worker token ranges.
/// NOTE(review): `usize::MAX` is presumably chosen so it never overlaps with a
/// worker or sync worker id — confirm `SYNC_WORKER_ID_END < usize::MAX`.
const SIGNAL: Token = Token(usize::MAX);
/// The coordinator of the runtime.
///
/// It monitors the worker and sync worker threads for OS events and relays
/// process signals to the workers and to registered actors. It also logs the
/// runtime metrics on request (a `User2` process signal).
#[derive(Debug)]
pub(super) struct Coordinator {
    /// Poll instance with which the signal handler, the workers and the sync
    /// workers are registered.
    poll: Poll,
    /// Process signal handler, registered with `poll` under the `SIGNAL` token.
    signals: Signals,
    /// Shared runtime internals; its metrics are part of the coordinator metrics.
    internals: Arc<shared::RuntimeInternals>,
    /// Moment the coordinator was initialised, used to report the uptime.
    start: Instant,
    /// Name of the application, included in the metrics.
    app_name: Box<str>,
    /// Host operating system information, included in the metrics.
    host_os: Box<str>,
    /// Host name, included in the metrics.
    host_name: Box<str>,
    /// Host id, included in the metrics.
    host_id: Uuid,
}
impl Coordinator {
pub(super) fn init(
app_name: Box<str>,
worker_wakers: Box<[&'static ThreadWaker]>,
trace_log: Option<Arc<trace::SharedLog>>,
) -> io::Result<Coordinator> {
let poll = Poll::new()?;
let signals = setup_signals(poll.registry())?;
let setup = shared::RuntimeInternals::setup()?;
let internals = Arc::new_cyclic(|shared_internals| {
let waker_id = waker::init(shared_internals.clone());
setup.complete(waker_id, worker_wakers, trace_log)
});
let (host_os, host_name) = host_info()?;
let host_id = host_id()?;
Ok(Coordinator {
host_os,
host_name,
host_id,
app_name,
poll,
signals,
internals,
start: Instant::now(),
})
}
pub(super) const fn shared_internals(&self) -> &Arc<shared::RuntimeInternals> {
&self.internals
}
pub(super) fn run(
mut self,
mut workers: Vec<worker::Handle>,
mut sync_workers: Vec<SyncWorker>,
mut signal_refs: ActorGroup<Signal>,
mut trace_log: Option<trace::CoordinatorLog>,
) -> Result<(), rt::Error> {
self.pre_run(&mut workers, &mut sync_workers, &mut trace_log)?;
let mut events = Events::with_capacity(16);
loop {
let timing = trace::start(&trace_log);
self.poll
.poll(&mut events, None)
.map_err(|err| rt::Error::coordinator(Error::Polling(err)))?;
trace::finish_rt(trace_log.as_mut(), timing, "Polling for OS events", &[]);
let timing = trace::start(&trace_log);
for event in events.iter() {
trace!("got OS event: {:?}", event);
match event.token() {
SIGNAL => {
let timing = trace::start(&trace_log);
let log_metrics =
relay_signals(&mut self.signals, &mut workers, &mut signal_refs);
trace::finish_rt(
trace_log.as_mut(),
timing,
"Relaying process signal(s)",
&[],
);
if log_metrics {
self.log_metrics(&workers, &sync_workers, &signal_refs, &mut trace_log);
}
}
token if token.0 < SYNC_WORKER_ID_START => {
let timing = trace::start(&trace_log);
handle_worker_event(&mut workers, event)?;
trace::finish_rt(
trace_log.as_mut(),
timing,
"Processing worker event",
&[],
);
}
token if token.0 <= SYNC_WORKER_ID_END => {
let timing = trace::start(&trace_log);
handle_sync_worker_event(&mut sync_workers, event)?;
trace::finish_rt(
trace_log.as_mut(),
timing,
"Processing sync worker event",
&[],
);
}
_ => debug!("unexpected OS event: {:?}", event),
}
}
trace::finish_rt(trace_log.as_mut(), timing, "Handling OS events", &[]);
if workers.is_empty() && sync_workers.is_empty() {
return Ok(());
}
}
}
fn pre_run(
&mut self,
workers: &mut [worker::Handle],
sync_workers: &mut Vec<SyncWorker>,
trace_log: &mut Option<trace::CoordinatorLog>,
) -> Result<(), rt::Error> {
debug_assert!(workers.is_sorted_by_key(worker::Handle::id));
debug_assert!(sync_workers.is_sorted_by_key(SyncWorker::id));
let timing = trace::start(&*trace_log);
let registry = self.poll.registry();
register_workers(registry, workers)
.map_err(|err| rt::Error::coordinator(Error::RegisteringWorkers(err)))?;
register_sync_workers(registry, sync_workers)
.map_err(|err| rt::Error::coordinator(Error::RegisteringSyncActors(err)))?;
check_sync_worker_alive(sync_workers)?;
trace::finish_rt(
trace_log.as_mut(),
timing,
"Initialising the coordinator thread",
&[],
);
debug!("signaling to workers the runtime started");
for worker in workers {
worker
.send_runtime_started()
.map_err(|err| rt::Error::coordinator(Error::SendingStartSignal(err)))?;
}
Ok(())
}
fn log_metrics<'c, 'l>(
&'c self,
workers: &[worker::Handle],
sync_workers: &[SyncWorker],
signal_refs: &ActorGroup<Signal>,
trace_log: &'l mut Option<trace::CoordinatorLog>,
) {
let timing = trace::start(trace_log);
let shared_metrics = self.internals.metrics();
let trace_metrics = trace_log.as_ref().map(trace::CoordinatorLog::metrics);
info!(
target: "metrics",
heph_version = as_display!(concat!("v", env!("CARGO_PKG_VERSION"))),
host_os = self.host_os,
host_arch = ARCH,
host_name = self.host_name,
host_id = as_display!(self.host_id),
app_name = self.app_name,
process_id = process::id(),
parent_process_id = parent_id(),
uptime = as_debug!(self.start.elapsed()),
worker_threads = workers.len(),
sync_actors = sync_workers.len(),
shared_scheduler_ready = shared_metrics.scheduler_ready,
shared_scheduler_inactive = shared_metrics.scheduler_inactive,
shared_timers_total = shared_metrics.timers_total,
shared_timers_next = as_debug!(shared_metrics.timers_next),
process_signals = as_debug!(SIGNAL_SET),
process_signal_receivers = signal_refs.len(),
cpu_time = as_debug!(cpu_usage(libc::CLOCK_THREAD_CPUTIME_ID)),
total_cpu_time = as_debug!(cpu_usage(libc::CLOCK_PROCESS_CPUTIME_ID)),
trace_file = as_debug!(trace_metrics.as_ref().map(|m| m.file)),
trace_counter = trace_metrics.map_or(0, |m| m.counter);
"coordinator metrics",
);
trace::finish_rt(trace_log.as_mut(), timing, "Printing runtime metrics", &[]);
}
}
/// Set of process signals the coordinator listens for and relays: all signals
/// in `SignalSet::all()`. It is also reported in the coordinator metrics.
const SIGNAL_SET: SignalSet = SignalSet::all();
/// Create a signal handler for `SIGNAL_SET` and register it with `registry`
/// under the `SIGNAL` token, with readable interest.
///
/// # Errors
///
/// Returns the I/O error from creating or registering the signal handler.
fn setup_signals(registry: &Registry) -> io::Result<Signals> {
    trace!(signals = as_debug!(SIGNAL_SET); "setting up signal handling");
    let mut handler = Signals::new(SIGNAL_SET)?;
    registry.register(&mut handler, SIGNAL, Interest::READABLE)?;
    Ok(handler)
}
/// Register every worker thread handle with `registry`.
///
/// Stops at, and returns, the first registration error.
fn register_workers(registry: &Registry, workers: &mut [worker::Handle]) -> io::Result<()> {
    for handle in workers.iter_mut() {
        trace!(worker_id = handle.id(); "registering worker thread");
        handle.register(registry)?;
    }
    Ok(())
}
/// Register every sync actor worker thread with `registry`.
///
/// Stops at, and returns, the first registration error.
fn register_sync_workers(registry: &Registry, sync_workers: &mut [SyncWorker]) -> io::Result<()> {
    for worker in sync_workers.iter_mut() {
        trace!(sync_worker_id = worker.id(); "registering sync actor worker thread");
        worker.register(registry)?;
    }
    Ok(())
}
fn check_sync_worker_alive(sync_workers: &mut Vec<SyncWorker>) -> Result<(), rt::Error> {
sync_workers
.drain_filter(|sync_worker| !sync_worker.is_alive())
.try_for_each(|sync_worker| {
debug!(sync_worker_id = sync_worker.id(); "sync actor worker thread stopped");
sync_worker.join().map_err(rt::Error::sync_actor_panic)
})
}
/// Receive all pending process signals and relay each one to every worker
/// thread and to all actors in `signal_refs`.
///
/// Disconnected actors are first removed from `signal_refs`. Failures to send a
/// signal to a worker are logged; failures to send to actors are ignored.
/// Receiving stops when no more signals are pending or when receiving fails
/// (which is logged).
///
/// Returns `true` if a `User2` signal was received, which means the caller
/// should log the runtime metrics.
fn relay_signals(
    signals: &mut Signals,
    workers: &mut [worker::Handle],
    signal_refs: &mut ActorGroup<Signal>,
) -> bool {
    signal_refs.remove_disconnected();
    let mut got_user2 = false;
    loop {
        let signal = match signals.receive() {
            Ok(Some(raw_signal)) => Signal::from_mio(raw_signal),
            Ok(None) => break,
            Err(err) => {
                error!("failed to retrieve process signal: {}", err);
                break;
            }
        };
        got_user2 |= matches!(signal, Signal::User2);

        debug!(signal = as_debug!(signal); "relaying process signal to worker threads");
        for handle in workers.iter_mut() {
            if let Err(err) = handle.send_signal(signal) {
                error!(
                    signal = as_debug!(signal), worker_id = handle.id();
                    "failed to send process signal to worker: {}", err,
                );
            }
        }

        debug!(signal = as_debug!(signal); "relaying process signal to actors");
        // Best effort: the result of sending to the actors is ignored.
        let _ = signal_refs.try_send(signal, Delivery::ToAll);
    }
    got_user2
}
fn handle_worker_event(workers: &mut Vec<worker::Handle>, event: &Event) -> Result<(), rt::Error> {
if let Ok(i) = workers.binary_search_by_key(&event.token().0, worker::Handle::id) {
if event.is_error() || event.is_write_closed() {
let worker = workers.remove(i);
debug!(worker_id = worker.id(); "worker thread stopped");
worker
.join()
.map_err(rt::Error::worker_panic)
.and_then(|res| res)?;
}
}
Ok(())
}
fn handle_sync_worker_event(
sync_workers: &mut Vec<SyncWorker>,
event: &Event,
) -> Result<(), rt::Error> {
if let Ok(i) = sync_workers.binary_search_by_key(&event.token().0, SyncWorker::id) {
if event.is_error() || event.is_write_closed() {
let sync_worker = sync_workers.remove(i);
debug!(sync_worker_id = sync_worker.id(); "sync actor worker thread stopped");
sync_worker.join().map_err(rt::Error::sync_actor_panic)?;
}
}
Ok(())
}
/// Errors returned by the coordinator, wrapped in the runtime error using
/// `rt::Error::coordinator`. Each variant wraps the underlying I/O error.
#[derive(Debug)]
pub(super) enum Error {
    /// Error registering the worker threads with the coordinator's poll instance.
    RegisteringWorkers(io::Error),
    /// Error registering the sync actor worker threads with the coordinator's
    /// poll instance.
    RegisteringSyncActors(io::Error),
    /// Error polling for OS events.
    Polling(io::Error),
    /// Error sending the "runtime started" signal to a worker thread.
    SendingStartSignal(io::Error),
    /// Error sending a function to a worker thread.
    SendingFunc(io::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Error::*;
match self {
RegisteringWorkers(err) => write!(f, "error registering worker threads: {}", err),
RegisteringSyncActors(err) => {
write!(f, "error registering synchronous actor threads: {}", err)
}
Polling(err) => write!(f, "error polling for OS events: {}", err),
SendingStartSignal(err) => write!(f, "error sending start signal to worker: {}", err),
SendingFunc(err) => write!(f, "error sending function to worker: {}", err),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
RegisteringWorkers(ref err)
| RegisteringSyncActors(ref err)
| Polling(ref err)
| SendingStartSignal(ref err)
| SendingFunc(ref err) => Some(err),
}
}
}