hiver_runtime/time.rs
1//! Timer module
2//! 定时器模块
3//!
4//! # Overview / 概述
5//!
6//! This module provides efficient timer management using a hierarchical timing wheel.
7//! The timing wheel provides O(1) insertion, deletion, and advancement.
8//!
9//! 本模块使用分层时间轮提供高效的定时器管理。
10//! 时间轮提供O(1)插入、删除和推进操作。
11//!
12//! # Timer Wheel Algorithm / 时间轮算法
13//!
14//! The timer wheel is organized as a hierarchy of wheels with different granularities:
15//! - Wheel 0: 1ms resolution, 256 slots (256ms range)
16//! - Wheel 1: 256ms resolution, 64 slots (16.384s range)
17//! - Wheel 2: 16.384s resolution, 64 slots (1048.576s range)
18//! - Wheel 3: 1048.576s resolution, 64 slots (67108.864s range)
19//!
20//! 时间轮组织为具有不同粒度的层级:
21//! - 轮0:1ms分辨率,256个槽(256ms范围)
22//! - 轮1:256ms分辨率,64个槽(16.384s范围)
23//! - 轮2:16.384s分辨率,64个槽(1048.576s范围)
24//! - 轮3:1048.576s分辨率,64个槽(67108.864s范围)
25//!
26//! # Example / 示例
27//!
28//! ```rust,no_run,ignore
29//! use hiver_runtime::time::{sleep, Duration};
30//!
31//! async fn example() {
32//! sleep(Duration::from_millis(100)).await;
33//! println!("Woke up after 100ms");
34//! }
35//! ```
36
37use std::cell::UnsafeCell;
38use std::collections::{HashMap, LinkedList};
39use std::future::Future;
40use std::pin::Pin;
41use std::sync::Mutex;
42use std::sync::OnceLock;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::task::{Context, Poll, Waker};
45
46/// Standard library duration re-export
47/// 标准库Duration重新导出
48pub use std::time::Duration;
49
50/// Standard library instant re-export
51/// 标准库Instant重新导出
52pub use std::time::Instant;
53
/// Length of one wheel tick, in milliseconds.
const TICK_MS: u64 = 1;

/// Wheel 0: one tick (1 ms) per slot, 256 slots, 256 ms per revolution.
const WHEEL0_SHIFT: usize = 8;
const WHEEL0_SIZE: usize = 1 << WHEEL0_SHIFT;
const WHEEL0_MASK: usize = WHEEL0_SIZE - 1;

/// Wheel 1: 256 ms per slot, 64 slots, 16.384 s per revolution.
const WHEEL1_SHIFT: usize = 6;
const WHEEL1_SIZE: usize = 1 << WHEEL1_SHIFT;
const WHEEL1_MASK: usize = WHEEL1_SIZE - 1;

/// Wheel 2: 16.384 s per slot, 64 slots, 1048.576 s per revolution.
const WHEEL2_SHIFT: usize = 6;
const WHEEL2_SIZE: usize = 1 << WHEEL2_SHIFT;
const WHEEL2_MASK: usize = WHEEL2_SIZE - 1;

/// Wheel 3: 1048.576 s per slot, 64 slots, 67108.864 s per revolution.
const WHEEL3_SHIFT: usize = 6;
const WHEEL3_SIZE: usize = 1 << WHEEL3_SHIFT;
const WHEEL3_MASK: usize = WHEEL3_SIZE - 1;

/// Span covered by one full revolution of the coarsest wheel, in
/// milliseconds (about 18.6 hours).
// Only referenced from the unit tests in this file, hence the lint exemption.
#[allow(dead_code)]
const MAX_TIMEOUT_MS: u64 =
    (1u64 << (WHEEL0_SHIFT + WHEEL1_SHIFT + WHEEL2_SHIFT + WHEEL3_SHIFT)) * TICK_MS;
88
/// A single timer queued in one of the wheel slots.
///
/// `TimerEntry` is `Send + Sync` through the auto traits alone: all of its
/// fields (`u64`, `Option<Waker>`, `Mutex<bool>`) are thread-safe. No manual
/// `unsafe impl` is provided on purpose, so that adding a non-thread-safe
/// field later becomes a compile error instead of silent unsoundness.
#[derive(Debug)]
struct TimerEntry {
    /// Identifier of this timer; it is the key used in the wheel's registry.
    id: u64,
    /// Absolute expiration time on the wheel's clock, in milliseconds.
    expiration_ms: u64,
    /// Waker to invoke when the timer expires, if one was supplied.
    waker: Option<Waker>,
    /// Cancellation flag.
    // Never read in this file: cancellation is tracked by removing the id
    // from the wheel's registry. Kept because constructors still set it.
    #[allow(dead_code)]
    canceled: Mutex<bool>,
}
109
110/// A timing wheel slot containing timer entries
111/// 包含定时器条目的时间轮槽
112#[derive(Debug)]
113struct TimerSlot {
114 /// List of timer entries in this slot
115 /// 此槽中的定时器条目列表
116 timers: UnsafeCell<LinkedList<TimerEntry>>,
117}
118
119impl TimerSlot {
120 fn new() -> Self {
121 Self {
122 timers: UnsafeCell::new(LinkedList::new()),
123 }
124 }
125
126 /// Add a timer to this slot
127 /// 向此槽添加定时器
128 unsafe fn push(&self, timer: TimerEntry) {
129 let list = &mut *self.timers.get();
130 list.push_back(timer);
131 }
132
133 /// Get all timers from this slot, clearing it
134 /// 从此槽获取所有定时器并清空
135 unsafe fn take_all(&self) -> LinkedList<TimerEntry> {
136 let list = &mut *self.timers.get();
137 std::mem::take(list)
138 }
139}
140
141// SAFETY: TimerSlot uses interior mutability with controlled access
142// TimerSlot使用受控访问的内部可变性
143unsafe impl Send for TimerSlot {}
144unsafe impl Sync for TimerSlot {}
145
146/// Hierarchical timing wheel for efficient timer management
147/// 用于高效定时器管理的分层时间轮
148///
149/// Uses 4 wheels with different granularities to cover a wide range of timeouts.
150/// 使用4个具有不同粒度的轮来覆盖大范围的超时。
151pub struct TimerWheel {
152 /// Current time in ticks
153 /// 当前时间(滴答)
154 current_ticks: AtomicU64,
155 /// Wheel 0 (finest granularity)
156 /// 轮0(最细粒度)
157 wheel0: Box<[TimerSlot; WHEEL0_SIZE]>,
158 /// Wheel 1
159 /// 轮1
160 wheel1: Box<[TimerSlot; WHEEL1_SIZE]>,
161 /// Wheel 2
162 /// 轮2
163 wheel2: Box<[TimerSlot; WHEEL2_SIZE]>,
164 /// Wheel 3 (coarsest granularity)
165 /// 轮3(最粗粒度)
166 wheel3: Box<[TimerSlot; WHEEL3_SIZE]>,
167 /// Next timer ID
168 /// 下一个定时器ID
169 next_id: AtomicU64,
170 /// Active timer registry for cancellation (ID -> slot index)
171 /// 活跃定时器注册表用于取消(ID -> 槽索引)
172 timer_registry: Mutex<HashMap<u64, TimerLocation>>,
173}
174
/// Where a pending timer is parked inside the wheel hierarchy.
// The fields are recorded for bookkeeping but not read back in this file.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug)]
struct TimerLocation {
    /// Level of the wheel holding the timer (0 = finest, 3 = coarsest).
    wheel_level: u8,
    /// Index of the slot inside that wheel.
    slot_index: usize,
}
186
187// SAFETY: TimerWheel uses atomic operations and interior mutability
188// TimerWheel使用原子操作和内部可变性
189unsafe impl Send for TimerWheel {}
190unsafe impl Sync for TimerWheel {}
191
192impl TimerWheel {
193 /// Create a new timer wheel
194 /// 创建新的时间轮
195 pub fn new() -> Self {
196 Self {
197 current_ticks: AtomicU64::new(0),
198 wheel0: (0..WHEEL0_SIZE)
199 .map(|_| TimerSlot::new())
200 .collect::<Vec<_>>()
201 .into_boxed_slice()
202 .try_into()
203 .unwrap(),
204 wheel1: (0..WHEEL1_SIZE)
205 .map(|_| TimerSlot::new())
206 .collect::<Vec<_>>()
207 .into_boxed_slice()
208 .try_into()
209 .unwrap(),
210 wheel2: (0..WHEEL2_SIZE)
211 .map(|_| TimerSlot::new())
212 .collect::<Vec<_>>()
213 .into_boxed_slice()
214 .try_into()
215 .unwrap(),
216 wheel3: (0..WHEEL3_SIZE)
217 .map(|_| TimerSlot::new())
218 .collect::<Vec<_>>()
219 .into_boxed_slice()
220 .try_into()
221 .unwrap(),
222 next_id: AtomicU64::new(1),
223 timer_registry: Mutex::new(HashMap::new()),
224 }
225 }
226
227 /// Cancel a timer by ID
228 /// 通过ID取消定时器
229 pub fn cancel_timer(&self, id: u64) -> bool {
230 let mut registry = self.timer_registry.lock().unwrap();
231 if let Some(_location) = registry.remove(&id) {
232 // Timer was found and removed from registry
233 // The actual cancellation will be checked when the timer expires
234 // 定时器已找到并从注册表中移除
235 // 实际取消将在定时器到期时检查
236 true
237 } else {
238 false
239 }
240 }
241
242 /// Get the current tick count
243 /// 获取当前滴答计数
244 #[inline]
245 pub fn current_ticks(&self) -> u64 {
246 self.current_ticks.load(Ordering::Acquire)
247 }
248
249 /// Advance the timer wheel by the specified number of ticks
250 /// 将时间轮推进指定数量的滴答
251 ///
252 /// Returns the number of timers that expired during this advancement.
253 /// 返回在此推进期间到期的定时器数量。
254 pub fn advance(&self, ticks: u64) -> usize {
255 let mut expired = 0;
256 let _start = self.current_ticks.load(Ordering::Acquire);
257
258 for _ in 0..ticks {
259 let tick = self.current_ticks.fetch_add(1, Ordering::AcqRel);
260 let pos0 = (tick & WHEEL0_MASK as u64) as usize;
261
262 // Process wheel 0
263 // 处理轮0
264 unsafe {
265 let timers = self.wheel0[pos0].take_all();
266 for timer in timers {
267 // Check if timer is still in registry (not canceled)
268 // 检查定时器是否仍在注册表中(未取消)
269 let is_active = {
270 let mut registry = self.timer_registry.lock().unwrap();
271 registry.remove(&timer.id).is_some()
272 };
273
274 if is_active {
275 // Try to wake the timer
276 // 尝试唤醒定时器
277 if let Some(waker) = timer.waker {
278 waker.wake();
279 }
280 expired += 1;
281 }
282 }
283 }
284
285 // Cascade to wheel 1 every WHEEL0_SIZE ticks
286 // 每WHEEL0_SIZE个滴答级联到轮1
287 if tick & (WHEEL0_SIZE as u64 - 1) == 0 {
288 let pos1 = ((tick >> WHEEL0_SHIFT) & WHEEL1_MASK as u64) as usize;
289 unsafe {
290 let timers = self.wheel1[pos1].take_all();
291 for timer in timers {
292 // Check if timer is still in registry
293 // 检查定时器是否仍在注册表中
294 let is_active = {
295 let registry = self.timer_registry.lock().unwrap();
296 registry.contains_key(&timer.id)
297 };
298
299 if is_active {
300 // Re-insert into wheel 0
301 // 重新插入轮0
302 self.insert_timer_inner(timer);
303 }
304 }
305 }
306 }
307
308 // Cascade to wheel 2 every (WHEEL0_SIZE * WHEEL1_SIZE) ticks
309 // 每(WHEEL0_SIZE * WHEEL1_SIZE)个滴答级联到轮2
310 if tick & ((WHEEL0_SIZE * WHEEL1_SIZE) as u64 - 1) == 0 {
311 let pos2 = ((tick >> (WHEEL0_SHIFT + WHEEL1_SHIFT)) & WHEEL2_MASK as u64) as usize;
312 unsafe {
313 let timers = self.wheel2[pos2].take_all();
314 for timer in timers {
315 // Check if timer is still in registry
316 let is_active = {
317 let registry = self.timer_registry.lock().unwrap();
318 registry.contains_key(&timer.id)
319 };
320
321 if is_active {
322 self.insert_timer_inner(timer);
323 }
324 }
325 }
326 }
327
328 // Cascade to wheel 3 every (WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE) ticks
329 // 每(WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE)个滴答级联到轮3
330 if tick & ((WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE) as u64 - 1) == 0 {
331 let pos3 = ((tick >> (WHEEL0_SHIFT + WHEEL1_SHIFT + WHEEL2_SHIFT))
332 & WHEEL3_MASK as u64) as usize;
333 unsafe {
334 let timers = self.wheel3[pos3].take_all();
335 for timer in timers {
336 // Check if timer is still in registry
337 let is_active = {
338 let registry = self.timer_registry.lock().unwrap();
339 registry.contains_key(&timer.id)
340 };
341
342 if is_active {
343 self.insert_timer_inner(timer);
344 }
345 }
346 }
347 }
348 }
349
350 expired
351 }
352
353 /// Insert a timer into the wheel
354 /// 向时间轮插入定时器
355 fn insert_timer_inner(&self, timer: TimerEntry) {
356 let current = self.current_ticks.load(Ordering::Acquire);
357 let expiration = timer.expiration_ms / TICK_MS;
358 let id = timer.id;
359
360 if expiration <= current {
361 // Already expired, wake immediately
362 // 已到期,立即唤醒
363 if let Some(waker) = timer.waker {
364 waker.wake();
365 }
366 return;
367 }
368
369 // Determine wheel level and position before inserting
370 // 在插入前确定轮层级和位置
371 let ticks = expiration - current;
372 let (wheel_level, pos) = if ticks < WHEEL0_SIZE as u64 {
373 (0u8, ((current + ticks) & WHEEL0_MASK as u64) as usize)
374 } else if ticks < (WHEEL0_SIZE * WHEEL1_SIZE) as u64 {
375 (1u8, (((current + ticks) >> WHEEL0_SHIFT) & WHEEL1_MASK as u64) as usize)
376 } else if ticks < (WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE) as u64 {
377 (
378 2u8,
379 (((current + ticks) >> (WHEEL0_SHIFT + WHEEL1_SHIFT)) & WHEEL2_MASK as u64)
380 as usize,
381 )
382 } else {
383 (
384 3u8,
385 (((current + ticks) >> (WHEEL0_SHIFT + WHEEL1_SHIFT + WHEEL2_SHIFT))
386 & WHEEL3_MASK as u64) as usize,
387 )
388 };
389
390 // Add to registry before inserting into wheel
391 // 在插入到轮之前添加到注册表
392 {
393 let mut registry = self.timer_registry.lock().unwrap();
394 registry.insert(
395 id,
396 TimerLocation {
397 wheel_level,
398 slot_index: pos,
399 },
400 );
401 }
402
403 // Insert into appropriate wheel
404 // 插入到适当的轮中
405 match wheel_level {
406 0 => unsafe { self.wheel0[pos].push(timer) },
407 1 => unsafe { self.wheel1[pos].push(timer) },
408 2 => unsafe { self.wheel2[pos].push(timer) },
409 _ => unsafe { self.wheel3[pos].push(timer) },
410 }
411 }
412
413 /// Insert a timer with the specified duration
414 /// 插入具有指定持续时间的定时器
415 pub fn insert_timer(&self, duration: Duration) -> TimerHandle {
416 let duration_ms = duration.as_millis() as u64;
417 let current = self.current_ticks.load(Ordering::Acquire);
418 let expiration_ms = (current * TICK_MS) + duration_ms;
419
420 let id = self.next_id.fetch_add(1, Ordering::AcqRel);
421
422 let timer = TimerEntry {
423 id,
424 expiration_ms,
425 waker: None,
426 canceled: Mutex::new(false),
427 };
428
429 // Insert into wheel and registry
430 // 插入到轮和注册表中
431 self.insert_timer_inner(timer);
432
433 TimerHandle::new(id)
434 }
435
436 /// Insert a timer with the specified duration and associated waker
437 /// 插入具有指定持续时间和关联waker的定时器
438 pub fn insert_timer_with_waker(&self, duration: Duration, waker: Waker) -> TimerHandle {
439 let duration_ms = duration.as_millis() as u64;
440 let current = self.current_ticks.load(Ordering::Acquire);
441 let expiration_ms = (current * TICK_MS) + duration_ms;
442
443 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
444
445 let timer = TimerEntry {
446 id,
447 expiration_ms,
448 waker: Some(waker),
449 canceled: Mutex::new(false),
450 };
451
452 self.insert_timer_inner(timer);
453
454 TimerHandle::new(id)
455 }
456
457 /// Get the next timer expiration time in milliseconds
458 /// 获取下一个定时器到期时间(毫秒)
459 ///
460 /// Returns `None` if there are no active timers.
461 /// 如果没有活动定时器则返回 `None`。
462 pub fn next_expiration(&self) -> Option<u64> {
463 // This is a simplified implementation
464 // A full implementation would scan all wheels
465 // 这是简化实现,完整实现会扫描所有轮
466 None
467 }
468}
469
470impl Default for TimerWheel {
471 fn default() -> Self {
472 Self::new()
473 }
474}
475
476/// Handle to a timer
477/// 定时器句柄
478///
479/// Can be used to cancel the timer before it expires.
480/// The handle references the global timer wheel, which has static lifetime.
481/// 可用于在定时器到期前取消它。
482/// 句柄引用全局时间轮,全局时间轮具有静态生命周期。
483#[derive(Clone)]
484pub struct TimerHandle {
485 #[allow(dead_code)]
486 id: u64,
487}
488
489// SAFETY: TimerHandle only contains a u64 — no raw pointers or non-Send data.
490// 安全:TimerHandle 只包含 u64 — 没有裸指针或非 Send 数据。
491unsafe impl Send for TimerHandle {}
492
493impl TimerHandle {
494 /// Cancel this timer
495 /// 取消此定时器
496 pub fn cancel(&self) {
497 // Access the global timer wheel directly — it has static lifetime
498 // 直接访问全局时间轮 — 它具有静态生命周期
499 global_timer().cancel_timer(self.id);
500 }
501
502 /// Create a new timer handle
503 /// 创建新的定时器句柄
504 fn new(id: u64) -> Self {
505 Self { id }
506 }
507}
508
509/// Global timer wheel instance
510/// 全局时间轮实例
511static GLOBAL_TIMER: OnceLock<TimerWheel> = OnceLock::new();
512
513/// Get the global timer wheel
514/// 获取全局时间轮
515#[inline]
516pub fn global_timer() -> &'static TimerWheel {
517 GLOBAL_TIMER.get_or_init(|| TimerWheel::new())
518}
519
520/// Sleep future that completes after the specified duration
521/// 在指定持续时间后完成的sleep future
522pub struct Sleep {
523 /// Duration to sleep / 睡眠持续时间
524 duration: Duration,
525 /// Whether the timer has been registered
526 /// 定时器是否已注册
527 registered: bool,
528 /// Start time / 开始时间
529 start: Option<Instant>,
530}
531
532impl Sleep {
533 /// Create a new sleep future
534 /// 创建新的sleep future
535 pub fn new(duration: Duration) -> Self {
536 Self {
537 duration,
538 registered: false,
539 start: None,
540 }
541 }
542}
543
544impl Future for Sleep {
545 type Output = ();
546
547 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
548 if self.registered {
549 // Check if the duration has elapsed
550 // 检查持续时间是否已过
551 if let Some(start) = self.start
552 && start.elapsed() >= self.duration
553 {
554 return Poll::Ready(());
555 }
556 Poll::Pending
557 } else {
558 // First poll: register the timer
559 // 第一次轮询:注册定时器
560 self.registered = true;
561 self.start = Some(Instant::now());
562
563 // Insert timer into the global timer wheel
564 // 将定时器插入全局时间轮
565 global_timer().insert_timer_with_waker(self.duration, cx.waker().clone());
566
567 // Check if already expired
568 // 检查是否已到期
569 Poll::Pending
570 }
571 }
572}
573
574/// Sleep for the specified duration
575/// 睡眠指定持续时间
576///
577/// # Example / 示例
578///
579/// ```rust,no_run,ignore
580/// use hiver_runtime::time::{sleep, Duration};
581///
582/// async fn example() {
583/// sleep(Duration::from_millis(100)).await;
584/// println!("Woke up after 100ms");
585/// }
586/// ```
587pub fn sleep(duration: Duration) -> Sleep {
588 Sleep::new(duration)
589}
590
591/// Sleep until the specified instant
592/// 睡眠直到指定时刻
593///
594/// # Example / 示例
595///
596/// ```rust,no_run,ignore
597/// use hiver_runtime::time::{sleep_until, Instant, Duration};
598///
599/// async fn example() {
600/// let deadline = Instant::now() + Duration::from_secs(5);
601/// sleep_until(deadline).await;
602/// println!("5 seconds have passed");
603/// }
604/// ```
605pub fn sleep_until(instant: Instant) -> SleepUntil {
606 let now = Instant::now();
607 let duration = if instant > now {
608 instant.duration_since(now)
609 } else {
610 Duration::ZERO
611 };
612
613 SleepUntil {
614 sleep: sleep(duration),
615 }
616}
617
618/// Sleep until future
619/// sleep until future
620pub struct SleepUntil {
621 sleep: Sleep,
622}
623
624impl Future for SleepUntil {
625 type Output = ();
626
627 fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
628 Pin::new(&mut self.sleep).poll(_cx)
629 }
630}
631
632/// Interval timer that yields at regular intervals
633/// 以固定间隔产生的间隔定时器
634///
635/// # Example / 示例
636///
637/// ```rust,no_run,ignore
638/// use hiver_runtime::time::{interval, Duration};
639///
640/// async fn example() {
641/// let mut ticker = interval(Duration::from_secs(1));
642/// for _ in 0..5 {
643/// ticker.tick().await;
644/// println!("Tick!");
645/// }
646/// }
647/// ```
648pub fn interval(duration: Duration) -> Interval {
649 Interval {
650 duration,
651 next: Instant::now(),
652 }
653}
654
655/// Interval stream
656/// 间隔流
657pub struct Interval {
658 duration: Duration,
659 next: Instant,
660}
661
662impl Interval {
663 /// Wait for the next tick
664 /// 等待下一个滴答
665 pub async fn tick(&mut self) -> Instant {
666 let now = Instant::now();
667 if now >= self.next {
668 self.next = now + self.duration;
669 }
670
671 sleep_until(self.next).await;
672 self.next
673 }
674}
675
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built wheel starts its clock at tick zero.
    #[test]
    fn test_timer_wheel_creation() {
        assert_eq!(TimerWheel::new().current_ticks(), 0);
    }

    /// The wheel geometry constants have their documented values.
    #[test]
    fn test_timer_constants() {
        assert_eq!(TICK_MS, 1);
        let sizes = [WHEEL0_SIZE, WHEEL1_SIZE, WHEEL2_SIZE, WHEEL3_SIZE];
        assert_eq!(sizes[0], 256);
        assert_eq!(sizes[1], 64);
        assert_eq!(sizes[2], 64);
        assert_eq!(sizes[3], 64);
    }

    /// The global wheel is reachable and its clock reads zero.
    #[test]
    fn test_global_timer() {
        assert_eq!(global_timer().current_ticks(), 0);
    }

    /// The maximum timeout is roughly 18.6 hours.
    #[test]
    fn test_max_timeout() {
        const HOUR_MS: u64 = 60_000 * 60;
        assert!(MAX_TIMEOUT_MS > HOUR_MS * 18);
        assert!(MAX_TIMEOUT_MS < HOUR_MS * 20);
    }
}