mqtt_proto/v5/
poll.rs

1use futures_lite::future::block_on;
2
3use super::{
4    Auth, Connack, Connect, Disconnect, ErrorV5, Header, Packet, PacketType, Puback, Pubcomp,
5    Publish, Pubrec, Pubrel, Suback, Subscribe, Unsuback, Unsubscribe,
6};
7use crate::{GenericPollBodyState, GenericPollPacket, GenericPollPacketState, PollHeader};
8
9impl PollHeader for Header {
10    type Error = ErrorV5;
11    type Packet = Packet;
12
13    fn new_with(hd: u8, remaining_len: u32) -> Result<Self, Self::Error>
14    where
15        Self: Sized,
16    {
17        Header::new_with(hd, remaining_len)
18    }
19
20    fn build_empty_packet(&self) -> Option<Self::Packet> {
21        let packet = match self.typ {
22            PacketType::Pingreq => Packet::Pingreq,
23            PacketType::Pingresp => Packet::Pingresp,
24            PacketType::Auth if self.remaining_len == 0 => Auth::new_success().into(),
25            PacketType::Disconnect if self.remaining_len == 0 => Disconnect::new_normal().into(),
26            _ => return None,
27        };
28        Some(packet)
29    }
30
31    fn block_decode(self, reader: &mut &[u8]) -> Result<Self::Packet, Self::Error> {
32        match self.typ {
33            PacketType::Connect => block_on(Connect::decode_async(reader, self)).map(Into::into),
34            PacketType::Connack => block_on(Connack::decode_async(reader, self)).map(Into::into),
35            PacketType::Publish => block_on(Publish::decode_async(reader, self)).map(Into::into),
36            PacketType::Puback => block_on(Puback::decode_async(reader, self)).map(Into::into),
37            PacketType::Pubrec => block_on(Pubrec::decode_async(reader, self)).map(Into::into),
38            PacketType::Pubrel => block_on(Pubrel::decode_async(reader, self)).map(Into::into),
39            PacketType::Pubcomp => block_on(Pubcomp::decode_async(reader, self)).map(Into::into),
40            PacketType::Subscribe => {
41                block_on(Subscribe::decode_async(reader, self)).map(Into::into)
42            }
43            PacketType::Suback => block_on(Suback::decode_async(reader, self)).map(Into::into),
44            PacketType::Unsubscribe => {
45                block_on(Unsubscribe::decode_async(reader, self)).map(Into::into)
46            }
47            PacketType::Unsuback => block_on(Unsuback::decode_async(reader, self)).map(Into::into),
48            PacketType::Disconnect => {
49                block_on(Disconnect::decode_async(reader, self)).map(Into::into)
50            }
51            PacketType::Auth => block_on(Auth::decode_async(reader, self)).map(Into::into),
52            PacketType::Pingreq | PacketType::Pingresp => unreachable!(),
53        }
54    }
55
56    fn remaining_len(&self) -> usize {
57        self.remaining_len as usize
58    }
59
60    fn is_eof_error(err: &Self::Error) -> bool {
61        err.is_eof()
62    }
63}
64
/// [`GenericPollPacket`] with its header parameter fixed to the v5 [`Header`].
pub type PollPacket<'a, T> = GenericPollPacket<'a, T, Header>;
/// [`GenericPollPacketState`] with its header parameter fixed to the v5 [`Header`].
pub type PollPacketState = GenericPollPacketState<Header>;
/// [`GenericPollBodyState`] with its header parameter fixed to the v5 [`Header`].
pub type PollBodyState = GenericPollBodyState<Header>;