use crate::{set_recent, Clock};
use std::{
fmt, io,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::{self, JoinHandle},
time::Duration,
};
static GLOBAL_UPKEEP_RUNNING: AtomicBool = AtomicBool::new(false);
#[derive(Debug)]
pub struct Upkeep {
interval: Duration,
clock: Clock,
}
#[derive(Debug)]
pub struct Handle {
done: Arc<AtomicBool>,
handle: Option<JoinHandle<()>>,
}
#[derive(Debug)]
pub enum Error {
UpkeepRunning,
FailedToSpawnUpkeepThread(io::Error),
}
impl fmt::Display for Error {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
Error::UpkeepRunning => write!(f, "upkeep thread already running"),
Error::FailedToSpawnUpkeepThread(e) => {
write!(f, "failed to spawn upkeep thread: {}", e)
}
}
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::UpkeepRunning => None,
Self::FailedToSpawnUpkeepThread(e) => Some(e),
}
}
}
impl Upkeep {
pub fn new(interval: Duration) -> Upkeep {
Self::new_with_clock(interval, Clock::new())
}
pub fn new_with_clock(interval: Duration, clock: Clock) -> Upkeep {
Upkeep { interval, clock }
}
pub fn start(self) -> Result<Handle, Error> {
let _ = GLOBAL_UPKEEP_RUNNING
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.map_err(|_| Error::UpkeepRunning)?;
let interval = self.interval;
let clock = self.clock;
set_recent(clock.now());
let done = Arc::new(AtomicBool::new(false));
let their_done = done.clone();
let result = thread::Builder::new()
.name("quanta-upkeep".to_string())
.spawn(move || {
while !their_done.load(Ordering::Acquire) {
set_recent(clock.now());
thread::sleep(interval);
}
GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
})
.map_err(Error::FailedToSpawnUpkeepThread);
if result.is_err() {
GLOBAL_UPKEEP_RUNNING.store(false, Ordering::SeqCst);
}
let handle = result?;
Ok(Handle {
done,
handle: Some(handle),
})
}
}
impl Drop for Handle {
fn drop(&mut self) {
self.done.store(true, Ordering::Release);
if let Some(handle) = self.handle.take() {
let _result = handle
.join()
.map_err(|_| io::Error::new(io::ErrorKind::Other, "failed to stop upkeep thread"));
}
}
}
#[cfg(test)]
mod tests {
use super::Upkeep;
use std::time::Duration;
#[test]
#[cfg_attr(target_arch = "wasm32", ignore)] fn test_spawning_second_upkeep() {
let first = Upkeep::new(Duration::from_millis(250)).start();
let second = Upkeep::new(Duration::from_millis(250))
.start()
.map_err(|e| e.to_string());
assert!(first.is_ok());
let second_err = second.expect_err("second upkeep should be error, got handle");
assert_eq!(second_err, "upkeep thread already running");
}
}