kestrel_protocol_timer/
lib.rs1mod config;
60mod error;
61mod task;
62mod wheel;
63mod timer;
64mod service;
65
66pub use config::{
68 BatchConfig,
69 ServiceConfig, ServiceConfigBuilder,
70 TimerConfig, TimerConfigBuilder,
71 WheelConfig, WheelConfigBuilder,
72};
73pub use error::TimerError;
74pub use task::{CallbackWrapper, CompletionNotifier, TaskCompletionReason, TaskId, TimerCallback, TimerTask};
75pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
76pub use service::TimerService;
77
78#[cfg(test)]
79mod tests {
80 use super::*;
81 use std::sync::atomic::{AtomicU32, Ordering};
82 use std::sync::Arc;
83 use std::time::Duration;
84
85 #[tokio::test]
86 async fn test_basic_timer() {
87 let timer = TimerWheel::with_defaults();
88 let counter = Arc::new(AtomicU32::new(0));
89 let counter_clone = Arc::clone(&counter);
90
91 let task = TimerWheel::create_task(
92 Duration::from_millis(50),
93 Some(CallbackWrapper::new(move || {
94 let counter = Arc::clone(&counter_clone);
95 async move {
96 counter.fetch_add(1, Ordering::SeqCst);
97 }
98 })),
99 );
100 timer.register(task);
101
102 tokio::time::sleep(Duration::from_millis(100)).await;
103 assert_eq!(counter.load(Ordering::SeqCst), 1);
104 }
105
106 #[tokio::test]
107 async fn test_multiple_timers() {
108 let timer = TimerWheel::with_defaults();
109 let counter = Arc::new(AtomicU32::new(0));
110
111 for i in 0..10 {
113 let counter_clone = Arc::clone(&counter);
114 let task = TimerWheel::create_task(
115 Duration::from_millis(10 * (i + 1)),
116 Some(CallbackWrapper::new(move || {
117 let counter = Arc::clone(&counter_clone);
118 async move {
119 counter.fetch_add(1, Ordering::SeqCst);
120 }
121 })),
122 );
123 timer.register(task);
124 }
125
126 tokio::time::sleep(Duration::from_millis(200)).await;
127 assert_eq!(counter.load(Ordering::SeqCst), 10);
128 }
129
130 #[tokio::test]
131 async fn test_timer_cancellation() {
132 let timer = TimerWheel::with_defaults();
133 let counter = Arc::new(AtomicU32::new(0));
134
135 let mut handles = Vec::new();
137 for _ in 0..5 {
138 let counter_clone = Arc::clone(&counter);
139 let task = TimerWheel::create_task(
140 Duration::from_millis(100),
141 Some(CallbackWrapper::new(move || {
142 let counter = Arc::clone(&counter_clone);
143 async move {
144 counter.fetch_add(1, Ordering::SeqCst);
145 }
146 })),
147 );
148 let handle = timer.register(task);
149 handles.push(handle);
150 }
151
152 for i in 0..3 {
154 let cancel_result = handles[i].cancel();
155 assert!(cancel_result);
156 }
157
158 tokio::time::sleep(Duration::from_millis(200)).await;
159 assert_eq!(counter.load(Ordering::SeqCst), 2);
161 }
162
163 #[tokio::test]
164 async fn test_completion_notification_once() {
165 let timer = TimerWheel::with_defaults();
166 let counter = Arc::new(AtomicU32::new(0));
167 let counter_clone = Arc::clone(&counter);
168
169 let task = TimerWheel::create_task(
170 Duration::from_millis(50),
171 Some(CallbackWrapper::new(move || {
172 let counter = Arc::clone(&counter_clone);
173 async move {
174 counter.fetch_add(1, Ordering::SeqCst);
175 }
176 })),
177 );
178 let handle = timer.register(task);
179
180 handle.into_completion_receiver().0.await.expect("Should receive completion notification");
182
183 tokio::time::sleep(Duration::from_millis(20)).await;
185 assert_eq!(counter.load(Ordering::SeqCst), 1);
186 }
187
188 #[tokio::test]
189 async fn test_notify_only_timer_once() {
190 let timer = TimerWheel::with_defaults();
191
192 let task = TimerTask::new(Duration::from_millis(50), None);
193 let handle = timer.register(task);
194
195 handle.into_completion_receiver().0.await.expect("Should receive completion notification");
197 }
198
199 #[tokio::test]
200 async fn test_batch_completion_notifications() {
201 let timer = TimerWheel::with_defaults();
202 let counter = Arc::new(AtomicU32::new(0));
203
204 let callbacks: Vec<(Duration, Option<CallbackWrapper>)> = (0..5)
206 .map(|i| {
207 let counter = Arc::clone(&counter);
208 let delay = Duration::from_millis(50 + i * 10);
209 let callback = CallbackWrapper::new(move || {
210 let counter = Arc::clone(&counter);
211 async move {
212 counter.fetch_add(1, Ordering::SeqCst);
213 }
214 });
215 (delay, Some(callback))
216 })
217 .collect();
218
219 let tasks = TimerWheel::create_batch_with_callbacks(callbacks);
220 let batch = timer.register_batch(tasks);
221 let receivers = batch.into_completion_receivers();
222
223 for rx in receivers {
225 rx.await.expect("Should receive completion notification");
226 }
227
228 tokio::time::sleep(Duration::from_millis(50)).await;
230
231 assert_eq!(counter.load(Ordering::SeqCst), 5);
233 }
234
235 #[tokio::test]
236 async fn test_completion_reason_expired() {
237 let timer = TimerWheel::with_defaults();
238
239 let task = TimerTask::new(Duration::from_millis(50), None);
240 let handle = timer.register(task);
241
242 let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
244 assert_eq!(result, TaskCompletionReason::Expired);
245 }
246
247 #[tokio::test]
248 async fn test_completion_reason_cancelled() {
249 let timer = TimerWheel::with_defaults();
250
251 let task = TimerTask::new(Duration::from_secs(10), None);
252 let handle = timer.register(task);
253
254 let cancelled = handle.cancel();
256 assert!(cancelled);
257
258 let result = handle.into_completion_receiver().0.await.expect("Should receive completion notification");
260 assert_eq!(result, TaskCompletionReason::Cancelled);
261 }
262
263 #[tokio::test]
264 async fn test_batch_completion_reasons() {
265 let timer = TimerWheel::with_defaults();
266
267 let tasks: Vec<_> = (0..5)
269 .map(|_| TimerTask::new(Duration::from_secs(10), None))
270 .collect();
271
272 let batch = timer.register_batch(tasks);
273 let task_ids: Vec<_> = batch.task_ids.clone();
274 let mut receivers = batch.into_completion_receivers();
275
276 timer.cancel_batch(&task_ids[0..3]);
278
279 for rx in receivers.drain(0..3) {
281 let result = rx.await.expect("Should receive completion notification");
282 assert_eq!(result, TaskCompletionReason::Cancelled);
283 }
284
285 timer.cancel_batch(&task_ids[3..5]);
287 for rx in receivers {
288 let result = rx.await.expect("Should receive completion notification");
289 assert_eq!(result, TaskCompletionReason::Cancelled);
290 }
291 }
292}