Skip to main content

veilid_tools/
tick_task.rs

1use super::*;
2
3use core::sync::atomic::{AtomicU64, Ordering};
4use once_cell::sync::OnceCell;
5
6type TickTaskRoutine<E> =
7    dyn Fn(StopToken, u64, u64) -> PinBoxFutureStatic<Result<(), E>> + Send + Sync + 'static;
8
9/// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds.
10/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again.
11/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity.
12pub struct TickTask<E: Send + 'static> {
13    name: String,
14    last_timestamp_us: AtomicU64,
15    tick_period_us: u64,
16    routine: OnceCell<Box<TickTaskRoutine<E>>>,
17    stop_source: AsyncMutex<Option<StopSource>>,
18    single_future: MustJoinSingleFuture<Result<(), E>>,
19    running: Arc<AtomicBool>,
20}
21
22impl<E: Send + fmt::Debug + 'static> TickTask<E> {
23    #[must_use]
24    pub fn new_us(name: &str, tick_period_us: u64) -> Self {
25        Self {
26            name: name.to_string(),
27            last_timestamp_us: AtomicU64::new(0),
28            tick_period_us,
29            routine: OnceCell::new(),
30            stop_source: AsyncMutex::new(None),
31            single_future: MustJoinSingleFuture::new(),
32            running: Arc::new(AtomicBool::new(false)),
33        }
34    }
35    #[must_use]
36    pub fn new_ms(name: &str, tick_period_ms: u32) -> Self {
37        Self {
38            name: name.to_string(),
39            last_timestamp_us: AtomicU64::new(0),
40            tick_period_us: (tick_period_ms as u64) * 1000u64,
41            routine: OnceCell::new(),
42            stop_source: AsyncMutex::new(None),
43            single_future: MustJoinSingleFuture::new(),
44            running: Arc::new(AtomicBool::new(false)),
45        }
46    }
47    #[must_use]
48    pub fn new(name: &str, tick_period_sec: u32) -> Self {
49        Self {
50            name: name.to_string(),
51            last_timestamp_us: AtomicU64::new(0),
52            tick_period_us: (tick_period_sec as u64) * 1000000u64,
53            routine: OnceCell::new(),
54            stop_source: AsyncMutex::new(None),
55            single_future: MustJoinSingleFuture::new(),
56            running: Arc::new(AtomicBool::new(false)),
57        }
58    }
59
60    pub fn set_routine(
61        &self,
62        routine: impl Fn(StopToken, u64, u64) -> PinBoxFutureStatic<Result<(), E>>
63            + Send
64            + Sync
65            + 'static,
66    ) {
67        self.routine
68            .set(Box::new(routine))
69            .map_err(drop)
70            .unwrap_or_log();
71    }
72
73    pub fn is_running(&self) -> bool {
74        self.running.load(core::sync::atomic::Ordering::Acquire)
75    }
76
77    pub fn last_timestamp_us(&self) -> Option<u64> {
78        let ts = self
79            .last_timestamp_us
80            .load(core::sync::atomic::Ordering::Acquire);
81        if ts == 0 {
82            None
83        } else {
84            Some(ts)
85        }
86    }
87
88    pub async fn stop(&self) -> Result<(), E> {
89        // drop the stop source if we have one
90        {
91            let mut stop_source_guard = self.stop_source.lock().await;
92            if stop_source_guard.is_none() {
93                // already stopped, just return
94                return Ok(());
95            }
96            drop(stop_source_guard.take());
97        }
98
99        // wait for completion of the tick task
100        match pin_future!(self.single_future.join()).await {
101            Ok(Some(Err(err))) => Err(err),
102            _ => Ok(()),
103        }
104    }
105
106    pub async fn tick(&self) -> Result<(), E> {
107        let now = get_raw_timestamp();
108        let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
109
110        if last_timestamp_us != 0u64 && now.saturating_sub(last_timestamp_us) < self.tick_period_us
111        {
112            // It's not time yet
113            return Ok(());
114        }
115
116        let itick = self.internal_tick(now, last_timestamp_us);
117
118        itick.await.map(drop)
119    }
120
121    pub async fn try_tick_now(&self) -> Result<bool, E> {
122        let now = get_raw_timestamp();
123        let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
124
125        let itick = self.internal_tick(now, last_timestamp_us);
126
127        itick.await
128    }
129
130    async fn internal_tick(&self, now: u64, last_timestamp_us: u64) -> Result<bool, E> {
131        // Lock the stop source, tells us if we have ever started this future
132        let mut stop_source_guard = self.stop_source.lock().await;
133
134        // Run the singlefuture
135        let stop_source = StopSource::new();
136        let stop_token = stop_source.token();
137        let make_singlefuture_closure = || {
138            let running = self.running.clone();
139            let routine = self.routine.get().unwrap_or_log()(stop_token, last_timestamp_us, now);
140
141            Box::pin(async move {
142                running.store(true, core::sync::atomic::Ordering::Release);
143                let out = routine.await;
144                running.store(false, core::sync::atomic::Ordering::Release);
145                out
146            })
147        };
148
149        match self
150            .single_future
151            .single_spawn(&self.name, make_singlefuture_closure)
152            .await
153        {
154            // A new singlefuture ran
155            Ok((res, ran)) => {
156                // If the previous run finished and we started a new one, switch the stopper
157                if ran {
158                    // Set new timer
159                    self.last_timestamp_us.store(now, Ordering::Release);
160                    // Save new stopper
161                    *stop_source_guard = Some(stop_source);
162                }
163
164                match res {
165                    Some(Ok(())) => {
166                        // Prior run returned successfully
167                        Ok(ran)
168                    }
169                    Some(Err(e)) => {
170                        // Prior run returned an error, propagate it
171                        Err(e)
172                    }
173                    None => {
174                        // No prior run or nothing completed
175                        Ok(ran)
176                    }
177                }
178            }
179            Err(()) => {
180                // If we get this, it's because we are joining the singlefuture already
181                // Don't bother running but this is not an error in this case
182                Ok(false)
183            }
184        }
185    }
186}