// win_events/waiters/wait_all.rs

1use std::collections::VecDeque;
2use std::error::Error;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use crate::events::event::{try_consume_all, AutoUnregister};
7use crate::waiters::waiter::{Signaler, Waiter};
8use crate::Event;
9
/// Bookkeeping shared between a waiting thread and the events it waits on.
struct Positions {
    /// Number of slots in `vec` that are still `false`.
    remaining: usize,
    /// One flag per registered event, indexed by registration position;
    /// `true` once that event has been recorded as fired.
    vec: Vec<bool>,
}
14
/// A `Waiter` whose guarded state is a `Positions` table; `wait_all` creates
/// one and registers it with every event as the signaler.
type WaitAll = Waiter<Positions>;
16
17impl Signaler for WaitAll {
18    fn fire(&self, pos: usize) -> Result<(), Box<dyn Error>> {
19        let completed = {
20            let lock = self.lock();
21            let mut data = match lock {
22                Ok(data) => data,
23                Err(error) => {
24                    return Err(format!("Failed to set event: {}", error).into());
25                }
26            };
27
28            if data.vec[pos] == true {
29                return Ok(());
30            }
31
32            data.vec[pos] = true;
33            data.remaining -= 1;
34            data.remaining == 0
35        };
36
37        // the last one should notify the waiting thread
38        if completed {
39            self.condvar.notify_all();
40        }
41
42        Ok(())
43    }
44
45    fn clear(&self, pos: usize) -> Result<(), Box<dyn Error>> {
46        let lock = self.lock();
47        let mut data = match lock {
48            Ok(data) => data,
49            Err(error) => return Err(format!("Failed to set event: {}", error).into()),
50        };
51
52        if data.vec[pos] == false {
53            return Ok(());
54        }
55
56        data.vec[pos] = false;
57        data.remaining += 1;
58        Ok(())
59    }
60}
61
62/// Waits for the all events to fire and returns true on success
63///
64/// # Examples
65///
66/// This example spawns a thread that will set all of the the events created after a short delay.
67///
68/// The main thread will wait on all the events to be set and return a remaining count of zero
69///
70///```
71/// use std::thread::{sleep, spawn};
72/// use std::time::Duration;
73/// use win_events::{ManualResetEvent, wait_all};
74///
75/// let evt0 = ManualResetEvent::new(false);
76/// let evt1 = ManualResetEvent::new(false);
77/// let evt2 = ManualResetEvent::new(false);
78/// let evt_inner0 = evt0.clone();
79/// let evt_inner1 = evt1.clone();
80/// let evt_inner2 = evt2.clone();
81///
82/// let worker = spawn(move || {
83///    sleep(Duration::from_millis(10));
84///    evt_inner0.set();
85///    evt_inner1.set();
86///    evt_inner2.set()
87/// });
88///
89/// let wait_result = wait_all(
90///    vec![&evt0, &evt1, &evt2],
91///    Duration::from_millis(100),
92/// ).unwrap();
93///
94/// assert_eq!(true, wait_result);
95/// worker.join().unwrap()
96///```
97pub fn wait_all(events: Vec<&dyn Event>, dur: Duration) -> Result<bool, Box<dyn Error>> {
98    // sort the events by the arc internal heap address, this will make the consume all
99    // ordered when trying to lock the wake locks for each event. This should prevent
100    // dead locks with multiple threads waiting on mutexes to be freed.
101    let events = {
102        let mut sorted_events = events;
103        sorted_events.sort_by_key(|e| Arc::as_ptr(e.handle()));
104        sorted_events
105    };
106
107    // create a waiter
108    let signaler = Arc::new(WaitAll::new(Positions {
109        remaining: events.len(),
110        vec: vec![false; events.len()],
111    }));
112
113    // Register the waiter with the events
114    let mut registered_locks = VecDeque::new();
115    for (pos, event) in events.iter().enumerate() {
116        registered_locks.push_back(AutoUnregister::register_signaler(
117            Arc::<WaitAll>::clone(&signaler),
118            pos,
119            *event,
120        )?);
121    }
122
123    let start = SystemTime::now();
124    let mut dur_remaining = dur;
125    let result = loop {
126        let lock = signaler.lock()?;
127        let lock_result = signaler
128            .condvar
129            .wait_timeout_while(lock, dur_remaining, |pos| pos.remaining > 0);
130        let (mut positions, timeout) = match lock_result {
131            Ok(result) => result,
132            Err(error) => break Err(format!("Wait failed: {}", error).into()),
133        };
134
135        // check if there are any remaining events, if there are there was a time out
136        if timeout.timed_out() {
137            break Ok(false);
138        }
139
140        // Try to consume all the events, if not recalculate the remaining values
141        if try_consume_all(&events)? {
142            break Ok(true);
143        }
144
145        // reset the wait information
146        positions.remaining = 0;
147        for (pos, event) in events.iter().enumerate() {
148            let set = event.handle().is_set();
149            if !set {
150                positions.remaining += 1
151            }
152            positions.vec[pos] = set
153        }
154
155        // Calculate the remaining time
156        match start.elapsed() {
157            Ok(elapsed) => {
158                if elapsed > dur_remaining {
159                    break Ok(false);
160                }
161                dur_remaining = dur - elapsed;
162            }
163            // The system time can change and cause this to return early... I'm sure it's fine
164            Err(_) => break Ok(false),
165        }
166    };
167
168    drop(registered_locks);
169    result
170}