Skip to main content

orbit_rs/event/
mod.rs

1//! `OrbitEventBus` — append-only event stream over one Orbit ring.
2//!
3//! Events are not cache entries and not metrics snapshots. Cache reads
4//! ask "what is the newest value for this key?" Metrics reads ask "what
5//! is the newest sample for each node?" Event reads ask "which frames
6//! have appeared since my cursor?"
7//!
8//! V0 deliberately uses a single event ring for all topics. The topic is
9//! carried inside the frame payload, so adding a new event type does not
10//! allocate another SHM segment. Typed/domain dispatch belongs above
11//! this raw primitive.
12
13use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use bytes::{BufMut, Bytes, BytesMut};
17
18use crate::error::{Error, Result};
19use crate::fleet::{Fleet, NodeId};
20use crate::id::NetId64;
21use crate::ring::cursor::RingCursor;
22use crate::typed::OrbitTyped;
23
24/// Event frame payload limit for V0. On Unix this matches the SHM
25/// ring's fixed slot payload size; non-Unix keeps the same contract so
26/// callers do not accidentally rely on unbounded in-memory frames.
27#[cfg(unix)]
28pub const EVENT_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
29#[cfg(not(unix))]
30pub const EVENT_PAYLOAD_MAX: usize = 256;
31
32const HEADER_LEN: usize = 2 + 2 + 8;
33const FRAME_KIND_EVENT: u8 = 1;
34pub const EVENT_RING_KIND: u8 = 220;
35
36/// Dedicated single-ring kind for raw Orbit events.
37#[derive(Clone, Debug)]
38struct OrbitEventRecord;
39
40impl OrbitTyped for OrbitEventRecord {
41    // Hand-picked V0 kind. Build-time KIND allocation will replace
42    // these manual values later.
43    const KIND: u8 = EVENT_RING_KIND;
44}
45
46/// Cursor for one event subscriber.
47///
48/// The cursor stores the next ring counter the subscriber should try to
49/// read. It is intentionally caller-owned so different consumers can
50/// advance independently.
51#[derive(Clone, Copy, Debug, PartialEq, Eq)]
52pub struct OrbitEventCursor {
53    inner: RingCursor,
54}
55
56impl OrbitEventCursor {
57    /// Start from the beginning of the ring history that is still
58    /// available.
59    pub const fn from_start() -> Self {
60        Self {
61            inner: RingCursor::from_start(),
62        }
63    }
64
65    /// Start from a known next counter.
66    pub const fn from_counter(next_counter: u64) -> Self {
67        Self {
68            inner: RingCursor::from_counter(next_counter),
69        }
70    }
71
72    /// The next counter this cursor will read.
73    pub const fn next_counter(self) -> u64 {
74        self.inner.next_counter()
75    }
76}
77
78/// One decoded event from the shared event ring.
79#[derive(Clone, Debug, PartialEq, Eq)]
80pub struct OrbitEvent {
81    pub id: NetId64,
82    pub topic: String,
83    pub payload: Vec<u8>,
84    pub timestamp_ms: u64,
85}
86
87/// Result of polling an event cursor.
88#[derive(Clone, Debug, Default, PartialEq, Eq)]
89pub struct OrbitEventPoll {
90    pub events: Vec<OrbitEvent>,
91    /// Number of counters that could not be delivered because the
92    /// subscriber lagged past the ring capacity, a slot was empty, or a
93    /// slot had already wrapped to another counter.
94    pub lagged: u64,
95}
96
97impl OrbitEventPoll {
98    pub fn is_empty(&self) -> bool {
99        self.events.is_empty() && self.lagged == 0
100    }
101}
102
103/// Fleet-shared raw event bus. Cheap to clone.
104#[derive(Clone)]
105pub struct OrbitEventBus {
106    fleet: Arc<Fleet>,
107}
108
109impl OrbitEventBus {
110    pub fn new(fleet: Arc<Fleet>) -> Self {
111        Self { fleet }
112    }
113
114    pub fn node_id(&self) -> NodeId {
115        self.fleet.node_id()
116    }
117
118    /// Cursor that starts after all events currently in the ring.
119    /// Useful for subscribers that only want future events.
120    pub fn cursor_at_head(&self) -> OrbitEventCursor {
121        OrbitEventCursor {
122            inner: self.fleet.cursor_at_head::<OrbitEventRecord>(),
123        }
124    }
125
126    /// Cursor that starts at counter 0 and replays whatever history has
127    /// not wrapped out of the ring.
128    pub const fn cursor_from_start(&self) -> OrbitEventCursor {
129        let _ = self;
130        OrbitEventCursor::from_start()
131    }
132
133    /// Clear the shared event ring.
134    ///
135    /// Intended for owner-controlled boot-time cleanup before peer
136    /// processes publish events into the ring. This prevents stale
137    /// events from a previous process lifetime from being replayed or
138    /// counted as current runtime state.
139    pub fn reset_ring(&self) -> Result<()> {
140        self.fleet
141            .reset_ring::<OrbitEventRecord>()
142            .map_err(Error::Io)
143    }
144
145    /// Publish one event under `topic`.
146    pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<NetId64> {
147        let timestamp_ms = now_ms();
148        let frame = encode_frame(topic.as_bytes(), payload, timestamp_ms)?;
149        Ok(self
150            .fleet
151            .publish::<OrbitEventRecord>(FRAME_KIND_EVENT, timestamp_ms, frame))
152    }
153
154    /// Poll all events since `cursor`, advancing the cursor to the
155    /// current ring head. If the cursor has fallen behind the ring
156    /// capacity, older overwritten counters are reported as `lagged`.
157    pub fn poll(&self, cursor: &mut OrbitEventCursor) -> OrbitEventPoll {
158        let ring_poll = self.fleet.poll_ring::<OrbitEventRecord>(&mut cursor.inner);
159        let mut lagged = ring_poll.loss.total();
160        let mut events = Vec::new();
161        for frame in ring_poll.frames {
162            let Some(decoded) = decode_frame(&frame.payload) else {
163                lagged = lagged.saturating_add(1);
164                continue;
165            };
166            events.push(OrbitEvent {
167                id: frame.id,
168                topic: String::from_utf8_lossy(decoded.topic).into_owned(),
169                payload: decoded.payload.to_vec(),
170                timestamp_ms: decoded.timestamp_ms,
171            });
172        }
173
174        OrbitEventPoll { events, lagged }
175    }
176
177    /// Poll and keep only events whose topic matches `topic`.
178    ///
179    /// The cursor still advances past all events, including filtered
180    /// topics. Use separate cursors for independent consumers.
181    pub fn poll_topic(&self, cursor: &mut OrbitEventCursor, topic: &str) -> OrbitEventPoll {
182        let mut poll = self.poll(cursor);
183        poll.events.retain(|event| event.topic == topic);
184        poll
185    }
186}
187
188struct DecodedFrame<'a> {
189    topic: &'a [u8],
190    payload: &'a [u8],
191    timestamp_ms: u64,
192}
193
194fn encode_frame(topic: &[u8], payload: &[u8], timestamp_ms: u64) -> Result<Bytes> {
195    let total = HEADER_LEN + topic.len() + payload.len();
196    if topic.len() > u16::MAX as usize
197        || payload.len() > u16::MAX as usize
198        || total > EVENT_PAYLOAD_MAX
199    {
200        return Err(Error::EventFrameTooLarge {
201            topic_len: topic.len(),
202            payload_len: payload.len(),
203            max_payload: EVENT_PAYLOAD_MAX,
204        });
205    }
206
207    let mut buf = BytesMut::with_capacity(total);
208    buf.put_u16_le(topic.len() as u16);
209    buf.put_u16_le(payload.len() as u16);
210    buf.put_u64_le(timestamp_ms);
211    buf.put_slice(topic);
212    buf.put_slice(payload);
213    Ok(buf.freeze())
214}
215
216fn decode_frame(payload: &Bytes) -> Option<DecodedFrame<'_>> {
217    if payload.len() < HEADER_LEN {
218        return None;
219    }
220
221    let topic_len = u16::from_le_bytes(payload[0..2].try_into().ok()?) as usize;
222    let payload_len = u16::from_le_bytes(payload[2..4].try_into().ok()?) as usize;
223    let timestamp_ms = u64::from_le_bytes(payload[4..12].try_into().ok()?);
224    let topic_start = HEADER_LEN;
225    let topic_end = topic_start.checked_add(topic_len)?;
226    let payload_end = topic_end.checked_add(payload_len)?;
227    if payload.len() < payload_end {
228        return None;
229    }
230
231    Some(DecodedFrame {
232        topic: &payload[topic_start..topic_end],
233        payload: &payload[topic_end..payload_end],
234        timestamp_ms,
235    })
236}
237
238fn now_ms() -> u64 {
239    SystemTime::now()
240        .duration_since(UNIX_EPOCH)
241        .map(|d| d.as_millis().min(u128::from(u64::MAX)) as u64)
242        .unwrap_or(0)
243}
244
245#[cfg(test)]
246mod tests {
247    use std::sync::Arc;
248
249    use super::OrbitEventBus;
250    use crate::Fleet;
251
252    #[test]
253    fn polls_events_since_cursor() {
254        let fleet = Arc::new(Fleet::join("event_poll", 1).expect("fleet"));
255        let bus = OrbitEventBus::new(fleet);
256        let mut cursor = bus.cursor_from_start();
257
258        bus.publish("worker.booted", b"w1").expect("publish first");
259        bus.publish("cache.reset", b"all").expect("publish second");
260
261        let poll = bus.poll(&mut cursor);
262        assert_eq!(poll.lagged, 0);
263        assert_eq!(poll.events.len(), 2);
264        assert_eq!(poll.events[0].topic, "worker.booted");
265        assert_eq!(poll.events[0].payload, b"w1");
266        assert_eq!(poll.events[1].topic, "cache.reset");
267        assert_eq!(poll.events[1].payload, b"all");
268
269        assert!(bus.poll(&mut cursor).is_empty());
270    }
271
272    #[test]
273    fn cursor_at_head_only_reads_future_events() {
274        let fleet = Arc::new(Fleet::join("event_head", 1).expect("fleet"));
275        let bus = OrbitEventBus::new(fleet);
276
277        bus.publish("old", b"ignored").expect("publish old");
278        let mut cursor = bus.cursor_at_head();
279        bus.publish("new", b"seen").expect("publish new");
280
281        let poll = bus.poll(&mut cursor);
282        assert_eq!(poll.events.len(), 1);
283        assert_eq!(poll.events[0].topic, "new");
284    }
285
286    #[test]
287    fn topic_filter_advances_cursor() {
288        let fleet = Arc::new(Fleet::join("event_topic", 1).expect("fleet"));
289        let bus = OrbitEventBus::new(fleet);
290        let mut cursor = bus.cursor_from_start();
291
292        bus.publish("a", b"1").expect("publish a");
293        bus.publish("b", b"2").expect("publish b");
294
295        let poll = bus.poll_topic(&mut cursor, "b");
296        assert_eq!(poll.events.len(), 1);
297        assert_eq!(poll.events[0].payload, b"2");
298        assert!(bus.poll(&mut cursor).is_empty());
299    }
300
301    #[test]
302    fn reports_lag_when_cursor_falls_behind_capacity() {
303        let fleet = Arc::new(Fleet::join("event_lag", 1).expect("fleet"));
304        fleet.ring_with_capacity::<super::OrbitEventRecord>(2);
305        let bus = OrbitEventBus::new(fleet);
306        let mut cursor = bus.cursor_from_start();
307
308        bus.publish("n", b"1").expect("publish 1");
309        bus.publish("n", b"2").expect("publish 2");
310        bus.publish("n", b"3").expect("publish 3");
311
312        let poll = bus.poll(&mut cursor);
313        assert_eq!(poll.lagged, 1);
314        assert_eq!(poll.events.len(), 2);
315        assert_eq!(poll.events[0].payload, b"2");
316        assert_eq!(poll.events[1].payload, b"3");
317    }
318}