use super::*;
use bytes::{Buf, Bytes};
fn len(subscribe: &Subscribe) -> usize {
2 + subscribe.filters.iter().fold(0, |s, t| s + filter::len(t))
}
pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Subscribe, Error> {
let variable_header_index = fixed_header.fixed_header_len;
bytes.advance(variable_header_index);
let pkid = read_u16(&mut bytes)?;
let filters = filter::read(&mut bytes)?;
match filters.len() {
0 => Err(Error::EmptySubscription),
_ => Ok(Subscribe { pkid, filters }),
}
}
pub fn write(subscribe: &Subscribe, buffer: &mut BytesMut) -> Result<usize, Error> {
buffer.put_u8(0x82);
let remaining_len = len(subscribe);
let remaining_len_bytes = write_remaining_length(buffer, remaining_len)?;
buffer.put_u16(subscribe.pkid);
for f in subscribe.filters.iter() {
filter::write(f, buffer);
}
Ok(1 + remaining_len_bytes + remaining_len)
}
mod filter {
use super::*;
pub fn len(filter: &Filter) -> usize {
2 + filter.path.len() + 1
}
pub fn read(bytes: &mut Bytes) -> Result<Vec<Filter>, Error> {
let mut filters = Vec::new();
while bytes.has_remaining() {
let path = read_mqtt_bytes(bytes)?;
let path = std::str::from_utf8(&path)?.to_owned();
let options = read_u8(bytes)?;
let requested_qos = options & 0b0000_0011;
filters.push(Filter {
path,
qos: qos(requested_qos).ok_or(Error::InvalidQoS(requested_qos))?,
nolocal: false,
preserve_retain: false,
retain_forward_rule: RetainForwardRule::OnEverySubscribe,
});
}
Ok(filters)
}
pub fn write(filter: &Filter, buffer: &mut BytesMut) {
let mut options = 0;
options |= filter.qos as u8;
write_mqtt_string(buffer, filter.path.as_str());
buffer.put_u8(options);
}
}