1pub mod config;
113pub mod error;
114pub mod task;
115pub mod wheel;
116pub mod timer;
117mod service;
118pub mod utils {
119 pub mod oneshot;
120 pub mod spsc;
121 pub mod ringbuf;
122 pub mod notify;
123 pub(crate) mod vec;
124}
125
126pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletion};
128pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
129pub use task::CompletionReceiver;
130pub use timer::TimerWheel;
131pub use service::{TimerService, TaskNotification};
132
/// Integration-style tests driving the public timer API end to end:
/// registration, firing, cancellation and completion notifications.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a one-shot task whose callback increments `counter` by one
    /// each time it is invoked after `delay`.
    fn counting_task(delay: Duration, counter: &Arc<AtomicU32>) -> TimerTask {
        let shared = Arc::clone(counter);
        TimerTask::new_oneshot(
            delay,
            Some(CallbackWrapper::new(move || {
                // The callback may be invoked through a shared reference, so
                // hand each produced future its own handle to the counter.
                let counter = Arc::clone(&shared);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            })),
        )
    }

    /// Waits on a completion receiver that must be the `OneShot` variant and
    /// returns the reported completion reason.
    ///
    /// # Panics
    ///
    /// Panics if `rx` is any other variant, since every task registered in
    /// these tests is a one-shot task.
    async fn wait_oneshot(rx: CompletionReceiver) -> TaskCompletion {
        match rx {
            CompletionReceiver::OneShot(receiver) => receiver.wait().await,
            _ => panic!("Expected OneShot completion receiver"),
        }
    }

    /// A single 50 ms timer runs its callback exactly once within 100 ms.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        timer.register(counting_task(Duration::from_millis(50), &counter));

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Ten timers with staggered delays (10 ms .. 100 ms) all fire.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        for i in 0..10 {
            let delay = Duration::from_millis(10 * (i + 1));
            timer.register(counting_task(delay, &counter));
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Cancelling three of five pending timers leaves exactly two callbacks
    /// to run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let handles: Vec<_> = (0..5)
            .map(|_| timer.register(counting_task(Duration::from_millis(100), &counter)))
            .collect();

        // Cancel the first three; each cancellation must report success.
        for handle in handles.iter().take(3) {
            assert!(handle.cancel());
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// The completion receiver of a one-shot task resolves, and the callback
    /// has run exactly once shortly afterwards.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let handle = timer.register(counting_task(Duration::from_millis(50), &counter));
        let (rx, _handle) = handle.into_parts();
        wait_oneshot(rx).await;

        // Short grace period before inspecting the counter.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// A task registered without a callback still delivers a completion
    /// notification.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
        let handle = timer.register(task);

        let (rx, _handle) = handle.into_parts();
        wait_oneshot(rx).await;
    }

    /// Every task of a batch delivers its own completion notification and
    /// runs its callback.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let callbacks: Vec<TimerTask> = (0..5)
            .map(|i| counting_task(Duration::from_millis(50 + i * 10), &counter))
            .collect();

        let batch = timer.register_batch(callbacks);
        let (receivers, _batch_handle) = batch.into_parts();

        for rx in receivers {
            wait_oneshot(rx).await;
        }

        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// A timer left to expire reports `TaskCompletion::Called`, even with no
    /// callback attached.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
        let handle = timer.register(task);

        let (rx, _handle) = handle.into_parts();
        assert_eq!(wait_oneshot(rx).await, TaskCompletion::Called);
    }

    /// A timer cancelled before expiry reports `TaskCompletion::Cancelled`.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
        let handle = timer.register(task);

        let cancelled = handle.cancel();
        assert!(cancelled);

        let (rx, _handle) = handle.into_parts();
        assert_eq!(wait_oneshot(rx).await, TaskCompletion::Cancelled);
    }

    /// Batch cancellation, done in two steps, makes every affected task
    /// report `TaskCompletion::Cancelled`.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        let task_ids: Vec<_> = batch.task_ids().to_vec();
        let (mut receivers, _batch_handle) = batch.into_parts();

        // First step: cancel and check the first three tasks.
        timer.cancel_batch(&task_ids[0..3]);

        for rx in receivers.drain(0..3) {
            assert_eq!(wait_oneshot(rx).await, TaskCompletion::Cancelled);
        }

        // Second step: cancel the remaining two so their receivers resolve too.
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            assert_eq!(wait_oneshot(rx).await, TaskCompletion::Cancelled);
        }
    }
}