zerodds_coap_bridge/
reliability.rs1use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
/// Base acknowledgement timeout in milliseconds (RFC 7252 default
/// `ACK_TIMEOUT` of 2 seconds).
pub const ACK_TIMEOUT_MS: u64 = 2_000;
/// Numerator of the random factor, kept as a fraction so the math stays in
/// integers: `NUM / DEN` = 3 / 2 = 1.5 (RFC 7252 default `ACK_RANDOM_FACTOR`).
pub const ACK_RANDOM_FACTOR_NUM: u64 = 3;
/// Denominator of the random factor; see [`ACK_RANDOM_FACTOR_NUM`].
pub const ACK_RANDOM_FACTOR_DEN: u64 = 2;
/// Number of retransmissions attempted for one confirmable message before it
/// is reported as timed out (RFC 7252 default `MAX_RETRANSMIT`).
pub const MAX_RETRANSMIT: u32 = 4;
21
/// Book-keeping record for one confirmable message that has been sent but not
/// yet acknowledged, reset, or given up on.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PendingConfirmable {
    /// Message ID of the exchange; also the key under which the tracker
    /// stores this record.
    pub message_id: u16,
    /// Token of the message, stored verbatim (not interpreted by the tracker).
    pub token: Vec<u8>,
    /// Encoded message bytes, kept so the caller can resend them unchanged.
    pub bytes: Vec<u8>,
    /// Retransmissions still allowed before the message is declared timed out.
    pub retransmits_left: u32,
    /// Timestamp (milliseconds, same clock as the caller's `now_ms`) at or
    /// after which the next retransmission or timeout is due.
    pub next_timeout_ms: u64,
    /// Length in milliseconds of the wait currently in progress; doubled on
    /// every retransmission.
    pub current_interval_ms: u64,
}
38
39#[derive(Debug, Default, Clone, PartialEq, Eq)]
42pub struct ReliabilityTracker {
43 pending: BTreeMap<u16, PendingConfirmable>,
44}
45
46#[derive(Debug, Clone, PartialEq, Eq, Default)]
48pub struct TickOutput {
49 pub retransmit: Vec<PendingConfirmable>,
52 pub timed_out: Vec<u16>,
55}
56
57impl ReliabilityTracker {
58 #[must_use]
60 pub fn new() -> Self {
61 Self::default()
62 }
63
64 pub fn send_confirmable(
66 &mut self,
67 message_id: u16,
68 token: Vec<u8>,
69 bytes: Vec<u8>,
70 now_ms: u64,
71 ) {
72 self.pending.insert(
73 message_id,
74 PendingConfirmable {
75 message_id,
76 token,
77 bytes,
78 retransmits_left: MAX_RETRANSMIT,
79 next_timeout_ms: now_ms + initial_interval(),
80 current_interval_ms: initial_interval(),
81 },
82 );
83 }
84
85 pub fn receive_ack(&mut self, message_id: u16) -> bool {
87 self.pending.remove(&message_id).is_some()
88 }
89
90 pub fn receive_rst(&mut self, message_id: u16) -> bool {
93 self.pending.remove(&message_id).is_some()
94 }
95
96 pub fn tick(&mut self, now_ms: u64) -> TickOutput {
98 let mut out = TickOutput::default();
99 let mut to_drop = Vec::new();
100 for (mid, entry) in self.pending.iter_mut() {
101 if now_ms < entry.next_timeout_ms {
102 continue;
103 }
104 if entry.retransmits_left == 0 {
105 to_drop.push(*mid);
106 continue;
107 }
108 entry.retransmits_left -= 1;
109 entry.current_interval_ms *= 2;
110 entry.next_timeout_ms = now_ms + entry.current_interval_ms;
111 out.retransmit.push(entry.clone());
112 }
113 for mid in to_drop {
114 self.pending.remove(&mid);
115 out.timed_out.push(mid);
116 }
117 out
118 }
119
120 #[must_use]
122 pub fn pending_count(&self) -> usize {
123 self.pending.len()
124 }
125}
126
127fn initial_interval() -> u64 {
128 ACK_TIMEOUT_MS * ACK_RANDOM_FACTOR_NUM / ACK_RANDOM_FACTOR_DEN
132}
133
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;

    /// Message ID used by every scenario that tracks a single exchange.
    const MID: u16 = 42;

    /// Returns a tracker holding one confirmable (`MID`, empty token,
    /// zero-filled payload of `payload_len` bytes) registered at time 0.
    fn tracker_with_pending(payload_len: usize) -> ReliabilityTracker {
        let mut tracker = ReliabilityTracker::new();
        tracker.send_confirmable(MID, alloc::vec![], alloc::vec![0; payload_len], 0);
        tracker
    }

    #[test]
    fn fresh_tracker_is_empty() {
        assert_eq!(ReliabilityTracker::new().pending_count(), 0);
    }

    #[test]
    fn send_then_ack_clears_pending() {
        let mut tracker = ReliabilityTracker::new();
        tracker.send_confirmable(MID, alloc::vec![1, 2], alloc::vec![0; 10], 0);
        assert_eq!(tracker.pending_count(), 1);
        assert!(tracker.receive_ack(MID));
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn unknown_ack_returns_false() {
        let mut tracker = ReliabilityTracker::new();
        assert!(!tracker.receive_ack(99));
    }

    #[test]
    fn rst_clears_pending() {
        let mut tracker = tracker_with_pending(0);
        assert!(tracker.receive_rst(MID));
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn tick_before_timeout_does_nothing() {
        let mut tracker = tracker_with_pending(10);
        let TickOutput {
            retransmit,
            timed_out,
        } = tracker.tick(100);
        assert!(retransmit.is_empty());
        assert!(timed_out.is_empty());
        assert_eq!(tracker.pending_count(), 1);
    }

    #[test]
    fn tick_after_timeout_retransmits() {
        let mut tracker = tracker_with_pending(5);
        let just_past_deadline = initial_interval() + 1;
        let out = tracker.tick(just_past_deadline);
        assert_eq!(out.retransmit.len(), 1);
        assert_eq!(out.retransmit[0].message_id, MID);
        assert!(out.timed_out.is_empty());
    }

    #[test]
    fn exhausting_retransmits_times_out() {
        let mut tracker = tracker_with_pending(5);
        let mut now = 0u64;
        let mut interval = initial_interval();
        // Step just past each successive deadline so every allowed
        // retransmission is consumed; the expected interval doubles each time.
        for _ in 0..MAX_RETRANSMIT {
            now += interval + 1;
            interval *= 2;
            let _ = tracker.tick(now);
        }
        // One more expiry with nothing left must drop the entry.
        let out = tracker.tick(now + interval + 1);
        assert!(!out.timed_out.is_empty(), "should be timed out");
        assert_eq!(tracker.pending_count(), 0);
    }

    #[test]
    fn interval_doubles_per_retransmit() {
        let mut tracker = tracker_with_pending(5);
        let _ = tracker.tick(initial_interval() + 1);
        let entry = tracker.pending.get(&MID).unwrap();
        assert_eq!(entry.current_interval_ms, initial_interval() * 2);
    }
}