utls 0.13.10

A simple utilities library for things I actually use from time to time, with a strong focus on convenience and on avoiding dependencies.
Documentation
use std::{
    sync::{atomic::AtomicBool, Arc, Mutex},
    thread::{self, JoinHandle},
    time::{self, Duration, Instant},
};
use crate::expiration::ExpirationAction;

// TODO: improve actions to run upon timeout

/// A value paired with a background thread that polls it for changes.
///
/// A `Watcher<T>` stores a `T` behind an `Arc<Mutex<_>>` together with a
/// "changed" flag. The constructors spawn a thread that periodically compares
/// the stored value against the last value it saw and raises the flag when the
/// two differ; the flag stays raised until it is explicitly reset.
///
/// # Type Parameters
///
/// * `T`: The type of value to watch. Must implement `PartialEq` and `Send`
///   and have a static lifetime. The constructors and accessors additionally
///   require `Clone`.
///
/// # Examples
///
/// ```
/// use std::sync::{Arc, Mutex, atomic::AtomicBool};
///
/// let shutdown = Arc::new(Mutex::new(AtomicBool::new(false)));
/// let watcher = Watcher::new(42, 100, shutdown.clone()); // (cloning is optional)
///
/// watcher.set_value(43);
/// assert!(watcher.has_changed());
/// ```
pub struct Watcher<T>
where
    T: PartialEq + Send + 'static,
{
    /// The value being watched. Shared with the polling thread.
    pub value: Arc<Mutex<T>>,
    /// Raised when a change is recorded; cleared by `reset_changed`.
    pub(crate) changed: Arc<Mutex<bool>>,
    /// Join handle of the polling thread; taken (left as `None`) on drop.
    pub(crate) handle: Option<JoinHandle<()>>,
    /// Shared flag that tells the polling thread to stop once it holds `true`.
    pub(crate) shutdown: Arc<Mutex<AtomicBool>>,
    /// Instant at which the watcher expires; `None` when no lifetime was set.
    pub(crate) expiration_time: Option<Instant>,
}

impl<T: PartialEq + Send + Clone + 'static> Watcher<T> {
    /// Creates a new `Watcher` and starts its polling thread.
    ///
    /// The thread takes a snapshot of the value when it starts. On every
    /// iteration it first checks the shutdown flag, then compares the current
    /// value against its snapshot (raising the changed flag and refreshing the
    /// snapshot when they differ), then sleeps for `poll_delay_ms`.
    ///
    /// # Parameters
    ///
    /// * `initial_value`: The initial value to watch
    /// * `poll_delay_ms`: The delay between checks for changes in milliseconds
    /// * `shutdown`: An atomic boolean wrapped in `Arc<Mutex>`; the polling
    ///   thread exits once it reads `true` from it
    ///
    /// # Returns
    ///
    /// A new `Watcher` instance initialized with the given value and no
    /// expiration time
    pub fn new(initial_value: T, poll_delay_ms: u64, shutdown: Arc<Mutex<AtomicBool>>) -> Self {
        let value = Arc::new(Mutex::new(initial_value));
        let changed = Arc::new(Mutex::new(false));

        // Handles moved into the polling thread; the originals stay in `Self`.
        let value_watch = Arc::clone(&value);
        let changed_watch = Arc::clone(&changed);
        let shutdown_watch = Arc::clone(&shutdown);
        let poll_delay = Duration::from_millis(poll_delay_ms);

        let handle = thread::spawn(move || {
            let mut last_val = value_watch.lock().unwrap().clone();
            while !Self::shutdown_requested(&shutdown_watch) {
                Self::record_change(&value_watch, &changed_watch, &mut last_val);
                thread::sleep(poll_delay);
            }
        });

        Self {
            value,
            changed,
            handle: Some(handle),
            shutdown,
            expiration_time: None,
        }
    }

    /// Creates a new `Watcher` that expires after `lifetime`.
    ///
    /// The polling thread behaves as in [`Watcher::new`], and additionally
    /// checks once per iteration whether the expiration instant has passed.
    /// When it has, `action` is carried out:
    ///
    /// * `None` / `Shutdown`: nothing further is done.
    /// * `Reset`: the value is replaced with the one the thread observed when
    ///   it started, and the changed flag is raised.
    /// * `Set(t)`: the value is replaced with a clone of `t`, and the changed
    ///   flag is raised.
    /// * `RunFunc(f, a)`: `f` is called with a clone of `a`.
    /// * `Panic` / `PanicWMessage(m)`: the polling thread panics.
    ///
    /// In every non-panicking case the thread then stores `true` into the
    /// shared `shutdown` flag and exits. Because expiration is only checked
    /// between sleeps, the action can run up to one poll interval late.
    ///
    /// # Parameters
    ///
    /// * `initial_value`: The initial value to watch
    /// * `poll_delay_ms`: The delay between checks for changes in milliseconds
    /// * `shutdown`: An atomic boolean wrapped in `Arc<Mutex>`; the polling
    ///   thread exits once it reads `true` from it
    /// * `lifetime`: The duration after which the watcher will expire
    /// * `action`: The action to take when the watcher expires
    ///
    /// # Returns
    ///
    /// A new `Watcher` instance initialized with the given value and lifetime settings
    ///
    /// # Panics
    ///
    /// May panic if `Instant::now() + lifetime` cannot be represented.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::{Arc, Mutex, atomic::AtomicBool};
    /// use std::time::Duration;
    ///
    /// let shutdown = Arc::new(Mutex::new(AtomicBool::new(false)));
    /// let watcher = Watcher::with_lifetime(
    ///     42,
    ///     100,
    ///     shutdown.clone(),
    ///     Duration::from_secs(60),
    ///     ExpirationAction::Reset
    /// );
    /// ```
    // NOTE(review): `A` and `F` only appear in `action`, so a payload-free
    // variant such as `Reset` presumably cannot pin them by inference and the
    // example above may need explicit type arguments; confirm.
    pub fn with_lifetime<
        A: std::marker::Tuple + std::marker::Send + Clone + 'static,
        F: Fn(A) + std::marker::Send + 'static,
    >(
        initial_value: T,
        poll_delay_ms: u64,
        shutdown: Arc<Mutex<AtomicBool>>,
        lifetime: Duration,
        action: ExpirationAction<T, A, F>,
    ) -> Self {
        // `initial_value` is moved, not cloned: the thread takes its own
        // snapshot from the shared mutex below.
        let value = Arc::new(Mutex::new(initial_value));
        let changed = Arc::new(Mutex::new(false));

        let value_watch = Arc::clone(&value);
        let changed_watch = Arc::clone(&changed);
        let shutdown_watch = Arc::clone(&shutdown);

        let expiration_time = time::Instant::now() + lifetime;
        let poll_delay = Duration::from_millis(poll_delay_ms);

        let handle = thread::spawn(move || {
            let mut last_val = value_watch.lock().unwrap().clone();
            let initial = last_val.clone();

            loop {
                if Self::shutdown_requested(&shutdown_watch) {
                    break;
                }

                if Instant::now() >= expiration_time {
                    match action {
                        ExpirationAction::None | ExpirationAction::Shutdown => {}
                        ExpirationAction::Reset => {
                            Self::overwrite(&value_watch, &changed_watch, initial.clone());
                        }
                        ExpirationAction::Set(ref t) => {
                            Self::overwrite(&value_watch, &changed_watch, t.clone());
                        }
                        ExpirationAction::Panic => panic!(),
                        ExpirationAction::PanicWMessage(m) => panic!("{}", m),
                        ExpirationAction::RunFunc(ref f, ref a) => f(a.clone()),
                    }
                    // Every non-panicking action ends the same way: raise the
                    // shared shutdown flag and stop polling. The value and
                    // changed locks are already released at this point.
                    shutdown_watch
                        .lock()
                        .unwrap()
                        .store(true, std::sync::atomic::Ordering::Relaxed);
                    break;
                }

                Self::record_change(&value_watch, &changed_watch, &mut last_val);
                thread::sleep(poll_delay);
            }
        });

        Self {
            value,
            changed,
            handle: Some(handle),
            shutdown,
            expiration_time: Some(expiration_time),
        }
    }

    /// Checks if the watcher has expired.
    ///
    /// # Returns
    ///
    /// `true` if an expiration time is set and has been reached; `false`
    /// otherwise, including when the watcher has no lifetime.
    pub fn is_expired(&self) -> bool {
        self.expiration_time
            .is_some_and(|deadline| Instant::now() >= deadline)
    }

    /// Returns the remaining lifetime of the watcher, if any.
    ///
    /// # Returns
    ///
    /// `None` if no expiration time is set; otherwise the time left until
    /// expiration, which is `Duration::ZERO` once the watcher has expired.
    pub fn remaining_lifetime(&self) -> Option<Duration> {
        // Saturating subtraction: `Instant - Instant` is documented as
        // potentially panicking when the right-hand side is later.
        self.expiration_time
            .map(|deadline| deadline.saturating_duration_since(Instant::now()))
    }

    /// Sets a new value to watch.
    ///
    /// If the new value is different from the current value, it is stored and
    /// the changed flag is set to true immediately. If it is equal, nothing
    /// happens.
    ///
    /// Note that the polling thread keeps its own snapshot of the value, which
    /// this method does not update. On its next poll the thread will therefore
    /// also see the difference and raise the changed flag again, even if
    /// `reset_changed` was called in between.
    ///
    /// # Parameters
    ///
    /// * `new_value`: The new value to set
    pub fn set_value(&self, new_value: T) {
        let mut value = self.value.lock().unwrap();
        if *value != new_value {
            *value = new_value;
            *self.changed.lock().unwrap() = true;
        }
    }

    /// Retrieves the current value.
    ///
    /// # Returns
    ///
    /// A clone of the current value
    pub fn get_value(&self) -> T {
        self.value.lock().unwrap().clone()
    }

    /// Checks if the value has changed since the last reset.
    ///
    /// Reading the flag does not clear it; use `reset_changed` for that.
    ///
    /// # Returns
    ///
    /// `true` if the changed flag is raised, `false` otherwise
    pub fn has_changed(&self) -> bool {
        *self.changed.lock().unwrap()
    }

    /// Resets the changed flag to false.
    pub fn reset_changed(&self) {
        *self.changed.lock().unwrap() = false;
    }

    /// Reads the shared shutdown flag.
    fn shutdown_requested(shutdown: &Mutex<AtomicBool>) -> bool {
        shutdown
            .lock()
            .unwrap()
            .load(std::sync::atomic::Ordering::Relaxed)
    }

    /// Raises `changed` and refreshes `last_seen` if the stored value differs from it.
    fn record_change(value: &Mutex<T>, changed: &Mutex<bool>, last_seen: &mut T) {
        // The value lock is released at the end of this statement, before the
        // comparison and before `changed` is locked.
        let current = value.lock().unwrap().clone();
        if current != *last_seen {
            *changed.lock().unwrap() = true;
            *last_seen = current;
        }
    }

    /// Unconditionally stores `replacement` as the value and raises `changed`.
    fn overwrite(value: &Mutex<T>, changed: &Mutex<bool>, replacement: T) {
        let mut slot = value.lock().unwrap();
        *slot = replacement;
        *changed.lock().unwrap() = true;
    }
}

impl<T: PartialEq + Send + 'static> Watcher<T> {
    /// Asks the polling thread to stop.
    ///
    /// Stores `true` into the shared shutdown flag. The polling thread reads
    /// that flag at the top of each loop iteration and exits once it sees the
    /// new value; this method itself does not wait for the thread to finish.
    pub fn shutdown_thread(&mut self) {
        let flag = self.shutdown.lock().unwrap();
        flag.store(true, std::sync::atomic::Ordering::Relaxed);
    }
}

impl<T: PartialEq + Send + 'static> Drop for Watcher<T> {
    /// Stops the polling thread and waits for it to finish.
    ///
    /// Stores `true` into the shared shutdown flag and then joins the polling
    /// thread, so dropping a `Watcher` can block until the thread's current
    /// sleep ends. Unlike `shutdown_thread`, this never panics on a poisoned
    /// shutdown mutex: panicking inside `drop` would skip the join and abort
    /// the process if the drop happens during unwinding.
    fn drop(&mut self) {
        {
            // A poisoned mutex still guards a perfectly usable atomic flag, so
            // recover the guard instead of unwrapping. The guard is scoped so
            // it is released before the join below; the polling thread locks
            // this same mutex on every iteration.
            let flag = self
                .shutdown
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            flag.store(true, std::sync::atomic::Ordering::Relaxed);
        }
        if let Some(handle) = self.handle.take() {
            // `join` returns `Err` if the polling thread panicked; that is
            // deliberately ignored because there is nothing to recover here.
            let _ = handle.join();
        }
    }
}