1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145
use super::*;
use core::sync::atomic::{AtomicU64, Ordering};
use once_cell::sync::OnceCell;
type TickTaskRoutine<E> =
dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static;
/// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds.
/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again.
/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity.
pub struct TickTask<E: Send + 'static> {
last_timestamp_us: AtomicU64,
tick_period_us: u64,
routine: OnceCell<Box<TickTaskRoutine<E>>>,
stop_source: AsyncMutex<Option<StopSource>>,
single_future: MustJoinSingleFuture<Result<(), E>>,
running: Arc<AtomicBool>,
}
impl<E: Send + 'static> TickTask<E> {
pub fn new_us(tick_period_us: u64) -> Self {
Self {
last_timestamp_us: AtomicU64::new(0),
tick_period_us,
routine: OnceCell::new(),
stop_source: AsyncMutex::new(None),
single_future: MustJoinSingleFuture::new(),
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn new_ms(tick_period_ms: u32) -> Self {
Self {
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_ms as u64) * 1000u64,
routine: OnceCell::new(),
stop_source: AsyncMutex::new(None),
single_future: MustJoinSingleFuture::new(),
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn new(tick_period_sec: u32) -> Self {
Self {
last_timestamp_us: AtomicU64::new(0),
tick_period_us: (tick_period_sec as u64) * 1000000u64,
routine: OnceCell::new(),
stop_source: AsyncMutex::new(None),
single_future: MustJoinSingleFuture::new(),
running: Arc::new(AtomicBool::new(false)),
}
}
pub fn set_routine(
&self,
routine: impl Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static,
) {
self.routine.set(Box::new(routine)).map_err(drop).unwrap();
}
pub fn is_running(&self) -> bool {
self.running.load(core::sync::atomic::Ordering::Relaxed)
}
pub async fn stop(&self) -> Result<(), E> {
// drop the stop source if we have one
let opt_stop_source = &mut *self.stop_source.lock().await;
if opt_stop_source.is_none() {
// already stopped, just return
trace!("tick task already stopped");
return Ok(());
}
drop(opt_stop_source.take());
// wait for completion of the tick task
trace!("stopping single future");
match self.single_future.join().await {
Ok(Some(Err(err))) => Err(err),
_ => Ok(()),
}
}
pub async fn tick(&self) -> Result<(), E> {
let now = get_timestamp();
let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);
if last_timestamp_us != 0u64 && now.saturating_sub(last_timestamp_us) < self.tick_period_us
{
// It's not time yet
return Ok(());
}
// Lock the stop source, tells us if we have ever started this future
let opt_stop_source = &mut *self.stop_source.lock().await;
if opt_stop_source.is_some() {
// See if the previous execution finished with an error
match self.single_future.check().await {
Ok(Some(Err(e))) => {
// We have an error result, which means the singlefuture ran but we need to propagate the error
return Err(e);
}
Ok(Some(Ok(()))) => {
// We have an ok result, which means the singlefuture ran, and we should run it again this tick
}
Ok(None) => {
// No prior result to return which means things are still running
// We can just return now, since the singlefuture will not run a second time
return Ok(());
}
Err(()) => {
// If we get this, it's because we are joining the singlefuture already
// Don't bother running but this is not an error in this case
return Ok(());
}
};
}
// Run the singlefuture
let stop_source = StopSource::new();
let stop_token = stop_source.token();
let running = self.running.clone();
let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
let wrapped_routine = Box::pin(async move {
running.store(true, core::sync::atomic::Ordering::Relaxed);
let out = routine.await;
running.store(false, core::sync::atomic::Ordering::Relaxed);
out
});
match self.single_future.single_spawn(wrapped_routine).await {
// We should have already consumed the result of the last run, or there was none
// and we should definitely have run, because the prior 'check()' operation
// should have ensured the singlefuture was ready to run
Ok((None, true)) => {
// Set new timer
self.last_timestamp_us.store(now, Ordering::Release);
// Save new stopper
*opt_stop_source = Some(stop_source);
Ok(())
}
// All other conditions should not be reachable
_ => {
unreachable!();
}
}
}
}