use std::future::Future;
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};
use heph_inbox::Receiver;
use heph_inbox::{self as inbox, ReceiverConnected};
use log::trace;
use crate::actor::{NoMessages, RecvError};
use crate::actor_ref::ActorRef;
use crate::supervisor::{SupervisorStrategy, SyncSupervisor};
pub trait SyncActor {
type Message;
type Argument;
type Error;
type RuntimeAccess;
fn run(
&self,
ctx: SyncContext<Self::Message, Self::RuntimeAccess>,
arg: Self::Argument,
) -> Result<(), Self::Error>;
}
macro_rules! impl_sync_actor {
(
$( ( $( $arg_name: ident : $arg: ident ),* ) ),*
$(,)*
) => {
$(
impl<M, E, RT, $( $arg ),*> SyncActor for fn(ctx: SyncContext<M, RT>, $( $arg_name: $arg ),*) -> Result<(), E> {
type Message = M;
type Argument = ($( $arg ),*);
type Error = E;
type RuntimeAccess = RT;
#[allow(non_snake_case)]
fn run(&self, ctx: SyncContext<Self::Message, Self::RuntimeAccess>, arg: Self::Argument) -> Result<(), Self::Error> {
let ($( $arg ),*) = arg;
(self)(ctx, $( $arg ),*)
}
}
impl<M, RT, $( $arg ),*> SyncActor for fn(ctx: SyncContext<M, RT>, $( $arg_name: $arg ),*) {
type Message = M;
type Argument = ($( $arg ),*);
type Error = !;
type RuntimeAccess = RT;
#[allow(non_snake_case)]
fn run(&self, ctx: SyncContext<Self::Message, Self::RuntimeAccess>, arg: Self::Argument) -> Result<(), Self::Error> {
let ($( $arg ),*) = arg;
Ok((self)(ctx, $( $arg ),*))
}
}
)*
};
}
impl_sync_actor!(());
impl<M, E, RT, Arg> SyncActor for fn(ctx: SyncContext<M, RT>, arg: Arg) -> Result<(), E> {
type Message = M;
type Argument = Arg;
type Error = E;
type RuntimeAccess = RT;
fn run(
&self,
ctx: SyncContext<Self::Message, Self::RuntimeAccess>,
arg: Self::Argument,
) -> Result<(), Self::Error> {
(self)(ctx, arg)
}
}
impl<M, RT, Arg> SyncActor for fn(ctx: SyncContext<M, RT>, arg: Arg) {
type Message = M;
type Argument = Arg;
type Error = !;
type RuntimeAccess = RT;
fn run(
&self,
ctx: SyncContext<Self::Message, Self::RuntimeAccess>,
arg: Self::Argument,
) -> Result<(), Self::Error> {
#[allow(clippy::unit_arg)]
Ok((self)(ctx, arg))
}
}
impl_sync_actor!(
(arg1: Arg1, arg2: Arg2),
(arg1: Arg1, arg2: Arg2, arg3: Arg3),
(arg1: Arg1, arg2: Arg2, arg3: Arg3, arg4: Arg4),
(arg1: Arg1, arg2: Arg2, arg3: Arg3, arg4: Arg4, arg5: Arg5),
);
#[derive(Debug)]
pub struct SyncContext<M, RT> {
inbox: Receiver<M>,
future_waker: Option<Arc<SyncWaker>>,
rt: RT,
}
impl<M, RT> SyncContext<M, RT> {
#[doc(hidden)] pub const fn new(inbox: Receiver<M>, rt: RT) -> SyncContext<M, RT> {
SyncContext {
inbox,
future_waker: None,
rt,
}
}
pub fn try_receive_next(&mut self) -> Result<M, RecvError> {
self.inbox.try_recv().map_err(RecvError::from)
}
pub fn receive_next(&mut self) -> Result<M, NoMessages> {
let waker = self.future_waker();
waker.block_on(self.inbox.recv()).ok_or(NoMessages)
}
pub fn block_on<Fut>(&mut self, fut: Fut) -> Fut::Output
where
Fut: Future,
{
let waker = self.future_waker();
waker.block_on(fut)
}
pub fn runtime(&mut self) -> &mut RT {
&mut self.rt
}
pub const fn runtime_ref(&self) -> &RT {
&self.rt
}
fn future_waker(&mut self) -> Arc<SyncWaker> {
if let Some(waker) = self.future_waker.as_ref() {
waker.clone()
} else {
let waker = SyncWaker::new();
self.future_waker = Some(waker.clone());
waker
}
}
}
#[derive(Debug)]
#[doc(hidden)] pub struct SyncWaker {
handle: Thread,
}
impl task::Wake for SyncWaker {
fn wake(self: Arc<Self>) {
self.handle.unpark();
}
fn wake_by_ref(self: &Arc<Self>) {
self.handle.unpark();
}
}
impl SyncWaker {
#[doc(hidden)] pub fn new() -> Arc<SyncWaker> {
Arc::new(SyncWaker {
handle: thread::current(),
})
}
#[doc(hidden)] pub fn block_on<Fut>(self: Arc<SyncWaker>, future: Fut) -> Fut::Output
where
Fut: Future,
{
let mut future = future;
let mut future = unsafe { Pin::new_unchecked(&mut future) };
let task_waker = task::Waker::from(self);
let mut task_ctx = task::Context::from_waker(&task_waker);
loop {
match Future::poll(future.as_mut(), &mut task_ctx) {
Poll::Ready(res) => return res,
Poll::Pending => thread::park(),
}
}
}
#[doc(hidden)] pub fn block_for<Fut>(
self: Arc<SyncWaker>,
future: Fut,
timeout: Duration,
) -> Option<Fut::Output>
where
Fut: Future,
{
let mut future = future;
let mut future = unsafe { Pin::new_unchecked(&mut future) };
let task_waker = task::Waker::from(self);
let mut task_ctx = task::Context::from_waker(&task_waker);
let start = Instant::now();
loop {
match Future::poll(future.as_mut(), &mut task_ctx) {
Poll::Ready(res) => return Some(res),
Poll::Pending => {
let elapsed = start.elapsed();
if elapsed > timeout {
return None;
}
thread::park_timeout(timeout - elapsed)
}
}
}
}
}
pub fn spawn_sync_actor<S, A, RT>(
supervisor: S,
actor: A,
arg: A::Argument,
rt: RT,
) -> io::Result<(thread::JoinHandle<()>, ActorRef<A::Message>)>
where
S: SyncSupervisor<A> + Send + 'static,
A: SyncActor<RuntimeAccess = RT> + Send + 'static,
A::Message: Send + 'static,
A::Argument: Send + 'static,
RT: Clone + Send + 'static,
{
let (inbox, sender, ..) = heph_inbox::Manager::new_small_channel();
let actor_ref = ActorRef::local(sender);
let sync_worker = SyncWorker {
supervisor,
actor,
inbox,
};
thread::Builder::new()
.name("Sync actor".to_owned())
.spawn(move || sync_worker.run(arg, rt))
.map(|handle| (handle, actor_ref))
}
#[derive(Debug)]
struct SyncWorker<S, A: SyncActor> {
supervisor: S,
actor: A,
inbox: inbox::Manager<A::Message>,
}
impl<S, A> SyncWorker<S, A>
where
S: SyncSupervisor<A>,
A: SyncActor,
A::RuntimeAccess: Clone,
{
fn run(mut self, mut arg: A::Argument, rt: A::RuntimeAccess) {
let thread = thread::current();
let name = thread.name().unwrap();
trace!(name = name; "running synchronous actor");
loop {
let receiver = self.inbox.new_receiver().unwrap_or_else(inbox_failure);
let ctx = SyncContext::new(receiver, rt.clone());
match self.actor.run(ctx, arg) {
Ok(()) => break,
Err(err) => match self.supervisor.decide(err) {
SupervisorStrategy::Restart(new_arg) => {
trace!(name = name; "restarting synchronous actor");
arg = new_arg;
}
SupervisorStrategy::Stop => break,
},
}
}
trace!(name = name; "stopping synchronous actor");
}
}
#[cold]
fn inbox_failure<T>(_: ReceiverConnected) -> T {
panic!("failed to create new receiver for synchronous actor's inbox. Was the `SyncContext` leaked?");
}