lwactors/
lib.rs

1#[cfg(feature = "async-global-executor14")]
2extern crate async_global_executor14 as async_global_executor;
3#[cfg(feature = "tokio02")]
4extern crate tokio02 as tokio;
5#[cfg(feature = "tokio10")]
6extern crate tokio10 as tokio;
7
8use std::future::Future;
9
10use futures_channel::{mpsc, oneshot};
11use futures_util::{
12    future::{self, Either, FutureExt},
13    stream::StreamExt,
14    task::SpawnExt,
15};
16
17use thiserror::Error;
18
19/// Construct a new actor, requires an `Executor` and an initial state.  Returns a reference that can be cheaply
20/// cloned and passed between threads.  A specific implementation is expected to wrap this return value and implement
21/// the required custom logic.
22pub fn actor_with_executor<A, S, R, E>(
23    executor: impl SpawnExt,
24    initial_state: S,
25) -> ActorSender<A, R, E>
26where
27    A: Action<State = S, Result = R, Error = E> + Send + 'static,
28    S: Send + 'static,
29    R: Send + 'static,
30    E: Send + 'static,
31{
32    let (tx, rx) = mpsc::unbounded();
33    executor
34        .spawn(actor_future(rx, initial_state))
35        .expect("Cannot schedule actor on executor");
36    ActorSender(tx)
37}
38
/// Starts a new actor on a global executor, seeded with `initial_state`.
///
/// Which executor is used depends on the enabled feature flag:
///
/// * with `async-global-executor14`, the actor task is spawned through
///   `async_global_executor::spawn` and detached;
/// * with a Tokio feature (`__tokio`), the actor task is spawned through `tokio::spawn`.
#[cfg(feature = "__global_executor")]
pub fn actor<A, S, R, E>(initial_state: S) -> ActorSender<A, R, E>
where
    A: Action<State = S, Result = R, Error = E> + Send + 'static,
    S: Send + 'static,
    R: Send + 'static,
    E: Send + 'static,
{
    let (sender, receiver) = mpsc::unbounded();
    // The message loop owns both the receiving half and the state from here on.
    let task = actor_future(receiver, initial_state);

    #[cfg(feature = "async-global-executor14")]
    async_global_executor::spawn(task).detach();

    #[cfg(feature = "__tokio")]
    tokio::spawn(task);

    ActorSender(sender)
}
60
61type ActorSenderInner<A, R, E> = mpsc::UnboundedSender<(A, oneshot::Sender<Result<R, E>>)>;
62
63#[derive(Debug)]
64pub struct ActorSender<A, R, E>(ActorSenderInner<A, R, E>);
65
66impl<A, R, E> Clone for ActorSender<A, R, E> {
67    fn clone(&self) -> Self {
68        ActorSender(self.0.clone())
69    }
70}
71
72impl<A, R, E> ActorSender<A, R, E>
73where
74    A: Send + 'static,
75    R: Send + 'static,
76    E: Send + From<ActorError> + 'static,
77{
78    /// Invokes a specific action on the actor.  Returns a future that completes when the actor has
79    /// performed the action.
80    ///
81    /// Specifically not an `async` function to allow it to be used correctly in a fire-and-forget
82    /// manner, although this is not advised.
83    pub fn invoke(&self, action: A) -> impl Future<Output = ActResult<R, E>> {
84        let (tx, rx) = oneshot::channel();
85        if let Err(e) = self.0.unbounded_send((action, tx)) {
86            Either::Right(future::err(ActorError::from(e).into()))
87        } else {
88            Either::Left(rx.then(|result| match result {
89                Ok(Ok(r)) => future::ok(r),
90                Ok(Err(e)) => future::err(e),
91                Err(e) => future::err(ActorError::from(e).into()),
92            }))
93        }
94    }
95}
96
97async fn actor_future<A, S, R, E>(
98    mut receiver: mpsc::UnboundedReceiver<(A, oneshot::Sender<Result<R, E>>)>,
99    mut state: S,
100) where
101    A: Action<State = S, Result = R, Error = E>,
102{
103    while let Some((a, tx)) = receiver.next().await {
104        let _ = tx.send(a.act(&mut state));
105    }
106}
107
/// An operation that an actor can apply to its state.
///
/// Each message sent to an actor is a value implementing this trait; the actor's message loop
/// calls [`Action::act`] on it with exclusive access to the state.
pub trait Action {
    /// The state the action operates on.
    type State;
    /// The value produced when the action succeeds.
    type Result;
    /// The error produced when the action fails.
    type Error;

    /// Consumes the action and applies it to `s`, which may be mutated in place.
    ///
    /// Returns `Ok` with the action's result, or `Err` with the action's error.
    fn act(self, s: &mut Self::State) -> Result<Self::Result, Self::Error>;
}
115
/// Result of invoking an action on an actor: the action's value `R` or an error `E`.
pub type ActResult<R, E> = std::result::Result<R, E>;
117
118#[derive(Debug, Error)]
119pub enum ActorError {
120    /// Cannot send message to the actor
121    #[error("Cannot send message to actor")]
122    InvokeError,
123    /// Response was cancelled before being received.
124    #[error("Cannot wait for an answer")]
125    WaitError,
126}
127
128impl<T> From<mpsc::TrySendError<T>> for ActorError {
129    fn from(_from: mpsc::TrySendError<T>) -> Self {
130        ActorError::InvokeError
131    }
132}
133
134impl From<oneshot::Canceled> for ActorError {
135    fn from(_from: oneshot::Canceled) -> Self {
136        ActorError::WaitError
137    }
138}