nodedb_cluster/rpc_codec/
peer_seq.rs1use std::collections::HashMap;
27use std::sync::RwLock;
28use std::sync::atomic::{AtomicU64, Ordering};
29
30use crate::error::{ClusterError, Result};
31
/// Number of sequence numbers a receive window tracks, counting the highest
/// accepted one itself.
///
/// The per-peer window is a single `u64` bitmap with one bit per tracked
/// sequence number, so the window size is exactly the bit width of `u64` (64).
pub const REPLAY_WINDOW: u64 = u64::BITS as u64;
34
/// Issues outbound sequence numbers from one shared atomic counter.
///
/// `next` takes no target argument, so every caller draws from the same
/// counter regardless of where the message is going.
#[derive(Debug, Default)]
pub struct PeerSeqSender {
    /// The most recently issued sequence number; `0` until `next` is first called.
    counter: AtomicU64,
}

impl PeerSeqSender {
    /// Creates a sender that has not issued anything yet; the first call to
    /// [`PeerSeqSender::next`] returns `1`.
    pub fn new() -> Self {
        Self {
            counter: AtomicU64::new(0),
        }
    }

    /// Atomically bumps the counter and returns the new value.
    ///
    /// The first value returned is `1`; `0` is never produced by a fresh
    /// sender because the pre-increment value is always incremented once more.
    pub fn next(&self) -> u64 {
        let previous = self.counter.fetch_add(1, Ordering::Relaxed);
        previous + 1
    }

    /// Test-only view of the last issued sequence number, without advancing it.
    #[cfg(test)]
    pub fn peek(&self) -> u64 {
        let current = self.counter.load(Ordering::Relaxed);
        current
    }
}
64
65#[derive(Default, Debug)]
68pub struct PeerSeqWindow {
69 windows: RwLock<HashMap<u64, WindowState>>,
70}
71
/// Replay-tracking state kept for a single peer.
#[derive(Debug, Default, Clone, Copy)]
struct WindowState {
    /// Highest sequence number accepted so far; `0` means nothing has been
    /// accepted yet (sequence `0` itself is always rejected).
    high: u64,
    /// Bitmap of accepted sequence numbers relative to `high`: bit `i` set
    /// means `high - i` has been accepted, so bit 0 is `high` itself.
    mask: u64,
}
81
82impl PeerSeqWindow {
83 pub fn new() -> Self {
84 Self::default()
85 }
86
87 pub fn accept(&self, peer_id: u64, seq: u64) -> Result<()> {
93 if seq == 0 {
94 return Err(ClusterError::Codec {
95 detail: format!("peer {peer_id} sent reserved sequence 0"),
96 });
97 }
98
99 let mut guard = self.windows.write().unwrap_or_else(|p| p.into_inner());
100 let state = guard.entry(peer_id).or_default();
101
102 if seq > state.high {
103 let delta = seq - state.high;
105 state.mask = if delta >= REPLAY_WINDOW {
106 1
107 } else {
108 (state.mask << delta) | 1
109 };
110 state.high = seq;
111 return Ok(());
112 }
113
114 let offset = state.high - seq;
116 if offset >= REPLAY_WINDOW {
117 return Err(ClusterError::Codec {
118 detail: format!(
119 "peer {peer_id} sent stale sequence {seq}, window high is {}",
120 state.high
121 ),
122 });
123 }
124 let bit = 1u64 << offset;
125 if state.mask & bit != 0 {
126 return Err(ClusterError::Codec {
127 detail: format!(
128 "peer {peer_id} replayed sequence {seq} (window high {})",
129 state.high
130 ),
131 });
132 }
133 state.mask |= bit;
134 Ok(())
135 }
136
137 #[cfg(test)]
138 pub fn highest(&self, peer_id: u64) -> u64 {
139 let guard = self.windows.read().unwrap_or_else(|p| p.into_inner());
140 guard.get(&peer_id).map(|w| w.high).unwrap_or(0)
141 }
142}
143
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a window in which `peer` has had every entry of `seqs` accepted,
    /// in the given order.
    fn window_with(peer: u64, seqs: &[u64]) -> PeerSeqWindow {
        let window = PeerSeqWindow::new();
        for &seq in seqs {
            window.accept(peer, seq).unwrap();
        }
        window
    }

    /// Submits a sequence number that must be rejected and returns the
    /// rendered error message.
    fn rejection(window: &PeerSeqWindow, peer: u64, seq: u64) -> String {
        window.accept(peer, seq).unwrap_err().to_string()
    }

    #[test]
    fn outbound_counter_starts_at_one() {
        let sender = PeerSeqSender::new();
        let issued: Vec<u64> = (0..3).map(|_| sender.next()).collect();
        assert_eq!(issued, [1, 2, 3]);
    }

    #[test]
    fn outbound_counter_is_single_across_all_targets() {
        // `next` takes no target argument, so consecutive calls always draw
        // from the one counter.
        let sender = PeerSeqSender::new();
        for expected in 1..=4 {
            assert_eq!(sender.next(), expected);
        }
    }

    #[test]
    fn window_accepts_monotonic_sequence() {
        let ascending: Vec<u64> = (1..=10).collect();
        let window = window_with(7, &ascending);
        assert_eq!(window.highest(7), 10);
    }

    #[test]
    fn window_rejects_immediate_replay() {
        let window = window_with(1, &[1]);
        assert!(rejection(&window, 1, 1).contains("replayed"));
    }

    #[test]
    fn window_rejects_zero_sequence() {
        let window = PeerSeqWindow::new();
        assert!(rejection(&window, 1, 0).contains("reserved sequence 0"));
    }

    #[test]
    fn window_accepts_in_order_gap_within_window() {
        // 5 arrives first; the lower numbers then fill in behind it.
        let window = window_with(1, &[5, 3, 1, 2, 4]);
        assert_eq!(window.highest(1), 5);
    }

    #[test]
    fn window_rejects_replay_within_window() {
        let window = window_with(1, &[5, 3]);
        assert!(rejection(&window, 1, 3).contains("replayed"));
    }

    #[test]
    fn window_rejects_stale_outside_window() {
        let window = window_with(1, &[100]);
        // 100 - 36 = 64 = REPLAY_WINDOW: the first value outside the window.
        assert!(rejection(&window, 1, 36).contains("stale sequence 36"));
        // 100 - 37 = 63: the oldest value still inside the window.
        window.accept(1, 37).unwrap();
    }

    #[test]
    fn window_advances_beyond_window_clears_mask() {
        // Jumping from 2 to 100 moves the window far past 1 and 2.
        let window = window_with(1, &[1, 2, 100]);
        assert!(rejection(&window, 1, 1).contains("stale sequence 1"));
    }

    #[test]
    fn windows_are_independent_per_peer() {
        let window = PeerSeqWindow::new();
        // Same sequence numbers from two peers must not collide.
        for seq in [10, 9] {
            for peer in [1, 2] {
                window.accept(peer, seq).unwrap();
            }
        }
        assert_eq!(window.highest(1), 10);
        assert_eq!(window.highest(2), 10);
    }
}