rocketmq_rust/
schedule.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17pub mod executor;
18pub mod scheduler;
19pub mod task;
20pub mod trigger;
21
22use std::error::Error;
23use std::fmt;
24
25pub use executor::ExecutorPool;
26pub use task::Task;
27pub use task::TaskContext;
28pub use task::TaskResult;
29pub use task::TaskStatus;
30
/// Errors reported by the scheduling subsystem.
///
/// Each variant carries a single `String` payload that is appended to a fixed,
/// variant-specific label when the error is displayed.
#[derive(Debug)]
pub enum SchedulerError {
    /// Displayed as `Task not found: <payload>`.
    TaskNotFound(String),
    /// Displayed as `Task already exists: <payload>`.
    TaskAlreadyExists(String),
    /// Displayed as `Executor error: <payload>`.
    ExecutorError(String),
    /// Displayed as `Trigger error: <payload>`.
    TriggerError(String),
    /// Displayed as `System error: <payload>`.
    SystemError(String),
}

impl fmt::Display for SchedulerError {
    /// Writes `<label>: <payload>` for the active variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Resolve the label/payload pair first so the formatting happens in one place.
        let (label, detail) = match self {
            Self::TaskNotFound(detail) => ("Task not found", detail),
            Self::TaskAlreadyExists(detail) => ("Task already exists", detail),
            Self::ExecutorError(detail) => ("Executor error", detail),
            Self::TriggerError(detail) => ("Trigger error", detail),
            Self::SystemError(detail) => ("System error", detail),
        };
        write!(f, "{label}: {detail}")
    }
}

impl Error for SchedulerError {}

/// Shorthand for results whose error type is [`SchedulerError`].
pub type SchedulerResult<T> = Result<T, SchedulerError>;
56
57pub mod simple_scheduler {
58    use std::collections::HashMap;
59    use std::future::Future;
60    use std::sync::atomic::AtomicU64;
61    use std::sync::atomic::Ordering;
62    use std::sync::Arc;
63
64    use anyhow::Result;
65    use parking_lot::RwLock;
66    use tokio::sync::Semaphore;
67    use tokio::task::JoinHandle;
68    use tokio::time::Duration;
69    use tokio::time::Instant;
70    use tokio::time::{self};
71    use tokio_util::sync::CancellationToken;
72    use tracing::error;
73    use tracing::info;
74
75    use crate::ArcMut;
76
77    #[derive(Debug, Clone, Copy)]
78    pub enum ScheduleMode {
79        /// Align the beats, and they might pile up.
80        FixedRate,
81        /// Sleep only after the task is completed, and there will be no accumulation.
82        FixedDelay,
83        /// Align the beats, but skip if the last task is not yet completed.
84        FixedRateNoOverlap,
85    }
86
87    type TaskId = u64;
88
89    pub struct TaskInfo {
90        cancel_token: CancellationToken,
91        handle: JoinHandle<()>,
92    }
93
94    #[derive(Clone)]
95    pub struct ScheduledTaskManager {
96        tasks: Arc<RwLock<HashMap<TaskId, TaskInfo>>>,
97        counter: Arc<AtomicU64>,
98    }
99
100    impl Default for ScheduledTaskManager {
101        fn default() -> Self {
102            Self::new()
103        }
104    }
105
106    impl ScheduledTaskManager {
107        pub fn new() -> Self {
108            Self {
109                tasks: Arc::new(RwLock::new(HashMap::new())),
110                counter: Arc::new(AtomicU64::new(0)),
111            }
112        }
113
114        fn next_id(&self) -> TaskId {
115            self.counter.fetch_add(1, Ordering::Relaxed)
116        }
117
118        /// Adds a fixed-rate scheduled task to the task manager.
119        ///
120        /// # Arguments
121        /// * `initial_delay` - The delay before the first execution of the task.
122        /// * `period` - The interval between task executions.
123        /// * `task_fn` - A function that defines the task to be executed. It takes a
124        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
125        ///   `Result<()>`.
126        ///
127        /// # Returns
128        /// A `TaskId` representing the unique identifier of the scheduled task.
129        ///
130        /// # Notes
131        /// - Tasks are executed at fixed intervals, even if previous executions overlap.
132        pub fn add_fixed_rate_task<F, Fut>(
133            &self,
134            initial_delay: Duration,
135            period: Duration,
136            task_fn: F,
137        ) -> TaskId
138        where
139            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
140            Fut: Future<Output = Result<()>> + Send + 'static,
141        {
142            self.add_scheduled_task(ScheduleMode::FixedRate, initial_delay, period, task_fn)
143        }
144
145        /// Adds a fixed-delay scheduled task to the task manager.
146        ///
147        /// # Arguments
148        /// * `initial_delay` - The delay before the first execution of the task.
149        /// * `period` - The interval between task executions.
150        /// * `task_fn` - A function that defines the task to be executed. It takes a
151        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
152        ///   `Result<()>`.
153        ///
154        /// # Returns
155        /// A `TaskId` representing the unique identifier of the scheduled task.
156        ///
157        /// # Notes
158        /// - Tasks are executed serially, with a delay after each task completes.
159        pub fn add_fixed_delay_task<F, Fut>(
160            &self,
161            initial_delay: Duration,
162            period: Duration,
163            task_fn: F,
164        ) -> TaskId
165        where
166            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
167            Fut: Future<Output = Result<()>> + Send + 'static,
168        {
169            self.add_scheduled_task(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
170        }
171
172        /// Adds a fixed-rate-no-overlap scheduled task to the task manager.
173        ///
174        /// # Arguments
175        /// * `initial_delay` - The delay before the first execution of the task.
176        /// * `period` - The interval between task executions.
177        /// * `task_fn` - A function that defines the task to be executed. It takes a
178        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
179        ///   `Result<()>`.
180        ///
181        /// # Returns
182        /// A `TaskId` representing the unique identifier of the scheduled task.
183        ///
184        /// # Notes
185        /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
186        pub fn add_fixed_rate_no_overlap_task<F, Fut>(
187            &self,
188            initial_delay: Duration,
189            period: Duration,
190            task_fn: F,
191        ) -> TaskId
192        where
193            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
194            Fut: Future<Output = Result<()>> + Send + 'static,
195        {
196            self.add_scheduled_task(
197                ScheduleMode::FixedRateNoOverlap,
198                initial_delay,
199                period,
200                task_fn,
201            )
202        }
203
204        /// Adds a scheduled task to the task manager.
205        ///
206        /// # Arguments
207        /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
208        ///   - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
209        ///   - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
210        ///   - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
211        ///     still running.
212        /// * `initial_delay` - The delay before the first execution of the task.
213        /// * `period` - The interval between task executions.
214        /// * `task_fn` - A function that defines the task to be executed. It takes a
215        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
216        ///   `Result<()>`.
217        ///
218        /// # Returns
219        /// A `TaskId` representing the unique identifier of the scheduled task.
220        ///
221        /// # Notes
222        /// - The task function is executed asynchronously.
223        /// - The `CancellationToken` can be used to gracefully cancel the task.
224        /// - The task is added to the internal task manager and can be managed (e.g., canceled or
225        ///   aborted) later.
226        pub fn add_scheduled_task<F, Fut>(
227            &self,
228            mode: ScheduleMode,
229            initial_delay: Duration,
230            period: Duration,
231            task_fn: F,
232        ) -> TaskId
233        where
234            F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
235            Fut: Future<Output = Result<()>> + Send + 'static,
236        {
237            let id = self.next_id();
238            let token = CancellationToken::new();
239            let token_child = token.clone();
240
241            let task_fn = ArcMut::new(task_fn);
242
243            let handle = tokio::spawn({
244                let mut task_fn = task_fn.clone();
245                async move {
246                    match mode {
247                        ScheduleMode::FixedRate => {
248                            let start = Instant::now() + initial_delay;
249                            let mut ticker = time::interval_at(start, period);
250
251                            loop {
252                                tokio::select! {
253                                    _ = token_child.cancelled() => {
254                                        info!("Task {} cancelled gracefully", id);
255                                        break;
256                                    }
257                                    _ = ticker.tick() => {
258                                        // Allow concurrent execution: One subtask per tick
259                                        let mut task_fn = task_fn.clone();
260                                        let child = token_child.clone();
261                                        tokio::spawn(async move {
262                                            // 1) Lock out &mut F, call once to get a future
263                                            let fut = {
264                                                (task_fn)(child)
265                                            };
266                                            // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
267                                            if let Err(e) = fut.await {
268                                                error!("FixedRate task {} failed: {:?}", id, e);
269                                            }
270                                        });
271                                    }
272                                }
273                            }
274                        }
275
276                        ScheduleMode::FixedDelay => {
277                            time::sleep(initial_delay).await;
278                            loop {
279                                tokio::select! {
280                                    _ = token_child.cancelled() => {
281                                        info!("Task {} cancelled gracefully", id);
282                                        break;
283                                    }
284                                    _ = async {
285                                        // Serial execution: complete one task and then sleep
286                                        let fut = {
287                                            (task_fn)(token_child.clone())
288                                        };
289                                        if let Err(e) = fut.await {
290                                            error!("FixedDelay task {} failed: {:?}", id, e);
291                                        }
292                                        time::sleep(period).await;
293                                    } => {}
294                                }
295                            }
296                        }
297
298                        ScheduleMode::FixedRateNoOverlap => {
299                            let start = Instant::now() + initial_delay;
300                            let mut ticker = time::interval_at(start, period);
301
302                            // Permission=1, controls non-overlapping execution
303                            let gate = Arc::new(Semaphore::new(1));
304
305                            loop {
306                                tokio::select! {
307                                    _ = token_child.cancelled() => {
308                                        info!("Task {} cancelled gracefully", id);
309                                        break;
310                                    }
311                                    _ = ticker.tick() => {
312                                        // Try to acquire permission. If unable to acquire, skip the current tick.
313                                        if let Ok(permit) = gate.clone().try_acquire_owned() {
314                                            let mut task_fn = task_fn.clone();
315                                            let child = token_child.clone();
316                                            tokio::spawn(async move {
317                                                // Release the lock immediately after generating the future
318                                                let fut = {
319                                                    (task_fn)(child)
320                                                };
321                                                if let Err(e) = fut.await {
322                                                    error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
323                                                }
324                                                drop(permit); // Release the permit after completion
325                                            });
326                                        } else {
327                                            info!("Task {} skipped due to overlap", id);
328                                        }
329                                    }
330                                }
331                            }
332                        }
333                    }
334                }
335            });
336
337            self.tasks.write().insert(
338                id,
339                TaskInfo {
340                    cancel_token: token,
341                    handle,
342                },
343            );
344
345            id
346        }
347
348        /// Graceful cancellation
349        pub fn cancel_task(&self, id: TaskId) {
350            if let Some(info) = self.tasks.write().remove(&id) {
351                info.cancel_token.cancel();
352                tokio::spawn(async move {
353                    let _ = info.handle.await;
354                });
355            }
356        }
357
358        /// Roughly abort
359        pub fn abort_task(&self, id: TaskId) {
360            if let Some(info) = self.tasks.write().remove(&id) {
361                info.handle.abort();
362            }
363        }
364
365        /// Batch cancel
366        pub fn cancel_all(&self) {
367            let mut tasks = self.tasks.write();
368            for (_, info) in tasks.drain() {
369                info.cancel_token.cancel();
370                tokio::spawn(async move {
371                    let _ = info.handle.await;
372                });
373            }
374        }
375
376        /// Batch abort
377        pub fn abort_all(&self) {
378            let mut tasks = self.tasks.write();
379            for (_, info) in tasks.drain() {
380                info.handle.abort();
381            }
382        }
383
384        pub fn task_count(&self) -> usize {
385            self.tasks.read().len()
386        }
387    }
388
389    impl ScheduledTaskManager {
390        /// Adds a fixed-rate scheduled task to the task manager asynchronously.
391        ///
392        /// # Arguments
393        /// * `initial_delay` - The delay before the first execution of the task.
394        /// * `period` - The interval between task executions.
395        /// * `task_fn` - A function that defines the task to be executed. It takes a
396        ///   `CancellationToken` as an argument and returns a `Result<()>`.
397        ///
398        /// # Returns
399        /// A `TaskId` representing the unique identifier of the scheduled task.
400        ///
401        /// # Notes
402        /// - Tasks are executed at fixed intervals, even if previous executions overlap.
403        /// - The task function is executed asynchronously.
404        pub fn add_fixed_rate_task_async<F>(
405            &self,
406            initial_delay: Duration,
407            period: Duration,
408            task_fn: F,
409        ) -> TaskId
410        where
411            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
412            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
413        {
414            self.add_scheduled_task_async(ScheduleMode::FixedRate, initial_delay, period, task_fn)
415        }
416
417        /// Adds a fixed-delay scheduled task to the task manager asynchronously.
418        ///
419        /// # Arguments
420        /// * `initial_delay` - The delay before the first execution of the task.
421        /// * `period` - The interval between task executions.
422        /// * `task_fn` - A function that defines the task to be executed. It takes a
423        ///   `CancellationToken` as an argument and returns a `Result<()>`.
424        ///
425        /// # Returns
426        /// A `TaskId` representing the unique identifier of the scheduled task.
427        ///
428        /// # Notes
429        /// - Tasks are executed serially, with a delay after each task completes.
430        /// - The task function is executed asynchronously.
431        pub fn add_fixed_delay_task_async<F>(
432            &self,
433            initial_delay: Duration,
434            period: Duration,
435            task_fn: F,
436        ) -> TaskId
437        where
438            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
439            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
440        {
441            self.add_scheduled_task_async(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
442        }
443
444        /// Adds a fixed-rate-no-overlap scheduled task to the task manager asynchronously.
445        ///
446        /// # Arguments
447        /// * `initial_delay` - The delay before the first execution of the task.
448        /// * `period` - The interval between task executions.
449        /// * `task_fn` - A function that defines the task to be executed. It takes a
450        ///   `CancellationToken` as an argument and returns a `Result<()>`.
451        ///
452        /// # Returns
453        /// A `TaskId` representing the unique identifier of the scheduled task.
454        ///
455        /// # Notes
456        /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
457        /// - The task function is executed asynchronously.
458        pub fn add_fixed_rate_no_overlap_task_async<F>(
459            &self,
460            initial_delay: Duration,
461            period: Duration,
462            task_fn: F,
463        ) -> TaskId
464        where
465            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
466            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
467        {
468            self.add_scheduled_task_async(
469                ScheduleMode::FixedRateNoOverlap,
470                initial_delay,
471                period,
472                task_fn,
473            )
474        }
475
476        /// Adds a scheduled task to the task manager asynchronously.
477        ///
478        /// # Arguments
479        /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
480        ///   - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
481        ///   - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
482        ///   - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
483        ///     still running.
484        /// * `initial_delay` - The delay before the first execution of the task.
485        /// * `period` - The interval between task executions.
486        /// * `task_fn` - A function that defines the task to be executed. It takes a
487        ///   `CancellationToken` as an argument and returns a `Future` that resolves to a
488        ///   `Result<()>`.
489        ///
490        /// # Returns
491        /// A `TaskId` representing the unique identifier of the scheduled task.
492        ///
493        /// # Notes
494        /// - The task function is executed asynchronously.
495        /// - The `CancellationToken` can be used to gracefully cancel the task.
496        /// - The task is added to the internal task manager and can be managed (e.g., canceled or
497        ///   aborted) later.
498        pub fn add_scheduled_task_async<F>(
499            &self,
500            mode: ScheduleMode,
501            initial_delay: Duration,
502            period: Duration,
503            task_fn: F,
504        ) -> TaskId
505        where
506            F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
507            for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
508        {
509            let id = self.next_id();
510            let token = CancellationToken::new();
511            let token_child = token.clone();
512
513            let task_fn = ArcMut::new(task_fn);
514
515            let handle = tokio::spawn({
516                let mut task_fn = task_fn.clone();
517                async move {
518                    match mode {
519                        ScheduleMode::FixedRate => {
520                            let start = Instant::now() + initial_delay;
521                            let mut ticker = time::interval_at(start, period);
522
523                            loop {
524                                tokio::select! {
525                                    _ = token_child.cancelled() => {
526                                        info!("Task {} cancelled gracefully", id);
527                                        break;
528                                    }
529                                    _ = ticker.tick() => {
530                                        // Allow concurrent execution: One subtask per tick
531                                        let mut task_fn = task_fn.clone();
532                                        let child = token_child.clone();
533                                        tokio::spawn(async move {
534                                            // 1) Lock out &mut F, call once to get a future
535                                            let fut = {
536                                                task_fn(child)
537                                            };
538                                            // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
539                                            if let Err(e) = fut.await {
540                                                error!("FixedRate task {} failed: {:?}", id, e);
541                                            }
542                                        });
543                                    }
544                                }
545                            }
546                        }
547
548                        ScheduleMode::FixedDelay => {
549                            time::sleep(initial_delay).await;
550                            loop {
551                                tokio::select! {
552                                    _ = token_child.cancelled() => {
553                                        info!("Task {} cancelled gracefully", id);
554                                        break;
555                                    }
556                                    _ = async {
557                                        // Serial execution: complete one task and then sleep
558                                        let fut = {
559                                            (task_fn)(token_child.clone())
560                                        };
561                                        if let Err(e) = fut.await {
562                                            error!("FixedDelay task {} failed: {:?}", id, e);
563                                        }
564                                        time::sleep(period).await;
565                                    } => {}
566                                }
567                            }
568                        }
569
570                        ScheduleMode::FixedRateNoOverlap => {
571                            let start = Instant::now() + initial_delay;
572                            let mut ticker = time::interval_at(start, period);
573
574                            // Permission=1, controls non-overlapping execution
575                            let gate = Arc::new(Semaphore::new(1));
576
577                            loop {
578                                tokio::select! {
579                                    _ = token_child.cancelled() => {
580                                        info!("Task {} cancelled gracefully", id);
581                                        break;
582                                    }
583                                    _ = ticker.tick() => {
584                                        // Try to acquire permission. If unable to acquire, skip the current tick.
585                                        if let Ok(permit) = gate.clone().try_acquire_owned() {
586                                            let mut task_fn = task_fn.clone();
587                                            let child = token_child.clone();
588                                            tokio::spawn(async move {
589                                                // Release the lock immediately after generating the future
590                                                let fut = {
591                                                    (task_fn)(child)
592                                                };
593                                                if let Err(e) = fut.await {
594                                                    error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
595                                                }
596                                                drop(permit); // Release the permit after completion
597                                            });
598                                        } else {
599                                            info!("Task {} skipped due to overlap", id);
600                                        }
601                                    }
602                                }
603                            }
604                        }
605                    }
606                }
607            });
608
609            self.tasks.write().insert(
610                id,
611                TaskInfo {
612                    cancel_token: token,
613                    handle,
614                },
615            );
616
617            id
618        }
619    }
620}
621
622#[cfg(test)]
623mod tests {
624    use std::sync::atomic::AtomicUsize;
625    use std::sync::atomic::Ordering;
626    use std::sync::Arc;
627    use std::time::Duration;
628
629    use tokio::time;
630
631    use crate::schedule::simple_scheduler::*;
632
633    #[tokio::test]
634    async fn adds_task_and_increments_task_count() {
635        let manager = ScheduledTaskManager::new();
636        let task_id = manager.add_scheduled_task(
637            ScheduleMode::FixedRate,
638            Duration::from_secs(1),
639            Duration::from_secs(2),
640            |token| async move {
641                if token.is_cancelled() {
642                    return Ok(());
643                }
644                Ok(())
645            },
646        );
647
648        assert_eq!(manager.task_count(), 1);
649        manager.cancel_task(task_id);
650    }
651
652    #[tokio::test]
653    async fn cancels_task_and_decrements_task_count() {
654        let manager = ScheduledTaskManager::new();
655        let task_id = manager.add_scheduled_task(
656            ScheduleMode::FixedRate,
657            Duration::from_secs(1),
658            Duration::from_secs(2),
659            |token| async move {
660                if token.is_cancelled() {
661                    return Ok(());
662                }
663                Ok(())
664            },
665        );
666
667        manager.cancel_task(task_id);
668        assert_eq!(manager.task_count(), 0);
669    }
670
671    #[tokio::test]
672    async fn aborts_task_and_decrements_task_count() {
673        let manager = ScheduledTaskManager::new();
674        let task_id = manager.add_scheduled_task(
675            ScheduleMode::FixedRate,
676            Duration::from_secs(1),
677            Duration::from_secs(2),
678            |token| async move {
679                if token.is_cancelled() {
680                    return Ok(());
681                }
682                Ok(())
683            },
684        );
685
686        manager.abort_task(task_id);
687        assert_eq!(manager.task_count(), 0);
688    }
689
690    #[tokio::test]
691    async fn cancels_all_tasks() {
692        let manager = ScheduledTaskManager::new();
693        for _ in 0..3 {
694            manager.add_scheduled_task(
695                ScheduleMode::FixedRate,
696                Duration::from_secs(1),
697                Duration::from_secs(2),
698                |token| async move {
699                    if token.is_cancelled() {
700                        return Ok(());
701                    }
702                    Ok(())
703                },
704            );
705        }
706
707        assert_eq!(manager.task_count(), 3);
708        manager.cancel_all();
709        assert_eq!(manager.task_count(), 0);
710    }
711
712    #[tokio::test]
713    async fn aborts_all_tasks() {
714        let manager = ScheduledTaskManager::new();
715        for _ in 0..3 {
716            manager.add_scheduled_task(
717                ScheduleMode::FixedRate,
718                Duration::from_secs(1),
719                Duration::from_secs(2),
720                |token| async move {
721                    if token.is_cancelled() {
722                        return Ok(());
723                    }
724                    Ok(())
725                },
726            );
727        }
728
729        assert_eq!(manager.task_count(), 3);
730        manager.abort_all();
731        assert_eq!(manager.task_count(), 0);
732    }
733
734    #[tokio::test]
735    async fn skips_task_execution_in_fixed_rate_no_overlap_mode() {
736        let manager = ScheduledTaskManager::new();
737        let task_id = manager.add_scheduled_task(
738            ScheduleMode::FixedRateNoOverlap,
739            Duration::from_secs(0),
740            Duration::from_millis(100),
741            |token| async move {
742                tokio::time::sleep(Duration::from_millis(200)).await;
743                if token.is_cancelled() {
744                    return Ok(());
745                }
746                Ok(())
747            },
748        );
749
750        time::sleep(Duration::from_millis(400)).await;
751        manager.cancel_task(task_id);
752        assert_eq!(manager.task_count(), 0);
753    }
754
755    fn new_manager() -> ScheduledTaskManager {
756        ScheduledTaskManager::new()
757    }
758
759    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
760    async fn test_fixed_rate_task() {
761        let mgr = new_manager();
762        let counter = Arc::new(AtomicUsize::new(0));
763
764        let c = counter.clone();
765        let task_id = mgr.add_fixed_rate_task_async(
766            Duration::from_millis(50),
767            Duration::from_millis(100),
768            async move |_ctx| {
769                c.fetch_add(1, Ordering::Relaxed);
770                Ok(())
771            },
772        );
773
774        time::sleep(Duration::from_millis(500)).await;
775
776        mgr.cancel_task(task_id);
777        time::sleep(Duration::from_millis(50)).await;
778
779        let executed = counter.load(Ordering::Relaxed);
780        assert!(
781            executed >= 3,
782            "FixedRate executed too few times: {}",
783            executed
784        );
785    }
786
787    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
788    async fn test_fixed_delay_task() {
789        let mgr = new_manager();
790        let counter = Arc::new(AtomicUsize::new(0));
791
792        let c = counter.clone();
793        let task_id = mgr.add_fixed_delay_task_async(
794            Duration::from_millis(10),
795            Duration::from_millis(50),
796            async move |_ctx| {
797                c.fetch_add(1, Ordering::Relaxed);
798                Ok(())
799            },
800        );
801
802        time::sleep(Duration::from_millis(300)).await;
803
804        mgr.cancel_task(task_id);
805        time::sleep(Duration::from_millis(50)).await;
806
807        let executed = counter.load(Ordering::Relaxed);
808        assert!(
809            (3..=6).contains(&executed),
810            "FixedDelay count unexpected: {}",
811            executed
812        );
813    }
814
815    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
816    async fn test_fixed_rate_no_overlap_task() {
817        let mgr = new_manager();
818        let counter = Arc::new(AtomicUsize::new(0));
819
820        let c = counter.clone();
821        let task_id = mgr.add_fixed_rate_no_overlap_task_async(
822            Duration::from_millis(10),
823            Duration::from_millis(50),
824            async move |_ctx| {
825                time::sleep(Duration::from_millis(80)).await;
826                c.fetch_add(1, Ordering::Relaxed);
827                Ok(())
828            },
829        );
830
831        time::sleep(Duration::from_millis(400)).await;
832
833        mgr.cancel_task(task_id);
834        time::sleep(Duration::from_millis(50)).await;
835
836        let executed = counter.load(Ordering::Relaxed);
837        assert!(
838            (2..=5).contains(&executed),
839            "FixedRateNoOverlap count unexpected: {}",
840            executed
841        );
842    }
843}