Skip to main content

oxihuman_core/
topic_event_bus.rs

1//! Publish-subscribe event bus for decoupled component communication.
2//!
3//! This module provides a topic-based pub/sub event bus with priority queuing,
4//! subscriber management, and JSON-serializable event records. It is distinct
5//! from the simpler kind-based `event_bus` module.
6
7use std::collections::HashMap;
8
9// ---------------------------------------------------------------------------
10// Types
11// ---------------------------------------------------------------------------
12
/// Priority level for an event.
///
/// Variants are declared from most to least urgent, so the derived `Ord`
/// gives `High < Normal < Low`; an ascending sort therefore places the most
/// urgent events first. `Normal` is the value returned by `Default`.
#[allow(dead_code)]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum EventPriority {
    /// Processed before normal and low events.
    High,
    /// Default priority.
    #[default]
    Normal,
    /// Processed last.
    Low,
}
24
25/// A record of a single published event stored in the bus queue.
26#[allow(dead_code)]
27#[derive(Clone, Debug)]
28pub struct EventRecord {
29    /// Unique sequential id assigned at publish time.
30    pub id: u64,
31    /// Topic string the event was published on.
32    pub topic: String,
33    /// JSON-encoded payload (may be `"null"` if empty).
34    pub payload: String,
35    /// Priority of this event.
36    pub priority: EventPriority,
37    /// Monotonic timestamp in milliseconds (caller-provided).
38    pub timestamp_ms: u64,
39}
40
/// Identifier handed out to a subscriber when it registers on a topic.
///
/// This is a plain alias for `u64`, not a distinct type.
#[allow(dead_code)]
pub type SubscriberId = u64;
44
45/// Type alias for the pending events list.
46#[allow(dead_code)]
47pub type PendingEvents = Vec<EventRecord>;
48
49/// Publish-subscribe event bus with topic-based routing and priority support.
50#[allow(dead_code)]
51pub struct EventBusTopic {
52    /// Pending (unprocessed) events, in insertion order.
53    pub pending: Vec<EventRecord>,
54    /// All dispatched events, oldest first.
55    pub dispatched: Vec<EventRecord>,
56    /// Subscribers: topic → list of subscriber ids.
57    pub subscribers: HashMap<String, Vec<SubscriberId>>,
58    /// Next subscriber id to assign.
59    pub next_sub_id: SubscriberId,
60    /// Next event id to assign.
61    pub next_event_id: u64,
62    /// Timestamp of the most recently published event (0 if none).
63    pub last_event_ts: u64,
64}
65
66// ---------------------------------------------------------------------------
67// Construction
68// ---------------------------------------------------------------------------
69
70/// Create a new, empty event bus.
71#[allow(dead_code)]
72pub fn new_event_bus() -> EventBusTopic {
73    EventBusTopic {
74        pending: Vec::new(),
75        dispatched: Vec::new(),
76        subscribers: HashMap::new(),
77        next_sub_id: 1,
78        next_event_id: 1,
79        last_event_ts: 0,
80    }
81}
82
83// ---------------------------------------------------------------------------
84// Publishing
85// ---------------------------------------------------------------------------
86
87/// Publish an event with Normal priority on the given topic.
88/// Returns the assigned event id.
89#[allow(dead_code)]
90pub fn publish(bus: &mut EventBusTopic, topic: &str, payload: &str, timestamp_ms: u64) -> u64 {
91    publish_priority(bus, topic, payload, EventPriority::Normal, timestamp_ms)
92}
93
94/// Publish an event with an explicit priority on the given topic.
95/// Returns the assigned event id.
96#[allow(dead_code)]
97pub fn publish_priority(
98    bus: &mut EventBusTopic,
99    topic: &str,
100    payload: &str,
101    priority: EventPriority,
102    timestamp_ms: u64,
103) -> u64 {
104    let id = bus.next_event_id;
105    bus.next_event_id += 1;
106    bus.last_event_ts = timestamp_ms;
107    let record = EventRecord {
108        id,
109        topic: topic.to_string(),
110        payload: payload.to_string(),
111        priority,
112        timestamp_ms,
113    };
114    bus.pending.push(record);
115    id
116}
117
118// ---------------------------------------------------------------------------
119// Subscription management
120// ---------------------------------------------------------------------------
121
122/// Register a subscriber for the given topic.
123/// Returns the new subscriber id.
124#[allow(dead_code)]
125pub fn subscribe(bus: &mut EventBusTopic, topic: &str) -> SubscriberId {
126    let id = bus.next_sub_id;
127    bus.next_sub_id += 1;
128    bus.subscribers
129        .entry(topic.to_string())
130        .or_default()
131        .push(id);
132    id
133}
134
135/// Remove a subscriber by id from the given topic.
136/// Returns `true` if the subscriber was found and removed.
137#[allow(dead_code)]
138pub fn unsubscribe(bus: &mut EventBusTopic, topic: &str, sub_id: SubscriberId) -> bool {
139    if let Some(subs) = bus.subscribers.get_mut(topic) {
140        if let Some(pos) = subs.iter().position(|&s| s == sub_id) {
141            subs.remove(pos);
142            return true;
143        }
144    }
145    false
146}
147
148// ---------------------------------------------------------------------------
149// Dispatch
150// ---------------------------------------------------------------------------
151
152/// Dispatch all pending events in priority order (High → Normal → Low).
153/// Moves them from `pending` to `dispatched`.
154/// Returns the number of events dispatched.
155#[allow(dead_code)]
156pub fn dispatch_pending(bus: &mut EventBusTopic) -> usize {
157    // Sort pending by priority (High first = lowest enum ordinal).
158    bus.pending.sort_by_key(|e| e.priority);
159    let count = bus.pending.len();
160    let drained: Vec<EventRecord> = bus.pending.drain(..).collect();
161    bus.dispatched.extend(drained);
162    count
163}
164
165/// Drain all pending events for a specific topic, returning them.
166#[allow(dead_code)]
167pub fn drain_topic(bus: &mut EventBusTopic, topic: &str) -> PendingEvents {
168    let mut drained = Vec::new();
169    let mut remaining = Vec::new();
170    for ev in bus.pending.drain(..) {
171        if ev.topic == topic {
172            drained.push(ev);
173        } else {
174            remaining.push(ev);
175        }
176    }
177    bus.pending = remaining;
178    drained
179}
180
181// ---------------------------------------------------------------------------
182// Queries
183// ---------------------------------------------------------------------------
184
185/// Return the number of pending (not yet dispatched) events.
186#[allow(dead_code)]
187pub fn pending_count(bus: &EventBusTopic) -> usize {
188    bus.pending.len()
189}
190
191/// Return the number of subscribers for a given topic.
192#[allow(dead_code)]
193pub fn topic_subscriber_count(bus: &EventBusTopic, topic: &str) -> usize {
194    bus.subscribers.get(topic).map_or(0, |v| v.len())
195}
196
197/// Return the total number of dispatched events since creation.
198#[allow(dead_code)]
199pub fn event_count_total(bus: &EventBusTopic) -> usize {
200    bus.dispatched.len()
201}
202
203/// Return `true` if at least one subscriber exists for the given topic.
204#[allow(dead_code)]
205pub fn has_subscribers(bus: &EventBusTopic, topic: &str) -> bool {
206    bus.subscribers.get(topic).is_some_and(|v| !v.is_empty())
207}
208
209/// Return the timestamp of the most recently published event (0 if none).
210#[allow(dead_code)]
211pub fn last_event_time(bus: &EventBusTopic) -> u64 {
212    bus.last_event_ts
213}
214
215// ---------------------------------------------------------------------------
216// Clear
217// ---------------------------------------------------------------------------
218
219/// Clear pending events and dispatched history, keeping subscribers.
220#[allow(dead_code)]
221pub fn clear_event_bus(bus: &mut EventBusTopic) {
222    bus.pending.clear();
223    bus.dispatched.clear();
224    bus.last_event_ts = 0;
225}
226
227// ---------------------------------------------------------------------------
228// JSON serialisation (minimal, no external deps)
229// ---------------------------------------------------------------------------
230
231/// Serialise the entire bus state to a compact JSON string.
232#[allow(dead_code)]
233pub fn event_bus_to_json(bus: &EventBusTopic) -> String {
234    let pending_json: Vec<String> = bus
235        .pending
236        .iter()
237        .map(|e| {
238            format!(
239                r#"{{"id":{},"topic":"{}","priority":"{:?}","ts":{}}}"#,
240                e.id, e.topic, e.priority, e.timestamp_ms
241            )
242        })
243        .collect();
244
245    let dispatched_json: Vec<String> = bus
246        .dispatched
247        .iter()
248        .map(|e| {
249            format!(
250                r#"{{"id":{},"topic":"{}","priority":"{:?}","ts":{}}}"#,
251                e.id, e.topic, e.priority, e.timestamp_ms
252            )
253        })
254        .collect();
255
256    format!(
257        r#"{{"pending_count":{},"dispatched_count":{},"pending":[{}],"dispatched":[{}]}}"#,
258        bus.pending.len(),
259        bus.dispatched.len(),
260        pending_json.join(","),
261        dispatched_json.join(",")
262    )
263}
264
265// ---------------------------------------------------------------------------
266// Tests
267// ---------------------------------------------------------------------------
268
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created bus holds neither pending nor dispatched events.
    #[test]
    fn test_new_event_bus_empty() {
        let fresh = new_event_bus();
        assert_eq!(pending_count(&fresh), 0);
        assert_eq!(event_count_total(&fresh), 0);
    }

    /// Every publish adds exactly one entry to the pending queue.
    #[test]
    fn test_publish_increments_pending() {
        let mut bus = new_event_bus();
        for ts in [0, 1] {
            publish(&mut bus, "topic_a", "null", ts);
        }
        assert_eq!(pending_count(&bus), 2);
    }

    /// Event ids grow by one per publish.
    #[test]
    fn test_publish_returns_sequential_ids() {
        let mut bus = new_event_bus();
        let first = publish(&mut bus, "t", "null", 0);
        let second = publish(&mut bus, "t", "null", 0);
        assert_eq!(second, first + 1);
    }

    /// Two subscriptions on the same topic never share an id.
    #[test]
    fn test_subscribe_returns_unique_ids() {
        let mut bus = new_event_bus();
        let first = subscribe(&mut bus, "topic_x");
        let second = subscribe(&mut bus, "topic_x");
        assert_ne!(first, second);
    }

    /// Subscriber counts are tracked per topic.
    #[test]
    fn test_topic_subscriber_count() {
        let mut bus = new_event_bus();
        for topic in ["topic_a", "topic_a", "topic_b"] {
            subscribe(&mut bus, topic);
        }
        assert_eq!(topic_subscriber_count(&bus, "topic_a"), 2);
        assert_eq!(topic_subscriber_count(&bus, "topic_b"), 1);
    }

    /// A topic nobody subscribed to reports no subscribers.
    #[test]
    fn test_has_subscribers_false_on_empty() {
        let bus = new_event_bus();
        assert!(!has_subscribers(&bus, "no_topic"));
    }

    /// One subscription is enough for `has_subscribers` to report `true`.
    #[test]
    fn test_has_subscribers_true_after_subscribe() {
        let mut bus = new_event_bus();
        subscribe(&mut bus, "events");
        assert!(has_subscribers(&bus, "events"));
    }

    /// Removing the only subscriber leaves the topic without subscribers.
    #[test]
    fn test_unsubscribe_removes_subscriber() {
        let mut bus = new_event_bus();
        let only = subscribe(&mut bus, "topic");
        assert!(unsubscribe(&mut bus, "topic", only));
        assert!(!has_subscribers(&bus, "topic"));
    }

    /// Unsubscribing from an unknown topic reports failure.
    #[test]
    fn test_unsubscribe_returns_false_for_unknown() {
        let mut bus = new_event_bus();
        assert!(!unsubscribe(&mut bus, "unknown", 999));
    }

    /// Dispatch empties the pending queue into the dispatched history.
    #[test]
    fn test_dispatch_pending_moves_to_dispatched() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 0);
        publish(&mut bus, "t", "null", 1);
        let moved = dispatch_pending(&mut bus);
        assert_eq!(moved, 2);
        assert_eq!(pending_count(&bus), 0);
        assert_eq!(event_count_total(&bus), 2);
    }

    /// Dispatch order is High, then Normal, then Low, regardless of publish order.
    #[test]
    fn test_dispatch_priority_ordering() {
        let mut bus = new_event_bus();
        publish_priority(&mut bus, "t", "low", EventPriority::Low, 0);
        publish_priority(&mut bus, "t", "high", EventPriority::High, 1);
        publish_priority(&mut bus, "t", "normal", EventPriority::Normal, 2);
        dispatch_pending(&mut bus);
        let order: Vec<&str> = bus
            .dispatched
            .iter()
            .map(|event| event.payload.as_str())
            .collect();
        assert_eq!(order, ["high", "normal", "low"]);
    }

    /// Clearing resets both queues and the last-event timestamp.
    #[test]
    fn test_clear_event_bus_resets_counts() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 5);
        dispatch_pending(&mut bus);
        clear_event_bus(&mut bus);
        assert_eq!(pending_count(&bus), 0);
        assert_eq!(event_count_total(&bus), 0);
        assert_eq!(last_event_time(&bus), 0);
    }

    /// Publishing records the event's timestamp on the bus.
    #[test]
    fn test_last_event_time_updated() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 42);
        assert_eq!(last_event_time(&bus), 42);
    }

    /// Draining one topic leaves events on other topics queued.
    #[test]
    fn test_drain_topic_removes_only_matching() {
        let mut bus = new_event_bus();
        publish(&mut bus, "alpha", "a1", 0);
        publish(&mut bus, "beta", "b1", 1);
        publish(&mut bus, "alpha", "a2", 2);
        let taken = drain_topic(&mut bus, "alpha");
        assert_eq!(taken.len(), 2);
        assert_eq!(pending_count(&bus), 1);
        assert_eq!(bus.pending[0].topic, "beta");
    }

    /// The JSON dump exposes both count keys.
    #[test]
    fn test_event_bus_to_json_contains_counts() {
        let mut bus = new_event_bus();
        publish(&mut bus, "t", "null", 0);
        let dump = event_bus_to_json(&bus);
        for key in ["pending_count", "dispatched_count"] {
            assert!(dump.contains(key));
        }
    }

    /// An explicitly prioritised event keeps its priority and returned id.
    #[test]
    fn test_publish_priority_high() {
        let mut bus = new_event_bus();
        let assigned = publish_priority(&mut bus, "t", "hi", EventPriority::High, 99);
        let queued = &bus.pending[0];
        assert_eq!(queued.priority, EventPriority::High);
        assert_eq!(queued.id, assigned);
    }

    /// Unknown topics have a subscriber count of zero.
    #[test]
    fn test_topic_subscriber_count_zero_for_unknown() {
        let bus = new_event_bus();
        assert_eq!(topic_subscriber_count(&bus, "ghost"), 0);
    }

    /// Subscribing to one topic does not affect another topic's count.
    #[test]
    fn test_multiple_topics_independent() {
        let mut bus = new_event_bus();
        subscribe(&mut bus, "a");
        subscribe(&mut bus, "b");
        let expected = [("a", 1), ("b", 1), ("c", 0)];
        for (topic, count) in expected {
            assert_eq!(topic_subscriber_count(&bus, topic), count);
        }
    }
}