Expand description

Use async/await syntax with actix actors.

Example

This example shows how you could implement a generic “pipeline adapter” which allows turning any Sink/Stream pair (forming a request/response pipeline) into an actix Actor.

Responses are matched up with their requests according to the order in which they were sent (so the first response corresponds to the first request sent to the Sink, etc). This requires that our “send” operations are strictly ordered, and this is difficult to achieve in actix because async operations are normally allowed to interleave.

Furthermore, although the sends must be atomic, we also want to be able to have a large number of requests in-flight at any given time, so the receiving part of the message handler must not require exclusive access to the actor while it is waiting. As a result, abstractions like the AtomicResponse type are too simplistic to help.

To solve this problem, we use the critical_section function to allow specific parts of our message handler to be atomic.

use std::collections::VecDeque;
use std::pin::Pin;

use futures::{Sink, SinkExt, Stream, channel::oneshot};
use actix::prelude::*;

use actix_interop::{FutureInterop, with_ctx, critical_section};

// Define our actor
pub struct PipelineAdapter<Req, Res, Err> {
    sink: Option<Pin<Box<dyn Sink<Req, Error=Err>>>>,
    in_flight_reqs: VecDeque<oneshot::Sender<Result<Res, Err>>>,
}

// Implement a constructor
impl<Req, Res, Err> PipelineAdapter<Req, Res, Err>
where
    Req: 'static,
    Res: 'static,
    Err: 'static,
{
    pub fn new<Si, St>(sink: Si, stream: St) -> Addr<Self>
    where
        Si: Sink<Req, Error=Err> + 'static,
        St: Stream<Item=Res> + 'static,
    {
        // Convert to a boxed trait object
        let sink: Box<dyn Sink<Req, Error=Err>> = Box::new(sink);

        Self::create(|ctx| {
            ctx.add_stream(stream);
            Self {
                sink: Some(sink.into()),
                in_flight_reqs: VecDeque::new(),
            }
        })
    }
}

// Tell actix this is an actor using the default Context type
impl<Req, Res, Err> Actor for PipelineAdapter<Req, Res, Err>
where
    Req: 'static,
    Res: 'static,
    Err: 'static,
{
    type Context = Context<Self>;
}

// Transform actix messages into the pipelines request/response protocol
impl<Req, Res, Err> Handler<Req> for PipelineAdapter<Req, Res, Err>
where
    Req: Message<Result=Result<Res, Err>> + 'static,
    Res: 'static,
    Err: 'static,
{
    type Result = ResponseActFuture<Self, Result<Res, Err>>; // <- Message response type

    fn handle(&mut self, msg: Req, _ctx: &mut Context<Self>) -> Self::Result {
        async move {
            let (tx, rx) = oneshot::channel();

            // Perform sends in a critical section so they are strictly ordered
            critical_section::<Self, _>(async {
                // Take the sink from the actor state
                let mut sink = with_ctx(|actor: &mut Self, _| actor.sink.take())
                    .expect("Sink to be present");
                 
                // Send the request
                let res = sink.send(msg).await;

                // Put the sink back, and if the send was successful,
                // record the in-flight request.
                with_ctx(|actor: &mut Self, _| {
                    actor.sink = Some(sink);
                    match res {
                        Ok(()) => actor.in_flight_reqs.push_back(tx),
                        Err(e) => {
                            // Don't care if the receiver has gone away
                            let _ = tx.send(Err(e));
                        }
                    }
                });
            })
            .await;

            // Wait for the result concurrently, so many requests can
            // be pipelined at the same time.
            rx.await.expect("Sender should not be dropped")
        }
        .interop_actor_boxed(self)
    }
}

// Process responses
impl<Req, Res, Err> StreamHandler<Res> for PipelineAdapter<Req, Res, Err>
where
    Req: 'static,
    Res: 'static,
    Err: 'static,
{
    fn handle(&mut self, msg: Res, _ctx: &mut Context<Self>) {
        // When we receive a response, just pull the first in-flight
        // request and forward on the result.
        let _ = self.in_flight_reqs
            .pop_front()
            .expect("There to be an in-flight request")
            .send(Ok(msg));
    }
}

Structs

Future to ActorFuture adapter returned by interop_actor.

Stream to ActorStream adapter returned by interop_actor.

Traits

Extension trait implemented for all futures. Import this trait to bring the interop_actor and interop_actor_boxed methods into scope.

Extension trait implemented for all streams. Import this trait to bring the interop_actor and interop_actor_boxed methods into scope.

Functions

May be called in the same places as with_ctx to run a chunk of async code with exclusive access to an actor’s state: no other futures spawned to this actor will be polled during the critical section. Unlike with_ctx, calls to this function may be nested, although there is little point in doing so. Calling with_ctx from within a critical section is allowed (and expected).

May be called from within a future spawned onto an actor context to gain mutable access to the actor’s state and/or context. The future must have been wrapped using interop_actor or interop_actor_boxed.