simple_actor/lib.rs
1//! Helper to write actor-based async code.
2//!
3//! ```rust
4//! use futures::FutureExt;
5//! use simple_actor::Actor;
6//! use std::time::Duration;
7//!
8//! #[derive(Clone)]
9//! pub struct Adder(Actor<u32>);
10//!
11//! impl Adder {
12//! pub fn new(initial_value: u32) -> Self {
13//! let (actor, driver) = Actor::new(initial_value);
14//! tokio::spawn(driver);
15//! Self(actor)
16//! }
17//!
18//! pub async fn add(&self, x: u32) -> Option<()> {
19//! self.0.queue(move |state| *state += x).await
20//! }
21//!
22//! pub async fn add_delayed(&self, x: u32) -> Option<()> {
23//! self.0.queue_blocking(move |state| async move {
24//! tokio::time::sleep(Duration::from_millis(500)).await;
25//! *state += x
26//! }.boxed()).await
27//! }
28//!
29//! pub async fn get(&self) -> Option<u32> {
30//! self.0.query(move |state| *state).await
31//! }
32//!
33//! pub async fn get_delayed(&self) -> Option<u32> {
34//! self.0.query_blocking(move |state| async move {
35//! tokio::time::sleep(Duration::from_millis(500)).await;
36//! *state
37//! }.boxed()).await
38//! }
39//! }
40//!
41//! #[tokio::main]
42//! async fn main() {
43//! let adder = Adder::new(5);
44//!
45//! assert_eq!(adder.add(2).await, Some(()));
46//! assert_eq!(adder.get().await, Some(7));
47//!
48//! assert_eq!(adder.add_delayed(3).await, Some(()));
49//! assert_eq!(adder.get_delayed().await, Some(10));
50//!
51//! assert!(adder.0.is_active());
52//! adder.0.shutdown();
53//! assert!(!adder.0.is_active());
54//!
55//! assert_eq!(adder.add(2).await, None);
56//! assert_eq!(adder.get().await, None);
57//!
58//! assert_eq!(adder.add_delayed(2).await, None);
59//! assert_eq!(adder.get_delayed().await, None);
60//! }
61//! ```
62//!
63//! ## Inspiration
64//!
65//! This crate is inspired by [`ghost_actor`], with a simpler implementation and
66//! API.
67//!
68//! This crate functions returns `None` if the actor is down, which
69//! avoids dealing with error type conversions.
70//!
71//! This crate also allows to use futures that can hold the state across
72//! `.await`.
73//!
74//! [`ghost_actor`]: https://github.com/holochain/ghost_actor
75
76use futures::{
77 channel::{mpsc, oneshot},
78 Future, FutureExt, SinkExt, StreamExt,
79};
80use std::{hash::Hash, pin::Pin};
81
82type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
83type Blocking<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send>;
84type NonBlocking<T> = Box<dyn FnOnce(&mut T) + 'static + Send>;
85
86enum StateChange<T> {
87 Async(Blocking<T>),
88 Sync(NonBlocking<T>),
89}
90
91type StateChangeSender<T> = mpsc::Sender<StateChange<T>>;
92
93/// Actor wrapping a state.
94///
95/// Cloning the actor provides an handle to the same actor.
96pub struct Actor<T: 'static + Send>(StateChangeSender<T>);
97impl<T: 'static + Send> Actor<T> {
98 /// Creates a new `Actor` with default inbound channel capacity (1024).
99 ///
100 /// Returned future must be spawned in an async executor.
101 pub fn new(state: T) -> (Self, impl Future<Output = ()>) {
102 Self::new_with_capacity(state, 1024)
103 }
104
105 /// Creates a new `Actor` with given capacity for its inbound channel.
106 ///
107 /// Returned future must be spawned in an async executor.
108 pub fn new_with_capacity(mut state: T, capacity: usize) -> (Self, impl Future<Output = ()>) {
109 let (send, recv) = mpsc::channel::<StateChange<T>>(capacity);
110
111 let driver = FutureExt::boxed(async move {
112 let mut recv = StreamExt::ready_chunks(recv, 1024);
113
114 while let Some(changes) = recv.next().await {
115 for change in changes {
116 match change {
117 StateChange::Async(f) => f(&mut state).await,
118 StateChange::Sync(f) => f(&mut state),
119 }
120 }
121 }
122 });
123
124 (Self(send), driver)
125 }
126
127 /// Queue an async function on the state. The future that this function
128 /// returns can hold the state across await points, meaning it will prevent
129 /// other functions to be processed until the future is complete.
130 ///
131 /// [`queue_blocking`] resolves once the order is sent to the actor, and
132 /// doesn't wait for it to be processed by the actor, but cannot have
133 /// an output value.
134 ///
135 /// To wait for the order to be processed and get an output, use
136 /// [`query_blocking`].
137 ///
138 /// [`queue_blocking`]: Actor::queue_blocking
139 /// [`query_blocking`]: Actor::query_blocking
140 pub async fn queue_blocking<F>(&self, f: F) -> Option<()>
141 where
142 F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send + 'static,
143 {
144 let mut send = self.0.clone();
145
146 let f: Blocking<T> = Box::new(move |state: &mut T| {
147 async move {
148 f(state).await;
149 }
150 .boxed()
151 });
152
153 send.send(StateChange::Async(f)).await.ok()
154 }
155
156 /// Queue a function on the state. It is more performant to have multiple
157 /// [`queue`]/[`query`] in a row, as it can avoid using `.await` on the internal channel
158 /// or on a future-based change ([`queue_blocking`]/[`query_blocking`]).
159 ///
160 /// [`queue`] resolves once the order is sent to the actor, and doesn't wait
161 /// for it to be processed by the actor, but cannot have an output value.
162 ///
163 /// To wait for the order to be processed and get an output, use [`query`].
164 ///
165 /// [`queue`]: Actor::queue
166 /// [`query`]: Actor::query
167 /// [`queue_blocking`]: Actor::queue_blocking
168 /// [`query_blocking`]: Actor::query_blocking
169 pub async fn queue<F>(&self, f: F) -> Option<()>
170 where
171 F: FnOnce(&mut T) + 'static + Send,
172 {
173 let mut send = self.0.clone();
174
175 send.send(StateChange::Sync(Box::new(f))).await.ok()
176 }
177
178 /// Queue an async function on the state. The future that this function
179 /// returns can hold the state across await points, meaning it will prevent
180 /// other functions to be processed until the future is complete.
181 ///
182 /// [`query_blocking`] resolves once the order as been processed by the actor,
183 /// which allows it to return an output.
184 ///
185 /// If an output is not needed and it is not needed to wait for the order
186 /// to be processed, use [`queue_blocking`].
187 ///
188 /// [`query_blocking`]: Actor::query_blocking
189 /// [`queue_blocking`]: Actor::queue_blocking
190 pub async fn query_blocking<F, R>(&self, f: F) -> Option<R>
191 where
192 F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send + 'static,
193 R: 'static + Send,
194 {
195 let mut send = self.0.clone();
196 let (output_send, output_recv) = oneshot::channel();
197
198 let f: Blocking<T> = Box::new(move |state: &mut T| {
199 async move {
200 let output = f(state).await;
201 let _ = output_send.send(output);
202 }
203 .boxed()
204 });
205
206 send.send(StateChange::Async(f)).await.ok()?;
207
208 output_recv.await.ok()
209 }
210
211 /// Queue a function on the state. It is more performant to have multiple
212 /// [`queue`]/[`query`] in a row, as it can avoid using `.await` on the internal channel
213 /// or on a future-based change ([`queue_blocking`]/[`query_blocking`]).
214 ///
215 /// [`query_blocking`] resolves once the order as been processed by the actor,
216 /// which allows it to return an output.
217 ///
218 /// If an output is not needed and it is not needed to wait for the order
219 /// to be processed, use [`queue`].
220 ///
221 ///
222 /// [`queue`]: Actor::queue
223 /// [`query`]: Actor::query
224 /// [`queue_blocking`]: Actor::queue_blocking
225 /// [`query_blocking`]: Actor::query_blocking
226 pub async fn query<F, R>(&self, f: F) -> Option<R>
227 where
228 F: FnOnce(&mut T) -> R + 'static + Send,
229 R: 'static + Send,
230 {
231 let mut invoke_tx = self.0.clone();
232 let (response_tx, response_rx) = oneshot::channel();
233
234 invoke_tx
235 .send(StateChange::Sync(Box::new(move |state| {
236 let output = f(state);
237 let _ = response_tx.send(output);
238 })))
239 .await
240 .ok()?;
241
242 response_rx.await.ok()
243 }
244
245 /// Tells if the actor still accepts new invokes.
246 pub fn is_active(&self) -> bool {
247 !self.0.is_closed()
248 }
249
250 /// Stop the actor, which will process every already queued invokes
251 /// before really stopping.
252 pub fn shutdown(&self) {
253 self.0.clone().close_channel()
254 }
255}
256
257impl<T: 'static + Send> Clone for Actor<T> {
258 fn clone(&self) -> Self {
259 Self(self.0.clone())
260 }
261}
262
263impl<T: 'static + Send> PartialEq for Actor<T> {
264 fn eq(&self, other: &Self) -> bool {
265 self.0.same_receiver(&other.0)
266 }
267}
268
269impl<T: 'static + Send> Eq for Actor<T> {}
270
271impl<T: 'static + Send> Hash for Actor<T> {
272 fn hash<Hasher: std::hash::Hasher>(&self, hasher: &mut Hasher) {
273 self.0.hash_receiver(hasher);
274 }
275}