1pub mod config;
107pub mod error;
108pub mod task;
109pub mod wheel;
110pub mod timer;
111mod service;
112
113pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletionReason};
115pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion, CompletionReceiver};
116pub use timer::TimerWheel;
117pub use service::TimerService;
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122 use std::sync::atomic::{AtomicU32, Ordering};
123 use std::sync::Arc;
124 use std::time::Duration;
125 use crate::task::TaskCompletionReason;
126
127 #[tokio::test]
128 async fn test_basic_timer() {
129 let timer = TimerWheel::with_defaults();
130 let counter = Arc::new(AtomicU32::new(0));
131 let counter_clone = Arc::clone(&counter);
132
133 let task = TimerWheel::create_task(
134 Duration::from_millis(50),
135 Some(CallbackWrapper::new(move || {
136 let counter = Arc::clone(&counter_clone);
137 async move {
138 counter.fetch_add(1, Ordering::SeqCst);
139 }
140 })),
141 );
142 timer.register(task);
143
144 tokio::time::sleep(Duration::from_millis(100)).await;
145 assert_eq!(counter.load(Ordering::SeqCst), 1);
146 }
147
148 #[tokio::test]
149 async fn test_multiple_timers() {
150 let timer = TimerWheel::with_defaults();
151 let counter = Arc::new(AtomicU32::new(0));
152
153 for i in 0..10 {
155 let counter_clone = Arc::clone(&counter);
156 let task = TimerWheel::create_task(
157 Duration::from_millis(10 * (i + 1)),
158 Some(CallbackWrapper::new(move || {
159 let counter = Arc::clone(&counter_clone);
160 async move {
161 counter.fetch_add(1, Ordering::SeqCst);
162 }
163 })),
164 );
165 timer.register(task);
166 }
167
168 tokio::time::sleep(Duration::from_millis(200)).await;
169 assert_eq!(counter.load(Ordering::SeqCst), 10);
170 }
171
172 #[tokio::test]
173 async fn test_timer_cancellation() {
174 let timer = TimerWheel::with_defaults();
175 let counter = Arc::new(AtomicU32::new(0));
176
177 let mut handles = Vec::new();
179 for _ in 0..5 {
180 let counter_clone = Arc::clone(&counter);
181 let task = TimerWheel::create_task(
182 Duration::from_millis(100),
183 Some(CallbackWrapper::new(move || {
184 let counter = Arc::clone(&counter_clone);
185 async move {
186 counter.fetch_add(1, Ordering::SeqCst);
187 }
188 })),
189 );
190 let handle = timer.register(task);
191 handles.push(handle);
192 }
193
194 for i in 0..3 {
196 let cancel_result = handles[i].cancel();
197 assert!(cancel_result);
198 }
199
200 tokio::time::sleep(Duration::from_millis(200)).await;
201 assert_eq!(counter.load(Ordering::SeqCst), 2);
203 }
204
205 #[tokio::test]
206 async fn test_completion_notification_once() {
207 let timer = TimerWheel::with_defaults();
208 let counter = Arc::new(AtomicU32::new(0));
209 let counter_clone = Arc::clone(&counter);
210
211 let task = TimerWheel::create_task(
212 Duration::from_millis(50),
213 Some(CallbackWrapper::new(move || {
214 let counter = Arc::clone(&counter_clone);
215 async move {
216 counter.fetch_add(1, Ordering::SeqCst);
217 }
218 })),
219 );
220 let handle = timer.register(task);
221
222 let (rx, _handle) = handle.into_parts();
224 rx.0.await.expect("Should receive completion notification");
225
226 tokio::time::sleep(Duration::from_millis(20)).await;
228 assert_eq!(counter.load(Ordering::SeqCst), 1);
229 }
230
231 #[tokio::test]
232 async fn test_notify_only_timer_once() {
233 let timer = TimerWheel::with_defaults();
234
235 let task = TimerTask::new(Duration::from_millis(50), None);
236 let handle = timer.register(task);
237
238 let (rx, _handle) = handle.into_parts();
240 rx.0.await.expect("Should receive completion notification");
241 }
242
243 #[tokio::test]
244 async fn test_batch_completion_notifications() {
245 let timer = TimerWheel::with_defaults();
246 let counter = Arc::new(AtomicU32::new(0));
247
248 let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
250 .map(|i| {
251 let counter = Arc::clone(&counter);
252 let delay = Duration::from_millis(50 + i * 10);
253 let callback = CallbackWrapper::new(move || {
254 let counter = Arc::clone(&counter);
255 async move {
256 counter.fetch_add(1, Ordering::SeqCst);
257 }
258 });
259 (delay, Some(callback))
260 })
261 .collect();
262
263 let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
264 let batch = timer.register_batch(tasks);
265 let (receivers, _batch_handle) = batch.into_parts();
266
267 for rx in receivers {
269 rx.await.expect("Should receive completion notification");
270 }
271
272 tokio::time::sleep(Duration::from_millis(50)).await;
274
275 assert_eq!(counter.load(Ordering::SeqCst), 5);
277 }
278
279 #[tokio::test]
280 async fn test_completion_reason_expired() {
281 let timer = TimerWheel::with_defaults();
282
283 let task = TimerTask::new(Duration::from_millis(50), None);
284 let handle = timer.register(task);
285
286 let (rx, _handle) = handle.into_parts();
288 let result = rx.0.await.expect("Should receive completion notification");
289 assert_eq!(result, TaskCompletionReason::Expired);
290 }
291
292 #[tokio::test]
293 async fn test_completion_reason_cancelled() {
294 let timer = TimerWheel::with_defaults();
295
296 let task = TimerTask::new(Duration::from_secs(10), None);
297 let handle = timer.register(task);
298
299 let cancelled = handle.cancel();
301 assert!(cancelled);
302
303 let (rx, _handle) = handle.into_parts();
305 let result = rx.0.await.expect("Should receive completion notification");
306 assert_eq!(result, TaskCompletionReason::Cancelled);
307 }
308
309 #[tokio::test]
310 async fn test_batch_completion_reasons() {
311 let timer = TimerWheel::with_defaults();
312
313 let tasks: Vec<_> = (0..5)
315 .map(|_| TimerTask::new(Duration::from_secs(10), None))
316 .collect();
317
318 let batch = timer.register_batch(tasks);
319 let task_ids: Vec<_> = batch.task_ids().to_vec();
320 let (mut receivers, _batch_handle) = batch.into_parts();
321
322 timer.cancel_batch(&task_ids[0..3]);
324
325 for rx in receivers.drain(0..3) {
327 let result = rx.await.expect("Should receive completion notification");
328 assert_eq!(result, TaskCompletionReason::Cancelled);
329 }
330
331 timer.cancel_batch(&task_ids[3..5]);
333 for rx in receivers {
334 let result = rx.await.expect("Should receive completion notification");
335 assert_eq!(result, TaskCompletionReason::Cancelled);
336 }
337 }
338}