//! zus-discovery 1.1.4
//!
//! Service discovery client for the ZUS RPC framework with ZooServer integration.
//!
//! Documentation
use {bytes::Bytes, dashmap::DashMap, std::sync::Arc, tracing::info};

use zus_common::{ConnectionPoolConfig, Result, RpcEndpoint};

use crate::{
  pool::{ServerActiveStatus, ServiceEndpointPool},
  zooserver::ZusZooClient,
};

/// ZooServer Select (replacing Java's ZooServerSelect)
///
/// Orchestrates service discovery and manages multiple service paths: one
/// `ServiceEndpointPool` is kept per service path, and every pool is built
/// from the same `ZusZooClient` and the same optional pool configuration.
pub struct ZooServerSelect {
  /// Addresses that were handed to `ZusZooClient::new` at construction time.
  /// Not read anywhere in this type, hence the `dead_code` allowance.
  #[allow(dead_code)]
  addresses: Vec<String>, // Kept for potential reconnection logic
  /// Shared ZooServer client, passed to every `ServiceEndpointPool` created here.
  zoo_client: Arc<ZusZooClient>,
  /// Endpoint pools keyed by service path; entries are added on first use of a path.
  service_pools: Arc<DashMap<String, Arc<ServiceEndpointPool>>>,
  /// Connection pool configuration (None = single connection per server)
  pool_config: Option<ConnectionPoolConfig>,
}

impl ZooServerSelect {
  /// Create without connection pooling (legacy mode, backward compatible)
  ///
  /// Shorthand for [`Self::with_pool_config`] with `pool_config = None`.
  ///
  /// # Errors
  /// Propagates any error from connecting the ZooServer client or from
  /// building the pool for `service_path`.
  pub async fn new(addresses: Vec<String>, service_path: String) -> Result<Arc<Self>> {
    Self::with_pool_config(addresses, service_path, None).await
  }

  /// Create with connection pooling enabled
  ///
  /// Shorthand for [`Self::with_pool_config`] with `Some(pool_config)`.
  ///
  /// # Errors
  /// Propagates any error from connecting the ZooServer client or from
  /// building the pool for `service_path`.
  pub async fn new_with_pooling(
    addresses: Vec<String>,
    service_path: String,
    pool_config: ConnectionPoolConfig,
  ) -> Result<Arc<Self>> {
    Self::with_pool_config(addresses, service_path, Some(pool_config)).await
  }

  /// Create with optional pool configuration
  ///
  /// Connects a `ZusZooClient` to `addresses`, then eagerly builds the
  /// endpoint pool for `service_path`. Pools for other paths are created on
  /// demand by [`Self::select_one_server`] / [`Self::get_all_servers`].
  ///
  /// # Errors
  /// Returns the error from `ZusZooClient::new` or from the initial pool
  /// construction; no `ZooServerSelect` is returned in either case.
  pub async fn with_pool_config(
    addresses: Vec<String>,
    service_path: String,
    pool_config: Option<ConnectionPoolConfig>,
  ) -> Result<Arc<Self>> {
    let zoo_client = ZusZooClient::new(addresses.clone()).await?;

    let select = Arc::new(Self {
      addresses,
      // The client handle is not needed locally afterwards, so move it in.
      zoo_client,
      service_pools: Arc::new(DashMap::new()),
      pool_config,
    });

    // Initialize the service path
    select.init_path(service_path).await?;

    Ok(select)
  }

  /// Build a new endpoint pool for `path` from the shared ZooServer client
  /// and this selector's pool configuration. Does not touch `service_pools`.
  async fn build_pool(&self, path: &str) -> Result<Arc<ServiceEndpointPool>> {
    let pool =
      ServiceEndpointPool::with_pool_config(path.to_string(), Arc::clone(&self.zoo_client), self.pool_config.clone())
        .await?;
    Ok(pool)
  }

  /// Initialize a service path
  ///
  /// Builds a pool for `service_path` and stores it, replacing any pool
  /// already registered under the same path.
  async fn init_path(&self, service_path: String) -> Result<()> {
    let pool = self.build_pool(&service_path).await?;
    self.service_pools.insert(service_path, pool);
    Ok(())
  }

  /// Get or create service endpoint pool
  ///
  /// Returns the pool registered for `path`, building and registering one
  /// if the path has not been seen yet.
  ///
  /// # Errors
  /// Propagates the error from pool construction; nothing is registered in
  /// that case.
  async fn get_or_create_pool(&self, path: &str) -> Result<Arc<ServiceEndpointPool>> {
    // Fast path: the map guard only lives for this `if let`, so it is
    // released before the await below.
    if let Some(existing) = self.service_pools.get(path) {
      return Ok(Arc::clone(existing.value()));
    }

    let created = self.build_pool(path).await?;

    // Another task may have registered this path while we were awaiting.
    // Publish through the entry API so the first registered pool wins and
    // every caller ends up sharing it; a redundant pool is simply dropped.
    let shared = Arc::clone(self.service_pools.entry(path.to_string()).or_insert(created).value());

    Ok(shared)
  }

  /// Select one server from the given path
  ///
  /// Creates the pool for `path` on first use, then delegates the choice to
  /// `ServiceEndpointPool::select_one_server`.
  pub async fn select_one_server(&self, path: &str) -> Result<Arc<RpcEndpoint>> {
    let pool = self.get_or_create_pool(path).await?;
    pool.select_one_server()
  }

  /// Get all servers from the given path
  ///
  /// Creates the pool for `path` on first use.
  pub async fn get_all_servers(&self, path: &str) -> Result<Vec<Arc<RpcEndpoint>>> {
    let pool = self.get_or_create_pool(path).await?;
    Ok(pool.get_all_servers())
  }

  /// Check server status for all paths (health check)
  ///
  /// Calls `.CheckServerStatus` on every endpoint of every registered pool,
  /// one after another, and records `ServerActiveStatus::Ok` or
  /// `ServerActiveStatus::Fail` on the owning pool. A failed probe is
  /// recorded, not returned: this method currently always yields `Ok(())`.
  pub async fn check_server_status(&self) -> Result<()> {
    // Snapshot the pool handles first. Iterating the DashMap directly would
    // keep a shard lock held across the `.await` below, stalling (or, on a
    // single-threaded runtime, deadlocking) any concurrent insert.
    let pools: Vec<Arc<ServiceEndpointPool>> = self
      .service_pools
      .iter()
      .map(|entry| Arc::clone(entry.value()))
      .collect();

    for pool in pools {
      for server in pool.get_all_servers() {
        // Call .CheckServerStatus on each endpoint
        // NOTE(review): 3000 is presumably a timeout in milliseconds — confirm against `sync_call`.
        let result = server
          .sync_call(Bytes::from(".CheckServerStatus"), Bytes::new(), 3000)
          .await;

        let status = if result.is_ok() {
          ServerActiveStatus::Ok
        } else {
          ServerActiveStatus::Fail
        };

        pool.update_health_status(server.address(), status);
      }
    }

    Ok(())
  }

  /// Handle path child changed event
  ///
  /// Refreshes the endpoints of the pool registered for `path`. Paths with
  /// no registered pool are ignored.
  ///
  /// # Errors
  /// Propagates the error from `ServiceEndpointPool::refresh_endpoints`.
  pub async fn on_path_evt_child_changed(&self, path: &str) -> Result<()> {
    // Clone the handle out so the map guard is released before awaiting.
    let pool = self.service_pools.get(path).map(|entry| Arc::clone(entry.value()));

    if let Some(pool) = pool {
      info!("Path children changed: {}", path);
      pool.refresh_endpoints().await?;
    }

    Ok(())
  }

  /// Borrow the shared ZooServer client used by every pool of this selector.
  pub fn zoo_client(&self) -> &Arc<ZusZooClient> {
    &self.zoo_client
  }
}