mles_utils/
local_db.rs

1/* This Source Code Form is subject to the terms of the Mozilla Public
2*  License, v. 2.0. If a copy of the MPL was not distributed with this
3*  file, You can obtain one at http://mozilla.org/MPL/2.0/.
4*
5*  Copyright (C) 2017-2018  Mles developers
6* */
7use futures::sync::mpsc::UnboundedSender;
8use std::collections::HashMap;
9
10use bytes::Bytes;
11
12pub(crate) struct MlesDb {
13    channels: Option<HashMap<u64, UnboundedSender<Bytes>>>,
14    messages: Vec<Bytes>,
15    peer_tx: Option<UnboundedSender<UnboundedSender<Bytes>>>,
16    history_limit: usize,
17    tx_db: Vec<UnboundedSender<Bytes>>,
18}
19
20impl MlesDb {
21    pub fn new(hlim: usize) -> MlesDb {
22        MlesDb {
23            channels: None,
24            messages: Vec::with_capacity(hlim),
25            peer_tx: None,
26            history_limit: hlim,
27            tx_db: Vec::new(),
28        }
29    }
30
31    pub fn get_channels(&self) -> Option<&HashMap<u64, UnboundedSender<Bytes>>> {
32        self.channels.as_ref()
33    }
34
35    pub fn get_messages(&self) -> &Vec<Bytes> {
36        &self.messages
37    }
38
39    pub fn get_messages_len(&self) -> usize {
40        self.messages.len()
41    }
42
43    pub fn get_tx_db(&self) -> &Vec<UnboundedSender<Bytes>> {
44        &self.tx_db
45    }
46
47    pub fn clear_tx_db(&mut self) {
48        self.tx_db = Vec::new();
49    }
50
51    pub fn add_tx_db(&mut self, tx: UnboundedSender<Bytes>) {
52        self.tx_db.push(tx);
53    }
54
55    pub fn get_history_limit(&self) -> usize {
56        self.history_limit
57    }
58
59    pub fn get_peer_tx(&self) -> Option<&UnboundedSender<UnboundedSender<Bytes>>> {
60        self.peer_tx.as_ref()
61    }
62
63    pub fn add_channel(&mut self, cid: u64, sender: UnboundedSender<Bytes>) {
64        if self.channels.is_none() {
65            self.channels = Some(HashMap::new());
66        }
67        if let Some(ref mut channels) = self.channels {
68            channels.insert(cid, sender);
69        }
70    }
71
72    pub fn check_for_duplicate_cid(&self, cid: u32) -> bool {
73        if let Some(ref channels) = self.channels {
74            if channels.contains_key(&(cid as u64)) {
75                return true;
76            }
77        }
78        false
79    }
80
81    pub fn rem_channel(&mut self, cid: u64) {
82        if let Some(ref mut channels) = self.channels {
83            channels.remove(&cid);
84        }
85    }
86
87    pub fn get_channels_len(&mut self) -> usize {
88        if let Some(ref channels) = self.channels {
89            return channels.len();
90        }
91        0
92    }
93
94    pub fn add_message(&mut self, message: Bytes) {
95        if 0 == self.get_history_limit() {
96            return;
97        }
98        if self.messages.len() == self.get_history_limit() {
99            self.messages.remove(0);
100        }
101        self.messages.push(message);
102    }
103
104    pub fn set_peer_tx(&mut self, peer_tx: UnboundedSender<UnboundedSender<Bytes>>) {
105        self.peer_tx = Some(peer_tx);
106    }
107
108    pub fn rem_peer_tx(&mut self) {
109        self.peer_tx = None;
110    }
111
112    pub fn check_peer(&self) -> bool {
113        self.peer_tx.is_some()
114    }
115}
116
117pub(crate) struct MlesPeerDb {
118    channels: HashMap<u64, UnboundedSender<Bytes>>,
119    messages: Vec<Bytes>,
120    history_limit: usize,
121    rx_stats: u64,
122    tx_stats: u64,
123}
124
125impl MlesPeerDb {
126    pub fn new(hlim: usize) -> MlesPeerDb {
127        MlesPeerDb {
128            channels: HashMap::new(),
129            messages: Vec::with_capacity(hlim),
130            history_limit: hlim,
131            rx_stats: 0,
132            tx_stats: 0,
133        }
134    }
135
136    pub fn get_channels(&self) -> &HashMap<u64, UnboundedSender<Bytes>> {
137        &self.channels
138    }
139
140    pub fn add_channel(&mut self, cid: u64, channel: UnboundedSender<Bytes>) {
141        self.channels.insert(cid, channel);
142    }
143
144    pub fn rem_channel(&mut self, cid: u64) {
145        self.channels.remove(&cid);
146    }
147
148    pub fn clear_channels(&mut self) {
149        self.channels = HashMap::new();
150    }
151
152    pub fn clear_stats(&mut self) {
153        self.rx_stats = 0;
154        self.tx_stats = 0;
155    }
156
157    pub fn add_message(&mut self, message: Bytes) {
158        if 0 == self.get_history_limit() {
159            return;
160        }
161        if self.messages.len() == self.get_history_limit() {
162            self.messages.remove(0);
163        }
164        self.messages.push(message);
165    }
166
167    pub fn get_messages(&self) -> &Vec<Bytes> {
168        &self.messages
169    }
170
171    pub fn get_history_limit(&self) -> usize {
172        self.history_limit
173    }
174
175    pub fn get_messages_len(&self) -> usize {
176        self.messages.len()
177    }
178
179    pub fn add_rx_stats(&mut self) {
180        self.rx_stats += 1;
181    }
182
183    pub fn add_tx_stats(&mut self) {
184        self.tx_stats += 1;
185    }
186
187    pub fn get_rx_stats(&self) -> u64 {
188        self.rx_stats
189    }
190
191    //pub fn get_tx_stats(&self) -> u64 {
192    //    self.tx_stats
193    //}
194}
195
196#[cfg(test)]
197
198mod tests {
199    use super::*;
200    const HISTLIMIT: usize = 100;
201
202    #[test]
203    fn test_new_db() {
204        let msg = "Message".to_string().into_bytes();
205        let mut mles_db = MlesDb::new(HISTLIMIT);
206        mles_db.add_message(Bytes::from(msg));
207        assert_eq!(1, mles_db.get_messages().len());
208        assert_eq!(0, mles_db.get_channels_len());
209        let channel = mles_db.get_channels();
210        assert_eq!(true, channel.is_none());
211    }
212
213    #[test]
214    fn test_new_peer_db() {
215        let msg = "Message".to_string().into_bytes();
216        let mut mles_peer = MlesPeerDb::new(HISTLIMIT);
217        assert_eq!(0, mles_peer.get_messages_len());
218        mles_peer.add_message(Bytes::from(msg));
219        assert_eq!(1, mles_peer.get_messages_len());
220    }
221
222    #[test]
223    fn test_db_history_limit() {
224        let mut limit = HISTLIMIT;
225        let msg = "Message".to_string().into_bytes();
226        let mut mles_peer = MlesPeerDb::new(limit);
227        assert_eq!(0, mles_peer.get_messages_len());
228        mles_peer.add_message(Bytes::from(msg.clone()));
229        assert_eq!(1, mles_peer.get_messages_len());
230        while limit > 0 {
231            mles_peer.add_message(Bytes::from(msg.clone()));
232            limit -= 1;
233        }
234        assert_eq!(HISTLIMIT, mles_peer.get_messages_len());
235        assert_eq!(HISTLIMIT, mles_peer.get_history_limit());
236    }
237}