1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
/*
 * Created on Thu Aug 10 2023
 *
 * Copyright (c) storycraft. Licensed under the MIT Licence.
 */

use std::{
    num::NonZeroU64,
    sync::atomic::{AtomicU64, Ordering},
};

use futures_intrusive::timer::{Clock, Timer, TimerService};

pub use futures_intrusive::timer::TimerFuture;
use instant::Duration;

use crate::executor::executor_handle;

#[derive(Debug)]
pub(crate) struct ExecutorTimer {
    service: TimerService,
    next_expiration: AtomicU64,
}

impl ExecutorTimer {
    pub fn new() -> Self {
        struct InstantClock;

        impl Clock for InstantClock {
            fn now(&self) -> u64 {
                instant::now() as u64
            }
        }

        Self {
            service: TimerService::new(&InstantClock),
            next_expiration: AtomicU64::new(0),
        }
    }

    pub fn update_next(&self) -> UpdateState {
        let next = self.next_expiration.load(Ordering::Acquire);
        if next == 0 {
            return UpdateState::None;
        }

        let now = instant::now() as u64;

        if next <= now {
            self.service.check_expirations();
            self.next_expiration.store(
                self.service.next_expiration().unwrap_or(0),
                Ordering::Release,
            );

            UpdateState::Triggered
        } else {
            UpdateState::WaitTimeout(NonZeroU64::new(next - now).unwrap())
        }
    }

    pub fn delay(&self, delay: Duration) -> TimerFuture {
        self.deadline(instant::now() as u64 + delay.as_millis() as u64)
    }

    pub fn deadline(&self, timestamp: u64) -> TimerFuture {
        let future = self.service.deadline(timestamp);

        let _ = self
            .next_expiration
            .fetch_update(Ordering::Release, Ordering::Acquire, |next| {
                if next == 0 || next > timestamp {
                    Some(timestamp)
                } else {
                    None
                }
            });

        future
    }
}

#[derive(Debug, Clone, Copy)]
pub(crate) enum UpdateState {
    None,
    Triggered,
    WaitTimeout(NonZeroU64),
}

/// Create Future waiting for given duration
pub fn wait(delay: Duration) -> TimerFuture<'static> {
    executor_handle().wait(delay)
}

/// Create Future waiting for given timestamp
pub fn wait_deadline(timestamp: u64) -> TimerFuture<'static> {
    executor_handle().wait_deadline(timestamp)
}