Skip to main content

hermes_server/
subscription.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use hermes_proto::EventEnvelope;
5use tokio::sync::{broadcast, mpsc};
6use uuid::Uuid;
7
8pub type SubscriptionId = Uuid;
9
10/// Receiver returned by [`super::broker::BrokerEngine::subscribe`].
11pub enum SubscriptionReceiver {
12    /// Fanout: receives all messages via broadcast (zero-copy fan-out).
13    Fanout(broadcast::Receiver<Arc<EventEnvelope>>),
14    /// Queue group: receives messages via round-robin mpsc.
15    QueueGroup(mpsc::Receiver<Arc<EventEnvelope>>),
16}
17
18/// A queue-group member: identity + mpsc channel.
19#[derive(Clone)]
20pub struct QueueGroupMember {
21    pub id: SubscriptionId,
22    pub sender: mpsc::Sender<Arc<EventEnvelope>>,
23}
24
25/// Pre-partitioned subscribers for a single subject.
26///
27/// **Fanout** uses a `broadcast` channel: one `send()` delivers to all
28/// receivers without per-subscriber cloning — the `Arc` is cloned internally.
29///
30/// **Queue groups** keep per-member `mpsc` channels for round-robin selection.
31pub struct SubjectSubscribers {
32    /// Broadcast channel for fanout delivery.
33    pub fanout: broadcast::Sender<Arc<EventEnvelope>>,
34    /// Queue groups: one member per group receives each message.
35    pub groups: HashMap<String, Vec<QueueGroupMember>>,
36}
37
38impl SubjectSubscribers {
39    pub fn new(capacity: usize) -> Self {
40        let (tx, _rx) = broadcast::channel(capacity);
41        Self {
42            fanout: tx,
43            groups: HashMap::new(),
44        }
45    }
46
47    /// Create a new fanout receiver from the broadcast channel.
48    pub fn subscribe_fanout(&self) -> broadcast::Receiver<Arc<EventEnvelope>> {
49        self.fanout.subscribe()
50    }
51
52    /// Add a member to one or more queue groups.
53    pub fn add_to_groups(&mut self, member: QueueGroupMember, queue_groups: &[String]) {
54        for group in queue_groups {
55            self.groups
56                .entry(group.clone())
57                .or_default()
58                .push(member.clone());
59        }
60    }
61
62    /// Remove a member by id from all queue groups.
63    pub fn remove_from_groups(&mut self, id: SubscriptionId) {
64        self.groups.retain(|_, members| {
65            members.retain(|m| m.id != id);
66            !members.is_empty()
67        });
68    }
69
70    /// `true` if no fanout receivers and no group members remain.
71    pub fn is_empty(&self) -> bool {
72        self.fanout.receiver_count() == 0 && self.groups.is_empty()
73    }
74}