1use std::{
2 future::Future,
3 task::{Context, Poll},
4};
5
6use crate::future::ActorFuture;
7use crate::message::Message;
8
9use thiserror::Error;
10use tokio::sync::{mpsc, oneshot};
11use tokio_util::sync::PollSender;
12use tower::Service;
13use tracing::Span;
14
15#[derive(Error, Debug)]
16pub enum ActorError {
17 #[error("Actor has terminated or panicked")]
18 ActorTerminated,
19 #[error("Actor has dropped this request without responding")]
20 ActorHungUp,
21}
22
23#[derive(Debug)]
26pub struct Actor<R, S, E> {
27 queue: tokio_util::sync::PollSender<Message<R, S, E>>,
28}
29
30impl<R, S, E> std::clone::Clone for Actor<R, S, E> {
32 fn clone(&self) -> Self {
33 Self {
34 queue: self.queue.clone(),
35 }
36 }
37}
38
39impl<R, S, E> Actor<R, S, E>
40where
41 R: Send + 'static,
42 S: Send + 'static,
43 E: Send + 'static,
44{
45 pub fn new<F, W>(bound: usize, f: F) -> Self
46 where
47 F: FnOnce(mpsc::Receiver<Message<R, S, E>>) -> W,
48 W: Future<Output = Result<(), E>> + Send + 'static,
49 {
50 #[cfg(tokio_unstable)]
51 {
52 return Self::named("tower-actor-worker", bound, f);
53 }
54
55 #[cfg(not(tokio_unstable))]
56 {
57 let (queue_tx, queue_rx) = mpsc::channel(bound);
58
59 tokio::spawn(f(queue_rx));
60
61 Self {
62 queue: PollSender::new(queue_tx),
63 }
64 }
65 }
66
67 #[cfg(tokio_unstable)]
68 pub fn named<'a, F, W>(name: &'a str, bound: usize, f: F) -> Self
69 where
70 F: FnOnce(mpsc::Receiver<Message<R, S, E>>) -> W,
71 W: Future<Output = Result<(), E>> + Send + 'static,
72 {
73 let (queue_tx, queue_rx) = mpsc::channel(bound);
74
75 tokio::task::Builder::new()
76 .name(name)
77 .spawn(f(queue_rx))
78 .expect("failed to spawn worker");
79
80 Self {
81 queue: PollSender::new(queue_tx),
82 }
83 }
84}
85
86impl<R, S, E> Service<R> for Actor<R, S, E>
87where
88 R: Send + 'static,
89 S: Send + 'static,
90 E: Send + 'static + From<ActorError>,
91{
92 type Response = S;
93 type Error = E;
94 type Future = ActorFuture<S, E>;
95
96 fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
97 self.queue
98 .poll_reserve(cx)
99 .map_err(|_e| ActorError::ActorTerminated.into())
100 }
101
102 fn call(&mut self, req: R) -> Self::Future {
103 debug_assert!(!self.queue.is_closed());
110
111 let span = Span::current();
112 let (tx, rx) = oneshot::channel();
113
114 self.queue
115 .send_item(Message {
116 req,
117 rsp_sender: tx,
118 span,
119 })
120 .unwrap_or_else(|e| panic!("Actor::call() called without `poll_ready`: {}", e)); ActorFuture { inner: rx }
123 }
124}