1use crate::Error;
8use crate::codec::util::{decode_bytes, decode_string, encode_bytes, encode_string};
9use crate::protocol::common::{ConnectFrame, WillFrame};
10use crate::protocol::common::{ConnectHeader, connect};
11use crate::protocol::{Protocol, QoS, util};
12use bit_field::BitField;
13use bytes::{Bytes, BytesMut};
14use std::ops::RangeInclusive;
15
16const WILL_FLAG: usize = 2;
17const WILL_QOS: RangeInclusive<usize> = 3..=4;
18const WILL_RETAIN: usize = 5;
19
20#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct Will {
40 pub topic: String,
42
43 pub payload: Bytes,
45
46 pub qos: QoS,
48
49 pub retain: bool,
51}
52
53impl Will {
54 pub fn new<T: Into<String>>(topic: T, payload: Bytes, qos: QoS, retain: bool) -> Self {
60 let topic = topic.into();
61
62 if !util::is_valid_topic_name(&topic) {
63 panic!("Invalid topic name: '{}'", topic);
64 }
65
66 Will {
67 topic,
68 payload,
69 qos,
70 retain,
71 }
72 }
73}
74
75impl WillFrame for Will {
76 fn encoded_len(&self) -> usize {
80 2 + self.topic.len() + 2 + self.payload.len()
81 }
82
83 fn update_flags(&self, flags: &mut u8) {
85 flags.set_bit(WILL_FLAG, true);
87
88 flags.set_bits(WILL_QOS, self.qos as u8);
90
91 flags.set_bit(WILL_RETAIN, self.retain);
93 }
94
95 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
97 encode_string(buf, &self.topic);
98 encode_bytes(buf, &self.payload);
99 Ok(())
100 }
101
102 fn decode(buf: &mut Bytes, flags: u8) -> Result<Option<Self>, Error> {
104 if !flags.get_bit(WILL_FLAG) {
105 return Ok(None);
107 }
108
109 let qos = flags.get_bits(WILL_QOS).try_into()?;
110 let retain = flags.get_bit(WILL_RETAIN);
111
112 let topic = decode_string(buf)?;
113 if !util::is_valid_topic_name(&topic) {
114 return Err(Error::InvalidTopicName(topic));
115 }
116
117 let message = decode_bytes(buf)?;
118 Ok(Some(Will::new(topic, message, qos, retain)))
119 }
120}
121
122#[derive(Debug, Default, Copy, Clone, PartialEq, Eq)]
124pub(crate) struct Propertyless;
125
126impl ConnectFrame for ConnectHeader<Propertyless> {
127 fn encoded_len(&self) -> usize {
129 self.primary_encoded_len()
130 }
131
132 fn encode(&self, buf: &mut BytesMut) -> Result<(), Error> {
134 self.primary_encode(buf);
135 Ok(())
136 }
137
138 fn decode(buf: &mut Bytes) -> Result<Self, Error> {
140 Self::primary_decode(buf)
141 }
142}
143
144connect!(Connect<Propertyless, Will>, Protocol::V4);
146
147#[cfg(test)]
148mod tests {
149 use super::*;
150 use crate::codec::PacketCodec;
151 use crate::codec::*;
152 use crate::protocol::*;
153 use bytes::{Bytes, BytesMut};
154 use std::time::Duration;
155 use tokio_util::codec::Decoder;
156
157 fn connect_sample() -> [u8; 43] {
158 [
159 (PacketType::Connect as u8) << 4, 0x29, 0x00, 0x04,
163 b'M', b'Q',
165 b'T',
166 b'T',
167 Protocol::V4.into(), 0b1101_0110, 0x00, 0x10,
171 0x00, 0x06,
173 b'c',
174 b'l',
175 b'i',
176 b'e',
177 b'n',
178 b't',
179 0x00, 0x04,
181 b'/',
182 b'a',
183 b'b',
184 b'c',
185 0x00, 0x03,
187 b'b',
188 b'y',
189 b'e',
190 0x00, 0x04,
192 b'u',
193 b's',
194 b'e',
195 b'r',
196 0x00, 0x04,
198 b'p',
199 b'a',
200 b's',
201 b's',
202 ]
203 }
204
205 fn connect_packet() -> Connect {
206 let auth = Some(Credentials::full("user", "pass"));
207 let will = Some(Will::new(
208 "/abc",
209 Bytes::from("bye"),
210 QoS::ExactlyOnce,
211 false,
212 ));
213
214 Connect::new("client", auth, will, Duration::from_secs(16), true)
215 }
216
217 #[test]
218 fn connect_decode() {
219 let mut codec = PacketCodec::new(None, None);
220
221 let mut buf = BytesMut::new();
222
223 buf.extend_from_slice(&connect_sample());
224
225 let raw_packet = codec.decode(&mut buf).unwrap().unwrap();
226 let packet = Connect::decode(raw_packet).unwrap();
227 assert_eq!(packet, connect_packet());
228 }
229
230 #[test]
231 fn connect_encode_v4() {
232 let packet = connect_packet();
233 let mut buf = BytesMut::new();
234 packet.encode(&mut buf).unwrap();
235 assert_eq!(buf, Vec::from(connect_sample()));
236 }
237}