Skip to main content

oxihuman_core/
publish_subscribe.rs

1// Copyright (C) 2026 COOLJAPAN OU (Team KitaSan)
2// SPDX-License-Identifier: Apache-2.0
3#![allow(dead_code)]
4
5//! Pub/sub topic bus stub — topics, subscriptions, and message delivery.
6
7/// A message on a topic.
8#[derive(Clone, Debug)]
9pub struct TopicMessage {
10    pub topic: String,
11    pub payload: String,
12    pub sequence: u64,
13}
14
15/// A subscription entry.
16#[derive(Clone, Debug)]
17pub struct Subscription {
18    pub id: u64,
19    pub topic: String,
20    pub subscriber_name: String,
21}
22
23/// Configuration for the pub/sub bus.
24#[derive(Clone, Debug)]
25pub struct PubSubConfig {
26    pub max_topics: usize,
27    pub max_messages_per_topic: usize,
28}
29
30impl Default for PubSubConfig {
31    fn default() -> Self {
32        Self {
33            max_topics: 64,
34            max_messages_per_topic: 256,
35        }
36    }
37}
38
39/// An in-memory pub/sub topic bus.
40pub struct PubSubBus {
41    pub config: PubSubConfig,
42    subscriptions: Vec<Subscription>,
43    messages: Vec<TopicMessage>,
44    next_sub_id: u64,
45    next_seq: u64,
46}
47
48/// Creates a new pub/sub bus.
49pub fn new_pubsub_bus(config: PubSubConfig) -> PubSubBus {
50    PubSubBus {
51        config,
52        subscriptions: Vec::new(),
53        messages: Vec::new(),
54        next_sub_id: 1,
55        next_seq: 1,
56    }
57}
58
59/// Subscribes a named subscriber to a topic, returning the subscription ID.
60pub fn subscribe(bus: &mut PubSubBus, topic: &str, subscriber_name: &str) -> u64 {
61    let id = bus.next_sub_id;
62    bus.next_sub_id += 1;
63    bus.subscriptions.push(Subscription {
64        id,
65        topic: topic.into(),
66        subscriber_name: subscriber_name.into(),
67    });
68    id
69}
70
71/// Unsubscribes by subscription ID, returning true if found.
72pub fn unsubscribe(bus: &mut PubSubBus, sub_id: u64) -> bool {
73    let before = bus.subscriptions.len();
74    bus.subscriptions.retain(|s| s.id != sub_id);
75    bus.subscriptions.len() < before
76}
77
78/// Publishes a message to a topic, returning the sequence number.
79pub fn publish(bus: &mut PubSubBus, topic: &str, payload: &str) -> u64 {
80    let seq = bus.next_seq;
81    bus.next_seq += 1;
82    let msg = TopicMessage {
83        topic: topic.into(),
84        payload: payload.into(),
85        sequence: seq,
86    };
87    /* trim oldest if over capacity */
88    let count = bus.messages.iter().filter(|m| m.topic == topic).count();
89    if count >= bus.config.max_messages_per_topic {
90        let mut removed = false;
91        bus.messages.retain(|m| {
92            if !removed && m.topic == topic {
93                removed = true;
94                false
95            } else {
96                true
97            }
98        });
99    }
100    bus.messages.push(msg);
101    seq
102}
103
104/// Returns all messages for a topic, in sequence order.
105pub fn messages_for_topic<'a>(bus: &'a PubSubBus, topic: &str) -> Vec<&'a TopicMessage> {
106    bus.messages.iter().filter(|m| m.topic == topic).collect()
107}
108
109/// Returns the number of active subscribers to a topic.
110pub fn subscriber_count(bus: &PubSubBus, topic: &str) -> usize {
111    bus.subscriptions
112        .iter()
113        .filter(|s| s.topic == topic)
114        .count()
115}
116
117/// Clears all messages for a given topic.
118pub fn clear_topic(bus: &mut PubSubBus, topic: &str) {
119    bus.messages.retain(|m| m.topic != topic);
120}
121
122impl PubSubBus {
123    /// Creates a new bus with default config.
124    pub fn new(config: PubSubConfig) -> Self {
125        new_pubsub_bus(config)
126    }
127}
128
129#[cfg(test)]
130mod tests {
131    use super::*;
132
133    fn make_bus() -> PubSubBus {
134        new_pubsub_bus(PubSubConfig::default())
135    }
136
137    #[test]
138    fn test_subscribe_returns_unique_ids() {
139        let mut bus = make_bus();
140        let id1 = subscribe(&mut bus, "t", "s1");
141        let id2 = subscribe(&mut bus, "t", "s2");
142        assert_ne!(id1, id2);
143    }
144
145    #[test]
146    fn test_subscriber_count_tracks_subs() {
147        let mut bus = make_bus();
148        subscribe(&mut bus, "topic_a", "alice");
149        subscribe(&mut bus, "topic_a", "bob");
150        assert_eq!(subscriber_count(&bus, "topic_a"), 2);
151    }
152
153    #[test]
154    fn test_unsubscribe_removes_entry() {
155        let mut bus = make_bus();
156        let id = subscribe(&mut bus, "t", "s");
157        assert!(unsubscribe(&mut bus, id));
158        assert_eq!(subscriber_count(&bus, "t"), 0);
159    }
160
161    #[test]
162    fn test_unsubscribe_nonexistent_returns_false() {
163        let mut bus = make_bus();
164        assert!(!unsubscribe(&mut bus, 999));
165    }
166
167    #[test]
168    fn test_publish_stores_message() {
169        let mut bus = make_bus();
170        publish(&mut bus, "news", "hello world");
171        assert_eq!(messages_for_topic(&bus, "news").len(), 1);
172    }
173
174    #[test]
175    fn test_publish_returns_incrementing_sequence() {
176        let mut bus = make_bus();
177        let s1 = publish(&mut bus, "t", "a");
178        let s2 = publish(&mut bus, "t", "b");
179        assert!(s2 > s1);
180    }
181
182    #[test]
183    fn test_clear_topic_removes_messages() {
184        let mut bus = make_bus();
185        publish(&mut bus, "news", "msg1");
186        publish(&mut bus, "news", "msg2");
187        clear_topic(&mut bus, "news");
188        assert!(messages_for_topic(&bus, "news").is_empty());
189    }
190
191    #[test]
192    fn test_messages_for_wrong_topic_empty() {
193        let mut bus = make_bus();
194        publish(&mut bus, "topicX", "data");
195        assert!(messages_for_topic(&bus, "topicY").is_empty());
196    }
197
198    #[test]
199    fn test_capacity_trims_oldest_message() {
200        let mut bus = new_pubsub_bus(PubSubConfig {
201            max_topics: 8,
202            max_messages_per_topic: 2,
203        });
204        publish(&mut bus, "t", "first");
205        publish(&mut bus, "t", "second");
206        publish(&mut bus, "t", "third"); /* should evict "first" */
207        let msgs = messages_for_topic(&bus, "t");
208        assert_eq!(msgs.len(), 2);
209        assert!(msgs.iter().any(|m| m.payload == "second"));
210        assert!(msgs.iter().any(|m| m.payload == "third"));
211    }
212}