batata_client/naming/
subscriber.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use dashmap::DashMap;
5
6use crate::api::naming::Service;
7use crate::common::build_service_key;
8
9/// Service change event
10#[derive(Clone, Debug)]
11pub struct ServiceChangeEvent {
12    pub namespace: String,
13    pub group_name: String,
14    pub service_name: String,
15    pub service: Service,
16}
17
18impl ServiceChangeEvent {
19    pub fn new(namespace: &str, group_name: &str, service_name: &str, service: Service) -> Self {
20        Self {
21            namespace: namespace.to_string(),
22            group_name: group_name.to_string(),
23            service_name: service_name.to_string(),
24            service,
25        }
26    }
27
28    pub fn key(&self) -> String {
29        build_service_key(&self.service_name, &self.group_name, &self.namespace)
30    }
31}
32
33/// Trait for service change listeners
34#[async_trait]
35pub trait ServiceListener: Send + Sync {
36    /// Called when service changes
37    async fn on_event(&self, event: ServiceChangeEvent);
38}
39
40/// Simple callback-based listener
41pub struct CallbackServiceListener {
42    callback: Box<dyn Fn(ServiceChangeEvent) + Send + Sync>,
43}
44
45impl CallbackServiceListener {
46    pub fn new<F>(callback: F) -> Self
47    where
48        F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
49    {
50        Self {
51            callback: Box::new(callback),
52        }
53    }
54}
55
56#[async_trait]
57impl ServiceListener for CallbackServiceListener {
58    async fn on_event(&self, event: ServiceChangeEvent) {
59        (self.callback)(event);
60    }
61}
62
63/// Subscriber registry for managing service subscriptions
64pub struct SubscriberRegistry {
65    /// Listeners per service key
66    listeners: DashMap<String, Vec<Arc<dyn ServiceListener>>>,
67}
68
69impl SubscriberRegistry {
70    pub fn new() -> Self {
71        Self {
72            listeners: DashMap::new(),
73        }
74    }
75
76    /// Subscribe to a service
77    pub fn subscribe(
78        &self,
79        namespace: &str,
80        group_name: &str,
81        service_name: &str,
82        listener: Arc<dyn ServiceListener>,
83    ) {
84        let key = build_service_key(service_name, group_name, namespace);
85        self.listeners.entry(key).or_default().push(listener);
86    }
87
88    /// Unsubscribe from a service
89    pub fn unsubscribe(&self, namespace: &str, group_name: &str, service_name: &str) {
90        let key = build_service_key(service_name, group_name, namespace);
91        self.listeners.remove(&key);
92    }
93
94    /// Check if a service has subscribers
95    pub fn has_subscribers(&self, namespace: &str, group_name: &str, service_name: &str) -> bool {
96        let key = build_service_key(service_name, group_name, namespace);
97        self.listeners.contains_key(&key)
98    }
99
100    /// Notify listeners of a service change
101    pub async fn notify(&self, event: ServiceChangeEvent) {
102        let key = event.key();
103        if let Some(listeners) = self.listeners.get(&key) {
104            for listener in listeners.iter() {
105                listener.on_event(event.clone()).await;
106            }
107        }
108    }
109
110    /// Get all subscribed service keys
111    pub fn get_subscribed_services(&self) -> Vec<(String, String, String)> {
112        self.listeners
113            .iter()
114            .map(|entry| {
115                let key = entry.key();
116                let parts: Vec<&str> = key.split("@@").collect();
117                if parts.len() == 3 {
118                    (
119                        parts[0].to_string(),
120                        parts[1].to_string(),
121                        parts[2].to_string(),
122                    )
123                } else {
124                    (key.clone(), String::new(), String::new())
125                }
126            })
127            .collect()
128    }
129
130    /// Get subscriber count
131    pub fn subscriber_count(&self) -> usize {
132        self.listeners.len()
133    }
134
135    /// Clear all subscriptions
136    pub fn clear(&self) {
137        self.listeners.clear();
138    }
139}
140
141impl Default for SubscriberRegistry {
142    fn default() -> Self {
143        Self::new()
144    }
145}