oxihuman_core/
publish_subscribe.rs1#![allow(dead_code)]
4
5#[derive(Clone, Debug)]
9pub struct TopicMessage {
10 pub topic: String,
11 pub payload: String,
12 pub sequence: u64,
13}
14
15#[derive(Clone, Debug)]
17pub struct Subscription {
18 pub id: u64,
19 pub topic: String,
20 pub subscriber_name: String,
21}
22
23#[derive(Clone, Debug)]
25pub struct PubSubConfig {
26 pub max_topics: usize,
27 pub max_messages_per_topic: usize,
28}
29
30impl Default for PubSubConfig {
31 fn default() -> Self {
32 Self {
33 max_topics: 64,
34 max_messages_per_topic: 256,
35 }
36 }
37}
38
39pub struct PubSubBus {
41 pub config: PubSubConfig,
42 subscriptions: Vec<Subscription>,
43 messages: Vec<TopicMessage>,
44 next_sub_id: u64,
45 next_seq: u64,
46}
47
48pub fn new_pubsub_bus(config: PubSubConfig) -> PubSubBus {
50 PubSubBus {
51 config,
52 subscriptions: Vec::new(),
53 messages: Vec::new(),
54 next_sub_id: 1,
55 next_seq: 1,
56 }
57}
58
59pub fn subscribe(bus: &mut PubSubBus, topic: &str, subscriber_name: &str) -> u64 {
61 let id = bus.next_sub_id;
62 bus.next_sub_id += 1;
63 bus.subscriptions.push(Subscription {
64 id,
65 topic: topic.into(),
66 subscriber_name: subscriber_name.into(),
67 });
68 id
69}
70
71pub fn unsubscribe(bus: &mut PubSubBus, sub_id: u64) -> bool {
73 let before = bus.subscriptions.len();
74 bus.subscriptions.retain(|s| s.id != sub_id);
75 bus.subscriptions.len() < before
76}
77
78pub fn publish(bus: &mut PubSubBus, topic: &str, payload: &str) -> u64 {
80 let seq = bus.next_seq;
81 bus.next_seq += 1;
82 let msg = TopicMessage {
83 topic: topic.into(),
84 payload: payload.into(),
85 sequence: seq,
86 };
87 let count = bus.messages.iter().filter(|m| m.topic == topic).count();
89 if count >= bus.config.max_messages_per_topic {
90 let mut removed = false;
91 bus.messages.retain(|m| {
92 if !removed && m.topic == topic {
93 removed = true;
94 false
95 } else {
96 true
97 }
98 });
99 }
100 bus.messages.push(msg);
101 seq
102}
103
104pub fn messages_for_topic<'a>(bus: &'a PubSubBus, topic: &str) -> Vec<&'a TopicMessage> {
106 bus.messages.iter().filter(|m| m.topic == topic).collect()
107}
108
109pub fn subscriber_count(bus: &PubSubBus, topic: &str) -> usize {
111 bus.subscriptions
112 .iter()
113 .filter(|s| s.topic == topic)
114 .count()
115}
116
117pub fn clear_topic(bus: &mut PubSubBus, topic: &str) {
119 bus.messages.retain(|m| m.topic != topic);
120}
121
122impl PubSubBus {
123 pub fn new(config: PubSubConfig) -> Self {
125 new_pubsub_bus(config)
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132
133 fn make_bus() -> PubSubBus {
134 new_pubsub_bus(PubSubConfig::default())
135 }
136
137 #[test]
138 fn test_subscribe_returns_unique_ids() {
139 let mut bus = make_bus();
140 let id1 = subscribe(&mut bus, "t", "s1");
141 let id2 = subscribe(&mut bus, "t", "s2");
142 assert_ne!(id1, id2);
143 }
144
145 #[test]
146 fn test_subscriber_count_tracks_subs() {
147 let mut bus = make_bus();
148 subscribe(&mut bus, "topic_a", "alice");
149 subscribe(&mut bus, "topic_a", "bob");
150 assert_eq!(subscriber_count(&bus, "topic_a"), 2);
151 }
152
153 #[test]
154 fn test_unsubscribe_removes_entry() {
155 let mut bus = make_bus();
156 let id = subscribe(&mut bus, "t", "s");
157 assert!(unsubscribe(&mut bus, id));
158 assert_eq!(subscriber_count(&bus, "t"), 0);
159 }
160
161 #[test]
162 fn test_unsubscribe_nonexistent_returns_false() {
163 let mut bus = make_bus();
164 assert!(!unsubscribe(&mut bus, 999));
165 }
166
167 #[test]
168 fn test_publish_stores_message() {
169 let mut bus = make_bus();
170 publish(&mut bus, "news", "hello world");
171 assert_eq!(messages_for_topic(&bus, "news").len(), 1);
172 }
173
174 #[test]
175 fn test_publish_returns_incrementing_sequence() {
176 let mut bus = make_bus();
177 let s1 = publish(&mut bus, "t", "a");
178 let s2 = publish(&mut bus, "t", "b");
179 assert!(s2 > s1);
180 }
181
182 #[test]
183 fn test_clear_topic_removes_messages() {
184 let mut bus = make_bus();
185 publish(&mut bus, "news", "msg1");
186 publish(&mut bus, "news", "msg2");
187 clear_topic(&mut bus, "news");
188 assert!(messages_for_topic(&bus, "news").is_empty());
189 }
190
191 #[test]
192 fn test_messages_for_wrong_topic_empty() {
193 let mut bus = make_bus();
194 publish(&mut bus, "topicX", "data");
195 assert!(messages_for_topic(&bus, "topicY").is_empty());
196 }
197
198 #[test]
199 fn test_capacity_trims_oldest_message() {
200 let mut bus = new_pubsub_bus(PubSubConfig {
201 max_topics: 8,
202 max_messages_per_topic: 2,
203 });
204 publish(&mut bus, "t", "first");
205 publish(&mut bus, "t", "second");
206 publish(&mut bus, "t", "third"); let msgs = messages_for_topic(&bus, "t");
208 assert_eq!(msgs.len(), 2);
209 assert!(msgs.iter().any(|m| m.payload == "second"));
210 assert!(msgs.iter().any(|m| m.payload == "third"));
211 }
212}