1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204
//! Provides an [`Actor`] type that wraps a state and allows mutating it
//! in turns using [`invoke`] and [`invoke_async`].
//!
//! [`invoke`]: Actor::invoke
//! [`invoke_async`]: Actor::invoke_async
//!
//! # Example
//!
//! It is recommended to create a wrapper type around the [`Actor`], and
//! implement async functions that use [`invoke`]/[`invoke_async`] to interact
//! with the inner private state.
//!
//! ```rust
//! use std::time::Duration;
//! use simple_actor::Actor;
//! use futures::FutureExt;
//!
//! #[derive(Clone)]
//! pub struct Adder(Actor<u32>);
//!
//! impl Adder {
//! pub fn new(initial_value: u32) -> Self {
//! let (actor, driver) = Actor::new(initial_value);
//! tokio::spawn(driver);
//! Self(actor)
//! }
//!
//! pub async fn add(&self, x: u32) {
//! let _ = self.0.invoke(move |state| {
//! // We can update the state.
//! *state += x
//! }).await;
//! }
//!
//! pub async fn add_twice_with_delay(&self, x: u32) -> Option<u32> {
//! self.0
//! .invoke_async(move |state| {
//! async move {
//! *state += x;
//! // We can .await while holding the state.
//! tokio::time::sleep(Duration::from_millis(500)).await;
//!
//! *state += x;
//! // We can return a value at the end.
//! *state
//! }
//! .boxed()
//! })
//! .await
//! }
//!
//! pub async fn result(&self) -> Option<u32> {
//! self.0.invoke(move |state| *state).await
//! }
//!
//! pub fn shutdown(&self) {
//! self.0.shutdown()
//! }
//! }
//!
//! #[tokio::main]
//! async fn main() {
//! let adder = Adder::new(5);
//!
//! adder.add(3).await;
//! assert_eq!(adder.result().await, Some(8));
//!
//! adder.add(2).await;
//! assert_eq!(adder.result().await, Some(10));
//!
//! assert_eq!(adder.add_twice_with_delay(3).await, Some(16));
//! assert_eq!(adder.result().await, Some(16));
//!
//! adder.shutdown();
//! assert_eq!(adder.result().await, None);
//! }
//! ```
//!
//! ## Inspiration
//!
//! This crate is inspired by [`ghost_actor`], with a simpler implementation and
//! API.
//!
//! This crate [`invoke`] function returns `None` if the actor is down, which
//! avoids dealing with error type conversions.
//!
//! It also allows to hold the state in [`invoke_async`] and thus use
//! async-based state.
//!
//! [`ghost_actor`]: https://github.com/holochain/ghost_actor
use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
};
use std::{hash::Hash, pin::Pin};
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
type InnerInvoke<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send>;
type SendInvoke<T> = mpsc::Sender<InnerInvoke<T>>;
/// Actor wrapping a state.
///
/// Cloning the actor provides an handle to the same actor.
pub struct Actor<T: 'static + Send>(SendInvoke<T>);
impl<T: 'static + Send> Actor<T> {
/// Creates a new `Actor` with default inbound channel capacity (1024).
///
/// Returned future must be spawned in an async executor.
#[must_use]
pub fn new(state: T) -> (Self, impl Future<Output = ()>) {
Self::new_with_capacity(state, 1024)
}
/// Creates a new `Actor` with given capacity for its inbound channel.
///
/// Returned future must be spawned in an async executor.
#[must_use]
pub fn new_with_capacity(mut state: T, capacity: usize) -> (Self, impl Future<Output = ()>) {
let (invoke_tx, mut invoke_rx) = mpsc::channel::<InnerInvoke<T>>(capacity);
let driver = FutureExt::boxed(async move {
while let Some(invoke) = invoke_rx.next().await {
invoke(&mut state).await;
}
});
(Self(invoke_tx), driver)
}
/// Interacts with the state using a closure returning a future.
/// This future holds the mutable reference to the state, and prevents the
/// actor to process further invokes until this future ends.
///
/// The future needs to be boxed using [`futures::FutureExt::boxed`].
///
/// Returns `None` if the actor is no longer running.
pub fn invoke_async<F, R>(&self, invoke: F) -> impl Future<Output = Option<R>>
where
F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send + 'static,
R: 'static + Send,
{
let mut invoke_tx = self.0.clone();
async move {
let (response_tx, response_rx) = oneshot::channel();
let real_invoke: InnerInvoke<T> = Box::new(move |state: &mut T| {
async move {
let res = invoke(state).await;
let _ = response_tx.send(res);
}
.boxed()
});
invoke_tx.send(real_invoke).await.ok()?;
response_rx.await.ok()
}
}
/// Interact with the state using a closure.
///
/// Returns `None` if the actor is no longer running.
pub async fn invoke<F, R>(&self, invoke: F) -> Option<R>
where
F: FnOnce(&mut T) -> R + 'static + Send,
R: 'static + Send,
{
self.invoke_async(|state| async move { invoke(state) }.boxed())
.await
}
/// Tells if the actor still accepts new invokes.
pub fn is_active(&self) -> bool {
!self.0.is_closed()
}
/// Stop the actor, which will process every already queued invokes
/// before really stopping.
pub fn shutdown(&self) {
self.0.clone().close_channel()
}
}
impl<T: 'static + Send> Clone for Actor<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: 'static + Send> PartialEq for Actor<T> {
fn eq(&self, other: &Self) -> bool {
self.0.same_receiver(&other.0)
}
}
impl<T: 'static + Send> Eq for Actor<T> {}
impl<T: 'static + Send> Hash for Actor<T> {
fn hash<Hasher: std::hash::Hasher>(&self, hasher: &mut Hasher) {
self.0.hash_receiver(hasher);
}
}