mles_utils/
lib.rs

1#![warn(missing_docs)]
2//! `Mles utils` library is provided for Mles client and server implementations for easy handling of
3//! proper header and message structures.
4
5/* This Source Code Form is subject to the terms of the Mozilla Public
6*  License, v. 2.0. If a copy of the MPL was not distributed with this
7*  file, You can obtain one at http://mozilla.org/MPL/2.0/.
8*
9*  Copyright (C) 2017-2018  Mles developers
10* */
11#[macro_use]
12extern crate serde_derive;
13extern crate serde_bytes;
14extern crate serde_cbor;
15
16mod frame;
17mod local_db;
18mod peer;
19mod server;
20
21use siphasher::sip::SipHasher;
22use std::hash::{Hash, Hasher};
23use std::io::Cursor;
24use std::io::Write;
25use std::io::{Error, Read};
26use std::net::TcpStream;
27
28use bytes::{Buf, BufMut, BytesMut};
29use std::net::{IpAddr, SocketAddr};
30
/// HDRL defines the size of the header: the type byte and 24-bit length
/// (one 32-bit word) followed by the 32-bit connection id
pub(crate) const HDRL: usize = 8;
/// CIDL defines the size of the connection id
pub(crate) const CIDL: usize = 4;
/// KEYL defines the size of the key
pub(crate) const KEYL: usize = 8;
/// HDRKEYL defines the size of the header + key
pub(crate) const HDRKEYL: usize = HDRL + KEYL;

/// Max message size: the largest value that fits the 24-bit length field
pub(crate) const MSGMAXSIZE: usize = 0xffffff;

// Keepalive value; it is not referenced in the visible part of this file.
// NOTE(review): presumably seconds and used by a submodule — confirm at the use site.
const KEEPALIVE: u64 = 5;
44
/// MsgHdr structure
///
/// This structure defines the header of the Mles message including first 'M' byte,
/// length of the encoded data, connection id and SipHash key.
/// Encoded message will always be in network byte order.
///
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MsgHdr {
    /// Type byte ('M') in the top 8 bits, message length in the low 24 bits.
    thlen: u32,
    /// Connection id.
    cid: u32,
    /// SipHash key.
    key: u64,
}
56
57impl MsgHdr {
58    /// Create a new MsgHdr object with length, cid and key.
59    ///
60    /// # Example
61    /// ```
62    /// use mles_utils::{MsgHdr};
63    ///
64    /// let key = 0xf00f;
65    /// let cid = MsgHdr::select_cid(key);
66    /// let len = 0;
67    ///
68    /// let msghdr = MsgHdr::new(len, cid, key);
69    /// ```
70    pub fn new(len: u32, cid: u32, key: u64) -> MsgHdr {
71        MsgHdr {
72            thlen: hdr_set_len(len),
73            cid,
74            key,
75        }
76    }
77
78    /// Get type of MsgHdr.
79    ///
80    /// # Example
81    /// ```
82    /// use mles_utils::{MsgHdr};
83    ///
84    /// let key = 0xf00f;
85    /// let cid = MsgHdr::select_cid(key);
86    /// let len = 0;
87    ///
88    /// let mut msghdr = MsgHdr::new(len, cid, key);
89    /// msghdr.get_type();
90    /// assert_eq!('M' as u8, msghdr.get_type());
91    /// ```
92    pub fn get_type(&self) -> u8 {
93        hdr_get_type(self.thlen)
94    }
95
96    /// Get MsgHdr length on the line.
97    ///
98    pub fn get_hdrkey_len() -> usize {
99        HDRKEYL
100    }
101
102    /// Set length of MsgHdr.
103    ///
104    /// # Example
105    /// ```
106    /// use mles_utils::{MsgHdr};
107    ///
108    /// let key = 0xf00f;
109    /// let cid = MsgHdr::select_cid(key);
110    /// let len = 0;
111    ///
112    /// let mut msghdr = MsgHdr::new(len, cid, key);
113    /// msghdr.set_len(515);
114    /// ```
115    pub fn set_len(&mut self, len: u32) {
116        self.thlen = hdr_set_len(len);
117    }
118
119    /// Get length of MsgHdr.
120    ///
121    /// # Example
122    /// ```
123    /// use mles_utils::{MsgHdr};
124    ///
125    /// let key = 0xf00f;
126    /// let cid = MsgHdr::select_cid(key);
127    /// let len = 0;
128    ///
129    /// let mut msghdr = MsgHdr::new(len, cid, key);
130    /// msghdr.set_len(515);
131    /// assert_eq!(515, msghdr.get_len());
132    /// ```
133    pub fn get_len(&self) -> u32 {
134        hdr_get_len(self.thlen)
135    }
136
137    /// Set cid of MsgHdr.
138    ///
139    /// # Example
140    /// ```
141    /// use mles_utils::{MsgHdr};
142    ///
143    /// let key = 0xf00f;
144    /// let cid = MsgHdr::select_cid(key);
145    /// let len = 0;
146    ///
147    /// let mut msghdr = MsgHdr::new(len, cid, key);
148    /// msghdr.set_cid(515);
149    /// ```
    pub fn set_cid(&mut self, cid: u32) {
        // Stored as given; no masking or validation is applied.
        self.cid = cid;
    }
153
154    /// Get cid of MsgHdr.
155    ///
156    /// # Example
157    /// ```
158    /// use mles_utils::{MsgHdr};
159    ///
160    /// let key = 0xf00f;
161    /// let cid = MsgHdr::select_cid(key);
162    /// let len = 0;
163    ///
164    /// let mut msghdr = MsgHdr::new(len, cid, key);
165    /// msghdr.set_cid(515);
166    /// assert_eq!(515, msghdr.get_cid());
167    /// ```
    pub fn get_cid(&self) -> u32 {
        // Plain field read of the stored connection id.
        self.cid
    }
171
172    /// Set key of MsgHdr.
173    ///
174    /// # Example
175    /// ```
176    /// use mles_utils::{MsgHdr};
177    ///
178    /// let key = 0xf00f;
179    /// let cid = MsgHdr::select_cid(key);
180    /// let len = 0;
181    ///
182    /// let mut msghdr = MsgHdr::new(len, cid, key);
183    /// msghdr.set_key(515);
184    /// ```
    pub fn set_key(&mut self, key: u64) {
        // Only the key field changes; the stored cid is not recomputed from the new key.
        self.key = key;
    }
188
189    /// Get key of MsgHdr.
190    ///
191    /// # Example
192    /// ```
193    /// use mles_utils::{MsgHdr};
194    ///
195    /// let key = 0xf00f;
196    /// let cid = MsgHdr::select_cid(key);
197    /// let len = 0;
198    ///
199    /// let mut msghdr = MsgHdr::new(len, cid, key);
200    /// msghdr.set_key(515);
201    /// assert_eq!(515, msghdr.get_key());
202    /// ```
    pub fn get_key(&self) -> u64 {
        // Plain field read of the stored key.
        self.key
    }
206
207    /// Encode MsgHdr to line format.
208    ///
209    ///
210    /// # Example
211    /// ```
212    /// use mles_utils::{MsgHdr};
213    ///
214    /// let key = 0xf00f;
215    /// let cid = MsgHdr::select_cid(key);
216    /// let len = 0;
217    ///
218    /// let mut msghdr = MsgHdr::new(len, cid, key);
219    /// let msgv: Vec<u8> = msghdr.encode();
220    /// ```
221    pub fn encode(&self) -> Vec<u8> {
222        let mut msgv = write_hdr(self.get_len() as usize, self.get_cid()).to_vec();
223        msgv.extend(write_key(self.get_key()).to_vec());
224        msgv
225    }
226
227    /// Decode MsgHdr from line format.
228    ///
229    ///
230    /// # Example
231    /// ```
232    /// use mles_utils::{MsgHdr};
233    ///
234    /// let key = 0xf00f;
235    /// let cid = MsgHdr::select_cid(key);
236    /// let len = 16;
237    ///
238    /// let mut msghdr = MsgHdr::new(len, cid, key);
239    /// let msgv: Vec<u8> = msghdr.encode();
240    /// let msgh = MsgHdr::decode(msgv);
241    /// assert_eq!(key, msgh.get_key());
242    /// assert_eq!(cid, msgh.get_cid());
243    /// assert_eq!(len, msgh.get_len());
244    /// ```
245    pub fn decode(buf: Vec<u8>) -> MsgHdr {
246        MsgHdr::new(
247            read_hdr_len(&buf) as u32,
248            read_cid_from_hdr(&buf),
249            read_key_from_hdr(&buf),
250        )
251    }
252    /// Do a valid hash for Mles over provided UTF-8 String list.
253    ///
254    /// # Example
255    /// ```
256    /// use mles_utils::MsgHdr;
257    ///
258    /// let hashstr1 = "A string".to_string();
259    /// let hashstr2 = "Another string".to_string();
260    /// let hashable = vec![hashstr1, hashstr2];
261    /// let key: u64 = MsgHdr::do_hash(&hashable);
262    /// ```
263    #[inline]
264    pub fn do_hash(t: &[String]) -> u64 {
265        let mut s = SipHasher::new();
266        for item in t {
267            item.hash(&mut s);
268        }
269        s.finish()
270    }
271
272    /// Return a connection id from key.
273    ///
274    /// # Example
275    /// ```
276    /// use mles_utils::MsgHdr;
277    ///
278    /// let cid = MsgHdr::select_cid(0x1000000100000001);
279    /// assert_eq!(cid, 0x00000001);
280    /// ```
281    #[inline]
282    pub fn select_cid(key: u64) -> u32 {
283        key as u32
284    }
285
286    /// Do a valid UTF-8 string from a `SocketAddr`.
287    ///
288    /// For IPv4 the format is "x.x.x.x:y", where x is u8 and y is u16
289    /// For IPv6 the format is "[z:z:z:z:z:z:z:z]:y", where z is u16 in hexadecimal format and y is u16
290    ///
291    /// # Example
292    /// ```
293    ///
294    /// use std::net::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr};
295    /// use mles_utils::MsgHdr;
296    ///
297    /// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
298    /// let addrstr = MsgHdr::addr2str(&addr);
299    ///
300    /// assert_eq!("127.0.0.1:8080", addrstr);
301    ///
302    /// let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xff03, 0, 0, 0, 0, 0, 0, 1)), 8077);
303    /// let addrstr = MsgHdr::addr2str(&addr);
304    ///
305    /// assert_eq!("[ff03:0:0:0:0:0:0:1]:8077", addrstr);
306    /// ```
307    #[inline]
308    pub fn addr2str(addr: &SocketAddr) -> String {
309        let ipaddr = addr.ip();
310        match ipaddr {
311            IpAddr::V4(v4) => {
312                let v4oct = v4.octets();
313                let v4str = format!(
314                    "{}.{}.{}.{}:{}",
315                    v4oct[0],
316                    v4oct[1],
317                    v4oct[2],
318                    v4oct[3],
319                    addr.port()
320                );
321                v4str
322            }
323            IpAddr::V6(v6) => {
324                let v6seg = v6.segments();
325                let v6str = format!(
326                    "[{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}]:{}",
327                    v6seg[0],
328                    v6seg[1],
329                    v6seg[2],
330                    v6seg[3],
331                    v6seg[4],
332                    v6seg[5],
333                    v6seg[6],
334                    v6seg[7],
335                    addr.port()
336                );
337                v6str
338            }
339        }
340    }
341}
342
/// Pack the 'M' type byte (top 8 bits) together with the low 24 bits of `len`.
fn hdr_set_len(len: u32) -> u32 {
    let type_bits = u32::from(b'M') << 24;
    let len_bits = len & 0x00ff_ffff;
    type_bits | len_bits
}
346
/// Extract the 24-bit length from a packed type/length word.
fn hdr_get_len(thlen: u32) -> u32 {
    let len_mask = (1u32 << 24) - 1;
    thlen & len_mask
}
350
/// Extract the type byte (top 8 bits) from a packed type/length word.
fn hdr_get_type(thlen: u32) -> u8 {
    let type_bits = thlen & 0xff00_0000;
    (type_bits >> 24) as u8
}
354
/// Msg structure
///
/// This structure defines the Mles interface value triplet (uid, channel, message).
/// It is eventually serialized and deserialized by CBOR.
///
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Msg {
    /// User identifier.
    uid: String,
    /// Channel name.
    channel: String,
    /// Raw message payload; (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    message: Vec<u8>,
}
367
368impl Msg {
369    /// Create a new Msg object with value triplet.
370    ///
371    /// # Example
372    /// ```
373    /// use mles_utils::Msg;
374    ///
375    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
376    /// ```
377    #[inline]
    pub fn new(uid: String, channel: String, message: Vec<u8>) -> Msg {
        // Takes ownership of all three values; nothing is copied or validated.
        Msg {
            uid,
            channel,
            message,
        }
    }
385
386    /// Set uid for Msg object.
387    ///
388    /// # Example
389    /// ```
390    /// use mles_utils::Msg;
391    ///
392    /// let mut msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
393    /// let msg = msg.set_uid("New uid".to_string());
394    ///
395    /// assert_eq!("New uid".to_string(), *msg.get_uid());
396    /// ```
397    #[inline]
398    pub fn set_uid(mut self, uid: String) -> Msg {
399        self.uid = uid;
400        self
401    }
402
403    /// Set channel for Msg object.
404    ///
405    /// # Example
406    /// ```
407    /// use mles_utils::Msg;
408    ///
409    /// let mut msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
410    /// let msg = msg.set_channel("New channel".to_string());
411    ///
412    /// assert_eq!("New channel".to_string(), *msg.get_channel());
413    /// ```
414    #[inline]
415    pub fn set_channel(mut self, channel: String) -> Msg {
416        self.channel = channel;
417        self
418    }
419
420    /// Set message for Msg object.
421    ///
422    /// # Example
423    /// ```
424    /// use mles_utils::Msg;
425    ///
426    /// let mut msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
427    /// let new_message: Vec<u8> = "New message".to_string().into_bytes();
428    /// let msg = msg.set_message(new_message);
429    /// ```
430    #[inline]
431    pub fn set_message(mut self, message: Vec<u8>) -> Msg {
432        self.message = message;
433        self
434    }
435
436    /// Get uid for Msg object. See example for set uid.
437    #[inline]
    pub fn get_uid(&self) -> &String {
        // Borrowed view; callers clone if they need an owned copy.
        &self.uid
    }
441
442    /// Get channel for Msg object. See example for set channel.
443    #[inline]
    pub fn get_channel(&self) -> &String {
        // Borrowed view; callers clone if they need an owned copy.
        &self.channel
    }
447
448    /// Get message for Msg object.
449    ///
450    /// # Example
451    /// ```
452    /// use mles_utils::Msg;
453    ///
454    /// let mut msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
455    /// let msg: &Vec<u8> = msg.get_message();
456    /// ```
457    #[inline]
    pub fn get_message(&self) -> &Vec<u8> {
        // Borrowed view of the raw payload bytes.
        &self.message
    }
461
462    /// Get message len for Msg object.
463    ///
464    /// # Example
465    /// ```
466    /// use mles_utils::Msg;
467    ///
468    /// let mut msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
469    /// let msg_len: usize = msg.get_message_len();
470    /// ```
471    #[inline]
    pub fn get_message_len(&self) -> usize {
        // Length of the raw payload in bytes, not of the CBOR-encoded Msg.
        self.message.len()
    }
475
476    /// Get mutable message reference for Msg object.
477    ///
478    /// # Example
479    /// ```
480    /// use mles_utils::Msg;
481    ///
482    /// let mut msg = Msg::new("My uid".to_string(), "My channel".to_string(), "My
483    /// message".to_string().into_bytes());
484    /// let message = msg.get_mut_message();
485    /// message.extend_from_slice(&" is mutable".to_string().into_bytes());
486    /// ```
487    #[inline]
    pub fn get_mut_message(&mut self) -> &mut Vec<u8> {
        // Mutable borrow of the payload so it can be edited in place.
        &mut self.message
    }
491
492    /// Encode Msg object to CBOR.
493    ///
494    /// # Errors
495    /// If message cannot be encoded, an empty vector is returned.
496    ///
497    /// # Example
498    /// ```
499    /// use mles_utils::Msg;
500    ///
501    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
502    /// let encoded_msg: Vec<u8> = msg.encode();
503    /// ```
504    #[inline]
505    pub fn encode(&self) -> Vec<u8> {
506        let encoded = serde_cbor::to_vec(self);
507        match encoded {
508            Ok(encoded) => encoded,
509            Err(err) => {
510                println!("Error on encode: {}", err);
511                Vec::new()
512            }
513        }
514    }
515
516    /// Decode CBOR byte string to Msg object.
517    ///
518    /// # Errors
519    /// If message cannot be decoded, a Msg structure with empty items is returned.
520    ///
521    /// # Example
522    /// ```
523    /// use mles_utils::Msg;
524    ///
525    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
526    /// let encoded_msg: Vec<u8> = msg.encode();
527    /// let decoded_msg: Msg = Msg::decode(&encoded_msg);
528    /// ```
529    #[inline]
530    pub fn decode(slice: &[u8]) -> Msg {
531        let value = serde_cbor::from_slice(slice);
532        match value {
533            Ok(value) => value,
534            Err(err) => {
535                println!("Error on decode: {}", err);
536                Msg {
537                    uid: "".to_string(),
538                    channel: "".to_string(),
539                    message: Vec::new(),
540                } // return empty vec in case of error
541            }
542        }
543    }
544}
545
546#[derive(Serialize, Deserialize, Debug, Clone)]
547struct MsgVec {
548    #[serde(with = "serde_bytes")]
549    encoded_msg: Vec<u8>, // this is encoded Msg
550}
551
552impl MsgVec {
553    pub fn new(encoded_msg: &Vec<u8>) -> MsgVec {
554        MsgVec {
555            encoded_msg: encoded_msg.clone(),
556        }
557    }
558
559    pub fn get(&self) -> &Vec<u8> {
560        &self.encoded_msg
561    }
562}
563
/// ResyncMsg structure
///
/// This structure defines resynchronization Msg structure that can be used
/// to resynchronize history state to root server from peers. The resynchronization
/// message can be sent only during initial connection message and packs the
/// history into one message that can be taken into account by Mles root server.
///
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ResyncMsg {
    /// One wrapped, already-encoded message per entry, in the order given to `new`.
    resync_message: Vec<MsgVec>,
}
575
576impl ResyncMsg {
577    /// Create a new ResyncMsg object with encoded message vector.
578    ///
579    /// # Example
580    /// ```
581    /// use mles_utils::{Msg, ResyncMsg};
582    ///
583    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
584    /// let msg = msg.encode();
585    /// let vec = vec![msg];
586    /// let rmsg = ResyncMsg::new(&vec);
587    /// ```
588    #[inline]
589    pub fn new(messages: &Vec<Vec<u8>>) -> ResyncMsg {
590        let mut rmsg = ResyncMsg {
591            resync_message: Vec::new(),
592        };
593        //transform to correct format
594        for msg in messages {
595            rmsg.resync_message.push(MsgVec::new(&msg));
596        }
597        rmsg
598    }
599
600    /// Get the length of the resync message vector
601    ///
602    /// # Example
603    /// ```
604    /// use mles_utils::{Msg, ResyncMsg};
605    ///
606    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
607    /// let msg = msg.encode();
608    /// let vec = vec![msg];
609    /// let rmsg = ResyncMsg::new(&vec);
610    /// assert_eq!(1, rmsg.len());
611    /// ```
612    #[inline]
    pub fn len(&self) -> usize {
        // Number of wrapped messages, not their total byte size.
        self.resync_message.len()
    }
616
617    /// Get all items of the resync message vector
618    ///
619    /// # Example
620    /// ```
621    /// use mles_utils::{Msg, ResyncMsg};
622    ///
623    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
624    /// let msg = msg.encode();
625    /// let vec = vec![msg];
626    /// let rmsg = ResyncMsg::new(&vec);
627    /// let rvec = rmsg.get_messages();
628    /// assert_eq!(vec[0], rvec[0]);
629    /// ```
630    #[inline]
631    pub fn get_messages(&self) -> Vec<Vec<u8>> {
632        //transform to correct format
633        let mut messages = Vec::new();
634        for msg in self.resync_message.iter() {
635            let msg = msg.get();
636            messages.push(msg.clone());
637        }
638        messages
639    }
640
641    /// Encode ResyncMsg object to CBOR.
642    ///
643    /// # Errors
644    /// If resync message cannot be encoded, an empty vector is returned.
645    ///
646    /// # Example
647    /// ```
648    /// use mles_utils::{ResyncMsg, Msg};
649    ///
650    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
651    /// let msg = msg.encode();
652    /// let vec = vec![msg];
653    /// let rmsg = ResyncMsg::new(&vec);
654    /// let encoded_msg: Vec<u8> = rmsg.encode();
655    /// ```
656    #[inline]
657    pub fn encode(&self) -> Vec<u8> {
658        let encoded = serde_cbor::to_vec(self);
659        match encoded {
660            Ok(encoded) => encoded,
661            Err(err) => {
662                println!("Error on resync encode: {}", err);
663                Vec::new()
664            }
665        }
666    }
667
668    /// Decode CBOR byte string to ResyncMsg object.
669    ///
670    /// # Errors
671    /// If message cannot be decoded, a ResyncMsg structure with empty items is returned.
672    ///
673    /// # Example
674    /// ```
675    /// use mles_utils::{ResyncMsg, Msg};
676    ///
677    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
678    /// let msg = msg.encode();
679    /// let vec = vec![msg];
680    /// let rmsg = ResyncMsg::new(&vec);
681    /// let encoded_msg: Vec<u8> = rmsg.encode();
682    /// let decoded_msg: ResyncMsg = ResyncMsg::decode(&encoded_msg);
683    /// assert_eq!(1, decoded_msg.len());
684    /// ```
685    #[inline]
686    pub fn decode(slice: &[u8]) -> ResyncMsg {
687        let value = serde_cbor::from_slice(slice);
688        match value {
689            Ok(value) => value,
690            Err(_) => {
691                ResyncMsg {
692                    resync_message: Vec::new(),
693                } // return empty vec in case of error
694            }
695        }
696    }
697}
698
/// Msg connection structure
///
/// This structure defines the Mles connection for simple synchronous connections.
///
#[derive(Debug)]
pub struct MsgConn {
    /// User identifier placed into every sent message.
    uid: String,
    /// Channel name placed into every sent message.
    channel: String,
    /// Key of the connection; None until a connection has been established.
    key: Option<u64>,
    /// Underlying stream; None when not connected or after a failed send/read.
    stream: Option<TcpStream>,
}
709
710impl MsgConn {
711    /// Create a new MsgConn object for a connection.
712    ///
713    /// # Example
714    /// ```
715    /// use mles_utils::MsgConn;
716    ///
717    /// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
718    /// ```
719    #[inline]
    pub fn new(uid: String, channel: String) -> MsgConn {
        // No I/O happens here: key and stream stay None until a connect call.
        MsgConn {
            uid,
            channel,
            key: None,
            stream: None,
        }
    }
728
729    /// Gets the defined uid.
730    ///
731    /// # Example
732    /// ```
733    /// use mles_utils::MsgConn;
734    ///
735    /// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
736    /// assert_eq!("My uid".to_string(), conn.get_uid());
737    /// ```
738    #[inline]
    pub fn get_uid(&self) -> String {
        // Returns an owned copy, so every call allocates a new String.
        self.uid.clone()
    }
742
743    /// Gets the defined channel.
744    ///
745    /// # Example
746    /// ```
747    /// use mles_utils::MsgConn;
748    ///
749    /// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
750    /// assert_eq!("My channel".to_string(), conn.get_channel());
751    /// ```
752    #[inline]
    pub fn get_channel(&self) -> String {
        // Returns an owned copy, so every call allocates a new String.
        self.channel.clone()
    }
756
757    /// Gets the defined key.
758    ///
759    /// # Example
760    /// ```
761    /// use mles_utils::MsgConn;
762    ///
763    /// //key is set only when connection is initiated..
764    /// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
765    /// assert_eq!(true, conn.get_key().is_none());
766    /// ```
767    #[inline]
    pub fn get_key(&self) -> Option<u64> {
        // None until connect_with_message has derived a key for this connection.
        self.key
    }
771
772    /// Connects to the defined address with a message.
773    ///
774    #[inline]
775    pub fn connect_with_message(mut self, raddr: SocketAddr, msg: Vec<u8>) -> MsgConn {
776        let msg = Msg::new(self.get_uid(), self.get_channel(), msg);
777        match TcpStream::connect(raddr) {
778            Ok(mut stream) => {
779                let _val = stream.set_nodelay(true);
780
781                if self.get_key().is_none() {
782                    let mut keys = Vec::new();
783
784                    let laddr = match stream.local_addr() {
785                        Ok(laddr) => laddr,
786                        Err(_) => {
787                            let addr = "0.0.0.0:0";
788                            addr.parse::<SocketAddr>().unwrap()
789                        }
790                    };
791                    keys.push(MsgHdr::addr2str(&laddr));
792                    keys.push(self.get_uid());
793                    keys.push(self.get_channel());
794                    let key = MsgHdr::do_hash(&keys);
795                    self.key = Some(key);
796                }
797                let encoded_msg = msg.encode();
798                let key = self.get_key().unwrap();
799                let keyv = write_key(key);
800                let mut msgv = write_hdr_with_capacity(
801                    encoded_msg.len(),
802                    MsgHdr::select_cid(key),
803                    HDRKEYL + encoded_msg.len(),
804                );
805                msgv.extend(keyv);
806                msgv.extend(encoded_msg);
807                let msgv = msgv.freeze();
808                match stream.write_all(msgv.as_ref()) {
809                    Ok(_) => self.stream = Some(stream),
810                    Err(err) => {
811                        println!("Send error {}", err);
812                        self.stream = None;
813                    }
814                }
815                self
816            }
817            Err(_) => {
818                println!("Could not connect to server {}", raddr);
819                self
820            }
821        }
822    }
823
824    /// Connects to the defined address (without a message).
825    ///
826    #[inline]
    pub fn connect(self, raddr: SocketAddr) -> MsgConn {
        // Same as connect_with_message, with an empty payload as the initial message.
        self.connect_with_message(raddr, Vec::new())
    }
830
831    /// Send a message. Blocks until a message is sent.
832    ///
833    /// # Errors
834    /// If a message cannot be sent, stream is set to None.
835    ///
836    #[inline]
837    pub fn send_message(mut self, msg: Vec<u8>) -> MsgConn {
838        let message = Msg::new(self.get_uid(), self.get_channel(), msg);
839        let encoded_msg = message.encode();
840        let key = self.get_key().unwrap();
841        let keyv = write_key(key);
842        let mut msgv = write_hdr_with_capacity(
843            encoded_msg.len(),
844            MsgHdr::select_cid(key),
845            HDRKEYL + encoded_msg.len(),
846        );
847        msgv.extend(keyv);
848        msgv.extend(encoded_msg);
849        let msgv = msgv.freeze();
850        let mut stream = self.stream.unwrap();
851        match stream.write_all(msgv.as_ref()) {
852            Ok(_) => self.stream = Some(stream),
853            Err(err) => {
854                println!("Send error {}", err);
855                self.stream = None;
856            }
857        }
858        self
859    }
860
861    /// Reads a message with non-zero message content. Blocks until a message is received.
862    ///
863    /// # Errors
864    /// If message cannot be read, an empty message is returned.
865    ///
866    #[inline]
867    pub fn read_message(mut self) -> (MsgConn, Vec<u8>) {
868        let stream = self.stream.unwrap();
869        loop {
870            let tuple = read_n(&stream, HDRKEYL);
871            let status = tuple.0;
872            if let Ok(0) = status {
873                println!("Read failed: eof");
874                self.stream = None;
875                return (self, Vec::new());
876            }
877            let buf = tuple.1;
878            if buf.is_empty() {
879                continue;
880            }
881            if read_hdr_type(buf.as_slice()) != 'M' as u32 {
882                continue;
883            }
884            let hdr_len = read_hdr_len(buf.as_slice());
885            if 0 == hdr_len {
886                continue;
887            }
888            let tuple = read_n(&stream, hdr_len);
889            let status = tuple.0;
890            if let Ok(0) = status {
891                continue;
892            };
893            let payload = tuple.1;
894            if payload.len() != (hdr_len as usize) {
895                continue;
896            }
897            let decoded_message = Msg::decode(payload.as_slice());
898            if 0 == decoded_message.get_message_len() {
899                continue;
900            }
901            self.stream = Some(stream);
902            return (self, decoded_message.get_message().to_owned());
903        }
904    }
905
906    /// Closes the connection.
907    ///
908    #[inline]
909    pub fn close(mut self) -> MsgConn {
910        if self.stream.is_some() {
911            drop(self.stream.unwrap());
912        }
913        self.stream = None;
914        self
915    }
916}
917
918#[inline]
919pub(crate) fn read_hdr_type(hdr: &[u8]) -> u32 {
920    if hdr.len() < HDRL {
921        return 0;
922    }
923    let mut buf = Cursor::new(&hdr[..]);
924    let num = buf.get_u32_be();
925    num >> 24
926}
927
928fn read_hdr_len(hdr: &[u8]) -> usize {
929    if hdr.len() < HDRL {
930        return 0;
931    }
932    let mut buf = Cursor::new(&hdr[..]);
933    let num = buf.get_u32_be();
934    (num & 0xffffff) as usize
935}
936
937fn write_hdr(len: usize, cid: u32) -> BytesMut {
938    let hdr = (('M' as u32) << 24) | len as u32;
939    let mut msgv = BytesMut::with_capacity(HDRKEYL);
940    msgv.put_u32_be(hdr);
941    msgv.put_u32_be(cid);
942    msgv
943}
944
945fn write_hdr_with_capacity(len: usize, cid: u32, cap: usize) -> BytesMut {
946    let hdr = (('M' as u32) << 24) | len as u32;
947    let mut msgv = BytesMut::with_capacity(cap);
948    msgv.put_u32_be(hdr);
949    msgv.put_u32_be(cid);
950    msgv
951}
952
953fn write_hdr_without_cid(len: usize) -> BytesMut {
954    let hdr = (('M' as u32) << 24) | len as u32;
955    let mut msgv = BytesMut::with_capacity(HDRL);
956    msgv.put_u32_be(hdr);
957    msgv
958}
959
960#[inline]
961pub(crate) fn write_len_to_hdr(len: usize, mut hdrv: BytesMut) -> BytesMut {
962    if hdrv.len() < HDRL {
963        return BytesMut::new();
964    }
965    let tail = hdrv.split_off(HDRL - CIDL);
966    let mut nhdrv = write_hdr_without_cid(len);
967    nhdrv.extend(tail);
968    nhdrv
969}
970
971fn write_key(val: u64) -> BytesMut {
972    let key = val;
973    let mut msgv = BytesMut::with_capacity(KEYL);
974    msgv.put_u64_be(key);
975    msgv
976}
977
978fn write_hdr_with_key(len: usize, key: u64) -> BytesMut {
979    let mut hdrv = write_hdr(len, MsgHdr::select_cid(key));
980    hdrv.extend(write_key(key));
981    hdrv
982}
983
984fn read_key_from_hdr(keyv: &[u8]) -> u64 {
985    if keyv.len() < HDRKEYL {
986        return 0;
987    }
988    let mut buf = Cursor::new(&keyv[HDRL..]);
989    buf.get_u64_be()
990}
991
992fn read_cid_from_hdr(hdrv: &[u8]) -> u32 {
993    if hdrv.len() < HDRL {
994        return 0;
995    }
996    let mut buf = Cursor::new(&hdrv[(HDRL - CIDL)..]);
997    buf.get_u32_be()
998}
999
1000/// Check if a peer is defined
1001///
1002/// # Example
1003/// ```
1004/// use mles_utils::has_peer;
1005///
1006/// let sockaddr = None;
1007/// assert_eq!(false, has_peer(&sockaddr));
1008/// ```
1009#[inline]
pub fn has_peer(peer: &Option<SocketAddr>) -> bool {
    // Thin public wrapper; the check itself lives in the private `peer` module.
    peer::has_peer(peer)
}
1013
/// Read up to `bytes_to_read` bytes from `reader`.
///
/// Returns the read status together with whatever bytes were collected, so
/// the caller can inspect a partial buffer even when the status is an error.
fn read_n<R>(reader: R, bytes_to_read: usize) -> (Result<usize, Error>, Vec<u8>)
where
    R: Read,
{
    // Cap the reader at the requested count; read_to_end then stops at that
    // limit or at end of input, whichever comes first.
    let mut collected = Vec::with_capacity(bytes_to_read);
    let outcome = reader
        .take(bytes_to_read as u64)
        .read_to_end(&mut collected);
    (outcome, collected)
}
1023
1024/// Run an Mles server
1025///
1026/// # Example
1027/// ```
1028/// use std::thread;
1029/// use std::net::{IpAddr, Ipv4Addr};
1030/// use std::net::{SocketAddr, ToSocketAddrs};
1031/// use mles_utils::server_run;
1032///
1033/// let uid = "User".to_string();
1034/// let channel = "Channel".to_string();
1035/// let message = "Hello World!".to_string();
1036/// let address = "127.0.0.1:8077".to_string();
1037/// let address = address.parse().unwrap();
1038/// let child = thread::spawn(move || server_run(address, None, "".to_string(), "".to_string(), 100, 0));
1039/// drop(child);
1040/// ```
1041#[inline]
pub fn server_run(
    address: SocketAddr,
    peer: Option<SocketAddr>,
    keyval: String,
    keyaddr: String,
    hist_limit: usize,
    debug_flags: u64,
) {
    // Thin public wrapper: all arguments are forwarded unchanged to the private `server` module.
    server::run(address, peer, keyval, keyaddr, hist_limit, debug_flags);
}
1052
1053#[cfg(test)]
1054mod tests {
1055    use super::*;
1056    use std::net::SocketAddr;
1057    use std::thread;
1058    use std::time::Duration;
1059
1060    #[test]
1061    fn test_read_hdr_len_one() {
1062        let orig_len = 1;
1063        let hdrv = write_hdr(orig_len, 0x1);
1064        let len = read_hdr_len(hdrv.as_ref());
1065        assert_eq!(len, orig_len);
1066    }
1067
1068    #[test]
1069    fn test_read_hdr_len_16k() {
1070        let orig_len = 16000;
1071        let hdrv = write_hdr_with_capacity(orig_len, 0x1, HDRKEYL + orig_len);
1072        let len = read_hdr_len(hdrv.as_ref());
1073        assert_eq!(len, orig_len);
1074    }
1075
1076    #[test]
1077    fn test_read_hdr_len_16_7m() {
1078        let orig_len = 16777215;
1079        let hdrv = write_hdr(orig_len, 0x1);
1080        let len = read_hdr_len(hdrv.as_ref());
1081        assert_eq!(len, orig_len);
1082    }
1083
1084    #[test]
1085    fn test_encode_decode_msg() {
1086        let uid = "User".to_string();
1087        let channel = "Channel".to_string();
1088        let msg = "a test msg".to_string().into_bytes();
1089        let orig_msg = Msg::new(uid, channel, msg);
1090        let encoded_msg = orig_msg.encode();
1091        let decoded_msg = Msg::decode(&encoded_msg);
1092        assert_eq!(decoded_msg.uid, orig_msg.uid);
1093        assert_eq!(decoded_msg.channel, orig_msg.channel);
1094        assert_eq!(decoded_msg.message, orig_msg.message);
1095    }
1096
1097    #[test]
1098    fn test_encode_decode_resync_msg() {
1099        let uid = "User".to_string();
1100        let channel = "Channel".to_string();
1101        let msg = "a test msg".to_string().into_bytes();
1102        let orig_msg = Msg::new(uid, channel, msg);
1103        let encoded_msg = orig_msg.encode();
1104        let uid2 = "User two".to_string();
1105        let channel2 = "Channel two".to_string();
1106        let msg2 = "a test msg two".to_string().into_bytes();
1107        let orig_msg2 = Msg::new(uid2, channel2, msg2);
1108        let encoded_msg2 = orig_msg2.encode();
1109        let vec = vec![encoded_msg, encoded_msg2];
1110        let rmsg = ResyncMsg::new(&vec);
1111        let encoded_resync_msg: Vec<u8> = rmsg.encode();
1112        let decoded_resync_msg: ResyncMsg = ResyncMsg::decode(&encoded_resync_msg);
1113        let mut cnt = 0;
1114        for msg in decoded_resync_msg.get_messages() {
1115            let decoded_msg = Msg::decode(&msg);
1116            if 0 == cnt {
1117                assert_eq!(decoded_msg.uid, orig_msg.uid);
1118                assert_eq!(decoded_msg.channel, orig_msg.channel);
1119                assert_eq!(decoded_msg.message, orig_msg.message);
1120            } else {
1121                assert_eq!(decoded_msg.uid, orig_msg2.uid);
1122                assert_eq!(decoded_msg.channel, orig_msg2.channel);
1123                assert_eq!(decoded_msg.message, orig_msg2.message);
1124            }
1125            cnt += 1;
1126        }
1127    }
1128
1129    #[test]
1130    fn test_set_get_msg() {
1131        let uid = "User".to_string();
1132        let channel = "Channel".to_string();
1133        let msg = "a test msg".to_string().into_bytes();
1134        let orig_msg = Msg::new("".to_string(), channel.to_string(), Vec::new());
1135        let orig_msg = orig_msg.set_uid(uid.clone());
1136        let orig_msg = orig_msg.set_channel(channel.clone());
1137        let orig_msg = orig_msg.set_message(msg.clone());
1138        assert_eq!(&uid, orig_msg.get_uid());
1139        assert_eq!(&channel, orig_msg.get_channel());
1140        assert_eq!(&msg, orig_msg.get_message());
1141    }
1142
1143    #[test]
1144    fn test_set_get_mut_msg() {
1145        let uid = "User".to_string();
1146        let channel = "Channel".to_string();
1147        let omsg = "a test ".to_string().into_bytes();
1148        let nmsg = "a test mut msg".to_string().into_bytes();
1149        let orig_msg = Msg::new("".to_string(), channel.to_string(), omsg);
1150        let orig_msg = orig_msg.set_uid(uid.clone());
1151        let mut orig_msg = orig_msg.set_channel(channel.clone());
1152        let mut_msg = orig_msg.get_mut_message();
1153        mut_msg.extend_from_slice(&"mut msg".to_string().into_bytes());
1154        assert_eq!(&uid, orig_msg.get_uid());
1155        assert_eq!(&channel, orig_msg.get_channel());
1156        assert_eq!(&nmsg, orig_msg.get_message());
1157    }
1158
1159    #[test]
1160    fn test_cid() {
1161        let orig_key = 0xffeffe;
1162        let hdrv = write_hdr_with_key(64, orig_key);
1163        let orig_len = hdrv.len();
1164        let key = read_key_from_hdr(&hdrv);
1165        assert_eq!(orig_key, key);
1166        let read_cid = read_cid_from_hdr(&hdrv);
1167        assert_eq!(orig_key as u32, read_cid);
1168        let key = read_key_from_hdr(&hdrv);
1169        assert_eq!(orig_key, key);
1170        let len = hdrv.len();
1171        assert_eq!(orig_len, len);
1172    }
1173
1174    #[test]
1175    fn test_msgconn_send_read() {
1176        let sec = Duration::new(1, 0);
1177        let addr = "127.0.0.1:8078";
1178        let addr = addr.parse::<SocketAddr>().unwrap();
1179        let raddr = addr.clone();
1180        let uid = "User".to_string();
1181        let uid2 = "User two".to_string();
1182        let channel = "Channel".to_string();
1183        let message = "Hello World!".to_string();
1184
1185        //create server
1186        let child =
1187            thread::spawn(move || server_run(addr, None, "".to_string(), "".to_string(), 100, 0));
1188        thread::sleep(sec);
1189
1190        //send hello world
1191        let mut conn = MsgConn::new(uid2.clone(), channel.clone());
1192        conn = conn.connect_with_message(raddr, message.into_bytes());
1193        conn.close();
1194
1195        //read hello world
1196        let mut conn = MsgConn::new(uid.clone(), channel.clone());
1197        conn = conn.connect(raddr);
1198        let (conn, msg) = conn.read_message();
1199        let msg = String::from_utf8_lossy(msg.as_slice());
1200        assert_eq!("Hello World!", msg);
1201
1202        //close connection
1203        conn.close();
1204
1205        //drop server
1206        drop(child);
1207    }
1208
1209    #[test]
1210    fn test_msgconn_read_send() {
1211        let sec = Duration::new(1, 0);
1212        let addr = "127.0.0.1:8076";
1213        let addr = addr.parse::<SocketAddr>().unwrap();
1214        let raddr = addr.clone();
1215        let uid = "User".to_string();
1216        let uid2 = "User two".to_string();
1217        let channel = "Channel".to_string();
1218        let message = "Hello World!".to_string();
1219
1220        //create server
1221        let child =
1222            thread::spawn(move || server_run(addr, None, "".to_string(), "".to_string(), 100, 0));
1223        thread::sleep(sec);
1224
1225        //read connect
1226        let mut conn = MsgConn::new(uid.clone(), channel.clone());
1227        conn = conn.connect(raddr);
1228
1229        //send hello world
1230        let mut sconn = MsgConn::new(uid2.clone(), channel.clone());
1231        sconn = sconn.connect_with_message(raddr, message.into_bytes());
1232        sconn.close();
1233
1234        //read hello world
1235        let (conn, msg) = conn.read_message();
1236        let msg = String::from_utf8_lossy(msg.as_slice());
1237        assert_eq!("Hello World!", msg);
1238
1239        //close connection
1240        conn.close();
1241
1242        //drop server
1243        drop(child);
1244    }
1245
1246    #[test]
1247    fn test_msgconn_peer_send_read() {
1248        let sec = Duration::new(1, 0);
1249        let addr = "127.0.0.1:8075";
1250        let addr = addr.parse::<SocketAddr>().unwrap();
1251        let paddr = "127.0.0.1:8074";
1252        let paddr = paddr.parse::<SocketAddr>().unwrap();
1253        let praddr = paddr.clone();
1254        let uid = "User".to_string();
1255        let uid2 = "User two".to_string();
1256        let channel = "Channel".to_string();
1257        let message = "Hello World!".to_string();
1258
1259        //create server
1260        let child =
1261            thread::spawn(move || server_run(addr, None, "".to_string(), "".to_string(), 100, 0));
1262        thread::sleep(sec);
1263
1264        //create peer server
1265        let pchild = thread::spawn(move || {
1266            server_run(paddr, Some(addr), "".to_string(), "".to_string(), 100, 0)
1267        });
1268        thread::sleep(sec);
1269
1270        //send hello world
1271        let mut conn = MsgConn::new(uid.clone(), channel.clone());
1272        conn = conn.connect_with_message(praddr, message.into_bytes());
1273        conn.close();
1274
1275        //read hello world
1276        let mut conn = MsgConn::new(uid2.clone(), channel.clone());
1277        conn = conn.connect(praddr);
1278        let (conn, msg) = conn.read_message();
1279        let msg = String::from_utf8_lossy(msg.as_slice());
1280        assert_eq!("Hello World!", msg);
1281
1282        //close connection
1283        conn.close();
1284
1285        //drop peer server
1286        drop(pchild);
1287
1288        //drop server
1289        drop(child);
1290    }
1291
1292    #[test]
1293    fn test_msgconn_peer_read_send() {
1294        let sec = Duration::new(1, 0);
1295        let addr = "127.0.0.1:8073";
1296        let addr = addr.parse::<SocketAddr>().unwrap();
1297        let paddr = "127.0.0.1:8072";
1298        let paddr = paddr.parse::<SocketAddr>().unwrap();
1299        let praddr = paddr.clone();
1300        let uid = "User".to_string();
1301        let uid2 = "User two".to_string();
1302        let channel = "Channel".to_string();
1303        let message = "Hello World!".to_string();
1304
1305        //create server
1306        let child =
1307            thread::spawn(move || server_run(addr, None, "".to_string(), "".to_string(), 100, 0));
1308        thread::sleep(sec);
1309
1310        //create peer server
1311        let pchild = thread::spawn(move || {
1312            server_run(paddr, Some(addr), "".to_string(), "".to_string(), 100, 0)
1313        });
1314        thread::sleep(sec);
1315
1316        //read connect
1317        let mut conn = MsgConn::new(uid.clone(), channel.clone());
1318        conn = conn.connect(praddr);
1319
1320        //send hello world
1321        let mut sconn = MsgConn::new(uid2.clone(), channel.clone());
1322        sconn = sconn.connect_with_message(praddr, message.into_bytes());
1323        sconn.close();
1324
1325        //read hello world
1326        let (conn, msg) = conn.read_message();
1327        let msg = String::from_utf8_lossy(msg.as_slice());
1328        assert_eq!("Hello World!", msg);
1329
1330        //close connection
1331        conn.close();
1332
1333        //drop peer server
1334        drop(pchild);
1335
1336        //drop server
1337        drop(child);
1338    }
1339
1340    #[test]
1341    fn test_msgconn_basic_read_send() {
1342        let sec = Duration::new(1, 0);
1343        //set server address to connect
1344        let addr = "127.0.0.1:8071".parse::<SocketAddr>().unwrap();
1345        //create server
1346        let serv =
1347            thread::spawn(move || server_run(addr, None, "".to_string(), "".to_string(), 0, 0));
1348        thread::sleep(sec);
1349
1350        let child = thread::spawn(|| {
1351            let uid = "User two".to_string();
1352            let channel = "Channel".to_string();
1353            let addr = "127.0.0.1:8071".parse::<SocketAddr>().unwrap();
1354            //connect client to server
1355            let mut conn = MsgConn::new(uid, channel);
1356            conn = conn.connect(addr);
1357
1358            //blocking read for hello world
1359            let (conn, msg) = conn.read_message();
1360            let msg = String::from_utf8_lossy(msg.as_slice());
1361            assert_eq!("Hello World!", msg);
1362            conn.close();
1363        });
1364        thread::sleep(sec);
1365
1366        let addr = "127.0.0.1:8071".parse::<SocketAddr>().unwrap();
1367        let uid = "User".to_string();
1368        let channel = "Channel".to_string();
1369        let message = "Hello World!".to_string();
1370
1371        //send hello world to awaiting client
1372        let mut conn = MsgConn::new(uid, channel);
1373        conn = conn.connect_with_message(addr, message.into_bytes());
1374        conn.close();
1375
1376        let _res = child.join();
1377
1378        drop(serv);
1379    }
1380}