synchronous_timer/
task.rs1use std::panic::UnwindSafe;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::sync::Arc;
4use std::time::{Duration, Instant};
5
6#[derive(Debug, Default)]
7struct TaskState {
8 running: Arc<AtomicBool>,
9 dropped: Arc<AtomicBool>,
10}
11
12pub(crate) enum TaskCallable {
13 Once(Box<dyn FnOnce() + UnwindSafe + Send + 'static>),
14 Repeating(Box<dyn FnMut() + UnwindSafe + Send + 'static>, Duration),
15}
16
17impl TaskCallable {
18 pub fn new_once<F: FnOnce() + UnwindSafe + Send + 'static>(f: F) -> Self {
19 Self::Once(Box::new(f))
20 }
21
22 pub fn new_repeating<F: FnMut() + UnwindSafe + Send + 'static>(
23 f: F,
24 interval: Duration,
25 ) -> Self {
26 Self::Repeating(Box::new(f), interval)
27 }
28}
29
30impl std::fmt::Debug for TaskCallable {
31 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
32 match self {
33 Self::Once(_) => write!(f, "TaskCallable::Once(<unformattable>)"),
34 Self::Repeating(_, i) => write!(f, "TaskCallable::Repeating(<unformattable>, {:?})", i),
35 }
36 }
37}
38
39#[derive(Debug, PartialEq, Eq)]
40pub(crate) enum Ready {
41 Now,
42 In(Duration),
43}
44
45#[derive(Debug)]
46pub(crate) struct Task {
47 task_id: u64,
48 next_execution: Instant,
49 task: TaskState,
50 callable: TaskCallable,
51}
52
53impl Task {
54 pub fn new(task_id: u64, next_execution: Instant, callable: TaskCallable) -> Self {
55 Self {
56 task_id,
57 next_execution,
58 task: TaskState::default(),
59 callable,
60 }
61 }
62
63 pub fn run(self) -> Option<Task> {
66 let task_id = self.task_id;
67 let task = self.task;
68 let was_running = task.running.swap(true, Ordering::Acquire);
69 if was_running {
70 log::error!("encountered a running task (a.k.a. a panic); not running again");
71 return None;
72 }
73 match self.callable {
74 TaskCallable::Repeating(mut f, interval) => {
75 let next_execution = Instant::now() + interval;
76 f();
77 task.running.store(false, Ordering::Release);
78 Some(Task {
79 task_id,
80 next_execution,
81 task,
82 callable: TaskCallable::Repeating(f, interval),
83 })
84 }
85 TaskCallable::Once(f) => {
86 f();
87 task.running.store(false, Ordering::Release);
88 None
89 }
90 }
91 }
92
93 pub fn id(&self) -> u64 {
94 self.task_id
95 }
96
97 pub fn dropped(&self) -> bool {
98 self.task.dropped.load(Ordering::Relaxed)
99 }
100
101 pub fn ready(&self, now: Instant) -> Ready {
102 if now > self.next_execution {
103 Ready::Now
104 } else {
105 Ready::In(self.next_execution - now)
106 }
107 }
108
109 pub fn guard(&self) -> TaskGuard {
110 TaskGuard::new(self.task_id, Arc::clone(&self.task.dropped))
111 }
112}
113
114impl PartialEq<Task> for Task {
115 fn eq(&self, other: &Task) -> bool {
116 self.task_id == other.task_id
117 }
118}
119
120impl PartialOrd<Task> for Task {
121 fn partial_cmp(&self, other: &Task) -> Option<std::cmp::Ordering> {
122 Some(self.cmp(other))
123 }
124}
125
126impl Eq for Task {}
127
128impl Ord for Task {
129 fn cmp(&self, other: &Task) -> std::cmp::Ordering {
130 match self.next_execution.cmp(&other.next_execution).reverse() {
131 std::cmp::Ordering::Equal => self.task_id.cmp(&other.task_id).reverse(),
132 other => other,
133 }
134 }
135}
136
137#[derive(Debug)]
138pub struct TaskGuard {
140 task_id: u64,
141 dropped: Option<Arc<AtomicBool>>,
142}
143
144impl TaskGuard {
145 fn new(task_id: u64, dropped: Arc<AtomicBool>) -> Self {
146 Self {
147 task_id,
148 dropped: Some(dropped),
149 }
150 }
151
152 pub fn task_id(&self) -> u64 {
154 self.task_id
155 }
156
157 pub fn detach(mut self) {
160 self.dropped.take();
161 }
162}
163
164impl Drop for TaskGuard {
165 fn drop(&mut self) {
166 if let Some(dropped) = self.dropped.take() {
167 dropped.store(true, Ordering::Relaxed);
168 }
169 }
170}