1use std::sync::Arc;
14use std::time::{SystemTime, UNIX_EPOCH};
15
16use bytes::{BufMut, Bytes, BytesMut};
17
18use crate::error::{Error, Result};
19use crate::fleet::{Fleet, NodeId};
20use crate::id::NetId64;
21use crate::ring::cursor::RingCursor;
22use crate::typed::OrbitTyped;
23
24#[cfg(unix)]
28pub const EVENT_PAYLOAD_MAX: usize = crate::ring::shm::PAYLOAD_MAX;
29#[cfg(not(unix))]
30pub const EVENT_PAYLOAD_MAX: usize = 256;
31
32const HEADER_LEN: usize = 2 + 2 + 8;
33const FRAME_KIND_EVENT: u8 = 1;
34pub const EVENT_RING_KIND: u8 = 220;
35
36#[derive(Clone, Debug)]
38struct OrbitEventRecord;
39
40impl OrbitTyped for OrbitEventRecord {
41 const KIND: u8 = EVENT_RING_KIND;
44}
45
46#[derive(Clone, Copy, Debug, PartialEq, Eq)]
52pub struct OrbitEventCursor {
53 inner: RingCursor,
54}
55
56impl OrbitEventCursor {
57 pub const fn from_start() -> Self {
60 Self {
61 inner: RingCursor::from_start(),
62 }
63 }
64
65 pub const fn from_counter(next_counter: u64) -> Self {
67 Self {
68 inner: RingCursor::from_counter(next_counter),
69 }
70 }
71
72 pub const fn next_counter(self) -> u64 {
74 self.inner.next_counter()
75 }
76}
77
78#[derive(Clone, Debug, PartialEq, Eq)]
80pub struct OrbitEvent {
81 pub id: NetId64,
82 pub topic: String,
83 pub payload: Vec<u8>,
84 pub timestamp_ms: u64,
85}
86
87#[derive(Clone, Debug, Default, PartialEq, Eq)]
89pub struct OrbitEventPoll {
90 pub events: Vec<OrbitEvent>,
91 pub lagged: u64,
95}
96
97impl OrbitEventPoll {
98 pub fn is_empty(&self) -> bool {
99 self.events.is_empty() && self.lagged == 0
100 }
101}
102
103#[derive(Clone)]
105pub struct OrbitEventBus {
106 fleet: Arc<Fleet>,
107}
108
109impl OrbitEventBus {
110 pub fn new(fleet: Arc<Fleet>) -> Self {
111 Self { fleet }
112 }
113
114 pub fn node_id(&self) -> NodeId {
115 self.fleet.node_id()
116 }
117
118 pub fn cursor_at_head(&self) -> OrbitEventCursor {
121 OrbitEventCursor {
122 inner: self.fleet.cursor_at_head::<OrbitEventRecord>(),
123 }
124 }
125
126 pub const fn cursor_from_start(&self) -> OrbitEventCursor {
129 let _ = self;
130 OrbitEventCursor::from_start()
131 }
132
133 pub fn reset_ring(&self) -> Result<()> {
140 self.fleet
141 .reset_ring::<OrbitEventRecord>()
142 .map_err(Error::Io)
143 }
144
145 pub fn publish(&self, topic: &str, payload: &[u8]) -> Result<NetId64> {
147 let timestamp_ms = now_ms();
148 let frame = encode_frame(topic.as_bytes(), payload, timestamp_ms)?;
149 Ok(self
150 .fleet
151 .publish::<OrbitEventRecord>(FRAME_KIND_EVENT, timestamp_ms, frame))
152 }
153
154 pub fn poll(&self, cursor: &mut OrbitEventCursor) -> OrbitEventPoll {
158 let ring_poll = self.fleet.poll_ring::<OrbitEventRecord>(&mut cursor.inner);
159 let mut lagged = ring_poll.loss.total();
160 let mut events = Vec::new();
161 for frame in ring_poll.frames {
162 let Some(decoded) = decode_frame(&frame.payload) else {
163 lagged = lagged.saturating_add(1);
164 continue;
165 };
166 events.push(OrbitEvent {
167 id: frame.id,
168 topic: String::from_utf8_lossy(decoded.topic).into_owned(),
169 payload: decoded.payload.to_vec(),
170 timestamp_ms: decoded.timestamp_ms,
171 });
172 }
173
174 OrbitEventPoll { events, lagged }
175 }
176
177 pub fn poll_topic(&self, cursor: &mut OrbitEventCursor, topic: &str) -> OrbitEventPoll {
182 let mut poll = self.poll(cursor);
183 poll.events.retain(|event| event.topic == topic);
184 poll
185 }
186}
187
188struct DecodedFrame<'a> {
189 topic: &'a [u8],
190 payload: &'a [u8],
191 timestamp_ms: u64,
192}
193
194fn encode_frame(topic: &[u8], payload: &[u8], timestamp_ms: u64) -> Result<Bytes> {
195 let total = HEADER_LEN + topic.len() + payload.len();
196 if topic.len() > u16::MAX as usize
197 || payload.len() > u16::MAX as usize
198 || total > EVENT_PAYLOAD_MAX
199 {
200 return Err(Error::EventFrameTooLarge {
201 topic_len: topic.len(),
202 payload_len: payload.len(),
203 max_payload: EVENT_PAYLOAD_MAX,
204 });
205 }
206
207 let mut buf = BytesMut::with_capacity(total);
208 buf.put_u16_le(topic.len() as u16);
209 buf.put_u16_le(payload.len() as u16);
210 buf.put_u64_le(timestamp_ms);
211 buf.put_slice(topic);
212 buf.put_slice(payload);
213 Ok(buf.freeze())
214}
215
216fn decode_frame(payload: &Bytes) -> Option<DecodedFrame<'_>> {
217 if payload.len() < HEADER_LEN {
218 return None;
219 }
220
221 let topic_len = u16::from_le_bytes(payload[0..2].try_into().ok()?) as usize;
222 let payload_len = u16::from_le_bytes(payload[2..4].try_into().ok()?) as usize;
223 let timestamp_ms = u64::from_le_bytes(payload[4..12].try_into().ok()?);
224 let topic_start = HEADER_LEN;
225 let topic_end = topic_start.checked_add(topic_len)?;
226 let payload_end = topic_end.checked_add(payload_len)?;
227 if payload.len() < payload_end {
228 return None;
229 }
230
231 Some(DecodedFrame {
232 topic: &payload[topic_start..topic_end],
233 payload: &payload[topic_end..payload_end],
234 timestamp_ms,
235 })
236}
237
238fn now_ms() -> u64 {
239 SystemTime::now()
240 .duration_since(UNIX_EPOCH)
241 .map(|d| d.as_millis().min(u128::from(u64::MAX)) as u64)
242 .unwrap_or(0)
243}
244
245#[cfg(test)]
246mod tests {
247 use std::sync::Arc;
248
249 use super::OrbitEventBus;
250 use crate::Fleet;
251
252 #[test]
253 fn polls_events_since_cursor() {
254 let fleet = Arc::new(Fleet::join("event_poll", 1).expect("fleet"));
255 let bus = OrbitEventBus::new(fleet);
256 let mut cursor = bus.cursor_from_start();
257
258 bus.publish("worker.booted", b"w1").expect("publish first");
259 bus.publish("cache.reset", b"all").expect("publish second");
260
261 let poll = bus.poll(&mut cursor);
262 assert_eq!(poll.lagged, 0);
263 assert_eq!(poll.events.len(), 2);
264 assert_eq!(poll.events[0].topic, "worker.booted");
265 assert_eq!(poll.events[0].payload, b"w1");
266 assert_eq!(poll.events[1].topic, "cache.reset");
267 assert_eq!(poll.events[1].payload, b"all");
268
269 assert!(bus.poll(&mut cursor).is_empty());
270 }
271
272 #[test]
273 fn cursor_at_head_only_reads_future_events() {
274 let fleet = Arc::new(Fleet::join("event_head", 1).expect("fleet"));
275 let bus = OrbitEventBus::new(fleet);
276
277 bus.publish("old", b"ignored").expect("publish old");
278 let mut cursor = bus.cursor_at_head();
279 bus.publish("new", b"seen").expect("publish new");
280
281 let poll = bus.poll(&mut cursor);
282 assert_eq!(poll.events.len(), 1);
283 assert_eq!(poll.events[0].topic, "new");
284 }
285
286 #[test]
287 fn topic_filter_advances_cursor() {
288 let fleet = Arc::new(Fleet::join("event_topic", 1).expect("fleet"));
289 let bus = OrbitEventBus::new(fleet);
290 let mut cursor = bus.cursor_from_start();
291
292 bus.publish("a", b"1").expect("publish a");
293 bus.publish("b", b"2").expect("publish b");
294
295 let poll = bus.poll_topic(&mut cursor, "b");
296 assert_eq!(poll.events.len(), 1);
297 assert_eq!(poll.events[0].payload, b"2");
298 assert!(bus.poll(&mut cursor).is_empty());
299 }
300
301 #[test]
302 fn reports_lag_when_cursor_falls_behind_capacity() {
303 let fleet = Arc::new(Fleet::join("event_lag", 1).expect("fleet"));
304 fleet.ring_with_capacity::<super::OrbitEventRecord>(2);
305 let bus = OrbitEventBus::new(fleet);
306 let mut cursor = bus.cursor_from_start();
307
308 bus.publish("n", b"1").expect("publish 1");
309 bus.publish("n", b"2").expect("publish 2");
310 bus.publish("n", b"3").expect("publish 3");
311
312 let poll = bus.poll(&mut cursor);
313 assert_eq!(poll.lagged, 1);
314 assert_eq!(poll.events.len(), 2);
315 assert_eq!(poll.events[0].payload, b"2");
316 assert_eq!(poll.events[1].payload, b"3");
317 }
318}