use bytes::{BufMut, BytesMut};
use mqtt4bytes::{mqtt_read, Packet, PubAck, Publish, QoS};
use rand::Rng;
#[test]
fn publish_encode_and_decode_works_as_expected() {
let mut stream = publishes(1024, 2 * 1024 * 1024);
let mut pkid = 0;
let mut read_stream = BytesMut::new();
loop {
let fill_size = rand::thread_rng().gen_range(0, 1024);
let len = stream.len();
let split_len = if len == 0 {
break;
} else if len > fill_size {
fill_size
} else {
len
};
let bytes = stream.split_to(split_len);
read_stream.put(bytes);
let packet = match mqtt_read(&mut read_stream, 10 * 1024) {
Err(mqtt4bytes::Error::InsufficientBytes(_)) => continue,
Err(e) => panic!(e),
Ok(packet) => packet,
};
match packet {
Packet::Publish(publish) => {
assert_eq!(publish.pkid, (pkid % 65000) + 1);
pkid = (pkid % 65000) as u16 + 1;
}
_ => panic!("Expecting a publish"),
}
}
}
pub fn publishes(size: usize, count: usize) -> BytesMut {
let mut stream = BytesMut::new();
for i in 0..count {
let payload = vec![i as u8; size];
let mut packet = Publish::new("hello/mqtt/topic/bytes", QoS::AtLeastOnce, payload);
packet.pkid = (i % 65000) as u16 + 1;
packet.write(&mut stream).unwrap();
}
stream
}
#[test]
fn pubacks_encode_and_decode_works_as_expected() {
let mut stream = pubacks(10 * 1024 * 1024);
let mut pkid = 0;
let mut read_stream = BytesMut::new();
loop {
let fill_size = rand::thread_rng().gen_range(0, 10);
let len = stream.len();
let split_len = if len == 0 {
break;
} else if len > fill_size {
fill_size
} else {
len
};
let bytes = stream.split_to(split_len);
read_stream.put(bytes);
let packet = match mqtt_read(&mut read_stream, 10 * 1024) {
Err(mqtt4bytes::Error::InsufficientBytes(_)) => continue,
Err(e) => panic!(e),
Ok(packet) => packet,
};
match packet {
Packet::PubAck(ack) => {
assert_eq!(ack.pkid, (pkid % 65000) + 1);
pkid = (pkid % 65000) as u16 + 1;
}
_ => panic!("Expecting a publish"),
}
}
}
pub fn pubacks(count: usize) -> BytesMut {
let mut stream = BytesMut::new();
for i in 0..count {
let packet = PubAck::new((i % 65000) as u16 + 1);
packet.write(&mut stream).unwrap();
}
stream
}