1use crate::codec::util::{decode_bytes, decode_string, encode_bytes, encode_string};
8use crate::protocol::common::{connect, ConnectHeader};
9use crate::protocol::common::{ConnectFrame, WillFrame};
10use crate::protocol::{Protocol, QoS};
11use crate::Error;
12use bit_field::BitField;
13use bytes::{Bytes, BytesMut};
14use std::ops::RangeInclusive;
15
// Bit layout of the will-related part of the `flags` byte that
// `WillFrame::update_flags` writes and `WillFrame::decode` reads.

/// Index of the bit that signals a will is present; `decode` returns `None`
/// when this bit is clear.
const WILL_FLAG: usize = 2;
/// Inclusive bit range (two bits wide) that stores the will's QoS as an integer.
const WILL_QOS: RangeInclusive<usize> = 3..=4;
/// Index of the bit that mirrors the will's `retain` field.
const WILL_RETAIN: usize = 5;
19
/// Will message attached to a connect packet.
///
/// `topic` and `payload` are serialized into the packet body, while `qos` and
/// `retain` are stored in the connect flags byte (see the `WillFrame` impl).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Will {
    /// Topic of the will message; encoded first, as a string.
    pub topic: String,

    /// Raw payload of the will message; encoded right after the topic.
    pub payload: Bytes,

    /// QoS level of the will; packed into the `WILL_QOS` bits of the flags byte.
    pub qos: QoS,

    /// Retain setting of the will; stored in the `WILL_RETAIN` bit of the flags byte.
    pub retain: bool,
}
51
52impl Will {
53 pub fn new<T: Into<String>>(topic: T, payload: Bytes, qos: QoS, retain: bool) -> Self {
55 Will {
56 topic: topic.into(),
57 payload,
58 qos,
59 retain,
60 }
61 }
62}
63
64impl WillFrame for Will {
65 fn encoded_len(&self) -> usize {
69 2 + self.topic.len() + 2 + self.payload.len()
70 }
71
72 fn update_flags(&self, flags: &mut u8) {
74 flags.set_bit(WILL_FLAG, true);
76
77 flags.set_bits(WILL_QOS, self.qos as u8);
79
80 flags.set_bit(WILL_RETAIN, self.retain);
82 }
83
84 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
86 encode_string(buf, &self.topic);
87 encode_bytes(buf, &self.payload);
88 Ok(())
89 }
90
91 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
93 if !flags.get_bit(WILL_FLAG) {
94 return Ok(None);
96 }
97
98 let qos = flags.get_bits(WILL_QOS).try_into()?;
99 let retain = flags.get_bit(WILL_RETAIN);
100
101 let topic = decode_string(buf)?;
102 let message = decode_bytes(buf)?;
103 Ok(Some(Will::new(topic, message, qos, retain)))
104 }
105}
106
/// Field-less unit type used as the type parameter of `ConnectHeader` and
/// `Connect` for `Protocol::V4` in this file.
///
/// NOTE(review): presumably stands in for a properties section that this
/// protocol version does not carry — confirm against `ConnectHeader`.
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
pub(crate) struct Propertyless;
110
/// `ConnectFrame` for a header parameterized with `Propertyless`: every
/// method forwards to the matching `primary_*` helper and adds nothing else.
impl ConnectFrame for ConnectHeader<Propertyless> {
    /// Returns exactly what `primary_encoded_len` reports.
    fn encoded_len(&self) -> usize {
        self.primary_encoded_len()
    }

    /// Writes the header through `primary_encode`; always returns `Ok(())`.
    fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
        self.primary_encode(buf);
        Ok(())
    }

    /// Decodes the header through `primary_decode`, returning its result
    /// (including any error) unchanged.
    fn decode(buf: &mut Bytes) -> Result<Self, Error> {
        Self::primary_decode(buf)
    }
}
128
// Instantiates the `connect!` macro for `Protocol::V4`, combining
// `Propertyless` with `Will`.
// NOTE(review): presumably this expansion defines the `Connect` packet type
// exercised by the tests below — confirm against the macro definition.
connect!(Connect<Propertyless, Will>, Protocol::V4);
131
#[cfg(test)]
mod tests {
    use std::time::Duration;
    use super::*;
    use crate::codec::PacketCodec;
    use crate::codec::*;
    use crate::protocol::*;
    use bytes::{Bytes, BytesMut};
    use tokio_util::codec::Decoder;

    /// Wire image of the packet built by `connect_packet`, grouped by field.
    fn connect_sample() -> [u8; 43] {
        [
            // Fixed header: packet type in the high nibble, then the
            // remaining length (0x29 = 41 bytes follow).
            (PacketType::Connect as u8) << 4,
            0x29,
            // Protocol name "MQTT" with its two-byte length.
            0x00, 0x04, b'M', b'Q', b'T', b'T',
            // Protocol level.
            Protocol::V4.into(),
            // Connect flags: will present (bit 2), will QoS 2 (bits 3-4),
            // will retain clear (bit 5); the other bits are outside this file.
            0b1101_0110,
            // Sixteen as a big-endian u16, matching the 16 s duration below.
            0x00, 0x10,
            // "client"
            0x00, 0x06, b'c', b'l', b'i', b'e', b'n', b't',
            // Will topic "/abc"
            0x00, 0x04, b'/', b'a', b'b', b'c',
            // Will payload "bye"
            0x00, 0x03, b'b', b'y', b'e',
            // "user"
            0x00, 0x04, b'u', b's', b'e', b'r',
            // "pass"
            0x00, 0x04, b'p', b'a', b's', b's',
        ]
    }

    /// Typed counterpart of `connect_sample`.
    fn connect_packet() -> Connect {
        let credentials = Credentials::login("user", "pass");
        let last_will = Will::new("/abc", Bytes::from("bye"), QoS::ExactlyOnce, false);

        Connect::new(
            "client",
            Some(credentials),
            Some(last_will),
            Duration::from_secs(16),
            true,
        )
    }

    /// Decoding the sample bytes must yield the reference packet.
    #[test]
    fn connect_decode() {
        let mut codec = PacketCodec::new(None, None);
        let sample = connect_sample();
        let mut wire = BytesMut::from(&sample[..]);

        let raw_packet = codec.decode(&mut wire).unwrap().unwrap();
        let decoded = Connect::decode(raw_packet).unwrap();

        assert_eq!(decoded, connect_packet());
    }

    /// Encoding the reference packet must reproduce the sample bytes.
    #[test]
    fn connect_encode_v4() {
        let mut encoded = BytesMut::new();
        connect_packet().encode(&mut encoded).unwrap();

        assert_eq!(encoded, Vec::from(connect_sample()));
    }
}