actix_interop/
lib.rs

1// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4// option. This file may not be copied, modified, or distributed
5// except according to those terms.
6
7//! Use async/await syntax with actix actors.
8//!
9//! # Example
10//!
11//! This example shows how you could implement a generic "pipeline adapter"
12//! which allows turning any `Sink`/`Stream` pair (forming a request/response
13//! pipeline) into an actix Actor.
14//!
15//! Responses are matched up with their requests according to the order
16//! in which they were sent (so the first response corresponds to the
17//! first request sent to the `Sink`, etc). This requires that our "send"
18//! operations are strictly ordered, and this is difficult to achieve
19//! in actix because async operations are normally allowed to interleave.
20//!
21//! Furthermore, although the sends must be atomic, we also want to be
22//! able to have a large number of requests in-flight at any given time,
23//! so the receiving part of the message handler must not require exclusive
24//! access to the actor while it is waiting. As a result, abstractions
25//! like the `AtomicResponse` type are too simplistic to help.
26//!
27//! To solve this problem, we use the [`critical_section`](critical_section)
28//! function to allow specific parts of our message handler to be  atomic.
29//!
30//! ```rust
31//! use std::collections::VecDeque;
32//! use std::pin::Pin;
33//!
34//! use futures::{Sink, SinkExt, Stream, channel::oneshot};
35//! use actix::prelude::*;
36//!
37//! use actix_interop::{FutureInterop, with_ctx, critical_section};
38//!
39//! // Define our actor
40//! pub struct PipelineAdapter<Req, Res, Err> {
41//!     sink: Option<Pin<Box<dyn Sink<Req, Error=Err>>>>,
42//!     in_flight_reqs: VecDeque<oneshot::Sender<Result<Res, Err>>>,
43//! }
44//!
45//! // Implement a constructor
46//! impl<Req, Res, Err> PipelineAdapter<Req, Res, Err>
47//! where
48//!     Req: 'static,
49//!     Res: 'static,
50//!     Err: 'static,
51//! {
52//!     pub fn new<Si, St>(sink: Si, stream: St) -> Addr<Self>
53//!     where
54//!         Si: Sink<Req, Error=Err> + 'static,
55//!         St: Stream<Item=Res> + 'static,
56//!     {
57//!         // Convert to a boxed trait object
58//!         let sink: Box<dyn Sink<Req, Error=Err>> = Box::new(sink);
59//!
60//!         Self::create(|ctx| {
61//!             ctx.add_stream(stream);
62//!             Self {
63//!                 sink: Some(sink.into()),
64//!                 in_flight_reqs: VecDeque::new(),
65//!             }
66//!         })
67//!     }
68//! }
69//!
70//! // Tell actix this is an actor using the default Context type
71//! impl<Req, Res, Err> Actor for PipelineAdapter<Req, Res, Err>
72//! where
73//!     Req: 'static,
74//!     Res: 'static,
75//!     Err: 'static,
76//! {
77//!     type Context = Context<Self>;
78//! }
79//!
80//! // Transform actix messages into the pipelines request/response protocol
81//! impl<Req, Res, Err> Handler<Req> for PipelineAdapter<Req, Res, Err>
82//! where
83//!     Req: Message<Result=Result<Res, Err>> + 'static,
84//!     Res: 'static,
85//!     Err: 'static,
86//! {
87//!     type Result = ResponseActFuture<Self, Result<Res, Err>>; // <- Message response type
88//!
89//!     fn handle(&mut self, msg: Req, _ctx: &mut Context<Self>) -> Self::Result {
90//!         async move {
91//!             let (tx, rx) = oneshot::channel();
92//!
93//!             // Perform sends in a critical section so they are strictly ordered
94//!             critical_section::<Self, _>(async {
95//!                 // Take the sink from the actor state
96//!                 let mut sink = with_ctx(|actor: &mut Self, _| actor.sink.take())
97//!                     .expect("Sink to be present");
98//!                 
99//!                 // Send the request
100//!                 let res = sink.send(msg).await;
101//!
102//!                 // Put the sink back, and if the send was successful,
103//!                 // record the in-flight request.
104//!                 with_ctx(|actor: &mut Self, _| {
105//!                     actor.sink = Some(sink);
106//!                     match res {
107//!                         Ok(()) => actor.in_flight_reqs.push_back(tx),
108//!                         Err(e) => {
109//!                             // Don't care if the receiver has gone away
110//!                             let _ = tx.send(Err(e));
111//!                         }
112//!                     }
113//!                 });
114//!             })
115//!             .await;
116//!
117//!             // Wait for the result concurrently, so many requests can
118//!             // be pipelined at the same time.
119//!             rx.await.expect("Sender should not be dropped")
120//!         }
121//!         .interop_actor_boxed(self)
122//!     }
123//! }
124//!
125//! // Process responses
126//! impl<Req, Res, Err> StreamHandler<Res> for PipelineAdapter<Req, Res, Err>
127//! where
128//!     Req: 'static,
129//!     Res: 'static,
130//!     Err: 'static,
131//! {
132//!     fn handle(&mut self, msg: Res, _ctx: &mut Context<Self>) {
133//!         // When we receive a response, just pull the first in-flight
134//!         // request and forward on the result.
135//!         let _ = self.in_flight_reqs
136//!             .pop_front()
137//!             .expect("There to be an in-flight request")
138//!             .send(Ok(msg));
139//!     }
140//! }
141//! ```
142//!
143
144#![deny(missing_docs, warnings)]
145
146use std::any::Any;
147use std::future::Future;
148use std::marker::PhantomData;
149use std::pin::Pin;
150use std::task::{Context, Poll};
151
152use actix::{Actor, ActorFuture, ActorStream, AsyncContext, ResponseActFuture};
153use futures::Stream;
154use pin_project::pin_project;
155use scoped_tls_hkt::scoped_thread_local;
156
157use self::local_handle::local_handle;
158
159mod local_handle;
160
161scoped_thread_local!(static mut CURRENT_ACTOR_CTX: for<'a> (&'a mut dyn Any, &'a mut dyn Any));
162
163fn set_actor_context<A, F, R>(actor: &mut A, ctx: &mut A::Context, f: F) -> R
164where
165    A: Actor,
166    F: FnOnce() -> R,
167{
168    CURRENT_ACTOR_CTX.set((actor, ctx), f)
169}
170
171/// May be called from within a future spawned onto an actor context to gain mutable access
172/// to the actor's state and/or context. The future must have been wrapped using
173/// [`interop_actor`](FutureInterop::interop_actor) or
174/// [`interop_actor_boxed`](FutureInterop::interop_actor_boxed).
175///
176/// Nested calls to this function will panic, as only one mutable borrow can be given out
177/// at a time.
178pub fn with_ctx<A, F, R>(f: F) -> R
179where
180    A: Actor + 'static,
181    F: FnOnce(&mut A, &mut A::Context) -> R,
182{
183    CURRENT_ACTOR_CTX.with(|(actor, ctx)| {
184        let actor = actor
185            .downcast_mut()
186            .expect("Future was spawned onto the wrong actor");
187        let ctx = ctx
188            .downcast_mut()
189            .expect("Future was spawned onto the wrong actor");
190        f(actor, ctx)
191    })
192}
193
194/// May be called in the same places as [`with_ctx`](with_ctx) to run a chunk of async
195/// code with exclusive access to an actor's state: no other futures spawned to this
196/// actor will be polled during the critical section.
197/// Unlike [`with_ctx`](with_ctx), calls to this function may be nested, although there
198/// is little point in doing so. Calling [`with_ctx`](with_ctx) from within a critical
199/// section is allowed (and expected).
200pub fn critical_section<A: Actor, F: Future>(f: F) -> impl Future<Output = F::Output>
201where
202    A::Context: AsyncContext<A>,
203    F: 'static,
204{
205    let (f, handle) = local_handle(f);
206    with_ctx(|actor: &mut A, ctx: &mut A::Context| ctx.wait(f.interop_actor(actor)));
207    handle
208}
209
210/// Future to ActorFuture adapter returned by [`interop_actor`](FutureInterop::interop_actor).
211#[pin_project]
212#[derive(Debug)]
213pub struct FutureInteropWrap<A: Actor, F> {
214    #[pin]
215    inner: F,
216    phantom: PhantomData<fn(&mut A, &mut A::Context)>,
217}
218
219impl<A: Actor, F: Future> FutureInteropWrap<A, F> {
220    fn new(inner: F) -> Self {
221        Self {
222            inner,
223            phantom: PhantomData,
224        }
225    }
226}
227
228impl<A: Actor, F: Future> ActorFuture<A> for FutureInteropWrap<A, F> {
229    type Output = F::Output;
230
231    fn poll(
232        self: Pin<&mut Self>,
233        actor: &mut A,
234        ctx: &mut A::Context,
235        task: &mut Context,
236    ) -> Poll<Self::Output> {
237        set_actor_context(actor, ctx, || self.project().inner.poll(task))
238    }
239}
240
241/// Extension trait implemented for all futures. Import this trait to bring the
242/// [`interop_actor`](FutureInterop::interop_actor) and
243/// [`interop_actor_boxed`](FutureInterop::interop_actor_boxed) methods into scope.
244pub trait FutureInterop<A: Actor>: Future + Sized {
245    /// Convert a future using the `with_ctx` or `critical_section` methods into an ActorFuture.
246    fn interop_actor(self, actor: &A) -> FutureInteropWrap<A, Self>;
247    /// Convert a future using the `with_ctx` or `critical_section` methods into a boxed
248    /// ActorFuture.
249    fn interop_actor_boxed(self, actor: &A) -> ResponseActFuture<A, Self::Output>
250    where
251        Self: 'static,
252    {
253        Box::pin(self.interop_actor(actor))
254    }
255}
256
257impl<A: Actor, F: Future> FutureInterop<A> for F {
258    fn interop_actor(self, _actor: &A) -> FutureInteropWrap<A, Self> {
259        FutureInteropWrap::new(self)
260    }
261}
262
263/// Stream to ActorStream adapter returned by [`interop_actor`](StreamInterop::interop_actor).
264#[pin_project]
265#[derive(Debug)]
266pub struct StreamInteropWrap<A: Actor, S> {
267    #[pin]
268    inner: S,
269    phantom: PhantomData<fn(&mut A, &mut A::Context)>,
270}
271
272impl<A: Actor, S: Stream> StreamInteropWrap<A, S> {
273    fn new(inner: S) -> Self {
274        Self {
275            inner,
276            phantom: PhantomData,
277        }
278    }
279}
280
281impl<A: Actor, S: Stream> ActorStream<A> for StreamInteropWrap<A, S> {
282    type Item = S::Item;
283
284    fn poll_next(
285        self: Pin<&mut Self>,
286        actor: &mut A,
287        ctx: &mut A::Context,
288        task: &mut Context,
289    ) -> Poll<Option<Self::Item>> {
290        set_actor_context(actor, ctx, || self.project().inner.poll_next(task))
291    }
292}
293
294/// Extension trait implemented for all streams. Import this trait to bring the
295/// [`interop_actor`](StreamInterop::interop_actor) and
296/// [`interop_actor_boxed`](StreamInterop::interop_actor_boxed) methods into scope.
297pub trait StreamInterop<A: Actor>: Stream + Sized {
298    /// Convert a stream using the `with_ctx` or `critical_section` methods into an ActorStream.
299    fn interop_actor(self, actor: &A) -> StreamInteropWrap<A, Self>;
300    /// Convert a stream using the `with_ctx` or `critical_section` methods into a boxed
301    /// ActorStream.
302    fn interop_actor_boxed(self, actor: &A) -> Box<dyn ActorStream<A, Item = Self::Item>>
303    where
304        Self: 'static,
305    {
306        Box::new(self.interop_actor(actor))
307    }
308}
309
310impl<A: Actor, S: Stream> StreamInterop<A> for S {
311    fn interop_actor(self, _actor: &A) -> StreamInteropWrap<A, Self> {
312        StreamInteropWrap::new(self)
313    }
314}
315
316#[cfg(test)]
317mod tests {
318
319    use super::{critical_section, with_ctx, FutureInterop};
320    use actix::prelude::*;
321
322    #[derive(Message)]
323    #[rtype(result = "Result<i32, ()>")] // we have to define the response type for `Sum` message
324    struct Sum(i32);
325
326    struct Summator {
327        field: i32,
328    }
329
330    impl Actor for Summator {
331        type Context = Context<Self>;
332    }
333
334    impl Handler<Sum> for Summator {
335        type Result = ResponseActFuture<Self, Result<i32, ()>>; // <- Message response type
336
337        fn handle(&mut self, msg: Sum, _ctx: &mut Context<Self>) -> Self::Result {
338            async move {
339                // Run some code with exclusive access to the actor
340                let accum = critical_section::<Self, _>(async {
341                    with_ctx(move |a: &mut Self, _| {
342                        a.field += msg.0;
343                        a.field
344                    })
345                })
346                .await;
347
348                // Return a result
349                Ok(accum)
350            }
351            .interop_actor_boxed(self)
352        }
353    }
354
355    impl StreamHandler<i32> for Summator {
356        fn handle(&mut self, msg: i32, _ctx: &mut Context<Self>) {
357            self.field += msg;
358        }
359        fn finished(&mut self, _ctx: &mut Context<Self>) {
360            assert_eq!(self.field, 10);
361            System::current().stop();
362        }
363    }
364
365    #[actix::test]
366    async fn can_run_future() -> Result<(), Box<dyn std::error::Error>> {
367        let addr = Summator { field: 0 }.start();
368
369        addr.send(Sum(3)).await.unwrap().unwrap();
370        let res = addr.send(Sum(4)).await?;
371
372        assert_eq!(res, Ok(7));
373        Ok(())
374    }
375
376    #[actix::test]
377    async fn can_run_stream() {
378        Summator::create(|ctx| {
379            ctx.add_stream(futures::stream::iter(1..5));
380            Summator { field: 0 }
381        });
382    }
383
384    mod pipeline {}
385}