Skip to main content

shm_primitives/
peer.rs

1use crate::sync::{AtomicU32, AtomicU64, Ordering};
2
3/// States a peer table slot can be in.
4///
5/// r[impl shm.peer-table.states]
6#[repr(u32)]
7#[derive(Debug, Clone, Copy, PartialEq, Eq)]
8pub enum PeerState {
9    /// Slot available for a new guest.
10    Empty = 0,
11    /// Guest is active.
12    Attached = 1,
13    /// Guest is shutting down or has crashed.
14    Goodbye = 2,
15    /// Host has reserved this slot; guest not yet attached.
16    Reserved = 3,
17}
18
19impl PeerState {
20    #[inline]
21    pub fn from_u32(v: u32) -> Option<Self> {
22        match v {
23            0 => Some(PeerState::Empty),
24            1 => Some(PeerState::Attached),
25            2 => Some(PeerState::Goodbye),
26            3 => Some(PeerState::Reserved),
27            _ => None,
28        }
29    }
30}
31
32/// One 64-byte entry in the peer table.
33///
34/// r[impl shm.peer-table]
35#[repr(C)]
36pub struct PeerEntry {
37    /// Current peer state (Empty / Attached / Goodbye / Reserved).
38    pub state: AtomicU32,
39    /// Incremented on each attach; used as an ABA counter for crash recovery.
40    pub epoch: AtomicU32,
41    /// Monotonic clock reading (nanoseconds) of the last heartbeat.
42    pub last_heartbeat: AtomicU64,
43    /// Byte offset from the start of the segment to this guest's BipBuffer pair.
44    pub ring_offset: u64,
45    pub _reserved: [u8; 40],
46}
47
48#[cfg(not(loom))]
49const _: () = assert!(core::mem::size_of::<PeerEntry>() == 64);
50
51impl PeerEntry {
52    /// Write initial values for a new peer entry.
53    ///
54    /// # Safety
55    ///
56    /// `self` must point into exclusively-owned, zeroed memory.
57    pub unsafe fn init(&mut self, ring_offset: u64) {
58        self.state = AtomicU32::new(PeerState::Empty as u32);
59        self.epoch = AtomicU32::new(0);
60        self.last_heartbeat = AtomicU64::new(0);
61        self.ring_offset = ring_offset;
62        self._reserved = [0u8; 40];
63    }
64
65    /// Read the current peer state.
66    #[inline]
67    pub fn state(&self) -> PeerState {
68        PeerState::from_u32(self.state.load(Ordering::Acquire)).unwrap_or(PeerState::Empty)
69    }
70
71    /// Read the current epoch.
72    #[inline]
73    pub fn epoch(&self) -> u32 {
74        self.epoch.load(Ordering::Acquire)
75    }
76
77    /// Attempt to attach: CAS `Empty → Attached`, increment epoch.
78    ///
79    /// Returns `Ok(new_epoch)` on success, `Err(actual)` if the slot is not Empty.
80    pub fn try_attach(&self) -> Result<u32, PeerState> {
81        match self.state.compare_exchange(
82            PeerState::Empty as u32,
83            PeerState::Attached as u32,
84            Ordering::AcqRel,
85            Ordering::Acquire,
86        ) {
87            Ok(_) => Ok(self.epoch.fetch_add(1, Ordering::AcqRel).wrapping_add(1)),
88            Err(actual) => Err(PeerState::from_u32(actual).unwrap_or(PeerState::Empty)),
89        }
90    }
91
92    /// Reserve this slot before spawning a guest: CAS `Empty → Reserved`, increment epoch.
93    ///
94    /// Returns `Ok(new_epoch)` on success, `Err(actual)` if the slot is not Empty.
95    pub fn try_reserve(&self) -> Result<u32, PeerState> {
96        match self.state.compare_exchange(
97            PeerState::Empty as u32,
98            PeerState::Reserved as u32,
99            Ordering::AcqRel,
100            Ordering::Acquire,
101        ) {
102            Ok(_) => Ok(self.epoch.fetch_add(1, Ordering::AcqRel).wrapping_add(1)),
103            Err(actual) => Err(PeerState::from_u32(actual).unwrap_or(PeerState::Empty)),
104        }
105    }
106
107    /// Claim a reserved slot: CAS `Reserved → Attached`.
108    ///
109    /// Called by a spawned guest to complete the attach handshake.
110    /// Returns `Ok(())` on success, `Err(actual)` if the slot is not Reserved.
111    pub fn try_claim_reserved(&self) -> Result<(), PeerState> {
112        match self.state.compare_exchange(
113            PeerState::Reserved as u32,
114            PeerState::Attached as u32,
115            Ordering::AcqRel,
116            Ordering::Acquire,
117        ) {
118            Ok(_) => Ok(()),
119            Err(actual) => Err(PeerState::from_u32(actual).unwrap_or(PeerState::Empty)),
120        }
121    }
122
123    /// Release a reserved slot back to Empty (called by host if spawn fails).
124    pub fn release_reserved(&self) {
125        let _ = self.state.compare_exchange(
126            PeerState::Reserved as u32,
127            PeerState::Empty as u32,
128            Ordering::AcqRel,
129            Ordering::Acquire,
130        );
131    }
132
133    /// Mark this slot as Goodbye (orderly detach or crash detected by host).
134    #[inline]
135    pub fn set_goodbye(&self) {
136        self.state
137            .store(PeerState::Goodbye as u32, Ordering::Release);
138    }
139
140    /// Reset to Empty (crash recovery — called by host after reclaiming slots).
141    #[inline]
142    pub fn reset(&self) {
143        self.last_heartbeat.store(0, Ordering::Release);
144        self.state.store(PeerState::Empty as u32, Ordering::Release);
145    }
146
147    /// Write the current heartbeat timestamp.
148    ///
149    /// `timestamp_ns` must be a monotonic clock reading in nanoseconds.
150    ///
151    /// r[impl shm.crash.heartbeat-clock]
152    #[inline]
153    pub fn update_heartbeat(&self, timestamp_ns: u64) {
154        self.last_heartbeat.store(timestamp_ns, Ordering::Release);
155    }
156
157    /// Read the last recorded heartbeat timestamp.
158    #[inline]
159    pub fn last_heartbeat(&self) -> u64 {
160        self.last_heartbeat.load(Ordering::Acquire)
161    }
162
163    /// Return true if the heartbeat is stale (peer likely crashed).
164    ///
165    /// A heartbeat is stale when `current_time_ns - last_heartbeat > 2 * interval_ns`.
166    /// Always returns false when `interval_ns == 0` (heartbeats disabled).
167    #[inline]
168    pub fn is_heartbeat_stale(&self, current_time_ns: u64, interval_ns: u64) -> bool {
169        if interval_ns == 0 {
170            return false;
171        }
172        current_time_ns.saturating_sub(self.last_heartbeat()) > 2 * interval_ns
173    }
174}
175
176// ── PeerId ────────────────────────────────────────────────────────────────────
177
178/// A peer ID in the range 1–255 (host is 0, guests are 1–255).
179///
180/// r[impl shm.topology.peer-id]
181#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
182pub struct PeerId(u8);
183
184impl PeerId {
185    /// Construct from a 0-based table index.  Returns `None` if index ≥ 255.
186    #[inline]
187    pub fn from_index(index: u8) -> Option<Self> {
188        if index < 255 {
189            Some(Self(index + 1))
190        } else {
191            None
192        }
193    }
194
195    /// Construct from the raw peer ID value (1–255).  Returns `None` if 0.
196    #[inline]
197    pub fn new(value: u8) -> Option<Self> {
198        if value >= 1 { Some(Self(value)) } else { None }
199    }
200
201    /// Raw peer ID value (1–255).
202    #[inline]
203    pub fn get(self) -> u8 {
204        self.0
205    }
206
207    /// 0-based index into the peer table.
208    #[inline]
209    pub fn index(self) -> u8 {
210        self.0 - 1
211    }
212}
213
214// ── tests ─────────────────────────────────────────────────────────────────────
215
216#[cfg(all(test, not(loom)))]
217mod tests {
218    use super::*;
219
220    fn make_entry(ring_offset: u64) -> PeerEntry {
221        // Safety: zeroed stack value is valid for init.
222        let mut entry: PeerEntry = unsafe { core::mem::zeroed() };
223        unsafe { entry.init(ring_offset) };
224        entry
225    }
226
227    #[test]
228    fn initial_state_is_empty() {
229        let e = make_entry(0);
230        assert_eq!(e.state(), PeerState::Empty);
231        assert_eq!(e.epoch(), 0);
232    }
233
234    #[test]
235    fn try_attach_transitions_and_bumps_epoch() {
236        let e = make_entry(0);
237        let epoch = e.try_attach().unwrap();
238        assert_eq!(epoch, 1);
239        assert_eq!(e.state(), PeerState::Attached);
240    }
241
242    #[test]
243    fn try_attach_fails_when_not_empty() {
244        let e = make_entry(0);
245        e.try_attach().unwrap();
246        assert!(e.try_attach().is_err());
247    }
248
249    #[test]
250    fn reserve_then_claim() {
251        let e = make_entry(0);
252        let epoch = e.try_reserve().unwrap();
253        assert_eq!(epoch, 1);
254        assert_eq!(e.state(), PeerState::Reserved);
255        e.try_claim_reserved().unwrap();
256        assert_eq!(e.state(), PeerState::Attached);
257        // epoch does not increment on claim
258        assert_eq!(e.epoch(), 1);
259    }
260
261    #[test]
262    fn release_reserved_returns_to_empty() {
263        let e = make_entry(0);
264        e.try_reserve().unwrap();
265        e.release_reserved();
266        assert_eq!(e.state(), PeerState::Empty);
267    }
268
269    #[test]
270    fn set_goodbye() {
271        let e = make_entry(0);
272        e.try_attach().unwrap();
273        e.set_goodbye();
274        assert_eq!(e.state(), PeerState::Goodbye);
275    }
276
277    #[test]
278    fn reset_clears_to_empty() {
279        let e = make_entry(0);
280        e.try_attach().unwrap();
281        e.update_heartbeat(999_999);
282        e.reset();
283        assert_eq!(e.state(), PeerState::Empty);
284        assert_eq!(e.last_heartbeat(), 0);
285        // epoch is not reset — it keeps incrementing across crashes
286        assert_eq!(e.epoch(), 1);
287    }
288
289    #[test]
290    fn heartbeat_stale_detection() {
291        let e = make_entry(0);
292        let interval = 1_000_000_000u64; // 1 s
293        e.update_heartbeat(0);
294        // 2.5 s elapsed → stale
295        assert!(e.is_heartbeat_stale(2_500_000_000, interval));
296        // 1.9 s elapsed → not stale
297        assert!(!e.is_heartbeat_stale(1_900_000_000, interval));
298    }
299
300    #[test]
301    fn heartbeat_disabled_never_stale() {
302        let e = make_entry(0);
303        e.update_heartbeat(0);
304        assert!(!e.is_heartbeat_stale(u64::MAX, 0));
305    }
306
307    #[test]
308    fn peer_id_roundtrip() {
309        let id = PeerId::from_index(0).unwrap();
310        assert_eq!(id.get(), 1);
311        assert_eq!(id.index(), 0);
312
313        let id = PeerId::new(5).unwrap();
314        assert_eq!(id.get(), 5);
315        assert_eq!(id.index(), 4);
316    }
317
318    #[test]
319    fn peer_id_bounds() {
320        assert!(PeerId::from_index(254).is_some()); // max index → peer 255
321        assert!(PeerId::from_index(255).is_none()); // overflow
322        assert!(PeerId::new(0).is_none()); // host id not valid
323    }
324}