Skip to main content

rocketmq_rust/
schedule.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod executor;
16pub mod scheduler;
17pub mod task;
18pub mod trigger;
19
20use std::error::Error;
21use std::fmt;
22
23pub use executor::ExecutorPool;
24pub use task::Task;
25pub use task::TaskContext;
26pub use task::TaskResult;
27pub use task::TaskStatus;
28
29/// Scheduler error type
30#[derive(Debug)]
31pub enum SchedulerError {
32    TaskNotFound(String),
33    TaskAlreadyExists(String),
34    ExecutorError(String),
35    TriggerError(String),
36    SystemError(String),
37}
38
39impl fmt::Display for SchedulerError {
40    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41        match self {
42            SchedulerError::TaskNotFound(id) => write!(f, "Task not found: {id}"),
43            SchedulerError::TaskAlreadyExists(id) => write!(f, "Task already exists: {id}"),
44            SchedulerError::ExecutorError(msg) => write!(f, "Executor error: {msg}"),
45            SchedulerError::TriggerError(msg) => write!(f, "Trigger error: {msg}"),
46            SchedulerError::SystemError(msg) => write!(f, "System error: {msg}"),
47        }
48    }
49}
50
51impl Error for SchedulerError {}
52
53pub type SchedulerResult<T> = Result<T, SchedulerError>;
54
55pub mod simple_scheduler {
56    use std::collections::HashMap;
57    use std::future::Future;
58    use std::sync::atomic::AtomicU64;
59    use std::sync::atomic::Ordering;
60    use std::sync::Arc;
61
62    use anyhow::Result;
63    use parking_lot::RwLock;
64    use tokio::sync::Semaphore;
65    use tokio::task::JoinHandle;
66    use tokio::time::Duration;
67    use tokio::time::Instant;
68    use tokio::time::{self};
69    use tokio_util::sync::CancellationToken;
70    use tracing::error;
71    use tracing::info;
72
73    use crate::ArcMut;
74
75    #[derive(Debug, Clone, Copy)]
76    pub enum ScheduleMode {
77        /// Align the beats, and they might pile up.
78        FixedRate,
79        /// Sleep only after the task is completed, and there will be no accumulation.
80        FixedDelay,
81        /// Align the beats, but skip if the last task is not yet completed.
82        FixedRateNoOverlap,
83    }
84
85    type TaskId = u64;
86
87    pub struct TaskInfo {
88        cancel_token: CancellationToken,
89        handle: JoinHandle<()>,
90    }
91
92    #[derive(Clone)]
93    pub struct ScheduledTaskManager {
94        tasks: Arc<RwLock<HashMap<TaskId, TaskInfo>>>,
95        counter: Arc<AtomicU64>,
96    }
97
98    impl Default for ScheduledTaskManager {
99        fn default() -> Self {
100            Self::new()
101        }
102    }
103
104    impl ScheduledTaskManager {
105        pub fn new() -> Self {
106            Self {
107                tasks: Arc::new(RwLock::new(HashMap::new())),
108                counter: Arc::new(AtomicU64::new(0)),
109            }
110        }
111
112        fn next_id(&self) -> TaskId {
113            self.counter.fetch_add(1, Ordering::Relaxed)
114        }
115
116        /// Adds a fixed-rate scheduled task to the task manager.
117        ///
118        /// # Arguments
119        /// * `initial_delay` - The delay before the first execution of the task.
120        /// * `period` - The interval between task executions.
121        /// * `task_fn` - A function that defines the task to be executed. It takes a
122        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
123        ///   `Result<()>`.
124        ///
125        /// # Returns
126        /// A `TaskId` representing the unique identifier of the scheduled task.
127        ///
128        /// # Notes
129        /// - Tasks are executed at fixed intervals, even if previous executions overlap.
130        pub fn add_fixed_rate_task<F, Fut>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
131        where
132            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
133            Fut: Future<Output = Result<()>> + Send + 'static,
134        {
135            self.add_scheduled_task(ScheduleMode::FixedRate, initial_delay, period, task_fn)
136        }
137
138        /// Adds a fixed-delay scheduled task to the task manager.
139        ///
140        /// # Arguments
141        /// * `initial_delay` - The delay before the first execution of the task.
142        /// * `period` - The interval between task executions.
143        /// * `task_fn` - A function that defines the task to be executed. It takes a
144        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
145        ///   `Result<()>`.
146        ///
147        /// # Returns
148        /// A `TaskId` representing the unique identifier of the scheduled task.
149        ///
150        /// # Notes
151        /// - Tasks are executed serially, with a delay after each task completes.
152        pub fn add_fixed_delay_task<F, Fut>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
153        where
154            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
155            Fut: Future<Output = Result<()>> + Send + 'static,
156        {
157            self.add_scheduled_task(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
158        }
159
160        /// Adds a fixed-rate-no-overlap scheduled task to the task manager.
161        ///
162        /// # Arguments
163        /// * `initial_delay` - The delay before the first execution of the task.
164        /// * `period` - The interval between task executions.
165        /// * `task_fn` - A function that defines the task to be executed. It takes a
166        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
167        ///   `Result<()>`.
168        ///
169        /// # Returns
170        /// A `TaskId` representing the unique identifier of the scheduled task.
171        ///
172        /// # Notes
173        /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
174        pub fn add_fixed_rate_no_overlap_task<F, Fut>(
175            &self,
176            initial_delay: Duration,
177            period: Duration,
178            task_fn: F,
179        ) -> TaskId
180        where
181            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
182            Fut: Future<Output = Result<()>> + Send + 'static,
183        {
184            self.add_scheduled_task(ScheduleMode::FixedRateNoOverlap, initial_delay, period, task_fn)
185        }
186
187        /// Adds a scheduled task to the task manager.
188        ///
189        /// # Arguments
190        /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
191        ///   - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
192        ///   - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
193        ///   - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
194        ///     still running.
195        /// * `initial_delay` - The delay before the first execution of the task.
196        /// * `period` - The interval between task executions.
197        /// * `task_fn` - A function that defines the task to be executed. It takes a
198        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
199        ///   `Result<()>`.
200        ///
201        /// # Returns
202        /// A `TaskId` representing the unique identifier of the scheduled task.
203        ///
204        /// # Notes
205        /// - The task function is executed asynchronously.
206        /// - The `CancellationToken` can be used to gracefully cancel the task.
207        /// - The task is added to the internal task manager and can be managed (e.g., canceled or
208        ///   aborted) later.
209        pub fn add_scheduled_task<F, Fut>(
210            &self,
211            mode: ScheduleMode,
212            initial_delay: Duration,
213            period: Duration,
214            task_fn: F,
215        ) -> TaskId
216        where
217            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
218            Fut: Future<Output = Result<()>> + Send + 'static,
219        {
220            let id = self.next_id();
221            let token = CancellationToken::new();
222            let token_child = token.clone();
223
224            let task_fn = ArcMut::new(task_fn);
225
226            let handle = tokio::spawn({
227                let mut task_fn = task_fn;
228                async move {
229                    match mode {
230                        ScheduleMode::FixedRate => {
231                            let start = Instant::now() + initial_delay;
232                            let mut ticker = time::interval_at(start, period);
233
234                            loop {
235                                tokio::select! {
236                                    _ = token_child.cancelled() => {
237                                        info!("Task {} cancelled gracefully", id);
238                                        break;
239                                    }
240                                    _ = ticker.tick() => {
241                                        // Allow concurrent execution: One subtask per tick
242                                        let mut task_fn = task_fn.clone();
243                                        let child = token_child.clone();
244                                        tokio::spawn(async move {
245                                            // 1) Lock out &mut F, call once to get a future
246                                            let fut = {
247                                                (task_fn)(child)
248                                            };
249                                            // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
250                                            if let Err(e) = fut.await {
251                                                error!("FixedRate task {} failed: {:?}", id, e);
252                                            }
253                                        });
254                                    }
255                                }
256                            }
257                        }
258
259                        ScheduleMode::FixedDelay => {
260                            time::sleep(initial_delay).await;
261                            loop {
262                                tokio::select! {
263                                    _ = token_child.cancelled() => {
264                                        info!("Task {} cancelled gracefully", id);
265                                        break;
266                                    }
267                                    _ = async {
268                                        // Serial execution: complete one task and then sleep
269                                        let fut = {
270                                            (task_fn)(token_child.clone())
271                                        };
272                                        if let Err(e) = fut.await {
273                                            error!("FixedDelay task {} failed: {:?}", id, e);
274                                        }
275                                        time::sleep(period).await;
276                                    } => {}
277                                }
278                            }
279                        }
280
281                        ScheduleMode::FixedRateNoOverlap => {
282                            let start = Instant::now() + initial_delay;
283                            let mut ticker = time::interval_at(start, period);
284
285                            // Permission=1, controls non-overlapping execution
286                            let gate = Arc::new(Semaphore::new(1));
287
288                            loop {
289                                tokio::select! {
290                                    _ = token_child.cancelled() => {
291                                        info!("Task {} cancelled gracefully", id);
292                                        break;
293                                    }
294                                    _ = ticker.tick() => {
295                                        // Try to acquire permission. If unable to acquire, skip the current tick.
296                                        if let Ok(permit) = gate.clone().try_acquire_owned() {
297                                            let mut task_fn = task_fn.clone();
298                                            let child = token_child.clone();
299                                            tokio::spawn(async move {
300                                                // Release the lock immediately after generating the future
301                                                let fut = {
302                                                    (task_fn)(child)
303                                                };
304                                                if let Err(e) = fut.await {
305                                                    error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
306                                                }
307                                                drop(permit); // Release the permit after completion
308                                            });
309                                        } else {
310                                            info!("Task {} skipped due to overlap", id);
311                                        }
312                                    }
313                                }
314                            }
315                        }
316                    }
317                }
318            });
319
320            self.tasks.write().insert(
321                id,
322                TaskInfo {
323                    cancel_token: token,
324                    handle,
325                },
326            );
327
328            id
329        }
330
331        /// Graceful cancellation
332        pub fn cancel_task(&self, id: TaskId) {
333            if let Some(info) = self.tasks.write().remove(&id) {
334                info.cancel_token.cancel();
335                tokio::spawn(async move {
336                    let _ = info.handle.await;
337                });
338            }
339        }
340
341        /// Roughly abort
342        pub fn abort_task(&self, id: TaskId) {
343            if let Some(info) = self.tasks.write().remove(&id) {
344                info.handle.abort();
345            }
346        }
347
348        /// Batch cancel
349        pub fn cancel_all(&self) {
350            let mut tasks = self.tasks.write();
351            for (_, info) in tasks.drain() {
352                info.cancel_token.cancel();
353                tokio::spawn(async move {
354                    let _ = info.handle.await;
355                });
356            }
357        }
358
359        /// Batch abort
360        pub fn abort_all(&self) {
361            let mut tasks = self.tasks.write();
362            for (_, info) in tasks.drain() {
363                info.handle.abort();
364            }
365        }
366
367        pub fn task_count(&self) -> usize {
368            self.tasks.read().len()
369        }
370    }
371
372    impl ScheduledTaskManager {
373        /// Adds a fixed-rate scheduled task to the task manager asynchronously.
374        ///
375        /// # Arguments
376        /// * `initial_delay` - The delay before the first execution of the task.
377        /// * `period` - The interval between task executions.
378        /// * `task_fn` - A function that defines the task to be executed. It takes a
379        ///   `CancellationToken` as an argument and returns a `Result<()>`.
380        ///
381        /// # Returns
382        /// A `TaskId` representing the unique identifier of the scheduled task.
383        ///
384        /// # Notes
385        /// - Tasks are executed at fixed intervals, even if previous executions overlap.
386        /// - The task function is executed asynchronously.
387        pub fn add_fixed_rate_task_async<F>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
388        where
389            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
390            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
391        {
392            self.add_scheduled_task_async(ScheduleMode::FixedRate, initial_delay, period, task_fn)
393        }
394
395        /// Adds a fixed-delay scheduled task to the task manager asynchronously.
396        ///
397        /// # Arguments
398        /// * `initial_delay` - The delay before the first execution of the task.
399        /// * `period` - The interval between task executions.
400        /// * `task_fn` - A function that defines the task to be executed. It takes a
401        ///   `CancellationToken` as an argument and returns a `Result<()>`.
402        ///
403        /// # Returns
404        /// A `TaskId` representing the unique identifier of the scheduled task.
405        ///
406        /// # Notes
407        /// - Tasks are executed serially, with a delay after each task completes.
408        /// - The task function is executed asynchronously.
409        pub fn add_fixed_delay_task_async<F>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
410        where
411            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
412            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
413        {
414            self.add_scheduled_task_async(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
415        }
416
417        /// Adds a fixed-rate-no-overlap scheduled task to the task manager asynchronously.
418        ///
419        /// # Arguments
420        /// * `initial_delay` - The delay before the first execution of the task.
421        /// * `period` - The interval between task executions.
422        /// * `task_fn` - A function that defines the task to be executed. It takes a
423        ///   `CancellationToken` as an argument and returns a `Result<()>`.
424        ///
425        /// # Returns
426        /// A `TaskId` representing the unique identifier of the scheduled task.
427        ///
428        /// # Notes
429        /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
430        /// - The task function is executed asynchronously.
431        pub fn add_fixed_rate_no_overlap_task_async<F>(
432            &self,
433            initial_delay: Duration,
434            period: Duration,
435            task_fn: F,
436        ) -> TaskId
437        where
438            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
439            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
440        {
441            self.add_scheduled_task_async(ScheduleMode::FixedRateNoOverlap, initial_delay, period, task_fn)
442        }
443
444        /// Adds a scheduled task to the task manager asynchronously.
445        ///
446        /// # Arguments
447        /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
448        ///   - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
449        ///   - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
450        ///   - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
451        ///     still running.
452        /// * `initial_delay` - The delay before the first execution of the task.
453        /// * `period` - The interval between task executions.
454        /// * `task_fn` - A function that defines the task to be executed. It takes a
455        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
456        ///   `Result<()>`.
457        ///
458        /// # Returns
459        /// A `TaskId` representing the unique identifier of the scheduled task.
460        ///
461        /// # Notes
462        /// - The task function is executed asynchronously.
463        /// - The `CancellationToken` can be used to gracefully cancel the task.
464        /// - The task is added to the internal task manager and can be managed (e.g., canceled or
465        ///   aborted) later.
466        pub fn add_scheduled_task_async<F>(
467            &self,
468            mode: ScheduleMode,
469            initial_delay: Duration,
470            period: Duration,
471            task_fn: F,
472        ) -> TaskId
473        where
474            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
475            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
476        {
477            let id = self.next_id();
478            let token = CancellationToken::new();
479            let token_child = token.clone();
480
481            let task_fn = ArcMut::new(task_fn);
482
483            let handle = tokio::spawn({
484                let mut task_fn = task_fn;
485                async move {
486                    match mode {
487                        ScheduleMode::FixedRate => {
488                            let start = Instant::now() + initial_delay;
489                            let mut ticker = time::interval_at(start, period);
490
491                            loop {
492                                tokio::select! {
493                                    _ = token_child.cancelled() => {
494                                        info!("Task {} cancelled gracefully", id);
495                                        break;
496                                    }
497                                    _ = ticker.tick() => {
498                                        // Allow concurrent execution: One subtask per tick
499                                        let mut task_fn = task_fn.clone();
500                                        let child = token_child.clone();
501                                        tokio::spawn(async move {
502                                            // 1) Lock out &mut F, call once to get a future
503                                            let fut = {
504                                                task_fn(child)
505                                            };
506                                            // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
507                                            if let Err(e) = fut.await {
508                                                error!("FixedRate task {} failed: {:?}", id, e);
509                                            }
510                                        });
511                                    }
512                                }
513                            }
514                        }
515
516                        ScheduleMode::FixedDelay => {
517                            time::sleep(initial_delay).await;
518                            loop {
519                                tokio::select! {
520                                    _ = token_child.cancelled() => {
521                                        info!("Task {} cancelled gracefully", id);
522                                        break;
523                                    }
524                                    _ = async {
525                                        // Serial execution: complete one task and then sleep
526                                        let fut = {
527                                            (task_fn)(token_child.clone())
528                                        };
529                                        if let Err(e) = fut.await {
530                                            error!("FixedDelay task {} failed: {:?}", id, e);
531                                        }
532                                        time::sleep(period).await;
533                                    } => {}
534                                }
535                            }
536                        }
537
538                        ScheduleMode::FixedRateNoOverlap => {
539                            let start = Instant::now() + initial_delay;
540                            let mut ticker = time::interval_at(start, period);
541
542                            // Permission=1, controls non-overlapping execution
543                            let gate = Arc::new(Semaphore::new(1));
544
545                            loop {
546                                tokio::select! {
547                                    _ = token_child.cancelled() => {
548                                        info!("Task {} cancelled gracefully", id);
549                                        break;
550                                    }
551                                    _ = ticker.tick() => {
552                                        // Try to acquire permission. If unable to acquire, skip the current tick.
553                                        if let Ok(permit) = gate.clone().try_acquire_owned() {
554                                            let mut task_fn = task_fn.clone();
555                                            let child = token_child.clone();
556                                            tokio::spawn(async move {
557                                                // Release the lock immediately after generating the future
558                                                let fut = {
559                                                    (task_fn)(child)
560                                                };
561                                                if let Err(e) = fut.await {
562                                                    error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
563                                                }
564                                                drop(permit); // Release the permit after completion
565                                            });
566                                        } else {
567                                            info!("Task {} skipped due to overlap", id);
568                                        }
569                                    }
570                                }
571                            }
572                        }
573                    }
574                }
575            });
576
577            self.tasks.write().insert(
578                id,
579                TaskInfo {
580                    cancel_token: token,
581                    handle,
582                },
583            );
584
585            id
586        }
587    }
588}
589
590#[cfg(test)]
591mod tests {
592    use std::sync::atomic::AtomicUsize;
593    use std::sync::atomic::Ordering;
594    use std::sync::Arc;
595    use std::time::Duration;
596
597    use tokio::time;
598
599    use crate::schedule::simple_scheduler::*;
600
601    #[tokio::test]
602    async fn adds_task_and_increments_task_count() {
603        let manager = ScheduledTaskManager::new();
604        let task_id = manager.add_scheduled_task(
605            ScheduleMode::FixedRate,
606            Duration::from_secs(1),
607            Duration::from_secs(2),
608            |token| async move {
609                if token.is_cancelled() {
610                    return Ok(());
611                }
612                Ok(())
613            },
614        );
615
616        assert_eq!(manager.task_count(), 1);
617        manager.cancel_task(task_id);
618    }
619
620    #[tokio::test]
621    async fn cancels_task_and_decrements_task_count() {
622        let manager = ScheduledTaskManager::new();
623        let task_id = manager.add_scheduled_task(
624            ScheduleMode::FixedRate,
625            Duration::from_secs(1),
626            Duration::from_secs(2),
627            |token| async move {
628                if token.is_cancelled() {
629                    return Ok(());
630                }
631                Ok(())
632            },
633        );
634
635        manager.cancel_task(task_id);
636        assert_eq!(manager.task_count(), 0);
637    }
638
639    #[tokio::test]
640    async fn aborts_task_and_decrements_task_count() {
641        let manager = ScheduledTaskManager::new();
642        let task_id = manager.add_scheduled_task(
643            ScheduleMode::FixedRate,
644            Duration::from_secs(1),
645            Duration::from_secs(2),
646            |token| async move {
647                if token.is_cancelled() {
648                    return Ok(());
649                }
650                Ok(())
651            },
652        );
653
654        manager.abort_task(task_id);
655        assert_eq!(manager.task_count(), 0);
656    }
657
658    #[tokio::test]
659    async fn cancels_all_tasks() {
660        let manager = ScheduledTaskManager::new();
661        for _ in 0..3 {
662            manager.add_scheduled_task(
663                ScheduleMode::FixedRate,
664                Duration::from_secs(1),
665                Duration::from_secs(2),
666                |token| async move {
667                    if token.is_cancelled() {
668                        return Ok(());
669                    }
670                    Ok(())
671                },
672            );
673        }
674
675        assert_eq!(manager.task_count(), 3);
676        manager.cancel_all();
677        assert_eq!(manager.task_count(), 0);
678    }
679
680    #[tokio::test]
681    async fn aborts_all_tasks() {
682        let manager = ScheduledTaskManager::new();
683        for _ in 0..3 {
684            manager.add_scheduled_task(
685                ScheduleMode::FixedRate,
686                Duration::from_secs(1),
687                Duration::from_secs(2),
688                |token| async move {
689                    if token.is_cancelled() {
690                        return Ok(());
691                    }
692                    Ok(())
693                },
694            );
695        }
696
697        assert_eq!(manager.task_count(), 3);
698        manager.abort_all();
699        assert_eq!(manager.task_count(), 0);
700    }
701
702    #[tokio::test]
703    async fn skips_task_execution_in_fixed_rate_no_overlap_mode() {
704        let manager = ScheduledTaskManager::new();
705        let task_id = manager.add_scheduled_task(
706            ScheduleMode::FixedRateNoOverlap,
707            Duration::from_secs(0),
708            Duration::from_millis(100),
709            |token| async move {
710                tokio::time::sleep(Duration::from_millis(200)).await;
711                if token.is_cancelled() {
712                    return Ok(());
713                }
714                Ok(())
715            },
716        );
717
718        time::sleep(Duration::from_millis(400)).await;
719        manager.cancel_task(task_id);
720        assert_eq!(manager.task_count(), 0);
721    }
722
723    fn new_manager() -> ScheduledTaskManager {
724        ScheduledTaskManager::new()
725    }
726
727    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
728    async fn test_fixed_rate_task() {
729        let mgr = new_manager();
730        let counter = Arc::new(AtomicUsize::new(0));
731
732        let c = counter.clone();
733        let task_id = mgr.add_fixed_rate_task_async(
734            Duration::from_millis(50),
735            Duration::from_millis(100),
736            async move |_ctx| {
737                c.fetch_add(1, Ordering::Relaxed);
738                Ok(())
739            },
740        );
741
742        time::sleep(Duration::from_millis(500)).await;
743
744        mgr.cancel_task(task_id);
745        time::sleep(Duration::from_millis(50)).await;
746
747        let executed = counter.load(Ordering::Relaxed);
748        assert!(executed >= 3, "FixedRate executed too few times: {}", executed);
749    }
750
751    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
752    async fn test_fixed_delay_task() {
753        let mgr = new_manager();
754        let counter = Arc::new(AtomicUsize::new(0));
755
756        let c = counter.clone();
757        let task_id = mgr.add_fixed_delay_task_async(
758            Duration::from_millis(10),
759            Duration::from_millis(50),
760            async move |_ctx| {
761                c.fetch_add(1, Ordering::Relaxed);
762                Ok(())
763            },
764        );
765
766        time::sleep(Duration::from_millis(300)).await;
767
768        mgr.cancel_task(task_id);
769        time::sleep(Duration::from_millis(50)).await;
770
771        let executed = counter.load(Ordering::Relaxed);
772        assert!((3..=6).contains(&executed), "FixedDelay count unexpected: {}", executed);
773    }
774
775    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
776    async fn test_fixed_rate_no_overlap_task() {
777        let mgr = new_manager();
778        let counter = Arc::new(AtomicUsize::new(0));
779
780        let c = counter.clone();
781        let task_id = mgr.add_fixed_rate_no_overlap_task_async(
782            Duration::from_millis(10),
783            Duration::from_millis(50),
784            async move |_ctx| {
785                time::sleep(Duration::from_millis(80)).await;
786                c.fetch_add(1, Ordering::Relaxed);
787                Ok(())
788            },
789        );
790
791        time::sleep(Duration::from_millis(400)).await;
792
793        mgr.cancel_task(task_id);
794        time::sleep(Duration::from_millis(50)).await;
795
796        let executed = counter.load(Ordering::Relaxed);
797        assert!(
798            (2..=5).contains(&executed),
799            "FixedRateNoOverlap count unexpected: {}",
800            executed
801        );
802    }
803}