use std::sync::Arc;
use async_trait::async_trait;
use dashmap::DashMap;
use crate::api::naming::Service;
use crate::common::build_service_key;
/// A change notification for one service, carrying the three name parts that
/// identify the service plus the associated `Service` value.
///
/// The event is `Clone`; `SubscriberRegistry::notify` hands a separate clone to
/// each listener it calls.
#[derive(Clone, Debug)]
pub struct ServiceChangeEvent {
/// Namespace part of the service identity (an input to `build_service_key`).
pub namespace: String,
/// Group part of the service identity (an input to `build_service_key`).
pub group_name: String,
/// Service-name part of the service identity (an input to `build_service_key`).
pub service_name: String,
/// The `Service` value delivered with the event.
// NOTE(review): presumably the service's latest state after the change —
// confirm against the code that constructs these events.
pub service: Service,
}
impl ServiceChangeEvent {
    /// Builds an event from borrowed name parts and an owned `Service`.
    ///
    /// The three string slices are copied into freshly allocated `String`s;
    /// `service` is moved into the event unchanged.
    pub fn new(namespace: &str, group_name: &str, service_name: &str, service: Service) -> Self {
        let namespace = String::from(namespace);
        let group_name = String::from(group_name);
        let service_name = String::from(service_name);
        Self {
            namespace,
            group_name,
            service_name,
            service,
        }
    }

    /// Returns the lookup key for this event's service.
    ///
    /// The key is whatever `build_service_key` produces for
    /// `(service_name, group_name, namespace)` — the same argument order the
    /// registry uses when subscribing, so the two keys agree.
    pub fn key(&self) -> String {
        let Self {
            namespace,
            group_name,
            service_name,
            ..
        } = self;
        build_service_key(service_name, group_name, namespace)
    }
}
/// Receiver of [`ServiceChangeEvent`]s.
///
/// Implementors must be `Send + Sync`; the registry in this file stores them
/// as `Arc<dyn ServiceListener>`.
#[async_trait]
pub trait ServiceListener: Send + Sync {
/// Handles one event. The event is passed by value, so the implementation
/// owns it and may keep or move it freely.
async fn on_event(&self, event: ServiceChangeEvent);
}
/// A [`ServiceListener`] backed by a plain closure, for callers that do not
/// want to define their own listener type.
pub struct CallbackServiceListener {
/// The boxed closure invoked for every event; it must be `Send + Sync`.
callback: Box<dyn Fn(ServiceChangeEvent) + Send + Sync>,
}
impl CallbackServiceListener {
    /// Wraps `callback` so it can be registered as a [`ServiceListener`].
    ///
    /// The closure is boxed and type-erased; it must be `Send + Sync + 'static`
    /// and takes each event by value.
    pub fn new<F>(callback: F) -> Self
    where
        F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
    {
        // Erase the concrete closure type up front, then move the box into place.
        let callback: Box<dyn Fn(ServiceChangeEvent) + Send + Sync> = Box::new(callback);
        Self { callback }
    }
}
#[async_trait]
impl ServiceListener for CallbackServiceListener {
    /// Passes `event` straight to the wrapped closure.
    ///
    /// The closure runs synchronously inside this method; there is no await
    /// point, so the call completes when the closure returns.
    async fn on_event(&self, event: ServiceChangeEvent) {
        let handler = &self.callback;
        handler(event);
    }
}
/// Registry of service-change listeners, grouped by service.
pub struct SubscriberRegistry {
/// Maps a service key (as produced by `build_service_key`) to the listeners
/// registered for that service, in registration order.
listeners: DashMap<String, Vec<Arc<dyn ServiceListener>>>,
}
impl SubscriberRegistry {
    /// Creates a registry with no subscriptions.
    pub fn new() -> Self {
        Self {
            listeners: DashMap::new(),
        }
    }

    /// Registers `listener` for the service identified by the three name parts.
    ///
    /// Listeners accumulate: each call appends to the list stored under the
    /// service key. No de-duplication is performed, so registering the same
    /// listener twice makes it receive every event twice.
    pub fn subscribe(
        &self,
        namespace: &str,
        group_name: &str,
        service_name: &str,
        listener: Arc<dyn ServiceListener>,
    ) {
        let key = build_service_key(service_name, group_name, namespace);
        self.listeners.entry(key).or_default().push(listener);
    }

    /// Removes **all** listeners registered for the given service by dropping
    /// its map entry. Does nothing if the service has no entry.
    pub fn unsubscribe(&self, namespace: &str, group_name: &str, service_name: &str) {
        let key = build_service_key(service_name, group_name, namespace);
        self.listeners.remove(&key);
    }

    /// Returns `true` when the registry holds an entry for the given service.
    pub fn has_subscribers(&self, namespace: &str, group_name: &str, service_name: &str) -> bool {
        let key = build_service_key(service_name, group_name, namespace);
        self.listeners.contains_key(&key)
    }

    /// Delivers `event` to every listener registered for the event's service.
    ///
    /// Listeners are awaited one after another in registration order, each
    /// receiving its own clone of the event. If the service has no entry the
    /// call returns immediately.
    ///
    /// The listener list is snapshotted before the first listener runs, so a
    /// listener removed while a notification is in flight may still receive
    /// that one event, and a listener added meanwhile will not.
    pub async fn notify(&self, event: ServiceChangeEvent) {
        let key = event.key();

        // Clone the `Arc`s out and let the map guard drop at the end of this
        // statement. Holding the shard's read guard across `.await` would
        // deadlock as soon as a listener (or another task scheduled on the same
        // thread) called `subscribe`, `unsubscribe` or `clear` on this registry.
        let targets: Vec<Arc<dyn ServiceListener>> = match self.listeners.get(&key) {
            Some(entry) => entry.value().clone(),
            None => return,
        };

        for listener in &targets {
            listener.on_event(event.clone()).await;
        }
    }

    /// Lists the services that currently have an entry in the registry.
    ///
    /// Each stored key is split on `"@@"`. A key with exactly three parts is
    /// returned as those parts, in the order they appear in the key; any other
    /// key is returned whole in the first position with two empty strings.
    // NOTE(review): which tuple position is namespace / group / service depends
    // on the format produced by `build_service_key`, which is not visible here —
    // confirm before relying on the order.
    pub fn get_subscribed_services(&self) -> Vec<(String, String, String)> {
        self.listeners
            .iter()
            .map(|entry| {
                let key = entry.key();
                let parts: Vec<&str> = key.split("@@").collect();
                match parts.as_slice() {
                    &[first, second, third] => {
                        (first.to_owned(), second.to_owned(), third.to_owned())
                    }
                    _ => (key.clone(), String::new(), String::new()),
                }
            })
            .collect()
    }

    /// Returns the number of entries in the registry, i.e. the number of
    /// distinct service keys — not the total number of listener objects.
    pub fn subscriber_count(&self) -> usize {
        self.listeners.len()
    }

    /// Removes every subscription for every service.
    pub fn clear(&self) {
        self.listeners.clear();
    }
}
impl Default for SubscriberRegistry {
fn default() -> Self {
Self::new()
}
}