win_events/waiters/
wait_all.rs1use std::collections::VecDeque;
2use std::error::Error;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use crate::events::event::{try_consume_all, AutoUnregister};
7use crate::waiters::waiter::{Signaler, Waiter};
8use crate::Event;
9
10struct Positions {
11 vec: Vec<bool>,
12 remaining: usize,
13}
14
15type WaitAll = Waiter<Positions>;
16
17impl Signaler for WaitAll {
18 fn fire(&self, pos: usize) -> Result<(), Box<dyn Error>> {
19 let completed = {
20 let lock = self.lock();
21 let mut data = match lock {
22 Ok(data) => data,
23 Err(error) => {
24 return Err(format!("Failed to set event: {}", error).into());
25 }
26 };
27
28 if data.vec[pos] == true {
29 return Ok(());
30 }
31
32 data.vec[pos] = true;
33 data.remaining -= 1;
34 data.remaining == 0
35 };
36
37 if completed {
39 self.condvar.notify_all();
40 }
41
42 Ok(())
43 }
44
45 fn clear(&self, pos: usize) -> Result<(), Box<dyn Error>> {
46 let lock = self.lock();
47 let mut data = match lock {
48 Ok(data) => data,
49 Err(error) => return Err(format!("Failed to set event: {}", error).into()),
50 };
51
52 if data.vec[pos] == false {
53 return Ok(());
54 }
55
56 data.vec[pos] = false;
57 data.remaining += 1;
58 Ok(())
59 }
60}
61
62pub fn wait_all(events: Vec<&dyn Event>, dur: Duration) -> Result<bool, Box<dyn Error>> {
98 let events = {
102 let mut sorted_events = events;
103 sorted_events.sort_by_key(|e| Arc::as_ptr(e.handle()));
104 sorted_events
105 };
106
107 let signaler = Arc::new(WaitAll::new(Positions {
109 remaining: events.len(),
110 vec: vec![false; events.len()],
111 }));
112
113 let mut registered_locks = VecDeque::new();
115 for (pos, event) in events.iter().enumerate() {
116 registered_locks.push_back(AutoUnregister::register_signaler(
117 Arc::<WaitAll>::clone(&signaler),
118 pos,
119 *event,
120 )?);
121 }
122
123 let start = SystemTime::now();
124 let mut dur_remaining = dur;
125 let result = loop {
126 let lock = signaler.lock()?;
127 let lock_result = signaler
128 .condvar
129 .wait_timeout_while(lock, dur_remaining, |pos| pos.remaining > 0);
130 let (mut positions, timeout) = match lock_result {
131 Ok(result) => result,
132 Err(error) => break Err(format!("Wait failed: {}", error).into()),
133 };
134
135 if timeout.timed_out() {
137 break Ok(false);
138 }
139
140 if try_consume_all(&events)? {
142 break Ok(true);
143 }
144
145 positions.remaining = 0;
147 for (pos, event) in events.iter().enumerate() {
148 let set = event.handle().is_set();
149 if !set {
150 positions.remaining += 1
151 }
152 positions.vec[pos] = set
153 }
154
155 match start.elapsed() {
157 Ok(elapsed) => {
158 if elapsed > dur_remaining {
159 break Ok(false);
160 }
161 dur_remaining = dur - elapsed;
162 }
163 Err(_) => break Ok(false),
165 }
166 };
167
168 drop(registered_locks);
169 result
170}