use std::future::{ready, Ready};
use std::{fmt, future::Future, pin::Pin};
pub mod channel;
mod channels;
pub use self::channel::mpsc::*;
pub use channels::delayed_message::DelayedMessage;
pub use channels::interval_channel::{IntervalMessage, Intervaller};
pub use channels::singleshot::Singleshot;
use channel::mpsc;
use futures::FutureExt;
pub use lakka_macro::messages;
use tokio::sync::broadcast;
/// Envelope for everything delivered to an actor's primary mailbox.
///
/// `Actor::handle_message` routes each variant to the matching handler:
/// `Ask` payloads go to `handle_asks`, `Tell` payloads go to `handle_tells`.
#[derive(Debug)]
pub enum Message<Ask, Tell> {
/// Payload dispatched to the actor's `handle_asks` method.
Ask(Ask),
/// Payload dispatched to the actor's `handle_tells` method.
Tell(Tell),
}
/// Constructor contract for a handle built on top of a [`ChannelSender`].
///
/// `BoundedActor::run_bounded` calls `new` with the sending half of the
/// actor's mailbox to produce the handle it returns.
pub trait ActorHandle<T> {
/// Builds the handle from the boxed sender `tx`.
fn new(tx: Box<dyn ChannelSender<T>>) -> Self;
}
/// Constructor contract for a handle built on top of an
/// [`UnboundedChannelSender`].
///
/// `UnboundedActor::run_with_channels` calls `new` with the sending half of
/// the actor's mailbox to produce the handle it returns.
pub trait UnboundedActorHandle<T> {
/// Builds the handle from the boxed sender `tx`.
fn new(tx: Box<dyn UnboundedChannelSender<T>>) -> Self;
}
/// Boxed, type-erased [`UnboundedChannelSender`] for messages of type `T`.
pub type UnboundedActorSender<T> = Box<dyn UnboundedChannelSender<T>>;
/// Boxed, type-erased [`ChannelSender`] for messages of type `T`.
pub type ActorSender<T> = Box<dyn ChannelSender<T>>;
/// Boxed, type-erased [`Channel`]; the type `Actor::run_task` accepts as the
/// primary mailbox receiver.
type ActorReceiver<T> = Box<dyn Channel<T>>;
/// An [`Actor`] whose primary mailbox is created with
/// `crate::mpsc::unbounded_channel`.
pub trait UnboundedActor: Actor {
    /// Handle type returned to the caller; it is constructed from the boxed
    /// sending half of the mailbox.
    type Handle: UnboundedActorHandle<Message<Self::Ask, Self::Tell>> + fmt::Debug;

    /// Starts the actor without any extra channels and returns its handle.
    fn run(self) -> <Self as UnboundedActor>::Handle {
        UnboundedActor::run_with_channels(self, Vec::new())
    }

    /// Starts the actor and returns its handle.
    ///
    /// `extra_channel_receivers` are additional sources of `Tell` messages
    /// that the actor task polls alongside its primary mailbox.
    fn run_with_channels(
        self,
        extra_channel_receivers: Vec<Box<dyn Channel<<Self as Actor>::Tell>>>,
    ) -> <Self as UnboundedActor>::Handle {
        let (sender, receiver) =
            crate::mpsc::unbounded_channel::<<Self as ActorMessage>::Message>();
        // Spawn the processing task first, then wrap the sender in the handle.
        self.run_task(Box::new(receiver), extra_channel_receivers);
        <Self as UnboundedActor>::Handle::new(Box::new(sender))
    }
}
/// An [`Actor`] whose primary mailbox is created with `crate::mpsc::channel`
/// and therefore has a capacity limit.
pub trait BoundedActor: Actor {
    /// Handle type returned to the caller; it is constructed from the boxed
    /// sending half of the mailbox.
    type Handle: ActorHandle<Message<Self::Ask, Self::Tell>> + fmt::Debug;

    /// Starts the actor with a mailbox limit of 100 and no extra channels.
    fn run(self) -> <Self as BoundedActor>::Handle {
        self.run_bounded(100, Vec::new())
    }

    /// Starts the actor with a mailbox limit of 100 and the given extra
    /// `Tell` channels.
    fn run_with_channels(
        self,
        extra_channel_receivers: Vec<Box<dyn Channel<<Self as Actor>::Tell>>>,
    ) -> <Self as BoundedActor>::Handle {
        self.run_bounded(100, extra_channel_receivers)
    }

    /// Starts the actor and returns its handle.
    ///
    /// `limit` is forwarded to `crate::mpsc::channel` as the mailbox size;
    /// `extra_channel_receivers` are additional sources of `Tell` messages
    /// that the actor task polls alongside its primary mailbox.
    fn run_bounded(
        self,
        limit: usize,
        extra_channel_receivers: Vec<Box<dyn Channel<<Self as Actor>::Tell>>>,
    ) -> <Self as BoundedActor>::Handle {
        let (sender, receiver) =
            crate::mpsc::channel::<<Self as ActorMessage>::Message>(limit);
        // Spawn the processing task first, then wrap the sender in the handle.
        self.run_task(Box::new(receiver), extra_channel_receivers);
        <Self as BoundedActor>::Handle::new(Box::new(sender))
    }
}
pub trait Actor: Sized + Send + 'static {
type Ask: Send;
type Tell: Clone + Send + fmt::Debug;
fn handle_asks(
&mut self,
msg: Self::Ask,
_ctx: &mut ActorContext<Self>,
) -> impl Future<Output = ()> + Send;
fn handle_tells(
&mut self,
msg: Self::Tell,
_ctx: &mut ActorContext<Self>,
) -> impl Future<Output = ()> + Send;
fn handle_message(
&mut self,
msg: Message<Self::Ask, Self::Tell>,
mut _ctx: &mut ActorContext<Self>,
) -> impl Future<Output = ()> + Send {
async move {
match msg {
Message::Ask(ask_msg) => self.handle_asks(ask_msg, _ctx).await,
Message::Tell(tell_msg) => self.handle_tells(tell_msg, _ctx).await,
}
}
}
fn run_task(
mut self,
rx: ActorReceiver<<Self as ActorMessage>::Message>,
mut extra_channel_receivers: Vec<Box<dyn Channel<<Self as Actor>::Tell>>>,
) {
tokio::spawn(async move {
let mut ctx = ActorContext::<Self> {
rx,
extra_rxs: vec![],
kill_flag: false,
};
loop {
if !ctx.extra_rxs.is_empty() {
extra_channel_receivers.append(&mut ctx.extra_rxs);
}
let mut remove_index: Option<usize> = None;
if !extra_channel_receivers.is_empty() {
let future = futures::future::select_all(
extra_channel_receivers
.iter_mut()
.map(|channel| channel.recv().boxed()),
);
tokio::select! {
msg = ctx.rx.recv() => {
match msg {
Ok(msg) => self.handle_message(msg, &mut ctx).await,
Err(_) => {
break;
}
}
},
(result, index, _) = future => {
match result {
Ok(msg) => self.handle_tells(msg, &mut ctx).await,
Err(_) => remove_index = Some(index),
}
}
}
if let Some(index) = remove_index {
extra_channel_receivers.swap_remove(index);
}
} else {
let msg = ctx.rx.recv().await;
match msg {
Ok(msg) => self.handle_message(msg, &mut ctx).await,
Err(_) => {
break;
}
}
}
if ctx.kill_flag {
break;
}
}
});
}
}
/// Names the complete message type that travels through an actor's primary
/// mailbox.
pub trait ActorMessage: Actor {
/// The mailbox message type.
type Message: Send;
}
/// Blanket implementation: every [`Actor`] uses `Message<Ask, Tell>` built
/// from its own associated types as its mailbox message.
impl<T: Actor> ActorMessage for T {
type Message = Message<T::Ask, T::Tell>;
}
/// Per-actor state handed to every handler invocation.
pub struct ActorContext<A>
where
A: Actor,
{
/// Receiving half of the actor's primary mailbox; polled by `Actor::run_task`.
pub rx: Box<dyn Channel<Message<A::Ask, A::Tell>>>,
/// Channels queued by handlers; `Actor::run_task` moves them into its set
/// of polled extra channels at the start of each loop iteration.
pub extra_rxs: Vec<Box<dyn Channel<A::Tell>>>,
/// When `true`, `Actor::run_task` leaves its loop at the end of the current
/// iteration.
pub kill_flag: bool,
}
impl<A: Actor + ActorMessage> ActorContext<A> {
    /// Creates a context around the primary mailbox receiver `rx`, with no
    /// queued extra channels and the kill flag cleared.
    pub fn new(rx: Box<dyn Channel<Message<A::Ask, A::Tell>>>) -> Self {
        Self {
            kill_flag: false,
            extra_rxs: Vec::new(),
            rx,
        }
    }

    /// Sets the kill flag so the actor's run loop stops after the current
    /// iteration.
    pub fn shut_down_actor(&mut self) {
        self.kill_flag = true;
    }

    /// Queues `msg` for this actor by wrapping it in a `Singleshot` channel.
    pub fn tell(&mut self, msg: A::Tell) {
        self.add_channel(Box::new(Singleshot::new(msg)));
    }

    /// Queues `msg` for this actor behind a `tokio::time::sleep(delay)`
    /// timer, wrapped in a `DelayedMessage` channel.
    pub fn delayed_tell(&mut self, msg: A::Tell, delay: std::time::Duration) {
        self.add_channel(Box::new(DelayedMessage {
            value: Some(msg),
            delay: Box::pin(tokio::time::sleep(delay)),
        }));
    }

    /// Queues an arbitrary extra `Tell` channel; the run loop picks it up at
    /// the start of its next iteration.
    pub fn add_channel(&mut self, channel: Box<dyn Channel<A::Tell>>) {
        self.extra_rxs.push(channel);
    }
}
/// Error reported by the sender and receiver abstractions in this module.
#[derive(Debug)]
pub enum ActorError {
    /// The channel to or from the actor is no longer usable.
    ActorClosed,
}

impl std::error::Error for ActorError {}

impl fmt::Display for ActorError {
    /// Writes the human-readable description of the error.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive match: adding a variant forces a message to be chosen.
        let description = match self {
            Self::ActorClosed => "Actor is closed",
        };
        f.write_str(description)
    }
}
impl<T> From<SendError<T>> for ActorError {
fn from(_: SendError<T>) -> Self {
ActorError::ActorClosed
}
}
impl From<RecvError> for ActorError {
fn from(_: RecvError) -> Self {
ActorError::ActorClosed
}
}
/// Sender abstraction whose `send` returns an implementor-chosen future type.
///
/// The lifetime `'a` is the borrow of the sender that the returned future is
/// allowed to hold.
pub trait ActorChannelSender<'a, T>: Send + Sync + fmt::Debug {
/// Future returned by `send`; resolves to `Ok(())` or an [`ActorError`].
type SendFuture: Future<Output = Result<(), ActorError>> + Send + 'a;
/// Sends `msg` and returns the future that reports the outcome.
fn send(&'a self, msg: T) -> Self::SendFuture;
/// Returns a boxed sender with the same `SendFuture` type.
fn clone_box(&self) -> Box<dyn ActorChannelSender<'a, T, SendFuture = Self::SendFuture>>;
}
impl<'a, T: Send + 'static> ActorChannelSender<'a, T> for mpsc::Sender<T> {
type SendFuture = Pin<Box<dyn Future<Output = Result<(), ActorError>> + Send + 'a>>;
fn send(&'a self, msg: T) -> Self::SendFuture {
Box::pin(async move { self.send(msg).await.map_err(|e| e.into()) })
}
fn clone_box(&self) -> Box<dyn ActorChannelSender<'a, T, SendFuture = Self::SendFuture>> {
Box::new(self.clone())
}
}
impl<'a, T: Send + 'static> ActorChannelSender<'a, T> for mpsc::UnboundedSender<T> {
type SendFuture = Ready<Result<(), ActorError>>;
fn send(&self, msg: T) -> Self::SendFuture {
ready(self.send(msg).map_err(|e| e.into()))
}
fn clone_box(&self) -> Box<dyn ActorChannelSender<'a, T, SendFuture = Self::SendFuture>> {
Box::new(self.clone())
}
}
/// Object-safe sender whose `send` completes synchronously.
///
/// NOTE(review): unlike `ChannelSender`, this trait declares no `Send + Sync`
/// supertraits, so `Box<dyn UnboundedChannelSender<T>>` is not `Send` —
/// confirm whether that is intentional.
pub trait UnboundedChannelSender<T>: fmt::Debug {
/// Sends `msg`, returning an [`ActorError`] on failure.
fn send(&self, msg: T) -> Result<(), ActorError>;
/// Returns a boxed clone of this sender.
fn clone_box(&self) -> Box<dyn UnboundedChannelSender<T>>;
}
/// Makes the boxed trait object cloneable by delegating to `clone_box`.
impl<T> Clone for Box<dyn UnboundedChannelSender<T>> {
    fn clone(&self) -> Self {
        // Dereference explicitly down to the trait object before delegating.
        (**self).clone_box()
    }
}
impl<T: Send + 'static> UnboundedChannelSender<T> for mpsc::UnboundedSender<T> {
    /// Forwards to the sender's own `send`; a failure is converted into
    /// `ActorError` through its `From` implementation.
    fn send(&self, msg: T) -> Result<(), ActorError> {
        self.send(msg)?;
        Ok(())
    }

    /// Clones the sender and boxes the clone.
    fn clone_box(&self) -> Box<dyn UnboundedChannelSender<T>> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }
}
/// Object-safe sender whose `send` is asynchronous.
pub trait ChannelSender<T>: Send + Sync + fmt::Debug {
/// Sends `msg`; the returned boxed future borrows `self` and resolves to
/// `Ok(())` or an [`ActorError`].
fn send(&self, msg: T) -> Pin<Box<dyn Future<Output = Result<(), ActorError>> + Send + '_>>;
/// Returns a boxed clone of this sender.
fn clone_box(&self) -> Box<dyn ChannelSender<T>>;
}
/// Makes the boxed trait object cloneable by delegating to `clone_box`.
impl<T> Clone for Box<dyn ChannelSender<T>> {
    fn clone(&self) -> Self {
        // Dereference explicitly down to the trait object before delegating.
        (**self).clone_box()
    }
}
impl<T: Send + 'static> ChannelSender<T> for mpsc::Sender<T> {
fn send(&self, msg: T) -> Pin<Box<dyn Future<Output = Result<(), ActorError>> + Send + '_>> {
Box::pin(async move { self.send(msg).await.map_err(|e| e.into()) })
}
fn clone_box(&self) -> Box<dyn ChannelSender<T>> {
Box::new(self.clone())
}
}
impl<T: Send + 'static> ChannelSender<T> for kanal::AsyncSender<T> {
    /// Awaits the kanal `send`; any failure is reported as
    /// `ActorError::ActorClosed`.
    fn send(&self, msg: T) -> Pin<Box<dyn Future<Output = Result<(), ActorError>> + Send + '_>> {
        Box::pin(async move {
            self.send(msg)
                .await
                .map(|_| ())
                .map_err(|_| ActorError::ActorClosed)
        })
    }

    /// Clones the sender and boxes the clone.
    fn clone_box(&self) -> Box<dyn ChannelSender<T>> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }
}
/// Object-safe receiving end of a message source.
///
/// `Actor::run_task` polls implementors of this trait; an `Err` from the
/// primary mailbox ends the actor loop, while an `Err` from an extra channel
/// removes that channel from the polled set.
pub trait Channel<T>: Send {
/// Returns a boxed future, borrowing `self`, that resolves to the next value
/// or to an [`ActorError`].
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send + '_>>;
}
impl<T: Send> Channel<T> for mpsc::Receiver<T> {
    /// Awaits the receiver's own `recv`; `None` is reported as
    /// `ActorError::ActorClosed`.
    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send + '_>> {
        let next = async move { self.recv().await.ok_or(ActorError::ActorClosed) };
        Box::pin(next)
    }
}
impl<T: Send> Channel<T> for mpsc::UnboundeReceiver<T> {
    /// Awaits the receiver's own `recv`; `None` is reported as
    /// `ActorError::ActorClosed`.
    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send + '_>> {
        Box::pin(async move {
            let Some(value) = self.recv().await else {
                return Err(ActorError::ActorClosed);
            };
            Ok(value)
        })
    }
}
impl<T: Send + Clone> Channel<T> for broadcast::Receiver<T> {
fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send + '_>> {
Box::pin(async move {
match broadcast::Receiver::recv(self).await {
Ok(value) => Ok(value),
Err(err) => match err {
tokio::sync::broadcast::error::RecvError::Closed => {
Err(ActorError::ActorClosed)
}
tokio::sync::broadcast::error::RecvError::Lagged(x) => {
eprint!("Lagged!: {}", x);
Err(ActorError::ActorClosed) }
},
}
})
}
}
impl<T: Send> Channel<T> for kanal::AsyncReceiver<T> {
    /// Awaits the kanal `recv`; any failure is reported as
    /// `ActorError::ActorClosed`.
    fn recv(&mut self) -> Pin<Box<dyn Future<Output = Result<T, ActorError>> + Send + '_>> {
        Box::pin(async move {
            kanal::AsyncReceiver::recv(self)
                .await
                .map_err(|_| ActorError::ActorClosed)
        })
    }
}