kestrel_protocol_timer/
lib.rs1mod config;
58mod error;
59mod task;
60mod wheel;
61mod timer;
62mod service;
63
64pub use config::{
66 BatchConfig,
67 ServiceConfig, ServiceConfigBuilder,
68 TimerConfig, TimerConfigBuilder,
69 WheelConfig, WheelConfigBuilder,
70};
71pub use error::TimerError;
72pub use task::{CallbackWrapper, CompletionNotifier, TaskCompletionReason, TaskId, TimerCallback, TimerTask};
73pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
74pub use service::TimerService;
75
#[cfg(test)]
mod tests {
    // Integration-style tests that drive the crate's re-exported API
    // (`TimerWheel`, `TimerTask`, `TaskCompletionReason`) on a Tokio runtime.
    //
    // Most tests share one fixture: an `Arc<AtomicU32>` counter that each
    // timer callback increments, so the final counter value equals the number
    // of callbacks that actually ran. The tests use real sleeps, so they
    // depend on wall-clock timing.
    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Registers a single 50 ms task and checks that its callback has run
    /// exactly once after 100 ms.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));
        let counter_clone = Arc::clone(&counter);

        let task = TimerWheel::create_task(Duration::from_millis(50), move || {
            // Re-clone inside the closure so the returned future owns its own handle.
            let counter = Arc::clone(&counter_clone);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        timer.register(task);

        // Sleep for twice the task delay before checking the counter.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Registers ten tasks with delays of 10 ms, 20 ms, ..., 100 ms and checks
    /// that all ten callbacks have run after 200 ms.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        for i in 0..10 {
            let counter_clone = Arc::clone(&counter);
            let task = TimerWheel::create_task(Duration::from_millis(10 * (i + 1)), move || {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            });
            timer.register(task);
        }

        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 10);
    }

    /// Registers five 100 ms tasks, cancels the first three through their
    /// handles, and checks that only the two remaining callbacks ran.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        let mut handles = Vec::new();
        for _ in 0..5 {
            let counter_clone = Arc::clone(&counter);
            let task = TimerWheel::create_task(Duration::from_millis(100), move || {
                let counter = Arc::clone(&counter_clone);
                async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                }
            });
            let handle = timer.register(task);
            handles.push(handle);
        }

        // Cancel the first three tasks immediately after registration; the
        // test expects every one of these `cancel()` calls to return `true`.
        for handle in handles.iter().take(3) {
            assert!(handle.cancel());
        }

        // Wait for twice the task delay, then only the two tasks that were
        // not cancelled should have incremented the counter.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 2);
    }

    /// Awaits the completion receiver of a single 50 ms task and then checks
    /// that its callback ran exactly once.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));
        let counter_clone = Arc::clone(&counter);

        let task = TimerWheel::create_task(Duration::from_millis(50), move || {
            let counter = Arc::clone(&counter_clone);
            async move {
                counter.fetch_add(1, Ordering::SeqCst);
            }
        });
        let handle = timer.register(task);

        // Block until the completion notification for this task arrives.
        handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");

        // Extra 20 ms pause before reading the counter.
        // NOTE(review): presumably this gives the callback time to finish in
        // case the notification can arrive first - confirm against the wheel
        // implementation.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(counter.load(Ordering::SeqCst), 1);
    }

    /// Registers a 50 ms task built with `None` in place of a callback and
    /// checks that a completion notification is still delivered.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
    }

    /// Registers a batch of five tasks (delays 50 ms to 90 ms in 10 ms steps),
    /// awaits every completion receiver, and checks that all five callbacks ran.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let counter = Arc::new(AtomicU32::new(0));

        // Build `(delay, callback)` pairs; every callback bumps the shared counter.
        let callbacks: Vec<(Duration, _)> = (0..5)
            .map(|i| {
                let counter = Arc::clone(&counter);
                let delay = Duration::from_millis(50 + i * 10);
                let callback = move || {
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                    }
                };
                (delay, callback)
            })
            .collect();

        let tasks = TimerWheel::create_batch(callbacks);
        let batch = timer.register_batch(tasks);
        let receivers = batch.into_completion_receivers();

        // Await the receivers one after another.
        for rx in receivers {
            rx.await.expect("Should receive completion notification");
        }

        // Extra 50 ms pause before reading the counter.
        // NOTE(review): presumably lets the last callbacks finish - confirm.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(counter.load(Ordering::SeqCst), 5);
    }

    /// Checks that a task left to run out its 50 ms delay reports
    /// `TaskCompletionReason::Expired` on its completion receiver.
    #[tokio::test]
    async fn test_completion_reason_expired() {
        let timer = TimerWheel::with_defaults();

        let task = TimerTask::new(Duration::from_millis(50), None);
        let handle = timer.register(task);

        let result = handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
        assert_eq!(result, TaskCompletionReason::Expired);
    }

    /// Checks that a task cancelled through its handle reports
    /// `TaskCompletionReason::Cancelled` on its completion receiver.
    #[tokio::test]
    async fn test_completion_reason_cancelled() {
        let timer = TimerWheel::with_defaults();

        // A 10 s delay, so the cancel call below runs long before the task is due.
        let task = TimerTask::new(Duration::from_secs(10), None);
        let handle = timer.register(task);

        let cancelled = handle.cancel();
        assert!(cancelled);

        let result = handle
            .into_completion_receiver()
            .0
            .await
            .expect("Should receive completion notification");
        assert_eq!(result, TaskCompletionReason::Cancelled);
    }

    /// Registers a batch of five 10 s tasks, cancels them in two groups via
    /// `cancel_batch`, and checks that every receiver reports
    /// `TaskCompletionReason::Cancelled`.
    #[tokio::test]
    async fn test_batch_completion_reasons() {
        let timer = TimerWheel::with_defaults();

        let tasks: Vec<_> = (0..5)
            .map(|_| TimerTask::new(Duration::from_secs(10), None))
            .collect();

        let batch = timer.register_batch(tasks);
        // Copy the ids out first: `into_completion_receivers` consumes the batch.
        let task_ids: Vec<_> = batch.task_ids.clone();
        let mut receivers = batch.into_completion_receivers();

        // Cancel the first three tasks by id.
        timer.cancel_batch(&task_ids[0..3]);

        // NOTE(review): this assumes the receivers are in the same order as
        // `task_ids` - confirm against `into_completion_receivers`.
        for rx in receivers.drain(0..3) {
            let result = rx.await.expect("Should receive completion notification");
            assert_eq!(result, TaskCompletionReason::Cancelled);
        }

        // Cancel the remaining two tasks rather than waiting out their 10 s
        // delay, and check their receivers as well.
        timer.cancel_batch(&task_ids[3..5]);
        for rx in receivers {
            let result = rx.await.expect("Should receive completion notification");
            assert_eq!(result, TaskCompletionReason::Cancelled);
        }
    }
}