batata_client/naming/
subscriber.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use dashmap::DashMap;
5
6use crate::api::naming::Service;
7use crate::common::build_service_key;
8
9#[derive(Clone, Debug)]
11pub struct ServiceChangeEvent {
12 pub namespace: String,
13 pub group_name: String,
14 pub service_name: String,
15 pub service: Service,
16}
17
18impl ServiceChangeEvent {
19 pub fn new(namespace: &str, group_name: &str, service_name: &str, service: Service) -> Self {
20 Self {
21 namespace: namespace.to_string(),
22 group_name: group_name.to_string(),
23 service_name: service_name.to_string(),
24 service,
25 }
26 }
27
28 pub fn key(&self) -> String {
29 build_service_key(&self.service_name, &self.group_name, &self.namespace)
30 }
31}
32
33#[async_trait]
35pub trait ServiceListener: Send + Sync {
36 async fn on_event(&self, event: ServiceChangeEvent);
38}
39
40pub struct CallbackServiceListener {
42 callback: Box<dyn Fn(ServiceChangeEvent) + Send + Sync>,
43}
44
45impl CallbackServiceListener {
46 pub fn new<F>(callback: F) -> Self
47 where
48 F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
49 {
50 Self {
51 callback: Box::new(callback),
52 }
53 }
54}
55
56#[async_trait]
57impl ServiceListener for CallbackServiceListener {
58 async fn on_event(&self, event: ServiceChangeEvent) {
59 (self.callback)(event);
60 }
61}
62
63pub struct SubscriberRegistry {
65 listeners: DashMap<String, Vec<Arc<dyn ServiceListener>>>,
67}
68
69impl SubscriberRegistry {
70 pub fn new() -> Self {
71 Self {
72 listeners: DashMap::new(),
73 }
74 }
75
76 pub fn subscribe(
78 &self,
79 namespace: &str,
80 group_name: &str,
81 service_name: &str,
82 listener: Arc<dyn ServiceListener>,
83 ) {
84 let key = build_service_key(service_name, group_name, namespace);
85 self.listeners.entry(key).or_default().push(listener);
86 }
87
88 pub fn unsubscribe(&self, namespace: &str, group_name: &str, service_name: &str) {
90 let key = build_service_key(service_name, group_name, namespace);
91 self.listeners.remove(&key);
92 }
93
94 pub fn has_subscribers(&self, namespace: &str, group_name: &str, service_name: &str) -> bool {
96 let key = build_service_key(service_name, group_name, namespace);
97 self.listeners.contains_key(&key)
98 }
99
100 pub async fn notify(&self, event: ServiceChangeEvent) {
102 let key = event.key();
103 if let Some(listeners) = self.listeners.get(&key) {
104 for listener in listeners.iter() {
105 listener.on_event(event.clone()).await;
106 }
107 }
108 }
109
110 pub fn get_subscribed_services(&self) -> Vec<(String, String, String)> {
112 self.listeners
113 .iter()
114 .map(|entry| {
115 let key = entry.key();
116 let parts: Vec<&str> = key.split("@@").collect();
117 if parts.len() == 3 {
118 (
119 parts[0].to_string(),
120 parts[1].to_string(),
121 parts[2].to_string(),
122 )
123 } else {
124 (key.clone(), String::new(), String::new())
125 }
126 })
127 .collect()
128 }
129
130 pub fn subscriber_count(&self) -> usize {
132 self.listeners.len()
133 }
134
135 pub fn clear(&self) {
137 self.listeners.clear();
138 }
139}
140
141impl Default for SubscriberRegistry {
142 fn default() -> Self {
143 Self::new()
144 }
145}