use crate::varint::VarintBuffer;
use crate::{
message_types::{ControlPacket, MessageType},
packets::PublishHeader,
};
use bit_field::BitField;
use serde::Serialize;
pub(crate) const MAX_FIXED_HEADER_SIZE: usize = 5;
#[derive(defmt::Format, Debug, Copy, Clone, PartialEq)]
#[non_exhaustive]
pub enum Error {
InsufficientMemory,
Custom,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum PubError<E> {
Encode(Error),
Payload(E),
}
impl serde::ser::StdError for Error {}
impl serde::ser::Error for Error {
fn custom<T: core::fmt::Display>(_msg: T) -> Self {
crate::trace!("Serialization error");
Error::Custom
}
}
impl core::fmt::Display for Error {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(
f,
"{}",
match self {
Error::Custom => "Custom deserialization error",
Error::InsufficientMemory => "Not enough data to encode the packet",
}
)
}
}
pub struct MqttSerializer<'a> {
buf: &'a mut [u8],
index: usize,
with_header: bool,
}
impl<'a> MqttSerializer<'a> {
pub fn new(buf: &'a mut [u8]) -> Self {
Self {
buf,
index: MAX_FIXED_HEADER_SIZE,
with_header: true,
}
}
pub fn new_without_header(buf: &'a mut [u8]) -> Self {
Self {
buf,
index: 0,
with_header: false,
}
}
pub fn finish(self) -> &'a mut [u8] {
assert!(!self.with_header);
&mut self.buf[..self.index]
}
pub fn to_buffer_meta<T: Serialize + ControlPacket>(
buf: &'a mut [u8],
packet: &T,
) -> Result<(usize, &'a [u8]), Error> {
let mut serializer = Self::new(buf);
packet.serialize(&mut serializer)?;
let (offset, packet) = serializer.finalize(T::MESSAGE_TYPE, packet.fixed_header_flags())?;
Ok((offset, packet))
}
pub fn pub_to_buffer_meta<P: crate::publication::ToPayload>(
buf: &'a mut [u8],
header: &PublishHeader<'_>,
payload: P,
) -> Result<(usize, &'a [u8]), PubError<P::Error>> {
let mut serializer = crate::ser::MqttSerializer::new(buf);
header
.serialize(&mut serializer)
.map_err(PubError::Encode)?;
let flags = header.fixed_header_flags();
let len = payload
.serialize(serializer.remainder())
.map_err(PubError::Payload)?;
serializer.commit(len).map_err(PubError::Encode)?;
let (offset, packet) = serializer
.finalize(MessageType::Publish, flags)
.map_err(PubError::Encode)?;
Ok((offset, packet))
}
pub fn pub_to_buffer<P: crate::publication::ToPayload>(
buf: &'a mut [u8],
header: &PublishHeader<'_>,
payload: P,
) -> Result<&'a [u8], PubError<P::Error>> {
let (_, packet) = Self::pub_to_buffer_meta(buf, header, payload)?;
Ok(packet)
}
pub fn to_buffer<T: Serialize + ControlPacket>(
buf: &'a mut [u8],
packet: &T,
) -> Result<&'a [u8], Error> {
let (_, packet) = Self::to_buffer_meta(buf, packet)?;
Ok(packet)
}
pub fn finalize(self, typ: MessageType, flags: u8) -> Result<(usize, &'a [u8]), Error> {
let len = self
.index
.checked_sub(MAX_FIXED_HEADER_SIZE)
.ok_or(Error::InsufficientMemory)?;
let mut buffer = VarintBuffer::new();
crate::varint::write_mqtt_u32_varint(len as u32, &mut buffer)
.map_err(|_| Error::InsufficientMemory)?;
let remaining_len = buffer.as_slice();
if self.buf.len() < MAX_FIXED_HEADER_SIZE {
return Err(Error::InsufficientMemory);
}
self.buf[MAX_FIXED_HEADER_SIZE - remaining_len.len()..MAX_FIXED_HEADER_SIZE]
.copy_from_slice(remaining_len);
let header: u8 = *0u8.set_bits(4..8, typ as u8).set_bits(0..4, flags);
self.buf[MAX_FIXED_HEADER_SIZE - remaining_len.len() - 1] = header;
let offset = MAX_FIXED_HEADER_SIZE - remaining_len.len() - 1;
Ok((offset, &self.buf[offset..self.index]))
}
pub fn push_bytes(&mut self, data: &[u8]) -> Result<(), Error> {
crate::trace!("Serializer pushed {=usize} bytes", data.len());
if self.buf.len().saturating_sub(self.index) < data.len() {
return Err(Error::InsufficientMemory);
}
self.buf[self.index..][..data.len()].copy_from_slice(data);
self.index += data.len();
Ok(())
}
pub fn push(&mut self, byte: u8) -> Result<(), Error> {
if self.buf.len().saturating_sub(self.index) < 1 {
return Err(Error::InsufficientMemory);
}
self.buf[self.index] = byte;
self.index += 1;
Ok(())
}
pub fn remainder(&mut self) -> &mut [u8] {
let start = self.index.min(self.buf.len());
&mut self.buf[start..]
}
pub fn commit(&mut self, len: usize) -> Result<(), Error> {
if self.buf.len().saturating_sub(self.index) < len {
return Err(Error::InsufficientMemory);
}
self.index += len;
Ok(())
}
}
impl serde::Serializer for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
type SerializeSeq = Self;
type SerializeTuple = Self;
type SerializeTupleStruct = Self;
type SerializeTupleVariant = Self;
type SerializeMap = Self;
type SerializeStruct = Self;
type SerializeStructVariant = Self;
fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
self.push(v as u8)
}
fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
self.push(v as u8)
}
fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
self.push(v)
}
fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
self.serialize_bytes(v.as_bytes())
}
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
self.push_bytes(v)
}
fn serialize_none(self) -> Result<(), Error> {
Ok(())
}
fn serialize_unit_struct(self, _name: &'static str) -> Result<(), Error> {
Ok(())
}
fn serialize_newtype_struct<T: ?Sized + Serialize>(
self,
_name: &'static str,
value: &T,
) -> Result<(), Error> {
value.serialize(self)
}
fn serialize_some<T: ?Sized + Serialize>(self, value: &T) -> Result<(), Error> {
value.serialize(self)
}
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Error> {
Ok(self)
}
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Error> {
Ok(self)
}
fn serialize_struct(
self,
_name: &'static str,
_len: usize,
) -> Result<Self::SerializeStruct, Error> {
Ok(self)
}
fn serialize_char(self, _v: char) -> Result<Self::Ok, Self::Error> {
unimplemented!()
}
fn serialize_unit(self) -> Result<(), Error> {
unimplemented!()
}
fn serialize_unit_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
) -> Result<(), Error> {
unimplemented!()
}
fn serialize_newtype_variant<T: ?Sized + Serialize>(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_value: &T,
) -> Result<(), Error> {
unimplemented!()
}
fn serialize_tuple_struct(
self,
_name: &'static str,
_len: usize,
) -> Result<Self::SerializeTupleStruct, Error> {
unimplemented!()
}
fn serialize_tuple_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize,
) -> Result<Self::SerializeTupleVariant, Error> {
unimplemented!()
}
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Error> {
unimplemented!()
}
fn serialize_struct_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize,
) -> Result<Self::SerializeStructVariant, Error> {
unimplemented!()
}
fn collect_str<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Error> {
unimplemented!()
}
fn serialize_f32(self, _v: f32) -> Result<Self::Ok, Self::Error> {
unimplemented!()
}
fn serialize_f64(self, _v: f64) -> Result<Self::Ok, Self::Error> {
unimplemented!()
}
}
impl serde::ser::SerializeStruct for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(
&mut self,
_key: &'static str,
value: &T,
) -> Result<(), Error> {
value.serialize(&mut **self)
}
fn end(self) -> Result<(), Error> {
Ok(())
}
}
impl serde::ser::SerializeSeq for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_element<T: ?Sized + Serialize>(&mut self, value: &T) -> Result<(), Error> {
value.serialize(&mut **self)
}
fn end(self) -> Result<(), Error> {
Ok(())
}
}
impl serde::ser::SerializeTuple for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_element<T: ?Sized + Serialize>(&mut self, value: &T) -> Result<(), Error> {
value.serialize(&mut **self)
}
fn end(self) -> Result<(), Error> {
Ok(())
}
}
impl serde::ser::SerializeTupleStruct for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(&mut self, _value: &T) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
impl serde::ser::SerializeTupleVariant for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(&mut self, _value: &T) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
impl serde::ser::SerializeMap for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_key<T: ?Sized + Serialize>(&mut self, _key: &T) -> Result<(), Error> {
unimplemented!()
}
fn serialize_value<T: ?Sized + Serialize>(&mut self, _value: &T) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
impl serde::ser::SerializeStructVariant for &mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(
&mut self,
_key: &'static str,
_value: &T,
) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
#[cfg(test)]
mod tests {
use super::{Error, MqttSerializer};
use crate::{
packets::{PingReq, PublishHeader},
publication::Publication,
types::Utf8String,
};
#[test]
fn control_packet_encode_rejects_buffers_smaller_than_fixed_header() {
for len in 0..super::MAX_FIXED_HEADER_SIZE {
let mut buf = vec![0u8; len];
let result = MqttSerializer::to_buffer(&mut buf, &PingReq);
assert!(matches!(result, Err(Error::InsufficientMemory)));
}
}
#[test]
fn publish_encode_rejects_buffers_smaller_than_fixed_header() {
for len in 0..super::MAX_FIXED_HEADER_SIZE {
let mut buf = vec![0u8; len];
let publication = Publication::bytes("a", b"x");
let header = PublishHeader {
topic: Utf8String(publication.topic),
packet_id: None,
properties: publication.properties,
retain: publication.retain,
qos: publication.qos,
dup: false,
};
let result = MqttSerializer::pub_to_buffer(&mut buf, &header, publication.payload);
assert!(matches!(
result,
Err(crate::ser::PubError::Encode(Error::InsufficientMemory))
));
}
}
}