Skip to main content

orbit_rs/fleet/
mod.rs

1//! `Fleet` — the per-process handle that represents membership in
2//! a fleet of peers.
3//!
4//! See VISION.md §6 (lifetime — virtual, network-aware) and §7.5
5//! (FleetHandle — membership layer).
6//!
7//! V0 contract: joining yields a `NodeId` and gives the holder access to
8//! type-keyed rings. A fleet can be process-local or backed by POSIX
9//! shared memory; the public API is the same either way.
10//!
11//! Three Musketeers: every member is equal. The fleet does not track
12//! a "leader". Whatever role hierarchy the embedder cares about
13//! (master vs worker, primary vs replica) lives outside this crate.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicU64, Ordering};
17use std::time::Duration;
18
19use bytes::Bytes;
20use dashmap::DashMap;
21
22use crate::error::{Error, Result};
23use crate::id::NetId64;
24#[cfg(unix)]
25use crate::ring::shm::{ShmRing, ShmRingRegistry};
26use crate::ring::{Frame, Ring, RingRegistry};
27use crate::tick::OrbitEpoch;
28use crate::typed::OrbitTyped;
29
30mod cursor;
31pub mod heartbeat;
32pub use heartbeat::{
33    FLEET_HEARTBEAT_FRAME_KIND, FLEET_HEARTBEAT_RING_KIND, FleetHeartbeat, FleetHeartbeatRecord,
34    FleetHeartbeatSnapshot,
35};
36
/// A node's slot inside the fleet, fixed when the [`Fleet`] handle is
/// created.
///
/// [`Fleet::join`] (and `Fleet::join_shm` on unix) always use
/// [`NodeId::ZERO`]; [`Fleet::join_as`] (and `Fleet::join_shm_as`) take an
/// explicit id chosen by the embedder. Nothing in this module assigns ids
/// automatically, so an embedder running several members must give each
/// one a distinct id itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct NodeId(pub u16);
45
46impl NodeId {
47    pub const ZERO: Self = Self(0);
48
49    pub const fn new(value: u16) -> Self {
50        Self(value)
51    }
52
53    pub const fn get(self) -> u16 {
54        self.0
55    }
56}
57
58impl std::fmt::Display for NodeId {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        write!(f, "node:{}", self.0)
61    }
62}
63
64/// Per-process handle into the fleet. Cheap to clone — the inner
65/// state is `Arc`-shared.
66#[derive(Clone)]
67pub struct Fleet {
68    inner: Arc<FleetInner>,
69}
70
71struct FleetInner {
72    name: &'static str,
73    fleet_size: u8,
74    node_id: NodeId,
75    /// Per-KIND counter for `next_id` calls that don't go through a
76    /// ring (i.e. when the caller wants a fleet-unique id without
77    /// allocating a ring slot). V0: process-local atomic. V1: still
78    /// here, parallel to the ring's own write-position.
79    id_counters: DashMap<u8, Arc<AtomicU64>>,
80    /// Per-KIND ring buffers — orbit's runtime substrate. Either
81    /// in-process for unit-test / single-process use, or POSIX SHM
82    /// for real cross-process visibility (V1, master+worker fleet).
83    backing: RingBacking,
84}
85
86/// Backing storage for the fleet's ring buffers — chosen at
87/// `Fleet::join` / `Fleet::join_shm` time and frozen for the
88/// fleet's lifetime.
89enum RingBacking {
90    /// Process-local DashMap of `Ring` instances. No cross-process
91    /// visibility — peers running other processes do not see this
92    /// fleet's writes. Useful for unit tests and embedded scenarios.
93    InMemory(RingRegistry),
94    /// POSIX-SHM-backed `ShmRing` instances. Multiple processes
95    /// joining the same fleet name share the same kernel-level
96    /// memory; writes from one are visible to all immediately.
97    #[cfg(unix)]
98    Shm(ShmRingRegistry),
99}
100
/// Slot count used for in-memory fleets whenever a ring for a kind is
/// created implicitly — by [`Fleet::publish`], [`Fleet::ring`],
/// [`Fleet::read_head`], [`Fleet::head`], [`Fleet::read_at`],
/// [`Fleet::ring_capacity`] or [`Fleet::reset_ring`]. Call
/// [`Fleet::ring_with_capacity`] before any of those if a kind needs a
/// different size.
///
/// SHM-backed fleets do not use this value to size rings; their capacity
/// is fixed by `Fleet::join_shm`. For them it only appears as the value
/// [`Fleet::ring_capacity`] reports when attaching the SHM ring fails.
pub const DEFAULT_RING_CAPACITY: usize = 1024;

// SHM rings require power-of-two capacities (cheap modulo, see
// `Fleet::join_shm`); keep the in-memory default consistent with that.
const _: () = assert!(DEFAULT_RING_CAPACITY.is_power_of_two());
105
106impl Fleet {
107    /// Join (or create) a fleet under `name` with `fleet_size` total
108    /// expected members. In V0 every call returns a fresh local
109    /// fleet; the backing slot table is process-local.
110    pub fn join(name: &'static str, fleet_size: u8) -> Result<Self> {
111        Self::join_as(name, fleet_size, NodeId::ZERO)
112    }
113
114    /// Join (or create) a process-local fleet with an explicit node id.
115    pub fn join_as(name: &'static str, fleet_size: u8, node_id: NodeId) -> Result<Self> {
116        if fleet_size == 0 {
117            return Err(Error::EmptyFleet);
118        }
119        Ok(Self {
120            inner: Arc::new(FleetInner {
121                name,
122                fleet_size,
123                node_id,
124                id_counters: DashMap::new(),
125                backing: RingBacking::InMemory(RingRegistry::new()),
126            }),
127        })
128    }
129
130    /// Join (or create) a fleet whose ring storage is backed by
131    /// POSIX shared memory. Multiple processes calling this with
132    /// the same `name` and `capacity` share the same kernel-level
133    /// segments — the fleet sees each other's writes.
134    ///
135    /// `capacity` must be a power of two (cheap modulo); applies
136    /// per-KIND ring (each `OrbitTyped` kind gets its own SHM
137    /// segment of `capacity` slots).
138    ///
139    /// Cross-process naming: segments are `/orbit-{name}-{kind}-{uid}`.
140    /// macOS limits POSIX SHM names to 31 chars (PSHMNAMLEN); a
141    /// short fleet name is required there.
142    #[cfg(unix)]
143    pub fn join_shm(name: &'static str, fleet_size: u8, capacity: usize) -> Result<Self> {
144        Self::join_shm_as(name, fleet_size, capacity, NodeId::ZERO)
145    }
146
147    /// Join (or create) a SHM-backed fleet with an explicit node id.
148    #[cfg(unix)]
149    pub fn join_shm_as(
150        name: &'static str,
151        fleet_size: u8,
152        capacity: usize,
153        node_id: NodeId,
154    ) -> Result<Self> {
155        if fleet_size == 0 {
156            return Err(Error::EmptyFleet);
157        }
158        Ok(Self {
159            inner: Arc::new(FleetInner {
160                name,
161                fleet_size,
162                node_id,
163                id_counters: DashMap::new(),
164                backing: RingBacking::Shm(ShmRingRegistry::new(name, capacity)),
165            }),
166        })
167    }
168
169    pub fn name(&self) -> &'static str {
170        self.inner.name
171    }
172
173    pub fn fleet_size(&self) -> u8 {
174        self.inner.fleet_size
175    }
176
177    pub fn node_id(&self) -> NodeId {
178        self.inner.node_id
179    }
180
181    /// Mint a fresh fleet-unique [`NetId64`] for type `T` *without*
182    /// publishing anything. Use this when the caller only needs the
183    /// id (e.g. minting an id to attach to data being persisted to
184    /// DB before going through the ring).
185    ///
186    /// For most use cases prefer [`Fleet::publish`] — it mints AND
187    /// stores in a single atomic step.
188    pub fn next_id<T: OrbitTyped>(&self) -> NetId64 {
189        let counter_arc = self
190            .inner
191            .id_counters
192            .entry(T::KIND)
193            .or_insert_with(|| Arc::new(AtomicU64::new(0)))
194            .clone();
195        let counter = counter_arc.fetch_add(1, Ordering::Relaxed);
196        NetId64::make(T::KIND, self.node_id().get(), counter)
197    }
198
199    /// True when this fleet's ring storage is backed by POSIX SHM
200    /// (visible across processes). False for in-memory fleets.
201    pub fn is_shm(&self) -> bool {
202        #[cfg(unix)]
203        {
204            matches!(self.inner.backing, RingBacking::Shm(_))
205        }
206        #[cfg(not(unix))]
207        {
208            false
209        }
210    }
211
212    /// Get-or-create the in-memory ring for type `T`. Only valid
213    /// for fleets created via [`Fleet::join`]; SHM-backed fleets
214    /// should use [`Fleet::shm_ring`] instead.
215    ///
216    /// # Panics
217    ///
218    /// Panics if called on a SHM-backed fleet.
219    pub fn ring<T: OrbitTyped>(&self) -> Arc<Ring> {
220        match &self.inner.backing {
221            RingBacking::InMemory(r) => r.get_or_create::<T>(DEFAULT_RING_CAPACITY),
222            #[cfg(unix)]
223            RingBacking::Shm(_) => {
224                panic!("Fleet::ring called on SHM-backed fleet — use Fleet::shm_ring instead");
225            }
226        }
227    }
228
229    /// Get-or-create the in-memory ring for type `T` with an
230    /// explicit capacity. See [`Fleet::ring`].
231    pub fn ring_with_capacity<T: OrbitTyped>(&self, capacity: usize) -> Arc<Ring> {
232        match &self.inner.backing {
233            RingBacking::InMemory(r) => r.get_or_create::<T>(capacity),
234            #[cfg(unix)]
235            RingBacking::Shm(_) => {
236                panic!(
237                    "Fleet::ring_with_capacity called on SHM-backed fleet — capacity is fixed at join_shm time"
238                );
239            }
240        }
241    }
242
243    /// Get-or-create the SHM ring for type `T`. Only valid on fleets
244    /// created via [`Fleet::join_shm`].
245    ///
246    /// # Errors
247    ///
248    /// Returns an `io::Error` if the SHM segment cannot be opened
249    /// (permissions, name too long, etc.).
250    ///
251    /// # Panics
252    ///
253    /// Panics if called on an in-memory fleet.
254    #[cfg(unix)]
255    pub fn shm_ring<T: OrbitTyped>(&self) -> std::io::Result<Arc<ShmRing>> {
256        match &self.inner.backing {
257            RingBacking::Shm(r) => r.get_or_create_for(T::KIND),
258            RingBacking::InMemory(_) => {
259                panic!("Fleet::shm_ring called on in-memory fleet — use Fleet::ring instead");
260            }
261        }
262    }
263
264    /// Publish a payload to the ring for type `T`. Mints a
265    /// [`NetId64`] (atomic with slot reservation), writes the
266    /// [`Frame`] to the appropriate ring (in-memory or SHM), and
267    /// returns the id.
268    ///
269    /// # Panics
270    ///
271    /// On a SHM-backed fleet, panics if the SHM open fails or the
272    /// payload exceeds [`ring_shm::PAYLOAD_MAX`]. V1 contract: SHM
273    /// errors are operator-visible failures, not silent ones.
274    pub fn publish<T: OrbitTyped>(&self, frame_kind: u8, ver: u64, payload: Bytes) -> NetId64 {
275        match &self.inner.backing {
276            RingBacking::InMemory(r) => {
277                let ring = r.get_or_create::<T>(DEFAULT_RING_CAPACITY);
278                ring.write(self.node_id(), frame_kind, ver, payload)
279            }
280            #[cfg(unix)]
281            RingBacking::Shm(r) => {
282                let ring = r
283                    .get_or_create_for(T::KIND)
284                    .expect("SHM ring open failed — fleet unusable");
285                ring.write(self.node_id(), frame_kind, ver, payload)
286                    .expect("SHM ring write failed")
287            }
288        }
289    }
290
291    /// Look up a previously-published frame by its id. Returns the
292    /// frame if its slot still holds the same id (i.e. the ring has
293    /// not wrapped past it).
294    pub fn read(&self, id: NetId64) -> Option<Frame> {
295        match &self.inner.backing {
296            RingBacking::InMemory(r) => r.lookup(id.kind())?.read(id),
297            #[cfg(unix)]
298            RingBacking::Shm(r) => r.lookup(id.kind())?.read(id),
299        }
300    }
301
302    /// Read the most recent frame for type `T` (head - 1 in the ring).
303    /// Returns `None` if no write has happened yet.
304    pub fn read_head<T: OrbitTyped>(&self) -> Option<Frame> {
305        match &self.inner.backing {
306            RingBacking::InMemory(r) => {
307                let ring = r.get_or_create::<T>(DEFAULT_RING_CAPACITY);
308                ring.read_head()
309            }
310            #[cfg(unix)]
311            RingBacking::Shm(r) => {
312                let ring = r.get_or_create_for(T::KIND).ok()?;
313                ring.read_head()
314            }
315        }
316    }
317
318    /// Current head (write position) for type `T`'s ring. Lazily
319    /// creates / attaches the ring on first access — important for
320    /// cross-process readers, where a child process may need to
321    /// attach to a SHM segment a peer already populated. Returns 0
322    /// when the ring is fresh / no writes have happened.
323    pub fn head<T: OrbitTyped>(&self) -> u64 {
324        match &self.inner.backing {
325            RingBacking::InMemory(r) => r.get_or_create::<T>(DEFAULT_RING_CAPACITY).head(),
326            #[cfg(unix)]
327            RingBacking::Shm(r) => r
328                .get_or_create_for(T::KIND)
329                .map(|ring| ring.head())
330                .unwrap_or(0),
331        }
332    }
333
334    /// Read whatever frame currently occupies the slot at
335    /// `counter % capacity` for type `T`'s ring. Lazily attaches
336    /// the ring on first access (same rationale as [`Fleet::head`]).
337    /// Returns `None` if the slot is empty/torn or attach fails.
338    ///
339    /// Used by walking readers; for typed handle-based reads,
340    /// prefer [`Fleet::read`].
341    pub fn read_at<T: OrbitTyped>(&self, counter: u64) -> Option<Frame> {
342        match &self.inner.backing {
343            RingBacking::InMemory(r) => {
344                r.get_or_create::<T>(DEFAULT_RING_CAPACITY).read_at(counter)
345            }
346            #[cfg(unix)]
347            RingBacking::Shm(r) => r.get_or_create_for(T::KIND).ok()?.read_at(counter),
348        }
349    }
350
351    /// Capacity of the ring for type `T`. Lazily attaches the ring
352    /// on first access. Falls back to [`DEFAULT_RING_CAPACITY`] when
353    /// SHM attach fails.
354    pub fn ring_capacity<T: OrbitTyped>(&self) -> usize {
355        match &self.inner.backing {
356            RingBacking::InMemory(r) => r.get_or_create::<T>(DEFAULT_RING_CAPACITY).capacity(),
357            #[cfg(unix)]
358            RingBacking::Shm(r) => r
359                .get_or_create_for(T::KIND)
360                .map(|ring| ring.capacity())
361                .unwrap_or(DEFAULT_RING_CAPACITY),
362        }
363    }
364
365    /// Clear the ring for `T` and reset its write head to zero.
366    ///
367    /// This is an owner-side boot cleanup primitive. It is safe for
368    /// runtime state such as events and periodic metrics when the
369    /// embedding application calls it before peer processes begin
370    /// publishing. It is not a coordination protocol; callers must not
371    /// reset a ring while other fleet members are actively writing it.
372    pub fn reset_ring<T: OrbitTyped>(&self) -> std::io::Result<()> {
373        match &self.inner.backing {
374            RingBacking::InMemory(r) => {
375                r.get_or_create::<T>(DEFAULT_RING_CAPACITY).reset();
376                Ok(())
377            }
378            #[cfg(unix)]
379            RingBacking::Shm(r) => {
380                r.get_or_create_for(T::KIND)?.reset();
381                Ok(())
382            }
383        }
384    }
385
386    /// Publish a fleet-level Orbit heartbeat for this node.
387    ///
388    /// This is substrate liveness, not process supervision. Embedders
389    /// may inspect it to see whether a node is still writing into the
390    /// Orbit fabric, but worker kill/restart policy belongs above this
391    /// crate.
392    pub fn publish_heartbeat(&self) -> FleetHeartbeat {
393        self.publish_heartbeat_at(OrbitEpoch::now())
394    }
395
396    pub fn publish_heartbeat_at(&self, captured_at: OrbitEpoch) -> FleetHeartbeat {
397        let id = self.publish::<FleetHeartbeatRecord>(
398            FLEET_HEARTBEAT_FRAME_KIND,
399            captured_at.as_unix_ms(),
400            Bytes::new(),
401        );
402        FleetHeartbeat {
403            id,
404            node_id: self.node_id(),
405            captured_at,
406        }
407    }
408
409    pub fn latest_heartbeats(&self) -> Vec<FleetHeartbeat> {
410        heartbeat::latest_heartbeats(self)
411    }
412
413    pub fn heartbeat_snapshot(&self, max_age: Duration) -> FleetHeartbeatSnapshot {
414        self.heartbeat_snapshot_at(OrbitEpoch::now(), max_age)
415    }
416
417    pub fn heartbeat_snapshot_at(
418        &self,
419        now: OrbitEpoch,
420        max_age: Duration,
421    ) -> FleetHeartbeatSnapshot {
422        heartbeat::heartbeat_snapshot(self, now, max_age)
423    }
424}
425
426impl std::fmt::Debug for Fleet {
427    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
428        f.debug_struct("Fleet")
429            .field("name", &self.inner.name)
430            .field("fleet_size", &self.inner.fleet_size)
431            .field("node_id", &self.inner.node_id)
432            .field("id_counters", &self.inner.id_counters.len())
433            .finish()
434    }
435}