batata-client 0.0.2

Rust client for Batata/Nacos service discovery and configuration management
Documentation
use std::sync::Arc;

use async_trait::async_trait;
use dashmap::DashMap;

use crate::api::naming::Service;
use crate::common::build_service_key;

/// Service change event
#[derive(Clone, Debug)]
pub struct ServiceChangeEvent {
    pub namespace: String,
    pub group_name: String,
    pub service_name: String,
    pub service: Service,
}

impl ServiceChangeEvent {
    pub fn new(namespace: &str, group_name: &str, service_name: &str, service: Service) -> Self {
        Self {
            namespace: namespace.to_string(),
            group_name: group_name.to_string(),
            service_name: service_name.to_string(),
            service,
        }
    }

    pub fn key(&self) -> String {
        build_service_key(&self.service_name, &self.group_name, &self.namespace)
    }
}

/// Trait for service change listeners
#[async_trait]
pub trait ServiceListener: Send + Sync {
    /// Called when service changes
    async fn on_event(&self, event: ServiceChangeEvent);
}

/// Simple callback-based listener
pub struct CallbackServiceListener {
    callback: Box<dyn Fn(ServiceChangeEvent) + Send + Sync>,
}

impl CallbackServiceListener {
    pub fn new<F>(callback: F) -> Self
    where
        F: Fn(ServiceChangeEvent) + Send + Sync + 'static,
    {
        Self {
            callback: Box::new(callback),
        }
    }
}

#[async_trait]
impl ServiceListener for CallbackServiceListener {
    async fn on_event(&self, event: ServiceChangeEvent) {
        (self.callback)(event);
    }
}

/// Subscriber registry for managing service subscriptions
pub struct SubscriberRegistry {
    /// Listeners per service key
    listeners: DashMap<String, Vec<Arc<dyn ServiceListener>>>,
}

impl SubscriberRegistry {
    pub fn new() -> Self {
        Self {
            listeners: DashMap::new(),
        }
    }

    /// Subscribe to a service
    pub fn subscribe(
        &self,
        namespace: &str,
        group_name: &str,
        service_name: &str,
        listener: Arc<dyn ServiceListener>,
    ) {
        let key = build_service_key(service_name, group_name, namespace);
        self.listeners.entry(key).or_default().push(listener);
    }

    /// Unsubscribe from a service
    pub fn unsubscribe(&self, namespace: &str, group_name: &str, service_name: &str) {
        let key = build_service_key(service_name, group_name, namespace);
        self.listeners.remove(&key);
    }

    /// Check if a service has subscribers
    pub fn has_subscribers(&self, namespace: &str, group_name: &str, service_name: &str) -> bool {
        let key = build_service_key(service_name, group_name, namespace);
        self.listeners.contains_key(&key)
    }

    /// Notify listeners of a service change
    pub async fn notify(&self, event: ServiceChangeEvent) {
        let key = event.key();
        if let Some(listeners) = self.listeners.get(&key) {
            for listener in listeners.iter() {
                listener.on_event(event.clone()).await;
            }
        }
    }

    /// Get all subscribed service keys
    pub fn get_subscribed_services(&self) -> Vec<(String, String, String)> {
        self.listeners
            .iter()
            .map(|entry| {
                let key = entry.key();
                let parts: Vec<&str> = key.split("@@").collect();
                if parts.len() == 3 {
                    (
                        parts[0].to_string(),
                        parts[1].to_string(),
                        parts[2].to_string(),
                    )
                } else {
                    (key.clone(), String::new(), String::new())
                }
            })
            .collect()
    }

    /// Get subscriber count
    pub fn subscriber_count(&self) -> usize {
        self.listeners.len()
    }

    /// Clear all subscriptions
    pub fn clear(&self) {
        self.listeners.clear();
    }
}

impl Default for SubscriberRegistry {
    fn default() -> Self {
        Self::new()
    }
}