Skip to main content

ma_core/
topic.rs

1//! Gossip pub/sub topic primitive.
2//!
3//! A [`Topic`] represents a named gossip channel identified by a BLAKE3 hash
4//! of its name string. Topics deliver validated messages to an [`Inbox`].
5//!
6//! See the [pubsub spec](https://github.com/bahner/ma-core-spec/blob/main/pubsub.md)
7//! for the full specification.
8
9use std::collections::HashSet;
10use std::time::{SystemTime, UNIX_EPOCH};
11
12use did_ma::Message;
13
14use crate::endpoint::DEFAULT_INBOX_CAPACITY;
15use crate::inbox::Inbox;
16use crate::service::{BROADCAST_TOPIC, CONTENT_TYPE_BROADCAST};
17
18/// A 32-byte topic identifier derived from `blake3(topic_string)`.
19pub type TopicId = [u8; 32];
20
21/// Compute a [`TopicId`] from a topic name string.
22///
23/// ```
24/// use ma_core::topic::topic_id;
25///
26/// let id = topic_id("/ma/broadcast/0.0.1");
27/// assert_eq!(id, *blake3::hash(b"/ma/broadcast/0.0.1").as_bytes());
28/// ```
29pub fn topic_id(name: &str) -> TopicId {
30    *blake3::hash(name.as_bytes()).as_bytes()
31}
32
33/// A gossip pub/sub topic.
34///
35/// Topics are identified by a BLAKE3 hash of their name string. When
36/// subscribed, incoming messages are validated (§1.4) and delivered to an
37/// inbox. Messages from blocked senders (§1.5) are dropped silently.
38///
39/// # Examples
40///
41/// ```
42/// use ma_core::Topic;
43///
44/// let topic = Topic::new("/ma/broadcast/0.0.1");
45/// assert_eq!(topic.name(), "/ma/broadcast/0.0.1");
46/// assert!(!topic.is_subscribed());
47/// ```
48pub struct Topic {
49    name: String,
50    id: TopicId,
51    inbox: Option<Inbox<Message>>,
52    blocked: HashSet<String>,
53}
54
55impl Topic {
56    /// Create a new topic from a protocol-style name string.
57    ///
58    /// The topic starts unsubscribed. Call [`subscribe`](Self::subscribe) to
59    /// begin receiving messages.
60    pub fn new(name: impl Into<String>) -> Self {
61        let name = name.into();
62        let id = topic_id(&name);
63        Self {
64            name,
65            id,
66            inbox: None,
67            blocked: HashSet::new(),
68        }
69    }
70
71    /// Create a topic for the well-known broadcast channel.
72    ///
73    /// Equivalent to `Topic::new("/ma/broadcast/0.0.1")`.
74    pub fn broadcast() -> Self {
75        Self::new(BROADCAST_TOPIC)
76    }
77
78    /// The human-readable topic name.
79    pub fn name(&self) -> &str {
80        &self.name
81    }
82
83    /// The BLAKE3-derived topic identifier.
84    pub fn id(&self) -> &TopicId {
85        &self.id
86    }
87
88    /// Whether this topic is currently subscribed (has an inbox).
89    pub fn is_subscribed(&self) -> bool {
90        self.inbox.is_some()
91    }
92
93    /// Subscribe with a new internal inbox using the default capacity.
94    ///
95    /// If already subscribed, this is a no-op.
96    pub fn subscribe(&mut self) {
97        if self.inbox.is_none() {
98            self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
99        }
100    }
101
102    /// Subscribe with an existing inbox, so messages from multiple sources
103    /// converge into a single queue.
104    ///
105    /// Replaces any previous inbox.
106    pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
107        self.inbox = Some(inbox);
108    }
109
110    /// Unsubscribe — stop receiving messages and drop the inbox.
111    pub fn unsubscribe(&mut self) {
112        self.inbox = None;
113    }
114
115    /// Deliver a message into this topic's inbox after validation.
116    ///
117    /// Returns `true` if the message was accepted, `false` if it was
118    /// rejected (wrong content type, has recipient, blocked sender,
119    /// expired, or not subscribed).
120    pub fn deliver(&mut self, message: Message) -> bool {
121        if self.inbox.is_none() {
122            return false;
123        }
124
125        if !self.validate(&message) {
126            return false;
127        }
128
129        let now = now_secs();
130        let expires_at = if message.ttl == 0 {
131            0
132        } else {
133            message.created_at.saturating_add(message.ttl)
134        };
135
136        // Safety: we checked inbox.is_some() above.
137        self.inbox.as_mut().unwrap().push(now, expires_at, message);
138        true
139    }
140
141    /// Drain all non-expired messages from the topic's inbox.
142    ///
143    /// Returns an empty `Vec` if not subscribed.
144    pub fn drain(&mut self) -> Vec<Message> {
145        match self.inbox.as_mut() {
146            Some(inbox) => inbox.drain(now_secs()),
147            None => Vec::new(),
148        }
149    }
150
151    // ─── Sender blocking (§1.5) ─────────────────────────────────────────
152
153    /// Block a sender DID. Messages from this sender will be dropped
154    /// before any other validation.
155    pub fn block(&mut self, sender_did: impl Into<String>) {
156        self.blocked.insert(sender_did.into());
157    }
158
159    /// Unblock a sender DID.
160    pub fn unblock(&mut self, sender_did: &str) {
161        self.blocked.remove(sender_did);
162    }
163
164    /// Whether a sender DID is blocked.
165    pub fn is_blocked(&self, sender_did: &str) -> bool {
166        self.blocked.contains(sender_did)
167    }
168
169    // ─── Validation (§1.4) ──────────────────────────────────────────────
170
171    /// Validate a message for topic delivery.
172    ///
173    /// Rules (pubsub.md §1.4):
174    /// 1. Content type MUST be `application/x-ma-broadcast`.
175    /// 2. The `to` field MUST be absent or empty.
176    /// 3. Sender MUST NOT be blocked (§1.5 — checked first).
177    /// 4. TTL — reject expired messages.
178    ///
179    /// Signature validation is the caller's responsibility (requires async
180    /// DID document resolution).
181    fn validate(&self, message: &Message) -> bool {
182        // §1.5: blocked sender check first.
183        if self.blocked.contains(&message.from) {
184            return false;
185        }
186
187        // §1.4 rule 1: content type must be broadcast.
188        if message.content_type != CONTENT_TYPE_BROADCAST {
189            return false;
190        }
191
192        // §1.4 rule 2: no recipient.
193        if !message.to.is_empty() {
194            return false;
195        }
196
197        // §1.4 rule 4: TTL check.
198        if message.ttl > 0 {
199            let expires_at = message.created_at.saturating_add(message.ttl);
200            if expires_at <= now_secs() {
201                return false;
202            }
203        }
204
205        true
206    }
207}
208
209/// Current unix timestamp in seconds.
210fn now_secs() -> u64 {
211    SystemTime::now()
212        .duration_since(UNIX_EPOCH)
213        .expect("system clock before UNIX epoch")
214        .as_secs()
215}
216
217#[cfg(test)]
218mod tests {
219    use super::*;
220    use did_ma::{Did, SigningKey};
221
222    fn test_signing_key() -> (SigningKey, String) {
223        let did = Did::new_identity("k51qzi5uqu5test").expect("did");
224        let did_string = did.id();
225        let signing_key = SigningKey::generate(did).expect("signing key");
226        (signing_key, did_string)
227    }
228
229    fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
230        Message::new(
231            from.to_string(),
232            String::new(),
233            CONTENT_TYPE_BROADCAST,
234            b"hello world".to_vec(),
235            signing_key,
236        )
237        .expect("broadcast message")
238    }
239
240    #[test]
241    fn topic_id_is_blake3() {
242        let name = "/ma/broadcast/0.0.1";
243        let id = topic_id(name);
244        assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
245    }
246
247    #[test]
248    fn broadcast_topic_uses_protocol_constant() {
249        let t = Topic::broadcast();
250        assert_eq!(t.name(), BROADCAST_TOPIC);
251    }
252
253    #[test]
254    fn subscribe_unsubscribe_lifecycle() {
255        let mut t = Topic::new("test/topic/0.0.1");
256        assert!(!t.is_subscribed());
257
258        t.subscribe();
259        assert!(t.is_subscribed());
260
261        t.unsubscribe();
262        assert!(!t.is_subscribed());
263    }
264
265    #[test]
266    fn deliver_requires_subscription() {
267        let mut t = Topic::new("test/topic/0.0.1");
268        let (sk, did) = test_signing_key();
269        let msg = make_broadcast(&did, &sk);
270        assert!(!t.deliver(msg));
271    }
272
273    #[test]
274    fn deliver_and_drain() {
275        let mut t = Topic::new("test/topic/0.0.1");
276        t.subscribe();
277        let (sk, did) = test_signing_key();
278        let msg = make_broadcast(&did, &sk);
279        assert!(t.deliver(msg));
280        let drained = t.drain();
281        assert_eq!(drained.len(), 1);
282        assert_eq!(drained[0].content_type, CONTENT_TYPE_BROADCAST);
283    }
284
285    #[test]
286    fn rejects_wrong_content_type() {
287        let mut t = Topic::new("test/topic/0.0.1");
288        t.subscribe();
289        let (sk, did) = test_signing_key();
290        // Use a non-broadcast content type.
291        let msg = Message::new(
292            did,
293            String::new(),
294            "application/x-ma-custom",
295            b"payload".to_vec(),
296            &sk,
297        )
298        .expect("custom message");
299        assert!(!t.deliver(msg));
300    }
301
302    #[test]
303    fn rejects_message_with_recipient() {
304        let mut t = Topic::new("test/topic/0.0.1");
305        t.subscribe();
306        let (sk, did) = test_signing_key();
307        // x-ma-broadcast with a recipient should be rejected by topic
308        // validation even though ma-did would reject it at construction.
309        // Build a valid broadcast first, then tamper with `to`.
310        let mut msg = make_broadcast(&did, &sk);
311        msg.to = "did:ma:someone".to_string();
312        assert!(!t.deliver(msg));
313    }
314
315    #[test]
316    fn blocked_sender_rejected() {
317        let mut t = Topic::new("test/topic/0.0.1");
318        t.subscribe();
319        let (sk, did) = test_signing_key();
320        t.block(did.clone());
321        let msg = make_broadcast(&did, &sk);
322        assert!(!t.deliver(msg));
323    }
324
325    #[test]
326    fn unblock_allows_delivery() {
327        let mut t = Topic::new("test/topic/0.0.1");
328        t.subscribe();
329        let (sk, did) = test_signing_key();
330        t.block(did.clone());
331        t.unblock(&did);
332        let msg = make_broadcast(&did, &sk);
333        assert!(t.deliver(msg));
334    }
335
336    #[test]
337    fn drain_empty_when_unsubscribed() {
338        let mut t = Topic::new("test/topic/0.0.1");
339        assert!(t.drain().is_empty());
340    }
341
342    #[test]
343    fn subscribe_with_shared_inbox() {
344        let mut t = Topic::new("test/topic/0.0.1");
345        let inbox = Inbox::new(128);
346        t.subscribe_with(inbox);
347        assert!(t.is_subscribed());
348        let (sk, did) = test_signing_key();
349        let msg = make_broadcast(&did, &sk);
350        assert!(t.deliver(msg));
351        assert_eq!(t.drain().len(), 1);
352    }
353}