dynamo_runtime/pipeline/network/manager.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
//! - Loads network configuration from environment variables (some read directly, others via the transport configs' `from_env` helpers)
9//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::shared_tcp_endpoint::SharedTcpServer;
18use super::ingress::unified_server::RequestPlaneServer;
19use crate::distributed::RequestPlaneMode;
20use anyhow::Result;
21use async_once_cell::OnceCell;
22use std::sync::Arc;
23use std::sync::OnceLock;
24use tokio_util::sync::CancellationToken;
25
/// Port the TCP request plane server really bound to.
///
/// The configured port may be `0` (let the OS choose), so the real value is only known
/// after binding. It is written exactly once and read back through
/// `get_actual_tcp_rpc_port()`; a `OnceLock` captures that write-once contract.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();

/// Port the HTTP request plane server really bound to.
///
/// Written once right after binding (the configured port may be `0`) and read back
/// through `get_actual_http_rpc_port()`; a `OnceLock` captures that write-once contract.
static ACTUAL_HTTP_RPC_PORT: OnceLock<u16> = OnceLock::new();
33
34/// Global storage for the shared TCP server instance.
35///
36/// When multiple workers run in the same process, they must share a single TCP server
37/// to ensure all endpoints are registered on the same server. Without this, each worker
38/// would create its own server on a different port, but all would publish the same port
39/// (from ACTUAL_TCP_RPC_PORT) to discovery, causing "No handler found" errors.
40///
41/// Uses `tokio::sync::OnceCell` to support async initialization (binding the TCP socket).
42static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
43 tokio::sync::OnceCell::const_new();
44
45/// Global storage for the shared HTTP server instance.
46///
47/// Same rationale as GLOBAL_TCP_SERVER: multiple workers in the same process must share
48/// a single HTTP server so that all endpoints are registered on the same port.
49static GLOBAL_HTTP_SERVER: tokio::sync::OnceCell<
50 Arc<super::ingress::http_endpoint::SharedHttpServer>,
51> = tokio::sync::OnceCell::const_new();
52
53/// Process-wide cancellation token for the global TCP server.
54///
55/// This token is independent of any individual runtime's cancellation token so that
56/// component Drop impls (e.g. KvRouter::drop → cancel) don't kill the shared accept
57/// loop while the OnceCell still hands out the (now-dead) server to later runtimes.
58static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
59 std::sync::LazyLock::new(CancellationToken::new);
60
61/// Process-wide cancellation token for the global HTTP server.
62static GLOBAL_HTTP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
63 std::sync::LazyLock::new(CancellationToken::new);
64
65/// Get the actual TCP RPC port that the server is listening on.
66pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
67 ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
68 tracing::error!(
69 "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
70 );
71 anyhow::anyhow!(
72 "TCP RPC port not initialized. This is not expected."
73 )
74 })
75}
76
77/// Set the actual TCP RPC port (called internally after server binds).
78fn set_actual_tcp_rpc_port(port: u16) {
79 if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) {
80 tracing::warn!(
81 existing_port = existing,
82 new_port = port,
83 "TCP RPC port already set, ignoring new value"
84 );
85 }
86}
87
88/// Get the actual HTTP RPC port that the server is listening on.
89pub fn get_actual_http_rpc_port() -> anyhow::Result<u16> {
90 ACTUAL_HTTP_RPC_PORT.get().copied().ok_or_else(|| {
91 tracing::error!(
92 "HTTP RPC port not set - request_plane_server() must be called before get_actual_http_rpc_port()"
93 );
94 anyhow::anyhow!(
95 "HTTP RPC port not initialized. This is not expected."
96 )
97 })
98}
99
100/// Set the actual HTTP RPC port (called internally after server binds).
101fn set_actual_http_rpc_port(port: u16) {
102 if let Err(existing) = ACTUAL_HTTP_RPC_PORT.set(port) {
103 tracing::warn!(
104 existing_port = existing,
105 new_port = port,
106 "HTTP RPC port already set, ignoring new value"
107 );
108 }
109}
110
111/// Network configuration loaded from environment variables
112#[derive(Clone)]
113struct NetworkConfig {
114 // HTTP server configuration
115 http_host: String,
116 /// HTTP port to bind to. If None, the OS will assign a free port.
117 http_port: Option<u16>,
118 http_rpc_root: String,
119
120 // TCP server configuration
121 tcp_host: String,
122 /// TCP port to bind to. If None, the OS will assign a free port.
123 tcp_port: Option<u16>,
124
125 // HTTP client configuration
126 http_client_config: super::egress::http_router::Http2Config,
127
128 // TCP client configuration
129 tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
130
131 // NATS configuration (provided externally, not from env)
132 nats_client: Option<async_nats::Client>,
133}
134
135impl NetworkConfig {
136 /// Load configuration from environment variables
137 ///
138 /// This is the ONLY place where network-related environment variables are read.
139 fn from_env(nats_client: Option<async_nats::Client>) -> Self {
140 Self {
141 // HTTP server configuration
142 // If DYN_HTTP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
143 http_host: std::env::var("DYN_HTTP_RPC_HOST")
144 .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
145 http_port: std::env::var("DYN_HTTP_RPC_PORT")
146 .ok()
147 .and_then(|p| p.parse().ok()),
148 http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
149 .unwrap_or_else(|_| "/v1/rpc".to_string()),
150
151 // TCP server configuration
152 // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
153 tcp_host: std::env::var("DYN_TCP_RPC_HOST")
154 .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
155 tcp_port: std::env::var("DYN_TCP_RPC_PORT")
156 .ok()
157 .and_then(|p| p.parse().ok()),
158
159 // HTTP client configuration (reads DYN_HTTP2_* env vars)
160 http_client_config: super::egress::http_router::Http2Config::from_env(),
161
162 // TCP client configuration (reads DYN_TCP_* env vars)
163 tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
164
165 // NATS (external)
166 nats_client,
167 }
168 }
169}
170
171/// Network Manager - Central coordinator for all network resources
172///
173/// # Responsibilities
174///
175/// 1. **Configuration Management**: Reads and manages all network-related environment variables
176/// 2. **Server Creation**: Creates and starts request plane servers based on mode
177/// 3. **Client Creation**: Creates request plane clients on demand
178/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
179///
180/// # Design Principles
181///
182/// - **Single Source of Truth**: All network config and creation logic lives here
183/// - **Lazy Initialization**: Servers are created only when first accessed
184/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
185/// - **No Leaky Abstractions**: Transport types never escape this module
186///
187/// # Example
188///
189/// ```ignore
190/// // Create manager (typically done once in DistributedRuntime)
191/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
192///
193/// // Get server (lazy init, cached)
194/// let server = manager.server().await?;
195/// server.register_endpoint(...).await?;
196///
197/// // Create client (not cached, lightweight)
198/// let client = manager.create_client()?;
199/// client.send_request(...).await?;
200/// ```
201pub struct NetworkManager {
202 mode: RequestPlaneMode,
203 config: NetworkConfig,
204 server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
205 cancellation_token: CancellationToken,
206 component_registry: crate::component::Registry,
207}
208
209impl NetworkManager {
210 /// Create a new network manager
211 ///
212 /// This is the single constructor for NetworkManager. All configuration
213 /// is loaded from environment variables internally.
214 ///
215 /// # Arguments
216 ///
217 /// * `cancellation_token` - Token for graceful shutdown of servers
218 /// * `nats_client` - Optional NATS client (required only for NATS mode)
219 /// * `component_registry` - Component registry to get NATS service groups from
220 ///
221 /// # Returns
222 ///
223 /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
224 pub fn new(
225 cancellation_token: CancellationToken,
226 nats_client: Option<async_nats::Client>,
227 component_registry: crate::component::Registry,
228 mode: RequestPlaneMode,
229 ) -> Self {
230 let config = NetworkConfig::from_env(nats_client);
231
232 match mode {
233 RequestPlaneMode::Http => {
234 let port_display = config
235 .http_port
236 .map(|p| p.to_string())
237 .unwrap_or_else(|| "OS-assigned".to_string());
238 tracing::info!(
239 %mode,
240 host = %config.http_host,
241 port = %port_display,
242 rpc_root = %config.http_rpc_root,
243 "Initializing NetworkManager with HTTP request plane"
244 );
245 }
246 RequestPlaneMode::Tcp => {
247 let port_display = config
248 .tcp_port
249 .map(|p| p.to_string())
250 .unwrap_or_else(|| "OS-assigned".to_string());
251 tracing::info!(
252 %mode,
253 host = %config.tcp_host,
254 port = %port_display,
255 "Initializing NetworkManager with TCP request plane"
256 );
257 }
258 RequestPlaneMode::Nats => {
259 tracing::info!(
260 %mode,
261 "Initializing NetworkManager with NATS request plane"
262 );
263 }
264 }
265
266 Self {
267 mode,
268 config,
269 server: Arc::new(OnceCell::new()),
270 cancellation_token,
271 component_registry,
272 }
273 }
274
275 /// Get or create the request plane server
276 ///
277 /// The server is created lazily on first access and cached for subsequent calls.
278 /// The server is automatically started in the background.
279 ///
280 /// # Returns
281 ///
282 /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
283 ///
284 /// # Errors
285 ///
286 /// Returns an error if:
287 /// - Server creation fails (e.g., port already in use)
288 /// - NATS mode is selected but NATS client is not available
289 /// - Configuration is invalid (e.g., malformed bind address)
290 pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
291 let server = self
292 .server
293 .get_or_try_init(async { self.create_server().await })
294 .await?;
295
296 Ok(server.clone())
297 }
298
299 /// Create a new request plane client
300 ///
301 /// Clients are lightweight and not cached. Each call creates a new client instance.
302 ///
303 /// # Returns
304 ///
305 /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
306 ///
307 /// # Errors
308 ///
309 /// Returns an error if:
310 /// - Client creation fails (e.g., invalid configuration)
311 /// - NATS mode is selected but NATS client is not available
312 pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
313 match self.mode {
314 RequestPlaneMode::Http => self.create_http_client(),
315 RequestPlaneMode::Tcp => self.create_tcp_client(),
316 RequestPlaneMode::Nats => self.create_nats_client(),
317 }
318 }
319
320 /// Get the current request plane mode
321 ///
322 /// This is provided primarily for logging and debugging purposes.
323 /// Application logic should not branch on mode - use trait objects instead.
324 pub fn mode(&self) -> RequestPlaneMode {
325 self.mode
326 }
327
328 // ============================================================================
329 // PRIVATE: Server Creation
330 // ============================================================================
331
332 async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
333 match self.mode {
334 RequestPlaneMode::Http => self.create_http_server().await,
335 RequestPlaneMode::Tcp => self.create_tcp_server().await,
336 RequestPlaneMode::Nats => self.create_nats_server().await,
337 }
338 }
339
340 async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
341 use super::ingress::http_endpoint::SharedHttpServer;
342
343 // Use the global HTTP server to ensure all workers in the same process share
344 // a single server. This is critical for correct endpoint routing.
345 let server = GLOBAL_HTTP_SERVER
346 .get_or_try_init(|| async {
347 // Use configured port if specified, otherwise use port 0 (OS assigns free port)
348 let port = self.config.http_port.unwrap_or(0);
349 let bind_addr = format!("{}:{}", self.config.http_host, port)
350 .parse()
351 .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
352
353 tracing::info!(
354 bind_addr = %bind_addr,
355 port_source = if self.config.http_port.is_some() { "DYN_HTTP_RPC_PORT" } else { "OS-assigned" },
356 rpc_root = %self.config.http_rpc_root,
357 "Creating HTTP request plane server"
358 );
359
360 let server = SharedHttpServer::new(bind_addr, GLOBAL_HTTP_SERVER_TOKEN.clone());
361
362 // Bind and start server, getting the actual bound address
363 let actual_addr = server.clone().bind_and_start().await?;
364
365 // Store the actual bound port globally so build_transport_type() can access it
366 set_actual_http_rpc_port(actual_addr.port());
367
368 tracing::info!(
369 actual_addr = %actual_addr,
370 actual_port = actual_addr.port(),
371 "HTTP request plane server started"
372 );
373
374 Ok::<_, anyhow::Error>(server)
375 })
376 .await?;
377
378 Ok(server.clone() as Arc<dyn RequestPlaneServer>)
379 }
380
381 async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
382 // Use the global TCP server to ensure all workers in the same process share
383 // a single server. This is critical for correct endpoint routing.
384 let server = GLOBAL_TCP_SERVER
385 .get_or_try_init(|| async {
386 // Use configured port if specified, otherwise use port 0 (OS assigns free port)
387 let port = self.config.tcp_port.unwrap_or(0);
388 let bind_addr = format!("{}:{}", self.config.tcp_host, port)
389 .parse()
390 .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
391
392 tracing::info!(
393 bind_addr = %bind_addr,
394 port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
395 "Creating TCP request plane server"
396 );
397
398 let server = SharedTcpServer::new(bind_addr, GLOBAL_TCP_SERVER_TOKEN.clone());
399
400 // Bind and start server, getting the actual bound address
401 let actual_addr = server.clone().bind_and_start().await?;
402
403 // Store the actual bound port globally so build_transport_type() can access it
404 set_actual_tcp_rpc_port(actual_addr.port());
405
406 tracing::info!(
407 actual_addr = %actual_addr,
408 actual_port = actual_addr.port(),
409 "TCP request plane server started"
410 );
411
412 Ok::<_, anyhow::Error>(server)
413 })
414 .await?;
415
416 Ok(server.clone() as Arc<dyn RequestPlaneServer>)
417 }
418
419 async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
420 use super::ingress::nats_server::NatsMultiplexedServer;
421
422 let nats_client = self
423 .config
424 .nats_client
425 .as_ref()
426 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
427
428 tracing::info!("Creating NATS request plane server");
429
430 Ok(NatsMultiplexedServer::new(
431 nats_client.clone(),
432 self.component_registry.clone(),
433 self.cancellation_token.clone(),
434 ) as Arc<dyn RequestPlaneServer>)
435 }
436
437 // ============================================================================
438 // PRIVATE: Client Creation
439 // ============================================================================
440
441 fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
442 use super::egress::http_router::HttpRequestClient;
443
444 tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
445 Ok(Arc::new(HttpRequestClient::with_config(
446 self.config.http_client_config.clone(),
447 )?))
448 }
449
450 fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
451 use super::egress::tcp_client::TcpRequestClient;
452
453 tracing::debug!("Creating TCP request plane client with config from NetworkManager");
454 Ok(Arc::new(TcpRequestClient::with_config(
455 self.config.tcp_client_config.clone(),
456 )?))
457 }
458
459 fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
460 use super::egress::nats_client::NatsRequestClient;
461
462 let nats_client = self
463 .config
464 .nats_client
465 .as_ref()
466 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
467
468 tracing::debug!("Creating NATS request plane client");
469 Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
470 }
471}