1use crate::{Event, Incoming, Outgoing, Request};
2
3use crate::mqttbytes::v4::*;
4use crate::mqttbytes::{self, *};
5use bytes::BytesMut;
6use std::collections::VecDeque;
7use std::{io, mem, time::Instant};
8
9#[derive(Debug, thiserror::Error)]
11pub enum StateError {
12 #[error("Io error {0:?}")]
14 Io(#[from] io::Error),
15 #[error("Connect return code `{0:?}`")]
17 Connect(ConnectReturnCode),
18 #[error("Invalid state for a given operation")]
20 InvalidState,
21 #[error("Received unsolicited ack pkid {0}")]
23 Unsolicited(u16),
24 #[error("Last pingreq isn't acked")]
26 AwaitPingResp,
27 #[error("Received a wrong packet while waiting for another packet")]
29 WrongPacket,
30 #[error("Timeout while waiting to resolve collision")]
31 CollisionTimeout,
32 #[error("Mqtt serialization/deserialization error")]
33 Deserialization(mqttbytes::Error),
34}
35
36impl From<mqttbytes::Error> for StateError {
37 fn from(e: mqttbytes::Error) -> StateError {
38 StateError::Deserialization(e)
39 }
40}
41
42#[derive(Debug, Clone)]
49pub struct MqttState {
50 pub await_pingresp: bool,
52 pub collision_ping_count: usize,
56 last_incoming: Instant,
58 last_outgoing: Instant,
60 pub(crate) last_pkid: u16,
62 pub(crate) inflight: u16,
64 pub(crate) max_inflight: u16,
66 pub(crate) outgoing_pub: Vec<Option<Publish>>,
68 pub(crate) outgoing_rel: Vec<Option<u16>>,
70 pub(crate) incoming_pub: Vec<Option<u16>>,
72 pub collision: Option<Publish>,
74 pub events: VecDeque<Event>,
76 pub write: BytesMut,
78 pub manual_acks: bool,
80}
81
82impl MqttState {
83 pub fn new(max_inflight: u16, manual_acks: bool) -> Self {
87 MqttState {
88 await_pingresp: false,
89 collision_ping_count: 0,
90 last_incoming: Instant::now(),
91 last_outgoing: Instant::now(),
92 last_pkid: 0,
93 inflight: 0,
94 max_inflight,
95 outgoing_pub: vec![None; max_inflight as usize + 1],
97 outgoing_rel: vec![None; max_inflight as usize + 1],
98 incoming_pub: vec![None; std::u16::MAX as usize + 1],
99 collision: None,
100 events: VecDeque::with_capacity(100),
102 write: BytesMut::with_capacity(10 * 1024),
103 manual_acks,
104 }
105 }
106
107 pub fn clean(&mut self) -> Vec<Request> {
109 let mut pending = Vec::with_capacity(100);
110 for publish in self.outgoing_pub.iter_mut() {
112 if let Some(publish) = publish.take() {
113 let request = Request::Publish(publish);
114 pending.push(request);
115 }
116 }
117
118 for rel in self.outgoing_rel.iter_mut() {
120 if let Some(pkid) = rel.take() {
121 let request = Request::PubRel(PubRel::new(pkid));
122 pending.push(request);
123 }
124 }
125
126 for id in self.incoming_pub.iter_mut() {
128 id.take();
129 }
130
131 self.await_pingresp = false;
132 self.collision_ping_count = 0;
133 self.inflight = 0;
134 pending
135 }
136
137 pub fn inflight(&self) -> u16 {
138 self.inflight
139 }
140
141 pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
144 match request {
145 Request::Publish(publish) => self.outgoing_publish(publish)?,
146 Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?,
147 Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?,
148 Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?,
149 Request::PingReq => self.outgoing_ping()?,
150 Request::Disconnect => self.outgoing_disconnect()?,
151 Request::PubAck(puback) => self.outgoing_puback(puback)?,
152 Request::PubRec(pubrec) => self.outgoing_pubrec(pubrec)?,
153 _ => unimplemented!(),
154 };
155
156 self.last_outgoing = Instant::now();
157 Ok(())
158 }
159
160 pub fn handle_incoming_packet(&mut self, packet: Incoming) -> Result<(), StateError> {
165 let out = match &packet {
166 Incoming::PingResp => self.handle_incoming_pingresp(),
167 Incoming::Publish(publish) => self.handle_incoming_publish(publish),
168 Incoming::SubAck(_suback) => self.handle_incoming_suback(),
169 Incoming::UnsubAck(_unsuback) => self.handle_incoming_unsuback(),
170 Incoming::PubAck(puback) => self.handle_incoming_puback(puback),
171 Incoming::PubRec(pubrec) => self.handle_incoming_pubrec(pubrec),
172 Incoming::PubRel(pubrel) => self.handle_incoming_pubrel(pubrel),
173 Incoming::PubComp(pubcomp) => self.handle_incoming_pubcomp(pubcomp),
174 _ => {
175 error!("Invalid incoming packet = {:?}", packet);
176 return Err(StateError::WrongPacket);
177 }
178 };
179
180 out?;
181 self.events.push_back(Event::Incoming(packet));
182 self.last_incoming = Instant::now();
183 Ok(())
184 }
185
186 fn handle_incoming_suback(&mut self) -> Result<(), StateError> {
187 Ok(())
188 }
189
190 fn handle_incoming_unsuback(&mut self) -> Result<(), StateError> {
191 Ok(())
192 }
193
194 fn handle_incoming_publish(&mut self, publish: &Publish) -> Result<(), StateError> {
197 let qos = publish.qos;
198
199 match qos {
200 QoS::AtMostOnce => Ok(()),
201 QoS::AtLeastOnce => {
202 if !self.manual_acks {
203 let puback = PubAck::new(publish.pkid);
204 self.outgoing_puback(puback)?
205 }
206 Ok(())
207 }
208 QoS::ExactlyOnce => {
209 let pkid = publish.pkid;
210 self.incoming_pub[pkid as usize] = Some(pkid);
211 if !self.manual_acks {
212 let pubrec = PubRec::new(pkid);
213 self.outgoing_pubrec(pubrec)?;
214 }
215 Ok(())
216 }
217 }
218 }
219
220 fn handle_incoming_puback(&mut self, puback: &PubAck) -> Result<(), StateError> {
221 let v = match mem::replace(&mut self.outgoing_pub[puback.pkid as usize], None) {
222 Some(_) => {
223 self.inflight -= 1;
224 Ok(())
225 }
226 None => {
227 error!("Unsolicited puback packet: {:?}", puback.pkid);
228 Err(StateError::Unsolicited(puback.pkid))
229 }
230 };
231
232 if let Some(publish) = self.check_collision(puback.pkid) {
233 self.outgoing_pub[publish.pkid as usize] = Some(publish.clone());
234 self.inflight += 1;
235
236 publish.write(&mut self.write)?;
237 let event = Event::Outgoing(Outgoing::Publish(publish.pkid, publish.topic));
238 self.events.push_back(event);
239 self.collision_ping_count = 0;
240 }
241
242 v
243 }
244
245 fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result<(), StateError> {
246 match mem::replace(&mut self.outgoing_pub[pubrec.pkid as usize], None) {
247 Some(_) => {
248 self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
250 PubRel::new(pubrec.pkid).write(&mut self.write)?;
251
252 let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
253 self.events.push_back(event);
254 Ok(())
255 }
256 None => {
257 error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
258 Err(StateError::Unsolicited(pubrec.pkid))
259 }
260 }
261 }
262
263 fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<(), StateError> {
264 match mem::replace(&mut self.incoming_pub[pubrel.pkid as usize], None) {
265 Some(_) => {
266 PubComp::new(pubrel.pkid).write(&mut self.write)?;
267 let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
268 self.events.push_back(event);
269 Ok(())
270 }
271 None => {
272 error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
273 Err(StateError::Unsolicited(pubrel.pkid))
274 }
275 }
276 }
277
278 fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<(), StateError> {
279 if let Some(publish) = self.check_collision(pubcomp.pkid) {
280 publish.write(&mut self.write)?;
281 let event = Event::Outgoing(Outgoing::Publish(publish.pkid, publish.topic));
282 self.events.push_back(event);
283 self.collision_ping_count = 0;
284 }
285
286 match mem::replace(&mut self.outgoing_rel[pubcomp.pkid as usize], None) {
287 Some(_) => {
288 self.inflight -= 1;
289 Ok(())
290 }
291 None => {
292 error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
293 Err(StateError::Unsolicited(pubcomp.pkid))
294 }
295 }
296 }
297
298 fn handle_incoming_pingresp(&mut self) -> Result<(), StateError> {
299 self.await_pingresp = false;
300 Ok(())
301 }
302
303 fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> {
306 if publish.qos != QoS::AtMostOnce {
307 if publish.pkid == 0 {
308 publish.pkid = self.next_pkid();
309 }
310
311 let pkid = publish.pkid;
312 if self
313 .outgoing_pub
314 .get(publish.pkid as usize)
315 .unwrap()
316 .is_some()
317 {
318 info!("Collision on packet id = {:?}", publish.pkid);
319 self.collision = Some(publish);
320 let event = Event::Outgoing(Outgoing::AwaitAck(pkid));
321 self.events.push_back(event);
322 return Ok(());
323 }
324
325 self.outgoing_pub[pkid as usize] = Some(publish.clone());
328 self.inflight += 1;
329 };
330
331 debug!(
332 "Publish. Topic = {}, Pkid = {:?}, Payload Size = {:?}",
333 publish.topic,
334 publish.pkid,
335 publish.payload.len()
336 );
337
338 publish.write(&mut self.write)?;
339 let event = Event::Outgoing(Outgoing::Publish(publish.pkid, publish.topic));
340 self.events.push_back(event);
341 Ok(())
342 }
343
344 fn outgoing_pubrel(&mut self, pubrel: PubRel) -> Result<(), StateError> {
345 let pubrel = self.save_pubrel(pubrel)?;
346
347 debug!("Pubrel. Pkid = {}", pubrel.pkid);
348 PubRel::new(pubrel.pkid).write(&mut self.write)?;
349
350 let event = Event::Outgoing(Outgoing::PubRel(pubrel.pkid));
351 self.events.push_back(event);
352 Ok(())
353 }
354
355 fn outgoing_puback(&mut self, puback: PubAck) -> Result<(), StateError> {
356 puback.write(&mut self.write)?;
357 let event = Event::Outgoing(Outgoing::PubAck(puback.pkid));
358 self.events.push_back(event);
359 Ok(())
360 }
361
362 fn outgoing_pubrec(&mut self, pubrec: PubRec) -> Result<(), StateError> {
363 pubrec.write(&mut self.write)?;
364 let event = Event::Outgoing(Outgoing::PubRec(pubrec.pkid));
365 self.events.push_back(event);
366 Ok(())
367 }
368
369 fn outgoing_ping(&mut self) -> Result<(), StateError> {
373 let elapsed_in = self.last_incoming.elapsed();
374 let elapsed_out = self.last_outgoing.elapsed();
375
376 if self.collision.is_some() {
377 self.collision_ping_count += 1;
378 if self.collision_ping_count >= 2 {
379 return Err(StateError::CollisionTimeout);
380 }
381 }
382
383 if self.await_pingresp {
385 return Err(StateError::AwaitPingResp);
386 }
387
388 self.await_pingresp = true;
389
390 debug!(
391 "Pingreq,
392 last incoming packet before {} millisecs,
393 last outgoing request before {} millisecs",
394 elapsed_in.as_millis(),
395 elapsed_out.as_millis()
396 );
397
398 PingReq.write(&mut self.write)?;
399 let event = Event::Outgoing(Outgoing::PingReq);
400 self.events.push_back(event);
401 Ok(())
402 }
403
404 fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> {
405 let pkid = self.next_pkid();
406 subscription.pkid = pkid;
407
408 debug!(
409 "Subscribe. Topics = {:?}, Pkid = {:?}",
410 subscription.filters, subscription.pkid
411 );
412
413 subscription.write(&mut self.write)?;
414 let event = Event::Outgoing(Outgoing::Subscribe(subscription.pkid));
415 self.events.push_back(event);
416 Ok(())
417 }
418
419 fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> {
420 let pkid = self.next_pkid();
421 unsub.pkid = pkid;
422
423 debug!(
424 "Unsubscribe. Topics = {:?}, Pkid = {:?}",
425 unsub.topics, unsub.pkid
426 );
427
428 unsub.write(&mut self.write)?;
429 let event = Event::Outgoing(Outgoing::Unsubscribe(unsub.pkid));
430 self.events.push_back(event);
431 Ok(())
432 }
433
434 fn outgoing_disconnect(&mut self) -> Result<(), StateError> {
435 debug!("Disconnect");
436
437 Disconnect.write(&mut self.write)?;
438 let event = Event::Outgoing(Outgoing::Disconnect);
439 self.events.push_back(event);
440 Ok(())
441 }
442
443 fn check_collision(&mut self, pkid: u16) -> Option<Publish> {
444 if let Some(publish) = &self.collision {
445 if publish.pkid == pkid {
446 return self.collision.take();
447 }
448 }
449
450 None
451 }
452
453 fn save_pubrel(&mut self, mut pubrel: PubRel) -> Result<PubRel, StateError> {
454 let pubrel = match pubrel.pkid {
455 0 => {
457 pubrel.pkid = self.next_pkid();
458 pubrel
459 }
460 _ => pubrel,
461 };
462
463 self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
464 Ok(pubrel)
465 }
466
467 fn next_pkid(&mut self) -> u16 {
471 let next_pkid = self.last_pkid + 1;
472
473 if next_pkid == self.max_inflight {
478 self.last_pkid = 0;
479 return next_pkid;
480 }
481
482 self.last_pkid = next_pkid;
483 next_pkid
484 }
485}
486
487#[cfg(test)]
488mod test {
489 use super::{MqttState, StateError};
490 use crate::mqttbytes::v4::*;
491 use crate::mqttbytes::*;
492 use crate::{Event, Incoming, MqttOptions, Outgoing, Request};
493
494 fn build_outgoing_publish(qos: QoS) -> Publish {
495 let topic = "hello/world".to_owned();
496 let payload = vec![1, 2, 3];
497
498 let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
499 publish.qos = qos;
500 publish
501 }
502
503 fn build_incoming_publish(qos: QoS, pkid: u16) -> Publish {
504 let topic = "hello/world".to_owned();
505 let payload = vec![1, 2, 3];
506
507 let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
508 publish.pkid = pkid;
509 publish.qos = qos;
510 publish
511 }
512
513 fn build_mqttstate() -> MqttState {
514 MqttState::new(100, false)
515 }
516
517 #[test]
518 fn next_pkid_increments_as_expected() {
519 let mut mqtt = build_mqttstate();
520
521 for i in 1..=100 {
522 let pkid = mqtt.next_pkid();
523
524 let expected = i % 100;
526 if expected == 0 {
527 break;
528 }
529
530 assert_eq!(expected, pkid);
531 }
532 }
533
534 #[test]
535 fn outgoing_publish_should_set_pkid_and_add_publish_to_queue() {
536 let mut mqtt = build_mqttstate();
537
538 let publish = build_outgoing_publish(QoS::AtMostOnce);
540
541 mqtt.outgoing_publish(publish).unwrap();
543 assert_eq!(mqtt.last_pkid, 0);
544 assert_eq!(mqtt.inflight, 0);
545
546 let publish = build_outgoing_publish(QoS::AtLeastOnce);
548
549 mqtt.outgoing_publish(publish.clone()).unwrap();
551 assert_eq!(mqtt.last_pkid, 1);
552 assert_eq!(mqtt.inflight, 1);
553
554 mqtt.outgoing_publish(publish).unwrap();
556 assert_eq!(mqtt.last_pkid, 2);
557 assert_eq!(mqtt.inflight, 2);
558
559 let publish = build_outgoing_publish(QoS::ExactlyOnce);
561
562 mqtt.outgoing_publish(publish.clone()).unwrap();
564 assert_eq!(mqtt.last_pkid, 3);
565 assert_eq!(mqtt.inflight, 3);
566
567 mqtt.outgoing_publish(publish).unwrap();
569 assert_eq!(mqtt.last_pkid, 4);
570 assert_eq!(mqtt.inflight, 4);
571 }
572
573 #[test]
574 fn incoming_publish_should_be_added_to_queue_correctly() {
575 let mut mqtt = build_mqttstate();
576
577 let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
579 let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
580 let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
581
582 mqtt.handle_incoming_publish(&publish1).unwrap();
583 mqtt.handle_incoming_publish(&publish2).unwrap();
584 mqtt.handle_incoming_publish(&publish3).unwrap();
585
586 let pkid = mqtt.incoming_pub[3].unwrap();
587
588 assert_eq!(pkid, 3);
590 }
591
592 #[test]
593 fn incoming_publish_should_be_acked() {
594 let mut mqtt = build_mqttstate();
595
596 let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
598 let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
599 let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
600
601 mqtt.handle_incoming_publish(&publish1).unwrap();
602 mqtt.handle_incoming_publish(&publish2).unwrap();
603 mqtt.handle_incoming_publish(&publish3).unwrap();
604
605 if let Event::Outgoing(Outgoing::PubAck(pkid)) = mqtt.events[0] {
606 assert_eq!(pkid, 2);
607 } else {
608 panic!("missing puback")
609 }
610
611 if let Event::Outgoing(Outgoing::PubRec(pkid)) = mqtt.events[1] {
612 assert_eq!(pkid, 3);
613 } else {
614 panic!("missing PubRec")
615 }
616 }
617
618 #[test]
619 fn incoming_publish_should_not_be_acked_with_manual_acks() {
620 let mut mqtt = build_mqttstate();
621 mqtt.manual_acks = true;
622
623 let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
625 let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
626 let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
627
628 mqtt.handle_incoming_publish(&publish1).unwrap();
629 mqtt.handle_incoming_publish(&publish2).unwrap();
630 mqtt.handle_incoming_publish(&publish3).unwrap();
631
632 let pkid = mqtt.incoming_pub[3].unwrap();
633 assert_eq!(pkid, 3);
634
635 assert!(mqtt.events.is_empty());
636 }
637
638 #[test]
639 fn incoming_qos2_publish_should_send_rec_to_network_and_publish_to_user() {
640 let mut mqtt = build_mqttstate();
641 let publish = build_incoming_publish(QoS::ExactlyOnce, 1);
642
643 mqtt.handle_incoming_publish(&publish).unwrap();
644 let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
645 match packet {
646 Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
647 _ => panic!("Invalid network request: {:?}", packet),
648 }
649 }
650
651 #[test]
652 fn incoming_puback_should_remove_correct_publish_from_queue() {
653 let mut mqtt = build_mqttstate();
654
655 let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
656 let publish2 = build_outgoing_publish(QoS::ExactlyOnce);
657
658 mqtt.outgoing_publish(publish1).unwrap();
659 mqtt.outgoing_publish(publish2).unwrap();
660 assert_eq!(mqtt.inflight, 2);
661
662 mqtt.handle_incoming_puback(&PubAck::new(1)).unwrap();
663 assert_eq!(mqtt.inflight, 1);
664
665 mqtt.handle_incoming_puback(&PubAck::new(2)).unwrap();
666 assert_eq!(mqtt.inflight, 0);
667
668 assert!(mqtt.outgoing_pub[1].is_none());
669 assert!(mqtt.outgoing_pub[2].is_none());
670 }
671
672 #[test]
673 fn incoming_pubrec_should_release_publish_from_queue_and_add_relid_to_rel_queue() {
674 let mut mqtt = build_mqttstate();
675
676 let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
677 let publish2 = build_outgoing_publish(QoS::ExactlyOnce);
678
679 let _publish_out = mqtt.outgoing_publish(publish1);
680 let _publish_out = mqtt.outgoing_publish(publish2);
681
682 mqtt.handle_incoming_pubrec(&PubRec::new(2)).unwrap();
683 assert_eq!(mqtt.inflight, 2);
684
685 let backup = mqtt.outgoing_pub[1].clone();
687 assert_eq!(backup.unwrap().pkid, 1);
688
689 assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
691 }
692
693 #[test]
694 fn incoming_pubrec_should_send_release_to_network_and_nothing_to_user() {
695 let mut mqtt = build_mqttstate();
696
697 let publish = build_outgoing_publish(QoS::ExactlyOnce);
698 mqtt.outgoing_publish(publish).unwrap();
699 let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
700 match packet {
701 Packet::Publish(publish) => assert_eq!(publish.pkid, 1),
702 packet => panic!("Invalid network request: {:?}", packet),
703 }
704
705 mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
706 let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
707 match packet {
708 Packet::PubRel(pubrel) => assert_eq!(pubrel.pkid, 1),
709 packet => panic!("Invalid network request: {:?}", packet),
710 }
711 }
712
713 #[test]
714 fn incoming_pubrel_should_send_comp_to_network_and_nothing_to_user() {
715 let mut mqtt = build_mqttstate();
716 let publish = build_incoming_publish(QoS::ExactlyOnce, 1);
717
718 mqtt.handle_incoming_publish(&publish).unwrap();
719 let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
720 match packet {
721 Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
722 packet => panic!("Invalid network request: {:?}", packet),
723 }
724
725 mqtt.handle_incoming_pubrel(&PubRel::new(1)).unwrap();
726 let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
727 match packet {
728 Packet::PubComp(pubcomp) => assert_eq!(pubcomp.pkid, 1),
729 packet => panic!("Invalid network request: {:?}", packet),
730 }
731 }
732
733 #[test]
734 fn incoming_pubcomp_should_release_correct_pkid_from_release_queue() {
735 let mut mqtt = build_mqttstate();
736 let publish = build_outgoing_publish(QoS::ExactlyOnce);
737
738 mqtt.outgoing_publish(publish).unwrap();
739 mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
740
741 mqtt.handle_incoming_pubcomp(&PubComp::new(1)).unwrap();
742 assert_eq!(mqtt.inflight, 0);
743 }
744
745 #[test]
746 fn outgoing_ping_handle_should_throw_errors_for_no_pingresp() {
747 let mut mqtt = build_mqttstate();
748 let mut opts = MqttOptions::new("test", "localhost", 1883);
749 opts.set_keep_alive(std::time::Duration::from_secs(10));
750 mqtt.outgoing_ping().unwrap();
751
752 let publish = build_outgoing_publish(QoS::AtLeastOnce);
754 mqtt.handle_outgoing_packet(Request::Publish(publish))
755 .unwrap();
756 mqtt.handle_incoming_packet(Incoming::PubAck(PubAck::new(1)))
757 .unwrap();
758
759 match mqtt.outgoing_ping() {
761 Ok(_) => panic!("Should throw pingresp await error"),
762 Err(StateError::AwaitPingResp) => (),
763 Err(e) => panic!("Should throw pingresp await error. Error = {:?}", e),
764 }
765 }
766
767 #[test]
768 fn outgoing_ping_handle_should_succeed_if_pingresp_is_received() {
769 let mut mqtt = build_mqttstate();
770
771 let mut opts = MqttOptions::new("test", "localhost", 1883);
772 opts.set_keep_alive(std::time::Duration::from_secs(10));
773
774 mqtt.outgoing_ping().unwrap();
776 mqtt.handle_incoming_packet(Incoming::PingResp).unwrap();
777
778 mqtt.outgoing_ping().unwrap();
780 }
781}