1use std::collections::VecDeque;
2
3use crate::data_policy::{DataDeliveryPolicy, DeliveryPolicy, StorageTryPushOutput};
4
5#[derive(Clone, Debug)]
7pub struct Deque<T>
8where
9 T: DataDeliveryPolicy,
10{
11 data: VecDeque<T>,
12 capacity: usize,
13 ordered: bool,
14}
15
16impl<T> Deque<T>
17where
18 T: DataDeliveryPolicy,
19{
20 #[inline]
22 pub fn bounded(capacity: usize) -> Self {
23 Self {
24 data: VecDeque::with_capacity(capacity),
25 capacity,
26 ordered: false,
27 }
28 }
29 #[inline]
31 pub fn set_ordering(mut self, v: bool) -> Self {
32 self.ordered = v;
33 self
34 }
35 pub fn try_push(&mut self, value: T) -> StorageTryPushOutput<T> {
42 macro_rules! push {
43 () => {{
44 self.data.push_back(value);
45 if self.ordered {
46 sort_by_priority(&mut self.data);
47 }
48 StorageTryPushOutput::Pushed
49 }};
50 }
51 if value.is_expired() {
52 return StorageTryPushOutput::Pushed;
53 }
54 if value.is_delivery_policy_single() {
55 self.data.retain(|d| !d.eq_kind(&value) && !d.is_expired());
56 }
57 macro_rules! push_final {
58 () => {
59 if self.data.len() < self.capacity {
60 push!()
61 } else {
62 StorageTryPushOutput::Full(value)
63 }
64 };
65 }
66 if self.data.len() < self.capacity {
67 push!()
68 } else {
69 match value.delivery_policy() {
70 DeliveryPolicy::Always | DeliveryPolicy::Single => {
71 let mut entry_removed = false;
72 self.data.retain(|d| {
73 if entry_removed {
74 true
75 } else if d.is_expired() || d.is_delivery_policy_optional() {
76 entry_removed = true;
77 false
78 } else {
79 true
80 }
81 });
82 push_final!()
83 }
84 DeliveryPolicy::Latest => {
85 let mut entry_removed = false;
86 self.data.retain(|d| {
87 if entry_removed {
88 true
89 } else if d.is_expired()
90 || d.is_delivery_policy_optional()
91 || d.eq_kind(&value)
92 {
93 entry_removed = true;
94 false
95 } else {
96 true
97 }
98 });
99 push_final!()
100 }
101 DeliveryPolicy::Optional | DeliveryPolicy::SingleOptional => {
102 StorageTryPushOutput::Skipped
103 }
104 }
105 }
106 }
107 #[inline]
109 pub fn get(&mut self) -> Option<T> {
110 loop {
111 let value = self.data.pop_front();
112 if let Some(ref val) = value {
113 if !val.is_expired() {
114 break value;
115 }
116 } else {
117 break None;
118 }
119 }
120 }
121 #[inline]
123 pub fn clear(&mut self) {
124 self.data.clear();
125 }
126 #[inline]
128 pub fn len(&self) -> usize {
129 self.data.len()
130 }
131 #[inline]
133 pub fn is_full(&self) -> bool {
134 self.len() == self.capacity
135 }
136 #[inline]
138 pub fn is_empty(&self) -> bool {
139 self.data.is_empty()
140 }
141}
142
143fn sort_by_priority<T: DataDeliveryPolicy>(v: &mut VecDeque<T>) {
144 v.rotate_right(v.as_slices().1.len());
145 assert!(v.as_slices().1.is_empty());
146 v.as_mut_slices()
147 .0
148 .sort_by(|a, b| a.priority().partial_cmp(&b.priority()).unwrap());
149}
150
#[cfg(test)]
mod test {
    use super::Deque;
    use crate::data_policy::{DataDeliveryPolicy, DeliveryPolicy, StorageTryPushOutput};

    /// Test payload: `id` acts as the kind, `value` tells copies of the same kind apart.
    struct Data {
        id: u32,
        value: f64,
    }

    impl DataDeliveryPolicy for Data {
        fn delivery_policy(&self) -> DeliveryPolicy {
            DeliveryPolicy::Single
        }

        fn priority(&self) -> usize {
            100
        }

        fn eq_kind(&self, other: &Self) -> bool {
            other.id == self.id
        }

        fn is_expired(&self) -> bool {
            false
        }
    }

    /// With the `Single` policy a newer value replaces the stored value of the same kind.
    #[test]
    // Values are stored and read back untouched, so exact float comparison is intended.
    #[allow(clippy::float_cmp)]
    fn test_dp_single() {
        let mut queue: Deque<Data> = Deque::bounded(2);

        // The third push has the same kind as the first and must replace it.
        for &(id, value) in &[(1, 1.0), (2, 2.0), (1, 3.0)] {
            let outcome = queue.try_push(Data { id, value });
            assert!(matches!(outcome, StorageTryPushOutput::Pushed));
        }
        assert_eq!(queue.len(), 2);

        // Kind 2 is now the oldest entry; kind 1 carries the replacement value.
        for &(id, value) in &[(2, 2.0), (1, 3.0)] {
            let entry = queue.get().unwrap();
            assert_eq!(entry.id, id);
            assert_eq!(entry.value, value);
        }
        assert!(queue.get().is_none());
    }
}