1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use tokio::sync::RwLock;
23use tokio::time::interval;
24use tracing::error;
25use tracing::info;
26use uuid::Uuid;
27
28use crate::schedule::executor::ExecutorConfig;
29use crate::schedule::task::TaskExecution;
30use crate::schedule::trigger::DelayedIntervalTrigger;
31use crate::schedule::trigger::Trigger;
32use crate::schedule::ExecutorPool;
33use crate::schedule::SchedulerError;
34use crate::schedule::SchedulerResult;
35use crate::schedule::Task;
36use crate::DelayTrigger;
37
38#[derive(Debug, Clone)]
40pub struct SchedulerConfig {
41 pub executor_config: ExecutorConfig,
42 pub executor_pool_size: usize,
43 pub check_interval: Duration,
44 pub max_scheduler_threads: usize,
45 pub enable_persistence: bool,
46 pub persistence_interval: Duration,
47}
48
49impl Default for SchedulerConfig {
50 fn default() -> Self {
51 Self {
52 executor_config: ExecutorConfig::default(),
53 executor_pool_size: 3,
54 check_interval: Duration::from_secs(1),
55 max_scheduler_threads: 2,
56 enable_persistence: false,
57 persistence_interval: Duration::from_secs(60),
58 }
59 }
60}
61
62#[derive(Clone)]
64pub struct ScheduledJob {
65 pub id: String,
66 pub task: Arc<Task>,
67 pub trigger: Arc<dyn Trigger>,
68 pub next_execution: Option<SystemTime>,
69 pub enabled: bool,
70 pub created_at: SystemTime,
71 pub last_execution: Option<SystemTime>,
72}
73
74impl ScheduledJob {
75 pub fn new(task: Arc<Task>, trigger: Arc<dyn Trigger>) -> Self {
76 let next_execution = trigger.next_execution_time(SystemTime::now());
77
78 Self {
79 id: Uuid::new_v4().to_string(),
80 task,
81 trigger,
82 next_execution,
83 enabled: true,
84 created_at: SystemTime::now(),
85 last_execution: None,
86 }
87 }
88
89 pub fn with_id(mut self, id: impl Into<String>) -> Self {
90 self.id = id.into();
91 self
92 }
93
94 pub fn enabled(mut self, enabled: bool) -> Self {
95 self.enabled = enabled;
96 self
97 }
98
99 pub fn update_next_execution(&mut self) {
100 let after = self.last_execution.unwrap_or_else(SystemTime::now);
101 self.next_execution = self.trigger.next_execution_time(after);
102 }
103
104 pub fn should_execute(&self, now: SystemTime) -> bool {
105 self.enabled && self.next_execution.is_some_and(|next| next <= now)
106 }
107}
108
109pub struct TaskScheduler {
111 config: SchedulerConfig,
112 executor_pool: Arc<ExecutorPool>,
113 jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
114 running: Arc<RwLock<bool>>,
115 scheduler_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
116}
117
118impl Default for TaskScheduler {
119 fn default() -> Self {
120 Self::new(SchedulerConfig::default())
121 }
122}
123
124impl TaskScheduler {
125 pub fn new(config: SchedulerConfig) -> Self {
127 let executor_pool = Arc::new(ExecutorPool::new(
128 config.executor_pool_size,
129 config.executor_config.clone(),
130 ));
131
132 Self {
133 config,
134 executor_pool,
135 jobs: Arc::new(RwLock::new(HashMap::new())),
136 running: Arc::new(RwLock::new(false)),
137 scheduler_handles: Arc::new(RwLock::new(Vec::new())),
138 }
139 }
140
141 pub async fn start(&self) -> SchedulerResult<()> {
143 let mut running = self.running.write().await;
144 if *running {
145 return Err(SchedulerError::SystemError(
146 "Scheduler is already running".to_string(),
147 ));
148 }
149 *running = true;
150
151 info!("Starting task scheduler");
152
153 let mut handles = self.scheduler_handles.write().await;
155
156 for i in 0..self.config.max_scheduler_threads {
157 let scheduler = self.clone_for_thread();
158 let handle = tokio::spawn(async move {
159 scheduler.scheduler_loop(i).await;
160 });
161 handles.push(handle);
162 }
163
164 if self.config.enable_persistence {
166 let scheduler = self.clone_for_thread();
167 let handle = tokio::spawn(async move {
168 scheduler.cleanup_loop().await;
169 });
170 handles.push(handle);
171 }
172
173 info!(
174 "Task scheduler started with {} threads",
175 self.config.max_scheduler_threads
176 );
177 Ok(())
178 }
179
180 pub async fn stop(&self) -> SchedulerResult<()> {
182 let mut running = self.running.write().await;
183 if !*running {
184 return Ok(());
185 }
186 *running = false;
187
188 info!("Stopping task scheduler");
189
190 let mut handles = self.scheduler_handles.write().await;
192 for handle in handles.drain(..) {
193 handle.abort();
194 }
195
196 info!("Task scheduler stopped");
197 Ok(())
198 }
199
200 pub async fn schedule_job(
202 &self,
203 task: Arc<Task>,
204 trigger: Arc<dyn Trigger>,
205 ) -> SchedulerResult<String> {
206 let job = ScheduledJob::new(task.clone(), trigger);
207 let job_id = job.id.clone();
208
209 let mut jobs = self.jobs.write().await;
210 if jobs.contains_key(&task.id) {
211 return Err(SchedulerError::TaskAlreadyExists(task.id.clone()));
212 }
213
214 jobs.insert(job_id.clone(), job);
215 info!("Job scheduled: {} ({})", task.name, job_id);
216
217 Ok(job_id)
218 }
219
220 pub async fn schedule_job_with_id(
222 &self,
223 job_id: impl Into<String>,
224 task: Arc<Task>,
225 trigger: Arc<dyn Trigger>,
226 ) -> SchedulerResult<String> {
227 let job_id = job_id.into();
228 let job = ScheduledJob::new(task.clone(), trigger).with_id(job_id.clone());
229
230 let mut jobs = self.jobs.write().await;
231 if jobs.contains_key(&job_id) {
232 return Err(SchedulerError::TaskAlreadyExists(job_id));
233 }
234
235 jobs.insert(job_id.clone(), job);
236 info!("Job scheduled with ID: {} ({})", task.name, job_id);
237
238 Ok(job_id)
239 }
240
241 pub async fn unschedule_job(&self, job_id: &str) -> SchedulerResult<()> {
243 let mut jobs = self.jobs.write().await;
244 if jobs.remove(job_id).is_some() {
245 info!("Job unscheduled: {}", job_id);
246 Ok(())
247 } else {
248 Err(SchedulerError::TaskNotFound(job_id.to_string()))
249 }
250 }
251
252 pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> SchedulerResult<()> {
254 let mut jobs = self.jobs.write().await;
255 if let Some(job) = jobs.get_mut(job_id) {
256 job.enabled = enabled;
257 info!(
258 "Job {} {}: {}",
259 if enabled { "enabled" } else { "disabled" },
260 job_id,
261 job.task.name
262 );
263 Ok(())
264 } else {
265 Err(SchedulerError::TaskNotFound(job_id.to_string()))
266 }
267 }
268
269 pub async fn get_job(&self, job_id: &str) -> Option<ScheduledJob> {
271 let jobs = self.jobs.read().await;
272 jobs.get(job_id).cloned()
273 }
274
275 pub async fn get_all_jobs(&self) -> Vec<ScheduledJob> {
277 let jobs = self.jobs.read().await;
278 jobs.values().cloned().collect()
279 }
280
281 pub async fn get_jobs_by_group(&self, group: &str) -> Vec<ScheduledJob> {
283 let jobs = self.jobs.read().await;
284 jobs.values()
285 .filter(|job| job.task.group.as_ref().is_some_and(|g| g == group))
286 .cloned()
287 .collect()
288 }
289
290 pub async fn execute_job_now(&self, job_id: &str) -> SchedulerResult<String> {
292 let job = {
293 let jobs = self.jobs.read().await;
294 jobs.get(job_id).cloned()
295 };
296
297 if let Some(job) = job {
298 let executor = self.executor_pool.get_executor().await;
299 let execution_id = executor.execute_task(job.task, SystemTime::now()).await?;
300 info!("Job executed immediately: {} ({})", job_id, execution_id);
301 Ok(execution_id)
302 } else {
303 Err(SchedulerError::TaskNotFound(job_id.to_string()))
304 }
305 }
306
307 pub async fn schedule_delayed_job(
309 &self,
310 task: Arc<Task>,
311 delay: Duration,
312 ) -> SchedulerResult<String> {
313 let trigger = Arc::new(DelayTrigger::new(delay));
314 self.schedule_job(task, trigger).await
315 }
316
317 pub async fn schedule_delayed_job_with_id(
319 &self,
320 job_id: impl Into<String>,
321 task: Arc<Task>,
322 delay: Duration,
323 ) -> SchedulerResult<String> {
324 let trigger = Arc::new(DelayTrigger::new(delay));
325 self.schedule_job_with_id(job_id, task, trigger).await
326 }
327
328 pub async fn schedule_interval_job_with_delay(
330 &self,
331 task: Arc<Task>,
332 interval: Duration,
333 initial_delay: Duration,
334 ) -> SchedulerResult<String> {
335 let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
336 self.schedule_job(task, trigger).await
337 }
338
339 pub async fn schedule_interval_job_with_delay_and_id(
341 &self,
342 job_id: impl Into<String>,
343 task: Arc<Task>,
344 interval: Duration,
345 initial_delay: Duration,
346 ) -> SchedulerResult<String> {
347 let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
348 self.schedule_job_with_id(job_id, task, trigger).await
349 }
350
351 pub async fn execute_job_now_with_delay(
353 &self,
354 job_id: &str,
355 execution_delay: Option<Duration>,
356 ) -> SchedulerResult<String> {
357 let job = {
358 let jobs = self.jobs.read().await;
359 jobs.get(job_id).cloned()
360 };
361
362 if let Some(job) = job {
363 let executor = self.executor_pool.get_executor().await;
364 let execution_id = executor
365 .execute_task_with_delay(job.task, SystemTime::now(), execution_delay)
366 .await?;
367 info!(
368 "Job executed immediately with delay: {} ({})",
369 job_id, execution_id
370 );
371 Ok(execution_id)
372 } else {
373 Err(SchedulerError::TaskNotFound(job_id.to_string()))
374 }
375 }
376
377 pub async fn get_execution(&self, execution_id: &str) -> Option<TaskExecution> {
379 for _ in 0..self.config.executor_pool_size {
381 let executor = self.executor_pool.get_executor().await;
382 if let Some(execution) = executor.get_execution(execution_id).await {
383 return Some(execution);
384 }
385 }
386 None
387 }
388
389 pub async fn cancel_execution(&self, execution_id: &str) -> SchedulerResult<()> {
391 for _ in 0..self.config.executor_pool_size {
393 let executor = self.executor_pool.get_executor().await;
394 if executor.cancel_task(execution_id).await.is_ok() {
395 return Ok(());
396 }
397 }
398 Err(SchedulerError::TaskNotFound(execution_id.to_string()))
399 }
400
401 pub async fn get_status(&self) -> SchedulerStatus {
403 let jobs = self.jobs.read().await;
404 let running = *self.running.read().await;
405 let total_jobs = jobs.len();
406 let enabled_jobs = jobs.values().filter(|job| job.enabled).count();
407 let running_tasks = self.executor_pool.total_running_tasks().await;
408
409 SchedulerStatus {
410 running,
411 total_jobs,
412 enabled_jobs,
413 running_tasks,
414 }
415 }
416
417 fn clone_for_thread(&self) -> TaskSchedulerInternal {
420 TaskSchedulerInternal {
421 config: self.config.clone(),
422 executor_pool: self.executor_pool.clone(),
423 jobs: self.jobs.clone(),
424 running: self.running.clone(),
425 }
426 }
427
428 async fn scheduler_loop(&self, thread_id: usize) {
429 let internal = self.clone_for_thread();
430 internal.scheduler_loop(thread_id).await;
431 }
432
433 async fn cleanup_loop(&self) {
434 let internal = self.clone_for_thread();
435 internal.cleanup_loop().await;
436 }
437}
438
439#[derive(Clone)]
441struct TaskSchedulerInternal {
442 config: SchedulerConfig,
443 executor_pool: Arc<ExecutorPool>,
444 jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
445 running: Arc<RwLock<bool>>,
446}
447
448impl TaskSchedulerInternal {
449 async fn scheduler_loop(&self, thread_id: usize) {
450 info!("Scheduler thread {} started", thread_id);
451
452 let mut interval = interval(self.config.check_interval);
453
454 while *self.running.read().await {
455 interval.tick().await;
456
457 let now = SystemTime::now();
458 let jobs_to_execute = self.get_jobs_to_execute(now).await;
459
460 for job_id in jobs_to_execute {
461 if !*self.running.read().await {
462 break;
463 }
464
465 self.execute_job(&job_id, now).await;
466 }
467 }
468
469 info!("Scheduler thread {} stopped", thread_id);
470 }
471
472 async fn cleanup_loop(&self) {
473 info!("Cleanup thread started");
474
475 let mut interval = interval(self.config.persistence_interval);
476
477 while *self.running.read().await {
478 interval.tick().await;
479
480 let cleanup_duration = Duration::from_secs(24 * 3600);
482 for _ in 0..self.config.executor_pool_size {
483 let executor = self.executor_pool.get_executor().await;
484 executor.cleanup_old_executions(cleanup_duration).await;
485 }
486 }
487
488 info!("Cleanup thread stopped");
489 }
490
491 async fn get_jobs_to_execute(&self, now: SystemTime) -> Vec<String> {
492 let jobs = self.jobs.read().await;
493 jobs.values()
494 .filter(|job| job.should_execute(now))
495 .map(|job| job.id.clone())
496 .collect()
497 }
498
499 async fn execute_job(&self, job_id: &str, now: SystemTime) {
500 let job = {
501 let jobs = self.jobs.read().await;
502 jobs.get(job_id).cloned()
503 };
504
505 if let Some(mut job) = job {
506 let executor = self.executor_pool.get_executor().await;
508
509 match executor.execute_task(job.task.clone(), now).await {
510 Ok(execution_id) => {
511 job.last_execution = Some(now);
513 job.update_next_execution();
514
515 let mut jobs = self.jobs.write().await;
517 jobs.insert(job_id.to_string(), job);
518
519 info!("Job executed: {} ({})", job_id, execution_id);
520 }
521 Err(e) => {
522 error!("Failed to execute job {}: {}", job_id, e);
523 }
524 }
525 }
526 }
527}
528
529#[derive(Debug, Clone)]
531pub struct SchedulerStatus {
532 pub running: bool,
533 pub total_jobs: usize,
534 pub enabled_jobs: usize,
535 pub running_tasks: usize,
536}