sage_mqtt/control/
pubcomp.rs

1use crate::{
2    codec, PropertiesDecoder, Property,
3    ReasonCode::{self, ProtocolError},
4    Result as SageResult,
5};
6use std::{convert::TryInto, marker::Unpin};
7use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
8
9/// The `PubComp` packet is sent during an `ExactlyOnce` quality of service
10/// publish.
11#[derive(Debug, PartialEq, Clone)]
12pub struct PubComp {
13    /// The packet identifier is used to identify the message throughout the
14    /// communication.
15    pub packet_identifier: u16,
16
17    /// The reason code for the acknowledgement. Can be any of:
18    /// - `Success`
19    /// - `NoMatchingSubscribers`
20    /// - `UnspecifiedError`
21    /// - `ImplementationSpecificError`
22    /// - `NotAuthorized`
23    /// - `TopicNameInvalid`
24    /// - `PacketIdentifierInUse`
25    /// - `QuotaExceeded`
26    /// - `PayloadFormatInvalid`
27    pub reason_code: ReasonCode,
28
29    /// If available, the reason string describing the acknowledgement.
30    pub reason_string: Option<String>,
31
32    /// General purpose user properties
33    pub user_properties: Vec<(String, String)>,
34}
35
36impl Default for PubComp {
37    fn default() -> Self {
38        PubComp {
39            packet_identifier: 0,
40            reason_code: ReasonCode::Success,
41            reason_string: None,
42            user_properties: Default::default(),
43        }
44    }
45}
46
47impl PubComp {
48    pub(crate) async fn write<W: AsyncWrite + Unpin>(self, writer: &mut W) -> SageResult<usize> {
49        let mut n_bytes = codec::write_two_byte_integer(self.packet_identifier, writer).await?;
50
51        let mut properties = Vec::new();
52
53        if let Some(v) = self.reason_string {
54            n_bytes += Property::ReasonString(v).encode(&mut properties).await?;
55        }
56        for (k, v) in self.user_properties {
57            n_bytes += Property::UserProperty(k, v).encode(&mut properties).await?;
58        }
59
60        if n_bytes == 2 && self.reason_code != ReasonCode::Success {
61            Ok(2)
62        } else {
63            n_bytes += codec::write_reason_code(self.reason_code, writer).await?;
64            n_bytes += codec::write_variable_byte_integer(properties.len() as u32, writer).await?;
65            writer.write_all(&properties).await?;
66            Ok(n_bytes)
67        }
68    }
69
70    pub(crate) async fn read<R: AsyncRead + Unpin>(
71        reader: &mut R,
72        shortened: bool,
73    ) -> SageResult<Self> {
74        let packet_identifier = codec::read_two_byte_integer(reader).await?;
75
76        let mut pubcomp = PubComp {
77            packet_identifier,
78            ..Default::default()
79        };
80
81        if shortened {
82            pubcomp.reason_code = ReasonCode::Success;
83        } else {
84            pubcomp.reason_code = codec::read_byte(reader).await?.try_into()?;
85
86            let mut properties = PropertiesDecoder::take(reader).await?;
87            while properties.has_properties() {
88                match properties.read().await? {
89                    Property::ReasonString(v) => pubcomp.reason_string = Some(v),
90                    Property::UserProperty(k, v) => pubcomp.user_properties.push((k, v)),
91                    _ => return Err(ProtocolError.into()),
92                }
93            }
94        }
95
96        Ok(pubcomp)
97    }
98}
99
#[cfg(test)]
mod unit {

    use super::*;
    use std::io::Cursor;

    /// Wire representation of the packet returned by `decoded()`.
    fn encoded() -> Vec<u8> {
        vec![
            // Packet identifier (1337)
            5, 57,
            // Reason code (146) followed by the property length (28)
            146, 28,
            // Reason string property (31): "Black Betty"
            31, 0, 11, 66, 108, 97, 99, 107, 32, 66, 101, 116, 116, 121,
            // User property (38): "Hærya" => "Cat"
            38, 0, 6, 72, 195, 166, 114, 121, 97, 0, 3, 67, 97, 116,
        ]
    }

    /// The packet matching the bytes returned by `encoded()`.
    fn decoded() -> PubComp {
        PubComp {
            packet_identifier: 1337,
            reason_code: ReasonCode::PacketIdentifierNotFound,
            reason_string: Some("Black Betty".into()),
            user_properties: vec![("Hærya".into(), "Cat".into())],
        }
    }

    #[tokio::test]
    async fn encode() {
        let test_data = decoded();
        let mut tested_result = Vec::new();
        let n_bytes = test_data.write(&mut tested_result).await.unwrap();
        assert_eq!(tested_result, encoded());
        assert_eq!(n_bytes, 32);
    }

    #[tokio::test]
    async fn decode() {
        let mut test_data = Cursor::new(encoded());
        let tested_result = PubComp::read(&mut test_data, false).await.unwrap();
        assert_eq!(tested_result, decoded());
    }

    /// A shortened packet only carries the packet identifier; every other
    /// field must keep its default value.
    #[tokio::test]
    async fn decode_shortened() {
        let mut test_data = Cursor::new(vec![5, 57]);
        let tested_result = PubComp::read(&mut test_data, true).await.unwrap();
        assert_eq!(
            tested_result,
            PubComp {
                packet_identifier: 1337,
                ..Default::default()
            }
        );
    }
}