1#[cfg(feature = "async-global-executor14")]
2extern crate async_global_executor14 as async_global_executor;
3#[cfg(feature = "tokio02")]
4extern crate tokio02 as tokio;
5#[cfg(feature = "tokio10")]
6extern crate tokio10 as tokio;
7
8use std::future::Future;
9
10use futures_channel::{mpsc, oneshot};
11use futures_util::{
12 future::{self, Either, FutureExt},
13 stream::StreamExt,
14 task::SpawnExt,
15};
16
17use thiserror::Error;
18
19pub fn actor_with_executor<A, S, R, E>(
23 executor: impl SpawnExt,
24 initial_state: S,
25) -> ActorSender<A, R, E>
26where
27 A: Action<State = S, Result = R, Error = E> + Send + 'static,
28 S: Send + 'static,
29 R: Send + 'static,
30 E: Send + 'static,
31{
32 let (tx, rx) = mpsc::unbounded();
33 executor
34 .spawn(actor_future(rx, initial_state))
35 .expect("Cannot schedule actor on executor");
36 ActorSender(tx)
37}
38
/// Starts an actor on the globally available executor selected by crate features.
///
/// With the `async-global-executor14` feature the task is spawned through
/// `async_global_executor::spawn` and detached; with the `__tokio` feature it is spawned
/// through `tokio::spawn`. The returned [`ActorSender`] is the only way to reach the actor.
///
/// NOTE(review): if both executor features were enabled at once, the task would be moved
/// twice and this function would not compile — confirm the features are mutually exclusive.
#[cfg(feature = "__global_executor")]
pub fn actor<A, S, R, E>(initial_state: S) -> ActorSender<A, R, E>
where
    A: Action<State = S, Result = R, Error = E> + Send + 'static,
    S: Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
{
    let (mailbox_tx, mailbox_rx) = mpsc::unbounded();
    // Build the task once; whichever backend is compiled in takes ownership of it.
    let actor_task = actor_future(mailbox_rx, initial_state);

    #[cfg(feature = "async-global-executor14")]
    async_global_executor::spawn(actor_task).detach();

    #[cfg(feature = "__tokio")]
    tokio::spawn(actor_task);

    ActorSender(mailbox_tx)
}
60
61type ActorSenderInner<A, R, E> = mpsc::UnboundedSender<(A, oneshot::Sender<Result<R, E>>)>;
62
63#[derive(Debug)]
64pub struct ActorSender<A, R, E>(ActorSenderInner<A, R, E>);
65
66impl<A, R, E> Clone for ActorSender<A, R, E> {
67 fn clone(&self) -> Self {
68 ActorSender(self.0.clone())
69 }
70}
71
72impl<A, R, E> ActorSender<A, R, E>
73where
74 A: Send + 'static,
75 R: Send + 'static,
76 E: Send + From<ActorError> + 'static,
77{
78 pub fn invoke(&self, action: A) -> impl Future<Output = ActResult<R, E>> {
84 let (tx, rx) = oneshot::channel();
85 if let Err(e) = self.0.unbounded_send((action, tx)) {
86 Either::Right(future::err(ActorError::from(e).into()))
87 } else {
88 Either::Left(rx.then(|result| match result {
89 Ok(Ok(r)) => future::ok(r),
90 Ok(Err(e)) => future::err(e),
91 Err(e) => future::err(ActorError::from(e).into()),
92 }))
93 }
94 }
95}
96
97async fn actor_future<A, S, R, E>(
98 mut receiver: mpsc::UnboundedReceiver<(A, oneshot::Sender<Result<R, E>>)>,
99 mut state: S,
100) where
101 A: Action<State = S, Result = R, Error = E>,
102{
103 while let Some((a, tx)) = receiver.next().await {
104 let _ = tx.send(a.act(&mut state));
105 }
106}
107
/// A message that can be applied to an actor's state.
pub trait Action {
    /// The state type this action operates on.
    type State;
    /// Value produced when the action succeeds.
    type Result;
    /// Value produced when the action fails.
    type Error;

    /// Consumes the action, applying it to `s` and returning its outcome.
    fn act(self, s: &mut Self::State) -> Result<Self::Result, Self::Error>;
}
115
/// Outcome of invoking an action: a plain alias for the standard `Result<R, E>`.
pub type ActResult<R, E> = Result<R, E>;
117
118#[derive(Debug, Error)]
119pub enum ActorError {
120 #[error("Cannot send message to actor")]
122 InvokeError,
123 #[error("Cannot wait for an answer")]
125 WaitError,
126}
127
128impl<T> From<mpsc::TrySendError<T>> for ActorError {
129 fn from(_from: mpsc::TrySendError<T>) -> Self {
130 ActorError::InvokeError
131 }
132}
133
134impl From<oneshot::Canceled> for ActorError {
135 fn from(_from: oneshot::Canceled) -> Self {
136 ActorError::WaitError
137 }
138}