use super::egress::unified_client::RequestPlaneClient;
use super::ingress::shared_tcp_endpoint::SharedTcpServer;
use super::ingress::unified_server::RequestPlaneServer;
use crate::distributed::RequestPlaneMode;
use anyhow::Result;
use async_once_cell::OnceCell;
use std::sync::Arc;
use std::sync::OnceLock;
use tokio_util::sync::CancellationToken;
/// Port the shared TCP request plane server actually bound to.
///
/// Written at most once by `set_actual_tcp_rpc_port` (called after the server
/// has been bound) and read by `get_actual_tcp_rpc_port`.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
/// Process-wide TCP server instance, created lazily by
/// `NetworkManager::create_tcp_server` and handed out to every caller after that.
static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
tokio::sync::OnceCell::const_new();
/// Cancellation token passed to the shared TCP server when it is constructed.
///
/// This is a standalone token created on first access; it is not derived from
/// the token held by any individual `NetworkManager`.
static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
std::sync::LazyLock::new(CancellationToken::new);
/// Returns the port recorded for the TCP request plane server.
///
/// # Errors
///
/// Returns an error (and emits an error-level log line) when no port has been
/// recorded yet, i.e. `ACTUAL_TCP_RPC_PORT` is still unset.
pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
    match ACTUAL_TCP_RPC_PORT.get() {
        Some(&port) => Ok(port),
        None => {
            tracing::error!(
                "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
            );
            Err(anyhow::anyhow!(
                "TCP RPC port not initialized. This is not expected."
            ))
        }
    }
}
/// Records the port the TCP request plane server bound to.
///
/// The value can only be stored once. A later call leaves the stored port
/// untouched and logs a warning that shows both the stored port and the
/// rejected one.
fn set_actual_tcp_rpc_port(port: u16) {
    if ACTUAL_TCP_RPC_PORT.set(port).is_ok() {
        return;
    }
    // `OnceLock::set` returns the *rejected* value in `Err`, not the value that
    // is already stored, so the stored port has to be read back explicitly.
    // A failed `set` means the cell is initialized, so `get()` yields `Some`.
    if let Some(&existing) = ACTUAL_TCP_RPC_PORT.get() {
        tracing::warn!(
            existing_port = existing,
            new_port = port,
            "TCP RPC port already set, ignoring new value"
        );
    }
}
/// Transport settings gathered once when a `NetworkManager` is constructed.
#[derive(Clone)]
struct NetworkConfig {
/// Host part of the TCP server bind address.
tcp_host: String,
/// Requested TCP port, read from `DYN_TCP_RPC_PORT`; `None` means port `0`
/// is used when binding (logged as "OS-assigned").
tcp_port: Option<u16>,
/// Configuration handed to every TCP request client created by the manager.
tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
/// NATS connection; must be `Some` to create NATS servers or clients.
nats_client: Option<async_nats::Client>,
}
impl NetworkConfig {
fn from_env(nats_client: Option<async_nats::Client>) -> Self {
Self {
tcp_host: crate::utils::tcp_rpc_host_from_env(),
tcp_port: std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok()),
tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
nats_client,
}
}
}
/// Creates request plane servers and clients for the configured transport mode.
pub struct NetworkManager {
/// Transport selected at construction time (TCP or NATS).
mode: RequestPlaneMode,
/// Transport settings captured when the manager was built.
config: NetworkConfig,
/// Lazily initialized server handle; filled by the first successful `server()` call.
server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
/// Token passed to the NATS server when one is created.
cancellation_token: CancellationToken,
/// Registry passed to the NATS server when one is created.
component_registry: crate::component::Registry,
}
impl NetworkManager {
/// Builds a manager for the given request plane `mode`.
///
/// Transport settings are read from the environment via
/// `NetworkConfig::from_env`, and the chosen mode is logged at info level.
/// No server or client is created here.
pub fn new(
    cancellation_token: CancellationToken,
    nats_client: Option<async_nats::Client>,
    component_registry: crate::component::Registry,
    mode: RequestPlaneMode,
) -> Self {
    let config = NetworkConfig::from_env(nats_client);

    match mode {
        RequestPlaneMode::Nats => {
            tracing::info!(
                %mode,
                "Initializing NetworkManager with NATS request plane"
            );
        }
        RequestPlaneMode::Tcp => {
            // Show the configured port, or a placeholder when none was requested.
            let port_display = match config.tcp_port {
                Some(port) => port.to_string(),
                None => String::from("OS-assigned"),
            };
            tracing::info!(
                %mode,
                host = %config.tcp_host,
                port = %port_display,
                "Initializing NetworkManager with TCP request plane"
            );
        }
    }

    let server = Arc::new(OnceCell::new());
    Self {
        mode,
        config,
        server,
        cancellation_token,
        component_registry,
    }
}
/// Returns this manager's request plane server, creating it on first use.
///
/// Later calls return a clone of the same `Arc` once initialization has
/// succeeded.
///
/// # Errors
///
/// Propagates any error returned by `create_server`.
pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
    // `create_server()` only builds the future; the cell decides whether to poll it.
    let shared = self.server.get_or_try_init(self.create_server()).await?;
    Ok(Arc::clone(shared))
}
pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
match self.mode {
RequestPlaneMode::Tcp => self.create_tcp_client(),
RequestPlaneMode::Nats => self.create_nats_client(),
}
}
/// Returns the request plane mode this manager was constructed with.
pub fn mode(&self) -> RequestPlaneMode {
self.mode
}
async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
match self.mode {
RequestPlaneMode::Tcp => self.create_tcp_server().await,
RequestPlaneMode::Nats => self.create_nats_server().await,
}
}
async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
let server = GLOBAL_TCP_SERVER
.get_or_try_init(|| async {
let port = self.config.tcp_port.unwrap_or(0);
let bind_addr = format!("{}:{}", self.config.tcp_host, port)
.parse()
.map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
tracing::info!(
bind_addr = %bind_addr,
port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
"Creating TCP request plane server"
);
let server = SharedTcpServer::new(bind_addr, GLOBAL_TCP_SERVER_TOKEN.clone());
let actual_addr = server.clone().bind_and_start().await?;
set_actual_tcp_rpc_port(actual_addr.port());
tracing::info!(
actual_addr = %actual_addr,
actual_port = actual_addr.port(),
"TCP request plane server started"
);
Ok::<_, anyhow::Error>(server)
})
.await?;
Ok(server.clone() as Arc<dyn RequestPlaneServer>)
}
/// Creates a NATS-backed request plane server.
///
/// # Errors
///
/// Fails when the manager was built without a NATS client.
async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
    use super::ingress::nats_server::NatsMultiplexedServer;

    let Some(nats_client) = self.config.nats_client.as_ref() else {
        anyhow::bail!("NATS client required for NATS mode");
    };

    tracing::info!("Creating NATS request plane server");
    let server: Arc<dyn RequestPlaneServer> = NatsMultiplexedServer::new(
        nats_client.clone(),
        self.component_registry.clone(),
        self.cancellation_token.clone(),
    );
    Ok(server)
}
/// Creates a TCP request plane client using the client configuration captured
/// when the manager was constructed.
///
/// # Errors
///
/// Propagates any error from `TcpRequestClient::with_config`.
fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
    use super::egress::tcp_client::TcpRequestClient;

    tracing::debug!("Creating TCP request plane client with config from NetworkManager");
    let client_config = self.config.tcp_client_config.clone();
    let client = TcpRequestClient::with_config(client_config)?;
    Ok(Arc::new(client))
}
/// Creates a NATS request plane client from the manager's NATS connection.
///
/// # Errors
///
/// Fails when the manager was built without a NATS client.
fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
    use super::egress::nats_client::NatsRequestClient;

    let nats_client = match &self.config.nats_client {
        Some(client) => client,
        None => return Err(anyhow::anyhow!("NATS client required for NATS mode")),
    };

    tracing::debug!("Creating NATS request plane client");
    let client = NatsRequestClient::new(nats_client.clone());
    Ok(Arc::new(client))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager for `mode` with a fresh token and registry and no NATS client.
    fn manager_for(mode: RequestPlaneMode) -> NetworkManager {
        let token = CancellationToken::new();
        let registry = crate::component::Registry::new();
        NetworkManager::new(token, None, registry, mode)
    }

    /// TCP mode must produce a client even though no NATS client was supplied.
    #[test]
    fn tcp_mode_creates_tcp_client_without_nats_client() {
        let manager = manager_for(RequestPlaneMode::Tcp);
        let client = manager.create_client().unwrap();
        assert_eq!(client.transport_name(), "tcp");
    }

    /// NATS mode without a NATS client must fail with a descriptive error.
    #[test]
    fn nats_mode_requires_nats_client() {
        let manager = manager_for(RequestPlaneMode::Nats);
        let err = match manager.create_client() {
            Err(err) => err,
            Ok(client) => panic!(
                "expected NATS mode without NATS client to fail, got {} client",
                client.transport_name()
            ),
        };
        assert!(err.to_string().contains("NATS client required"));
    }
}