1pub mod config;
113pub mod error;
114pub mod task;
115pub mod wheel;
116pub mod timer;
117mod service;
118pub mod utils {
119 pub mod oneshot;
120 pub mod spsc;
121 pub mod ringbuf;
122 pub(crate) mod vec;
123}
124
125pub use task::{CallbackWrapper, TaskId, TimerTask, TaskCompletion};
127pub use timer::handle::{TimerHandle, TimerHandleWithCompletion, BatchHandle, BatchHandleWithCompletion};
128pub use task::CompletionReceiver;
129pub use timer::TimerWheel;
130pub use service::{TimerService, TaskNotification};
131
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a one-shot task whose callback adds one to `counter` each time
    /// it runs.
    ///
    /// The task owns its own `Arc` clone, so the caller keeps `counter` for
    /// later assertions.
    fn counting_task(delay: Duration, counter: &Arc<AtomicU32>) -> TimerTask {
        let shared = Arc::clone(counter);
        TimerTask::new_oneshot(
            delay,
            Some(CallbackWrapper::new(move || {
                // Each invocation gets its own handle to move into the future.
                let counter = Arc::clone(&shared);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            })),
        )
    }

    /// Waits on a completion receiver that must be the `OneShot` variant and
    /// returns the outcome it reports.
    ///
    /// # Panics
    ///
    /// Panics if `rx` is any other variant.
    async fn wait_oneshot(rx: task::CompletionReceiver) -> TaskCompletion {
        match rx {
            task::CompletionReceiver::OneShot(receiver) => receiver.wait().await,
            _ => panic!("Expected OneShot completion receiver"),
        }
    }

    /// A single 50 ms one-shot task has run its callback exactly once after
    /// 100 ms.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        timer.register(counting_task(Duration::from_millis(50), &counter));

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Ten tasks with delays of 10 ms, 20 ms, ..., 100 ms have all run after
    /// 200 ms.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        for i in 0..10 {
            let delay = Duration::from_millis(10 * (i + 1));
            timer.register(counting_task(delay, &counter));
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Cancelling three of five pending tasks leaves exactly two callbacks to
    /// run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let handles: Vec<_> = (0..5)
            .map(|_| timer.register(counting_task(Duration::from_millis(100), &counter)))
            .collect();

        // Cancel the first three; each cancellation must report success.
        for handle in &handles[..3] {
            assert!(handle.cancel());
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// Waiting on the completion receiver of a one-shot task returns, and the
    /// callback has run exactly once shortly afterwards.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let handle = timer.register(counting_task(Duration::from_millis(50), &counter));

        // `_handle` (not `_`) keeps the handle alive until the end of the test.
        let (rx, _handle) = handle.into_parts();
        wait_oneshot(rx).await;

        // Short grace period before checking the counter.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// A task registered without a callback still delivers a completion
    /// notification.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
        let handle = timer.register(task);

        let (rx, _handle) = handle.into_parts();
        wait_oneshot(rx).await;
    }

    /// Every task in a batch of five (delays 50 ms to 90 ms) delivers a
    /// completion notification and runs its callback.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let callbacks: Vec<TimerTask> = (0..5)
            .map(|i| counting_task(Duration::from_millis(50 + i * 10), &counter))
            .collect();

        let batch = timer.register_batch(callbacks);
        let (receivers, _batch_handle) = batch.into_parts();

        for rx in receivers {
            wait_oneshot(rx).await;
        }

        // Short grace period before checking the counter.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// A task that is left to reach its deadline reports
    /// `TaskCompletion::Called`.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_millis(50), None);
        let handle = timer.register(task);

        let (rx, _handle) = handle.into_parts();
        let result = wait_oneshot(rx).await;
        assert_eq!(result, TaskCompletion::Called);
    }

    /// A task cancelled long before its 10 s deadline reports
    /// `TaskCompletion::Cancelled`.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new_oneshot(Duration::from_secs(10), None);
        let handle = timer.register(task);

        let cancelled = handle.cancel();
        assert!(cancelled);

        let (rx, _handle) = handle.into_parts();
        let result = wait_oneshot(rx).await;
        assert_eq!(result, TaskCompletion::Cancelled);
    }

    /// Cancelling a batch in two steps (first three ids, then the remaining
    /// two) makes every receiver report `TaskCompletion::Cancelled`.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new_oneshot(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        let task_ids: Vec<_> = batch.task_ids().to_vec();
        let (mut receivers, _batch_handle) = batch.into_parts();

        // First step: cancel tasks 0..3 and check their receivers.
        timer.cancel_batch(&task_ids[0..3]);

        for rx in receivers.drain(0..3) {
            let result = wait_oneshot(rx).await;
            assert_eq!(result, TaskCompletion::Cancelled);
        }

        // Second step: cancel the remaining two and check what is left.
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            let result = wait_oneshot(rx).await;
            assert_eq!(result, TaskCompletion::Cancelled);
        }
    }
}