dynamo_runtime/pipeline/network/manager.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
8//! - Reads environment variables for network configuration
9//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::shared_tcp_endpoint::SharedTcpServer;
18use super::ingress::unified_server::RequestPlaneServer;
19use crate::distributed::RequestPlaneMode;
20use anyhow::Result;
21use async_once_cell::OnceCell;
22use std::sync::Arc;
23use std::sync::OnceLock;
24use tokio_util::sync::CancellationToken;
25
/// Port that the shared TCP request plane server actually bound to.
///
/// Recorded once, right after the server binds. This is what makes an OS-assigned port
/// (no `DYN_TCP_RPC_PORT` configured) observable to the rest of the process. A `OnceLock`
/// fits because the value is written a single time and only read afterwards.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
29
30/// Global storage for the shared TCP server instance.
31///
32/// When multiple workers run in the same process, they must share a single TCP server
33/// to ensure all endpoints are registered on the same server. Without this, each worker
34/// would create its own server on a different port, but all would publish the same port
35/// (from ACTUAL_TCP_RPC_PORT) to discovery, causing "No handler found" errors.
36///
37/// Uses `tokio::sync::OnceCell` to support async initialization (binding the TCP socket).
38static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
39 tokio::sync::OnceCell::const_new();
40
41/// Get the actual TCP RPC port that the server is listening on.
42pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
43 ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
44 tracing::error!(
45 "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
46 );
47 anyhow::anyhow!(
48 "TCP RPC port not initialized. This is not expected."
49 )
50 })
51}
52
53/// Set the actual TCP RPC port (called internally after server binds).
54fn set_actual_tcp_rpc_port(port: u16) {
55 if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) {
56 tracing::warn!(
57 existing_port = existing,
58 new_port = port,
59 "TCP RPC port already set, ignoring new value"
60 );
61 }
62}
63
64/// Network configuration loaded from environment variables
65#[derive(Clone)]
66struct NetworkConfig {
67 // HTTP server configuration
68 http_host: String,
69 http_port: u16,
70 http_rpc_root: String,
71
72 // TCP server configuration
73 tcp_host: String,
74 /// TCP port to bind to. If None, the OS will assign a free port.
75 tcp_port: Option<u16>,
76
77 // HTTP client configuration
78 http_client_config: super::egress::http_router::Http2Config,
79
80 // TCP client configuration
81 tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
82
83 // NATS configuration (provided externally, not from env)
84 nats_client: Option<async_nats::Client>,
85}
86
87impl NetworkConfig {
88 /// Load configuration from environment variables
89 ///
90 /// This is the ONLY place where network-related environment variables are read.
91 fn from_env(nats_client: Option<async_nats::Client>) -> Self {
92 Self {
93 // HTTP server configuration
94 http_host: std::env::var("DYN_HTTP_RPC_HOST")
95 .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
96 http_port: std::env::var("DYN_HTTP_RPC_PORT")
97 .ok()
98 .and_then(|p| p.parse().ok())
99 .unwrap_or(8888),
100 http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
101 .unwrap_or_else(|_| "/v1/rpc".to_string()),
102
103 // TCP server configuration
104 // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
105 tcp_host: std::env::var("DYN_TCP_RPC_HOST")
106 .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
107 tcp_port: std::env::var("DYN_TCP_RPC_PORT")
108 .ok()
109 .and_then(|p| p.parse().ok()),
110
111 // HTTP client configuration (reads DYN_HTTP2_* env vars)
112 http_client_config: super::egress::http_router::Http2Config::from_env(),
113
114 // TCP client configuration (reads DYN_TCP_* env vars)
115 tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
116
117 // NATS (external)
118 nats_client,
119 }
120 }
121}
122
123/// Network Manager - Central coordinator for all network resources
124///
125/// # Responsibilities
126///
127/// 1. **Configuration Management**: Reads and manages all network-related environment variables
128/// 2. **Server Creation**: Creates and starts request plane servers based on mode
129/// 3. **Client Creation**: Creates request plane clients on demand
130/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
131///
132/// # Design Principles
133///
134/// - **Single Source of Truth**: All network config and creation logic lives here
135/// - **Lazy Initialization**: Servers are created only when first accessed
136/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
137/// - **No Leaky Abstractions**: Transport types never escape this module
138///
139/// # Example
140///
141/// ```ignore
142/// // Create manager (typically done once in DistributedRuntime)
143/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
144///
145/// // Get server (lazy init, cached)
146/// let server = manager.server().await?;
147/// server.register_endpoint(...).await?;
148///
149/// // Create client (not cached, lightweight)
150/// let client = manager.create_client()?;
151/// client.send_request(...).await?;
152/// ```
153pub struct NetworkManager {
154 mode: RequestPlaneMode,
155 config: NetworkConfig,
156 server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
157 cancellation_token: CancellationToken,
158 component_registry: crate::component::Registry,
159}
160
161impl NetworkManager {
162 /// Create a new network manager
163 ///
164 /// This is the single constructor for NetworkManager. All configuration
165 /// is loaded from environment variables internally.
166 ///
167 /// # Arguments
168 ///
169 /// * `cancellation_token` - Token for graceful shutdown of servers
170 /// * `nats_client` - Optional NATS client (required only for NATS mode)
171 /// * `component_registry` - Component registry to get NATS service groups from
172 ///
173 /// # Returns
174 ///
175 /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
176 pub fn new(
177 cancellation_token: CancellationToken,
178 nats_client: Option<async_nats::Client>,
179 component_registry: crate::component::Registry,
180 mode: RequestPlaneMode,
181 ) -> Self {
182 let config = NetworkConfig::from_env(nats_client);
183
184 match mode {
185 RequestPlaneMode::Http => {
186 tracing::info!(
187 %mode,
188 host = %config.http_host,
189 port = config.http_port,
190 rpc_root = %config.http_rpc_root,
191 "Initializing NetworkManager with HTTP request plane"
192 );
193 }
194 RequestPlaneMode::Tcp => {
195 let port_display = config
196 .tcp_port
197 .map(|p| p.to_string())
198 .unwrap_or_else(|| "OS-assigned".to_string());
199 tracing::info!(
200 %mode,
201 host = %config.tcp_host,
202 port = %port_display,
203 "Initializing NetworkManager with TCP request plane"
204 );
205 }
206 RequestPlaneMode::Nats => {
207 tracing::info!(
208 %mode,
209 "Initializing NetworkManager with NATS request plane"
210 );
211 }
212 }
213
214 Self {
215 mode,
216 config,
217 server: Arc::new(OnceCell::new()),
218 cancellation_token,
219 component_registry,
220 }
221 }
222
223 /// Get or create the request plane server
224 ///
225 /// The server is created lazily on first access and cached for subsequent calls.
226 /// The server is automatically started in the background.
227 ///
228 /// # Returns
229 ///
230 /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
231 ///
232 /// # Errors
233 ///
234 /// Returns an error if:
235 /// - Server creation fails (e.g., port already in use)
236 /// - NATS mode is selected but NATS client is not available
237 /// - Configuration is invalid (e.g., malformed bind address)
238 pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
239 let server = self
240 .server
241 .get_or_try_init(async { self.create_server().await })
242 .await?;
243
244 Ok(server.clone())
245 }
246
247 /// Create a new request plane client
248 ///
249 /// Clients are lightweight and not cached. Each call creates a new client instance.
250 ///
251 /// # Returns
252 ///
253 /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
254 ///
255 /// # Errors
256 ///
257 /// Returns an error if:
258 /// - Client creation fails (e.g., invalid configuration)
259 /// - NATS mode is selected but NATS client is not available
260 pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
261 match self.mode {
262 RequestPlaneMode::Http => self.create_http_client(),
263 RequestPlaneMode::Tcp => self.create_tcp_client(),
264 RequestPlaneMode::Nats => self.create_nats_client(),
265 }
266 }
267
268 /// Get the current request plane mode
269 ///
270 /// This is provided primarily for logging and debugging purposes.
271 /// Application logic should not branch on mode - use trait objects instead.
272 pub fn mode(&self) -> RequestPlaneMode {
273 self.mode
274 }
275
276 // ============================================================================
277 // PRIVATE: Server Creation
278 // ============================================================================
279
280 async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
281 match self.mode {
282 RequestPlaneMode::Http => self.create_http_server().await,
283 RequestPlaneMode::Tcp => self.create_tcp_server().await,
284 RequestPlaneMode::Nats => self.create_nats_server().await,
285 }
286 }
287
288 async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
289 use super::ingress::http_endpoint::SharedHttpServer;
290
291 let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
292 .parse()
293 .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
294
295 tracing::info!(
296 bind_addr = %bind_addr,
297 rpc_root = %self.config.http_rpc_root,
298 "Creating HTTP request plane server"
299 );
300
301 let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());
302
303 // Start server in background
304 let server_clone = server.clone();
305 tokio::spawn(async move {
306 if let Err(e) = server_clone.start().await {
307 tracing::error!("HTTP request plane server error: {}", e);
308 }
309 });
310
311 Ok(server as Arc<dyn RequestPlaneServer>)
312 }
313
314 async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
315 // Use the global TCP server to ensure all workers in the same process share
316 // a single server. This is critical for correct endpoint routing.
317 let server = GLOBAL_TCP_SERVER
318 .get_or_try_init(|| async {
319 // Use configured port if specified, otherwise use port 0 (OS assigns free port)
320 let port = self.config.tcp_port.unwrap_or(0);
321 let bind_addr = format!("{}:{}", self.config.tcp_host, port)
322 .parse()
323 .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
324
325 tracing::info!(
326 bind_addr = %bind_addr,
327 port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
328 "Creating TCP request plane server"
329 );
330
331 let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());
332
333 // Bind and start server, getting the actual bound address
334 let actual_addr = server.clone().bind_and_start().await?;
335
336 // Store the actual bound port globally so build_transport_type() can access it
337 set_actual_tcp_rpc_port(actual_addr.port());
338
339 tracing::info!(
340 actual_addr = %actual_addr,
341 actual_port = actual_addr.port(),
342 "TCP request plane server started"
343 );
344
345 Ok::<_, anyhow::Error>(server)
346 })
347 .await?;
348
349 Ok(server.clone() as Arc<dyn RequestPlaneServer>)
350 }
351
352 async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
353 use super::ingress::nats_server::NatsMultiplexedServer;
354
355 let nats_client = self
356 .config
357 .nats_client
358 .as_ref()
359 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
360
361 tracing::info!("Creating NATS request plane server");
362
363 Ok(NatsMultiplexedServer::new(
364 nats_client.clone(),
365 self.component_registry.clone(),
366 self.cancellation_token.clone(),
367 ) as Arc<dyn RequestPlaneServer>)
368 }
369
370 // ============================================================================
371 // PRIVATE: Client Creation
372 // ============================================================================
373
374 fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
375 use super::egress::http_router::HttpRequestClient;
376
377 tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
378 Ok(Arc::new(HttpRequestClient::with_config(
379 self.config.http_client_config.clone(),
380 )?))
381 }
382
383 fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
384 use super::egress::tcp_client::TcpRequestClient;
385
386 tracing::debug!("Creating TCP request plane client with config from NetworkManager");
387 Ok(Arc::new(TcpRequestClient::with_config(
388 self.config.tcp_client_config.clone(),
389 )?))
390 }
391
392 fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
393 use super::egress::nats_client::NatsRequestClient;
394
395 let nats_client = self
396 .config
397 .nats_client
398 .as_ref()
399 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
400
401 tracing::debug!("Creating NATS request plane client");
402 Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
403 }
404}