1use std::io::{self, Result, Error, ErrorKind};
2
3use byteorder::{BigEndian, WriteBytesExt};
4
5use proto::*;
6use packet::*;
7
8pub const MAX_VARIABLE_LENGTH: usize = 268435455; pub trait WritePacketHelper: io::Write {
11 #[inline]
12 fn write_fixed_header(&mut self, packet: &Packet) -> Result<usize> {
13 let content_size = self.calc_content_size(packet);
14
15 debug!(
16 "write FixedHeader {{ type={}, flags={}, remaining_length={} }} ",
17 packet.packet_type(),
18 packet.packet_flags(),
19 content_size
20 );
21
22 Ok(
23 self.write(
24 &[(packet.packet_type() << 4) | packet.packet_flags()],
25 )? + self.write_variable_length(content_size)?,
26 )
27 }
28
29 fn calc_content_size(&mut self, packet: &Packet) -> usize {
30 match *packet {
31 Packet::Connect {
32 ref last_will,
33 client_id,
34 username,
35 password,
36 ..
37 } => {
38 let mut n = 2 + 4 + 1 + 1 + 2;
40
41 n += 2 + client_id.len();
43
44 if let &Some(LastWill { topic, message, .. }) = last_will {
46 n += 2 + topic.len() + 2 + message.len();
47 }
48
49 if let Some(s) = username {
50 n += 2 + s.len();
51 }
52
53 if let Some(s) = password {
54 n += 2 + s.len();
55 }
56
57 n
58 }
59
60 Packet::ConnectAck { .. } => 2, Packet::Publish {
63 topic,
64 packet_id,
65 payload,
66 ..
67 } => {
68 2 + topic.len() + packet_id.map_or(0, |_| 2) + payload.len()
70 }
71
72 Packet::PublishAck { .. } |
73 Packet::PublishReceived { .. } |
74 Packet::PublishRelease { .. } |
75 Packet::PublishComplete { .. } |
76 Packet::UnsubscribeAck { .. } => 2, Packet::Subscribe { ref topic_filters, .. } => {
79 2 +
80 topic_filters.iter().fold(0, |acc, &(filter, _)| {
81 acc + 2 + filter.len() + 1
82 })
83 }
84
85 Packet::SubscribeAck { ref status, .. } => 2 + status.len(),
86
87 Packet::Unsubscribe { ref topic_filters, .. } => {
88 2 +
89 topic_filters.iter().fold(
90 0,
91 |acc, &filter| acc + 2 + filter.len(),
92 )
93 }
94
95 Packet::PingRequest | Packet::PingResponse | Packet::Disconnect => 0,
96 }
97 }
98
99 fn write_content(&mut self, packet: &Packet) -> Result<usize> {
100 let mut n = 0;
101
102 match *packet {
103 Packet::Connect {
104 protocol,
105 clean_session,
106 keep_alive,
107 ref last_will,
108 client_id,
109 username,
110 password,
111 } => {
112 n += self.write_utf8_str(protocol.name())?;
113
114 let mut flags = ConnectFlags::empty();
115
116 if username.is_some() {
117 flags |= ConnectFlags::USERNAME;
118 }
119 if password.is_some() {
120 flags |= ConnectFlags::PASSWORD;
121 }
122
123 if let &Some(LastWill { qos, retain, .. }) = last_will {
124 flags |= ConnectFlags::WILL;
125
126 if retain {
127 flags |= ConnectFlags::WILL_RETAIN;
128 }
129
130 let b: u8 = qos.into();
131
132 flags |= ConnectFlags::from_bits_truncate(b << WILL_QOS_SHIFT);
133 }
134
135 if clean_session {
136 flags |= ConnectFlags::CLEAN_SESSION;
137 }
138
139 n += self.write(&[protocol.level(), flags.bits()])?;
140
141 self.write_u16::<BigEndian>(keep_alive)?;
142 n += 2;
143
144 n += self.write_utf8_str(client_id)?;
145
146 if let &Some(LastWill { topic, message, .. }) = last_will {
147 n += self.write_utf8_str(topic)?;
148 n += self.write_fixed_length_bytes(message)?;
149 }
150
151 if let Some(s) = username {
152 n += self.write_utf8_str(s)?;
153 }
154
155 if let Some(s) = password {
156 n += self.write_fixed_length_bytes(s)?;
157 }
158 }
159
160 Packet::ConnectAck {
161 session_present,
162 return_code,
163 } => {
164 n += self.write(
165 &[
166 if session_present { 0x01 } else { 0x00 },
167 return_code.into(),
168 ],
169 )?;
170 }
171
172 Packet::Publish {
173 qos,
174 topic,
175 packet_id,
176 payload,
177 ..
178 } => {
179 n += self.write_utf8_str(topic)?;
180
181 if qos == QoS::AtLeastOnce || qos == QoS::ExactlyOnce {
182 self.write_u16::<BigEndian>(packet_id.unwrap())?;
183
184 n += 2;
185 }
186
187 n += self.write(payload)?;
188 }
189
190 Packet::PublishAck { packet_id } |
191 Packet::PublishReceived { packet_id } |
192 Packet::PublishRelease { packet_id } |
193 Packet::PublishComplete { packet_id } |
194 Packet::UnsubscribeAck { packet_id } => {
195 self.write_u16::<BigEndian>(packet_id)?;
196
197 n += 2;
198 }
199
200 Packet::Subscribe {
201 packet_id,
202 ref topic_filters,
203 } => {
204 self.write_u16::<BigEndian>(packet_id)?;
205
206 n += 2;
207
208 for &(filter, qos) in topic_filters {
209 n += self.write_utf8_str(filter)? + self.write(&[qos.into()])?;
210 }
211 }
212
213 Packet::SubscribeAck {
214 packet_id,
215 ref status,
216 } => {
217 self.write_u16::<BigEndian>(packet_id)?;
218
219 n += 2;
220
221 let buf: Vec<u8> = status
222 .iter()
223 .map(|s| if let SubscribeReturnCode::Success(qos) = *s {
224 qos.into()
225 } else {
226 0x80
227 })
228 .collect();
229
230 n += self.write(&buf)?;
231 }
232
233 Packet::Unsubscribe {
234 packet_id,
235 ref topic_filters,
236 } => {
237 self.write_u16::<BigEndian>(packet_id)?;
238
239 n += 2;
240
241 for filter in topic_filters {
242 n += self.write_utf8_str(filter)?;
243 }
244 }
245
246 Packet::PingRequest | Packet::PingResponse | Packet::Disconnect => {}
247 }
248
249 Ok(n)
250 }
251
252 #[inline]
253 fn write_utf8_str(&mut self, s: &str) -> Result<usize> {
254 self.write_u16::<BigEndian>(s.len() as u16)?;
255
256 Ok(2 + self.write(s.as_bytes())?)
257 }
258
259 #[inline]
260 fn write_fixed_length_bytes(&mut self, s: &[u8]) -> Result<usize> {
261 self.write_u16::<BigEndian>(s.len() as u16)?;
262
263 Ok(2 + self.write(s)?)
264 }
265
266 #[inline]
267 fn write_variable_length(&mut self, size: usize) -> Result<usize> {
268 if size > MAX_VARIABLE_LENGTH {
269 Err(Error::new(ErrorKind::Other, "out of range"))
270 } else if size < 128 {
271 self.write(&[size as u8])
272 } else {
273 let mut v = Vec::new();
274 let mut s = size;
275
276 while s > 0 {
277 let mut b = (s % 128) as u8;
278
279 s >>= 7;
280
281 if s > 0 {
282 b |= 0x80;
283 }
284
285 v.push(b);
286 }
287
288 debug!("write variable length {} in {} bytes", size, v.len());
289
290 self.write(&v)
291 }
292 }
293}
294
295pub trait WritePacketExt: io::Write {
307 #[inline]
308 fn write_packet(&mut self, packet: &Packet) -> Result<usize> {
310 Ok(
311 self.write_fixed_header(packet)? + self.write_content(packet)?,
312 )
313 }
314}
315
316impl<W: io::Write + ?Sized> WritePacketHelper for W {}
317impl<W: io::Write + ?Sized> WritePacketExt for W {}
318
319#[cfg(test)]
320mod tests {
321 extern crate env_logger;
322
323 use decode::*;
324 use super::*;
325
326 #[test]
327 fn test_encode_variable_length() {
328 let mut v = Vec::new();
329
330 assert_eq!(v.write_variable_length(123).unwrap(), 1);
331 assert_eq!(v, &[123]);
332
333 v.clear();
334
335 assert_eq!(v.write_variable_length(129).unwrap(), 2);
336 assert_eq!(v, b"\x81\x01");
337
338 v.clear();
339
340 assert_eq!(v.write_variable_length(16383).unwrap(), 2);
341 assert_eq!(v, b"\xff\x7f");
342
343 v.clear();
344
345 assert_eq!(v.write_variable_length(2097151).unwrap(), 3);
346 assert_eq!(v, b"\xff\xff\x7f");
347
348 v.clear();
349
350 assert_eq!(v.write_variable_length(268435455).unwrap(), 4);
351 assert_eq!(v, b"\xff\xff\xff\x7f");
352
353 assert!(v.write_variable_length(MAX_VARIABLE_LENGTH + 1).is_err())
354 }
355
356 #[test]
357 fn test_encode_fixed_header() {
358 let mut v = Vec::new();
359 let p = Packet::PingRequest;
360
361 assert_eq!(v.calc_content_size(&p), 0);
362 assert_eq!(v.write_fixed_header(&p).unwrap(), 2);
363 assert_eq!(v, b"\xc0\x00");
364
365 v.clear();
366
367 let p = Packet::Publish {
368 dup: true,
369 retain: true,
370 qos: QoS::ExactlyOnce,
371 topic: "topic",
372 packet_id: Some(0x4321),
373 payload: &(0..255).map(|b| b).collect::<Vec<u8>>(),
374 };
375
376 assert_eq!(v.calc_content_size(&p), 264);
377 assert_eq!(v.write_fixed_header(&p).unwrap(), 3);
378 assert_eq!(v, b"\x3d\x88\x02");
379 }
380
381 macro_rules! assert_packet {
382 ($p:expr, $data:expr) => {
383 let mut v = Vec::new();
384 assert_eq!(v.write_packet(&$p).unwrap(), $data.len());
385 assert_eq!(v, $data);
386 assert_eq!(read_packet($data).unwrap(), (&b""[..], $p));
387 }
388 }
389
390 #[test]
391 fn test_encode_connect_packets() {
392 assert_packet!(
393 Packet::Connect {
394 protocol: Protocol::MQTT(4),
395 clean_session: false,
396 keep_alive: 60,
397 client_id: "12345",
398 last_will: None,
399 username: Some("user"),
400 password: Some(b"pass"),
401 },
402 &b"\x10\x1D\x00\x04MQTT\x04\xC0\x00\x3C\x00\
403\x0512345\x00\x04user\x00\x04pass"[..]
404 );
405
406 assert_packet!(
407 Packet::Connect {
408 protocol: Protocol::MQTT(4),
409 clean_session: false,
410 keep_alive: 60,
411 client_id: "12345",
412 last_will: Some(LastWill {
413 qos: QoS::ExactlyOnce,
414 retain: false,
415 topic: "topic",
416 message: b"message",
417 }),
418 username: None,
419 password: None,
420 },
421 &b"\x10\x21\x00\x04MQTT\x04\x14\x00\x3C\x00\
422\x0512345\x00\x05topic\x00\x07message"[..]
423 );
424
425 assert_packet!(Packet::Disconnect, b"\xe0\x00");
426 }
427
428 #[test]
429 fn test_encode_publish_packets() {
430 assert_packet!(
431 Packet::Publish {
432 dup: true,
433 retain: true,
434 qos: QoS::ExactlyOnce,
435 topic: "topic",
436 packet_id: Some(0x4321),
437 payload: b"data",
438 },
439 b"\x3d\x0D\x00\x05topic\x43\x21data"
440 );
441
442 assert_packet!(
443 Packet::Publish {
444 dup: false,
445 retain: false,
446 qos: QoS::AtMostOnce,
447 topic: "topic",
448 packet_id: None,
449 payload: b"data",
450 },
451 b"\x30\x0b\x00\x05topicdata"
452 );
453 }
454
455 #[test]
456 fn test_encode_subscribe_packets() {
457 assert_packet!(
458 Packet::Subscribe {
459 packet_id: 0x1234,
460 topic_filters: vec![("test", QoS::AtLeastOnce), ("filter", QoS::ExactlyOnce)],
461 },
462 b"\x82\x12\x12\x34\x00\x04test\x01\x00\x06filter\x02"
463 );
464
465 assert_packet!(
466 Packet::SubscribeAck {
467 packet_id: 0x1234,
468 status: vec![
469 SubscribeReturnCode::Success(QoS::AtLeastOnce),
470 SubscribeReturnCode::Failure,
471 SubscribeReturnCode::Success(QoS::ExactlyOnce),
472 ],
473 },
474 b"\x90\x05\x12\x34\x01\x80\x02"
475 );
476
477 assert_packet!(
478 Packet::Unsubscribe {
479 packet_id: 0x1234,
480 topic_filters: vec!["test", "filter"],
481 },
482 b"\xa2\x10\x12\x34\x00\x04test\x00\x06filter"
483 );
484
485 assert_packet!(
486 Packet::UnsubscribeAck { packet_id: 0x4321 },
487 b"\xb0\x02\x43\x21"
488 );
489 }
490
491 #[test]
492 fn test_encode_ping_packets() {
493 assert_packet!(Packet::PingRequest, b"\xc0\x00");
494 assert_packet!(Packet::PingResponse, b"\xd0\x00");
495 }
496}