dynamo-runtime 0.7.0-post1

Dynamo Runtime Library
Documentation
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

//! Network Manager - Single Source of Truth for Network Configuration
//!
//! This module consolidates ALL network-related configuration and creation logic.
//! It is the ONLY place in the codebase that:
//! - Reads environment variables for network configuration
//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
//! - Performs mode selection based on RequestPlaneMode
//! - Creates servers and clients
//!
//! The rest of the codebase works exclusively with trait objects and never
//! directly accesses transport implementations or configuration.

use super::egress::unified_client::RequestPlaneClient;
use super::ingress::unified_server::RequestPlaneServer;
use crate::config::RequestPlaneMode;
use anyhow::Result;
use async_once_cell::OnceCell;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;

/// Network configuration loaded from environment variables
#[derive(Clone)]
struct NetworkConfig {
    // HTTP server configuration
    http_host: String,
    http_port: u16,
    http_rpc_root: String,

    // TCP server configuration
    tcp_host: String,
    tcp_port: u16,

    // HTTP client configuration
    http_client_config: super::egress::http_router::Http2Config,

    // TCP client configuration
    tcp_client_config: super::egress::tcp_client::TcpRequestConfig,

    // NATS configuration (provided externally, not from env)
    nats_client: Option<async_nats::Client>,
}

impl NetworkConfig {
    /// Load configuration from environment variables
    ///
    /// This is the ONLY place where network-related environment variables are read.
    fn from_env(nats_client: Option<async_nats::Client>) -> Self {
        Self {
            // HTTP server configuration
            http_host: std::env::var("DYN_HTTP_RPC_HOST")
                .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
            http_port: std::env::var("DYN_HTTP_RPC_PORT")
                .ok()
                .and_then(|p| p.parse().ok())
                .unwrap_or(8888),
            http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
                .unwrap_or_else(|_| "/v1/rpc".to_string()),

            // TCP server configuration
            tcp_host: std::env::var("DYN_TCP_RPC_HOST")
                .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
            tcp_port: std::env::var("DYN_TCP_RPC_PORT")
                .ok()
                .and_then(|p| p.parse().ok())
                .unwrap_or(9999),

            // HTTP client configuration (reads DYN_HTTP2_* env vars)
            http_client_config: super::egress::http_router::Http2Config::from_env(),

            // TCP client configuration (reads DYN_TCP_* env vars)
            tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),

            // NATS (external)
            nats_client,
        }
    }
}

/// Network Manager - Central coordinator for all network resources
///
/// # Responsibilities
///
/// 1. **Configuration Management**: Reads and manages all network-related environment variables
/// 2. **Server Creation**: Creates and starts request plane servers based on mode
/// 3. **Client Creation**: Creates request plane clients on demand
/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
///
/// # Design Principles
///
/// - **Single Source of Truth**: All network config and creation logic lives here
/// - **Lazy Initialization**: Servers are created only when first accessed
/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
/// - **No Leaky Abstractions**: Transport types never escape this module
///
/// # Example
///
/// ```ignore
/// // Create manager (typically done once in DistributedRuntime)
/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry);
///
/// // Get server (lazy init, cached)
/// let server = manager.server().await?;
/// server.register_endpoint(...).await?;
///
/// // Create client (not cached, lightweight)
/// let client = manager.create_client()?;
/// client.send_request(...).await?;
/// ```
pub struct NetworkManager {
    mode: RequestPlaneMode,
    config: NetworkConfig,
    server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
    cancellation_token: CancellationToken,
    component_registry: crate::component::Registry,
}

impl NetworkManager {
    /// Create a new network manager
    ///
    /// This is the single constructor for NetworkManager. All configuration
    /// is loaded from environment variables internally.
    ///
    /// # Arguments
    ///
    /// * `cancellation_token` - Token for graceful shutdown of servers
    /// * `nats_client` - Optional NATS client (required only for NATS mode)
    /// * `component_registry` - Component registry to get NATS service groups from
    ///
    /// # Returns
    ///
    /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
    pub fn new(
        cancellation_token: CancellationToken,
        nats_client: Option<async_nats::Client>,
        component_registry: crate::component::Registry,
    ) -> Arc<Self> {
        let mode = RequestPlaneMode::get();
        let config = NetworkConfig::from_env(nats_client);

        tracing::info!(
            mode = %mode,
            http_port = config.http_port,
            tcp_port = config.tcp_port,
            "Initializing NetworkManager"
        );

        Arc::new(Self {
            mode,
            config,
            server: Arc::new(OnceCell::new()),
            cancellation_token,
            component_registry,
        })
    }

    /// Get or create the request plane server
    ///
    /// The server is created lazily on first access and cached for subsequent calls.
    /// The server is automatically started in the background.
    ///
    /// # Returns
    ///
    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Server creation fails (e.g., port already in use)
    /// - NATS mode is selected but NATS client is not available
    /// - Configuration is invalid (e.g., malformed bind address)
    pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        let server = self
            .server
            .get_or_try_init(async { self.create_server().await })
            .await?;

        Ok(server.clone())
    }

    /// Create a new request plane client
    ///
    /// Clients are lightweight and not cached. Each call creates a new client instance.
    ///
    /// # Returns
    ///
    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - Client creation fails (e.g., invalid configuration)
    /// - NATS mode is selected but NATS client is not available
    pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        match self.mode {
            RequestPlaneMode::Http => self.create_http_client(),
            RequestPlaneMode::Tcp => self.create_tcp_client(),
            RequestPlaneMode::Nats => self.create_nats_client(),
        }
    }

    /// Get the current request plane mode
    ///
    /// This is provided primarily for logging and debugging purposes.
    /// Application logic should not branch on mode - use trait objects instead.
    pub fn mode(&self) -> RequestPlaneMode {
        self.mode
    }

    // ============================================================================
    // PRIVATE: Server Creation
    // ============================================================================

    async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        match self.mode {
            RequestPlaneMode::Http => self.create_http_server().await,
            RequestPlaneMode::Tcp => self.create_tcp_server().await,
            RequestPlaneMode::Nats => self.create_nats_server().await,
        }
    }

    async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        use super::ingress::http_endpoint::SharedHttpServer;

        let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
            .parse()
            .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;

        tracing::info!(
            bind_addr = %bind_addr,
            rpc_root = %self.config.http_rpc_root,
            "Creating HTTP request plane server"
        );

        let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());

        // Start server in background
        let server_clone = server.clone();
        tokio::spawn(async move {
            if let Err(e) = server_clone.start().await {
                tracing::error!("HTTP request plane server error: {}", e);
            }
        });

        Ok(server as Arc<dyn RequestPlaneServer>)
    }

    async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        use super::ingress::shared_tcp_endpoint::SharedTcpServer;

        let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port)
            .parse()
            .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;

        tracing::info!(
            bind_addr = %bind_addr,
            "Creating TCP request plane server"
        );

        let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());

        // Start server in background
        let server_clone = server.clone();
        tokio::spawn(async move {
            if let Err(e) = server_clone.start().await {
                tracing::error!("TCP request plane server error: {}", e);
            }
        });

        Ok(server as Arc<dyn RequestPlaneServer>)
    }

    async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        use super::ingress::nats_server::NatsMultiplexedServer;

        let nats_client = self
            .config
            .nats_client
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;

        tracing::info!("Creating NATS request plane server");

        Ok(NatsMultiplexedServer::new(
            nats_client.clone(),
            self.component_registry.clone(),
            self.cancellation_token.clone(),
        ) as Arc<dyn RequestPlaneServer>)
    }

    // ============================================================================
    // PRIVATE: Client Creation
    // ============================================================================

    fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        use super::egress::http_router::HttpRequestClient;

        tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
        Ok(Arc::new(HttpRequestClient::with_config(
            self.config.http_client_config.clone(),
        )?))
    }

    fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        use super::egress::tcp_client::TcpRequestClient;

        tracing::debug!("Creating TCP request plane client with config from NetworkManager");
        Ok(Arc::new(TcpRequestClient::with_config(
            self.config.tcp_client_config.clone(),
        )?))
    }

    fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        use super::egress::nats_client::NatsRequestClient;

        let nats_client = self
            .config
            .nats_client
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;

        tracing::debug!("Creating NATS request plane client");
        Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
    }
}