1use futures::sync::mpsc::UnboundedSender;
8use std::collections::HashMap;
9
10use bytes::Bytes;
11
12pub(crate) struct MlesDb {
13 channels: Option<HashMap<u64, UnboundedSender<Bytes>>>,
14 messages: Vec<Bytes>,
15 peer_tx: Option<UnboundedSender<UnboundedSender<Bytes>>>,
16 history_limit: usize,
17 tx_db: Vec<UnboundedSender<Bytes>>,
18}
19
20impl MlesDb {
21 pub fn new(hlim: usize) -> MlesDb {
22 MlesDb {
23 channels: None,
24 messages: Vec::with_capacity(hlim),
25 peer_tx: None,
26 history_limit: hlim,
27 tx_db: Vec::new(),
28 }
29 }
30
31 pub fn get_channels(&self) -> Option<&HashMap<u64, UnboundedSender<Bytes>>> {
32 self.channels.as_ref()
33 }
34
35 pub fn get_messages(&self) -> &Vec<Bytes> {
36 &self.messages
37 }
38
39 pub fn get_messages_len(&self) -> usize {
40 self.messages.len()
41 }
42
43 pub fn get_tx_db(&self) -> &Vec<UnboundedSender<Bytes>> {
44 &self.tx_db
45 }
46
47 pub fn clear_tx_db(&mut self) {
48 self.tx_db = Vec::new();
49 }
50
51 pub fn add_tx_db(&mut self, tx: UnboundedSender<Bytes>) {
52 self.tx_db.push(tx);
53 }
54
55 pub fn get_history_limit(&self) -> usize {
56 self.history_limit
57 }
58
59 pub fn get_peer_tx(&self) -> Option<&UnboundedSender<UnboundedSender<Bytes>>> {
60 self.peer_tx.as_ref()
61 }
62
63 pub fn add_channel(&mut self, cid: u64, sender: UnboundedSender<Bytes>) {
64 if self.channels.is_none() {
65 self.channels = Some(HashMap::new());
66 }
67 if let Some(ref mut channels) = self.channels {
68 channels.insert(cid, sender);
69 }
70 }
71
72 pub fn check_for_duplicate_cid(&self, cid: u32) -> bool {
73 if let Some(ref channels) = self.channels {
74 if channels.contains_key(&(cid as u64)) {
75 return true;
76 }
77 }
78 false
79 }
80
81 pub fn rem_channel(&mut self, cid: u64) {
82 if let Some(ref mut channels) = self.channels {
83 channels.remove(&cid);
84 }
85 }
86
87 pub fn get_channels_len(&mut self) -> usize {
88 if let Some(ref channels) = self.channels {
89 return channels.len();
90 }
91 0
92 }
93
94 pub fn add_message(&mut self, message: Bytes) {
95 if 0 == self.get_history_limit() {
96 return;
97 }
98 if self.messages.len() == self.get_history_limit() {
99 self.messages.remove(0);
100 }
101 self.messages.push(message);
102 }
103
104 pub fn set_peer_tx(&mut self, peer_tx: UnboundedSender<UnboundedSender<Bytes>>) {
105 self.peer_tx = Some(peer_tx);
106 }
107
108 pub fn rem_peer_tx(&mut self) {
109 self.peer_tx = None;
110 }
111
112 pub fn check_peer(&self) -> bool {
113 self.peer_tx.is_some()
114 }
115}
116
117pub(crate) struct MlesPeerDb {
118 channels: HashMap<u64, UnboundedSender<Bytes>>,
119 messages: Vec<Bytes>,
120 history_limit: usize,
121 rx_stats: u64,
122 tx_stats: u64,
123}
124
125impl MlesPeerDb {
126 pub fn new(hlim: usize) -> MlesPeerDb {
127 MlesPeerDb {
128 channels: HashMap::new(),
129 messages: Vec::with_capacity(hlim),
130 history_limit: hlim,
131 rx_stats: 0,
132 tx_stats: 0,
133 }
134 }
135
136 pub fn get_channels(&self) -> &HashMap<u64, UnboundedSender<Bytes>> {
137 &self.channels
138 }
139
140 pub fn add_channel(&mut self, cid: u64, channel: UnboundedSender<Bytes>) {
141 self.channels.insert(cid, channel);
142 }
143
144 pub fn rem_channel(&mut self, cid: u64) {
145 self.channels.remove(&cid);
146 }
147
148 pub fn clear_channels(&mut self) {
149 self.channels = HashMap::new();
150 }
151
152 pub fn clear_stats(&mut self) {
153 self.rx_stats = 0;
154 self.tx_stats = 0;
155 }
156
157 pub fn add_message(&mut self, message: Bytes) {
158 if 0 == self.get_history_limit() {
159 return;
160 }
161 if self.messages.len() == self.get_history_limit() {
162 self.messages.remove(0);
163 }
164 self.messages.push(message);
165 }
166
167 pub fn get_messages(&self) -> &Vec<Bytes> {
168 &self.messages
169 }
170
171 pub fn get_history_limit(&self) -> usize {
172 self.history_limit
173 }
174
175 pub fn get_messages_len(&self) -> usize {
176 self.messages.len()
177 }
178
179 pub fn add_rx_stats(&mut self) {
180 self.rx_stats += 1;
181 }
182
183 pub fn add_tx_stats(&mut self) {
184 self.tx_stats += 1;
185 }
186
187 pub fn get_rx_stats(&self) -> u64 {
188 self.rx_stats
189 }
190
191 }
195
#[cfg(test)]
mod tests {
    use super::*;

    /// History limit shared by all tests in this module.
    const HISTLIMIT: usize = 100;

    /// A fresh `MlesDb` stores one added message and has no channel map.
    #[test]
    fn test_new_db() {
        let payload = "Message".to_string().into_bytes();
        let mut db = MlesDb::new(HISTLIMIT);

        db.add_message(Bytes::from(payload));

        assert_eq!(1, db.get_messages().len());
        assert_eq!(0, db.get_channels_len());
        assert!(db.get_channels().is_none());
    }

    /// A fresh `MlesPeerDb` starts empty and stores one added message.
    #[test]
    fn test_new_peer_db() {
        let payload = "Message".to_string().into_bytes();
        let mut peer_db = MlesPeerDb::new(HISTLIMIT);

        assert_eq!(0, peer_db.get_messages_len());
        peer_db.add_message(Bytes::from(payload));
        assert_eq!(1, peer_db.get_messages_len());
    }

    /// Adding one message more than the limit must not grow the history
    /// beyond the limit.
    #[test]
    fn test_db_history_limit() {
        let payload = "Message".to_string().into_bytes();
        let mut peer_db = MlesPeerDb::new(HISTLIMIT);

        assert_eq!(0, peer_db.get_messages_len());
        peer_db.add_message(Bytes::from(payload.clone()));
        assert_eq!(1, peer_db.get_messages_len());

        // HISTLIMIT further messages: HISTLIMIT + 1 added in total.
        for _ in 0..HISTLIMIT {
            peer_db.add_message(Bytes::from(payload.clone()));
        }

        assert_eq!(HISTLIMIT, peer_db.get_messages_len());
        assert_eq!(HISTLIMIT, peer_db.get_history_limit());
    }
}