use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use crate::actor::{Actor, ActorRef, Handler};
use crate::message::Message;
/// A handle to a scheduled timer that can be cancelled.
///
/// Implementors must be `Send + 'static`. No implementation is visible in this
/// file, so the exact cancellation semantics are defined by each implementor.
pub trait TimerHandle: Send + 'static {
/// Cancels the timer. Takes `self` by value, so the handle is consumed and
/// cannot be used again afterwards.
fn cancel(self);
}
/// Schedules a one-shot delivery of `msg` to an actor once `delay` has elapsed.
///
/// The actor reference is cloned so the spawned task owns its own copy. The
/// task sleeps for `delay` and then calls `tell(msg)` on that copy.
///
/// Delivery is best-effort: whatever `tell` returns is discarded, so a failed
/// send goes unreported.
///
/// # Returns
///
/// The [`JoinHandle`] of the spawned Tokio task.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
pub fn send_after<A, M, R>(actor_ref: &R, msg: M, delay: Duration) -> JoinHandle<()>
where
    A: Actor + Handler<M>,
    M: Message<Reply = ()>,
    R: ActorRef<A> + Clone + 'static,
{
    // Own a copy of the reference so the task does not borrow from the caller.
    let target = R::clone(actor_ref);

    let delayed_delivery = async move {
        tokio::time::sleep(delay).await;
        // Best-effort: the send result is intentionally ignored.
        let _ = target.tell(msg);
    };

    tokio::spawn(delayed_delivery)
}
/// Periodically sends a freshly built message to an actor until cancelled.
///
/// Spawns a Tokio task that owns a clone of `actor_ref` and drives a
/// `tokio::time::interval` with period `interval`. On every tick it calls
/// `msg_factory()` and passes the result to `tell`.
///
/// The task ends when either:
/// - `cancel` is cancelled, or
/// - `tell` returns an error.
///
/// Cancellation is checked before the tick on every loop iteration because the
/// `select!` is `biased`, so a tick that is ready at the same time as the
/// cancellation is not delivered.
///
/// # Parameters
///
/// - `actor_ref`: reference to the target actor; cloned, not consumed.
/// - `msg_factory`: called once per tick to build the message. Any `FnMut` is
///   accepted, so the factory may carry mutable state (e.g. a sequence
///   counter). It is only ever called from the single spawned task.
/// - `interval`: period between ticks. Per the Tokio documentation the first
///   tick of `tokio::time::interval` completes immediately, so the first
///   message is sent as soon as the task runs.
/// - `cancel`: token used to stop the task.
///
/// # Returns
///
/// The [`JoinHandle`] of the spawned task; it completes once the loop exits.
///
/// # Panics
///
/// `tokio::spawn` panics when called outside of a Tokio runtime.
/// `tokio::time::interval` is documented to panic on a zero period; that panic
/// would occur inside the spawned task and surface through the returned handle.
pub fn send_interval<A, M, R, F>(
    actor_ref: &R,
    mut msg_factory: F,
    interval: Duration,
    cancel: CancellationToken,
) -> JoinHandle<()>
where
    A: Actor + Handler<M>,
    M: Message<Reply = ()>,
    R: ActorRef<A> + Clone + 'static,
    F: FnMut() -> M + Send + 'static,
{
    let actor = actor_ref.clone();
    tokio::spawn(async move {
        let mut interval_timer = tokio::time::interval(interval);
        loop {
            tokio::select! {
                // Poll branches in declaration order so cancellation wins
                // over a simultaneously ready tick.
                biased;
                _ = cancel.cancelled() => break,
                _ = interval_timer.tick() => {
                    // A failed send ends the task: stop ticking.
                    if actor.tell(msg_factory()).is_err() {
                        break;
                    }
                }
            }
        }
    })
}
// Timer tests; compiled only for test builds with the `test-support` feature,
// which provides `TestRuntime`.
#[cfg(all(test, feature = "test-support"))]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;
    use async_trait::async_trait;
    use crate::actor::{Actor, ActorContext};
    use crate::message::Message;
    use crate::test_support::test_runtime::TestRuntime;

    /// Unit message sent by the timers under test.
    struct Tick;

    impl Message for Tick {
        type Reply = ();
    }

    /// Actor that counts every `Tick` it handles in a shared atomic counter.
    struct TickCounter {
        count: Arc<AtomicU64>,
    }

    impl Actor for TickCounter {
        type Args = Arc<AtomicU64>;
        type Deps = ();

        fn create(args: Arc<AtomicU64>, _deps: ()) -> Self {
            Self { count: args }
        }
    }

    #[async_trait]
    impl Handler<Tick> for TickCounter {
        async fn handle(&mut self, _msg: Tick, _ctx: &mut ActorContext) {
            self.count.fetch_add(1, Ordering::SeqCst);
        }
    }

    #[tokio::test]
    async fn send_after_delivers_message() {
        let count = Arc::new(AtomicU64::new(0));
        let rt = TestRuntime::new();
        let actor = rt.spawn::<TickCounter>("ticker", count.clone()).await.unwrap();
        send_after::<TickCounter, Tick, _>(&actor, Tick, Duration::from_millis(50));

        // Before the delay elapses nothing must have been delivered.
        tokio::time::sleep(Duration::from_millis(10)).await;
        assert_eq!(count.load(Ordering::SeqCst), 0);

        // Well after the delay exactly one message must have arrived.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(count.load(Ordering::SeqCst), 1);
    }

    #[tokio::test]
    async fn send_interval_delivers_multiple_messages() {
        let count = Arc::new(AtomicU64::new(0));
        let rt = TestRuntime::new();
        let actor = rt.spawn::<TickCounter>("ticker", count.clone()).await.unwrap();
        let cancel = CancellationToken::new();
        send_interval::<TickCounter, Tick, _, _>(
            &actor,
            || Tick,
            Duration::from_millis(30),
            cancel.clone(),
        );

        tokio::time::sleep(Duration::from_millis(200)).await;
        let received = count.load(Ordering::SeqCst);
        assert!(received >= 3, "expected ≥ 3 ticks, got {}", received);

        cancel.cancel();
        // Let any message that was already sent be handled, then check the
        // counter no longer moves.
        tokio::time::sleep(Duration::from_millis(50)).await;
        let after_cancel = count.load(Ordering::SeqCst);
        tokio::time::sleep(Duration::from_millis(100)).await;
        let final_count = count.load(Ordering::SeqCst);
        assert_eq!(after_cancel, final_count, "ticks continued after cancel");
    }

    #[tokio::test]
    async fn send_interval_stops_when_actor_stops() {
        let count = Arc::new(AtomicU64::new(0));
        let rt = TestRuntime::new();
        let actor = rt.spawn::<TickCounter>("ticker", count.clone()).await.unwrap();
        // The token is never cancelled, so the only way for the task to end
        // is a failed `tell` after the actor has stopped.
        let cancel = CancellationToken::new();
        let handle = send_interval::<TickCounter, Tick, _, _>(
            &actor,
            || Tick,
            Duration::from_millis(20),
            cancel,
        );

        tokio::time::sleep(Duration::from_millis(100)).await;
        actor.stop();

        // Fail the test if the task is still running after the timeout;
        // previously the timeout result was discarded and the test could
        // never fail.
        let joined = tokio::time::timeout(Duration::from_secs(1), handle)
            .await
            .expect("interval task still running 1s after the actor stopped");
        joined.expect("interval task panicked");
    }
}