wnf 0.3.0

Safe bindings for the Windows Notification Facility
Documentation
//! Methods for synchronously waiting for state updates
//!
//! This module only adds inherent impls to [`OwnedState<T>`] and [`BorrowedState<'_, T>`](BorrowedState).

#![deny(unsafe_code)]

use std::borrow::Borrow;
use std::io::{self, ErrorKind};
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

use crate::data::OpaqueData;
use crate::predicate::{ChangedPredicate, Predicate, PredicateStage};
use crate::read::Read;
use crate::state::{BorrowedState, OwnedState, RawState};
use crate::subscribe::{DataAccessor, SeenChangeStamp};

impl<T> OwnedState<T>
where
    T: ?Sized,
{
    /// Waits until this state is updated
    ///
    /// This waits for *any* update to the state regardless of the value, even if the value is the same as the previous
    /// one. In order to wait until the state data satisfy a certain condition, use
    /// [`wait_until_blocking`](OwnedState::wait_until_blocking).
    ///
    /// Use this method if you want to wait for a state update *once*. In order to execute some logic on every state
    /// update, use the [`subscribe`](OwnedState::subscribe) method.
    ///
    /// This is a blocking method. If you are in an async context, use [`wait_async`](OwnedState::wait_async).
    ///
    /// # Errors
    /// Returns an error if querying, subscribing to or unsubscribing from the state fails or if the timeout has
    /// elapsed. In the latter case, [`io::Error::kind`] returns [`ErrorKind::TimedOut`].
    pub fn wait_blocking(&self, timeout: Duration) -> io::Result<()> {
        self.raw.wait_blocking(timeout)
    }
}

impl<T> OwnedState<T>
where
    T: Read<T>,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data
    ///
    /// This returns immediately if the current data already satisfy the predicate. Otherwise, it waits until the state
    /// is updated with data that satisfy the predicate. If you want to unconditionally wait until the state is updated,
    /// use [`wait_blocking`](OwnedState::wait_blocking).
    ///
    /// This returns the data for which the predicate returned `true`, causing the wait to finish. It produces an owned
    /// `T` on the stack and hence requires `T: Sized`. In order to produce a `Box<T>` for `T: ?Sized`, use the
    /// [`wait_until_boxed_blocking`](OwnedState::wait_until_boxed_blocking) method.
    ///
    /// For example, to wait until the value of a state reaches a given minimum:
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use std::{io, thread};
    ///
    /// use wnf::{AsState, OwnedState};
    ///
    /// fn wait_until_at_least<S>(state: S, min_value: u32) -> io::Result<u32>
    /// where
    ///     S: AsState<Data = u32>,
    /// {
    ///     state
    ///         .as_state()
    ///         .wait_until_blocking(|value| *value >= min_value, Duration::MAX)
    /// }
    ///
    /// let state = Arc::new(OwnedState::create_temporary()?);
    /// state.set(&0)?;
    ///
    /// {
    ///     let state = Arc::clone(&state);
    ///     thread::spawn(move || loop {
    ///         state.apply(|value| value + 1).unwrap();
    ///         thread::sleep(Duration::from_millis(10));
    ///     });
    /// }
    ///
    /// let value = wait_until_at_least(&state, 10)?;
    /// assert!(value >= 10);
    /// # Ok(()) }
    /// ```
    ///
    /// This is a blocking method. If you are in an async context, use
    /// [`wait_until_async`](OwnedState::wait_until_async).
    ///
    /// # Errors
    /// Returns an error if querying, subscribing to or unsubscribing from the state fails or if the timeout has
    /// elapsed. In the latter case, [`io::Error::kind`] returns [`ErrorKind::TimedOut`].
    pub fn wait_until_blocking<F>(&self, predicate: F, timeout: Duration) -> io::Result<T>
    where
        F: FnMut(&T) -> bool,
    {
        self.raw.wait_until_blocking(predicate, timeout)
    }
}

impl<T> OwnedState<T>
where
    T: Read<Box<T>> + ?Sized,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data as a box
    ///
    /// This returns immediately if the current data already satisfy the predicate. Otherwise, it waits until the state
    /// is updated with data that satisfy the predicate. If you want to unconditionally wait until the state is updated,
    /// use [`wait_blocking`](OwnedState::wait_blocking).
    ///
    /// This returns the data for which the predicate returned `true`, causing the wait to finish. It produces a
    /// [`Box<T>`]. In order to produce an owned `T` on the stack (requiring `T: Sized`), use the
    /// [`wait_until_blocking`](OwnedState::wait_until_blocking) method.
    ///
    /// For example, to wait until the length of a slice reaches a given minimum:
    /// ```
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use std::sync::Arc;
    /// use std::time::Duration;
    /// use std::{io, thread};
    ///
    /// use wnf::{AsState, OwnedState};
    ///
    /// fn wait_until_len_at_least<S>(state: S, min_len: usize) -> io::Result<usize>
    /// where
    ///     S: AsState<Data = [u32]>,
    /// {
    ///     state
    ///         .as_state()
    ///         .wait_until_boxed_blocking(|slice| slice.len() >= min_len, Duration::MAX)
    ///         .map(|slice| slice.len())
    /// }
    ///
    /// let state = Arc::new(OwnedState::<[u32]>::create_temporary()?);
    /// state.set(&[])?;
    ///
    /// {
    ///     let state = Arc::clone(&state);
    ///     thread::spawn(move || loop {
    ///         state
    ///             .apply_boxed(|slice| {
    ///                 let mut vec = slice.into_vec();
    ///                 vec.push(0);
    ///                 vec
    ///             })
    ///             .unwrap();
    ///
    ///         thread::sleep(Duration::from_millis(10));
    ///     });
    /// }
    ///
    /// let len = wait_until_len_at_least(&state, 10)?;
    /// assert!(len >= 10);
    /// # Ok(()) }
    /// ```
    ///
    /// This is a blocking method. If you are in an async context, use
    /// [`wait_until_boxed_async`](OwnedState::wait_until_boxed_async).
    ///
    /// # Errors
    /// Returns an error if querying, subscribing to or unsubscribing from the state fails or if the timeout has
    /// elapsed. In the latter case, [`io::Error::kind`] returns [`ErrorKind::TimedOut`].
    pub fn wait_until_boxed_blocking<F>(&self, predicate: F, timeout: Duration) -> io::Result<Box<T>>
    where
        F: FnMut(&T) -> bool,
    {
        self.raw.wait_until_boxed_blocking(predicate, timeout)
    }
}

impl<T> BorrowedState<'_, T>
where
    T: ?Sized,
{
    /// Waits until this state is updated
    ///
    /// See [`OwnedState::wait_blocking`]
    pub fn wait_blocking(self, timeout: Duration) -> io::Result<()> {
        self.raw.wait_blocking(timeout)
    }
}

impl<T> BorrowedState<'_, T>
where
    T: Read<T>,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data
    ///
    /// See [`OwnedState::wait_until_blocking`]
    pub fn wait_until_blocking<F>(self, predicate: F, timeout: Duration) -> io::Result<T>
    where
        F: FnMut(&T) -> bool,
    {
        self.raw.wait_until_blocking(predicate, timeout)
    }
}

impl<T> BorrowedState<'_, T>
where
    T: Read<Box<T>> + ?Sized,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data as a box
    ///
    /// See [`OwnedState::wait_until_boxed_blocking`]
    pub fn wait_until_boxed_blocking<F>(self, predicate: F, timeout: Duration) -> io::Result<Box<T>>
    where
        F: FnMut(&T) -> bool,
    {
        self.raw.wait_until_boxed_blocking(predicate, timeout)
    }
}

impl<T> RawState<T>
where
    T: ?Sized,
{
    /// Waits until this state is updated
    fn wait_blocking(self, timeout: Duration) -> io::Result<()> {
        let _: OpaqueData = self.cast().wait_until_blocking_internal(ChangedPredicate, timeout)?;
        Ok(())
    }
}

impl<T> RawState<T>
where
    T: Read<T>,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data
    fn wait_until_blocking<F>(self, predicate: F, timeout: Duration) -> io::Result<T>
    where
        F: FnMut(&T) -> bool,
    {
        self.wait_until_blocking_internal(predicate, timeout)
    }
}

impl<T> RawState<T>
where
    T: Read<Box<T>> + ?Sized,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data as a box
    fn wait_until_boxed_blocking<F>(self, predicate: F, timeout: Duration) -> io::Result<Box<T>>
    where
        F: FnMut(&T) -> bool,
    {
        self.wait_until_blocking_internal(predicate, timeout)
    }
}

impl<T> RawState<T>
where
    T: ?Sized,
{
    /// Waits until the data of this state satisfy a given predicate, returning the data as a value of type `D`
    ///
    /// The predicate is called once with [`PredicateStage::Initial`], then again with [`PredicateStage::Changed`] on
    /// every state update.
    ///
    /// If `T: Sized`, then `D` can be either `T` or `Box<T>`.
    /// If `T: !Sized`, then `D` must be `Box<T>`.
    fn wait_until_blocking_internal<D, F>(self, mut predicate: F, timeout: Duration) -> io::Result<D>
    where
        D: Borrow<T> + Send + 'static,
        F: Predicate<T>,
        T: Read<D>,
    {
        let (data, change_stamp) = self.query_as()?.into_data_change_stamp();

        if predicate.check(data.borrow(), PredicateStage::Initial) {
            return Ok(data);
        }

        let pair = Arc::new((Mutex::new(None), Condvar::new()));

        let subscription = {
            let pair = Arc::clone(&pair);

            self.subscribe(
                move |accessor: DataAccessor<'_, _>| {
                    let (mutex, condvar) = &*pair;
                    *mutex.lock().unwrap() = Some(accessor.get_as());
                    condvar.notify_one();
                },
                SeenChangeStamp::Value(change_stamp),
            )?
        };

        let (mutex, condvar) = &*pair;
        let (mut guard, timeout_result) = condvar
            .wait_timeout_while(mutex.lock().unwrap(), timeout, |result| match result.as_ref() {
                Some(Ok(data)) => !predicate.check(data.borrow(), PredicateStage::Changed),
                Some(Err(..)) => false,
                None => true,
            })
            .unwrap();

        subscription.unsubscribe()?;

        if timeout_result.timed_out() {
            Err(io::Error::new(
                ErrorKind::TimedOut,
                "waiting for state update timed out",
            ))
        } else {
            guard.take().unwrap()
        }
    }
}