1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18use std::time::SystemTime;
19
20use tokio::sync::RwLock;
21use tokio::time::interval;
22use tracing::error;
23use tracing::info;
24use uuid::Uuid;
25
26use crate::schedule::executor::ExecutorConfig;
27use crate::schedule::task::TaskExecution;
28use crate::schedule::trigger::DelayedIntervalTrigger;
29use crate::schedule::trigger::Trigger;
30use crate::schedule::ExecutorPool;
31use crate::schedule::SchedulerError;
32use crate::schedule::SchedulerResult;
33use crate::schedule::Task;
34use crate::DelayTrigger;
35
36#[derive(Debug, Clone)]
38pub struct SchedulerConfig {
39 pub executor_config: ExecutorConfig,
40 pub executor_pool_size: usize,
41 pub check_interval: Duration,
42 pub max_scheduler_threads: usize,
43 pub enable_persistence: bool,
44 pub persistence_interval: Duration,
45}
46
47impl Default for SchedulerConfig {
48 fn default() -> Self {
49 Self {
50 executor_config: ExecutorConfig::default(),
51 executor_pool_size: 3,
52 check_interval: Duration::from_secs(1),
53 max_scheduler_threads: 2,
54 enable_persistence: false,
55 persistence_interval: Duration::from_secs(60),
56 }
57 }
58}
59
60#[derive(Clone)]
62pub struct ScheduledJob {
63 pub id: String,
64 pub task: Arc<Task>,
65 pub trigger: Arc<dyn Trigger>,
66 pub next_execution: Option<SystemTime>,
67 pub enabled: bool,
68 pub created_at: SystemTime,
69 pub last_execution: Option<SystemTime>,
70}
71
72impl ScheduledJob {
73 pub fn new(task: Arc<Task>, trigger: Arc<dyn Trigger>) -> Self {
74 let next_execution = trigger.next_execution_time(SystemTime::now());
75
76 Self {
77 id: Uuid::new_v4().to_string(),
78 task,
79 trigger,
80 next_execution,
81 enabled: true,
82 created_at: SystemTime::now(),
83 last_execution: None,
84 }
85 }
86
87 pub fn with_id(mut self, id: impl Into<String>) -> Self {
88 self.id = id.into();
89 self
90 }
91
92 pub fn enabled(mut self, enabled: bool) -> Self {
93 self.enabled = enabled;
94 self
95 }
96
97 pub fn update_next_execution(&mut self) {
98 let after = self.last_execution.unwrap_or_else(SystemTime::now);
99 self.next_execution = self.trigger.next_execution_time(after);
100 }
101
102 pub fn should_execute(&self, now: SystemTime) -> bool {
103 self.enabled && self.next_execution.is_some_and(|next| next <= now)
104 }
105}
106
107pub struct TaskScheduler {
109 config: SchedulerConfig,
110 executor_pool: Arc<ExecutorPool>,
111 jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
112 running: Arc<RwLock<bool>>,
113 scheduler_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
114}
115
116impl Default for TaskScheduler {
117 fn default() -> Self {
118 Self::new(SchedulerConfig::default())
119 }
120}
121
122impl TaskScheduler {
123 pub fn new(config: SchedulerConfig) -> Self {
125 let executor_pool = Arc::new(ExecutorPool::new(
126 config.executor_pool_size,
127 config.executor_config.clone(),
128 ));
129
130 Self {
131 config,
132 executor_pool,
133 jobs: Arc::new(RwLock::new(HashMap::new())),
134 running: Arc::new(RwLock::new(false)),
135 scheduler_handles: Arc::new(RwLock::new(Vec::new())),
136 }
137 }
138
139 pub async fn start(&self) -> SchedulerResult<()> {
141 let mut running = self.running.write().await;
142 if *running {
143 return Err(SchedulerError::SystemError("Scheduler is already running".to_string()));
144 }
145 *running = true;
146
147 info!("Starting task scheduler");
148
149 let mut handles = self.scheduler_handles.write().await;
151
152 for i in 0..self.config.max_scheduler_threads {
153 let scheduler = self.clone_for_thread();
154 let handle = tokio::spawn(async move {
155 scheduler.scheduler_loop(i).await;
156 });
157 handles.push(handle);
158 }
159
160 if self.config.enable_persistence {
162 let scheduler = self.clone_for_thread();
163 let handle = tokio::spawn(async move {
164 scheduler.cleanup_loop().await;
165 });
166 handles.push(handle);
167 }
168
169 info!(
170 "Task scheduler started with {} threads",
171 self.config.max_scheduler_threads
172 );
173 Ok(())
174 }
175
176 pub async fn stop(&self) -> SchedulerResult<()> {
178 let mut running = self.running.write().await;
179 if !*running {
180 return Ok(());
181 }
182 *running = false;
183
184 info!("Stopping task scheduler");
185
186 let mut handles = self.scheduler_handles.write().await;
188 for handle in handles.drain(..) {
189 handle.abort();
190 }
191
192 info!("Task scheduler stopped");
193 Ok(())
194 }
195
196 pub async fn schedule_job(&self, task: Arc<Task>, trigger: Arc<dyn Trigger>) -> SchedulerResult<String> {
198 let job = ScheduledJob::new(task.clone(), trigger);
199 let job_id = job.id.clone();
200
201 let mut jobs = self.jobs.write().await;
202 if jobs.contains_key(&task.id) {
203 return Err(SchedulerError::TaskAlreadyExists(task.id.clone()));
204 }
205
206 jobs.insert(job_id.clone(), job);
207 info!("Job scheduled: {} ({})", task.name, job_id);
208
209 Ok(job_id)
210 }
211
212 pub async fn schedule_job_with_id(
214 &self,
215 job_id: impl Into<String>,
216 task: Arc<Task>,
217 trigger: Arc<dyn Trigger>,
218 ) -> SchedulerResult<String> {
219 let job_id = job_id.into();
220 let job = ScheduledJob::new(task.clone(), trigger).with_id(job_id.clone());
221
222 let mut jobs = self.jobs.write().await;
223 if jobs.contains_key(&job_id) {
224 return Err(SchedulerError::TaskAlreadyExists(job_id));
225 }
226
227 jobs.insert(job_id.clone(), job);
228 info!("Job scheduled with ID: {} ({})", task.name, job_id);
229
230 Ok(job_id)
231 }
232
233 pub async fn unschedule_job(&self, job_id: &str) -> SchedulerResult<()> {
235 let mut jobs = self.jobs.write().await;
236 if jobs.remove(job_id).is_some() {
237 info!("Job unscheduled: {}", job_id);
238 Ok(())
239 } else {
240 Err(SchedulerError::TaskNotFound(job_id.to_string()))
241 }
242 }
243
244 pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> SchedulerResult<()> {
246 let mut jobs = self.jobs.write().await;
247 if let Some(job) = jobs.get_mut(job_id) {
248 job.enabled = enabled;
249 info!(
250 "Job {} {}: {}",
251 if enabled { "enabled" } else { "disabled" },
252 job_id,
253 job.task.name
254 );
255 Ok(())
256 } else {
257 Err(SchedulerError::TaskNotFound(job_id.to_string()))
258 }
259 }
260
261 pub async fn get_job(&self, job_id: &str) -> Option<ScheduledJob> {
263 let jobs = self.jobs.read().await;
264 jobs.get(job_id).cloned()
265 }
266
267 pub async fn get_all_jobs(&self) -> Vec<ScheduledJob> {
269 let jobs = self.jobs.read().await;
270 jobs.values().cloned().collect()
271 }
272
273 pub async fn get_jobs_by_group(&self, group: &str) -> Vec<ScheduledJob> {
275 let jobs = self.jobs.read().await;
276 jobs.values()
277 .filter(|job| job.task.group.as_ref().is_some_and(|g| g == group))
278 .cloned()
279 .collect()
280 }
281
282 pub async fn execute_job_now(&self, job_id: &str) -> SchedulerResult<String> {
284 let job = {
285 let jobs = self.jobs.read().await;
286 jobs.get(job_id).cloned()
287 };
288
289 if let Some(job) = job {
290 let executor = self.executor_pool.get_executor().await;
291 let execution_id = executor.execute_task(job.task, SystemTime::now()).await?;
292 info!("Job executed immediately: {} ({})", job_id, execution_id);
293 Ok(execution_id)
294 } else {
295 Err(SchedulerError::TaskNotFound(job_id.to_string()))
296 }
297 }
298
299 pub async fn schedule_delayed_job(&self, task: Arc<Task>, delay: Duration) -> SchedulerResult<String> {
301 let trigger = Arc::new(DelayTrigger::new(delay));
302 self.schedule_job(task, trigger).await
303 }
304
305 pub async fn schedule_delayed_job_with_id(
307 &self,
308 job_id: impl Into<String>,
309 task: Arc<Task>,
310 delay: Duration,
311 ) -> SchedulerResult<String> {
312 let trigger = Arc::new(DelayTrigger::new(delay));
313 self.schedule_job_with_id(job_id, task, trigger).await
314 }
315
316 pub async fn schedule_interval_job_with_delay(
318 &self,
319 task: Arc<Task>,
320 interval: Duration,
321 initial_delay: Duration,
322 ) -> SchedulerResult<String> {
323 let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
324 self.schedule_job(task, trigger).await
325 }
326
327 pub async fn schedule_interval_job_with_delay_and_id(
329 &self,
330 job_id: impl Into<String>,
331 task: Arc<Task>,
332 interval: Duration,
333 initial_delay: Duration,
334 ) -> SchedulerResult<String> {
335 let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
336 self.schedule_job_with_id(job_id, task, trigger).await
337 }
338
339 pub async fn execute_job_now_with_delay(
341 &self,
342 job_id: &str,
343 execution_delay: Option<Duration>,
344 ) -> SchedulerResult<String> {
345 let job = {
346 let jobs = self.jobs.read().await;
347 jobs.get(job_id).cloned()
348 };
349
350 if let Some(job) = job {
351 let executor = self.executor_pool.get_executor().await;
352 let execution_id = executor
353 .execute_task_with_delay(job.task, SystemTime::now(), execution_delay)
354 .await?;
355 info!("Job executed immediately with delay: {} ({})", job_id, execution_id);
356 Ok(execution_id)
357 } else {
358 Err(SchedulerError::TaskNotFound(job_id.to_string()))
359 }
360 }
361
362 pub async fn get_execution(&self, execution_id: &str) -> Option<TaskExecution> {
364 for _ in 0..self.config.executor_pool_size {
366 let executor = self.executor_pool.get_executor().await;
367 if let Some(execution) = executor.get_execution(execution_id).await {
368 return Some(execution);
369 }
370 }
371 None
372 }
373
374 pub async fn cancel_execution(&self, execution_id: &str) -> SchedulerResult<()> {
376 for _ in 0..self.config.executor_pool_size {
378 let executor = self.executor_pool.get_executor().await;
379 if executor.cancel_task(execution_id).await.is_ok() {
380 return Ok(());
381 }
382 }
383 Err(SchedulerError::TaskNotFound(execution_id.to_string()))
384 }
385
386 pub async fn get_status(&self) -> SchedulerStatus {
388 let jobs = self.jobs.read().await;
389 let running = *self.running.read().await;
390 let total_jobs = jobs.len();
391 let enabled_jobs = jobs.values().filter(|job| job.enabled).count();
392 let running_tasks = self.executor_pool.total_running_tasks().await;
393
394 SchedulerStatus {
395 running,
396 total_jobs,
397 enabled_jobs,
398 running_tasks,
399 }
400 }
401
402 fn clone_for_thread(&self) -> TaskSchedulerInternal {
405 TaskSchedulerInternal {
406 config: self.config.clone(),
407 executor_pool: self.executor_pool.clone(),
408 jobs: self.jobs.clone(),
409 running: self.running.clone(),
410 }
411 }
412
413 async fn scheduler_loop(&self, thread_id: usize) {
414 let internal = self.clone_for_thread();
415 internal.scheduler_loop(thread_id).await;
416 }
417
418 async fn cleanup_loop(&self) {
419 let internal = self.clone_for_thread();
420 internal.cleanup_loop().await;
421 }
422}
423
424#[derive(Clone)]
426struct TaskSchedulerInternal {
427 config: SchedulerConfig,
428 executor_pool: Arc<ExecutorPool>,
429 jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
430 running: Arc<RwLock<bool>>,
431}
432
433impl TaskSchedulerInternal {
434 async fn scheduler_loop(&self, thread_id: usize) {
435 info!("Scheduler thread {} started", thread_id);
436
437 let mut interval = interval(self.config.check_interval);
438
439 while *self.running.read().await {
440 interval.tick().await;
441
442 let now = SystemTime::now();
443 let jobs_to_execute = self.get_jobs_to_execute(now).await;
444
445 for job_id in jobs_to_execute {
446 if !*self.running.read().await {
447 break;
448 }
449
450 self.execute_job(&job_id, now).await;
451 }
452 }
453
454 info!("Scheduler thread {} stopped", thread_id);
455 }
456
457 async fn cleanup_loop(&self) {
458 info!("Cleanup thread started");
459
460 let mut interval = interval(self.config.persistence_interval);
461
462 while *self.running.read().await {
463 interval.tick().await;
464
465 let cleanup_duration = Duration::from_secs(24 * 3600);
467 for _ in 0..self.config.executor_pool_size {
468 let executor = self.executor_pool.get_executor().await;
469 executor.cleanup_old_executions(cleanup_duration).await;
470 }
471 }
472
473 info!("Cleanup thread stopped");
474 }
475
476 async fn get_jobs_to_execute(&self, now: SystemTime) -> Vec<String> {
477 let jobs = self.jobs.read().await;
478 jobs.values()
479 .filter(|job| job.should_execute(now))
480 .map(|job| job.id.clone())
481 .collect()
482 }
483
484 async fn execute_job(&self, job_id: &str, now: SystemTime) {
485 let job = {
486 let jobs = self.jobs.read().await;
487 jobs.get(job_id).cloned()
488 };
489
490 if let Some(mut job) = job {
491 let executor = self.executor_pool.get_executor().await;
493
494 match executor.execute_task(job.task.clone(), now).await {
495 Ok(execution_id) => {
496 job.last_execution = Some(now);
498 job.update_next_execution();
499
500 let mut jobs = self.jobs.write().await;
502 jobs.insert(job_id.to_string(), job);
503
504 info!("Job executed: {} ({})", job_id, execution_id);
505 }
506 Err(e) => {
507 error!("Failed to execute job {}: {}", job_id, e);
508 }
509 }
510 }
511 }
512}
513
514#[derive(Debug, Clone)]
516pub struct SchedulerStatus {
517 pub running: bool,
518 pub total_jobs: usize,
519 pub enabled_jobs: usize,
520 pub running_tasks: usize,
521}