sync_wait_object/lib.rs
1#![doc = include_str!("../README.md")]
2
3use std::{time, time::Duration, ops::Deref, sync::{Arc, Condvar, Mutex, MutexGuard}, mem};
4use std::ops::DerefMut;
5
6#[cfg(windows)]
7pub mod windows;
8
9// ------------------------------ DATA TYPES ----------------------------------
10#[derive(Debug, PartialEq)]
11pub enum WaitObjectError {
12 /// OS error code with its description. This error code is only when using APIs based on OS.
13 OsError(isize, String),
14
15 /// Meaning a sync object gets broken (or poisoned) due to panic!()
16 SynchronizationBroken,
17
18 /// Wait is timed out
19 Timeout
20}
21
22pub type Result<T> = std::result::Result<T, WaitObjectError>;
23
24/// Create a wait event object of any type T. To use this wait object in multi-threaded scenario, just clone the object and distribute it.
25///
26/// This wait object is just a wrapper of Mutex and Condvar combination with the suggested pattern (from Rust document) for waiting a value.
27///
28/// There are two ways to wait. The first is just to want until an expected value.
29///
30/// ```rust, no_run
31/// # use sync_wait_object::WaitEvent;
32/// use std::thread;
33/// let wait3 = WaitEvent::new_init(0);
34/// let mut wait_handle = wait3.clone();
35///
36/// thread::spawn(move || {
37/// for i in 1..=3 {
38/// wait_handle.set_state(i).unwrap();
39/// }
40/// });
41///
42/// let timeout = std::time::Duration::from_secs(1);
43/// let r#final = *wait3.wait(Some(timeout), |i| *i == 3).unwrap();
44/// let current = *wait3.value().unwrap();
45/// assert_eq!(r#final, 3);
46/// assert_eq!(current, 3);
47/// ```
48///
49/// The second is to wait and then reset the value to a desired state.
50/// ```rust, no_run
51/// # use sync_wait_object::WaitEvent;
52/// use std::thread;
53/// let wait3 = WaitEvent::new_init(0);
54/// let mut wait_handle = wait3.clone();
55///
56/// thread::spawn(move || {
57/// for i in 1..=3 {
58/// wait_handle.set_state(i).unwrap();
59/// }
60/// });
61///
62/// let timeout = std::time::Duration::from_secs(1);
63/// let r#final = wait3.wait_reset(Some(timeout), || 1, |i| *i == 3).unwrap();
64/// let current = *wait3.value().unwrap();
65/// assert_eq!(r#final, 3);
66/// assert_eq!(current, 1);
67/// ```
68///
69#[derive(Clone)]
70pub struct WaitEvent<T>(Arc<(Mutex<T>, Condvar)>);
71
72/// Wrapper of [`WaitEvent`] of type `bool`, which focuses on waiting for `true` without resetting.
73#[derive(Clone)]
74pub struct ManualResetEvent(WaitEvent<bool>);
75
76/// Wrapper of [`WaitEvent`] of type `bool`, which focuses on waiting for `true` with automatic reset to `false`.
77#[derive(Clone)]
78pub struct AutoResetEvent(WaitEvent<bool>);
79
80// Boolean signal with ability to wait and set state.
81pub trait SignalWaitable {
82 fn wait_until_set(&self) -> Result<()>;
83 fn wait(&self, timeout: Duration) -> Result<()>;
84 fn set(&mut self) -> Result<()>;
85 fn reset(&mut self) -> Result<()>;
86}
87
88// ------------------------------ IMPLEMENTATIONS ------------------------------
89impl<T> WaitEvent<T> {
90 #[inline]
91 pub fn new_init(initial_state: T) -> Self {
92 Self(Arc::new((Mutex::new(initial_state), Condvar::new())))
93 }
94
95 pub fn value(&self) -> Result<MutexGuard<T>> {
96 self.0.0.lock().map_err(|e| e.into())
97 }
98
99 /// Wait until the `checker` returns true, or timed-out from `timeout`.
100 ///
101 /// # Arguments
102 ///
103 /// * `timeout` - Maximum wait time
104 /// * `checker` - Checker function, once it returns `true`, the wait ends
105 pub fn wait(&self, timeout: Option<Duration>, checker: impl FnMut(&T) -> bool) -> Result<MutexGuard<T>> {
106 match timeout {
107 Some(_) => self.wait_with_waiter(timeout, checker),
108 None => self.wait_with_waiter(timeout, checker)
109 }
110 }
111
112 /// Wait until the `checker` returns true, or timed-out from `timeout`. If the wait ends from `checker` condition, the interval value is reset by `reset`.
113 ///
114 /// # Arguments
115 ///
116 /// * `timeout` - Maximum wait time
117 /// * `reset` - Function that provides a reset value
118 /// * `checker` - Checker function, once it returns `true`, the wait ends
119 ///
120 /// # Examples
121 ///
122 /// ```rust
123 /// # use std::{thread, time::Duration};
124 /// use sync_wait_object::{WaitEvent, WaitObjectError};
125 ///
126 /// let wait3 = WaitEvent::new_init(0);
127 /// let mut wait_handle = wait3.clone();
128 ///
129 /// thread::spawn(move || {
130 /// for i in 1..=3 {
131 /// thread::sleep(Duration::from_millis(50));
132 /// wait_handle.set_state(i).unwrap();
133 /// }
134 /// });
135 ///
136 /// let timeout = Duration::from_millis(250);
137 /// let r#final = wait3.wait_reset(Some(timeout), || 0, |i| *i == 5);
138 /// let current = *wait3.value().unwrap();
139 /// assert_eq!(r#final, Err(WaitObjectError::Timeout));
140 /// assert_eq!(current, 3);
141 /// ```
142 pub fn wait_reset(&self, timeout: Option<Duration>, reset: impl FnMut() -> T, checker: impl FnMut(&T) -> bool) -> Result<T> {
143 match timeout {
144 Some(_) => self.wait_and_reset_with_waiter(timeout, checker, reset),
145 None => self.wait_and_reset_with_waiter(timeout, checker, reset)
146 }
147 }
148
149 pub fn wait_with_waiter(&self, timeout: Option<Duration>, mut checker: impl FnMut(&T) -> bool) -> Result<MutexGuard<T>> {
150 let (lock, cond) = self.0.deref();
151 let mut state = lock.lock()?;
152 let waiter = Self::create_waiter(timeout);
153 let mut continue_wait = waiter();
154 let mut pass = checker(&*state);
155 while continue_wait && !pass {
156 state = match timeout {
157 Some(t) => {
158 let (g, _) = cond.wait_timeout(state, t)?;
159 g
160 },
161 None => cond.wait(state)?
162 };
163 continue_wait = waiter();
164 pass = checker(&*state);
165 }
166 if pass { Ok(state) }
167 else { Err(WaitObjectError::Timeout) }
168 }
169
170 pub fn wait_and_reset_with_waiter(&self, timeout: Option<Duration>, checker: impl FnMut(&T) -> bool, mut reset: impl FnMut() -> T) -> Result<T> {
171 let state = self.wait_with_waiter(timeout, checker);
172 state.map(|mut g| mem::replace(g.deref_mut(), reset()))
173 }
174
175 /// Synchronously change state of WaitObject by value
176 pub fn set_state(&mut self, new_state: T) -> Result<()> {
177 let (lock, cond) = self.0.deref();
178 let mut state = lock.lock()?;
179 *state = new_state;
180 cond.notify_all();
181 Ok(())
182 }
183
184 /// Synchronously change state of WaitObject by a function's return value
185 ///
186 /// # Example
187 ///
188 /// ```rust
189 /// # use std::{time::Duration, thread};
190 /// use sync_wait_object::WaitEvent;
191 ///
192 /// let wait = WaitEvent::new_init(0);
193 /// let mut w1 = wait.clone();
194 /// let mut w2 = wait.clone();
195 /// let mut w3 = wait.clone();
196 ///
197 /// thread::spawn(move || w1.set_state_func(|v| v + 1));
198 /// thread::spawn(move || w2.set_state_func(|v| v + 1));
199 /// thread::spawn(move || w3.set_state_func(|v| v + 1));
200 ///
201 /// let result = *wait.wait(Some(Duration::from_millis(200)), |v| *v == 3).unwrap();
202 /// assert_eq!(result, 3);
203 /// ```
204 pub fn set_state_func<F>(&mut self, setter: F) -> Result<()>
205 where F: FnOnce(&T) -> T
206 {
207 let (lock, cond) = self.0.deref();
208 let mut state = lock.lock()?;
209 *state = setter(&*state);
210 cond.notify_all();
211 Ok(())
212 }
213
214 fn create_waiter(timeout: Option<Duration>) -> impl Fn() -> bool {
215 let start = time::Instant::now();
216 move || {
217 match timeout {
218 Some(t) => (time::Instant::now() - start) < t,
219 None => true
220 }
221 }
222 }
223}
224
225impl ManualResetEvent {
226 #[inline]
227 pub fn new() -> Self { Self::new_init(false) }
228
229 #[inline]
230 pub fn new_init(initial_state: bool) -> Self {
231 Self(WaitEvent::new_init(initial_state))
232 }
233}
234
235impl SignalWaitable for ManualResetEvent {
236 #[inline] fn wait_until_set(&self) -> Result<()> { self.0.wait(None, |v| *v).map(|_| ()) }
237 #[inline] fn wait(&self, timeout: Duration) -> Result<()> { self.0.wait(Some(timeout), |v| *v).map(|_| ()) }
238 #[inline] fn set(&mut self) -> Result<()> {
239 self.0.set_state(true)
240 }
241 #[inline] fn reset(&mut self) -> Result<()> {
242 self.0.set_state(false)
243 }
244}
245
246impl AutoResetEvent {
247 #[inline] pub fn new() -> Self { Self::new_init(false) }
248 #[inline] pub fn new_init(initial_state: bool) -> Self { Self(WaitEvent::new_init(initial_state)) }
249}
250
251impl SignalWaitable for AutoResetEvent {
252 #[inline] fn wait_until_set(&self) -> Result<()> { self.0.wait_reset(None, || false, |v| *v).map(|_| ()) }
253 #[inline] fn wait(&self, timeout: Duration) -> Result<()> { self.0.wait_reset(Some(timeout), || false, |v| *v).map(|_| ()) }
254 #[inline] fn set(&mut self) -> Result<()> {
255 self.0.set_state(true)
256 }
257 #[inline] fn reset(&mut self) -> Result<()> {
258 self.0.set_state(false)
259 }
260}
261
262impl<T> From<std::sync::PoisonError<T>> for WaitObjectError {
263 fn from(_value: std::sync::PoisonError<T>) -> Self {
264 Self::SynchronizationBroken
265 }
266}
267
268impl From<WaitEvent<bool>> for ManualResetEvent {
269 fn from(value: WaitEvent<bool>) -> Self {
270 Self(value)
271 }
272}
273
274impl From<ManualResetEvent> for WaitEvent<bool> {
275 fn from(value: ManualResetEvent) -> Self {
276 value.0
277 }
278}
279
280impl From<WaitEvent<bool>> for AutoResetEvent {
281 fn from(value: WaitEvent<bool>) -> Self {
282 Self(value)
283 }
284}
285
286impl From<AutoResetEvent> for WaitEvent<bool> {
287 fn from(value: AutoResetEvent) -> Self {
288 value.0
289 }
290}