// zus-discovery 1.1.4
//
// Service discovery client for ZUS RPC framework with ZooServer integration
// Documentation
//! Service Endpoint Pool with Connection Pooling Support
//!
//! Manages a pool of connection pools for service discovery.
//! Each discovered server gets its own connection pool.

use {
  dashmap::DashMap,
  parking_lot::RwLock,
  std::sync::{
    Arc,
    atomic::{AtomicUsize, Ordering},
  },
  tracing::{debug, info, warn},
};

use zus_common::{ConnectionPool, ConnectionPoolConfig, Result, RpcEndpoint, ZusError};

use crate::zooserver::ZusZooClient;

/// Server health status
///
/// Stored per server address in the pool's health map and consulted when a server is selected.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ServerActiveStatus {
  /// The server is eligible for selection. Recorded when connecting to it succeeded
  /// during a refresh.
  Ok,
  /// The server is considered unhealthy. Recorded when connecting to it failed during a
  /// refresh. Selection skips such servers unless no healthy server remains.
  Fail,
}

/// Service Endpoint Pool - manages connection pools for each discovered server
///
/// Provides:
/// - Service discovery from ZooServer
/// - Connection pooling per server (configurable)
/// - Round-robin load balancing across servers
/// - Health tracking
///
/// Exactly one of `connection_pools` / `legacy_endpoints` is populated, depending on
/// whether `pool_config` is `Some` (pooled mode) or `None` (legacy mode).
pub struct ServiceEndpointPool {
  /// ZooServer path whose children are listed to discover servers
  service_path: String,
  /// Client used to query ZooServer for the children of `service_path`
  zoo_client: Arc<ZusZooClient>,
  /// Connection pools per server (one pool per discovered server)
  connection_pools: Arc<RwLock<Vec<Arc<ConnectionPool>>>>,
  /// Legacy endpoints for backward compatibility (used when pool_config is None)
  legacy_endpoints: Arc<RwLock<Vec<Arc<RpcEndpoint>>>>,
  /// Counter incremented on every selection; the chosen server is `counter % candidates`.
  /// Initialised with a random value when the pool is created.
  round_robin_index: Arc<AtomicUsize>,
  /// Health status per server address.
  /// NOTE(review): refresh stores entries under the ZooServer child name, while selection
  /// looks them up by the pool's/endpoint's `address()` — confirm both yield the same string.
  health_status: Arc<DashMap<String, ServerActiveStatus>>,
  /// Connection pool configuration (None = legacy single connection mode)
  pool_config: Option<ConnectionPoolConfig>,
}

impl ServiceEndpointPool {
  /// Create service endpoint pool with default configuration (single connection per server)
  ///
  /// Equivalent to calling [`Self::with_pool_config`] with `pool_config = None`.
  ///
  /// # Errors
  /// Returns whatever error [`Self::with_pool_config`] returns.
  pub async fn new(service_path: String, zoo_client: Arc<ZusZooClient>) -> Result<Arc<Self>> {
    Self::with_pool_config(service_path, zoo_client, None).await
  }

  /// Create service endpoint pool with connection pooling enabled
  ///
  /// Equivalent to calling [`Self::with_pool_config`] with `pool_config = Some(pool_config)`.
  ///
  /// # Errors
  /// Returns whatever error [`Self::with_pool_config`] returns.
  pub async fn new_with_pooling(
    service_path: String,
    zoo_client: Arc<ZusZooClient>,
    pool_config: ConnectionPoolConfig,
  ) -> Result<Arc<Self>> {
    Self::with_pool_config(service_path, zoo_client, Some(pool_config)).await
  }

  /// Build a pool for `service_path`, optionally with per-server connection pooling
  ///
  /// `pool_config = Some(..)` enables pooled mode; `None` selects legacy mode (a single
  /// connection per server). The round-robin counter starts at a random value, and the
  /// endpoint list is loaded once before the pool is returned.
  ///
  /// # Errors
  /// Propagates any error from the initial [`Self::refresh_endpoints`] call.
  pub async fn with_pool_config(
    service_path: String,
    zoo_client: Arc<ZusZooClient>,
    pool_config: Option<ConnectionPoolConfig>,
  ) -> Result<Arc<Self>> {
    let start_index: usize = rand::random();

    let this = Arc::new(Self {
      service_path,
      zoo_client,
      connection_pools: Arc::new(RwLock::new(Vec::new())),
      legacy_endpoints: Arc::new(RwLock::new(Vec::new())),
      round_robin_index: Arc::new(AtomicUsize::new(start_index)),
      health_status: Arc::new(DashMap::new()),
      pool_config,
    });

    // Populate the server list before handing the pool to the caller
    this.refresh_endpoints().await?;

    if this.pool_config.is_none() {
      debug!("ServiceEndpointPool created for {} (legacy mode)", this.service_path);
    } else {
      info!(
        "ServiceEndpointPool created for {} with connection pooling enabled",
        this.service_path
      );
    }

    Ok(this)
  }

  /// Reload the server list from ZooServer
  ///
  /// Lists the children of `service_path` and rebuilds either the connection pools
  /// (pooled mode) or the legacy endpoints (legacy mode) from them.
  ///
  /// # Errors
  /// Propagates the error if listing the children fails, or if the mode-specific
  /// refresh returns one.
  pub async fn refresh_endpoints(&self) -> Result<()> {
    let nodes = self.zoo_client.get_path_child(&self.service_path).await?;

    debug!("Refreshed endpoints for {}: {:?}", self.service_path, nodes);

    if self.pool_config.is_none() {
      return self.refresh_legacy(&nodes).await;
    }
    self.refresh_with_pooling(&nodes).await
  }

  /// Refresh endpoints with connection pooling
  async fn refresh_with_pooling(&self, children: &[zus_proto::ZooPathFileNode]) -> Result<()> {
    let config = self.pool_config.clone().unwrap();
    let mut new_pools = Vec::new();

    for child in children {
      let child_path = &child.file;

      // Parse endpoint address (format: "tcp:host:port" or "host:port")
      if let Some((host, port)) = parse_endpoint_address(child_path) {
        match ConnectionPool::new(host.clone(), port, config.clone()).await {
          | Ok(pool) => {
            new_pools.push(pool);
            self.health_status.insert(child_path.clone(), ServerActiveStatus::Ok);
            debug!("Created connection pool for {}", child_path);
          }
          | Err(e) => {
            warn!("Failed to create connection pool for {}: {:?}", child_path, e);
            self.health_status.insert(child_path.clone(), ServerActiveStatus::Fail);
          }
        }
      }
    }

    *self.connection_pools.write() = new_pools;
    Ok(())
  }

  /// Refresh endpoints in legacy mode (single connection per server)
  ///
  /// Connects one [`RpcEndpoint`] for every child whose name parses as an endpoint
  /// address, records the outcome in `health_status` (keyed by the child name), and then
  /// replaces the whole endpoint list with a single write.
  ///
  /// Children whose name cannot be parsed are skipped with a warning. A server that
  /// cannot be connected is marked `Fail` and left out of the list; it does not abort the
  /// refresh, so this currently always returns `Ok(())`.
  async fn refresh_legacy(&self, children: &[zus_proto::ZooPathFileNode]) -> Result<()> {
    let mut new_endpoints = Vec::with_capacity(children.len());

    for child in children {
      let child_path = &child.file;

      // Parse endpoint address (format: "tcp:host:port" or "host:port")
      let Some((host, port)) = parse_endpoint_address(child_path) else {
        warn!("Skipping endpoint with unparsable address: {}", child_path);
        continue;
      };

      match RpcEndpoint::connect(host, port).await {
        | Ok(endpoint) => {
          new_endpoints.push(Arc::new(endpoint));
          self.health_status.insert(child_path.clone(), ServerActiveStatus::Ok);
        }
        | Err(e) => {
          warn!("Failed to connect to endpoint {}: {:?}", child_path, e);
          self.health_status.insert(child_path.clone(), ServerActiveStatus::Fail);
        }
      }
    }

    // Swap in the new list in one step so readers never observe a half-built list
    *self.legacy_endpoints.write() = new_endpoints;
    Ok(())
  }

  /// Pick one server by round-robin, preferring servers whose health status is `Ok`
  ///
  /// Dispatches to the pooled selector when a pool configuration is present and to the
  /// legacy selector otherwise.
  ///
  /// # Errors
  /// Returns the error produced by the selector that was used.
  pub fn select_one_server(&self) -> Result<Arc<RpcEndpoint>> {
    match &self.pool_config {
      | Some(_) => self.select_with_pooling(),
      | None => self.select_legacy(),
    }
  }

  /// Select server with connection pooling
  fn select_with_pooling(&self) -> Result<Arc<RpcEndpoint>> {
    let pools = self.connection_pools.read();

    if pools.is_empty() {
      return Err(ZusError::Connection("No available servers".to_string()));
    }

    // Filter healthy server pools
    let healthy: Vec<_> = pools
      .iter()
      .filter(|pool| {
        self
          .health_status
          .get(&pool.address())
          .map(|s| *s.value() == ServerActiveStatus::Ok)
          .unwrap_or(true)
      })
      .cloned()
      .collect();

    let available = if healthy.is_empty() { &pools[..] } else { &healthy[..] };

    // Round-robin select a server pool
    let index = self.round_robin_index.fetch_add(1, Ordering::SeqCst);
    let selected_pool = &available[index % available.len()];

    // Get a connection from the selected pool
    // Note: This blocks briefly, but pool.get_connection is designed to be fast
    let connection = futures::executor::block_on(selected_pool.get_connection())
      .map_err(|e| ZusError::Connection(format!("Failed to get connection from pool: {:?}", e)))?;

    Ok(connection.endpoint())
  }

  /// Pick one legacy endpoint by round-robin
  ///
  /// Candidates are the endpoints whose health status is `Ok` or that have no recorded
  /// status. If that set is empty, every endpoint is a candidate.
  ///
  /// # Errors
  /// Returns `ZusError::Connection` if there are no endpoints at all.
  fn select_legacy(&self) -> Result<Arc<RpcEndpoint>> {
    let all = self.legacy_endpoints.read();

    if all.is_empty() {
      return Err(ZusError::Connection("No available endpoints".to_string()));
    }

    // Gather the endpoints that are not marked as failed
    let mut healthy: Vec<&Arc<RpcEndpoint>> = Vec::new();
    for ep in all.iter() {
      let usable = match self.health_status.get(&ep.address()) {
        | Some(entry) => *entry.value() == ServerActiveStatus::Ok,
        | None => true,
      };
      if usable {
        healthy.push(ep);
      }
    }

    // Take the next ticket and map it onto the candidate set
    let ticket = self.round_robin_index.fetch_add(1, Ordering::SeqCst);
    let chosen = if healthy.is_empty() {
      &all[ticket % all.len()]
    } else {
      healthy[ticket % healthy.len()]
    };

    Ok(Arc::clone(chosen))
  }

  /// Async version of select_one_server (preferred for pooled mode)
  pub async fn select_one_server_async(&self) -> Result<Arc<RpcEndpoint>> {
    if self.pool_config.is_some() {
      // Collect available pools while holding lock, then release
      let available = {
        let pools = self.connection_pools.read();

        if pools.is_empty() {
          return Err(ZusError::Connection("No available servers".to_string()));
        }

        // Filter healthy server pools
        let healthy: Vec<_> = pools
          .iter()
          .filter(|pool| {
            self
              .health_status
              .get(&pool.address())
              .map(|s| *s.value() == ServerActiveStatus::Ok)
              .unwrap_or(true)
          })
          .cloned()
          .collect();

        if healthy.is_empty() { pools.clone() } else { healthy }
        // Lock dropped here
      };

      // Round-robin select a server pool
      let index = self.round_robin_index.fetch_add(1, Ordering::SeqCst);
      let selected_pool = &available[index % available.len()];

      // Get a connection from the selected pool
      let connection = selected_pool.get_connection().await?;
      Ok(connection.endpoint())
    } else {
      self.select_legacy()
    }
  }

  /// Get all servers
  pub fn get_all_servers(&self) -> Vec<Arc<RpcEndpoint>> {
    if self.pool_config.is_some() {
      // For pooled mode, return one connection per pool for compatibility
      let pools = self.connection_pools.read();
      pools
        .iter()
        .filter_map(|pool| futures::executor::block_on(pool.get_connection()).ok())
        .map(|conn| conn.endpoint())
        .collect()
    } else {
      self.legacy_endpoints.read().clone()
    }
  }

  /// Update health status for an endpoint
  ///
  /// Inserts the status for `address`, replacing any previous value. Server selection
  /// looks statuses up by the string returned from a pool's/endpoint's `address()`, so
  /// `address` has to equal that string to influence selection.
  pub fn update_health_status(&self, address: String, status: ServerActiveStatus) {
    self.health_status.insert(address, status);
  }

  /// Look up the recorded health status for `address` (for testing)
  ///
  /// Returns `None` when no status has been recorded under that exact key.
  pub fn get_health_status(&self, address: &str) -> Option<ServerActiveStatus> {
    let entry = self.health_status.get(address)?;
    Some(*entry.value())
  }

  /// Snapshot every recorded `(address, status)` pair (for testing)
  ///
  /// The order of the returned pairs follows the map's iteration order.
  #[doc(hidden)]
  pub fn get_all_health_statuses(&self) -> Vec<(String, ServerActiveStatus)> {
    let mut snapshot = Vec::new();
    for entry in self.health_status.iter() {
      let address = entry.key().clone();
      let status = *entry.value();
      snapshot.push((address, status));
    }
    snapshot
  }

  /// Get the service path this pool was created with
  pub fn service_path(&self) -> &str {
    &self.service_path
  }

  /// Check if connection pooling is enabled
  ///
  /// Returns `true` when a pool configuration is set, `false` in legacy mode.
  pub fn is_pooling_enabled(&self) -> bool {
    self.pool_config.is_some()
  }

  /// Get pool configuration
  ///
  /// Returns `None` when the pool runs in legacy mode (no connection pooling).
  pub fn pool_config(&self) -> Option<&ConnectionPoolConfig> {
    self.pool_config.as_ref()
  }
}

/// Parse endpoint address from various formats
/// Supports: "tcp:host:port", "host:port"
///
/// The port is the text after the last `:` and must parse as a `u16`; everything before
/// it (minus an optional leading `tcp:`) is the host.
///
/// Returns `None` when there is no `:` separator, when the port is not a valid `u16`, or
/// when the host part is empty (e.g. `":9527"` or `"tcp::9527"`).
fn parse_endpoint_address(path: &str) -> Option<(String, u16)> {
  // Drop the "tcp:" prefix only when what follows still contains a host/port separator;
  // otherwise the input is treated as a plain "host:port" whose host happens to be "tcp".
  let addr = match path.strip_prefix("tcp:") {
    | Some(rest) if rest.contains(':') => rest,
    | _ => path,
  };

  let (host, port_str) = addr.rsplit_once(':')?;

  // An empty host can never be connected to, so reject it here
  if host.is_empty() {
    return None;
  }

  let port = port_str.parse::<u16>().ok()?;
  Some((host.to_string(), port))
}

#[cfg(test)]
mod tests {
  use super::*;

  /// A status equals its own variant and differs from the other one.
  #[test]
  fn test_health_status() {
    let current = ServerActiveStatus::Ok;
    assert!(current == ServerActiveStatus::Ok);
    assert!(current != ServerActiveStatus::Fail);
  }

  /// Both supported address formats parse to the same host/port; junk yields `None`.
  #[test]
  fn test_parse_endpoint_address() {
    let parsed: Option<(String, u16)> = Some(("192.168.1.1".to_string(), 9527));

    let cases = [
      // "tcp:host:port" format
      ("tcp:192.168.1.1:9527", parsed.clone()),
      // "host:port" format
      ("192.168.1.1:9527", parsed.clone()),
      // invalid format
      ("invalid", None),
    ];

    for (input, expected) in cases {
      assert_eq!(parse_endpoint_address(input), expected);
    }
  }
}