win_events/waiters/
wait_first.rs

1use std::collections::VecDeque;
2use std::error::Error;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use crate::events::event::{try_consume_one, AutoUnregister};
7use crate::waiters::waiter::{Signaler, Waiter};
8use crate::Event;
9
10type WaitFirst = Waiter<i32>;
11
12impl Signaler for WaitFirst {
13    fn fire(&self, pos: usize) -> Result<(), Box<dyn Error>> {
14        let lock = self.lock();
15        let mut data = match lock {
16            Ok(data) => data,
17            Err(error) => {
18                return Err(format!("Failed to fire event: {}", error).into());
19            }
20        };
21
22        *data = pos as i32;
23        self.condvar.notify_all();
24        Ok(())
25    }
26
27    fn clear(&self, _: usize) -> Result<(), Box<dyn Error>> {
28        Ok(())
29    }
30}
31
32/// Waits for the first event to fire and returns the position of the event given
33///
34/// # Examples
35///
36/// This example spawns a thread that will set the third event created after a short delay.
37///
38/// The main thread will wait on all three events and return a position of two for the third event
39/// of the provided events
40///
41///```
42/// use std::thread::{sleep, spawn};
43/// use std::time::Duration;
44/// use win_events::{ManualResetEvent, wait_first};
45///
46/// let evt0 = ManualResetEvent::new(false);
47/// let evt1 = ManualResetEvent::new(false);
48/// let evt2 = ManualResetEvent::new(false);
49/// let evt_inner = evt2.clone();
50///
51/// let worker = spawn(move || {
52///    sleep(Duration::from_millis(10));
53///    evt_inner.set()
54/// });
55///
56/// let wait_result = wait_first(
57///    vec![&evt0, &evt1, &evt2],
58///    Duration::from_millis(100),
59/// ).unwrap();
60///
61/// assert_eq!(2, wait_result);
62/// worker.join().unwrap()
63///```
64pub fn wait_first(events: Vec<&dyn Event>, dur: Duration) -> Result<i32, Box<dyn Error>> {
65    let signaler = Arc::new(WaitFirst::new(-1));
66    let mut registered_waits: VecDeque<AutoUnregister> = VecDeque::new();
67    for (pos, event) in events.iter().enumerate() {
68        if event.handle().is_set() && try_consume_one(*event)? {
69            return Ok(pos as i32);
70        }
71
72        registered_waits.push_back(AutoUnregister::register_signaler(
73            Arc::<WaitFirst>::clone(&signaler),
74            pos,
75            *event,
76        )?);
77    }
78
79    // wait on the event to be fired
80    let start = SystemTime::now();
81    let mut dur_remaining = dur;
82    let result = loop {
83        let data = signaler.lock()?;
84        let lock_result = signaler
85            .condvar
86            .wait_timeout_while(data, dur_remaining, |fired| *fired == -1);
87
88        let (mut fired, timeout) = match lock_result {
89            Ok(result) => result,
90            Err(error) => break Err(format!("Wait failed: {}", error).into()),
91        };
92
93        // Check if it timed out
94        if timeout.timed_out() {
95            break Ok(-1);
96        }
97
98        // try to consume the event that triggered the wake
99        let event = events[*fired as usize];
100        if event.handle().can_consume() && try_consume_one(event)? {
101            break Ok(*fired);
102        }
103
104        // recheck and try to consume the other events
105        for (pos, event) in events.iter().enumerate() {
106            if *fired != pos as i32 && event.handle().can_consume() && try_consume_one(*event)? {
107                return Ok(pos as i32);
108            }
109        }
110
111        // reset fired event value
112        *fired = -1;
113
114        // Calculate the remaining time
115        let result = start.elapsed();
116        match result {
117            Ok(remaining) => dur_remaining = remaining,
118            // The system time can change and cause this to return early... I'm sure it's fine
119            Err(_) => break Ok(-1),
120        }
121    };
122
123    drop(registered_waits);
124    result
125}