// mqtt4bytes 0.4.0
//
// MQTT 4 core serialization and deserialization
// Documentation
use bytes::{BufMut, BytesMut};
use mqtt4bytes::{mqtt_read, Packet, PubAck, Publish, QoS};
use rand::Rng;

/// Round-trips a long stream of publish packets through `mqtt_read` while feeding the
/// decoder in randomly sized chunks, so packets regularly straddle chunk boundaries.
///
/// Every decoded packet must be a `Publish` whose packet id follows the `1..=65000`
/// cycle written by `publishes`, and every packet written must be decoded.
#[test]
fn publish_encode_and_decode_works_as_expected() {
    // NOTE(review): 2 * 1024 * 1024 packets with 1024-byte payloads serialize to more
    // than 2 GiB held in memory at once — confirm this scale is intended.
    let count = 2 * 1024 * 1024;
    let mut stream = publishes(1024, count);
    let mut pkid: u16 = 0;
    let mut decoded = 0;
    let mut rng = rand::thread_rng();

    // stream which decoder reads from
    let mut read_stream = BytesMut::new();
    while !stream.is_empty() {
        // fill the decoder stream with up to n bytes (n may be 0).
        let fill_size = rng.gen_range(0, 1024);
        let split_len = stream.len().min(fill_size);
        read_stream.put(stream.split_to(split_len));

        // Drain every complete packet currently buffered. Decoding only one packet per
        // fill would let the buffer run ahead of the decoder and leave packets
        // unverified once the source stream is exhausted.
        loop {
            let packet = match mqtt_read(&mut read_stream, 10 * 1024) {
                Err(mqtt4bytes::Error::InsufficientBytes(_)) => break,
                Err(e) => panic!("Failed to decode packet: {:?}", e),
                Ok(packet) => packet,
            };

            match packet {
                Packet::Publish(publish) => {
                    let expected = (pkid % 65000) + 1;
                    assert_eq!(publish.pkid, expected);
                    pkid = expected;
                    decoded += 1;
                }
                _ => panic!("Expecting a publish"),
            }
        }
    }

    // Nothing may be left behind: no trailing bytes and no missing packets.
    assert!(
        read_stream.is_empty(),
        "{} undecoded bytes left in the read stream",
        read_stream.len()
    );
    assert_eq!(decoded, count);
}

/// Serializes `count` QoS 1 (`AtLeastOnce`) publish packets back to back into one buffer.
///
/// Each packet targets the topic `hello/mqtt/topic/bytes` and carries a `size`-byte
/// payload filled with the low 8 bits of its index. Packet ids cycle through
/// `1..=65000`.
///
/// # Panics
///
/// Panics if writing any packet into the buffer fails.
pub fn publishes(size: usize, count: usize) -> BytesMut {
    let mut buffer = BytesMut::new();

    (0..count).for_each(|index| {
        // `as u8` intentionally keeps only the low byte of the index as the fill value.
        let body = vec![index as u8; size];
        let mut publish = Publish::new("hello/mqtt/topic/bytes", QoS::AtLeastOnce, body);
        publish.pkid = (index % 65000) as u16 + 1;
        publish.write(&mut buffer).unwrap();
    });

    buffer
}

/// Round-trips a long stream of puback packets through `mqtt_read` while feeding the
/// decoder in tiny randomly sized chunks, so packets regularly straddle chunk boundaries.
///
/// Every decoded packet must be a `PubAck` whose packet id follows the `1..=65000`
/// cycle written by `pubacks`, and every packet written must be decoded.
#[test]
fn pubacks_encode_and_decode_works_as_expected() {
    let count = 10 * 1024 * 1024;
    let mut stream = pubacks(count);
    let mut pkid: u16 = 0;
    let mut decoded = 0;
    let mut rng = rand::thread_rng();

    // stream which decoder reads from
    let mut read_stream = BytesMut::new();
    while !stream.is_empty() {
        // fill the decoder stream with up to n bytes (n may be 0).
        let fill_size = rng.gen_range(0, 10);
        let split_len = stream.len().min(fill_size);
        read_stream.put(stream.split_to(split_len));

        // A single fill can hold more than one packet, so drain every complete packet
        // currently buffered. Decoding only one per fill lets the buffer outgrow the
        // decoder and leaves packets unverified once the source stream is exhausted.
        loop {
            let packet = match mqtt_read(&mut read_stream, 10 * 1024) {
                Err(mqtt4bytes::Error::InsufficientBytes(_)) => break,
                Err(e) => panic!("Failed to decode packet: {:?}", e),
                Ok(packet) => packet,
            };

            match packet {
                Packet::PubAck(ack) => {
                    let expected = (pkid % 65000) + 1;
                    assert_eq!(ack.pkid, expected);
                    pkid = expected;
                    decoded += 1;
                }
                _ => panic!("Expecting a puback"),
            }
        }
    }

    // Nothing may be left behind: no trailing bytes and no missing packets.
    assert!(
        read_stream.is_empty(),
        "{} undecoded bytes left in the read stream",
        read_stream.len()
    );
    assert_eq!(decoded, count);
}

/// Serializes `count` puback packets back to back into one buffer.
///
/// Packet ids cycle through `1..=65000`, in index order.
///
/// # Panics
///
/// Panics if writing any packet into the buffer fails.
pub fn pubacks(count: usize) -> BytesMut {
    let mut buffer = BytesMut::new();

    // Map each index onto its packet id up front, then emit one ack per id.
    let pkids = (0..count).map(|index| (index % 65000) as u16 + 1);
    for pkid in pkids {
        let ack = PubAck::new(pkid);
        ack.write(&mut buffer).unwrap();
    }

    buffer
}