use std::cell::RefMut;
use std::num::NonZeroUsize;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};
use std::{fmt, io, thread};
use crossbeam_channel::{self, Receiver};
use heph::actor_ref::{Delivery, SendError};
use log::{as_debug, debug, info, trace};
use mio::{Events, Poll, Registry, Token};
use crate::error::StringError;
use crate::local::waker::{self, WakerId};
use crate::local::RuntimeInternals;
use crate::process::{ProcessId, ProcessResult};
use crate::setup::set_cpu_affinity;
use crate::thread_waker::ThreadWaker;
use crate::{self as rt, cpu_usage, shared, trace, RuntimeRef, Signal};
/// Upper bound on the number of processes (thread-local and shared combined) a
/// worker runs in one iteration of `Worker::run` before it goes back to
/// scheduling processes.
const RUN_POLL_RATIO: usize = 32;
/// Token the worker's `mio::Waker` is created with. Events carrying this token
/// need no handling of their own in `Worker::schedule_from_os_events`.
pub(super) const WAKER: Token = Token(usize::MAX);
/// Token under which the worker's control-message channel is registered.
const COMMS: Token = Token(usize::MAX - 1);
/// Token passed to `shared::RuntimeInternals::register_worker_poll`; an event
/// with this token makes the worker check the shared poll for events.
const SHARED_POLL: Token = Token(usize::MAX - 2);
/// Creates the state required to start the worker with the given `id`.
///
/// A new [`Poll`] instance is created, an `mio::Waker` is registered with it
/// under the [`WAKER`] token and handed to `waker::init`, together with the
/// sending half of an unbounded channel.
///
/// Returns the [`WorkerSetup`] together with the thread waker looked up for
/// the new waker id.
///
/// # Errors
///
/// Returns an error if creating the poll instance or the waker fails.
pub(super) fn setup(id: NonZeroUsize) -> io::Result<(WorkerSetup, &'static ThreadWaker)> {
    let poll = Poll::new()?;
    let (waker_sender, waker_events) = crossbeam_channel::unbounded();
    let waker_id = waker::init(mio::Waker::new(poll.registry(), WAKER)?, waker_sender);
    let thread_waker = waker::get_thread_waker(waker_id);
    Ok((
        WorkerSetup {
            id,
            poll,
            waker_id,
            waker_events,
        },
        thread_waker,
    ))
}
/// State prepared by [`setup`] that is consumed by [`WorkerSetup::start`] to
/// spawn the worker thread.
pub(super) struct WorkerSetup {
/// Identifier of the worker; also used in the name of its thread.
id: NonZeroUsize,
/// Poll instance the worker's `mio::Waker` is registered with.
poll: Poll,
/// Id returned by `waker::init` for the worker's waker.
waker_id: WakerId,
/// Receiving half of the channel whose sending half was given to
/// `waker::init`.
waker_events: Receiver<ProcessId>,
}
impl WorkerSetup {
pub(super) fn start(
self,
shared_internals: Arc<shared::RuntimeInternals>,
auto_cpu_affinity: bool,
trace_log: Option<trace::Log>,
) -> io::Result<Handle> {
rt::channel::new().and_then(move |(sender, receiver)| {
let id = self.id;
thread::Builder::new()
.name(format!("Worker {}", id))
.spawn(move || {
let worker = Worker::setup(
self,
receiver,
shared_internals,
auto_cpu_affinity,
trace_log,
)
.map_err(rt::Error::worker)?;
worker.run().map_err(rt::Error::worker)
})
.map(|handle| Handle {
id,
channel: sender,
handle,
})
})
}
pub(super) const fn id(&self) -> usize {
self.id.get()
}
}
/// Handle to a worker thread, created by [`WorkerSetup::start`].
#[derive(Debug)]
pub(super) struct Handle {
/// Identifier of the worker.
id: NonZeroUsize,
/// Sending half of the channel over which [`Control`] messages are delivered
/// to the worker.
channel: rt::channel::Sender<Control>,
/// Join handle of the worker's thread; yields the result of running the
/// worker.
handle: thread::JoinHandle<Result<(), rt::Error>>,
}
impl Handle {
pub(super) const fn id(&self) -> usize {
self.id.get()
}
pub(super) fn register(&mut self, registry: &Registry) -> io::Result<()> {
self.channel.register(registry, Token(self.id()))
}
pub(super) fn send_runtime_started(&mut self) -> io::Result<()> {
self.channel.try_send(Control::Started)
}
pub(super) fn send_signal(&mut self, signal: Signal) -> io::Result<()> {
self.channel.try_send(Control::Signal(signal))
}
pub(super) fn send_function(
&mut self,
f: Box<dyn FnOnce(RuntimeRef) -> Result<(), String> + Send + 'static>,
) -> io::Result<()> {
self.channel.try_send(Control::Run(f))
}
pub(super) fn join(self) -> thread::Result<Result<(), rt::Error>> {
self.handle.join()
}
}
/// A worker: runs thread-local and shared processes and schedules them based
/// on OS events, wake-up events and timers.
pub(crate) struct Worker {
/// Thread-local runtime internals; every `RuntimeRef` created by
/// `create_ref` holds a clone of this `Rc`.
internals: Rc<RuntimeInternals>,
/// Buffer that OS events are polled into.
events: Events,
/// Ids of thread-local processes to mark as ready, drained in
/// `schedule_from_waker`.
waker_events: Receiver<ProcessId>,
/// Receiving half of the channel carrying [`Control`] messages.
channel: rt::channel::Receiver<Control>,
/// Set when `Control::Started` is received. The run loop only returns for
/// lack of processes once this is `true`.
started: bool,
}
impl Worker {
fn setup(
setup: WorkerSetup,
mut receiver: rt::channel::Receiver<Control>,
shared_internals: Arc<shared::RuntimeInternals>,
auto_cpu_affinity: bool,
trace_log: Option<trace::Log>,
) -> Result<Worker, Error> {
let timing = trace::start(&trace_log);
let cpu = if auto_cpu_affinity {
set_cpu_affinity(setup.id)
} else {
None
};
let poll = setup.poll;
trace!(worker_id = setup.id.get(); "registring shared poll");
shared_internals
.register_worker_poll(poll.registry(), SHARED_POLL)
.map_err(Error::Init)?;
trace!(worker_id = setup.id.get(); "registring communication channel");
receiver
.register(poll.registry(), COMMS)
.map_err(Error::Init)?;
let internals = RuntimeInternals::new(
setup.id,
shared_internals,
setup.waker_id,
poll,
cpu,
trace_log,
);
let mut worker = Worker {
internals: Rc::new(internals),
events: Events::with_capacity(128),
waker_events: setup.waker_events,
channel: receiver,
started: false,
};
trace::finish_rt(
worker.trace_log().as_mut(),
timing,
"Initialising the worker thread",
&[],
);
Ok(worker)
}
/// Creates a worker for testing purposes.
///
/// The worker gets its own poll instance and waker, uses `usize::MAX` as its
/// id, has no CPU affinity and no trace log.
///
/// # Errors
///
/// Returns an error if creating the poll instance or the waker, or
/// registering `receiver`, fails.
#[cfg(any(test, feature = "test"))]
pub(crate) fn new_test(
    shared_internals: Arc<shared::RuntimeInternals>,
    mut receiver: rt::channel::Receiver<Control>,
) -> io::Result<Worker> {
    let poll = Poll::new()?;
    let registry = poll.registry();

    let (waker_sender, waker_events) = crossbeam_channel::unbounded();
    let waker_id = waker::init(mio::Waker::new(registry, WAKER)?, waker_sender);
    receiver.register(registry, COMMS)?;

    let id = NonZeroUsize::new(usize::MAX).unwrap();
    let internals = Rc::new(RuntimeInternals::new(
        id,
        shared_internals,
        waker_id,
        poll,
        None,
        None,
    ));
    Ok(Worker {
        internals,
        events: Events::with_capacity(16),
        waker_events,
        channel: receiver,
        started: false,
    })
}
/// Runs the worker's event loop.
///
/// Each iteration runs at most `RUN_POLL_RATIO` processes, thread-local ones
/// first and shared ones with whatever budget is left, and then schedules
/// processes again. Once the worker is marked as started and no processes are
/// left, all workers are woken and the loop returns.
///
/// # Errors
///
/// Returns any error from scheduling processes.
pub(crate) fn run(mut self) -> Result<(), Error> {
    debug!(worker_id = self.internals.id.get(); "starting worker");

    let mut runtime_ref = self.create_ref();
    loop {
        trace!(worker_id = self.internals.id.get(); "running processes");
        let mut ran = 0;
        // The short-circuiting `&&` ensures no process is run once the budget
        // is used up.
        while ran < RUN_POLL_RATIO && self.run_local_process(&mut runtime_ref) {
            ran += 1;
        }
        while ran < RUN_POLL_RATIO && self.run_shared_process(&mut runtime_ref) {
            ran += 1;
        }

        if !self.started || self.has_process() {
            self.schedule_processes()?;
            continue;
        }

        debug!(worker_id = self.internals.id.get(); "no processes to run, stopping worker");
        self.internals.shared.wake_all_workers();
        return Ok(());
    }
}
/// Runs the next thread-local process, if there is one.
///
/// A process that completes is dropped, with a panic during that drop being
/// caught; a pending process is added back to the scheduler.
///
/// Returns `true` if a process was run, `false` if the scheduler had none.
fn run_local_process(&mut self, runtime_ref: &mut RuntimeRef) -> bool {
    let next = self.internals.scheduler.borrow_mut().next_process();
    let mut process = match next {
        Some(process) => process,
        None => return false,
    };

    let timing = trace::start(&*self.internals.trace_log.borrow());
    let pid = process.as_ref().id();
    let name = process.as_ref().name();

    let result = process.as_mut().run(runtime_ref);
    match result {
        ProcessResult::Pending => {
            self.internals.scheduler.borrow_mut().add_process(process);
        }
        ProcessResult::Complete => {
            // Dropping the process must not take the worker down with it.
            let dropped = catch_unwind(AssertUnwindSafe(move || drop(process)));
            drop(dropped);
        }
    }

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Running thread-local process",
        &[("id", &pid.0), ("name", &name)],
    );
    true
}
/// Runs the next shared (thread-safe) process, if there is one.
///
/// A completed process is handed to `shared.complete`, a pending one is added
/// back to the shared internals.
///
/// Returns `true` if a process was run, `false` if none was available.
fn run_shared_process(&mut self, runtime_ref: &mut RuntimeRef) -> bool {
    let mut process = match self.internals.shared.remove_process() {
        Some(process) => process,
        None => return false,
    };

    let timing = trace::start(&*self.internals.trace_log.borrow());
    let pid = process.as_ref().id();
    let name = process.as_ref().name();

    let result = process.as_mut().run(runtime_ref);
    match result {
        ProcessResult::Pending => self.internals.shared.add_process(process),
        ProcessResult::Complete => self.internals.shared.complete(process),
    }

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Running thread-safe process",
        &[("id", &pid.0), ("name", &name)],
    );
    true
}
/// Returns `true` if either the thread-local scheduler or the shared
/// internals report having a process.
fn has_process(&self) -> bool {
    let internals = &self.internals;
    if internals.scheduler.borrow().has_process() {
        return true;
    }
    internals.shared.has_process()
}
/// Schedules processes based on all event sources.
///
/// In order: OS events, shared OS events (only if the shared poll signalled
/// events), wake-up events, thread-local timers and shared timers. Afterwards
/// other workers are woken based on the amounts of scheduled processes.
///
/// # Errors
///
/// Returns an error if handling the OS events or polling the shared poll
/// fails.
fn schedule_processes(&mut self) -> Result<(), Error> {
    trace!(worker_id = self.internals.id.get(); "polling event sources to schedule processes");
    let timing = trace::start(&*self.internals.trace_log.borrow());

    let (from_os, check_shared_poll) = self.schedule_from_os_events()?;
    let from_shared_os = if check_shared_poll {
        self.schedule_from_shared_os_events()
            .map_err(Error::Polling)?
    } else {
        0
    };
    let from_waker = self.schedule_from_waker();

    // Both timer sources are checked against the same point in time.
    let now = Instant::now();
    let local_amount = from_os + from_waker + self.schedule_from_local_timers(now);
    let shared_amount = from_shared_os + self.schedule_from_shared_timers(now);

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Scheduling processes",
        &[
            ("local amount", &local_amount),
            ("shared amount", &shared_amount),
            ("total amount", &(local_amount + shared_amount)),
        ],
    );

    self.wake_workers(local_amount, shared_amount);
    Ok(())
}
/// Polls for OS events and marks the thread-local processes they belong to as
/// ready.
///
/// Returns the number of processes marked as ready and whether an event for
/// the shared poll (`SHARED_POLL`) was seen. If an event for the control
/// channel (`COMMS`) was seen the control messages are processed before
/// returning.
///
/// # Errors
///
/// Returns [`Error::Polling`] if polling fails, or any error from
/// `check_comms`.
fn schedule_from_os_events(&mut self) -> Result<(usize, bool), Error> {
// Fills `self.events`.
self.poll_os().map_err(Error::Polling)?;
let timing = trace::start(&*self.internals.trace_log.borrow());
let mut scheduler = self.internals.scheduler.borrow_mut();
let mut check_comms = false;
let mut check_shared_poll = false;
let mut amount = 0;
for event in self.events.iter() {
trace!(worker_id = self.internals.id.get(); "got OS event: {:?}", event);
match event.token() {
// Nothing to do for the waker's own token here.
WAKER => { }
COMMS => check_comms = true,
SHARED_POLL => check_shared_poll = true,
// Any other token is converted into the id of a thread-local process.
token => {
let pid = ProcessId::from(token);
trace!(
worker_id = self.internals.id.get(), pid = pid.0;
"scheduling local process based on OS event",
);
scheduler.mark_ready(pid);
amount += 1;
}
}
}
trace::finish_rt(
self.internals.trace_log.borrow_mut().as_mut(),
timing,
"Handling OS events",
&[],
);
if check_comms {
// Release the mutable borrow of the scheduler before processing control
// messages. NOTE(review): presumably because a user function run from
// `check_comms` may borrow the scheduler itself; confirm.
drop(scheduler);
self.check_comms()?;
}
Ok((amount, check_shared_poll))
}
/// Attempts to poll the shared poll and marks the shared processes for which
/// events were returned as ready.
///
/// Returns the number of processes marked as ready, which is zero if
/// `try_poll` reports that nothing was polled.
///
/// # Errors
///
/// Returns the error from `try_poll`, if any.
fn schedule_from_shared_os_events(&mut self) -> io::Result<usize> {
    let worker_id = self.internals.id.get();
    trace!(worker_id = worker_id; "polling shared OS events");
    let timing = trace::start(&*self.internals.trace_log.borrow());

    let shared = &self.internals.shared;
    let mut amount = 0;
    let polled = shared.try_poll(&mut self.events)?;
    if polled {
        for event in self.events.iter() {
            trace!(worker_id = worker_id; "got shared OS event: {:?}", event);
            let pid = ProcessId::from(event.token());
            trace!(
                worker_id = worker_id, pid = pid.0;
                "scheduling shared process based on OS event",
            );
            shared.mark_ready(pid);
            amount += 1;
        }
    }

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Scheduling thread-safe processes based on shared OS events",
        &[("amount", &amount)],
    );
    Ok(amount)
}
/// Drains the wake-up events currently in the channel and marks the
/// thread-local processes they name as ready.
///
/// Returns the number of processes marked as ready.
fn schedule_from_waker(&mut self) -> usize {
    let worker_id = self.internals.id.get();
    trace!(worker_id = worker_id; "polling wakup events");
    let timing = trace::start(&*self.internals.trace_log.borrow());

    let mut scheduler = self.internals.scheduler.borrow_mut();
    let mut amount: usize = 0;
    self.waker_events.try_iter().for_each(|pid| {
        trace!(worker_id = worker_id, pid = pid.0; "waking up local process");
        scheduler.mark_ready(pid);
        amount += 1;
    });

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Scheduling thread-local processes based on wake-up events",
        &[("amount", &amount)],
    );
    amount
}
/// Marks the thread-local processes returned by the local timers'
/// `deadlines(now)` as ready.
///
/// Returns the number of processes marked as ready.
fn schedule_from_local_timers(&mut self, now: Instant) -> usize {
    let worker_id = self.internals.id.get();
    trace!(worker_id = worker_id; "polling local timers");
    let timing = trace::start(&*self.internals.trace_log.borrow());

    let amount = {
        let mut scheduler = self.internals.scheduler.borrow_mut();
        let mut expired: usize = 0;
        for pid in self.internals.timers.borrow_mut().deadlines(now) {
            trace!(worker_id = worker_id, pid = pid.0; "expiring timer for local process");
            scheduler.mark_ready(pid);
            expired += 1;
        }
        expired
    };

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Scheduling thread-local processes based on timers",
        &[("amount", &amount)],
    );
    amount
}
/// Removes deadlines from the shared timers for as long as
/// `remove_next_deadline(now)` returns one and marks the shared processes
/// they belong to as ready.
///
/// Returns the number of processes marked as ready.
fn schedule_from_shared_timers(&mut self, now: Instant) -> usize {
    let worker_id = self.internals.id.get();
    trace!(worker_id = worker_id; "polling shared timers");
    let timing = trace::start(&*self.internals.trace_log.borrow());

    let shared = &self.internals.shared;
    let mut amount: usize = 0;
    loop {
        match shared.remove_next_deadline(now) {
            Some(pid) => {
                trace!(worker_id = worker_id, pid = pid.0; "expiring timer for shared process");
                shared.mark_ready(pid);
                amount += 1;
            }
            None => break,
        }
    }

    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Scheduling thread-safe processes based on timers",
        &[("amount", &amount)],
    );
    amount
}
/// Wakes other worker threads to run the shared processes that were
/// scheduled.
///
/// If no thread-local process was scheduled one fewer worker is woken than
/// the number of shared processes scheduled. Nothing is done if that leaves
/// no workers to wake.
fn wake_workers(&mut self, local_amount: usize, shared_amount: usize) {
    // One when no local process was scheduled, zero otherwise.
    let kept_back = usize::from(local_amount == 0);
    let wake_n = shared_amount.saturating_sub(kept_back);
    if wake_n == 0 {
        return;
    }

    trace!(worker_id = self.internals.id.get(); "waking {} worker threads", wake_n);
    let timing = trace::start(&*self.internals.trace_log.borrow());
    self.internals.shared.wake_workers(wake_n);
    trace::finish_rt(
        self.internals.trace_log.borrow_mut().as_mut(),
        timing,
        "Waking worker threads",
        &[("amount", &wake_n)],
    );
}
/// Polls the worker's poll instance for OS events, filling `self.events`.
///
/// The timeout comes from `determine_timeout`. When the poll may block the
/// worker is marked as polling for its duration.
///
/// # Errors
///
/// Returns the error from polling, if any.
fn poll_os(&mut self) -> io::Result<()> {
let timing = trace::start(&*self.internals.trace_log.borrow());
let mut timeout = self.determine_timeout();
// Only mark the worker as polling if the poll can block, i.e. there is no
// timeout or the timeout is non-zero.
let marked_polling = if timeout.map_or(true, |t| !t.is_zero()) {
waker::mark_polling(self.internals.waker_id, true);
// Re-evaluate the timeout after marking. NOTE(review): presumably to catch
// wake-ups that arrived between determining the timeout and marking the
// worker as polling; confirm.
timeout = self.check_timeout(timeout);
true
} else {
false
};
trace!(worker_id = self.internals.id.get(), timeout = as_debug!(timeout); "polling OS events");
let res = self
.internals
.poll
.borrow_mut()
.poll(&mut self.events, timeout);
// Unmark regardless of the polling result.
if marked_polling {
waker::mark_polling(self.internals.waker_id, false);
}
trace::finish_rt(
self.internals.trace_log.borrow_mut().as_mut(),
timing,
"Polling for OS events",
&[],
);
res
}
/// Determines the timeout to use when polling for OS events.
///
/// The timeout is zero if a thread-local or shared process is ready, a
/// wake-up event is waiting or the next thread-local deadline has already
/// passed. Otherwise the time until the next thread-local deadline, if there
/// is one, is passed to the shared internals' `next_timeout`, whose result is
/// returned.
fn determine_timeout(&self) -> Option<Duration> {
    let internals = &self.internals;

    let scheduler_ready = internals.scheduler.borrow().has_ready_process();
    if scheduler_ready || !self.waker_events.is_empty() || internals.shared.has_ready_process() {
        return Some(Duration::ZERO);
    }

    let now = Instant::now();
    let local_timeout = match internals.timers.borrow_mut().next() {
        Some(deadline) => match deadline.checked_duration_since(now) {
            // The deadline already passed, don't block at all.
            None => return Some(Duration::ZERO),
            remaining => remaining,
        },
        None => None,
    };
    internals.shared.next_timeout(now, local_timeout)
}
/// Re-checks `timeout`: returns a zero timeout if a wake-up event is waiting
/// or a shared process is ready, otherwise the result of the shared
/// internals' `next_timeout` for the current time and `timeout`.
fn check_timeout(&self, timeout: Option<Duration>) -> Option<Duration> {
    let work_waiting =
        !self.waker_events.is_empty() || self.internals.shared.has_ready_process();
    if work_waiting {
        return Some(Duration::ZERO);
    }
    self.internals.shared.next_timeout(Instant::now(), timeout)
}
fn check_comms(&mut self) -> Result<(), Error> {
trace!(worker_id = self.internals.id.get(); "processing coordinator messages");
let timing = trace::start(&*self.internals.trace_log.borrow());
while let Some(msg) = self.channel.try_recv().map_err(Error::RecvMsg)? {
match msg {
Control::Started => self.started = true,
Control::Signal(signal) => {
if let Signal::User2 = signal {
self.log_metrics();
}
self.relay_signal(signal)?
}
Control::Run(f) => self.run_user_function(f)?,
}
}
trace::finish_rt(
self.internals.trace_log.borrow_mut().as_mut(),
timing,
"Processing communication message(s)",
&[],
);
Ok(())
}
fn relay_signal(&mut self, signal: Signal) -> Result<(), Error> {
let timing = trace::start(&*self.internals.trace_log.borrow());
trace!(worker_id = self.internals.id.get(), signal = as_debug!(signal); "received process signal");
let mut receivers = self.internals.signal_receivers.borrow_mut();
receivers.remove_disconnected();
let res = match receivers.try_send(signal, Delivery::ToAll) {
Err(SendError) if signal.should_stop() => Err(Error::ProcessInterrupted),
Ok(()) | Err(SendError) => Ok(()),
};
trace::finish_rt(
self.internals.trace_log.borrow_mut().as_mut(),
timing,
"Relaying process signal to actors",
&[("signal", &signal.as_str())],
);
res
}
fn run_user_function(
&mut self,
f: Box<dyn FnOnce(RuntimeRef) -> Result<(), String>>,
) -> Result<(), Error> {
let timing = trace::start(&*self.internals.trace_log.borrow());
trace!(worker_id = self.internals.id.get(); "running user function");
let runtime_ref = self.create_ref();
let res = f(runtime_ref).map_err(|err| Error::UserFunction(err.into()));
trace::finish_rt(
self.internals.trace_log.borrow_mut().as_mut(),
timing,
"Running user function",
&[],
);
res
}
/// Logs the worker's metrics at info level, using the `metrics` log target.
///
/// Called from `check_comms` when `Signal::User2` is received.
fn log_metrics(&self) {
// NOTE: despite its name `shared` refers to the thread-local
// `RuntimeInternals`.
let shared = &*self.internals;
let timing = trace::start(&*shared.trace_log.borrow());
// `None` when the worker has no trace log, in which case `trace_counter` is
// logged as zero.
let trace_metrics = shared.trace_log.borrow().as_ref().map(trace::Log::metrics);
let scheduler = shared.scheduler.borrow();
let mut timers = shared.timers.borrow_mut();
info!(
target: "metrics",
worker_id = shared.id.get(),
cpu_affinity = shared.cpu,
scheduler_ready = scheduler.ready(),
scheduler_inactive = scheduler.inactive(),
timers_total = timers.len(),
timers_next = as_debug!(timers.next_timer()),
process_signal_receivers = shared.signal_receivers.borrow().len(),
cpu_time = as_debug!(cpu_usage(libc::CLOCK_THREAD_CPUTIME_ID)),
trace_counter = trace_metrics.map_or(0, |m| m.counter);
"worker metrics",
);
trace::finish_rt(
shared.trace_log.borrow_mut().as_mut(),
timing,
"Printing runtime metrics",
&[],
);
}
pub(crate) fn create_ref(&self) -> RuntimeRef {
RuntimeRef {
internals: self.internals.clone(),
}
}
/// Mutably borrows the worker's trace log, which is `None` if the worker has
/// no trace log.
fn trace_log(&mut self) -> RefMut<'_, Option<trace::Log>> {
    let log = &self.internals.trace_log;
    log.borrow_mut()
}
}
/// Error that can occur while setting up or running a [`Worker`].
#[derive(Debug)]
pub(crate) enum Error {
/// I/O error while initialising the worker, i.e. registering the shared poll
/// or the control channel.
Init(io::Error),
/// I/O error while polling for OS events.
Polling(io::Error),
/// I/O error while receiving a control message.
RecvMsg(io::Error),
/// A process signal for which `should_stop` is true could not be sent to the
/// signal receivers.
ProcessInterrupted,
/// A user function (see `Control::Run`) returned an error.
UserFunction(StringError),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Error::*;
match self {
Init(err) => write!(f, "error initialising local runtime: {}", err),
Polling(err) => write!(f, "error polling OS: {}", err),
RecvMsg(err) => write!(f, "error receiving message(s): {}", err),
ProcessInterrupted => write!(
f,
"received process signal, but no receivers for it: stopping runtime"
),
UserFunction(err) => write!(f, "error running user function: {}", err),
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
use Error::*;
match self {
Init(ref err) | Polling(ref err) | RecvMsg(ref err) => Some(err),
ProcessInterrupted => None,
UserFunction(ref err) => Some(err),
}
}
}
/// Control message sent to a worker thread through its [`Handle`].
// NOTE(review): the lint is presumably allowed because `Run` holds a boxed
// closure while `Started` holds nothing; confirm.
#[allow(variant_size_differences)] pub(crate) enum Control {
/// The runtime was started; the worker marks itself as started on receipt.
Started,
/// A process signal for the worker to relay to its signal receivers.
Signal(Signal),
/// A function for the worker to run with a `RuntimeRef`.
Run(Box<dyn FnOnce(RuntimeRef) -> Result<(), String> + Send + 'static>),
}
impl fmt::Debug for Control {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Control::*;
f.write_str("Control::")?;
match self {
Started => f.write_str("Started"),
Signal(signal) => f.debug_tuple("Signal").field(&signal).finish(),
Run(..) => f.write_str("Run(..)"),
}
}
}