use futures::{
channel::{mpsc, oneshot},
Future, FutureExt, SinkExt, StreamExt,
};
use std::{hash::Hash, pin::Pin};
type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a + Send>>;
type Blocking<T> = Box<dyn for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send>;
type NonBlocking<T> = Box<dyn FnOnce(&mut T) + 'static + Send>;
enum StateChange<T> {
Async(Blocking<T>),
Sync(NonBlocking<T>),
}
type StateChangeSender<T> = mpsc::Sender<StateChange<T>>;
pub struct Actor<T: 'static + Send>(StateChangeSender<T>);
impl<T: 'static + Send> Actor<T> {
pub fn new(state: T) -> (Self, impl Future<Output = ()>) {
Self::new_with_capacity(state, 1024)
}
pub fn new_with_capacity(mut state: T, capacity: usize) -> (Self, impl Future<Output = ()>) {
let (send, recv) = mpsc::channel::<StateChange<T>>(capacity);
let driver = FutureExt::boxed(async move {
let mut recv = StreamExt::ready_chunks(recv, 1024);
while let Some(changes) = recv.next().await {
for change in changes {
match change {
StateChange::Async(f) => f(&mut state).await,
StateChange::Sync(f) => f(&mut state),
}
}
}
});
(Self(send), driver)
}
pub async fn queue_blocking<F>(&self, f: F) -> Option<()>
where
F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, ()> + Send + 'static,
{
let mut send = self.0.clone();
let f: Blocking<T> = Box::new(move |state: &mut T| {
async move {
f(state).await;
}
.boxed()
});
send.send(StateChange::Async(f)).await.ok()
}
pub async fn queue<F>(&self, f: F) -> Option<()>
where
F: FnOnce(&mut T) + 'static + Send,
{
let mut send = self.0.clone();
send.send(StateChange::Sync(Box::new(f))).await.ok()
}
pub async fn query_blocking<F, R>(&self, f: F) -> Option<R>
where
F: for<'a> FnOnce(&'a mut T) -> BoxFuture<'a, R> + Send + 'static,
R: 'static + Send,
{
let mut send = self.0.clone();
let (output_send, output_recv) = oneshot::channel();
let f: Blocking<T> = Box::new(move |state: &mut T| {
async move {
let output = f(state).await;
let _ = output_send.send(output);
}
.boxed()
});
send.send(StateChange::Async(f)).await.ok()?;
output_recv.await.ok()
}
pub async fn query<F, R>(&self, f: F) -> Option<R>
where
F: FnOnce(&mut T) -> R + 'static + Send,
R: 'static + Send,
{
let mut invoke_tx = self.0.clone();
let (response_tx, response_rx) = oneshot::channel();
invoke_tx
.send(StateChange::Sync(Box::new(move |state| {
let output = f(state);
let _ = response_tx.send(output);
})))
.await
.ok()?;
response_rx.await.ok()
}
pub fn is_active(&self) -> bool {
!self.0.is_closed()
}
pub fn shutdown(&self) {
self.0.clone().close_channel()
}
}
impl<T: 'static + Send> Clone for Actor<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
impl<T: 'static + Send> PartialEq for Actor<T> {
fn eq(&self, other: &Self) -> bool {
self.0.same_receiver(&other.0)
}
}
impl<T: 'static + Send> Eq for Actor<T> {}
impl<T: 'static + Send> Hash for Actor<T> {
fn hash<Hasher: std::hash::Hasher>(&self, hasher: &mut Hasher) {
self.0.hash_receiver(hasher);
}
}