veilid_tools/
tick_task.rs1use super::*;
2
3use core::sync::atomic::{AtomicU64, Ordering};
4use once_cell::sync::OnceCell;
5
6type TickTaskRoutine<E> =
7 dyn Fn(StopToken, u64, u64) -> PinBoxFutureStatic<Result<(), E>> + Send + Sync + 'static;
8
9pub struct TickTask<E: Send + 'static> {
13 name: String,
14 last_timestamp_us: AtomicU64,
15 tick_period_us: u64,
16 routine: OnceCell<Box<TickTaskRoutine<E>>>,
17 stop_source: AsyncMutex<Option<StopSource>>,
18 single_future: MustJoinSingleFuture<Result<(), E>>,
19 running: Arc<AtomicBool>,
20}
21
22impl<E: Send + fmt::Debug + 'static> TickTask<E> {
23 #[must_use]
24 pub fn new_us(name: &str, tick_period_us: u64) -> Self {
25 Self {
26 name: name.to_string(),
27 last_timestamp_us: AtomicU64::new(0),
28 tick_period_us,
29 routine: OnceCell::new(),
30 stop_source: AsyncMutex::new(None),
31 single_future: MustJoinSingleFuture::new(),
32 running: Arc::new(AtomicBool::new(false)),
33 }
34 }
35 #[must_use]
36 pub fn new_ms(name: &str, tick_period_ms: u32) -> Self {
37 Self {
38 name: name.to_string(),
39 last_timestamp_us: AtomicU64::new(0),
40 tick_period_us: (tick_period_ms as u64) * 1000u64,
41 routine: OnceCell::new(),
42 stop_source: AsyncMutex::new(None),
43 single_future: MustJoinSingleFuture::new(),
44 running: Arc::new(AtomicBool::new(false)),
45 }
46 }
47 #[must_use]
48 pub fn new(name: &str, tick_period_sec: u32) -> Self {
49 Self {
50 name: name.to_string(),
51 last_timestamp_us: AtomicU64::new(0),
52 tick_period_us: (tick_period_sec as u64) * 1000000u64,
53 routine: OnceCell::new(),
54 stop_source: AsyncMutex::new(None),
55 single_future: MustJoinSingleFuture::new(),
56 running: Arc::new(AtomicBool::new(false)),
57 }
58 }
59
60 pub fn set_routine(
61 &self,
62 routine: impl Fn(StopToken, u64, u64) -> PinBoxFutureStatic<Result<(), E>>
63 + Send
64 + Sync
65 + 'static,
66 ) {
67 self.routine
68 .set(Box::new(routine))
69 .map_err(drop)
70 .unwrap_or_log();
71 }
72
73 pub fn is_running(&self) -> bool {
74 self.running.load(core::sync::atomic::Ordering::Acquire)
75 }
76
77 pub fn last_timestamp_us(&self) -> Option<u64> {
78 let ts = self
79 .last_timestamp_us
80 .load(core::sync::atomic::Ordering::Acquire);
81 if ts == 0 {
82 None
83 } else {
84 Some(ts)
85 }
86 }
87
88 pub async fn stop(&self) -> Result<(), E> {
89 {
91 let mut stop_source_guard = self.stop_source.lock().await;
92 if stop_source_guard.is_none() {
93 return Ok(());
95 }
96 drop(stop_source_guard.take());
97 }
98
99 match pin_future!(self.single_future.join()).await {
101 Ok(Some(Err(err))) => Err(err),
102 _ => Ok(()),
103 }
104 }
105
106 pub async fn tick(&self) -> Result<(), E> {
107 let now = get_raw_timestamp();
108 let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
109
110 if last_timestamp_us != 0u64 && now.saturating_sub(last_timestamp_us) < self.tick_period_us
111 {
112 return Ok(());
114 }
115
116 let itick = self.internal_tick(now, last_timestamp_us);
117
118 itick.await.map(drop)
119 }
120
121 pub async fn try_tick_now(&self) -> Result<bool, E> {
122 let now = get_raw_timestamp();
123 let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
124
125 let itick = self.internal_tick(now, last_timestamp_us);
126
127 itick.await
128 }
129
130 async fn internal_tick(&self, now: u64, last_timestamp_us: u64) -> Result<bool, E> {
131 let mut stop_source_guard = self.stop_source.lock().await;
133
134 let stop_source = StopSource::new();
136 let stop_token = stop_source.token();
137 let make_singlefuture_closure = || {
138 let running = self.running.clone();
139 let routine = self.routine.get().unwrap_or_log()(stop_token, last_timestamp_us, now);
140
141 Box::pin(async move {
142 running.store(true, core::sync::atomic::Ordering::Release);
143 let out = routine.await;
144 running.store(false, core::sync::atomic::Ordering::Release);
145 out
146 })
147 };
148
149 match self
150 .single_future
151 .single_spawn(&self.name, make_singlefuture_closure)
152 .await
153 {
154 Ok((res, ran)) => {
156 if ran {
158 self.last_timestamp_us.store(now, Ordering::Release);
160 *stop_source_guard = Some(stop_source);
162 }
163
164 match res {
165 Some(Ok(())) => {
166 Ok(ran)
168 }
169 Some(Err(e)) => {
170 Err(e)
172 }
173 None => {
174 Ok(ran)
176 }
177 }
178 }
179 Err(()) => {
180 Ok(false)
183 }
184 }
185 }
186}