mqtt_proto/v3/
poll.rs

1use crate::{
2    block_on, read_u16, Error, GenericPollBodyState, GenericPollPacket, GenericPollPacketState,
3    Pid, PollHeader,
4};
5
6use super::{
7    Connack, Connect, Header, Packet, PacketType, Publish, Suback, Subscribe, Unsubscribe,
8};
9
10impl PollHeader for Header {
11    type Error = Error;
12    type Packet = Packet;
13
14    fn new_with(hd: u8, remaining_len: u32) -> Result<Self, Self::Error>
15    where
16        Self: Sized,
17    {
18        Header::new_with(hd, remaining_len)
19    }
20
21    fn build_empty_packet(&self) -> Option<Self::Packet> {
22        let packet = match self.typ {
23            PacketType::Pingreq => Packet::Pingreq,
24            PacketType::Pingresp => Packet::Pingresp,
25            PacketType::Disconnect => Packet::Disconnect,
26            _ => return None,
27        };
28        Some(packet)
29    }
30
31    fn block_decode(self, reader: &mut &[u8]) -> Result<Self::Packet, Self::Error> {
32        match self.typ {
33            PacketType::Connect => block_on(Connect::decode_async(reader)).map(Into::into),
34            PacketType::Connack => block_on(Connack::decode_async(reader)).map(Into::into),
35            PacketType::Publish => block_on(Publish::decode_async(reader, self)).map(Into::into),
36            PacketType::Puback => Ok(Packet::Puback(Pid::try_from(block_on(read_u16(reader))?)?)),
37            PacketType::Pubrec => Ok(Packet::Pubrec(Pid::try_from(block_on(read_u16(reader))?)?)),
38            PacketType::Pubrel => Ok(Packet::Pubrel(Pid::try_from(block_on(read_u16(reader))?)?)),
39            PacketType::Pubcomp => Ok(Packet::Pubcomp(Pid::try_from(block_on(read_u16(reader))?)?)),
40            PacketType::Subscribe => {
41                block_on(Subscribe::decode_async(reader, self.remaining_len())).map(Into::into)
42            }
43            PacketType::Suback => {
44                block_on(Suback::decode_async(reader, self.remaining_len())).map(Into::into)
45            }
46            PacketType::Unsubscribe => {
47                block_on(Unsubscribe::decode_async(reader, self.remaining_len())).map(Into::into)
48            }
49            PacketType::Unsuback => Ok(Packet::Unsuback(Pid::try_from(block_on(read_u16(
50                reader,
51            ))?)?)),
52            PacketType::Pingreq | PacketType::Pingresp | PacketType::Disconnect => unreachable!(),
53        }
54    }
55
56    fn remaining_len(&self) -> usize {
57        self.remaining_len as usize
58    }
59
60    fn is_eof_error(err: &Self::Error) -> bool {
61        err.is_eof()
62    }
63}
64
65pub type PollPacket<'a, T> = GenericPollPacket<'a, T, Header>;
66pub type PollPacketState = GenericPollPacketState<Header>;
67pub type PollBodyState = GenericPollBodyState<Header>;