Skip to main content

hiver_runtime/
time.rs

1//! Timer module
2//! 定时器模块
3//!
4//! # Overview / 概述
5//!
6//! This module provides efficient timer management using a hierarchical timing wheel.
7//! The timing wheel provides O(1) insertion, deletion, and advancement.
8//!
9//! 本模块使用分层时间轮提供高效的定时器管理。
10//! 时间轮提供O(1)插入、删除和推进操作。
11//!
12//! # Timer Wheel Algorithm / 时间轮算法
13//!
14//! The timer wheel is organized as a hierarchy of wheels with different granularities:
15//! - Wheel 0: 1ms resolution, 256 slots (256ms range)
16//! - Wheel 1: 256ms resolution, 64 slots (16.384s range)
17//! - Wheel 2: 16.384s resolution, 64 slots (1048.576s range)
18//! - Wheel 3: 1048.576s resolution, 64 slots (67108.864s range)
19//!
20//! 时间轮组织为具有不同粒度的层级:
21//! - 轮0:1ms分辨率,256个槽(256ms范围)
22//! - 轮1:256ms分辨率,64个槽(16.384s范围)
23//! - 轮2:16.384s分辨率,64个槽(1048.576s范围)
24//! - 轮3:1048.576s分辨率,64个槽(67108.864s范围)
25//!
26//! # Example / 示例
27//!
28//! ```rust,no_run,ignore
29//! use hiver_runtime::time::{sleep, Duration};
30//!
31//! async fn example() {
32//!     sleep(Duration::from_millis(100)).await;
33//!     println!("Woke up after 100ms");
34//! }
35//! ```
36
37use std::cell::UnsafeCell;
38use std::collections::{HashMap, LinkedList};
39use std::future::Future;
40use std::pin::Pin;
41use std::sync::Mutex;
42use std::sync::OnceLock;
43use std::sync::atomic::{AtomicU64, Ordering};
44use std::task::{Context, Poll, Waker};
45
46/// Standard library duration re-export
47/// 标准库Duration重新导出
48pub use std::time::Duration;
49
50/// Standard library instant re-export
51/// 标准库Instant重新导出
52pub use std::time::Instant;
53
/// Length of one tick of the finest wheel, in milliseconds.
const TICK_MS: u64 = 1;

// Each wheel level is described by a shift (log2 of its slot count); the slot
// count and the index mask are derived from that shift.

/// Wheel 0: 1 ms per slot, 2^8 = 256 slots (256 ms per revolution).
const WHEEL0_SHIFT: usize = 8;
const WHEEL0_SIZE: usize = 1 << WHEEL0_SHIFT;
const WHEEL0_MASK: usize = WHEEL0_SIZE - 1;

/// Wheel 1: 256 ms per slot, 2^6 = 64 slots (16.384 s per revolution).
const WHEEL1_SHIFT: usize = 6;
const WHEEL1_SIZE: usize = 1 << WHEEL1_SHIFT;
const WHEEL1_MASK: usize = WHEEL1_SIZE - 1;

/// Wheel 2: 16.384 s per slot, 2^6 = 64 slots (1048.576 s per revolution).
const WHEEL2_SHIFT: usize = 6;
const WHEEL2_SIZE: usize = 1 << WHEEL2_SHIFT;
const WHEEL2_MASK: usize = WHEEL2_SIZE - 1;

/// Wheel 3: 1048.576 s per slot, 2^6 = 64 slots (67108.864 s per revolution).
// Nothing in this module shifts by the top level's width, hence the allow.
#[allow(dead_code)]
const WHEEL3_SHIFT: usize = 6;
const WHEEL3_SIZE: usize = 1 << 6;
const WHEEL3_MASK: usize = WHEEL3_SIZE - 1;

/// Span covered by all four wheels together, in milliseconds
/// (67 108 864 ms, roughly 18.6 hours).
// Only referenced from the unit tests, hence the allow.
#[allow(dead_code)]
const MAX_TIMEOUT_MS: u64 =
    ((WHEEL0_SIZE * WHEEL1_SIZE) * (WHEEL2_SIZE * WHEEL3_SIZE)) as u64 * TICK_MS;
88
/// A single pending timer as stored inside a wheel slot.
struct TimerEntry {
    /// Identifier of this timer; the wheel's registry is keyed by it.
    id: u64,
    /// Absolute expiration time in milliseconds, on the wheel's tick clock.
    expiration_ms: u64,
    /// Waker invoked when the timer fires; `None` for timers created without one.
    waker: Option<Waker>,
    /// Per-entry cancellation flag.
    // NOTE(review): nothing visible in this module reads or sets this flag after
    // construction (cancellation goes through the wheel's registry instead), so
    // the field is currently dead; it is kept because constructors elsewhere
    // still initialise it.
    #[allow(dead_code)]
    canceled: Mutex<bool>,
}

// `u64`, `Option<Waker>` and `Mutex<bool>` are all `Send + Sync`, so the
// compiler derives both auto traits for `TimerEntry` on its own. Asserting that
// at compile time (instead of promising it with `unsafe impl`) means a future
// non-thread-safe field becomes a build error rather than silent unsoundness.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<TimerEntry>();
};
109
110/// A timing wheel slot containing timer entries
111/// 包含定时器条目的时间轮槽
112#[derive(Debug)]
113struct TimerSlot {
114    /// List of timer entries in this slot
115    /// 此槽中的定时器条目列表
116    timers: UnsafeCell<LinkedList<TimerEntry>>,
117}
118
119impl TimerSlot {
120    fn new() -> Self {
121        Self {
122            timers: UnsafeCell::new(LinkedList::new()),
123        }
124    }
125
126    /// Add a timer to this slot
127    /// 向此槽添加定时器
128    unsafe fn push(&self, timer: TimerEntry) {
129        let list = &mut *self.timers.get();
130        list.push_back(timer);
131    }
132
133    /// Get all timers from this slot, clearing it
134    /// 从此槽获取所有定时器并清空
135    unsafe fn take_all(&self) -> LinkedList<TimerEntry> {
136        let list = &mut *self.timers.get();
137        std::mem::take(list)
138    }
139}
140
141// SAFETY: TimerSlot uses interior mutability with controlled access
142// TimerSlot使用受控访问的内部可变性
143unsafe impl Send for TimerSlot {}
144unsafe impl Sync for TimerSlot {}
145
146/// Hierarchical timing wheel for efficient timer management
147/// 用于高效定时器管理的分层时间轮
148///
149/// Uses 4 wheels with different granularities to cover a wide range of timeouts.
150/// 使用4个具有不同粒度的轮来覆盖大范围的超时。
151pub struct TimerWheel {
152    /// Current time in ticks
153    /// 当前时间(滴答)
154    current_ticks: AtomicU64,
155    /// Wheel 0 (finest granularity)
156    /// 轮0(最细粒度)
157    wheel0: Box<[TimerSlot; WHEEL0_SIZE]>,
158    /// Wheel 1
159    /// 轮1
160    wheel1: Box<[TimerSlot; WHEEL1_SIZE]>,
161    /// Wheel 2
162    /// 轮2
163    wheel2: Box<[TimerSlot; WHEEL2_SIZE]>,
164    /// Wheel 3 (coarsest granularity)
165    /// 轮3(最粗粒度)
166    wheel3: Box<[TimerSlot; WHEEL3_SIZE]>,
167    /// Next timer ID
168    /// 下一个定时器ID
169    next_id: AtomicU64,
170    /// Active timer registry for cancellation (ID -> slot index)
171    /// 活跃定时器注册表用于取消(ID -> 槽索引)
172    timer_registry: Mutex<HashMap<u64, TimerLocation>>,
173}
174
/// Where a live timer was last placed inside the wheel.
// Both fields are recorded on insertion but not read back anywhere in this
// module yet, hence the `dead_code` allowances.
#[derive(Debug, Clone, Copy)]
struct TimerLocation {
    /// Wheel level holding the timer (0 = finest, 3 = coarsest).
    #[allow(dead_code)]
    wheel_level: u8,
    /// Index of the slot within that level.
    #[allow(dead_code)]
    slot_index: usize,
}
186
187// SAFETY: TimerWheel uses atomic operations and interior mutability
188// TimerWheel使用原子操作和内部可变性
189unsafe impl Send for TimerWheel {}
190unsafe impl Sync for TimerWheel {}
191
192impl TimerWheel {
193    /// Create a new timer wheel
194    /// 创建新的时间轮
195    pub fn new() -> Self {
196        Self {
197            current_ticks: AtomicU64::new(0),
198            wheel0: (0..WHEEL0_SIZE)
199                .map(|_| TimerSlot::new())
200                .collect::<Vec<_>>()
201                .into_boxed_slice()
202                .try_into()
203                .unwrap(),
204            wheel1: (0..WHEEL1_SIZE)
205                .map(|_| TimerSlot::new())
206                .collect::<Vec<_>>()
207                .into_boxed_slice()
208                .try_into()
209                .unwrap(),
210            wheel2: (0..WHEEL2_SIZE)
211                .map(|_| TimerSlot::new())
212                .collect::<Vec<_>>()
213                .into_boxed_slice()
214                .try_into()
215                .unwrap(),
216            wheel3: (0..WHEEL3_SIZE)
217                .map(|_| TimerSlot::new())
218                .collect::<Vec<_>>()
219                .into_boxed_slice()
220                .try_into()
221                .unwrap(),
222            next_id: AtomicU64::new(1),
223            timer_registry: Mutex::new(HashMap::new()),
224        }
225    }
226
227    /// Cancel a timer by ID
228    /// 通过ID取消定时器
229    pub fn cancel_timer(&self, id: u64) -> bool {
230        let mut registry = self.timer_registry.lock().unwrap();
231        if let Some(_location) = registry.remove(&id) {
232            // Timer was found and removed from registry
233            // The actual cancellation will be checked when the timer expires
234            // 定时器已找到并从注册表中移除
235            // 实际取消将在定时器到期时检查
236            true
237        } else {
238            false
239        }
240    }
241
242    /// Get the current tick count
243    /// 获取当前滴答计数
244    #[inline]
245    pub fn current_ticks(&self) -> u64 {
246        self.current_ticks.load(Ordering::Acquire)
247    }
248
249    /// Advance the timer wheel by the specified number of ticks
250    /// 将时间轮推进指定数量的滴答
251    ///
252    /// Returns the number of timers that expired during this advancement.
253    /// 返回在此推进期间到期的定时器数量。
254    pub fn advance(&self, ticks: u64) -> usize {
255        let mut expired = 0;
256        let _start = self.current_ticks.load(Ordering::Acquire);
257
258        for _ in 0..ticks {
259            let tick = self.current_ticks.fetch_add(1, Ordering::AcqRel);
260            let pos0 = (tick & WHEEL0_MASK as u64) as usize;
261
262            // Process wheel 0
263            // 处理轮0
264            unsafe {
265                let timers = self.wheel0[pos0].take_all();
266                for timer in timers {
267                    // Check if timer is still in registry (not canceled)
268                    // 检查定时器是否仍在注册表中(未取消)
269                    let is_active = {
270                        let mut registry = self.timer_registry.lock().unwrap();
271                        registry.remove(&timer.id).is_some()
272                    };
273
274                    if is_active {
275                        // Try to wake the timer
276                        // 尝试唤醒定时器
277                        if let Some(waker) = timer.waker {
278                            waker.wake();
279                        }
280                        expired += 1;
281                    }
282                }
283            }
284
285            // Cascade to wheel 1 every WHEEL0_SIZE ticks
286            // 每WHEEL0_SIZE个滴答级联到轮1
287            if tick & (WHEEL0_SIZE as u64 - 1) == 0 {
288                let pos1 = ((tick >> WHEEL0_SHIFT) & WHEEL1_MASK as u64) as usize;
289                unsafe {
290                    let timers = self.wheel1[pos1].take_all();
291                    for timer in timers {
292                        // Check if timer is still in registry
293                        // 检查定时器是否仍在注册表中
294                        let is_active = {
295                            let registry = self.timer_registry.lock().unwrap();
296                            registry.contains_key(&timer.id)
297                        };
298
299                        if is_active {
300                            // Re-insert into wheel 0
301                            // 重新插入轮0
302                            self.insert_timer_inner(timer);
303                        }
304                    }
305                }
306            }
307
308            // Cascade to wheel 2 every (WHEEL0_SIZE * WHEEL1_SIZE) ticks
309            // 每(WHEEL0_SIZE * WHEEL1_SIZE)个滴答级联到轮2
310            if tick & ((WHEEL0_SIZE * WHEEL1_SIZE) as u64 - 1) == 0 {
311                let pos2 = ((tick >> (WHEEL0_SHIFT + WHEEL1_SHIFT)) & WHEEL2_MASK as u64) as usize;
312                unsafe {
313                    let timers = self.wheel2[pos2].take_all();
314                    for timer in timers {
315                        // Check if timer is still in registry
316                        let is_active = {
317                            let registry = self.timer_registry.lock().unwrap();
318                            registry.contains_key(&timer.id)
319                        };
320
321                        if is_active {
322                            self.insert_timer_inner(timer);
323                        }
324                    }
325                }
326            }
327
328            // Cascade to wheel 3 every (WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE) ticks
329            // 每(WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE)个滴答级联到轮3
330            if tick & ((WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE) as u64 - 1) == 0 {
331                let pos3 = ((tick >> (WHEEL0_SHIFT + WHEEL1_SHIFT + WHEEL2_SHIFT))
332                    & WHEEL3_MASK as u64) as usize;
333                unsafe {
334                    let timers = self.wheel3[pos3].take_all();
335                    for timer in timers {
336                        // Check if timer is still in registry
337                        let is_active = {
338                            let registry = self.timer_registry.lock().unwrap();
339                            registry.contains_key(&timer.id)
340                        };
341
342                        if is_active {
343                            self.insert_timer_inner(timer);
344                        }
345                    }
346                }
347            }
348        }
349
350        expired
351    }
352
353    /// Insert a timer into the wheel
354    /// 向时间轮插入定时器
355    fn insert_timer_inner(&self, timer: TimerEntry) {
356        let current = self.current_ticks.load(Ordering::Acquire);
357        let expiration = timer.expiration_ms / TICK_MS;
358        let id = timer.id;
359
360        if expiration <= current {
361            // Already expired, wake immediately
362            // 已到期,立即唤醒
363            if let Some(waker) = timer.waker {
364                waker.wake();
365            }
366            return;
367        }
368
369        // Determine wheel level and position before inserting
370        // 在插入前确定轮层级和位置
371        let ticks = expiration - current;
372        let (wheel_level, pos) = if ticks < WHEEL0_SIZE as u64 {
373            (0u8, ((current + ticks) & WHEEL0_MASK as u64) as usize)
374        } else if ticks < (WHEEL0_SIZE * WHEEL1_SIZE) as u64 {
375            (1u8, (((current + ticks) >> WHEEL0_SHIFT) & WHEEL1_MASK as u64) as usize)
376        } else if ticks < (WHEEL0_SIZE * WHEEL1_SIZE * WHEEL2_SIZE) as u64 {
377            (
378                2u8,
379                (((current + ticks) >> (WHEEL0_SHIFT + WHEEL1_SHIFT)) & WHEEL2_MASK as u64)
380                    as usize,
381            )
382        } else {
383            (
384                3u8,
385                (((current + ticks) >> (WHEEL0_SHIFT + WHEEL1_SHIFT + WHEEL2_SHIFT))
386                    & WHEEL3_MASK as u64) as usize,
387            )
388        };
389
390        // Add to registry before inserting into wheel
391        // 在插入到轮之前添加到注册表
392        {
393            let mut registry = self.timer_registry.lock().unwrap();
394            registry.insert(
395                id,
396                TimerLocation {
397                    wheel_level,
398                    slot_index: pos,
399                },
400            );
401        }
402
403        // Insert into appropriate wheel
404        // 插入到适当的轮中
405        match wheel_level {
406            0 => unsafe { self.wheel0[pos].push(timer) },
407            1 => unsafe { self.wheel1[pos].push(timer) },
408            2 => unsafe { self.wheel2[pos].push(timer) },
409            _ => unsafe { self.wheel3[pos].push(timer) },
410        }
411    }
412
413    /// Insert a timer with the specified duration
414    /// 插入具有指定持续时间的定时器
415    pub fn insert_timer(&self, duration: Duration) -> TimerHandle {
416        let duration_ms = duration.as_millis() as u64;
417        let current = self.current_ticks.load(Ordering::Acquire);
418        let expiration_ms = (current * TICK_MS) + duration_ms;
419
420        let id = self.next_id.fetch_add(1, Ordering::AcqRel);
421
422        let timer = TimerEntry {
423            id,
424            expiration_ms,
425            waker: None,
426            canceled: Mutex::new(false),
427        };
428
429        // Insert into wheel and registry
430        // 插入到轮和注册表中
431        self.insert_timer_inner(timer);
432
433        TimerHandle::new(id)
434    }
435
436    /// Insert a timer with the specified duration and associated waker
437    /// 插入具有指定持续时间和关联waker的定时器
438    pub fn insert_timer_with_waker(&self, duration: Duration, waker: Waker) -> TimerHandle {
439        let duration_ms = duration.as_millis() as u64;
440        let current = self.current_ticks.load(Ordering::Acquire);
441        let expiration_ms = (current * TICK_MS) + duration_ms;
442
443        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
444
445        let timer = TimerEntry {
446            id,
447            expiration_ms,
448            waker: Some(waker),
449            canceled: Mutex::new(false),
450        };
451
452        self.insert_timer_inner(timer);
453
454        TimerHandle::new(id)
455    }
456
457    /// Get the next timer expiration time in milliseconds
458    /// 获取下一个定时器到期时间(毫秒)
459    ///
460    /// Returns `None` if there are no active timers.
461    /// 如果没有活动定时器则返回 `None`。
462    pub fn next_expiration(&self) -> Option<u64> {
463        // This is a simplified implementation
464        // A full implementation would scan all wheels
465        // 这是简化实现,完整实现会扫描所有轮
466        None
467    }
468}
469
470impl Default for TimerWheel {
471    fn default() -> Self {
472        Self::new()
473    }
474}
475
/// Handle to a timer registered in the global timer wheel.
///
/// The handle only stores the timer's id; cancelling looks the timer up in the
/// global wheel. Because it is a plain `u64` wrapper, the compiler derives
/// `Send` and `Sync` automatically and no `unsafe impl` is required.
#[derive(Clone, Debug)]
pub struct TimerHandle {
    /// Id of the timer this handle refers to.
    id: u64,
}
492
493impl TimerHandle {
494    /// Cancel this timer
495    /// 取消此定时器
496    pub fn cancel(&self) {
497        // Access the global timer wheel directly — it has static lifetime
498        // 直接访问全局时间轮 — 它具有静态生命周期
499        global_timer().cancel_timer(self.id);
500    }
501
502    /// Create a new timer handle
503    /// 创建新的定时器句柄
504    fn new(id: u64) -> Self {
505        Self { id }
506    }
507}
508
509/// Global timer wheel instance
510/// 全局时间轮实例
511static GLOBAL_TIMER: OnceLock<TimerWheel> = OnceLock::new();
512
513/// Get the global timer wheel
514/// 获取全局时间轮
515#[inline]
516pub fn global_timer() -> &'static TimerWheel {
517    GLOBAL_TIMER.get_or_init(|| TimerWheel::new())
518}
519
520/// Sleep future that completes after the specified duration
521/// 在指定持续时间后完成的sleep future
522pub struct Sleep {
523    /// Duration to sleep / 睡眠持续时间
524    duration: Duration,
525    /// Whether the timer has been registered
526    /// 定时器是否已注册
527    registered: bool,
528    /// Start time / 开始时间
529    start: Option<Instant>,
530}
531
532impl Sleep {
533    /// Create a new sleep future
534    /// 创建新的sleep future
535    pub fn new(duration: Duration) -> Self {
536        Self {
537            duration,
538            registered: false,
539            start: None,
540        }
541    }
542}
543
544impl Future for Sleep {
545    type Output = ();
546
547    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
548        if self.registered {
549            // Check if the duration has elapsed
550            // 检查持续时间是否已过
551            if let Some(start) = self.start
552                && start.elapsed() >= self.duration
553            {
554                return Poll::Ready(());
555            }
556            Poll::Pending
557        } else {
558            // First poll: register the timer
559            // 第一次轮询:注册定时器
560            self.registered = true;
561            self.start = Some(Instant::now());
562
563            // Insert timer into the global timer wheel
564            // 将定时器插入全局时间轮
565            global_timer().insert_timer_with_waker(self.duration, cx.waker().clone());
566
567            // Check if already expired
568            // 检查是否已到期
569            Poll::Pending
570        }
571    }
572}
573
574/// Sleep for the specified duration
575/// 睡眠指定持续时间
576///
577/// # Example / 示例
578///
579/// ```rust,no_run,ignore
580/// use hiver_runtime::time::{sleep, Duration};
581///
582/// async fn example() {
583///     sleep(Duration::from_millis(100)).await;
584///     println!("Woke up after 100ms");
585/// }
586/// ```
587pub fn sleep(duration: Duration) -> Sleep {
588    Sleep::new(duration)
589}
590
591/// Sleep until the specified instant
592/// 睡眠直到指定时刻
593///
594/// # Example / 示例
595///
596/// ```rust,no_run,ignore
597/// use hiver_runtime::time::{sleep_until, Instant, Duration};
598///
599/// async fn example() {
600///     let deadline = Instant::now() + Duration::from_secs(5);
601///     sleep_until(deadline).await;
602///     println!("5 seconds have passed");
603/// }
604/// ```
605pub fn sleep_until(instant: Instant) -> SleepUntil {
606    let now = Instant::now();
607    let duration = if instant > now {
608        instant.duration_since(now)
609    } else {
610        Duration::ZERO
611    };
612
613    SleepUntil {
614        sleep: sleep(duration),
615    }
616}
617
618/// Sleep until future
619/// sleep until future
620pub struct SleepUntil {
621    sleep: Sleep,
622}
623
624impl Future for SleepUntil {
625    type Output = ();
626
627    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
628        Pin::new(&mut self.sleep).poll(_cx)
629    }
630}
631
632/// Interval timer that yields at regular intervals
633/// 以固定间隔产生的间隔定时器
634///
635/// # Example / 示例
636///
637/// ```rust,no_run,ignore
638/// use hiver_runtime::time::{interval, Duration};
639///
640/// async fn example() {
641///     let mut ticker = interval(Duration::from_secs(1));
642///     for _ in 0..5 {
643///         ticker.tick().await;
644///         println!("Tick!");
645///     }
646/// }
647/// ```
648pub fn interval(duration: Duration) -> Interval {
649    Interval {
650        duration,
651        next: Instant::now(),
652    }
653}
654
/// Periodic timer created by [`interval`].
pub struct Interval {
    /// Time between ticks.
    duration: Duration,
    /// Deadline used by the next call to `tick`.
    next: Instant,
}
661
662impl Interval {
663    /// Wait for the next tick
664    /// 等待下一个滴答
665    pub async fn tick(&mut self) -> Instant {
666        let now = Instant::now();
667        if now >= self.next {
668            self.next = now + self.duration;
669        }
670
671        sleep_until(self.next).await;
672        self.next
673    }
674}
675
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built wheel has not been advanced yet.
    #[test]
    fn test_timer_wheel_creation() {
        let fresh = TimerWheel::new();
        assert_eq!(fresh.current_ticks(), 0);
    }

    /// Pins the tick length and the slot count of every wheel level.
    #[test]
    fn test_timer_constants() {
        assert_eq!(TICK_MS, 1);
        assert_eq!(WHEEL0_SIZE, 256);
        for higher_level_size in [WHEEL1_SIZE, WHEEL2_SIZE, WHEEL3_SIZE] {
            assert_eq!(higher_level_size, 64);
        }
    }

    /// The global wheel is reachable and has not been advanced.
    // NOTE(review): this relies on no other test advancing the shared global
    // wheel within the same test process.
    #[test]
    fn test_global_timer() {
        assert_eq!(global_timer().current_ticks(), 0);
    }

    /// The total wheel range lies between 18 and 20 hours (about 18.6 hours).
    #[test]
    fn test_max_timeout() {
        const HOUR_MS: u64 = 60_000 * 60;
        assert!(MAX_TIMEOUT_MS > HOUR_MS * 18);
        assert!(MAX_TIMEOUT_MS < HOUR_MS * 20);
    }
}