1use std::collections::HashMap;
8
/// Dispatch priority attached to every event record.
///
/// Variants are declared from most to least urgent, so the derived ordering
/// is `High < Normal < Low`; an ascending sort therefore puts the most urgent
/// events first. `Normal` is the default, matching what `publish` uses.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventPriority {
    /// Most urgent; sorts before every other priority.
    High,
    /// Priority used when the publisher does not ask for a specific one.
    #[default]
    Normal,
    /// Least urgent; sorts after every other priority.
    Low,
}
24
25#[allow(dead_code)]
27#[derive(Clone, Debug)]
28pub struct EventRecord {
29 pub id: u64,
31 pub topic: String,
33 pub payload: String,
35 pub priority: EventPriority,
37 pub timestamp_ms: u64,
39}
40
/// Identifier returned by `subscribe` and accepted by `unsubscribe`.
#[allow(dead_code)]
pub type SubscriberId = u64;
44
45#[allow(dead_code)]
47pub type PendingEvents = Vec<EventRecord>;
48
49#[allow(dead_code)]
51pub struct EventBusTopic {
52 pub pending: Vec<EventRecord>,
54 pub dispatched: Vec<EventRecord>,
56 pub subscribers: HashMap<String, Vec<SubscriberId>>,
58 pub next_sub_id: SubscriberId,
60 pub next_event_id: u64,
62 pub last_event_ts: u64,
64}
65
66#[allow(dead_code)]
72pub fn new_event_bus() -> EventBusTopic {
73 EventBusTopic {
74 pending: Vec::new(),
75 dispatched: Vec::new(),
76 subscribers: HashMap::new(),
77 next_sub_id: 1,
78 next_event_id: 1,
79 last_event_ts: 0,
80 }
81}
82
83#[allow(dead_code)]
90pub fn publish(bus: &mut EventBusTopic, topic: &str, payload: &str, timestamp_ms: u64) -> u64 {
91 publish_priority(bus, topic, payload, EventPriority::Normal, timestamp_ms)
92}
93
94#[allow(dead_code)]
97pub fn publish_priority(
98 bus: &mut EventBusTopic,
99 topic: &str,
100 payload: &str,
101 priority: EventPriority,
102 timestamp_ms: u64,
103) -> u64 {
104 let id = bus.next_event_id;
105 bus.next_event_id += 1;
106 bus.last_event_ts = timestamp_ms;
107 let record = EventRecord {
108 id,
109 topic: topic.to_string(),
110 payload: payload.to_string(),
111 priority,
112 timestamp_ms,
113 };
114 bus.pending.push(record);
115 id
116}
117
118#[allow(dead_code)]
125pub fn subscribe(bus: &mut EventBusTopic, topic: &str) -> SubscriberId {
126 let id = bus.next_sub_id;
127 bus.next_sub_id += 1;
128 bus.subscribers
129 .entry(topic.to_string())
130 .or_default()
131 .push(id);
132 id
133}
134
135#[allow(dead_code)]
138pub fn unsubscribe(bus: &mut EventBusTopic, topic: &str, sub_id: SubscriberId) -> bool {
139 if let Some(subs) = bus.subscribers.get_mut(topic) {
140 if let Some(pos) = subs.iter().position(|&s| s == sub_id) {
141 subs.remove(pos);
142 return true;
143 }
144 }
145 false
146}
147
148#[allow(dead_code)]
156pub fn dispatch_pending(bus: &mut EventBusTopic) -> usize {
157 bus.pending.sort_by_key(|e| e.priority);
159 let count = bus.pending.len();
160 let drained: Vec<EventRecord> = bus.pending.drain(..).collect();
161 bus.dispatched.extend(drained);
162 count
163}
164
165#[allow(dead_code)]
167pub fn drain_topic(bus: &mut EventBusTopic, topic: &str) -> PendingEvents {
168 let mut drained = Vec::new();
169 let mut remaining = Vec::new();
170 for ev in bus.pending.drain(..) {
171 if ev.topic == topic {
172 drained.push(ev);
173 } else {
174 remaining.push(ev);
175 }
176 }
177 bus.pending = remaining;
178 drained
179}
180
181#[allow(dead_code)]
187pub fn pending_count(bus: &EventBusTopic) -> usize {
188 bus.pending.len()
189}
190
191#[allow(dead_code)]
193pub fn topic_subscriber_count(bus: &EventBusTopic, topic: &str) -> usize {
194 bus.subscribers.get(topic).map_or(0, |v| v.len())
195}
196
197#[allow(dead_code)]
199pub fn event_count_total(bus: &EventBusTopic) -> usize {
200 bus.dispatched.len()
201}
202
203#[allow(dead_code)]
205pub fn has_subscribers(bus: &EventBusTopic, topic: &str) -> bool {
206 bus.subscribers.get(topic).is_some_and(|v| !v.is_empty())
207}
208
209#[allow(dead_code)]
211pub fn last_event_time(bus: &EventBusTopic) -> u64 {
212 bus.last_event_ts
213}
214
215#[allow(dead_code)]
221pub fn clear_event_bus(bus: &mut EventBusTopic) {
222 bus.pending.clear();
223 bus.dispatched.clear();
224 bus.last_event_ts = 0;
225}
226
227#[allow(dead_code)]
233pub fn event_bus_to_json(bus: &EventBusTopic) -> String {
234 let pending_json: Vec<String> = bus
235 .pending
236 .iter()
237 .map(|e| {
238 format!(
239 r#"{{"id":{},"topic":"{}","priority":"{:?}","ts":{}}}"#,
240 e.id, e.topic, e.priority, e.timestamp_ms
241 )
242 })
243 .collect();
244
245 let dispatched_json: Vec<String> = bus
246 .dispatched
247 .iter()
248 .map(|e| {
249 format!(
250 r#"{{"id":{},"topic":"{}","priority":"{:?}","ts":{}}}"#,
251 e.id, e.topic, e.priority, e.timestamp_ms
252 )
253 })
254 .collect();
255
256 format!(
257 r#"{{"pending_count":{},"dispatched_count":{},"pending":[{}],"dispatched":[{}]}}"#,
258 bus.pending.len(),
259 bus.dispatched.len(),
260 pending_json.join(","),
261 dispatched_json.join(",")
262 )
263}
264
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created bus has nothing queued and nothing dispatched.
    #[test]
    fn test_new_event_bus_empty() {
        let fresh = new_event_bus();
        assert_eq!(pending_count(&fresh), 0);
        assert_eq!(event_count_total(&fresh), 0);
    }

    /// Every publish adds one entry to the pending queue.
    #[test]
    fn test_publish_increments_pending() {
        let mut bus = new_event_bus();
        for ts in 0..2 {
            publish(&mut bus, "topic_a", "null", ts);
        }
        assert_eq!(pending_count(&bus), 2);
    }

    /// Consecutive publishes receive consecutive event ids.
    #[test]
    fn test_publish_returns_sequential_ids() {
        let mut bus = new_event_bus();
        let first = publish(&mut bus, "t", "null", 0);
        let second = publish(&mut bus, "t", "null", 0);
        assert_eq!(second, first + 1);
    }

    /// Two subscriptions on the same topic get distinct ids.
    #[test]
    fn test_subscribe_returns_unique_ids() {
        let mut bus = new_event_bus();
        let first = subscribe(&mut bus, "topic_x");
        let second = subscribe(&mut bus, "topic_x");
        assert_ne!(first, second);
    }

    /// Subscriber counts are tracked per topic.
    #[test]
    fn test_topic_subscriber_count() {
        let mut bus = new_event_bus();
        for topic in ["topic_a", "topic_a", "topic_b"] {
            subscribe(&mut bus, topic);
        }
        assert_eq!(topic_subscriber_count(&bus, "topic_a"), 2);
        assert_eq!(topic_subscriber_count(&bus, "topic_b"), 1);
    }

    /// A topic nobody subscribed to reports no subscribers.
    #[test]
    fn test_has_subscribers_false_on_empty() {
        let bus = new_event_bus();
        let any = has_subscribers(&bus, "no_topic");
        assert!(!any);
    }

    /// Subscribing makes the topic report subscribers.
    #[test]
    fn test_has_subscribers_true_after_subscribe() {
        let mut bus = new_event_bus();
        subscribe(&mut bus, "events");
        let any = has_subscribers(&bus, "events");
        assert!(any);
    }

    /// Removing the only subscriber leaves the topic without subscribers.
    #[test]
    fn test_unsubscribe_removes_subscriber() {
        let mut bus = new_event_bus();
        let sub = subscribe(&mut bus, "topic");
        let removed = unsubscribe(&mut bus, "topic", sub);
        assert!(removed);
        assert!(!has_subscribers(&bus, "topic"));
    }

    /// Unsubscribing from an unknown topic reports failure.
    #[test]
    fn test_unsubscribe_returns_false_for_unknown() {
        let mut bus = new_event_bus();
        let removed = unsubscribe(&mut bus, "unknown", 999);
        assert!(!removed);
    }

    /// Dispatching empties the pending queue into the dispatched list.
    #[test]
    fn test_dispatch_pending_moves_to_dispatched() {
        let mut bus = new_event_bus();
        for ts in 0..2 {
            publish(&mut bus, "t", "null", ts);
        }
        let moved = dispatch_pending(&mut bus);
        assert_eq!(moved, 2);
        assert_eq!(pending_count(&bus), 0);
        assert_eq!(event_count_total(&bus), 2);
    }

    /// Dispatch orders events High, then Normal, then Low.
    #[test]
    fn test_dispatch_priority_ordering() {
        let mut bus = new_event_bus();
        publish_priority(&mut bus, "t", "low", EventPriority::Low, 0);
        publish_priority(&mut bus, "t", "high", EventPriority::High, 1);
        publish_priority(&mut bus, "t", "normal", EventPriority::Normal, 2);
        dispatch_pending(&mut bus);

        let order: Vec<&str> = bus.dispatched.iter().map(|e| e.payload.as_str()).collect();
        assert_eq!(order, ["high", "normal", "low"]);
    }

    /// Clearing resets both queues and the last-event timestamp.
    #[test]
    fn test_clear_event_bus_resets_counts() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 5);
        dispatch_pending(&mut bus);
        clear_event_bus(&mut bus);

        assert_eq!(pending_count(&bus), 0);
        assert_eq!(event_count_total(&bus), 0);
        assert_eq!(last_event_time(&bus), 0);
    }

    /// Publishing records the event's timestamp on the bus.
    #[test]
    fn test_last_event_time_updated() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 42);
        let seen = last_event_time(&bus);
        assert_eq!(seen, 42);
    }

    /// Draining a topic takes only that topic's pending events.
    #[test]
    fn test_drain_topic_removes_only_matching() {
        let mut bus = new_event_bus();
        let published = [("alpha", "a1"), ("beta", "b1"), ("alpha", "a2")];
        for (ts, (topic, payload)) in published.iter().enumerate() {
            publish(&mut bus, topic, payload, ts as u64);
        }

        let taken = drain_topic(&mut bus, "alpha");
        assert_eq!(taken.len(), 2);
        assert_eq!(pending_count(&bus), 1);
        assert_eq!(bus.pending[0].topic, "beta");
    }

    /// The JSON dump carries both count keys.
    #[test]
    fn test_event_bus_to_json_contains_counts() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 0);
        let rendered = event_bus_to_json(&bus);
        for key in ["pending_count", "dispatched_count"] {
            assert!(rendered.contains(key));
        }
    }

    /// An explicit priority is stored on the queued record.
    #[test]
    fn test_publish_priority_high() {
        let mut bus = new_event_bus();
        let event_id = publish_priority(&mut bus, "t", "hi", EventPriority::High, 99);
        let queued = &bus.pending[0];
        assert_eq!(queued.priority, EventPriority::High);
        assert_eq!(queued.id, event_id);
    }

    /// Unknown topics report a subscriber count of zero.
    #[test]
    fn test_topic_subscriber_count_zero_for_unknown() {
        let bus = new_event_bus();
        let count = topic_subscriber_count(&bus, "ghost");
        assert_eq!(count, 0);
    }

    /// Subscriptions on one topic do not affect another.
    #[test]
    fn test_multiple_topics_independent() {
        let mut bus = new_event_bus();
        subscribe(&mut bus, "a");
        subscribe(&mut bus, "b");

        for (topic, expected) in [("a", 1), ("b", 1), ("c", 0)] {
            assert_eq!(topic_subscriber_count(&bus, topic), expected);
        }
    }
}