ringbuf_blocking/
sync.rs

1use core::time::Duration;
2#[cfg(feature = "std")]
3use std::{
4    mem::replace,
5    sync::{Condvar, Mutex},
6};
7
/// Timeout that requests no waiting at all: a zero-length duration.
pub const NO_WAIT: Option<Duration> = Some(Duration::ZERO);
/// Timeout that requests an unbounded wait: no duration limit is set.
pub const FOREVER: Option<Duration> = None;
10
/// Clock abstraction used to measure how much time has passed.
///
/// A value represents a captured moment; [`Instant::elapsed`] reports the
/// span between that moment and the time of the call.
pub trait Instant {
    /// Captures the current moment.
    fn now() -> Self;

    /// Returns the amount of time that has passed since this value was captured.
    fn elapsed(&self) -> Duration;
}
16
17/// Binary semaphore.
18pub trait Semaphore: Default {
19    type Instant: Instant;
20
21    /// Increment semaphore.
22    ///
23    /// Does nothing if already given.
24    fn give(&self);
25
26    /// Try decrement semaphore.
27    ///
28    /// Returns previous value.
29    ///
30    /// Does nothing if already taken.
31    fn try_take(&self) -> bool;
32
33    /// Wait for semaphore to be given and take it.
34    ///
35    /// Returns:
36    /// + on success - `true`,
37    /// + on timeout - `false`.
38    fn take(&self, timeout: Option<Duration>) -> bool;
39
40    fn take_iter(&self, timeout: Option<Duration>) -> TakeIter<Self> {
41        TakeIter {
42            reset: false,
43            semaphore: self,
44            timeout_iter: TimeoutIter::new(timeout),
45        }
46    }
47}
48
49#[cfg(feature = "std")]
50pub use std::time::Instant as StdInstant;
51
#[cfg(feature = "std")]
impl Instant for StdInstant {
    /// Captures the current moment using the standard library clock.
    fn now() -> Self {
        // Inherent associated functions take precedence over trait ones in
        // path resolution, so this calls `std::time::Instant::now`.
        Self::now()
    }

    /// Returns the time passed since this instant, as reported by the
    /// standard library.
    fn elapsed(&self) -> Duration {
        // Resolves to the inherent `std::time::Instant::elapsed`, not to this
        // trait method.
        Self::elapsed(self)
    }
}
61
/// Binary semaphore built from a standard library mutex and condition
/// variable.
///
/// The default value starts with the flag cleared (`false`).
#[cfg(feature = "std")]
#[derive(Default)]
pub struct StdSemaphore {
    /// Signalled by `give` to wake a thread waiting in `take`.
    condvar: Condvar,
    /// Semaphore state: `true` while given, `false` while taken.
    mutex: Mutex<bool>,
}
68
#[cfg(feature = "std")]
impl Semaphore for StdSemaphore {
    type Instant = StdInstant;

    /// Sets the flag and wakes one thread waiting on the condition variable.
    ///
    /// Panics if the mutex is poisoned.
    fn give(&self) {
        let mut given = self.mutex.lock().unwrap();
        *given = true;
        // The notification is sent while the lock is still held; the lock is
        // released when `given` goes out of scope.
        self.condvar.notify_one();
    }

    /// Clears the flag without waiting on the condition variable and returns
    /// the value it held before.
    ///
    /// Panics if the mutex is poisoned.
    fn try_take(&self) -> bool {
        let mut given = self.mutex.lock().unwrap();
        let was_given = *given;
        *given = false;
        was_given
    }

    /// Waits for the flag to be set, clears it, and returns `true`; returns
    /// `false` if `timeout` runs out first. `None` waits without a limit.
    ///
    /// Panics if the mutex is poisoned.
    fn take(&self, timeout: Option<Duration>) -> bool {
        // Acquire the lock first so that the timeout clock starts only once
        // the lock is held.
        let mut given = self.mutex.lock().unwrap();
        for remaining in TimeoutIter::<Self::Instant>::new(timeout) {
            if *given {
                *given = false;
                return true;
            }
            match remaining {
                None => given = self.condvar.wait(given).unwrap(),
                Some(limit) => {
                    let (reacquired, status) =
                        self.condvar.wait_timeout(given, limit).unwrap();
                    given = reacquired;
                    if status.timed_out() {
                        break;
                    }
                }
            }
        }
        // Final check after the time budget is spent (or was zero from the
        // start): take the semaphore if it happens to be given right now.
        replace(&mut *given, false)
    }
}
102
103#[derive(Clone, Debug)]
104pub struct TimeoutIter<I: Instant> {
105    start: I,
106    timeout: Option<Duration>,
107}
108
109impl<I: Instant> TimeoutIter<I> {
110    pub fn new(timeout: Option<Duration>) -> Self {
111        Self { start: I::now(), timeout }
112    }
113}
114
115impl<I: Instant> Iterator for TimeoutIter<I> {
116    type Item = Option<Duration>;
117    fn next(&mut self) -> Option<Self::Item> {
118        match self.timeout {
119            Some(dur) => {
120                let elapsed = self.start.elapsed();
121                if dur > elapsed {
122                    Some(Some(dur - elapsed))
123                } else {
124                    None
125                }
126            }
127            None => Some(None),
128        }
129    }
130}
131
132pub struct TakeIter<'a, X: Semaphore> {
133    reset: bool,
134    semaphore: &'a X,
135    timeout_iter: TimeoutIter<X::Instant>,
136}
137
138impl<X: Semaphore> Iterator for TakeIter<'_, X> {
139    type Item = ();
140    fn next(&mut self) -> Option<Self::Item> {
141        if self.reset {
142            self.reset = false;
143            self.semaphore.try_take();
144            Some(())
145        } else if self.semaphore.take(self.timeout_iter.next()?) {
146            Some(())
147        } else {
148            None
149        }
150    }
151}
152
153impl<X: Semaphore> TakeIter<'_, X> {
154    pub fn reset(mut self) -> Self {
155        self.reset = true;
156        self
157    }
158}