1pub mod config;
113pub mod error;
114pub mod task;
115pub mod wheel;
116pub mod timer;
117mod service;
118pub mod utils {
119 pub(crate) mod atomic_waker;
120 pub mod oneshot;
121 pub mod spsc;
122 pub mod ringbuf;
123 pub mod notify;
124 pub(crate) mod vec;
125}
126
127pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletion};
129pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
130pub use task::CompletionReceiver;
131pub use timer::TimerWheel;
132pub use service::{TimerService, TaskNotification};
133
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a callback that adds one to `counter` each time it is invoked.
    ///
    /// The returned wrapper owns its own `Arc` clone, so the caller keeps
    /// `counter` for later assertions.
    fn counting_callback(counter: &Arc<AtomicU32>) -> CallbackWrapper {
        let counter = Arc::clone(counter);
        CallbackWrapper::new(move || {
            // Each invocation gets a fresh handle so the future can own it.
            let counter = Arc::clone(&counter);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        })
    }

    /// Awaits a completion receiver that is expected to be the `OneShot`
    /// variant and returns the value it resolves to.
    ///
    /// # Panics
    ///
    /// Panics if `rx` is any other variant.
    async fn wait_oneshot(rx: CompletionReceiver) -> TaskCompletion {
        match rx {
            CompletionReceiver::OneShot(receiver) => receiver.wait().await,
            _ => panic!("Expected OneShot completion receiver"),
        }
    }

    /// A single 50 ms one-shot timer runs its callback exactly once.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerTask::new_oneshot(
            Duration::from_millis(50),
            Some(counting_callback(&counter)),
        );
        timer.register(task);

        // Sleep for twice the timer delay before inspecting the counter.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(
            counter.load(Ordering::SeqCst),
            1,
            "callback should have run exactly once"
        );
    }

    /// Ten timers with staggered delays (10 ms, 20 ms, ..., 100 ms) all fire.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        for i in 0..10 {
            let task = TimerTask::new_oneshot(
                Duration::from_millis(10 * (i + 1)),
                Some(counting_callback(&counter)),
            );
            timer.register(task);
        }

        // The longest delay is 100 ms; wait 200 ms before checking.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(
            counter.load(Ordering::SeqCst),
            10,
            "every registered timer should have fired"
        );
    }

    /// Cancelling three of five pending timers leaves exactly two callbacks
    /// to run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let handles: Vec<_> = (0..5)
            .map(|_| {
                let task = TimerTask::new_oneshot(
                    Duration::from_millis(100),
                    Some(counting_callback(&counter)),
                );
                timer.register(task)
            })
            .collect();

        // Cancel the first three while they are still pending.
        for handle in handles.iter().take(3) {
            assert!(handle.cancel(), "cancelling a pending timer should succeed");
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(
            counter.load(Ordering::SeqCst),
            2,
            "only the two non-cancelled timers should have fired"
        );
    }

    /// The completion receiver of a one-shot timer with a callback resolves,
    /// and the callback's side effect is observable shortly afterwards.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerTask::new_oneshot(
            Duration::from_millis(50),
            Some(counting_callback(&counter)),
        );
        let (rx, _handle) = timer.register(task).into_parts();

        wait_oneshot(rx).await;

        // NOTE(review): extra grace period before reading the counter —
        // presumably the notification can be observed before the callback's
        // side effect lands; confirm against the wheel implementation.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// A timer registered without a callback still resolves its completion
    /// receiver.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
        let (rx, _handle) = timer.register(task).into_parts();

        wait_oneshot(rx).await;
    }

    /// Every task of a batch resolves its own receiver and runs its callback.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Five tasks with delays of 50, 60, 70, 80 and 90 ms.
        let tasks: Vec<TimerTask> = (0..5)
            .map(|i| {
                let delay = Duration::from_millis(50 + i * 10);
                TimerTask::new_oneshot(delay, Some(counting_callback(&counter)))
            })
            .collect();

        let batch = timer.register_batch(tasks);
        let (receivers, _batch_handle) = batch.into_parts();

        for rx in receivers {
            wait_oneshot(rx).await;
        }

        // NOTE(review): grace period before reading the counter, as in the
        // single-timer notification test; confirm whether it is required.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// A timer that runs to its deadline reports `TaskCompletion::Called`.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
        let (rx, _handle) = timer.register(task).into_parts();

        assert_eq!(wait_oneshot(rx).await, TaskCompletion::Called);
    }

    /// A timer cancelled before its deadline reports
    /// `TaskCompletion::Cancelled`.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        // Long delay so the timer cannot fire before it is cancelled.
        let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
        let handle = timer.register(task);

        assert!(handle.cancel(), "cancelling a pending timer should succeed");

        let (rx, _handle) = handle.into_parts();
        assert_eq!(wait_oneshot(rx).await, TaskCompletion::Cancelled);
    }

    /// Batch cancellation by task id reports `TaskCompletion::Cancelled` on
    /// each affected receiver.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        // Long delays so no task can fire before it is cancelled.
        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        let task_ids: Vec<_> = batch.task_ids().to_vec();
        let (mut receivers, _batch_handle) = batch.into_parts();

        // First wave: cancel tasks 0..3 and check their receivers.
        timer.cancel_batch(&task_ids[0..3]);

        for rx in receivers.drain(0..3) {
            assert_eq!(wait_oneshot(rx).await, TaskCompletion::Cancelled);
        }

        // Second wave: cancel the remaining two and check theirs.
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            assert_eq!(wait_oneshot(rx).await, TaskCompletion::Cancelled);
        }
    }
}