synchronous_timer/
task.rs

1use std::panic::UnwindSafe;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
/// Status flags belonging to a single scheduled task.
///
/// `Default` produces a state in which both flags are `false`.
#[derive(Debug, Default)]
struct TaskState {
    /// Set to `true` by `Task::run` right before the callable is invoked and stored back to
    /// `false` after the callable returns.
    running: Arc<AtomicBool>,
    /// Cancellation flag. `Task::guard` hands a clone of this `Arc` to the `TaskGuard`, which
    /// stores `true` into it when the guard is dropped without having been detached.
    dropped: Arc<AtomicBool>,
}
11
/// The closure a task executes, together with its repetition policy.
pub(crate) enum TaskCallable {
    /// A closure that can be invoked a single time.
    Once(Box<dyn FnOnce() + UnwindSafe + Send + 'static>),
    /// A closure that can be invoked repeatedly, paired with the interval between runs.
    Repeating(Box<dyn FnMut() + UnwindSafe + Send + 'static>, Duration),
}

impl TaskCallable {
    /// Box `f` as a one-shot callable.
    pub fn new_once<F: FnOnce() + UnwindSafe + Send + 'static>(f: F) -> Self {
        let boxed: Box<dyn FnOnce() + UnwindSafe + Send + 'static> = Box::new(f);
        TaskCallable::Once(boxed)
    }

    /// Box `f` as a repeating callable that carries `interval` alongside it.
    pub fn new_repeating<F: FnMut() + UnwindSafe + Send + 'static>(
        f: F,
        interval: Duration,
    ) -> Self {
        let boxed: Box<dyn FnMut() + UnwindSafe + Send + 'static> = Box::new(f);
        TaskCallable::Repeating(boxed, interval)
    }
}

impl std::fmt::Debug for TaskCallable {
    /// Closures cannot be formatted, so the boxed callable is rendered as a fixed placeholder;
    /// only the interval of a repeating callable is printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            TaskCallable::Once(_) => f.write_str("TaskCallable::Once(<unformattable>)"),
            TaskCallable::Repeating(_, interval) => f.write_fmt(format_args!(
                "TaskCallable::Repeating(<unformattable>, {:?})",
                interval
            )),
        }
    }
}
38
/// How soon a task is due relative to a reference instant; produced by `Task::ready`.
#[derive(Debug, PartialEq, Eq)]
pub(crate) enum Ready {
    /// The task is due.
    Now,
    /// The task is not due yet; the payload is the time remaining until its scheduled
    /// execution.
    In(Duration),
}
44
/// A scheduled unit of work: a callable plus the bookkeeping used to order and cancel it.
#[derive(Debug)]
pub(crate) struct Task {
    /// Identifier returned by `id()`; also the tie-breaker when ordering tasks.
    task_id: u64,
    /// The instant at which this task is scheduled to run.
    next_execution: Instant,
    /// Running / cancellation flags for this task.
    task: TaskState,
    /// The closure invoked by `run()`.
    callable: TaskCallable,
}
52
53impl Task {
54    pub fn new(task_id: u64, next_execution: Instant, callable: TaskCallable) -> Self {
55        Self {
56            task_id,
57            next_execution,
58            task: TaskState::default(),
59            callable,
60        }
61    }
62
63    /// Run this task. If there is a "next_execution", return a new TaskHandle with the fields
64    /// updated
65    pub fn run(self) -> Option<Task> {
66        let task_id = self.task_id;
67        let task = self.task;
68        let was_running = task.running.swap(true, Ordering::Acquire);
69        if was_running {
70            log::error!("encountered a running task (a.k.a. a panic); not running again");
71            return None;
72        }
73        match self.callable {
74            TaskCallable::Repeating(mut f, interval) => {
75                let next_execution = Instant::now() + interval;
76                f();
77                task.running.store(false, Ordering::Release);
78                Some(Task {
79                    task_id,
80                    next_execution,
81                    task,
82                    callable: TaskCallable::Repeating(f, interval),
83                })
84            }
85            TaskCallable::Once(f) => {
86                f();
87                task.running.store(false, Ordering::Release);
88                None
89            }
90        }
91    }
92
93    pub fn id(&self) -> u64 {
94        self.task_id
95    }
96
97    pub fn dropped(&self) -> bool {
98        self.task.dropped.load(Ordering::Relaxed)
99    }
100
101    pub fn ready(&self, now: Instant) -> Ready {
102        if now > self.next_execution {
103            Ready::Now
104        } else {
105            Ready::In(self.next_execution - now)
106        }
107    }
108
109    pub fn guard(&self) -> TaskGuard {
110        TaskGuard::new(self.task_id, Arc::clone(&self.task.dropped))
111    }
112}
113
114impl PartialEq<Task> for Task {
115    fn eq(&self, other: &Task) -> bool {
116        self.task_id == other.task_id
117    }
118}
119
120impl PartialOrd<Task> for Task {
121    fn partial_cmp(&self, other: &Task) -> Option<std::cmp::Ordering> {
122        Some(self.cmp(other))
123    }
124}
125
// Marker impl: `Eq` is a supertrait of `Ord`, which `Task` implements.
impl Eq for Task {}
127
128impl Ord for Task {
129    fn cmp(&self, other: &Task) -> std::cmp::Ordering {
130        match self.next_execution.cmp(&other.next_execution).reverse() {
131            std::cmp::Ordering::Equal => self.task_id.cmp(&other.task_id).reverse(),
132            other => other,
133        }
134    }
135}
136
/// A `TaskGuard` represents a handle to a future task. When it is dropped, we will attempt to
/// cancel that task. If you would like the task to continue running in the background, use the
/// `.detach()` method.
#[derive(Debug)]
#[must_use = "dropping a `TaskGuard` requests cancellation of its task; call `.detach()` to keep the task scheduled"]
pub struct TaskGuard {
    /// Identifier of the task this guard refers to.
    task_id: u64,
    /// Shared cancellation flag; `Some` while the guard is attached, `None` once detached.
    dropped: Option<Arc<AtomicBool>>,
}

impl TaskGuard {
    /// Create an attached guard for task `task_id` that will set `dropped` when it is dropped.
    fn new(task_id: u64, dropped: Arc<AtomicBool>) -> Self {
        Self {
            task_id,
            dropped: Some(dropped),
        }
    }

    /// Get the ID of the underlying task, for debugging
    pub fn task_id(&self) -> u64 {
        self.task_id
    }

    /// Detach this `TaskGuard` from the underlying `Task` so that dropping this guard will no
    /// longer cancel the task.
    pub fn detach(mut self) {
        // Release the shared flag without setting it; `Drop` then finds nothing to signal.
        self.dropped = None;
    }
}

impl Drop for TaskGuard {
    /// Set the shared cancellation flag, unless the guard has been detached.
    fn drop(&mut self) {
        if let Some(dropped) = self.dropped.take() {
            dropped.store(true, Ordering::Relaxed);
        }
    }
}
170}