use super::egress::unified_client::RequestPlaneClient;
use super::ingress::shared_tcp_endpoint::SharedTcpServer;
use super::ingress::unified_server::RequestPlaneServer;
use crate::distributed::RequestPlaneMode;
use anyhow::Result;
use async_once_cell::OnceCell;
use std::sync::Arc;
use std::sync::OnceLock;
use tokio_util::sync::CancellationToken;
/// Port the TCP request-plane server actually bound to.
/// Written through `set_actual_tcp_rpc_port` and read through `get_actual_tcp_rpc_port`.
static ACTUAL_TCP_RPC_PORT: OnceLock<u16> = OnceLock::new();
/// Port the HTTP request-plane server actually bound to.
/// Written through `set_actual_http_rpc_port` and read through `get_actual_http_rpc_port`.
static ACTUAL_HTTP_RPC_PORT: OnceLock<u16> = OnceLock::new();
/// Process-wide TCP server instance, created on first use by
/// `NetworkManager::create_tcp_server` and handed out to every later caller.
static GLOBAL_TCP_SERVER: tokio::sync::OnceCell<Arc<SharedTcpServer>> =
tokio::sync::OnceCell::const_new();
/// Process-wide HTTP server instance, created on first use by
/// `NetworkManager::create_http_server` and handed out to every later caller.
static GLOBAL_HTTP_SERVER: tokio::sync::OnceCell<
Arc<super::ingress::http_endpoint::SharedHttpServer>,
> = tokio::sync::OnceCell::const_new();
/// Cancellation token given to the shared TCP server when it is constructed.
/// It is a separate token from the one stored in any individual `NetworkManager`.
static GLOBAL_TCP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
std::sync::LazyLock::new(CancellationToken::new);
/// Cancellation token given to the shared HTTP server when it is constructed.
/// It is a separate token from the one stored in any individual `NetworkManager`.
static GLOBAL_HTTP_SERVER_TOKEN: std::sync::LazyLock<CancellationToken> =
std::sync::LazyLock::new(CancellationToken::new);
/// Returns the port recorded for the TCP request-plane server.
///
/// # Errors
///
/// Fails (and logs at `error` level) when no port has been recorded yet, i.e. the TCP
/// server has not been started in this process.
pub fn get_actual_tcp_rpc_port() -> anyhow::Result<u16> {
    match ACTUAL_TCP_RPC_PORT.get() {
        Some(&port) => Ok(port),
        None => {
            tracing::error!(
                "TCP RPC port not set - request_plane_server() must be called before get_actual_tcp_rpc_port()"
            );
            Err(anyhow::anyhow!(
                "TCP RPC port not initialized. This is not expected."
            ))
        }
    }
}
/// Records the port the TCP request-plane server actually bound to.
///
/// The value lives in a process-wide `OnceLock`, so only the first call takes effect.
/// Any later call leaves the stored port untouched and logs a warning that reports both
/// the stored port and the rejected one.
fn set_actual_tcp_rpc_port(port: u16) {
    if ACTUAL_TCP_RPC_PORT.set(port).is_ok() {
        return;
    }
    // `OnceLock::set` hands back the value it *rejected*, not the one already stored, so
    // the stored port has to be read from the cell to be logged correctly.
    if let Some(&existing) = ACTUAL_TCP_RPC_PORT.get() {
        tracing::warn!(
            existing_port = existing,
            new_port = port,
            "TCP RPC port already set, ignoring new value"
        );
    }
}
/// Returns the port recorded for the HTTP request-plane server.
///
/// # Errors
///
/// Fails (and logs at `error` level) when no port has been recorded yet, i.e. the HTTP
/// server has not been started in this process.
pub fn get_actual_http_rpc_port() -> anyhow::Result<u16> {
    match ACTUAL_HTTP_RPC_PORT.get() {
        Some(&port) => Ok(port),
        None => {
            tracing::error!(
                "HTTP RPC port not set - request_plane_server() must be called before get_actual_http_rpc_port()"
            );
            Err(anyhow::anyhow!(
                "HTTP RPC port not initialized. This is not expected."
            ))
        }
    }
}
/// Records the port the HTTP request-plane server actually bound to.
///
/// The value lives in a process-wide `OnceLock`, so only the first call takes effect.
/// Any later call leaves the stored port untouched and logs a warning that reports both
/// the stored port and the rejected one.
fn set_actual_http_rpc_port(port: u16) {
    if ACTUAL_HTTP_RPC_PORT.set(port).is_ok() {
        return;
    }
    // `OnceLock::set` hands back the value it *rejected*, not the one already stored, so
    // the stored port has to be read from the cell to be logged correctly.
    if let Some(&existing) = ACTUAL_HTTP_RPC_PORT.get() {
        tracing::warn!(
            existing_port = existing,
            new_port = port,
            "HTTP RPC port already set, ignoring new value"
        );
    }
}
/// Connection settings shared by the request-plane servers and clients that a
/// `NetworkManager` creates. Populated by `NetworkConfig::from_env`.
#[derive(Clone)]
struct NetworkConfig {
/// Host part of the HTTP server bind address.
http_host: String,
/// Requested HTTP server port; `None` makes the server bind to port 0.
http_port: Option<u16>,
/// Root path for HTTP RPC. In this file it is only included in log output.
http_rpc_root: String,
/// Host part of the TCP server bind address.
tcp_host: String,
/// Requested TCP server port; `None` makes the server bind to port 0.
tcp_port: Option<u16>,
/// Settings passed to `HttpRequestClient::with_config` when an HTTP client is created.
http_client_config: super::egress::http_router::Http2Config,
/// Settings passed to `TcpRequestClient::with_config` when a TCP client is created.
tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
/// NATS connection; must be `Some` for the NATS server and client to be created.
nats_client: Option<async_nats::Client>,
}
impl NetworkConfig {
fn from_env(nats_client: Option<async_nats::Client>) -> Self {
Self {
http_host: std::env::var("DYN_HTTP_RPC_HOST")
.unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
http_port: std::env::var("DYN_HTTP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok()),
http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
.unwrap_or_else(|_| "/v1/rpc".to_string()),
tcp_host: std::env::var("DYN_TCP_RPC_HOST")
.unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
tcp_port: std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok()),
http_client_config: super::egress::http_router::Http2Config::from_env(),
tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
nats_client,
}
}
}
/// Creates request-plane servers and clients for one transport (`RequestPlaneMode`).
pub struct NetworkManager {
/// Transport selected at construction; decides which server/client type is created.
mode: RequestPlaneMode,
/// Host, port and client settings used when creating servers and clients.
config: NetworkConfig,
/// Cache for the server returned by `server()`; filled on the first successful call.
server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
/// Token handed to the NATS server when one is created.
cancellation_token: CancellationToken,
/// Registry handed to the NATS server when one is created.
component_registry: crate::component::Registry,
}
impl NetworkManager {
    /// Creates a manager for the given request-plane `mode`.
    ///
    /// Network settings are read from the environment through `NetworkConfig::from_env`
    /// and the selected transport is logged. No server or client is created here.
    pub fn new(
        cancellation_token: CancellationToken,
        nats_client: Option<async_nats::Client>,
        component_registry: crate::component::Registry,
        mode: RequestPlaneMode,
    ) -> Self {
        let config = NetworkConfig::from_env(nats_client);
        match mode {
            RequestPlaneMode::Http => {
                let port_display = config
                    .http_port
                    .map(|p| p.to_string())
                    .unwrap_or_else(|| "OS-assigned".to_string());
                tracing::info!(
                    %mode,
                    host = %config.http_host,
                    port = %port_display,
                    rpc_root = %config.http_rpc_root,
                    "Initializing NetworkManager with HTTP request plane"
                );
            }
            RequestPlaneMode::Tcp => {
                let port_display = config
                    .tcp_port
                    .map(|p| p.to_string())
                    .unwrap_or_else(|| "OS-assigned".to_string());
                tracing::info!(
                    %mode,
                    host = %config.tcp_host,
                    port = %port_display,
                    "Initializing NetworkManager with TCP request plane"
                );
            }
            RequestPlaneMode::Nats => {
                tracing::info!(
                    %mode,
                    "Initializing NetworkManager with NATS request plane"
                );
            }
        }
        Self {
            mode,
            config,
            server: Arc::new(OnceCell::new()),
            cancellation_token,
            component_registry,
        }
    }

    /// Returns this manager's request-plane server, creating it on first use.
    ///
    /// The server is cached in `self.server`, so later calls return a clone of the same
    /// `Arc`.
    ///
    /// # Errors
    ///
    /// Propagates any error from creating the server for the configured mode.
    pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        let server = self
            .server
            .get_or_try_init(async { self.create_server().await })
            .await?;
        Ok(server.clone())
    }

    /// Creates a new request-plane client for the configured mode.
    ///
    /// Unlike [`Self::server`], nothing is cached: every call builds a fresh client.
    ///
    /// # Errors
    ///
    /// Propagates client construction errors; in NATS mode it also fails when no NATS
    /// client was supplied.
    pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        match self.mode {
            RequestPlaneMode::Http => self.create_http_client(),
            RequestPlaneMode::Tcp => self.create_tcp_client(),
            RequestPlaneMode::Nats => self.create_nats_client(),
        }
    }

    /// Returns the request-plane mode this manager was created with.
    pub fn mode(&self) -> RequestPlaneMode {
        self.mode
    }

    /// Dispatches server creation to the transport selected by `self.mode`.
    async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        match self.mode {
            RequestPlaneMode::Http => self.create_http_server().await,
            RequestPlaneMode::Tcp => self.create_tcp_server().await,
            RequestPlaneMode::Nats => self.create_nats_server().await,
        }
    }

    /// Builds the socket address a server should bind to from `host` and `port`.
    ///
    /// `protocol` ("HTTP" or "TCP") is only used to label the error message.
    ///
    /// A bare IP literal is combined with the port directly, which makes unbracketed
    /// IPv6 hosts such as `::1` work; plain `host:port` concatenation cannot be parsed for
    /// those. Anything else (for example an already bracketed `[::1]`) goes through the
    /// `host:port` string form.
    fn resolve_bind_addr(host: &str, port: u16, protocol: &str) -> Result<std::net::SocketAddr> {
        if let Ok(ip) = host.parse::<std::net::IpAddr>() {
            return Ok(std::net::SocketAddr::new(ip, port));
        }
        format!("{}:{}", host, port)
            .parse()
            .map_err(|e| anyhow::anyhow!("Invalid {} bind address: {}", protocol, e))
    }

    /// Returns the process-wide shared HTTP server, binding and starting it on first use.
    ///
    /// Initialization uses this manager's HTTP host/port (port 0 when none is configured),
    /// runs the server under `GLOBAL_HTTP_SERVER_TOKEN` rather than this manager's own
    /// cancellation token, and records the port that was actually bound. Once the global
    /// server exists it is returned as is.
    ///
    /// # Errors
    ///
    /// Fails when the bind address is invalid or the server cannot be bound and started.
    async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        use super::ingress::http_endpoint::SharedHttpServer;
        let server = GLOBAL_HTTP_SERVER
            .get_or_try_init(|| async {
                let port = self.config.http_port.unwrap_or(0);
                let bind_addr = Self::resolve_bind_addr(&self.config.http_host, port, "HTTP")?;
                tracing::info!(
                    bind_addr = %bind_addr,
                    port_source = if self.config.http_port.is_some() { "DYN_HTTP_RPC_PORT" } else { "OS-assigned" },
                    rpc_root = %self.config.http_rpc_root,
                    "Creating HTTP request plane server"
                );
                let server = SharedHttpServer::new(bind_addr, GLOBAL_HTTP_SERVER_TOKEN.clone());
                let actual_addr = server.clone().bind_and_start().await?;
                // Publish the real port: it differs from the requested one when port 0 was used.
                set_actual_http_rpc_port(actual_addr.port());
                tracing::info!(
                    actual_addr = %actual_addr,
                    actual_port = actual_addr.port(),
                    "HTTP request plane server started"
                );
                Ok::<_, anyhow::Error>(server)
            })
            .await?;
        Ok(server.clone() as Arc<dyn RequestPlaneServer>)
    }

    /// Returns the process-wide shared TCP server, binding and starting it on first use.
    ///
    /// Initialization uses this manager's TCP host/port (port 0 when none is configured),
    /// runs the server under `GLOBAL_TCP_SERVER_TOKEN` rather than this manager's own
    /// cancellation token, and records the port that was actually bound. Once the global
    /// server exists it is returned as is.
    ///
    /// # Errors
    ///
    /// Fails when the bind address is invalid or the server cannot be bound and started.
    async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        let server = GLOBAL_TCP_SERVER
            .get_or_try_init(|| async {
                let port = self.config.tcp_port.unwrap_or(0);
                let bind_addr = Self::resolve_bind_addr(&self.config.tcp_host, port, "TCP")?;
                tracing::info!(
                    bind_addr = %bind_addr,
                    port_source = if self.config.tcp_port.is_some() { "DYN_TCP_RPC_PORT" } else { "OS-assigned" },
                    "Creating TCP request plane server"
                );
                let server = SharedTcpServer::new(bind_addr, GLOBAL_TCP_SERVER_TOKEN.clone());
                let actual_addr = server.clone().bind_and_start().await?;
                // Publish the real port: it differs from the requested one when port 0 was used.
                set_actual_tcp_rpc_port(actual_addr.port());
                tracing::info!(
                    actual_addr = %actual_addr,
                    actual_port = actual_addr.port(),
                    "TCP request plane server started"
                );
                Ok::<_, anyhow::Error>(server)
            })
            .await?;
        Ok(server.clone() as Arc<dyn RequestPlaneServer>)
    }

    /// Creates a NATS-backed server from this manager's NATS client, component registry
    /// and cancellation token.
    ///
    /// # Errors
    ///
    /// Fails when the manager was constructed without a NATS client.
    async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
        use super::ingress::nats_server::NatsMultiplexedServer;
        let nats_client = self
            .config
            .nats_client
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
        tracing::info!("Creating NATS request plane server");
        Ok(NatsMultiplexedServer::new(
            nats_client.clone(),
            self.component_registry.clone(),
            self.cancellation_token.clone(),
        ) as Arc<dyn RequestPlaneServer>)
    }

    /// Creates an HTTP client from the stored HTTP client configuration.
    fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        use super::egress::http_router::HttpRequestClient;
        tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
        Ok(Arc::new(HttpRequestClient::with_config(
            self.config.http_client_config.clone(),
        )?))
    }

    /// Creates a TCP client from the stored TCP client configuration.
    fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        use super::egress::tcp_client::TcpRequestClient;
        tracing::debug!("Creating TCP request plane client with config from NetworkManager");
        Ok(Arc::new(TcpRequestClient::with_config(
            self.config.tcp_client_config.clone(),
        )?))
    }

    /// Creates a NATS client that wraps a clone of the stored NATS connection.
    ///
    /// # Errors
    ///
    /// Fails when the manager was constructed without a NATS client.
    fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
        use super::egress::nats_client::NatsRequestClient;
        let nats_client = self
            .config
            .nats_client
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
        tracing::debug!("Creating NATS request plane client");
        Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
    }
}