use crate::{
UsenetDownloader,
scheduler::{ScheduleAction, Scheduler},
};
use chrono::Local;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use tokio::time::{Duration, sleep};
use tracing::{debug, info};
pub struct SchedulerTask {
scheduler: Arc<Scheduler>,
downloader: Arc<UsenetDownloader>,
}
impl SchedulerTask {
pub fn new(downloader: Arc<UsenetDownloader>, scheduler: Arc<Scheduler>) -> Self {
Self {
scheduler,
downloader,
}
}
pub async fn run(self) {
info!("Scheduler task started");
let mut last_action: Option<ScheduleAction> = None;
loop {
if !self
.downloader
.queue_state
.accepting_new
.load(Ordering::SeqCst)
{
info!("Scheduler task shutting down");
break;
}
let now = Local::now();
let current_action = self.scheduler.get_current_action(now);
if current_action != last_action {
match ¤t_action {
Some(action) => {
debug!(?action, "Schedule action changed, applying new action");
self.apply_action(action).await;
}
None => {
debug!("No schedule rules active, clearing any previous actions");
self.clear_schedule_actions().await;
}
}
last_action = current_action;
} else {
debug!(
?current_action,
"Schedule action unchanged, no action needed"
);
}
sleep(Duration::from_secs(60)).await;
}
info!("Scheduler task stopped");
}
async fn apply_action(&self, action: &ScheduleAction) {
match action {
ScheduleAction::SpeedLimit(limit_bps) => {
info!(limit_bps = %limit_bps, "Applying scheduled speed limit");
self.downloader.set_speed_limit(Some(*limit_bps)).await;
}
ScheduleAction::Unlimited => {
info!("Applying scheduled unlimited speed");
self.downloader.set_speed_limit(None).await;
}
ScheduleAction::Pause => {
info!("Applying scheduled pause");
if let Err(e) = self.downloader.pause_all().await {
tracing::warn!(error = %e, "Failed to pause downloads for scheduled action");
}
}
}
}
async fn clear_schedule_actions(&self) {
info!("Clearing schedule actions, reverting to default behavior");
self.downloader.set_speed_limit(None).await;
}
}
#[allow(clippy::unwrap_used, clippy::expect_used)]
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::{ScheduleRule, Weekday};
use chrono::{Datelike, NaiveTime, Timelike};
async fn create_test_downloader() -> (UsenetDownloader, tempfile::TempDir) {
crate::downloader::test_helpers::create_test_downloader().await
}
#[tokio::test]
async fn test_scheduler_task_shutdown_on_signal() {
let (downloader, _temp_dir) = create_test_downloader().await;
let scheduler = Scheduler::new(vec![]);
let downloader_arc = Arc::new(downloader);
downloader_arc
.queue_state
.accepting_new
.store(false, Ordering::SeqCst);
let task = SchedulerTask::new(downloader_arc.clone(), Arc::new(scheduler));
let handle = tokio::spawn(async move {
task.run().await;
});
let result = tokio::time::timeout(Duration::from_secs(1), handle).await;
assert!(
result.is_ok(),
"Scheduler task should exit on shutdown signal"
);
}
#[tokio::test]
async fn test_scheduler_task_applies_speed_limit() {
let (downloader, _temp_dir) = create_test_downloader().await;
let now = Local::now();
let current_weekday = Weekday::from_chrono(now.weekday());
let start_time = NaiveTime::from_hms_opt(now.hour().saturating_sub(1), 0, 0)
.unwrap_or(NaiveTime::from_hms_opt(0, 0, 0).unwrap());
let end_time = NaiveTime::from_hms_opt((now.hour() + 1) % 24, 59, 59)
.unwrap_or(NaiveTime::from_hms_opt(23, 59, 59).unwrap());
let rule = ScheduleRule {
id: crate::scheduler::RuleId(1),
name: "Test Speed Limit".into(),
days: vec![current_weekday],
start_time,
end_time,
action: ScheduleAction::SpeedLimit(1_000_000), enabled: true,
};
let scheduler = Scheduler::new(vec![rule]);
let downloader_arc = Arc::new(downloader);
let task = SchedulerTask::new(downloader_arc.clone(), Arc::new(scheduler));
assert_eq!(downloader_arc.get_speed_limit(), None);
task.apply_action(&ScheduleAction::SpeedLimit(1_000_000))
.await;
assert_eq!(downloader_arc.get_speed_limit(), Some(1_000_000));
}
#[tokio::test]
async fn test_scheduler_task_applies_unlimited() {
let (downloader, _temp_dir) = create_test_downloader().await;
let downloader_arc = Arc::new(downloader);
downloader_arc.set_speed_limit(Some(500_000)).await;
assert_eq!(downloader_arc.get_speed_limit(), Some(500_000));
let scheduler = Scheduler::new(vec![]);
let task = SchedulerTask::new(downloader_arc.clone(), Arc::new(scheduler));
task.apply_action(&ScheduleAction::Unlimited).await;
assert_eq!(downloader_arc.get_speed_limit(), None);
}
#[tokio::test]
async fn test_scheduler_task_clears_actions_when_no_rules_match() {
let (downloader, _temp_dir) = create_test_downloader().await;
let downloader_arc = Arc::new(downloader);
downloader_arc.set_speed_limit(Some(1_000_000)).await;
assert_eq!(downloader_arc.get_speed_limit(), Some(1_000_000));
let scheduler = Scheduler::new(vec![]);
let task = SchedulerTask::new(downloader_arc.clone(), Arc::new(scheduler));
task.clear_schedule_actions().await;
assert_eq!(downloader_arc.get_speed_limit(), None);
}
}