kestrel_protocol_timer/
lib.rs1mod config;
54mod error;
55mod task;
56mod wheel;
57mod timer;
58mod service;
59
60pub use config::{
62 BatchConfig,
63 ServiceConfig, ServiceConfigBuilder,
64 TimerConfig, TimerConfigBuilder,
65 WheelConfig, WheelConfigBuilder,
66};
67pub use error::TimerError;
68pub use task::{CallbackWrapper, CompletionNotifier, TaskId, TimerCallback};
69pub use timer::{BatchHandle, BatchHandleIter, CompletionReceiver, TimerHandle, TimerWheel};
70pub use service::TimerService;
71
#[cfg(test)]
mod tests {
    //! End-to-end tests for the public timer API re-exported by this crate.
    //!
    //! These tests run against real wall-clock time: each one schedules
    //! short timers and then sleeps (or awaits a completion receiver) for
    //! longer than the longest scheduled delay before checking results.

    use super::*;
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::sync::Arc;
    use std::time::Duration;

    /// Schedules one 50 ms one-shot timer and checks that its callback has
    /// incremented the shared counter exactly once after 100 ms.
    #[tokio::test]
    async fn test_basic_timer() {
        let timer = TimerWheel::with_defaults();
        let fired = Arc::new(AtomicU32::new(0));
        let fired_for_callback = Arc::clone(&fired);

        timer
            .schedule_once(Duration::from_millis(50), move || {
                let fired = Arc::clone(&fired_for_callback);
                async move {
                    fired.fetch_add(1, Ordering::SeqCst);
                }
            })
            .await;

        // Sleep for twice the timer delay before inspecting the counter.
        tokio::time::sleep(Duration::from_millis(100)).await;
        assert_eq!(
            fired.load(Ordering::SeqCst),
            1,
            "the single scheduled callback should have run exactly once"
        );
    }

    /// Schedules ten one-shot timers with delays of 10 ms, 20 ms, ..., 100 ms
    /// and checks that all ten callbacks have run after 200 ms.
    #[tokio::test]
    async fn test_multiple_timers() {
        let timer = TimerWheel::with_defaults();
        let fired = Arc::new(AtomicU32::new(0));

        for step in 1..=10 {
            let fired_for_callback = Arc::clone(&fired);
            timer
                .schedule_once(Duration::from_millis(10 * step), move || {
                    let fired = Arc::clone(&fired_for_callback);
                    async move {
                        fired.fetch_add(1, Ordering::SeqCst);
                    }
                })
                .await;
        }

        // The longest delay is 100 ms; wait twice that before checking.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(
            fired.load(Ordering::SeqCst),
            10,
            "every one of the ten scheduled callbacks should have run"
        );
    }

    /// Schedules five 100 ms timers, cancels the first three through their
    /// handles, and checks that only the remaining two callbacks run.
    #[tokio::test]
    async fn test_timer_cancellation() {
        let timer = TimerWheel::with_defaults();
        let fired = Arc::new(AtomicU32::new(0));

        let mut handles = Vec::new();
        for _ in 0..5 {
            let fired_for_callback = Arc::clone(&fired);
            let handle = timer
                .schedule_once(Duration::from_millis(100), move || {
                    let fired = Arc::clone(&fired_for_callback);
                    async move {
                        fired.fetch_add(1, Ordering::SeqCst);
                    }
                })
                .await;
            handles.push(handle);
        }

        // Cancel the first three timers; iterate instead of indexing so there
        // is no out-of-bounds panic path if the number of handles changes.
        for handle in handles.iter().take(3) {
            assert!(
                handle.cancel(),
                "cancelling a timer that has not fired yet should report success"
            );
        }

        // Wait well past the 100 ms delay so the surviving timers can fire.
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert_eq!(
            fired.load(Ordering::SeqCst),
            2,
            "only the two timers that were not cancelled should have run"
        );
    }

    /// Schedules a 50 ms timer, awaits its completion receiver, and checks
    /// that the callback has incremented the counter shortly afterwards.
    #[tokio::test]
    async fn test_completion_notification_once() {
        let timer = TimerWheel::with_defaults();
        let fired = Arc::new(AtomicU32::new(0));
        let fired_for_callback = Arc::clone(&fired);

        let handle = timer
            .schedule_once(Duration::from_millis(50), move || {
                let fired = Arc::clone(&fired_for_callback);
                async move {
                    fired.fetch_add(1, Ordering::SeqCst);
                }
            })
            .await;

        let completion = handle.into_completion_receiver();
        completion
            .0
            .await
            .expect("Should receive completion notification");

        // NOTE(review): the extra 20 ms presumably gives the callback time to
        // finish after the notification arrives — confirm against TimerWheel.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(
            fired.load(Ordering::SeqCst),
            1,
            "the callback should have run once after the completion notification"
        );
    }

    /// Schedules a 50 ms notify-only timer (no callback) and checks that its
    /// completion receiver resolves successfully.
    #[tokio::test]
    async fn test_notify_only_timer_once() {
        let timer = TimerWheel::with_defaults();

        let handle = timer.schedule_once_notify(Duration::from_millis(50)).await;

        let completion = handle.into_completion_receiver();
        completion
            .0
            .await
            .expect("Should receive completion notification");
    }

    /// Schedules a batch of five timers with delays of 50 ms to 90 ms, awaits
    /// every completion receiver of the batch, and checks that all five
    /// callbacks have run.
    #[tokio::test]
    async fn test_batch_completion_notifications() {
        let timer = TimerWheel::with_defaults();
        let fired = Arc::new(AtomicU32::new(0));

        // Build the (delay, callback) pairs; delays are 50, 60, 70, 80, 90 ms.
        let callbacks: Vec<(Duration, _)> = (0..5)
            .map(|index| {
                let fired_for_callback = Arc::clone(&fired);
                let delay = Duration::from_millis(50 + index * 10);
                let callback = move || {
                    let fired = Arc::clone(&fired_for_callback);
                    async move {
                        fired.fetch_add(1, Ordering::SeqCst);
                    }
                };
                (delay, callback)
            })
            .collect();

        let batch = timer.schedule_once_batch(callbacks).await;

        // Await the receivers one after another, in the order the batch
        // yields them.
        for receiver in batch.into_completion_receivers() {
            receiver
                .await
                .expect("Should receive completion notification");
        }

        // NOTE(review): the extra 50 ms presumably lets the last callbacks
        // finish after their notifications — confirm against TimerWheel.
        tokio::time::sleep(Duration::from_millis(50)).await;

        assert_eq!(
            fired.load(Ordering::SeqCst),
            5,
            "all five batch callbacks should have run"
        );
    }
}