use super::types::SchedulerError;
use crate::core::base::component::action::ActionPriority;
use crate::core::platform::container::job::Job;
use crate::core::platform::container::task::{
ContentIndexingService, DataBackupService, EmailNotificationService, Task, TaskService,
};
use chrono::{DateTime, Datelike, Utc};
use std::collections::HashMap;
use std::time::Duration;
use tokio::time::interval;
use uuid::Uuid;
pub use crate::core::platform::container::schedule::{
Schedule, ScheduledJob, ScheduledJobInfo, SchedulerStats,
};
/// Holds the scheduled jobs and the task services they run against, and
/// drives them from the periodic loop in `start`.
pub struct SchedulerOrchestrator {
/// Registered jobs, keyed by the id returned from `add_job`.
scheduled_jobs: HashMap<Uuid, ScheduledJob>,
/// Task services, keyed by the name each service reports via `name()`.
services: HashMap<String, Box<dyn TaskService>>,
/// Set by `start` and cleared by `stop`; the `start` loop runs while it is `true`.
running: bool,
/// Period between checks for due jobs (60 seconds when built via `new`).
tick_interval: Duration,
}
/// Shorter alias for [`SchedulerOrchestrator`].
pub type Scheduler = SchedulerOrchestrator;
impl SchedulerOrchestrator {
pub fn new() -> Self {
let mut scheduler = Self {
scheduled_jobs: HashMap::new(),
services: HashMap::new(),
running: false,
tick_interval: Duration::from_secs(60), };
scheduler.register_default_services();
scheduler
}
/// Registers `service` under the name it reports via `name()`.
///
/// A service that was previously registered under the same name is replaced.
pub fn register_service(&mut self, service: Box<dyn TaskService>) {
    let key = String::from(service.name());
    // Format the log line up front so the key can be moved into the map.
    let announcement = format!("Registered service: {}", key);
    self.services.insert(key, service);
    println!("{}", announcement);
}
fn register_default_services(&mut self) {
self.register_service(Box::new(DataBackupService {
backup_path: "/var/backups".to_string(),
}));
self.register_service(Box::new(ContentIndexingService {
index_name: "main_index".to_string(),
}));
self.register_service(Box::new(EmailNotificationService {
smtp_server: "localhost:587".to_string(),
}));
}
/// Adds `job` to the scheduler under `schedule` and returns the job's id.
///
/// The job is stored enabled, with its first run time derived from the
/// schedule and a run count of zero.
///
/// # Errors
///
/// Returns [`SchedulerError::ServiceNotFound`] naming the first task whose
/// `service_name` has no registered service; nothing is stored in that case.
pub fn add_job(&mut self, job: Job, schedule: Schedule) -> Result<Uuid, SchedulerError> {
    let unresolved = job
        .tasks
        .iter()
        .find(|task| !self.services.contains_key(&task.service_name));
    if let Some(task) = unresolved {
        return Err(SchedulerError::ServiceNotFound(task.service_name.clone()));
    }

    let id = job.id();
    let first_run = Self::calculate_next_run(&schedule);
    self.scheduled_jobs.insert(
        id,
        ScheduledJob {
            job,
            schedule,
            enabled: true,
            next_run: first_run,
            last_run: None,
            run_count: 0,
        },
    );
    println!("Added job {} to scheduler", id);
    Ok(id)
}
/// Removes the job with `job_id`; returns `true` if it existed.
pub fn remove_job(&mut self, job_id: Uuid) -> bool {
    let Some(removed) = self.scheduled_jobs.remove(&job_id) else {
        return false;
    };
    println!("Removed job '{}' from scheduler", removed.job.name());
    true
}
/// Enables the job with `job_id` and recomputes its next run time from its
/// schedule. Returns `false` when no such job exists.
pub fn enable_job(&mut self, job_id: Uuid) -> bool {
    match self.scheduled_jobs.get_mut(&job_id) {
        None => false,
        Some(entry) => {
            entry.enabled = true;
            entry.next_run = Self::calculate_next_run(&entry.schedule);
            println!("Enabled job '{}'", entry.job.name());
            true
        }
    }
}
/// Disables the job with `job_id` and clears its next run time.
/// Returns `false` when no such job exists.
pub fn disable_job(&mut self, job_id: Uuid) -> bool {
    let Some(entry) = self.scheduled_jobs.get_mut(&job_id) else {
        return false;
    };
    entry.enabled = false;
    entry.next_run = None;
    println!("Disabled job '{}'", entry.job.name());
    true
}
/// Runs the scheduler loop for as long as `running` stays set.
///
/// If the scheduler is already marked as running this only logs and returns.
/// Otherwise it installs the default jobs when no jobs have been added, runs
/// every enabled `OnStartup` job once, and then checks for due jobs on every
/// tick of `tick_interval`.
///
/// NOTE(review): this future keeps `&mut self` borrowed while the loop runs,
/// so `stop` cannot be called on the same value while it is being awaited —
/// confirm how callers are expected to end the loop.
pub async fn start(&mut self) {
    if self.running {
        println!("Scheduler is already running");
        return;
    }
    self.running = true;

    // Counts are captured before any default jobs are installed below.
    let job_count = self.scheduled_jobs.len();
    let service_count = self.services.len();
    println!(
        "Starting scheduler with {} jobs and {} services",
        job_count, service_count
    );
    if job_count == 0 {
        self.add_default_jobs();
    }

    self.execute_startup_jobs().await;

    let mut ticker = interval(self.tick_interval);
    loop {
        if !self.running {
            break;
        }
        ticker.tick().await;
        self.check_and_execute_jobs().await;
    }
}
/// Clears the `running` flag so the loop in `start` exits at its next check,
/// and logs that the scheduler stopped.
pub fn stop(&mut self) {
self.running = false;
println!("Scheduler stopped");
}
/// Executes every enabled job whose schedule is `Schedule::OnStartup`.
async fn execute_startup_jobs(&mut self) {
    // Collect ids first: executing a job needs `&mut self`, which cannot
    // overlap with iterating the job map.
    let mut pending = Vec::new();
    for (id, entry) in &self.scheduled_jobs {
        if entry.enabled && matches!(entry.schedule, Schedule::OnStartup) {
            pending.push(*id);
        }
    }
    for id in pending {
        self.execute_job(id).await;
    }
}
async fn check_and_execute_jobs(&mut self) {
let now = Utc::now();
let mut jobs_to_execute = Vec::new();
for (job_id, scheduled_job) in &self.scheduled_jobs {
if scheduled_job.enabled
&& let Some(next_run) = scheduled_job.next_run
&& now >= next_run
{
jobs_to_execute.push(*job_id);
}
}
for job_id in jobs_to_execute {
self.execute_job(job_id).await;
}
}
/// Executes the job with `job_id` against the registered services, logs the
/// outcome, and updates its bookkeeping (`last_run`, `run_count`, `next_run`).
///
/// Does nothing when the id is unknown. `Once` and `OnStartup` jobs get no
/// further run time; every other schedule is re-evaluated from the current time.
async fn execute_job(&mut self, job_id: Uuid) {
    let Some(entry) = self.scheduled_jobs.get_mut(&job_id) else {
        return;
    };

    println!("Executing scheduled job: '{}'", entry.job.name());
    let started_at = Utc::now();

    let outcome = entry.job.execute(&self.services).await;
    match outcome {
        Err(e) => {
            println!("Job '{}' failed: {}", entry.job.name(), e);
            let stats = entry.job.job_stats();
            if stats.completed_tasks > 0 {
                println!(
                    " Partial completion: {} of {} tasks completed",
                    stats.completed_tasks, stats.total_tasks
                );
            }
        }
        Ok(_) => {
            let stats = entry.job.job_stats();
            println!(
                "Job '{}' completed successfully: {} of {} tasks completed ({}% success rate)",
                entry.job.name(),
                stats.completed_tasks,
                stats.total_tasks,
                stats.success_rate
            );
        }
    }

    // The run is recorded whether the job succeeded or failed.
    entry.run_count += 1;
    entry.last_run = Some(started_at);
    entry.next_run = match &entry.schedule {
        Schedule::Once(_) | Schedule::OnStartup => None,
        recurring => Self::calculate_next_run(recurring),
    };

    match entry.next_run {
        Some(at) => println!("Next run for job '{}': {}", entry.job.name(), at),
        None => println!("Job '{}' will not run again", entry.job.name()),
    }
}
/// Installs the four built-in jobs: a backup every six hours, hourly content
/// indexing, a weekly maintenance job (`Schedule::Weekly(0, 2, 0)`) that
/// cleans up and e-mails a report, and a validation job that runs on startup.
///
/// A job that cannot be added is logged and skipped.
///
/// # Panics
///
/// Panics if either e-mail argument cannot be added to the report task.
fn add_default_jobs(&mut self) {
    // Backup, every six hours.
    let backup_job = Job::new(
        String::from("Daily Backup Job"),
        String::from("Automated daily backup job"),
        vec![Task::new(
            String::from("Daily Backup"),
            String::from("Performs daily data backup"),
            String::from("DataBackupService"),
        )],
    )
    .with_priority(ActionPriority::High);
    let every_six_hours = Schedule::Interval(Duration::from_secs(6 * 3600));
    if let Err(e) = self.add_job(backup_job, every_six_hours) {
        println!("Failed to add backup job: {}", e);
    }

    // Content indexing, every hour.
    let indexing_job = Job::new(
        String::from("Content Indexing Job"),
        String::from("Automated content indexing job"),
        vec![Task::new(
            String::from("Content Indexing"),
            String::from("Indexes content for search"),
            String::from("ContentIndexingService"),
        )],
    )
    .with_priority(ActionPriority::Normal);
    let hourly = Schedule::Interval(Duration::from_secs(3600));
    if let Err(e) = self.add_job(indexing_job, hourly) {
        println!("Failed to add indexing job: {}", e);
    }

    // Weekly maintenance: the cleanup task is routed to the backup service.
    let cleanup_task = Task::new(
        String::from("System Cleanup"),
        String::from("Cleans up temporary files"),
        String::from("DataBackupService"),
    );
    let mut email_task = Task::new(
        String::from("Weekly Report"),
        String::from("Sends weekly status report"),
        String::from("EmailNotificationService"),
    );
    for (key, value) in [
        ("to_email", "admin@example.com"),
        ("subject", "Weekly System Report"),
    ] {
        email_task
            .action
            .add_argument(key.to_string(), value)
            .unwrap();
    }
    let maintenance_job = Job::new(
        String::from("Weekly Maintenance"),
        String::from("Weekly system maintenance and reporting"),
        vec![cleanup_task, email_task],
    )
    .with_priority(ActionPriority::Low);
    if let Err(e) = self.add_job(maintenance_job, Schedule::Weekly(0, 2, 0)) {
        println!("Failed to add maintenance job: {}", e);
    }

    // Validation, once at startup.
    let startup_job = Job::new(
        String::from("Startup Validation"),
        String::from("Validates system health on startup"),
        vec![Task::new(
            String::from("System Validation"),
            String::from("Validates system on startup"),
            String::from("ContentIndexingService"),
        )],
    )
    .with_priority(ActionPriority::Critical);
    if let Err(e) = self.add_job(startup_job, Schedule::OnStartup) {
        println!("Failed to add startup job: {}", e);
    }
}
/// Computes the next UTC instant at which `schedule` should fire, measured
/// from the current time.
///
/// Returns `None` when the schedule will not fire again (`OnStartup`, or a
/// `Once` whose time is not in the future), when its hour/minute/day fields do
/// not form a valid time or date, or when the result would overflow.
///
/// * `Interval(d)` – now plus `d`.
/// * `Daily(h, m)` – today at `h:m`, or tomorrow if that moment has passed.
/// * `Weekly(dow, h, m)` – the next `dow` at `h:m`, a week later if that
///   moment has already passed today.
/// * `Monthly(day, h, m)` – the next calendar month (starting with the current
///   one) that contains `day`, at `h:m`. Months without that day are skipped.
///
/// NOTE(review): `dow` is interpreted as days from Sunday (0 = Sunday,
/// 6 = Saturday; larger values wrap modulo 7). This matches the in-file use of
/// `Weekly(0, ..)`, but confirm against the `Schedule` definition.
fn calculate_next_run(schedule: &Schedule) -> Option<DateTime<Utc>> {
    let now = Utc::now();
    let now_naive = now.naive_utc();
    match schedule {
        Schedule::Interval(duration) => {
            let step = chrono::Duration::from_std(*duration).ok()?;
            // Checked addition: an enormous interval yields `None` instead of a panic.
            now.checked_add_signed(step)
        }
        Schedule::Daily(hour, minute) => {
            let today = now.date_naive().and_hms_opt(*hour, *minute, 0)?;
            let next = if today > now_naive {
                today
            } else {
                today.checked_add_signed(chrono::Duration::days(1))?
            };
            Some(DateTime::from_naive_utc_and_offset(next, Utc))
        }
        Schedule::Weekly(day_of_week, hour, minute) => {
            // Both indices are zero-based from Sunday so they compare directly.
            let today_index = i64::from(now.weekday().num_days_from_sunday());
            let target_index = i64::from(*day_of_week) % 7;
            let days_ahead = (target_index - today_index).rem_euclid(7);
            let target_date = now
                .date_naive()
                .checked_add_signed(chrono::Duration::days(days_ahead))?;
            let mut next = target_date.and_hms_opt(*hour, *minute, 0)?;
            // Only possible when the target day is today and the time has passed.
            if next <= now_naive {
                next = next.checked_add_signed(chrono::Duration::days(7))?;
            }
            Some(DateTime::from_naive_utc_and_offset(next, Utc))
        }
        Schedule::Monthly(day, hour, minute) => {
            let time = chrono::NaiveTime::from_hms_opt(*hour, *minute, 0)?;
            let today = now.date_naive();
            let mut year = today.year();
            let mut month = today.month();
            // Any valid day of month recurs within a few months; twelve
            // iterations is a safe upper bound, and an invalid day (0, 32, ...)
            // simply exhausts the loop.
            for _ in 0..12 {
                if let Some(date) = chrono::NaiveDate::from_ymd_opt(year, month, *day) {
                    let candidate = date.and_time(time);
                    if candidate > now_naive {
                        return Some(DateTime::from_naive_utc_and_offset(candidate, Utc));
                    }
                }
                if month == 12 {
                    year += 1;
                    month = 1;
                } else {
                    month += 1;
                }
            }
            None
        }
        Schedule::Once(datetime) => (*datetime > now).then_some(*datetime),
        Schedule::OnStartup => None,
    }
}
/// Returns a summary of every scheduled job, in the job map's iteration
/// order (which is unspecified for a `HashMap`).
pub fn list_jobs(&self) -> Vec<ScheduledJobInfo> {
    let mut summaries = Vec::with_capacity(self.scheduled_jobs.len());
    for (job_id, entry) in &self.scheduled_jobs {
        let job = &entry.job;
        summaries.push(ScheduledJobInfo {
            id: *job_id,
            name: job.name().to_string(),
            enabled: entry.enabled,
            next_run: entry.next_run,
            last_run: entry.last_run,
            run_count: entry.run_count,
            task_count: job.tasks.len(),
            status: job.status().clone(),
            schedule: entry.schedule.clone(),
        });
    }
    summaries
}
/// Returns the scheduled job registered under `job_id`, or `None` if there
/// is no such job.
pub fn get_job(&self, job_id: Uuid) -> Option<&ScheduledJob> {
self.scheduled_jobs.get(&job_id)
}
/// Returns the names of all registered services, in the service map's
/// iteration order (which is unspecified for a `HashMap`).
pub fn list_services(&self) -> Vec<String> {
    let mut names = Vec::with_capacity(self.services.len());
    for name in self.services.keys() {
        names.push(name.clone());
    }
    names
}
/// Returns aggregate counters for the scheduler, plus the name and time of
/// the enabled job with the earliest `next_run` (if any).
pub fn stats(&self) -> SchedulerStats {
    // Pair each enabled, scheduled job with its run time so the earliest
    // can be picked without unwrapping the option.
    let upcoming = self
        .scheduled_jobs
        .values()
        .filter(|entry| entry.enabled)
        .filter_map(|entry| entry.next_run.map(|at| (at, entry)))
        .min_by_key(|(at, _)| *at);

    SchedulerStats {
        total_jobs: self.scheduled_jobs.len(),
        enabled_jobs: self
            .scheduled_jobs
            .values()
            .filter(|entry| entry.enabled)
            .count(),
        total_runs: self
            .scheduled_jobs
            .values()
            .map(|entry| entry.run_count)
            .sum(),
        total_services: self.services.len(),
        next_job_name: upcoming.map(|(_, entry)| entry.job.name().to_string()),
        next_job_time: upcoming.map(|(at, _)| at),
    }
}
}
impl Default for SchedulerOrchestrator {
fn default() -> Self {
Self::new()
}
}
/// Builds a scheduler with the default services and runs its loop.
pub async fn start_scheduler() {
    SchedulerOrchestrator::new().start().await;
}
/// Builds a scheduler that uses `services` instead of the built-in ones.
///
/// When `services` is empty the built-in services are kept; otherwise they
/// are discarded and only the supplied services are registered.
pub fn create_scheduler_with_services(
    services: Vec<Box<dyn TaskService>>,
) -> SchedulerOrchestrator {
    let mut scheduler = SchedulerOrchestrator::new();
    if services.is_empty() {
        return scheduler;
    }
    scheduler.services.clear();
    services
        .into_iter()
        .for_each(|service| scheduler.register_service(service));
    scheduler
}
#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
/// A fresh scheduler is idle, has no jobs, and carries the default services.
#[tokio::test]
async fn test_scheduler_creation() {
    let fresh = SchedulerOrchestrator::new();
    assert!(!fresh.running);
    assert!(fresh.scheduled_jobs.is_empty());
    assert!(!fresh.services.is_empty());
}
/// Adding a job backed by a registered service stores it under the returned id.
#[tokio::test]
async fn test_add_job() {
    let mut scheduler = SchedulerOrchestrator::new();
    let hourly = Schedule::Interval(Duration::from_secs(3600));
    let job = Job::new(
        String::from("Test Job"),
        String::from("A test job"),
        vec![Task::new(
            String::from("Test Task"),
            String::from("A test task"),
            String::from("DataBackupService"),
        )],
    );

    let job_id = scheduler.add_job(job, hourly).unwrap();

    assert_eq!(scheduler.scheduled_jobs.len(), 1);
    assert!(scheduler.scheduled_jobs.contains_key(&job_id));
}
/// Registering a service under an existing name keeps the service count
/// unchanged and leaves a service available under that name.
#[tokio::test]
async fn test_service_registration() {
    let mut scheduler = SchedulerOrchestrator::new();
    let initial_service_count = scheduler.services.len();

    scheduler.register_service(Box::new(DataBackupService {
        backup_path: String::from("/custom/backup"),
    }));

    assert_eq!(scheduler.services.len(), initial_service_count);
    let registered_name = scheduler
        .services
        .get("DataBackupService")
        .map(|service| service.name());
    assert_eq!(registered_name, Some("DataBackupService"));
}
/// Registering a service with a previously unknown name adds one entry.
#[tokio::test]
async fn test_service_registration_new_service() {
    // Minimal service used only by this test.
    #[derive(Debug)]
    struct CustomService;

    #[async_trait]
    impl TaskService for CustomService {
        fn name(&self) -> &str {
            "CustomTestService"
        }

        async fn execute(
            &self,
            _action: &crate::core::base::component::action::Action,
        ) -> Result<Option<serde_json::Value>, crate::core::platform::container::task::TaskError>
        {
            let payload = serde_json::json!({"test": "success"});
            Ok(Some(payload))
        }

        fn clone_service(&self) -> Box<dyn TaskService> {
            Box::new(CustomService)
        }
    }

    let mut scheduler = SchedulerOrchestrator::new();
    let count_before = scheduler.services.len();

    scheduler.register_service(Box::new(CustomService));

    assert_eq!(scheduler.services.len(), count_before + 1);
    assert!(scheduler.services.contains_key("CustomTestService"));
}
#[tokio::test]
async fn test_service_replacement() {
let mut scheduler = SchedulerOrchestrator::new();
let initial_service_count = scheduler.services.len();
let replacement_service = DataBackupService {
backup_path: "/replacement/backup".to_string(),
};
scheduler.register_service(Box::new(replacement_service));
assert_eq!(scheduler.services.len(), initial_service_count);
assert!(scheduler.services.contains_key("DataBackupService"));
}
/// Interval schedules always yield a run time; `Once` yields its own time
/// only when that time lies in the future.
#[tokio::test]
async fn test_schedule_calculation() {
    // Interval: always has a next run.
    let hourly = Schedule::Interval(Duration::from_secs(3600));
    assert!(SchedulerOrchestrator::calculate_next_run(&hourly).is_some());

    // Once, in the future: fires exactly at the requested time.
    let in_one_hour = Utc::now() + chrono::Duration::hours(1);
    let upcoming = Schedule::Once(in_one_hour);
    assert_eq!(
        SchedulerOrchestrator::calculate_next_run(&upcoming),
        Some(in_one_hour)
    );

    // Once, in the past: never fires.
    let one_hour_ago = Utc::now() - chrono::Duration::hours(1);
    let expired = Schedule::Once(one_hour_ago);
    assert_eq!(SchedulerOrchestrator::calculate_next_run(&expired), None);
}
/// Disabling clears `enabled` and `next_run`; enabling restores both.
#[tokio::test]
async fn test_job_enable_disable() {
    let mut scheduler = SchedulerOrchestrator::new();
    let job = Job::new(
        String::from("Test Job"),
        String::from("A test job"),
        vec![Task::new(
            String::from("Test Task"),
            String::from("A test task"),
            String::from("DataBackupService"),
        )],
    );
    let hourly = Schedule::Interval(Duration::from_secs(3600));
    let job_id = scheduler.add_job(job, hourly).unwrap();

    assert!(scheduler.disable_job(job_id));
    {
        let entry = scheduler.scheduled_jobs.get(&job_id).unwrap();
        assert!(!entry.enabled);
        assert!(entry.next_run.is_none());
    }

    assert!(scheduler.enable_job(job_id));
    {
        let entry = scheduler.scheduled_jobs.get(&job_id).unwrap();
        assert!(entry.enabled);
        assert!(entry.next_run.is_some());
    }
}
/// Each `SchedulerError` variant renders the expected message.
#[tokio::test]
async fn test_error_variants_display() {
    let missing_service = SchedulerError::ServiceNotFound(String::from("TestService"));
    assert_eq!(missing_service.to_string(), "Service not found: TestService");

    let missing_job = SchedulerError::JobNotFound(Uuid::new_v4());
    let rendered = missing_job.to_string();
    assert!(rendered.contains("Job not found"));

    let bad_schedule = SchedulerError::InvalidSchedule(String::from("bad schedule"));
    assert_eq!(bad_schedule.to_string(), "Invalid schedule: bad schedule");
}
/// Removing a job succeeds once and reports `false` on the second attempt.
#[tokio::test]
async fn test_remove_job() {
    let mut scheduler = SchedulerOrchestrator::new();
    let job = Job::new(
        String::from("Test Job"),
        String::from("A test job"),
        vec![Task::new(
            String::from("Test Task"),
            String::from("A test task"),
            String::from("DataBackupService"),
        )],
    );
    let hourly = Schedule::Interval(Duration::from_secs(3600));
    let job_id = scheduler.add_job(job, hourly).unwrap();
    assert_eq!(scheduler.scheduled_jobs.len(), 1);

    let first_attempt = scheduler.remove_job(job_id);
    assert!(first_attempt);
    assert_eq!(scheduler.scheduled_jobs.len(), 0);

    let second_attempt = scheduler.remove_job(job_id);
    assert!(!second_attempt);
}
/// Enabling an unknown job id reports `false`.
#[tokio::test]
async fn test_enable_nonexistent_job() {
    let mut scheduler = SchedulerOrchestrator::new();
    let enabled = scheduler.enable_job(Uuid::new_v4());
    assert!(!enabled);
}
/// Disabling an unknown job id reports `false`.
#[tokio::test]
async fn test_disable_nonexistent_job() {
    let mut scheduler = SchedulerOrchestrator::new();
    let disabled = scheduler.disable_job(Uuid::new_v4());
    assert!(!disabled);
}
/// A job whose task names an unregistered service is rejected with
/// `ServiceNotFound` carrying that service name.
#[tokio::test]
async fn test_add_job_with_missing_service() {
    let mut scheduler = SchedulerOrchestrator::new();
    let orphan_task = Task::new(
        String::from("Test Task"),
        String::from("A test task"),
        String::from("NonExistentService"),
    );
    let job = Job::new(
        String::from("Test Job"),
        String::from("A test job"),
        vec![orphan_task],
    );

    let result = scheduler.add_job(job, Schedule::Interval(Duration::from_secs(3600)));

    assert!(result.is_err());
    let Err(SchedulerError::ServiceNotFound(name)) = result else {
        panic!("Expected ServiceNotFound error");
    };
    assert_eq!(name, "NonExistentService");
}
/// Stats of an empty scheduler: zero jobs and runs, some services, no next job.
#[tokio::test]
async fn test_scheduler_stats() {
    let stats = SchedulerOrchestrator::new().stats();

    assert_eq!(stats.total_jobs, 0);
    assert_eq!(stats.enabled_jobs, 0);
    assert_eq!(stats.total_runs, 0);
    assert!(stats.total_services > 0);
    assert!(stats.next_job_name.is_none());
    assert!(stats.next_job_time.is_none());
}
/// With one interval job added, stats report it as enabled and as the next job.
#[tokio::test]
async fn test_scheduler_stats_with_jobs() {
    let mut scheduler = SchedulerOrchestrator::new();
    let job = Job::new(
        String::from("Test Job"),
        String::from("A test job"),
        vec![Task::new(
            String::from("Test Task"),
            String::from("A test task"),
            String::from("DataBackupService"),
        )],
    );
    let hourly = Schedule::Interval(Duration::from_secs(3600));
    scheduler.add_job(job, hourly).unwrap();

    let stats = scheduler.stats();
    assert_eq!(stats.total_jobs, 1);
    assert_eq!(stats.enabled_jobs, 1);
    assert!(stats.next_job_name.is_some());
    assert!(stats.next_job_time.is_some());
}
/// A hand-built `ScheduledJob` exposes the field values it was given.
#[tokio::test]
async fn test_scheduled_job_structure() {
    let job = Job::new(
        String::from("Test Job"),
        String::from("A test job"),
        vec![Task::new(
            String::from("Test Task"),
            String::from("A test task"),
            String::from("DataBackupService"),
        )],
    );
    let schedule = Schedule::Interval(Duration::from_secs(3600));

    let entry = ScheduledJob {
        job: job.clone(),
        schedule: schedule.clone(),
        enabled: true,
        next_run: Some(Utc::now()),
        last_run: None,
        run_count: 0,
    };

    assert!(entry.enabled);
    assert!(entry.next_run.is_some());
    assert!(entry.last_run.is_none());
    assert_eq!(entry.run_count, 0);
}
/// Every `Schedule` variant can be constructed and matched with its fields.
#[tokio::test]
async fn test_schedule_variants() {
    assert!(matches!(
        Schedule::Interval(Duration::from_secs(3600)),
        Schedule::Interval(_)
    ));
    assert!(matches!(Schedule::Daily(9, 30), Schedule::Daily(9, 30)));
    assert!(matches!(
        Schedule::Weekly(1, 10, 0),
        Schedule::Weekly(1, 10, 0)
    ));
    assert!(matches!(
        Schedule::Monthly(15, 14, 30),
        Schedule::Monthly(15, 14, 30)
    ));

    let in_one_hour = Utc::now() + chrono::Duration::hours(1);
    assert!(matches!(Schedule::Once(in_one_hour), Schedule::Once(_)));

    assert!(matches!(Schedule::OnStartup, Schedule::OnStartup));
}
#[tokio::test]
async fn test_calculate_next_run_daily() {
let daily_schedule = Schedule::Daily(9, 0);
let next_run = SchedulerOrchestrator::calculate_next_run(&daily_schedule);
assert!(next_run.is_some());
}
}