simple_actor/
lib.rs

1//! Helper to write actor-based async code.
2//!
3//! ```rust
4//! use futures::FutureExt;
5//! use simple_actor::Actor;
6//! use std::time::Duration;
7//!
8//! #[derive(Clone)]
9//! pub struct Adder(Actor<u32>);
10//!
11//! impl Adder {
12//!     pub fn new(initial_value: u32) -> Self {
13//!         let (actor, driver) = Actor::new(initial_value);
14//!         tokio::spawn(driver);
15//!         Self(actor)
16//!     }
17//!
18//!     pub async fn add(&self, x: u32) -> Option<()> {
19//!         self.0.queue(move |state| *state += x).await
20//!     }
21//!
22//!     pub async fn add_delayed(&self, x: u32) -> Option<()> {
23//!         self.0.queue_blocking(move |state| async move {
24//!             tokio::time::sleep(Duration::from_millis(500)).await;
25//!             *state += x
26//!         }.boxed()).await
27//!     }
28//!
29//!     pub async fn get(&self) -> Option<u32> {
30//!         self.0.query(move |state| *state).await
31//!     }
32//!
33//!     pub async fn get_delayed(&self) -> Option<u32> {
34//!         self.0.query_blocking(move |state| async move {
35//!             tokio::time::sleep(Duration::from_millis(500)).await;
36//!             *state
37//!         }.boxed()).await
38//!     }
39//! }
40//!
41//! #[tokio::main]
42//! async fn main() {
43//!     let adder = Adder::new(5);
44//!
45//!     assert_eq!(adder.add(2).await, Some(()));
46//!     assert_eq!(adder.get().await, Some(7));
47//!
48//!     assert_eq!(adder.add_delayed(3).await, Some(()));
49//!     assert_eq!(adder.get_delayed().await, Some(10));
50//!
51//!     assert!(adder.0.is_active());
52//!     adder.0.shutdown();
53//!     assert!(!adder.0.is_active());
54//!
55//!     assert_eq!(adder.add(2).await, None);
56//!     assert_eq!(adder.get().await, None);
57//!
58//!     assert_eq!(adder.add_delayed(2).await, None);
59//!     assert_eq!(adder.get_delayed().await, None);
60//! }
61//! ```
62//!
63//! ## Inspiration
64//!
65//! This crate is inspired by [`ghost_actor`], with a simpler implementation and
66//! API.
67//!
//! The functions of this crate return `None` if the actor is down, which
//! avoids dealing with error type conversions.
70//!
//! This crate also allows using futures that can hold the state across
//! `.await` points.
73//!  
74//! [`ghost_actor`]: https://github.com/holochain/ghost_actor
75
76use futures::{
77    channel::{mpsc, oneshot},
78    Future, FutureExt, SinkExt, StreamExt,
79};
80use std::{hash::Hash, pin::Pin};
81
/// Heap-allocated, pinned, `Send` future that may borrow data for `'a`.
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
/// One-shot state change whose returned future borrows the state for as long
/// as it runs.
type Blocking<T> = Box<dyn for<'s> FnOnce(&'s mut T) -> BoxFuture<'s, ()> + Send>;
/// One-shot state change that runs to completion synchronously.
type NonBlocking<T> = Box<dyn FnOnce(&mut T) + Send + 'static>;
85
86enum StateChange<T> {
87    Async(Blocking<T>),
88    Sync(NonBlocking<T>),
89}
90
91type StateChangeSender<T> = mpsc::Sender<StateChange<T>>;
92
93/// Actor wrapping a state.
94///
95/// Cloning the actor provides an handle to the same actor.
96pub struct Actor<T: 'static + Send>(StateChangeSender<T>);
97impl<T: 'static + Send> Actor<T> {
98    /// Creates a new `Actor` with default inbound channel capacity (1024).
99    ///
100    /// Returned future must be spawned in an async executor.
101    pub fn new(state: T) -> (Self, impl Future<Output = ()>) {
102        Self::new_with_capacity(state, 1024)
103    }
104
105    /// Creates a new `Actor` with given capacity for its inbound channel.
106    ///
107    /// Returned future must be spawned in an async executor.
108    pub fn new_with_capacity(mut state: T, capacity: usize) -> (Self, impl Future<Output = ()>) {
109        let (send, recv) = mpsc::channel::<StateChange<T>>(capacity);
110
111        let driver = FutureExt::boxed(async move {
112            let mut recv = StreamExt::ready_chunks(recv, 1024);
113
114            while let Some(changes) = recv.next().await {
115                for change in changes {
116                    match change {
117                        StateChange::Async(f) => f(&mut state).await,
118                        StateChange::Sync(f) => f(&mut state),
119                    }
120                }
121            }
122        });
123
124        (Self(send), driver)
125    }
126
127    /// Queue an async function on the state. The future that this function
128    /// returns can hold the state across await points, meaning it will prevent
129    /// other functions to be processed until the future is complete.
130    ///
131    /// [`queue_blocking`] resolves once the order is sent to the actor, and
132    /// doesn't wait for it to be processed by the actor, but cannot have
133    /// an output value.
134    ///
135    /// To wait for the order to be processed and get an output, use
136    /// [`query_blocking`].
137    ///
138    /// [`queue_blocking`]: Actor::queue_blocking
139    /// [`query_blocking`]: Actor::query_blocking
140    pub async fn queue_blocking<F>(&self, f: F) -> Option<()>
141    where
142        F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send + 'static,
143    {
144        let mut send = self.0.clone();
145
146        let f: Blocking<T> = Box::new(move |state: &mut T| {
147            async move {
148                f(state).await;
149            }
150            .boxed()
151        });
152
153        send.send(StateChange::Async(f)).await.ok()
154    }
155
156    /// Queue a function on the state. It is more performant to have multiple
157    /// [`queue`]/[`query`] in a row, as it can avoid using `.await` on the internal channel
158    /// or on a future-based change ([`queue_blocking`]/[`query_blocking`]).
159    ///
160    /// [`queue`] resolves once the order is sent to the actor, and doesn't wait
161    /// for it to be processed by the actor, but cannot have an output value.
162    ///
163    /// To wait for the order to be processed and get an output, use [`query`].
164    ///
165    /// [`queue`]: Actor::queue
166    /// [`query`]: Actor::query
167    /// [`queue_blocking`]: Actor::queue_blocking
168    /// [`query_blocking`]: Actor::query_blocking
169    pub async fn queue<F>(&self, f: F) -> Option<()>
170    where
171        F: FnOnce(&mut T) + 'static + Send,
172    {
173        let mut send = self.0.clone();
174
175        send.send(StateChange::Sync(Box::new(f))).await.ok()
176    }
177
178    /// Queue an async function on the state. The future that this function
179    /// returns can hold the state across await points, meaning it will prevent
180    /// other functions to be processed until the future is complete.
181    ///
182    /// [`query_blocking`] resolves once the order as been processed by the actor,
183    /// which allows it to return an output.
184    ///
185    /// If an output is not needed and it is not needed to wait for the order
186    /// to be processed, use [`queue_blocking`].
187    ///
188    /// [`query_blocking`]: Actor::query_blocking
189    /// [`queue_blocking`]: Actor::queue_blocking
190    pub async fn query_blocking<F, R>(&self, f: F) -> Option<R>
191    where
192        F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send + 'static,
193        R: 'static + Send,
194    {
195        let mut send = self.0.clone();
196        let (output_send, output_recv) = oneshot::channel();
197
198        let f: Blocking<T> = Box::new(move |state: &mut T| {
199            async move {
200                let output = f(state).await;
201                let _ = output_send.send(output);
202            }
203            .boxed()
204        });
205
206        send.send(StateChange::Async(f)).await.ok()?;
207
208        output_recv.await.ok()
209    }
210
211    /// Queue a function on the state. It is more performant to have multiple
212    /// [`queue`]/[`query`] in a row, as it can avoid using `.await` on the internal channel
213    /// or on a future-based change ([`queue_blocking`]/[`query_blocking`]).
214    ///
215    /// [`query_blocking`] resolves once the order as been processed by the actor,
216    /// which allows it to return an output.
217    ///
218    /// If an output is not needed and it is not needed to wait for the order
219    /// to be processed, use [`queue`].
220    ///
221    ///
222    /// [`queue`]: Actor::queue
223    /// [`query`]: Actor::query
224    /// [`queue_blocking`]: Actor::queue_blocking
225    /// [`query_blocking`]: Actor::query_blocking
226    pub async fn query<F, R>(&self, f: F) -> Option<R>
227    where
228        F: FnOnce(&mut T) -> R + 'static + Send,
229        R: 'static + Send,
230    {
231        let mut invoke_tx = self.0.clone();
232        let (response_tx, response_rx) = oneshot::channel();
233
234        invoke_tx
235            .send(StateChange::Sync(Box::new(move |state| {
236                let output = f(state);
237                let _ = response_tx.send(output);
238            })))
239            .await
240            .ok()?;
241
242        response_rx.await.ok()
243    }
244
245    /// Tells if the actor still accepts new invokes.
246    pub fn is_active(&self) -> bool {
247        !self.0.is_closed()
248    }
249
250    /// Stop the actor, which will process every already queued invokes
251    /// before really stopping.
252    pub fn shutdown(&self) {
253        self.0.clone().close_channel()
254    }
255}
256
257impl<T: 'static + Send> Clone for Actor<T> {
258    fn clone(&self) -> Self {
259        Self(self.0.clone())
260    }
261}
262
263impl<T: 'static + Send> PartialEq for Actor<T> {
264    fn eq(&self, other: &Self) -> bool {
265        self.0.same_receiver(&other.0)
266    }
267}
268
269impl<T: 'static + Send> Eq for Actor<T> {}
270
271impl<T: 'static + Send> Hash for Actor<T> {
272    fn hash<Hasher: std::hash::Hasher>(&self, hasher: &mut Hasher) {
273        self.0.hash_receiver(hasher);
274    }
275}