use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
use bytes::Bytes;
use dashmap::DashMap;
use crate::error::{Error, Result};
use crate::id::NetId64;
#[cfg(unix)]
use crate::ring::shm::{ShmRing, ShmRingRegistry};
use crate::ring::{Frame, Ring, RingRegistry};
use crate::tick::OrbitEpoch;
use crate::typed::OrbitTyped;
mod cursor;
pub mod heartbeat;
pub use heartbeat::{
FLEET_HEARTBEAT_FRAME_KIND, FLEET_HEARTBEAT_RING_KIND, FleetHeartbeat, FleetHeartbeatRecord,
FleetHeartbeatSnapshot,
};
/// Identifier of a node, stored as a transparent wrapper around a raw `u16`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(transparent)]
pub struct NodeId(pub u16);

impl NodeId {
    /// The node id whose raw value is `0`.
    pub const ZERO: Self = Self::new(0);

    /// Wraps a raw `u16` as a `NodeId`.
    pub const fn new(value: u16) -> Self {
        NodeId(value)
    }

    /// Returns the raw `u16` carried by this id.
    pub const fn get(self) -> u16 {
        let NodeId(raw) = self;
        raw
    }
}

impl std::fmt::Display for NodeId {
    /// Renders the id as `node:<raw value>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("node:{}", self.get()))
    }
}
/// Handle to a fleet.
///
/// The handle only holds an `Arc` to the shared state, so cloning it yields
/// another handle to the same fleet rather than a new fleet.
#[derive(Clone)]
pub struct Fleet {
inner: Arc<FleetInner>,
}
/// State shared by every clone of a `Fleet` handle.
struct FleetInner {
/// Fleet name given at join time; SHM fleets also pass it to `ShmRingRegistry::new`.
name: &'static str,
/// Fleet size given at join time; the join constructors reject `0`.
fleet_size: u8,
/// Id of the local node; handed to `write` by `publish` and to `NetId64::make` by `next_id`.
node_id: NodeId,
/// One counter per `T::KIND`, inserted (starting at 0) on first use by `next_id`.
id_counters: DashMap<u8, Arc<AtomicU64>>,
/// The ring registry that serves this fleet.
backing: RingBacking,
}
/// The kind of ring registry behind a fleet.
enum RingBacking {
/// Rings held in a `RingRegistry`; selected by `Fleet::join` / `Fleet::join_as`.
InMemory(RingRegistry),
/// Rings held in a `ShmRingRegistry`; selected by `Fleet::join_shm` / `Fleet::join_shm_as`.
/// Only compiled on unix targets.
#[cfg(unix)]
Shm(ShmRingRegistry),
}
/// Capacity requested from the in-memory registry whenever a `Fleet` method
/// obtains a ring without an explicit capacity. `Fleet::ring_capacity` also
/// reports this value when an SHM ring cannot be opened.
pub const DEFAULT_RING_CAPACITY: usize = 1024;
impl Fleet {
/// Joins an in-memory fleet as `NodeId::ZERO`.
///
/// Delegates to `Fleet::join_as`, so the result is whatever that returns,
/// including `Error::EmptyFleet` when `fleet_size` is `0`.
pub fn join(name: &'static str, fleet_size: u8) -> Result<Self> {
Self::join_as(name, fleet_size, NodeId::ZERO)
}
pub fn join_as(name: &'static str, fleet_size: u8, node_id: NodeId) -> Result<Self> {
if fleet_size == 0 {
return Err(Error::EmptyFleet);
}
Ok(Self {
inner: Arc::new(FleetInner {
name,
fleet_size,
node_id,
id_counters: DashMap::new(),
backing: RingBacking::InMemory(RingRegistry::new()),
}),
})
}
/// Joins an SHM-backed fleet as `NodeId::ZERO` (unix only).
///
/// Delegates to `Fleet::join_shm_as`, so the result is whatever that returns,
/// including `Error::EmptyFleet` when `fleet_size` is `0`.
#[cfg(unix)]
pub fn join_shm(name: &'static str, fleet_size: u8, capacity: usize) -> Result<Self> {
Self::join_shm_as(name, fleet_size, capacity, NodeId::ZERO)
}
/// Joins an SHM-backed fleet as the node identified by `node_id` (unix only).
///
/// `name` and `capacity` are forwarded to `ShmRingRegistry::new`; the capacity
/// cannot be changed afterwards through this handle.
///
/// # Errors
///
/// Returns `Error::EmptyFleet` when `fleet_size` is `0`.
#[cfg(unix)]
pub fn join_shm_as(
    name: &'static str,
    fleet_size: u8,
    capacity: usize,
    node_id: NodeId,
) -> Result<Self> {
    // Reject the empty fleet before touching any shared-memory registry.
    if fleet_size == 0 {
        return Err(Error::EmptyFleet);
    }

    let inner = FleetInner {
        name,
        fleet_size,
        node_id,
        id_counters: DashMap::new(),
        backing: RingBacking::Shm(ShmRingRegistry::new(name, capacity)),
    };
    Ok(Self {
        inner: Arc::new(inner),
    })
}
/// Returns the fleet name that was supplied when this handle joined.
pub fn name(&self) -> &'static str {
self.inner.name
}
/// Returns the fleet size that was supplied when this handle joined.
///
/// The join constructors reject `0`, so the value is always at least `1`.
pub fn fleet_size(&self) -> u8 {
self.inner.fleet_size
}
/// Returns the id of the local node this handle joined as.
pub fn node_id(&self) -> NodeId {
self.inner.node_id
}
/// Allocates the next id for records of type `T`.
///
/// Each `T::KIND` has its own counter, created at `0` on first use and
/// incremented by one per call. The value observed before the increment is
/// combined with `T::KIND` and the local node id through `NetId64::make`.
pub fn next_id<T: OrbitTyped>(&self) -> NetId64 {
    // The map guard dereferences to the counter, so the increment happens
    // directly on the shared atomic.
    let sequence = self
        .inner
        .id_counters
        .entry(T::KIND)
        .or_insert_with(|| Arc::new(AtomicU64::new(0)))
        .fetch_add(1, Ordering::Relaxed);
    NetId64::make(T::KIND, self.inner.node_id.get(), sequence)
}
/// Reports whether this fleet is backed by the SHM ring registry.
///
/// Always `false` on non-unix targets, where the SHM backing is not compiled in.
pub fn is_shm(&self) -> bool {
    match &self.inner.backing {
        RingBacking::InMemory(_) => false,
        #[cfg(unix)]
        RingBacking::Shm(_) => true,
    }
}
/// Returns the in-memory ring for `T`, requesting `DEFAULT_RING_CAPACITY`
/// from the registry's `get_or_create`.
///
/// # Panics
///
/// Panics when the fleet is SHM-backed; use `Fleet::shm_ring` there.
pub fn ring<T: OrbitTyped>(&self) -> Arc<Ring> {
    let registry = match &self.inner.backing {
        RingBacking::InMemory(registry) => registry,
        #[cfg(unix)]
        RingBacking::Shm(_) => {
            panic!("Fleet::ring called on SHM-backed fleet — use Fleet::shm_ring instead")
        }
    };
    registry.get_or_create::<T>(DEFAULT_RING_CAPACITY)
}
/// Returns the in-memory ring for `T`, passing `capacity` to the registry's
/// `get_or_create`.
///
/// # Panics
///
/// Panics when the fleet is SHM-backed, because the capacity of SHM rings is
/// fixed when the fleet is joined.
pub fn ring_with_capacity<T: OrbitTyped>(&self, capacity: usize) -> Arc<Ring> {
    let registry = match &self.inner.backing {
        RingBacking::InMemory(registry) => registry,
        #[cfg(unix)]
        RingBacking::Shm(_) => panic!(
            "Fleet::ring_with_capacity called on SHM-backed fleet — capacity is fixed at join_shm time"
        ),
    };
    registry.get_or_create::<T>(capacity)
}
/// Returns the SHM ring for `T::KIND` (unix only).
///
/// # Errors
///
/// Propagates the `std::io::Error` returned by the registry's `get_or_create_for`.
///
/// # Panics
///
/// Panics when the fleet is in-memory; use `Fleet::ring` there.
#[cfg(unix)]
pub fn shm_ring<T: OrbitTyped>(&self) -> std::io::Result<Arc<ShmRing>> {
    let registry = match &self.inner.backing {
        RingBacking::Shm(registry) => registry,
        RingBacking::InMemory(_) => {
            panic!("Fleet::shm_ring called on in-memory fleet — use Fleet::ring instead")
        }
    };
    registry.get_or_create_for(T::KIND)
}
/// Writes a frame to the ring for `T` on behalf of the local node and returns
/// the id produced by the ring's `write`.
///
/// `frame_kind`, `ver` and `payload` are forwarded to `write` unchanged. An
/// in-memory fleet obtains the ring with `DEFAULT_RING_CAPACITY`.
///
/// # Panics
///
/// On an SHM-backed fleet this panics if the ring cannot be opened or if the
/// write fails.
pub fn publish<T: OrbitTyped>(&self, frame_kind: u8, ver: u64, payload: Bytes) -> NetId64 {
    let author = self.inner.node_id;
    match &self.inner.backing {
        RingBacking::InMemory(registry) => {
            let target = registry.get_or_create::<T>(DEFAULT_RING_CAPACITY);
            target.write(author, frame_kind, ver, payload)
        }
        #[cfg(unix)]
        RingBacking::Shm(registry) => {
            let opened = registry.get_or_create_for(T::KIND);
            let target = opened.expect("SHM ring open failed — fleet unusable");
            let written = target.write(author, frame_kind, ver, payload);
            written.expect("SHM ring write failed")
        }
    }
}
/// Looks up the frame identified by `id`.
///
/// The ring is found with the registry's `lookup` on `id.kind()`. Returns
/// `None` when no such ring is found or when the ring's `read` yields nothing
/// for `id`.
pub fn read(&self, id: NetId64) -> Option<Frame> {
    let kind = id.kind();
    match &self.inner.backing {
        RingBacking::InMemory(registry) => registry.lookup(kind).and_then(|ring| ring.read(id)),
        #[cfg(unix)]
        RingBacking::Shm(registry) => registry.lookup(kind).and_then(|ring| ring.read(id)),
    }
}
/// Returns the result of `read_head` on the ring for `T`.
///
/// An in-memory fleet obtains the ring with `DEFAULT_RING_CAPACITY`. On an
/// SHM-backed fleet a failure to open the ring is discarded and reported as
/// `None`.
pub fn read_head<T: OrbitTyped>(&self) -> Option<Frame> {
    match &self.inner.backing {
        RingBacking::InMemory(registry) => registry
            .get_or_create::<T>(DEFAULT_RING_CAPACITY)
            .read_head(),
        #[cfg(unix)]
        RingBacking::Shm(registry) => match registry.get_or_create_for(T::KIND) {
            Ok(ring) => ring.read_head(),
            Err(_) => None,
        },
    }
}
/// Returns the `head` value of the ring for `T`.
///
/// An in-memory fleet obtains the ring with `DEFAULT_RING_CAPACITY`. On an
/// SHM-backed fleet a failure to open the ring is discarded and reported as
/// `0`, so `0` alone does not prove the ring was reachable.
pub fn head<T: OrbitTyped>(&self) -> u64 {
    match &self.inner.backing {
        RingBacking::InMemory(registry) => {
            let ring = registry.get_or_create::<T>(DEFAULT_RING_CAPACITY);
            ring.head()
        }
        #[cfg(unix)]
        RingBacking::Shm(registry) => match registry.get_or_create_for(T::KIND) {
            Ok(ring) => ring.head(),
            Err(_) => 0,
        },
    }
}
/// Returns the result of `read_at(counter)` on the ring for `T`.
///
/// An in-memory fleet obtains the ring with `DEFAULT_RING_CAPACITY`. On an
/// SHM-backed fleet a failure to open the ring is discarded and reported as
/// `None`.
pub fn read_at<T: OrbitTyped>(&self, counter: u64) -> Option<Frame> {
    match &self.inner.backing {
        RingBacking::InMemory(registry) => {
            let ring = registry.get_or_create::<T>(DEFAULT_RING_CAPACITY);
            ring.read_at(counter)
        }
        #[cfg(unix)]
        RingBacking::Shm(registry) => match registry.get_or_create_for(T::KIND) {
            Ok(ring) => ring.read_at(counter),
            Err(_) => None,
        },
    }
}
/// Returns the `capacity` of the ring for `T`.
///
/// An in-memory fleet obtains the ring with `DEFAULT_RING_CAPACITY`. On an
/// SHM-backed fleet a failure to open the ring is discarded and
/// `DEFAULT_RING_CAPACITY` is returned instead, which may differ from the
/// capacity chosen when the fleet was joined.
pub fn ring_capacity<T: OrbitTyped>(&self) -> usize {
    match &self.inner.backing {
        RingBacking::InMemory(registry) => {
            let ring = registry.get_or_create::<T>(DEFAULT_RING_CAPACITY);
            ring.capacity()
        }
        #[cfg(unix)]
        RingBacking::Shm(registry) => match registry.get_or_create_for(T::KIND) {
            Ok(ring) => ring.capacity(),
            Err(_) => DEFAULT_RING_CAPACITY,
        },
    }
}
/// Calls `reset` on the ring for `T`.
///
/// An in-memory fleet obtains the ring with `DEFAULT_RING_CAPACITY` and
/// always returns `Ok(())`.
///
/// # Errors
///
/// On an SHM-backed fleet, propagates the `std::io::Error` from opening the ring.
pub fn reset_ring<T: OrbitTyped>(&self) -> std::io::Result<()> {
    match &self.inner.backing {
        RingBacking::InMemory(registry) => {
            let ring = registry.get_or_create::<T>(DEFAULT_RING_CAPACITY);
            ring.reset();
        }
        #[cfg(unix)]
        RingBacking::Shm(registry) => {
            let ring = registry.get_or_create_for(T::KIND)?;
            ring.reset();
        }
    }
    Ok(())
}
/// Publishes a heartbeat stamped with `OrbitEpoch::now()`.
///
/// Delegates to `Fleet::publish_heartbeat_at`, which goes through
/// `Fleet::publish` and therefore shares its behaviour on SHM failures.
pub fn publish_heartbeat(&self) -> FleetHeartbeat {
self.publish_heartbeat_at(OrbitEpoch::now())
}
/// Publishes a heartbeat for the local node stamped with `captured_at`.
///
/// The heartbeat is written to the `FleetHeartbeatRecord` ring as a frame of
/// kind `FLEET_HEARTBEAT_FRAME_KIND` with an empty payload;
/// `captured_at.as_unix_ms()` is passed as the frame's `ver`. The returned
/// value carries the id of the published frame, the local node id and
/// `captured_at`.
///
/// # Panics
///
/// Goes through `Fleet::publish`, which panics on an SHM-backed fleet when the
/// ring cannot be opened or written.
pub fn publish_heartbeat_at(&self, captured_at: OrbitEpoch) -> FleetHeartbeat {
    let ver = captured_at.as_unix_ms();
    let id = self.publish::<FleetHeartbeatRecord>(FLEET_HEARTBEAT_FRAME_KIND, ver, Bytes::new());
    let node_id = self.node_id();
    FleetHeartbeat {
        id,
        node_id,
        captured_at,
    }
}
/// Returns the heartbeats computed by `heartbeat::latest_heartbeats` for this fleet.
// NOTE(review): selection and ordering of the returned heartbeats are defined in
// the `heartbeat` module, which is not visible from here — confirm there.
pub fn latest_heartbeats(&self) -> Vec<FleetHeartbeat> {
heartbeat::latest_heartbeats(self)
}
/// Builds a heartbeat snapshot evaluated at `OrbitEpoch::now()`.
///
/// Delegates to `Fleet::heartbeat_snapshot_at` with the current time and `max_age`.
pub fn heartbeat_snapshot(&self, max_age: Duration) -> FleetHeartbeatSnapshot {
self.heartbeat_snapshot_at(OrbitEpoch::now(), max_age)
}
/// Builds a heartbeat snapshot evaluated at the caller-supplied time `now`.
///
/// Forwards `now` and `max_age` to `heartbeat::heartbeat_snapshot`.
// NOTE(review): presumably `max_age` is the staleness threshold for heartbeats
// relative to `now` — confirm against the `heartbeat` module.
pub fn heartbeat_snapshot_at(
&self,
now: OrbitEpoch,
max_age: Duration,
) -> FleetHeartbeatSnapshot {
heartbeat::heartbeat_snapshot(self, now, max_age)
}
}
impl std::fmt::Debug for Fleet {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Fleet")
.field("name", &self.inner.name)
.field("fleet_size", &self.inner.fleet_size)
.field("node_id", &self.inner.node_id)
.field("id_counters", &self.inner.id_counters.len())
.finish()
}
}