rtsc/
pdeque.rs

1use std::collections::VecDeque;
2
3use crate::data_policy::{DataDeliveryPolicy, DeliveryPolicy, StorageTryPushOutput};
4
/// A bounded deque which stores values according to their [`DataDeliveryPolicy`]
///
/// The structure itself places no trait bound on `T`: the [`DataDeliveryPolicy`] requirement
/// is declared on the `impl` blocks whose methods actually call the policy, so the type can be
/// named, cloned and debug-printed for any `T`.
#[derive(Clone, Debug)]
pub struct Deque<T> {
    /// Stored values; the front element is the next one handed out
    data: VecDeque<T>,
    /// Maximum number of values kept in `data`
    capacity: usize,
    /// When `true`, `data` is re-sorted by priority after every successful push
    ordered: bool,
}
15
16impl<T> Deque<T>
17where
18    T: DataDeliveryPolicy,
19{
20    /// Creates a new bounded deque
21    #[inline]
22    pub fn bounded(capacity: usize) -> Self {
23        Self {
24            data: VecDeque::with_capacity(capacity),
25            capacity,
26            ordered: false,
27        }
28    }
29    /// Enabled/disables priority ordering, can be used as a build pattern
30    #[inline]
31    pub fn set_ordering(mut self, v: bool) -> Self {
32        self.ordered = v;
33        self
34    }
35    /// Tries to store the value
36    ///
37    /// Returns the value back if there is no capacity even after all [`DataDeliveryPolicy`]
38    /// rules have been applied
39    ///
40    /// Note: expired values are dropped and the operation returns: pushed=true
41    pub fn try_push(&mut self, value: T) -> StorageTryPushOutput<T> {
42        macro_rules! push {
43            () => {{
44                self.data.push_back(value);
45                if self.ordered {
46                    sort_by_priority(&mut self.data);
47                }
48                StorageTryPushOutput::Pushed
49            }};
50        }
51        if value.is_expired() {
52            return StorageTryPushOutput::Pushed;
53        }
54        if value.is_delivery_policy_single() {
55            self.data.retain(|d| !d.eq_kind(&value) && !d.is_expired());
56        }
57        macro_rules! push_final {
58            () => {
59                if self.data.len() < self.capacity {
60                    push!()
61                } else {
62                    StorageTryPushOutput::Full(value)
63                }
64            };
65        }
66        if self.data.len() < self.capacity {
67            push!()
68        } else {
69            match value.delivery_policy() {
70                DeliveryPolicy::Always | DeliveryPolicy::Single => {
71                    let mut entry_removed = false;
72                    self.data.retain(|d| {
73                        if entry_removed {
74                            true
75                        } else if d.is_expired() || d.is_delivery_policy_optional() {
76                            entry_removed = true;
77                            false
78                        } else {
79                            true
80                        }
81                    });
82                    push_final!()
83                }
84                DeliveryPolicy::Latest => {
85                    let mut entry_removed = false;
86                    self.data.retain(|d| {
87                        if entry_removed {
88                            true
89                        } else if d.is_expired()
90                            || d.is_delivery_policy_optional()
91                            || d.eq_kind(&value)
92                        {
93                            entry_removed = true;
94                            false
95                        } else {
96                            true
97                        }
98                    });
99                    push_final!()
100                }
101                DeliveryPolicy::Optional | DeliveryPolicy::SingleOptional => {
102                    StorageTryPushOutput::Skipped
103                }
104            }
105        }
106    }
107    /// Returns the first available value, ignores expired ones
108    #[inline]
109    pub fn get(&mut self) -> Option<T> {
110        loop {
111            let value = self.data.pop_front();
112            if let Some(ref val) = value {
113                if !val.is_expired() {
114                    break value;
115                }
116            } else {
117                break None;
118            }
119        }
120    }
121    /// Clears the deque
122    #[inline]
123    pub fn clear(&mut self) {
124        self.data.clear();
125    }
126    /// Returns number of elements in deque
127    #[inline]
128    pub fn len(&self) -> usize {
129        self.data.len()
130    }
131    /// Returns is the deque full
132    #[inline]
133    pub fn is_full(&self) -> bool {
134        self.len() == self.capacity
135    }
136    /// Returns is the deque empty
137    #[inline]
138    pub fn is_empty(&self) -> bool {
139        self.data.is_empty()
140    }
141}
142
143fn sort_by_priority<T: DataDeliveryPolicy>(v: &mut VecDeque<T>) {
144    v.rotate_right(v.as_slices().1.len());
145    assert!(v.as_slices().1.is_empty());
146    v.as_mut_slices()
147        .0
148        .sort_by(|a, b| a.priority().partial_cmp(&b.priority()).unwrap());
149}
150
#[cfg(test)]
mod test {
    use super::Deque;
    use crate::data_policy::{DataDeliveryPolicy, DeliveryPolicy, StorageTryPushOutput};

    /// Test payload: `id` acts as the kind, `value` tells pushes of the same kind apart
    struct Data {
        id: u32,
        value: f64,
    }

    /// Every `Data` uses the `Single` policy, has the same priority and never expires
    impl DataDeliveryPolicy for Data {
        fn delivery_policy(&self) -> DeliveryPolicy {
            DeliveryPolicy::Single
        }

        fn priority(&self) -> usize {
            100
        }

        fn eq_kind(&self, other: &Self) -> bool {
            other.id == self.id
        }

        fn is_expired(&self) -> bool {
            false
        }
    }

    /// Pushing a second value of an already stored kind replaces the stored one and queues
    /// the replacement at the back
    #[test]
    #[allow(clippy::float_cmp)]
    fn test_dp_single() {
        let mut deque: Deque<Data> = Deque::bounded(2);
        for (id, value) in [(1, 1.0), (2, 2.0), (1, 3.0)] {
            let outcome = deque.try_push(Data { id, value });
            assert!(matches!(outcome, StorageTryPushOutput::Pushed));
        }
        assert_eq!(deque.len(), 2);
        for (expected_id, expected_value) in [(2, 2.0), (1, 3.0)] {
            let item = deque.get().unwrap();
            assert_eq!(item.id, expected_id);
            assert_eq!(item.value, expected_value);
        }
        assert!(deque.get().is_none());
    }
}