Skip to main content

dynamo_runtime/pipeline/network/
manager.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
//! Network Manager - Single Source of Truth for Network Configuration
//!
//! This module consolidates ALL network-related configuration and creation logic.
//! It is the ONLY place in the codebase that:
//! - Reads environment variables for network configuration (directly, or through the
//!   `from_env` / `crate::utils` helpers it calls)
//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
//! - Performs mode selection based on RequestPlaneMode
//! - Creates servers and clients
//!
//! The rest of the codebase works exclusively with trait objects and never
//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::shared_tcp_endpoint::SharedTcpServer;
18use super::ingress::unified_server::RequestPlaneServer;
19use crate::distributed::RequestPlaneMode;
20use anyhow::Result;
21use async_once_cell::OnceCell;
22use std::sync::Arc;
23use std::sync::OnceLock;
24use tokio_util::sync::CancellationToken;
25
/// Process-wide record of the port the TCP request-plane server actually bound to.
///
/// It is written once, after the listener binds, and only read afterwards. Recording it
/// matters when no port is configured and the OS picks one. `OnceLock` matches this
/// write-once / read-many usage.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
29
30/// Global storage for the shared TCP server instance.
31///
32/// When multiple workers run in the same process, they must share a single TCP server
33/// to ensure all endpoints are registered on the same server. Without this, each worker
34/// would create its own server on a different port, but all would publish the same port
35/// (from ACTUAL_TCP_RPC_PORT) to discovery, causing "No handler found" errors.
36///
37/// Uses `tokio::sync::OnceCell` to support async initialization (binding the TCP socket).
38static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
39    tokio::sync::OnceCell::const_new();
40
41/// Get the actual TCP RPC port that the server is listening on.
42pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
43    ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
44        tracing::error!(
45            "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
46        );
47        anyhow::anyhow!(
48            "TCP RPC port not initialized. This is not expected."
49        )
50    })
51}
52
53/// Set the actual TCP RPC port (called internally after server binds).
54fn set_actual_tcp_rpc_port(port: u16) {
55    if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) {
56        tracing::warn!(
57            existing_port = existing,
58            new_port = port,
59            "TCP RPC port already set, ignoring new value"
60        );
61    }
62}
63
64/// Network configuration loaded from environment variables
65#[derive(Clone)]
66struct NetworkConfig {
67    // HTTP server configuration
68    http_host: String,
69    http_port: u16,
70    http_rpc_root: String,
71
72    // TCP server configuration
73    tcp_host: String,
74    /// TCP port to bind to. If None, the OS will assign a free port.
75    tcp_port: Option<u16>,
76
77    // HTTP client configuration
78    http_client_config: super::egress::http_router::Http2Config,
79
80    // TCP client configuration
81    tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
82
83    // NATS configuration (provided externally, not from env)
84    nats_client: Option<async_nats::Client>,
85}
86
87impl NetworkConfig {
88    /// Load configuration from environment variables
89    ///
90    /// This is the ONLY place where network-related environment variables are read.
91    fn from_env(nats_client: Option<async_nats::Client>) -> Self {
92        Self {
93            // HTTP server configuration
94            http_host: std::env::var("DYN_HTTP_RPC_HOST")
95                .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
96            http_port: std::env::var("DYN_HTTP_RPC_PORT")
97                .ok()
98                .and_then(|p| p.parse().ok())
99                .unwrap_or(8888),
100            http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
101                .unwrap_or_else(|_| "/v1/rpc".to_string()),
102
103            // TCP server configuration
104            // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
105            tcp_host: std::env::var("DYN_TCP_RPC_HOST")
106                .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
107            tcp_port: std::env::var("DYN_TCP_RPC_PORT")
108                .ok()
109                .and_then(|p| p.parse().ok()),
110
111            // HTTP client configuration (reads DYN_HTTP2_* env vars)
112            http_client_config: super::egress::http_router::Http2Config::from_env(),
113
114            // TCP client configuration (reads DYN_TCP_* env vars)
115            tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
116
117            // NATS (external)
118            nats_client,
119        }
120    }
121}
122
123/// Network Manager - Central coordinator for all network resources
124///
125/// # Responsibilities
126///
127/// 1. **Configuration Management**: Reads and manages all network-related environment variables
128/// 2. **Server Creation**: Creates and starts request plane servers based on mode
129/// 3. **Client Creation**: Creates request plane clients on demand
130/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
131///
132/// # Design Principles
133///
134/// - **Single Source of Truth**: All network config and creation logic lives here
135/// - **Lazy Initialization**: Servers are created only when first accessed
136/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
137/// - **No Leaky Abstractions**: Transport types never escape this module
138///
139/// # Example
140///
141/// ```ignore
142/// // Create manager (typically done once in DistributedRuntime)
143/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
144///
145/// // Get server (lazy init, cached)
146/// let server = manager.server().await?;
147/// server.register_endpoint(...).await?;
148///
149/// // Create client (not cached, lightweight)
150/// let client = manager.create_client()?;
151/// client.send_request(...).await?;
152/// ```
153pub struct NetworkManager {
154    mode: RequestPlaneMode,
155    config: NetworkConfig,
156    server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
157    cancellation_token: CancellationToken,
158    component_registry: crate::component::Registry,
159}
160
161impl NetworkManager {
162    /// Create a new network manager
163    ///
164    /// This is the single constructor for NetworkManager. All configuration
165    /// is loaded from environment variables internally.
166    ///
167    /// # Arguments
168    ///
169    /// * `cancellation_token` - Token for graceful shutdown of servers
170    /// * `nats_client` - Optional NATS client (required only for NATS mode)
171    /// * `component_registry` - Component registry to get NATS service groups from
172    ///
173    /// # Returns
174    ///
175    /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
176    pub fn new(
177        cancellation_token: CancellationToken,
178        nats_client: Option<async_nats::Client>,
179        component_registry: crate::component::Registry,
180        mode: RequestPlaneMode,
181    ) -> Self {
182        let config = NetworkConfig::from_env(nats_client);
183
184        match mode {
185            RequestPlaneMode::Http => {
186                tracing::info!(
187                    %mode,
188                    host = %config.http_host,
189                    port = config.http_port,
190                    rpc_root = %config.http_rpc_root,
191                    "Initializing NetworkManager with HTTP request plane"
192                );
193            }
194            RequestPlaneMode::Tcp => {
195                let port_display = config
196                    .tcp_port
197                    .map(|p| p.to_string())
198                    .unwrap_or_else(|| "OS-assigned".to_string());
199                tracing::info!(
200                    %mode,
201                    host = %config.tcp_host,
202                    port = %port_display,
203                    "Initializing NetworkManager with TCP request plane"
204                );
205            }
206            RequestPlaneMode::Nats => {
207                tracing::info!(
208                    %mode,
209                    "Initializing NetworkManager with NATS request plane"
210                );
211            }
212        }
213
214        Self {
215            mode,
216            config,
217            server: Arc::new(OnceCell::new()),
218            cancellation_token,
219            component_registry,
220        }
221    }
222
223    /// Get or create the request plane server
224    ///
225    /// The server is created lazily on first access and cached for subsequent calls.
226    /// The server is automatically started in the background.
227    ///
228    /// # Returns
229    ///
230    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if:
235    /// - Server creation fails (e.g., port already in use)
236    /// - NATS mode is selected but NATS client is not available
237    /// - Configuration is invalid (e.g., malformed bind address)
238    pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
239        let server = self
240            .server
241            .get_or_try_init(async { self.create_server().await })
242            .await?;
243
244        Ok(server.clone())
245    }
246
247    /// Create a new request plane client
248    ///
249    /// Clients are lightweight and not cached. Each call creates a new client instance.
250    ///
251    /// # Returns
252    ///
253    /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
254    ///
255    /// # Errors
256    ///
257    /// Returns an error if:
258    /// - Client creation fails (e.g., invalid configuration)
259    /// - NATS mode is selected but NATS client is not available
260    pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
261        match self.mode {
262            RequestPlaneMode::Http => self.create_http_client(),
263            RequestPlaneMode::Tcp => self.create_tcp_client(),
264            RequestPlaneMode::Nats => self.create_nats_client(),
265        }
266    }
267
268    /// Get the current request plane mode
269    ///
270    /// This is provided primarily for logging and debugging purposes.
271    /// Application logic should not branch on mode - use trait objects instead.
272    pub fn mode(&self) -> RequestPlaneMode {
273        self.mode
274    }
275
276    // ============================================================================
277    // PRIVATE: Server Creation
278    // ============================================================================
279
280    async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
281        match self.mode {
282            RequestPlaneMode::Http => self.create_http_server().await,
283            RequestPlaneMode::Tcp => self.create_tcp_server().await,
284            RequestPlaneMode::Nats => self.create_nats_server().await,
285        }
286    }
287
288    async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
289        use super::ingress::http_endpoint::SharedHttpServer;
290
291        let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
292            .parse()
293            .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
294
295        tracing::info!(
296            bind_addr = %bind_addr,
297            rpc_root = %self.config.http_rpc_root,
298            "Creating HTTP request plane server"
299        );
300
301        let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());
302
303        // Start server in background
304        let server_clone = server.clone();
305        tokio::spawn(async move {
306            if let Err(e) = server_clone.start().await {
307                tracing::error!("HTTP request plane server error: {}", e);
308            }
309        });
310
311        Ok(server as Arc<dyn RequestPlaneServer>)
312    }
313
314    async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
315        // Use the global TCP server to ensure all workers in the same process share
316        // a single server. This is critical for correct endpoint routing.
317        let server = GLOBAL_TCP_SERVER
318            .get_or_try_init(|| async {
319                // Use configured port if specified, otherwise use port 0 (OS assigns free port)
320                let port = self.config.tcp_port.unwrap_or(0);
321                let bind_addr = format!("{}:{}", self.config.tcp_host, port)
322                    .parse()
323                    .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
324
325                tracing::info!(
326                    bind_addr = %bind_addr,
327                    port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
328                    "Creating TCP request plane server"
329                );
330
331                let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());
332
333                // Bind and start server, getting the actual bound address
334                let actual_addr = server.clone().bind_and_start().await?;
335
336                // Store the actual bound port globally so build_transport_type() can access it
337                set_actual_tcp_rpc_port(actual_addr.port());
338
339                tracing::info!(
340                    actual_addr = %actual_addr,
341                    actual_port = actual_addr.port(),
342                    "TCP request plane server started"
343                );
344
345                Ok::<_, anyhow::Error>(server)
346            })
347            .await?;
348
349        Ok(server.clone() as Arc<dyn RequestPlaneServer>)
350    }
351
352    async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
353        use super::ingress::nats_server::NatsMultiplexedServer;
354
355        let nats_client = self
356            .config
357            .nats_client
358            .as_ref()
359            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
360
361        tracing::info!("Creating NATS request plane server");
362
363        Ok(NatsMultiplexedServer::new(
364            nats_client.clone(),
365            self.component_registry.clone(),
366            self.cancellation_token.clone(),
367        ) as Arc<dyn RequestPlaneServer>)
368    }
369
370    // ============================================================================
371    // PRIVATE: Client Creation
372    // ============================================================================
373
374    fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
375        use super::egress::http_router::HttpRequestClient;
376
377        tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
378        Ok(Arc::new(HttpRequestClient::with_config(
379            self.config.http_client_config.clone(),
380        )?))
381    }
382
383    fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
384        use super::egress::tcp_client::TcpRequestClient;
385
386        tracing::debug!("Creating TCP request plane client with config from NetworkManager");
387        Ok(Arc::new(TcpRequestClient::with_config(
388            self.config.tcp_client_config.clone(),
389        )?))
390    }
391
392    fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
393        use super::egress::nats_client::NatsRequestClient;
394
395        let nats_client = self
396            .config
397            .nats_client
398            .as_ref()
399            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
400
401        tracing::debug!("Creating NATS request plane client");
402        Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
403    }
404}