1use std::collections::HashSet;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use did_ma::Message;
13
14use crate::endpoint::DEFAULT_INBOX_CAPACITY;
15use crate::inbox::Inbox;
16use crate::service::{BROADCAST_TOPIC, CONTENT_TYPE_BROADCAST};
17
18pub type TopicId = [u8; 32];
20
21pub fn topic_id(name: &str) -> TopicId {
30 *blake3::hash(name.as_bytes()).as_bytes()
31}
32
33pub struct Topic {
49 name: String,
50 id: TopicId,
51 inbox: Option<Inbox<Message>>,
52 blocked: HashSet<String>,
53}
54
55impl Topic {
56 pub fn new(name: impl Into<String>) -> Self {
61 let name = name.into();
62 let id = topic_id(&name);
63 Self {
64 name,
65 id,
66 inbox: None,
67 blocked: HashSet::new(),
68 }
69 }
70
71 pub fn broadcast() -> Self {
75 Self::new(BROADCAST_TOPIC)
76 }
77
78 pub fn name(&self) -> &str {
80 &self.name
81 }
82
83 pub fn id(&self) -> &TopicId {
85 &self.id
86 }
87
88 pub fn is_subscribed(&self) -> bool {
90 self.inbox.is_some()
91 }
92
93 pub fn subscribe(&mut self) {
97 if self.inbox.is_none() {
98 self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
99 }
100 }
101
102 pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
107 self.inbox = Some(inbox);
108 }
109
110 pub fn unsubscribe(&mut self) {
112 self.inbox = None;
113 }
114
115 pub fn deliver(&mut self, message: Message) -> bool {
121 if self.inbox.is_none() {
122 return false;
123 }
124
125 if !self.validate(&message) {
126 return false;
127 }
128
129 let now = now_secs();
130 let expires_at = if message.ttl == 0 {
131 0
132 } else {
133 message.created_at.saturating_add(message.ttl)
134 };
135
136 self.inbox.as_mut().unwrap().push(now, expires_at, message);
138 true
139 }
140
141 pub fn drain(&mut self) -> Vec<Message> {
145 match self.inbox.as_mut() {
146 Some(inbox) => inbox.drain(now_secs()),
147 None => Vec::new(),
148 }
149 }
150
151 pub fn block(&mut self, sender_did: impl Into<String>) {
156 self.blocked.insert(sender_did.into());
157 }
158
159 pub fn unblock(&mut self, sender_did: &str) {
161 self.blocked.remove(sender_did);
162 }
163
164 pub fn is_blocked(&self, sender_did: &str) -> bool {
166 self.blocked.contains(sender_did)
167 }
168
169 fn validate(&self, message: &Message) -> bool {
182 if self.blocked.contains(&message.from) {
184 return false;
185 }
186
187 if message.content_type != CONTENT_TYPE_BROADCAST {
189 return false;
190 }
191
192 if !message.to.is_empty() {
194 return false;
195 }
196
197 if message.ttl > 0 {
199 let expires_at = message.created_at.saturating_add(message.ttl);
200 if expires_at <= now_secs() {
201 return false;
202 }
203 }
204
205 true
206 }
207}
208
209fn now_secs() -> u64 {
211 SystemTime::now()
212 .duration_since(UNIX_EPOCH)
213 .expect("system clock before UNIX epoch")
214 .as_secs()
215}
216
217#[cfg(test)]
218mod tests {
219 use super::*;
220 use did_ma::{Did, SigningKey};
221
222 fn test_signing_key() -> (SigningKey, String) {
223 let did = Did::new_identity("k51qzi5uqu5test").expect("did");
224 let did_string = did.id();
225 let signing_key = SigningKey::generate(did).expect("signing key");
226 (signing_key, did_string)
227 }
228
229 fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
230 Message::new(
231 from.to_string(),
232 String::new(),
233 CONTENT_TYPE_BROADCAST,
234 b"hello world".to_vec(),
235 signing_key,
236 )
237 .expect("broadcast message")
238 }
239
240 #[test]
241 fn topic_id_is_blake3() {
242 let name = "/ma/broadcast/0.0.1";
243 let id = topic_id(name);
244 assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
245 }
246
247 #[test]
248 fn broadcast_topic_uses_protocol_constant() {
249 let t = Topic::broadcast();
250 assert_eq!(t.name(), BROADCAST_TOPIC);
251 }
252
253 #[test]
254 fn subscribe_unsubscribe_lifecycle() {
255 let mut t = Topic::new("test/topic/0.0.1");
256 assert!(!t.is_subscribed());
257
258 t.subscribe();
259 assert!(t.is_subscribed());
260
261 t.unsubscribe();
262 assert!(!t.is_subscribed());
263 }
264
265 #[test]
266 fn deliver_requires_subscription() {
267 let mut t = Topic::new("test/topic/0.0.1");
268 let (sk, did) = test_signing_key();
269 let msg = make_broadcast(&did, &sk);
270 assert!(!t.deliver(msg));
271 }
272
273 #[test]
274 fn deliver_and_drain() {
275 let mut t = Topic::new("test/topic/0.0.1");
276 t.subscribe();
277 let (sk, did) = test_signing_key();
278 let msg = make_broadcast(&did, &sk);
279 assert!(t.deliver(msg));
280 let drained = t.drain();
281 assert_eq!(drained.len(), 1);
282 assert_eq!(drained[0].content_type, CONTENT_TYPE_BROADCAST);
283 }
284
285 #[test]
286 fn rejects_wrong_content_type() {
287 let mut t = Topic::new("test/topic/0.0.1");
288 t.subscribe();
289 let (sk, did) = test_signing_key();
290 let msg = Message::new(
292 did,
293 String::new(),
294 "application/x-ma-custom",
295 b"payload".to_vec(),
296 &sk,
297 )
298 .expect("custom message");
299 assert!(!t.deliver(msg));
300 }
301
302 #[test]
303 fn rejects_message_with_recipient() {
304 let mut t = Topic::new("test/topic/0.0.1");
305 t.subscribe();
306 let (sk, did) = test_signing_key();
307 let mut msg = make_broadcast(&did, &sk);
311 msg.to = "did:ma:someone".to_string();
312 assert!(!t.deliver(msg));
313 }
314
315 #[test]
316 fn blocked_sender_rejected() {
317 let mut t = Topic::new("test/topic/0.0.1");
318 t.subscribe();
319 let (sk, did) = test_signing_key();
320 t.block(did.clone());
321 let msg = make_broadcast(&did, &sk);
322 assert!(!t.deliver(msg));
323 }
324
325 #[test]
326 fn unblock_allows_delivery() {
327 let mut t = Topic::new("test/topic/0.0.1");
328 t.subscribe();
329 let (sk, did) = test_signing_key();
330 t.block(did.clone());
331 t.unblock(&did);
332 let msg = make_broadcast(&did, &sk);
333 assert!(t.deliver(msg));
334 }
335
336 #[test]
337 fn drain_empty_when_unsubscribed() {
338 let mut t = Topic::new("test/topic/0.0.1");
339 assert!(t.drain().is_empty());
340 }
341
342 #[test]
343 fn subscribe_with_shared_inbox() {
344 let mut t = Topic::new("test/topic/0.0.1");
345 let inbox = Inbox::new(128);
346 t.subscribe_with(inbox);
347 assert!(t.is_subscribed());
348 let (sk, did) = test_signing_key();
349 let msg = make_broadcast(&did, &sk);
350 assert!(t.deliver(msg));
351 assert_eq!(t.drain().len(), 1);
352 }
353}