Skip to main content

zerodds_coap_bridge/
reliability.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! CoAP-Reliability — RFC 7252 §4.2-§4.7.
5//!
6//! Spec §4.2: CON-Messages erwarten ACK; bei Timeout wird mit
7//! exponentiellem Backoff retransmittet bis `MAX_RETRANSMIT`.
8
9use alloc::collections::BTreeMap;
10use alloc::vec::Vec;
11
12/// Retransmit-Default-Konstanten — RFC 7252 §4.8.
13pub const ACK_TIMEOUT_MS: u64 = 2000;
14/// `ACK_RANDOM_FACTOR` (Spec §4.8) — wir verwenden 1.5 gerundet auf
15/// einen Multiplikator von 3/2.
16pub const ACK_RANDOM_FACTOR_NUM: u64 = 3;
17/// Nenner zu `ACK_RANDOM_FACTOR_NUM`.
18pub const ACK_RANDOM_FACTOR_DEN: u64 = 2;
19/// `MAX_RETRANSMIT` (Spec §4.8).
20pub const MAX_RETRANSMIT: u32 = 4;
21
22/// Pending-CON-Eintrag in der Reliability-Queue.
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct PendingConfirmable {
25    /// Message-Id (16-Bit).
26    pub message_id: u16,
27    /// Token-Bytes (0..=8).
28    pub token: Vec<u8>,
29    /// Encoded Message-Bytes — werden bei Retransmit re-emittiert.
30    pub bytes: Vec<u8>,
31    /// Verbleibende Retransmits (start = `MAX_RETRANSMIT`).
32    pub retransmits_left: u32,
33    /// Naechstes Timeout (in Millisekunden seit Start).
34    pub next_timeout_ms: u64,
35    /// Aktuelles Timeout-Intervall (verdoppelt sich pro Retransmit).
36    pub current_interval_ms: u64,
37}
38
39/// Reliability-Tracker — Caller pumpt Zeit + Empfangs-Events rein,
40/// Tracker liefert Retransmit/Drop-Decisions.
41#[derive(Debug, Default, Clone, PartialEq, Eq)]
42pub struct ReliabilityTracker {
43    pending: BTreeMap<u16, PendingConfirmable>,
44}
45
46/// Output eines `tick`-Schritts.
47#[derive(Debug, Clone, PartialEq, Eq, Default)]
48pub struct TickOutput {
49    /// Messages, die retransmittiert werden muessen (Bytes-Slice
50    /// bereits enthalten).
51    pub retransmit: Vec<PendingConfirmable>,
52    /// Message-Ids, deren `MAX_RETRANSMIT` erschoepft sind und
53    /// deren Verbindung als verloren angesehen wird.
54    pub timed_out: Vec<u16>,
55}
56
57impl ReliabilityTracker {
58    /// Konstruktor.
59    #[must_use]
60    pub fn new() -> Self {
61        Self::default()
62    }
63
64    /// Sendet eine CON — registriert sie zur Retransmission.
65    pub fn send_confirmable(
66        &mut self,
67        message_id: u16,
68        token: Vec<u8>,
69        bytes: Vec<u8>,
70        now_ms: u64,
71    ) {
72        self.pending.insert(
73            message_id,
74            PendingConfirmable {
75                message_id,
76                token,
77                bytes,
78                retransmits_left: MAX_RETRANSMIT,
79                next_timeout_ms: now_ms + initial_interval(),
80                current_interval_ms: initial_interval(),
81            },
82        );
83    }
84
85    /// ACK empfangen — entferne Pending-Eintrag.
86    pub fn receive_ack(&mut self, message_id: u16) -> bool {
87        self.pending.remove(&message_id).is_some()
88    }
89
90    /// RST empfangen — entferne Pending-Eintrag (Spec §4.2: RST
91    /// terminiert ohne Retransmit).
92    pub fn receive_rst(&mut self, message_id: u16) -> bool {
93        self.pending.remove(&message_id).is_some()
94    }
95
96    /// Tick — pruefe ob ein Pending faellig ist.
97    pub fn tick(&mut self, now_ms: u64) -> TickOutput {
98        let mut out = TickOutput::default();
99        let mut to_drop = Vec::new();
100        for (mid, entry) in self.pending.iter_mut() {
101            if now_ms < entry.next_timeout_ms {
102                continue;
103            }
104            if entry.retransmits_left == 0 {
105                to_drop.push(*mid);
106                continue;
107            }
108            entry.retransmits_left -= 1;
109            entry.current_interval_ms *= 2;
110            entry.next_timeout_ms = now_ms + entry.current_interval_ms;
111            out.retransmit.push(entry.clone());
112        }
113        for mid in to_drop {
114            self.pending.remove(&mid);
115            out.timed_out.push(mid);
116        }
117        out
118    }
119
120    /// Anzahl pending CONs.
121    #[must_use]
122    pub fn pending_count(&self) -> usize {
123        self.pending.len()
124    }
125}
126
127fn initial_interval() -> u64 {
128    // Spec §4.8: ACK_TIMEOUT * ACK_RANDOM_FACTOR. We use the upper
129    // bound of the random range deterministically (Caller can
130    // randomize externally if desired).
131    ACK_TIMEOUT_MS * ACK_RANDOM_FACTOR_NUM / ACK_RANDOM_FACTOR_DEN
132}
133
134#[cfg(test)]
135#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
136mod tests {
137    use super::*;
138
139    #[test]
140    fn fresh_tracker_is_empty() {
141        let t = ReliabilityTracker::new();
142        assert_eq!(t.pending_count(), 0);
143    }
144
145    #[test]
146    fn send_then_ack_clears_pending() {
147        let mut t = ReliabilityTracker::new();
148        t.send_confirmable(42, alloc::vec![1, 2], alloc::vec![0; 10], 0);
149        assert_eq!(t.pending_count(), 1);
150        assert!(t.receive_ack(42));
151        assert_eq!(t.pending_count(), 0);
152    }
153
154    #[test]
155    fn unknown_ack_returns_false() {
156        let mut t = ReliabilityTracker::new();
157        assert!(!t.receive_ack(99));
158    }
159
160    #[test]
161    fn rst_clears_pending() {
162        let mut t = ReliabilityTracker::new();
163        t.send_confirmable(42, alloc::vec![], alloc::vec![], 0);
164        assert!(t.receive_rst(42));
165        assert_eq!(t.pending_count(), 0);
166    }
167
168    #[test]
169    fn tick_before_timeout_does_nothing() {
170        let mut t = ReliabilityTracker::new();
171        t.send_confirmable(42, alloc::vec![], alloc::vec![0; 10], 0);
172        let out = t.tick(100);
173        assert!(out.retransmit.is_empty());
174        assert!(out.timed_out.is_empty());
175        assert_eq!(t.pending_count(), 1);
176    }
177
178    #[test]
179    fn tick_after_timeout_retransmits() {
180        let mut t = ReliabilityTracker::new();
181        t.send_confirmable(42, alloc::vec![], alloc::vec![0; 5], 0);
182        let out = t.tick(initial_interval() + 1);
183        assert_eq!(out.retransmit.len(), 1);
184        assert_eq!(out.retransmit[0].message_id, 42);
185        assert!(out.timed_out.is_empty());
186    }
187
188    #[test]
189    fn exhausting_retransmits_times_out() {
190        let mut t = ReliabilityTracker::new();
191        t.send_confirmable(42, alloc::vec![], alloc::vec![0; 5], 0);
192        let mut now = 0u64;
193        let mut interval = initial_interval();
194        // Drain MAX_RETRANSMIT retransmits.
195        for _ in 0..MAX_RETRANSMIT {
196            now += interval + 1;
197            interval *= 2;
198            let _ = t.tick(now);
199        }
200        // One more tick should now mark the message as timed_out.
201        now += interval + 1;
202        let out = t.tick(now);
203        assert!(!out.timed_out.is_empty(), "should be timed out");
204        assert!(t.pending_count() == 0);
205    }
206
207    #[test]
208    fn interval_doubles_per_retransmit() {
209        let mut t = ReliabilityTracker::new();
210        t.send_confirmable(42, alloc::vec![], alloc::vec![0; 5], 0);
211        let _ = t.tick(initial_interval() + 1);
212        let after_first = t.pending.get(&42).unwrap().current_interval_ms;
213        assert_eq!(after_first, initial_interval() * 2);
214    }
215}