orouter_wireless/
lib_impl.rs

1// TODO Remaining tasks from .plan
2// * flush out unfinished messages from MessagePool after some time?
3// * - else what happens when a lost of unreceived parts blocks out the Pool for newer ones
4// * 7th message with 4th unmatching prefix comes in
5// * parts_vec could take Option<P2pMessagePart> for nicer api and lower alloc size with resize_default
6
7use std::collections::HashMap;
8
9use crc16::{State, X_25};
10use rand::prelude::*;
11
12type Prefix = Vec<u8>;
13type RndPart = Vec<u8>;
14pub type P2pMessagePart = crate::WirelessMessagePart;
15
16const MAX_P2P_MESSAGE_PART_COUNT: usize = u8::MAX as usize;
17const MAX_OVERLINE_MESSAGE_LENGTH: usize = 240 * MAX_P2P_MESSAGE_PART_COUNT;
18const MAX_UNFINISHED_P2P_MESSAGE_COUNT: usize = u8::MAX as usize;
19
20/// Represents a raw p2p message constructed back from chunks
21/// This has yet to be parsed into a typed [overline
22/// message](../overline/enum.OverlineMessageType.html)
23#[derive(Debug, PartialEq)]
24pub struct P2pMessage {
25    typ: super::MessageType,
26    data_type: u8,
27    data: Vec<u8>,
28}
29
30impl P2pMessage {
31    pub fn typ(&self) -> super::MessageType {
32        self.typ.clone()
33    }
34
35    pub fn data_type(&self) -> u8 {
36        self.data_type
37    }
38
39    pub fn data(&self) -> &[u8] {
40        self.data.as_slice()
41    }
42}
43
44#[derive(Debug, PartialEq)]
45pub enum Error {
46    PoolFull,
47    MalformedMessage(crate::ValidationError),
48    TooLong,
49}
50
51/// Holds parts of multipart messages before all parts have arrived
52///
53/// Messages in vector under each prefix key are inserted at the correct index - are kept sorted
54/// example of the map
55///
56/// ```ignore
57/// {
58///     '3B prefix': \[\[part_1\], \[part_2\], \[part3\]\], // these are ordered
59///     ...
60///     ...
61/// }
62/// ```
63pub struct MessagePool {
64    /// Contains parts of the raw P2pMessage. Parts are stored without the prefix
65    incomplete_message_map: HashMap<Prefix, Vec<P2pMessagePart>>,
66    network: crate::network::Network,
67}
68
69impl Default for MessagePool {
70    fn default() -> Self {
71        Self {
72            network: crate::network::DEFAULT,
73            incomplete_message_map: Default::default(),
74        }
75    }
76}
77
78impl MessagePool {
79    /// Try insert another part of sliced message. Will return `None` if this is not last (or the
80    /// only) message, else it will return just the data of this message (stripped of all the now
81    /// unnecessart protocol meta data)
82    pub fn try_insert(&mut self, msg: P2pMessagePart) -> Result<Option<P2pMessage>, Error> {
83        if let Err(e) = crate::is_valid_message(self.network, &msg) {
84            return Err(Error::MalformedMessage(e));
85        }
86
87        let part_num = &msg[crate::PART_NUMBER_IDX];
88        let total_count = &msg[crate::TOTAL_COUNT_IDX];
89        let len = msg[crate::LENGTH_IDX] as usize;
90        let typ = msg[crate::MSG_TYPE_IDX];
91        let data_type = msg[crate::DATA_TYPE_IDX];
92
93        // if this is part #1 of total count = 1, return immediately
94        if *part_num == 1 as u8 && *total_count == 1 as u8 {
95            return Ok(Some(P2pMessage {
96                typ: typ.into(),
97                data_type,
98                data: msg[crate::HEADER_LENGTH..crate::HEADER_LENGTH + len].to_vec(),
99            }));
100        }
101
102        log::trace!(
103            "after part_num = {}, total_count = {}",
104            part_num,
105            total_count
106        );
107
108        let prefix = msg[crate::PREFIX_IDX..crate::PREFIX_IDX + crate::PREFIX_LENGTH].to_vec();
109
110        log::trace!("prefix = {:02x?}", prefix);
111        // get the parts vec
112        let parts_vec = match self.incomplete_message_map.get_mut(&prefix) {
113            Some(parts) => parts,
114            None => {
115                // FIXME this should not be possible as MAX_UNFINISHED_P2P_MESSAGE_COUNT is u8::MAX
116                // and total_count field is 1B
117                if self.incomplete_message_map.len() == MAX_UNFINISHED_P2P_MESSAGE_COUNT as usize {
118                    return Err(Error::PoolFull);
119                }
120                let v = vec![];
121                log::trace!("inserting prefix = {:02x?}", prefix);
122                //
123                self.incomplete_message_map.insert(prefix.clone(), v);
124                self.incomplete_message_map.get_mut(&prefix).unwrap()
125            }
126        };
127        let parts_index = part_num - 1;
128        log::trace!("parts_index = {}", parts_index);
129
130        match parts_vec.get(parts_index as usize) {
131            Some(part) if !part.is_empty() => {} // we already have this message part, not a problem
132            Some(_) => {
133                parts_vec[parts_index as usize] =
134                    msg[crate::HEADER_LENGTH..crate::HEADER_LENGTH + len].to_vec();
135            }
136            None => {
137                // lets insert the message
138                parts_vec.resize(parts_index as usize + 1, vec![]);
139                parts_vec[parts_index as usize] =
140                    msg[crate::HEADER_LENGTH..crate::HEADER_LENGTH + len].to_vec();
141            }
142        }
143
144        let has_all = parts_vec.iter().all(|current| !current.is_empty());
145        let has_all = has_all && parts_vec.len() == *total_count as usize;
146
147        if has_all {
148            let mut data = vec![];
149            for part in parts_vec.iter() {
150                data.extend_from_slice(&part);
151            }
152
153            self.incomplete_message_map.remove(&prefix).unwrap();
154
155            return Ok(Some(P2pMessage {
156                typ: typ.into(),
157                data_type,
158                data,
159            }));
160        }
161
162        Ok(None)
163    }
164
165    /// Returns how many message parts (regardless of which message is the part of) are currently
166    /// being held by the internal state.
167    pub fn size(&self) -> u8 {
168        let mut size: u8 = 0;
169
170        for incomplete_message_parts in self.incomplete_message_map.values() {
171            for part in incomplete_message_parts {
172                if !part.is_empty() {
173                    size += 1
174                }
175            }
176        }
177
178        size
179    }
180
181    /// Resets the internal state of the pool. This equals constructing of a new pool - all the
182    /// message parts of unfinished messages inserted before calling of this method will be lost.
183    pub fn reset(&mut self) {
184        self.incomplete_message_map.clear();
185    }
186
187    // pub(crate) fn data_to_p2p_message(data: Vec<u8>) -> P2pMessage {}
188}
189
190/// Takes care of splitting a lengh-wise theoretically unlimited message into to
191/// chunks transmittable using oRouter with a header allowing receiver to assemble the logical message
192/// back from received parts (using [`crate::MessagePool`]).
193pub struct MessageSlicer {
194    rng: SmallRng,
195    network: crate::network::Network,
196}
197
198impl MessageSlicer {
199    /// `initial_seed` is a seed for rng for generating slice prefixes. Generate this using a
200    /// system source of randomness
201    pub fn new(initial_seed: u64, network: crate::network::Network) -> Self {
202        MessageSlicer {
203            rng: SmallRng::seed_from_u64(initial_seed),
204            network,
205        }
206    }
207
208    /// splits `data_bytes` to wireless message parts
209    pub fn slice(
210        &mut self,
211        data_bytes: &[u8],
212        message_type: super::MessageType,
213        data_type: u8,
214    ) -> Result<Vec<P2pMessagePart>, Error> {
215        // FIXME implement correct slicing - add typ, data_type, asemble hash from rnd and prefix,
216        // add network bytes
217        if data_bytes.len() > MAX_OVERLINE_MESSAGE_LENGTH {
218            return Err(Error::TooLong);
219        }
220
221        let mut prefix = Prefix::new();
222        prefix.resize(3, 0);
223        self.rng.fill(&mut prefix[0..3]);
224
225        let mut res = vec![];
226        let chunks = data_bytes
227            .chunks(crate::MAX_LORA_MESSAGE_SIZE - crate::HEADER_LENGTH - crate::CRC_LENGTH);
228        let total_count = chunks.len();
229        let typ = message_type.into();
230
231        for (i, part_bytes) in chunks.enumerate() {
232            let mut rnd_part = RndPart::new();
233            rnd_part.resize(3, 0);
234            self.rng.fill(&mut rnd_part[0..3]);
235            let mut p = P2pMessagePart::new();
236            p.extend_from_slice(&self.network);
237            p.extend_from_slice(&rnd_part);
238            p.extend_from_slice(&prefix);
239            p.push(i as u8 + 1); // part_num
240            p.push(total_count as u8); // total_count
241            p.push(part_bytes.len() as u8); // length
242            p.push(typ);
243            p.push(data_type);
244            p.extend_from_slice(&part_bytes); // data
245            let crc = State::<X_25>::calculate(p.as_slice());
246            p.extend_from_slice(&crc.to_be_bytes()[..]);
247            res.push(p);
248        }
249
250        Ok(res)
251    }
252}
253
254#[cfg(test)]
255mod tests {
256    use super::super::MessageType;
257    use super::*;
258    #[test]
259    fn test_pool_try_insert_1_of_1() {
260        let mut p = MessagePool {
261            network: crate::network::DEFAULT,
262            ..Default::default()
263        };
264        let msg1 = vec![
265            // network |prefix                             | num | tot | len |typ|  dtyp| data
266            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x01, 0x01, 0x03, 0x01, 0x01, 0xc0,
267            0xff, 0xee, // crc
268            0x31, 0x02,
269        ];
270
271        let res = p.try_insert(msg1).unwrap().unwrap();
272        assert_eq!(
273            res,
274            P2pMessage {
275                typ: 0x01.into(),
276                data_type: 0x01,
277                data: [0xc0, 0xff, 0xee].to_vec()
278            }
279        );
280    }
281
282    #[test]
283    fn test_pool_try_insert_2nd_of_3() {
284        let mut p = MessagePool {
285            network: crate::network::DEFAULT,
286            ..Default::default()
287        };
288        let msg1 = vec![
289            // network |prefix                             | num | tot | len |typ|  dtyp| data
290            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x03, 0x03, 0x01, 0x01, 0xc0,
291            0xff, 0xee, // crc
292            0x8c, 0x69,
293        ];
294
295        let res = p.try_insert(msg1).unwrap();
296        assert_eq!(res, None);
297    }
298
299    #[test]
300    fn test_pool_try_insert_1_and_2_of_2() {
301        let mut p = MessagePool {
302            network: crate::network::DEFAULT,
303            ..Default::default()
304        };
305        let msg1 = vec![
306            // network |prefix                             | num | tot | len |typ|  dtyp| data
307            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x01, 0x02, 0x03, 0x01, 0x01, 0xc0,
308            0xff, 0xee, // crc
309            0x99, 0x6c,
310        ];
311        let msg2 = vec![
312            // network |prefix                             | num | tot | len |typ|  dtyp| data
313            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x02, 0x03, 0x01, 0x01, 0xc1,
314            0xff, 0xee, // crc
315            0x49, 0x60,
316        ];
317
318        p.try_insert(msg1).unwrap();
319        let res = p.try_insert(msg2).unwrap().unwrap();
320        assert_eq!(
321            res,
322            P2pMessage {
323                typ: 0x01.into(),
324                data_type: 0x01,
325                data: [0xc0, 0xff, 0xee, 0xc1, 0xff, 0xee].to_vec()
326            }
327        );
328    }
329
330    #[test]
331    fn test_pool_try_insert_2_and_1_of_2() {
332        let mut p = MessagePool {
333            network: crate::network::TEST,
334            ..Default::default()
335        };
336        let msg1 = vec![
337            // network |prefix                             | num | tot | len |typ|  dtyp| data
338            0xCC, 0xAA, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x01, 0x02, 0x03, 0x01, 0x01, 0xc0,
339            0xff, 0xee, // crc
340            0x16, 0xba,
341        ];
342        let msg2 = vec![
343            // network |prefix                             | num | tot | len |typ|  dtyp| data
344            0xCC, 0xAA, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x02, 0x03, 0x01, 0x01, 0xc1,
345            0xff, 0xee, // crc
346            0xc6, 0xb6,
347        ];
348
349        assert_eq!(None, p.try_insert(msg2).unwrap());
350        let res = p.try_insert(msg1).unwrap().unwrap();
351        assert_eq!(
352            res,
353            P2pMessage {
354                typ: 0x01.into(),
355                data_type: 0x01,
356                data: [0xc0, 0xff, 0xee, 0xc1, 0xff, 0xee].to_vec()
357            }
358        );
359    }
360
361    #[test]
362    fn test_pool_size() {
363        let mut p = MessagePool::default();
364        assert_eq!(p.size(), 0);
365        // messages with 5 different prefixes
366        let msg1 = vec![
367            // network |prefix                             | num | tot | len |typ|  dtyp| data
368            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x01, 0x02, 0x01, 0x01, 0x01, 0xc0,
369            0xd6, 0x4e,
370        ];
371        assert_eq!(None, p.try_insert(msg1).unwrap());
372        assert_eq!(p.size(), 1);
373        let msg2 = vec![
374            // network |prefix                             | num | tot | len |typ|  dtyp| data
375            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x03, 0x03, 0x03, 0x02, 0x02, 0x03, 0x01, 0x01, 0xc1,
376            0xff, 0xee, // crc
377            0x7d, 0x2b,
378        ];
379        assert_eq!(None, p.try_insert(msg2).unwrap());
380        assert_eq!(p.size(), 2);
381        let msg1_2 = vec![
382            // network |prefix                             | num | tot | len |typ|  dtyp| data
383            0xAA, 0xCC, 0x02, 0x01, 0x01, 0x02, 0x02, 0x02, 0x02, 0x02, 0x01, 0x01, 0x01, 0xd0,
384            0x4d, 0x5c,
385        ];
386        assert_eq!(
387            Some(P2pMessage {
388                typ: MessageType::Data,
389                data_type: 0x01,
390                data: [0xc0, 0xd0].to_vec()
391            }),
392            p.try_insert(msg1_2).unwrap()
393        );
394        // with ^^ try_insert, msg1 was joined with msg1_2 so pool should have just one part
395        assert_eq!(p.size(), 1)
396    }
397
398    #[test]
399    fn test_pool_reset() {
400        let mut p = MessagePool::default();
401        assert_eq!(p.size(), 0);
402        let msg1 = vec![
403            // network |prefix                             | num | tot | len |typ|  dtyp| data
404            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x02, 0x02, 0x02, 0x01, 0x02, 0x03, 0x01, 0x01, 0xc0,
405            0xff, 0xee, // crc
406            0x99, 0x6c,
407        ];
408        assert_eq!(None, p.try_insert(msg1).unwrap());
409        assert_eq!(p.size(), 1);
410        let msg2 = vec![
411            // network |prefix                             | num | tot | len |typ|  dtyp| data
412            0xAA, 0xCC, 0x01, 0x01, 0x01, 0x03, 0x03, 0x03, 0x02, 0x02, 0x03, 0x01, 0x01, 0xc1,
413            0xff, 0xee, // crc
414            0x7d, 0x2b,
415        ];
416        assert_eq!(None, p.try_insert(msg2.clone()).unwrap());
417        assert_eq!(p.size(), 2);
418
419        p.reset();
420        assert_eq!(p.size(), 0);
421
422        assert_eq!(None, p.try_insert(msg2).unwrap());
423        assert_eq!(p.size(), 1);
424
425        p.reset();
426        assert_eq!(p.size(), 0);
427    }
428
429    #[test]
430    fn test_slicer_single_message() {
431        let mut s = MessageSlicer::new(0xdead_beef_cafe_d00d, crate::network::DEFAULT);
432        let parts = s
433            .slice(&[0xc0, 0xff, 0xee], MessageType::Data, 0x01)
434            .unwrap();
435        assert_eq!(parts.len(), 1);
436        assert_eq!(
437            parts[0],
438            // network     | rnd part        |prefix            |p_n|   tot |part_l|typ|  dtyp|data ->
439            &[
440                0xAA, 0xCC, 0x2a, 0x73, 0x5c, 0x3c, 0xce, 0x55, 0x01, 0x01, 0x03, 0x01, 0x01, 0xc0,
441                0xff, 0xee, // crc->
442                0xb7, 0xd3
443            ]
444        );
445
446        // try to insert if we get the same message
447        let mut p = MessagePool {
448            network: crate::network::DEFAULT,
449            ..Default::default()
450        };
451        assert_eq!(
452            Some(P2pMessage {
453                typ: MessageType::Data,
454                data_type: 0x01,
455                data: [0xc0, 0xff, 0xee].to_vec()
456            }),
457            p.try_insert(parts[0].clone()).unwrap()
458        );
459    }
460
461    #[test]
462    fn test_slicer_two_parts() {
463        let mut s = MessageSlicer::new(0xdead_beef_cafe_d00d, crate::network::TEST);
464        let mut test_data_message = vec![];
465        for b in core::iter::repeat(0xff)
466            .take(crate::MAX_LORA_MESSAGE_SIZE - crate::HEADER_LENGTH - crate::CRC_LENGTH)
467        {
468            test_data_message.push(b);
469        }
470        test_data_message.extend_from_slice(&[0xc0, 0xff, 0xee]); // this data should end up in the second part
471        let parts = s
472            .slice(&test_data_message, MessageType::Data, 0x01)
473            .unwrap();
474        assert_eq!(parts.len(), 2);
475        // TODO test part 1
476        assert_eq!(
477            parts[1],
478            //  network b  | rnd part        |prefix            |p_n|   tot |part_l|typ|  dtyp|data ->
479            &[
480                0xCC, 0xAA, 0x2b, 0x7a, 0xeb, 0x3c, 0xce, 0x55, 0x02, 0x02, 0x03, 0x01, 0x01, 0xc0,
481                0xff, 0xee, // crc->
482                0x5e, 0xa4
483            ]
484        );
485    }
486
487    #[test]
488    fn test_slicer_too_long_data() {
489        let mut s = MessageSlicer::new(0xdead_beef_cafe_d00d, crate::network::DEFAULT);
490        let mut test_data_message = vec![];
491        for b in core::iter::repeat(0xff).take(MAX_OVERLINE_MESSAGE_LENGTH + 1) {
492            test_data_message.push(b);
493        }
494        assert_eq!(
495            Err(Error::TooLong),
496            s.slice(&test_data_message, MessageType::Other, 0x01)
497        );
498    }
499}