hebo_codec/v3/
subscribe_ack.rs

1// Copyright (c) 2020 Xu Shaohua <shaohua@biofan.org>. All rights reserved.
2// Use of this source is governed by Apache-2.0 License that can be found
3// in the LICENSE file.
4
5use crate::{
6    ByteArray, DecodeError, DecodePacket, EncodeError, EncodePacket, FixedHeader, Packet, PacketId,
7    PacketType, QoS, VarIntError,
8};
9
10/// Reply to each subscribed topic.
11#[repr(u8)]
12#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
13pub enum SubscribeAck {
14    /// Maximum level of QoS the Server granted for this topic.
15    QoS(QoS),
16
17    /// This subscription if failed or not.
18    #[default]
19    Failed,
20}
21
22/// Reply to Subscribe packet.
23///
24/// Basic structure of packet is:
25/// ```txt
26/// +---------------------------+
27/// | Fixed header              |
28/// |                           |
29/// +---------------------------+
30/// | Packet id                 |
31/// |                           |
32/// +---------------------------+
33/// | Ack 0                     |
34/// +---------------------------+
35/// | Ack 1                     |
36/// +---------------------------+
37/// | Ack N ...                 |
38/// +---------------------------+
39/// ```
40#[allow(clippy::module_name_repetitions)]
41#[derive(Debug, Default, Clone, PartialEq, Eq)]
42pub struct SubscribeAckPacket {
43    /// `packet_id` field is identical in Subscribe packet.
44    packet_id: PacketId,
45
46    /// A list of acknowledgement to subscribed topics.
47    ///
48    /// The order of acknowledgement match the order of topic in Subscribe packet.
49    acknowledgements: Vec<SubscribeAck>,
50}
51
52impl SubscribeAckPacket {
53    /// Create a subscribe ack packet with `ack`.
54    #[must_use]
55    pub fn new(packet_id: PacketId, ack: SubscribeAck) -> Self {
56        Self {
57            packet_id,
58            acknowledgements: vec![ack],
59        }
60    }
61
62    /// Create a subscribe ack packet with multiple `acknowledgements`.
63    #[must_use]
64    pub fn with_vec(packet_id: PacketId, acknowledgements: Vec<SubscribeAck>) -> Self {
65        Self {
66            packet_id,
67            acknowledgements,
68        }
69    }
70
71    /// Update packet id.
72    pub fn set_packet_id(&mut self, packet_id: PacketId) -> &mut Self {
73        self.packet_id = packet_id;
74        self
75    }
76
77    /// Get current packet id.
78    #[must_use]
79    pub const fn packet_id(&self) -> PacketId {
80        self.packet_id
81    }
82
83    /// Update acknowledgement list.
84    pub fn set_ack(&mut self, ack: &[SubscribeAck]) -> &mut Self {
85        self.acknowledgements.clear();
86        self.acknowledgements.extend(ack);
87        self
88    }
89
90    /// Get current acknowledgements.
91    #[must_use]
92    pub fn acknowledgements(&self) -> &[SubscribeAck] {
93        &self.acknowledgements
94    }
95
96    fn get_fixed_header(&self) -> Result<FixedHeader, VarIntError> {
97        let remaining_length = PacketId::bytes() + QoS::bytes() * self.acknowledgements.len();
98        FixedHeader::new(PacketType::SubscribeAck, remaining_length)
99    }
100}
101
102impl DecodePacket for SubscribeAckPacket {
103    fn decode(ba: &mut ByteArray) -> Result<Self, DecodeError> {
104        let fixed_header = FixedHeader::decode(ba)?;
105        if fixed_header.packet_type() != PacketType::SubscribeAck {
106            return Err(DecodeError::InvalidPacketType);
107        }
108
109        let packet_id = PacketId::decode(ba)?;
110
111        let mut acknowledgements = Vec::new();
112        let mut remaining_length = PacketId::bytes();
113
114        while remaining_length < fixed_header.remaining_length() {
115            let payload = ba.read_byte()?;
116            remaining_length += QoS::bytes();
117            match payload & 0b1000_0011 {
118                0b1000_0000 => acknowledgements.push(SubscribeAck::Failed),
119                0b0000_0010 => acknowledgements.push(SubscribeAck::QoS(QoS::ExactOnce)),
120                0b0000_0001 => acknowledgements.push(SubscribeAck::QoS(QoS::AtLeastOnce)),
121                0b0000_0000 => acknowledgements.push(SubscribeAck::QoS(QoS::AtMostOnce)),
122
123                _ => return Err(DecodeError::InvalidQoS),
124            }
125        }
126
127        Ok(Self {
128            packet_id,
129            acknowledgements,
130        })
131    }
132}
133
134impl EncodePacket for SubscribeAckPacket {
135    fn encode(&self, buf: &mut Vec<u8>) -> Result<usize, EncodeError> {
136        let old_len = buf.len();
137
138        let fixed_header = self.get_fixed_header()?;
139        fixed_header.encode(buf)?;
140
141        // Write variable header
142        self.packet_id.encode(buf)?;
143        for ack in &self.acknowledgements {
144            let flag = {
145                match *ack {
146                    SubscribeAck::Failed => 0b1000_0000,
147                    SubscribeAck::QoS(qos) => qos as u8,
148                }
149            };
150            buf.push(flag);
151        }
152
153        Ok(buf.len() - old_len)
154    }
155}
156
157impl Packet for SubscribeAckPacket {
158    fn packet_type(&self) -> PacketType {
159        PacketType::SubscribeAck
160    }
161
162    fn bytes(&self) -> Result<usize, VarIntError> {
163        let fixed_header = self.get_fixed_header()?;
164        Ok(fixed_header.bytes() + fixed_header.remaining_length())
165    }
166}