open_coroutine_timer/
lib.rs

1#![deny(
2    // The following are allowed by default lints according to
3    // https://doc.rust-lang.org/rustc/lints/listing/allowed-by-default.html
4    anonymous_parameters,
5    bare_trait_objects,
6    box_pointers,
7    elided_lifetimes_in_paths,
8    missing_copy_implementations,
9    missing_debug_implementations,
10    missing_docs,
11    single_use_lifetimes,
12    trivial_casts,
13    trivial_numeric_casts,
14    unreachable_pub,
15    unsafe_code,
16    unstable_features,
17    unused_extern_crates,
18    unused_import_braces,
19    unused_qualifications,
20    unused_results,
21    variant_size_differences,
22    warnings, // treat all warnings as errors
23
24    clippy::all,
25    // clippy::restriction,
26    clippy::pedantic,
27    // clippy::nursery, // It's still under development
28    clippy::cargo,
29)]
30#![allow(
31    // Some explicitly allowed Clippy lints, must have clear reason to allow
32    clippy::blanket_clippy_restriction_lints, // allow clippy::restriction
33    clippy::implicit_return, // actually omitting the return keyword is idiomatic Rust code
34    clippy::module_name_repetitions, // repetition of the module name in a struct name is not a big deal
35    clippy::multiple_crate_versions, // multiple versions of a dependency crate cannot be fixed from this crate
36    clippy::panic_in_result_fn,
37    clippy::shadow_same, // Not too bad
38    clippy::shadow_reuse, // Not too bad
39    clippy::exhaustive_enums,
40    clippy::exhaustive_structs,
41    clippy::indexing_slicing,
42    clippy::separated_literal_suffix, // conflicts with clippy::unseparated_literal_suffix
43    clippy::single_char_lifetime_names,
44)]
45
46//! Associate `VecDeque` with `timestamps`.
47use std::collections::vec_deque::{Iter, IterMut};
48use std::collections::{BTreeMap, VecDeque};
49use std::sync::atomic::{AtomicUsize, Ordering};
50use std::time::{Duration, SystemTime, UNIX_EPOCH};
51
/// Returns the current wall-clock time as nanoseconds since `UNIX_EPOCH`.
///
/// The result saturates at `u64::MAX` when the nanosecond count does not fit
/// in a `u64`.
///
/// # Panics
/// if the time is before `UNIX_EPOCH`
#[must_use]
pub fn now() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        // `expect` prints its argument verbatim (no `{}` interpolation), so the
        // message has to be a complete sentence on its own.
        .expect("system time is before 1970-01-01 00:00:00 UTC");
    u64::try_from(since_epoch.as_nanos()).unwrap_or(u64::MAX)
}
66
67/// current ns time add `dur`.
68#[must_use]
69pub fn get_timeout_time(dur: Duration) -> u64 {
70    u64::try_from(dur.as_nanos())
71        .map(|d| d.saturating_add(now()))
72        .unwrap_or(u64::MAX)
73}
74
/// A FIFO queue of the items registered under one timestamp.
///
/// Items are kept in the order they were pushed; the queue is not sorted.
#[derive(Debug, Eq, PartialEq)]
pub struct TimerEntry<T> {
    /// Timestamp shared by every item in this entry.
    timestamp: u64,
    /// The items, in insertion order.
    inner: VecDeque<T>,
}

impl<'t, T> IntoIterator for &'t mut TimerEntry<T> {
    type Item = &'t mut T;
    type IntoIter = IterMut<'t, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter_mut()
    }
}

impl<'t, T> IntoIterator for &'t TimerEntry<T> {
    type Item = &'t T;
    type IntoIter = Iter<'t, T>;

    fn into_iter(self) -> Self::IntoIter {
        self.iter()
    }
}

impl<T> TimerEntry<T> {
    /// Creates an empty entry for `timestamp`.
    #[must_use]
    pub fn new(timestamp: u64) -> Self {
        TimerEntry {
            timestamp,
            inner: VecDeque::new(),
        }
    }

    /// Returns the number of items in this entry.
    #[must_use]
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` if this entry holds no items.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Get the timestamp.
    #[must_use]
    pub fn get_timestamp(&self) -> u64 {
        self.timestamp
    }

    /// Removes the oldest item and returns it, or `None` if the entry is empty.
    pub fn pop_front(&mut self) -> Option<T> {
        self.inner.pop_front()
    }

    /// Appends an item to the back of the entry.
    pub fn push_back(&mut self, t: T) {
        self.inner.push_back(t);
    }

    /// Removes and returns the first item equal to `t`.
    ///
    /// Returns `None`, leaving the entry untouched, if no item equals `t`.
    pub fn remove(&mut self, t: &T) -> Option<T>
    where
        T: Ord,
    {
        // Items are stored in insertion order, not sorted order, so a binary
        // search is not valid here. Scan instead, and only remove on an actual
        // match so that a miss never evicts an unrelated item.
        let index = self.inner.iter().position(|item| item == t)?;
        self.inner.remove(index)
    }

    /// Returns a front-to-back iterator that yields mutable references.
    pub fn iter_mut(&mut self) -> IterMut<'_, T> {
        self.inner.iter_mut()
    }

    /// Returns a front-to-back iterator.
    #[must_use]
    pub fn iter(&self) -> Iter<'_, T> {
        self.inner.iter()
    }
}
164
165/// A queue for managing multiple `TimerEntry`.
166#[repr(C)]
167#[derive(Debug)]
168pub struct TimerList<T> {
169    inner: BTreeMap<u64, TimerEntry<T>>,
170    total: AtomicUsize,
171}
172
173impl<T: PartialEq> PartialEq<Self> for TimerList<T> {
174    fn eq(&self, other: &Self) -> bool {
175        self.inner.eq(&other.inner)
176    }
177}
178
179impl<T: Eq> Eq for TimerList<T> {}
180
181impl<T> Default for TimerList<T> {
182    fn default() -> Self {
183        TimerList {
184            inner: BTreeMap::default(),
185            total: AtomicUsize::new(0),
186        }
187    }
188}
189
190impl<'t, T> IntoIterator for &'t TimerList<T> {
191    type Item = (&'t u64, &'t TimerEntry<T>);
192    type IntoIter = std::collections::btree_map::Iter<'t, u64, TimerEntry<T>>;
193
194    fn into_iter(self) -> Self::IntoIter {
195        self.iter()
196    }
197}
198
199impl<T> TimerList<T> {
200    /// Returns the number of elements in the deque.
201    #[must_use]
202    pub fn len(&self) -> usize {
203        if self.inner.is_empty() {
204            return 0;
205        }
206        self.total.load(Ordering::Acquire)
207    }
208
209    /// Returns the number of entries in the deque.
210    #[must_use]
211    pub fn entry_len(&self) -> usize {
212        self.inner.len()
213    }
214
215    /// Inserts an element at `timestamp` within the deque, shifting all elements
216    /// with indices greater than or equal to `timestamp` towards the back.
217    pub fn insert(&mut self, timestamp: u64, t: T) {
218        if let Some(entry) = self.inner.get_mut(&timestamp) {
219            entry.push_back(t);
220            _ = self.total.fetch_add(1, Ordering::Release);
221            return;
222        }
223        let mut entry = TimerEntry::new(timestamp);
224        entry.push_back(t);
225        _ = self.total.fetch_add(1, Ordering::Release);
226        if let Some(mut entry) = self.inner.insert(timestamp, entry) {
227            // concurrent, just retry
228            while !entry.is_empty() {
229                if let Some(e) = entry.pop_front() {
230                    self.insert(timestamp, e);
231                }
232            }
233        }
234    }
235
236    /// Provides a reference to the front element, or `None` if the deque is empty.
237    #[must_use]
238    pub fn front(&self) -> Option<(&u64, &TimerEntry<T>)> {
239        self.inner.first_key_value()
240    }
241
242    /// Removes the first element and returns it, or `None` if the deque is empty.
243    pub fn pop_front(&mut self) -> Option<(u64, TimerEntry<T>)> {
244        self.inner.pop_first().map(|(timestamp, entry)| {
245            _ = self.total.fetch_sub(entry.len(), Ordering::Release);
246            (timestamp, entry)
247        })
248    }
249
250    /// Returns `true` if the deque is empty.
251    #[must_use]
252    pub fn is_empty(&self) -> bool {
253        self.len() == 0
254    }
255
256    /// Removes and returns the element at `timestamp` from the deque.
257    /// Whichever end is closer to the removal point will be moved to make
258    /// room, and all the affected elements will be moved to new positions.
259    /// Returns `None` if `timestamp` is out of bounds.
260    pub fn remove_entry(&mut self, timestamp: &u64) -> Option<TimerEntry<T>> {
261        self.inner.remove(timestamp).map(|entry| {
262            _ = self.total.fetch_sub(entry.len(), Ordering::Release);
263            entry
264        })
265    }
266
267    /// Removes and returns the `t` from the deque.
268    /// Whichever end is closer to the removal point will be moved to make
269    /// room, and all the affected elements will be moved to new positions.
270    /// Returns `None` if `t` not found.
271    pub fn remove(&mut self, timestamp: &u64, t: &T) -> Option<T>
272    where
273        T: Ord,
274    {
275        if let Some(entry) = self.inner.get_mut(timestamp) {
276            let val = entry.remove(t).map(|item| {
277                _ = self.total.fetch_sub(1, Ordering::Release);
278                item
279            });
280            if entry.is_empty() {
281                _ = self.remove_entry(timestamp);
282            }
283            return val;
284        }
285        None
286    }
287
288    /// Returns a front-to-back iterator.
289    pub fn iter(&self) -> std::collections::btree_map::Iter<'_, u64, TimerEntry<T>> {
290        self.inner.iter()
291    }
292}
293
#[cfg(test)]
mod tests {
    use super::*;

    /// The wall clock reports a positive nanosecond count.
    #[test]
    fn test() {
        assert!(now() > 0);
    }

    /// One entry is created per timestamp, and the entry with the smallest
    /// timestamp is popped first.
    #[test]
    fn timer_list() {
        let mut list = TimerList::default();
        assert_eq!(list.entry_len(), 0);
        for (timestamp, data) in [(1, "data is 1"), (2, "data is 2"), (3, "data is 3")] {
            list.insert(timestamp, String::from(data));
        }
        assert_eq!(list.entry_len(), 3);

        let (_, mut entry) = list.pop_front().unwrap();
        assert_eq!(entry.len(), 1);
        let string = entry.pop_front().unwrap();
        assert_eq!(string, String::from("data is 1"));
        assert_eq!(entry.len(), 0);
    }
}
318}