use std::{
future::Future,
marker::PhantomData,
pin::pin,
sync::{Arc, atomic::AtomicBool},
time::Duration,
};
use futures::{FutureExt, Stream, StreamExt as _, channel::oneshot};
use crate::{
Actor, Addr, Context,
actor::restart_strategy::{RecreateFromDefault, RestartOnly, RestartStrategy},
channel::{Channel, Rx},
context::Core,
context::StopNotifier,
handler::StreamHandler,
runtime::sleep,
};
mod payload;
pub(crate) use payload::Payload;
#[derive(Debug, Default)]
pub struct EventLoopConfig {
pub timeout: Option<Duration>,
pub fail_on_timeout: bool,
}
pub struct EventLoop<A: Actor, R: RestartStrategy<A> = RestartOnly> {
ctx: Context<A>,
addr: Addr<A>,
stop: StopNotifier,
config: EventLoopConfig,
rx: Rx<A>,
phantom: PhantomData<R>,
}
impl<A: Actor, R: RestartStrategy<A>> EventLoop<A, R> {
pub(crate) fn from_channel(channel: Channel<A>) -> Self {
let (tx_running, rx_running) = oneshot::channel::<()>();
let running_flag = Arc::new(AtomicBool::new(true));
let (tx, rx) = channel.break_up();
let weak_tx = tx.downgrade();
let ctx = Context {
weak_tx,
core: Core::new(Arc::clone(&running_flag), rx_running.shared()),
children: Default::default(),
tasks: Default::default(),
};
let stop = StopNotifier::new(running_flag, tx_running);
let addr = Addr {
tx,
core: ctx.core.clone(),
};
EventLoop {
ctx,
addr,
stop,
rx,
config: Default::default(),
phantom: PhantomData,
}
}
pub(crate) const fn with_config(mut self, config: EventLoopConfig) -> Self {
self.config = config;
self
}
}
impl<A: Actor> EventLoop<A> {
pub fn bounded(capacity: usize) -> Self {
Self::from_channel(Channel::bounded(capacity))
}
pub fn unbounded() -> Self {
Self::from_channel(Channel::unbounded())
}
pub fn abort_after(mut self, timeout: Duration) -> Self {
self.config.timeout = timeout.into();
self.config.fail_on_timeout = false;
self
}
pub fn fail_after(mut self, timeout: Duration) -> Self {
self.config.timeout = timeout.into();
self.config.fail_on_timeout = true;
self
}
}
pub async fn timeout_fut(
fut: impl Future<Output = ()>,
timeout: Option<Duration>,
) -> crate::DynResult<()> {
if let Some(timeout) = timeout {
futures::select! {
res = fut.map(Ok).fuse() => res,
_ = FutureExt::fuse(sleep(timeout)) => Err(crate::error::ActorError::Timeout.into())
}
} else {
fut.map(Ok).await
}
}
impl<A: Actor, R: RestartStrategy<A>> EventLoop<A, R> {
pub fn create(mut self, mut actor: A) -> (impl Future<Output = crate::DynResult<A>>, Addr<A>) {
let actor_loop = async move {
actor.started(&mut self.ctx).await?;
let timeout = self.config.timeout;
#[cfg(feature = "async_channel")]
log::trace!(actor=A::NAME, id:% =self.ctx.core.id; "still waiting for {} events", self.rx.len());
let mut payload_rx = pin!(self.rx);
while let Some(event) = payload_rx.next().await {
log::trace!(actor=A::NAME, id:% =self.ctx.core.id; "processing event");
match event {
Payload::Restart => {
log::trace!(actor=A::NAME, id:% =self.ctx.core.id; "restarting");
actor = R::refresh(actor, &mut self.ctx).await?
}
Payload::Task(f) => {
log::trace!(actor=A::NAME, id:% =self.ctx.core.id; "received task");
if let Err(err) = timeout_fut(f(&mut actor, &mut self.ctx), timeout).await {
if self.config.fail_on_timeout {
log::warn!(actor=A::NAME, id:% =self.ctx.core.id; "{}, exiting", err);
actor.cancelled(&mut self.ctx).await;
return Err(err);
} else {
log::warn!(actor=A::NAME, id:% =self.ctx.core.id; "{}, ignoring", err);
continue;
}
}
}
Payload::Stop => break,
}
}
actor.stopped(&mut self.ctx).await;
self.stop.notify();
Ok(actor)
};
(actor_loop, self.addr)
}
pub fn create_on_stream<S>(
mut self,
mut actor: A,
stream: S,
) -> (impl Future<Output = crate::DynResult<A>>, Addr<A>)
where
S: Stream + Unpin + Send + 'static,
S::Item: 'static + Send,
A: StreamHandler<S::Item>,
{
let timeout = self.config.timeout;
let mut stream = stream.fuse();
let actor_loop = async move {
actor.started(&mut self.ctx).await?;
let mut rx = pin!(self.rx);
loop {
futures::select! {
event = rx.next().fuse() => {
match event {
Some(Payload::Task(task_fn)) => {
log::trace!(name = A::NAME; "received task");
if let Err(err) = timeout_fut(task_fn(&mut actor, &mut self.ctx), timeout).await {
if self.config.fail_on_timeout {
log::warn!("{} {}, exiting", A::NAME, err);
actor.cancelled(&mut self.ctx).await;
return Err(err);
} else {
log::warn!("{} {}, ignoring", A::NAME, err);
}
}
}
Some(Payload::Stop) => break,
Some(Payload::Restart) => {
panic!("restart message in stream-handling actor")
},
None => break
}
},
stream_msg = stream.next() => {
let Some(msg) = stream_msg else {
actor.finished(&mut self.ctx).await;
continue;
};
log::trace!(name = A::NAME; "received stream message");
if let Err(err) = timeout_fut(
StreamHandler::handle(&mut actor, &mut self.ctx, msg) , timeout).await {
if self.config.fail_on_timeout {
log::warn!("{} {}, exiting", A::NAME, err);
actor.cancelled(&mut self.ctx).await;
return Err(err);
} else {
log::warn!("{} {}, ignoring", A::NAME, err);
continue;
}
}
},
}
}
actor.stopped(&mut self.ctx).await;
self.stop.notify();
Ok(actor)
};
(actor_loop, self.addr)
}
}
impl<A: Actor, R: RestartStrategy<A>> EventLoop<A, R>
where
A: Default,
{
pub fn recreating(self) -> EventLoop<A, RecreateFromDefault> {
EventLoop {
ctx: self.ctx,
addr: self.addr,
stop: self.stop,
config: self.config,
rx: self.rx,
phantom: PhantomData,
}
}
}
#[cfg(test)]
mod test_event_loop {
#![allow(clippy::unwrap_used)]
use futures::stream;
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering::SeqCst},
};
use super::*;
use crate::prelude::*;
struct StreamActor {
stopped: Arc<AtomicBool>,
finished: Option<futures::channel::oneshot::Sender<()>>,
}
impl Actor for StreamActor {
async fn stopped(&mut self, _ctx: &mut Context<Self>) {
self.stopped.store(true, SeqCst);
}
}
#[message(response = Echo)]
struct Echo;
impl Handler<Echo> for StreamActor {
async fn handle(&mut self, _ctx: &mut Context<Self>, msg: Echo) -> Echo {
msg
}
}
impl StreamHandler<i32> for StreamActor {
async fn handle(&mut self, _ctx: &mut Context<Self>, _msg: i32) {}
async fn finished(&mut self, _ctx: &mut Context<Self>) {
self.finished.take().unwrap().send(()).unwrap();
log::info!("StreamActor finished");
}
}
#[test_log::test(tokio::test)]
async fn finished_streams_notify_the_actor() {
let stopped = Arc::new(AtomicBool::new(false));
let (finished_sender, finished) = oneshot::channel::<()>();
let actor = StreamActor {
stopped: Arc::clone(&stopped),
finished: Some(finished_sender),
};
let num_stream = stream::iter(1..30).fuse();
let addr = crate::setup_actor(actor).on_stream(num_stream).spawn();
assert!(!stopped.load(SeqCst), "actor supposed to be running");
assert!(finished.await.is_ok(), "finished_called should be true");
assert!(!stopped.load(SeqCst), "actor supposed to be running");
assert!(addr.call(Echo).await.is_ok(), "actor supposed to respond");
}
}