use std::future::Future;
use std::mem::replace;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use std::time::Instant;
use std::{fmt, io, task};
use heph::actor::{self, NewActor, SyncContext};
use heph::actor_ref::ActorRef;
use heph::supervisor::Supervisor;
use mio::{event, Interest};
use crate::process::ProcessId;
use crate::spawn::{ActorOptions, AddActorError, FutureOptions, PrivateSpawn, Spawn};
use crate::trace::{self, Trace};
use crate::{shared, RuntimeRef};
pub trait Access: PrivateAccess {}
mod private {
use std::time::Instant;
use std::{io, task};
use mio::{event, Interest};
use crate::process::ProcessId;
use crate::{trace, RuntimeRef};
pub trait PrivateAccess {
fn pid(&self) -> ProcessId;
fn change_pid(&mut self, new_pid: ProcessId) -> ProcessId;
fn register<S>(&mut self, source: &mut S, interest: Interest) -> io::Result<()>
where
S: event::Source + ?Sized;
fn reregister<S>(&mut self, source: &mut S, interest: Interest) -> io::Result<()>
where
S: event::Source + ?Sized;
fn add_deadline(&mut self, deadline: Instant);
fn remove_deadline(&mut self, deadline: Instant);
fn change_deadline(&mut self, old_pid: ProcessId, deadline: Instant);
fn new_task_waker(runtime_ref: &mut RuntimeRef, pid: ProcessId) -> task::Waker;
fn cpu(&self) -> Option<usize>;
fn start_trace(&self) -> Option<trace::EventTiming>;
fn finish_trace(
&mut self,
timing: Option<trace::EventTiming>,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
);
}
}
pub(crate) use private::PrivateAccess;
#[derive(Clone)]
pub struct ThreadLocal {
pid: ProcessId,
rt: RuntimeRef,
}
impl ThreadLocal {
pub(crate) const fn new(pid: ProcessId, rt: RuntimeRef) -> ThreadLocal {
ThreadLocal { pid, rt }
}
}
impl Deref for ThreadLocal {
type Target = RuntimeRef;
fn deref(&self) -> &Self::Target {
&self.rt
}
}
impl DerefMut for ThreadLocal {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.rt
}
}
impl Access for ThreadLocal {}
impl PrivateAccess for ThreadLocal {
fn pid(&self) -> ProcessId {
self.pid
}
fn change_pid(&mut self, new_pid: ProcessId) -> ProcessId {
replace(&mut self.pid, new_pid)
}
fn register<S>(&mut self, source: &mut S, interest: Interest) -> io::Result<()>
where
S: event::Source + ?Sized,
{
self.rt.register(source, self.pid.into(), interest)
}
fn reregister<S>(&mut self, source: &mut S, interest: Interest) -> io::Result<()>
where
S: event::Source + ?Sized,
{
self.rt.reregister(source, self.pid.into(), interest)
}
fn add_deadline(&mut self, deadline: Instant) {
self.rt.add_deadline(self.pid, deadline)
}
fn remove_deadline(&mut self, deadline: Instant) {
self.rt.remove_deadline(self.pid, deadline);
}
fn change_deadline(&mut self, old_pid: ProcessId, deadline: Instant) {
self.rt.change_deadline(old_pid, self.pid, deadline);
}
fn new_task_waker(runtime_ref: &mut RuntimeRef, pid: ProcessId) -> task::Waker {
runtime_ref.new_local_task_waker(pid)
}
fn cpu(&self) -> Option<usize> {
self.rt.cpu()
}
fn start_trace(&self) -> Option<trace::EventTiming> {
self.rt.start_trace()
}
fn finish_trace(
&mut self,
timing: Option<trace::EventTiming>,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
) {
self.rt
.finish_trace(timing, self.pid, description, attributes)
}
}
impl<S, NA> Spawn<S, NA, ThreadLocal> for ThreadLocal
where
S: Supervisor<NA> + 'static,
NA: NewActor<RuntimeAccess = ThreadLocal> + 'static,
NA::Actor: 'static,
{
}
impl<S, NA> PrivateSpawn<S, NA, ThreadLocal> for ThreadLocal
where
S: Supervisor<NA> + 'static,
NA: NewActor<RuntimeAccess = ThreadLocal> + 'static,
NA::Actor: 'static,
{
fn try_spawn_setup<ArgFn, E>(
&mut self,
supervisor: S,
new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadLocal>) -> Result<NA::Argument, E>,
{
self.rt
.try_spawn_setup(supervisor, new_actor, arg_fn, options)
}
}
impl<S, NA> Spawn<S, NA, ThreadSafe> for ThreadLocal
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
}
impl<S, NA> PrivateSpawn<S, NA, ThreadSafe> for ThreadLocal
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
fn try_spawn_setup<ArgFn, E>(
&mut self,
supervisor: S,
new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadSafe>) -> Result<NA::Argument, E>,
{
self.rt
.try_spawn_setup(supervisor, new_actor, arg_fn, options)
}
}
impl fmt::Debug for ThreadLocal {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ThreadLocal")
}
}
#[derive(Clone)]
pub struct ThreadSafe {
pid: ProcessId,
rt: Arc<shared::RuntimeInternals>,
}
impl ThreadSafe {
pub(crate) const fn new(pid: ProcessId, rt: Arc<shared::RuntimeInternals>) -> ThreadSafe {
ThreadSafe { pid, rt }
}
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
{
self.rt.spawn_future(future, options)
}
}
impl Access for ThreadSafe {}
impl PrivateAccess for ThreadSafe {
fn pid(&self) -> ProcessId {
self.pid
}
fn change_pid(&mut self, new_pid: ProcessId) -> ProcessId {
replace(&mut self.pid, new_pid)
}
fn register<S>(&mut self, source: &mut S, interest: Interest) -> io::Result<()>
where
S: event::Source + ?Sized,
{
self.rt.register(source, self.pid.into(), interest)
}
fn reregister<S>(&mut self, source: &mut S, interest: Interest) -> io::Result<()>
where
S: event::Source + ?Sized,
{
self.rt.reregister(source, self.pid.into(), interest)
}
fn add_deadline(&mut self, deadline: Instant) {
self.rt.add_deadline(self.pid, deadline)
}
fn remove_deadline(&mut self, deadline: Instant) {
self.rt.remove_deadline(self.pid, deadline);
}
fn change_deadline(&mut self, old_pid: ProcessId, deadline: Instant) {
self.rt.change_deadline(old_pid, self.pid, deadline);
}
fn new_task_waker(runtime_ref: &mut RuntimeRef, pid: ProcessId) -> task::Waker {
runtime_ref.new_shared_task_waker(pid)
}
fn cpu(&self) -> Option<usize> {
None
}
fn start_trace(&self) -> Option<trace::EventTiming> {
self.rt.start_trace()
}
fn finish_trace(
&mut self,
timing: Option<trace::EventTiming>,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
) {
self.rt
.finish_trace(timing, self.pid, description, attributes)
}
}
impl<S, NA> Spawn<S, NA, ThreadSafe> for ThreadSafe
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
}
impl<S, NA> PrivateSpawn<S, NA, ThreadSafe> for ThreadSafe
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
fn try_spawn_setup<ArgFn, E>(
&mut self,
supervisor: S,
new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadSafe>) -> Result<NA::Argument, E>,
{
self.rt.spawn_setup(supervisor, new_actor, arg_fn, options)
}
}
impl fmt::Debug for ThreadSafe {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("ThreadSafe")
}
}
impl<M, RT> Trace for actor::Context<M, RT>
where
RT: Access,
{
fn start_trace(&self) -> Option<trace::EventTiming> {
self.runtime_ref().start_trace()
}
fn finish_trace(
&mut self,
timing: Option<trace::EventTiming>,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
) {
self.runtime().finish_trace(timing, description, attributes)
}
}
#[derive(Clone)]
pub struct Sync {
rt: Arc<shared::RuntimeInternals>,
trace_log: Option<trace::Log>,
}
impl Sync {
pub(crate) const fn new(
rt: Arc<shared::RuntimeInternals>,
trace_log: Option<trace::Log>,
) -> Sync {
Sync { rt, trace_log }
}
pub fn spawn_future<Fut>(&mut self, future: Fut, options: FutureOptions)
where
Fut: Future<Output = ()> + Send + std::marker::Sync + 'static,
{
self.rt.spawn_future(future, options)
}
}
impl<S, NA> Spawn<S, NA, ThreadSafe> for Sync
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
}
impl<S, NA> PrivateSpawn<S, NA, ThreadSafe> for Sync
where
S: Supervisor<NA> + Send + std::marker::Sync + 'static,
NA: NewActor<RuntimeAccess = ThreadSafe> + Send + std::marker::Sync + 'static,
NA::Actor: Send + std::marker::Sync + 'static,
NA::Message: Send,
{
fn try_spawn_setup<ArgFn, E>(
&mut self,
supervisor: S,
new_actor: NA,
arg_fn: ArgFn,
options: ActorOptions,
) -> Result<ActorRef<NA::Message>, AddActorError<NA::Error, E>>
where
ArgFn: FnOnce(&mut actor::Context<NA::Message, ThreadSafe>) -> Result<NA::Argument, E>,
{
self.rt.spawn_setup(supervisor, new_actor, arg_fn, options)
}
}
impl fmt::Debug for Sync {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Sync")
}
}
impl<M> Trace for SyncContext<M, Sync> {
fn start_trace(&self) -> Option<trace::EventTiming> {
trace::start(&self.runtime_ref().trace_log)
}
fn finish_trace(
&mut self,
timing: Option<trace::EventTiming>,
description: &str,
attributes: &[(&str, &dyn trace::AttributeValue)],
) {
trace::finish_rt(
self.runtime().trace_log.as_mut(),
timing,
description,
attributes,
)
}
}