Skip to main content

dynamo_runtime/pipeline/network/
manager.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
8//! - Reads environment variables for network configuration
9//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::shared_tcp_endpoint::SharedTcpServer;
18use super::ingress::unified_server::RequestPlaneServer;
19use crate::distributed::RequestPlaneMode;
20use anyhow::Result;
21use async_once_cell::OnceCell;
22use std::sync::Arc;
23use std::sync::OnceLock;
24use tokio_util::sync::CancellationToken;
25
26/// Global storage for the actual TCP RPC port after binding.
27/// Uses OnceLock since the port is set once when the server binds and never changes.
28static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
29
30/// Global storage for the actual HTTP RPC port after binding.
31/// Uses OnceLock since the port is set once when the server binds and never changes.
32static ACTUAL_HTTP_RPC_PORT: OnceLock<u16> = OnceLock::new();
33
34/// Global storage for the shared TCP server instance.
35///
36/// When multiple workers run in the same process, they must share a single TCP server
37/// to ensure all endpoints are registered on the same server. Without this, each worker
38/// would create its own server on a different port, but all would publish the same port
39/// (from ACTUAL_TCP_RPC_PORT) to discovery, causing "No handler found" errors.
40///
41/// Uses `tokio::sync::OnceCell` to support async initialization (binding the TCP socket).
42static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
43    tokio::sync::OnceCell::const_new();
44
45/// Global storage for the shared HTTP server instance.
46///
47/// Same rationale as GLOBAL_TCP_SERVER: multiple workers in the same process must share
48/// a single HTTP server so that all endpoints are registered on the same port.
49static GLOBAL_HTTP_SERVER: tokio::sync::OnceCell<
50    Arc<super::ingress::http_endpoint::SharedHttpServer>,
51> = tokio::sync::OnceCell::const_new();
52
53/// Process-wide cancellation token for the global TCP server.
54///
55/// This token is independent of any individual runtime's cancellation token so that
56/// component Drop impls (e.g. KvRouter::drop → cancel) don't kill the shared accept
57/// loop while the OnceCell still hands out the (now-dead) server to later runtimes.
58static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
59    std::sync::LazyLock::new(CancellationToken::new);
60
61/// Process-wide cancellation token for the global HTTP server.
62static GLOBAL_HTTP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
63    std::sync::LazyLock::new(CancellationToken::new);
64
65/// Get the actual TCP RPC port that the server is listening on.
66pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
67    ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
68        tracing::error!(
69            "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
70        );
71        anyhow::anyhow!(
72            "TCP RPC port not initialized. This is not expected."
73        )
74    })
75}
76
77/// Set the actual TCP RPC port (called internally after server binds).
78fn set_actual_tcp_rpc_port(port: u16) {
79    if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) {
80        tracing::warn!(
81            existing_port = existing,
82            new_port = port,
83            "TCP RPC port already set, ignoring new value"
84        );
85    }
86}
87
88/// Get the actual HTTP RPC port that the server is listening on.
89pub fn get_actual_http_rpc_port() -> anyhow::Result<u16> {
90    ACTUAL_HTTP_RPC_PORT.get().copied().ok_or_else(|| {
91        tracing::error!(
92            "HTTP RPC port not set - request_plane_server() must be called before get_actual_http_rpc_port()"
93        );
94        anyhow::anyhow!(
95            "HTTP RPC port not initialized. This is not expected."
96        )
97    })
98}
99
100/// Set the actual HTTP RPC port (called internally after server binds).
101fn set_actual_http_rpc_port(port: u16) {
102    if let Err(existing) = ACTUAL_HTTP_RPC_PORT.set(port) {
103        tracing::warn!(
104            existing_port = existing,
105            new_port = port,
106            "HTTP RPC port already set, ignoring new value"
107        );
108    }
109}
110
111/// Network configuration loaded from environment variables
112#[derive(Clone)]
113struct NetworkConfig {
114    // HTTP server configuration
115    http_host: String,
116    /// HTTP port to bind to. If None, the OS will assign a free port.
117    http_port: Option<u16>,
118    http_rpc_root: String,
119
120    // TCP server configuration
121    tcp_host: String,
122    /// TCP port to bind to. If None, the OS will assign a free port.
123    tcp_port: Option<u16>,
124
125    // HTTP client configuration
126    http_client_config: super::egress::http_router::Http2Config,
127
128    // TCP client configuration
129    tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
130
131    // NATS configuration (provided externally, not from env)
132    nats_client: Option<async_nats::Client>,
133}
134
135impl NetworkConfig {
136    /// Load configuration from environment variables
137    ///
138    /// This is the ONLY place where network-related environment variables are read.
139    fn from_env(nats_client: Option<async_nats::Client>) -> Self {
140        Self {
141            // HTTP server configuration
142            // If DYN_HTTP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
143            http_host: std::env::var("DYN_HTTP_RPC_HOST")
144                .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
145            http_port: std::env::var("DYN_HTTP_RPC_PORT")
146                .ok()
147                .and_then(|p| p.parse().ok()),
148            http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
149                .unwrap_or_else(|_| "/v1/rpc".to_string()),
150
151            // TCP server configuration
152            // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
153            tcp_host: std::env::var("DYN_TCP_RPC_HOST")
154                .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
155            tcp_port: std::env::var("DYN_TCP_RPC_PORT")
156                .ok()
157                .and_then(|p| p.parse().ok()),
158
159            // HTTP client configuration (reads DYN_HTTP2_* env vars)
160            http_client_config: super::egress::http_router::Http2Config::from_env(),
161
162            // TCP client configuration (reads DYN_TCP_* env vars)
163            tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
164
165            // NATS (external)
166            nats_client,
167        }
168    }
169}
170
171/// Network Manager - Central coordinator for all network resources
172///
173/// # Responsibilities
174///
175/// 1. **Configuration Management**: Reads and manages all network-related environment variables
176/// 2. **Server Creation**: Creates and starts request plane servers based on mode
177/// 3. **Client Creation**: Creates request plane clients on demand
178/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
179///
180/// # Design Principles
181///
182/// - **Single Source of Truth**: All network config and creation logic lives here
183/// - **Lazy Initialization**: Servers are created only when first accessed
184/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
185/// - **No Leaky Abstractions**: Transport types never escape this module
186///
187/// # Example
188///
189/// ```ignore
190/// // Create manager (typically done once in DistributedRuntime)
191/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
192///
193/// // Get server (lazy init, cached)
194/// let server = manager.server().await?;
195/// server.register_endpoint(...).await?;
196///
197/// // Create client (not cached, lightweight)
198/// let client = manager.create_client()?;
199/// client.send_request(...).await?;
200/// ```
201pub struct NetworkManager {
202    mode: RequestPlaneMode,
203    config: NetworkConfig,
204    server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
205    cancellation_token: CancellationToken,
206    component_registry: crate::component::Registry,
207}
208
209impl NetworkManager {
210    /// Create a new network manager
211    ///
212    /// This is the single constructor for NetworkManager. All configuration
213    /// is loaded from environment variables internally.
214    ///
215    /// # Arguments
216    ///
217    /// * `cancellation_token` - Token for graceful shutdown of servers
218    /// * `nats_client` - Optional NATS client (required only for NATS mode)
219    /// * `component_registry` - Component registry to get NATS service groups from
220    ///
221    /// # Returns
222    ///
223    /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
224    pub fn new(
225        cancellation_token: CancellationToken,
226        nats_client: Option<async_nats::Client>,
227        component_registry: crate::component::Registry,
228        mode: RequestPlaneMode,
229    ) -> Self {
230        let config = NetworkConfig::from_env(nats_client);
231
232        match mode {
233            RequestPlaneMode::Http => {
234                let port_display = config
235                    .http_port
236                    .map(|p| p.to_string())
237                    .unwrap_or_else(|| "OS-assigned".to_string());
238                tracing::info!(
239                    %mode,
240                    host = %config.http_host,
241                    port = %port_display,
242                    rpc_root = %config.http_rpc_root,
243                    "Initializing NetworkManager with HTTP request plane"
244                );
245            }
246            RequestPlaneMode::Tcp => {
247                let port_display = config
248                    .tcp_port
249                    .map(|p| p.to_string())
250                    .unwrap_or_else(|| "OS-assigned".to_string());
251                tracing::info!(
252                    %mode,
253                    host = %config.tcp_host,
254                    port = %port_display,
255                    "Initializing NetworkManager with TCP request plane"
256                );
257            }
258            RequestPlaneMode::Nats => {
259                tracing::info!(
260                    %mode,
261                    "Initializing NetworkManager with NATS request plane"
262                );
263            }
264        }
265
266        Self {
267            mode,
268            config,
269            server: Arc::new(OnceCell::new()),
270            cancellation_token,
271            component_registry,
272        }
273    }
274
275    /// Get or create the request plane server
276    ///
277    /// The server is created lazily on first access and cached for subsequent calls.
278    /// The server is automatically started in the background.
279    ///
280    /// # Returns
281    ///
282    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
283    ///
284    /// # Errors
285    ///
286    /// Returns an error if:
287    /// - Server creation fails (e.g., port already in use)
288    /// - NATS mode is selected but NATS client is not available
289    /// - Configuration is invalid (e.g., malformed bind address)
290    pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
291        let server = self
292            .server
293            .get_or_try_init(async { self.create_server().await })
294            .await?;
295
296        Ok(server.clone())
297    }
298
299    /// Create a new request plane client
300    ///
301    /// Clients are lightweight and not cached. Each call creates a new client instance.
302    ///
303    /// # Returns
304    ///
305    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
306    ///
307    /// # Errors
308    ///
309    /// Returns an error if:
310    /// - Client creation fails (e.g., invalid configuration)
311    /// - NATS mode is selected but NATS client is not available
312    pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
313        match self.mode {
314            RequestPlaneMode::Http => self.create_http_client(),
315            RequestPlaneMode::Tcp => self.create_tcp_client(),
316            RequestPlaneMode::Nats => self.create_nats_client(),
317        }
318    }
319
320    /// Get the current request plane mode
321    ///
322    /// This is provided primarily for logging and debugging purposes.
323    /// Application logic should not branch on mode - use trait objects instead.
324    pub fn mode(&self) -> RequestPlaneMode {
325        self.mode
326    }
327
328    // ============================================================================
329    // PRIVATE: Server Creation
330    // ============================================================================
331
332    async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
333        match self.mode {
334            RequestPlaneMode::Http => self.create_http_server().await,
335            RequestPlaneMode::Tcp => self.create_tcp_server().await,
336            RequestPlaneMode::Nats => self.create_nats_server().await,
337        }
338    }
339
340    async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
341        use super::ingress::http_endpoint::SharedHttpServer;
342
343        // Use the global HTTP server to ensure all workers in the same process share
344        // a single server. This is critical for correct endpoint routing.
345        let server = GLOBAL_HTTP_SERVER
346            .get_or_try_init(|| async {
347                // Use configured port if specified, otherwise use port 0 (OS assigns free port)
348                let port = self.config.http_port.unwrap_or(0);
349                let bind_addr = format!("{}:{}", self.config.http_host, port)
350                    .parse()
351                    .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
352
353                tracing::info!(
354                    bind_addr = %bind_addr,
355                    port_source = if self.config.http_port.is_some() { "DYN_HTTP_RPC_PORT" } else { "OS-assigned" },
356                    rpc_root = %self.config.http_rpc_root,
357                    "Creating HTTP request plane server"
358                );
359
360                let server = SharedHttpServer::new(bind_addr, GLOBAL_HTTP_SERVER_TOKEN.clone());
361
362                // Bind and start server, getting the actual bound address
363                let actual_addr = server.clone().bind_and_start().await?;
364
365                // Store the actual bound port globally so build_transport_type() can access it
366                set_actual_http_rpc_port(actual_addr.port());
367
368                tracing::info!(
369                    actual_addr = %actual_addr,
370                    actual_port = actual_addr.port(),
371                    "HTTP request plane server started"
372                );
373
374                Ok::<_, anyhow::Error>(server)
375            })
376            .await?;
377
378        Ok(server.clone() as Arc<dyn RequestPlaneServer>)
379    }
380
381    async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
382        // Use the global TCP server to ensure all workers in the same process share
383        // a single server. This is critical for correct endpoint routing.
384        let server = GLOBAL_TCP_SERVER
385            .get_or_try_init(|| async {
386                // Use configured port if specified, otherwise use port 0 (OS assigns free port)
387                let port = self.config.tcp_port.unwrap_or(0);
388                let bind_addr = format!("{}:{}", self.config.tcp_host, port)
389                    .parse()
390                    .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
391
392                tracing::info!(
393                    bind_addr = %bind_addr,
394                    port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
395                    "Creating TCP request plane server"
396                );
397
398                let server = SharedTcpServer::new(bind_addr, GLOBAL_TCP_SERVER_TOKEN.clone());
399
400                // Bind and start server, getting the actual bound address
401                let actual_addr = server.clone().bind_and_start().await?;
402
403                // Store the actual bound port globally so build_transport_type() can access it
404                set_actual_tcp_rpc_port(actual_addr.port());
405
406                tracing::info!(
407                    actual_addr = %actual_addr,
408                    actual_port = actual_addr.port(),
409                    "TCP request plane server started"
410                );
411
412                Ok::<_, anyhow::Error>(server)
413            })
414            .await?;
415
416        Ok(server.clone() as Arc<dyn RequestPlaneServer>)
417    }
418
419    async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
420        use super::ingress::nats_server::NatsMultiplexedServer;
421
422        let nats_client = self
423            .config
424            .nats_client
425            .as_ref()
426            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
427
428        tracing::info!("Creating NATS request plane server");
429
430        Ok(NatsMultiplexedServer::new(
431            nats_client.clone(),
432            self.component_registry.clone(),
433            self.cancellation_token.clone(),
434        ) as Arc<dyn RequestPlaneServer>)
435    }
436
437    // ============================================================================
438    // PRIVATE: Client Creation
439    // ============================================================================
440
441    fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
442        use super::egress::http_router::HttpRequestClient;
443
444        tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
445        Ok(Arc::new(HttpRequestClient::with_config(
446            self.config.http_client_config.clone(),
447        )?))
448    }
449
450    fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
451        use super::egress::tcp_client::TcpRequestClient;
452
453        tracing::debug!("Creating TCP request plane client with config from NetworkManager");
454        Ok(Arc::new(TcpRequestClient::with_config(
455            self.config.tcp_client_config.clone(),
456        )?))
457    }
458
459    fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
460        use super::egress::nats_client::NatsRequestClient;
461
462        let nats_client = self
463            .config
464            .nats_client
465            .as_ref()
466            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
467
468        tracing::debug!("Creating NATS request plane client");
469        Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
470    }
471}