#![warn(missing_docs)]
//! `Mles utils` library is provided for Mles client and server implementations for easy handling of
//! proper header and message structures.
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Copyright (C) 2017-2018 Mles developers
* */
#[macro_use]
extern crate serde_derive;
extern crate serde_cbor;
extern crate serde_bytes;
mod server;
mod local_db;
mod frame;
mod peer;
use std::io::Cursor;
use siphasher::sip::SipHasher;
use std::hash::{Hash, Hasher};
use std::net::TcpStream;
use std::io::Write;
use std::io::{Read, Error};
use std::net::{IpAddr, SocketAddr};
use bytes::{BytesMut, BufMut, Buf};
/// HDRL defines the size of the header in bytes: one 32-bit word holding the
/// 'M' type byte and the 24-bit payload length, followed by the 32-bit connection id
pub(crate) const HDRL: usize = 8;
/// CIDL defines the size of the connection id in bytes
pub(crate) const CIDL: usize = 4;
/// KEYL defines the size of the key in bytes
pub(crate) const KEYL: usize = 8;
/// HDRKEYL defines the size of the header + key in bytes
pub(crate) const HDRKEYL: usize = HDRL + KEYL;
/// Max message size: the largest value that fits the 24-bit length field of the header
pub(crate) const MSGMAXSIZE: usize = 0xffffff;
// Keepalive value; not referenced in this file.
// NOTE(review): presumably a keepalive interval in seconds consumed by the
// server/peer modules — confirm against its users.
const KEEPALIVE: u64 = 5;
/// MsgHdr structure
///
/// This structure defines the header of the Mles message including first 'M' byte,
/// length of the encoded data, connection id and SipHash key.
/// Encoded message will always be in network byte order.
///
pub struct MsgHdr {
thlen: u32,
cid: u32,
key: u64,
}
impl MsgHdr {
/// Create a new MsgHdr object with length, cid and key.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let msghdr = MsgHdr::new(len, cid, key);
/// ```
pub fn new(len: u32, cid: u32, key: u64) -> MsgHdr {
MsgHdr {
thlen: hdr_set_len(len),
cid: cid,
key: key,
}
}
/// Get type of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.get_type();
/// assert_eq!('M' as u8, msghdr.get_type());
/// ```
pub fn get_type(&self) -> u8 {
hdr_get_type(self.thlen)
}
/// Get MsgHdr length on the line.
///
pub fn get_hdrkey_len() -> usize {
HDRKEYL
}
/// Set length of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.set_len(515);
/// ```
pub fn set_len(&mut self, len: u32) {
self.thlen = hdr_set_len(len);
}
/// Get length of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.set_len(515);
/// assert_eq!(515, msghdr.get_len());
/// ```
pub fn get_len(&self) -> u32 {
hdr_get_len(self.thlen)
}
/// Set cid of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.set_cid(515);
/// ```
pub fn set_cid(&mut self, cid: u32) {
self.cid = cid;
}
/// Get cid of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.set_cid(515);
/// assert_eq!(515, msghdr.get_cid());
/// ```
pub fn get_cid(&self) -> u32 {
self.cid
}
/// Set key of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.set_key(515);
/// ```
pub fn set_key(&mut self, key: u64) {
self.key = key;
}
/// Get key of MsgHdr.
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// msghdr.set_key(515);
/// assert_eq!(515, msghdr.get_key());
/// ```
pub fn get_key(&self) -> u64 {
self.key
}
/// Encode MsgHdr to line format.
///
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 0;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// let msgv: Vec<u8> = msghdr.encode();
/// ```
pub fn encode(&self) -> Vec<u8> {
let mut msgv = write_hdr(self.get_len() as usize, self.get_cid()).to_vec();
msgv.extend(write_key(self.get_key()).to_vec());
msgv
}
/// Decode MsgHdr from line format.
///
///
/// # Example
/// ```
/// use mles_utils::{MsgHdr};
///
/// let key = 0xf00f;
/// let cid = MsgHdr::select_cid(key);
/// let len = 16;
///
/// let mut msghdr = MsgHdr::new(len, cid, key);
/// let msgv: Vec<u8> = msghdr.encode();
/// let msgh = MsgHdr::decode(msgv);
/// assert_eq!(key, msgh.get_key());
/// assert_eq!(cid, msgh.get_cid());
/// assert_eq!(len, msgh.get_len());
/// ```
pub fn decode(buf: Vec<u8>) -> MsgHdr {
MsgHdr::new(read_hdr_len(&buf) as u32, read_cid_from_hdr(&buf), read_key_from_hdr(&buf))
}
/// Do a valid hash for Mles over provided UTF-8 String list.
///
/// # Example
/// ```
/// use mles_utils::MsgHdr;
///
/// let hashstr1 = "A string".to_string();
/// let hashstr2 = "Another string".to_string();
/// let hashable = vec![hashstr1, hashstr2];
/// let key: u64 = MsgHdr::do_hash(&hashable);
/// ```
#[inline]
pub fn do_hash(t: &[String]) -> u64 {
let mut s = SipHasher::new();
for item in t {
item.hash(&mut s);
}
s.finish()
}
/// Return a connection id from key.
///
/// # Example
/// ```
/// use mles_utils::MsgHdr;
///
/// let cid = MsgHdr::select_cid(0x1000000100000001);
/// assert_eq!(cid, 0x00000001);
/// ```
#[inline]
pub fn select_cid(key: u64) -> u32 {
key as u32
}
/// Do a valid UTF-8 string from a `SocketAddr`.
///
/// For IPv4 the format is "x.x.x.x:y", where x is u8 and y is u16
/// For IPv6 the format is "[z:z:z:z:z:z:z:z]:y", where z is u16 in hexadecimal format and y is u16
///
/// # Example
/// ```
///
/// use std::net::{SocketAddr, IpAddr, Ipv4Addr, Ipv6Addr};
/// use mles_utils::MsgHdr;
///
/// let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
/// let addrstr = MsgHdr::addr2str(&addr);
///
/// assert_eq!("127.0.0.1:8080", addrstr);
///
/// let addr = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0xff03, 0, 0, 0, 0, 0, 0, 1)), 8077);
/// let addrstr = MsgHdr::addr2str(&addr);
///
/// assert_eq!("[ff03:0:0:0:0:0:0:1]:8077", addrstr);
/// ```
#[inline]
pub fn addr2str(addr: &SocketAddr) -> String {
let ipaddr = addr.ip();
match ipaddr {
IpAddr::V4(v4) => {
let v4oct = v4.octets();
let v4str = format!("{}.{}.{}.{}:{}",
v4oct[0], v4oct[1], v4oct[2], v4oct[3],
addr.port());
v4str
}
IpAddr::V6(v6) => {
let v6seg = v6.segments();
let v6str = format!("[{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}:{:x}]:{}",
v6seg[0], v6seg[1], v6seg[2], v6seg[3],
v6seg[4], v6seg[5], v6seg[6], v6seg[7],
addr.port());
v6str
}
}
}
}
/// Pack a payload length into the type/length header word: the 'M' (77)
/// type byte in the top 8 bits and the low 24 bits of `len` below it.
fn hdr_set_len(len: u32) -> u32 {
    const TYPE_M: u32 = 77;
    const LEN_MASK: u32 = 0x00ff_ffff;
    (TYPE_M << 24) | (len & LEN_MASK)
}
/// Extract the 24-bit payload length from the type/length header word.
fn hdr_get_len(thlen: u32) -> u32 {
    const LEN_MASK: u32 = 0x00ff_ffff;
    thlen & LEN_MASK
}
/// Extract the type byte (top 8 bits) from the type/length header word.
fn hdr_get_type(thlen: u32) -> u8 {
    let type_bits = (thlen & 0xff00_0000) >> 24;
    type_bits as u8
}
/// Msg structure
///
/// The Mles interface value triplet (uid, channel, message). It is serialized
/// to and deserialized from CBOR; the message is carried as a byte string.
///
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Msg {
    uid: String,
    channel: String,
    #[serde(with = "serde_bytes")]
    message: Vec<u8>,
}

impl Msg {
    /// Build a Msg from its value triplet.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// ```
    #[inline]
    pub fn new(uid: String, channel: String, message: Vec<u8>) -> Msg {
        Msg { uid, channel, message }
    }

    /// Consume the Msg and return it with the uid replaced.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.set_uid("New uid".to_string());
    ///
    /// assert_eq!("New uid".to_string(), *msg.get_uid());
    /// ```
    #[inline]
    pub fn set_uid(self, uid: String) -> Msg {
        Msg { uid, ..self }
    }

    /// Consume the Msg and return it with the channel replaced.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.set_channel("New channel".to_string());
    ///
    /// assert_eq!("New channel".to_string(), *msg.get_channel());
    /// ```
    #[inline]
    pub fn set_channel(self, channel: String) -> Msg {
        Msg { channel, ..self }
    }

    /// Consume the Msg and return it with the message bytes replaced.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let new_message: Vec<u8> = "New message".to_string().into_bytes();
    /// let msg = msg.set_message(new_message);
    /// ```
    #[inline]
    pub fn set_message(self, message: Vec<u8>) -> Msg {
        Msg { message, ..self }
    }

    /// Borrow the uid. See the example of `set_uid`.
    #[inline]
    pub fn get_uid(&self) -> &String {
        &self.uid
    }

    /// Borrow the channel. See the example of `set_channel`.
    #[inline]
    pub fn get_channel(&self) -> &String {
        &self.channel
    }

    /// Borrow the message bytes.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg: &Vec<u8> = msg.get_message();
    /// ```
    #[inline]
    pub fn get_message(&self) -> &Vec<u8> {
        &self.message
    }

    /// Return the length of the message bytes.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg_len: usize = msg.get_message_len();
    /// ```
    #[inline]
    pub fn get_message_len(&self) -> usize {
        self.message.len()
    }

    /// Serialize the Msg to CBOR.
    ///
    /// # Errors
    /// If the message cannot be encoded, the error is printed to stdout and
    /// an empty vector is returned.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let encoded_msg: Vec<u8> = msg.encode();
    /// ```
    #[inline]
    pub fn encode(&self) -> Vec<u8> {
        serde_cbor::to_vec(self).unwrap_or_else(|err| {
            println!("Error on encode: {}", err);
            Vec::new()
        })
    }

    /// Deserialize a CBOR byte string into a Msg.
    ///
    /// # Errors
    /// If the bytes cannot be decoded, the error is printed to stdout and a
    /// Msg with empty uid, channel and message is returned.
    ///
    /// # Example
    /// ```
    /// use mles_utils::Msg;
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let encoded_msg: Vec<u8> = msg.encode();
    /// let decoded_msg: Msg = Msg::decode(&encoded_msg);
    /// ```
    #[inline]
    pub fn decode(slice: &[u8]) -> Msg {
        serde_cbor::from_slice(slice).unwrap_or_else(|err| {
            println!("Error on decode: {}", err);
            // fall back to an all-empty Msg on a decode failure
            Msg {
                uid: String::new(),
                channel: String::new(),
                message: Vec::new(),
            }
        })
    }
}
// Wrapper around one already-encoded Msg so that it is serialized as a
// CBOR byte string inside a ResyncMsg.
#[derive(Serialize, Deserialize, Debug, Clone)]
struct MsgVec {
    #[serde(with = "serde_bytes")]
    encoded_msg: Vec<u8>,
}

impl MsgVec {
    // Copy the encoded message bytes into a new wrapper.
    pub fn new(encoded_msg: &Vec<u8>) -> MsgVec {
        let encoded_msg = encoded_msg.to_vec();
        MsgVec { encoded_msg }
    }

    // Borrow the wrapped encoded message bytes.
    pub fn get(&self) -> &Vec<u8> {
        &self.encoded_msg
    }
}
/// ResyncMsg structure
///
/// Resynchronization message that peers can use to resynchronize history
/// state to the root server. It can be sent only as the initial connection
/// message and packs the history into a single message that the Mles root
/// server can take into account.
///
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ResyncMsg {
    resync_message: Vec<MsgVec>,
}

impl ResyncMsg {
    /// Build a ResyncMsg from a vector of already encoded messages.
    ///
    /// # Example
    /// ```
    /// use mles_utils::{Msg, ResyncMsg};
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.encode();
    /// let vec = vec![msg];
    /// let rmsg = ResyncMsg::new(&vec);
    /// ```
    #[inline]
    pub fn new(messages: &Vec<Vec<u8>>) -> ResyncMsg {
        // wrap every encoded message into the serializable item type
        let resync_message = messages.iter().map(|msg| MsgVec::new(msg)).collect();
        ResyncMsg { resync_message }
    }

    /// Return the number of messages held by the resync message.
    ///
    /// # Example
    /// ```
    /// use mles_utils::{Msg, ResyncMsg};
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.encode();
    /// let vec = vec![msg];
    /// let rmsg = ResyncMsg::new(&vec);
    /// assert_eq!(1, rmsg.len());
    /// ```
    #[inline]
    pub fn len(&self) -> usize {
        self.resync_message.len()
    }

    /// Return copies of all encoded messages held by the resync message.
    ///
    /// # Example
    /// ```
    /// use mles_utils::{Msg, ResyncMsg};
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.encode();
    /// let vec = vec![msg];
    /// let rmsg = ResyncMsg::new(&vec);
    /// let rvec = rmsg.get_messages();
    /// assert_eq!(vec[0], rvec[0]);
    /// ```
    #[inline]
    pub fn get_messages(&self) -> Vec<Vec<u8>> {
        // unwrap every item back into its plain byte vector
        self.resync_message
            .iter()
            .map(|item| item.get().clone())
            .collect()
    }

    /// Serialize the ResyncMsg to CBOR.
    ///
    /// # Errors
    /// If the resync message cannot be encoded, the error is printed to
    /// stdout and an empty vector is returned.
    ///
    /// # Example
    /// ```
    /// use mles_utils::{ResyncMsg, Msg};
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.encode();
    /// let vec = vec![msg];
    /// let rmsg = ResyncMsg::new(&vec);
    /// let encoded_msg: Vec<u8> = rmsg.encode();
    /// ```
    #[inline]
    pub fn encode(&self) -> Vec<u8> {
        serde_cbor::to_vec(self).unwrap_or_else(|err| {
            println!("Error on resync encode: {}", err);
            Vec::new()
        })
    }

    /// Deserialize a CBOR byte string into a ResyncMsg.
    ///
    /// # Errors
    /// If the bytes cannot be decoded, a ResyncMsg without any messages is
    /// returned. Unlike `encode`, the failure is not printed.
    ///
    /// # Example
    /// ```
    /// use mles_utils::{ResyncMsg, Msg};
    ///
    /// let msg = Msg::new("My uid".to_string(), "My channel".to_string(), Vec::new());
    /// let msg = msg.encode();
    /// let vec = vec![msg];
    /// let rmsg = ResyncMsg::new(&vec);
    /// let encoded_msg: Vec<u8> = rmsg.encode();
    /// let decoded_msg: ResyncMsg = ResyncMsg::decode(&encoded_msg);
    /// assert_eq!(1, decoded_msg.len());
    /// ```
    #[inline]
    pub fn decode(slice: &[u8]) -> ResyncMsg {
        serde_cbor::from_slice(slice).unwrap_or_else(|_| ResyncMsg {
            resync_message: Vec::new(),
        })
    }
}
/// Msg connection structure
///
/// This structure defines the Mles connection for simple synchronous connections.
///
pub struct MsgConn {
uid: String,
channel: String,
key: Option<u64>,
stream: Option<TcpStream>,
}
impl MsgConn {
/// Create a new MsgConn object for a connection.
///
/// # Example
/// ```
/// use mles_utils::MsgConn;
///
/// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
/// ```
#[inline]
pub fn new(uid: String, channel: String) -> MsgConn {
MsgConn {
uid: uid,
channel: channel,
key: None,
stream: None
}
}
/// Gets the defined uid.
///
/// # Example
/// ```
/// use mles_utils::MsgConn;
///
/// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
/// assert_eq!("My uid".to_string(), conn.get_uid());
/// ```
#[inline]
pub fn get_uid(&self) -> String {
self.uid.clone()
}
/// Gets the defined channel.
///
/// # Example
/// ```
/// use mles_utils::MsgConn;
///
/// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
/// assert_eq!("My channel".to_string(), conn.get_channel());
/// ```
#[inline]
pub fn get_channel(&self) -> String {
self.channel.clone()
}
/// Gets the defined key.
///
/// # Example
/// ```
/// use mles_utils::MsgConn;
///
/// //key is set only when connection is initiated..
/// let conn = MsgConn::new("My uid".to_string(), "My channel".to_string());
/// assert_eq!(true, conn.get_key().is_none());
/// ```
#[inline]
pub fn get_key(&self) -> Option<u64> {
self.key
}
/// Connects to the defined address with a message.
///
#[inline]
pub fn connect_with_message(mut self, raddr: SocketAddr, msg: Vec<u8>) -> MsgConn {
let msg = Msg::new(self.get_uid(), self.get_channel(), msg);
match TcpStream::connect(raddr) {
Ok(mut stream) => {
let _val = stream.set_nodelay(true);
if self.get_key().is_none() {
let mut keys = Vec::new();
let laddr = match stream.local_addr() {
Ok(laddr) => laddr,
Err(_) => {
let addr = "0.0.0.0:0";
addr.parse::<SocketAddr>().unwrap()
}
};
keys.push(MsgHdr::addr2str(&laddr));
keys.push(self.get_uid());
keys.push(self.get_channel());
let key = MsgHdr::do_hash(&keys);
self.key = Some(key);
}
let encoded_msg = msg.encode();
let key = self.get_key().unwrap();
let keyv = write_key(key);
let mut msgv = write_hdr_with_capacity(encoded_msg.len(), MsgHdr::select_cid(key),
HDRKEYL + encoded_msg.len());
msgv.extend(keyv);
msgv.extend(encoded_msg);
let msgv = msgv.freeze();
match stream.write_all(msgv.as_ref()) {
Ok(_) => self.stream = Some(stream),
Err(err) => {
println!("Send error {}", err);
self.stream = None;
}
}
self
},
Err(_) => {
println!("Could not connect to server {}", raddr);
self
},
}
}
/// Connects to the defined address (without a message).
///
#[inline]
pub fn connect(self, raddr: SocketAddr) -> MsgConn {
self.connect_with_message(raddr, Vec::new())
}
/// Send a message. Blocks until a message is sent.
///
/// # Errors
/// If a message cannot be sent, stream is set to None.
///
#[inline]
pub fn send_message(mut self, msg: Vec<u8>) -> MsgConn {
let message = Msg::new(self.get_uid(), self.get_channel(), msg);
let encoded_msg = message.encode();
let key = self.get_key().unwrap();
let keyv = write_key(key);
let mut msgv = write_hdr_with_capacity(encoded_msg.len(), MsgHdr::select_cid(key),
HDRKEYL + encoded_msg.len());
msgv.extend(keyv);
msgv.extend(encoded_msg);
let msgv = msgv.freeze();
let mut stream = self.stream.unwrap();
match stream.write_all(msgv.as_ref()) {
Ok(_) => self.stream = Some(stream),
Err(err) => {
println!("Send error {}", err);
self.stream = None;
}
}
self
}
/// Reads a message with non-zero message content. Blocks until a message is received.
///
/// # Errors
/// If message cannot be read, an empty message is returned.
///
#[inline]
pub fn read_message(mut self) -> (MsgConn, Vec<u8>) {
let stream = self.stream.unwrap();
loop {
let tuple = read_n(&stream, HDRKEYL);
let status = tuple.0;
match status {
Ok(0) => {
println!("Read failed: eof");
self.stream = None;
return (self, Vec::new());
},
_ => {}
}
let buf = tuple.1;
if 0 == buf.len() {
continue;
}
if read_hdr_type(buf.as_slice()) != 'M' as u32 {
continue;
}
let hdr_len = read_hdr_len(buf.as_slice());
if 0 == hdr_len {
continue;
}
let tuple = read_n(&stream, hdr_len);
let status = tuple.0;
match status {
Ok(0) => continue,
_ => {}
}
let payload = tuple.1;
if payload.len() != (hdr_len as usize) {
continue;
}
let decoded_message = Msg::decode(payload.as_slice());
if 0 == decoded_message.get_message_len() {
continue;
}
self.stream = Some(stream);
return (self, decoded_message.get_message().to_owned());
}
}
/// Closes the connection.
///
#[inline]
pub fn close(mut self) -> MsgConn {
if self.stream.is_some() {
drop(self.stream.unwrap());
}
self.stream = None;
self
}
}
/// Read the type field (the first byte, widened to u32) from an encoded header.
///
/// Returns 0 when `hdr` holds fewer than `HDRL` bytes.
#[inline]
pub(crate) fn read_hdr_type(hdr: &[u8]) -> u32 {
    if hdr.len() >= HDRL {
        // the type lives in the top byte of the first big-endian word
        let word = Cursor::new(hdr).get_u32_be();
        word >> 24
    } else {
        0
    }
}
/// Read the 24-bit payload length from an encoded header.
///
/// Returns 0 when `hdr` holds fewer than `HDRL` bytes.
fn read_hdr_len(hdr: &[u8]) -> usize {
    if hdr.len() >= HDRL {
        // the length lives in the low 24 bits of the first big-endian word
        let word = Cursor::new(hdr).get_u32_be();
        (word & 0x00ff_ffff) as usize
    } else {
        0
    }
}
fn write_hdr(len: usize, cid: u32) -> BytesMut {
let hdr = (('M' as u32) << 24) | len as u32;
let mut msgv = BytesMut::from(Vec::with_capacity(HDRKEYL));
msgv.put_u32_be(hdr);
msgv.put_u32_be(cid);
msgv
}
fn write_hdr_with_capacity(len: usize, cid: u32, cap: usize) -> BytesMut {
let hdr = (('M' as u32) << 24) | len as u32;
let mut msgv = BytesMut::from(Vec::with_capacity(cap));
msgv.put_u32_be(hdr);
msgv.put_u32_be(cid);
msgv
}
fn write_hdr_without_cid(len: usize) -> BytesMut {
let hdr = (('M' as u32) << 24) | len as u32;
let mut msgv = BytesMut::from(Vec::with_capacity(HDRL));
msgv.put_u32_be(hdr);
msgv
}
/// Replace the type/length word at the front of an encoded buffer with one
/// carrying `len`, keeping everything from the connection id onwards.
///
/// Returns an empty buffer when `hdrv` holds fewer than `HDRL` bytes.
#[inline]
pub(crate) fn write_len_to_hdr(len: usize, mut hdrv: BytesMut) -> BytesMut {
    if hdrv.len() >= HDRL {
        // everything after the first word (cid and whatever follows) is kept
        let remainder = hdrv.split_off(HDRL - CIDL);
        let mut rebuilt = write_hdr_without_cid(len);
        rebuilt.extend(remainder);
        rebuilt
    } else {
        BytesMut::new()
    }
}
/// Encode a key as `KEYL` bytes in network byte order.
fn write_key(val: u64) -> BytesMut {
    let mut keyv = BytesMut::from(Vec::with_capacity(KEYL));
    keyv.put_u64_be(val);
    keyv
}
/// Encode a header followed by the key; the connection id is derived from
/// the key with `MsgHdr::select_cid`.
fn write_hdr_with_key(len: usize, key: u64) -> BytesMut {
    let cid = MsgHdr::select_cid(key);
    let keyv = write_key(key);
    let mut encoded = write_hdr(len, cid);
    encoded.extend(keyv);
    encoded
}
/// Read the key that follows the header in an encoded buffer.
///
/// Returns 0 when `keyv` holds fewer than `HDRKEYL` bytes.
fn read_key_from_hdr(keyv: &[u8]) -> u64 {
    if keyv.len() >= HDRKEYL {
        // the key starts right after the header
        Cursor::new(&keyv[HDRL..]).get_u64_be()
    } else {
        0
    }
}
/// Read the connection id from an encoded header.
///
/// Returns 0 when `hdrv` holds fewer than `HDRL` bytes.
fn read_cid_from_hdr(hdrv: &[u8]) -> u32 {
    if hdrv.len() >= HDRL {
        // the cid is the second big-endian word of the header
        let cid_offset = HDRL - CIDL;
        Cursor::new(&hdrv[cid_offset..]).get_u32_be()
    } else {
        0
    }
}
/// Check if a peer is defined
///
/// Public entry point that forwards `peer` unchanged to the crate-internal
/// `peer::has_peer` and returns its answer.
///
/// # Example
/// ```
/// use mles_utils::has_peer;
///
/// let sockaddr = None;
/// assert_eq!(false, has_peer(&sockaddr));
/// ```
#[inline]
pub fn has_peer(peer: &Option<SocketAddr>) -> bool {
peer::has_peer(peer)
}
/// Read up to `bytes_to_read` bytes from `reader`.
///
/// Returns the status of the read together with the bytes collected; fewer
/// bytes than requested are returned when the reader ends early.
fn read_n<R>(reader: R, bytes_to_read: usize) -> (Result<usize, Error>, Vec<u8>)
where
    R: Read,
{
    let mut collected = Vec::with_capacity(bytes_to_read);
    // `take` caps the read, `read_to_end` keeps reading until the cap or EOF
    let outcome = reader
        .take(bytes_to_read as u64)
        .read_to_end(&mut collected);
    (outcome, collected)
}
/// Run an Mles server
///
/// Public entry point that forwards all arguments unchanged to the
/// crate-internal `server::run`.
///
/// NOTE(review): the meaning of the arguments is defined by `server::run`,
/// which is not visible here. Presumably `address` is the listen address,
/// `peer` an optional upstream server, `keyval`/`keyaddr` key material,
/// `hist_limit` the message history limit and `debug_flags` a debug bitmask —
/// confirm against `server::run`. The call presumably blocks while the server
/// runs (the example and the tests invoke it on a spawned thread) — confirm.
///
/// # Example
/// ```
/// use std::thread;
/// use std::net::{IpAddr, Ipv4Addr};
/// use std::net::{SocketAddr, ToSocketAddrs};
/// use mles_utils::server_run;
///
/// let uid = "User".to_string();
/// let channel = "Channel".to_string();
/// let message = "Hello World!".to_string();
/// let address = "127.0.0.1:8077".to_string();
/// let address = address.parse().unwrap();
/// let child = thread::spawn(move || server_run(address, None, "".to_string(), "".to_string(), 100, 0));
/// drop(child);
/// ```
#[inline]
pub fn server_run(address: SocketAddr, peer: Option<SocketAddr>, keyval: String, keyaddr: String, hist_limit: usize, debug_flags: u64) {
server::run(address, peer, keyval, keyaddr, hist_limit, debug_flags);
}
#[cfg(test)]
mod tests {
    use std::net::SocketAddr;
    use std::thread;
    use std::time::Duration;
    use super::*;

    /// Payload every connection test sends and expects to read back.
    const HELLO: &str = "Hello World!";
    /// Channel shared by all clients of the connection tests.
    const CHANNEL: &str = "Channel";

    /// Give a freshly spawned server or client thread a second to get going.
    fn settle() {
        thread::sleep(Duration::new(1, 0));
    }

    /// Parse a literal socket address used by a test.
    fn sockaddr(addr: &str) -> SocketAddr {
        addr.parse::<SocketAddr>().unwrap()
    }

    /// Start a server on its own thread and wait for it to settle.
    /// Dropping the returned handle detaches the thread.
    fn spawn_server(addr: SocketAddr, peer: Option<SocketAddr>, hist_limit: usize) -> thread::JoinHandle<()> {
        let handle = thread::spawn(move || server_run(addr, peer, "".to_string(), "".to_string(), hist_limit, 0));
        settle();
        handle
    }

    /// Create a client for `uid` on the shared channel and connect it without a message.
    fn connect_reader(uid: &str, raddr: SocketAddr) -> MsgConn {
        MsgConn::new(uid.to_string(), CHANNEL.to_string()).connect(raddr)
    }

    /// Connect as `uid`, deliver the hello payload as the connection message and close.
    fn send_hello(uid: &str, raddr: SocketAddr) {
        let conn = MsgConn::new(uid.to_string(), CHANNEL.to_string());
        let conn = conn.connect_with_message(raddr, HELLO.to_string().into_bytes());
        conn.close();
    }

    /// Block until a message arrives on `conn`, check it is the hello payload and close.
    fn expect_hello(conn: MsgConn) {
        let (conn, msg) = conn.read_message();
        let msg = String::from_utf8_lossy(msg.as_slice());
        assert_eq!(HELLO, msg);
        conn.close();
    }

    #[test]
    fn test_read_hdr_len_one() {
        let orig_len = 1;
        let hdrv = write_hdr(orig_len, 0x1);
        let len = read_hdr_len(hdrv.as_ref());
        assert_eq!(len, orig_len);
    }

    #[test]
    fn test_read_hdr_len_16k() {
        let orig_len = 16000;
        let hdrv = write_hdr_with_capacity(orig_len, 0x1, HDRKEYL + orig_len);
        let len = read_hdr_len(hdrv.as_ref());
        assert_eq!(len, orig_len);
    }

    #[test]
    fn test_read_hdr_len_16_7m() {
        // largest length the 24-bit field can carry
        let orig_len = 16777215;
        let hdrv = write_hdr(orig_len, 0x1);
        let len = read_hdr_len(hdrv.as_ref());
        assert_eq!(len, orig_len);
    }

    #[test]
    fn test_encode_decode_msg() {
        let uid = "User".to_string();
        let channel = "Channel".to_string();
        let msg = "a test msg".to_string().into_bytes();
        let orig_msg = Msg::new(uid, channel, msg);
        let encoded_msg = orig_msg.encode();
        let decoded_msg = Msg::decode(&encoded_msg);
        assert_eq!(decoded_msg.uid, orig_msg.uid);
        assert_eq!(decoded_msg.channel, orig_msg.channel);
        assert_eq!(decoded_msg.message, orig_msg.message);
    }

    #[test]
    fn test_encode_decode_resync_msg() {
        let orig_msg = Msg::new(
            "User".to_string(),
            "Channel".to_string(),
            "a test msg".to_string().into_bytes(),
        );
        let orig_msg2 = Msg::new(
            "User two".to_string(),
            "Channel two".to_string(),
            "a test msg two".to_string().into_bytes(),
        );
        let vec = vec![orig_msg.encode(), orig_msg2.encode()];
        let rmsg = ResyncMsg::new(&vec);
        let encoded_resync_msg: Vec<u8> = rmsg.encode();
        let decoded_resync_msg: ResyncMsg = ResyncMsg::decode(&encoded_resync_msg);

        let originals = [orig_msg, orig_msg2];
        let decoded_msgs = decoded_resync_msg.get_messages();
        // a failed decode yields an empty ResyncMsg: without this check the
        // comparisons below would be skipped and the test would pass vacuously
        assert_eq!(originals.len(), decoded_msgs.len());
        for (orig, encoded) in originals.iter().zip(decoded_msgs.iter()) {
            let decoded_msg = Msg::decode(encoded);
            assert_eq!(decoded_msg.uid, orig.uid);
            assert_eq!(decoded_msg.channel, orig.channel);
            assert_eq!(decoded_msg.message, orig.message);
        }
    }

    #[test]
    fn test_set_get_msg() {
        let uid = "User".to_string();
        let channel = "Channel".to_string();
        let msg = "a test msg".to_string().into_bytes();
        let orig_msg = Msg::new("".to_string(), channel.to_string(), Vec::new());
        let orig_msg = orig_msg.set_uid(uid.clone());
        let orig_msg = orig_msg.set_channel(channel.clone());
        let orig_msg = orig_msg.set_message(msg.clone());
        assert_eq!(&uid, orig_msg.get_uid());
        assert_eq!(&channel, orig_msg.get_channel());
        assert_eq!(&msg, orig_msg.get_message());
    }

    #[test]
    fn test_cid() {
        let orig_key = 0xffeffe;
        let hdrv = write_hdr_with_key(64, orig_key);
        let orig_len = hdrv.len();
        let key = read_key_from_hdr(&hdrv);
        assert_eq!(orig_key, key);
        let read_cid = read_cid_from_hdr(&hdrv);
        assert_eq!(orig_key as u32, read_cid);
        // reading must not consume or alter the encoded header
        let key = read_key_from_hdr(&hdrv);
        assert_eq!(orig_key, key);
        let len = hdrv.len();
        assert_eq!(orig_len, len);
    }

    #[test]
    fn test_msgconn_send_read() {
        let raddr = sockaddr("127.0.0.1:8077");
        //create server
        let _server = spawn_server(raddr, None, 100);
        //send hello world
        send_hello("User two", raddr);
        //read hello world from history and close connection
        expect_hello(connect_reader("User", raddr));
    }

    #[test]
    fn test_msgconn_read_send() {
        let raddr = sockaddr("127.0.0.1:8076");
        //create server
        let _server = spawn_server(raddr, None, 100);
        //read connect
        let conn = connect_reader("User", raddr);
        //send hello world
        send_hello("User two", raddr);
        //read hello world and close connection
        expect_hello(conn);
    }

    #[test]
    fn test_msgconn_peer_send_read() {
        let addr = sockaddr("127.0.0.1:8075");
        let praddr = sockaddr("127.0.0.1:8074");
        //create server
        let _server = spawn_server(addr, None, 100);
        //create peer server
        let _peer = spawn_server(praddr, Some(addr), 100);
        //send hello world
        send_hello("User", praddr);
        //read hello world and close connection
        expect_hello(connect_reader("User two", praddr));
    }

    #[test]
    fn test_msgconn_peer_read_send() {
        let addr = sockaddr("127.0.0.1:8073");
        let praddr = sockaddr("127.0.0.1:8072");
        //create server
        let _server = spawn_server(addr, None, 100);
        //create peer server
        let _peer = spawn_server(praddr, Some(addr), 100);
        //read connect
        let conn = connect_reader("User", praddr);
        //send hello world
        send_hello("User two", praddr);
        //read hello world and close connection
        expect_hello(conn);
    }

    #[test]
    fn test_msgconn_basic_read_send() {
        //set server address to connect
        let addr = sockaddr("127.0.0.1:8071");
        //create server without history
        let _server = spawn_server(addr, None, 0);
        let child = thread::spawn(move || {
            //connect client to server and do a blocking read for hello world
            expect_hello(connect_reader("User two", addr));
        });
        settle();
        //send hello world to awaiting client
        send_hello("User", addr);
        // a panic (failed assertion) in the reader thread must fail the test
        child.join().expect("reader thread failed");
    }
}