1use msg_store_uuid::{UuidManager,Uuid, UuidManagerError};
2use std::collections::BTreeMap;
3use std::fmt::Display;
4use std::sync::Arc;
5
6pub const DEFAULT_NODE_ID: Option<u16> = None;
7
8#[derive(Debug)]
9pub enum StoreErrorTy {
10 UuidManagerError(UuidManagerError),
11 ExceedesStoreMax,
12 ExceedesGroupMax,
13 LacksPriority,
14 SyncError
15}
16impl Display for StoreErrorTy {
17 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
18 match self {
19 Self::UuidManagerError(err) => write!(f, "({})", err),
20 Self::ExceedesStoreMax |
21 Self::ExceedesGroupMax |
22 Self::LacksPriority |
23 Self::SyncError => write!(f, "{:#?}", self)
24 }
25 }
26}
27
28#[derive(Debug)]
29pub struct StoreError {
30 pub err_ty: StoreErrorTy,
31 pub file: &'static str,
32 pub line: u32,
33 pub msg: Option<String>
34}
35
36impl Display for StoreError {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 if let Some(msg) = &self.msg {
39 write!(f, "STORE_ERROR: {}. file: {}, line: {}, msg: {}", self.err_ty, self.file, self.line, msg)
40 } else {
41 write!(f, "STORE_ERROR: {}. file: {}, line: {}.", self.err_ty, self.file, self.line)
42 }
43 }
44}
45
46macro_rules! store_error {
47 ($err_ty:expr) => {
48 StoreError {
49 err_ty: $err_ty,
50 file: file!(),
51 line: line!(),
52 msg: None
53 }
54 };
55 ($err_ty:expr, $msg:expr) => {
56 StoreError {
57 err_ty: $err_ty,
58 file: file!(),
59 line: line!(),
60 msg: Some($msg.to_string())
61 }
62 };
63}
64
65#[derive(Debug)]
66enum PruneBy {
67 Group,
68 Store
69}
70#[derive(Debug)]
71pub enum Deleted {
72 True,
73 False
74}
75
76#[derive(Debug)]
77pub struct StoreDefaults {
78 pub max_byte_size: Option<u64>
79}
80
81#[derive(Debug, Clone, Copy)]
82pub struct GroupDefaults {
83 pub max_byte_size: Option<u64>,
84}
85
86#[derive(Debug)]
87pub struct Group {
88 pub max_byte_size: Option<u64>,
89 pub byte_size: u64,
90 pub msgs_map: BTreeMap<Arc<Uuid>, u64>,
91}
92impl Group {
93 pub fn new(max_byte_size: Option<u64>) -> Group {
94 Group {
95 max_byte_size,
96 byte_size: 0,
97 msgs_map: BTreeMap::new()
98 }
99 }
100 pub fn update_from_config(&mut self, defaults: GroupDefaults) {
101 self.max_byte_size = defaults.max_byte_size;
102 }
103}
104
105#[derive(Debug)]
106struct RemovedMsgs {
107 priority: u16,
108 msgs: Vec<Arc<Uuid>>
109}
110impl RemovedMsgs {
111 pub fn new(priority: u16) -> RemovedMsgs {
112 RemovedMsgs {
113 priority,
114 msgs: vec![]
115 }
116 }
117 pub fn add(&mut self, uuid: Arc<Uuid>) {
118 self.msgs.push(uuid);
119 }
120}
121
122#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
123pub struct PacketMetaData {
124 pub uuid: Arc<Uuid>,
125 pub priority: u16,
126 pub byte_size: u64
127}
128
129#[derive(Debug)]
130pub struct AddResult {
131 pub uuid: Arc<Uuid>,
132 pub bytes_removed: u64,
133 pub groups_removed: Vec<u16>,
134 pub msgs_removed: Vec<Arc<Uuid>>
135}
136
137#[derive(Debug)]
154pub struct Store {
155 pub max_byte_size: Option<u64>,
156 pub byte_size: u64,
157 pub group_defaults: BTreeMap<u16, GroupDefaults>,
158 pub uuid_manager: UuidManager,
159 pub id_to_group_map: BTreeMap<Arc<Uuid>, u16>,
160 pub groups_map: BTreeMap<u16, Group>
161}
162
163impl Store {
164
165 pub fn new(node_id: Option<u16>) -> Result<Store, StoreError> {
166 let uuid_manager = match UuidManager::new(node_id) {
167 Ok(uuid_manager) => Ok(uuid_manager),
168 Err(error) => Err(store_error!(StoreErrorTy::UuidManagerError(error)))
169 }?;
170 Ok(Store {
171 max_byte_size: None,
172 byte_size: 0,
173 group_defaults: BTreeMap::new(),
174 uuid_manager,
175 id_to_group_map: BTreeMap::new(),
176 groups_map: BTreeMap::new()
177 })
178 }
179
180 fn msg_excedes_max_byte_size(byte_size: &u64, max_byte_size: &u64, msg_byte_size: &u64) -> bool {
181 &(byte_size + msg_byte_size) > max_byte_size
182 }
183
184 fn remove_msg(&mut self, uuid: Arc<Uuid>, group: &mut Group) -> Result<(), StoreError> {
185 let byte_size = match group.msgs_map.remove(&uuid) {
186 Some(byte_size) => Ok(byte_size),
187 None => Err(store_error!(StoreErrorTy::SyncError))
188 }?;
189 self.id_to_group_map.remove(&uuid);
190 self.byte_size -= byte_size;
191 group.byte_size -= byte_size;
192 Ok(())
193 }
194
195 fn get_group(&mut self, priority: u16) -> Group {
196 match self.groups_map.remove(&priority) {
197 Some(group) => group,
198 None => {
199 let max_byte_size = match self.group_defaults.get(&priority) {
200 Some(defaults) => defaults.max_byte_size.clone(),
201 None => None
202 };
203 Group::new(max_byte_size)
204 }
205 }
206 }
207
208 fn check_msg_size_agains_store(&self, msg_byte_size: u64) -> Result<(), StoreError> {
209 if let Some(store_max_byte_size) = self.max_byte_size {
210 if msg_byte_size > store_max_byte_size {
211 return Err(store_error!(StoreErrorTy::ExceedesStoreMax))
212 }
213 }
214 Ok(())
215 }
216
217 fn check_msg_size_against_group(&mut self, group: Group, msg_priority: u16, msg_byte_size: u64) -> Result<Group, StoreError> {
218 if let Some(group_max_byte_size) = &group.max_byte_size {
220 if &msg_byte_size > group_max_byte_size {
221 self.groups_map.insert(msg_priority, group);
222 return Err(store_error!(StoreErrorTy::ExceedesGroupMax));
223 }
224 }
225
226 let higher_priority_msg_total = {
229 let mut total = 0;
230 for (priority, group) in self.groups_map.iter().rev() {
231 if &msg_priority > priority {
232 break;
233 }
234 total += group.byte_size;
235 }
236 total
237 };
238
239 if let Some(store_max_byte_size) = self.max_byte_size {
241 if Self::msg_excedes_max_byte_size(&higher_priority_msg_total, &store_max_byte_size, &msg_byte_size) {
242 self.groups_map.insert(msg_priority, group);
243 return Err(store_error!(StoreErrorTy::LacksPriority));
244 }
245 }
246 Ok(group)
247 }
248
249 fn prune_group(&mut self, group: &mut Group, msg_byte_size: u64, prune_type: PruneBy) -> Result<(u64, Vec<Arc<Uuid>>), StoreError> {
250 let (byte_size, max_byte_size) = match prune_type {
251 PruneBy::Group => (group.byte_size, group.max_byte_size),
252 PruneBy::Store => (self.byte_size, self.max_byte_size)
253 };
254 let mut removed_msgs = vec![];
255 let mut bytes_removed = 0;
256 if let Some(max_byte_size) = &max_byte_size {
257 if Self::msg_excedes_max_byte_size(&byte_size, max_byte_size, &msg_byte_size) {
258 for (uuid, group_msg_byte_size) in group.msgs_map.iter() {
260 if !Self::msg_excedes_max_byte_size(&(byte_size - bytes_removed), max_byte_size, &msg_byte_size) {
261 break;
262 }
263 bytes_removed += group_msg_byte_size;
264 removed_msgs.push(uuid.clone());
265 }
266 for uuid in removed_msgs.iter() {
267 self.remove_msg(uuid.clone(), group)?;
268 }
269 }
270 }
271 Ok((bytes_removed, removed_msgs))
272 }
273
274 fn prune_store(&mut self, mut group: Option<&mut Group>, msg_priority: u16, msg_byte_size: u64) -> Result<(u64, Vec<u16>, Vec<Arc<Uuid>>), StoreError> {
275 let mut groups_removed = vec![];
276 let mut all_removed_msgs = vec![];
277 let mut bytes_removed = 0;
278 {
279 if let Some(store_max_byte_size) = self.max_byte_size.clone() {
280 if Self::msg_excedes_max_byte_size(&self.byte_size, &store_max_byte_size, &msg_byte_size) {
281 'groups: for (priority, group) in self.groups_map.iter_mut() {
282 if &msg_priority < priority {
283 break 'groups;
284 }
285 if !Self::msg_excedes_max_byte_size(&(self.byte_size - bytes_removed), &store_max_byte_size, &msg_byte_size) {
286 break 'groups;
287 }
288 let mut removed_msgs = RemovedMsgs::new(*priority);
289 let mut removed_msg_count = 0;
290 'messages: for (uuid, group_msg_byte_size) in group.msgs_map.iter() {
291 if !Self::msg_excedes_max_byte_size(&(self.byte_size - bytes_removed), &store_max_byte_size, &msg_byte_size) {
292 break 'messages;
293 }
294 bytes_removed += group_msg_byte_size;
295 removed_msg_count += 1;
296 removed_msgs.add(uuid.clone());
297 }
298 if group.msgs_map.len() == removed_msg_count {
299 groups_removed.push(*priority);
300 }
301 all_removed_msgs.push(removed_msgs);
302 }
303
304 if let Some(group) = &group {
306 let mut removed_msgs = RemovedMsgs::new(msg_priority);
307 let mut removed_msg_count = 0;
308 for (uuid, group_msg_byte_size) in group.msgs_map.iter() {
309 if !Self::msg_excedes_max_byte_size(&(self.byte_size - bytes_removed), &store_max_byte_size, &msg_byte_size) {
310 break;
311 }
312 bytes_removed += group_msg_byte_size;
313 removed_msg_count += 1;
314 removed_msgs.add(uuid.clone());
315 }
316 if group.msgs_map.len() == removed_msg_count {
317 groups_removed.push(msg_priority);
318 }
319 all_removed_msgs.push(removed_msgs);
320 }
321 for group_data in &all_removed_msgs {
323 if group_data.priority != msg_priority {
324 let mut group = match self.groups_map.remove(&group_data.priority) {
325 Some(group) => Ok(group),
326 None => Err(store_error!(StoreErrorTy::SyncError))
327 }?;
328 for uuid in group_data.msgs.iter() {
329 self.remove_msg(uuid.clone(), &mut group)?;
330 }
331 self.groups_map.insert(group_data.priority, group);
332 } else {
333 if let Some(mut group) = group.as_mut() {
334 for uuid in group_data.msgs.iter() {
335 self.remove_msg(uuid.clone(), &mut group)?;
336 }
337 };
338 }
339 }
340 for priority in &groups_removed {
341 self.groups_map.remove(&priority);
342 }
343
344 if let Some(group) = group {
346 self.prune_group(group, msg_byte_size, PruneBy::Store)?;
347 }
348 }
349 }
350 }
351
352 let msgs_removed: Vec<Arc<Uuid>> = all_removed_msgs.into_iter().map(|removed_msgs| removed_msgs.msgs).flatten().collect();
353 Ok((bytes_removed, groups_removed, msgs_removed))
354 }
355
356 fn insert_msg(&mut self, mut group: Group, uuid: Arc<Uuid>, priority: u16, msg_byte_size: u64) {
357 self.byte_size += msg_byte_size; self.id_to_group_map.insert(uuid.clone(), priority); group.byte_size += msg_byte_size; group.msgs_map.insert(uuid.clone(), msg_byte_size); self.groups_map.insert(priority, group);
362 }
363
364 pub fn add(&mut self, priority: u16, msg_byte_size: u64) -> Result<AddResult, StoreError> {
401 let uuid = match self.uuid_manager.next(priority) {
402 Ok(uuid) => Ok(uuid),
403 Err(error) => Err(store_error!(StoreErrorTy::UuidManagerError(error)))
404 }?;
405 self.add_with_uuid(uuid, msg_byte_size)
406 }
407
408 pub fn add_with_uuid(&mut self, uuid: Arc<Uuid>, msg_byte_size: u64) -> Result<AddResult, StoreError> {
440 let priority = uuid.priority;
442
443 self.check_msg_size_agains_store(msg_byte_size)?;
445
446 let group = self.get_group(priority);
449
450 let mut group = self.check_msg_size_against_group(group, priority, msg_byte_size)?;
452
453 let mut bytes_removed = 0;
454 let mut groups_removed = vec![];
455 let mut msgs_removed = vec![];
456
457 let (bytes_removed_from_group, mut msgs_removed_from_group) = self.prune_group(&mut group, msg_byte_size, PruneBy::Group)?;
459
460 bytes_removed += bytes_removed_from_group;
461 msgs_removed.append(&mut msgs_removed_from_group);
462
463 let (bytes_removed_from_groups, mut groups_removed_from_store, mut msgs_removed_from_groups) = self.prune_store(Some(&mut group), priority, msg_byte_size)?;
465 bytes_removed += bytes_removed_from_groups;
466 msgs_removed.append(&mut msgs_removed_from_groups);
467 groups_removed.append(&mut groups_removed_from_store);
468
469 self.insert_msg(group, uuid.clone(), priority, msg_byte_size);
471
472 Ok(AddResult{ uuid, bytes_removed, msgs_removed, groups_removed })
473 }
474
475 pub fn del(&mut self, uuid: Arc<Uuid>) -> Result<(), StoreError> {
495 let mut remove_group = false;
496 let priority = match self.id_to_group_map.get(&uuid) {
497 Some(priority) => priority,
498 None => {
499 return Ok(());
500 }
501 };
502 let mut group = match self.groups_map.get_mut(&priority) {
503 Some(group) => group,
504 None => {
505 return Ok(());
506 }
507 };
508 let bytes_removed = match group.msgs_map.remove(&uuid) {
509 Some(bytes_removed) => bytes_removed,
510 None => {
511 return Ok(());
512 }
513 };
514 group.byte_size -= bytes_removed;
515 self.byte_size -= bytes_removed;
516 if group.msgs_map.is_empty() {
517 remove_group = true;
518 }
519 if remove_group {
520 self.groups_map.remove(&priority);
521 }
522 self.id_to_group_map.remove(&uuid);
523 Ok(())
524 }
525
526 pub fn del_group(&mut self, priority: &u16) -> Result<(), StoreError> {
548 if let Some(group) = self.groups_map.remove(priority) {
549 for (uuid, _msg_byte_size) in group.msgs_map.iter() {
550 self.id_to_group_map.remove(uuid);
551 }
552 self.byte_size -= group.byte_size;
553 }
554 Ok(())
555 }
556
557 pub fn get(&self, uuid: Option<Arc<Uuid>>, priority: Option<u16>, reverse: bool) -> Result<Option<Arc<Uuid>>, StoreError> {
584
585 if let Some(uuid) = uuid {
586
587 match self.id_to_group_map.contains_key(&uuid) {
588 true => Ok(Some(uuid)),
589 false => Ok(None)
590 }
591
592 } else if let Some(priority) = priority {
593
594 let group = match self.groups_map.get(&priority) {
595 Some(group) => group,
596 None => { return Ok(None) }
597 };
598
599 let uuid_option = match !reverse {
600 true => group.msgs_map.keys().rev().next(),
601 false => group.msgs_map.keys().next()
602 };
603
604 match uuid_option {
605 Some(uuid) => Ok(Some(uuid.clone())),
606 None => { return Ok(None) }
607 }
608
609
610 } else {
611
612 let next_group_option = match !reverse {
613 true => self.groups_map.values().rev().next(),
614 false => self.groups_map.values().next()
615 };
616
617 let group = match next_group_option {
618 Some(group) => group,
619 None => { return Ok(None) }
620 };
621
622 let next_uuid_option = match !reverse {
623 true => group.msgs_map.keys().rev().next(),
624 false => group.msgs_map.keys().next()
625 };
626
627 match next_uuid_option {
628 Some(uuid) => Ok(Some(uuid.clone())),
629 None => Err(store_error!(StoreErrorTy::SyncError))
630 }
631
632 }
633 }
634
635 pub fn get_n(&self, n: usize, starting_priority: Option<u16>, after_uuid: Option<Arc<Uuid>>, reverse: bool) -> Vec<Arc<Uuid>> {
636 if let Some(starting_priority) = starting_priority {
637 if let Some(after_uuid) = after_uuid {
638 if !reverse {
639 self.id_to_group_map.iter()
640 .rev() .filter(|(uuid, _group)| uuid.priority <= starting_priority && uuid < &&after_uuid)
642 .map(|(uuid, _group)| uuid.clone())
643 .take(n)
644 .collect::<Vec<Arc<Uuid>>>()
645 } else {
646 self.id_to_group_map.iter()
647 .filter(|(uuid, _group)| uuid.priority <= starting_priority && uuid < &&after_uuid)
648 .map(|(uuid, _group)| uuid.clone())
649 .take(n)
650 .collect::<Vec<Arc<Uuid>>>()
651 }
652 } else {
653 if !reverse {
654 self.id_to_group_map.iter()
655 .rev() .filter(|(uuid, _group)| uuid.priority <= starting_priority)
657 .map(|(uuid, _group)| uuid.clone())
658 .take(n)
659 .collect::<Vec<Arc<Uuid>>>()
660 } else {
661 self.id_to_group_map.iter()
662 .filter(|(uuid, _group)| uuid.priority <= starting_priority)
663 .map(|(uuid, _group)| uuid.clone())
664 .take(n)
665 .collect::<Vec<Arc<Uuid>>>()
666 }
667 }
668 } else {
669 if let Some(after_uuid) = after_uuid {
670 if !reverse {
671 self.id_to_group_map.iter()
672 .rev() .filter(|(uuid, _group)| uuid < &&after_uuid)
674 .map(|(uuid, _group)| uuid.clone())
675 .take(n)
676 .collect::<Vec<Arc<Uuid>>>()
677 } else {
678 self.id_to_group_map.iter()
679 .filter(|(uuid, _group)| uuid < &&after_uuid)
680 .map(|(uuid, _group)| uuid.clone())
681 .take(n)
682 .collect::<Vec<Arc<Uuid>>>()
683 }
684 } else {
685 if !reverse {
686 self.id_to_group_map.iter()
687 .rev() .map(|(uuid, _group)| uuid.clone())
689 .take(n)
690 .collect::<Vec<Arc<Uuid>>>()
691 } else {
692 self.id_to_group_map.iter()
693 .map(|(uuid, _group)| uuid.clone())
694 .take(n)
695 .collect::<Vec<Arc<Uuid>>>()
696 }
697 }
698 }
699 }
700
701 pub fn get_metadata(&mut self, range: (u32, u32), priority: Option<u16>) -> Vec<PacketMetaData> {
723 let mut uuids = vec![];
724 let mut iter_count: u32 = 0;
725 let (start, end) = range;
726 let mut primer_iter = 0;
727 if let Some(priority) = priority {
728 if let Some(group) = self.groups_map.get(&priority) {
729 for (uuid, msg_byte_size) in group.msgs_map.iter() {
730 if primer_iter < start {
731 primer_iter += 1;
732 continue;
733 }
734 uuids.push(PacketMetaData{
735 uuid: uuid.clone(),
736 priority: priority.clone(),
737 byte_size: msg_byte_size.clone()
738 });
739 if iter_count == end {
740 break;
741 }
742 iter_count += 1;
743 }
744 }
745 } else {
746 'group: for (priority, group) in self.groups_map.iter() {
747 'msg: for (uuid, msg_byte_size) in group.msgs_map.iter().rev() {
748 if primer_iter < start {
749 primer_iter += 1;
750 continue 'msg;
751 }
752 uuids.push(PacketMetaData{
753 uuid: uuid.clone(),
754 priority: priority.clone(),
755 byte_size: msg_byte_size.clone()
756 });
757 if iter_count == end {
758 break 'group;
759 }
760 iter_count += 1;
761 }
762 }
763 }
764 uuids
765 }
766
767 pub fn update_group_defaults(&mut self, priority: u16, defaults: &GroupDefaults) -> Result<(u64, Vec<Arc<Uuid>>), StoreError> {
792 let mut bytes_removed = 0;
793 let mut msgs_removed = vec![];
794 self.group_defaults.insert(priority, defaults.clone());
795 if let Some(mut group) = self.groups_map.remove(&priority) {
796 group.update_from_config(defaults.clone());
797 let (bytes_removed_from_group, mut msgs_removed_from_group) = self.prune_group(&mut group, 0, PruneBy::Group)?;
798 bytes_removed += bytes_removed_from_group;
799 msgs_removed.append(&mut msgs_removed_from_group);
800 self.groups_map.insert(priority, group);
801 }
802 Ok((bytes_removed, msgs_removed))
803 }
804
805 pub fn delete_group_defaults(&mut self, priority: u16) {
829 self.group_defaults.remove(&priority);
830 if let Some(group) = self.groups_map.get_mut(&priority) {
831 group.max_byte_size = None;
832 }
833 }
834
835 pub fn update_store_defaults(&mut self, defaults: &StoreDefaults) -> Result<(u64, Vec<u16>, Vec<Arc<Uuid>>), StoreError> {
860 self.max_byte_size = defaults.max_byte_size;
861 self.prune_store(None, u16::MAX, 0)
862 }
863
864 pub fn uuid(&mut self, priority: u16) -> Result<Arc<Uuid>, StoreError> {
865 match self.uuid_manager.next(priority) {
866 Ok(uuid) => Ok(uuid),
867 Err(error) => Err(store_error!(StoreErrorTy::UuidManagerError(error)))
868 }
869 }
870
871}
872
873#[cfg(test)]
874mod tests {
875
876 mod add {
877 use crate::{ Store, GroupDefaults };
878
879 #[test]
880 fn should_increase_store_byte_size() {
881 let mut store = Store::new(None).unwrap();
882 store.add(1, "1234567890".len() as u64).expect("Could not add msg");
883 assert_eq!(store.byte_size, 10)
884 }
885
886 #[test]
887 fn should_increase_group_byte_size() {
888 let mut store = Store::new(None).unwrap();
889 store.add(1, "1234567890".len() as u64).expect("Could not add msg");
890 let group = store.groups_map.get(&1).expect("Could not find group");
891 assert_eq!(group.byte_size, 10)
892 }
893
894 #[test]
895 fn should_prune_store_byte_size_to_10_when_store_max_byte_size_exists() {
896 let mut store = Store::new(None).unwrap();
897 store.max_byte_size = Some(10);
898 store.add(1, "1234567890".len() as u64).expect("Could not add first msg");
899 store.add(1, "1234567890".len() as u64).expect("Could not second msg");
900 assert_eq!(store.byte_size, 10)
901 }
902
903 #[test]
904 fn should_prune_store_byte_size_to_10_when_group_max_byte_size_exists() {
905 let mut store = Store::new(None).unwrap();
906 store.add(1, "1234567890".len() as u64).expect("Could not add first msg");
907 let mut group = store.groups_map.get_mut(&1).expect("Could not find group");
908 group.max_byte_size = Some(10);
909 store.add(1, "1234567890".len() as u64).expect("Could not second msg");
910 assert_eq!(store.byte_size, 10)
911 }
912
913 #[test]
914 fn should_prune_group_byte_size_to_10_when_group_max_byte_size_exists() {
915 let mut store = Store::new(None).unwrap();
916 store.add(1, "1234567890".len() as u64).expect("Could not add first msg");
917 let mut group = store.groups_map.get_mut(&1).expect("Could not get mutable group");
918 group.max_byte_size = Some(10);
919 store.add(1, "1234567890".len() as u64).expect("Could not second msg");
920 let group = store.groups_map.get(&1).expect("Could get group ref");
921 assert_eq!(group.byte_size, 10)
922 }
923
924 #[test]
925 fn should_prune_oldest_msg_in_a_group_when_exceeding_group_max_byte_size() {
926 let mut store = Store::new(None).unwrap();
927 let first_uuid = store.add(1, "1234567890".len() as u64).expect("Could not add first msg").uuid;
928 let mut group = store.groups_map.get_mut(&1).expect("Could not get mutable group");
929 group.max_byte_size = Some(10);
930 let second_uuid = store.add(1, "1234567890".len() as u64).expect("Could not second msg").uuid;
931 assert_eq!(None, store.id_to_group_map.get(&first_uuid));
932 assert_eq!(Some(&1), store.id_to_group_map.get(&second_uuid));
933 }
934
935 #[test]
936 fn should_prune_oldest_msg_in_a_group_when_exceeding_store_max_byte_size() {
937 let mut store = Store::new(None).unwrap();
938 store.max_byte_size = Some(10);
939 let first_uuid = store.add(1, "1234567890".len() as u64).expect("Could not add first msg").uuid;
940 let second_uuid = store.add(1, "1234567890".len() as u64).expect("Could not second msg").uuid;
941 assert_eq!(None, store.id_to_group_map.get(&first_uuid));
942 assert_eq!(Some(&1), store.id_to_group_map.get(&second_uuid));
943 }
944
945 #[test]
946 fn should_prune_oldest_lowest_pri_msg_in_the_store_when_exceeding_store_max_byte_size() {
947 let mut store = Store::new(None).unwrap();
948 store.max_byte_size = Some(20);
949 let first_uuid = store.add(2, "1234567890".len() as u64).expect("Could not add first msg").uuid;
950 let second_uuid = store.add(1, "1234567890".len() as u64).expect("Could not second msg").uuid;
951 let third_uuid = store.add(1, "1234567890".len() as u64).expect("Could not second msg").uuid;
952 assert_eq!(Some(&2), store.id_to_group_map.get(&first_uuid));
953 assert_eq!(None, store.id_to_group_map.get(&second_uuid));
954 assert_eq!(Some(&1), store.id_to_group_map.get(&third_uuid));
955 }
956
957 #[test]
958 fn should_return_add_result_with_pruned_msgs() {
959 let mut store = Store::new(None).unwrap();
961 store.max_byte_size = Some(3);
962 let _first_uuid = store.add(1, "foo".len() as u64).expect("Could not add first msg").uuid;
963 let add_result = store.add(1, "foo".len() as u64).expect("Could not second msg");
964 assert_eq!(1, add_result.msgs_removed.len());
965 assert_eq!(3, add_result.bytes_removed);
966 assert_eq!(1, add_result.groups_removed[0]);
967
968 let mut store = Store::new(None).unwrap();
970 store.max_byte_size = Some(3);
971 let _first_uuid = store.add(1, "foo".len() as u64).expect("Could not add first msg").uuid;
972 let add_result = store.add(2, "foo".len() as u64).expect("Could not second msg");
973 assert_eq!(1, add_result.msgs_removed.len());
974 assert_eq!(3, add_result.bytes_removed);
975 assert_eq!(1, add_result.groups_removed[0]);
976 }
977
978 #[test]
979 fn should_return_msg_to_large_for_store_err() {
980 let mut store = Store::new(None).unwrap();
981 store.max_byte_size = Some(9);
982 let result = store.add(2, "1234567890".len() as u64);
983 assert!(result.is_err());
984 }
985
986 #[test]
987 fn should_return_msg_to_large_for_group_err() {
988 let mut store = Store::new(None).unwrap();
989 store.add(1, "1234567890".len() as u64).expect("Could not add first msg");
990 let mut group = store.groups_map.get_mut(&1).expect("Could not get mutable group");
991 group.max_byte_size = Some(10);
992 let result = store.add(1, "12345678901".len() as u64);
993 assert!(result.is_err());
994 }
995
996 #[test]
997 fn should_return_msg_lacks_priority_err() {
998 let mut store = Store::new(None).unwrap();
999 store.max_byte_size = Some(20);
1000 store.add(2, "1234567890".len() as u64).expect("Could not add first msg");
1001 store.add(2, "1234567890".len() as u64).expect("Could not second msg");
1002 let result = store.add(1, "1234567890".len() as u64);
1003 assert!(result.is_err());
1004 }
1005
1006 #[test]
1007 fn should_create_group_with_defaults() {
1008 let mut store = Store::new(None).unwrap();
1009 store.group_defaults.insert(1, GroupDefaults { max_byte_size: Some(10) });
1010 store.add(1, "1234567890".len() as u64).expect("Could not add msg");
1011 let group = store.groups_map.get(&1).expect("Could not get group");
1012 assert_eq!(Some(10), group.max_byte_size);
1013 }
1014
1015 #[test]
1016 fn should_reinsert_group_after_errors() {
1017 let mut store = Store::new(None).unwrap();
1018 store.max_byte_size = Some(10);
1019 store.add(2, "12345".len() as u64).expect("Could not add msg");
1020 let first_attempt = store.add(2, "12345678901".len() as u64);
1021 assert!(first_attempt.is_err());
1022 let mut group = store.groups_map.get_mut(&2).expect("Group not found");
1023 group.max_byte_size = Some(5);
1024 let second_attempt = store.add(2, "123456".len() as u64);
1025 assert!(second_attempt.is_err());
1026 let third_attempt = store.add(1, "123456".len() as u64);
1027 assert!(third_attempt.is_err());
1028 let group = store.groups_map.get(&2);
1029 assert!(group.is_some());
1030 }
1031
1032 }
1033
1034 mod get {
1035 use crate::Store;
1036
1037 #[test]
1038 fn should_return_msg() {
1039 let mut store = Store::new(None).unwrap();
1040 let uuid = store.add(1, "first message".len() as u64).unwrap().uuid;
1041 let stored_packet = store.get(Some(uuid.clone()), None, false).unwrap().expect("Msg not found");
1042 assert_eq!(uuid, stored_packet);
1043 }
1044
1045 #[test]
1046 fn should_return_oldest_msg() {
1047 let mut store = Store::new(None).unwrap();
1048 let first_uuid = store.add(1, "first message".len() as u64).unwrap().uuid;
1049 store.add(1, "second message".len() as u64).unwrap();
1050 let stored_packet = store.get(None, None, false).unwrap().expect("Msg not found");
1051 assert_eq!(first_uuid, stored_packet);
1052 }
1053
1054 #[test]
1055 fn should_return_youngest_msg() {
1056 let mut store = Store::new(None).unwrap();
1057 let _first_uuid = store.add(1, "first message".len() as u64).unwrap().uuid;
1058 let second_uuid = store.add(1, "second message".len() as u64).unwrap().uuid;
1059 let stored_packet = store.get(None, None, true).unwrap().expect("Msg not found");
1060 assert_eq!(second_uuid, stored_packet);
1061 }
1062
1063 #[test]
1064 fn should_return_highest_pri_msg() {
1065 let mut store = Store::new(None).unwrap();
1066 store.add(1, "first message".len() as u64).unwrap();
1067 let second_msg = store.add(2, "second message".len() as u64).unwrap().uuid;
1068 let stored_packet = store.get(None, None, false).unwrap().expect("Msg not found");
1069 assert_eq!(second_msg, stored_packet);
1070 }
1071
1072 #[test]
1073 fn should_return_lowest_pri_msg() {
1074 let mut store = Store::new(None).unwrap();
1075 let first_msg = store.add(1, "first message".len() as u64).unwrap().uuid;
1076 let _second_msg = store.add(2, "second message".len() as u64).unwrap().uuid;
1077 let stored_packet = store.get(None, None, true).unwrap().expect("Msg not found");
1078 assert_eq!(first_msg, stored_packet);
1079 }
1080
1081 #[test]
1082 fn should_return_oldest_msg_in_group() {
1083 let mut store = Store::new(None).unwrap();
1084 let first_uuid = store.add(1, "first message".len() as u64).unwrap().uuid;
1085 let _second_uuid = store.add(2, "second message".len() as u64).unwrap().uuid;
1086 let _third_uuid = store.add(1, "third message".len() as u64).unwrap().uuid;
1087 let stored_packet = store.get(None,Some(1), false).unwrap().expect("Msg not found");
1088 assert_eq!(first_uuid, stored_packet);
1089 }
1090
1091 #[test]
1092 fn should_return_youngest_msg_in_group() {
1093 let mut store = Store::new(None).unwrap();
1094 let _first_uuid = store.add(1, "first message".len() as u64).unwrap().uuid;
1095 let _second_uuid = store.add(2, "second message".len() as u64).unwrap().uuid;
1096 let third_uuid = store.add(1, "third message".len() as u64).unwrap().uuid;
1097 let stored_packet = store.get(None,Some(1), true).unwrap().expect("Msg not found");
1098 assert_eq!(third_uuid, stored_packet);
1099 }
1100
1101 }
1102
1103 mod get_n {
1104 use crate::Store;
1105
1106 #[test]
1107 fn should_return_n_msg_uuids() {
1108 let mut store = Store::new(None).unwrap();
1109 let uuids = vec![
1110 store.add(1, 10).unwrap().uuid, store.add(2, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(4, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid ];
1121 let rereived_uuids = store.get_n(10, None, None, false);
1122 assert_eq!(uuids.get(0).unwrap(), rereived_uuids.get(2).unwrap());
1123 assert_eq!(uuids.get(1).unwrap(), rereived_uuids.get(1).unwrap());
1124 assert_eq!(uuids.get(2).unwrap(), rereived_uuids.get(3).unwrap());
1125 assert_eq!(uuids.get(3).unwrap(), rereived_uuids.get(4).unwrap());
1126 assert_eq!(uuids.get(4).unwrap(), rereived_uuids.get(5).unwrap());
1127 assert_eq!(uuids.get(5).unwrap(), rereived_uuids.get(0).unwrap());
1128 assert_eq!(uuids.get(6).unwrap(), rereived_uuids.get(6).unwrap());
1129 assert_eq!(uuids.get(7).unwrap(), rereived_uuids.get(7).unwrap());
1130 assert_eq!(uuids.get(8).unwrap(), rereived_uuids.get(8).unwrap());
1131 assert_eq!(uuids.get(9).unwrap(), rereived_uuids.get(9).unwrap());
1132 }
1133
1134 #[test]
1135 fn should_return_9_messages_lt_4() {
1136 let mut store = Store::new(None).unwrap();
1137 let uuids = vec![
1138 store.add(1, 10).unwrap().uuid, store.add(2, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(4, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid ];
1149 let rereived_uuids = store.get_n(10, Some(2), None, false);
1150 assert_eq!(uuids.get(0).unwrap(), rereived_uuids.get(1).unwrap());
1151 assert_eq!(uuids.get(1).unwrap(), rereived_uuids.get(0).unwrap());
1152 assert_eq!(uuids.get(2).unwrap(), rereived_uuids.get(2).unwrap());
1153 assert_eq!(uuids.get(3).unwrap(), rereived_uuids.get(3).unwrap());
1154 assert_eq!(uuids.get(4).unwrap(), rereived_uuids.get(4).unwrap());
1155 assert_eq!(uuids.get(6).unwrap(), rereived_uuids.get(5).unwrap());
1156 assert_eq!(uuids.get(7).unwrap(), rereived_uuids.get(6).unwrap());
1157 assert_eq!(uuids.get(8).unwrap(), rereived_uuids.get(7).unwrap());
1158 assert_eq!(uuids.get(9).unwrap(), rereived_uuids.get(8).unwrap());
1159 assert_eq!(9, rereived_uuids.len());
1160 }
1161
1162 #[test]
1163 fn should_return_8_messages_lt_the_pri_2_message() {
1164 let mut store = Store::new(None).unwrap();
1165 let uuids = vec![
1166 store.add(1, 10).unwrap().uuid, store.add(2, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(4, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid, store.add(1, 10).unwrap().uuid ];
1177 let rereived_uuids = store.get_n(10, None, Some(uuids.get(1).unwrap().clone()), false);
1178 assert_eq!(uuids.get(0).unwrap(), rereived_uuids.get(0).unwrap());
1179 assert_eq!(uuids.get(2).unwrap(), rereived_uuids.get(1).unwrap());
1180 assert_eq!(uuids.get(3).unwrap(), rereived_uuids.get(2).unwrap());
1181 assert_eq!(uuids.get(4).unwrap(), rereived_uuids.get(3).unwrap());
1182 assert_eq!(uuids.get(6).unwrap(), rereived_uuids.get(4).unwrap());
1183 assert_eq!(uuids.get(7).unwrap(), rereived_uuids.get(5).unwrap());
1184 assert_eq!(uuids.get(8).unwrap(), rereived_uuids.get(6).unwrap());
1185 assert_eq!(uuids.get(9).unwrap(), rereived_uuids.get(7).unwrap());
1186 assert_eq!(8, rereived_uuids.len());
1187 }
1188
1189 }
1190
1191 mod get_metadata {
1192 use crate::Store;
1193
1194 #[test]
1195 fn should_return_2_message_data_points() {
1196 let mut store = Store::new(None).unwrap();
1197 let uuid1 = store.add(1, "first message".len() as u64).unwrap().uuid;
1198 let uuid2 = store.add(1, "second message".len() as u64).unwrap().uuid;
1199 let set = store.get_metadata((0, 1), None);
1200 assert_eq!(2, set.len());
1201 assert_eq!(uuid1, set[0].uuid);
1202 assert_eq!(uuid2, set[1].uuid);
1203 }
1204
1205 #[test]
1206 fn should_return_2_message_data_points_with_range_starting_at_2() {
1207 let mut store = Store::new(None).unwrap();
1208 let _uuid1 = store.add(1, "first message".len() as u64).unwrap().uuid;
1209 let _uuid2 = store.add(1, "second message".len() as u64).unwrap().uuid;
1210 let _uuid3 = store.add(1, "third message".len() as u64).unwrap().uuid;
1211 let set = store.get_metadata((1, 2), None);
1212 assert_eq!(2, set.len());
1213 }
1214
1215 }
1216
1217 mod del {
1218 use crate::Store;
1219
1220 #[test]
1221 fn should_decrease_byte_size() {
1222 let mut store = Store::new(None).unwrap();
1223 let uuid = store.add(1, "foo".len() as u64).unwrap().uuid;
1224 store.add(1, "bar".len() as u64).unwrap();
1225 let group = store.groups_map.get(&1).expect("Could get group ref");
1226 assert_eq!(6, store.byte_size);
1227 assert_eq!(6, group.byte_size);
1228 store.del(uuid).unwrap();
1229 let group = store.groups_map.get(&1).expect("Could get group ref");
1230 assert_eq!(3, store.byte_size);
1231 assert_eq!(3, group.byte_size);
1232 }
1233
1234 #[test]
1235 fn should_remove_empty_group() {
1236 let mut store = Store::new(None).unwrap();
1237 let uuid = store.add(1, "foo".len() as u64).unwrap().uuid;
1238 assert!(store.groups_map.get(&1).is_some());
1239 store.del(uuid).unwrap();
1240 assert!(store.groups_map.get(&1).is_none())
1241 }
1242
1243 }
1244
1245 mod del_group {
1246 use crate::Store;
1247
1248 #[test]
1249 fn should_decrease_byte_size() {
1250 let mut store = Store::new(None).unwrap();
1251 store.add(1, "foo".len() as u64).unwrap();
1252 store.add(1, "bar".len() as u64).unwrap();
1253 let group = store.groups_map.get(&1).expect("Could get group ref");
1254 assert_eq!(6, store.byte_size);
1255 assert_eq!(6, group.byte_size);
1256 store.del_group(&1).unwrap();
1257 assert_eq!(true, store.groups_map.get(&1).is_none());
1258 assert_eq!(0, store.byte_size);
1259 }
1260
1261 #[test]
1262 fn should_remove_empty_group() {
1263 let mut store = Store::new(None).unwrap();
1264 store.add(1, "foo".len() as u64).unwrap();
1265 store.add(1, "bar".len() as u64).unwrap();
1266 let group = store.groups_map.get(&1).expect("Could get group ref");
1267 assert_eq!(6, store.byte_size);
1268 assert_eq!(6, group.byte_size);
1269 store.del_group(&1).unwrap();
1270 assert_eq!(true, store.groups_map.get(&1).is_none());
1271 }
1272
1273 }
1274
1275 mod update_group_defaults {
1276 use crate::{ Store, GroupDefaults };
1277
1278 #[test]
1279 fn should_update_store_config() {
1280 let mut store = Store::new(None).unwrap();
1281 store.update_group_defaults(1, &GroupDefaults{ max_byte_size: Some(10) }).unwrap();
1282 let defaults = store.group_defaults.get(&1).expect("Could not find defaults");
1283 assert_eq!(Some(10), defaults.max_byte_size);
1284 }
1285
1286 #[test]
1287 fn should_update_existing_group() {
1288 let mut store = Store::new(None).unwrap();
1289 store.update_group_defaults(1, &GroupDefaults{ max_byte_size: Some(10) }).unwrap();
1290 store.add(1, "foo".len() as u64).unwrap();
1291 let group = store.groups_map.get(&1).expect("Could not find defaults");
1292 assert_eq!(Some(10), group.max_byte_size);
1293 }
1294
1295 #[test]
1296 fn should_prune_group_after_update() {
1297 let mut store = Store::new(None).unwrap();
1298 store.add(1, "foo".len() as u64).unwrap();
1299 store.add(1, "bar".len() as u64).unwrap();
1300 store.update_group_defaults(1, &GroupDefaults{ max_byte_size: Some(3) }).unwrap();
1301 let group = store.groups_map.get(&1).expect("Could not find group");
1302 assert_eq!(3, store.byte_size);
1303 assert_eq!(3, group.byte_size);
1304 }
1305
1306 }
1307
1308 mod delete_group_defaults {
1309 use crate::{Store, GroupDefaults};
1310
1311 #[test]
1312 fn should_update_existing_group() {
1313 let mut store = Store::new(None).unwrap();
1314 store.update_group_defaults(1, &GroupDefaults{ max_byte_size: Some(10) }).unwrap();
1315 store.add(1, "foo".len() as u64).unwrap();
1316 let group = store.groups_map.get(&1).expect("Could not find defaults");
1317 assert_eq!(Some(10), group.max_byte_size);
1318 store.delete_group_defaults(1);
1319 assert!(store.group_defaults.get(&1).is_none());
1320 let group = store.groups_map.get(&1).expect("Could not find defaults");
1321 assert_eq!(None, group.max_byte_size);
1322 }
1323 }
1324
1325 mod update_store_defaults {
1326 use crate::{ Store, StoreDefaults };
1327
1328 #[test]
1329 fn should_update_store_config() {
1330 let mut store = Store::new(None).unwrap();
1331 store.update_store_defaults(&StoreDefaults{ max_byte_size: Some(10) }).unwrap();
1332 assert_eq!(Some(10), store.max_byte_size);
1333 }
1334
1335 #[test]
1336 fn should_prune_store_after_update() {
1337 let mut store = Store::new(None).unwrap();
1338 store.add(1, "foo".len() as u64).unwrap();
1339 store.add(1, "bar".len() as u64).unwrap();
1340 store.update_store_defaults(&StoreDefaults{ max_byte_size: Some(3) }).unwrap();
1341 let group = store.groups_map.get(&1).expect("Could not find defaults");
1342 assert_eq!(3, store.byte_size);
1343 assert_eq!(3, group.byte_size);
1344 }
1345
1346 #[test]
1347 fn should_remove_empty_group_after_update() {
1348 let mut store = Store::new(None).unwrap();
1349 store.add(1, "foo".len() as u64).unwrap();
1350 store.update_store_defaults(&StoreDefaults{ max_byte_size: Some(2) }).unwrap();
1351 assert_eq!(0, store.groups_map.len());
1352 }
1353
1354 }
1355
1356 mod uuid {
1357 use msg_store_uuid::Uuid;
1358 use crate::Store;
1359 use std::sync::Arc;
1360
1361 #[test]
1362 fn should_convert_a_str_to_uuid() {
1363 let left = Arc::new(Uuid{ priority: 1, timestamp: 1636523479865480266, sequence: 1, node_id: 0 });
1364 assert_eq!(left, Uuid::from_string("1-1636523479865480266-1-0").unwrap())
1365 }
1366
1367 #[test]
1368 fn should_reflect_node_id() {
1369 let mut store = Store::new(Some(10)).unwrap();
1370 let uuid = store.uuid(1).unwrap();
1371 assert_eq!(10, uuid.node_id);
1372 }
1373
1374 }
1375
1376}