orbit-rs 0.1.0

Fleet-aware shared-memory rings over POSIX shared memory.
Documentation
//! `OrbitEventBus` — append-only event stream over one Orbit ring.
//!
//! Events are not cache entries and not metrics snapshots. Cache reads
//! ask "what is the newest value for this key?" Metrics reads ask "what
//! is the newest sample for each node?" Event reads ask "which frames
//! have appeared since my cursor?"
//!
//! V0 deliberately uses a single event ring for all topics. The topic is
//! carried inside the frame payload, so adding a new event type does not
//! allocate another SHM segment. Typed/domain dispatch belongs above
//! this raw primitive.

use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bytes::{BufMut, Bytes, BytesMut};

use crate::error::{Error, Result};
use crate::fleet::{Fleet, NodeId};
use crate::id::NetId64;
use crate::ring::cursor::RingCursor;
use crate::typed::OrbitTyped;

/// Event frame payload limit for V0. On Unix this matches the SHM
/// ring's fixed slot payload size; non-Unix keeps the same contract so
/// callers do not accidentally rely on unbounded in-memory frames.
///
/// The limit applies to the whole encoded frame (header + topic +
/// payload), which is what `encode_frame` checks against it.
#[cfg(unix)]
pub const EVENT_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
/// Fixed frame limit used on non-Unix targets; same contract as the
/// Unix definition.
#[cfg(not(unix))]
pub const EVENT_PAYLOAD_MAX: usize = 256;

/// Size in bytes of the fixed frame header: a `u16` topic length, a
/// `u16` payload length and a `u64` timestamp, in that order.
const HEADER_LEN: usize = 2 + 2 + 8;
/// Frame kind tag handed to the fleet alongside every published event.
const FRAME_KIND_EVENT: u8 = 1;
/// Ring kind of the event ring; used as the `OrbitTyped::KIND` of the
/// private event record type.
pub const EVENT_RING_KIND: u8 = 220;

/// Dedicated single-ring kind for raw Orbit events.
///
/// Unit struct with no data: it is only ever used as the type
/// parameter that selects the event ring in calls on the fleet.
#[derive(Clone, Debug)]
struct OrbitEventRecord;

/// Binds the event record type to the event ring's kind value.
impl OrbitTyped for OrbitEventRecord {
    // Hand-picked V0 kind. Build-time KIND allocation will replace
    // these manual values later.
    const KIND: u8 = EVENT_RING_KIND;
}

/// Read position of a single event subscriber.
///
/// Wraps the counter of the next ring entry the subscriber will try to
/// read. The cursor is owned by the caller on purpose: every consumer
/// keeps its own and advances it independently of the others.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct OrbitEventCursor {
    inner: RingCursor,
}

impl OrbitEventCursor {
    /// Cursor positioned at the beginning of whatever ring history is
    /// still available.
    pub const fn from_start() -> Self {
        let inner = RingCursor::from_start();
        Self { inner }
    }

    /// Cursor whose next read targets the given, already-known counter.
    pub const fn from_counter(next_counter: u64) -> Self {
        let inner = RingCursor::from_counter(next_counter);
        Self { inner }
    }

    /// Counter of the entry this cursor will read next.
    pub const fn next_counter(self) -> u64 {
        let ring_cursor = self.inner;
        ring_cursor.next_counter()
    }
}

/// One decoded event from the shared event ring.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OrbitEvent {
    /// Identifier of the ring frame this event was decoded from.
    pub id: NetId64,
    /// Topic the event was published under. Decoded from the frame's
    /// topic bytes with lossy UTF-8 conversion.
    pub topic: String,
    /// Event body bytes, copied out of the frame.
    pub payload: Vec<u8>,
    /// Timestamp carried in the frame header. The publisher stamps it
    /// with wall-clock milliseconds since the Unix epoch.
    pub timestamp_ms: u64,
}

/// Outcome of one poll against an event cursor.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct OrbitEventPoll {
    /// Events that were read and decoded during this poll.
    pub events: Vec<OrbitEvent>,
    /// Count of counters that were not delivered: the subscriber fell
    /// behind the ring capacity, a slot was empty, or a slot had
    /// already wrapped to a different counter. `OrbitEventBus::poll`
    /// also adds one for every frame it could not decode.
    pub lagged: u64,
}

impl OrbitEventPoll {
    /// `true` when the poll produced no events and reported no loss.
    pub fn is_empty(&self) -> bool {
        let Self { events, lagged } = self;
        *lagged == 0 && events.is_empty()
    }
}

/// Fleet-shared raw event bus. Cheap to clone.
#[derive(Clone)]
pub struct OrbitEventBus {
    fleet: Arc<Fleet>,
}

impl OrbitEventBus {
    pub fn new(fleet: Arc<Fleet>) -> Self {
        Self { fleet }
    }

    pub fn node_id(&self) -> NodeId {
        self.fleet.node_id()
    }

    /// Cursor that starts after all events currently in the ring.
    /// Useful for subscribers that only want future events.
    pub fn cursor_at_head(&self) -> OrbitEventCursor {
        OrbitEventCursor {
            inner: self.fleet.cursor_at_head::<OrbitEventRecord>(),
        }
    }

    /// Cursor that starts at counter 0 and replays whatever history has
    /// not wrapped out of the ring.
    pub const fn cursor_from_start(&self) -> OrbitEventCursor {
        let _ = self;
        OrbitEventCursor::from_start()
    }

    /// Clear the shared event ring.
    ///
    /// Intended for owner-controlled boot-time cleanup before peer
    /// processes publish events into the ring. This prevents stale
    /// events from a previous process lifetime from being replayed or
    /// counted as current runtime state.
    pub fn reset_ring(&self) -> Result<()> {
        self.fleet
            .reset_ring::<OrbitEventRecord>()
            .map_err(Error::Io)
    }

    /// Publish one event under `topic`.
    pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<NetId64> {
        let timestamp_ms = now_ms();
        let frame = encode_frame(topic.as_bytes(), payload, timestamp_ms)?;
        Ok(self
            .fleet
            .publish::<OrbitEventRecord>(FRAME_KIND_EVENT, timestamp_ms, frame))
    }

    /// Poll all events since `cursor`, advancing the cursor to the
    /// current ring head. If the cursor has fallen behind the ring
    /// capacity, older overwritten counters are reported as `lagged`.
    pub fn poll(&self, cursor: &mut OrbitEventCursor) -> OrbitEventPoll {
        let ring_poll = self.fleet.poll_ring::<OrbitEventRecord>(&mut cursor.inner);
        let mut lagged = ring_poll.loss.total();
        let mut events = Vec::new();
        for frame in ring_poll.frames {
            let Some(decoded) = decode_frame(&frame.payload) else {
                lagged = lagged.saturating_add(1);
                continue;
            };
            events.push(OrbitEvent {
                id: frame.id,
                topic: String::from_utf8_lossy(decoded.topic).into_owned(),
                payload: decoded.payload.to_vec(),
                timestamp_ms: decoded.timestamp_ms,
            });
        }

        OrbitEventPoll { events, lagged }
    }

    /// Poll and keep only events whose topic matches `topic`.
    ///
    /// The cursor still advances past all events, including filtered
    /// topics. Use separate cursors for independent consumers.
    pub fn poll_topic(&self, cursor: &mut OrbitEventCursor, topic: &str) -> OrbitEventPoll {
        let mut poll = self.poll(cursor);
        poll.events.retain(|event| event.topic == topic);
        poll
    }
}

/// Borrowed view of one event frame, as returned by `decode_frame`.
///
/// `topic` and `payload` are sub-slices of the buffer that was decoded,
/// so building this view copies no frame bytes.
struct DecodedFrame<'a> {
    /// Raw topic bytes; not checked for UTF-8 validity at this layer.
    topic: &'a [u8],
    /// Raw event body bytes.
    payload: &'a [u8],
    /// Timestamp read from the frame header (little-endian `u64`).
    timestamp_ms: u64,
}

fn encode_frame(topic: &[u8], payload: &[u8], timestamp_ms: u64) -> Result<Bytes> {
    let total = HEADER_LEN + topic.len() + payload.len();
    if topic.len() > u16::MAX as usize
        || payload.len() > u16::MAX as usize
        || total > EVENT_PAYLOAD_MAX
    {
        return Err(Error::EventFrameTooLarge {
            topic_len: topic.len(),
            payload_len: payload.len(),
            max_payload: EVENT_PAYLOAD_MAX,
        });
    }

    let mut buf = BytesMut::with_capacity(total);
    buf.put_u16_le(topic.len() as u16);
    buf.put_u16_le(payload.len() as u16);
    buf.put_u64_le(timestamp_ms);
    buf.put_slice(topic);
    buf.put_slice(payload);
    Ok(buf.freeze())
}

/// Parse a frame produced by `encode_frame` into a borrowed view.
///
/// Returns `None` when the buffer is shorter than the fixed header or
/// shorter than the topic and payload lengths the header declares.
/// Bytes beyond the declared payload end are ignored.
fn decode_frame(payload: &Bytes) -> Option<DecodedFrame<'_>> {
    // Fixed header: u16 topic length, u16 payload length, u64 timestamp.
    let header = payload.get(..HEADER_LEN)?;
    let topic_len = usize::from(u16::from_le_bytes([header[0], header[1]]));
    let body_len = usize::from(u16::from_le_bytes([header[2], header[3]]));
    let timestamp_ms = u64::from_le_bytes(header[4..12].try_into().ok()?);

    let topic_end = HEADER_LEN.checked_add(topic_len)?;
    let body_end = topic_end.checked_add(body_len)?;

    // Checked slicing rejects frames that are shorter than declared.
    let topic = payload.get(HEADER_LEN..topic_end)?;
    let body = payload.get(topic_end..body_end)?;

    Some(DecodedFrame {
        topic,
        payload: body,
        timestamp_ms,
    })
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch, and
/// clamps to `u64::MAX` if the millisecond count does not fit in 64 bits.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}

// End-to-end tests for the event bus. Each test joins its own fleet
// under a distinct name.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::OrbitEventBus;
    use crate::Fleet;

    /// Two published events come back in publish order with no loss,
    /// and a second poll on the same cursor yields nothing.
    #[test]
    fn polls_events_since_cursor() {
        let fleet = Arc::new(Fleet::join("event_poll", 1).expect("fleet"));
        let bus = OrbitEventBus::new(fleet);
        let mut cursor = bus.cursor_from_start();

        bus.publish("worker.booted", b"w1").expect("publish first");
        bus.publish("cache.reset", b"all").expect("publish second");

        let poll = bus.poll(&mut cursor);
        assert_eq!(poll.lagged, 0);
        assert_eq!(poll.events.len(), 2);
        assert_eq!(poll.events[0].topic, "worker.booted");
        assert_eq!(poll.events[0].payload, b"w1");
        assert_eq!(poll.events[1].topic, "cache.reset");
        assert_eq!(poll.events[1].payload, b"all");

        // The cursor was advanced, so nothing is left to read.
        assert!(bus.poll(&mut cursor).is_empty());
    }

    /// A cursor taken at the head skips events published before it was
    /// created and sees only those published afterwards.
    #[test]
    fn cursor_at_head_only_reads_future_events() {
        let fleet = Arc::new(Fleet::join("event_head", 1).expect("fleet"));
        let bus = OrbitEventBus::new(fleet);

        bus.publish("old", b"ignored").expect("publish old");
        let mut cursor = bus.cursor_at_head();
        bus.publish("new", b"seen").expect("publish new");

        let poll = bus.poll(&mut cursor);
        assert_eq!(poll.events.len(), 1);
        assert_eq!(poll.events[0].topic, "new");
    }

    /// `poll_topic` returns only the matching topic but still moves the
    /// cursor past the filtered-out event.
    #[test]
    fn topic_filter_advances_cursor() {
        let fleet = Arc::new(Fleet::join("event_topic", 1).expect("fleet"));
        let bus = OrbitEventBus::new(fleet);
        let mut cursor = bus.cursor_from_start();

        bus.publish("a", b"1").expect("publish a");
        bus.publish("b", b"2").expect("publish b");

        let poll = bus.poll_topic(&mut cursor, "b");
        assert_eq!(poll.events.len(), 1);
        assert_eq!(poll.events[0].payload, b"2");
        // Event "a" was consumed by the filtered poll as well.
        assert!(bus.poll(&mut cursor).is_empty());
    }

    /// With a ring capacity of 2, publishing three events overwrites
    /// the first: the poll reports one lagged counter and delivers the
    /// remaining two events.
    #[test]
    fn reports_lag_when_cursor_falls_behind_capacity() {
        let fleet = Arc::new(Fleet::join("event_lag", 1).expect("fleet"));
        // Shrink the event ring so three publishes force a wrap.
        fleet.ring_with_capacity::<super::OrbitEventRecord>(2);
        let bus = OrbitEventBus::new(fleet);
        let mut cursor = bus.cursor_from_start();

        bus.publish("n", b"1").expect("publish 1");
        bus.publish("n", b"2").expect("publish 2");
        bus.publish("n", b"3").expect("publish 3");

        let poll = bus.poll(&mut cursor);
        assert_eq!(poll.lagged, 1);
        assert_eq!(poll.events.len(), 2);
        assert_eq!(poll.events[0].payload, b"2");
        assert_eq!(poll.events[1].payload, b"3");
    }
}