dynamo_runtime/pipeline/network/
manager.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
8//! - Reads environment variables for network configuration
9//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::unified_server::RequestPlaneServer;
18use crate::config::RequestPlaneMode;
19use anyhow::Result;
20use async_once_cell::OnceCell;
21use std::sync::Arc;
22use tokio_util::sync::CancellationToken;
23
/// Network configuration loaded from environment variables
///
/// Built by `NetworkConfig::from_env` and owned by `NetworkManager`, which reads
/// these fields whenever it creates a server or a client.
#[derive(Clone)]
struct NetworkConfig {
    // HTTP server configuration
    /// Host part of the HTTP server bind address (`DYN_HTTP_RPC_HOST`).
    http_host: String,
    /// Port part of the HTTP server bind address (`DYN_HTTP_RPC_PORT`, default 8888).
    http_port: u16,
    /// RPC root path (`DYN_HTTP_RPC_ROOT_PATH`, default `/v1/rpc`).
    /// Within this file it is only included in the HTTP server startup log.
    http_rpc_root: String,

    // TCP server configuration
    /// Host part of the TCP server bind address (`DYN_TCP_RPC_HOST`).
    tcp_host: String,
    /// Port part of the TCP server bind address (`DYN_TCP_RPC_PORT`, default 9999).
    tcp_port: u16,

    // HTTP client configuration
    /// Settings cloned into every HTTP request plane client.
    http_client_config: super::egress::http_router::Http2Config,

    // TCP client configuration
    /// Settings cloned into every TCP request plane client.
    tcp_client_config: super::egress::tcp_client::TcpRequestConfig,

    // NATS configuration (provided externally, not from env)
    /// NATS client handed in by the caller; `None` makes NATS-mode server and
    /// client creation fail with an error.
    nats_client: Option<async_nats::Client>,
}
45
46impl NetworkConfig {
47    /// Load configuration from environment variables
48    ///
49    /// This is the ONLY place where network-related environment variables are read.
50    fn from_env(nats_client: Option<async_nats::Client>) -> Self {
51        Self {
52            // HTTP server configuration
53            http_host: std::env::var("DYN_HTTP_RPC_HOST")
54                .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
55            http_port: std::env::var("DYN_HTTP_RPC_PORT")
56                .ok()
57                .and_then(|p| p.parse().ok())
58                .unwrap_or(8888),
59            http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
60                .unwrap_or_else(|_| "/v1/rpc".to_string()),
61
62            // TCP server configuration
63            tcp_host: std::env::var("DYN_TCP_RPC_HOST")
64                .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
65            tcp_port: std::env::var("DYN_TCP_RPC_PORT")
66                .ok()
67                .and_then(|p| p.parse().ok())
68                .unwrap_or(9999),
69
70            // HTTP client configuration (reads DYN_HTTP2_* env vars)
71            http_client_config: super::egress::http_router::Http2Config::from_env(),
72
73            // TCP client configuration (reads DYN_TCP_* env vars)
74            tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
75
76            // NATS (external)
77            nats_client,
78        }
79    }
80}
81
/// Network Manager - Central coordinator for all network resources
///
/// # Responsibilities
///
/// 1. **Configuration Management**: Reads and manages all network-related environment variables
/// 2. **Server Creation**: Creates and starts request plane servers based on mode
/// 3. **Client Creation**: Creates request plane clients on demand
/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
///
/// # Design Principles
///
/// - **Single Source of Truth**: All network config and creation logic lives here
/// - **Lazy Initialization**: Servers are created only when first accessed
/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
/// - **No Leaky Abstractions**: Transport types never escape this module
///
/// # Example
///
/// ```ignore
/// // Create manager (typically done once in DistributedRuntime)
/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry);
///
/// // Get server (lazy init, cached)
/// let server = manager.server().await?;
/// server.register_endpoint(...).await?;
///
/// // Create client (not cached, lightweight)
/// let client = manager.create_client()?;
/// client.send_request(...).await?;
/// ```
pub struct NetworkManager {
    /// Request plane transport, read once from `RequestPlaneMode::get()` in `new`.
    mode: RequestPlaneMode,
    /// Configuration captured by `NetworkConfig::from_env` in `new`.
    config: NetworkConfig,
    /// Cell holding the shared server instance; filled in by `server()`.
    server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
    /// Shutdown token; a clone is handed to every server this manager creates.
    cancellation_token: CancellationToken,
    /// Component registry; a clone is handed to the NATS server when it is created.
    component_registry: crate::component::Registry,
}
119
120impl NetworkManager {
121    /// Create a new network manager
122    ///
123    /// This is the single constructor for NetworkManager. All configuration
124    /// is loaded from environment variables internally.
125    ///
126    /// # Arguments
127    ///
128    /// * `cancellation_token` - Token for graceful shutdown of servers
129    /// * `nats_client` - Optional NATS client (required only for NATS mode)
130    /// * `component_registry` - Component registry to get NATS service groups from
131    ///
132    /// # Returns
133    ///
134    /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
135    pub fn new(
136        cancellation_token: CancellationToken,
137        nats_client: Option<async_nats::Client>,
138        component_registry: crate::component::Registry,
139    ) -> Arc<Self> {
140        let mode = RequestPlaneMode::get();
141        let config = NetworkConfig::from_env(nats_client);
142
143        tracing::info!(
144            mode = %mode,
145            http_port = config.http_port,
146            tcp_port = config.tcp_port,
147            "Initializing NetworkManager"
148        );
149
150        Arc::new(Self {
151            mode,
152            config,
153            server: Arc::new(OnceCell::new()),
154            cancellation_token,
155            component_registry,
156        })
157    }
158
159    /// Get or create the request plane server
160    ///
161    /// The server is created lazily on first access and cached for subsequent calls.
162    /// The server is automatically started in the background.
163    ///
164    /// # Returns
165    ///
166    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
167    ///
168    /// # Errors
169    ///
170    /// Returns an error if:
171    /// - Server creation fails (e.g., port already in use)
172    /// - NATS mode is selected but NATS client is not available
173    /// - Configuration is invalid (e.g., malformed bind address)
174    pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
175        let server = self
176            .server
177            .get_or_try_init(async { self.create_server().await })
178            .await?;
179
180        Ok(server.clone())
181    }
182
183    /// Create a new request plane client
184    ///
185    /// Clients are lightweight and not cached. Each call creates a new client instance.
186    ///
187    /// # Returns
188    ///
189    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
190    ///
191    /// # Errors
192    ///
193    /// Returns an error if:
194    /// - Client creation fails (e.g., invalid configuration)
195    /// - NATS mode is selected but NATS client is not available
196    pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
197        match self.mode {
198            RequestPlaneMode::Http => self.create_http_client(),
199            RequestPlaneMode::Tcp => self.create_tcp_client(),
200            RequestPlaneMode::Nats => self.create_nats_client(),
201        }
202    }
203
204    /// Get the current request plane mode
205    ///
206    /// This is provided primarily for logging and debugging purposes.
207    /// Application logic should not branch on mode - use trait objects instead.
208    pub fn mode(&self) -> RequestPlaneMode {
209        self.mode
210    }
211
212    // ============================================================================
213    // PRIVATE: Server Creation
214    // ============================================================================
215
216    async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
217        match self.mode {
218            RequestPlaneMode::Http => self.create_http_server().await,
219            RequestPlaneMode::Tcp => self.create_tcp_server().await,
220            RequestPlaneMode::Nats => self.create_nats_server().await,
221        }
222    }
223
224    async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
225        use super::ingress::http_endpoint::SharedHttpServer;
226
227        let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
228            .parse()
229            .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
230
231        tracing::info!(
232            bind_addr = %bind_addr,
233            rpc_root = %self.config.http_rpc_root,
234            "Creating HTTP request plane server"
235        );
236
237        let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());
238
239        // Start server in background
240        let server_clone = server.clone();
241        tokio::spawn(async move {
242            if let Err(e) = server_clone.start().await {
243                tracing::error!("HTTP request plane server error: {}", e);
244            }
245        });
246
247        Ok(server as Arc<dyn RequestPlaneServer>)
248    }
249
250    async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
251        use super::ingress::shared_tcp_endpoint::SharedTcpServer;
252
253        let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port)
254            .parse()
255            .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
256
257        tracing::info!(
258            bind_addr = %bind_addr,
259            "Creating TCP request plane server"
260        );
261
262        let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());
263
264        // Start server in background
265        let server_clone = server.clone();
266        tokio::spawn(async move {
267            if let Err(e) = server_clone.start().await {
268                tracing::error!("TCP request plane server error: {}", e);
269            }
270        });
271
272        Ok(server as Arc<dyn RequestPlaneServer>)
273    }
274
275    async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
276        use super::ingress::nats_server::NatsMultiplexedServer;
277
278        let nats_client = self
279            .config
280            .nats_client
281            .as_ref()
282            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
283
284        tracing::info!("Creating NATS request plane server");
285
286        Ok(NatsMultiplexedServer::new(
287            nats_client.clone(),
288            self.component_registry.clone(),
289            self.cancellation_token.clone(),
290        ) as Arc<dyn RequestPlaneServer>)
291    }
292
293    // ============================================================================
294    // PRIVATE: Client Creation
295    // ============================================================================
296
297    fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
298        use super::egress::http_router::HttpRequestClient;
299
300        tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
301        Ok(Arc::new(HttpRequestClient::with_config(
302            self.config.http_client_config.clone(),
303        )?))
304    }
305
306    fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
307        use super::egress::tcp_client::TcpRequestClient;
308
309        tracing::debug!("Creating TCP request plane client with config from NetworkManager");
310        Ok(Arc::new(TcpRequestClient::with_config(
311            self.config.tcp_client_config.clone(),
312        )?))
313    }
314
315    fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
316        use super::egress::nats_client::NatsRequestClient;
317
318        let nats_client = self
319            .config
320            .nats_client
321            .as_ref()
322            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
323
324        tracing::debug!("Creating NATS request plane client");
325        Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
326    }
327}