use crate::error::{ActorError, Result};
use crate::{ActorSystem, Message, Pid};
use dashmap::DashMap;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use tokio_util::sync::CancellationToken;
#[cfg(feature = "telemetry")]
use crate::telemetry::SchedulerMetrics;
/// Process-wide counter used to mint [`TimerRef`] ids.
///
/// Starts at 1; `Scheduler::send_after` advances it with `fetch_add` each time
/// it creates a timer, so every timer gets the previous value as its id.
static TIMER_COUNTER: AtomicU64 = AtomicU64::new(1);
/// Target of a scheduled message: either a concrete process id or a name.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Destination {
    /// Address the process directly by its [`Pid`].
    Pid(Pid),
    /// Address a process by name.
    Name(String),
}

/// Wraps a [`Pid`] in [`Destination::Pid`].
impl From<Pid> for Destination {
    fn from(pid: Pid) -> Self {
        Self::Pid(pid)
    }
}

/// Wraps an owned name in [`Destination::Name`] without copying it.
impl From<String> for Destination {
    fn from(name: String) -> Self {
        Self::Name(name)
    }
}

/// Copies a borrowed name into [`Destination::Name`].
impl From<&str> for Destination {
    fn from(name: &str) -> Self {
        Self::Name(name.to_owned())
    }
}

/// Renders the wrapped pid or name exactly as its own `Display` output.
impl fmt::Display for Destination {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the inner value first, then format it once through `write!`.
        let inner: &dyn fmt::Display = match self {
            Self::Pid(pid) => pid,
            Self::Name(name) => name,
        };
        write!(f, "{}", inner)
    }
}
/// Handle identifying one scheduled timer.
///
/// It is a thin wrapper around a `u64` id, so it is cheap to copy, compare and
/// use as a map key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TimerRef(u64);

impl TimerRef {
    /// Builds a handle around the given numeric id.
    pub(crate) fn new(id: u64) -> Self {
        TimerRef(id)
    }

    /// Returns the numeric id wrapped by this handle.
    pub fn id(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}

/// Formats the handle as `#TimerRef<id>`.
impl fmt::Display for TimerRef {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let raw = self.id();
        write!(f, "#TimerRef<{}>", raw)
    }
}
pub(crate) struct Scheduler {
timers: Arc<DashMap<TimerRef, CancellationToken>>,
}
impl Scheduler {
pub(crate) fn new() -> Self {
Self {
timers: Arc::new(DashMap::new()),
}
}
pub(crate) fn send_after(
&self,
system: Arc<ActorSystem>,
dest: Destination,
msg: Message,
duration: Duration,
) -> TimerRef {
let timer_ref = TimerRef::new(TIMER_COUNTER.fetch_add(1, Ordering::Relaxed));
let cancel_token = CancellationToken::new();
self.timers.insert(timer_ref, cancel_token.clone());
#[cfg(feature = "telemetry")]
SchedulerMetrics::message_scheduled();
let timers = Arc::clone(&self.timers);
tokio::spawn(async move {
tokio::select! {
_ = cancel_token.cancelled() => {
#[cfg(feature = "telemetry")]
SchedulerMetrics::timer_cancelled();
}
_ = tokio::time::sleep(duration) => {
let result = match &dest {
Destination::Pid(pid) => {
system.send(*pid, msg).await
}
Destination::Name(name) => {
match system.whereis(name) {
Some(pid) => system.send(pid, msg).await,
None => {
tracing::warn!(
"Scheduled message destination '{}' not found",
name
);
Err(ActorError::NameNotRegistered(name.clone()))
}
}
}
};
if let Err(e) = result {
tracing::warn!(
"Failed to deliver scheduled message to {}: {}",
dest,
e
);
}
#[cfg(feature = "telemetry")]
SchedulerMetrics::message_delivered();
}
}
timers.remove(&timer_ref);
});
timer_ref
}
pub(crate) fn cancel_timer(&self, timer_ref: TimerRef) -> Result<bool> {
if let Some((_, token)) = self.timers.remove(&timer_ref) {
token.cancel();
#[cfg(feature = "telemetry")]
SchedulerMetrics::timer_cancelled();
Ok(true)
} else {
Ok(false)
}
}
#[allow(dead_code)]
pub(crate) fn active_timer_count(&self) -> usize {
self.timers.len()
}
}
impl Default for Scheduler {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // A pid converts into the `Pid` variant holding that same pid.
    #[test]
    fn test_destination_from_pid() {
        let pid = Pid::new();
        assert_eq!(Destination::from(pid), Destination::Pid(pid));
    }

    // A string slice converts into the `Name` variant with an owned copy.
    #[test]
    fn test_destination_from_string() {
        let expected = Destination::Name(String::from("worker"));
        assert_eq!(Destination::from("worker"), expected);
    }

    // Display output is the inner pid or the bare name.
    #[test]
    fn test_destination_display() {
        let pid = Pid::new();
        let rendered_pid = Destination::Pid(pid).to_string();
        let rendered_name = Destination::Name(String::from("worker")).to_string();

        assert!(rendered_pid.contains(&pid.to_string()));
        assert_eq!(rendered_name, "worker");
    }

    // Handles compare equal exactly when their ids match.
    #[test]
    fn test_timer_ref_equality() {
        let first = TimerRef::new(1);
        let same_id = TimerRef::new(1);
        let other_id = TimerRef::new(2);

        assert_eq!(first, same_id);
        assert_ne!(first, other_id);
    }

    // Display output has the `#TimerRef<id>` shape.
    #[test]
    fn test_timer_ref_display() {
        assert_eq!(TimerRef::new(42).to_string(), "#TimerRef<42>");
    }

    // A fresh scheduler has no pending timers.
    #[test]
    fn test_scheduler_creation() {
        assert_eq!(Scheduler::new().active_timer_count(), 0);
    }

    // Cancelling an id that was never scheduled reports `false`.
    #[tokio::test]
    async fn test_cancel_nonexistent_timer() {
        let scheduler = Scheduler::new();
        let was_pending = scheduler.cancel_timer(TimerRef::new(999)).unwrap();
        assert!(!was_pending);
    }
}