zus-discovery 1.1.4

Service discovery client for ZUS RPC framework with ZooServer integration
Documentation
use {
  dashmap::DashMap,
  std::{
    sync::{
      Arc,
      atomic::{AtomicUsize, Ordering},
    },
    time::Duration,
  },
  tokio::time::interval,
  tracing::error,
};

use zus_common::{ConnectionPoolConfig, Result};

use crate::selector::ZooServerSelect;

/// Service Discovery Manager - caches and manages service discovery clients
/// Provides singleton pattern for sharing ZooServerSelect instances
/// Runs background health checks on all managed services
pub struct ServiceDiscoveryManager {
  discovery_clients: Arc<DashMap<String, Arc<ZooServerSelect>>>,
  /// Counter for number of health checks performed (for testing)
  health_check_count: Arc<AtomicUsize>,
}

impl ServiceDiscoveryManager {
  pub fn new() -> Arc<Self> {
    let manager = Arc::new(Self {
      discovery_clients: Arc::new(DashMap::new()),
      health_check_count: Arc::new(AtomicUsize::new(0)),
    });

    // Start background health check task (every 3 seconds)
    let manager_clone = manager.clone();
    tokio::spawn(async move {
      manager_clone.health_check_loop().await;
    });

    manager
  }

  /// Get or create service discovery client for given ZooKeeper addresses and service path
  pub async fn get_service_discovery(
    &self,
    addresses: Vec<String>,
    service_path: String,
  ) -> Result<Arc<ZooServerSelect>> {
    self
      .get_service_discovery_with_pool_config(addresses, service_path, None)
      .await
  }

  /// Get or create service discovery client with connection pooling enabled
  pub async fn get_service_discovery_with_pooling(
    &self,
    addresses: Vec<String>,
    service_path: String,
    pool_config: ConnectionPoolConfig,
  ) -> Result<Arc<ZooServerSelect>> {
    self
      .get_service_discovery_with_pool_config(addresses, service_path, Some(pool_config))
      .await
  }

  /// Get or create service discovery client with optional pool configuration
  pub async fn get_service_discovery_with_pool_config(
    &self,
    addresses: Vec<String>,
    service_path: String,
    pool_config: Option<ConnectionPoolConfig>,
  ) -> Result<Arc<ZooServerSelect>> {
    let cache_key = format!("{}:{}:{}", addresses.join(","), service_path, pool_config.is_some());

    // Check cache first
    if let Some(client) = self.discovery_clients.get(&cache_key) {
      return Ok(client.clone());
    }

    // Create new instance
    let client = ZooServerSelect::with_pool_config(addresses, service_path, pool_config).await?;
    self.discovery_clients.insert(cache_key, client.clone());

    Ok(client)
  }

  /// Background health check loop (every 3 seconds)
  async fn health_check_loop(self: Arc<Self>) {
    let mut tick = interval(Duration::from_secs(3));

    loop {
      tick.tick().await;

      // Increment health check counter
      self.health_check_count.fetch_add(1, Ordering::Relaxed);

      // Check health for all discovery clients
      for entry in self.discovery_clients.iter() {
        if let Err(e) = entry.value().check_server_status().await {
          error!("Health check failed for {}: {:?}", entry.key(), e);
        }
      }
    }
  }

  /// Get number of health checks performed (for testing)
  #[doc(hidden)]
  pub fn health_check_count(&self) -> usize {
    self.health_check_count.load(Ordering::Relaxed)
  }
}

#[cfg(test)]
mod tests {
  use super::*;

  #[tokio::test]
  async fn test_manager_creation() {
    let manager = ServiceDiscoveryManager::new();
    assert_eq!(manager.discovery_clients.len(), 0);
  }
}