// rtsc/pdeque.rs
use std::collections::VecDeque;
use crate::data_policy::{DataDeliveryPolicy, DeliveryPolicy, StorageTryPushOutput};
/// A capacity-limited deque whose contents are managed according to the
/// [`DataDeliveryPolicy`] of the stored values
#[derive(Clone, Debug)]
pub struct Deque<T: DataDeliveryPolicy> {
    /// Stored values, front of the queue first
    data: VecDeque<T>,
    /// Upper limit on the number of stored values
    capacity: usize,
    /// When set, the stored values are re-sorted by priority after every push
    ordered: bool,
}
impl<T> Deque<T>
where
    T: DataDeliveryPolicy,
{
    /// Builds an empty deque limited to `capacity` values, with priority ordering disabled
    #[inline]
    pub fn bounded(capacity: usize) -> Self {
        let data = VecDeque::with_capacity(capacity);
        Self {
            data,
            capacity,
            ordered: false,
        }
    }

    /// Enables or disables priority ordering
    ///
    /// Takes the deque by value and hands it back, so the call can be chained right after
    /// [`Deque::bounded`]
    #[inline]
    pub fn set_ordering(self, v: bool) -> Self {
        Self { ordered: v, ..self }
    }

    /// Attempts to store `value`, applying the [`DataDeliveryPolicy`] rules
    ///
    /// * An expired `value` is dropped and reported as `Pushed`.
    /// * If `value` has a "single" delivery policy, every stored value of the same kind (and
    ///   every expired stored value) is removed first.
    /// * If there is room, `value` is appended (and the deque re-sorted when ordering is on).
    /// * If the deque is full, the outcome depends on the policy of `value`:
    ///   - `Optional` / `SingleOptional`: the value is not stored, `Skipped` is returned;
    ///   - `Always` / `Single`: the first stored value that is expired or optional is evicted;
    ///   - `Latest`: as above, but a stored value of the same kind may be evicted as well.
    ///
    ///   If the eviction freed a slot the value is stored, otherwise it is handed back in
    ///   `Full`.
    pub fn try_push(&mut self, value: T) -> StorageTryPushOutput<T> {
        // Expired values are silently discarded but still reported as pushed.
        if value.is_expired() {
            return StorageTryPushOutput::Pushed;
        }

        if value.is_delivery_policy_single() {
            self.data
                .retain(|stored| !(stored.eq_kind(&value) || stored.is_expired()));
        }

        if self.data.len() >= self.capacity {
            // No room: decide whether a stored value may be sacrificed for the new one.
            let same_kind_evictable = match value.delivery_policy() {
                DeliveryPolicy::Optional | DeliveryPolicy::SingleOptional => {
                    return StorageTryPushOutput::Skipped;
                }
                DeliveryPolicy::Always | DeliveryPolicy::Single => false,
                DeliveryPolicy::Latest => true,
            };

            // Evict at most one value: the first one matching the eviction criteria.
            let mut evicted = false;
            self.data.retain(|stored| {
                let victim = !evicted
                    && (stored.is_expired()
                        || stored.is_delivery_policy_optional()
                        || (same_kind_evictable && stored.eq_kind(&value)));
                if victim {
                    evicted = true;
                }
                !victim
            });

            if self.data.len() >= self.capacity {
                return StorageTryPushOutput::Full(value);
            }
        }

        self.data.push_back(value);
        if self.ordered {
            sort_by_priority(&mut self.data);
        }
        StorageTryPushOutput::Pushed
    }

    /// Pops values from the front until a non-expired one is found and returns it
    ///
    /// Expired values encountered on the way are discarded. Returns `None` once the deque
    /// runs out of values.
    #[inline]
    pub fn get(&mut self) -> Option<T> {
        while let Some(candidate) = self.data.pop_front() {
            if !candidate.is_expired() {
                return Some(candidate);
            }
        }
        None
    }

    /// Removes every stored value
    #[inline]
    pub fn clear(&mut self) {
        self.data.clear();
    }

    /// Returns the number of stored values
    #[inline]
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` if the number of stored values equals the capacity
    #[inline]
    pub fn is_full(&self) -> bool {
        self.capacity == self.len()
    }

    /// Returns `true` if no values are stored
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.data.is_empty()
    }
}
/// Sorts the deque in place by ascending `priority()`.
///
/// The ring buffer is first rearranged into one contiguous slice with
/// `make_contiguous`, which keeps the logical front-to-back order of the elements.
/// (Rotating the deque to achieve contiguity would move the most recently pushed
/// elements to the front whenever the storage has wrapped around.) The slice is then
/// sorted with a stable sort, so elements of equal priority keep their insertion order.
///
/// # Panics
///
/// Panics if two priorities cannot be compared, i.e. `partial_cmp` returns `None`.
fn sort_by_priority<T: DataDeliveryPolicy>(v: &mut VecDeque<T>) {
    v.make_contiguous().sort_by(|a, b| {
        a.priority()
            .partial_cmp(&b.priority())
            .expect("priorities must be comparable")
    });
}