use crate::component::{Component, Instance};
use crate::pipeline::PipelineError;
use crate::pipeline::network::manager::NetworkManager;
use crate::service::{ServiceClient, ServiceSet};
use crate::storage::kv;
use crate::{
component::{self, ComponentBuilder, Endpoint, Namespace},
discovery::Discovery,
metrics::PrometheusUpdateCallback,
metrics::{MetricsHierarchy, MetricsRegistry},
transports::{etcd, nats, tcp},
};
use crate::{discovery, system_status_server, transports};
use super::utils::GracefulShutdownTracker;
use crate::SystemHealth;
use crate::runtime::Runtime;
use async_once_cell::OnceCell;
use std::fmt;
use std::sync::{Arc, OnceLock, Weak};
use std::time::Duration;
use tokio::sync::watch::Receiver;
use anyhow::Result;
use derive_getters::Dissolve;
use figment::error;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
/// Per-endpoint watch receivers carrying the current list of instances.
///
/// Receivers are held through `Weak`, so an entry in this map does not by itself
/// keep a receiver alive.
type InstanceMap = HashMap<Endpoint, Weak<Receiver<Vec<Instance>>>>;

/// Process-wide handle to the distributed runtime: transports (NATS, TCP),
/// service discovery, registries, health state and metrics.
///
/// Cloning shares every `Arc`-wrapped field between the clones.
/// NOTE(review): whether `Runtime`, `component::Registry`, `MetricsRegistry` and the
/// other non-`Arc` registries share state on clone depends on their own `Clone`
/// impls — confirm before relying on it.
#[derive(Clone)]
pub struct DistributedRuntime {
    /// Underlying runtime: cancellation tokens, task executors and shutdown.
    runtime: Runtime,
    /// NATS client; `None` when NATS was not configured.
    nats_client: Option<transports::nats::Client>,
    /// Network manager built from the request plane, NATS client and component registry.
    network_manager: Arc<NetworkManager>,
    /// TCP stream server, created lazily by `tcp_server()`.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    /// System status server info; set once, and only if the server started.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    /// Request plane selected by the config.
    request_plane: RequestPlaneMode,
    /// Discovery client for the configured `DiscoveryBackend`.
    discovery_client: Arc<dyn discovery::Discovery>,
    /// Discovery metadata; only `Some` for the Kubernetes backend.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,
    /// Registry of components/services (e.g. NATS services by name).
    component_registry: component::Registry,
    /// Per-endpoint instance watch receivers, see `InstanceMap`.
    instance_sources: Arc<tokio::sync::Mutex<InstanceMap>>,
    /// Health state, also shared with the uptime-gauge metrics callback.
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,
    local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry,
    /// Root metrics registry of this runtime's metrics hierarchy.
    metrics_registry: MetricsRegistry,
    engine_routes: crate::engine_routes::EngineRouteRegistry,
}
impl MetricsHierarchy for DistributedRuntime {
    /// The runtime sits at the top of the hierarchy and uses an empty basename.
    fn basename(&self) -> String {
        String::new()
    }

    /// No hierarchy lies above the runtime, so there are no parents.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        Vec::new()
    }

    /// Registry owned by this runtime.
    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
    }
}
impl std::fmt::Debug for DistributedRuntime {
    /// Emits just the type name; none of the runtime's fields are printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("DistributedRuntime")
    }
}
impl DistributedRuntime {
pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
let (discovery_backend, nats_config, request_plane) = config.dissolve();
let nats_client = match nats_config {
Some(nc) => Some(nc.connect().await?),
None => None,
};
let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
let cancel_token = if config.system_server_enabled() {
Some(runtime.clone().child_token())
} else {
None
};
let starting_health_status = config.starting_health_status.clone();
let use_endpoint_health_status = config.use_endpoint_health_status.clone();
let health_endpoint_path = config.system_health_path.clone();
let live_endpoint_path = config.system_live_path.clone();
let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
starting_health_status,
use_endpoint_health_status,
health_endpoint_path,
live_endpoint_path,
)));
let (discovery_client, discovery_metadata) = match discovery_backend {
DiscoveryBackend::Kubernetes => {
tracing::info!("Initializing Kubernetes discovery backend");
let metadata = Arc::new(tokio::sync::RwLock::new(
crate::discovery::DiscoveryMetadata::new(),
));
let client = crate::discovery::KubeDiscoveryClient::new(
metadata.clone(),
runtime.primary_token(),
)
.await
.inspect_err(
|err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
)?;
(Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
}
DiscoveryBackend::KvStore(kv_selector) => {
tracing::info!("Initializing KV store discovery backend: {}", kv_selector);
let runtime_clone = runtime.clone();
let store = match kv_selector {
kv::Selector::Etcd(etcd_config) => {
let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
tracing::error!(%err, "Could not connect to etcd. Pass `--discovery-backend ..` to use a different backend or start etcd."))?;
kv::Manager::etcd(etcd_client)
}
kv::Selector::File(root) => kv::Manager::file(runtime.primary_token(), root),
kv::Selector::Memory => kv::Manager::memory(),
};
use crate::discovery::KVStoreDiscovery;
(
Arc::new(KVStoreDiscovery::new(store, runtime.primary_token()))
as Arc<dyn Discovery>,
None,
)
}
};
let component_registry = component::Registry::new();
let network_manager = NetworkManager::new(
runtime.child_token(),
nats_client.clone().map(|c| c.client().clone()),
component_registry.clone(),
request_plane,
);
let distributed_runtime = Self {
runtime,
network_manager: Arc::new(network_manager),
nats_client,
tcp_server: Arc::new(OnceCell::new()),
system_status_server: Arc::new(OnceLock::new()),
discovery_client,
discovery_metadata,
component_registry,
instance_sources: Arc::new(Mutex::new(HashMap::new())),
metrics_registry: crate::MetricsRegistry::new(),
system_health,
request_plane,
local_endpoint_registry: crate::local_endpoint_registry::LocalEndpointRegistry::new(),
engine_routes: crate::engine_routes::EngineRouteRegistry::new(),
};
distributed_runtime
.system_health
.lock()
.initialize_uptime_gauge(&distributed_runtime)?;
{
let system_health = distributed_runtime.system_health.clone();
distributed_runtime
.metrics_registry
.add_update_callback(std::sync::Arc::new(move || {
system_health.lock().update_uptime_gauge();
Ok(())
}));
}
if let Some(cancel_token) = cancel_token {
let host = config.system_host.clone();
let port = config.system_port as u16;
match crate::system_status_server::spawn_system_status_server(
&host,
port,
cancel_token,
Arc::new(distributed_runtime.clone()),
distributed_runtime.discovery_metadata.clone(),
)
.await
{
Ok((addr, handle)) => {
tracing::info!("System status server started successfully on {}", addr);
let system_status_server_info =
crate::system_status_server::SystemStatusServerInfo::new(
addr,
Some(handle),
);
distributed_runtime
.system_status_server
.set(Arc::new(system_status_server_info))
.expect("System status server info should only be set once");
}
Err(e) => {
tracing::error!("System status server startup failed: {}", e);
}
}
} else {
tracing::debug!(
"System status server HTTP endpoints disabled, but uptime metrics are being tracked"
);
}
if config.health_check_enabled {
let health_check_config = crate::health_check::HealthCheckConfig {
canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
request_timeout: std::time::Duration::from_secs(
config.health_check_request_timeout_secs,
),
};
match crate::health_check::start_health_check_manager(
distributed_runtime.clone(),
Some(health_check_config),
)
.await
{
Ok(()) => tracing::info!(
"Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
config.canary_wait_time_secs,
config.health_check_request_timeout_secs
),
Err(e) => tracing::error!("Health check manager failed to start: {}", e),
}
}
Ok(distributed_runtime)
}
pub async fn from_settings(runtime: Runtime) -> Result<Self> {
let config = DistributedConfig::from_settings();
Self::new(runtime, config).await
}
pub fn runtime(&self) -> &Runtime {
&self.runtime
}
pub fn primary_token(&self) -> CancellationToken {
self.runtime.primary_token()
}
pub fn component_registry(&self) -> &component::Registry {
&self.component_registry
}
pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
self.system_health.clone()
}
pub fn local_endpoint_registry(
&self,
) -> &crate::local_endpoint_registry::LocalEndpointRegistry {
&self.local_endpoint_registry
}
pub fn engine_routes(&self) -> &crate::engine_routes::EngineRouteRegistry {
&self.engine_routes
}
pub fn connection_id(&self) -> u64 {
self.discovery_client.instance_id()
}
pub fn shutdown(&self) {
self.runtime.shutdown();
self.discovery_client.shutdown();
}
pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
Namespace::new(self.clone(), name.into())
}
pub fn discovery(&self) -> Arc<dyn Discovery> {
self.discovery_client.clone()
}
pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
Ok(self
.tcp_server
.get_or_try_init(async move {
let options = tcp::server::ServerOptions::default();
let server = tcp::server::TcpStreamServer::new(options).await?;
Ok::<_, PipelineError>(server)
})
.await?
.clone())
}
pub fn network_manager(&self) -> Arc<NetworkManager> {
self.network_manager.clone()
}
pub async fn request_plane_server(
&self,
) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
{
self.network_manager().server().await
}
pub fn system_status_server_info(
&self,
) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
self.system_status_server.get().cloned()
}
pub fn request_plane(&self) -> RequestPlaneMode {
self.request_plane
}
pub fn child_token(&self) -> CancellationToken {
self.runtime.child_token()
}
pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
self.runtime.graceful_shutdown_tracker()
}
pub fn instance_sources(&self) -> Arc<Mutex<InstanceMap>> {
self.instance_sources.clone()
}
pub async fn kv_router_nats_publish(
&self,
subject: String,
payload: bytes::Bytes,
) -> anyhow::Result<()> {
let Some(nats_client) = self.nats_client.as_ref() else {
tracing::trace!("Skipping NATS publish (NATS not configured): {}", subject);
return Ok(());
};
Ok(nats_client.client().publish(subject, payload).await?)
}
pub(crate) async fn kv_router_nats_subscribe(
&self,
subject: String,
) -> Result<async_nats::Subscriber> {
let Some(nats_client) = self.nats_client.as_ref() else {
anyhow::bail!("KV router's EventSubscriber requires NATS");
};
Ok(nats_client.client().subscribe(subject).await?)
}
pub async fn kv_router_nats_request(
&self,
subject: String,
payload: bytes::Bytes,
timeout: std::time::Duration,
) -> anyhow::Result<async_nats::Message> {
let Some(nats_client) = self.nats_client.as_ref() else {
anyhow::bail!("KV router's request requires NATS");
};
let response =
tokio::time::timeout(timeout, nats_client.client().request(subject, payload))
.await
.map_err(|_| anyhow::anyhow!("Request timed out after {:?}", timeout))??;
Ok(response)
}
pub fn register_nats_service(
&self,
component: Component,
) -> tokio::sync::mpsc::Receiver<Result<(), String>> {
let (tx, rx) = tokio::sync::mpsc::channel::<Result<(), String>>(1);
let drt = self.clone();
self.runtime().secondary().spawn(async move {
let service_name = component.service_name();
if drt
.component_registry()
.inner
.lock()
.await
.services
.contains_key(&service_name)
{
tracing::trace!("Service {service_name} already exists");
let _ = tx.send(Ok(())).await;
return;
}
let Some(nats_client) = drt.nats_client.as_ref() else {
tracing::error!("Cannot create NATS service without NATS.");
let _ = tx
.send(Err("Cannot create NATS service without NATS".to_string()))
.await;
return;
};
let description = None;
let nats_service = match crate::component::service::build_nats_service(
nats_client,
&component,
description,
)
.await
{
Ok(service) => service,
Err(err) => {
tracing::error!(error = %err, component = service_name, "Failed to build NATS service");
let _ = tx.send(Err(format!("Failed to build NATS service: {err}"))).await;
return;
}
};
let mut guard = drt.component_registry().inner.lock().await;
if !guard.services.contains_key(&service_name) {
guard.services.insert(service_name.clone(), nats_service);
tracing::info!("Added NATS service {service_name}");
drop(guard);
} else {
drop(guard);
let _ = nats_service.stop().await;
}
let _ = tx.send(Ok(())).await;
});
rx
}
}
/// Selects which service-discovery implementation [`DistributedRuntime::new`] builds.
#[derive(Clone, Debug)]
pub enum DiscoveryBackend {
    /// Kubernetes discovery (`KubeDiscoveryClient`). This is the only backend that
    /// also produces discovery metadata for the system status server.
    Kubernetes,
    /// Discovery backed by a key-value store (etcd, a file, or memory, per the
    /// selector), wrapped in `KVStoreDiscovery`.
    KvStore(kv::Selector),
}
/// Inputs to [`DistributedRuntime::new`].
///
/// NOTE: `DistributedRuntime::new` destructures this with `dissolve()` into a
/// tuple in field declaration order, so keep the field order in sync with that call.
#[derive(Dissolve)]
pub struct DistributedConfig {
    /// Service-discovery backend to initialize.
    pub discovery_backend: DiscoveryBackend,
    /// NATS connection options; `None` runs without a NATS client.
    pub nats_config: Option<nats::ClientOptions>,
    /// Request plane the runtime is configured with.
    pub request_plane: RequestPlaneMode,
}
impl DistributedConfig {
pub fn from_settings() -> DistributedConfig {
let request_plane = RequestPlaneMode::from_env();
let nats_enabled = request_plane.is_nats()
|| std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
let backend_str =
std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "etcd".to_string());
let discovery_backend = match backend_str.as_str() {
"kubernetes" => {
tracing::info!("Using Kubernetes discovery backend");
DiscoveryBackend::Kubernetes
}
other => {
let selector: kv::Selector = other.parse().unwrap_or_else(|_| {
panic!(
"Unknown DYN_DISCOVERY_BACKEND value: '{other}'. \
Valid options: kubernetes, etcd, file, mem"
)
});
DiscoveryBackend::KvStore(selector)
}
};
DistributedConfig {
discovery_backend,
nats_config: if nats_enabled {
Some(nats::ClientOptions::default())
} else {
None
},
request_plane,
}
}
pub fn for_cli() -> DistributedConfig {
let etcd_config = etcd::ClientOptions {
attach_lease: false,
..Default::default()
};
let request_plane = RequestPlaneMode::from_env();
let nats_enabled = request_plane.is_nats()
|| std::env::var(crate::config::environment_names::nats::NATS_SERVER).is_ok();
DistributedConfig {
discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Etcd(Box::new(etcd_config))),
nats_config: if nats_enabled {
Some(nats::ClientOptions::default())
} else {
None
},
request_plane,
}
}
pub fn process_local() -> DistributedConfig {
DistributedConfig {
discovery_backend: DiscoveryBackend::KvStore(kv::Selector::Memory),
nats_config: None,
request_plane: RequestPlaneMode::Tcp,
}
}
}
/// Transport the runtime uses as its request plane.
///
/// Parsed case-insensitively from `"nats"`, `"http"` or `"tcp"`, displayed in
/// lowercase, and defaults to [`RequestPlaneMode::Tcp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum RequestPlaneMode {
    /// NATS request plane.
    Nats,
    /// HTTP request plane.
    Http,
    /// TCP request plane (the default).
    #[default]
    Tcp,
}

impl fmt::Display for RequestPlaneMode {
    /// Writes the lowercase mode name, which [`std::str::FromStr`] parses back.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Nats => "nats",
            Self::Http => "http",
            Self::Tcp => "tcp",
        };
        f.write_str(name)
    }
}

impl std::str::FromStr for RequestPlaneMode {
    type Err = anyhow::Error;

    /// Parses a mode name, ignoring ASCII case.
    ///
    /// # Errors
    ///
    /// Returns an error listing the valid options for any other input.
    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        // ASCII case folding is exact here: every accepted keyword is ASCII.
        if s.eq_ignore_ascii_case("nats") {
            Ok(Self::Nats)
        } else if s.eq_ignore_ascii_case("http") {
            Ok(Self::Http)
        } else if s.eq_ignore_ascii_case("tcp") {
            Ok(Self::Tcp)
        } else {
            Err(anyhow::anyhow!(
                "Invalid request plane mode: '{}'. Valid options are: 'nats', 'http', 'tcp'",
                s
            ))
        }
    }
}
impl RequestPlaneMode {
fn from_env() -> Self {
std::env::var("DYN_REQUEST_PLANE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or_default()
}
pub fn is_nats(&self) -> bool {
matches!(self, RequestPlaneMode::Nats)
}
}
pub mod distributed_test_utils {
#[cfg(feature = "integration")]
pub async fn create_test_drt_async() -> super::DistributedRuntime {
use crate::transports::nats;
let rt = crate::Runtime::from_current().unwrap();
let config = super::DistributedConfig {
discovery_backend: super::DiscoveryBackend::KvStore(
crate::storage::kv::Selector::Memory,
),
nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(),
};
super::DistributedRuntime::new(rt, config).await.unwrap()
}
pub async fn create_test_shared_drt_async(
store_path: &std::path::Path,
) -> super::DistributedRuntime {
use crate::transports::nats;
let rt = crate::Runtime::from_current().unwrap();
let config = super::DistributedConfig {
discovery_backend: super::DiscoveryBackend::KvStore(
crate::storage::kv::Selector::File(store_path.to_path_buf()),
),
nats_config: Some(nats::ClientOptions::default()),
request_plane: crate::distributed::RequestPlaneMode::default(),
};
super::DistributedRuntime::new(rt, config).await.unwrap()
}
}
// Pure `RequestPlaneMode` tests run in every `cargo test`. Only the tests that
// build a full runtime (and therefore need NATS) are gated behind `integration`.
#[cfg(test)]
mod tests {
    use super::RequestPlaneMode;
    #[cfg(feature = "integration")]
    use super::distributed_test_utils::create_test_drt_async;

    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        use crate::config::environment_names::runtime::system as env_system;
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, None::<&str>)], async {
            let drt = create_test_drt_async().await;
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[cfg(feature = "integration")]
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        use crate::config::environment_names::runtime::system as env_system;
        temp_env::async_with_vars([(env_system::DYN_SYSTEM_PORT, Some("8081"))], async {
            let drt = create_test_drt_async().await;
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    #[test]
    fn test_request_plane_mode_from_str() {
        assert_eq!(
            "nats".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "http".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "tcp".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert_eq!(
            "NATS".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Nats
        );
        assert_eq!(
            "HTTP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Http
        );
        assert_eq!(
            "TCP".parse::<RequestPlaneMode>().unwrap(),
            RequestPlaneMode::Tcp
        );
        assert!("invalid".parse::<RequestPlaneMode>().is_err());
    }

    #[test]
    fn test_request_plane_mode_display() {
        assert_eq!(RequestPlaneMode::Nats.to_string(), "nats");
        assert_eq!(RequestPlaneMode::Http.to_string(), "http");
        assert_eq!(RequestPlaneMode::Tcp.to_string(), "tcp");
    }

    #[test]
    fn test_request_plane_mode_default_and_round_trip() {
        assert_eq!(RequestPlaneMode::default(), RequestPlaneMode::Tcp);
        for mode in [
            RequestPlaneMode::Nats,
            RequestPlaneMode::Http,
            RequestPlaneMode::Tcp,
        ] {
            assert_eq!(mode.to_string().parse::<RequestPlaneMode>().unwrap(), mode);
        }
    }
}