zerodds_rtps/
writer_proxy.rs1extern crate alloc;
15use alloc::collections::BTreeSet;
16use alloc::vec::Vec;
17
18use crate::wire_types::{Guid, Locator, SequenceNumber};
19
20#[derive(Debug, Clone)]
22pub struct WriterProxy {
23 pub remote_writer_guid: Guid,
25 pub unicast_locators: Vec<Locator>,
27 pub multicast_locators: Vec<Locator>,
29 pub is_reliable: bool,
31 first_available_sn: SequenceNumber,
33 last_available_sn: SequenceNumber,
35 highest_received_sn: SequenceNumber,
37 received: BTreeSet<SequenceNumber>,
39 irrelevant: BTreeSet<SequenceNumber>,
41}
42
43impl WriterProxy {
44 #[must_use]
46 pub fn new(
47 remote_writer_guid: Guid,
48 unicast_locators: Vec<Locator>,
49 multicast_locators: Vec<Locator>,
50 is_reliable: bool,
51 ) -> Self {
52 Self {
53 remote_writer_guid,
54 unicast_locators,
55 multicast_locators,
56 is_reliable,
57 first_available_sn: SequenceNumber(1),
58 last_available_sn: SequenceNumber(0),
59 highest_received_sn: SequenceNumber(0),
60 received: BTreeSet::new(),
61 irrelevant: BTreeSet::new(),
62 }
63 }
64
65 pub fn update_from_heartbeat(&mut self, first_sn: SequenceNumber, last_sn: SequenceNumber) {
70 if first_sn > self.first_available_sn {
72 self.first_available_sn = first_sn;
73 let split = self.received.split_off(&first_sn);
77 self.received = split;
78 let split = self.irrelevant.split_off(&first_sn);
79 self.irrelevant = split;
80 }
81 if last_sn > self.last_available_sn {
82 self.last_available_sn = last_sn;
83 }
84 }
85
86 pub fn received_change_set(&mut self, sn: SequenceNumber) {
88 if sn < self.first_available_sn {
89 return;
91 }
92 self.received.insert(sn);
93 if sn > self.highest_received_sn {
94 self.highest_received_sn = sn;
95 }
96 }
97
98 pub fn irrelevant_change_set(&mut self, sn: SequenceNumber) {
100 if sn < self.first_available_sn {
101 return;
102 }
103 self.irrelevant.insert(sn);
104 }
105
106 #[must_use]
108 pub fn is_known(&self, sn: SequenceNumber) -> bool {
109 self.received.contains(&sn) || self.irrelevant.contains(&sn)
110 }
111
112 #[must_use]
118 pub fn missing_changes(&self, max_count: usize) -> Vec<SequenceNumber> {
119 let mut out = Vec::new();
120 if self.last_available_sn < self.first_available_sn {
121 return out;
122 }
123 let mut sn = self.first_available_sn;
124 while sn <= self.last_available_sn && out.len() < max_count {
125 if !self.is_known(sn) {
126 out.push(sn);
127 }
128 sn = SequenceNumber(sn.0 + 1);
129 }
130 out
131 }
132
133 #[must_use]
135 pub fn has_missing_changes(&self) -> bool {
136 !self.missing_changes(1).is_empty()
137 }
138
139 #[must_use]
141 pub fn first_available_sn(&self) -> SequenceNumber {
142 self.first_available_sn
143 }
144
145 #[must_use]
147 pub fn last_available_sn(&self) -> SequenceNumber {
148 self.last_available_sn
149 }
150
151 #[must_use]
153 pub fn highest_received_sn(&self) -> SequenceNumber {
154 self.highest_received_sn
155 }
156
157 #[must_use]
162 pub fn acknack_base(&self) -> SequenceNumber {
163 let mut sn = self.first_available_sn;
164 while sn <= self.last_available_sn {
165 if !self.is_known(sn) {
166 return sn;
167 }
168 sn = SequenceNumber(sn.0 + 1);
169 }
170 SequenceNumber(self.last_available_sn.0 + 1)
171 }
172}
173
// Unit tests for `WriterProxy`. The clippy allowances keep `unwrap`/`expect`
// permitted in test code, where a panic is the desired failure mode.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::wire_types::{EntityId, GuidPrefix};

    /// Shorthand for building a sequence number.
    fn sn(n: i64) -> SequenceNumber {
        SequenceNumber(n)
    }

    /// Builds a reliable proxy with a fixed GUID and no locators.
    fn proxy() -> WriterProxy {
        let guid = Guid::new(
            GuidPrefix::from_bytes([2; 12]),
            EntityId::user_writer_with_key([0x10, 0x20, 0x30]),
        );
        WriterProxy::new(guid, alloc::vec![], alloc::vec![], true)
    }

    #[test]
    fn fresh_proxy_has_no_missing() {
        let p = proxy();
        assert!(!p.has_missing_changes());
        assert_eq!(p.missing_changes(10), alloc::vec![]);
        assert_eq!(p.acknack_base(), sn(1));
    }

    #[test]
    fn heartbeat_sets_available_range() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(5));
        assert_eq!(p.first_available_sn(), sn(1));
        assert_eq!(p.last_available_sn(), sn(5));
        assert_eq!(
            p.missing_changes(10),
            alloc::vec![sn(1), sn(2), sn(3), sn(4), sn(5)]
        );
    }

    #[test]
    fn received_removes_from_missing() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(5));
        p.received_change_set(sn(2));
        p.received_change_set(sn(4));
        assert_eq!(p.missing_changes(10), alloc::vec![sn(1), sn(3), sn(5)]);
        assert_eq!(p.acknack_base(), sn(1));
    }

    #[test]
    fn gap_marks_irrelevant() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(5));
        p.irrelevant_change_set(sn(3));
        assert_eq!(
            p.missing_changes(10),
            alloc::vec![sn(1), sn(2), sn(4), sn(5)]
        );
    }

    #[test]
    fn acknack_base_walks_up() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(3));
        p.received_change_set(sn(1));
        p.received_change_set(sn(2));
        assert_eq!(p.acknack_base(), sn(3));
        p.received_change_set(sn(3));
        assert_eq!(p.acknack_base(), sn(4));
    }

    #[test]
    fn heartbeat_advancing_first_prunes_old_state() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(10));
        p.received_change_set(sn(3));
        p.received_change_set(sn(7));
        p.update_from_heartbeat(sn(5), sn(10));
        assert_eq!(p.first_available_sn(), sn(5));
        // Entries below the new lower bound are dropped ...
        assert!(!p.is_known(sn(3)));
        // ... while entries inside the window survive.
        assert!(p.is_known(sn(7)));
    }

    #[test]
    fn highest_received_tracks_max() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(10));
        p.received_change_set(sn(3));
        p.received_change_set(sn(7));
        p.received_change_set(sn(5));
        assert_eq!(p.highest_received_sn(), sn(7));
    }

    #[test]
    fn received_before_first_is_ignored() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(5), sn(10));
        p.received_change_set(sn(2));
        assert!(!p.is_known(sn(2)));
        assert_eq!(p.highest_received_sn(), sn(0));
    }

    #[test]
    fn irrelevant_before_first_is_ignored() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(5), sn(10));
        p.irrelevant_change_set(sn(2));
        assert!(!p.is_known(sn(2)));
    }

    #[test]
    fn stale_heartbeat_does_not_shrink_range() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(5), sn(10));
        p.update_from_heartbeat(sn(1), sn(3));
        assert_eq!(p.first_available_sn(), sn(5));
        assert_eq!(p.last_available_sn(), sn(10));
    }

    #[test]
    fn missing_changes_respects_max_count() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(100));
        let m = p.missing_changes(5);
        assert_eq!(m, alloc::vec![sn(1), sn(2), sn(3), sn(4), sn(5)]);
    }

    #[test]
    fn missing_changes_with_zero_max_count_is_empty() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(100));
        assert_eq!(p.missing_changes(0), alloc::vec![]);
    }

    #[test]
    fn acknack_base_when_all_received_is_last_plus_one() {
        let mut p = proxy();
        p.update_from_heartbeat(sn(1), sn(3));
        p.received_change_set(sn(1));
        p.received_change_set(sn(2));
        p.received_change_set(sn(3));
        assert_eq!(p.acknack_base(), sn(4));
        assert!(!p.has_missing_changes());
    }
}