use std::{convert::Infallible, error::Error, fmt::Debug, hash::Hash};
use async_trait::async_trait;
use flume::{Receiver, Sender};
use futures::channel::oneshot;
use snafu::ResultExt;
use tracing::{error, instrument, warn};
use super::types::CompletionToken;
use crate::{
executor::Executor,
types::Waiter,
util::either_error::{EitherError, LeftSnafu, RightSnafu},
};
/// A value that can be exchanged between actors through inboxes and outboxes.
///
/// Implementors must be comparable, hashable, debuggable and safe to move and
/// share across threads (see the supertrait bounds).
pub trait Event: Eq + Debug + Ord + Hash + Send + Sync + 'static {
/// Flag type describing an event; produced by [`Event::flags`].
type Flags: Clone + Eq + Debug + Ord + Hash + Send + Sync + 'static;
/// Returns the flags for this event.
fn flags(&self) -> Self::Flags;
/// Returns a completion token for this event, if it has one.
///
/// The default implementation returns `None`.
///
/// NOTE(review): how this differs from [`Event::tokenize`] is not visible in
/// this file — confirm against implementors and document the distinction.
fn token(&mut self) -> Option<CompletionToken> {
None
}
/// Returns a completion token that can be used to correlate a reply with
/// this event, if the event supports that.
///
/// The default implementation returns `None`. `ActorExt::call` and
/// `ActorExt::call_sync` call this before sending and give up with
/// `Ok(None)` when no token is returned.
fn tokenize(&mut self) -> Option<CompletionToken> {
None
}
/// Produces another value of the same event type from `self`.
///
/// NOTE(review): the name suggests the copy omits per-instance state such
/// as completion tokens — confirm against implementors.
#[must_use]
fn stateless_clone(&self) -> Self;
}
/// The unit type is an [`Event`] whose flags are also unit.
///
/// `token` and `tokenize` keep their defaults, so both return `None`.
impl Event for () {
type Flags = ();
// Unit flags: the empty body evaluates to `()`.
fn flags(&self) -> Self::Flags {}
// Cloning unit yields unit.
fn stateless_clone(&self) -> Self {}
}
/// A sink that accepts events of type `E`.
///
/// Acceptance is offered both as an async method and as a synchronous one.
#[async_trait]
pub trait EventConsumer<E: Event>: Send + Sync + 'static {
/// Error returned when an event cannot be accepted.
type Error: Error + Send + Sync + Sized + 'static;
/// Hands `event` to this consumer from an async context.
///
/// # Errors
/// Returns [`EventConsumer::Error`] when the consumer cannot take the event.
async fn accept(&self, event: E) -> Result<(), Self::Error>;
/// Hands `event` to this consumer from a synchronous context.
///
/// # Errors
/// Returns [`EventConsumer::Error`] when the consumer cannot take the event.
fn accept_sync(&self, event: E) -> Result<(), Self::Error>;
}
/// [`EventConsumer`] implementation for a `(filter, handler)` pair of closures.
///
/// The first element is treated as a predicate: it is called with the event's
/// flags and a reference to the event, and the handler (second element) only
/// runs when it returns `true`. Events rejected by the predicate are dropped
/// and still reported as `Ok(())`.
///
/// Previously the predicate was required by the bounds but never called, so
/// every event reached the handler.
#[async_trait]
impl<E: Event, F, H> EventConsumer<E> for (F, H)
where
    F: Fn(&E::Flags, &E) -> bool + Send + Sync + 'static,
    H: Fn(E) + Send + Sync + 'static,
{
    /// Neither closure can report a failure, so acceptance is infallible.
    type Error = Infallible;

    /// Async entry point; the closures are synchronous, so this shares the
    /// code path of [`EventConsumer::accept_sync`].
    async fn accept(&self, event: E) -> Result<(), Self::Error> {
        self.accept_sync(event)
    }

    /// Runs the handler on `event` if the filter accepts it.
    fn accept_sync(&self, event: E) -> Result<(), Self::Error> {
        let (filter, handler) = self;
        // The filter only borrows the event; ownership moves to the handler
        // once the event has been accepted.
        if filter(&event.flags(), &event) {
            handler(event);
        }
        Ok(())
    }
}
#[async_trait]
impl<E: Event> EventConsumer<E> for Sender<E> {
type Error = flume::SendError<E>;
async fn accept(&self, event: E) -> Result<(), Self::Error> {
self.send_async(event).await
}
fn accept_sync(&self, event: E) -> Result<(), Self::Error> {
self.send(event)
}
}
/// A source of events of type `E` to which consumers and one-shot callbacks
/// can be attached.
///
/// Each registration operation is offered in an async form and a `_sync` form.
#[async_trait]
pub trait EventProducer<E: Event>: Send + Sync + 'static {
/// Error returned when a registration fails.
type Error: Error + Send + Sync + 'static;
/// Registers `consumer` with this producer from an async context.
///
/// NOTE(review): presumably the consumer is then handed the events this
/// producer emits; the exact delivery rules are up to the implementor —
/// confirm.
///
/// # Errors
/// Returns [`EventProducer::Error`] when the consumer cannot be registered.
async fn register_consumer<C>(&self, consumer: C) -> Result<(), Self::Error>
where
C: EventConsumer<E> + Send + Sync + 'static;
/// Registers a one-shot `callback` associated with `token` from an async
/// context.
///
/// `ActorExt::call` uses this to receive the reply to an event whose token
/// it obtained from `Event::tokenize`.
///
/// NOTE(review): presumably the callback is invoked at most once, with the
/// produced event that matches `token` — confirm against implementors.
///
/// # Errors
/// Returns [`EventProducer::Error`] when the callback cannot be registered.
async fn register_callback<F>(
&self,
callback: F,
token: CompletionToken,
) -> Result<(), Self::Error>
where
F: FnOnce(E) + Send + Sync + 'static;
/// Synchronous counterpart of [`EventProducer::register_consumer`].
///
/// # Errors
/// Returns [`EventProducer::Error`] when the consumer cannot be registered.
fn register_consumer_sync<C>(&self, consumer: C) -> Result<(), Self::Error>
where
C: EventConsumer<E> + Send + Sync + 'static;
/// Synchronous counterpart of [`EventProducer::register_callback`];
/// `ActorExt::call_sync` uses it to receive the reply to a tokenized event.
///
/// # Errors
/// Returns [`EventProducer::Error`] when the callback cannot be registered.
fn register_callback_sync<F>(
&self,
callback: F,
token: CompletionToken,
) -> Result<(), Self::Error>
where
F: FnOnce(E) + Send + Sync + 'static;
}
/// An actor that takes inbound events of type `I` through its inbox and
/// exposes outbound events of type `O` through its outbox.
///
/// The executor parameter `E` is not referenced by any item in this trait's
/// body.
pub trait Actor<I: Event, O: Event, E: Executor>: Send + Sync + 'static {
/// Consumer type through which inbound events are delivered to the actor.
type Inbox: EventConsumer<I> + 'static;
/// Producer type through which the actor's outbound events are observed.
type Outbox: EventProducer<O> + 'static;
/// Returns the actor's inbox.
fn inbox(&self) -> &Self::Inbox;
/// Returns the actor's outbox.
fn outbox(&self) -> &Self::Outbox;
/// Returns a [`Waiter`] related to shutting the actor down.
///
/// NOTE(review): presumably this requests shutdown and the waiter completes
/// once the actor has stopped — confirm against implementors.
fn shutdown(&self) -> Waiter;
/// Returns a [`Waiter`] related to the actor catching up.
///
/// NOTE(review): presumably the waiter completes once the events already
/// queued have been processed — confirm against implementors.
fn catchup(&self) -> Waiter;
}
#[async_trait]
pub trait ActorExt<I: Event, O: Event, E: Executor>: Actor<I, O, E> {
#[instrument(skip(self))]
async fn call(
&self,
mut event: I,
) -> Result<
Option<O>,
EitherError<
<Self::Inbox as EventConsumer<I>>::Error,
<Self::Outbox as EventProducer<O>>::Error,
>,
> {
if let Some(token) = event.tokenize() {
let (tx, rx) = oneshot::channel();
self.outbox()
.register_callback(
move |event| {
let _res = tx.send(event);
},
token,
)
.await
.context(RightSnafu)?;
self.inbox().accept(event).await.context(LeftSnafu)?;
match rx.await {
Ok(x) => Ok(Some(x)),
Err(e) => {
error!(?e, "Failed to await returned event");
Ok(None)
}
}
} else {
warn!("Event was not tokenizeable");
Ok(None)
}
}
#[instrument(skip(self))]
#[allow(clippy::type_complexity)]
fn call_sync(
&self,
mut event: I,
) -> Result<
Option<O>,
EitherError<
<Self::Inbox as EventConsumer<I>>::Error,
<Self::Outbox as EventProducer<O>>::Error,
>,
> {
if let Some(token) = event.tokenize() {
let (tx, rx) = flume::bounded(1);
self.outbox()
.register_callback_sync(
move |event| {
let _res = tx.send(event);
},
token,
)
.context(RightSnafu)?;
self.inbox().accept_sync(event).context(LeftSnafu)?;
match rx.recv() {
Ok(x) => Ok(Some(x)),
Err(e) => {
error!(?e, "Failed to await returned event");
Ok(None)
}
}
} else {
warn!("Event was not tokenizeable");
Ok(None)
}
}
#[instrument(skip(self))]
async fn stream(
&self,
bound: Option<usize>,
) -> Result<Receiver<O>, <Self::Outbox as EventProducer<O>>::Error> {
let (i, o) = match bound {
Some(bound) => flume::bounded(bound),
None => flume::unbounded(),
};
self.outbox().register_consumer(i).await?;
Ok(o)
}
#[instrument(skip(self))]
fn stream_sync(
&self,
bound: Option<usize>,
) -> Result<Receiver<O>, <Self::Outbox as EventProducer<O>>::Error> {
let (i, o) = match bound {
Some(bound) => flume::bounded(bound),
None => flume::unbounded(),
};
self.outbox().register_consumer_sync(i)?;
Ok(o)
}
}
/// Blanket implementation: every [`Actor`] gets the [`ActorExt`] helpers.
impl<I, O, E, T> ActorExt<I, O, E> for T
where
    I: Event,
    O: Event,
    E: Executor,
    T: Actor<I, O, E>,
{
}