tower_actor/
actor.rs

1use std::{
2    future::Future,
3    task::{Context, Poll},
4};
5
6use crate::future::ActorFuture;
7use crate::message::Message;
8
9use thiserror::Error;
10use tokio::sync::{mpsc, oneshot};
11use tokio_util::sync::PollSender;
12use tower::Service;
13use tracing::Span;
14
15#[derive(Error, Debug)]
16pub enum ActorError {
17    #[error("Actor has terminated or panicked")]
18    ActorTerminated,
19    #[error("Actor has dropped this request without responding")]
20    ActorHungUp,
21}
22
23/// A service backed by an actor that runs in a separate task and processes
24/// messages asynchronously.
25#[derive(Debug)]
26pub struct Actor<R, S, E> {
27    queue: tokio_util::sync::PollSender<Message<R, S, E>>,
28}
29
30// Implement this manually rather than via derive, so that it's not conditional on any bounds on R, S, E.
31impl<R, S, E> std::clone::Clone for Actor<R, S, E> {
32    fn clone(&self) -> Self {
33        Self {
34            queue: self.queue.clone(),
35        }
36    }
37}
38
39impl<R, S, E> Actor<R, S, E>
40where
41    R: Send + 'static,
42    S: Send + 'static,
43    E: Send + 'static,
44{
45    pub fn new<F, W>(bound: usize, f: F) -> Self
46    where
47        F: FnOnce(mpsc::Receiver<Message<R, S, E>>) -> W,
48        W: Future<Output = Result<(), E>> + Send + 'static,
49    {
50        #[cfg(tokio_unstable)]
51        {
52            return Self::named("tower-actor-worker", bound, f);
53        }
54
55        #[cfg(not(tokio_unstable))]
56        {
57            let (queue_tx, queue_rx) = mpsc::channel(bound);
58
59            tokio::spawn(f(queue_rx));
60
61            Self {
62                queue: PollSender::new(queue_tx),
63            }
64        }
65    }
66
67    #[cfg(tokio_unstable)]
68    pub fn named<'a, F, W>(name: &'a str, bound: usize, f: F) -> Self
69    where
70        F: FnOnce(mpsc::Receiver<Message<R, S, E>>) -> W,
71        W: Future<Output = Result<(), E>> + Send + 'static,
72    {
73        let (queue_tx, queue_rx) = mpsc::channel(bound);
74
75        tokio::task::Builder::new()
76            .name(name)
77            .spawn(f(queue_rx))
78            .expect("failed to spawn worker");
79
80        Self {
81            queue: PollSender::new(queue_tx),
82        }
83    }
84}
85
86impl<R, S, E> Service<R> for Actor<R, S, E>
87where
88    R: Send + 'static,
89    S: Send + 'static,
90    E: Send + 'static + From<ActorError>,
91{
92    type Response = S;
93    type Error = E;
94    type Future = ActorFuture<S, E>;
95
96    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
97        self.queue
98            .poll_reserve(cx)
99            .map_err(|_e| ActorError::ActorTerminated.into())
100    }
101
102    fn call(&mut self, req: R) -> Self::Future {
103        // Due to the permit system that PollSender uses, we can always send on the queue if
104        // Poll reserve succeeded.
105        // See: https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.close
106        //
107        // Since the Service contract requires that `poll_ready()` pass
108        // before calling `call()`, we can safely proceed without checking that the queue isn't closed.
109        debug_assert!(!self.queue.is_closed());
110
111        let span = Span::current();
112        let (tx, rx) = oneshot::channel();
113
114        self.queue
115            .send_item(Message {
116                req,
117                rsp_sender: tx,
118                span,
119            })
120            .unwrap_or_else(|e| panic!("Actor::call() called without `poll_ready`: {}", e)); // Non-debug expect()
121
122        ActorFuture { inner: rx }
123    }
124}