1use web_time::{SystemTime, UNIX_EPOCH};
2use std::collections::HashSet;
11
12use crate::Message;
13
14use crate::endpoint::DEFAULT_INBOX_CAPACITY;
15use crate::inbox::Inbox;
16use crate::service::{BROADCAST_TOPIC, CONTENT_TYPE_BROADCAST};
17
18pub type TopicId = [u8; 32];
20
21#[must_use]
30pub fn topic_id(name: &str) -> TopicId {
31 *blake3::hash(name.as_bytes()).as_bytes()
32}
33
34pub struct Topic {
50 name: String,
51 id: TopicId,
52 inbox: Option<Inbox<Message>>,
53 blocked: HashSet<String>,
54}
55
56impl Topic {
57 pub fn new(name: impl Into<String>) -> Self {
62 let name = name.into();
63 let id = topic_id(&name);
64 Self {
65 name,
66 id,
67 inbox: None,
68 blocked: HashSet::new(),
69 }
70 }
71
72 #[must_use]
76 pub fn broadcast() -> Self {
77 Self::new(BROADCAST_TOPIC)
78 }
79
80 #[must_use]
82 pub fn name(&self) -> &str {
83 &self.name
84 }
85
86 #[must_use]
88 pub fn id(&self) -> &TopicId {
89 &self.id
90 }
91
92 #[must_use]
94 pub fn is_subscribed(&self) -> bool {
95 self.inbox.is_some()
96 }
97
98 pub fn subscribe(&mut self) {
102 if self.inbox.is_none() {
103 self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
104 }
105 }
106
107 pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
112 self.inbox = Some(inbox);
113 }
114
115 pub fn unsubscribe(&mut self) {
117 self.inbox = None;
118 }
119
120 pub fn deliver(&mut self, message: Message) -> bool {
126 if self.inbox.is_none() {
127 return false;
128 }
129
130 if !self.validate(&message) {
131 return false;
132 }
133
134 let now = now_secs();
135 let expires_at = if message.ttl == 0 {
136 0
137 } else {
138 message_created_at_secs(message.created_at).saturating_add(message.ttl)
139 };
140
141 let Some(inbox) = self.inbox.as_mut() else {
142 return false;
143 };
144 inbox.push(now, expires_at, message);
145 true
146 }
147
148 pub fn drain(&mut self) -> Vec<Message> {
152 match self.inbox.as_mut() {
153 Some(inbox) => inbox.drain(now_secs()),
154 None => Vec::new(),
155 }
156 }
157
158 pub fn block(&mut self, sender_did: impl Into<String>) {
163 self.blocked.insert(sender_did.into());
164 }
165
166 pub fn unblock(&mut self, sender_did: &str) {
168 self.blocked.remove(sender_did);
169 }
170
171 #[must_use]
173 pub fn is_blocked(&self, sender_did: &str) -> bool {
174 self.blocked.contains(sender_did)
175 }
176
177 fn validate(&self, message: &Message) -> bool {
190 if self.blocked.contains(&message.from) {
192 return false;
193 }
194
195 if message.content_type != CONTENT_TYPE_BROADCAST {
197 return false;
198 }
199
200 if !message.to.is_empty() {
202 return false;
203 }
204
205 if message.ttl > 0 {
207 let expires_at =
208 message_created_at_secs(message.created_at).saturating_add(message.ttl);
209 if expires_at <= now_secs() {
210 return false;
211 }
212 }
213
214 true
215 }
216}
217
218#[allow(
219 clippy::cast_possible_truncation,
220 clippy::cast_precision_loss,
221 clippy::cast_sign_loss
222)]
223fn message_created_at_secs(created_at: f64) -> u64 {
224 if !created_at.is_finite() || created_at <= 0.0 {
225 0
226 } else if created_at >= u64::MAX as f64 {
227 u64::MAX
228 } else {
229 created_at.floor() as u64
230 }
231}
232
233fn now_secs() -> u64 {
235 SystemTime::now()
236 .duration_since(UNIX_EPOCH)
237 .expect("system clock before UNIX epoch")
238 .as_secs()
239}
240
241#[cfg(test)]
242mod tests {
243 use super::*;
244 use crate::{Did, SigningKey};
245
246 fn test_signing_key() -> (SigningKey, String) {
247 let did = Did::new_identity("k51qzi5uqu5test").expect("did");
248 let did_string = did.id();
249 let signing_key = SigningKey::generate(did).expect("signing key");
250 (signing_key, did_string)
251 }
252
253 fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
254 Message::new(
255 from.to_string(),
256 String::new(),
257 CONTENT_TYPE_BROADCAST,
258 b"hello world".to_vec(),
259 signing_key,
260 )
261 .expect("broadcast message")
262 }
263
264 #[test]
265 fn topic_id_is_blake3() {
266 let name = "/ma/broadcast/0.0.1";
267 let id = topic_id(name);
268 assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
269 }
270
271 #[test]
272 fn broadcast_topic_uses_protocol_constant() {
273 let t = Topic::broadcast();
274 assert_eq!(t.name(), BROADCAST_TOPIC);
275 }
276
277 #[test]
278 fn subscribe_unsubscribe_lifecycle() {
279 let mut t = Topic::new("test/topic/0.0.1");
280 assert!(!t.is_subscribed());
281
282 t.subscribe();
283 assert!(t.is_subscribed());
284
285 t.unsubscribe();
286 assert!(!t.is_subscribed());
287 }
288
289 #[test]
290 fn deliver_requires_subscription() {
291 let mut t = Topic::new("test/topic/0.0.1");
292 let (sk, did) = test_signing_key();
293 let msg = make_broadcast(&did, &sk);
294 assert!(!t.deliver(msg));
295 }
296
297 #[test]
298 fn deliver_and_drain() {
299 let mut t = Topic::new("test/topic/0.0.1");
300 t.subscribe();
301 let (sk, did) = test_signing_key();
302 let msg = make_broadcast(&did, &sk);
303 assert!(t.deliver(msg));
304 let drained = t.drain();
305 assert_eq!(drained.len(), 1);
306 assert_eq!(drained[0].content_type, CONTENT_TYPE_BROADCAST);
307 }
308
309 #[test]
310 fn rejects_wrong_content_type() {
311 let mut t = Topic::new("test/topic/0.0.1");
312 t.subscribe();
313 let (sk, did) = test_signing_key();
314 let msg = Message::new(
316 did,
317 String::new(),
318 "application/x-ma-custom",
319 b"payload".to_vec(),
320 &sk,
321 )
322 .expect("custom message");
323 assert!(!t.deliver(msg));
324 }
325
326 #[test]
327 fn rejects_message_with_recipient() {
328 let mut t = Topic::new("test/topic/0.0.1");
329 t.subscribe();
330 let (sk, did) = test_signing_key();
331 let mut msg = make_broadcast(&did, &sk);
335 msg.to = "did:ma:someone".to_string();
336 assert!(!t.deliver(msg));
337 }
338
339 #[test]
340 fn blocked_sender_rejected() {
341 let mut t = Topic::new("test/topic/0.0.1");
342 t.subscribe();
343 let (sk, did) = test_signing_key();
344 t.block(did.clone());
345 let msg = make_broadcast(&did, &sk);
346 assert!(!t.deliver(msg));
347 }
348
349 #[test]
350 fn unblock_allows_delivery() {
351 let mut t = Topic::new("test/topic/0.0.1");
352 t.subscribe();
353 let (sk, did) = test_signing_key();
354 t.block(did.clone());
355 t.unblock(&did);
356 let msg = make_broadcast(&did, &sk);
357 assert!(t.deliver(msg));
358 }
359
360 #[test]
361 fn drain_empty_when_unsubscribed() {
362 let mut t = Topic::new("test/topic/0.0.1");
363 assert!(t.drain().is_empty());
364 }
365
366 #[test]
367 fn subscribe_with_shared_inbox() {
368 let mut t = Topic::new("test/topic/0.0.1");
369 let inbox = Inbox::new(128);
370 t.subscribe_with(inbox);
371 assert!(t.is_subscribed());
372 let (sk, did) = test_signing_key();
373 let msg = make_broadcast(&did, &sk);
374 assert!(t.deliver(msg));
375 assert_eq!(t.drain().len(), 1);
376 }
377}