orbit_rs/fleet/mod.rs
1//! `Fleet` — the per-process handle that represents membership in
2//! a fleet of peers.
3//!
4//! See VISION.md §6 (lifetime — virtual, network-aware) and §7.5
5//! (FleetHandle — membership layer).
6//!
7//! V0 contract: joining yields a `NodeId` and gives the holder access to
8//! type-keyed rings. A fleet can be process-local or backed by POSIX
9//! shared memory; the public API is the same either way.
10//!
11//! Three Musketeers: every member is equal. The fleet does not track
12//! a "leader". Whatever role hierarchy the embedder cares about
13//! (master vs worker, primary vs replica) lives outside this crate.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18
19use bytes::Bytes;
20use dashmap::DashMap;
21
22use crate::error::{Error, Result};
23use crate::id::NetId64;
24#[cfg(unix)]
25use crate::ring::shm::{ShmRing, ShmRingRegistry};
26use crate::ring::{Frame, Ring, RingRegistry};
27use crate::tick::OrbitEpoch;
28use crate::typed::OrbitTyped;
29
30mod cursor;
31pub mod heartbeat;
32pub use heartbeat::{
33 FLEET_HEARTBEAT_FRAME_KIND, FLEET_HEARTBEAT_RING_KIND, FleetHeartbeat, FleetHeartbeatRecord,
34 FleetHeartbeatSnapshot,
35};
36
/// Identifies one member's slot within a fleet.
///
/// A transparent wrapper over a `u16`. In this module the value is supplied
/// by the caller of `Fleet::join_as` / `Fleet::join_shm_as`; the `join` and
/// `join_shm` shorthands use [`NodeId::ZERO`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct NodeId(pub u16);

impl NodeId {
    /// Slot zero — what the shorthand constructors publish under.
    pub const ZERO: Self = Self::new(0);

    /// Wraps a raw slot number.
    pub const fn new(value: u16) -> Self {
        NodeId(value)
    }

    /// Returns the raw slot number.
    pub const fn get(self) -> u16 {
        let NodeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for NodeId {
    /// Renders the id as `node:<slot>`, for example `node:3`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("node:{}", self.get()))
    }
}
63
64/// Per-process handle into the fleet. Cheap to clone — the inner
65/// state is `Arc`-shared.
66#[derive(Clone)]
67pub struct Fleet {
68 inner: Arc<FleetInner>,
69}
70
71struct FleetInner {
72 name: &'static str,
73 fleet_size: u8,
74 node_id: NodeId,
75 /// Per-KIND counter for `next_id` calls that don't go through a
76 /// ring (i.e. when the caller wants a fleet-unique id without
77 /// allocating a ring slot). V0: process-local atomic. V1: still
78 /// here, parallel to the ring's own write-position.
79 id_counters: DashMap<u8, Arc<AtomicU64>>,
80 /// Per-KIND ring buffers — orbit's runtime substrate. Either
81 /// in-process for unit-test / single-process use, or POSIX SHM
82 /// for real cross-process visibility (V1, master+worker fleet).
83 backing: RingBacking,
84}
85
86/// Backing storage for the fleet's ring buffers — chosen at
87/// `Fleet::join` / `Fleet::join_shm` time and frozen for the
88/// fleet's lifetime.
89enum RingBacking {
90 /// Process-local DashMap of `Ring` instances. No cross-process
91 /// visibility — peers running other processes do not see this
92 /// fleet's writes. Useful for unit tests and embedded scenarios.
93 InMemory(RingRegistry),
94 /// POSIX-SHM-backed `ShmRing` instances. Multiple processes
95 /// joining the same fleet name share the same kernel-level
96 /// memory; writes from one are visible to all immediately.
97 #[cfg(unix)]
98 Shm(ShmRingRegistry),
99}
100
/// Slot count used when an in-memory ring is created implicitly (for
/// example by [`Fleet::publish`] or [`Fleet::ring`]), and the fallback value
/// reported by [`Fleet::ring_capacity`] when a SHM ring cannot be attached.
/// Call [`Fleet::ring_with_capacity`] first if a kind needs a different size.
pub const DEFAULT_RING_CAPACITY: usize = 1 << 10;
105
106impl Fleet {
107 /// Join (or create) a fleet under `name` with `fleet_size` total
108 /// expected members. In V0 every call returns a fresh local
109 /// fleet; the backing slot table is process-local.
110 pub fn join(name: &'static str, fleet_size: u8) -> Result<Self> {
111 Self::join_as(name, fleet_size, NodeId::ZERO)
112 }
113
114 /// Join (or create) a process-local fleet with an explicit node id.
115 pub fn join_as(name: &'static str, fleet_size: u8, node_id: NodeId) -> Result<Self> {
116 if fleet_size == 0 {
117 return Err(Error::EmptyFleet);
118 }
119 Ok(Self {
120 inner: Arc::new(FleetInner {
121 name,
122 fleet_size,
123 node_id,
124 id_counters: DashMap::new(),
125 backing: RingBacking::InMemory(RingRegistry::new()),
126 }),
127 })
128 }
129
130 /// Join (or create) a fleet whose ring storage is backed by
131 /// POSIX shared memory. Multiple processes calling this with
132 /// the same `name` and `capacity` share the same kernel-level
133 /// segments — the fleet sees each other's writes.
134 ///
135 /// `capacity` must be a power of two (cheap modulo); applies
136 /// per-KIND ring (each `OrbitTyped` kind gets its own SHM
137 /// segment of `capacity` slots).
138 ///
139 /// Cross-process naming: segments are `/orbit-{name}-{kind}-{uid}`.
140 /// macOS limits POSIX SHM names to 31 chars (PSHMNAMLEN); a
141 /// short fleet name is required there.
142 #[cfg(unix)]
143 pub fn join_shm(name: &'static str, fleet_size: u8, capacity: usize) -> Result<Self> {
144 Self::join_shm_as(name, fleet_size, capacity, NodeId::ZERO)
145 }
146
147 /// Join (or create) a SHM-backed fleet with an explicit node id.
148 #[cfg(unix)]
149 pub fn join_shm_as(
150 name: &'static str,
151 fleet_size: u8,
152 capacity: usize,
153 node_id: NodeId,
154 ) -> Result<Self> {
155 if fleet_size == 0 {
156 return Err(Error::EmptyFleet);
157 }
158 Ok(Self {
159 inner: Arc::new(FleetInner {
160 name,
161 fleet_size,
162 node_id,
163 id_counters: DashMap::new(),
164 backing: RingBacking::Shm(ShmRingRegistry::new(name, capacity)),
165 }),
166 })
167 }
168
169 pub fn name(&self) -> &'static str {
170 self.inner.name
171 }
172
173 pub fn fleet_size(&self) -> u8 {
174 self.inner.fleet_size
175 }
176
177 pub fn node_id(&self) -> NodeId {
178 self.inner.node_id
179 }
180
181 /// Mint a fresh fleet-unique [`NetId64`] for type `T` *without*
182 /// publishing anything. Use this when the caller only needs the
183 /// id (e.g. minting an id to attach to data being persisted to
184 /// DB before going through the ring).
185 ///
186 /// For most use cases prefer [`Fleet::publish`] — it mints AND
187 /// stores in a single atomic step.
188 pub fn next_id<T: OrbitTyped>(&self) -> NetId64 {
189 let counter_arc = self
190 .inner
191 .id_counters
192 .entry(T::KIND)
193 .or_insert_with(|| Arc::new(AtomicU64::new(0)))
194 .clone();
195 let counter = counter_arc.fetch_add(1, Ordering::Relaxed);
196 NetId64::make(T::KIND, self.node_id().get(), counter)
197 }
198
199 /// True when this fleet's ring storage is backed by POSIX SHM
200 /// (visible across processes). False for in-memory fleets.
201 pub fn is_shm(&self) -> bool {
202 #[cfg(unix)]
203 {
204 matches!(self.inner.backing, RingBacking::Shm(_))
205 }
206 #[cfg(not(unix))]
207 {
208 false
209 }
210 }
211
212 /// Get-or-create the in-memory ring for type `T`. Only valid
213 /// for fleets created via [`Fleet::join`]; SHM-backed fleets
214 /// should use [`Fleet::shm_ring`] instead.
215 ///
216 /// # Panics
217 ///
218 /// Panics if called on a SHM-backed fleet.
219 pub fn ring<T: OrbitTyped>(&self) -> Arc<Ring> {
220 match &self.inner.backing {
221 RingBacking::InMemory(r) => r.get_or_create::<T>(DEFAULT_RING_CAPACITY),
222 #[cfg(unix)]
223 RingBacking::Shm(_) => {
224 panic!("Fleet::ring called on SHM-backed fleet — use Fleet::shm_ring instead");
225 }
226 }
227 }
228
229 /// Get-or-create the in-memory ring for type `T` with an
230 /// explicit capacity. See [`Fleet::ring`].
231 pub fn ring_with_capacity<T: OrbitTyped>(&self, capacity: usize) -> Arc<Ring> {
232 match &self.inner.backing {
233 RingBacking::InMemory(r) => r.get_or_create::<T>(capacity),
234 #[cfg(unix)]
235 RingBacking::Shm(_) => {
236 panic!(
237 "Fleet::ring_with_capacity called on SHM-backed fleet — capacity is fixed at join_shm time"
238 );
239 }
240 }
241 }
242
243 /// Get-or-create the SHM ring for type `T`. Only valid on fleets
244 /// created via [`Fleet::join_shm`].
245 ///
246 /// # Errors
247 ///
248 /// Returns an `io::Error` if the SHM segment cannot be opened
249 /// (permissions, name too long, etc.).
250 ///
251 /// # Panics
252 ///
253 /// Panics if called on an in-memory fleet.
254 #[cfg(unix)]
255 pub fn shm_ring<T: OrbitTyped>(&self) -> std::io::Result<Arc<ShmRing>> {
256 match &self.inner.backing {
257 RingBacking::Shm(r) => r.get_or_create_for(T::KIND),
258 RingBacking::InMemory(_) => {
259 panic!("Fleet::shm_ring called on in-memory fleet — use Fleet::ring instead");
260 }
261 }
262 }
263
264 /// Publish a payload to the ring for type `T`. Mints a
265 /// [`NetId64`] (atomic with slot reservation), writes the
266 /// [`Frame`] to the appropriate ring (in-memory or SHM), and
267 /// returns the id.
268 ///
269 /// # Panics
270 ///
271 /// On a SHM-backed fleet, panics if the SHM open fails or the
272 /// payload exceeds [`ring_shm::PAYLOAD_MAX`]. V1 contract: SHM
273 /// errors are operator-visible failures, not silent ones.
274 pub fn publish<T: OrbitTyped>(&self, frame_kind: u8, ver: u64, payload: Bytes) -> NetId64 {
275 match &self.inner.backing {
276 RingBacking::InMemory(r) => {
277 let ring = r.get_or_create::<T>(DEFAULT_RING_CAPACITY);
278 ring.write(self.node_id(), frame_kind, ver, payload)
279 }
280 #[cfg(unix)]
281 RingBacking::Shm(r) => {
282 let ring = r
283 .get_or_create_for(T::KIND)
284 .expect("SHM ring open failed — fleet unusable");
285 ring.write(self.node_id(), frame_kind, ver, payload)
286 .expect("SHM ring write failed")
287 }
288 }
289 }
290
291 /// Look up a previously-published frame by its id. Returns the
292 /// frame if its slot still holds the same id (i.e. the ring has
293 /// not wrapped past it).
294 pub fn read(&self, id: NetId64) -> Option<Frame> {
295 match &self.inner.backing {
296 RingBacking::InMemory(r) => r.lookup(id.kind())?.read(id),
297 #[cfg(unix)]
298 RingBacking::Shm(r) => r.lookup(id.kind())?.read(id),
299 }
300 }
301
302 /// Read the most recent frame for type `T` (head - 1 in the ring).
303 /// Returns `None` if no write has happened yet.
304 pub fn read_head<T: OrbitTyped>(&self) -> Option<Frame> {
305 match &self.inner.backing {
306 RingBacking::InMemory(r) => {
307 let ring = r.get_or_create::<T>(DEFAULT_RING_CAPACITY);
308 ring.read_head()
309 }
310 #[cfg(unix)]
311 RingBacking::Shm(r) => {
312 let ring = r.get_or_create_for(T::KIND).ok()?;
313 ring.read_head()
314 }
315 }
316 }
317
318 /// Current head (write position) for type `T`'s ring. Lazily
319 /// creates / attaches the ring on first access — important for
320 /// cross-process readers, where a child process may need to
321 /// attach to a SHM segment a peer already populated. Returns 0
322 /// when the ring is fresh / no writes have happened.
323 pub fn head<T: OrbitTyped>(&self) -> u64 {
324 match &self.inner.backing {
325 RingBacking::InMemory(r) => r.get_or_create::<T>(DEFAULT_RING_CAPACITY).head(),
326 #[cfg(unix)]
327 RingBacking::Shm(r) => r
328 .get_or_create_for(T::KIND)
329 .map(|ring| ring.head())
330 .unwrap_or(0),
331 }
332 }
333
334 /// Read whatever frame currently occupies the slot at
335 /// `counter % capacity` for type `T`'s ring. Lazily attaches
336 /// the ring on first access (same rationale as [`Fleet::head`]).
337 /// Returns `None` if the slot is empty/torn or attach fails.
338 ///
339 /// Used by walking readers; for typed handle-based reads,
340 /// prefer [`Fleet::read`].
341 pub fn read_at<T: OrbitTyped>(&self, counter: u64) -> Option<Frame> {
342 match &self.inner.backing {
343 RingBacking::InMemory(r) => {
344 r.get_or_create::<T>(DEFAULT_RING_CAPACITY).read_at(counter)
345 }
346 #[cfg(unix)]
347 RingBacking::Shm(r) => r.get_or_create_for(T::KIND).ok()?.read_at(counter),
348 }
349 }
350
351 /// Capacity of the ring for type `T`. Lazily attaches the ring
352 /// on first access. Falls back to [`DEFAULT_RING_CAPACITY`] when
353 /// SHM attach fails.
354 pub fn ring_capacity<T: OrbitTyped>(&self) -> usize {
355 match &self.inner.backing {
356 RingBacking::InMemory(r) => r.get_or_create::<T>(DEFAULT_RING_CAPACITY).capacity(),
357 #[cfg(unix)]
358 RingBacking::Shm(r) => r
359 .get_or_create_for(T::KIND)
360 .map(|ring| ring.capacity())
361 .unwrap_or(DEFAULT_RING_CAPACITY),
362 }
363 }
364
365 /// Clear the ring for `T` and reset its write head to zero.
366 ///
367 /// This is an owner-side boot cleanup primitive. It is safe for
368 /// runtime state such as events and periodic metrics when the
369 /// embedding application calls it before peer processes begin
370 /// publishing. It is not a coordination protocol; callers must not
371 /// reset a ring while other fleet members are actively writing it.
372 pub fn reset_ring<T: OrbitTyped>(&self) -> std::io::Result<()> {
373 match &self.inner.backing {
374 RingBacking::InMemory(r) => {
375 r.get_or_create::<T>(DEFAULT_RING_CAPACITY).reset();
376 Ok(())
377 }
378 #[cfg(unix)]
379 RingBacking::Shm(r) => {
380 r.get_or_create_for(T::KIND)?.reset();
381 Ok(())
382 }
383 }
384 }
385
386 /// Publish a fleet-level Orbit heartbeat for this node.
387 ///
388 /// This is substrate liveness, not process supervision. Embedders
389 /// may inspect it to see whether a node is still writing into the
390 /// Orbit fabric, but worker kill/restart policy belongs above this
391 /// crate.
392 pub fn publish_heartbeat(&self) -> FleetHeartbeat {
393 self.publish_heartbeat_at(OrbitEpoch::now())
394 }
395
396 pub fn publish_heartbeat_at(&self, captured_at: OrbitEpoch) -> FleetHeartbeat {
397 let id = self.publish::<FleetHeartbeatRecord>(
398 FLEET_HEARTBEAT_FRAME_KIND,
399 captured_at.as_unix_ms(),
400 Bytes::new(),
401 );
402 FleetHeartbeat {
403 id,
404 node_id: self.node_id(),
405 captured_at,
406 }
407 }
408
409 pub fn latest_heartbeats(&self) -> Vec<FleetHeartbeat> {
410 heartbeat::latest_heartbeats(self)
411 }
412
413 pub fn heartbeat_snapshot(&self, max_age: Duration) -> FleetHeartbeatSnapshot {
414 self.heartbeat_snapshot_at(OrbitEpoch::now(), max_age)
415 }
416
417 pub fn heartbeat_snapshot_at(
418 &self,
419 now: OrbitEpoch,
420 max_age: Duration,
421 ) -> FleetHeartbeatSnapshot {
422 heartbeat::heartbeat_snapshot(self, now, max_age)
423 }
424}
425
426impl std::fmt::Debug for Fleet {
427 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
428 f.debug_struct("Fleet")
429 .field("name", &self.inner.name)
430 .field("fleet_size", &self.inner.fleet_size)
431 .field("node_id", &self.inner.node_id)
432 .field("id_counters", &self.inner.id_counters.len())
433 .finish()
434 }
435}