#![deny(missing_docs, warnings)]
use std::any::Any;
use std::future::Future;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix::{Actor, ActorFuture, ActorStream, AsyncContext, ResponseActFuture};
use futures::Stream;
use pin_project::pin_project;
use scoped_tls_hkt::scoped_thread_local;
use self::local_handle::local_handle;
mod local_handle;
scoped_thread_local!(static mut CURRENT_ACTOR_CTX: for<'a> (&'a mut dyn Any, &'a mut dyn Any));
fn set_actor_context<A, F, R>(actor: &mut A, ctx: &mut A::Context, f: F) -> R
where
A: Actor,
F: FnOnce() -> R,
{
CURRENT_ACTOR_CTX.set((actor, ctx), f)
}
pub fn with_ctx<A, F, R>(f: F) -> R
where
A: Actor + 'static,
F: FnOnce(&mut A, &mut A::Context) -> R,
{
CURRENT_ACTOR_CTX.with(|(actor, ctx)| {
let actor = actor
.downcast_mut()
.expect("Future was spawned onto the wrong actor");
let ctx = ctx
.downcast_mut()
.expect("Future was spawned onto the wrong actor");
f(actor, ctx)
})
}
pub fn critical_section<A: Actor, F: Future>(f: F) -> impl Future<Output = F::Output>
where
A::Context: AsyncContext<A>,
F: 'static,
{
let (f, handle) = local_handle(f);
with_ctx(|actor: &mut A, ctx: &mut A::Context| ctx.wait(f.interop_actor(actor)));
handle
}
#[pin_project]
#[derive(Debug)]
pub struct FutureInteropWrap<A: Actor, F> {
#[pin]
inner: F,
phantom: PhantomData<fn(&mut A, &mut A::Context)>,
}
impl<A: Actor, F: Future> FutureInteropWrap<A, F> {
fn new(inner: F) -> Self {
Self {
inner,
phantom: PhantomData,
}
}
}
impl<A: Actor, F: Future> ActorFuture<A> for FutureInteropWrap<A, F> {
type Output = F::Output;
fn poll(
self: Pin<&mut Self>,
actor: &mut A,
ctx: &mut A::Context,
task: &mut Context,
) -> Poll<Self::Output> {
set_actor_context(actor, ctx, || self.project().inner.poll(task))
}
}
pub trait FutureInterop<A: Actor>: Future + Sized {
fn interop_actor(self, actor: &A) -> FutureInteropWrap<A, Self>;
fn interop_actor_boxed(self, actor: &A) -> ResponseActFuture<A, Self::Output>
where
Self: 'static,
{
Box::pin(self.interop_actor(actor))
}
}
impl<A: Actor, F: Future> FutureInterop<A> for F {
fn interop_actor(self, _actor: &A) -> FutureInteropWrap<A, Self> {
FutureInteropWrap::new(self)
}
}
#[pin_project]
#[derive(Debug)]
pub struct StreamInteropWrap<A: Actor, S> {
#[pin]
inner: S,
phantom: PhantomData<fn(&mut A, &mut A::Context)>,
}
impl<A: Actor, S: Stream> StreamInteropWrap<A, S> {
fn new(inner: S) -> Self {
Self {
inner,
phantom: PhantomData,
}
}
}
impl<A: Actor, S: Stream> ActorStream<A> for StreamInteropWrap<A, S> {
type Item = S::Item;
fn poll_next(
self: Pin<&mut Self>,
actor: &mut A,
ctx: &mut A::Context,
task: &mut Context,
) -> Poll<Option<Self::Item>> {
set_actor_context(actor, ctx, || self.project().inner.poll_next(task))
}
}
pub trait StreamInterop<A: Actor>: Stream + Sized {
fn interop_actor(self, actor: &A) -> StreamInteropWrap<A, Self>;
fn interop_actor_boxed(self, actor: &A) -> Box<dyn ActorStream<A, Item = Self::Item>>
where
Self: 'static,
{
Box::new(self.interop_actor(actor))
}
}
impl<A: Actor, S: Stream> StreamInterop<A> for S {
fn interop_actor(self, _actor: &A) -> StreamInteropWrap<A, Self> {
StreamInteropWrap::new(self)
}
}
#[cfg(test)]
mod tests {
use super::{critical_section, with_ctx, FutureInterop};
use actix::prelude::*;
#[derive(Message)]
#[rtype(result = "Result<i32, ()>")] struct Sum(i32);
struct Summator {
field: i32,
}
impl Actor for Summator {
type Context = Context<Self>;
}
impl Handler<Sum> for Summator {
type Result = ResponseActFuture<Self, Result<i32, ()>>;
fn handle(&mut self, msg: Sum, _ctx: &mut Context<Self>) -> Self::Result {
async move {
let accum = critical_section::<Self, _>(async {
with_ctx(move |a: &mut Self, _| {
a.field += msg.0;
a.field
})
})
.await;
Ok(accum)
}
.interop_actor_boxed(self)
}
}
impl StreamHandler<i32> for Summator {
fn handle(&mut self, msg: i32, _ctx: &mut Context<Self>) {
self.field += msg;
}
fn finished(&mut self, _ctx: &mut Context<Self>) {
assert_eq!(self.field, 10);
System::current().stop();
}
}
#[actix::test]
async fn can_run_future() -> Result<(), Box<dyn std::error::Error>> {
let addr = Summator { field: 0 }.start();
addr.send(Sum(3)).await.unwrap().unwrap();
let res = addr.send(Sum(4)).await?;
assert_eq!(res, Ok(7));
Ok(())
}
#[actix::test]
async fn can_run_stream() {
Summator::create(|ctx| {
ctx.add_stream(futures::stream::iter(1..5));
Summator { field: 0 }
});
}
mod pipeline {}
}