Skip to main content

nodedb_cluster/rpc_codec/
peer_seq.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Per-peer monotonic sequence counters and a 64-entry sliding-window
4//! replay detector.
5//!
6//! # Outbound: [`PeerSeqSender`]
7//!
8//! Each local-node-to-peer direction has a distinct counter. Sent frames
9//! carry strictly-increasing sequence numbers starting from 1. `0` is
10//! reserved as a sentinel meaning "never sent".
11//!
12//! # Inbound: [`PeerSeqWindow`]
13//!
14//! A 64-bit bitmap anchored at `last_accepted_seq`. Frame with sequence
15//! `n` is:
16//! - accepted and window advanced if `n > last_accepted_seq`
17//! - accepted and bit set if `last_accepted_seq - 63 <= n < last_accepted_seq`
18//!   and the bit was previously unset
19//! - rejected as replay if the bit was already set, or if `n` is older
20//!   than the window
21//!
22//! The window model is identical to IPsec AH/ESP (RFC 4303 §3.4.3).
23//! 64 entries is the standard default — large enough that legitimate
24//! reordering in-flight (rare over one QUIC stream but possible across
25//! QUIC streams) is tolerated, and small enough to fit in a single
26//! `u64`.
27
28use std::collections::HashMap;
29use std::sync::RwLock;
30use std::sync::atomic::{AtomicU64, Ordering};
31
32use crate::error::{ClusterError, Result};
33
34/// Size of the inbound replay-detection window.
35pub const REPLAY_WINDOW: u64 = 64;
36
37/// Outbound monotonic counter for this `AuthContext`. One counter total
38/// — not one per target — because the receiver's replay window is keyed
39/// by the *sender's* `local_node_id`. If this sender used a per-target
40/// counter, two distinct targets' traffic would share the same window on
41/// any node that receives from both: seq=1 from target=A and seq=1 from
42/// target=B collide in the receiver's `window[sender_id]`. A single
43/// counter makes every outbound seq globally unique per sender.
44#[derive(Default, Debug)]
45pub struct PeerSeqSender {
46    counter: AtomicU64,
47}
48
49impl PeerSeqSender {
50    pub fn new() -> Self {
51        Self::default()
52    }
53
54    /// Reserve and return the next outbound sequence number. Starts at 1
55    /// and is strictly increasing across all targets for this sender.
56    pub fn next(&self) -> u64 {
57        self.counter.fetch_add(1, Ordering::Relaxed) + 1
58    }
59
60    /// Current counter value (0 if no frames have been sent). Test-only.
61    #[cfg(test)]
62    pub fn peek(&self) -> u64 {
63        self.counter.load(Ordering::Relaxed)
64    }
65}
66
67/// Per-peer inbound sliding-window replay detector. One window per
68/// (local_node, remote_peer) pair.
69#[derive(Default, Debug)]
70pub struct PeerSeqWindow {
71    windows: RwLock<HashMap<u64, WindowState>>,
72}
73
74/// Sliding-window state for one peer.
75#[derive(Default, Debug, Clone, Copy)]
76struct WindowState {
77    /// Highest accepted sequence seen from this peer. 0 if none yet.
78    high: u64,
79    /// Bitmap of accepted sequences in `[high - 63, high]`. Bit 0 is
80    /// `high`, bit 63 is `high - 63`.
81    mask: u64,
82}
83
84impl PeerSeqWindow {
85    pub fn new() -> Self {
86        Self::default()
87    }
88
89    /// Accept `seq` from `peer_id`, rejecting replays and out-of-window
90    /// stale frames. Returns `Err(ClusterError::Codec)` on rejection.
91    ///
92    /// Sequence `0` is always rejected — a well-formed sender starts at
93    /// 1, so `0` means "nothing sent", which is not a valid inbound frame.
94    pub fn accept(&self, peer_id: u64, seq: u64) -> Result<()> {
95        if seq == 0 {
96            return Err(ClusterError::Codec {
97                detail: format!("peer {peer_id} sent reserved sequence 0"),
98            });
99        }
100
101        let mut guard = self.windows.write().unwrap_or_else(|p| p.into_inner());
102        let state = guard.entry(peer_id).or_default();
103
104        if seq > state.high {
105            // Frame advances the window. Shift by the delta and set bit 0.
106            let delta = seq - state.high;
107            state.mask = if delta >= REPLAY_WINDOW {
108                1
109            } else {
110                (state.mask << delta) | 1
111            };
112            state.high = seq;
113            return Ok(());
114        }
115
116        // Frame is `state.high - seq` positions back in the window.
117        let offset = state.high - seq;
118        if offset >= REPLAY_WINDOW {
119            return Err(ClusterError::Codec {
120                detail: format!(
121                    "peer {peer_id} sent stale sequence {seq}, window high is {}",
122                    state.high
123                ),
124            });
125        }
126        let bit = 1u64 << offset;
127        if state.mask & bit != 0 {
128            return Err(ClusterError::Codec {
129                detail: format!(
130                    "peer {peer_id} replayed sequence {seq} (window high {})",
131                    state.high
132                ),
133            });
134        }
135        state.mask |= bit;
136        Ok(())
137    }
138
139    #[cfg(test)]
140    pub fn highest(&self, peer_id: u64) -> u64 {
141        let guard = self.windows.read().unwrap_or_else(|p| p.into_inner());
142        guard.get(&peer_id).map(|w| w.high).unwrap_or(0)
143    }
144}
145
146#[cfg(test)]
147mod tests {
148    use super::*;
149
150    #[test]
151    fn outbound_counter_starts_at_one() {
152        let s = PeerSeqSender::new();
153        assert_eq!(s.next(), 1);
154        assert_eq!(s.next(), 2);
155        assert_eq!(s.next(), 3);
156    }
157
158    #[test]
159    fn outbound_counter_is_single_across_all_targets() {
160        // The outbound counter is intentionally shared across targets: the
161        // receiver's replay window is keyed by the sender's local_node_id,
162        // so per-target counters would collide in the same window. A
163        // single monotonic counter guarantees every emitted seq is unique
164        // from the receiver's point of view regardless of which target
165        // the sender was aiming at.
166        let s = PeerSeqSender::new();
167        assert_eq!(s.next(), 1);
168        assert_eq!(s.next(), 2);
169        assert_eq!(s.next(), 3);
170        assert_eq!(s.next(), 4);
171    }
172
173    #[test]
174    fn window_accepts_monotonic_sequence() {
175        let w = PeerSeqWindow::new();
176        for seq in 1..=10 {
177            w.accept(7, seq).unwrap();
178        }
179        assert_eq!(w.highest(7), 10);
180    }
181
182    #[test]
183    fn window_rejects_immediate_replay() {
184        let w = PeerSeqWindow::new();
185        w.accept(1, 1).unwrap();
186        let err = w.accept(1, 1).unwrap_err();
187        assert!(err.to_string().contains("replayed"));
188    }
189
190    #[test]
191    fn window_rejects_zero_sequence() {
192        let w = PeerSeqWindow::new();
193        let err = w.accept(1, 0).unwrap_err();
194        assert!(err.to_string().contains("reserved sequence 0"));
195    }
196
197    #[test]
198    fn window_accepts_in_order_gap_within_window() {
199        let w = PeerSeqWindow::new();
200        // 1 ... 5 arrive out of order but within window.
201        w.accept(1, 5).unwrap();
202        w.accept(1, 3).unwrap();
203        w.accept(1, 1).unwrap();
204        w.accept(1, 2).unwrap();
205        w.accept(1, 4).unwrap();
206        assert_eq!(w.highest(1), 5);
207    }
208
209    #[test]
210    fn window_rejects_replay_within_window() {
211        let w = PeerSeqWindow::new();
212        w.accept(1, 5).unwrap();
213        w.accept(1, 3).unwrap();
214        let err = w.accept(1, 3).unwrap_err();
215        assert!(err.to_string().contains("replayed"));
216    }
217
218    #[test]
219    fn window_rejects_stale_outside_window() {
220        let w = PeerSeqWindow::new();
221        w.accept(1, 100).unwrap();
222        // Window is [37, 100]. seq=36 is stale.
223        let err = w.accept(1, 36).unwrap_err();
224        assert!(err.to_string().contains("stale sequence 36"));
225        // seq=37 is inside the window edge and acceptable.
226        w.accept(1, 37).unwrap();
227    }
228
229    #[test]
230    fn window_advances_beyond_window_clears_mask() {
231        let w = PeerSeqWindow::new();
232        w.accept(1, 1).unwrap();
233        w.accept(1, 2).unwrap();
234        w.accept(1, 100).unwrap();
235        // Sequences 1, 2 are now outside the window anchored at 100 and
236        // must be rejected on replay (not accepted as fresh within mask).
237        let err = w.accept(1, 1).unwrap_err();
238        assert!(err.to_string().contains("stale sequence 1"));
239    }
240
241    #[test]
242    fn windows_are_independent_per_peer() {
243        let w = PeerSeqWindow::new();
244        w.accept(1, 10).unwrap();
245        w.accept(2, 10).unwrap();
246        w.accept(1, 9).unwrap();
247        w.accept(2, 9).unwrap();
248        // Independent — neither is a replay.
249        assert_eq!(w.highest(1), 10);
250        assert_eq!(w.highest(2), 10);
251    }
252}