use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use bytes::{BufMut, Bytes, BytesMut};
use crate::error::{Error, Result};
use crate::fleet::{Fleet, NodeId};
use crate::id::NetId64;
use crate::ring::cursor::RingCursor;
use crate::typed::OrbitTyped;
/// Upper bound, in bytes, on one encoded event frame (header + topic + payload).
///
/// `encode_frame` rejects any frame whose total length exceeds this value.
/// On Unix targets the limit is taken from the shared-memory ring's `PAYLOAD_MAX`.
#[cfg(unix)]
pub const EVENT_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
/// Upper bound, in bytes, on one encoded event frame (header + topic + payload).
///
/// `encode_frame` rejects any frame whose total length exceeds this value.
/// On non-Unix targets a fixed limit of 256 bytes is used.
#[cfg(not(unix))]
pub const EVENT_PAYLOAD_MAX: usize = 256;
/// Size in bytes of the fixed frame header: a `u16` topic length, a `u16`
/// payload length and a `u64` millisecond timestamp (12 bytes in total).
const HEADER_LEN: usize = 2 * std::mem::size_of::<u16>() + std::mem::size_of::<u64>();

/// Frame kind tag handed to the fleet when publishing an event frame.
const FRAME_KIND_EVENT: u8 = 1;

/// Ring kind used as `OrbitTyped::KIND` for the event record type.
pub const EVENT_RING_KIND: u8 = 220;
/// Marker type used as the type parameter of the fleet's typed ring calls
/// (`cursor_at_head`, `reset_ring`, `publish`, `poll_ring`) to select the event ring.
#[derive(Clone, Debug)]
struct OrbitEventRecord;
impl OrbitTyped for OrbitEventRecord {
// The event ring is identified by `EVENT_RING_KIND`.
const KIND: u8 = EVENT_RING_KIND;
}
/// Read position for polling events; a thin wrapper around a [`RingCursor`].
///
/// The wrapped cursor is private, so callers can only create, copy, compare
/// and inspect the position through the methods below.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct OrbitEventCursor {
    inner: RingCursor,
}

impl OrbitEventCursor {
    /// Creates a cursor wrapping `RingCursor::from_start()`.
    pub const fn from_start() -> Self {
        let inner = RingCursor::from_start();
        Self { inner }
    }

    /// Creates a cursor wrapping `RingCursor::from_counter(next_counter)`.
    pub const fn from_counter(next_counter: u64) -> Self {
        let inner = RingCursor::from_counter(next_counter);
        Self { inner }
    }

    /// Returns the wrapped cursor's `next_counter()` value.
    pub const fn next_counter(self) -> u64 {
        let ring_cursor = self.inner;
        ring_cursor.next_counter()
    }
}
/// One decoded event as returned by `OrbitEventBus::poll`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct OrbitEvent {
/// Identifier copied from the ring frame that carried this event.
pub id: NetId64,
/// Topic string; decoded from the frame bytes with lossy UTF-8 conversion.
pub topic: String,
/// Raw payload bytes copied out of the frame.
pub payload: Vec<u8>,
/// Timestamp read from the frame header. `publish` writes the publisher's
/// wall-clock time in milliseconds since the Unix epoch here.
pub timestamp_ms: u64,
}
/// Outcome of a single poll of the event bus.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct OrbitEventPoll {
    /// Successfully decoded events, in the order the ring returned their frames.
    pub events: Vec<OrbitEvent>,
    /// Number of events the caller did not receive: the ring's reported loss
    /// plus any frames that could not be decoded.
    pub lagged: u64,
}

impl OrbitEventPoll {
    /// Returns `true` only when the poll produced no events and reported no loss.
    pub fn is_empty(&self) -> bool {
        let Self { events, lagged } = self;
        *lagged == 0 && events.is_empty()
    }
}
#[derive(Clone)]
pub struct OrbitEventBus {
fleet: Arc<Fleet>,
}
impl OrbitEventBus {
pub fn new(fleet: Arc<Fleet>) -> Self {
Self { fleet }
}
pub fn node_id(&self) -> NodeId {
self.fleet.node_id()
}
pub fn cursor_at_head(&self) -> OrbitEventCursor {
OrbitEventCursor {
inner: self.fleet.cursor_at_head::<OrbitEventRecord>(),
}
}
pub const fn cursor_from_start(&self) -> OrbitEventCursor {
let _ = self;
OrbitEventCursor::from_start()
}
pub fn reset_ring(&self) -> Result<()> {
self.fleet
.reset_ring::<OrbitEventRecord>()
.map_err(Error::Io)
}
pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<NetId64> {
let timestamp_ms = now_ms();
let frame = encode_frame(topic.as_bytes(), payload, timestamp_ms)?;
Ok(self
.fleet
.publish::<OrbitEventRecord>(FRAME_KIND_EVENT, timestamp_ms, frame))
}
pub fn poll(&self, cursor: &mut OrbitEventCursor) -> OrbitEventPoll {
let ring_poll = self.fleet.poll_ring::<OrbitEventRecord>(&mut cursor.inner);
let mut lagged = ring_poll.loss.total();
let mut events = Vec::new();
for frame in ring_poll.frames {
let Some(decoded) = decode_frame(&frame.payload) else {
lagged = lagged.saturating_add(1);
continue;
};
events.push(OrbitEvent {
id: frame.id,
topic: String::from_utf8_lossy(decoded.topic).into_owned(),
payload: decoded.payload.to_vec(),
timestamp_ms: decoded.timestamp_ms,
});
}
OrbitEventPoll { events, lagged }
}
pub fn poll_topic(&self, cursor: &mut OrbitEventCursor, topic: &str) -> OrbitEventPoll {
let mut poll = self.poll(cursor);
poll.events.retain(|event| event.topic == topic);
poll
}
}
/// Borrowed view of a decoded event frame; `topic` and `payload` are slices
/// into the frame buffer passed to `decode_frame`.
struct DecodedFrame<'a> {
// Topic bytes (not yet validated as UTF-8).
topic: &'a [u8],
// Payload bytes following the topic.
payload: &'a [u8],
// Timestamp read from the frame header.
timestamp_ms: u64,
}
/// Serializes one event into a frame.
///
/// Layout (all integers little-endian): `u16` topic length, `u16` payload
/// length, `u64` timestamp, then the topic bytes followed by the payload bytes.
///
/// # Errors
///
/// Returns `Error::EventFrameTooLarge` when either length does not fit in a
/// `u16` or when the whole frame would exceed `EVENT_PAYLOAD_MAX`.
fn encode_frame(topic: &[u8], payload: &[u8], timestamp_ms: u64) -> Result<Bytes> {
    let topic_len = topic.len();
    let payload_len = payload.len();
    let frame_len = HEADER_LEN + topic_len + payload_len;

    let fits = topic_len <= usize::from(u16::MAX)
        && payload_len <= usize::from(u16::MAX)
        && frame_len <= EVENT_PAYLOAD_MAX;
    if !fits {
        return Err(Error::EventFrameTooLarge {
            topic_len,
            payload_len,
            max_payload: EVENT_PAYLOAD_MAX,
        });
    }

    let mut frame = BytesMut::with_capacity(frame_len);
    // The casts cannot truncate: both lengths were checked against u16::MAX above.
    frame.put_u16_le(topic_len as u16);
    frame.put_u16_le(payload_len as u16);
    frame.put_u64_le(timestamp_ms);
    frame.put_slice(topic);
    frame.put_slice(payload);
    Ok(frame.freeze())
}
/// Parses a frame produced by `encode_frame` into borrowed topic and payload slices.
///
/// Returns `None` when the buffer is shorter than the fixed header or shorter
/// than the lengths declared in the header. Bytes beyond the declared payload
/// end are ignored.
fn decode_frame(payload: &Bytes) -> Option<DecodedFrame<'_>> {
    let bytes: &[u8] = payload;

    // Fails with `None` if the fixed header is not fully present.
    let header = bytes.get(..HEADER_LEN)?;
    let topic_len = usize::from(u16::from_le_bytes([header[0], header[1]]));
    let body_len = usize::from(u16::from_le_bytes([header[2], header[3]]));
    let timestamp_ms = u64::from_le_bytes(header[4..12].try_into().ok()?);

    let topic_end = HEADER_LEN.checked_add(topic_len)?;
    let body_end = topic_end.checked_add(body_len)?;

    // Checked slicing rejects frames that are shorter than the header claims.
    let topic = bytes.get(HEADER_LEN..topic_end)?;
    let body = bytes.get(topic_end..body_end)?;

    Some(DecodedFrame {
        topic,
        payload: body,
        timestamp_ms,
    })
}
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch, and saturates
/// at `u64::MAX` if the millisecond count does not fit in a `u64`.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
// Tests for the event bus. Each test joins its own fleet via `Fleet::join`
// with a distinct name and drives the bus only through its public API.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::OrbitEventBus;
use crate::Fleet;
// A cursor created from the start sees every published event in publish
// order, and a second poll with the same cursor yields nothing.
#[test]
fn polls_events_since_cursor() {
let fleet = Arc::new(Fleet::join("event_poll", 1).expect("fleet"));
let bus = OrbitEventBus::new(fleet);
let mut cursor = bus.cursor_from_start();
bus.publish("worker.booted", b"w1").expect("publish first");
bus.publish("cache.reset", b"all").expect("publish second");
let poll = bus.poll(&mut cursor);
assert_eq!(poll.lagged, 0);
assert_eq!(poll.events.len(), 2);
assert_eq!(poll.events[0].topic, "worker.booted");
assert_eq!(poll.events[0].payload, b"w1");
assert_eq!(poll.events[1].topic, "cache.reset");
assert_eq!(poll.events[1].payload, b"all");
// The first poll advanced the cursor past both events.
assert!(bus.poll(&mut cursor).is_empty());
}
// A cursor taken at the head skips events published before it was created.
#[test]
fn cursor_at_head_only_reads_future_events() {
let fleet = Arc::new(Fleet::join("event_head", 1).expect("fleet"));
let bus = OrbitEventBus::new(fleet);
bus.publish("old", b"ignored").expect("publish old");
let mut cursor = bus.cursor_at_head();
bus.publish("new", b"seen").expect("publish new");
let poll = bus.poll(&mut cursor);
assert_eq!(poll.events.len(), 1);
assert_eq!(poll.events[0].topic, "new");
}
// `poll_topic` returns only matching events but still consumes the
// non-matching ones, so a following unfiltered poll is empty.
#[test]
fn topic_filter_advances_cursor() {
let fleet = Arc::new(Fleet::join("event_topic", 1).expect("fleet"));
let bus = OrbitEventBus::new(fleet);
let mut cursor = bus.cursor_from_start();
bus.publish("a", b"1").expect("publish a");
bus.publish("b", b"2").expect("publish b");
let poll = bus.poll_topic(&mut cursor, "b");
assert_eq!(poll.events.len(), 1);
assert_eq!(poll.events[0].payload, b"2");
assert!(bus.poll(&mut cursor).is_empty());
}
// With the ring configured via `ring_with_capacity(2)` and three events
// published, a cursor from the start reports one lagged event and receives
// only the last two.
#[test]
fn reports_lag_when_cursor_falls_behind_capacity() {
let fleet = Arc::new(Fleet::join("event_lag", 1).expect("fleet"));
fleet.ring_with_capacity::<super::OrbitEventRecord>(2);
let bus = OrbitEventBus::new(fleet);
let mut cursor = bus.cursor_from_start();
bus.publish("n", b"1").expect("publish 1");
bus.publish("n", b"2").expect("publish 2");
bus.publish("n", b"3").expect("publish 3");
let poll = bus.poll(&mut cursor);
assert_eq!(poll.lagged, 1);
assert_eq!(poll.events.len(), 2);
assert_eq!(poll.events[0].payload, b"2");
assert_eq!(poll.events[1].payload, b"3");
}
}