dynamo_runtime/pipeline/network/manager.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
8//! - Reads environment variables for network configuration
9//! - Knows about transport-specific types
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::shared_tcp_endpoint::SharedTcpServer;
18use super::ingress::unified_server::RequestPlaneServer;
19use crate::distributed::RequestPlaneMode;
20use anyhow::Result;
21use async_once_cell::OnceCell;
22use std::sync::Arc;
23use std::sync::OnceLock;
24use tokio_util::sync::CancellationToken;
25
26/// Global storage for the actual TCP RPC port after binding.
27/// Uses OnceLock since the port is set once when the server binds and never changes.
28static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
29
30/// Global storage for the shared TCP server instance.
31///
32/// When multiple workers run in the same process, they must share a single TCP server
33/// to ensure all endpoints are registered on the same server. Without this, each worker
34/// would create its own server on a different port, but all would publish the same port
35/// (from ACTUAL_TCP_RPC_PORT) to discovery, causing "No handler found" errors.
36///
37/// Uses `tokio::sync::OnceCell` to support async initialization (binding the TCP socket).
38static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
39 tokio::sync::OnceCell::const_new();
40
41/// Process-wide cancellation token for the global TCP server.
42///
43/// This token is independent of any individual runtime's cancellation token so that
44/// component Drop impls (e.g. KvRouter::drop → cancel) don't kill the shared accept
45/// loop while the OnceCell still hands out the (now-dead) server to later runtimes.
46static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
47 std::sync::LazyLock::new(CancellationToken::new);
48
49/// Get the actual TCP RPC port that the server is listening on.
50pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
51 ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
52 tracing::error!(
53 "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
54 );
55 anyhow::anyhow!(
56 "TCP RPC port not initialized. This is not expected."
57 )
58 })
59}
60
61/// Set the actual TCP RPC port (called internally after server binds).
62fn set_actual_tcp_rpc_port(port: u16) {
63 if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) {
64 tracing::warn!(
65 existing_port = existing,
66 new_port = port,
67 "TCP RPC port already set, ignoring new value"
68 );
69 }
70}
71
72/// Network configuration loaded from environment variables
73#[derive(Clone)]
74struct NetworkConfig {
75 // TCP server configuration
76 tcp_host: String,
77 /// TCP port to bind to. If None, the OS will assign a free port.
78 tcp_port: Option<u16>,
79
80 // TCP client configuration
81 tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
82
83 // NATS configuration (provided externally, not from env)
84 nats_client: Option<async_nats::Client>,
85}
86
87impl NetworkConfig {
88 /// Load configuration from environment variables
89 ///
90 /// This is the ONLY place where network-related environment variables are read.
91 fn from_env(nats_client: Option<async_nats::Client>) -> Self {
92 Self {
93 // TCP server configuration
94 // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
95 tcp_host: crate::utils::tcp_rpc_host_from_env(),
96 tcp_port: std::env::var("DYN_TCP_RPC_PORT")
97 .ok()
98 .and_then(|p| p.parse().ok()),
99
100 // TCP client configuration (reads DYN_TCP_* env vars)
101 tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
102
103 // NATS (external)
104 nats_client,
105 }
106 }
107}
108
109/// Network Manager - Central coordinator for all network resources
110///
111/// # Responsibilities
112///
113/// 1. **Configuration Management**: Reads and manages all network-related environment variables
114/// 2. **Server Creation**: Creates and starts request plane servers based on mode
115/// 3. **Client Creation**: Creates request plane clients on demand
116/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
117///
118/// # Design Principles
119///
120/// - **Single Source of Truth**: All network config and creation logic lives here
121/// - **Lazy Initialization**: Servers are created only when first accessed
122/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
123/// - **No Leaky Abstractions**: Transport types never escape this module
124///
125/// # Example
126///
127/// ```ignore
128/// // Create manager (typically done once in DistributedRuntime)
129/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
130///
131/// // Get server (lazy init, cached)
132/// let server = manager.server().await?;
133/// server.register_endpoint(...).await?;
134///
135/// // Create client (not cached, lightweight)
136/// let client = manager.create_client()?;
137/// client.send_request(...).await?;
138/// ```
139pub struct NetworkManager {
140 mode: RequestPlaneMode,
141 config: NetworkConfig,
142 server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
143 cancellation_token: CancellationToken,
144 component_registry: crate::component::Registry,
145}
146
147impl NetworkManager {
148 /// Create a new network manager
149 ///
150 /// This is the single constructor for NetworkManager. All configuration
151 /// is loaded from environment variables internally.
152 ///
153 /// # Arguments
154 ///
155 /// * `cancellation_token` - Token for graceful shutdown of servers
156 /// * `nats_client` - Optional NATS client (required only for NATS mode)
157 /// * `component_registry` - Component registry to get NATS service groups from
158 ///
159 /// # Returns
160 ///
161 /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
162 pub fn new(
163 cancellation_token: CancellationToken,
164 nats_client: Option<async_nats::Client>,
165 component_registry: crate::component::Registry,
166 mode: RequestPlaneMode,
167 ) -> Self {
168 let config = NetworkConfig::from_env(nats_client);
169
170 match mode {
171 RequestPlaneMode::Tcp => {
172 let port_display = config
173 .tcp_port
174 .map(|p| p.to_string())
175 .unwrap_or_else(|| "OS-assigned".to_string());
176 tracing::info!(
177 %mode,
178 host = %config.tcp_host,
179 port = %port_display,
180 "Initializing NetworkManager with TCP request plane"
181 );
182 }
183 RequestPlaneMode::Nats => {
184 tracing::info!(
185 %mode,
186 "Initializing NetworkManager with NATS request plane"
187 );
188 }
189 }
190
191 Self {
192 mode,
193 config,
194 server: Arc::new(OnceCell::new()),
195 cancellation_token,
196 component_registry,
197 }
198 }
199
200 /// Get or create the request plane server
201 ///
202 /// The server is created lazily on first access and cached for subsequent calls.
203 /// The server is automatically started in the background.
204 ///
205 /// # Returns
206 ///
207 /// Returns a trait object that abstracts over TCP/NATS implementations.
208 ///
209 /// # Errors
210 ///
211 /// Returns an error if:
212 /// - Server creation fails (e.g., port already in use)
213 /// - NATS mode is selected but NATS client is not available
214 /// - Configuration is invalid (e.g., malformed bind address)
215 pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
216 let server = self
217 .server
218 .get_or_try_init(async { self.create_server().await })
219 .await?;
220
221 Ok(server.clone())
222 }
223
224 /// Create a new request plane client
225 ///
226 /// Clients are lightweight and not cached. Each call creates a new client instance.
227 ///
228 /// # Returns
229 ///
230 /// Returns a trait object that abstracts over TCP/NATS implementations.
231 ///
232 /// # Errors
233 ///
234 /// Returns an error if:
235 /// - Client creation fails (e.g., invalid configuration)
236 /// - NATS mode is selected but NATS client is not available
237 pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
238 match self.mode {
239 RequestPlaneMode::Tcp => self.create_tcp_client(),
240 RequestPlaneMode::Nats => self.create_nats_client(),
241 }
242 }
243
244 /// Get the current request plane mode
245 ///
246 /// This is provided primarily for logging and debugging purposes.
247 /// Application logic should not branch on mode - use trait objects instead.
248 pub fn mode(&self) -> RequestPlaneMode {
249 self.mode
250 }
251
252 // ============================================================================
253 // PRIVATE: Server Creation
254 // ============================================================================
255
256 async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
257 match self.mode {
258 RequestPlaneMode::Tcp => self.create_tcp_server().await,
259 RequestPlaneMode::Nats => self.create_nats_server().await,
260 }
261 }
262
263 async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
264 // Use the global TCP server to ensure all workers in the same process share
265 // a single server. This is critical for correct endpoint routing.
266 let server = GLOBAL_TCP_SERVER
267 .get_or_try_init(|| async {
268 // Use configured port if specified, otherwise use port 0 (OS assigns free port)
269 let port = self.config.tcp_port.unwrap_or(0);
270 let bind_addr = format!("{}:{}", self.config.tcp_host, port)
271 .parse()
272 .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
273
274 tracing::info!(
275 bind_addr = %bind_addr,
276 port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
277 "Creating TCP request plane server"
278 );
279
280 let server = SharedTcpServer::new(bind_addr, GLOBAL_TCP_SERVER_TOKEN.clone());
281
282 // Bind and start server, getting the actual bound address
283 let actual_addr = server.clone().bind_and_start().await?;
284
285 // Store the actual bound port globally so build_transport_type() can access it
286 set_actual_tcp_rpc_port(actual_addr.port());
287
288 tracing::info!(
289 actual_addr = %actual_addr,
290 actual_port = actual_addr.port(),
291 "TCP request plane server started"
292 );
293
294 Ok::<_, anyhow::Error>(server)
295 })
296 .await?;
297
298 Ok(server.clone() as Arc<dyn RequestPlaneServer>)
299 }
300
301 async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
302 use super::ingress::nats_server::NatsMultiplexedServer;
303
304 let nats_client = self
305 .config
306 .nats_client
307 .as_ref()
308 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
309
310 tracing::info!("Creating NATS request plane server");
311
312 Ok(NatsMultiplexedServer::new(
313 nats_client.clone(),
314 self.component_registry.clone(),
315 self.cancellation_token.clone(),
316 ) as Arc<dyn RequestPlaneServer>)
317 }
318
319 // ============================================================================
320 // PRIVATE: Client Creation
321 // ============================================================================
322
323 fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
324 use super::egress::tcp_client::TcpRequestClient;
325
326 tracing::debug!("Creating TCP request plane client with config from NetworkManager");
327 Ok(Arc::new(TcpRequestClient::with_config(
328 self.config.tcp_client_config.clone(),
329 )?))
330 }
331
332 fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
333 use super::egress::nats_client::NatsRequestClient;
334
335 let nats_client = self
336 .config
337 .nats_client
338 .as_ref()
339 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
340
341 tracing::debug!("Creating NATS request plane client");
342 Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
343 }
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a manager for `mode` with a fresh cancellation token, no NATS client
    /// and a new component registry.
    fn manager_for(mode: RequestPlaneMode) -> NetworkManager {
        let token = CancellationToken::new();
        let registry = crate::component::Registry::new();
        NetworkManager::new(token, None, registry, mode)
    }

    /// TCP mode must hand out a TCP client even though no NATS client exists.
    #[test]
    fn tcp_mode_creates_tcp_client_without_nats_client() {
        let client = manager_for(RequestPlaneMode::Tcp).create_client().unwrap();
        let transport = client.transport_name();
        assert_eq!(transport, "tcp");
    }

    /// NATS mode without a NATS client must fail with the dedicated error message.
    #[test]
    fn nats_mode_requires_nats_client() {
        let outcome = manager_for(RequestPlaneMode::Nats).create_client();
        let err = match outcome {
            Err(err) => err,
            Ok(client) => panic!(
                "expected NATS mode without NATS client to fail, got {} client",
                client.transport_name()
            ),
        };
        assert!(err.to_string().contains("NATS client required"));
    }
}
375}