rocketmq_rust/schedule/
scheduler.rs

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Duration;
20use std::time::SystemTime;
21
22use tokio::sync::RwLock;
23use tokio::time::interval;
24use tracing::error;
25use tracing::info;
26use uuid::Uuid;
27
28use crate::schedule::executor::ExecutorConfig;
29use crate::schedule::task::TaskExecution;
30use crate::schedule::trigger::DelayedIntervalTrigger;
31use crate::schedule::trigger::Trigger;
32use crate::schedule::ExecutorPool;
33use crate::schedule::SchedulerError;
34use crate::schedule::SchedulerResult;
35use crate::schedule::Task;
36use crate::DelayTrigger;
37
38/// Scheduler configuration
39#[derive(Debug, Clone)]
40pub struct SchedulerConfig {
41    pub executor_config: ExecutorConfig,
42    pub executor_pool_size: usize,
43    pub check_interval: Duration,
44    pub max_scheduler_threads: usize,
45    pub enable_persistence: bool,
46    pub persistence_interval: Duration,
47}
48
49impl Default for SchedulerConfig {
50    fn default() -> Self {
51        Self {
52            executor_config: ExecutorConfig::default(),
53            executor_pool_size: 3,
54            check_interval: Duration::from_secs(1),
55            max_scheduler_threads: 2,
56            enable_persistence: false,
57            persistence_interval: Duration::from_secs(60),
58        }
59    }
60}
61
62/// Scheduled job containing task and trigger
63#[derive(Clone)]
64pub struct ScheduledJob {
65    pub id: String,
66    pub task: Arc<Task>,
67    pub trigger: Arc<dyn Trigger>,
68    pub next_execution: Option<SystemTime>,
69    pub enabled: bool,
70    pub created_at: SystemTime,
71    pub last_execution: Option<SystemTime>,
72}
73
74impl ScheduledJob {
75    pub fn new(task: Arc<Task>, trigger: Arc<dyn Trigger>) -> Self {
76        let next_execution = trigger.next_execution_time(SystemTime::now());
77
78        Self {
79            id: Uuid::new_v4().to_string(),
80            task,
81            trigger,
82            next_execution,
83            enabled: true,
84            created_at: SystemTime::now(),
85            last_execution: None,
86        }
87    }
88
89    pub fn with_id(mut self, id: impl Into<String>) -> Self {
90        self.id = id.into();
91        self
92    }
93
94    pub fn enabled(mut self, enabled: bool) -> Self {
95        self.enabled = enabled;
96        self
97    }
98
99    pub fn update_next_execution(&mut self) {
100        let after = self.last_execution.unwrap_or_else(SystemTime::now);
101        self.next_execution = self.trigger.next_execution_time(after);
102    }
103
104    pub fn should_execute(&self, now: SystemTime) -> bool {
105        self.enabled && self.next_execution.is_some_and(|next| next <= now)
106    }
107}
108
109/// Main task scheduler
110pub struct TaskScheduler {
111    config: SchedulerConfig,
112    executor_pool: Arc<ExecutorPool>,
113    jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
114    running: Arc<RwLock<bool>>,
115    scheduler_handles: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
116}
117
118impl Default for TaskScheduler {
119    fn default() -> Self {
120        Self::new(SchedulerConfig::default())
121    }
122}
123
124impl TaskScheduler {
125    /// Create a new task scheduler
126    pub fn new(config: SchedulerConfig) -> Self {
127        let executor_pool = Arc::new(ExecutorPool::new(
128            config.executor_pool_size,
129            config.executor_config.clone(),
130        ));
131
132        Self {
133            config,
134            executor_pool,
135            jobs: Arc::new(RwLock::new(HashMap::new())),
136            running: Arc::new(RwLock::new(false)),
137            scheduler_handles: Arc::new(RwLock::new(Vec::new())),
138        }
139    }
140
141    /// Start the scheduler
142    pub async fn start(&self) -> SchedulerResult<()> {
143        let mut running = self.running.write().await;
144        if *running {
145            return Err(SchedulerError::SystemError(
146                "Scheduler is already running".to_string(),
147            ));
148        }
149        *running = true;
150
151        info!("Starting task scheduler");
152
153        // Start scheduler threads
154        let mut handles = self.scheduler_handles.write().await;
155
156        for i in 0..self.config.max_scheduler_threads {
157            let scheduler = self.clone_for_thread();
158            let handle = tokio::spawn(async move {
159                scheduler.scheduler_loop(i).await;
160            });
161            handles.push(handle);
162        }
163
164        // Start cleanup thread
165        if self.config.enable_persistence {
166            let scheduler = self.clone_for_thread();
167            let handle = tokio::spawn(async move {
168                scheduler.cleanup_loop().await;
169            });
170            handles.push(handle);
171        }
172
173        info!(
174            "Task scheduler started with {} threads",
175            self.config.max_scheduler_threads
176        );
177        Ok(())
178    }
179
180    /// Stop the scheduler
181    pub async fn stop(&self) -> SchedulerResult<()> {
182        let mut running = self.running.write().await;
183        if !*running {
184            return Ok(());
185        }
186        *running = false;
187
188        info!("Stopping task scheduler");
189
190        // Cancel all scheduler threads
191        let mut handles = self.scheduler_handles.write().await;
192        for handle in handles.drain(..) {
193            handle.abort();
194        }
195
196        info!("Task scheduler stopped");
197        Ok(())
198    }
199
200    /// Schedule a new job
201    pub async fn schedule_job(
202        &self,
203        task: Arc<Task>,
204        trigger: Arc<dyn Trigger>,
205    ) -> SchedulerResult<String> {
206        let job = ScheduledJob::new(task.clone(), trigger);
207        let job_id = job.id.clone();
208
209        let mut jobs = self.jobs.write().await;
210        if jobs.contains_key(&task.id) {
211            return Err(SchedulerError::TaskAlreadyExists(task.id.clone()));
212        }
213
214        jobs.insert(job_id.clone(), job);
215        info!("Job scheduled: {} ({})", task.name, job_id);
216
217        Ok(job_id)
218    }
219
220    /// Schedule a job with a specific ID
221    pub async fn schedule_job_with_id(
222        &self,
223        job_id: impl Into<String>,
224        task: Arc<Task>,
225        trigger: Arc<dyn Trigger>,
226    ) -> SchedulerResult<String> {
227        let job_id = job_id.into();
228        let job = ScheduledJob::new(task.clone(), trigger).with_id(job_id.clone());
229
230        let mut jobs = self.jobs.write().await;
231        if jobs.contains_key(&job_id) {
232            return Err(SchedulerError::TaskAlreadyExists(job_id));
233        }
234
235        jobs.insert(job_id.clone(), job);
236        info!("Job scheduled with ID: {} ({})", task.name, job_id);
237
238        Ok(job_id)
239    }
240
241    /// Unschedule a job
242    pub async fn unschedule_job(&self, job_id: &str) -> SchedulerResult<()> {
243        let mut jobs = self.jobs.write().await;
244        if jobs.remove(job_id).is_some() {
245            info!("Job unscheduled: {}", job_id);
246            Ok(())
247        } else {
248            Err(SchedulerError::TaskNotFound(job_id.to_string()))
249        }
250    }
251
252    /// Enable or disable a job
253    pub async fn set_job_enabled(&self, job_id: &str, enabled: bool) -> SchedulerResult<()> {
254        let mut jobs = self.jobs.write().await;
255        if let Some(job) = jobs.get_mut(job_id) {
256            job.enabled = enabled;
257            info!(
258                "Job {} {}: {}",
259                if enabled { "enabled" } else { "disabled" },
260                job_id,
261                job.task.name
262            );
263            Ok(())
264        } else {
265            Err(SchedulerError::TaskNotFound(job_id.to_string()))
266        }
267    }
268
269    /// Get job information
270    pub async fn get_job(&self, job_id: &str) -> Option<ScheduledJob> {
271        let jobs = self.jobs.read().await;
272        jobs.get(job_id).cloned()
273    }
274
275    /// Get all jobs
276    pub async fn get_all_jobs(&self) -> Vec<ScheduledJob> {
277        let jobs = self.jobs.read().await;
278        jobs.values().cloned().collect()
279    }
280
281    /// Get jobs by task group
282    pub async fn get_jobs_by_group(&self, group: &str) -> Vec<ScheduledJob> {
283        let jobs = self.jobs.read().await;
284        jobs.values()
285            .filter(|job| job.task.group.as_ref().is_some_and(|g| g == group))
286            .cloned()
287            .collect()
288    }
289
290    /// Execute a job immediately (ad-hoc execution)
291    pub async fn execute_job_now(&self, job_id: &str) -> SchedulerResult<String> {
292        let job = {
293            let jobs = self.jobs.read().await;
294            jobs.get(job_id).cloned()
295        };
296
297        if let Some(job) = job {
298            let executor = self.executor_pool.get_executor().await;
299            let execution_id = executor.execute_task(job.task, SystemTime::now()).await?;
300            info!("Job executed immediately: {} ({})", job_id, execution_id);
301            Ok(execution_id)
302        } else {
303            Err(SchedulerError::TaskNotFound(job_id.to_string()))
304        }
305    }
306
307    /// Schedule a delayed job (execute once after delay)
308    pub async fn schedule_delayed_job(
309        &self,
310        task: Arc<Task>,
311        delay: Duration,
312    ) -> SchedulerResult<String> {
313        let trigger = Arc::new(DelayTrigger::new(delay));
314        self.schedule_job(task, trigger).await
315    }
316
317    /// Schedule a delayed job with specific ID
318    pub async fn schedule_delayed_job_with_id(
319        &self,
320        job_id: impl Into<String>,
321        task: Arc<Task>,
322        delay: Duration,
323    ) -> SchedulerResult<String> {
324        let trigger = Arc::new(DelayTrigger::new(delay));
325        self.schedule_job_with_id(job_id, task, trigger).await
326    }
327
328    /// Schedule an interval job with initial delay
329    pub async fn schedule_interval_job_with_delay(
330        &self,
331        task: Arc<Task>,
332        interval: Duration,
333        initial_delay: Duration,
334    ) -> SchedulerResult<String> {
335        let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
336        self.schedule_job(task, trigger).await
337    }
338
339    /// Schedule an interval job with initial delay and specific ID
340    pub async fn schedule_interval_job_with_delay_and_id(
341        &self,
342        job_id: impl Into<String>,
343        task: Arc<Task>,
344        interval: Duration,
345        initial_delay: Duration,
346    ) -> SchedulerResult<String> {
347        let trigger = Arc::new(DelayedIntervalTrigger::new(interval, initial_delay));
348        self.schedule_job_with_id(job_id, task, trigger).await
349    }
350
351    /// Execute a job immediately with optional execution delay
352    pub async fn execute_job_now_with_delay(
353        &self,
354        job_id: &str,
355        execution_delay: Option<Duration>,
356    ) -> SchedulerResult<String> {
357        let job = {
358            let jobs = self.jobs.read().await;
359            jobs.get(job_id).cloned()
360        };
361
362        if let Some(job) = job {
363            let executor = self.executor_pool.get_executor().await;
364            let execution_id = executor
365                .execute_task_with_delay(job.task, SystemTime::now(), execution_delay)
366                .await?;
367            info!(
368                "Job executed immediately with delay: {} ({})",
369                job_id, execution_id
370            );
371            Ok(execution_id)
372        } else {
373            Err(SchedulerError::TaskNotFound(job_id.to_string()))
374        }
375    }
376
377    /// Get task execution
378    pub async fn get_execution(&self, execution_id: &str) -> Option<TaskExecution> {
379        // Try to find the execution in any executor
380        for _ in 0..self.config.executor_pool_size {
381            let executor = self.executor_pool.get_executor().await;
382            if let Some(execution) = executor.get_execution(execution_id).await {
383                return Some(execution);
384            }
385        }
386        None
387    }
388
389    /// Cancel a running task execution
390    pub async fn cancel_execution(&self, execution_id: &str) -> SchedulerResult<()> {
391        // Try to cancel in any executor
392        for _ in 0..self.config.executor_pool_size {
393            let executor = self.executor_pool.get_executor().await;
394            if executor.cancel_task(execution_id).await.is_ok() {
395                return Ok(());
396            }
397        }
398        Err(SchedulerError::TaskNotFound(execution_id.to_string()))
399    }
400
401    /// Get scheduler status
402    pub async fn get_status(&self) -> SchedulerStatus {
403        let jobs = self.jobs.read().await;
404        let running = *self.running.read().await;
405        let total_jobs = jobs.len();
406        let enabled_jobs = jobs.values().filter(|job| job.enabled).count();
407        let running_tasks = self.executor_pool.total_running_tasks().await;
408
409        SchedulerStatus {
410            running,
411            total_jobs,
412            enabled_jobs,
413            running_tasks,
414        }
415    }
416
417    // Internal methods
418
419    fn clone_for_thread(&self) -> TaskSchedulerInternal {
420        TaskSchedulerInternal {
421            config: self.config.clone(),
422            executor_pool: self.executor_pool.clone(),
423            jobs: self.jobs.clone(),
424            running: self.running.clone(),
425        }
426    }
427
428    async fn scheduler_loop(&self, thread_id: usize) {
429        let internal = self.clone_for_thread();
430        internal.scheduler_loop(thread_id).await;
431    }
432
433    async fn cleanup_loop(&self) {
434        let internal = self.clone_for_thread();
435        internal.cleanup_loop().await;
436    }
437}
438
439// Internal scheduler for running the scheduling loop
440#[derive(Clone)]
441struct TaskSchedulerInternal {
442    config: SchedulerConfig,
443    executor_pool: Arc<ExecutorPool>,
444    jobs: Arc<RwLock<HashMap<String, ScheduledJob>>>,
445    running: Arc<RwLock<bool>>,
446}
447
448impl TaskSchedulerInternal {
449    async fn scheduler_loop(&self, thread_id: usize) {
450        info!("Scheduler thread {} started", thread_id);
451
452        let mut interval = interval(self.config.check_interval);
453
454        while *self.running.read().await {
455            interval.tick().await;
456
457            let now = SystemTime::now();
458            let jobs_to_execute = self.get_jobs_to_execute(now).await;
459
460            for job_id in jobs_to_execute {
461                if !*self.running.read().await {
462                    break;
463                }
464
465                self.execute_job(&job_id, now).await;
466            }
467        }
468
469        info!("Scheduler thread {} stopped", thread_id);
470    }
471
472    async fn cleanup_loop(&self) {
473        info!("Cleanup thread started");
474
475        let mut interval = interval(self.config.persistence_interval);
476
477        while *self.running.read().await {
478            interval.tick().await;
479
480            // Clean up old executions (keep last 24 hours)
481            let cleanup_duration = Duration::from_secs(24 * 3600);
482            for _ in 0..self.config.executor_pool_size {
483                let executor = self.executor_pool.get_executor().await;
484                executor.cleanup_old_executions(cleanup_duration).await;
485            }
486        }
487
488        info!("Cleanup thread stopped");
489    }
490
491    async fn get_jobs_to_execute(&self, now: SystemTime) -> Vec<String> {
492        let jobs = self.jobs.read().await;
493        jobs.values()
494            .filter(|job| job.should_execute(now))
495            .map(|job| job.id.clone())
496            .collect()
497    }
498
499    async fn execute_job(&self, job_id: &str, now: SystemTime) {
500        let job = {
501            let jobs = self.jobs.read().await;
502            jobs.get(job_id).cloned()
503        };
504
505        if let Some(mut job) = job {
506            // Get executor and execute task
507            let executor = self.executor_pool.get_executor().await;
508
509            match executor.execute_task(job.task.clone(), now).await {
510                Ok(execution_id) => {
511                    // Update job's last execution and next execution time
512                    job.last_execution = Some(now);
513                    job.update_next_execution();
514
515                    // Update the job in storage
516                    let mut jobs = self.jobs.write().await;
517                    jobs.insert(job_id.to_string(), job);
518
519                    info!("Job executed: {} ({})", job_id, execution_id);
520                }
521                Err(e) => {
522                    error!("Failed to execute job {}: {}", job_id, e);
523                }
524            }
525        }
526    }
527}
528
/// Point-in-time summary returned by `TaskScheduler::get_status`.
///
/// Field order is significant for the derived `Debug` output and is kept stable.
#[derive(Clone, Debug)]
pub struct SchedulerStatus {
    /// Whether the scheduler's running flag is set.
    pub running: bool,
    /// Number of jobs in the job table.
    pub total_jobs: usize,
    /// Number of jobs whose `enabled` flag is set.
    pub enabled_jobs: usize,
    /// Running-task count as reported by the executor pool.
    pub running_tasks: usize,
}