1pub mod config;
106pub mod error;
107pub mod task;
108pub mod wheel;
109pub mod timer;
110mod service;
111
112pub use task::{CallbackWrapper, TaskId, TimerTask};
114pub use timer::{TimerWheel};
115pub use service::TimerService;
116
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::TaskCompletionReason;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Builds a callback that adds one to `counter` each time it is invoked.
    ///
    /// Shared by the tests below so the closure boilerplate lives in one place.
    fn counting_callback(counter: &Arc<AtomicU32>) -> CallbackWrapper {
        let counter = Arc::clone(counter);
        CallbackWrapper::new(move || {
            // Clone per invocation so the returned future owns its own handle.
            let counter = Arc::clone(&counter);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        })
    }

    /// A single 50 ms task must have run its callback exactly once after 100 ms.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerWheel::create_task(
            Duration::from_millis(50),
            Some(counting_callback(&counter)),
        );
        timer.register(task);

        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Ten tasks with delays of 10 ms, 20 ms, ..., 100 ms must all have fired after 200 ms.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        for i in 0..10 {
            let task = TimerWheel::create_task(
                Duration::from_millis(10 * (i + 1)),
                Some(counting_callback(&counter)),
            );
            timer.register(task);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Cancelling three of five pending tasks leaves exactly two callbacks to run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Keep every handle alive until the end of the test, as the cancelled
        // and non-cancelled tasks are both observed through the counter.
        let handles: Vec<_> = (0..5)
            .map(|_| {
                let task = TimerWheel::create_task(
                    Duration::from_millis(100),
                    Some(counting_callback(&counter)),
                );
                timer.register(task)
            })
            .collect();

        // Cancel the first three tasks; each cancellation must report success.
        for handle in handles.iter().take(3) {
            let cancel_result = handle.cancel();
            assert!(cancel_result);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// The completion receiver of a task with a callback resolves, and the
    /// callback has run exactly once shortly afterwards.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let task = TimerWheel::create_task(
            Duration::from_millis(50),
            Some(counting_callback(&counter)),
        );
        let handle = timer.register(task);

        handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");

        // NOTE(review): presumably this grace period lets the callback finish
        // after the notification is delivered - confirm against the wheel.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// A task registered without a callback still delivers a completion notification.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
    }

    /// Every task of a batch delivers a completion notification and runs its callback.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Five tasks with delays of 50 ms, 60 ms, ..., 90 ms.
        let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
            .map(|i| {
                let delay = Duration::from_millis(50 + i * 10);
                (delay, Some(counting_callback(&counter)))
            })
            .collect();

        let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
        let batch = timer.register_batch(tasks);
        let receivers = batch.into_completion_receivers();

        for rx in receivers {
            rx.await.expect("Should receive completion notification");
        }

        // NOTE(review): presumably a grace period for the callbacks to finish - confirm.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// A task that is left to run out reports `TaskCompletionReason::Expired`.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        let result = handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
        assert_eq!(result, TaskCompletionReason::Expired);
    }

    /// A task cancelled before its deadline reports `TaskCompletionReason::Cancelled`.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        // The 10 s delay keeps the task pending when it is cancelled.
        let task = TimerTask::new(Duration::from_secs(10), None);
        let handle = timer.register(task);

        let cancelled = handle.cancel();
        assert!(cancelled);

        let result = handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
        assert_eq!(result, TaskCompletionReason::Cancelled);
    }

    /// Batch cancellation in two steps: every receiver reports `Cancelled`.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        // Copy the ids first: `into_completion_receivers` consumes the batch.
        let task_ids: Vec<_> = batch.task_ids.clone();
        let mut receivers = batch.into_completion_receivers();

        // Step one: cancel the first three tasks and check their receivers.
        timer.cancel_batch(&task_ids[0..3]);

        for rx in receivers.drain(0..3) {
            let result = rx.await.expect("Should receive completion notification");
            assert_eq!(result, TaskCompletionReason::Cancelled);
        }

        // Step two: cancel the remaining two tasks and check what is left.
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            let result = rx.await.expect("Should receive completion notification");
            assert_eq!(result, TaskCompletionReason::Cancelled);
        }
    }
}