Skip to main content

rocketmq_rust/schedule/
scheduler.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18use std::time::SystemTime;
19
20use tokio::sync::RwLock;
21use tokio::time::interval;
22use tracing::error;
23use tracing::info;
24use uuid::Uuid;
25
26use crate::schedule::executor::ExecutorConfig;
27use crate::schedule::task::TaskExecution;
28use crate::schedule::trigger::DelayedIntervalTrigger;
29use crate::schedule::trigger::Trigger;
30use crate::schedule::ExecutorPool;
31use crate::schedule::SchedulerError;
32use crate::schedule::SchedulerResult;
33use crate::schedule::Task;
34use crate::DelayTrigger;
35
36/// Scheduler configuration
37#[derive(Debug, Clone)]
38pub struct SchedulerConfig {
39    pub executor_config: ExecutorConfig,
40    pub executor_pool_size: usize,
41    pub check_interval: Duration,
42    pub max_scheduler_threads: usize,
43    pub enable_persistence: bool,
44    pub persistence_interval: Duration,
45}
46
47impl Default for SchedulerConfig {
48    fn default() -> Self {
49        Self {
50            executor_config: ExecutorConfig::default(),
51            executor_pool_size: 3,
52            check_interval: Duration::from_secs(1),
53            max_scheduler_threads: 2,
54            enable_persistence: false,
55            persistence_interval: Duration::from_secs(60),
56        }
57    }
58}
59
60/// Scheduled job containing task and trigger
61#[derive(Clone)]
62pub struct ScheduledJob {
63    pub id: String,
64    pub task: Arc<Task>,
65    pub trigger: Arc<dyn Trigger>,
66    pub next_execution: Option<SystemTime>,
67    pub enabled: bool,
68    pub created_at: SystemTime,
69    pub last_execution: Option<SystemTime>,
70}
71
72impl ScheduledJob {
73    pub fn new(task: Arc<Task>, trigger: Arc<dyn Trigger>) -> Self {
74        let next_execution = trigger.next_execution_time(SystemTime::now());
75
76        Self {
77            id: Uuid::new_v4().to_string(),
78            task,
79            trigger,
80            next_execution,
81            enabled: true,
82            created_at: SystemTime::now(),
83            last_execution: None,
84        }
85    }
86
87    pub fn with_id(mut self, id: impl Into<String>) -> Self {
88        self.id = id.into();
89        self
90    }
91
92    pub fn enabled(mut self, enabled: bool) -> Self {
93        self.enabled = enabled;
94        self
95    }
96
97    pub fn update_next_execution(&mut self) {
98        let after = self.last_execution.unwrap_or_else(SystemTime::now);
99        self.next_execution = self.trigger.next_execution_time(after);
100    }
101
102    pub fn should_execute(&self, now: SystemTime) -> bool {
103        self.enabled && self.next_execution.is_some_and(|next| next <= now)
104    }
105}
106
107/// Main task scheduler
108pub struct TaskScheduler {
109    config: SchedulerConfig,
110    executor_pool: Arc<ExecutorPool>,
111    jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
112    running: Arc<RwLock<bool>>,
113    scheduler_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
114}
115
116impl Default for TaskScheduler {
117    fn default() -> Self {
118        Self::new(SchedulerConfig::default())
119    }
120}
121
122impl TaskScheduler {
123    /// Create a new task scheduler
124    pub fn new(config: SchedulerConfig) -> Self {
125        let executor_pool = Arc::new(ExecutorPool::new(
126            config.executor_pool_size,
127            config.executor_config.clone(),
128        ));
129
130        Self {
131            config,
132            executor_pool,
133            jobs: Arc::new(RwLock::new(HashMap::new())),
134            running: Arc::new(RwLock::new(false)),
135            scheduler_handles: Arc::new(RwLock::new(Vec::new())),
136        }
137    }
138
139    /// Start the scheduler
140    pub async fn start(&self) -> SchedulerResult<()> {
141        let mut running = self.running.write().await;
142        if *running {
143            return Err(SchedulerError::SystemError("Scheduler is already running".to_string()));
144        }
145        *running = true;
146
147        info!("Starting task scheduler");
148
149        // Start scheduler threads
150        let mut handles = self.scheduler_handles.write().await;
151
152        for i in 0..self.config.max_scheduler_threads {
153            let scheduler = self.clone_for_thread();
154            let handle = tokio::spawn(async move {
155                scheduler.scheduler_loop(i).await;
156            });
157            handles.push(handle);
158        }
159
160        // Start cleanup thread
161        if self.config.enable_persistence {
162            let scheduler = self.clone_for_thread();
163            let handle = tokio::spawn(async move {
164                scheduler.cleanup_loop().await;
165            });
166            handles.push(handle);
167        }
168
169        info!(
170            "Task scheduler started with {} threads",
171            self.config.max_scheduler_threads
172        );
173        Ok(())
174    }
175
176    /// Stop the scheduler
177    pub async fn stop(&self) -> SchedulerResult<()> {
178        let mut running = self.running.write().await;
179        if !*running {
180            return Ok(());
181        }
182        *running = false;
183
184        info!("Stopping task scheduler");
185
186        // Cancel all scheduler threads
187        let mut handles = self.scheduler_handles.write().await;
188        for handle in handles.drain(..) {
189            handle.abort();
190        }
191
192        info!("Task scheduler stopped");
193        Ok(())
194    }
195
196    /// Schedule a new job
197    pub async fn schedule_job(&self, task: Arc<Task>, trigger: Arc<dyn Trigger>) -> SchedulerResult<String> {
198        let job = ScheduledJob::new(task.clone(), trigger);
199        let job_id = job.id.clone();
200
201        let mut jobs = self.jobs.write().await;
202        if jobs.contains_key(&task.id) {
203            return Err(SchedulerError::TaskAlreadyExists(task.id.clone()));
204        }
205
206        jobs.insert(job_id.clone(), job);
207        info!("Job scheduled: {} ({})", task.name, job_id);
208
209        Ok(job_id)
210    }
211
212    /// Schedule a job with a specific ID
213    pub async fn schedule_job_with_id(
214        &self,
215        job_id: impl Into<String>,
216        task: Arc<Task>,
217        trigger: Arc<dyn Trigger>,
218    ) -> SchedulerResult<String> {
219        let job_id = job_id.into();
220        let job = ScheduledJob::new(task.clone(), trigger).with_id(job_id.clone());
221
222        let mut jobs = self.jobs.write().await;
223        if jobs.contains_key(&job_id) {
224            return Err(SchedulerError::TaskAlreadyExists(job_id));
225        }
226
227        jobs.insert(job_id.clone(), job);
228        info!("Job scheduled with ID: {} ({})", task.name, job_id);
229
230        Ok(job_id)
231    }
232
233    /// Unschedule a job
234    pub async fn unschedule_job(&self, job_id: &str) -> SchedulerResult<()> {
235        let mut jobs = self.jobs.write().await;
236        if jobs.remove(job_id).is_some() {
237            info!("Job unscheduled: {}", job_id);
238            Ok(())
239        } else {
240            Err(SchedulerError::TaskNotFound(job_id.to_string()))
241        }
242    }
243
244    /// Enable or disable a job
245    pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> SchedulerResult<()> {
246        let mut jobs = self.jobs.write().await;
247        if let Some(job) = jobs.get_mut(job_id) {
248            job.enabled = enabled;
249            info!(
250                "Job {} {}: {}",
251                if enabled { "enabled" } else { "disabled" },
252                job_id,
253                job.task.name
254            );
255            Ok(())
256        } else {
257            Err(SchedulerError::TaskNotFound(job_id.to_string()))
258        }
259    }
260
261    /// Get job information
262    pub async fn get_job(&self, job_id: &str) -> Option<ScheduledJob> {
263        let jobs = self.jobs.read().await;
264        jobs.get(job_id).cloned()
265    }
266
267    /// Get all jobs
268    pub async fn get_all_jobs(&self) -> Vec<ScheduledJob> {
269        let jobs = self.jobs.read().await;
270        jobs.values().cloned().collect()
271    }
272
273    /// Get jobs by task group
274    pub async fn get_jobs_by_group(&self, group: &str) -> Vec<ScheduledJob> {
275        let jobs = self.jobs.read().await;
276        jobs.values()
277            .filter(|job| job.task.group.as_ref().is_some_and(|g| g == group))
278            .cloned()
279            .collect()
280    }
281
282    /// Execute a job immediately (ad-hoc execution)
283    pub async fn execute_job_now(&self, job_id: &str) -> SchedulerResult<String> {
284        let job = {
285            let jobs = self.jobs.read().await;
286            jobs.get(job_id).cloned()
287        };
288
289        if let Some(job) = job {
290            let executor = self.executor_pool.get_executor().await;
291            let execution_id = executor.execute_task(job.task, SystemTime::now()).await?;
292            info!("Job executed immediately: {} ({})", job_id, execution_id);
293            Ok(execution_id)
294        } else {
295            Err(SchedulerError::TaskNotFound(job_id.to_string()))
296        }
297    }
298
299    /// Schedule a delayed job (execute once after delay)
300    pub async fn schedule_delayed_job(&self, task: Arc<Task>, delay: Duration) -> SchedulerResult<String> {
301        let trigger = Arc::new(DelayTrigger::new(delay));
302        self.schedule_job(task, trigger).await
303    }
304
305    /// Schedule a delayed job with specific ID
306    pub async fn schedule_delayed_job_with_id(
307        &self,
308        job_id: impl Into<String>,
309        task: Arc<Task>,
310        delay: Duration,
311    ) -> SchedulerResult<String> {
312        let trigger = Arc::new(DelayTrigger::new(delay));
313        self.schedule_job_with_id(job_id, task, trigger).await
314    }
315
316    /// Schedule an interval job with initial delay
317    pub async fn schedule_interval_job_with_delay(
318        &self,
319        task: Arc<Task>,
320        interval: Duration,
321        initial_delay: Duration,
322    ) -> SchedulerResult<String> {
323        let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
324        self.schedule_job(task, trigger).await
325    }
326
327    /// Schedule an interval job with initial delay and specific ID
328    pub async fn schedule_interval_job_with_delay_and_id(
329        &self,
330        job_id: impl Into<String>,
331        task: Arc<Task>,
332        interval: Duration,
333        initial_delay: Duration,
334    ) -> SchedulerResult<String> {
335        let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
336        self.schedule_job_with_id(job_id, task, trigger).await
337    }
338
339    /// Execute a job immediately with optional execution delay
340    pub async fn execute_job_now_with_delay(
341        &self,
342        job_id: &str,
343        execution_delay: Option<Duration>,
344    ) -> SchedulerResult<String> {
345        let job = {
346            let jobs = self.jobs.read().await;
347            jobs.get(job_id).cloned()
348        };
349
350        if let Some(job) = job {
351            let executor = self.executor_pool.get_executor().await;
352            let execution_id = executor
353                .execute_task_with_delay(job.task, SystemTime::now(), execution_delay)
354                .await?;
355            info!("Job executed immediately with delay: {} ({})", job_id, execution_id);
356            Ok(execution_id)
357        } else {
358            Err(SchedulerError::TaskNotFound(job_id.to_string()))
359        }
360    }
361
362    /// Get task execution
363    pub async fn get_execution(&self, execution_id: &str) -> Option<TaskExecution> {
364        // Try to find the execution in any executor
365        for _ in 0..self.config.executor_pool_size {
366            let executor = self.executor_pool.get_executor().await;
367            if let Some(execution) = executor.get_execution(execution_id).await {
368                return Some(execution);
369            }
370        }
371        None
372    }
373
374    /// Cancel a running task execution
375    pub async fn cancel_execution(&self, execution_id: &str) -> SchedulerResult<()> {
376        // Try to cancel in any executor
377        for _ in 0..self.config.executor_pool_size {
378            let executor = self.executor_pool.get_executor().await;
379            if executor.cancel_task(execution_id).await.is_ok() {
380                return Ok(());
381            }
382        }
383        Err(SchedulerError::TaskNotFound(execution_id.to_string()))
384    }
385
386    /// Get scheduler status
387    pub async fn get_status(&self) -> SchedulerStatus {
388        let jobs = self.jobs.read().await;
389        let running = *self.running.read().await;
390        let total_jobs = jobs.len();
391        let enabled_jobs = jobs.values().filter(|job| job.enabled).count();
392        let running_tasks = self.executor_pool.total_running_tasks().await;
393
394        SchedulerStatus {
395            running,
396            total_jobs,
397            enabled_jobs,
398            running_tasks,
399        }
400    }
401
402    // Internal methods
403
404    fn clone_for_thread(&self) -> TaskSchedulerInternal {
405        TaskSchedulerInternal {
406            config: self.config.clone(),
407            executor_pool: self.executor_pool.clone(),
408            jobs: self.jobs.clone(),
409            running: self.running.clone(),
410        }
411    }
412
413    async fn scheduler_loop(&self, thread_id: usize) {
414        let internal = self.clone_for_thread();
415        internal.scheduler_loop(thread_id).await;
416    }
417
418    async fn cleanup_loop(&self) {
419        let internal = self.clone_for_thread();
420        internal.cleanup_loop().await;
421    }
422}
423
424// Internal scheduler for running the scheduling loop
425#[derive(Clone)]
426struct TaskSchedulerInternal {
427    config: SchedulerConfig,
428    executor_pool: Arc<ExecutorPool>,
429    jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
430    running: Arc<RwLock<bool>>,
431}
432
433impl TaskSchedulerInternal {
434    async fn scheduler_loop(&self, thread_id: usize) {
435        info!("Scheduler thread {} started", thread_id);
436
437        let mut interval = interval(self.config.check_interval);
438
439        while *self.running.read().await {
440            interval.tick().await;
441
442            let now = SystemTime::now();
443            let jobs_to_execute = self.get_jobs_to_execute(now).await;
444
445            for job_id in jobs_to_execute {
446                if !*self.running.read().await {
447                    break;
448                }
449
450                self.execute_job(&job_id, now).await;
451            }
452        }
453
454        info!("Scheduler thread {} stopped", thread_id);
455    }
456
457    async fn cleanup_loop(&self) {
458        info!("Cleanup thread started");
459
460        let mut interval = interval(self.config.persistence_interval);
461
462        while *self.running.read().await {
463            interval.tick().await;
464
465            // Clean up old executions (keep last 24 hours)
466            let cleanup_duration = Duration::from_secs(24 * 3600);
467            for _ in 0..self.config.executor_pool_size {
468                let executor = self.executor_pool.get_executor().await;
469                executor.cleanup_old_executions(cleanup_duration).await;
470            }
471        }
472
473        info!("Cleanup thread stopped");
474    }
475
476    async fn get_jobs_to_execute(&self, now: SystemTime) -> Vec<String> {
477        let jobs = self.jobs.read().await;
478        jobs.values()
479            .filter(|job| job.should_execute(now))
480            .map(|job| job.id.clone())
481            .collect()
482    }
483
484    async fn execute_job(&self, job_id: &str, now: SystemTime) {
485        let job = {
486            let jobs = self.jobs.read().await;
487            jobs.get(job_id).cloned()
488        };
489
490        if let Some(mut job) = job {
491            // Get executor and execute task
492            let executor = self.executor_pool.get_executor().await;
493
494            match executor.execute_task(job.task.clone(), now).await {
495                Ok(execution_id) => {
496                    // Update job's last execution and next execution time
497                    job.last_execution = Some(now);
498                    job.update_next_execution();
499
500                    // Update the job in storage
501                    let mut jobs = self.jobs.write().await;
502                    jobs.insert(job_id.to_string(), job);
503
504                    info!("Job executed: {} ({})", job_id, execution_id);
505                }
506                Err(e) => {
507                    error!("Failed to execute job {}: {}", job_id, e);
508                }
509            }
510        }
511    }
512}
513
/// Scheduler status information
///
/// Point-in-time snapshot produced by `TaskScheduler::get_status`. The
/// `Default` value describes a stopped scheduler with no jobs and no running
/// tasks.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct SchedulerStatus {
    /// Whether the scheduler has been started and not yet stopped.
    pub running: bool,
    /// Number of registered jobs, enabled or not.
    pub total_jobs: usize,
    /// Number of registered jobs whose `enabled` flag is set.
    pub enabled_jobs: usize,
    /// Running-task count reported by the executor pool.
    pub running_tasks: usize,
}