actix_interop/lib.rs
1// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
2// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
4// option. This file may not be copied, modified, or distributed
5// except according to those terms.
6
7//! Use async/await syntax with actix actors.
8//!
9//! # Example
10//!
11//! This example shows how you could implement a generic "pipeline adapter"
12//! which allows turning any `Sink`/`Stream` pair (forming a request/response
13//! pipeline) into an actix Actor.
14//!
15//! Responses are matched up with their requests according to the order
16//! in which they were sent (so the first response corresponds to the
17//! first request sent to the `Sink`, etc). This requires that our "send"
18//! operations are strictly ordered, and this is difficult to achieve
19//! in actix because async operations are normally allowed to interleave.
20//!
21//! Furthermore, although the sends must be atomic, we also want to be
22//! able to have a large number of requests in-flight at any given time,
23//! so the receiving part of the message handler must not require exclusive
24//! access to the actor while it is waiting. As a result, abstractions
25//! like the `AtomicResponse` type are too simplistic to help.
26//!
27//! To solve this problem, we use the [`critical_section`](critical_section)
28//! function to allow specific parts of our message handler to be atomic.
29//!
30//! ```rust
31//! use std::collections::VecDeque;
32//! use std::pin::Pin;
33//!
34//! use futures::{Sink, SinkExt, Stream, channel::oneshot};
35//! use actix::prelude::*;
36//!
37//! use actix_interop::{FutureInterop, with_ctx, critical_section};
38//!
39//! // Define our actor
40//! pub struct PipelineAdapter<Req, Res, Err> {
41//! sink: Option<Pin<Box<dyn Sink<Req, Error=Err>>>>,
42//! in_flight_reqs: VecDeque<oneshot::Sender<Result<Res, Err>>>,
43//! }
44//!
45//! // Implement a constructor
46//! impl<Req, Res, Err> PipelineAdapter<Req, Res, Err>
47//! where
48//! Req: 'static,
49//! Res: 'static,
50//! Err: 'static,
51//! {
52//! pub fn new<Si, St>(sink: Si, stream: St) -> Addr<Self>
53//! where
54//! Si: Sink<Req, Error=Err> + 'static,
55//! St: Stream<Item=Res> + 'static,
56//! {
57//! // Convert to a boxed trait object
58//! let sink: Box<dyn Sink<Req, Error=Err>> = Box::new(sink);
59//!
60//! Self::create(|ctx| {
61//! ctx.add_stream(stream);
62//! Self {
63//! sink: Some(sink.into()),
64//! in_flight_reqs: VecDeque::new(),
65//! }
66//! })
67//! }
68//! }
69//!
70//! // Tell actix this is an actor using the default Context type
71//! impl<Req, Res, Err> Actor for PipelineAdapter<Req, Res, Err>
72//! where
73//! Req: 'static,
74//! Res: 'static,
75//! Err: 'static,
76//! {
77//! type Context = Context<Self>;
78//! }
79//!
80//! // Transform actix messages into the pipelines request/response protocol
81//! impl<Req, Res, Err> Handler<Req> for PipelineAdapter<Req, Res, Err>
82//! where
83//! Req: Message<Result=Result<Res, Err>> + 'static,
84//! Res: 'static,
85//! Err: 'static,
86//! {
87//! type Result = ResponseActFuture<Self, Result<Res, Err>>; // <- Message response type
88//!
89//! fn handle(&mut self, msg: Req, _ctx: &mut Context<Self>) -> Self::Result {
90//! async move {
91//! let (tx, rx) = oneshot::channel();
92//!
93//! // Perform sends in a critical section so they are strictly ordered
94//! critical_section::<Self, _>(async {
95//! // Take the sink from the actor state
96//! let mut sink = with_ctx(|actor: &mut Self, _| actor.sink.take())
97//! .expect("Sink to be present");
98//!
99//! // Send the request
100//! let res = sink.send(msg).await;
101//!
102//! // Put the sink back, and if the send was successful,
103//! // record the in-flight request.
104//! with_ctx(|actor: &mut Self, _| {
105//! actor.sink = Some(sink);
106//! match res {
107//! Ok(()) => actor.in_flight_reqs.push_back(tx),
108//! Err(e) => {
109//! // Don't care if the receiver has gone away
110//! let _ = tx.send(Err(e));
111//! }
112//! }
113//! });
114//! })
115//! .await;
116//!
117//! // Wait for the result concurrently, so many requests can
118//! // be pipelined at the same time.
119//! rx.await.expect("Sender should not be dropped")
120//! }
121//! .interop_actor_boxed(self)
122//! }
123//! }
124//!
125//! // Process responses
126//! impl<Req, Res, Err> StreamHandler<Res> for PipelineAdapter<Req, Res, Err>
127//! where
128//! Req: 'static,
129//! Res: 'static,
130//! Err: 'static,
131//! {
132//! fn handle(&mut self, msg: Res, _ctx: &mut Context<Self>) {
133//! // When we receive a response, just pull the first in-flight
134//! // request and forward on the result.
135//! let _ = self.in_flight_reqs
136//! .pop_front()
137//! .expect("There to be an in-flight request")
138//! .send(Ok(msg));
139//! }
140//! }
141//! ```
142//!
143
144#![deny(missing_docs, warnings)]
145
146use std::any::Any;
147use std::future::Future;
148use std::marker::PhantomData;
149use std::pin::Pin;
150use std::task::{Context, Poll};
151
152use actix::{Actor, ActorFuture, ActorStream, AsyncContext, ResponseActFuture};
153use futures::Stream;
154use pin_project::pin_project;
155use scoped_tls_hkt::scoped_thread_local;
156
157use self::local_handle::local_handle;
158
159mod local_handle;
160
161scoped_thread_local!(static mut CURRENT_ACTOR_CTX: for<'a> (&'a mut dyn Any, &'a mut dyn Any));
162
163fn set_actor_context<A, F, R>(actor: &mut A, ctx: &mut A::Context, f: F) -> R
164where
165 A: Actor,
166 F: FnOnce() -> R,
167{
168 CURRENT_ACTOR_CTX.set((actor, ctx), f)
169}
170
171/// May be called from within a future spawned onto an actor context to gain mutable access
172/// to the actor's state and/or context. The future must have been wrapped using
173/// [`interop_actor`](FutureInterop::interop_actor) or
174/// [`interop_actor_boxed`](FutureInterop::interop_actor_boxed).
175///
176/// Nested calls to this function will panic, as only one mutable borrow can be given out
177/// at a time.
178pub fn with_ctx<A, F, R>(f: F) -> R
179where
180 A: Actor + 'static,
181 F: FnOnce(&mut A, &mut A::Context) -> R,
182{
183 CURRENT_ACTOR_CTX.with(|(actor, ctx)| {
184 let actor = actor
185 .downcast_mut()
186 .expect("Future was spawned onto the wrong actor");
187 let ctx = ctx
188 .downcast_mut()
189 .expect("Future was spawned onto the wrong actor");
190 f(actor, ctx)
191 })
192}
193
194/// May be called in the same places as [`with_ctx`](with_ctx) to run a chunk of async
195/// code with exclusive access to an actor's state: no other futures spawned to this
196/// actor will be polled during the critical section.
197/// Unlike [`with_ctx`](with_ctx), calls to this function may be nested, although there
198/// is little point in doing so. Calling [`with_ctx`](with_ctx) from within a critical
199/// section is allowed (and expected).
200pub fn critical_section<A: Actor, F: Future>(f: F) -> impl Future<Output = F::Output>
201where
202 A::Context: AsyncContext<A>,
203 F: 'static,
204{
205 let (f, handle) = local_handle(f);
206 with_ctx(|actor: &mut A, ctx: &mut A::Context| ctx.wait(f.interop_actor(actor)));
207 handle
208}
209
210/// Future to ActorFuture adapter returned by [`interop_actor`](FutureInterop::interop_actor).
211#[pin_project]
212#[derive(Debug)]
213pub struct FutureInteropWrap<A: Actor, F> {
214 #[pin]
215 inner: F,
216 phantom: PhantomData<fn(&mut A, &mut A::Context)>,
217}
218
219impl<A: Actor, F: Future> FutureInteropWrap<A, F> {
220 fn new(inner: F) -> Self {
221 Self {
222 inner,
223 phantom: PhantomData,
224 }
225 }
226}
227
228impl<A: Actor, F: Future> ActorFuture<A> for FutureInteropWrap<A, F> {
229 type Output = F::Output;
230
231 fn poll(
232 self: Pin<&mut Self>,
233 actor: &mut A,
234 ctx: &mut A::Context,
235 task: &mut Context,
236 ) -> Poll<Self::Output> {
237 set_actor_context(actor, ctx, || self.project().inner.poll(task))
238 }
239}
240
241/// Extension trait implemented for all futures. Import this trait to bring the
242/// [`interop_actor`](FutureInterop::interop_actor) and
243/// [`interop_actor_boxed`](FutureInterop::interop_actor_boxed) methods into scope.
244pub trait FutureInterop<A: Actor>: Future + Sized {
245 /// Convert a future using the `with_ctx` or `critical_section` methods into an ActorFuture.
246 fn interop_actor(self, actor: &A) -> FutureInteropWrap<A, Self>;
247 /// Convert a future using the `with_ctx` or `critical_section` methods into a boxed
248 /// ActorFuture.
249 fn interop_actor_boxed(self, actor: &A) -> ResponseActFuture<A, Self::Output>
250 where
251 Self: 'static,
252 {
253 Box::pin(self.interop_actor(actor))
254 }
255}
256
257impl<A: Actor, F: Future> FutureInterop<A> for F {
258 fn interop_actor(self, _actor: &A) -> FutureInteropWrap<A, Self> {
259 FutureInteropWrap::new(self)
260 }
261}
262
263/// Stream to ActorStream adapter returned by [`interop_actor`](StreamInterop::interop_actor).
264#[pin_project]
265#[derive(Debug)]
266pub struct StreamInteropWrap<A: Actor, S> {
267 #[pin]
268 inner: S,
269 phantom: PhantomData<fn(&mut A, &mut A::Context)>,
270}
271
272impl<A: Actor, S: Stream> StreamInteropWrap<A, S> {
273 fn new(inner: S) -> Self {
274 Self {
275 inner,
276 phantom: PhantomData,
277 }
278 }
279}
280
281impl<A: Actor, S: Stream> ActorStream<A> for StreamInteropWrap<A, S> {
282 type Item = S::Item;
283
284 fn poll_next(
285 self: Pin<&mut Self>,
286 actor: &mut A,
287 ctx: &mut A::Context,
288 task: &mut Context,
289 ) -> Poll<Option<Self::Item>> {
290 set_actor_context(actor, ctx, || self.project().inner.poll_next(task))
291 }
292}
293
294/// Extension trait implemented for all streams. Import this trait to bring the
295/// [`interop_actor`](StreamInterop::interop_actor) and
296/// [`interop_actor_boxed`](StreamInterop::interop_actor_boxed) methods into scope.
297pub trait StreamInterop<A: Actor>: Stream + Sized {
298 /// Convert a stream using the `with_ctx` or `critical_section` methods into an ActorStream.
299 fn interop_actor(self, actor: &A) -> StreamInteropWrap<A, Self>;
300 /// Convert a stream using the `with_ctx` or `critical_section` methods into a boxed
301 /// ActorStream.
302 fn interop_actor_boxed(self, actor: &A) -> Box<dyn ActorStream<A, Item = Self::Item>>
303 where
304 Self: 'static,
305 {
306 Box::new(self.interop_actor(actor))
307 }
308}
309
310impl<A: Actor, S: Stream> StreamInterop<A> for S {
311 fn interop_actor(self, _actor: &A) -> StreamInteropWrap<A, Self> {
312 StreamInteropWrap::new(self)
313 }
314}
315
316#[cfg(test)]
317mod tests {
318
319 use super::{critical_section, with_ctx, FutureInterop};
320 use actix::prelude::*;
321
322 #[derive(Message)]
323 #[rtype(result = "Result<i32, ()>")] // we have to define the response type for `Sum` message
324 struct Sum(i32);
325
326 struct Summator {
327 field: i32,
328 }
329
330 impl Actor for Summator {
331 type Context = Context<Self>;
332 }
333
334 impl Handler<Sum> for Summator {
335 type Result = ResponseActFuture<Self, Result<i32, ()>>; // <- Message response type
336
337 fn handle(&mut self, msg: Sum, _ctx: &mut Context<Self>) -> Self::Result {
338 async move {
339 // Run some code with exclusive access to the actor
340 let accum = critical_section::<Self, _>(async {
341 with_ctx(move |a: &mut Self, _| {
342 a.field += msg.0;
343 a.field
344 })
345 })
346 .await;
347
348 // Return a result
349 Ok(accum)
350 }
351 .interop_actor_boxed(self)
352 }
353 }
354
355 impl StreamHandler<i32> for Summator {
356 fn handle(&mut self, msg: i32, _ctx: &mut Context<Self>) {
357 self.field += msg;
358 }
359 fn finished(&mut self, _ctx: &mut Context<Self>) {
360 assert_eq!(self.field, 10);
361 System::current().stop();
362 }
363 }
364
365 #[actix::test]
366 async fn can_run_future() -> Result<(), Box<dyn std::error::Error>> {
367 let addr = Summator { field: 0 }.start();
368
369 addr.send(Sum(3)).await.unwrap().unwrap();
370 let res = addr.send(Sum(4)).await?;
371
372 assert_eq!(res, Ok(7));
373 Ok(())
374 }
375
376 #[actix::test]
377 async fn can_run_stream() {
378 Summator::create(|ctx| {
379 ctx.add_stream(futures::stream::iter(1..5));
380 Summator { field: 0 }
381 });
382 }
383
384 mod pipeline {}
385}