// orbit_rs/ring/mod.rs

1//! Ring buffers — orbit-rs's runtime substrate.
2//!
//! > *"orbit runtime yani ring"* ("the orbit runtime, that is, the
//! > ring") — the place where the fleet's shared state actually lives
//! > at the lowest level. Higher-level shapes (`OrbitCache`, future
//! > metrics/event substrates, etc.) reduce to *one or more rings*.
7//!
8//! ## Shape
9//!
10//! One [`Ring`] per [`OrbitTyped`] kind. A ring is a fixed-size
11//! circular log of [`Frame`]s. Writers append (atomic
12//! `fetch_add` on the head); readers walk the log by counter
13//! position. When `head` overflows the capacity, the oldest slot
14//! is overwritten — readers that fell behind see the loss as a
15//! "missing counter".
16//!
17//! The frame layout mirrors the `nwd1` seed (see VISION §13):
18//!
19//! ```text
20//! ┌──────────┬──────┬──────┬─────────────┐
21//! │ id (8)   │ kind │ ver  │ payload (N) │
22//! │ NetId64  │  u8  │ u64  │   bytes     │
23//! └──────────┴──────┴──────┴─────────────┘
24//! ```
25//!
26//! Two `kind` bytes coexist on the wire and they mean different
27//! things (intentional, two-axis encoding):
28//!
29//! - `frame.id.kind()` — *which Rust type* (the data shape).
30//! - `frame.kind`      — *which message class* (state / event /
31//!   command / ack / invalidate / …). V0 leaves this open at `0`;
32//!   concrete classes appear when subscriber semantics arrive.
33//!
34//! ## V0 backing
35//!
36//! `RwLock<Option<Frame>>` per slot — simple, correct, slow. V1
37//! swaps the slot storage for a SHM-backed lock-free layout with
38//! per-slot sequence numbers (publishing involves a write to the
39//! seq, then the body, then the seq again, like LMAX Disruptor).
40//! The [`Ring`] public API is V1-stable; this file is the only
41//! place that changes.
42//!
43//! ## Who mints
44//!
45//! Writes go through [`Ring::write`], which mints the [`NetId64`]
46//! atomically with the slot reservation (the COUNTER part *is*
47//! the slot's write position). NetId64s are therefore minted
48//! **server-side, by the writer process**. Browsers / external
49//! clients receive ids; they do not generate them.
50
51use std::sync::atomic::{AtomicU64, Ordering};
52use std::sync::{Arc, RwLock};
53
54use bytes::Bytes;
55
56use crate::NodeId;
57use crate::id::NetId64;
58use crate::typed::OrbitTyped;
59
60pub mod cursor;
61#[cfg(unix)]
62pub mod shm;
63
64/// One record in a ring — the on-wire shape (mirrors `nwd1::Frame`).
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct Frame {
67    pub id: NetId64,
68    pub kind: u8,
69    pub ver: u64,
70    pub payload: Bytes,
71}
72
73/// A fixed-capacity, fleet-wide append-only log keyed on KIND byte.
74///
75/// V0 is single-process; V1 is SHM-backed. The API is the same.
76pub struct Ring {
77    /// The KIND this ring carries — equals `T::KIND` for the
78    /// `OrbitTyped` value-shape it's storing.
79    kind: u8,
80    /// Number of slots; constant for the ring's lifetime.
81    capacity: usize,
82    /// Monotonic write counter. Always increases; slot index is
83    /// `(write_pos - 1) % capacity` after a `fetch_add`.
84    write_pos: AtomicU64,
85    /// Slot storage. Each slot is `None` until the first write
86    /// lands; afterwards holds the most recent frame to occupy
87    /// that position. Wraparound silently overwrites.
88    slots: Vec<RwLock<Option<Frame>>>,
89}
90
91impl Ring {
92    /// Create a ring for type `T` with `capacity` slots.
93    pub fn new<T: OrbitTyped>(capacity: usize) -> Self {
94        assert!(capacity > 0, "Ring capacity must be > 0");
95        let mut slots = Vec::with_capacity(capacity);
96        for _ in 0..capacity {
97            slots.push(RwLock::new(None));
98        }
99        Self {
100            kind: T::KIND,
101            capacity,
102            write_pos: AtomicU64::new(0),
103            slots,
104        }
105    }
106
107    /// The KIND byte this ring carries (equals `T::KIND`).
108    pub fn kind(&self) -> u8 {
109        self.kind
110    }
111
112    /// Total slot count — fixed at construction.
113    pub fn capacity(&self) -> usize {
114        self.capacity
115    }
116
117    /// Monotonic head — number of writes ever performed. Slot index
118    /// of the most recent write is `(head() - 1) % capacity` once
119    /// `head() > 0`.
120    pub fn head(&self) -> u64 {
121        self.write_pos.load(Ordering::Acquire)
122    }
123
124    /// Append a frame. Atomically reserves the next counter, mints
125    /// the [`NetId64`], and writes the frame into the corresponding
126    /// slot. Returns the minted id.
127    ///
128    /// `frame_kind` is the message class byte (V0: pass `0`).
129    /// `ver` is the version / tick at write time (V0: caller's
130    /// choice).
131    pub fn write(&self, node_id: NodeId, frame_kind: u8, ver: u64, payload: Bytes) -> NetId64 {
132        let counter = self.write_pos.fetch_add(1, Ordering::AcqRel);
133        let id = NetId64::make(self.kind, node_id.get(), counter);
134        let slot_idx = (counter as usize) % self.capacity;
135        let frame = Frame {
136            id,
137            kind: frame_kind,
138            ver,
139            payload,
140        };
141        let mut guard = self.slots[slot_idx].write().expect("ring slot poisoned");
142        *guard = Some(frame);
143        id
144    }
145
146    /// Read the slot that the given [`NetId64`]'s counter points at.
147    ///
148    /// Returns:
149    /// - `Some(frame)` if the slot's stored id matches the queried id
150    ///   exactly (the slot has not been overwritten by a later writer).
151    /// - `None` if the slot is empty, has wrapped past, or holds a
152    ///   different id than the one asked for.
153    pub fn read(&self, id: NetId64) -> Option<Frame> {
154        if id.kind() != self.kind {
155            return None;
156        }
157        let slot_idx = (id.counter() as usize) % self.capacity;
158        let guard = self.slots[slot_idx].read().expect("ring slot poisoned");
159        match &*guard {
160            Some(f) if f.id == id => Some(f.clone()),
161            _ => None,
162        }
163    }
164
165    /// Read the most recent frame, regardless of who wrote it.
166    /// Useful for "what's the current state?" — ignores
167    /// counter-by-counter walking.
168    pub fn read_head(&self) -> Option<Frame> {
169        let head = self.head();
170        if head == 0 {
171            return None;
172        }
173        let slot_idx = ((head - 1) as usize) % self.capacity;
174        self.slots[slot_idx]
175            .read()
176            .expect("ring slot poisoned")
177            .clone()
178    }
179
180    /// Read whatever frame currently occupies the slot at
181    /// `counter % capacity`, regardless of which counter is
182    /// stored in it. Used by walking readers that need slot-by-slot
183    /// access without knowing the writer's `NetId64` ahead of time.
184    ///
185    /// Returns `None` if the slot is empty.
186    pub fn read_at(&self, counter: u64) -> Option<Frame> {
187        let slot_idx = (counter as usize) % self.capacity;
188        self.slots[slot_idx]
189            .read()
190            .expect("ring slot poisoned")
191            .clone()
192    }
193
194    /// Clear all slots and reset the write head to zero.
195    ///
196    /// Intended for owner-controlled boot-time cleanup. Do not call
197    /// while other threads are publishing to this ring.
198    pub fn reset(&self) {
199        for slot in &self.slots {
200            *slot.write().expect("ring slot poisoned") = None;
201        }
202        self.write_pos.store(0, Ordering::Release);
203    }
204}
205
206impl cursor::RingFrameSource for Ring {
207    fn kind(&self) -> u8 {
208        Ring::kind(self)
209    }
210
211    fn head(&self) -> u64 {
212        Ring::head(self)
213    }
214
215    fn capacity(&self) -> usize {
216        Ring::capacity(self)
217    }
218
219    fn read_at(&self, counter: u64) -> Option<Frame> {
220        Ring::read_at(self, counter)
221    }
222}
223
224#[cfg(unix)]
225impl cursor::RingFrameSource for shm::ShmRing {
226    fn kind(&self) -> u8 {
227        shm::ShmRing::kind(self)
228    }
229
230    fn head(&self) -> u64 {
231        shm::ShmRing::head(self)
232    }
233
234    fn capacity(&self) -> usize {
235        shm::ShmRing::capacity(self)
236    }
237
238    fn read_at(&self, counter: u64) -> Option<Frame> {
239        shm::ShmRing::read_at(self, counter)
240    }
241}
242
243impl std::fmt::Debug for Ring {
244    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245        f.debug_struct("Ring")
246            .field("kind", &self.kind)
247            .field("capacity", &self.capacity)
248            .field("head", &self.head())
249            .finish()
250    }
251}
252
253/// Type-keyed registry of rings. A `Fleet` holds one of these and
254/// hands out `Arc<Ring>` per `OrbitTyped` kind on demand.
255pub(crate) struct RingRegistry {
256    rings: dashmap::DashMap<u8, Arc<Ring>>,
257}
258
259impl RingRegistry {
260    pub fn new() -> Self {
261        Self {
262            rings: dashmap::DashMap::new(),
263        }
264    }
265
266    /// Get-or-create the ring for `T`. The first caller for a given
267    /// KIND determines the ring's capacity.
268    pub fn get_or_create<T: OrbitTyped>(&self, capacity: usize) -> Arc<Ring> {
269        self.rings
270            .entry(T::KIND)
271            .or_insert_with(|| Arc::new(Ring::new::<T>(capacity)))
272            .clone()
273    }
274
275    /// Look up a ring by KIND byte (e.g. when only the id is known).
276    pub fn lookup(&self, kind: u8) -> Option<Arc<Ring>> {
277        self.rings.get(&kind).map(|e| e.clone())
278    }
279}