Skip to main content

dynamo_runtime/pipeline/network/
manager.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Network Manager - Single Source of Truth for Network Configuration
5//!
6//! This module consolidates ALL network-related configuration and creation logic.
7//! It is the ONLY place in the codebase that:
8//! - Reads environment variables for network configuration
9//! - Knows about transport-specific types
10//! - Performs mode selection based on RequestPlaneMode
11//! - Creates servers and clients
12//!
13//! The rest of the codebase works exclusively with trait objects and never
14//! directly accesses transport implementations or configuration.
15
16use super::egress::unified_client::RequestPlaneClient;
17use super::ingress::shared_tcp_endpoint::SharedTcpServer;
18use super::ingress::unified_server::RequestPlaneServer;
19use crate::distributed::RequestPlaneMode;
20use anyhow::Result;
21use async_once_cell::OnceCell;
22use std::sync::Arc;
23use std::sync::OnceLock;
24use tokio_util::sync::CancellationToken;
25
/// Global storage for the actual TCP RPC port after binding.
///
/// Uses `OnceLock` since the port is set once when the server binds and never changes.
/// It is written only by `set_actual_tcp_rpc_port`, which runs after the shared TCP
/// server has bound its socket, and is read by `get_actual_tcp_rpc_port`.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();

/// Global storage for the shared TCP server instance.
///
/// When multiple workers run in the same process, they must share a single TCP server
/// to ensure all endpoints are registered on the same server. Without this, each worker
/// would create its own server on a different port, but all would publish the same port
/// (from ACTUAL_TCP_RPC_PORT) to discovery, causing "No handler found" errors.
///
/// Uses `tokio::sync::OnceCell` to support async initialization (binding the TCP socket).
/// The bind host/port come from whichever `NetworkManager` initializes it first.
static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
    tokio::sync::OnceCell::const_new();

/// Process-wide cancellation token for the global TCP server.
///
/// This token is independent of any individual runtime's cancellation token so that
/// component Drop impls (e.g. KvRouter::drop → cancel) don't kill the shared accept
/// loop while the OnceCell still hands out the (now-dead) server to later runtimes.
/// Created lazily on first access; in this module its only use is being cloned into
/// the shared server when that server is created.
static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
    std::sync::LazyLock::new(CancellationToken::new);
48
49/// Get the actual TCP RPC port that the server is listening on.
50pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
51    ACTUAL_TCP_RPC_PORT.get().copied().ok_or_else(|| {
52        tracing::error!(
53            "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
54        );
55        anyhow::anyhow!(
56            "TCP RPC port not initialized. This is not expected."
57        )
58    })
59}
60
61/// Set the actual TCP RPC port (called internally after server binds).
62fn set_actual_tcp_rpc_port(port: u16) {
63    if let Err(existing) = ACTUAL_TCP_RPC_PORT.set(port) {
64        tracing::warn!(
65            existing_port = existing,
66            new_port = port,
67            "TCP RPC port already set, ignoring new value"
68        );
69    }
70}
71
72/// Network configuration loaded from environment variables
73#[derive(Clone)]
74struct NetworkConfig {
75    // TCP server configuration
76    tcp_host: String,
77    /// TCP port to bind to. If None, the OS will assign a free port.
78    tcp_port: Option<u16>,
79
80    // TCP client configuration
81    tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
82
83    // NATS configuration (provided externally, not from env)
84    nats_client: Option<async_nats::Client>,
85}
86
87impl NetworkConfig {
88    /// Load configuration from environment variables
89    ///
90    /// This is the ONLY place where network-related environment variables are read.
91    fn from_env(nats_client: Option<async_nats::Client>) -> Self {
92        Self {
93            // TCP server configuration
94            // If DYN_TCP_RPC_PORT is set, use that port; otherwise None means OS will assign a free port
95            tcp_host: crate::utils::tcp_rpc_host_from_env(),
96            tcp_port: std::env::var("DYN_TCP_RPC_PORT")
97                .ok()
98                .and_then(|p| p.parse().ok()),
99
100            // TCP client configuration (reads DYN_TCP_* env vars)
101            tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
102
103            // NATS (external)
104            nats_client,
105        }
106    }
107}
108
/// Network Manager - Central coordinator for all network resources
///
/// # Responsibilities
///
/// 1. **Configuration Management**: Reads and manages all network-related environment variables
/// 2. **Server Creation**: Creates and starts request plane servers based on mode
/// 3. **Client Creation**: Creates request plane clients on demand
/// 4. **Abstraction**: Hides all transport-specific details from the rest of the codebase
///
/// # Design Principles
///
/// - **Single Source of Truth**: All network config and creation logic lives here
/// - **Lazy Initialization**: Servers are created only when first accessed
/// - **Transport Agnostic Interface**: Exposes only trait objects to callers
/// - **No Leaky Abstractions**: Transport types never escape this module
///
/// # Shared TCP server
///
/// In TCP mode every `NetworkManager` in the process hands out the same
/// process-wide server. The host/port configuration of whichever manager creates
/// it first is used; later managers reuse it regardless of their own settings.
///
/// # Example
///
/// ```ignore
/// // Create manager (typically done once in DistributedRuntime)
/// let manager = NetworkManager::new(cancel_token, nats_client, component_registry, request_plane_mode);
///
/// // Get server (lazy init, cached)
/// let server = manager.server().await?;
/// server.register_endpoint(...).await?;
///
/// // Create client (not cached, lightweight)
/// let client = manager.create_client()?;
/// client.send_request(...).await?;
/// ```
pub struct NetworkManager {
    /// Transport chosen at construction; selects both server and client implementations.
    mode: RequestPlaneMode,
    /// Environment-derived settings captured once in `new`.
    config: NetworkConfig,
    /// Request plane server, filled on the first successful `server()` call.
    server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
    /// Shutdown token handed to the NATS server. The TCP server does not use it; it
    /// is process-wide and runs on `GLOBAL_TCP_SERVER_TOKEN` instead.
    cancellation_token: CancellationToken,
    /// Registry handed to the NATS server when it is created.
    component_registry: crate::component::Registry,
}
146
147impl NetworkManager {
148    /// Create a new network manager
149    ///
150    /// This is the single constructor for NetworkManager. All configuration
151    /// is loaded from environment variables internally.
152    ///
153    /// # Arguments
154    ///
155    /// * `cancellation_token` - Token for graceful shutdown of servers
156    /// * `nats_client` - Optional NATS client (required only for NATS mode)
157    /// * `component_registry` - Component registry to get NATS service groups from
158    ///
159    /// # Returns
160    ///
161    /// Returns an Arc-wrapped NetworkManager ready to create servers and clients.
162    pub fn new(
163        cancellation_token: CancellationToken,
164        nats_client: Option<async_nats::Client>,
165        component_registry: crate::component::Registry,
166        mode: RequestPlaneMode,
167    ) -> Self {
168        let config = NetworkConfig::from_env(nats_client);
169
170        match mode {
171            RequestPlaneMode::Tcp => {
172                let port_display = config
173                    .tcp_port
174                    .map(|p| p.to_string())
175                    .unwrap_or_else(|| "OS-assigned".to_string());
176                tracing::info!(
177                    %mode,
178                    host = %config.tcp_host,
179                    port = %port_display,
180                    "Initializing NetworkManager with TCP request plane"
181                );
182            }
183            RequestPlaneMode::Nats => {
184                tracing::info!(
185                    %mode,
186                    "Initializing NetworkManager with NATS request plane"
187                );
188            }
189        }
190
191        Self {
192            mode,
193            config,
194            server: Arc::new(OnceCell::new()),
195            cancellation_token,
196            component_registry,
197        }
198    }
199
200    /// Get or create the request plane server
201    ///
202    /// The server is created lazily on first access and cached for subsequent calls.
203    /// The server is automatically started in the background.
204    ///
205    /// # Returns
206    ///
207    /// Returns a trait object that abstracts over TCP/NATS implementations.
208    ///
209    /// # Errors
210    ///
211    /// Returns an error if:
212    /// - Server creation fails (e.g., port already in use)
213    /// - NATS mode is selected but NATS client is not available
214    /// - Configuration is invalid (e.g., malformed bind address)
215    pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
216        let server = self
217            .server
218            .get_or_try_init(async { self.create_server().await })
219            .await?;
220
221        Ok(server.clone())
222    }
223
224    /// Create a new request plane client
225    ///
226    /// Clients are lightweight and not cached. Each call creates a new client instance.
227    ///
228    /// # Returns
229    ///
230    /// Returns a trait object that abstracts over TCP/NATS implementations.
231    ///
232    /// # Errors
233    ///
234    /// Returns an error if:
235    /// - Client creation fails (e.g., invalid configuration)
236    /// - NATS mode is selected but NATS client is not available
237    pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
238        match self.mode {
239            RequestPlaneMode::Tcp => self.create_tcp_client(),
240            RequestPlaneMode::Nats => self.create_nats_client(),
241        }
242    }
243
244    /// Get the current request plane mode
245    ///
246    /// This is provided primarily for logging and debugging purposes.
247    /// Application logic should not branch on mode - use trait objects instead.
248    pub fn mode(&self) -> RequestPlaneMode {
249        self.mode
250    }
251
252    // ============================================================================
253    // PRIVATE: Server Creation
254    // ============================================================================
255
256    async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
257        match self.mode {
258            RequestPlaneMode::Tcp => self.create_tcp_server().await,
259            RequestPlaneMode::Nats => self.create_nats_server().await,
260        }
261    }
262
263    async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
264        // Use the global TCP server to ensure all workers in the same process share
265        // a single server. This is critical for correct endpoint routing.
266        let server = GLOBAL_TCP_SERVER
267            .get_or_try_init(|| async {
268                // Use configured port if specified, otherwise use port 0 (OS assigns free port)
269                let port = self.config.tcp_port.unwrap_or(0);
270                let bind_addr = format!("{}:{}", self.config.tcp_host, port)
271                    .parse()
272                    .map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
273
274                tracing::info!(
275                    bind_addr = %bind_addr,
276                    port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
277                    "Creating TCP request plane server"
278                );
279
280                let server = SharedTcpServer::new(bind_addr, GLOBAL_TCP_SERVER_TOKEN.clone());
281
282                // Bind and start server, getting the actual bound address
283                let actual_addr = server.clone().bind_and_start().await?;
284
285                // Store the actual bound port globally so build_transport_type() can access it
286                set_actual_tcp_rpc_port(actual_addr.port());
287
288                tracing::info!(
289                    actual_addr = %actual_addr,
290                    actual_port = actual_addr.port(),
291                    "TCP request plane server started"
292                );
293
294                Ok::<_, anyhow::Error>(server)
295            })
296            .await?;
297
298        Ok(server.clone() as Arc<dyn RequestPlaneServer>)
299    }
300
301    async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
302        use super::ingress::nats_server::NatsMultiplexedServer;
303
304        let nats_client = self
305            .config
306            .nats_client
307            .as_ref()
308            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
309
310        tracing::info!("Creating NATS request plane server");
311
312        Ok(NatsMultiplexedServer::new(
313            nats_client.clone(),
314            self.component_registry.clone(),
315            self.cancellation_token.clone(),
316        ) as Arc<dyn RequestPlaneServer>)
317    }
318
319    // ============================================================================
320    // PRIVATE: Client Creation
321    // ============================================================================
322
323    fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
324        use super::egress::tcp_client::TcpRequestClient;
325
326        tracing::debug!("Creating TCP request plane client with config from NetworkManager");
327        Ok(Arc::new(TcpRequestClient::with_config(
328            self.config.tcp_client_config.clone(),
329        )?))
330    }
331
332    fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
333        use super::egress::nats_client::NatsRequestClient;
334
335        let nats_client = self
336            .config
337            .nats_client
338            .as_ref()
339            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
340
341        tracing::debug!("Creating NATS request plane client");
342        Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
343    }
344}
345
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a manager for `mode` with no NATS client and an empty component registry.
    fn manager_for(mode: RequestPlaneMode) -> NetworkManager {
        NetworkManager::new(CancellationToken::new(), None, crate::component::Registry::new(), mode)
    }

    #[test]
    fn tcp_mode_creates_tcp_client_without_nats_client() {
        let manager = manager_for(RequestPlaneMode::Tcp);
        let client = manager.create_client().unwrap();
        assert_eq!(client.transport_name(), "tcp");
    }

    #[test]
    fn nats_mode_requires_nats_client() {
        let manager = manager_for(RequestPlaneMode::Nats);
        let err = match manager.create_client() {
            Err(err) => err,
            Ok(client) => panic!(
                "expected NATS mode without NATS client to fail, got {} client",
                client.transport_name()
            ),
        };
        assert!(err.to_string().contains("NATS client required"));
    }
}