use derive_more::From;
use speare::{Actor, Backoff, Ctx, Limit, Node, Request, Supervision};
use tokio::task;
struct Accumulator {
items: Vec<String>,
}
#[derive(From)]
enum AccMsg {
Push(String),
GetItems(Request<(), Vec<String>>),
}
impl Actor for Accumulator {
type Props = bool; type Msg = AccMsg;
type Err = String;
async fn init(ctx: &mut Ctx<Self>) -> Result<Self, Self::Err> {
if *ctx.props() {
ctx.clear_mailbox();
}
Ok(Self { items: vec![] })
}
async fn handle(&mut self, msg: AccMsg, _: &mut Ctx<Self>) -> Result<(), Self::Err> {
match msg {
AccMsg::Push(s) => {
if s == "fail" {
return Err("fail".into());
}
self.items.push(s);
}
AccMsg::GetItems(req) => req.reply(self.items.clone()),
}
Ok(())
}
}
#[tokio::test]
async fn clear_mailbox_discards_pending_messages_on_restart() {
let clear_on_init = true;
let mut node = Node::default();
let handle = node
.actor::<Accumulator>(clear_on_init)
.supervision(Supervision::Restart {
max: Limit::None,
backoff: Backoff::None,
})
.spawn();
task::yield_now().await;
handle.send(AccMsg::Push("a".into()));
handle.send(AccMsg::Push("b".into()));
handle.send(AccMsg::Push("fail".into())); handle.send(AccMsg::Push("c".into())); handle.send(AccMsg::Push("d".into()));
task::yield_now().await;
let items: Vec<String> = handle.req(()).await.unwrap();
assert!(items.is_empty());
}
#[tokio::test]
async fn without_clear_mailbox_pending_messages_survive_restart() {
let clear_on_init = false;
let mut node = Node::default();
let handle = node
.actor::<Accumulator>(clear_on_init)
.supervision(Supervision::Restart {
max: Limit::None,
backoff: Backoff::None,
})
.spawn();
task::yield_now().await;
handle.send(AccMsg::Push("a".into()));
handle.send(AccMsg::Push("fail".into())); handle.send(AccMsg::Push("b".into())); task::yield_now().await;
let items: Vec<String> = handle.req(()).await.unwrap();
assert_eq!(items, vec!["b"]);
}
#[tokio::test]
async fn new_messages_after_clear_mailbox_are_received() {
let clear_on_init = true;
let mut node = Node::default();
let handle = node
.actor::<Accumulator>(clear_on_init)
.supervision(Supervision::Restart {
max: Limit::None,
backoff: Backoff::None,
})
.spawn();
task::yield_now().await;
handle.send(AccMsg::Push("fail".into()));
task::yield_now().await;
handle.send(AccMsg::Push("x".into()));
handle.send(AccMsg::Push("y".into()));
let items: Vec<String> = handle.req(()).await.unwrap();
assert_eq!(items, vec!["x", "y"]);
}