kestrel_protocol_timer/
lib.rs1mod config;
66mod error;
67mod task;
68mod wheel;
69mod timer;
70mod service;
71
72pub use config::{
74 BatchConfig,
75 HierarchicalWheelConfig,
76 ServiceConfig, ServiceConfigBuilder,
77 TimerConfig, TimerConfigBuilder,
78 WheelConfig, WheelConfigBuilder,
79};
80pub use error::TimerError;
81pub use task::{CallbackWrapper, CompletionNotifier, TaskCompletionReason, TaskId, TimerCallback, TimerTask};
82pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
83pub use service::TimerService;
84
85#[cfg(test)]
86mod tests {
87 use super::*;
88 use std::sync::atomic::{AtomicU32, Ordering};
89 use std::sync::Arc;
90 use std::time::Duration;
91
92 #[tokio::test]
93 async fn test_basic_timer() {
94 let timer = TimerWheel::with_defaults();
95 let counter = Arc::new(AtomicU32::new(0));
96 let counter_clone = Arc::clone(&counter);
97
98 let task = TimerWheel::create_task(
99 Duration::from_millis(50),
100 Some(CallbackWrapper::new(move || {
101 let counter = Arc::clone(&counter_clone);
102 async move {
103 counter.fetch_add(1, Ordering::SeqCst);
104 }
105 })),
106 );
107 timer.register(task);
108
109 tokio::time::sleep(Duration::from_millis(100)).await;
110 assert_eq!(counter.load(Ordering::SeqCst), 1);
111 }
112
113 #[tokio::test]
114 async fn test_multiple_timers() {
115 let timer = TimerWheel::with_defaults();
116 let counter = Arc::new(AtomicU32::new(0));
117
118 for i in 0..10 {
120 let counter_clone = Arc::clone(&counter);
121 let task = TimerWheel::create_task(
122 Duration::from_millis(10 * (i + 1)),
123 Some(CallbackWrapper::new(move || {
124 let counter = Arc::clone(&counter_clone);
125 async move {
126 counter.fetch_add(1, Ordering::SeqCst);
127 }
128 })),
129 );
130 timer.register(task);
131 }
132
133 tokio::time::sleep(Duration::from_millis(200)).await;
134 assert_eq!(counter.load(Ordering::SeqCst), 10);
135 }
136
137 #[tokio::test]
138 async fn test_timer_cancellation() {
139 let timer = TimerWheel::with_defaults();
140 let counter = Arc::new(AtomicU32::new(0));
141
142 let mut handles = Vec::new();
144 for _ in 0..5 {
145 let counter_clone = Arc::clone(&counter);
146 let task = TimerWheel::create_task(
147 Duration::from_millis(100),
148 Some(CallbackWrapper::new(move || {
149 let counter = Arc::clone(&counter_clone);
150 async move {
151 counter.fetch_add(1, Ordering::SeqCst);
152 }
153 })),
154 );
155 let handle = timer.register(task);
156 handles.push(handle);
157 }
158
159 for i in 0..3 {
161 let cancel_result = handles[i].cancel();
162 assert!(cancel_result);
163 }
164
165 tokio::time::sleep(Duration::from_millis(200)).await;
166 assert_eq!(counter.load(Ordering::SeqCst), 2);
168 }
169
170 #[tokio::test]
171 async fn test_completion_notification_once() {
172 let timer = TimerWheel::with_defaults();
173 let counter = Arc::new(AtomicU32::new(0));
174 let counter_clone = Arc::clone(&counter);
175
176 let task = TimerWheel::create_task(
177 Duration::from_millis(50),
178 Some(CallbackWrapper::new(move || {
179 let counter = Arc::clone(&counter_clone);
180 async move {
181 counter.fetch_add(1, Ordering::SeqCst);
182 }
183 })),
184 );
185 let handle = timer.register(task);
186
187 handle.into_completion_receiver().0.await.expect("Should receive completion notification");
189
190 tokio::time::sleep(Duration::from_millis(20)).await;
192 assert_eq!(counter.load(Ordering::SeqCst), 1);
193 }
194
195 #[tokio::test]
196 async fn test_notify_only_timer_once() {
197 let timer = TimerWheel::with_defaults();
198
199 let task = TimerTask::new(Duration::from_millis(50), None);
200 let handle = timer.register(task);
201
202 handle.into_completion_receiver().0.await.expect("Should receive completion notification");
204 }
205
206 #[tokio::test]
207 async fn test_batch_completion_notifications() {
208 let timer = TimerWheel::with_defaults();
209 let counter = Arc::new(AtomicU32::new(0));
210
211 let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
213 .map(|i| {
214 let counter = Arc::clone(&counter);
215 let delay = Duration::from_millis(50 + i * 10);
216 let callback = CallbackWrapper::new(move || {
217 let counter = Arc::clone(&counter);
218 async move {
219 counter.fetch_add(1, Ordering::SeqCst);
220 }
221 });
222 (delay, Some(callback))
223 })
224 .collect();
225
226 let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
227 let batch = timer.register_batch(tasks);
228 let receivers = batch.into_completion_receivers();
229
230 for rx in receivers {
232 rx.await.expect("Should receive completion notification");
233 }
234
235 tokio::time::sleep(Duration::from_millis(50)).await;
237
238 assert_eq!(counter.load(Ordering::SeqCst), 5);
240 }
241
242 #[tokio::test]
243 async fn test_completion_reason_expired() {
244 let timer = TimerWheel::with_defaults();
245
246 let task = TimerTask::new(Duration::from_millis(50), None);
247 let handle = timer.register(task);
248
249 let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
251 assert_eq!(result, TaskCompletionReason::Expired);
252 }
253
254 #[tokio::test]
255 async fn test_completion_reason_cancelled() {
256 let timer = TimerWheel::with_defaults();
257
258 let task = TimerTask::new(Duration::from_secs(10), None);
259 let handle = timer.register(task);
260
261 let cancelled = handle.cancel();
263 assert!(cancelled);
264
265 let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
267 assert_eq!(result, TaskCompletionReason::Cancelled);
268 }
269
270 #[tokio::test]
271 async fn test_batch_completion_reasons() {
272 let timer = TimerWheel::with_defaults();
273
274 let tasks: Vec<_> = (0..5)
276 .map(|_| TimerTask::new(Duration::from_secs(10), None))
277 .collect();
278
279 let batch = timer.register_batch(tasks);
280 let task_ids: Vec<_> = batch.task_ids.clone();
281 let mut receivers = batch.into_completion_receivers();
282
283 timer.cancel_batch(&task_ids[0..3]);
285
286 for rx in receivers.drain(0..3) {
288 let result = rx.await.expect("Should receive completion notification");
289 assert_eq!(result, TaskCompletionReason::Cancelled);
290 }
291
292 timer.cancel_batch(&task_ids[3..5]);
294 for rx in receivers {
295 let result = rx.await.expect("Should receive completion notification");
296 assert_eq!(result, TaskCompletionReason::Cancelled);
297 }
298 }
299}