win_events/waiters/
wait_first.rs1use std::collections::VecDeque;
2use std::error::Error;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use crate::events::event::{try_consume_one, AutoUnregister};
7use crate::waiters::waiter::{Signaler, Waiter};
8use crate::Event;
9
10type WaitFirst = Waiter<i32>;
11
12impl Signaler for WaitFirst {
13 fn fire(&self, pos: usize) -> Result<(), Box<dyn Error>> {
14 let lock = self.lock();
15 let mut data = match lock {
16 Ok(data) => data,
17 Err(error) => {
18 return Err(format!("Failed to fire event: {}", error).into());
19 }
20 };
21
22 *data = pos as i32;
23 self.condvar.notify_all();
24 Ok(())
25 }
26
27 fn clear(&self, _: usize) -> Result<(), Box<dyn Error>> {
28 Ok(())
29 }
30}
31
32pub fn wait_first(events: Vec<&dyn Event>, dur: Duration) -> Result<i32, Box<dyn Error>> {
65 let signaler = Arc::new(WaitFirst::new(-1));
66 let mut registered_waits: VecDeque<AutoUnregister> = VecDeque::new();
67 for (pos, event) in events.iter().enumerate() {
68 if event.handle().is_set() && try_consume_one(*event)? {
69 return Ok(pos as i32);
70 }
71
72 registered_waits.push_back(AutoUnregister::register_signaler(
73 Arc::<WaitFirst>::clone(&signaler),
74 pos,
75 *event,
76 )?);
77 }
78
79 let start = SystemTime::now();
81 let mut dur_remaining = dur;
82 let result = loop {
83 let data = signaler.lock()?;
84 let lock_result = signaler
85 .condvar
86 .wait_timeout_while(data, dur_remaining, |fired| *fired == -1);
87
88 let (mut fired, timeout) = match lock_result {
89 Ok(result) => result,
90 Err(error) => break Err(format!("Wait failed: {}", error).into()),
91 };
92
93 if timeout.timed_out() {
95 break Ok(-1);
96 }
97
98 let event = events[*fired as usize];
100 if event.handle().can_consume() && try_consume_one(event)? {
101 break Ok(*fired);
102 }
103
104 for (pos, event) in events.iter().enumerate() {
106 if *fired != pos as i32 && event.handle().can_consume() && try_consume_one(*event)? {
107 return Ok(pos as i32);
108 }
109 }
110
111 *fired = -1;
113
114 let result = start.elapsed();
116 match result {
117 Ok(remaining) => dur_remaining = remaining,
118 Err(_) => break Ok(-1),
120 }
121 };
122
123 drop(registered_waits);
124 result
125}