use std::{
sync::{atomic::AtomicBool, Arc, Mutex},
thread::{self, JoinHandle},
time::{self, Duration, Instant},
};
use crate::expiration::ExpirationAction;
pub struct Watcher<T: PartialEq + Send + 'static> {
pub value: Arc<Mutex<T>>,
pub(crate) changed: Arc<Mutex<bool>>,
pub(crate) handle: Option<JoinHandle<()>>,
pub(crate) shutdown: Arc<Mutex<AtomicBool>>,
pub(crate) expiration_time: Option<Instant>,
}
impl<T: PartialEq + Send + Clone + 'static> Watcher<T> {
pub fn new(initial_value: T, poll_delay_ms: u64, shutdown: Arc<Mutex<AtomicBool>>) -> Self {
let value = Arc::new(Mutex::new(initial_value));
let changed = Arc::new(Mutex::new(false));
let value_watch = value.clone();
let changed_watch = changed.clone();
let shutdown = Arc::clone(&shutdown);
let shutdown2 = Arc::clone(&shutdown);
let handle = thread::spawn(move || {
let mut last_val = value_watch.lock().unwrap().clone();
loop {
{
let lock = shutdown.lock().unwrap();
if lock.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}
let current_val = value_watch.lock().unwrap().clone();
if current_val != last_val {
*changed_watch.lock().unwrap() = true;
last_val = current_val;
}
thread::sleep(Duration::from_millis(poll_delay_ms));
}
});
Self {
value,
changed,
handle: Some(handle),
shutdown: shutdown2,
expiration_time: None,
}
}
pub fn with_lifetime<
A: std::marker::Tuple + std::marker::Send + Clone + 'static,
F: Fn(A) + std::marker::Send + 'static,
>(
initial_value: T,
poll_delay_ms: u64,
shutdown: Arc<Mutex<AtomicBool>>,
lifetime: Duration,
action: ExpirationAction<T, A, F>,
) -> Self {
let value = Arc::new(Mutex::new(initial_value.clone()));
let changed = Arc::new(Mutex::new(false));
let value_watch = value.clone();
let changed_watch = changed.clone();
let expiration_time = time::Instant::now() + lifetime;
let shutdown = Arc::clone(&shutdown);
let shutdown2 = Arc::clone(&shutdown);
let handle = thread::spawn(move || {
let mut last_val = value_watch.lock().unwrap().clone();
let initial = last_val.clone();
loop {
{
let lock = shutdown.lock().unwrap();
if lock.load(std::sync::atomic::Ordering::Relaxed) {
break;
}
}
if time::Instant::now() >= expiration_time {
match action {
ExpirationAction::None => {
let lock = shutdown.lock().unwrap();
lock.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
ExpirationAction::Reset => {
let mut val = value_watch.lock().unwrap();
*val = initial.clone();
*changed_watch.lock().unwrap() = true;
let lock = shutdown.lock().unwrap();
lock.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
ExpirationAction::Shutdown => {
let lock = shutdown.lock().unwrap();
lock.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
ExpirationAction::Set(ref t) => {
let mut val = value_watch.lock().unwrap();
*val = t.clone();
*changed_watch.lock().unwrap() = true;
let lock = shutdown.lock().unwrap();
lock.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
ExpirationAction::Panic => panic!(),
ExpirationAction::PanicWMessage(m) => panic!("{}", m),
ExpirationAction::RunFunc(ref f, ref a) => {
f(a.clone());
let lock = shutdown.lock().unwrap();
lock.store(true, std::sync::atomic::Ordering::Relaxed);
break;
}
}
}
let current_val = value_watch.lock().unwrap().clone();
if current_val != last_val {
*changed_watch.lock().unwrap() = true;
last_val = current_val;
}
thread::sleep(Duration::from_millis(poll_delay_ms));
}
});
Self {
value,
changed,
handle: Some(handle),
shutdown: shutdown2,
expiration_time: Some(expiration_time),
}
}
pub fn is_expired(&self) -> bool {
self.expiration_time
.map_or(false, |expiration| time::Instant::now() >= expiration)
}
pub fn remaining_lifetime(&self) -> Option<Duration> {
self.expiration_time.map(|time| {
let remaining = time - Instant::now();
remaining.clamp(Duration::from_secs(0), Duration::from_secs(u64::MAX))
})
}
pub fn set_value(&self, new_value: T) {
let mut value = self.value.lock().unwrap();
if *value != new_value {
*value = new_value;
*self.changed.lock().unwrap() = true;
}
}
pub fn get_value(&self) -> T
where
T: Clone,
{
self.value.lock().unwrap().clone()
}
pub fn has_changed(&self) -> bool {
*self.changed.lock().unwrap()
}
pub fn reset_changed(&self) {
*self.changed.lock().unwrap() = false;
}
}
impl<T: PartialEq + Send + 'static> Watcher<T> {
pub fn shutdown_thread(&mut self) {
self.shutdown
.lock()
.unwrap()
.store(true, std::sync::atomic::Ordering::Relaxed);
}
}
impl<T: PartialEq + Send + 'static> Drop for Watcher<T> {
fn drop(&mut self) {
self.shutdown_thread();
if let Some(handle) = self.handle.take() {
handle.join().ok();
}
}
}