rocketmq_rust/schedule.rs
1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod executor;
16pub mod scheduler;
17pub mod task;
18pub mod trigger;
19
20use std::error::Error;
21use std::fmt;
22
23pub use executor::ExecutorPool;
24pub use task::Task;
25pub use task::TaskContext;
26pub use task::TaskResult;
27pub use task::TaskStatus;
28
/// Error type returned by the scheduling subsystem.
///
/// Every variant wraps a single `String`, which is rendered after a fixed,
/// variant-specific prefix by the [`fmt::Display`] implementation.
///
/// The type is `Clone` and comparable so that callers and tests can store,
/// duplicate and assert on concrete error values.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchedulerError {
    /// Rendered as `Task not found: <id>`.
    TaskNotFound(String),
    /// Rendered as `Task already exists: <id>`.
    TaskAlreadyExists(String),
    /// Rendered as `Executor error: <msg>`.
    ExecutorError(String),
    /// Rendered as `Trigger error: <msg>`.
    TriggerError(String),
    /// Rendered as `System error: <msg>`.
    SystemError(String),
}

impl fmt::Display for SchedulerError {
    /// Writes `<prefix>: <payload>`, where the prefix depends on the variant.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Pick the prefix and payload once, then format in a single place so
        // every variant is guaranteed to share the same layout.
        let (label, detail) = match self {
            Self::TaskNotFound(id) => ("Task not found", id),
            Self::TaskAlreadyExists(id) => ("Task already exists", id),
            Self::ExecutorError(msg) => ("Executor error", msg),
            Self::TriggerError(msg) => ("Trigger error", msg),
            Self::SystemError(msg) => ("System error", msg),
        };
        write!(f, "{label}: {detail}")
    }
}

impl Error for SchedulerError {}
52
/// Convenience alias for a `Result` whose error type is [`SchedulerError`].
pub type SchedulerResult<T> = Result<T, SchedulerError>;
54
55pub mod simple_scheduler {
56 use std::collections::HashMap;
57 use std::future::Future;
58 use std::sync::atomic::AtomicU64;
59 use std::sync::atomic::Ordering;
60 use std::sync::Arc;
61
62 use anyhow::Result;
63 use parking_lot::RwLock;
64 use tokio::sync::Semaphore;
65 use tokio::task::JoinHandle;
66 use tokio::time::Duration;
67 use tokio::time::Instant;
68 use tokio::time::{self};
69 use tokio_util::sync::CancellationToken;
70 use tracing::error;
71 use tracing::info;
72
73 use crate::ArcMut;
74
/// Pacing strategy used by the scheduling loop of a task.
///
/// The mode is `Copy` and comparable so it can be stored, passed around and
/// asserted on freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScheduleMode {
    /// Every tick of a fixed interval starts a new execution, whether or not
    /// earlier executions have finished, so executions may overlap and pile up.
    FixedRate,
    /// Executions run one at a time; the period is slept only after an
    /// execution has completed, so executions never accumulate.
    FixedDelay,
    /// Ticks fire on a fixed interval, but a tick is skipped while the previous
    /// execution is still running.
    FixedRateNoOverlap,
}
84
/// Identifier under which a scheduled task is registered in the manager.
type TaskId = u64;
86
/// Bookkeeping kept by the manager for one scheduled task.
pub struct TaskInfo {
    /// Token that is cancelled to ask the scheduling loop to stop gracefully.
    cancel_token: CancellationToken,
    /// Join handle of the spawned scheduling loop; aborted for a forced stop.
    handle: JoinHandle<()>,
}
91
/// Registry of periodically executed tasks running on the Tokio runtime.
///
/// Both fields are `Arc`s, so a clone is a second handle to the same registry
/// and the same identifier counter.
#[derive(Clone)]
pub struct ScheduledTaskManager {
    /// Registered tasks, keyed by their identifier.
    tasks: Arc<RwLock<HashMap<TaskId, TaskInfo>>>,
    /// Source of the next task identifier.
    counter: Arc<AtomicU64>,
}
97
98 impl Default for ScheduledTaskManager {
99 fn default() -> Self {
100 Self::new()
101 }
102 }
103
104 impl ScheduledTaskManager {
105 pub fn new() -> Self {
106 Self {
107 tasks: Arc::new(RwLock::new(HashMap::new())),
108 counter: Arc::new(AtomicU64::new(0)),
109 }
110 }
111
112 fn next_id(&self) -> TaskId {
113 self.counter.fetch_add(1, Ordering::Relaxed)
114 }
115
116 /// Adds a fixed-rate scheduled task to the task manager.
117 ///
118 /// # Arguments
119 /// * `initial_delay` - The delay before the first execution of the task.
120 /// * `period` - The interval between task executions.
121 /// * `task_fn` - A function that defines the task to be executed. It takes a
122 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
123 /// `Result<()>`.
124 ///
125 /// # Returns
126 /// A `TaskId` representing the unique identifier of the scheduled task.
127 ///
128 /// # Notes
129 /// - Tasks are executed at fixed intervals, even if previous executions overlap.
130 pub fn add_fixed_rate_task<F, Fut>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
131 where
132 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
133 Fut: Future<Output = Result<()>> + Send + 'static,
134 {
135 self.add_scheduled_task(ScheduleMode::FixedRate, initial_delay, period, task_fn)
136 }
137
138 /// Adds a fixed-delay scheduled task to the task manager.
139 ///
140 /// # Arguments
141 /// * `initial_delay` - The delay before the first execution of the task.
142 /// * `period` - The interval between task executions.
143 /// * `task_fn` - A function that defines the task to be executed. It takes a
144 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
145 /// `Result<()>`.
146 ///
147 /// # Returns
148 /// A `TaskId` representing the unique identifier of the scheduled task.
149 ///
150 /// # Notes
151 /// - Tasks are executed serially, with a delay after each task completes.
152 pub fn add_fixed_delay_task<F, Fut>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
153 where
154 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
155 Fut: Future<Output = Result<()>> + Send + 'static,
156 {
157 self.add_scheduled_task(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
158 }
159
160 /// Adds a fixed-rate-no-overlap scheduled task to the task manager.
161 ///
162 /// # Arguments
163 /// * `initial_delay` - The delay before the first execution of the task.
164 /// * `period` - The interval between task executions.
165 /// * `task_fn` - A function that defines the task to be executed. It takes a
166 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
167 /// `Result<()>`.
168 ///
169 /// # Returns
170 /// A `TaskId` representing the unique identifier of the scheduled task.
171 ///
172 /// # Notes
173 /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
174 pub fn add_fixed_rate_no_overlap_task<F, Fut>(
175 &self,
176 initial_delay: Duration,
177 period: Duration,
178 task_fn: F,
179 ) -> TaskId
180 where
181 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
182 Fut: Future<Output = Result<()>> + Send + 'static,
183 {
184 self.add_scheduled_task(ScheduleMode::FixedRateNoOverlap, initial_delay, period, task_fn)
185 }
186
187 /// Adds a scheduled task to the task manager.
188 ///
189 /// # Arguments
190 /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
191 /// - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
192 /// - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
193 /// - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
194 /// still running.
195 /// * `initial_delay` - The delay before the first execution of the task.
196 /// * `period` - The interval between task executions.
197 /// * `task_fn` - A function that defines the task to be executed. It takes a
198 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
199 /// `Result<()>`.
200 ///
201 /// # Returns
202 /// A `TaskId` representing the unique identifier of the scheduled task.
203 ///
204 /// # Notes
205 /// - The task function is executed asynchronously.
206 /// - The `CancellationToken` can be used to gracefully cancel the task.
207 /// - The task is added to the internal task manager and can be managed (e.g., canceled or
208 /// aborted) later.
209 pub fn add_scheduled_task<F, Fut>(
210 &self,
211 mode: ScheduleMode,
212 initial_delay: Duration,
213 period: Duration,
214 task_fn: F,
215 ) -> TaskId
216 where
217 F: FnMut(CancellationToken) -> Fut + Send + Sync + 'static,
218 Fut: Future<Output = Result<()>> + Send + 'static,
219 {
220 let id = self.next_id();
221 let token = CancellationToken::new();
222 let token_child = token.clone();
223
224 let task_fn = ArcMut::new(task_fn);
225
226 let handle = tokio::spawn({
227 let mut task_fn = task_fn;
228 async move {
229 match mode {
230 ScheduleMode::FixedRate => {
231 let start = Instant::now() + initial_delay;
232 let mut ticker = time::interval_at(start, period);
233
234 loop {
235 tokio::select! {
236 _ = token_child.cancelled() => {
237 info!("Task {} cancelled gracefully", id);
238 break;
239 }
240 _ = ticker.tick() => {
241 // Allow concurrent execution: One subtask per tick
242 let mut task_fn = task_fn.clone();
243 let child = token_child.clone();
244 tokio::spawn(async move {
245 // 1) Lock out &mut F, call once to get a future
246 let fut = {
247 (task_fn)(child)
248 };
249 // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
250 if let Err(e) = fut.await {
251 error!("FixedRate task {} failed: {:?}", id, e);
252 }
253 });
254 }
255 }
256 }
257 }
258
259 ScheduleMode::FixedDelay => {
260 time::sleep(initial_delay).await;
261 loop {
262 tokio::select! {
263 _ = token_child.cancelled() => {
264 info!("Task {} cancelled gracefully", id);
265 break;
266 }
267 _ = async {
268 // Serial execution: complete one task and then sleep
269 let fut = {
270 (task_fn)(token_child.clone())
271 };
272 if let Err(e) = fut.await {
273 error!("FixedDelay task {} failed: {:?}", id, e);
274 }
275 time::sleep(period).await;
276 } => {}
277 }
278 }
279 }
280
281 ScheduleMode::FixedRateNoOverlap => {
282 let start = Instant::now() + initial_delay;
283 let mut ticker = time::interval_at(start, period);
284
285 // Permission=1, controls non-overlapping execution
286 let gate = Arc::new(Semaphore::new(1));
287
288 loop {
289 tokio::select! {
290 _ = token_child.cancelled() => {
291 info!("Task {} cancelled gracefully", id);
292 break;
293 }
294 _ = ticker.tick() => {
295 // Try to acquire permission. If unable to acquire, skip the current tick.
296 if let Ok(permit) = gate.clone().try_acquire_owned() {
297 let mut task_fn = task_fn.clone();
298 let child = token_child.clone();
299 tokio::spawn(async move {
300 // Release the lock immediately after generating the future
301 let fut = {
302 (task_fn)(child)
303 };
304 if let Err(e) = fut.await {
305 error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
306 }
307 drop(permit); // Release the permit after completion
308 });
309 } else {
310 info!("Task {} skipped due to overlap", id);
311 }
312 }
313 }
314 }
315 }
316 }
317 }
318 });
319
320 self.tasks.write().insert(
321 id,
322 TaskInfo {
323 cancel_token: token,
324 handle,
325 },
326 );
327
328 id
329 }
330
331 /// Graceful cancellation
332 pub fn cancel_task(&self, id: TaskId) {
333 if let Some(info) = self.tasks.write().remove(&id) {
334 info.cancel_token.cancel();
335 tokio::spawn(async move {
336 let _ = info.handle.await;
337 });
338 }
339 }
340
341 /// Roughly abort
342 pub fn abort_task(&self, id: TaskId) {
343 if let Some(info) = self.tasks.write().remove(&id) {
344 info.handle.abort();
345 }
346 }
347
348 /// Batch cancel
349 pub fn cancel_all(&self) {
350 let mut tasks = self.tasks.write();
351 for (_, info) in tasks.drain() {
352 info.cancel_token.cancel();
353 tokio::spawn(async move {
354 let _ = info.handle.await;
355 });
356 }
357 }
358
359 /// Batch abort
360 pub fn abort_all(&self) {
361 let mut tasks = self.tasks.write();
362 for (_, info) in tasks.drain() {
363 info.handle.abort();
364 }
365 }
366
367 pub fn task_count(&self) -> usize {
368 self.tasks.read().len()
369 }
370 }
371
372 impl ScheduledTaskManager {
373 /// Adds a fixed-rate scheduled task to the task manager asynchronously.
374 ///
375 /// # Arguments
376 /// * `initial_delay` - The delay before the first execution of the task.
377 /// * `period` - The interval between task executions.
378 /// * `task_fn` - A function that defines the task to be executed. It takes a
379 /// `CancellationToken` as an argument and returns a `Result<()>`.
380 ///
381 /// # Returns
382 /// A `TaskId` representing the unique identifier of the scheduled task.
383 ///
384 /// # Notes
385 /// - Tasks are executed at fixed intervals, even if previous executions overlap.
386 /// - The task function is executed asynchronously.
387 pub fn add_fixed_rate_task_async<F>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
388 where
389 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
390 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
391 {
392 self.add_scheduled_task_async(ScheduleMode::FixedRate, initial_delay, period, task_fn)
393 }
394
395 /// Adds a fixed-delay scheduled task to the task manager asynchronously.
396 ///
397 /// # Arguments
398 /// * `initial_delay` - The delay before the first execution of the task.
399 /// * `period` - The interval between task executions.
400 /// * `task_fn` - A function that defines the task to be executed. It takes a
401 /// `CancellationToken` as an argument and returns a `Result<()>`.
402 ///
403 /// # Returns
404 /// A `TaskId` representing the unique identifier of the scheduled task.
405 ///
406 /// # Notes
407 /// - Tasks are executed serially, with a delay after each task completes.
408 /// - The task function is executed asynchronously.
409 pub fn add_fixed_delay_task_async<F>(&self, initial_delay: Duration, period: Duration, task_fn: F) -> TaskId
410 where
411 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
412 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
413 {
414 self.add_scheduled_task_async(ScheduleMode::FixedDelay, initial_delay, period, task_fn)
415 }
416
417 /// Adds a fixed-rate-no-overlap scheduled task to the task manager asynchronously.
418 ///
419 /// # Arguments
420 /// * `initial_delay` - The delay before the first execution of the task.
421 /// * `period` - The interval between task executions.
422 /// * `task_fn` - A function that defines the task to be executed. It takes a
423 /// `CancellationToken` as an argument and returns a `Result<()>`.
424 ///
425 /// # Returns
426 /// A `TaskId` representing the unique identifier of the scheduled task.
427 ///
428 /// # Notes
429 /// - Tasks are executed at fixed intervals, but overlapping executions are skipped.
430 /// - The task function is executed asynchronously.
431 pub fn add_fixed_rate_no_overlap_task_async<F>(
432 &self,
433 initial_delay: Duration,
434 period: Duration,
435 task_fn: F,
436 ) -> TaskId
437 where
438 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
439 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
440 {
441 self.add_scheduled_task_async(ScheduleMode::FixedRateNoOverlap, initial_delay, period, task_fn)
442 }
443
444 /// Adds a scheduled task to the task manager asynchronously.
445 ///
446 /// # Arguments
447 /// * `mode` - The scheduling mode for the task. Determines how the task is executed:
448 /// - `FixedRate`: Aligns the beats, allowing tasks to pile up if they take too long.
449 /// - `FixedDelay`: Executes tasks serially, with a delay after each task completes.
450 /// - `FixedRateNoOverlap`: Aligns the beats but skips execution if the previous task is
451 /// still running.
452 /// * `initial_delay` - The delay before the first execution of the task.
453 /// * `period` - The interval between task executions.
454 /// * `task_fn` - A function that defines the task to be executed. It takes a
455 /// `CancellationToken` as an argument and returns a `Future` that resolves to a
456 /// `Result<()>`.
457 ///
458 /// # Returns
459 /// A `TaskId` representing the unique identifier of the scheduled task.
460 ///
461 /// # Notes
462 /// - The task function is executed asynchronously.
463 /// - The `CancellationToken` can be used to gracefully cancel the task.
464 /// - The task is added to the internal task manager and can be managed (e.g., canceled or
465 /// aborted) later.
466 pub fn add_scheduled_task_async<F>(
467 &self,
468 mode: ScheduleMode,
469 initial_delay: Duration,
470 period: Duration,
471 task_fn: F,
472 ) -> TaskId
473 where
474 F: AsyncFnMut(CancellationToken) -> Result<()> + Send + Sync + 'static,
475 for<'a> <F as AsyncFnMut<(CancellationToken,)>>::CallRefFuture<'a>: Send,
476 {
477 let id = self.next_id();
478 let token = CancellationToken::new();
479 let token_child = token.clone();
480
481 let task_fn = ArcMut::new(task_fn);
482
483 let handle = tokio::spawn({
484 let mut task_fn = task_fn;
485 async move {
486 match mode {
487 ScheduleMode::FixedRate => {
488 let start = Instant::now() + initial_delay;
489 let mut ticker = time::interval_at(start, period);
490
491 loop {
492 tokio::select! {
493 _ = token_child.cancelled() => {
494 info!("Task {} cancelled gracefully", id);
495 break;
496 }
497 _ = ticker.tick() => {
498 // Allow concurrent execution: One subtask per tick
499 let mut task_fn = task_fn.clone();
500 let child = token_child.clone();
501 tokio::spawn(async move {
502 // 1) Lock out &mut F, call once to get a future
503 let fut = {
504 task_fn(child)
505 };
506 // The lock has been released. Awaiting here ensures the lock doesn't cross await boundaries.
507 if let Err(e) = fut.await {
508 error!("FixedRate task {} failed: {:?}", id, e);
509 }
510 });
511 }
512 }
513 }
514 }
515
516 ScheduleMode::FixedDelay => {
517 time::sleep(initial_delay).await;
518 loop {
519 tokio::select! {
520 _ = token_child.cancelled() => {
521 info!("Task {} cancelled gracefully", id);
522 break;
523 }
524 _ = async {
525 // Serial execution: complete one task and then sleep
526 let fut = {
527 (task_fn)(token_child.clone())
528 };
529 if let Err(e) = fut.await {
530 error!("FixedDelay task {} failed: {:?}", id, e);
531 }
532 time::sleep(period).await;
533 } => {}
534 }
535 }
536 }
537
538 ScheduleMode::FixedRateNoOverlap => {
539 let start = Instant::now() + initial_delay;
540 let mut ticker = time::interval_at(start, period);
541
542 // Permission=1, controls non-overlapping execution
543 let gate = Arc::new(Semaphore::new(1));
544
545 loop {
546 tokio::select! {
547 _ = token_child.cancelled() => {
548 info!("Task {} cancelled gracefully", id);
549 break;
550 }
551 _ = ticker.tick() => {
552 // Try to acquire permission. If unable to acquire, skip the current tick.
553 if let Ok(permit) = gate.clone().try_acquire_owned() {
554 let mut task_fn = task_fn.clone();
555 let child = token_child.clone();
556 tokio::spawn(async move {
557 // Release the lock immediately after generating the future
558 let fut = {
559 (task_fn)(child)
560 };
561 if let Err(e) = fut.await {
562 error!("FixedRateNoOverlap task {} failed: {:?}", id, e);
563 }
564 drop(permit); // Release the permit after completion
565 });
566 } else {
567 info!("Task {} skipped due to overlap", id);
568 }
569 }
570 }
571 }
572 }
573 }
574 }
575 });
576
577 self.tasks.write().insert(
578 id,
579 TaskInfo {
580 cancel_token: token,
581 handle,
582 },
583 );
584
585 id
586 }
587 }
588}
589
590#[cfg(test)]
591mod tests {
592 use std::sync::atomic::AtomicUsize;
593 use std::sync::atomic::Ordering;
594 use std::sync::Arc;
595 use std::time::Duration;
596
597 use tokio::time;
598
599 use crate::schedule::simple_scheduler::*;
600
601 #[tokio::test]
602 async fn adds_task_and_increments_task_count() {
603 let manager = ScheduledTaskManager::new();
604 let task_id = manager.add_scheduled_task(
605 ScheduleMode::FixedRate,
606 Duration::from_secs(1),
607 Duration::from_secs(2),
608 |token| async move {
609 if token.is_cancelled() {
610 return Ok(());
611 }
612 Ok(())
613 },
614 );
615
616 assert_eq!(manager.task_count(), 1);
617 manager.cancel_task(task_id);
618 }
619
620 #[tokio::test]
621 async fn cancels_task_and_decrements_task_count() {
622 let manager = ScheduledTaskManager::new();
623 let task_id = manager.add_scheduled_task(
624 ScheduleMode::FixedRate,
625 Duration::from_secs(1),
626 Duration::from_secs(2),
627 |token| async move {
628 if token.is_cancelled() {
629 return Ok(());
630 }
631 Ok(())
632 },
633 );
634
635 manager.cancel_task(task_id);
636 assert_eq!(manager.task_count(), 0);
637 }
638
639 #[tokio::test]
640 async fn aborts_task_and_decrements_task_count() {
641 let manager = ScheduledTaskManager::new();
642 let task_id = manager.add_scheduled_task(
643 ScheduleMode::FixedRate,
644 Duration::from_secs(1),
645 Duration::from_secs(2),
646 |token| async move {
647 if token.is_cancelled() {
648 return Ok(());
649 }
650 Ok(())
651 },
652 );
653
654 manager.abort_task(task_id);
655 assert_eq!(manager.task_count(), 0);
656 }
657
658 #[tokio::test]
659 async fn cancels_all_tasks() {
660 let manager = ScheduledTaskManager::new();
661 for _ in 0..3 {
662 manager.add_scheduled_task(
663 ScheduleMode::FixedRate,
664 Duration::from_secs(1),
665 Duration::from_secs(2),
666 |token| async move {
667 if token.is_cancelled() {
668 return Ok(());
669 }
670 Ok(())
671 },
672 );
673 }
674
675 assert_eq!(manager.task_count(), 3);
676 manager.cancel_all();
677 assert_eq!(manager.task_count(), 0);
678 }
679
680 #[tokio::test]
681 async fn aborts_all_tasks() {
682 let manager = ScheduledTaskManager::new();
683 for _ in 0..3 {
684 manager.add_scheduled_task(
685 ScheduleMode::FixedRate,
686 Duration::from_secs(1),
687 Duration::from_secs(2),
688 |token| async move {
689 if token.is_cancelled() {
690 return Ok(());
691 }
692 Ok(())
693 },
694 );
695 }
696
697 assert_eq!(manager.task_count(), 3);
698 manager.abort_all();
699 assert_eq!(manager.task_count(), 0);
700 }
701
702 #[tokio::test]
703 async fn skips_task_execution_in_fixed_rate_no_overlap_mode() {
704 let manager = ScheduledTaskManager::new();
705 let task_id = manager.add_scheduled_task(
706 ScheduleMode::FixedRateNoOverlap,
707 Duration::from_secs(0),
708 Duration::from_millis(100),
709 |token| async move {
710 tokio::time::sleep(Duration::from_millis(200)).await;
711 if token.is_cancelled() {
712 return Ok(());
713 }
714 Ok(())
715 },
716 );
717
718 time::sleep(Duration::from_millis(400)).await;
719 manager.cancel_task(task_id);
720 assert_eq!(manager.task_count(), 0);
721 }
722
723 fn new_manager() -> ScheduledTaskManager {
724 ScheduledTaskManager::new()
725 }
726
727 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
728 async fn test_fixed_rate_task() {
729 let mgr = new_manager();
730 let counter = Arc::new(AtomicUsize::new(0));
731
732 let c = counter.clone();
733 let task_id = mgr.add_fixed_rate_task_async(
734 Duration::from_millis(50),
735 Duration::from_millis(100),
736 async move |_ctx| {
737 c.fetch_add(1, Ordering::Relaxed);
738 Ok(())
739 },
740 );
741
742 time::sleep(Duration::from_millis(500)).await;
743
744 mgr.cancel_task(task_id);
745 time::sleep(Duration::from_millis(50)).await;
746
747 let executed = counter.load(Ordering::Relaxed);
748 assert!(executed >= 3, "FixedRate executed too few times: {}", executed);
749 }
750
751 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
752 async fn test_fixed_delay_task() {
753 let mgr = new_manager();
754 let counter = Arc::new(AtomicUsize::new(0));
755
756 let c = counter.clone();
757 let task_id = mgr.add_fixed_delay_task_async(
758 Duration::from_millis(10),
759 Duration::from_millis(50),
760 async move |_ctx| {
761 c.fetch_add(1, Ordering::Relaxed);
762 Ok(())
763 },
764 );
765
766 time::sleep(Duration::from_millis(300)).await;
767
768 mgr.cancel_task(task_id);
769 time::sleep(Duration::from_millis(50)).await;
770
771 let executed = counter.load(Ordering::Relaxed);
772 assert!((3..=6).contains(&executed), "FixedDelay count unexpected: {}", executed);
773 }
774
775 #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
776 async fn test_fixed_rate_no_overlap_task() {
777 let mgr = new_manager();
778 let counter = Arc::new(AtomicUsize::new(0));
779
780 let c = counter.clone();
781 let task_id = mgr.add_fixed_rate_no_overlap_task_async(
782 Duration::from_millis(10),
783 Duration::from_millis(50),
784 async move |_ctx| {
785 time::sleep(Duration::from_millis(80)).await;
786 c.fetch_add(1, Ordering::Relaxed);
787 Ok(())
788 },
789 );
790
791 time::sleep(Duration::from_millis(400)).await;
792
793 mgr.cancel_task(task_id);
794 time::sleep(Duration::from_millis(50)).await;
795
796 let executed = counter.load(Ordering::Relaxed);
797 assert!(
798 (2..=5).contains(&executed),
799 "FixedRateNoOverlap count unexpected: {}",
800 executed
801 );
802 }
803}