1use crate::codec::util::{decode_bytes, decode_string, encode_bytes, encode_string};
8use crate::protocol::common::{connect, ConnectHeader};
9use crate::protocol::common::{ConnectFrame, WillFrame};
10use crate::protocol::{Protocol, QoS};
11use crate::Error;
12use bit_field::BitField;
13use bytes::{Bytes, BytesMut};
14use std::ops::RangeInclusive;
15
16const WILL_FLAG: usize = 2;
17const WILL_QOS: RangeInclusive<usize> = 3..=4;
18const WILL_RETAIN: usize = 5;
19
20#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct Will {
39 pub topic: String,
41
42 pub payload: Bytes,
44
45 pub qos: QoS,
47
48 pub retain: bool,
50}
51
52impl Will {
53 pub fn new<T: Into<String>>(topic: T, payload: Bytes, qos: QoS, retain: bool) -> Self {
55 Will {
56 topic: topic.into(),
57 payload,
58 qos,
59 retain,
60 }
61 }
62}
63
64impl WillFrame for Will {
65 fn encoded_len(&self) -> usize {
69 2 + self.topic.len() + 2 + self.payload.len()
70 }
71
72 fn update_flags(&self, flags: &mut u8) {
74 flags.set_bit(WILL_FLAG, true);
76
77 flags.set_bits(WILL_QOS, self.qos as u8);
79
80 flags.set_bit(WILL_RETAIN, self.retain);
82 }
83
84 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
86 encode_string(buf, &self.topic);
87 encode_bytes(buf, &self.payload);
88 Ok(())
89 }
90
91 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
93 if !flags.get_bit(WILL_FLAG) {
94 return Ok(None);
96 }
97
98 let qos = flags.get_bits(WILL_QOS).try_into()?;
99 let retain = flags.get_bit(WILL_RETAIN);
100
101 let topic = decode_string(buf)?;
102 let message = decode_bytes(buf)?;
103 Ok(Some(Will::new(topic, message, qos, retain)))
104 }
105}
106
107#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
109pub(crate) struct Propertyless;
110
111impl ConnectFrame for ConnectHeader<Propertyless> {
112 fn encoded_len(&self) -> usize {
114 self.primary_encoded_len()
115 }
116
117 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
119 self.primary_encode(buf);
120 Ok(())
121 }
122
123 fn decode(buf: &mut Bytes) -> Result<Self, Error> {
125 Self::primary_decode(buf)
126 }
127}
128
129connect!(Connect<Propertyless, Will>, Protocol::V4);
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use crate::codec::PacketCodec;
136 use crate::codec::*;
137 use crate::protocol::*;
138 use bytes::{Bytes, BytesMut};
139 use tokio_util::codec::Decoder;
140
141 fn connect_sample() -> [u8; 43] {
142 [
143 (PacketType::Connect as u8) << 4, 0x29, 0x00, 0x04,
147 b'M', b'Q',
149 b'T',
150 b'T',
151 Protocol::V4.into(), 0b1101_0110, 0x00, 0x10,
155 0x00, 0x06,
157 b'c',
158 b'l',
159 b'i',
160 b'e',
161 b'n',
162 b't',
163 0x00, 0x04,
165 b'/',
166 b'a',
167 b'b',
168 b'c',
169 0x00, 0x03,
171 b'b',
172 b'y',
173 b'e',
174 0x00, 0x04,
176 b'u',
177 b's',
178 b'e',
179 b'r',
180 0x00, 0x04,
182 b'p',
183 b'a',
184 b's',
185 b's',
186 ]
187 }
188
189 fn connect_packet() -> Connect {
190 let auth = Some(Credentials::login("user", "pass"));
191 let will = Some(Will::new(
192 "/abc",
193 Bytes::from("bye"),
194 QoS::ExactlyOnce,
195 false,
196 ));
197
198 Connect::new("client", auth, will, 16, true)
199 }
200
201 #[test]
202 fn connect_decode() {
203 let mut codec = PacketCodec::new(None, None);
204
205 let mut buf = BytesMut::new();
206
207 buf.extend_from_slice(&connect_sample());
208
209 let raw_packet = codec.decode(&mut buf).unwrap().unwrap();
210 let packet = Connect::decode(raw_packet).unwrap();
211 assert_eq!(packet, connect_packet());
212 }
213
214 #[test]
215 fn connect_encode_v4() {
216 let packet = connect_packet();
217 let mut buf = BytesMut::new();
218 packet.encode(&mut buf).unwrap();
219 assert_eq!(buf, Vec::from(connect_sample()));
220 }
221}