1use crate::sync::{AtomicU32, AtomicU64, Ordering};
2
3#[repr(u32)]
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum PeerState {
9 Empty = 0,
11 Attached = 1,
13 Goodbye = 2,
15 Reserved = 3,
17}
18
19impl PeerState {
20 #[inline]
21 pub fn from_u32(v: u32) -> Option<Self> {
22 match v {
23 0 => Some(PeerState::Empty),
24 1 => Some(PeerState::Attached),
25 2 => Some(PeerState::Goodbye),
26 3 => Some(PeerState::Reserved),
27 _ => None,
28 }
29 }
30}
31
32#[repr(C)]
36pub struct PeerEntry {
37 pub state: AtomicU32,
39 pub epoch: AtomicU32,
41 pub last_heartbeat: AtomicU64,
43 pub ring_offset: u64,
45 pub _reserved: [u8; 40],
46}
47
48#[cfg(not(loom))]
49const _: () = assert!(core::mem::size_of::<PeerEntry>() == 64);
50
51impl PeerEntry {
52 pub unsafe fn init(&mut self, ring_offset: u64) {
58 self.state = AtomicU32::new(PeerState::Empty as u32);
59 self.epoch = AtomicU32::new(0);
60 self.last_heartbeat = AtomicU64::new(0);
61 self.ring_offset = ring_offset;
62 self._reserved = [0u8; 40];
63 }
64
65 #[inline]
67 pub fn state(&self) -> PeerState {
68 PeerState::from_u32(self.state.load(Ordering::Acquire)).unwrap_or(PeerState::Empty)
69 }
70
71 #[inline]
73 pub fn epoch(&self) -> u32 {
74 self.epoch.load(Ordering::Acquire)
75 }
76
77 pub fn try_attach(&self) -> Result<u32, PeerState> {
81 match self.state.compare_exchange(
82 PeerState::Empty as u32,
83 PeerState::Attached as u32,
84 Ordering::AcqRel,
85 Ordering::Acquire,
86 ) {
87 Ok(_) => Ok(self.epoch.fetch_add(1, Ordering::AcqRel).wrapping_add(1)),
88 Err(actual) => Err(PeerState::from_u32(actual).unwrap_or(PeerState::Empty)),
89 }
90 }
91
92 pub fn try_reserve(&self) -> Result<u32, PeerState> {
96 match self.state.compare_exchange(
97 PeerState::Empty as u32,
98 PeerState::Reserved as u32,
99 Ordering::AcqRel,
100 Ordering::Acquire,
101 ) {
102 Ok(_) => Ok(self.epoch.fetch_add(1, Ordering::AcqRel).wrapping_add(1)),
103 Err(actual) => Err(PeerState::from_u32(actual).unwrap_or(PeerState::Empty)),
104 }
105 }
106
107 pub fn try_claim_reserved(&self) -> Result<(), PeerState> {
112 match self.state.compare_exchange(
113 PeerState::Reserved as u32,
114 PeerState::Attached as u32,
115 Ordering::AcqRel,
116 Ordering::Acquire,
117 ) {
118 Ok(_) => Ok(()),
119 Err(actual) => Err(PeerState::from_u32(actual).unwrap_or(PeerState::Empty)),
120 }
121 }
122
123 pub fn release_reserved(&self) {
125 let _ = self.state.compare_exchange(
126 PeerState::Reserved as u32,
127 PeerState::Empty as u32,
128 Ordering::AcqRel,
129 Ordering::Acquire,
130 );
131 }
132
133 #[inline]
135 pub fn set_goodbye(&self) {
136 self.state
137 .store(PeerState::Goodbye as u32, Ordering::Release);
138 }
139
140 #[inline]
142 pub fn reset(&self) {
143 self.last_heartbeat.store(0, Ordering::Release);
144 self.state.store(PeerState::Empty as u32, Ordering::Release);
145 }
146
147 #[inline]
153 pub fn update_heartbeat(&self, timestamp_ns: u64) {
154 self.last_heartbeat.store(timestamp_ns, Ordering::Release);
155 }
156
157 #[inline]
159 pub fn last_heartbeat(&self) -> u64 {
160 self.last_heartbeat.load(Ordering::Acquire)
161 }
162
163 #[inline]
168 pub fn is_heartbeat_stale(&self, current_time_ns: u64, interval_ns: u64) -> bool {
169 if interval_ns == 0 {
170 return false;
171 }
172 current_time_ns.saturating_sub(self.last_heartbeat()) > 2 * interval_ns
173 }
174}
175
176#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
182pub struct PeerId(u8);
183
184impl PeerId {
185 #[inline]
187 pub fn from_index(index: u8) -> Option<Self> {
188 if index < 255 {
189 Some(Self(index + 1))
190 } else {
191 None
192 }
193 }
194
195 #[inline]
197 pub fn new(value: u8) -> Option<Self> {
198 if value >= 1 { Some(Self(value)) } else { None }
199 }
200
201 #[inline]
203 pub fn get(self) -> u8 {
204 self.0
205 }
206
207 #[inline]
209 pub fn index(self) -> u8 {
210 self.0 - 1
211 }
212}
213
214#[cfg(all(test, not(loom)))]
217mod tests {
218 use super::*;
219
220 fn make_entry(ring_offset: u64) -> PeerEntry {
221 let mut entry: PeerEntry = unsafe { core::mem::zeroed() };
223 unsafe { entry.init(ring_offset) };
224 entry
225 }
226
227 #[test]
228 fn initial_state_is_empty() {
229 let e = make_entry(0);
230 assert_eq!(e.state(), PeerState::Empty);
231 assert_eq!(e.epoch(), 0);
232 }
233
234 #[test]
235 fn try_attach_transitions_and_bumps_epoch() {
236 let e = make_entry(0);
237 let epoch = e.try_attach().unwrap();
238 assert_eq!(epoch, 1);
239 assert_eq!(e.state(), PeerState::Attached);
240 }
241
242 #[test]
243 fn try_attach_fails_when_not_empty() {
244 let e = make_entry(0);
245 e.try_attach().unwrap();
246 assert!(e.try_attach().is_err());
247 }
248
249 #[test]
250 fn reserve_then_claim() {
251 let e = make_entry(0);
252 let epoch = e.try_reserve().unwrap();
253 assert_eq!(epoch, 1);
254 assert_eq!(e.state(), PeerState::Reserved);
255 e.try_claim_reserved().unwrap();
256 assert_eq!(e.state(), PeerState::Attached);
257 assert_eq!(e.epoch(), 1);
259 }
260
261 #[test]
262 fn release_reserved_returns_to_empty() {
263 let e = make_entry(0);
264 e.try_reserve().unwrap();
265 e.release_reserved();
266 assert_eq!(e.state(), PeerState::Empty);
267 }
268
269 #[test]
270 fn set_goodbye() {
271 let e = make_entry(0);
272 e.try_attach().unwrap();
273 e.set_goodbye();
274 assert_eq!(e.state(), PeerState::Goodbye);
275 }
276
277 #[test]
278 fn reset_clears_to_empty() {
279 let e = make_entry(0);
280 e.try_attach().unwrap();
281 e.update_heartbeat(999_999);
282 e.reset();
283 assert_eq!(e.state(), PeerState::Empty);
284 assert_eq!(e.last_heartbeat(), 0);
285 assert_eq!(e.epoch(), 1);
287 }
288
289 #[test]
290 fn heartbeat_stale_detection() {
291 let e = make_entry(0);
292 let interval = 1_000_000_000u64; e.update_heartbeat(0);
294 assert!(e.is_heartbeat_stale(2_500_000_000, interval));
296 assert!(!e.is_heartbeat_stale(1_900_000_000, interval));
298 }
299
300 #[test]
301 fn heartbeat_disabled_never_stale() {
302 let e = make_entry(0);
303 e.update_heartbeat(0);
304 assert!(!e.is_heartbeat_stale(u64::MAX, 0));
305 }
306
307 #[test]
308 fn peer_id_roundtrip() {
309 let id = PeerId::from_index(0).unwrap();
310 assert_eq!(id.get(), 1);
311 assert_eq!(id.index(), 0);
312
313 let id = PeerId::new(5).unwrap();
314 assert_eq!(id.get(), 5);
315 assert_eq!(id.index(), 4);
316 }
317
318 #[test]
319 fn peer_id_bounds() {
320 assert!(PeerId::from_index(254).is_some()); assert!(PeerId::from_index(255).is_none()); assert!(PeerId::new(0).is_none()); }
324}