spotflow_rumqttc_fork/
state.rs

1use crate::{Event, Incoming, Outgoing, Request};
2
3use crate::mqttbytes::v4::*;
4use crate::mqttbytes::{self, *};
5use bytes::BytesMut;
6use std::collections::VecDeque;
7use std::{io, mem, time::Instant};
8
9/// Errors during state handling
10#[derive(Debug, thiserror::Error)]
11pub enum StateError {
12    /// Io Error while state is passed to network
13    #[error("Io error {0:?}")]
14    Io(#[from] io::Error),
15    /// Broker's error reply to client's connect packet
16    #[error("Connect return code `{0:?}`")]
17    Connect(ConnectReturnCode),
18    /// Invalid state for a given operation
19    #[error("Invalid state for a given operation")]
20    InvalidState,
21    /// Received a packet (ack) which isn't asked for
22    #[error("Received unsolicited ack pkid {0}")]
23    Unsolicited(u16),
24    /// Last pingreq isn't acked
25    #[error("Last pingreq isn't acked")]
26    AwaitPingResp,
27    /// Received a wrong packet while waiting for another packet
28    #[error("Received a wrong packet while waiting for another packet")]
29    WrongPacket,
30    #[error("Timeout while waiting to resolve collision")]
31    CollisionTimeout,
32    #[error("Mqtt serialization/deserialization error")]
33    Deserialization(mqttbytes::Error),
34}
35
36impl From<mqttbytes::Error> for StateError {
37    fn from(e: mqttbytes::Error) -> StateError {
38        StateError::Deserialization(e)
39    }
40}
41
42/// State of the mqtt connection.
43// Design: Methods will just modify the state of the object without doing any network operations
44// Design: All inflight queues are maintained in a pre initialized vec with index as packet id.
45// This is done for 2 reasons
46// Bad acks or out of order acks aren't O(n) causing cpu spikes
47// Any missing acks from the broker are detected during the next recycled use of packet ids
48#[derive(Debug, Clone)]
49pub struct MqttState {
50    /// Status of last ping
51    pub await_pingresp: bool,
52    /// Collision ping count. Collisions stop user requests
53    /// which inturn trigger pings. Multiple pings without
54    /// resolving collisions will result in error
55    pub collision_ping_count: usize,
56    /// Last incoming packet time
57    last_incoming: Instant,
58    /// Last outgoing packet time
59    last_outgoing: Instant,
60    /// Packet id of the last outgoing packet
61    pub(crate) last_pkid: u16,
62    /// Number of outgoing inflight publishes
63    pub(crate) inflight: u16,
64    /// Maximum number of allowed inflight
65    pub(crate) max_inflight: u16,
66    /// Outgoing QoS 1, 2 publishes which aren't acked yet
67    pub(crate) outgoing_pub: Vec<Option<Publish>>,
68    /// Packet ids of released QoS 2 publishes
69    pub(crate) outgoing_rel: Vec<Option<u16>>,
70    /// Packet ids on incoming QoS 2 publishes
71    pub(crate) incoming_pub: Vec<Option<u16>>,
72    /// Last collision due to broker not acking in order
73    pub collision: Option<Publish>,
74    /// Buffered incoming packets
75    pub events: VecDeque<Event>,
76    /// Write buffer
77    pub write: BytesMut,
78    /// Indicates if acknowledgements should be send immediately
79    pub manual_acks: bool,
80}
81
82impl MqttState {
83    /// Creates new mqtt state. Same state should be used during a
84    /// connection for persistent sessions while new state should
85    /// instantiated for clean sessions
86    pub fn new(max_inflight: u16, manual_acks: bool) -> Self {
87        MqttState {
88            await_pingresp: false,
89            collision_ping_count: 0,
90            last_incoming: Instant::now(),
91            last_outgoing: Instant::now(),
92            last_pkid: 0,
93            inflight: 0,
94            max_inflight,
95            // index 0 is wasted as 0 is not a valid packet id
96            outgoing_pub: vec![None; max_inflight as usize + 1],
97            outgoing_rel: vec![None; max_inflight as usize + 1],
98            incoming_pub: vec![None; std::u16::MAX as usize + 1],
99            collision: None,
100            // TODO: Optimize these sizes later
101            events: VecDeque::with_capacity(100),
102            write: BytesMut::with_capacity(10 * 1024),
103            manual_acks,
104        }
105    }
106
107    /// Returns inflight outgoing packets and clears internal queues
108    pub fn clean(&mut self) -> Vec<Request> {
109        let mut pending = Vec::with_capacity(100);
110        // remove and collect pending publishes
111        for publish in self.outgoing_pub.iter_mut() {
112            if let Some(publish) = publish.take() {
113                let request = Request::Publish(publish);
114                pending.push(request);
115            }
116        }
117
118        // remove and collect pending releases
119        for rel in self.outgoing_rel.iter_mut() {
120            if let Some(pkid) = rel.take() {
121                let request = Request::PubRel(PubRel::new(pkid));
122                pending.push(request);
123            }
124        }
125
126        // remove packed ids of incoming qos2 publishes
127        for id in self.incoming_pub.iter_mut() {
128            id.take();
129        }
130
131        self.await_pingresp = false;
132        self.collision_ping_count = 0;
133        self.inflight = 0;
134        pending
135    }
136
137    pub fn inflight(&self) -> u16 {
138        self.inflight
139    }
140
141    /// Consolidates handling of all outgoing mqtt packet logic. Returns a packet which should
142    /// be put on to the network by the eventloop
143    pub fn handle_outgoing_packet(&mut self, request: Request) -> Result<(), StateError> {
144        match request {
145            Request::Publish(publish) => self.outgoing_publish(publish)?,
146            Request::PubRel(pubrel) => self.outgoing_pubrel(pubrel)?,
147            Request::Subscribe(subscribe) => self.outgoing_subscribe(subscribe)?,
148            Request::Unsubscribe(unsubscribe) => self.outgoing_unsubscribe(unsubscribe)?,
149            Request::PingReq => self.outgoing_ping()?,
150            Request::Disconnect => self.outgoing_disconnect()?,
151            Request::PubAck(puback) => self.outgoing_puback(puback)?,
152            Request::PubRec(pubrec) => self.outgoing_pubrec(pubrec)?,
153            _ => unimplemented!(),
154        };
155
156        self.last_outgoing = Instant::now();
157        Ok(())
158    }
159
160    /// Consolidates handling of all incoming mqtt packets. Returns a `Notification` which for the
161    /// user to consume and `Packet` which for the eventloop to put on the network
162    /// E.g For incoming QoS1 publish packet, this method returns (Publish, Puback). Publish packet will
163    /// be forwarded to user and Pubck packet will be written to network
164    pub fn handle_incoming_packet(&mut self, packet: Incoming) -> Result<(), StateError> {
165        let out = match &packet {
166            Incoming::PingResp => self.handle_incoming_pingresp(),
167            Incoming::Publish(publish) => self.handle_incoming_publish(publish),
168            Incoming::SubAck(_suback) => self.handle_incoming_suback(),
169            Incoming::UnsubAck(_unsuback) => self.handle_incoming_unsuback(),
170            Incoming::PubAck(puback) => self.handle_incoming_puback(puback),
171            Incoming::PubRec(pubrec) => self.handle_incoming_pubrec(pubrec),
172            Incoming::PubRel(pubrel) => self.handle_incoming_pubrel(pubrel),
173            Incoming::PubComp(pubcomp) => self.handle_incoming_pubcomp(pubcomp),
174            _ => {
175                error!("Invalid incoming packet = {:?}", packet);
176                return Err(StateError::WrongPacket);
177            }
178        };
179
180        out?;
181        self.events.push_back(Event::Incoming(packet));
182        self.last_incoming = Instant::now();
183        Ok(())
184    }
185
186    fn handle_incoming_suback(&mut self) -> Result<(), StateError> {
187        Ok(())
188    }
189
190    fn handle_incoming_unsuback(&mut self) -> Result<(), StateError> {
191        Ok(())
192    }
193
194    /// Results in a publish notification in all the QoS cases. Replys with an ack
195    /// in case of QoS1 and Replys rec in case of QoS while also storing the message
196    fn handle_incoming_publish(&mut self, publish: &Publish) -> Result<(), StateError> {
197        let qos = publish.qos;
198
199        match qos {
200            QoS::AtMostOnce => Ok(()),
201            QoS::AtLeastOnce => {
202                if !self.manual_acks {
203                    let puback = PubAck::new(publish.pkid);
204                    self.outgoing_puback(puback)?
205                }
206                Ok(())
207            }
208            QoS::ExactlyOnce => {
209                let pkid = publish.pkid;
210                self.incoming_pub[pkid as usize] = Some(pkid);
211                if !self.manual_acks {
212                    let pubrec = PubRec::new(pkid);
213                    self.outgoing_pubrec(pubrec)?;
214                }
215                Ok(())
216            }
217        }
218    }
219
220    fn handle_incoming_puback(&mut self, puback: &PubAck) -> Result<(), StateError> {
221        let v = match mem::replace(&mut self.outgoing_pub[puback.pkid as usize], None) {
222            Some(_) => {
223                self.inflight -= 1;
224                Ok(())
225            }
226            None => {
227                error!("Unsolicited puback packet: {:?}", puback.pkid);
228                Err(StateError::Unsolicited(puback.pkid))
229            }
230        };
231
232        if let Some(publish) = self.check_collision(puback.pkid) {
233            self.outgoing_pub[publish.pkid as usize] = Some(publish.clone());
234            self.inflight += 1;
235
236            publish.write(&mut self.write)?;
237            let event = Event::Outgoing(Outgoing::Publish(publish.pkid, publish.topic));
238            self.events.push_back(event);
239            self.collision_ping_count = 0;
240        }
241
242        v
243    }
244
245    fn handle_incoming_pubrec(&mut self, pubrec: &PubRec) -> Result<(), StateError> {
246        match mem::replace(&mut self.outgoing_pub[pubrec.pkid as usize], None) {
247            Some(_) => {
248                // NOTE: Inflight - 1 for qos2 in comp
249                self.outgoing_rel[pubrec.pkid as usize] = Some(pubrec.pkid);
250                PubRel::new(pubrec.pkid).write(&mut self.write)?;
251
252                let event = Event::Outgoing(Outgoing::PubRel(pubrec.pkid));
253                self.events.push_back(event);
254                Ok(())
255            }
256            None => {
257                error!("Unsolicited pubrec packet: {:?}", pubrec.pkid);
258                Err(StateError::Unsolicited(pubrec.pkid))
259            }
260        }
261    }
262
263    fn handle_incoming_pubrel(&mut self, pubrel: &PubRel) -> Result<(), StateError> {
264        match mem::replace(&mut self.incoming_pub[pubrel.pkid as usize], None) {
265            Some(_) => {
266                PubComp::new(pubrel.pkid).write(&mut self.write)?;
267                let event = Event::Outgoing(Outgoing::PubComp(pubrel.pkid));
268                self.events.push_back(event);
269                Ok(())
270            }
271            None => {
272                error!("Unsolicited pubrel packet: {:?}", pubrel.pkid);
273                Err(StateError::Unsolicited(pubrel.pkid))
274            }
275        }
276    }
277
278    fn handle_incoming_pubcomp(&mut self, pubcomp: &PubComp) -> Result<(), StateError> {
279        if let Some(publish) = self.check_collision(pubcomp.pkid) {
280            publish.write(&mut self.write)?;
281            let event = Event::Outgoing(Outgoing::Publish(publish.pkid, publish.topic));
282            self.events.push_back(event);
283            self.collision_ping_count = 0;
284        }
285
286        match mem::replace(&mut self.outgoing_rel[pubcomp.pkid as usize], None) {
287            Some(_) => {
288                self.inflight -= 1;
289                Ok(())
290            }
291            None => {
292                error!("Unsolicited pubcomp packet: {:?}", pubcomp.pkid);
293                Err(StateError::Unsolicited(pubcomp.pkid))
294            }
295        }
296    }
297
298    fn handle_incoming_pingresp(&mut self) -> Result<(), StateError> {
299        self.await_pingresp = false;
300        Ok(())
301    }
302
303    /// Adds next packet identifier to QoS 1 and 2 publish packets and returns
304    /// it buy wrapping publish in packet
305    fn outgoing_publish(&mut self, mut publish: Publish) -> Result<(), StateError> {
306        if publish.qos != QoS::AtMostOnce {
307            if publish.pkid == 0 {
308                publish.pkid = self.next_pkid();
309            }
310
311            let pkid = publish.pkid;
312            if self
313                .outgoing_pub
314                .get(publish.pkid as usize)
315                .unwrap()
316                .is_some()
317            {
318                info!("Collision on packet id = {:?}", publish.pkid);
319                self.collision = Some(publish);
320                let event = Event::Outgoing(Outgoing::AwaitAck(pkid));
321                self.events.push_back(event);
322                return Ok(());
323            }
324
325            // if there is an existing publish at this pkid, this implies that broker hasn't acked this
326            // packet yet. This error is possible only when broker isn't acking sequentially
327            self.outgoing_pub[pkid as usize] = Some(publish.clone());
328            self.inflight += 1;
329        };
330
331        debug!(
332            "Publish. Topic = {}, Pkid = {:?}, Payload Size = {:?}",
333            publish.topic,
334            publish.pkid,
335            publish.payload.len()
336        );
337
338        publish.write(&mut self.write)?;
339        let event = Event::Outgoing(Outgoing::Publish(publish.pkid, publish.topic));
340        self.events.push_back(event);
341        Ok(())
342    }
343
344    fn outgoing_pubrel(&mut self, pubrel: PubRel) -> Result<(), StateError> {
345        let pubrel = self.save_pubrel(pubrel)?;
346
347        debug!("Pubrel. Pkid = {}", pubrel.pkid);
348        PubRel::new(pubrel.pkid).write(&mut self.write)?;
349
350        let event = Event::Outgoing(Outgoing::PubRel(pubrel.pkid));
351        self.events.push_back(event);
352        Ok(())
353    }
354
355    fn outgoing_puback(&mut self, puback: PubAck) -> Result<(), StateError> {
356        puback.write(&mut self.write)?;
357        let event = Event::Outgoing(Outgoing::PubAck(puback.pkid));
358        self.events.push_back(event);
359        Ok(())
360    }
361
362    fn outgoing_pubrec(&mut self, pubrec: PubRec) -> Result<(), StateError> {
363        pubrec.write(&mut self.write)?;
364        let event = Event::Outgoing(Outgoing::PubRec(pubrec.pkid));
365        self.events.push_back(event);
366        Ok(())
367    }
368
369    /// check when the last control packet/pingreq packet is received and return
370    /// the status which tells if keep alive time has exceeded
371    /// NOTE: status will be checked for zero keepalive times also
372    fn outgoing_ping(&mut self) -> Result<(), StateError> {
373        let elapsed_in = self.last_incoming.elapsed();
374        let elapsed_out = self.last_outgoing.elapsed();
375
376        if self.collision.is_some() {
377            self.collision_ping_count += 1;
378            if self.collision_ping_count >= 2 {
379                return Err(StateError::CollisionTimeout);
380            }
381        }
382
383        // raise error if last ping didn't receive ack
384        if self.await_pingresp {
385            return Err(StateError::AwaitPingResp);
386        }
387
388        self.await_pingresp = true;
389
390        debug!(
391            "Pingreq,
392            last incoming packet before {} millisecs,
393            last outgoing request before {} millisecs",
394            elapsed_in.as_millis(),
395            elapsed_out.as_millis()
396        );
397
398        PingReq.write(&mut self.write)?;
399        let event = Event::Outgoing(Outgoing::PingReq);
400        self.events.push_back(event);
401        Ok(())
402    }
403
404    fn outgoing_subscribe(&mut self, mut subscription: Subscribe) -> Result<(), StateError> {
405        let pkid = self.next_pkid();
406        subscription.pkid = pkid;
407
408        debug!(
409            "Subscribe. Topics = {:?}, Pkid = {:?}",
410            subscription.filters, subscription.pkid
411        );
412
413        subscription.write(&mut self.write)?;
414        let event = Event::Outgoing(Outgoing::Subscribe(subscription.pkid));
415        self.events.push_back(event);
416        Ok(())
417    }
418
419    fn outgoing_unsubscribe(&mut self, mut unsub: Unsubscribe) -> Result<(), StateError> {
420        let pkid = self.next_pkid();
421        unsub.pkid = pkid;
422
423        debug!(
424            "Unsubscribe. Topics = {:?}, Pkid = {:?}",
425            unsub.topics, unsub.pkid
426        );
427
428        unsub.write(&mut self.write)?;
429        let event = Event::Outgoing(Outgoing::Unsubscribe(unsub.pkid));
430        self.events.push_back(event);
431        Ok(())
432    }
433
434    fn outgoing_disconnect(&mut self) -> Result<(), StateError> {
435        debug!("Disconnect");
436
437        Disconnect.write(&mut self.write)?;
438        let event = Event::Outgoing(Outgoing::Disconnect);
439        self.events.push_back(event);
440        Ok(())
441    }
442
443    fn check_collision(&mut self, pkid: u16) -> Option<Publish> {
444        if let Some(publish) = &self.collision {
445            if publish.pkid == pkid {
446                return self.collision.take();
447            }
448        }
449
450        None
451    }
452
453    fn save_pubrel(&mut self, mut pubrel: PubRel) -> Result<PubRel, StateError> {
454        let pubrel = match pubrel.pkid {
455            // consider PacketIdentifier(0) as uninitialized packets
456            0 => {
457                pubrel.pkid = self.next_pkid();
458                pubrel
459            }
460            _ => pubrel,
461        };
462
463        self.outgoing_rel[pubrel.pkid as usize] = Some(pubrel.pkid);
464        Ok(pubrel)
465    }
466
467    /// http://stackoverflow.com/questions/11115364/mqtt-messageid-practical-implementation
468    /// Packet ids are incremented till maximum set inflight messages and reset to 1 after that.
469    ///
470    fn next_pkid(&mut self) -> u16 {
471        let next_pkid = self.last_pkid + 1;
472
473        // When next packet id is at the edge of inflight queue,
474        // set await flag. This instructs eventloop to stop
475        // processing requests until all the inflight publishes
476        // are acked
477        if next_pkid == self.max_inflight {
478            self.last_pkid = 0;
479            return next_pkid;
480        }
481
482        self.last_pkid = next_pkid;
483        next_pkid
484    }
485}
486
487#[cfg(test)]
488mod test {
489    use super::{MqttState, StateError};
490    use crate::mqttbytes::v4::*;
491    use crate::mqttbytes::*;
492    use crate::{Event, Incoming, MqttOptions, Outgoing, Request};
493
494    fn build_outgoing_publish(qos: QoS) -> Publish {
495        let topic = "hello/world".to_owned();
496        let payload = vec![1, 2, 3];
497
498        let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
499        publish.qos = qos;
500        publish
501    }
502
503    fn build_incoming_publish(qos: QoS, pkid: u16) -> Publish {
504        let topic = "hello/world".to_owned();
505        let payload = vec![1, 2, 3];
506
507        let mut publish = Publish::new(topic, QoS::AtLeastOnce, payload);
508        publish.pkid = pkid;
509        publish.qos = qos;
510        publish
511    }
512
513    fn build_mqttstate() -> MqttState {
514        MqttState::new(100, false)
515    }
516
517    #[test]
518    fn next_pkid_increments_as_expected() {
519        let mut mqtt = build_mqttstate();
520
521        for i in 1..=100 {
522            let pkid = mqtt.next_pkid();
523
524            // loops between 0-99. % 100 == 0 implies border
525            let expected = i % 100;
526            if expected == 0 {
527                break;
528            }
529
530            assert_eq!(expected, pkid);
531        }
532    }
533
534    #[test]
535    fn outgoing_publish_should_set_pkid_and_add_publish_to_queue() {
536        let mut mqtt = build_mqttstate();
537
538        // QoS0 Publish
539        let publish = build_outgoing_publish(QoS::AtMostOnce);
540
541        // QoS 0 publish shouldn't be saved in queue
542        mqtt.outgoing_publish(publish).unwrap();
543        assert_eq!(mqtt.last_pkid, 0);
544        assert_eq!(mqtt.inflight, 0);
545
546        // QoS1 Publish
547        let publish = build_outgoing_publish(QoS::AtLeastOnce);
548
549        // Packet id should be set and publish should be saved in queue
550        mqtt.outgoing_publish(publish.clone()).unwrap();
551        assert_eq!(mqtt.last_pkid, 1);
552        assert_eq!(mqtt.inflight, 1);
553
554        // Packet id should be incremented and publish should be saved in queue
555        mqtt.outgoing_publish(publish).unwrap();
556        assert_eq!(mqtt.last_pkid, 2);
557        assert_eq!(mqtt.inflight, 2);
558
559        // QoS1 Publish
560        let publish = build_outgoing_publish(QoS::ExactlyOnce);
561
562        // Packet id should be set and publish should be saved in queue
563        mqtt.outgoing_publish(publish.clone()).unwrap();
564        assert_eq!(mqtt.last_pkid, 3);
565        assert_eq!(mqtt.inflight, 3);
566
567        // Packet id should be incremented and publish should be saved in queue
568        mqtt.outgoing_publish(publish).unwrap();
569        assert_eq!(mqtt.last_pkid, 4);
570        assert_eq!(mqtt.inflight, 4);
571    }
572
573    #[test]
574    fn incoming_publish_should_be_added_to_queue_correctly() {
575        let mut mqtt = build_mqttstate();
576
577        // QoS0, 1, 2 Publishes
578        let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
579        let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
580        let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
581
582        mqtt.handle_incoming_publish(&publish1).unwrap();
583        mqtt.handle_incoming_publish(&publish2).unwrap();
584        mqtt.handle_incoming_publish(&publish3).unwrap();
585
586        let pkid = mqtt.incoming_pub[3].unwrap();
587
588        // only qos2 publish should be add to queue
589        assert_eq!(pkid, 3);
590    }
591
592    #[test]
593    fn incoming_publish_should_be_acked() {
594        let mut mqtt = build_mqttstate();
595
596        // QoS0, 1, 2 Publishes
597        let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
598        let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
599        let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
600
601        mqtt.handle_incoming_publish(&publish1).unwrap();
602        mqtt.handle_incoming_publish(&publish2).unwrap();
603        mqtt.handle_incoming_publish(&publish3).unwrap();
604
605        if let Event::Outgoing(Outgoing::PubAck(pkid)) = mqtt.events[0] {
606            assert_eq!(pkid, 2);
607        } else {
608            panic!("missing puback")
609        }
610
611        if let Event::Outgoing(Outgoing::PubRec(pkid)) = mqtt.events[1] {
612            assert_eq!(pkid, 3);
613        } else {
614            panic!("missing PubRec")
615        }
616    }
617
618    #[test]
619    fn incoming_publish_should_not_be_acked_with_manual_acks() {
620        let mut mqtt = build_mqttstate();
621        mqtt.manual_acks = true;
622
623        // QoS0, 1, 2 Publishes
624        let publish1 = build_incoming_publish(QoS::AtMostOnce, 1);
625        let publish2 = build_incoming_publish(QoS::AtLeastOnce, 2);
626        let publish3 = build_incoming_publish(QoS::ExactlyOnce, 3);
627
628        mqtt.handle_incoming_publish(&publish1).unwrap();
629        mqtt.handle_incoming_publish(&publish2).unwrap();
630        mqtt.handle_incoming_publish(&publish3).unwrap();
631
632        let pkid = mqtt.incoming_pub[3].unwrap();
633        assert_eq!(pkid, 3);
634
635        assert!(mqtt.events.is_empty());
636    }
637
638    #[test]
639    fn incoming_qos2_publish_should_send_rec_to_network_and_publish_to_user() {
640        let mut mqtt = build_mqttstate();
641        let publish = build_incoming_publish(QoS::ExactlyOnce, 1);
642
643        mqtt.handle_incoming_publish(&publish).unwrap();
644        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
645        match packet {
646            Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
647            _ => panic!("Invalid network request: {:?}", packet),
648        }
649    }
650
651    #[test]
652    fn incoming_puback_should_remove_correct_publish_from_queue() {
653        let mut mqtt = build_mqttstate();
654
655        let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
656        let publish2 = build_outgoing_publish(QoS::ExactlyOnce);
657
658        mqtt.outgoing_publish(publish1).unwrap();
659        mqtt.outgoing_publish(publish2).unwrap();
660        assert_eq!(mqtt.inflight, 2);
661
662        mqtt.handle_incoming_puback(&PubAck::new(1)).unwrap();
663        assert_eq!(mqtt.inflight, 1);
664
665        mqtt.handle_incoming_puback(&PubAck::new(2)).unwrap();
666        assert_eq!(mqtt.inflight, 0);
667
668        assert!(mqtt.outgoing_pub[1].is_none());
669        assert!(mqtt.outgoing_pub[2].is_none());
670    }
671
672    #[test]
673    fn incoming_pubrec_should_release_publish_from_queue_and_add_relid_to_rel_queue() {
674        let mut mqtt = build_mqttstate();
675
676        let publish1 = build_outgoing_publish(QoS::AtLeastOnce);
677        let publish2 = build_outgoing_publish(QoS::ExactlyOnce);
678
679        let _publish_out = mqtt.outgoing_publish(publish1);
680        let _publish_out = mqtt.outgoing_publish(publish2);
681
682        mqtt.handle_incoming_pubrec(&PubRec::new(2)).unwrap();
683        assert_eq!(mqtt.inflight, 2);
684
685        // check if the remaining element's pkid is 1
686        let backup = mqtt.outgoing_pub[1].clone();
687        assert_eq!(backup.unwrap().pkid, 1);
688
689        // check if the qos2 element's release pkid is 2
690        assert_eq!(mqtt.outgoing_rel[2].unwrap(), 2);
691    }
692
693    #[test]
694    fn incoming_pubrec_should_send_release_to_network_and_nothing_to_user() {
695        let mut mqtt = build_mqttstate();
696
697        let publish = build_outgoing_publish(QoS::ExactlyOnce);
698        mqtt.outgoing_publish(publish).unwrap();
699        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
700        match packet {
701            Packet::Publish(publish) => assert_eq!(publish.pkid, 1),
702            packet => panic!("Invalid network request: {:?}", packet),
703        }
704
705        mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
706        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
707        match packet {
708            Packet::PubRel(pubrel) => assert_eq!(pubrel.pkid, 1),
709            packet => panic!("Invalid network request: {:?}", packet),
710        }
711    }
712
713    #[test]
714    fn incoming_pubrel_should_send_comp_to_network_and_nothing_to_user() {
715        let mut mqtt = build_mqttstate();
716        let publish = build_incoming_publish(QoS::ExactlyOnce, 1);
717
718        mqtt.handle_incoming_publish(&publish).unwrap();
719        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
720        match packet {
721            Packet::PubRec(pubrec) => assert_eq!(pubrec.pkid, 1),
722            packet => panic!("Invalid network request: {:?}", packet),
723        }
724
725        mqtt.handle_incoming_pubrel(&PubRel::new(1)).unwrap();
726        let packet = read(&mut mqtt.write, 10 * 1024).unwrap();
727        match packet {
728            Packet::PubComp(pubcomp) => assert_eq!(pubcomp.pkid, 1),
729            packet => panic!("Invalid network request: {:?}", packet),
730        }
731    }
732
733    #[test]
734    fn incoming_pubcomp_should_release_correct_pkid_from_release_queue() {
735        let mut mqtt = build_mqttstate();
736        let publish = build_outgoing_publish(QoS::ExactlyOnce);
737
738        mqtt.outgoing_publish(publish).unwrap();
739        mqtt.handle_incoming_pubrec(&PubRec::new(1)).unwrap();
740
741        mqtt.handle_incoming_pubcomp(&PubComp::new(1)).unwrap();
742        assert_eq!(mqtt.inflight, 0);
743    }
744
745    #[test]
746    fn outgoing_ping_handle_should_throw_errors_for_no_pingresp() {
747        let mut mqtt = build_mqttstate();
748        let mut opts = MqttOptions::new("test", "localhost", 1883);
749        opts.set_keep_alive(std::time::Duration::from_secs(10));
750        mqtt.outgoing_ping().unwrap();
751
752        // network activity other than pingresp
753        let publish = build_outgoing_publish(QoS::AtLeastOnce);
754        mqtt.handle_outgoing_packet(Request::Publish(publish))
755            .unwrap();
756        mqtt.handle_incoming_packet(Incoming::PubAck(PubAck::new(1)))
757            .unwrap();
758
759        // should throw error because we didn't get pingresp for previous ping
760        match mqtt.outgoing_ping() {
761            Ok(_) => panic!("Should throw pingresp await error"),
762            Err(StateError::AwaitPingResp) => (),
763            Err(e) => panic!("Should throw pingresp await error. Error = {:?}", e),
764        }
765    }
766
767    #[test]
768    fn outgoing_ping_handle_should_succeed_if_pingresp_is_received() {
769        let mut mqtt = build_mqttstate();
770
771        let mut opts = MqttOptions::new("test", "localhost", 1883);
772        opts.set_keep_alive(std::time::Duration::from_secs(10));
773
774        // should ping
775        mqtt.outgoing_ping().unwrap();
776        mqtt.handle_incoming_packet(Incoming::PingResp).unwrap();
777
778        // should ping
779        mqtt.outgoing_ping().unwrap();
780    }
781}