1use web_time::{SystemTime, UNIX_EPOCH};
2use std::collections::HashSet;
11
12use crate::Message;
13
14use crate::endpoint::DEFAULT_INBOX_CAPACITY;
15use crate::inbox::Inbox;
16use crate::service::{BROADCAST_TOPIC, MESSAGE_TYPE_BROADCAST};
17
/// 32-byte topic identifier, as returned by [`topic_id`].
pub type TopicId = [u8; 32];
20
21#[must_use]
30pub fn topic_id(name: &str) -> TopicId {
31 *blake3::hash(name.as_bytes()).as_bytes()
32}
33
/// A named topic with an optional local subscription and a sender block list.
///
/// A topic only accepts messages while it holds an inbox (see
/// `Topic::subscribe`); messages from blocked senders are rejected on
/// delivery.
pub struct Topic {
    /// Topic name exactly as passed to `Topic::new`.
    name: String,
    /// Identifier derived from `name` via `topic_id`.
    id: TopicId,
    /// Inbox that receives delivered messages; `None` while unsubscribed.
    inbox: Option<Inbox<Message>>,
    /// Sender identifiers (compared against `Message::from`) whose messages
    /// are rejected.
    blocked: HashSet<String>,
}
55
56impl Topic {
57 pub fn new(name: impl Into<String>) -> Self {
62 let name = name.into();
63 let id = topic_id(&name);
64 Self {
65 name,
66 id,
67 inbox: None,
68 blocked: HashSet::new(),
69 }
70 }
71
72 #[must_use]
76 pub fn broadcast() -> Self {
77 Self::new(BROADCAST_TOPIC)
78 }
79
80 #[must_use]
82 pub fn name(&self) -> &str {
83 &self.name
84 }
85
86 #[must_use]
88 pub fn id(&self) -> &TopicId {
89 &self.id
90 }
91
92 #[must_use]
94 pub fn is_subscribed(&self) -> bool {
95 self.inbox.is_some()
96 }
97
98 pub fn subscribe(&mut self) {
102 if self.inbox.is_none() {
103 self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
104 }
105 }
106
107 pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
112 self.inbox = Some(inbox);
113 }
114
115 pub fn unsubscribe(&mut self) {
117 self.inbox = None;
118 }
119
120 pub fn deliver(&mut self, message: Message) -> bool {
126 if self.inbox.is_none() {
127 return false;
128 }
129
130 if !self.validate(&message) {
131 return false;
132 }
133
134 let now = now_secs();
135 let expires_at = if message.exp == 0 {
136 0
137 } else {
138 message.exp / 1_000_000_000
139 };
140
141 let Some(inbox) = self.inbox.as_mut() else {
142 return false;
143 };
144 inbox.push(now, expires_at, message);
145 true
146 }
147
148 pub fn drain(&mut self) -> Vec<Message> {
152 match self.inbox.as_mut() {
153 Some(inbox) => inbox.drain(now_secs()),
154 None => Vec::new(),
155 }
156 }
157
158 pub fn block(&mut self, sender_did: impl Into<String>) {
163 self.blocked.insert(sender_did.into());
164 }
165
166 pub fn unblock(&mut self, sender_did: &str) {
168 self.blocked.remove(sender_did);
169 }
170
171 #[must_use]
173 pub fn is_blocked(&self, sender_did: &str) -> bool {
174 self.blocked.contains(sender_did)
175 }
176
177 fn validate(&self, message: &Message) -> bool {
190 if self.blocked.contains(&message.from) {
192 return false;
193 }
194
195 if message.message_type != MESSAGE_TYPE_BROADCAST {
197 return false;
198 }
199
200 if !message.to.is_empty() {
202 return false;
203 }
204
205 if message.exp > 0 {
207 let expires_at = message.exp / 1_000_000_000;
208 if expires_at <= now_secs() {
209 return false;
210 }
211 }
212
213 true
214 }
215}
216
/// Returns the current wall-clock time as whole seconds since the UNIX epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the UNIX epoch.
fn now_secs() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    since_epoch.as_secs()
}
224
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Did, SigningKey};

    /// Topic name shared by the tests that do not care about the name.
    const TEST_TOPIC: &str = "test/topic/0.0.1";

    /// Builds a signing key for a fixed test identity and returns it together
    /// with the identity's DID string.
    fn test_signing_key() -> (SigningKey, String) {
        let did = Did::new_identity("k51qzi5uqu5test").expect("did");
        let did_string = did.id();
        let signing_key = SigningKey::generate(did).expect("signing key");
        (signing_key, did_string)
    }

    /// Builds a broadcast message from `from` with no recipient.
    fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
        Message::new(
            from.to_string(),
            String::new(),
            MESSAGE_TYPE_BROADCAST,
            "text/plain",
            b"hello world".to_vec(),
            signing_key,
        )
        .expect("broadcast message")
    }

    /// Returns a topic that is already subscribed with the default inbox.
    fn subscribed_topic() -> Topic {
        let mut topic = Topic::new(TEST_TOPIC);
        topic.subscribe();
        topic
    }

    #[test]
    fn topic_id_is_blake3() {
        let name = "/ma/broadcast/0.0.1";
        let id = topic_id(name);
        assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
    }

    #[test]
    fn broadcast_topic_uses_protocol_constant() {
        let t = Topic::broadcast();
        assert_eq!(t.name(), BROADCAST_TOPIC);
    }

    #[test]
    fn subscribe_unsubscribe_lifecycle() {
        let mut t = Topic::new(TEST_TOPIC);
        assert!(!t.is_subscribed());

        t.subscribe();
        assert!(t.is_subscribed());

        t.unsubscribe();
        assert!(!t.is_subscribed());
    }

    #[test]
    fn deliver_requires_subscription() {
        let mut t = Topic::new(TEST_TOPIC);
        let (sk, did) = test_signing_key();
        let msg = make_broadcast(&did, &sk);
        assert!(!t.deliver(msg));
    }

    #[test]
    fn deliver_and_drain() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let msg = make_broadcast(&did, &sk);
        assert!(t.deliver(msg));
        let drained = t.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].message_type, MESSAGE_TYPE_BROADCAST);
    }

    // The third argument of `Message::new` is the message type; the content
    // type ("text/plain") is left valid, so this exercises the type check.
    #[test]
    fn rejects_wrong_message_type() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let msg = Message::new(
            did,
            String::new(),
            "application/x-ma-custom",
            "text/plain",
            b"payload".to_vec(),
            &sk,
        )
        .expect("custom message");
        assert!(!t.deliver(msg));
    }

    #[test]
    fn rejects_message_with_recipient() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let mut msg = make_broadcast(&did, &sk);
        msg.to = "did:ma:someone".to_string();
        assert!(!t.deliver(msg));
    }

    #[test]
    fn rejects_expired_message() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let mut msg = make_broadcast(&did, &sk);
        // One second after the UNIX epoch, expressed in nanoseconds.
        msg.exp = 1_000_000_000;
        assert!(!t.deliver(msg));
    }

    #[test]
    fn rejects_sub_second_expiry() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let mut msg = make_broadcast(&did, &sk);
        // Non-zero but below one second: truncates to second 0, which is
        // never in the future.
        msg.exp = 1;
        assert!(!t.deliver(msg));
    }

    #[test]
    fn accepts_message_with_future_expiry() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let mut msg = make_broadcast(&did, &sk);
        msg.exp = u64::MAX;
        assert!(t.deliver(msg));
    }

    #[test]
    fn blocked_sender_rejected() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        t.block(did.clone());
        assert!(t.is_blocked(&did));
        let msg = make_broadcast(&did, &sk);
        assert!(!t.deliver(msg));
    }

    #[test]
    fn unblock_allows_delivery() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        t.block(did.clone());
        t.unblock(&did);
        assert!(!t.is_blocked(&did));
        let msg = make_broadcast(&did, &sk);
        assert!(t.deliver(msg));
    }

    #[test]
    fn drain_empty_when_unsubscribed() {
        let mut t = Topic::new(TEST_TOPIC);
        assert!(t.drain().is_empty());
    }

    #[test]
    fn subscribe_with_shared_inbox() {
        let mut t = Topic::new(TEST_TOPIC);
        let inbox = Inbox::new(128);
        t.subscribe_with(inbox);
        assert!(t.is_subscribed());
        let (sk, did) = test_signing_key();
        let msg = make_broadcast(&did, &sk);
        assert!(t.deliver(msg));
        assert_eq!(t.drain().len(), 1);
    }
}