Skip to main content

ma_core/
topic.rs

1use web_time::{SystemTime, UNIX_EPOCH};
2// Gossip pub/sub topic primitive.
3//
4// A [`Topic`] represents a named gossip channel identified by a BLAKE3 hash
5// of its name string. Topics deliver validated messages to an [`Inbox`].
6//
7// See the [pubsub spec](https://github.com/bahner/ma-core-spec/blob/main/pubsub.md)
8// for the full specification.
9
10use std::collections::HashSet;
11
12use crate::Message;
13
14use crate::endpoint::DEFAULT_INBOX_CAPACITY;
15use crate::inbox::Inbox;
16use crate::service::{BROADCAST_TOPIC, MESSAGE_TYPE_BROADCAST};
17
18/// A 32-byte topic identifier derived from `blake3(topic_string)`.
19pub type TopicId = [u8; 32];
20
21/// Compute a [`TopicId`] from a topic name string.
22///
23/// ```
24/// use ma_core::topic::topic_id;
25///
26/// let id = topic_id("/ma/broadcast/0.0.1");
27/// assert_eq!(id, *blake3::hash(b"/ma/broadcast/0.0.1").as_bytes());
28/// ```
29#[must_use]
30pub fn topic_id(name: &str) -> TopicId {
31    *blake3::hash(name.as_bytes()).as_bytes()
32}
33
34/// A gossip pub/sub topic.
35///
36/// Topics are identified by a BLAKE3 hash of their name string. When
37/// subscribed, incoming messages are validated (§1.4) and delivered to an
38/// inbox. Messages from blocked senders (§1.5) are dropped silently.
39///
40/// # Examples
41///
42/// ```
43/// use ma_core::Topic;
44///
45/// let topic = Topic::new("/ma/broadcast/0.0.1");
46/// assert_eq!(topic.name(), "/ma/broadcast/0.0.1");
47/// assert!(!topic.is_subscribed());
48/// ```
49pub struct Topic {
50    name: String,
51    id: TopicId,
52    inbox: Option<Inbox<Message>>,
53    blocked: HashSet<String>,
54}
55
56impl Topic {
57    /// Create a new topic from a protocol-style name string.
58    ///
59    /// The topic starts unsubscribed. Call [`subscribe`](Self::subscribe) to
60    /// begin receiving messages.
61    pub fn new(name: impl Into<String>) -> Self {
62        let name = name.into();
63        let id = topic_id(&name);
64        Self {
65            name,
66            id,
67            inbox: None,
68            blocked: HashSet::new(),
69        }
70    }
71
72    /// Create a topic for the well-known broadcast channel.
73    ///
74    /// Equivalent to `Topic::new("/ma/broadcast/0.0.1")`.
75    #[must_use]
76    pub fn broadcast() -> Self {
77        Self::new(BROADCAST_TOPIC)
78    }
79
80    /// The human-readable topic name.
81    #[must_use]
82    pub fn name(&self) -> &str {
83        &self.name
84    }
85
86    /// The BLAKE3-derived topic identifier.
87    #[must_use]
88    pub fn id(&self) -> &TopicId {
89        &self.id
90    }
91
92    /// Whether this topic is currently subscribed (has an inbox).
93    #[must_use]
94    pub fn is_subscribed(&self) -> bool {
95        self.inbox.is_some()
96    }
97
98    /// Subscribe with a new internal inbox using the default capacity.
99    ///
100    /// If already subscribed, this is a no-op.
101    pub fn subscribe(&mut self) {
102        if self.inbox.is_none() {
103            self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
104        }
105    }
106
107    /// Subscribe with an existing inbox, so messages from multiple sources
108    /// converge into a single queue.
109    ///
110    /// Replaces any previous inbox.
111    pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
112        self.inbox = Some(inbox);
113    }
114
115    /// Unsubscribe — stop receiving messages and drop the inbox.
116    pub fn unsubscribe(&mut self) {
117        self.inbox = None;
118    }
119
120    /// Deliver a message into this topic's inbox after validation.
121    ///
122    /// Returns `true` if the message was accepted, `false` if it was
123    /// rejected (wrong content type, has recipient, blocked sender,
124    /// expired, or not subscribed).
125    pub fn deliver(&mut self, message: Message) -> bool {
126        if self.inbox.is_none() {
127            return false;
128        }
129
130        if !self.validate(&message) {
131            return false;
132        }
133
134        let now = now_secs();
135        let expires_at = if message.exp == 0 {
136            0
137        } else {
138            message.exp / 1_000_000_000
139        };
140
141        let Some(inbox) = self.inbox.as_mut() else {
142            return false;
143        };
144        inbox.push(now, expires_at, message);
145        true
146    }
147
148    /// Drain all non-expired messages from the topic's inbox.
149    ///
150    /// Returns an empty `Vec` if not subscribed.
151    pub fn drain(&mut self) -> Vec<Message> {
152        match self.inbox.as_mut() {
153            Some(inbox) => inbox.drain(now_secs()),
154            None => Vec::new(),
155        }
156    }
157
158    // ─── Sender blocking (§1.5) ─────────────────────────────────────────
159
160    /// Block a sender DID. Messages from this sender will be dropped
161    /// before any other validation.
162    pub fn block(&mut self, sender_did: impl Into<String>) {
163        self.blocked.insert(sender_did.into());
164    }
165
166    /// Unblock a sender DID.
167    pub fn unblock(&mut self, sender_did: &str) {
168        self.blocked.remove(sender_did);
169    }
170
171    /// Whether a sender DID is blocked.
172    #[must_use]
173    pub fn is_blocked(&self, sender_did: &str) -> bool {
174        self.blocked.contains(sender_did)
175    }
176
177    // ─── Validation (§1.4) ──────────────────────────────────────────────
178
179    /// Validate a message for topic delivery.
180    ///
181    /// Rules (pubsub.md §1.4):
182    /// 1. Content type MUST be `application/x-ma-broadcast`.
183    /// 2. The `to` field MUST be absent or empty.
184    /// 3. Sender MUST NOT be blocked (§1.5 — checked first).
185    /// 4. TTL — reject expired messages.
186    ///
187    /// Signature validation is the caller's responsibility (requires async
188    /// DID document resolution).
189    fn validate(&self, message: &Message) -> bool {
190        // §1.5: blocked sender check first.
191        if self.blocked.contains(&message.from) {
192            return false;
193        }
194
195        // §1.4 rule 1: content type must be broadcast.
196        if message.message_type != MESSAGE_TYPE_BROADCAST {
197            return false;
198        }
199
200        // §1.4 rule 2: no recipient.
201        if !message.to.is_empty() {
202            return false;
203        }
204
205        // §1.4 rule 4: expiry check.
206        if message.exp > 0 {
207            let expires_at = message.exp / 1_000_000_000;
208            if expires_at <= now_secs() {
209                return false;
210            }
211        }
212
213        true
214    }
215}
216
217/// Current unix timestamp in seconds.
218fn now_secs() -> u64 {
219    SystemTime::now()
220        .duration_since(UNIX_EPOCH)
221        .expect("system clock before UNIX epoch")
222        .as_secs()
223}
224
225#[cfg(test)]
226mod tests {
227    use super::*;
228    use crate::{Did, SigningKey};
229
230    fn test_signing_key() -> (SigningKey, String) {
231        let did = Did::new_identity("k51qzi5uqu5test").expect("did");
232        let did_string = did.id();
233        let signing_key = SigningKey::generate(did).expect("signing key");
234        (signing_key, did_string)
235    }
236
237    fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
238        Message::new(
239            from.to_string(),
240            String::new(),
241            MESSAGE_TYPE_BROADCAST,
242            "text/plain",
243            b"hello world".to_vec(),
244            signing_key,
245        )
246        .expect("broadcast message")
247    }
248
249    #[test]
250    fn topic_id_is_blake3() {
251        let name = "/ma/broadcast/0.0.1";
252        let id = topic_id(name);
253        assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
254    }
255
256    #[test]
257    fn broadcast_topic_uses_protocol_constant() {
258        let t = Topic::broadcast();
259        assert_eq!(t.name(), BROADCAST_TOPIC);
260    }
261
262    #[test]
263    fn subscribe_unsubscribe_lifecycle() {
264        let mut t = Topic::new("test/topic/0.0.1");
265        assert!(!t.is_subscribed());
266
267        t.subscribe();
268        assert!(t.is_subscribed());
269
270        t.unsubscribe();
271        assert!(!t.is_subscribed());
272    }
273
274    #[test]
275    fn deliver_requires_subscription() {
276        let mut t = Topic::new("test/topic/0.0.1");
277        let (sk, did) = test_signing_key();
278        let msg = make_broadcast(&did, &sk);
279        assert!(!t.deliver(msg));
280    }
281
282    #[test]
283    fn deliver_and_drain() {
284        let mut t = Topic::new("test/topic/0.0.1");
285        t.subscribe();
286        let (sk, did) = test_signing_key();
287        let msg = make_broadcast(&did, &sk);
288        assert!(t.deliver(msg));
289        let drained = t.drain();
290        assert_eq!(drained.len(), 1);
291        assert_eq!(drained[0].message_type, MESSAGE_TYPE_BROADCAST);
292    }
293
294    #[test]
295    fn rejects_wrong_content_type() {
296        let mut t = Topic::new("test/topic/0.0.1");
297        t.subscribe();
298        let (sk, did) = test_signing_key();
299        // Use a non-broadcast content type.
300        let msg = Message::new(
301            did,
302            String::new(),
303            "application/x-ma-custom",
304            "text/plain",
305            b"payload".to_vec(),
306            &sk,
307        )
308        .expect("custom message");
309        assert!(!t.deliver(msg));
310    }
311
312    #[test]
313    fn rejects_message_with_recipient() {
314        let mut t = Topic::new("test/topic/0.0.1");
315        t.subscribe();
316        let (sk, did) = test_signing_key();
317        // x-ma-broadcast with a recipient should be rejected by topic
318        // validation even though ma-did would reject it at construction.
319        // Build a valid broadcast first, then tamper with `to`.
320        let mut msg = make_broadcast(&did, &sk);
321        msg.to = "did:ma:someone".to_string();
322        assert!(!t.deliver(msg));
323    }
324
325    #[test]
326    fn blocked_sender_rejected() {
327        let mut t = Topic::new("test/topic/0.0.1");
328        t.subscribe();
329        let (sk, did) = test_signing_key();
330        t.block(did.clone());
331        let msg = make_broadcast(&did, &sk);
332        assert!(!t.deliver(msg));
333    }
334
335    #[test]
336    fn unblock_allows_delivery() {
337        let mut t = Topic::new("test/topic/0.0.1");
338        t.subscribe();
339        let (sk, did) = test_signing_key();
340        t.block(did.clone());
341        t.unblock(&did);
342        let msg = make_broadcast(&did, &sk);
343        assert!(t.deliver(msg));
344    }
345
346    #[test]
347    fn drain_empty_when_unsubscribed() {
348        let mut t = Topic::new("test/topic/0.0.1");
349        assert!(t.drain().is_empty());
350    }
351
352    #[test]
353    fn subscribe_with_shared_inbox() {
354        let mut t = Topic::new("test/topic/0.0.1");
355        let inbox = Inbox::new(128);
356        t.subscribe_with(inbox);
357        assert!(t.is_subscribed());
358        let (sk, did) = test_signing_key();
359        let msg = make_broadcast(&did, &sk);
360        assert!(t.deliver(msg));
361        assert_eq!(t.drain().len(), 1);
362    }
363}