stackforge_core/layer/mqtt/
builder.rs1use super::{CONNACK, CONNECT, PUBLISH, SUBACK, SUBSCRIBE, encode_variable_length};
23
/// Accumulates the fields of an MQTT control packet; `build` turns the
/// current state into wire bytes.
///
/// All fields are stored regardless of the selected packet type. Which of
/// them are actually emitted is decided by `build` from `msg_type`.
#[derive(Debug, Clone)]
pub struct MqttBuilder {
    // ----- fixed header -----
    /// Control packet type. `build` emits its low four bits as the high
    /// nibble of the first byte and dispatches the packet body on it.
    msg_type: u8,
    /// When set, bit 3 (`0x08`) of the first byte is set. Overridden for
    /// SUBSCRIBE, UNSUBSCRIBE and PUBREL, whose flag nibble is forced to `0x02`.
    dup: bool,
    /// Low two bits land in bits 1-2 of the first byte. Any non-zero value
    /// also makes PUBLISH carry the message id.
    qos: u8,
    /// When set, bit 0 (`0x01`) of the first byte is set.
    retain: bool,

    // ----- CONNECT -----
    /// Protocol name, written with a big-endian `u16` length prefix.
    proto_name: Vec<u8>,
    /// Protocol level byte that follows the protocol name.
    proto_level: u8,
    /// Connect flag `0x02`.
    clean_session: bool,
    /// Connect flag `0x04`; also gates emission of `will_topic` / `will_msg`.
    will_flag: bool,
    /// Low two bits are placed in bits 3-4 of the connect flags.
    will_qos: u8,
    /// Connect flag `0x20`.
    will_retain: bool,
    /// Connect flag `0x80`; also gates emission of `username`.
    username_flag: bool,
    /// Connect flag `0x40`; also gates emission of `password`.
    password_flag: bool,
    /// Keep-alive value, written as a big-endian `u16`.
    keep_alive: u16,
    /// Client identifier (length-prefixed, always emitted in CONNECT).
    client_id: Vec<u8>,
    /// Will topic (length-prefixed, emitted only when `will_flag` is set).
    will_topic: Vec<u8>,
    /// Will message (length-prefixed, emitted only when `will_flag` is set).
    will_msg: Vec<u8>,
    /// User name (length-prefixed, emitted only when `username_flag` is set).
    username: Vec<u8>,
    /// Password (length-prefixed, emitted only when `password_flag` is set).
    password: Vec<u8>,

    // ----- PUBLISH and acknowledgements -----
    /// PUBLISH topic name (length-prefixed).
    topic: Vec<u8>,
    /// Message id: used by PUBLISH when `qos > 0`, by the two-byte
    /// acknowledgement packets, and by SUBSCRIBE, SUBACK and UNSUBSCRIBE.
    msg_id: u16,

    // ----- SUBSCRIBE / UNSUBSCRIBE -----
    /// `(filter, qos)` pairs. SUBSCRIBE writes both; UNSUBSCRIBE writes only
    /// the filter.
    topics: Vec<(Vec<u8>, u8)>,

    // ----- SUBACK -----
    /// Return codes appended verbatim after the message id.
    retcodes: Vec<u8>,

    // ----- CONNACK -----
    /// First CONNACK body byte; only bit 0 is emitted.
    sess_present: u8,
    /// Second CONNACK body byte.
    ret_code: u8,

    // ----- PUBLISH payload -----
    /// Raw application payload appended at the end of a PUBLISH packet.
    value: Vec<u8>,
}
92
93impl Default for MqttBuilder {
94 fn default() -> Self {
95 Self {
96 msg_type: 12, dup: false,
98 qos: 0,
99 retain: false,
100 proto_name: b"MQTT".to_vec(),
101 proto_level: 4,
102 clean_session: false,
103 will_flag: false,
104 will_qos: 0,
105 will_retain: false,
106 username_flag: false,
107 password_flag: false,
108 keep_alive: 0,
109 client_id: Vec::new(),
110 will_topic: Vec::new(),
111 will_msg: Vec::new(),
112 username: Vec::new(),
113 password: Vec::new(),
114 topic: Vec::new(),
115 msg_id: 0,
116 topics: Vec::new(),
117 retcodes: Vec::new(),
118 sess_present: 0,
119 ret_code: 0,
120 value: Vec::new(),
121 }
122 }
123}
124
125impl MqttBuilder {
126 #[must_use]
128 pub fn new() -> Self {
129 Self::default()
130 }
131
132 #[must_use]
136 pub fn connect(mut self) -> Self {
137 self.msg_type = CONNECT;
138 self
139 }
140
141 #[must_use]
143 pub fn connack(mut self) -> Self {
144 self.msg_type = CONNACK;
145 self
146 }
147
148 #[must_use]
150 pub fn publish(mut self) -> Self {
151 self.msg_type = PUBLISH;
152 self
153 }
154
155 #[must_use]
157 pub fn puback(mut self) -> Self {
158 self.msg_type = super::PUBACK;
159 self
160 }
161
162 #[must_use]
164 pub fn pubrec(mut self) -> Self {
165 self.msg_type = super::PUBREC;
166 self
167 }
168
169 #[must_use]
171 pub fn pubrel(mut self) -> Self {
172 self.msg_type = super::PUBREL;
173 self
174 }
175
176 #[must_use]
178 pub fn pubcomp(mut self) -> Self {
179 self.msg_type = super::PUBCOMP;
180 self
181 }
182
183 #[must_use]
185 pub fn subscribe(mut self) -> Self {
186 self.msg_type = SUBSCRIBE;
187 self
188 }
189
190 #[must_use]
192 pub fn suback(mut self) -> Self {
193 self.msg_type = SUBACK;
194 self
195 }
196
197 #[must_use]
199 pub fn unsubscribe(mut self) -> Self {
200 self.msg_type = super::UNSUBSCRIBE;
201 self
202 }
203
204 #[must_use]
206 pub fn unsuback(mut self) -> Self {
207 self.msg_type = super::UNSUBACK;
208 self
209 }
210
211 #[must_use]
213 pub fn pingreq(mut self) -> Self {
214 self.msg_type = super::PINGREQ;
215 self
216 }
217
218 #[must_use]
220 pub fn pingresp(mut self) -> Self {
221 self.msg_type = super::PINGRESP;
222 self
223 }
224
225 #[must_use]
227 pub fn disconnect(mut self) -> Self {
228 self.msg_type = super::DISCONNECT;
229 self
230 }
231
232 #[must_use]
234 pub fn msg_type(mut self, t: u8) -> Self {
235 self.msg_type = t;
236 self
237 }
238
239 #[must_use]
243 pub fn dup(mut self, val: bool) -> Self {
244 self.dup = val;
245 self
246 }
247
248 #[must_use]
250 pub fn qos(mut self, val: u8) -> Self {
251 self.qos = val;
252 self
253 }
254
255 #[must_use]
257 pub fn retain(mut self, val: bool) -> Self {
258 self.retain = val;
259 self
260 }
261
262 pub fn proto_name<T: Into<Vec<u8>>>(mut self, name: T) -> Self {
266 self.proto_name = name.into();
267 self
268 }
269
270 #[must_use]
272 pub fn proto_level(mut self, level: u8) -> Self {
273 self.proto_level = level;
274 self
275 }
276
277 #[must_use]
279 pub fn clean_session(mut self, val: bool) -> Self {
280 self.clean_session = val;
281 self
282 }
283
284 #[must_use]
286 pub fn will(mut self, topic: &[u8], msg: &[u8], qos: u8, retain: bool) -> Self {
287 self.will_flag = true;
288 self.will_topic = topic.to_vec();
289 self.will_msg = msg.to_vec();
290 self.will_qos = qos;
291 self.will_retain = retain;
292 self
293 }
294
295 pub fn username<T: Into<Vec<u8>>>(mut self, name: T) -> Self {
297 self.username_flag = true;
298 self.username = name.into();
299 self
300 }
301
302 pub fn password<T: Into<Vec<u8>>>(mut self, pass: T) -> Self {
304 self.password_flag = true;
305 self.password = pass.into();
306 self
307 }
308
309 #[must_use]
311 pub fn keep_alive(mut self, secs: u16) -> Self {
312 self.keep_alive = secs;
313 self
314 }
315
316 pub fn client_id<T: Into<Vec<u8>>>(mut self, id: T) -> Self {
318 self.client_id = id.into();
319 self
320 }
321
322 pub fn topic<T: Into<Vec<u8>>>(mut self, t: T) -> Self {
326 self.topic = t.into();
327 self
328 }
329
330 #[must_use]
332 pub fn msg_id(mut self, id: u16) -> Self {
333 self.msg_id = id;
334 self
335 }
336
337 pub fn payload<T: Into<Vec<u8>>>(mut self, data: T) -> Self {
339 self.value = data.into();
340 self
341 }
342
343 #[must_use]
347 pub fn add_topic(mut self, filter: &[u8], qos: u8) -> Self {
348 self.topics.push((filter.to_vec(), qos));
349 self
350 }
351
352 pub fn retcodes<T: Into<Vec<u8>>>(mut self, codes: T) -> Self {
356 self.retcodes = codes.into();
357 self
358 }
359
360 #[must_use]
364 pub fn sess_present(mut self, val: u8) -> Self {
365 self.sess_present = val;
366 self
367 }
368
369 #[must_use]
371 pub fn ret_code(mut self, code: u8) -> Self {
372 self.ret_code = code;
373 self
374 }
375
376 #[must_use]
380 pub fn remaining_size(&self) -> usize {
381 match self.msg_type {
382 CONNECT => {
383 let var_header = 2 + self.proto_name.len() + 1 + 1 + 2;
385 let mut payload_len = 2 + self.client_id.len();
387 if self.will_flag {
389 payload_len += 2 + self.will_topic.len() + 2 + self.will_msg.len();
390 }
391 if self.username_flag {
393 payload_len += 2 + self.username.len();
394 }
395 if self.password_flag {
397 payload_len += 2 + self.password.len();
398 }
399 var_header + payload_len
400 },
401 CONNACK => 2, PUBLISH => {
403 let mut len = 2 + self.topic.len();
405 if self.qos > 0 {
406 len += 2; }
408 len += self.value.len();
409 len
410 },
411 super::PUBACK | super::PUBREC | super::PUBREL | super::PUBCOMP | super::UNSUBACK => {
412 2 },
414 SUBSCRIBE => {
415 let mut len = 2;
417 for (filter, _qos) in &self.topics {
418 len += 2 + filter.len() + 1; }
420 len
421 },
422 SUBACK => {
423 2 + self.retcodes.len()
425 },
426 super::UNSUBSCRIBE => {
427 let mut len = 2;
429 for (filter, _qos) in &self.topics {
430 len += 2 + filter.len();
431 }
432 len
433 },
434 _ => 0,
436 }
437 }
438
439 #[must_use]
441 pub fn header_size(&self) -> usize {
442 let rem = self.remaining_size() as u32;
443 1 + encode_variable_length(rem).len()
444 }
445
446 #[must_use]
448 pub fn packet_size(&self) -> usize {
449 self.header_size() + self.remaining_size()
450 }
451
452 #[must_use]
456 pub fn build(&self) -> Vec<u8> {
457 let remaining = self.remaining_size();
458 let rem_encoded = encode_variable_length(remaining as u32);
459 let total = 1 + rem_encoded.len() + remaining;
460 let mut buf = Vec::with_capacity(total);
461
462 let mut byte0: u8 = (self.msg_type & 0x0F) << 4;
464 if self.dup {
465 byte0 |= 0x08;
466 }
467 byte0 |= (self.qos & 0x03) << 1;
468 if self.retain {
469 byte0 |= 0x01;
470 }
471 match self.msg_type {
473 SUBSCRIBE | super::UNSUBSCRIBE | super::PUBREL => {
474 byte0 = (self.msg_type << 4) | 0x02;
475 },
476 _ => {},
477 }
478 buf.push(byte0);
479
480 buf.extend_from_slice(&rem_encoded);
482
483 match self.msg_type {
485 CONNECT => self.build_connect(&mut buf),
486 CONNACK => self.build_connack(&mut buf),
487 PUBLISH => self.build_publish(&mut buf),
488 super::PUBACK | super::PUBREC | super::PUBREL | super::PUBCOMP | super::UNSUBACK => {
489 buf.extend_from_slice(&self.msg_id.to_be_bytes());
490 },
491 SUBSCRIBE => self.build_subscribe(&mut buf),
492 SUBACK => self.build_suback(&mut buf),
493 super::UNSUBSCRIBE => {
494 buf.extend_from_slice(&self.msg_id.to_be_bytes());
495 for (filter, _qos) in &self.topics {
496 buf.extend_from_slice(&(filter.len() as u16).to_be_bytes());
497 buf.extend_from_slice(filter);
498 }
499 },
500 _ => {},
502 }
503
504 buf
505 }
506
507 fn build_connect(&self, buf: &mut Vec<u8>) {
508 buf.extend_from_slice(&(self.proto_name.len() as u16).to_be_bytes());
510 buf.extend_from_slice(&self.proto_name);
511
512 buf.push(self.proto_level);
514
515 let mut flags: u8 = 0;
517 if self.username_flag {
518 flags |= 0x80;
519 }
520 if self.password_flag {
521 flags |= 0x40;
522 }
523 if self.will_retain {
524 flags |= 0x20;
525 }
526 flags |= (self.will_qos & 0x03) << 3;
527 if self.will_flag {
528 flags |= 0x04;
529 }
530 if self.clean_session {
531 flags |= 0x02;
532 }
533 buf.push(flags);
534
535 buf.extend_from_slice(&self.keep_alive.to_be_bytes());
537
538 buf.extend_from_slice(&(self.client_id.len() as u16).to_be_bytes());
540 buf.extend_from_slice(&self.client_id);
541
542 if self.will_flag {
544 buf.extend_from_slice(&(self.will_topic.len() as u16).to_be_bytes());
545 buf.extend_from_slice(&self.will_topic);
546 buf.extend_from_slice(&(self.will_msg.len() as u16).to_be_bytes());
547 buf.extend_from_slice(&self.will_msg);
548 }
549
550 if self.username_flag {
552 buf.extend_from_slice(&(self.username.len() as u16).to_be_bytes());
553 buf.extend_from_slice(&self.username);
554 }
555
556 if self.password_flag {
558 buf.extend_from_slice(&(self.password.len() as u16).to_be_bytes());
559 buf.extend_from_slice(&self.password);
560 }
561 }
562
563 fn build_connack(&self, buf: &mut Vec<u8>) {
564 buf.push(self.sess_present & 0x01);
565 buf.push(self.ret_code);
566 }
567
568 fn build_publish(&self, buf: &mut Vec<u8>) {
569 buf.extend_from_slice(&(self.topic.len() as u16).to_be_bytes());
571 buf.extend_from_slice(&self.topic);
572
573 if self.qos > 0 {
575 buf.extend_from_slice(&self.msg_id.to_be_bytes());
576 }
577
578 buf.extend_from_slice(&self.value);
580 }
581
582 fn build_subscribe(&self, buf: &mut Vec<u8>) {
583 buf.extend_from_slice(&self.msg_id.to_be_bytes());
585
586 for (filter, qos) in &self.topics {
588 buf.extend_from_slice(&(filter.len() as u16).to_be_bytes());
589 buf.extend_from_slice(filter);
590 buf.push(*qos);
591 }
592 }
593
594 fn build_suback(&self, buf: &mut Vec<u8>) {
595 buf.extend_from_slice(&self.msg_id.to_be_bytes());
597
598 buf.extend_from_slice(&self.retcodes);
600 }
601}
602
#[cfg(test)]
mod tests {
    use super::*;

    /// An untouched builder serialises as type 12 with an empty body.
    #[test]
    fn test_default_pingreq() {
        let bytes = MqttBuilder::new().build();
        assert_eq!(bytes, b"\xc0\x00", "default PINGREQ should be 0xC0 0x00");
    }

    /// PINGRESP has no body.
    #[test]
    fn test_pingresp() {
        let bytes = MqttBuilder::new().pingresp().build();
        assert_eq!(bytes, b"\xd0\x00");
    }

    /// DISCONNECT has no body.
    #[test]
    fn test_disconnect() {
        let bytes = MqttBuilder::new().disconnect().build();
        assert_eq!(bytes, b"\xe0\x00");
    }

    /// CONNACK with both body bytes explicitly zeroed.
    #[test]
    fn test_connack() {
        let builder = MqttBuilder::new().connack().sess_present(0).ret_code(0);
        let bytes = builder.build();
        assert_eq!(bytes, b"\x20\x02\x00\x00");
    }

    /// CONNACK carries the configured return code in its last byte.
    #[test]
    fn test_connack_with_retcode() {
        let bytes = MqttBuilder::new().connack().ret_code(5).build();
        assert_eq!(bytes.len(), 4);
        assert_eq!(bytes[0], 0x20);
        assert_eq!(bytes[1], 0x02);
        assert_eq!(bytes[2], 0x00);
        assert_eq!(bytes[3], 0x05);
    }

    /// QoS 0 PUBLISH: topic and payload only, no message id.
    #[test]
    fn test_publish_qos0() {
        let bytes = MqttBuilder::new()
            .publish()
            .topic(b"test".to_vec())
            .payload(b"test".to_vec())
            .build();
        assert_eq!(bytes, b"\x30\x0a\x00\x04test\x74\x65\x73\x74");
        assert_eq!(bytes[0], 0x30);
        assert_eq!(bytes[1], 0x0a);
        assert_eq!(bytes.len(), 12);
    }

    /// QoS 1 PUBLISH: the message id sits between topic and payload.
    #[test]
    fn test_publish_qos1() {
        let bytes = MqttBuilder::new()
            .publish()
            .qos(1)
            .topic(b"test".to_vec())
            .msg_id(10)
            .payload(b"data".to_vec())
            .build();

        // Fixed header.
        assert_eq!(bytes[0], 0x32);
        assert_eq!(bytes[1], 12);
        // Topic.
        assert_eq!(&bytes[2..4], &[0x00, 0x04]);
        assert_eq!(&bytes[4..8], b"test");
        // Message id.
        assert_eq!(&bytes[8..10], &[0x00, 0x0a]);
        // Payload.
        assert_eq!(&bytes[10..14], b"data");
    }

    /// CONNECT using the default protocol name and level.
    #[test]
    fn test_connect_default() {
        let bytes = MqttBuilder::new()
            .connect()
            .client_id(b"test")
            .clean_session(true)
            .keep_alive(60)
            .build();

        // Fixed header.
        assert_eq!(bytes[0], 0x10);
        assert_eq!(bytes[1], 16);
        // Protocol name and level.
        assert_eq!(&bytes[2..4], &[0x00, 0x04]);
        assert_eq!(&bytes[4..8], b"MQTT");
        assert_eq!(bytes[8], 4);
        // Connect flags and keep-alive.
        assert_eq!(bytes[9], 0x02);
        assert_eq!(&bytes[10..12], &[0x00, 0x3c]);
        // Client id.
        assert_eq!(&bytes[12..14], &[0x00, 0x04]);
        assert_eq!(&bytes[14..18], b"test");
    }

    /// CONNECT with an overridden protocol name and level.
    #[test]
    fn test_connect_mqisdp() {
        let bytes = MqttBuilder::new()
            .connect()
            .proto_name(b"MQIsdp".to_vec())
            .proto_level(3)
            .clean_session(true)
            .keep_alive(60)
            .client_id(b"mosqpub/1440-kali".to_vec())
            .build();

        // Fixed header.
        assert_eq!(bytes[0], 0x10);
        assert_eq!(bytes[1], 0x1f);
        // Protocol name and level.
        assert_eq!(&bytes[2..4], &[0x00, 0x06]);
        assert_eq!(&bytes[4..10], b"MQIsdp");
        assert_eq!(bytes[10], 3);
        // Connect flags and keep-alive.
        assert_eq!(bytes[11], 0x02);
        assert_eq!(&bytes[12..14], &[0x00, 0x3c]);
        // Client id.
        assert_eq!(&bytes[14..16], &[0x00, 0x11]);
        assert_eq!(&bytes[16..33], b"mosqpub/1440-kali");
    }

    /// SUBSCRIBE with a single topic filter.
    #[test]
    fn test_subscribe() {
        let bytes = MqttBuilder::new()
            .subscribe()
            .msg_id(1)
            .add_topic(b"test", 1)
            .build();

        // Fixed header: the flag nibble is forced to 0x02.
        assert_eq!(bytes[0], 0x82);
        assert_eq!(bytes[1], 9);
        // Message id.
        assert_eq!(&bytes[2..4], &[0x00, 0x01]);
        // Filter and requested QoS.
        assert_eq!(&bytes[4..6], &[0x00, 0x04]);
        assert_eq!(&bytes[6..10], b"test");
        assert_eq!(bytes[10], 0x01);
    }

    /// SUBACK with one return code.
    #[test]
    fn test_suback() {
        let bytes = MqttBuilder::new()
            .suback()
            .msg_id(1)
            .retcodes(vec![0x00])
            .build();
        assert_eq!(bytes, b"\x90\x03\x00\x01\x00");
    }

    /// PUBACK carries only the message id.
    #[test]
    fn test_puback() {
        let bytes = MqttBuilder::new().puback().msg_id(10).build();
        assert_eq!(bytes, b"\x40\x02\x00\x0a");
    }

    /// Size accessors agree for the default (body-less) packet.
    #[test]
    fn test_packet_size() {
        let builder = MqttBuilder::new();
        assert_eq!(builder.packet_size(), 2);
        assert_eq!(builder.header_size(), 2);
        assert_eq!(builder.remaining_size(), 0);
    }
}