use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use async_channel::{Receiver, Sender, bounded};
use crate::error::TaskError;
use crate::task::{Task, TaskId};
use crate::timer::wheel::{MulitWheel, TaskStatus};
use crate::timer::{Timer, TimerEvent};
pub struct MiniTimer {
wheel: Arc<MulitWheel>,
event_receiver: Receiver<TimerEvent>,
event_sender: Sender<TimerEvent>,
timer: Timer,
is_running: Arc<AtomicBool>,
}
impl MiniTimer {
pub fn new() -> Self {
let (event_sender, event_receiver) = bounded(16);
let wheel = Arc::new(MulitWheel::new());
let timer = Timer::new(event_sender.clone());
let is_running = Arc::new(AtomicBool::new(true));
let mini_timer = Self {
wheel,
event_receiver,
event_sender,
timer,
is_running: is_running.clone(),
};
let mut timer_clone = mini_timer.clone();
tokio::spawn(async move {
timer_clone.run().await;
});
mini_timer
}
pub async fn tick(&self) {
let _ = self.event_sender.send(TimerEvent::Tick).await;
}
pub fn add_task(&self, task: Task) -> Result<(), TaskError> {
self.wheel.add_task(task)
}
pub fn remove_task(&self, task_id: TaskId) -> Option<Task> {
self.wheel.remove_task(task_id)
}
pub fn contains_task(&self, task_id: TaskId) -> bool {
self.wheel.task_tracking_info(task_id).is_some()
}
pub fn task_count(&self) -> usize {
self.wheel.task_tracker_map.len()
}
pub fn get_pending_tasks(&self) -> Vec<TaskId> {
self.wheel.get_all_pending_tasks()
}
pub fn get_running_tasks(&self) -> Vec<TaskId> {
self.wheel.get_running_tasks()
}
pub fn task_status(&self, task_id: TaskId) -> Option<TaskStatus> {
self.wheel.task_status(task_id)
}
pub fn advance_task(
&self,
task_id: TaskId,
duration: Option<std::time::Duration>,
reset_frequency: bool,
) -> Result<(), TaskError> {
let duration_secs = duration.map(|d| d.as_secs());
self.wheel
.accelerate_task(task_id, duration_secs, reset_frequency)
}
pub fn update_task(&self, task_id: TaskId, new_task: Task) -> Result<(), TaskError> {
self.wheel.update_task(task_id, new_task)
}
pub async fn stop(&self) {
if !self.is_running.load(Ordering::Relaxed) {
return;
}
self.timer.stop();
self.is_running.store(false, Ordering::Relaxed);
}
pub(crate) async fn run(&mut self) {
self.is_running.store(true, Ordering::Relaxed);
let mut timer = self.timer.clone();
tokio::spawn(async move {
timer.run().await;
});
tokio::time::sleep(Duration::from_millis(10)).await;
loop {
match self.event_receiver.recv().await {
Ok(TimerEvent::Tick) => {
self.wheel.tick();
let arrived_tasks = self.wheel.execute_arrived_tasks();
for task in arrived_tasks {
self.wheel.process_arrived_task(task);
}
}
Ok(TimerEvent::StopTimer) => {
break;
}
Err(_) => {
break;
}
}
}
self.is_running.store(false, Ordering::Relaxed);
}
pub fn is_running(&self) -> bool {
self.is_running.load(Ordering::Relaxed)
}
}
impl Default for MiniTimer {
fn default() -> Self {
Self::new()
}
}
impl Clone for MiniTimer {
fn clone(&self) -> Self {
Self {
wheel: self.wheel.clone(),
event_receiver: self.event_receiver.clone(),
event_sender: self.event_sender.clone(),
timer: self.timer.clone(),
is_running: self.is_running.clone(),
}
}
}
unsafe impl Send for MiniTimer {}
unsafe impl Sync for MiniTimer {}