1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
use super::*;

use core::sync::atomic::{AtomicU64, Ordering};
use once_cell::sync::OnceCell;

type TickTaskRoutine<E> =
    dyn Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static;

/// Runs a single-future background processing task, attempting to run it once every 'tick period' microseconds.
/// If the prior tick is still running, it will allow it to finish, and do another tick when the timer comes around again.
/// One should attempt to make tasks short-lived things that run in less than the tick period if you want things to happen with regular periodicity.
pub struct TickTask<E: Send + 'static> {
    last_timestamp_us: AtomicU64,
    tick_period_us: u64,
    routine: OnceCell<Box<TickTaskRoutine<E>>>,
    stop_source: AsyncMutex<Option<StopSource>>,
    single_future: MustJoinSingleFuture<Result<(), E>>,
    running: Arc<AtomicBool>,
}

impl<E: Send + 'static> TickTask<E> {
    pub fn new_us(tick_period_us: u64) -> Self {
        Self {
            last_timestamp_us: AtomicU64::new(0),
            tick_period_us,
            routine: OnceCell::new(),
            stop_source: AsyncMutex::new(None),
            single_future: MustJoinSingleFuture::new(),
            running: Arc::new(AtomicBool::new(false)),
        }
    }
    pub fn new_ms(tick_period_ms: u32) -> Self {
        Self {
            last_timestamp_us: AtomicU64::new(0),
            tick_period_us: (tick_period_ms as u64) * 1000u64,
            routine: OnceCell::new(),
            stop_source: AsyncMutex::new(None),
            single_future: MustJoinSingleFuture::new(),
            running: Arc::new(AtomicBool::new(false)),
        }
    }
    pub fn new(tick_period_sec: u32) -> Self {
        Self {
            last_timestamp_us: AtomicU64::new(0),
            tick_period_us: (tick_period_sec as u64) * 1000000u64,
            routine: OnceCell::new(),
            stop_source: AsyncMutex::new(None),
            single_future: MustJoinSingleFuture::new(),
            running: Arc::new(AtomicBool::new(false)),
        }
    }

    pub fn set_routine(
        &self,
        routine: impl Fn(StopToken, u64, u64) -> SendPinBoxFuture<Result<(), E>> + Send + Sync + 'static,
    ) {
        self.routine.set(Box::new(routine)).map_err(drop).unwrap();
    }

    pub fn is_running(&self) -> bool {
        self.running.load(core::sync::atomic::Ordering::Relaxed)
    }

    pub async fn stop(&self) -> Result<(), E> {
        // drop the stop source if we have one
        let opt_stop_source = &mut *self.stop_source.lock().await;
        if opt_stop_source.is_none() {
            // already stopped, just return
            trace!("tick task already stopped");
            return Ok(());
        }
        drop(opt_stop_source.take());

        // wait for completion of the tick task
        trace!("stopping single future");
        match self.single_future.join().await {
            Ok(Some(Err(err))) => Err(err),
            _ => Ok(()),
        }
    }

    pub async fn tick(&self) -> Result<(), E> {
        let now = get_timestamp();
        let last_timestamp_us = self.last_timestamp_us.load(Ordering::Acquire);

        if last_timestamp_us != 0u64 && now.saturating_sub(last_timestamp_us) < self.tick_period_us
        {
            // It's not time yet
            return Ok(());
        }

        // Lock the stop source, tells us if we have ever started this future
        let opt_stop_source = &mut *self.stop_source.lock().await;
        if opt_stop_source.is_some() {
            // See if the previous execution finished with an error
            match self.single_future.check().await {
                Ok(Some(Err(e))) => {
                    // We have an error result, which means the singlefuture ran but we need to propagate the error
                    return Err(e);
                }
                Ok(Some(Ok(()))) => {
                    // We have an ok result, which means the singlefuture ran, and we should run it again this tick
                }
                Ok(None) => {
                    // No prior result to return which means things are still running
                    // We can just return now, since the singlefuture will not run a second time
                    return Ok(());
                }
                Err(()) => {
                    // If we get this, it's because we are joining the singlefuture already
                    // Don't bother running but this is not an error in this case
                    return Ok(());
                }
            };
        }

        // Run the singlefuture
        let stop_source = StopSource::new();
        let stop_token = stop_source.token();
        let running = self.running.clone();
        let routine = self.routine.get().unwrap()(stop_token, last_timestamp_us, now);
        let wrapped_routine = Box::pin(async move {
            running.store(true, core::sync::atomic::Ordering::Relaxed);
            let out = routine.await;
            running.store(false, core::sync::atomic::Ordering::Relaxed);
            out
        });
        match self.single_future.single_spawn(wrapped_routine).await {
            // We should have already consumed the result of the last run, or there was none
            // and we should definitely have run, because the prior 'check()' operation
            // should have ensured the singlefuture was ready to run
            Ok((None, true)) => {
                // Set new timer
                self.last_timestamp_us.store(now, Ordering::Release);
                // Save new stopper
                *opt_stop_source = Some(stop_source);
                Ok(())
            }
            // All other conditions should not be reachable
            _ => {
                unreachable!();
            }
        }
    }
}