sync_wait_object/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{time, time::Duration, ops::Deref, sync::{Arc, Condvar, Mutex, MutexGuard}, mem};
4use std::ops::DerefMut;
5
6#[cfg(windows)]
7pub mod windows;
8
9// ------------------------------ DATA TYPES ----------------------------------
/// Errors reported by the wait objects of this crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WaitObjectError {
    /// OS error code together with its description. Only produced by the APIs that are backed by the OS.
    OsError(isize, String),

    /// A synchronization object got broken (poisoned) because of a `panic!()`.
    SynchronizationBroken,

    /// The wait timed out.
    Timeout
}

impl std::fmt::Display for WaitObjectError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::OsError(code, description) => write!(f, "OS error {}: {}", code, description),
            Self::SynchronizationBroken => f.write_str("synchronization object is poisoned"),
            Self::Timeout => f.write_str("wait timed out"),
        }
    }
}

// Lets callers propagate this error through `Box<dyn std::error::Error>` and similar error containers.
impl std::error::Error for WaitObjectError {}

/// Result type used throughout this crate, with [`WaitObjectError`] as the error type.
pub type Result<T> = std::result::Result<T, WaitObjectError>;
23
/// A wait event object holding a value of any type `T`. To use it from several threads, clone the object and hand the
/// clones out; every clone refers to the same shared state.
///
/// This wait object is just a wrapper around a `Mutex` and `Condvar` pair, following the pattern suggested by the Rust
/// documentation for waiting on a value.
///
/// There are two ways to wait. The first is just to wait until an expected value.
///
/// ```rust, no_run
/// # use sync_wait_object::WaitEvent;
/// use std::thread;
/// let wait3 = WaitEvent::new_init(0);
/// let mut wait_handle = wait3.clone();
///
/// thread::spawn(move || {
///     for i in 1..=3 {
///         wait_handle.set_state(i).unwrap();
///     }
/// });
///
/// let timeout = std::time::Duration::from_secs(1);
/// let r#final = *wait3.wait(Some(timeout), |i| *i == 3).unwrap();
/// let current = *wait3.value().unwrap();
/// assert_eq!(r#final, 3);
/// assert_eq!(current, 3);
/// ```
///
/// The second is to wait and then reset the value to a desired state.
/// ```rust, no_run
/// # use sync_wait_object::WaitEvent;
/// use std::thread;
/// let wait3 = WaitEvent::new_init(0);
/// let mut wait_handle = wait3.clone();
///
/// thread::spawn(move || {
///     for i in 1..=3 {
///         wait_handle.set_state(i).unwrap();
///     }
/// });
///
/// let timeout = std::time::Duration::from_secs(1);
/// let r#final = wait3.wait_reset(Some(timeout), || 1, |i| *i == 3).unwrap();
/// let current = *wait3.value().unwrap();
/// assert_eq!(r#final, 3);
/// assert_eq!(current, 1);
/// ```
///
pub struct WaitEvent<T>(Arc<(Mutex<T>, Condvar)>);

// Written by hand on purpose: `#[derive(Clone)]` would demand `T: Clone`, although cloning only copies the `Arc`
// handle and never the value itself.
impl<T> Clone for WaitEvent<T> {
    #[inline]
    fn clone(&self) -> Self {
        Self(Arc::clone(&self.0))
    }
}
71
72/// Wrapper of [`WaitEvent`] of type `bool`, which focuses on waiting for `true` without resetting.
73#[derive(Clone)]
74pub struct ManualResetEvent(WaitEvent<bool>);
75
76/// Wrapper of [`WaitEvent`] of type `bool`, which focuses on waiting for `true` with automatic reset to `false`.
77#[derive(Clone)]
78pub struct AutoResetEvent(WaitEvent<bool>);
79
80// Boolean signal with ability to wait and set state.
81pub trait SignalWaitable {
82    fn wait_until_set(&self) -> Result<()>;
83    fn wait(&self, timeout: Duration) -> Result<()>;
84    fn set(&mut self) -> Result<()>;
85    fn reset(&mut self) -> Result<()>;
86}
87
88// ------------------------------ IMPLEMENTATIONS ------------------------------
89impl<T> WaitEvent<T> {
90    #[inline]
91    pub fn new_init(initial_state: T) -> Self {
92        Self(Arc::new((Mutex::new(initial_state), Condvar::new())))
93    }
94
95    pub fn value(&self) -> Result<MutexGuard<T>> {
96        self.0.0.lock().map_err(|e| e.into())
97    }
98
99    /// Wait until the `checker` returns true, or timed-out from `timeout`.
100    ///
101    /// # Arguments
102    ///
103    /// * `timeout` - Maximum wait time
104    /// * `checker` - Checker function, once it returns `true`, the wait ends
105    pub fn wait(&self, timeout: Option<Duration>, checker: impl FnMut(&T) -> bool) -> Result<MutexGuard<T>> {
106        match timeout {
107            Some(_) => self.wait_with_waiter(timeout, checker),
108            None => self.wait_with_waiter(timeout, checker)
109        }
110    }
111
112    /// Wait until the `checker` returns true, or timed-out from `timeout`. If the wait ends from `checker` condition, the interval value is reset by `reset`.
113    ///
114    /// # Arguments
115    ///
116    /// * `timeout` - Maximum wait time
117    /// * `reset` - Function that provides a reset value
118    /// * `checker` - Checker function, once it returns `true`, the wait ends
119    ///
120    /// # Examples
121    ///
122    /// ```rust
123    /// # use std::{thread, time::Duration};
124    /// use sync_wait_object::{WaitEvent, WaitObjectError};
125    ///
126    /// let wait3 = WaitEvent::new_init(0);
127    /// let mut wait_handle = wait3.clone();
128    ///
129    /// thread::spawn(move || {
130    ///     for i in 1..=3 {
131    ///         thread::sleep(Duration::from_millis(50));
132    ///         wait_handle.set_state(i).unwrap();
133    ///     }
134    /// });
135    ///
136    /// let timeout = Duration::from_millis(250);
137    /// let r#final = wait3.wait_reset(Some(timeout), || 0, |i| *i == 5);
138    /// let current = *wait3.value().unwrap();
139    /// assert_eq!(r#final, Err(WaitObjectError::Timeout));
140    /// assert_eq!(current, 3);
141    /// ```
142    pub fn wait_reset(&self, timeout: Option<Duration>, reset: impl FnMut() -> T, checker: impl FnMut(&T) -> bool) -> Result<T> {
143        match timeout {
144            Some(_) => self.wait_and_reset_with_waiter(timeout, checker, reset),
145            None => self.wait_and_reset_with_waiter(timeout, checker, reset)
146        }
147    }
148
149    pub fn wait_with_waiter(&self, timeout: Option<Duration>, mut checker: impl FnMut(&T) -> bool) -> Result<MutexGuard<T>> {
150        let (lock, cond) = self.0.deref();
151        let mut state = lock.lock()?;
152        let waiter = Self::create_waiter(timeout);
153        let mut continue_wait = waiter();
154        let mut pass = checker(&*state);
155        while continue_wait && !pass {
156            state = match timeout {
157                Some(t) => {
158                    let (g, _) = cond.wait_timeout(state, t)?;
159                    g
160                },
161                None => cond.wait(state)?
162            };
163            continue_wait = waiter();
164            pass = checker(&*state);
165        }
166        if pass { Ok(state) }
167        else { Err(WaitObjectError::Timeout) }
168    }
169
170    pub fn wait_and_reset_with_waiter(&self, timeout: Option<Duration>, checker: impl FnMut(&T) -> bool, mut reset: impl FnMut() -> T) -> Result<T> {
171        let state = self.wait_with_waiter(timeout, checker);
172        state.map(|mut g| mem::replace(g.deref_mut(), reset()))
173    }
174
175    /// Synchronously change state of WaitObject by value
176    pub fn set_state(&mut self, new_state: T) -> Result<()> {
177        let (lock, cond) = self.0.deref();
178        let mut state = lock.lock()?;
179        *state = new_state;
180        cond.notify_all();
181        Ok(())
182    }
183
184    /// Synchronously change state of WaitObject by a function's return value
185    ///
186    /// # Example
187    ///
188    /// ```rust
189    /// # use std::{time::Duration, thread};
190    /// use sync_wait_object::WaitEvent;
191    ///
192    /// let wait = WaitEvent::new_init(0);
193    /// let mut w1 = wait.clone();
194    /// let mut w2 = wait.clone();
195    /// let mut w3 = wait.clone();
196    ///
197    /// thread::spawn(move || w1.set_state_func(|v| v + 1));
198    /// thread::spawn(move || w2.set_state_func(|v| v + 1));
199    /// thread::spawn(move || w3.set_state_func(|v| v + 1));
200    ///
201    /// let result = *wait.wait(Some(Duration::from_millis(200)), |v| *v == 3).unwrap();
202    /// assert_eq!(result, 3);
203    /// ```
204    pub fn set_state_func<F>(&mut self, setter: F) -> Result<()>
205    where F: FnOnce(&T) -> T
206    {
207        let (lock, cond) = self.0.deref();
208        let mut state = lock.lock()?;
209        *state = setter(&*state);
210        cond.notify_all();
211        Ok(())
212    }
213
214    fn create_waiter(timeout: Option<Duration>) -> impl Fn() -> bool {
215        let start = time::Instant::now();
216        move || {
217            match timeout {
218                Some(t) => (time::Instant::now() - start) < t,
219                None => true
220            }
221        }
222    }
223}
224
225impl ManualResetEvent {
226    #[inline]
227    pub fn new() -> Self { Self::new_init(false) }
228
229    #[inline]
230    pub fn new_init(initial_state: bool) -> Self {
231        Self(WaitEvent::new_init(initial_state))
232    }
233}
234
235impl SignalWaitable for ManualResetEvent {
236    #[inline] fn wait_until_set(&self) -> Result<()> { self.0.wait(None, |v| *v).map(|_| ()) }
237    #[inline] fn wait(&self, timeout: Duration) -> Result<()> { self.0.wait(Some(timeout), |v| *v).map(|_| ()) }
238    #[inline] fn set(&mut self) -> Result<()> {
239        self.0.set_state(true)
240    }
241    #[inline] fn reset(&mut self) -> Result<()> {
242        self.0.set_state(false)
243    }
244}
245
246impl AutoResetEvent {
247    #[inline] pub fn new() -> Self { Self::new_init(false) }
248    #[inline] pub fn new_init(initial_state: bool) -> Self { Self(WaitEvent::new_init(initial_state)) }
249}
250
251impl SignalWaitable for AutoResetEvent {
252    #[inline] fn wait_until_set(&self) -> Result<()> { self.0.wait_reset(None, || false, |v| *v).map(|_| ()) }
253    #[inline] fn wait(&self, timeout: Duration) -> Result<()> { self.0.wait_reset(Some(timeout), || false, |v| *v).map(|_| ()) }
254    #[inline] fn set(&mut self) -> Result<()> {
255        self.0.set_state(true)
256    }
257    #[inline] fn reset(&mut self) -> Result<()> {
258        self.0.set_state(false)
259    }
260}
261
262impl<T> From<std::sync::PoisonError<T>> for WaitObjectError {
263    fn from(_value: std::sync::PoisonError<T>) -> Self {
264        Self::SynchronizationBroken
265    }
266}
267
268impl From<WaitEvent<bool>> for ManualResetEvent {
269    fn from(value: WaitEvent<bool>) -> Self {
270                                          Self(value)
271                                                     }
272}
273
274impl From<ManualResetEvent> for WaitEvent<bool> {
275    fn from(value: ManualResetEvent) -> Self {
276                                           value.0
277                                                  }
278}
279
280impl From<WaitEvent<bool>> for AutoResetEvent {
281    fn from(value: WaitEvent<bool>) -> Self {
282                                          Self(value)
283                                                     }
284}
285
286impl From<AutoResetEvent> for WaitEvent<bool> {
287    fn from(value: AutoResetEvent) -> Self {
288                                         value.0
289                                                }
290}