orbit_rs/ring/mod.rs
1//! Ring buffers — orbit-rs's runtime substrate.
2//!
3//! > *"orbit runtime yani ring"* — the place where the fleet's
4//! > shared state actually lives at the lowest level. Higher-level
5//! > shapes (`OrbitCache`, future metrics/event substrates, etc.)
6//! > reduce to *one or more rings*.
7//!
8//! ## Shape
9//!
10//! One [`Ring`] per [`OrbitTyped`] kind. A ring is a fixed-size
11//! circular log of [`Frame`]s. Writers append (atomic
12//! `fetch_add` on the head); readers walk the log by counter
13//! position. When `head` overflows the capacity, the oldest slot
14//! is overwritten — readers that fell behind see the loss as a
15//! "missing counter".
16//!
17//! The frame layout mirrors the `nwd1` seed (see VISION §13):
18//!
19//! ```text
20//! ┌──────────┬──────┬──────┬─────────────┐
21//! │ id (8) │ kind │ ver │ payload (N) │
22//! │ NetId64 │ u8 │ u64 │ bytes │
23//! └──────────┴──────┴──────┴─────────────┘
24//! ```
25//!
26//! Two `kind` bytes coexist on the wire and they mean different
27//! things (intentional, two-axis encoding):
28//!
29//! - `frame.id.kind()` — *which Rust type* (the data shape).
30//! - `frame.kind` — *which message class* (state / event /
31//! command / ack / invalidate / …). V0 leaves this open at `0`;
32//! concrete classes appear when subscriber semantics arrive.
33//!
34//! ## V0 backing
35//!
36//! `RwLock<Option<Frame>>` per slot — simple, correct, slow. V1
37//! swaps the slot storage for a SHM-backed lock-free layout with
38//! per-slot sequence numbers (publishing involves a write to the
39//! seq, then the body, then the seq again, like LMAX Disruptor).
40//! The [`Ring`] public API is V1-stable; this file is the only
41//! place that changes.
42//!
43//! ## Who mints
44//!
45//! Writes go through [`Ring::write`], which mints the [`NetId64`]
46//! atomically with the slot reservation (the COUNTER part *is*
47//! the slot's write position). NetId64s are therefore minted
48//! **server-side, by the writer process**. Browsers / external
49//! clients receive ids; they do not generate them.
50
51use std::sync::atomic::{AtomicU64, Ordering};
52use std::sync::{Arc, RwLock};
53
54use bytes::Bytes;
55
56use crate::NodeId;
57use crate::id::NetId64;
58use crate::typed::OrbitTyped;
59
60pub mod cursor;
61#[cfg(unix)]
62pub mod shm;
63
64/// One record in a ring — the on-wire shape (mirrors `nwd1::Frame`).
65#[derive(Clone, Debug, PartialEq, Eq)]
66pub struct Frame {
67 pub id: NetId64,
68 pub kind: u8,
69 pub ver: u64,
70 pub payload: Bytes,
71}
72
73/// A fixed-capacity, fleet-wide append-only log keyed on KIND byte.
74///
75/// V0 is single-process; V1 is SHM-backed. The API is the same.
76pub struct Ring {
77 /// The KIND this ring carries — equals `T::KIND` for the
78 /// `OrbitTyped` value-shape it's storing.
79 kind: u8,
80 /// Number of slots; constant for the ring's lifetime.
81 capacity: usize,
82 /// Monotonic write counter. Always increases; slot index is
83 /// `(write_pos - 1) % capacity` after a `fetch_add`.
84 write_pos: AtomicU64,
85 /// Slot storage. Each slot is `None` until the first write
86 /// lands; afterwards holds the most recent frame to occupy
87 /// that position. Wraparound silently overwrites.
88 slots: Vec<RwLock<Option<Frame>>>,
89}
90
91impl Ring {
92 /// Create a ring for type `T` with `capacity` slots.
93 pub fn new<T: OrbitTyped>(capacity: usize) -> Self {
94 assert!(capacity > 0, "Ring capacity must be > 0");
95 let mut slots = Vec::with_capacity(capacity);
96 for _ in 0..capacity {
97 slots.push(RwLock::new(None));
98 }
99 Self {
100 kind: T::KIND,
101 capacity,
102 write_pos: AtomicU64::new(0),
103 slots,
104 }
105 }
106
107 /// The KIND byte this ring carries (equals `T::KIND`).
108 pub fn kind(&self) -> u8 {
109 self.kind
110 }
111
112 /// Total slot count — fixed at construction.
113 pub fn capacity(&self) -> usize {
114 self.capacity
115 }
116
117 /// Monotonic head — number of writes ever performed. Slot index
118 /// of the most recent write is `(head() - 1) % capacity` once
119 /// `head() > 0`.
120 pub fn head(&self) -> u64 {
121 self.write_pos.load(Ordering::Acquire)
122 }
123
124 /// Append a frame. Atomically reserves the next counter, mints
125 /// the [`NetId64`], and writes the frame into the corresponding
126 /// slot. Returns the minted id.
127 ///
128 /// `frame_kind` is the message class byte (V0: pass `0`).
129 /// `ver` is the version / tick at write time (V0: caller's
130 /// choice).
131 pub fn write(&self, node_id: NodeId, frame_kind: u8, ver: u64, payload: Bytes) -> NetId64 {
132 let counter = self.write_pos.fetch_add(1, Ordering::AcqRel);
133 let id = NetId64::make(self.kind, node_id.get(), counter);
134 let slot_idx = (counter as usize) % self.capacity;
135 let frame = Frame {
136 id,
137 kind: frame_kind,
138 ver,
139 payload,
140 };
141 let mut guard = self.slots[slot_idx].write().expect("ring slot poisoned");
142 *guard = Some(frame);
143 id
144 }
145
146 /// Read the slot that the given [`NetId64`]'s counter points at.
147 ///
148 /// Returns:
149 /// - `Some(frame)` if the slot's stored id matches the queried id
150 /// exactly (the slot has not been overwritten by a later writer).
151 /// - `None` if the slot is empty, has wrapped past, or holds a
152 /// different id than the one asked for.
153 pub fn read(&self, id: NetId64) -> Option<Frame> {
154 if id.kind() != self.kind {
155 return None;
156 }
157 let slot_idx = (id.counter() as usize) % self.capacity;
158 let guard = self.slots[slot_idx].read().expect("ring slot poisoned");
159 match &*guard {
160 Some(f) if f.id == id => Some(f.clone()),
161 _ => None,
162 }
163 }
164
165 /// Read the most recent frame, regardless of who wrote it.
166 /// Useful for "what's the current state?" — ignores
167 /// counter-by-counter walking.
168 pub fn read_head(&self) -> Option<Frame> {
169 let head = self.head();
170 if head == 0 {
171 return None;
172 }
173 let slot_idx = ((head - 1) as usize) % self.capacity;
174 self.slots[slot_idx]
175 .read()
176 .expect("ring slot poisoned")
177 .clone()
178 }
179
180 /// Read whatever frame currently occupies the slot at
181 /// `counter % capacity`, regardless of which counter is
182 /// stored in it. Used by walking readers that need slot-by-slot
183 /// access without knowing the writer's `NetId64` ahead of time.
184 ///
185 /// Returns `None` if the slot is empty.
186 pub fn read_at(&self, counter: u64) -> Option<Frame> {
187 let slot_idx = (counter as usize) % self.capacity;
188 self.slots[slot_idx]
189 .read()
190 .expect("ring slot poisoned")
191 .clone()
192 }
193
194 /// Clear all slots and reset the write head to zero.
195 ///
196 /// Intended for owner-controlled boot-time cleanup. Do not call
197 /// while other threads are publishing to this ring.
198 pub fn reset(&self) {
199 for slot in &self.slots {
200 *slot.write().expect("ring slot poisoned") = None;
201 }
202 self.write_pos.store(0, Ordering::Release);
203 }
204}
205
206impl cursor::RingFrameSource for Ring {
207 fn kind(&self) -> u8 {
208 Ring::kind(self)
209 }
210
211 fn head(&self) -> u64 {
212 Ring::head(self)
213 }
214
215 fn capacity(&self) -> usize {
216 Ring::capacity(self)
217 }
218
219 fn read_at(&self, counter: u64) -> Option<Frame> {
220 Ring::read_at(self, counter)
221 }
222}
223
224#[cfg(unix)]
225impl cursor::RingFrameSource for shm::ShmRing {
226 fn kind(&self) -> u8 {
227 shm::ShmRing::kind(self)
228 }
229
230 fn head(&self) -> u64 {
231 shm::ShmRing::head(self)
232 }
233
234 fn capacity(&self) -> usize {
235 shm::ShmRing::capacity(self)
236 }
237
238 fn read_at(&self, counter: u64) -> Option<Frame> {
239 shm::ShmRing::read_at(self, counter)
240 }
241}
242
243impl std::fmt::Debug for Ring {
244 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
245 f.debug_struct("Ring")
246 .field("kind", &self.kind)
247 .field("capacity", &self.capacity)
248 .field("head", &self.head())
249 .finish()
250 }
251}
252
253/// Type-keyed registry of rings. A `Fleet` holds one of these and
254/// hands out `Arc<Ring>` per `OrbitTyped` kind on demand.
255pub(crate) struct RingRegistry {
256 rings: dashmap::DashMap<u8, Arc<Ring>>,
257}
258
259impl RingRegistry {
260 pub fn new() -> Self {
261 Self {
262 rings: dashmap::DashMap::new(),
263 }
264 }
265
266 /// Get-or-create the ring for `T`. The first caller for a given
267 /// KIND determines the ring's capacity.
268 pub fn get_or_create<T: OrbitTyped>(&self, capacity: usize) -> Arc<Ring> {
269 self.rings
270 .entry(T::KIND)
271 .or_insert_with(|| Arc::new(Ring::new::<T>(capacity)))
272 .clone()
273 }
274
275 /// Look up a ring by KIND byte (e.g. when only the id is known).
276 pub fn lookup(&self, kind: u8) -> Option<Arc<Ring>> {
277 self.rings.get(&kind).map(|e| e.clone())
278 }
279}