hermes_server/
subscription.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use hermes_proto::EventEnvelope;
5use tokio::sync::{broadcast, mpsc};
6use uuid::Uuid;
7
8pub type SubscriptionId = Uuid;
9
10pub enum SubscriptionReceiver {
12 Fanout(broadcast::Receiver<Arc<EventEnvelope>>),
14 QueueGroup(mpsc::Receiver<Arc<EventEnvelope>>),
16}
17
18#[derive(Clone)]
20pub struct QueueGroupMember {
21 pub id: SubscriptionId,
22 pub sender: mpsc::Sender<Arc<EventEnvelope>>,
23}
24
25pub struct SubjectSubscribers {
32 pub fanout: broadcast::Sender<Arc<EventEnvelope>>,
34 pub groups: HashMap<String, Vec<QueueGroupMember>>,
36}
37
38impl SubjectSubscribers {
39 pub fn new(capacity: usize) -> Self {
40 let (tx, _rx) = broadcast::channel(capacity);
41 Self {
42 fanout: tx,
43 groups: HashMap::new(),
44 }
45 }
46
47 pub fn subscribe_fanout(&self) -> broadcast::Receiver<Arc<EventEnvelope>> {
49 self.fanout.subscribe()
50 }
51
52 pub fn add_to_groups(&mut self, member: QueueGroupMember, queue_groups: &[String]) {
54 for group in queue_groups {
55 self.groups
56 .entry(group.clone())
57 .or_default()
58 .push(member.clone());
59 }
60 }
61
62 pub fn remove_from_groups(&mut self, id: SubscriptionId) {
64 self.groups.retain(|_, members| {
65 members.retain(|m| m.id != id);
66 !members.is_empty()
67 });
68 }
69
70 pub fn is_empty(&self) -> bool {
72 self.fanout.receiver_count() == 0 && self.groups.is_empty()
73 }
74}