kestrel_protocol_timer/
lib.rs1mod error;
54mod task;
55mod wheel;
56mod timer;
57mod service;
58
59pub use error::TimerError;
61pub use task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback};
62pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
63pub use service::TimerService;
64
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Schedules one 50 ms one-shot timer, sleeps 100 ms, and expects the
    /// callback to have incremented the shared counter exactly once.
    #[tokio::test]
    async fn test_basic_timer() {
        let wheel = TimerWheel::with_defaults().unwrap();
        let fired = Arc::new(AtomicU32::new(0));
        let fired_in_cb = Arc::clone(&fired);

        // The closure hands a fresh `Arc` clone to every future it produces.
        let callback = move || {
            let fired = Arc::clone(&fired_in_cb);
            async move {
                fired.fetch_add(1, Ordering::SeqCst);
            }
        };
        wheel
            .schedule_once(Duration::from_millis(50), callback)
            .await
            .unwrap();

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(fired.load(Ordering::SeqCst), 1);
    }

    /// Schedules ten one-shot timers with delays of 10 ms, 20 ms, ... 100 ms,
    /// sleeps 200 ms, and expects all ten callbacks to have run.
    #[tokio::test]
    async fn test_multiple_timers() {
        let wheel = TimerWheel::with_defaults().unwrap();
        let fired = Arc::new(AtomicU32::new(0));

        for step in 1..=10u64 {
            let fired_in_cb = Arc::clone(&fired);
            let callback = move || {
                let fired = Arc::clone(&fired_in_cb);
                async move {
                    fired.fetch_add(1, Ordering::SeqCst);
                }
            };
            wheel
                .schedule_once(Duration::from_millis(10 * step), callback)
                .await
                .unwrap();
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(fired.load(Ordering::SeqCst), 10);
    }

    /// Schedules five 100 ms timers, cancels the first three through their
    /// handles (each `cancel` must return `true`), sleeps 200 ms, and expects
    /// only the two remaining callbacks to have run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let wheel = TimerWheel::with_defaults().unwrap();
        let fired = Arc::new(AtomicU32::new(0));

        let mut handles = Vec::new();
        for _ in 0..5 {
            let fired_in_cb = Arc::clone(&fired);
            let callback = move || {
                let fired = Arc::clone(&fired_in_cb);
                async move {
                    fired.fetch_add(1, Ordering::SeqCst);
                }
            };
            let scheduled = wheel
                .schedule_once(Duration::from_millis(100), callback)
                .await
                .unwrap();
            handles.push(scheduled);
        }

        // Cancel only the first three timers; the last two stay armed.
        for handle in handles.iter_mut().take(3) {
            assert!(handle.cancel());
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(fired.load(Ordering::SeqCst), 2);
    }

    /// Schedules one 50 ms timer with a callback, awaits its completion
    /// receiver, then checks the callback ran exactly once.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let wheel = TimerWheel::with_defaults().unwrap();
        let fired = Arc::new(AtomicU32::new(0));
        let fired_in_cb = Arc::clone(&fired);

        let callback = move || {
            let fired = Arc::clone(&fired_in_cb);
            async move {
                fired.fetch_add(1, Ordering::SeqCst);
            }
        };
        let handle = wheel
            .schedule_once(Duration::from_millis(50), callback)
            .await
            .unwrap();

        handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");

        // NOTE(review): the extra 20 ms pause presumably gives the callback
        // time to finish in case the notification is delivered first; confirm
        // against the wheel's dispatch order.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(fired.load(Ordering::SeqCst), 1);
    }

    /// Schedules a 50 ms notify-only timer (no callback) and expects its
    /// completion receiver to resolve successfully.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let wheel = TimerWheel::with_defaults().unwrap();

        let handle = wheel
            .schedule_once_notify(Duration::from_millis(50))
            .await
            .unwrap();

        handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
    }

    /// Schedules a batch of five timers with delays of 50 ms, 60 ms, ... 90 ms,
    /// awaits every completion receiver of the batch, sleeps a further 50 ms,
    /// and expects all five callbacks to have run.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let wheel = TimerWheel::with_defaults().unwrap();
        let fired = Arc::new(AtomicU32::new(0));

        let callbacks: Vec<(Duration, _)> = (0..5u64)
            .map(|offset| {
                let fired_in_cb = Arc::clone(&fired);
                (
                    Duration::from_millis(50 + offset * 10),
                    move || {
                        let fired = Arc::clone(&fired_in_cb);
                        async move {
                            fired.fetch_add(1, Ordering::SeqCst);
                        }
                    },
                )
            })
            .collect();

        let batch = wheel.schedule_once_batch(callbacks).await.unwrap();

        for receiver in batch.into_completion_receivers() {
            receiver
                .await
                .expect("Should receive completion notification");
        }

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(fired.load(Ordering::SeqCst), 5);
    }
}