use std::cmp::min;
use std::future::Future;
use std::os::unix::io::AsRawFd;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex, TryLockError};
use std::time::{Duration, Instant};
use std::{io, task};
use heph::actor::{self, NewActor};
use heph::actor_ref::ActorRef;
use heph::supervisor::Supervisor;
use heph_inbox as inbox;
use log::{debug, error, trace};
use mio::unix::SourceFd;
use mio::{event, Events, Interest, Poll, Registry, Token};
use crate::spawn::{ActorOptions, AddActorError, FutureOptions};
use crate::thread_waker::ThreadWaker;
use crate::{trace, ProcessId, ThreadSafe};
mod scheduler;
mod timers;
pub(crate) mod waker;
use scheduler::{ProcessData, Scheduler};
use timers::Timers;
use waker::WakerId;
pub(crate) struct RuntimeSetup {
poll: Poll,
registry: Registry,
}
impl RuntimeSetup {
pub(crate) fn complete(
self,
shared_id: WakerId,
worker_wakers: Box<[&'static ThreadWaker]>,
trace_log: Option<Arc<trace::SharedLog>>,
) -> RuntimeInternals {
debug_assert!(worker_wakers.len() >= 1);
RuntimeInternals {
shared_id,
worker_wakers,
wake_worker_idx: AtomicUsize::new(0),
poll: Mutex::new(self.poll),
registry: self.registry,
scheduler: Scheduler::new(),
timers: Timers::new(),
trace_log,
}
}
}
#[derive(Debug)]
pub(crate) struct RuntimeInternals {
shared_id: WakerId,
worker_wakers: Box<[&'static ThreadWaker]>,
wake_worker_idx: AtomicUsize,
poll: Mutex<Poll>,
registry: Registry,
scheduler: Scheduler,
timers: Timers,
trace_log: Option<Arc<trace::SharedLog>>,
}
#[derive(Debug)]
pub(crate) struct Metrics {
pub(crate) scheduler_ready: usize,
pub(crate) scheduler_inactive: usize,
pub(crate) timers_total: usize,
pub(crate) timers_next: Option<Duration>,
}
impl RuntimeInternals {
pub(crate) fn setup() -> io::Result<RuntimeSetup> {
let poll = Poll::new()?;
let registry = poll.registry().try_clone()?;
Ok(RuntimeSetup { poll, registry })
}
pub(crate) fn metrics(&self) -> Metrics {
Metrics {
scheduler_ready: self.scheduler.ready(),
scheduler_inactive: self.scheduler.inactive(),
timers_total: self.timers.len(),
timers_next: self.timers.next_timer(),
}
}
pub(crate) fn new_task_waker(&self, pid: ProcessId) -> task::Waker {
waker::new(self.shared_id, pid)
}
pub(crate) fn register_worker_poll(&self, registry: &Registry, token: Token) -> io::Result<()> {
use mio::event::Source;
let poll = self.poll.lock().unwrap();
SourceFd(&poll.as_raw_fd()).register(registry, token, Interest::READABLE)
}
pub(crate) fn try_poll(&self, events: &mut Events) -> io::Result<bool> {
match self.poll.try_lock() {
Ok(mut poll) => poll.poll(events, Some(Duration::ZERO)).map(|()| true),
Err(TryLockError::WouldBlock) => Ok(false),
Err(TryLockError::Poisoned(err)) => panic!("failed to lock shared poll: {}", err),
}
}
pub(crate) fn register<S>(
&self,
source: &mut S,
token: Token,
interest: Interest,
) -> io::Result<()>
where
S: event::Source + ?Sized,
{
self.registry.register(source, token, interest)
}
pub(crate) fn reregister<S>(
&self,
source: &mut S,
token: Token,
interest: Interest,
) -> io::Result<()>
where
S: event::Source + ?Sized,
{
self.registry.reregister(source, token, interest)
}
pub(super) fn add_deadline(&self, pid: ProcessId, deadline: Instant) {
self.timers.add(pid, deadline);
}
pub(super) fn remove_deadline(&self, pid: ProcessId, deadline: Instant) {
self.timers.remove(pid, deadline);
}
pub(super) fn change_deadline(&self, from: ProcessId, to: ProcessId, deadline: Instant) {
self.timers.change(from, deadline, to);
}
pub(crate) fn remove_next_deadline(&self, now: Instant) -> Option<ProcessId> {
self.timers.remove_next(now)
}
pub(crate) fn next_timeout(&self, now: Instant, current: Option<Duration>) -> Option<Duration> {
match self.timers.next() {
Some(deadline) => match deadline.checked_duration_since(now) {
None => Some(Duration::ZERO),
Some(timeout) => match current {
Some(current) if current < timeout => Some(current),
Some(..) | None => Some(timeout),
},
},
None => current,
}
}
#[allow(clippy::needless_pass_by_value)] pub(crate) fn spawn_setup<S, NA, ArgFn, E>(
self: &Arc<Self>,
supervisor: S,
mut new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
S: Supervisor<NA> + Send + Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Sync + Send + 'static,
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadSafe>) -> Result<NA::Argument, E>,
NA::Actor: Send + Sync + 'static,
NA::Message: Send,
{
let actor_entry = self.scheduler.add_actor();
let pid = actor_entry.pid();
let name = NA::name();
debug!(pid = pid.0, name = name; "spawning thread-safe actor");
let (manager, sender, receiver) = inbox::Manager::new_small_channel();
let actor_ref = ActorRef::local(sender);
let mut ctx = actor::Context::new(receiver, ThreadSafe::new(pid, self.clone()));
let arg = arg_fn(&mut ctx).map_err(AddActorError::ArgFn)?;
let actor = new_actor.new(ctx, arg).map_err(AddActorError::NewActor)?;
actor_entry.add(
options.priority(),
supervisor,
new_actor,
actor,
manager,
options.is_ready(),
);
Ok(actor_ref)
}
#[allow(clippy::needless_pass_by_value)]
pub(crate) fn spawn_future<Fut>(&self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + Sync + 'static,
{
self.scheduler.add_future(future, options.priority())
}
pub(crate) fn mark_ready(&self, pid: ProcessId) {
self.scheduler.mark_ready(pid)
}
pub(crate) fn wake_workers(&self, n: usize) {
trace!("waking {} worker thread(s)", n);
let n = min(n, self.worker_wakers.len());
let wake_worker_idx =
self.wake_worker_idx.fetch_add(n, Ordering::AcqRel) % self.worker_wakers.len();
let (wake_second, wake_first) = self.worker_wakers.split_at(wake_worker_idx);
let workers_to_wake = wake_first.iter().chain(wake_second.iter());
let mut wakes_left = n;
for worker in workers_to_wake {
match worker.wake() {
Ok(true) => {
wakes_left -= 1;
if wakes_left == 0 {
break;
}
}
Ok(false) => {}
Err(err) => error!("error waking worker: {}", err),
}
}
}
pub(crate) fn wake_all_workers(&self) {
trace!("waking all worker thread(s)");
for worker in self.worker_wakers.iter() {
drop(worker.wake());
}
}
pub(crate) fn has_process(&self) -> bool {
self.scheduler.has_process()
}
pub(crate) fn has_ready_process(&self) -> bool {
self.scheduler.has_ready_process()
}
pub(crate) fn remove_process(&self) -> Option<Pin<Box<ProcessData>>> {
self.scheduler.remove()
}
pub(crate) fn add_process(&self, process: Pin<Box<ProcessData>>) {
self.scheduler.add_process(process);
}
pub(crate) fn complete(&self, process: Pin<Box<ProcessData>>) {
self.scheduler.complete(process);
}
pub(crate) fn start_trace(&self) -> Option<trace::EventTiming> {
trace::start(&self.trace_log.as_deref())
}
pub(crate) fn finish_trace(
&self,
timing: Option<trace::EventTiming>,
pid: ProcessId,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
) {
trace::finish(
self.trace_log.as_deref(),
timing,
pid.0 as u64,
description,
attributes,
)
}
}