1use std::{
2 collections::{BTreeSet, VecDeque},
3 sync::Mutex,
4};
5
6use crate::{
7 available_packet_ids::AvailablePacketIds,
8 error::HandlerError,
9 packets::{Packet, PubRel, Publish},
10};
11
/// Bookkeeping for packet identifiers that are currently in use.
///
/// Every collection sits behind its own [`Mutex`], so all methods take `&self`.
#[derive(Debug)]
pub struct State {
    /// Packet id pool; `make_pkid_available` forwards released ids to it.
    apkid: AvailablePacketIds,

    /// Packet ids registered through `add_outgoing_sub`
    /// (presumably SUBSCRIBE packets awaiting their SUBACK — confirm against the caller).
    outgoing_sub: Mutex<BTreeSet<u16>>,
    /// Packet ids registered through `add_outgoing_unsub`
    /// (presumably UNSUBSCRIBE packets awaiting their UNSUBACK — confirm against the caller).
    outgoing_unsub: Mutex<BTreeSet<u16>>,

    /// Outgoing publishes that are still tracked. The publish with packet id `n` lives in
    /// slot `n - 1`; the number of slots is the `receive_maximum` passed to `new`.
    outgoing_pub: Mutex<Vec<Option<Publish>>>,
    /// Packet ids of the publishes stored in `outgoing_pub`, in the order they were added.
    /// `reset` walks this queue to build its retransmission list.
    outgoing_pub_order: Mutex<VecDeque<u16>>,

    /// Packet ids for which `reset` re-sends a `PubRel` when retransmission is requested.
    outgoing_rel: Mutex<BTreeSet<u16>>,

    /// Packet ids registered through `add_incoming_pub`; forgotten on every `reset`.
    incoming_pub: Mutex<BTreeSet<u16>>,
}
34
35impl State {
36 pub fn new(receive_maximum: u16, apkid: AvailablePacketIds) -> Self {
37 Self {
38 apkid,
39
40 outgoing_sub: Mutex::new(BTreeSet::new()),
41 outgoing_unsub: Mutex::new(BTreeSet::new()),
42 outgoing_pub: Mutex::new(vec![None; receive_maximum as usize]),
43
44 outgoing_pub_order: Mutex::new(VecDeque::new()),
46 outgoing_rel: Mutex::new(BTreeSet::new()),
47 incoming_pub: Mutex::new(BTreeSet::new()),
48 }
49 }
50
51 pub fn make_pkid_available(&self, pkid: u16) -> Result<(), HandlerError> {
52 self.apkid.mark_available(pkid)
53 }
54
55 pub fn add_incoming_pub(&self, pkid: u16) -> bool {
58 self.incoming_pub.lock().unwrap().insert(pkid)
59 }
60
61 pub fn remove_incoming_pub(&self, pkid: u16) -> bool {
63 self.incoming_pub.lock().unwrap().remove(&pkid)
64 }
65
66 pub fn add_outgoing_pub(&self, pkid: u16, publish: Publish) -> Result<(), HandlerError> {
67 let mut outgoing_pub = self.outgoing_pub.lock().unwrap();
68 let current_pub = outgoing_pub[(pkid - 1) as usize].replace(publish);
69
70 if current_pub.is_some() {
71 Err(HandlerError::PacketIdCollision(pkid))
72 } else {
73 let mut outgoing_pub_order = self.outgoing_pub_order.lock().unwrap();
74 outgoing_pub_order.push_back(pkid);
75 Ok(())
76 }
77 }
78
79 pub fn remove_outgoing_pub(&self, pkid: u16) -> Option<Publish> {
80 {
81 let mut outgoing_pub_order = self.outgoing_pub_order.lock().unwrap();
82
83 for (index, id) in outgoing_pub_order.iter().enumerate() {
84 if pkid == *id {
85 outgoing_pub_order.remove(index);
86 break;
87 }
88 }
89 }
90
91 let mut outgoing_pub = self.outgoing_pub.lock().unwrap();
92 outgoing_pub[pkid as usize - 1].take()
93 }
94
95 pub fn add_outgoing_rel(&self, pkid: u16) -> bool {
96 self.outgoing_rel.lock().unwrap().insert(pkid)
97 }
98
99 pub fn remove_outgoing_rel(&self, pkid: &u16) -> bool {
101 self.outgoing_rel.lock().unwrap().remove(pkid)
102 }
103
104 pub fn add_outgoing_sub(&self, pkid: u16) -> bool {
105 self.outgoing_sub.lock().unwrap().insert(pkid)
106 }
107
108 pub fn remove_outgoing_sub(&self, pkid: u16) -> bool {
110 self.outgoing_sub.lock().unwrap().remove(&pkid)
111 }
112
113 pub fn add_outgoing_unsub(&self, pkid: u16) -> bool {
114 self.outgoing_unsub.lock().unwrap().insert(pkid)
115 }
116
117 pub fn remove_outgoing_unsub(&self, pkid: u16) -> bool {
119 self.outgoing_unsub.lock().unwrap().remove(&pkid)
120 }
121
122 pub fn reset(&self, retransmission: bool) -> (Vec<u16>, Vec<Packet>) {
124 let State {
125 apkid: _,
126 outgoing_sub,
127 outgoing_unsub,
128 outgoing_pub,
129 outgoing_pub_order,
130 outgoing_rel,
131 incoming_pub,
132 } = self;
133
134 let mut outgoing_sub = outgoing_sub.lock().unwrap();
135 let mut outgoing_unsub = outgoing_unsub.lock().unwrap();
136 let mut outgoing_pub = outgoing_pub.lock().unwrap();
137 let mut outgoing_pub_order = outgoing_pub_order.lock().unwrap();
138 let mut outgoing_rel = outgoing_rel.lock().unwrap();
139 let mut incoming_pub = incoming_pub.lock().unwrap();
140
141 let mut freeable_ids = Vec::<u16>::with_capacity(outgoing_sub.len() + outgoing_unsub.len());
142 let mut retransmit = Vec::with_capacity(outgoing_pub_order.len());
144
145 freeable_ids.extend(outgoing_sub.iter());
146 freeable_ids.extend(outgoing_unsub.iter());
147
148 if retransmission {
149 for i in outgoing_pub_order.iter() {
150 let mut packet = outgoing_pub[(*i - 1) as usize].clone().unwrap();
151 packet.dup = true;
152 retransmit.push(Packet::Publish(packet));
153 }
154
155 for &rel in outgoing_rel.iter() {
156 retransmit.push(Packet::PubRel(PubRel::new(rel)));
157 }
158 } else {
159 freeable_ids.extend(outgoing_pub_order.iter());
160
161 *outgoing_pub = vec![None; outgoing_pub.len()];
162 outgoing_pub_order.clear();
163 outgoing_rel.clear();
164 }
165
166 outgoing_sub.clear();
167 outgoing_unsub.clear();
168
169 incoming_pub.clear();
170
171 (freeable_ids, retransmit)
172 }
173}
174
#[cfg(test)]
impl State {
    /// Locks `mutex` and returns the guard; panics if the mutex is poisoned.
    fn locked<T>(mutex: &Mutex<T>) -> std::sync::MutexGuard<'_, T> {
        mutex.lock().unwrap()
    }

    /// Test-only access to the set behind `outgoing_sub`.
    pub fn outgoing_sub(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
        Self::locked(&self.outgoing_sub)
    }

    /// Test-only access to the set behind `outgoing_unsub`.
    pub fn outgoing_unsub(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
        Self::locked(&self.outgoing_unsub)
    }

    /// Test-only access to the publish slots behind `outgoing_pub`.
    pub fn outgoing_pub(&mut self) -> std::sync::MutexGuard<'_, Vec<Option<Publish>>> {
        Self::locked(&self.outgoing_pub)
    }

    /// Test-only access to the queue behind `outgoing_pub_order`.
    pub fn outgoing_pub_order(&mut self) -> std::sync::MutexGuard<'_, VecDeque<u16>> {
        Self::locked(&self.outgoing_pub_order)
    }

    /// Test-only access to the set behind `outgoing_rel`.
    pub fn outgoing_rel(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
        Self::locked(&self.outgoing_rel)
    }

    /// Test-only access to the set behind `incoming_pub`.
    pub fn incoming_pub(&mut self) -> std::sync::MutexGuard<'_, BTreeSet<u16>> {
        Self::locked(&self.incoming_pub)
    }
}