rocketmq_rust/schedule.rs
1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17pub mod executor;
18pub mod scheduler;
19pub mod task;
20pub mod trigger;
21
22use std::error::Error;
23use std::fmt;
24
25pub use executor::ExecutorPool;
26pub use task::Task;
27pub use task::TaskContext;
28pub use task::TaskResult;
29pub use task::TaskStatus;
30
31/// Scheduler error type
32#[derive(Debug)]
33pub enum SchedulerError {
34 TaskNotFound(String),
35 TaskAlreadyExists(String),
36 ExecutorError(String),
37 TriggerError(String),
38 SystemError(String),
39}
40
41impl fmt::Display for SchedulerError {
42 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
43 match self {
44 SchedulerError::TaskNotFound(id) => write!(f, "Task not found: {id}"),
45 SchedulerError::TaskAlreadyExists(id) => write!(f, "Task already exists: {id}"),
46 SchedulerError::ExecutorError(msg) => write!(f, "Executor error: {msg}"),
47 SchedulerError::TriggerError(msg) => write!(f, "Trigger error: {msg}"),
48 SchedulerError::SystemError(msg) => write!(f, "System error: {msg}"),
49 }
50 }
51}
52
53impl Error for SchedulerError {}
54
55pub type SchedulerResult<T> = Result<T, SchedulerError>;
56
57pub mod simple_scheduler {
58 use std::collections::HashMap;
59 use std::future::Future;
60 use std::sync::atomic::AtomicU64;
61 use std::sync::atomic::Ordering;
62 use std::sync::Arc;
63
64 use anyhow::Result;
65 use parking_lot::RwLock;
66 use tokio::sync::Semaphore;
67 use tokio::task::JoinHandle;
68 use tokio::time::Duration;
69 use tokio::time::Instant;
70 use tokio::time::{self};
71 use tokio_util::sync::CancellationToken;
72 use tracing::error;
73 use tracing::info;
74
75 use crate::ArcMut;
76
77 #[derive(Debug, Clone, Copy)]
78 pub enum ScheduleMode {
79 /// Align the beats, and they might pile up.
80 FixedRate,
81 /// Sleep only after the task is completed, and there will be no accumulation.
82 FixedDelay,
83 /// Align the beats, but skip if the last task is not yet completed.
84 FixedRateNoOverlap,
85 }
86
87 type TaskId = u64;
88
89 pub struct TaskInfo {
90 cancel_token: CancellationToken,
91 handle: JoinHandle<()>,
92 }
93
94 #[derive(Clone)]
95 pub struct ScheduledTaskManager {
96 tasks: Arc<RwLock<HashMap<TaskId, TaskInfo>>>,
97 counter: Arc<AtomicU64>,
98 }
99
100 impl Default for ScheduledTaskManager {
101 fn default() -> Self {
102 Self::new()
103 }
104 }
105
106 impl ScheduledTaskManager {
107 pub fn new() -> Self {
108 Self {
109 tasks: Arc::new(RwLock::new(HashMap::new())),
110 counter: Arc::new(AtomicU64::new(0)),
111 }
112 }
113
114 fn next_id(&self) -> TaskId {
115 self.counter.fetch_add(1, Ordering::Relaxed)
116 }
117
118 /// Adds a fixed-rate scheduled task to the task manager.
119 ///
120 /// # Arguments
121 /// * `initial_delay` - The delay before the first execution of the task.
122 /// * `period` - The interval between task executions.
123 /// * `task_fn` - A function that defines the task to be executed. It takes a
124 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
125 /// `Result<()>`.
126 ///
127 /// # Returns
128 /// A `TaskId` representing the unique identifier of the scheduled task.
129 ///
130 /// # Notes
131 /// - Tasks are executed at fixed intervals, even if previous executions overlap.
132 pub fn add_fixed_rate_task<F, Fut>(
133 &self,
134 initial_delay: Duration,
135 period: Duration,
136 task_fn: F,
137 ) -> TaskId
138 where
139 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
140 Fut: Future<Output = Result<()>> + Send + 'static,
141 {
142 self.add_scheduled_task(ScheduleMode::FixedRate, initial_delay, period, task_fn)
143 }
144
145 /// Adds a fixed-delay scheduled task to the task manager.
146 ///
147 /// # Arguments
148 /// * `initial_delay` - The delay before the first execution of the task.
149 /// * `period` - The interval between task executions.
150 /// * `task_fn` - A function that defines the task to be executed. It takes a
151 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
152 /// `Result<()>`.
153 ///
154 /// # Returns
155 /// A `TaskId` representing the unique identifier of the scheduled task.
156 ///
157 /// # Notes
158 /// - Tasks are executed serially, with a delay after each task completes.
159 pub fn add_fixed_delay_task<F, Fut>(
160 &self,
161 initial_delay: Duration,
162 period: Duration,
163 task_fn: F,
164 ) -> TaskId
165 where
166 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
167 Fut: Future<Output = Result<()>> + Send + 'static,
168 {
169 self.add_scheduled_task(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
170 }
171
172 /// Adds a fixed-rate-no-overlap scheduled task to the task manager.
173 ///
174 /// # Arguments
175 /// * `initial_delay` - The delay before the first execution of the task.
176 /// * `period` - The interval between task executions.
177 /// * `task_fn` - A function that defines the task to be executed. It takes a
178 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
179 /// `Result<()>`.
180 ///
181 /// # Returns
182 /// A `TaskId` representing the unique identifier of the scheduled task.
183 ///
184 /// # Notes
185 /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
186 pub fn add_fixed_rate_no_overlap_task<F, Fut>(
187 &self,
188 initial_delay: Duration,
189 period: Duration,
190 task_fn: F,
191 ) -> TaskId
192 where
193 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
194 Fut: Future<Output = Result<()>> + Send + 'static,
195 {
196 self.add_scheduled_task(
197 ScheduleMode::FixedRateNoOverlap,
198 initial_delay,
199 period,
200 task_fn,
201 )
202 }
203
204 /// Adds a scheduled task to the task manager.
205 ///
206 /// # Arguments
207 /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
208 /// - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
209 /// - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
210 /// - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
211 /// still running.
212 /// * `initial_delay` - The delay before the first execution of the task.
213 /// * `period` - The interval between task executions.
214 /// * `task_fn` - A function that defines the task to be executed. It takes a
215 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
216 /// `Result<()>`.
217 ///
218 /// # Returns
219 /// A `TaskId` representing the unique identifier of the scheduled task.
220 ///
221 /// # Notes
222 /// - The task function is executed asynchronously.
223 /// - The `CancellationToken` can be used to gracefully cancel the task.
224 /// - The task is added to the internal task manager and can be managed (e.g., canceled or
225 /// aborted) later.
226 pub fn add_scheduled_task<F, Fut>(
227 &self,
228 mode: ScheduleMode,
229 initial_delay: Duration,
230 period: Duration,
231 task_fn: F,
232 ) -> TaskId
233 where
234 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
235 Fut: Future<Output = Result<()>> + Send + 'static,
236 {
237 let id = self.next_id();
238 let token = CancellationToken::new();
239 let token_child = token.clone();
240
241 let task_fn = ArcMut::new(task_fn);
242
243 let handle = tokio::spawn({
244 let mut task_fn = task_fn.clone();
245 async move {
246 match mode {
247 ScheduleMode::FixedRate => {
248 let start = Instant::now() + initial_delay;
249 let mut ticker = time::interval_at(start, period);
250
251 loop {
252 tokio::select! {
253 _ = token_child.cancelled() => {
254 info!("Task {} cancelled gracefully", id);
255 break;
256 }
257 _ = ticker.tick() => {
258 // Allow concurrent execution: One subtask per tick
259 let mut task_fn = task_fn.clone();
260 let child = token_child.clone();
261 tokio::spawn(async move {
262 // 1) Lock out &mut F, call once to get a future
263 let fut = {
264 (task_fn)(child)
265 };
266 // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
267 if let Err(e) = fut.await {
268 error!("FixedRate task {} failed: {:?}", id, e);
269 }
270 });
271 }
272 }
273 }
274 }
275
276 ScheduleMode::FixedDelay => {
277 time::sleep(initial_delay).await;
278 loop {
279 tokio::select! {
280 _ = token_child.cancelled() => {
281 info!("Task {} cancelled gracefully", id);
282 break;
283 }
284 _ = async {
285 // Serial execution: complete one task and then sleep
286 let fut = {
287 (task_fn)(token_child.clone())
288 };
289 if let Err(e) = fut.await {
290 error!("FixedDelay task {} failed: {:?}", id, e);
291 }
292 time::sleep(period).await;
293 } => {}
294 }
295 }
296 }
297
298 ScheduleMode::FixedRateNoOverlap => {
299 let start = Instant::now() + initial_delay;
300 let mut ticker = time::interval_at(start, period);
301
302 // Permission=1, controls non-overlapping execution
303 let gate = Arc::new(Semaphore::new(1));
304
305 loop {
306 tokio::select! {
307 _ = token_child.cancelled() => {
308 info!("Task {} cancelled gracefully", id);
309 break;
310 }
311 _ = ticker.tick() => {
312 // Try to acquire permission. If unable to acquire, skip the current tick.
313 if let Ok(permit) = gate.clone().try_acquire_owned() {
314 let mut task_fn = task_fn.clone();
315 let child = token_child.clone();
316 tokio::spawn(async move {
317 // Release the lock immediately after generating the future
318 let fut = {
319 (task_fn)(child)
320 };
321 if let Err(e) = fut.await {
322 error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
323 }
324 drop(permit); // Release the permit after completion
325 });
326 } else {
327 info!("Task {} skipped due to overlap", id);
328 }
329 }
330 }
331 }
332 }
333 }
334 }
335 });
336
337 self.tasks.write().insert(
338 id,
339 TaskInfo {
340 cancel_token: token,
341 handle,
342 },
343 );
344
345 id
346 }
347
348 /// Graceful cancellation
349 pub fn cancel_task(&self, id: TaskId) {
350 if let Some(info) = self.tasks.write().remove(&id) {
351 info.cancel_token.cancel();
352 tokio::spawn(async move {
353 let _ = info.handle.await;
354 });
355 }
356 }
357
358 /// Roughly abort
359 pub fn abort_task(&self, id: TaskId) {
360 if let Some(info) = self.tasks.write().remove(&id) {
361 info.handle.abort();
362 }
363 }
364
365 /// Batch cancel
366 pub fn cancel_all(&self) {
367 let mut tasks = self.tasks.write();
368 for (_, info) in tasks.drain() {
369 info.cancel_token.cancel();
370 tokio::spawn(async move {
371 let _ = info.handle.await;
372 });
373 }
374 }
375
376 /// Batch abort
377 pub fn abort_all(&self) {
378 let mut tasks = self.tasks.write();
379 for (_, info) in tasks.drain() {
380 info.handle.abort();
381 }
382 }
383
384 pub fn task_count(&self) -> usize {
385 self.tasks.read().len()
386 }
387 }
388
389 impl ScheduledTaskManager {
390 /// Adds a fixed-rate scheduled task to the task manager asynchronously.
391 ///
392 /// # Arguments
393 /// * `initial_delay` - The delay before the first execution of the task.
394 /// * `period` - The interval between task executions.
395 /// * `task_fn` - A function that defines the task to be executed. It takes a
396 /// `CancellationToken` as an argument and returns a `Result<()>`.
397 ///
398 /// # Returns
399 /// A `TaskId` representing the unique identifier of the scheduled task.
400 ///
401 /// # Notes
402 /// - Tasks are executed at fixed intervals, even if previous executions overlap.
403 /// - The task function is executed asynchronously.
404 pub fn add_fixed_rate_task_async<F>(
405 &self,
406 initial_delay: Duration,
407 period: Duration,
408 task_fn: F,
409 ) -> TaskId
410 where
411 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
412 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
413 {
414 self.add_scheduled_task_async(ScheduleMode::FixedRate, initial_delay, period, task_fn)
415 }
416
417 /// Adds a fixed-delay scheduled task to the task manager asynchronously.
418 ///
419 /// # Arguments
420 /// * `initial_delay` - The delay before the first execution of the task.
421 /// * `period` - The interval between task executions.
422 /// * `task_fn` - A function that defines the task to be executed. It takes a
423 /// `CancellationToken` as an argument and returns a `Result<()>`.
424 ///
425 /// # Returns
426 /// A `TaskId` representing the unique identifier of the scheduled task.
427 ///
428 /// # Notes
429 /// - Tasks are executed serially, with a delay after each task completes.
430 /// - The task function is executed asynchronously.
431 pub fn add_fixed_delay_task_async<F>(
432 &self,
433 initial_delay: Duration,
434 period: Duration,
435 task_fn: F,
436 ) -> TaskId
437 where
438 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
439 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
440 {
441 self.add_scheduled_task_async(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
442 }
443
444 /// Adds a fixed-rate-no-overlap scheduled task to the task manager asynchronously.
445 ///
446 /// # Arguments
447 /// * `initial_delay` - The delay before the first execution of the task.
448 /// * `period` - The interval between task executions.
449 /// * `task_fn` - A function that defines the task to be executed. It takes a
450 /// `CancellationToken` as an argument and returns a `Result<()>`.
451 ///
452 /// # Returns
453 /// A `TaskId` representing the unique identifier of the scheduled task.
454 ///
455 /// # Notes
456 /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
457 /// - The task function is executed asynchronously.
458 pub fn add_fixed_rate_no_overlap_task_async<F>(
459 &self,
460 initial_delay: Duration,
461 period: Duration,
462 task_fn: F,
463 ) -> TaskId
464 where
465 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
466 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
467 {
468 self.add_scheduled_task_async(
469 ScheduleMode::FixedRateNoOverlap,
470 initial_delay,
471 period,
472 task_fn,
473 )
474 }
475
476 /// Adds a scheduled task to the task manager asynchronously.
477 ///
478 /// # Arguments
479 /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
480 /// - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
481 /// - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
482 /// - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
483 /// still running.
484 /// * `initial_delay` - The delay before the first execution of the task.
485 /// * `period` - The interval between task executions.
486 /// * `task_fn` - A function that defines the task to be executed. It takes a
487 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
488 /// `Result<()>`.
489 ///
490 /// # Returns
491 /// A `TaskId` representing the unique identifier of the scheduled task.
492 ///
493 /// # Notes
494 /// - The task function is executed asynchronously.
495 /// - The `CancellationToken` can be used to gracefully cancel the task.
496 /// - The task is added to the internal task manager and can be managed (e.g., canceled or
497 /// aborted) later.
498 pub fn add_scheduled_task_async<F>(
499 &self,
500 mode: ScheduleMode,
501 initial_delay: Duration,
502 period: Duration,
503 task_fn: F,
504 ) -> TaskId
505 where
506 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
507 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
508 {
509 let id = self.next_id();
510 let token = CancellationToken::new();
511 let token_child = token.clone();
512
513 let task_fn = ArcMut::new(task_fn);
514
515 let handle = tokio::spawn({
516 let mut task_fn = task_fn.clone();
517 async move {
518 match mode {
519 ScheduleMode::FixedRate => {
520 let start = Instant::now() + initial_delay;
521 let mut ticker = time::interval_at(start, period);
522
523 loop {
524 tokio::select! {
525 _ = token_child.cancelled() => {
526 info!("Task {} cancelled gracefully", id);
527 break;
528 }
529 _ = ticker.tick() => {
530 // Allow concurrent execution: One subtask per tick
531 let mut task_fn = task_fn.clone();
532 let child = token_child.clone();
533 tokio::spawn(async move {
534 // 1) Lock out &mut F, call once to get a future
535 let fut = {
536 task_fn(child)
537 };
538 // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
539 if let Err(e) = fut.await {
540 error!("FixedRate task {} failed: {:?}", id, e);
541 }
542 });
543 }
544 }
545 }
546 }
547
548 ScheduleMode::FixedDelay => {
549 time::sleep(initial_delay).await;
550 loop {
551 tokio::select! {
552 _ = token_child.cancelled() => {
553 info!("Task {} cancelled gracefully", id);
554 break;
555 }
556 _ = async {
557 // Serial execution: complete one task and then sleep
558 let fut = {
559 (task_fn)(token_child.clone())
560 };
561 if let Err(e) = fut.await {
562 error!("FixedDelay task {} failed: {:?}", id, e);
563 }
564 time::sleep(period).await;
565 } => {}
566 }
567 }
568 }
569
570 ScheduleMode::FixedRateNoOverlap => {
571 let start = Instant::now() + initial_delay;
572 let mut ticker = time::interval_at(start, period);
573
574 // Permission=1, controls non-overlapping execution
575 let gate = Arc::new(Semaphore::new(1));
576
577 loop {
578 tokio::select! {
579 _ = token_child.cancelled() => {
580 info!("Task {} cancelled gracefully", id);
581 break;
582 }
583 _ = ticker.tick() => {
584 // Try to acquire permission. If unable to acquire, skip the current tick.
585 if let Ok(permit) = gate.clone().try_acquire_owned() {
586 let mut task_fn = task_fn.clone();
587 let child = token_child.clone();
588 tokio::spawn(async move {
589 // Release the lock immediately after generating the future
590 let fut = {
591 (task_fn)(child)
592 };
593 if let Err(e) = fut.await {
594 error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
595 }
596 drop(permit); // Release the permit after completion
597 });
598 } else {
599 info!("Task {} skipped due to overlap", id);
600 }
601 }
602 }
603 }
604 }
605 }
606 }
607 });
608
609 self.tasks.write().insert(
610 id,
611 TaskInfo {
612 cancel_token: token,
613 handle,
614 },
615 );
616
617 id
618 }
619 }
620}
621
622#[cfg(test)]
623mod tests {
624 use std::sync::atomic::AtomicUsize;
625 use std::sync::atomic::Ordering;
626 use std::sync::Arc;
627 use std::time::Duration;
628
629 use tokio::time;
630
631 use crate::schedule::simple_scheduler::*;
632
633 #[tokio::test]
634 async fn adds_task_and_increments_task_count() {
635 let manager = ScheduledTaskManager::new();
636 let task_id = manager.add_scheduled_task(
637 ScheduleMode::FixedRate,
638 Duration::from_secs(1),
639 Duration::from_secs(2),
640 |token| async move {
641 if token.is_cancelled() {
642 return Ok(());
643 }
644 Ok(())
645 },
646 );
647
648 assert_eq!(manager.task_count(), 1);
649 manager.cancel_task(task_id);
650 }
651
652 #[tokio::test]
653 async fn cancels_task_and_decrements_task_count() {
654 let manager = ScheduledTaskManager::new();
655 let task_id = manager.add_scheduled_task(
656 ScheduleMode::FixedRate,
657 Duration::from_secs(1),
658 Duration::from_secs(2),
659 |token| async move {
660 if token.is_cancelled() {
661 return Ok(());
662 }
663 Ok(())
664 },
665 );
666
667 manager.cancel_task(task_id);
668 assert_eq!(manager.task_count(), 0);
669 }
670
671 #[tokio::test]
672 async fn aborts_task_and_decrements_task_count() {
673 let manager = ScheduledTaskManager::new();
674 let task_id = manager.add_scheduled_task(
675 ScheduleMode::FixedRate,
676 Duration::from_secs(1),
677 Duration::from_secs(2),
678 |token| async move {
679 if token.is_cancelled() {
680 return Ok(());
681 }
682 Ok(())
683 },
684 );
685
686 manager.abort_task(task_id);
687 assert_eq!(manager.task_count(), 0);
688 }
689
690 #[tokio::test]
691 async fn cancels_all_tasks() {
692 let manager = ScheduledTaskManager::new();
693 for _ in 0..3 {
694 manager.add_scheduled_task(
695 ScheduleMode::FixedRate,
696 Duration::from_secs(1),
697 Duration::from_secs(2),
698 |token| async move {
699 if token.is_cancelled() {
700 return Ok(());
701 }
702 Ok(())
703 },
704 );
705 }
706
707 assert_eq!(manager.task_count(), 3);
708 manager.cancel_all();
709 assert_eq!(manager.task_count(), 0);
710 }
711
712 #[tokio::test]
713 async fn aborts_all_tasks() {
714 let manager = ScheduledTaskManager::new();
715 for _ in 0..3 {
716 manager.add_scheduled_task(
717 ScheduleMode::FixedRate,
718 Duration::from_secs(1),
719 Duration::from_secs(2),
720 |token| async move {
721 if token.is_cancelled() {
722 return Ok(());
723 }
724 Ok(())
725 },
726 );
727 }
728
729 assert_eq!(manager.task_count(), 3);
730 manager.abort_all();
731 assert_eq!(manager.task_count(), 0);
732 }
733
734 #[tokio::test]
735 async fn skips_task_execution_in_fixed_rate_no_overlap_mode() {
736 let manager = ScheduledTaskManager::new();
737 let task_id = manager.add_scheduled_task(
738 ScheduleMode::FixedRateNoOverlap,
739 Duration::from_secs(0),
740 Duration::from_millis(100),
741 |token| async move {
742 tokio::time::sleep(Duration::from_millis(200)).await;
743 if token.is_cancelled() {
744 return Ok(());
745 }
746 Ok(())
747 },
748 );
749
750 time::sleep(Duration::from_millis(400)).await;
751 manager.cancel_task(task_id);
752 assert_eq!(manager.task_count(), 0);
753 }
754
755 fn new_manager() -> ScheduledTaskManager {
756 ScheduledTaskManager::new()
757 }
758
759 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
760 async fn test_fixed_rate_task() {
761 let mgr = new_manager();
762 let counter = Arc::new(AtomicUsize::new(0));
763
764 let c = counter.clone();
765 let task_id = mgr.add_fixed_rate_task_async(
766 Duration::from_millis(50),
767 Duration::from_millis(100),
768 async move |_ctx| {
769 c.fetch_add(1, Ordering::Relaxed);
770 Ok(())
771 },
772 );
773
774 time::sleep(Duration::from_millis(500)).await;
775
776 mgr.cancel_task(task_id);
777 time::sleep(Duration::from_millis(50)).await;
778
779 let executed = counter.load(Ordering::Relaxed);
780 assert!(
781 executed >= 3,
782 "FixedRate executed too few times: {}",
783 executed
784 );
785 }
786
787 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
788 async fn test_fixed_delay_task() {
789 let mgr = new_manager();
790 let counter = Arc::new(AtomicUsize::new(0));
791
792 let c = counter.clone();
793 let task_id = mgr.add_fixed_delay_task_async(
794 Duration::from_millis(10),
795 Duration::from_millis(50),
796 async move |_ctx| {
797 c.fetch_add(1, Ordering::Relaxed);
798 Ok(())
799 },
800 );
801
802 time::sleep(Duration::from_millis(300)).await;
803
804 mgr.cancel_task(task_id);
805 time::sleep(Duration::from_millis(50)).await;
806
807 let executed = counter.load(Ordering::Relaxed);
808 assert!(
809 (3..=6).contains(&executed),
810 "FixedDelay count unexpected: {}",
811 executed
812 );
813 }
814
815 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
816 async fn test_fixed_rate_no_overlap_task() {
817 let mgr = new_manager();
818 let counter = Arc::new(AtomicUsize::new(0));
819
820 let c = counter.clone();
821 let task_id = mgr.add_fixed_rate_no_overlap_task_async(
822 Duration::from_millis(10),
823 Duration::from_millis(50),
824 async move |_ctx| {
825 time::sleep(Duration::from_millis(80)).await;
826 c.fetch_add(1, Ordering::Relaxed);
827 Ok(())
828 },
829 );
830
831 time::sleep(Duration::from_millis(400)).await;
832
833 mgr.cancel_task(task_id);
834 time::sleep(Duration::from_millis(50)).await;
835
836 let executed = counter.load(Ordering::Relaxed);
837 assert!(
838 (2..=5).contains(&executed),
839 "FixedRateNoOverlap count unexpected: {}",
840 executed
841 );
842 }
843}