use std::time::Duration;
use crate::channel_with_priority::Priority;
use crate::mailbox::{Command, CommandOrMessage};
use crate::scheduler::{SimulateAdvanceTime, TimeShift};
use crate::spawn_builder::SpawnBuilder;
use crate::{Actor, KillSwitch, Mailbox, QueueCapacity, Scheduler};
/// Context from which actors are spawned (see `Universe::spawn_actor`).
///
/// A universe owns the mailbox of the scheduler it spawned in
/// `Universe::new` and a kill switch. The kill switch is triggered by
/// `Universe::kill` and again when the universe is dropped.
pub struct Universe {
/// Mailbox of the scheduler spawned in `Universe::new`. A clone is handed to
/// every `SpawnBuilder` returned by `spawn_actor`, and time-shift requests
/// are sent through it.
scheduler_mailbox: Mailbox<Scheduler>,
/// Kill switch whose clones are given to the scheduler and to every
/// `SpawnBuilder` returned by `spawn_actor`.
kill_switch: KillSwitch,
}
impl Universe {
    /// Creates a universe with its own scheduler and kill switch.
    ///
    /// A default [`Scheduler`] is spawned through a [`SpawnBuilder`]. The
    /// builder receives a freshly created, unbounded mailbox named
    /// `"fake-mailbox"` in the argument slot where `spawn_actor` passes the
    /// scheduler mailbox.
    // NOTE(review): presumably the scheduler needs a placeholder there because
    // it cannot be handed its own mailbox before it is spawned — confirm.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Universe {
        let scheduler = Scheduler::default();
        let kill_switch = KillSwitch::default();
        // Both `_…` bindings are named on purpose: they stay alive until the
        // end of this function instead of being dropped immediately.
        let (placeholder_mailbox, _placeholder_inbox) =
            crate::create_mailbox(String::from("fake-mailbox"), QueueCapacity::Unbounded);
        let (scheduler_mailbox, _scheduler_inbox) =
            SpawnBuilder::new(scheduler, placeholder_mailbox, kill_switch.clone()).spawn();
        Self {
            scheduler_mailbox,
            kill_switch,
        }
    }

    /// Triggers the universe's kill switch.
    pub fn kill(&self) {
        self.kill_switch.kill();
    }

    /// Asks the scheduler to simulate the passage of `duration`.
    ///
    /// A [`SimulateAdvanceTime`] message carrying `TimeShift::ByDuration(duration)`
    /// and the sending half of a oneshot channel is sent to the scheduler;
    /// this method then waits on the receiving half. Failures of either the
    /// send or the receive are ignored.
    // NOTE(review): presumably the scheduler fires (or drops) the sender once
    // the shift has been applied — confirm in the scheduler implementation.
    pub async fn simulate_time_shift(&self, duration: Duration) {
        let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
        let request = SimulateAdvanceTime {
            time_shift: TimeShift::ByDuration(duration),
            tx: done_tx,
        };
        let _ = self.scheduler_mailbox.send_message(request).await;
        let _ = done_rx.await;
    }

    /// Returns a [`SpawnBuilder`] for `actor`, configured with clones of this
    /// universe's scheduler mailbox and kill switch.
    pub fn spawn_actor<A: Actor>(&self, actor: A) -> SpawnBuilder<A> {
        let scheduler_mailbox = self.scheduler_mailbox.clone();
        let kill_switch = self.kill_switch.clone();
        SpawnBuilder::new(actor, scheduler_mailbox, kill_switch)
    }

    /// Sends the `Command::ExitWithSuccess` command to `mailbox` with
    /// `Priority::Low`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `send_with_priority` reports.
    // NOTE(review): low priority presumably lets already-queued messages be
    // handled before the actor exits — confirm against the mailbox semantics.
    pub async fn send_exit_with_success<A: Actor>(
        &self,
        mailbox: &Mailbox<A>,
    ) -> Result<(), crate::SendError> {
        let exit_command = CommandOrMessage::Command(Command::ExitWithSuccess);
        mailbox
            .send_with_priority(exit_command, Priority::Low)
            .await
    }
}
/// Dropping a universe triggers its kill switch, exactly as an explicit call
/// to `Universe::kill` would.
// NOTE(review): since clones of the kill switch are handed to the scheduler
// and to spawned actors, this presumably stops all of them — confirm against
// `KillSwitch`.
impl Drop for Universe {
    fn drop(&mut self) {
        self.kill();
    }
}
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use async_trait::async_trait;
    use crate::{Actor, ActorContext, ActorExitStatus, Handler, Universe};

    /// Delay after which the test actor schedules the next `Loop` message to
    /// itself.
    const LOOP_PERIOD: Duration = Duration::from_secs(60);

    /// Test actor that counts how many `Loop` messages it has handled and
    /// re-schedules one for itself after each.
    #[derive(Default)]
    pub struct ActorWithSchedule {
        count: usize,
    }

    #[async_trait]
    impl Actor for ActorWithSchedule {
        type ObservableState = usize;

        /// Exposes the number of `Loop` messages handled so far.
        fn observable_state(&self) -> Self::ObservableState {
            self.count
        }

        /// Starts the loop by handling a first `Loop` message directly.
        async fn initialize(&mut self, ctx: &ActorContext<Self>) -> Result<(), ActorExitStatus> {
            self.handle(Loop, ctx).await
        }
    }

    /// Message the test actor sends to itself.
    #[derive(Debug)]
    struct Loop;

    #[async_trait]
    impl Handler<Loop> for ActorWithSchedule {
        type Reply = ();

        /// Bumps the counter, then schedules the next `Loop` after `LOOP_PERIOD`.
        async fn handle(
            &mut self,
            _msg: Loop,
            ctx: &ActorContext<Self>,
        ) -> Result<(), ActorExitStatus> {
            self.count += 1;
            ctx.schedule_self_msg(LOOP_PERIOD, Loop).await;
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_schedule_for_actor() {
        let universe = Universe::new();
        // The mailbox binding is named (not `_`) so it lives until the end of
        // the test.
        let (_mailbox, handle) = universe
            .spawn_actor(ActorWithSchedule::default())
            .spawn();

        // `initialize` handles one `Loop` message itself.
        let initial_count = handle.process_pending_and_observe().await.state;
        assert_eq!(initial_count, 1);

        // With a 60 s period, a 200 s shift is expected to deliver three more
        // `Loop` messages (at 60 s, 120 s and 180 s).
        universe.simulate_time_shift(Duration::from_secs(200)).await;
        let shifted_count = handle.process_pending_and_observe().await.state;
        assert_eq!(shifted_count, 4);
    }
}