use std::collections::HashSet;
use std::time::{SystemTime, UNIX_EPOCH};
use did_ma::Message;
use crate::endpoint::DEFAULT_INBOX_CAPACITY;
use crate::inbox::Inbox;
use crate::service::{BROADCAST_TOPIC, CONTENT_TYPE_BROADCAST};
pub type TopicId = [u8; 32];
pub fn topic_id(name: &str) -> TopicId {
*blake3::hash(name.as_bytes()).as_bytes()
}
pub struct Topic {
name: String,
id: TopicId,
inbox: Option<Inbox<Message>>,
blocked: HashSet<String>,
}
impl Topic {
pub fn new(name: impl Into<String>) -> Self {
let name = name.into();
let id = topic_id(&name);
Self {
name,
id,
inbox: None,
blocked: HashSet::new(),
}
}
pub fn broadcast() -> Self {
Self::new(BROADCAST_TOPIC)
}
pub fn name(&self) -> &str {
&self.name
}
pub fn id(&self) -> &TopicId {
&self.id
}
pub fn is_subscribed(&self) -> bool {
self.inbox.is_some()
}
pub fn subscribe(&mut self) {
if self.inbox.is_none() {
self.inbox = Some(Inbox::new(DEFAULT_INBOX_CAPACITY));
}
}
pub fn subscribe_with(&mut self, inbox: Inbox<Message>) {
self.inbox = Some(inbox);
}
pub fn unsubscribe(&mut self) {
self.inbox = None;
}
pub fn deliver(&mut self, message: Message) -> bool {
if self.inbox.is_none() {
return false;
}
if !self.validate(&message) {
return false;
}
let now = now_secs();
let expires_at = if message.ttl == 0 {
0
} else {
message.created_at.saturating_add(message.ttl)
};
self.inbox.as_mut().unwrap().push(now, expires_at, message);
true
}
pub fn drain(&mut self) -> Vec<Message> {
match self.inbox.as_mut() {
Some(inbox) => inbox.drain(now_secs()),
None => Vec::new(),
}
}
pub fn block(&mut self, sender_did: impl Into<String>) {
self.blocked.insert(sender_did.into());
}
pub fn unblock(&mut self, sender_did: &str) {
self.blocked.remove(sender_did);
}
pub fn is_blocked(&self, sender_did: &str) -> bool {
self.blocked.contains(sender_did)
}
fn validate(&self, message: &Message) -> bool {
if self.blocked.contains(&message.from) {
return false;
}
if message.content_type != CONTENT_TYPE_BROADCAST {
return false;
}
if !message.to.is_empty() {
return false;
}
if message.ttl > 0 {
let expires_at = message.created_at.saturating_add(message.ttl);
if expires_at <= now_secs() {
return false;
}
}
true
}
}
fn now_secs() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system clock before UNIX epoch")
.as_secs()
}
#[cfg(test)]
mod tests {
use super::*;
use did_ma::{Did, SigningKey};
fn test_signing_key() -> (SigningKey, String) {
let did = Did::new_identity("k51qzi5uqu5test").expect("did");
let did_string = did.id();
let signing_key = SigningKey::generate(did).expect("signing key");
(signing_key, did_string)
}
fn make_broadcast(from: &str, signing_key: &SigningKey) -> Message {
Message::new(
from.to_string(),
String::new(),
CONTENT_TYPE_BROADCAST,
b"hello world".to_vec(),
signing_key,
)
.expect("broadcast message")
}
#[test]
fn topic_id_is_blake3() {
let name = "/ma/broadcast/0.0.1";
let id = topic_id(name);
assert_eq!(id, *blake3::hash(name.as_bytes()).as_bytes());
}
#[test]
fn broadcast_topic_uses_protocol_constant() {
let t = Topic::broadcast();
assert_eq!(t.name(), BROADCAST_TOPIC);
}
#[test]
fn subscribe_unsubscribe_lifecycle() {
let mut t = Topic::new("test/topic/0.0.1");
assert!(!t.is_subscribed());
t.subscribe();
assert!(t.is_subscribed());
t.unsubscribe();
assert!(!t.is_subscribed());
}
#[test]
fn deliver_requires_subscription() {
let mut t = Topic::new("test/topic/0.0.1");
let (sk, did) = test_signing_key();
let msg = make_broadcast(&did, &sk);
assert!(!t.deliver(msg));
}
#[test]
fn deliver_and_drain() {
let mut t = Topic::new("test/topic/0.0.1");
t.subscribe();
let (sk, did) = test_signing_key();
let msg = make_broadcast(&did, &sk);
assert!(t.deliver(msg));
let drained = t.drain();
assert_eq!(drained.len(), 1);
assert_eq!(drained[0].content_type, CONTENT_TYPE_BROADCAST);
}
#[test]
fn rejects_wrong_content_type() {
let mut t = Topic::new("test/topic/0.0.1");
t.subscribe();
let (sk, did) = test_signing_key();
let msg = Message::new(
did,
String::new(),
"application/x-ma-custom",
b"payload".to_vec(),
&sk,
)
.expect("custom message");
assert!(!t.deliver(msg));
}
#[test]
fn rejects_message_with_recipient() {
let mut t = Topic::new("test/topic/0.0.1");
t.subscribe();
let (sk, did) = test_signing_key();
let mut msg = make_broadcast(&did, &sk);
msg.to = "did:ma:someone".to_string();
assert!(!t.deliver(msg));
}
#[test]
fn blocked_sender_rejected() {
let mut t = Topic::new("test/topic/0.0.1");
t.subscribe();
let (sk, did) = test_signing_key();
t.block(did.clone());
let msg = make_broadcast(&did, &sk);
assert!(!t.deliver(msg));
}
#[test]
fn unblock_allows_delivery() {
let mut t = Topic::new("test/topic/0.0.1");
t.subscribe();
let (sk, did) = test_signing_key();
t.block(did.clone());
t.unblock(&did);
let msg = make_broadcast(&did, &sk);
assert!(t.deliver(msg));
}
#[test]
fn drain_empty_when_unsubscribed() {
let mut t = Topic::new("test/topic/0.0.1");
assert!(t.drain().is_empty());
}
#[test]
fn subscribe_with_shared_inbox() {
let mut t = Topic::new("test/topic/0.0.1");
let inbox = Inbox::new(128);
t.subscribe_with(inbox);
assert!(t.is_subscribed());
let (sk, did) = test_signing_key();
let msg = make_broadcast(&did, &sk);
assert!(t.deliver(msg));
assert_eq!(t.drain().len(), 1);
}
}