nodedb_cluster/rpc_codec/
peer_seq.rs1use std::collections::HashMap;
29use std::sync::RwLock;
30use std::sync::atomic::{AtomicU64, Ordering};
31
32use crate::error::{ClusterError, Result};
33
34pub const REPLAY_WINDOW: u64 = 64;
36
37#[derive(Default, Debug)]
45pub struct PeerSeqSender {
46 counter: AtomicU64,
47}
48
49impl PeerSeqSender {
50 pub fn new() -> Self {
51 Self::default()
52 }
53
54 pub fn next(&self) -> u64 {
57 self.counter.fetch_add(1, Ordering::Relaxed) + 1
58 }
59
60 #[cfg(test)]
62 pub fn peek(&self) -> u64 {
63 self.counter.load(Ordering::Relaxed)
64 }
65}
66
67#[derive(Default, Debug)]
70pub struct PeerSeqWindow {
71 windows: RwLock<HashMap<u64, WindowState>>,
72}
73
74#[derive(Default, Debug, Clone, Copy)]
76struct WindowState {
77 high: u64,
79 mask: u64,
82}
83
84impl PeerSeqWindow {
85 pub fn new() -> Self {
86 Self::default()
87 }
88
89 pub fn accept(&self, peer_id: u64, seq: u64) -> Result<()> {
95 if seq == 0 {
96 return Err(ClusterError::Codec {
97 detail: format!("peer {peer_id} sent reserved sequence 0"),
98 });
99 }
100
101 let mut guard = self.windows.write().unwrap_or_else(|p| p.into_inner());
102 let state = guard.entry(peer_id).or_default();
103
104 if seq > state.high {
105 let delta = seq - state.high;
107 state.mask = if delta >= REPLAY_WINDOW {
108 1
109 } else {
110 (state.mask << delta) | 1
111 };
112 state.high = seq;
113 return Ok(());
114 }
115
116 let offset = state.high - seq;
118 if offset >= REPLAY_WINDOW {
119 return Err(ClusterError::Codec {
120 detail: format!(
121 "peer {peer_id} sent stale sequence {seq}, window high is {}",
122 state.high
123 ),
124 });
125 }
126 let bit = 1u64 << offset;
127 if state.mask & bit != 0 {
128 return Err(ClusterError::Codec {
129 detail: format!(
130 "peer {peer_id} replayed sequence {seq} (window high {})",
131 state.high
132 ),
133 });
134 }
135 state.mask |= bit;
136 Ok(())
137 }
138
139 #[cfg(test)]
140 pub fn highest(&self, peer_id: u64) -> u64 {
141 let guard = self.windows.read().unwrap_or_else(|p| p.into_inner());
142 guard.get(&peer_id).map(|w| w.high).unwrap_or(0)
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use super::*;
149
150 #[test]
151 fn outbound_counter_starts_at_one() {
152 let s = PeerSeqSender::new();
153 assert_eq!(s.next(), 1);
154 assert_eq!(s.next(), 2);
155 assert_eq!(s.next(), 3);
156 }
157
158 #[test]
159 fn outbound_counter_is_single_across_all_targets() {
160 let s = PeerSeqSender::new();
167 assert_eq!(s.next(), 1);
168 assert_eq!(s.next(), 2);
169 assert_eq!(s.next(), 3);
170 assert_eq!(s.next(), 4);
171 }
172
173 #[test]
174 fn window_accepts_monotonic_sequence() {
175 let w = PeerSeqWindow::new();
176 for seq in 1..=10 {
177 w.accept(7, seq).unwrap();
178 }
179 assert_eq!(w.highest(7), 10);
180 }
181
182 #[test]
183 fn window_rejects_immediate_replay() {
184 let w = PeerSeqWindow::new();
185 w.accept(1, 1).unwrap();
186 let err = w.accept(1, 1).unwrap_err();
187 assert!(err.to_string().contains("replayed"));
188 }
189
190 #[test]
191 fn window_rejects_zero_sequence() {
192 let w = PeerSeqWindow::new();
193 let err = w.accept(1, 0).unwrap_err();
194 assert!(err.to_string().contains("reserved sequence 0"));
195 }
196
197 #[test]
198 fn window_accepts_in_order_gap_within_window() {
199 let w = PeerSeqWindow::new();
200 w.accept(1, 5).unwrap();
202 w.accept(1, 3).unwrap();
203 w.accept(1, 1).unwrap();
204 w.accept(1, 2).unwrap();
205 w.accept(1, 4).unwrap();
206 assert_eq!(w.highest(1), 5);
207 }
208
209 #[test]
210 fn window_rejects_replay_within_window() {
211 let w = PeerSeqWindow::new();
212 w.accept(1, 5).unwrap();
213 w.accept(1, 3).unwrap();
214 let err = w.accept(1, 3).unwrap_err();
215 assert!(err.to_string().contains("replayed"));
216 }
217
218 #[test]
219 fn window_rejects_stale_outside_window() {
220 let w = PeerSeqWindow::new();
221 w.accept(1, 100).unwrap();
222 let err = w.accept(1, 36).unwrap_err();
224 assert!(err.to_string().contains("stale sequence 36"));
225 w.accept(1, 37).unwrap();
227 }
228
229 #[test]
230 fn window_advances_beyond_window_clears_mask() {
231 let w = PeerSeqWindow::new();
232 w.accept(1, 1).unwrap();
233 w.accept(1, 2).unwrap();
234 w.accept(1, 100).unwrap();
235 let err = w.accept(1, 1).unwrap_err();
238 assert!(err.to_string().contains("stale sequence 1"));
239 }
240
241 #[test]
242 fn windows_are_independent_per_peer() {
243 let w = PeerSeqWindow::new();
244 w.accept(1, 10).unwrap();
245 w.accept(2, 10).unwrap();
246 w.accept(1, 9).unwrap();
247 w.accept(2, 9).unwrap();
248 assert_eq!(w.highest(1), 10);
250 assert_eq!(w.highest(2), 10);
251 }
252}