Skip to main content

nodedb_cluster/rpc_codec/
peer_seq.rs

1//! Per-peer monotonic sequence counters and a 64-entry sliding-window
2//! replay detector.
3//!
4//! # Outbound: [`PeerSeqSender`]
5//!
//! A single counter per local node, shared by every target peer (see
//! [`PeerSeqSender`] for why it is not per-target). Sent frames carry
//! strictly-increasing sequence numbers starting from 1. `0` is
//! reserved as a sentinel meaning "never sent".
9//!
10//! # Inbound: [`PeerSeqWindow`]
11//!
//! A 64-bit bitmap anchored at `last_accepted_seq`. A frame with
//! sequence number `n` is:
14//! - accepted and window advanced if `n > last_accepted_seq`
15//! - accepted and bit set if `last_accepted_seq - 63 <= n < last_accepted_seq`
16//!   and the bit was previously unset
17//! - rejected as replay if the bit was already set, or if `n` is older
18//!   than the window
19//!
20//! The window model is identical to IPsec AH/ESP (RFC 4303 §3.4.3).
21//! 64 entries is the standard default — large enough that legitimate
22//! reordering in-flight (rare over one QUIC stream but possible across
23//! QUIC streams) is tolerated, and small enough to fit in a single
24//! `u64`.
25
26use std::collections::HashMap;
27use std::sync::RwLock;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30use crate::error::{ClusterError, Result};
31
32/// Size of the inbound replay-detection window.
33pub const REPLAY_WINDOW: u64 = 64;
34
/// Monotonic source of outbound sequence numbers for this `AuthContext`.
///
/// There is deliberately a single counter rather than one per target.
/// The receiving side keys its replay window by the *sender's*
/// `local_node_id`, so per-target counters would issue the same value
/// more than once under one sender id (seq=1 for target A and seq=1
/// for target B), and those values would collide in the receiver's
/// `window[sender_id]`. Drawing every value from one counter keeps each
/// outbound sequence number unique per sender.
#[derive(Debug, Default)]
pub struct PeerSeqSender {
    /// Most recently issued sequence number; `0` until the first call
    /// to `next`.
    counter: AtomicU64,
}

impl PeerSeqSender {
    /// Create a sender whose first issued sequence number will be 1.
    pub fn new() -> Self {
        Self {
            counter: AtomicU64::new(0),
        }
    }

    /// Claim the next outbound sequence number.
    ///
    /// The first call yields 1 and every later call yields a strictly
    /// larger value, regardless of which target the frame is for.
    pub fn next(&self) -> u64 {
        // `fetch_add` hands back the value *before* the increment, so
        // add one to report the value this call just claimed.
        let previous = self.counter.fetch_add(1, Ordering::Relaxed);
        previous + 1
    }

    /// Most recently issued sequence number, or 0 when nothing has been
    /// issued yet. Only compiled for tests.
    #[cfg(test)]
    pub fn peek(&self) -> u64 {
        self.counter.load(Ordering::Relaxed)
    }
}
64
65/// Per-peer inbound sliding-window replay detector. One window per
66/// (local_node, remote_peer) pair.
67#[derive(Default, Debug)]
68pub struct PeerSeqWindow {
69    windows: RwLock<HashMap<u64, WindowState>>,
70}
71
/// Replay-window bookkeeping for a single peer.
#[derive(Debug, Default, Clone, Copy)]
struct WindowState {
    /// Largest sequence number accepted from this peer so far, or 0
    /// when nothing has been accepted yet.
    high: u64,
    /// One bit per accepted sequence number in `[high - 63, high]`:
    /// bit 0 stands for `high` itself and bit 63 for `high - 63`.
    mask: u64,
}
81
82impl PeerSeqWindow {
83    pub fn new() -> Self {
84        Self::default()
85    }
86
87    /// Accept `seq` from `peer_id`, rejecting replays and out-of-window
88    /// stale frames. Returns `Err(ClusterError::Codec)` on rejection.
89    ///
90    /// Sequence `0` is always rejected — a well-formed sender starts at
91    /// 1, so `0` means "nothing sent", which is not a valid inbound frame.
92    pub fn accept(&self, peer_id: u64, seq: u64) -> Result<()> {
93        if seq == 0 {
94            return Err(ClusterError::Codec {
95                detail: format!("peer {peer_id} sent reserved sequence 0"),
96            });
97        }
98
99        let mut guard = self.windows.write().unwrap_or_else(|p| p.into_inner());
100        let state = guard.entry(peer_id).or_default();
101
102        if seq > state.high {
103            // Frame advances the window. Shift by the delta and set bit 0.
104            let delta = seq - state.high;
105            state.mask = if delta >= REPLAY_WINDOW {
106                1
107            } else {
108                (state.mask << delta) | 1
109            };
110            state.high = seq;
111            return Ok(());
112        }
113
114        // Frame is `state.high - seq` positions back in the window.
115        let offset = state.high - seq;
116        if offset >= REPLAY_WINDOW {
117            return Err(ClusterError::Codec {
118                detail: format!(
119                    "peer {peer_id} sent stale sequence {seq}, window high is {}",
120                    state.high
121                ),
122            });
123        }
124        let bit = 1u64 << offset;
125        if state.mask & bit != 0 {
126            return Err(ClusterError::Codec {
127                detail: format!(
128                    "peer {peer_id} replayed sequence {seq} (window high {})",
129                    state.high
130                ),
131            });
132        }
133        state.mask |= bit;
134        Ok(())
135    }
136
137    #[cfg(test)]
138    pub fn highest(&self, peer_id: u64) -> u64 {
139        let guard = self.windows.read().unwrap_or_else(|p| p.into_inner());
140        guard.get(&peer_id).map(|w| w.high).unwrap_or(0)
141    }
142}
143
#[cfg(test)]
mod tests {
    //! Unit tests for the outbound counter and the inbound replay
    //! window, including the window-edge boundaries (an advance of 63
    //! keeps the oldest entry, an advance of 64 discards it).

    use super::*;

    #[test]
    fn outbound_counter_starts_at_one() {
        let s = PeerSeqSender::new();
        assert_eq!(s.next(), 1);
        assert_eq!(s.next(), 2);
        assert_eq!(s.next(), 3);
    }

    #[test]
    fn outbound_counter_is_single_across_all_targets() {
        // The outbound counter is intentionally shared across targets: the
        // receiver's replay window is keyed by the sender's local_node_id,
        // so per-target counters would collide in the same window. A
        // single monotonic counter guarantees every emitted seq is unique
        // from the receiver's point of view regardless of which target
        // the sender was aiming at.
        let s = PeerSeqSender::new();
        assert_eq!(s.peek(), 0);
        for expected in 1..=4 {
            assert_eq!(s.next(), expected);
        }
        // `peek` reports the last value handed out without consuming one.
        assert_eq!(s.peek(), 4);
        assert_eq!(s.peek(), 4);
    }

    #[test]
    fn window_accepts_monotonic_sequence() {
        let w = PeerSeqWindow::new();
        for seq in 1..=10 {
            w.accept(7, seq).unwrap();
        }
        assert_eq!(w.highest(7), 10);
    }

    #[test]
    fn window_rejects_immediate_replay() {
        let w = PeerSeqWindow::new();
        w.accept(1, 1).unwrap();
        let err = w.accept(1, 1).unwrap_err();
        assert!(err.to_string().contains("replayed"));
    }

    #[test]
    fn window_rejects_zero_sequence() {
        let w = PeerSeqWindow::new();
        let err = w.accept(1, 0).unwrap_err();
        assert!(err.to_string().contains("reserved sequence 0"));
    }

    #[test]
    fn window_accepts_out_of_order_within_window() {
        let w = PeerSeqWindow::new();
        // 1 ... 5 arrive out of order but within window.
        w.accept(1, 5).unwrap();
        w.accept(1, 3).unwrap();
        w.accept(1, 1).unwrap();
        w.accept(1, 2).unwrap();
        w.accept(1, 4).unwrap();
        assert_eq!(w.highest(1), 5);
    }

    #[test]
    fn window_rejects_replay_within_window() {
        let w = PeerSeqWindow::new();
        w.accept(1, 5).unwrap();
        w.accept(1, 3).unwrap();
        let err = w.accept(1, 3).unwrap_err();
        assert!(err.to_string().contains("replayed"));
    }

    #[test]
    fn window_rejects_replay_of_highest_after_reordering() {
        let w = PeerSeqWindow::new();
        w.accept(1, 5).unwrap();
        w.accept(1, 3).unwrap();
        // Accepting an older frame must not clear the bit for `high`.
        let err = w.accept(1, 5).unwrap_err();
        assert!(err.to_string().contains("replayed sequence 5"));
        assert_eq!(w.highest(1), 5);
    }

    #[test]
    fn window_shift_preserves_accepted_history() {
        let w = PeerSeqWindow::new();
        w.accept(1, 1).unwrap();
        // Advancing by 2 shifts the mask; seq=1 must still be remembered.
        w.accept(1, 3).unwrap();
        let err = w.accept(1, 1).unwrap_err();
        assert!(err.to_string().contains("replayed sequence 1"));
        // seq=2 was skipped, so it is still fresh exactly once.
        w.accept(1, 2).unwrap();
        let err = w.accept(1, 2).unwrap_err();
        assert!(err.to_string().contains("replayed sequence 2"));
    }

    #[test]
    fn window_rejects_stale_outside_window() {
        let w = PeerSeqWindow::new();
        w.accept(1, 100).unwrap();
        // Window is [37, 100]. seq=36 is stale.
        let err = w.accept(1, 36).unwrap_err();
        assert!(err.to_string().contains("stale sequence 36"));
        // seq=37 is inside the window edge and acceptable.
        w.accept(1, 37).unwrap();
    }

    #[test]
    fn window_advance_by_63_keeps_oldest_entry() {
        let w = PeerSeqWindow::new();
        w.accept(1, 1).unwrap();
        // Window becomes [1, 64]; seq=1 sits on the far edge (bit 63).
        w.accept(1, 64).unwrap();
        let err = w.accept(1, 1).unwrap_err();
        assert!(err.to_string().contains("replayed sequence 1"));
    }

    #[test]
    fn window_advance_by_64_drops_oldest_entry() {
        let w = PeerSeqWindow::new();
        w.accept(1, 1).unwrap();
        // Window becomes [2, 65]; seq=1 has fallen off the edge.
        w.accept(1, 65).unwrap();
        let err = w.accept(1, 1).unwrap_err();
        assert!(err.to_string().contains("stale sequence 1"));
        // seq=2 is the oldest slot still inside the window and was
        // never seen, so it is accepted.
        w.accept(1, 2).unwrap();
    }

    #[test]
    fn window_advances_beyond_window_clears_mask() {
        let w = PeerSeqWindow::new();
        w.accept(1, 1).unwrap();
        w.accept(1, 2).unwrap();
        w.accept(1, 100).unwrap();
        // Sequences 1, 2 are now outside the window anchored at 100 and
        // must be rejected on replay (not accepted as fresh within mask).
        let err = w.accept(1, 1).unwrap_err();
        assert!(err.to_string().contains("stale sequence 1"));
    }

    #[test]
    fn windows_are_independent_per_peer() {
        let w = PeerSeqWindow::new();
        w.accept(1, 10).unwrap();
        w.accept(2, 10).unwrap();
        w.accept(1, 9).unwrap();
        w.accept(2, 9).unwrap();
        // Independent — neither is a replay.
        assert_eq!(w.highest(1), 10);
        assert_eq!(w.highest(2), 10);
    }
}