mqrstt/
state.rs

1use std::{
2    collections::{BTreeSet, VecDeque},
3    sync::Mutex,
4};
5
6use crate::{
7    available_packet_ids::AvailablePacketIds,
8    error::HandlerError,
9    packets::{Packet, PubRel, Publish},
10};
11
12#[derive(Debug)]
13/// [`State`] keeps track of the outgoing and incoming messages on which actions needs to be taken.
14/// In the future this will be adjusted to rebroadcast packets that have not been acked and thus need to be rebroadcast.
15pub struct State {
16    apkid: AvailablePacketIds,
17
18    /// Outgoing Subcribe requests which aren't acked yet
19    outgoing_sub: Mutex<BTreeSet<u16>>,
20    /// Outgoing Unsubcribe requests which aren't acked yet
21    outgoing_unsub: Mutex<BTreeSet<u16>>,
22
23    /// Outgoing QoS 1, 2 publishes which aren't acked yet
24    outgoing_pub: Mutex<Vec<Option<Publish>>>,
25    /// The order of the publish packets. This needs to be tracked to maintain in order communicaion on retransmit
26    outgoing_pub_order: Mutex<VecDeque<u16>>,
27
28    /// Packet ids of released QoS 2 publishes
29    outgoing_rel: Mutex<BTreeSet<u16>>,
30
31    /// Packet IDs of packets that arrive with QoS 2
32    incoming_pub: Mutex<BTreeSet<u16>>,
33}
34
35impl State {
36    pub fn new(receive_maximum: u16, apkid: AvailablePacketIds) -> Self {
37        Self {
38            apkid,
39
40            outgoing_sub: Mutex::new(BTreeSet::new()),
41            outgoing_unsub: Mutex::new(BTreeSet::new()),
42            outgoing_pub: Mutex::new(vec![None; receive_maximum as usize]),
43
44            // shifting should be minimal with a vecdeque
45            outgoing_pub_order: Mutex::new(VecDeque::new()),
46            outgoing_rel: Mutex::new(BTreeSet::new()),
47            incoming_pub: Mutex::new(BTreeSet::new()),
48        }
49    }
50
51    pub fn make_pkid_available(&self, pkid: u16) -> Result<(), HandlerError> {
52        self.apkid.mark_available(pkid)
53    }
54
55    /// Returns true is newly inserted.
56    /// False otherwise
57    pub fn add_incoming_pub(&self, pkid: u16) -> bool {
58        self.incoming_pub.lock().unwrap().insert(pkid)
59    }
60
61    /// Returns whether the packett id was present.
62    pub fn remove_incoming_pub(&self, pkid: u16) -> bool {
63        self.incoming_pub.lock().unwrap().remove(&pkid)
64    }
65
66    pub fn add_outgoing_pub(&self, pkid: u16, publish: Publish) -> Result<(), HandlerError> {
67        let mut outgoing_pub = self.outgoing_pub.lock().unwrap();
68        let current_pub = outgoing_pub[(pkid - 1) as usize].replace(publish);
69
70        if current_pub.is_some() {
71            Err(HandlerError::PacketIdCollision(pkid))
72        } else {
73            let mut outgoing_pub_order = self.outgoing_pub_order.lock().unwrap();
74            outgoing_pub_order.push_back(pkid);
75            Ok(())
76        }
77    }
78
79    pub fn remove_outgoing_pub(&self, pkid: u16) -> Option<Publish> {
80        {
81            let mut outgoing_pub_order = self.outgoing_pub_order.lock().unwrap();
82
83            for (index, id) in outgoing_pub_order.iter().enumerate() {
84                if pkid == *id {
85                    outgoing_pub_order.remove(index);
86                    break;
87                }
88            }
89        }
90
91        let mut outgoing_pub = self.outgoing_pub.lock().unwrap();
92        outgoing_pub[pkid as usize - 1].take()
93    }
94
95    pub fn add_outgoing_rel(&self, pkid: u16) -> bool {
96        self.outgoing_rel.lock().unwrap().insert(pkid)
97    }
98
99    /// Returns whether the packett id was present.
100    pub fn remove_outgoing_rel(&self, pkid: &u16) -> bool {
101        self.outgoing_rel.lock().unwrap().remove(pkid)
102    }
103
104    pub fn add_outgoing_sub(&self, pkid: u16) -> bool {
105        self.outgoing_sub.lock().unwrap().insert(pkid)
106    }
107
108    /// Returns whether the packett id was present.
109    pub fn remove_outgoing_sub(&self, pkid: u16) -> bool {
110        self.outgoing_sub.lock().unwrap().remove(&pkid)
111    }
112
113    pub fn add_outgoing_unsub(&self, pkid: u16) -> bool {
114        self.outgoing_unsub.lock().unwrap().insert(pkid)
115    }
116
117    /// Returns whether the packett id was present.
118    pub fn remove_outgoing_unsub(&self, pkid: u16) -> bool {
119        self.outgoing_unsub.lock().unwrap().remove(&pkid)
120    }
121
122    /// Returns the identifiers that are in use but can be freed
123    pub fn reset(&self, retransmission: bool) -> (Vec<u16>, Vec<Packet>) {
124        let State {
125            apkid: _,
126            outgoing_sub,
127            outgoing_unsub,
128            outgoing_pub,
129            outgoing_pub_order,
130            outgoing_rel,
131            incoming_pub,
132        } = self;
133
134        let mut outgoing_sub = outgoing_sub.lock().unwrap();
135        let mut outgoing_unsub = outgoing_unsub.lock().unwrap();
136        let mut outgoing_pub = outgoing_pub.lock().unwrap();
137        let mut outgoing_pub_order = outgoing_pub_order.lock().unwrap();
138        let mut outgoing_rel = outgoing_rel.lock().unwrap();
139        let mut incoming_pub = incoming_pub.lock().unwrap();
140
141        let mut freeable_ids = Vec::<u16>::with_capacity(outgoing_sub.len() + outgoing_unsub.len());
142        // let mut freeable_ids = outgoing_sub.iter().chain(outgoing_unsub.iter()).collect::<Vec<u16>>();
143        let mut retransmit = Vec::with_capacity(outgoing_pub_order.len());
144
145        freeable_ids.extend(outgoing_sub.iter());
146        freeable_ids.extend(outgoing_unsub.iter());
147
148        if retransmission {
149            for i in outgoing_pub_order.iter() {
150                let mut packet = outgoing_pub[(*i - 1) as usize].clone().unwrap();
151                packet.dup = true;
152                retransmit.push(Packet::Publish(packet));
153            }
154
155            for &rel in outgoing_rel.iter() {
156                retransmit.push(Packet::PubRel(PubRel::new(rel)));
157            }
158        } else {
159            freeable_ids.extend(outgoing_pub_order.iter());
160
161            *outgoing_pub = vec![None; outgoing_pub.len()];
162            outgoing_pub_order.clear();
163            outgoing_rel.clear();
164        }
165
166        outgoing_sub.clear();
167        outgoing_unsub.clear();
168
169        incoming_pub.clear();
170
171        (freeable_ids, retransmit)
172    }
173}
174
175#[cfg(test)]
176impl State {
177    pub fn outgoing_sub(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
178        self.outgoing_sub.lock().unwrap()
179    }
180    pub fn outgoing_unsub(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
181        self.outgoing_unsub.lock().unwrap()
182    }
183    pub fn outgoing_pub(&mut self) -> std::sync::MutexGuard<'_, Vec<Option<Publish>>> {
184        self.outgoing_pub.lock().unwrap()
185    }
186    pub fn outgoing_pub_order(&mut self) -> std::sync::MutexGuard<'_, VecDeque<u16>> {
187        self.outgoing_pub_order.lock().unwrap()
188    }
189    pub fn outgoing_rel(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
190        self.outgoing_rel.lock().unwrap()
191    }
192    pub fn incoming_pub(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
193        self.incoming_pub.lock().unwrap()
194    }
195}