1pub mod config;
113pub mod error;
114pub mod task;
115pub mod wheel;
116pub mod timer;
117mod service;
118
119pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletion};
121pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
122pub use task::CompletionReceiver;
123pub use timer::TimerWheel;
124pub use service::{TimerService, TaskNotification};
125
126#[cfg(test)]
127mod tests {
128 use super::*;
129 use std::sync::atomic::{AtomicU32, Ordering};
130 use std::sync::Arc;
131 use std::time::Duration;
132
133 #[tokio::test]
134 async fn test_basic_timer() {
135 let timer = TimerWheel::with_defaults();
136 let counter = Arc::new(AtomicU32::new(0));
137 let counter_clone = Arc::clone(&counter);
138
139 let task = TimerTask::new_oneshot(
140 Duration::from_millis(50),
141 Some(CallbackWrapper::new(move || {
142 let counter = Arc::clone(&counter_clone);
143 async move {
144 counter.fetch_add(1, Ordering::SeqCst);
145 }
146 })),
147 );
148 timer.register(task);
149
150 tokio::time::sleep(Duration::from_millis(100)).await;
151 assert_eq!(counter.load(Ordering::SeqCst), 1);
152 }
153
154 #[tokio::test]
155 async fn test_multiple_timers() {
156 let timer = TimerWheel::with_defaults();
157 let counter = Arc::new(AtomicU32::new(0));
158
159 for i in 0..10 {
161 let counter_clone = Arc::clone(&counter);
162 let task = TimerTask::new_oneshot(
163 Duration::from_millis(10 * (i + 1)),
164 Some(CallbackWrapper::new(move || {
165 let counter = Arc::clone(&counter_clone);
166 async move {
167 counter.fetch_add(1, Ordering::SeqCst);
168 }
169 })),
170 );
171 timer.register(task);
172 }
173
174 tokio::time::sleep(Duration::from_millis(200)).await;
175 assert_eq!(counter.load(Ordering::SeqCst), 10);
176 }
177
178 #[tokio::test]
179 async fn test_timer_cancellation() {
180 let timer = TimerWheel::with_defaults();
181 let counter = Arc::new(AtomicU32::new(0));
182
183 let mut handles = Vec::new();
185 for _ in 0..5 {
186 let counter_clone = Arc::clone(&counter);
187 let task = TimerTask::new_oneshot(
188 Duration::from_millis(100),
189 Some(CallbackWrapper::new(move || {
190 let counter = Arc::clone(&counter_clone);
191 async move {
192 counter.fetch_add(1, Ordering::SeqCst);
193 }
194 })),
195 );
196 let handle = timer.register(task);
197 handles.push(handle);
198 }
199
200 for i in 0..3 {
202 let cancel_result = handles[i].cancel();
203 assert!(cancel_result);
204 }
205
206 tokio::time::sleep(Duration::from_millis(200)).await;
207 assert_eq!(counter.load(Ordering::SeqCst), 2);
209 }
210
211 #[tokio::test]
212 async fn test_completion_notification_once() {
213 let timer = TimerWheel::with_defaults();
214 let counter = Arc::new(AtomicU32::new(0));
215 let counter_clone = Arc::clone(&counter);
216
217 let task = TimerTask::new_oneshot(
218 Duration::from_millis(50),
219 Some(CallbackWrapper::new(move || {
220 let counter = Arc::clone(&counter_clone);
221 async move {
222 counter.fetch_add(1, Ordering::SeqCst);
223 }
224 })),
225 );
226 let handle = timer.register(task);
227
228 let (rx, _handle) = handle.into_parts();
230 match rx {
231 task::CompletionReceiver::OneShot(receiver) => {
232 receiver.wait().await;
233 },
234 _ => panic!("Expected OneShot completion receiver"),
235 }
236
237 tokio::time::sleep(Duration::from_millis(20)).await;
239 assert_eq!(counter.load(Ordering::SeqCst), 1);
240 }
241
242 #[tokio::test]
243 async fn test_notify_only_timer_once() {
244 let timer = TimerWheel::with_defaults();
245
246 let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
247 let handle = timer.register(task);
248
249 let (rx, _handle) = handle.into_parts();
251 match rx {
252 task::CompletionReceiver::OneShot(receiver) => {
253 receiver.wait().await;
254 },
255 _ => panic!("Expected OneShot completion receiver"),
256 }
257 }
258
259 #[tokio::test]
260 async fn test_batch_completion_notifications() {
261 let timer = TimerWheel::with_defaults();
262 let counter = Arc::new(AtomicU32::new(0));
263
264 let callbacks: Vec<TimerTask> = (0..5)
266 .map(|i| {
267 let counter = Arc::clone(&counter);
268 let delay = Duration::from_millis(50 + i * 10);
269 let callback = CallbackWrapper::new(move || {
270 let counter = Arc::clone(&counter);
271 async move {
272 counter.fetch_add(1, Ordering::SeqCst);
273 }
274 });
275 TimerTask::new_oneshot(delay, Some(callback))
276 })
277 .collect();
278
279 let batch = timer.register_batch(callbacks);
280 let (receivers, _batch_handle) = batch.into_parts();
281
282 for rx in receivers {
284 match rx {
285 task::CompletionReceiver::OneShot(receiver) => {
286 receiver.wait().await;
287 },
288 _ => panic!("Expected OneShot completion receiver"),
289 }
290 }
291
292 tokio::time::sleep(Duration::from_millis(50)).await;
294
295 assert_eq!(counter.load(Ordering::SeqCst), 5);
297 }
298
299 #[tokio::test]
300 async fn test_completion_reason_expired() {
301 let timer = TimerWheel::with_defaults();
302
303 let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
304 let handle = timer.register(task);
305
306 let (rx, _handle) = handle.into_parts();
308 let result = match rx {
309 task::CompletionReceiver::OneShot(receiver) => {
310 receiver.wait().await
311 },
312 _ => panic!("Expected OneShot completion receiver"),
313 };
314 assert_eq!(result, TaskCompletion::Called);
315 }
316
317 #[tokio::test]
318 async fn test_completion_reason_cancelled() {
319 let timer = TimerWheel::with_defaults();
320
321 let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
322 let handle = timer.register(task);
323
324 let cancelled = handle.cancel();
326 assert!(cancelled);
327
328 let (rx, _handle) = handle.into_parts();
330 let result = match rx {
331 task::CompletionReceiver::OneShot(receiver) => {
332 receiver.wait().await
333 },
334 _ => panic!("Expected OneShot completion receiver"),
335 };
336 assert_eq!(result, TaskCompletion::Cancelled);
337 }
338
339 #[tokio::test]
340 async fn test_batch_completion_reasons() {
341 let timer = TimerWheel::with_defaults();
342
343 let tasks: Vec<_> = (0..5)
345 .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
346 .collect();
347
348 let batch = timer.register_batch(tasks);
349 let task_ids: Vec<_> = batch.task_ids().to_vec();
350 let (mut receivers, _batch_handle) = batch.into_parts();
351
352 timer.cancel_batch(&task_ids[0..3]);
354
355 for rx in receivers.drain(0..3) {
357 let result = match rx {
358 task::CompletionReceiver::OneShot(receiver) => {
359 receiver.wait().await
360 },
361 _ => panic!("Expected OneShot completion receiver"),
362 };
363 assert_eq!(result, TaskCompletion::Cancelled);
364 }
365
366 timer.cancel_batch(&task_ids[3..5]);
368 for rx in receivers {
369 let result = match rx {
370 task::CompletionReceiver::OneShot(receiver) => {
371 receiver.wait().await
372 },
373 _ => panic!("Expected OneShot completion receiver"),
374 };
375 assert_eq!(result, TaskCompletion::Cancelled);
376 }
377 }
378}