dynamo_runtime/pipeline/network/manager.rs
1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
8//! - Reads environment variables for network configuration
9//! - Knows about transport-specific types (SharedHttpServer, TcpRequestClient, etc.)
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::unified_server::RequestPlaneServer;
18use crate::config::RequestPlaneMode;
19use anyhow::Result;
20use async_once_cell::OnceCell;
21use std::sync::Arc;
22use tokio_util::sync::CancellationToken;
23
/// Network configuration loaded from environment variables
///
/// A plain snapshot of every setting the request plane needs. It is built once
/// by `NetworkConfig::from_env` and stored inside `NetworkManager`; nothing in
/// this module re-reads the environment afterwards.
#[derive(Clone)]
struct NetworkConfig {
    // ---- HTTP server configuration ----
    /// Host part of the HTTP server bind address (`DYN_HTTP_RPC_HOST`, with a
    /// fallback computed by `crate::utils::get_http_rpc_host_from_env`).
    http_host: String,
    /// Port part of the HTTP server bind address (`DYN_HTTP_RPC_PORT`,
    /// default `8888`).
    http_port: u16,
    /// Root path for HTTP RPC routes (`DYN_HTTP_RPC_ROOT_PATH`,
    /// default `/v1/rpc`).
    http_rpc_root: String,

    // ---- TCP server configuration ----
    /// Host part of the TCP server bind address (`DYN_TCP_RPC_HOST`, with a
    /// fallback computed by `crate::utils::get_tcp_rpc_host_from_env`).
    tcp_host: String,
    /// Port part of the TCP server bind address (`DYN_TCP_RPC_PORT`,
    /// default `9999`).
    tcp_port: u16,

    // ---- HTTP client configuration ----
    /// Settings handed to every HTTP request client; produced by
    /// `Http2Config::from_env()`.
    http_client_config: super::egress::http_router::Http2Config,

    // ---- TCP client configuration ----
    /// Settings handed to every TCP request client; produced by
    /// `TcpRequestConfig::from_env()`.
    tcp_client_config: super::egress::tcp_client::TcpRequestConfig,

    // ---- NATS configuration (provided externally, not from env) ----
    /// NATS connection supplied by the caller. `None` is acceptable unless the
    /// NATS request plane is selected, in which case server/client creation
    /// fails with an error.
    nats_client: Option<async_nats::Client>,
}
45
46impl NetworkConfig {
47 /// Load configuration from environment variables
48 ///
49 /// This is the ONLY place where network-related environment variables are read.
50 fn from_env(nats_client: Option<async_nats::Client>) -> Self {
51 Self {
52 // HTTP server configuration
53 http_host: std::env::var("DYN_HTTP_RPC_HOST")
54 .unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
55 http_port: std::env::var("DYN_HTTP_RPC_PORT")
56 .ok()
57 .and_then(|p| p.parse().ok())
58 .unwrap_or(8888),
59 http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
60 .unwrap_or_else(|_| "/v1/rpc".to_string()),
61
62 // TCP server configuration
63 tcp_host: std::env::var("DYN_TCP_RPC_HOST")
64 .unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
65 tcp_port: std::env::var("DYN_TCP_RPC_PORT")
66 .ok()
67 .and_then(|p| p.parse().ok())
68 .unwrap_or(9999),
69
70 // HTTP client configuration (reads DYN_HTTP2_* env vars)
71 http_client_config: super::egress::http_router::Http2Config::from_env(),
72
73 // TCP client configuration (reads DYN_TCP_* env vars)
74 tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
75
76 // NATS (external)
77 nats_client,
78 }
79 }
80}
81
/// Network Manager - Central coordinator for all network resources
///
/// # Responsibilities
///
/// 1. **Configuration Management**: Reads and manages all network-related environment variables
/// 2. **Server Creation**: Creates and starts request plane servers based on mode
/// 3. **Client Creation**: Creates request plane clients on demand
/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
///
/// # Design Principles
///
/// - **Single Source of Truth**: All network config and creation logic lives here
/// - **Lazy Initialization**: The server is created only when first requested via `server()`
/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
/// - **No Leaky Abstractions**: Transport types never escape this module
///
/// # Example
///
/// ```ignore
/// // Create manager (typically done once in DistributedRuntime)
/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry);
///
/// // Get server (lazy init, cached)
/// let server = manager.server().await?;
/// server.register_endpoint(...).await?;
///
/// // Create client (not cached, lightweight)
/// let client = manager.create_client()?;
/// client.send_request(...).await?;
/// ```
pub struct NetworkManager {
    /// Request plane transport, captured once at construction from
    /// `RequestPlaneMode::get()`.
    mode: RequestPlaneMode,
    /// Environment-derived settings, captured once at construction.
    config: NetworkConfig,
    /// Lazily initialized server; filled by the first successful `server()`
    /// call and shared by every later call.
    server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
    /// Token passed to each server that gets created.
    cancellation_token: CancellationToken,
    /// Registry passed to the NATS server when the NATS mode is selected.
    component_registry: crate::component::Registry,
}
119
120impl NetworkManager {
121 /// Create a new network manager
122 ///
123 /// This is the single constructor for NetworkManager. All configuration
124 /// is loaded from environment variables internally.
125 ///
126 /// # Arguments
127 ///
128 /// * `cancellation_token` - Token for graceful shutdown of servers
129 /// * `nats_client` - Optional NATS client (required only for NATS mode)
130 /// * `component_registry` - Component registry to get NATS service groups from
131 ///
132 /// # Returns
133 ///
134 /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
135 pub fn new(
136 cancellation_token: CancellationToken,
137 nats_client: Option<async_nats::Client>,
138 component_registry: crate::component::Registry,
139 ) -> Arc<Self> {
140 let mode = RequestPlaneMode::get();
141 let config = NetworkConfig::from_env(nats_client);
142
143 tracing::info!(
144 mode = %mode,
145 http_port = config.http_port,
146 tcp_port = config.tcp_port,
147 "Initializing NetworkManager"
148 );
149
150 Arc::new(Self {
151 mode,
152 config,
153 server: Arc::new(OnceCell::new()),
154 cancellation_token,
155 component_registry,
156 })
157 }
158
159 /// Get or create the request plane server
160 ///
161 /// The server is created lazily on first access and cached for subsequent calls.
162 /// The server is automatically started in the background.
163 ///
164 /// # Returns
165 ///
166 /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
167 ///
168 /// # Errors
169 ///
170 /// Returns an error if:
171 /// - Server creation fails (e.g., port already in use)
172 /// - NATS mode is selected but NATS client is not available
173 /// - Configuration is invalid (e.g., malformed bind address)
174 pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
175 let server = self
176 .server
177 .get_or_try_init(async { self.create_server().await })
178 .await?;
179
180 Ok(server.clone())
181 }
182
183 /// Create a new request plane client
184 ///
185 /// Clients are lightweight and not cached. Each call creates a new client instance.
186 ///
187 /// # Returns
188 ///
189 /// Returns a trait object that abstracts over HTTP/TCP/NATS implementations.
190 ///
191 /// # Errors
192 ///
193 /// Returns an error if:
194 /// - Client creation fails (e.g., invalid configuration)
195 /// - NATS mode is selected but NATS client is not available
196 pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
197 match self.mode {
198 RequestPlaneMode::Http => self.create_http_client(),
199 RequestPlaneMode::Tcp => self.create_tcp_client(),
200 RequestPlaneMode::Nats => self.create_nats_client(),
201 }
202 }
203
204 /// Get the current request plane mode
205 ///
206 /// This is provided primarily for logging and debugging purposes.
207 /// Application logic should not branch on mode - use trait objects instead.
208 pub fn mode(&self) -> RequestPlaneMode {
209 self.mode
210 }
211
212 // ============================================================================
213 // PRIVATE: Server Creation
214 // ============================================================================
215
216 async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
217 match self.mode {
218 RequestPlaneMode::Http => self.create_http_server().await,
219 RequestPlaneMode::Tcp => self.create_tcp_server().await,
220 RequestPlaneMode::Nats => self.create_nats_server().await,
221 }
222 }
223
224 async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
225 use super::ingress::http_endpoint::SharedHttpServer;
226
227 let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
228 .parse()
229 .map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
230
231 tracing::info!(
232 bind_addr = %bind_addr,
233 rpc_root = %self.config.http_rpc_root,
234 "Creating HTTP request plane server"
235 );
236
237 let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());
238
239 // Start server in background
240 let server_clone = server.clone();
241 tokio::spawn(async move {
242 if let Err(e) = server_clone.start().await {
243 tracing::error!("HTTP request plane server error: {}", e);
244 }
245 });
246
247 Ok(server as Arc<dyn RequestPlaneServer>)
248 }
249
250 async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
251 use super::ingress::shared_tcp_endpoint::SharedTcpServer;
252
253 let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port)
254 .parse()
255 .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
256
257 tracing::info!(
258 bind_addr = %bind_addr,
259 "Creating TCP request plane server"
260 );
261
262 let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());
263
264 // Start server in background
265 let server_clone = server.clone();
266 tokio::spawn(async move {
267 if let Err(e) = server_clone.start().await {
268 tracing::error!("TCP request plane server error: {}", e);
269 }
270 });
271
272 Ok(server as Arc<dyn RequestPlaneServer>)
273 }
274
275 async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
276 use super::ingress::nats_server::NatsMultiplexedServer;
277
278 let nats_client = self
279 .config
280 .nats_client
281 .as_ref()
282 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
283
284 tracing::info!("Creating NATS request plane server");
285
286 Ok(NatsMultiplexedServer::new(
287 nats_client.clone(),
288 self.component_registry.clone(),
289 self.cancellation_token.clone(),
290 ) as Arc<dyn RequestPlaneServer>)
291 }
292
293 // ============================================================================
294 // PRIVATE: Client Creation
295 // ============================================================================
296
297 fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
298 use super::egress::http_router::HttpRequestClient;
299
300 tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
301 Ok(Arc::new(HttpRequestClient::with_config(
302 self.config.http_client_config.clone(),
303 )?))
304 }
305
306 fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
307 use super::egress::tcp_client::TcpRequestClient;
308
309 tracing::debug!("Creating TCP request plane client with config from NetworkManager");
310 Ok(Arc::new(TcpRequestClient::with_config(
311 self.config.tcp_client_config.clone(),
312 )?))
313 }
314
315 fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
316 use super::egress::nats_client::NatsRequestClient;
317
318 let nats_client = self
319 .config
320 .nats_client
321 .as_ref()
322 .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
323
324 tracing::debug!("Creating NATS request plane client");
325 Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
326 }
327}