// Unstable (nightly-only) features this crate enables.
#![feature(
async_iterator,
const_option,
doc_auto_cfg,
doc_cfg_hide,
drain_filter,
io_slice_advance,
is_sorted,
maybe_uninit_array_assume_init,
maybe_uninit_uninit_array,
never_type,
new_uninit,
stmt_expr_attributes
)]
// `once_cell` is only enabled for test builds or when the `test` feature is on.
#![cfg_attr(any(test, feature = "test"), feature(once_cell))]
// Lints that are reported as warnings throughout the crate.
#![warn(
anonymous_parameters,
bare_trait_objects,
missing_debug_implementations,
missing_docs,
rust_2018_idioms,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
unused_results,
variant_size_differences
)]
// In test builds all warnings are turned into errors.
#![cfg_attr(test, deny(warnings))]
// Documentation tests are compiled with `deny(warnings)` as well.
#![doc(test(attr(deny(warnings))))]
// Don't show the `any(test, feature = "test")` cfg in the generated docs.
#![doc(cfg_hide(any(test, feature = "test")))]
// Refuse to compile for any OS other than Linux, FreeBSD and macOS.
#[cfg(not(any(target_os = "linux", target_os = "freebsd", target_os = "macos")))]
compile_error!("Heph currently only supports Linux, FreeBSD and macOS.");
// Refuse to compile for targets whose pointers are not 64 bits wide.
#[cfg(not(target_pointer_width = "64"))]
compile_error!("Heph currently only supports 64 bit architectures.");
/// Evaluates the I/O operation `$op` and turns its result into a `Poll`.
///
/// * `Ok(value)` becomes `Poll::Ready(Ok(value))`.
/// * An error of kind `WouldBlock` becomes `Poll::Pending`.
/// * An error of kind `Interrupted` causes `$op` to be evaluated again.
/// * Any other error becomes `Poll::Ready(Err(error))`.
///
/// `Poll` and `io` must be in scope where the macro is invoked.
macro_rules! try_io {
    ($op: expr) => {
        loop {
            match $op {
                Ok(value) => break Poll::Ready(Ok(value)),
                Err(error) => match error.kind() {
                    // Not ready yet, the caller has to try again later.
                    io::ErrorKind::WouldBlock => break Poll::Pending,
                    // Interrupted, simply retry the operation.
                    io::ErrorKind::Interrupted => continue,
                    _ => break Poll::Ready(Err(error)),
                },
            }
        }
    };
}
use std::convert::TryInto;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::{io, task};
use ::log::{as_debug, debug, warn};
use heph::actor::{self, NewActor, SyncActor};
use heph::actor_ref::{ActorGroup, ActorRef};
use heph::supervisor::{Supervisor, SyncSupervisor};
use heph_inbox as inbox;
use mio::{event, Interest, Token};
pub mod access;
pub mod bytes;
pub(crate) mod channel;
mod coordinator;
mod error;
pub(crate) mod local;
pub mod log;
pub mod net;
pub mod pipe;
mod process;
mod setup;
pub(crate) mod shared;
mod signal;
pub mod spawn;
pub(crate) mod sync_worker;
#[cfg(target_os = "linux")]
pub mod systemd;
#[cfg(any(test, feature = "test"))]
pub mod test;
pub(crate) mod thread_waker;
pub mod timer;
pub mod trace;
#[doc(hidden)]
pub mod util;
pub(crate) mod worker;
pub(crate) use access::PrivateAccess;
pub(crate) use process::ProcessId;
#[doc(no_inline)]
pub use access::{Access, Sync, ThreadLocal, ThreadSafe};
pub use error::Error;
pub use setup::Setup;
pub use signal::Signal;
use coordinator::Coordinator;
use local::waker::MAX_THREADS;
use spawn::{ActorOptions, AddActorError, FutureOptions, PrivateSpawn, Spawn, SyncActorOptions};
use sync_worker::SyncWorker;
/// First id used for sync workers: `Runtime::spawn_sync_actor` computes a
/// worker's id as `SYNC_WORKER_ID_START` plus the number of sync actors
/// already spawned.
pub(crate) const SYNC_WORKER_ID_START: usize = 10000;
/// `SYNC_WORKER_ID_START + 10000`.
// NOTE(review): presumably the end of the id range reserved for sync workers;
// nothing in this file checks ids against it, confirm at the use sites.
pub(crate) const SYNC_WORKER_ID_END: usize = SYNC_WORKER_ID_START + 10000;
/// Returns the address stored in `ptr` as a `usize`.
///
/// The conversion goes through a union rather than an `as` cast so that the
/// function can remain a `const fn`.
const fn ptr_as_usize<T>(ptr: *const T) -> usize {
    // `repr(C)` guarantees that both fields start at offset 0, which makes
    // writing one field and reading the other equivalent to a transmute. The
    // default representation gives no such guarantee.
    #[repr(C)]
    union Pointer<T> {
        ptr: *const T,
        int: usize,
    }
    let ptr = Pointer { ptr };
    // SAFETY: `T` is `Sized`, so `*const T` is a thin pointer with the same
    // size as `usize`, and every bit pattern is a valid `usize`.
    unsafe { ptr.int }
}
// Checks that the first sync worker id lies above `MAX_THREADS + 1`.
// NOTE(review): presumably this keeps sync worker ids from colliding with the
// ids used for worker threads; confirm against `local::waker`.
#[test]
#[allow(clippy::assertions_on_constants)] fn sync_worker_id() {
assert!(SYNC_WORKER_ID_START > MAX_THREADS + 1); }
// Checks that `MAX_THREADS` is smaller than `u32::MAX`.
// NOTE(review): presumably needed because ids are narrowed to `u32` elsewhere
// (e.g. the `id as u32` cast in `Runtime::spawn_sync_actor`); confirm.
#[test]
#[allow(clippy::assertions_on_constants)] fn max_threads() {
assert!(MAX_THREADS < u32::MAX as usize);
}
/// The runtime: owns the coordinator, the worker handles, the sync workers
/// and the actors that want to receive process signals.
///
/// Created with [`Runtime::new`] or [`Runtime::setup`] and consumed by
/// [`Runtime::start`].
#[derive(Debug)]
pub struct Runtime {
/// Coordinator; gives access to the shared runtime internals and is what
/// `start` ends up running.
coordinator: Coordinator,
/// Handles to the workers; `run_on_workers` sends a function to each.
workers: Vec<worker::Handle>,
/// Sync workers started by `spawn_sync_actor`.
sync_actors: Vec<SyncWorker>,
/// Actors registered through `receive_signals`.
signals: ActorGroup<Signal>,
/// Trace log of the coordinator, if any; used to create a new stream for
/// every sync worker.
trace_log: Option<trace::CoordinatorLog>,
}
impl Runtime {
/// Returns a new [`Setup`] (via `Setup::new`), which can be used to build a
/// `Runtime`.
pub const fn setup() -> Setup {
Setup::new()
}
/// Creates a new `Runtime` from an unmodified [`Setup`].
///
/// Equivalent to `Setup::new().build()`; any error returned by `build` is
/// passed on to the caller.
#[allow(clippy::new_without_default)]
pub fn new() -> Result<Runtime, Error> {
Setup::new().build()
}
/// Attempts to spawn a thread-safe actor (`RuntimeAccess = ThreadSafe`).
///
/// Delegates to [`Spawn::try_spawn`]. Returns a reference to the new actor,
/// or the `NA::Error` produced while creating it.
pub fn try_spawn<S, NA>(
&mut self,
supervisor: S,
new_actor: NA,
arg: NA::Argument,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, NA::Error>
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + std::marker::Sync + Send + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
Spawn::try_spawn(self, supervisor, new_actor, arg, options)
}
/// Spawns a thread-safe actor (`RuntimeAccess = ThreadSafe`).
///
/// Same as `try_spawn`, but only accepts `NewActor` implementations whose
/// `Error` is `!`, so no `Result` has to be returned. Delegates to
/// [`Spawn::spawn`].
pub fn spawn<S, NA>(
&mut self,
supervisor: S,
new_actor: NA,
arg: NA::Argument,
options: ActorOptions,
) -> ActorRef<NA::Message>
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<Error = !, RuntimeAccess = ThreadSafe> + std::marker::Sync + Send + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
Spawn::spawn(self, supervisor, new_actor, arg, options)
}
pub fn spawn_sync_actor<S, A>(
&mut self,
supervisor: S,
actor: A,
arg: A::Argument,
options: SyncActorOptions,
) -> Result<ActorRef<A::Message>, Error>
where
S: SyncSupervisor<A> + Send + 'static,
A: SyncActor<RuntimeAccess = Sync> + Send + 'static,
A::Message: Send + 'static,
A::Argument: Send + 'static,
{
let id = SYNC_WORKER_ID_START + self.sync_actors.len();
if let Some(name) = options.name() {
debug!(sync_worker_id = id, name = name; "spawning synchronous actor");
} else {
debug!(sync_worker_id = id; "spawning synchronous actor");
}
#[allow(clippy::cast_possible_truncation)]
let trace_log = self
.trace_log
.as_ref()
.map(|trace_log| trace_log.new_stream(id as u32));
let shared = self.coordinator.shared_internals().clone();
SyncWorker::start(id, supervisor, actor, arg, options, shared, trace_log)
.map(|(worker, actor_ref)| {
self.sync_actors.push(worker);
actor_ref
})
.map_err(Error::start_sync_actor)
}
/// Spawns `future` (which must be `Send + Sync`) by handing it, together
/// with `options`, to the shared runtime internals of the coordinator.
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
{
self.coordinator
.shared_internals()
.spawn_future(future, options)
}
pub fn run_on_workers<F, E>(&mut self, f: F) -> Result<(), Error>
where
F: FnOnce(RuntimeRef) -> Result<(), E> + Send + Clone + 'static,
E: ToString,
{
debug!("sending user function to workers");
for worker in &mut self.workers {
let f = f.clone();
let f = Box::new(move |runtime_ref| f(runtime_ref).map_err(|err| err.to_string()));
worker
.send_function(f)
.map_err(|err| Error::coordinator(coordinator::Error::SendingFunc(err)))?;
}
Ok(())
}
/// Adds `actor_ref` to the group of actors stored in `signals`; that group
/// is handed to the coordinator when the runtime is started.
pub fn receive_signals(&mut self, actor_ref: ActorRef<Signal>) {
self.signals.add(actor_ref);
}
pub fn start(self) -> Result<(), Error> {
debug!(
workers = self.workers.len(), sync_actors = self.sync_actors.len();
"starting Heph runtime"
);
self.coordinator
.run(self.workers, self.sync_actors, self.signals, self.trace_log)
}
}
// `Runtime` can spawn thread-safe actors. The impl is empty, so only the
// trait's provided items are used; the actual work is done by the
// `PrivateSpawn` implementation for `Runtime`.
impl<S, NA> Spawn<S, NA, ThreadSafe> for Runtime
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
}
// Spawning a thread-safe actor from the `Runtime` is forwarded to the shared
// runtime internals owned by the coordinator.
impl<S, NA> PrivateSpawn<S, NA, ThreadSafe> for Runtime
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
// `arg_fn` is passed on untouched; the shared internals are responsible for
// calling it with the new actor's context to create the actor's argument.
fn try_spawn_setup<ArgFn, E>(
&mut self,
supervisor: S,
new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadSafe>) -> Result<NA::Argument, E>,
{
self.coordinator
.shared_internals()
.spawn_setup(supervisor, new_actor, arg_fn, options)
}
}
/// A reference to the runtime as seen from a worker.
///
/// This is a reference-counted (`Rc`) handle to the thread-local runtime
/// internals, so cloning it only bumps a reference count.
#[derive(Clone, Debug)]
pub struct RuntimeRef {
/// Thread-local internals; these also hold a handle to the shared
/// internals (`internals.shared`).
internals: Rc<local::RuntimeInternals>,
}
impl RuntimeRef {
/// Attempts to spawn a thread-local actor (`RuntimeAccess = ThreadLocal`).
///
/// Delegates to [`Spawn::try_spawn`]. Returns a reference to the new actor,
/// or the `NA::Error` produced while creating it.
pub fn try_spawn_local<S, NA>(
&mut self,
supervisor: S,
new_actor: NA,
arg: NA::Argument,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, NA::Error>
where
S: Supervisor<NA> + 'static,
NA: NewActor<RuntimeAccess = ThreadLocal> + 'static,
NA::Actor: 'static,
{
Spawn::try_spawn(self, supervisor, new_actor, arg, options)
}
/// Spawns a thread-local actor (`RuntimeAccess = ThreadLocal`).
///
/// Same as `try_spawn_local`, but only accepts `NewActor` implementations
/// whose `Error` is `!`, so no `Result` has to be returned. Delegates to
/// [`Spawn::spawn`].
pub fn spawn_local<S, NA>(
&mut self,
supervisor: S,
new_actor: NA,
arg: NA::Argument,
options: ActorOptions,
) -> ActorRef<NA::Message>
where
S: Supervisor<NA> + 'static,
NA: NewActor<Error = !, RuntimeAccess = ThreadLocal> + 'static,
NA::Actor: 'static,
{
Spawn::spawn(self, supervisor, new_actor, arg, options)
}
/// Attempts to spawn a thread-safe actor (`RuntimeAccess = ThreadSafe`).
///
/// Delegates to [`Spawn::try_spawn`]. Returns a reference to the new actor,
/// or the `NA::Error` produced while creating it.
pub fn try_spawn<S, NA>(
&mut self,
supervisor: S,
new_actor: NA,
arg: NA::Argument,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, NA::Error>
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + std::marker::Sync + Send + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
Spawn::try_spawn(self, supervisor, new_actor, arg, options)
}
/// Spawns a thread-safe actor (`RuntimeAccess = ThreadSafe`).
///
/// Same as `try_spawn`, but only accepts `NewActor` implementations whose
/// `Error` is `!`, so no `Result` has to be returned. Delegates to
/// [`Spawn::spawn`].
pub fn spawn<S, NA>(
&mut self,
supervisor: S,
new_actor: NA,
arg: NA::Argument,
options: ActorOptions,
) -> ActorRef<NA::Message>
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<Error = !, RuntimeAccess = ThreadSafe> + std::marker::Sync + Send + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
Spawn::spawn(self, supervisor, new_actor, arg, options)
}
/// Spawns a thread-local future.
///
/// The future is added to the local scheduler with the priority taken from
/// `options`. Unlike `spawn_future`, the future does not have to be `Send`
/// or `Sync`.
#[allow(clippy::needless_pass_by_value)]
pub fn spawn_local_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
    Fut: Future<Output = ()> + 'static,
{
    let mut scheduler = self.internals.scheduler.borrow_mut();
    scheduler.add_future(future, options.priority())
}
/// Spawns `future` (which must be `Send + Sync`) by handing it, together
/// with `options`, to the shared runtime internals.
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
{
self.internals.shared.spawn_future(future, options)
}
/// Adds `actor_ref` to the local signal receivers using `add_unique`.
// NOTE(review): going by the name, `add_unique` presumably skips actor
// references that are already registered; confirm against its definition.
pub fn receive_signals(&mut self, actor_ref: ActorRef<Signal>) {
self.internals
.signal_receivers
.borrow_mut()
.add_unique(actor_ref)
}
/// Registers the event `source` with the registry of the local poll
/// instance, using `token` and `interest`.
///
/// Returns the result of the underlying `register` call.
pub(crate) fn register<S>(
    &mut self,
    source: &mut S,
    token: Token,
    interest: Interest,
) -> io::Result<()>
where
    S: event::Source + ?Sized,
{
    let poll = self.internals.poll.borrow();
    poll.registry().register(source, token, interest)
}
/// Re-registers the event `source` with the registry of the local poll
/// instance, using `token` and `interest`.
///
/// Returns the result of the underlying `reregister` call.
pub(crate) fn reregister<S>(
    &mut self,
    source: &mut S,
    token: Token,
    interest: Interest,
) -> io::Result<()>
where
    S: event::Source + ?Sized,
{
    let poll = self.internals.poll.borrow();
    poll.registry().reregister(source, token, interest)
}
/// Creates a new `task::Waker` for the process `pid` through
/// `local::waker::new`, using the waker id of the local internals.
pub(crate) fn new_local_task_waker(&self, pid: ProcessId) -> task::Waker {
local::waker::new(self.internals.waker_id, pid)
}
/// Creates a new `task::Waker` for the process `pid` through the shared
/// runtime internals.
pub(crate) fn new_shared_task_waker(&self, pid: ProcessId) -> task::Waker {
self.internals.shared.new_task_waker(pid)
}
/// Marks the process `pid` as ready in the local scheduler.
fn mark_ready_local(&mut self, pid: ProcessId) {
self.internals.scheduler.borrow_mut().mark_ready(pid);
}
/// Marks the process `pid` as ready through the shared runtime internals.
fn mark_ready_shared(&mut self, pid: ProcessId) {
self.internals.shared.mark_ready(pid);
}
/// Adds `deadline` for the process `pid` to the local timers, logging the
/// addition at trace level.
fn add_deadline(&mut self, pid: ProcessId, deadline: Instant) {
    ::log::trace!(pid = pid.0, deadline = as_debug!(deadline); "adding deadline");
    let mut timers = self.internals.timers.borrow_mut();
    timers.add(pid, deadline);
}
/// Removes `deadline` for the process `pid` from the local timers, logging
/// the removal at trace level.
fn remove_deadline(&mut self, pid: ProcessId, deadline: Instant) {
    ::log::trace!(pid = pid.0, deadline = as_debug!(deadline); "removing deadline");
    let mut timers = self.internals.timers.borrow_mut();
    timers.remove(pid, deadline);
}
/// Changes the process `deadline` belongs to from `from` to `to` in the
/// local timers, logging the change at trace level.
fn change_deadline(&mut self, from: ProcessId, to: ProcessId, deadline: Instant) {
    ::log::trace!(
        old_pid = from.0, new_pid = to.0, deadline = as_debug!(deadline);
        "changing deadline"
    );
    let mut timers = self.internals.timers.borrow_mut();
    // Note the argument order: old pid, deadline, new pid.
    timers.change(from, deadline, to);
}
/// Returns another handle to the shared runtime internals.
///
/// Only the reference count of the `Arc` is increased.
pub(crate) fn clone_shared(&self) -> Arc<shared::RuntimeInternals> {
    Arc::clone(&self.internals.shared)
}
/// Returns the `cpu` value stored in the local runtime internals, if any.
// NOTE(review): presumably the id of the CPU this worker is bound to; confirm
// against `local::RuntimeInternals`.
pub(crate) fn cpu(&self) -> Option<usize> {
self.internals.cpu
}
/// Starts timing a trace event by calling `trace::start` with the local
/// trace log. The returned timing is meant to be passed to `finish_trace`.
fn start_trace(&self) -> Option<trace::EventTiming> {
trace::start(&*self.internals.trace_log.borrow())
}
/// Finishes the trace event started with `start_trace`.
///
/// Calls `trace::finish` with the local trace log, the `timing` returned by
/// `start_trace`, the id of the process (`pid.0` as `u64`), a `description`
/// of the event and its `attributes`.
fn finish_trace(
&mut self,
timing: Option<trace::EventTiming>,
pid: ProcessId,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
) {
trace::finish(
// Reborrow through the `RefCell` guard to get an `Option<&mut _>`.
(&mut *self.internals.trace_log.borrow_mut()).as_mut(),
timing,
pid.0 as u64,
description,
attributes,
)
}
}
// `RuntimeRef` can spawn thread-local actors. The impl is empty, so only the
// trait's provided items are used; the actual work is done by the
// `PrivateSpawn<S, NA, ThreadLocal>` implementation for `RuntimeRef`.
impl<S, NA> Spawn<S, NA, ThreadLocal> for RuntimeRef
where
S: Supervisor<NA> + 'static,
NA: NewActor<RuntimeAccess = ThreadLocal> + 'static,
NA::Actor: 'static,
{
}
// Thread-local actors are added directly to the scheduler of this worker.
impl<S, NA> PrivateSpawn<S, NA, ThreadLocal> for RuntimeRef
where
    S: Supervisor<NA> + 'static,
    NA: NewActor<RuntimeAccess = ThreadLocal> + 'static,
    NA::Actor: 'static,
{
    // Note that the local scheduler stays mutably borrowed for the entire
    // call, including while `arg_fn` and `NewActor::new` run.
    fn try_spawn_setup<ArgFn, E>(
        &mut self,
        supervisor: S,
        mut new_actor: NA,
        arg_fn: ArgFn,
        options: ActorOptions,
    ) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
    where
        ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadLocal>) -> Result<NA::Argument, E>,
    {
        // Prepare adding a process to the scheduler, this gives us its pid.
        let mut local_scheduler = self.internals.scheduler.borrow_mut();
        let entry = local_scheduler.add_actor();
        let pid = entry.pid();
        let actor_name = NA::name();
        debug!(pid = pid.0, name = actor_name; "spawning thread-local actor");

        // Create the inbox and the context for the actor.
        let (manager, sender, receiver) = inbox::Manager::new_small_channel();
        let actor_ref = ActorRef::local(sender);
        let runtime_access = ThreadLocal::new(pid, self.clone());
        let mut ctx = actor::Context::new(receiver, runtime_access);

        // Let the caller create the argument, then create the actor itself.
        // If either fails the actor is not added to the scheduler.
        let arg = arg_fn(&mut ctx).map_err(AddActorError::ArgFn)?;
        let actor = new_actor.new(ctx, arg).map_err(AddActorError::NewActor)?;

        // Finally add the actor to the scheduler.
        entry.add(
            options.priority(),
            supervisor,
            new_actor,
            actor,
            manager,
            options.is_ready(),
        );
        Ok(actor_ref)
    }
}
// `RuntimeRef` can also spawn thread-safe actors. The impl is empty, so only
// the trait's provided items are used; the actual work is done by the
// `PrivateSpawn<S, NA, ThreadSafe>` implementation for `RuntimeRef`.
impl<S, NA> Spawn<S, NA, ThreadSafe> for RuntimeRef
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
}
// Spawning a thread-safe actor from a `RuntimeRef` is forwarded to the shared
// runtime internals.
impl<S, NA> PrivateSpawn<S, NA, ThreadSafe> for RuntimeRef
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
// `arg_fn` is passed on untouched; the shared internals are responsible for
// calling it with the new actor's context to create the actor's argument.
fn try_spawn_setup<ArgFn, E>(
&mut self,
supervisor: S,
new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadSafe>) -> Result<NA::Argument, E>,
{
self.internals
.shared
.spawn_setup(supervisor, new_actor, arg_fn, options)
}
}
/// Returns the time reported by the clock `clock_id`, using
/// `clock_gettime(2)`.
///
/// If the call fails a warning is logged and a zero duration is returned.
/// Should `tv_sec` not fit in a `u64` zero seconds are used; should
/// `tv_nsec` not fit in a `u32`, `u32::MAX` nanoseconds are used.
fn cpu_usage(clock_id: libc::clockid_t) -> Duration {
    let mut time = libc::timespec {
        tv_sec: 0,
        tv_nsec: 0,
    };
    // SAFETY: `time` is a valid, initialised `timespec` that outlives the
    // call.
    let res = unsafe { libc::clock_gettime(clock_id, &mut time) };
    if res == -1 {
        let err = io::Error::last_os_error();
        warn!("error getting CPU time: {}, using zero", err);
        return Duration::ZERO;
    }

    let secs: u64 = time.tv_sec.try_into().unwrap_or(0);
    let nanos: u32 = time.tv_nsec.try_into().unwrap_or(u32::MAX);
    Duration::new(secs, nanos)
}
/// Types that can be bound to an actor.
///
/// `RT` is the runtime access type of the [`actor::Context`] passed to
/// `bind_to`.
// NOTE(review): what "binding" involves is up to the implementations, which
// are not part of this file.
pub trait Bound<RT> {
/// Error returned by `bind_to`.
type Error;
/// Binds `self` to the actor that owns `ctx`.
fn bind_to<M>(&mut self, ctx: &mut actor::Context<M, RT>) -> Result<(), Self::Error>;
}