use super::egress::unified_client::RequestPlaneClient;
use super::ingress::unified_server::RequestPlaneServer;
use crate::config::RequestPlaneMode;
use anyhow::Result;
use async_once_cell::OnceCell;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
#[derive(Clone)]
struct NetworkConfig {
http_host: String,
http_port: u16,
http_rpc_root: String,
tcp_host: String,
tcp_port: u16,
http_client_config: super::egress::http_router::Http2Config,
tcp_client_config: super::egress::tcp_client::TcpRequestConfig,
nats_client: Option<async_nats::Client>,
}
impl NetworkConfig {
fn from_env(nats_client: Option<async_nats::Client>) -> Self {
Self {
http_host: std::env::var("DYN_HTTP_RPC_HOST")
.unwrap_or_else(|_| crate::utils::get_http_rpc_host_from_env()),
http_port: std::env::var("DYN_HTTP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(8888),
http_rpc_root: std::env::var("DYN_HTTP_RPC_ROOT_PATH")
.unwrap_or_else(|_| "/v1/rpc".to_string()),
tcp_host: std::env::var("DYN_TCP_RPC_HOST")
.unwrap_or_else(|_| crate::utils::get_tcp_rpc_host_from_env()),
tcp_port: std::env::var("DYN_TCP_RPC_PORT")
.ok()
.and_then(|p| p.parse().ok())
.unwrap_or(9999),
http_client_config: super::egress::http_router::Http2Config::from_env(),
tcp_client_config: super::egress::tcp_client::TcpRequestConfig::from_env(),
nats_client,
}
}
}
pub struct NetworkManager {
mode: RequestPlaneMode,
config: NetworkConfig,
server: Arc<OnceCell<Arc<dyn RequestPlaneServer>>>,
cancellation_token: CancellationToken,
component_registry: crate::component::Registry,
}
impl NetworkManager {
pub fn new(
cancellation_token: CancellationToken,
nats_client: Option<async_nats::Client>,
component_registry: crate::component::Registry,
) -> Arc<Self> {
let mode = RequestPlaneMode::get();
let config = NetworkConfig::from_env(nats_client);
tracing::info!(
mode = %mode,
http_port = config.http_port,
tcp_port = config.tcp_port,
"Initializing NetworkManager"
);
Arc::new(Self {
mode,
config,
server: Arc::new(OnceCell::new()),
cancellation_token,
component_registry,
})
}
pub async fn server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
let server = self
.server
.get_or_try_init(async { self.create_server().await })
.await?;
Ok(server.clone())
}
pub fn create_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
match self.mode {
RequestPlaneMode::Http => self.create_http_client(),
RequestPlaneMode::Tcp => self.create_tcp_client(),
RequestPlaneMode::Nats => self.create_nats_client(),
}
}
pub fn mode(&self) -> RequestPlaneMode {
self.mode
}
async fn create_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
match self.mode {
RequestPlaneMode::Http => self.create_http_server().await,
RequestPlaneMode::Tcp => self.create_tcp_server().await,
RequestPlaneMode::Nats => self.create_nats_server().await,
}
}
async fn create_http_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
use super::ingress::http_endpoint::SharedHttpServer;
let bind_addr = format!("{}:{}", self.config.http_host, self.config.http_port)
.parse()
.map_err(|e| anyhow::anyhow!("Invalid HTTP bind address: {}", e))?;
tracing::info!(
bind_addr = %bind_addr,
rpc_root = %self.config.http_rpc_root,
"Creating HTTP request plane server"
);
let server = SharedHttpServer::new(bind_addr, self.cancellation_token.clone());
let server_clone = server.clone();
tokio::spawn(async move {
if let Err(e) = server_clone.start().await {
tracing::error!("HTTP request plane server error: {}", e);
}
});
Ok(server as Arc<dyn RequestPlaneServer>)
}
async fn create_tcp_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
use super::ingress::shared_tcp_endpoint::SharedTcpServer;
let bind_addr = format!("{}:{}", self.config.tcp_host, self.config.tcp_port)
.parse()
.map_err(|e| anyhow::anyhow!("Invalid TCP bind address: {}", e))?;
tracing::info!(
bind_addr = %bind_addr,
"Creating TCP request plane server"
);
let server = SharedTcpServer::new(bind_addr, self.cancellation_token.clone());
let server_clone = server.clone();
tokio::spawn(async move {
if let Err(e) = server_clone.start().await {
tracing::error!("TCP request plane server error: {}", e);
}
});
Ok(server as Arc<dyn RequestPlaneServer>)
}
async fn create_nats_server(&self) -> Result<Arc<dyn RequestPlaneServer>> {
use super::ingress::nats_server::NatsMultiplexedServer;
let nats_client = self
.config
.nats_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
tracing::info!("Creating NATS request plane server");
Ok(NatsMultiplexedServer::new(
nats_client.clone(),
self.component_registry.clone(),
self.cancellation_token.clone(),
) as Arc<dyn RequestPlaneServer>)
}
fn create_http_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
use super::egress::http_router::HttpRequestClient;
tracing::debug!("Creating HTTP request plane client with config from NetworkManager");
Ok(Arc::new(HttpRequestClient::with_config(
self.config.http_client_config.clone(),
)?))
}
fn create_tcp_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
use super::egress::tcp_client::TcpRequestClient;
tracing::debug!("Creating TCP request plane client with config from NetworkManager");
Ok(Arc::new(TcpRequestClient::with_config(
self.config.tcp_client_config.clone(),
)?))
}
fn create_nats_client(&self) -> Result<Arc<dyn RequestPlaneClient>> {
use super::egress::nats_client::NatsRequestClient;
let nats_client = self
.config
.nats_client
.as_ref()
.ok_or_else(|| anyhow::anyhow!("NATS client required for NATS mode"))?;
tracing::debug!("Creating NATS request plane client");
Ok(Arc::new(NatsRequestClient::new(nats_client.clone())))
}
}