1extern crate alloc;
15use alloc::collections::{BTreeMap, BTreeSet};
16use alloc::vec::Vec;
17
18use crate::wire_types::{FragmentNumber, Guid, Locator, SequenceNumber};
19
20#[derive(Debug, Clone)]
22pub struct ReaderProxy {
23 pub remote_reader_guid: Guid,
25 pub unicast_locators: Vec<Locator>,
27 pub multicast_locators: Vec<Locator>,
29 pub is_reliable: bool,
31 highest_acked_sn: SequenceNumber,
34 highest_sent_sn: SequenceNumber,
36 requested_changes: BTreeSet<SequenceNumber>,
38 requested_fragments: BTreeMap<SequenceNumber, BTreeSet<FragmentNumber>>,
41 last_activity: core::time::Duration,
48 negotiated_data_representation: i16,
54}
55
56impl ReaderProxy {
57 #[must_use]
59 pub fn new(
60 remote_reader_guid: Guid,
61 unicast_locators: Vec<Locator>,
62 multicast_locators: Vec<Locator>,
63 is_reliable: bool,
64 ) -> Self {
65 Self {
66 remote_reader_guid,
67 unicast_locators,
68 multicast_locators,
69 is_reliable,
70 highest_acked_sn: SequenceNumber(0),
73 highest_sent_sn: SequenceNumber(0),
74 requested_changes: BTreeSet::new(),
75 requested_fragments: BTreeMap::new(),
76 last_activity: core::time::Duration::ZERO,
77 negotiated_data_representation: crate::publication_data::data_representation::XCDR2,
80 }
81 }
82
83 pub fn set_negotiated_data_representation(&mut self, id: i16) {
86 self.negotiated_data_representation = id;
87 }
88
89 #[must_use]
91 pub fn negotiated_data_representation(&self) -> i16 {
92 self.negotiated_data_representation
93 }
94
95 pub fn note_activity(&mut self, now: core::time::Duration) {
98 self.last_activity = now;
99 }
100
101 #[must_use]
107 pub fn is_inactive(&self, now: core::time::Duration, threshold: core::time::Duration) -> bool {
108 now.checked_sub(self.last_activity)
109 .is_some_and(|elapsed| elapsed > threshold)
110 }
111
112 #[must_use]
114 pub fn last_activity(&self) -> core::time::Duration {
115 self.last_activity
116 }
117
118 pub fn skip_samples_up_to(&mut self, sn: SequenceNumber) {
129 if sn > self.highest_sent_sn {
130 self.highest_sent_sn = sn;
131 }
132 if sn > self.highest_acked_sn {
133 self.highest_acked_sn = sn;
134 }
135 }
136
137 pub fn acked_changes_set(&mut self, base: SequenceNumber) {
140 let new_acked = SequenceNumber(base.0 - 1);
141 if new_acked > self.highest_acked_sn {
142 self.highest_acked_sn = new_acked;
143 }
144 self.requested_changes
146 .retain(|sn| *sn > self.highest_acked_sn);
147 self.requested_fragments
149 .retain(|sn, _| *sn > self.highest_acked_sn);
150 }
151
152 pub fn requested_changes_set(&mut self, sns: impl IntoIterator<Item = SequenceNumber>) {
154 for sn in sns {
155 if sn > self.highest_acked_sn {
156 self.requested_changes.insert(sn);
157 }
158 }
159 }
160
161 pub fn next_requested_change(&mut self) -> Option<SequenceNumber> {
163 let sn = *self.requested_changes.iter().next()?;
164 self.requested_changes.remove(&sn);
165 Some(sn)
166 }
167
168 pub fn next_unsent_change(&mut self, cache_max: SequenceNumber) -> Option<SequenceNumber> {
172 if self.highest_sent_sn < cache_max {
173 let next = SequenceNumber(self.highest_sent_sn.0 + 1);
174 self.highest_sent_sn = next;
175 Some(next)
176 } else {
177 None
178 }
179 }
180
181 #[must_use]
184 pub fn unacked_changes(&self, cache_max: SequenceNumber) -> bool {
185 cache_max > self.highest_acked_sn
186 }
187
188 #[must_use]
190 pub fn highest_acked_sn(&self) -> SequenceNumber {
191 self.highest_acked_sn
192 }
193
194 #[must_use]
196 pub fn highest_sent_sn(&self) -> SequenceNumber {
197 self.highest_sent_sn
198 }
199
200 #[must_use]
202 pub fn pending_requested_count(&self) -> usize {
203 self.requested_changes.len()
204 }
205
206 pub fn requested_fragments_set(
209 &mut self,
210 sn: SequenceNumber,
211 fragments: impl IntoIterator<Item = FragmentNumber>,
212 ) {
213 if sn <= self.highest_acked_sn {
214 return;
215 }
216 let entry = self.requested_fragments.entry(sn).or_default();
217 for f in fragments {
218 if f != FragmentNumber::UNKNOWN {
219 entry.insert(f);
220 }
221 }
222 if entry.is_empty() {
223 self.requested_fragments.remove(&sn);
224 }
225 }
226
227 pub fn next_requested_fragment(&mut self) -> Option<(SequenceNumber, FragmentNumber)> {
229 let sn = *self.requested_fragments.keys().next()?;
230 let frag = {
231 let set = self.requested_fragments.get_mut(&sn)?;
232 let f = *set.iter().next()?;
233 set.remove(&f);
234 f
235 };
236 if self
237 .requested_fragments
238 .get(&sn)
239 .is_some_and(alloc::collections::BTreeSet::is_empty)
240 {
241 self.requested_fragments.remove(&sn);
242 }
243 Some((sn, frag))
244 }
245
246 #[must_use]
248 pub fn pending_requested_fragment_count(&self) -> usize {
249 self.requested_fragments.values().map(BTreeSet::len).sum()
250 }
251}
252
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used)]
mod tests {
    use super::*;
    use crate::wire_types::{EntityId, GuidPrefix};
    use core::time::Duration;

    /// Shorthand for building a sequence number.
    fn seq(value: i64) -> SequenceNumber {
        SequenceNumber(value)
    }

    /// Shorthand for building a fragment number.
    fn fragment(index: u32) -> FragmentNumber {
        FragmentNumber(index)
    }

    /// Shorthand for a whole-second duration.
    fn secs(count: u64) -> Duration {
        Duration::from_secs(count)
    }

    /// Builds a reliable proxy without locators for a fixed reader GUID.
    fn reliable_proxy() -> ReaderProxy {
        let prefix = GuidPrefix::from_bytes([1; 12]);
        let entity = EntityId::user_reader_with_key([0xA0, 0xB0, 0xC0]);
        ReaderProxy::new(Guid::new(prefix, entity), Vec::new(), Vec::new(), true)
    }

    #[test]
    fn fresh_proxy_has_zero_state() {
        let fresh = reliable_proxy();
        assert_eq!(fresh.highest_acked_sn(), seq(0));
        assert_eq!(fresh.highest_sent_sn(), seq(0));
        assert_eq!(fresh.pending_requested_count(), 0);
    }

    #[test]
    fn acked_changes_set_monotonic() {
        let mut rp = reliable_proxy();
        // (acknowledgement base, expected watermark afterwards); the middle
        // step is a stale base that must not move the watermark backwards.
        for (base, expected) in [(5, 4), (3, 4), (10, 9)] {
            rp.acked_changes_set(seq(base));
            assert_eq!(rp.highest_acked_sn(), seq(expected));
        }
    }

    #[test]
    fn requested_changes_set_above_ack_only() {
        let mut rp = reliable_proxy();
        rp.acked_changes_set(seq(5));
        // 2 and 4 are already acknowledged; only 6 and 8 are queued.
        rp.requested_changes_set([seq(2), seq(4), seq(6), seq(8)]);
        assert_eq!(rp.pending_requested_count(), 2);
    }

    #[test]
    fn next_requested_change_pulls_smallest_first() {
        let mut rp = reliable_proxy();
        rp.requested_changes_set([seq(8), seq(3), seq(5)]);
        for expected in [Some(seq(3)), Some(seq(5)), Some(seq(8)), None] {
            assert_eq!(rp.next_requested_change(), expected);
        }
    }

    #[test]
    fn next_unsent_change_walks_sequentially() {
        let mut rp = reliable_proxy();
        let newest = seq(3);
        for expected in [Some(seq(1)), Some(seq(2)), Some(seq(3)), None] {
            assert_eq!(rp.next_unsent_change(newest), expected);
        }
    }

    #[test]
    fn next_unsent_change_picks_up_after_cache_grows() {
        let mut rp = reliable_proxy();
        for expected in [Some(seq(1)), Some(seq(2)), None] {
            assert_eq!(rp.next_unsent_change(seq(2)), expected);
        }
        // Once the upper bound grows, handing out resumes where it stopped.
        assert_eq!(rp.next_unsent_change(seq(5)), Some(seq(3)));
    }

    #[test]
    fn unacked_changes_detects_gap() {
        let mut rp = reliable_proxy();
        assert!(!rp.unacked_changes(seq(0)));
        assert!(rp.unacked_changes(seq(5)));

        rp.acked_changes_set(seq(6));
        assert!(!rp.unacked_changes(seq(5)));
        assert!(rp.unacked_changes(seq(7)));
    }

    #[test]
    fn acking_also_prunes_requested_changes() {
        let mut rp = reliable_proxy();
        rp.requested_changes_set([seq(3), seq(5), seq(7)]);
        assert_eq!(rp.pending_requested_count(), 3);

        rp.acked_changes_set(seq(6));
        assert_eq!(rp.pending_requested_count(), 1);
        assert_eq!(rp.next_requested_change(), Some(seq(7)));
    }

    #[test]
    fn requested_fragments_set_above_ack_only() {
        let mut rp = reliable_proxy();
        rp.acked_changes_set(seq(3));
        // Sample 2 is already acknowledged, so only sample 5's fragments count.
        rp.requested_fragments_set(seq(2), [fragment(1), fragment(2)]);
        rp.requested_fragments_set(seq(5), [fragment(1), fragment(3)]);
        assert_eq!(rp.pending_requested_fragment_count(), 2);
    }

    #[test]
    fn next_requested_fragment_pulls_smallest_sn_first() {
        let mut rp = reliable_proxy();
        rp.requested_fragments_set(seq(5), [fragment(3), fragment(1)]);
        rp.requested_fragments_set(seq(2), [fragment(2)]);

        let expected_order = [
            Some((seq(2), fragment(2))),
            Some((seq(5), fragment(1))),
            Some((seq(5), fragment(3))),
            None,
        ];
        for expected in expected_order {
            assert_eq!(rp.next_requested_fragment(), expected);
        }
    }

    #[test]
    fn acking_also_prunes_requested_fragments() {
        let mut rp = reliable_proxy();
        rp.requested_fragments_set(seq(3), [fragment(1)]);
        rp.requested_fragments_set(seq(7), [fragment(2)]);
        assert_eq!(rp.pending_requested_fragment_count(), 2);

        rp.acked_changes_set(seq(5));
        assert_eq!(rp.pending_requested_fragment_count(), 1);
        assert_eq!(rp.next_requested_fragment(), Some((seq(7), fragment(2))));
    }

    #[test]
    fn requested_fragments_ignore_unknown_sentinel() {
        let mut rp = reliable_proxy();
        rp.requested_fragments_set(seq(1), [FragmentNumber::UNKNOWN, fragment(1)]);
        assert_eq!(rp.pending_requested_fragment_count(), 1);
    }

    #[test]
    fn proxy_is_inactive_initially_when_threshold_is_short() {
        // A fresh proxy has last activity at zero, so 10 s later with a 1 s
        // threshold it already counts as inactive.
        let fresh = reliable_proxy();
        let inactive = fresh.is_inactive(secs(10), secs(1));
        assert!(inactive);
    }

    #[test]
    fn proxy_is_active_after_note_activity() {
        let mut rp = reliable_proxy();
        rp.note_activity(secs(5));
        assert_eq!(rp.last_activity(), secs(5));

        let inactive = rp.is_inactive(secs(6), secs(2));
        assert!(!inactive);
    }

    #[test]
    fn proxy_becomes_inactive_after_threshold_elapses() {
        let mut rp = reliable_proxy();
        rp.note_activity(secs(5));

        let inactive = rp.is_inactive(secs(15), secs(2));
        assert!(inactive);
    }

    #[test]
    fn proxy_inactivity_not_reported_when_now_before_last_activity() {
        let mut rp = reliable_proxy();
        rp.note_activity(secs(100));

        // `now` lies before the recorded activity: must not count as inactive.
        let inactive = rp.is_inactive(secs(50), secs(1));
        assert!(!inactive);
    }
}