Skip to main content

ma_core/
topic.rs

1use web_time::{SystemTime, UNIX_EPOCH};
2// Gossip pub/sub topic primitive.
3//
4// A [`Topic`] represents a named gossip channel identified by a BLAKE3 hash
5// of its name string. Topics deliver validated messages to an [`Inbox`].
6//
7// See the [pubsub spec](https://github.com/bahner/ma-core-spec/blob/main/pubsub.md)
8// for the full specification.
9
10use std::collections::HashSet;
11
12use crate::Message;
13
14use crate::endpoint::DEFAULT_INBOX_CAPACITY;
15use crate::inbox::Inbox;
16use crate::service::{BROADCAST_TOPIC, CONTENT_TYPE_BROADCAST};
17
18/// A 32-byte topic identifier derived from `blake3(topic_string)`.
19pub type TopicId = [u8; 32];
20
21/// Compute a [`TopicId`] from a topic name string.
22///
23/// ```
24/// use ma_core::topic::topic_id;
25///
26/// let id = topic_id("/ma/broadcast/0.0.1");
27/// assert_eq!(id, *blake3::hash(b"/ma/broadcast/0.0.1").as_bytes());
28/// ```
29#[must_use]
30pub fn topic_id(name: &str) -> TopicId {
31    *blake3::hash(name.as_bytes()).as_bytes()
32}
33
34/// A gossip pub/sub topic.
35///
36/// Topics are identified by a BLAKE3 hash of their name string. When
37/// subscribed, incoming messages are validated (§1.4) and delivered to an
38/// inbox. Messages from blocked senders (§1.5) are dropped silently.
39///
40/// # Examples
41///
42/// ```
43/// use ma_core::Topic;
44///
45/// let topic = Topic::new("/ma/broadcast/0.0.1");
46/// assert_eq!(topic.name(), "/ma/broadcast/0.0.1");
47/// assert!(!topic.is_subscribed());
48/// ```
49pub struct Topic {
50    name: String,
51    id: TopicId,
52    inbox: Option<Inbox<Message>>,
53    blocked: HashSet<String>,
54}
55
56impl Topic {
57    /// Create a new topic from a protocol-style name string.
58    ///
59    /// The topic starts unsubscribed. Call [`subscribe`](Self::subscribe) to
60    /// begin receiving messages.
61    pub fn new(name: impl Into<String>) -> Self {
62        let name = name.into();
63        let id = topic_id(&name);
64        Self {
65            name,
66            id,
67            inbox: None,
68            blocked: HashSet::new(),
69        }
70    }
71
72    /// Create a topic for the well-known broadcast channel.
73    ///
74    /// Equivalent to `Topic::new("/ma/broadcast/0.0.1")`.
75    #[must_use]
76    pub fn broadcast() -> Self {
77        Self::new(BROADCAST_TOPIC)
78    }
79
80    /// The human-readable topic name.
81    #[must_use]
82    pub fn name(&self) -> &str {
83        &self.name
84    }
85
86    /// The BLAKE3-derived topic identifier.
87    #[must_use]
88    pub fn id(&self) -> &TopicId {
89        &self.id
90    }
91
92    /// Whether this topic is currently subscribed (has an inbox).
93    #[must_use]
94    pub fn is_subscribed(&self) -> bool {
95        self.inbox.is_some()
96    }
97
98    /// Subscribe with a new internal inbox using the default capacity.
99    ///
100    /// If already subscribed, this is a no-op.
101    pub fn subscribe(&mut self) {
102        if self.inbox.is_none() {
103            self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
104        }
105    }
106
107    /// Subscribe with an existing inbox, so messages from multiple sources
108    /// converge into a single queue.
109    ///
110    /// Replaces any previous inbox.
111    pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
112        self.inbox = Some(inbox);
113    }
114
115    /// Unsubscribe — stop receiving messages and drop the inbox.
116    pub fn unsubscribe(&mut self) {
117        self.inbox = None;
118    }
119
120    /// Deliver a message into this topic's inbox after validation.
121    ///
122    /// Returns `true` if the message was accepted, `false` if it was
123    /// rejected (wrong content type, has recipient, blocked sender,
124    /// expired, or not subscribed).
125    pub fn deliver(&mut self, message: Message) -> bool {
126        if self.inbox.is_none() {
127            return false;
128        }
129
130        if !self.validate(&message) {
131            return false;
132        }
133
134        let now = now_secs();
135        let expires_at = if message.ttl == 0 {
136            0
137        } else {
138            message_created_at_secs(message.created_at).saturating_add(message.ttl)
139        };
140
141        let Some(inbox) = self.inbox.as_mut() else {
142            return false;
143        };
144        inbox.push(now, expires_at, message);
145        true
146    }
147
148    /// Drain all non-expired messages from the topic's inbox.
149    ///
150    /// Returns an empty `Vec` if not subscribed.
151    pub fn drain(&mut self) -> Vec<Message> {
152        match self.inbox.as_mut() {
153            Some(inbox) => inbox.drain(now_secs()),
154            None => Vec::new(),
155        }
156    }
157
158    // ─── Sender blocking (§1.5) ─────────────────────────────────────────
159
160    /// Block a sender DID. Messages from this sender will be dropped
161    /// before any other validation.
162    pub fn block(&mut self, sender_did: impl Into<String>) {
163        self.blocked.insert(sender_did.into());
164    }
165
166    /// Unblock a sender DID.
167    pub fn unblock(&mut self, sender_did: &str) {
168        self.blocked.remove(sender_did);
169    }
170
171    /// Whether a sender DID is blocked.
172    #[must_use]
173    pub fn is_blocked(&self, sender_did: &str) -> bool {
174        self.blocked.contains(sender_did)
175    }
176
177    // ─── Validation (§1.4) ──────────────────────────────────────────────
178
179    /// Validate a message for topic delivery.
180    ///
181    /// Rules (pubsub.md §1.4):
182    /// 1. Content type MUST be `application/x-ma-broadcast`.
183    /// 2. The `to` field MUST be absent or empty.
184    /// 3. Sender MUST NOT be blocked (§1.5 — checked first).
185    /// 4. TTL — reject expired messages.
186    ///
187    /// Signature validation is the caller's responsibility (requires async
188    /// DID document resolution).
189    fn validate(&self, message: &Message) -> bool {
190        // §1.5: blocked sender check first.
191        if self.blocked.contains(&message.from) {
192            return false;
193        }
194
195        // §1.4 rule 1: content type must be broadcast.
196        if message.content_type != CONTENT_TYPE_BROADCAST {
197            return false;
198        }
199
200        // §1.4 rule 2: no recipient.
201        if !message.to.is_empty() {
202            return false;
203        }
204
205        // §1.4 rule 4: TTL check.
206        if message.ttl > 0 {
207            let expires_at =
208                message_created_at_secs(message.created_at).saturating_add(message.ttl);
209            if expires_at <= now_secs() {
210                return false;
211            }
212        }
213
214        true
215    }
216}
217
/// Convert a message's floating-point `created_at` timestamp into a `u64`.
///
/// NaN, infinities, zero and negative values map to `0`; values at or above
/// the top of the `u64` range clamp to `u64::MAX`; anything else is rounded
/// down to the nearest whole number.
#[allow(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss
)]
fn message_created_at_secs(created_at: f64) -> u64 {
    // NaN fails every ordered comparison, so screen it out explicitly
    // together with the infinities and the non-positive range.
    if created_at.is_nan() || created_at.is_infinite() || created_at <= 0.0 {
        return 0;
    }

    let upper_bound = u64::MAX as f64;
    if created_at >= upper_bound {
        return u64::MAX;
    }

    created_at.floor() as u64
}
232
/// Seconds elapsed since the unix epoch according to the system clock.
///
/// # Panics
///
/// Panics if the system clock reports a time before the unix epoch.
fn now_secs() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before UNIX epoch");
    since_epoch.as_secs()
}
240
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Did, SigningKey};

    /// Topic name shared by tests that do not care about the actual channel.
    const TEST_TOPIC: &str = "test/topic/0.0.1";

    /// Generate a fresh signing key together with the DID string it signs for.
    fn test_signing_key() -> (SigningKey, String) {
        let did = Did::new_identity("k51qzi5uqu5test").expect("did");
        let did_string = did.id();
        let signing_key = SigningKey::generate(did).expect("signing key");
        (signing_key, did_string)
    }

    /// Build a signed broadcast message (empty recipient) from `from`.
    fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
        Message::new(
            from.to_string(),
            String::new(),
            CONTENT_TYPE_BROADCAST,
            b"hello world".to_vec(),
            signing_key,
        )
        .expect("broadcast message")
    }

    /// A test topic that is already subscribed with the default inbox.
    fn subscribed_topic() -> Topic {
        let mut topic = Topic::new(TEST_TOPIC);
        topic.subscribe();
        topic
    }

    #[test]
    fn topic_id_is_blake3() {
        let name = "/ma/broadcast/0.0.1";
        let id = topic_id(name);
        assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
    }

    #[test]
    fn broadcast_topic_uses_protocol_constant() {
        let t = Topic::broadcast();
        assert_eq!(t.name(), BROADCAST_TOPIC);
    }

    #[test]
    fn subscribe_unsubscribe_lifecycle() {
        let mut t = Topic::new(TEST_TOPIC);
        assert!(!t.is_subscribed());

        t.subscribe();
        assert!(t.is_subscribed());

        t.unsubscribe();
        assert!(!t.is_subscribed());
    }

    #[test]
    fn deliver_requires_subscription() {
        let mut t = Topic::new(TEST_TOPIC);
        let (sk, did) = test_signing_key();
        let msg = make_broadcast(&did, &sk);
        assert!(!t.deliver(msg));
    }

    #[test]
    fn deliver_and_drain() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        let msg = make_broadcast(&did, &sk);
        assert!(t.deliver(msg));
        let drained = t.drain();
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].content_type, CONTENT_TYPE_BROADCAST);
    }

    #[test]
    fn rejects_wrong_content_type() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        // Use a non-broadcast content type.
        let msg = Message::new(
            did,
            String::new(),
            "application/x-ma-custom",
            b"payload".to_vec(),
            &sk,
        )
        .expect("custom message");
        assert!(!t.deliver(msg));
    }

    #[test]
    fn rejects_message_with_recipient() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        // x-ma-broadcast with a recipient should be rejected by topic
        // validation even though ma-did would reject it at construction.
        // Build a valid broadcast first, then tamper with `to`.
        let mut msg = make_broadcast(&did, &sk);
        msg.to = "did:ma:someone".to_string();
        assert!(!t.deliver(msg));
    }

    #[test]
    fn rejects_expired_message() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        // §1.4 rule 4: created one second after the epoch with a one-second
        // TTL, so the message expired long before any plausible "now".
        let mut msg = make_broadcast(&did, &sk);
        msg.created_at = 1.0;
        msg.ttl = 1;
        assert!(!t.deliver(msg));
        assert!(t.drain().is_empty());
    }

    #[test]
    fn blocked_sender_rejected() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        t.block(did.clone());
        let msg = make_broadcast(&did, &sk);
        assert!(!t.deliver(msg));
    }

    #[test]
    fn unblock_allows_delivery() {
        let mut t = subscribed_topic();
        let (sk, did) = test_signing_key();
        t.block(did.clone());
        t.unblock(&did);
        let msg = make_broadcast(&did, &sk);
        assert!(t.deliver(msg));
    }

    #[test]
    fn is_blocked_reflects_block_list() {
        let mut t = Topic::new(TEST_TOPIC);
        assert!(!t.is_blocked("did:ma:someone"));

        t.block("did:ma:someone");
        assert!(t.is_blocked("did:ma:someone"));
        assert!(!t.is_blocked("did:ma:other"));

        t.unblock("did:ma:someone");
        assert!(!t.is_blocked("did:ma:someone"));
    }

    #[test]
    fn drain_empty_when_unsubscribed() {
        let mut t = Topic::new(TEST_TOPIC);
        assert!(t.drain().is_empty());
    }

    #[test]
    fn subscribe_with_shared_inbox() {
        let mut t = Topic::new(TEST_TOPIC);
        let inbox = Inbox::new(128);
        t.subscribe_with(inbox);
        assert!(t.is_subscribed());
        let (sk, did) = test_signing_key();
        let msg = make_broadcast(&did, &sk);
        assert!(t.deliver(msg));
        assert_eq!(t.drain().len(), 1);
    }

    #[test]
    fn created_at_secs_clamps_out_of_range() {
        assert_eq!(message_created_at_secs(f64::NAN), 0);
        assert_eq!(message_created_at_secs(f64::INFINITY), 0);
        assert_eq!(message_created_at_secs(f64::NEG_INFINITY), 0);
        assert_eq!(message_created_at_secs(-1.5), 0);
        assert_eq!(message_created_at_secs(1.9), 1);
        assert_eq!(message_created_at_secs(1e30), u64::MAX);
    }
}