use crate::varint::VarintBuffer;
use crate::{
message_types::{ControlPacket, MessageType},
packets::Pub,
};
use bit_field::BitField;
use serde::Serialize;
use varint_rs::VarintWriter;
const MAX_FIXED_HEADER_SIZE: usize = 5;
#[derive(Debug, Copy, Clone, PartialEq)]
#[non_exhaustive]
pub enum Error {
InsufficientMemory,
Custom,
}
#[derive(Debug, Copy, Clone, PartialEq)]
pub enum PubError<E> {
Error(Error),
Other(E),
}
impl serde::ser::StdError for Error {}
impl serde::ser::Error for Error {
fn custom<T: core::fmt::Display>(_msg: T) -> Self {
crate::error!("{}", _msg);
Error::Custom
}
}
impl core::fmt::Display for Error {
fn fmt(&self, f: &mut core::fmt::Formatter) -> core::fmt::Result {
write!(
f,
"{}",
match self {
Error::Custom => "Custom deserialization error",
Error::InsufficientMemory => "Not enough data to encode the packet",
}
)
}
}
pub struct MqttSerializer<'a> {
buf: &'a mut [u8],
index: usize,
with_header: bool,
}
impl<'a> MqttSerializer<'a> {
pub fn new(buf: &'a mut [u8]) -> Self {
Self {
buf,
index: MAX_FIXED_HEADER_SIZE,
with_header: true,
}
}
pub fn new_without_header(buf: &'a mut [u8]) -> Self {
Self {
buf,
index: 0,
with_header: false,
}
}
pub fn finish(self) -> &'a mut [u8] {
assert!(!self.with_header);
&mut self.buf[..self.index]
}
pub fn to_buffer_meta<T: Serialize + ControlPacket>(
buf: &'a mut [u8],
packet: &T,
) -> Result<(usize, &'a [u8]), Error> {
let mut serializer = Self::new(buf);
packet.serialize(&mut serializer)?;
let (offset, packet) = serializer.finalize(T::MESSAGE_TYPE, packet.fixed_header_flags())?;
Ok((offset, packet))
}
pub fn pub_to_buffer_meta<P: crate::publication::ToPayload>(
buf: &'a mut [u8],
pub_packet: Pub<'_, P>,
) -> Result<(usize, &'a [u8]), PubError<P::Error>> {
let mut serializer = crate::ser::MqttSerializer::new(buf);
pub_packet
.serialize(&mut serializer)
.map_err(PubError::Error)?;
let flags = pub_packet.fixed_header_flags();
let len = pub_packet
.payload
.serialize(serializer.remainder())
.map_err(PubError::Other)?;
serializer.commit(len).map_err(PubError::Error)?;
let (offset, packet) = serializer
.finalize(MessageType::Publish, flags)
.map_err(PubError::Error)?;
Ok((offset, packet))
}
pub fn pub_to_buffer<P: crate::publication::ToPayload>(
buf: &'a mut [u8],
pub_packet: Pub<'_, P>,
) -> Result<&'a [u8], PubError<P::Error>> {
let (_, packet) = Self::pub_to_buffer_meta(buf, pub_packet)?;
Ok(packet)
}
pub fn to_buffer<T: Serialize + ControlPacket>(
buf: &'a mut [u8],
packet: &T,
) -> Result<&'a [u8], Error> {
let (_, packet) = Self::to_buffer_meta(buf, packet)?;
Ok(packet)
}
pub fn finalize(self, typ: MessageType, flags: u8) -> Result<(usize, &'a [u8]), Error> {
let len = self.index - MAX_FIXED_HEADER_SIZE;
let mut buffer = VarintBuffer::new();
buffer
.write_u32_varint(len as u32)
.map_err(|_| Error::InsufficientMemory)?;
self.buf[MAX_FIXED_HEADER_SIZE - buffer.data.len()..MAX_FIXED_HEADER_SIZE]
.copy_from_slice(&buffer.data);
let header: u8 = *0u8.set_bits(4..8, typ as u8).set_bits(0..4, flags);
self.buf[MAX_FIXED_HEADER_SIZE - buffer.data.len() - 1] = header;
let offset = MAX_FIXED_HEADER_SIZE - buffer.data.len() - 1;
Ok((offset, &self.buf[offset..self.index]))
}
pub fn push_bytes(&mut self, data: &[u8]) -> Result<(), Error> {
crate::trace!("Pushing {:?}", data);
if self.buf.len() - self.index < data.len() {
return Err(Error::InsufficientMemory);
}
self.buf[self.index..][..data.len()].copy_from_slice(data);
self.index += data.len();
Ok(())
}
pub fn push(&mut self, byte: u8) -> Result<(), Error> {
if self.buf.len() - self.index < 1 {
return Err(Error::InsufficientMemory);
}
self.buf[self.index] = byte;
self.index += 1;
Ok(())
}
pub fn remainder(&mut self) -> &mut [u8] {
&mut self.buf[self.index..]
}
pub fn commit(&mut self, len: usize) -> Result<(), Error> {
if self.buf.len() < (self.index + len) {
return Err(Error::InsufficientMemory);
}
self.index += len;
Ok(())
}
}
impl<'a> serde::Serializer for &mut MqttSerializer<'a> {
type Ok = ();
type Error = Error;
type SerializeSeq = Self;
type SerializeTuple = Self;
type SerializeTupleStruct = Self;
type SerializeTupleVariant = Self;
type SerializeMap = Self;
type SerializeStruct = Self;
type SerializeStructVariant = Self;
fn serialize_bool(self, v: bool) -> Result<Self::Ok, Self::Error> {
self.push(v as u8)
}
fn serialize_i8(self, v: i8) -> Result<Self::Ok, Self::Error> {
self.push(v as u8)
}
fn serialize_i16(self, v: i16) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_i32(self, v: i32) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_i64(self, v: i64) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_u8(self, v: u8) -> Result<Self::Ok, Self::Error> {
self.push(v)
}
fn serialize_u16(self, v: u16) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_u32(self, v: u32) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_u64(self, v: u64) -> Result<Self::Ok, Self::Error> {
self.push_bytes(&v.to_be_bytes())
}
fn serialize_str(self, v: &str) -> Result<Self::Ok, Self::Error> {
self.serialize_bytes(v.as_bytes())
}
fn serialize_bytes(self, v: &[u8]) -> Result<Self::Ok, Self::Error> {
self.push_bytes(v)
}
fn serialize_none(self) -> Result<(), Error> {
Ok(())
}
fn serialize_unit_struct(self, _name: &'static str) -> Result<(), Error> {
Ok(())
}
fn serialize_newtype_struct<T: ?Sized + Serialize>(
self,
_name: &'static str,
value: &T,
) -> Result<(), Error> {
value.serialize(self)
}
fn serialize_some<T: ?Sized + Serialize>(self, value: &T) -> Result<(), Error> {
value.serialize(self)
}
fn serialize_seq(self, _len: Option<usize>) -> Result<Self::SerializeSeq, Error> {
Ok(self)
}
fn serialize_tuple(self, _len: usize) -> Result<Self::SerializeTuple, Error> {
Ok(self)
}
fn serialize_struct(
self,
_name: &'static str,
_len: usize,
) -> Result<Self::SerializeStruct, Error> {
Ok(self)
}
fn serialize_char(self, _v: char) -> Result<Self::Ok, Self::Error> {
unimplemented!()
}
fn serialize_unit(self) -> Result<(), Error> {
unimplemented!()
}
fn serialize_unit_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
) -> Result<(), Error> {
unimplemented!()
}
fn serialize_newtype_variant<T: ?Sized + Serialize>(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_value: &T,
) -> Result<(), Error> {
unimplemented!()
}
fn serialize_tuple_struct(
self,
_name: &'static str,
_len: usize,
) -> Result<Self::SerializeTupleStruct, Error> {
unimplemented!()
}
fn serialize_tuple_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize,
) -> Result<Self::SerializeTupleVariant, Error> {
unimplemented!()
}
fn serialize_map(self, _len: Option<usize>) -> Result<Self::SerializeMap, Error> {
unimplemented!()
}
fn serialize_struct_variant(
self,
_name: &'static str,
_variant_index: u32,
_variant: &'static str,
_len: usize,
) -> Result<Self::SerializeStructVariant, Error> {
unimplemented!()
}
fn collect_str<T: ?Sized>(self, _value: &T) -> Result<Self::Ok, Error> {
unimplemented!()
}
fn serialize_f32(self, _v: f32) -> Result<Self::Ok, Self::Error> {
unimplemented!()
}
fn serialize_f64(self, _v: f64) -> Result<Self::Ok, Self::Error> {
unimplemented!()
}
}
impl<'a> serde::ser::SerializeStruct for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(
&mut self,
_key: &'static str,
value: &T,
) -> Result<(), Error> {
value.serialize(&mut **self)
}
fn end(self) -> Result<(), Error> {
Ok(())
}
}
impl<'a> serde::ser::SerializeSeq for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_element<T: ?Sized + Serialize>(&mut self, value: &T) -> Result<(), Error> {
value.serialize(&mut **self)
}
fn end(self) -> Result<(), Error> {
Ok(())
}
}
impl<'a> serde::ser::SerializeTuple for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_element<T: ?Sized + Serialize>(&mut self, value: &T) -> Result<(), Error> {
value.serialize(&mut **self)
}
fn end(self) -> Result<(), Error> {
Ok(())
}
}
impl<'a> serde::ser::SerializeTupleStruct for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(&mut self, _value: &T) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
impl<'a> serde::ser::SerializeTupleVariant for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(&mut self, _value: &T) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
impl<'a> serde::ser::SerializeMap for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_key<T: ?Sized + Serialize>(&mut self, _key: &T) -> Result<(), Error> {
unimplemented!()
}
fn serialize_value<T: ?Sized + Serialize>(&mut self, _value: &T) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}
impl<'a> serde::ser::SerializeStructVariant for &'a mut MqttSerializer<'_> {
type Ok = ();
type Error = Error;
fn serialize_field<T: ?Sized + Serialize>(
&mut self,
_key: &'static str,
_value: &T,
) -> Result<(), Error> {
unimplemented!()
}
fn end(self) -> Result<(), Error> {
unimplemented!()
}
}