mqtt_proto/v3/
poll.rs

1use futures_lite::future::block_on;
2
3use super::{
4    Connack, Connect, Header, Packet, PacketType, Publish, Suback, Subscribe, Unsubscribe,
5};
6use crate::{
7    read_u16, Error, GenericPollBodyState, GenericPollPacket, GenericPollPacketState, Pid,
8    PollHeader,
9};
10
11impl PollHeader for Header {
12    type Error = Error;
13    type Packet = Packet;
14
15    fn new_with(hd: u8, remaining_len: u32) -> Result<Self, Self::Error>
16    where
17        Self: Sized,
18    {
19        Header::new_with(hd, remaining_len)
20    }
21
22    fn build_empty_packet(&self) -> Option<Self::Packet> {
23        let packet = match self.typ {
24            PacketType::Pingreq => Packet::Pingreq,
25            PacketType::Pingresp => Packet::Pingresp,
26            PacketType::Disconnect => Packet::Disconnect,
27            _ => return None,
28        };
29        Some(packet)
30    }
31
32    fn block_decode(self, reader: &mut &[u8]) -> Result<Self::Packet, Self::Error> {
33        match self.typ {
34            PacketType::Connect => block_on(Connect::decode_async(reader)).map(Into::into),
35            PacketType::Connack => block_on(Connack::decode_async(reader)).map(Into::into),
36            PacketType::Publish => block_on(Publish::decode_async(reader, self)).map(Into::into),
37            PacketType::Puback => Ok(Packet::Puback(Pid::try_from(block_on(read_u16(reader))?)?)),
38            PacketType::Pubrec => Ok(Packet::Pubrec(Pid::try_from(block_on(read_u16(reader))?)?)),
39            PacketType::Pubrel => Ok(Packet::Pubrel(Pid::try_from(block_on(read_u16(reader))?)?)),
40            PacketType::Pubcomp => Ok(Packet::Pubcomp(Pid::try_from(block_on(read_u16(reader))?)?)),
41            PacketType::Subscribe => {
42                block_on(Subscribe::decode_async(reader, self.remaining_len())).map(Into::into)
43            }
44            PacketType::Suback => {
45                block_on(Suback::decode_async(reader, self.remaining_len())).map(Into::into)
46            }
47            PacketType::Unsubscribe => {
48                block_on(Unsubscribe::decode_async(reader, self.remaining_len())).map(Into::into)
49            }
50            PacketType::Unsuback => Ok(Packet::Unsuback(Pid::try_from(block_on(read_u16(
51                reader,
52            ))?)?)),
53            PacketType::Pingreq | PacketType::Pingresp | PacketType::Disconnect => unreachable!(),
54        }
55    }
56
57    fn remaining_len(&self) -> usize {
58        self.remaining_len as usize
59    }
60
61    fn is_eof_error(err: &Self::Error) -> bool {
62        err.is_eof()
63    }
64}
65
66pub type PollPacket<'a, T> = GenericPollPacket<'a, T, Header>;
67pub type PollPacketState = GenericPollPacketState<Header>;
68pub type PollBodyState = GenericPollBodyState<Header>;