use crate::component::Component;
use crate::pipeline::PipelineError;
use crate::storage::key_value_store::{
EtcdStore, KeyValueStore, KeyValueStoreEnum, KeyValueStoreManager, KeyValueStoreSelect,
MemoryStore,
};
use crate::transports::nats::DRTNatsClientPrometheusMetrics;
use crate::{
component::{self, ComponentBuilder, Endpoint, InstanceSource, Namespace},
discovery::Discovery,
metrics::PrometheusUpdateCallback,
metrics::{MetricsHierarchy, MetricsRegistry},
service::ServiceClient,
transports::{etcd, nats, tcp},
};
use crate::{discovery, system_status_server, transports};
use super::utils::GracefulShutdownTracker;
use crate::SystemHealth;
use crate::runtime::Runtime;
use async_once_cell::OnceCell;
use std::sync::{Arc, OnceLock, Weak};
use anyhow::Result;
use derive_getters::Dissolve;
use figment::error;
use std::collections::HashMap;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
/// Shared handle to the distributed runtime: transport clients, discovery,
/// metrics, and health state layered on top of a single-process [`Runtime`].
///
/// Cheap to clone: all state is behind `Arc`s (or is itself a clonable client).
#[derive(Clone)]
pub struct DistributedRuntime {
    // Underlying single-process runtime (task spawning, cancellation tokens).
    runtime: Runtime,
    // Present only when the etcd KV backend is selected in non-static mode.
    etcd_client: Option<transports::etcd::Client>,
    // NATS connection; `new()` always populates this today, but accessors
    // treat it as optional.
    nats_client: Option<transports::nats::Client>,
    // Key-value store backend (etcd, file, or in-memory).
    store: KeyValueStoreManager,
    // Lazily-initialized TCP stream server (see `tcp_server()`).
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    // Lazily-initialized network manager (see `network_manager()`).
    network_manager: Arc<OnceCell<Arc<crate::pipeline::network::manager::NetworkManager>>>,
    // Set at most once, when the system status HTTP server starts.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,
    // Active discovery backend (KV-store-based or Kubernetes).
    discovery_client: Arc<dyn discovery::Discovery>,
    // Populated only for the Kubernetes discovery backend.
    discovery_metadata: Option<Arc<tokio::sync::RwLock<discovery::DiscoveryMetadata>>>,
    component_registry: component::Registry,
    // True when running without discovery (static topology).
    is_static: bool,
    // Cache of instance sources per endpoint; `Weak` so entries do not keep
    // sources alive.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,
    system_health: Arc<parking_lot::Mutex<SystemHealth>>,
    metrics_registry: MetricsRegistry,
}
impl MetricsHierarchy for DistributedRuntime {
    /// The runtime is the root of the metrics hierarchy, so it has no name.
    fn basename(&self) -> String {
        String::new()
    }

    /// The root has no parent hierarchies.
    fn parent_hierarchies(&self) -> Vec<&dyn MetricsHierarchy> {
        Vec::new()
    }

    fn get_metrics_registry(&self) -> &MetricsRegistry {
        &self.metrics_registry
    }
}
impl std::fmt::Debug for DistributedRuntime {
    /// Opaque debug representation; the runtime holds live connections and
    /// trait objects that are not useful to dump.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("DistributedRuntime")
    }
}
impl DistributedRuntime {
    /// Build a distributed runtime on top of `runtime`.
    ///
    /// Wires together, in order: the KV store backend, the NATS client,
    /// health-state tracking, the discovery backend (selected by the
    /// `DYN_DISCOVERY_BACKEND` env var), NATS client metrics, and — when
    /// enabled by settings — the system status HTTP server and the health
    /// check manager.
    ///
    /// # Errors
    /// Fails if the etcd or NATS connection cannot be established, if the
    /// Kubernetes discovery client cannot be created, or if metric
    /// initialization fails. Status-server and health-manager startup
    /// failures are logged but do not fail construction.
    pub async fn new(runtime: Runtime, config: DistributedConfig) -> Result<Self> {
        // `dissolve()` yields fields in `DistributedConfig` declaration order.
        let (selected_kv_store, nats_config, is_static) = config.dissolve();
        let runtime_clone = runtime.clone();
        // Select the KV store backend. Only the etcd backend retains a
        // dedicated etcd client; static mode always uses the memory store.
        let (etcd_client, store) = match (is_static, selected_kv_store) {
            (false, KeyValueStoreSelect::Etcd(etcd_config)) => {
                let etcd_client = etcd::Client::new(*etcd_config, runtime_clone).await.inspect_err(|err|
                    tracing::error!(%err, "Could not connect to etcd. Pass `--store-kv ..` to use a different backend or start etcd."))?;
                let store = KeyValueStoreManager::etcd(etcd_client.clone());
                (Some(etcd_client), store)
            }
            (false, KeyValueStoreSelect::File(root)) => (None, KeyValueStoreManager::file(root)),
            (true, _) | (false, KeyValueStoreSelect::Memory) => {
                (None, KeyValueStoreManager::memory())
            }
        };
        // NOTE(review): `nats_config` is owned at this point, so the
        // `.clone()` before `connect()` looks redundant — confirm whether
        // `connect()` takes `self` and drop the clone if so.
        let nats_client = Some(nats_config.clone().connect().await?);
        // Environment-driven runtime settings; defaults on parse failure.
        // NOTE: shadows the `DistributedConfig` parameter from here on.
        let config = crate::config::RuntimeConfig::from_settings().unwrap_or_default();
        // Only mint a cancellation token if the status server will actually run.
        let cancel_token = if config.system_server_enabled() {
            Some(runtime.clone().child_token())
        } else {
            None
        };
        let starting_health_status = config.starting_health_status.clone();
        let use_endpoint_health_status = config.use_endpoint_health_status.clone();
        let health_endpoint_path = config.system_health_path.clone();
        let live_endpoint_path = config.system_live_path.clone();
        let system_health = Arc::new(parking_lot::Mutex::new(SystemHealth::new(
            starting_health_status,
            use_endpoint_health_status,
            health_endpoint_path,
            live_endpoint_path,
        )));
        // Keep a handle so NATS metrics can be registered after `Self` exists.
        let nats_client_for_metrics = nats_client.clone();
        // Discovery backend: "kubernetes", or anything else -> KV store.
        let discovery_backend =
            std::env::var("DYN_DISCOVERY_BACKEND").unwrap_or_else(|_| "kv_store".to_string());
        let (discovery_client, discovery_metadata) = match discovery_backend.as_str() {
            "kubernetes" => {
                tracing::info!("Initializing Kubernetes discovery backend");
                // Shared metadata is only needed by the Kubernetes backend;
                // it is also handed to the status server below.
                let metadata = Arc::new(tokio::sync::RwLock::new(
                    crate::discovery::DiscoveryMetadata::new(),
                ));
                let client = crate::discovery::KubeDiscoveryClient::new(
                    metadata.clone(),
                    runtime.primary_token(),
                )
                .await
                .inspect_err(
                    |err| tracing::error!(%err, "Failed to initialize Kubernetes discovery client"),
                )?;
                (Arc::new(client) as Arc<dyn Discovery>, Some(metadata))
            }
            _ => {
                tracing::info!("Initializing KV store discovery backend");
                use crate::discovery::KVStoreDiscovery;
                (
                    Arc::new(KVStoreDiscovery::new(
                        store.clone(),
                        runtime.primary_token(),
                    )) as Arc<dyn Discovery>,
                    None,
                )
            }
        };
        let distributed_runtime = Self {
            runtime,
            etcd_client,
            store,
            nats_client,
            tcp_server: Arc::new(OnceCell::new()),
            network_manager: Arc::new(OnceCell::new()),
            system_status_server: Arc::new(OnceLock::new()),
            discovery_client,
            discovery_metadata,
            component_registry: component::Registry::new_with_static(is_static),
            is_static,
            instance_sources: Arc::new(Mutex::new(HashMap::new())),
            metrics_registry: crate::MetricsRegistry::new(),
            system_health,
        };
        // Register a callback that refreshes NATS client stats on each
        // metrics scrape.
        if let Some(nats_client_for_metrics) = nats_client_for_metrics {
            let nats_client_metrics = DRTNatsClientPrometheusMetrics::new(
                &distributed_runtime,
                nats_client_for_metrics.client().clone(),
            )?;
            let nats_client_callback = Arc::new({
                let nats_client_clone = nats_client_metrics.clone();
                move || {
                    nats_client_clone.set_from_client_stats();
                    Ok(())
                }
            });
            distributed_runtime
                .metrics_registry
                .add_update_callback(nats_client_callback);
        }
        // Uptime is tracked regardless of whether the status server runs.
        distributed_runtime
            .system_health
            .lock()
            .initialize_uptime_gauge(&distributed_runtime)?;
        if let Some(cancel_token) = cancel_token {
            let host = config.system_host.clone();
            let port = config.system_port as u16;
            // Best-effort: startup failure is logged, not propagated.
            match crate::system_status_server::spawn_system_status_server(
                &host,
                port,
                cancel_token,
                Arc::new(distributed_runtime.clone()),
                distributed_runtime.discovery_metadata.clone(),
            )
            .await
            {
                Ok((addr, handle)) => {
                    tracing::info!("System status server started successfully on {}", addr);
                    let system_status_server_info =
                        crate::system_status_server::SystemStatusServerInfo::new(
                            addr,
                            Some(handle),
                        );
                    // `set` can only fail if already set; `new()` runs once
                    // per instance, so this is an invariant violation.
                    distributed_runtime
                        .system_status_server
                        .set(Arc::new(system_status_server_info))
                        .expect("System status server info should only be set once");
                }
                Err(e) => {
                    tracing::error!("System status server startup failed: {}", e);
                }
            }
        } else {
            tracing::debug!(
                "System status server HTTP endpoints disabled, but uptime metrics are being tracked"
            );
        }
        // Optionally start the background health check manager (best-effort).
        if config.health_check_enabled {
            let health_check_config = crate::health_check::HealthCheckConfig {
                canary_wait_time: std::time::Duration::from_secs(config.canary_wait_time_secs),
                request_timeout: std::time::Duration::from_secs(
                    config.health_check_request_timeout_secs,
                ),
            };
            match crate::health_check::start_health_check_manager(
                distributed_runtime.clone(),
                Some(health_check_config),
            )
            .await
            {
                Ok(()) => tracing::info!(
                    "Health check manager started (canary_wait_time: {}s, request_timeout: {}s)",
                    config.canary_wait_time_secs,
                    config.health_check_request_timeout_secs
                ),
                Err(e) => tracing::error!("Health check manager failed to start: {}", e),
            }
        }
        Ok(distributed_runtime)
    }

    /// Build from environment settings with discovery enabled.
    pub async fn from_settings(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(false);
        Self::new(runtime, config).await
    }

    /// Build from environment settings in static mode (no discovery;
    /// in-memory KV store).
    pub async fn from_settings_without_discovery(runtime: Runtime) -> Result<Self> {
        let config = DistributedConfig::from_settings(true);
        Self::new(runtime, config).await
    }

    /// The underlying single-process runtime.
    pub fn runtime(&self) -> &Runtime {
        &self.runtime
    }

    /// Root cancellation token of the underlying runtime.
    pub fn primary_token(&self) -> CancellationToken {
        self.runtime.primary_token()
    }

    pub fn component_registry(&self) -> &component::Registry {
        &self.component_registry
    }

    /// Shared health state (also used by the status server and tests).
    pub fn system_health(&self) -> Arc<parking_lot::Mutex<SystemHealth>> {
        self.system_health.clone()
    }

    /// Identifier of this instance as reported by the discovery backend.
    pub fn connection_id(&self) -> u64 {
        self.discovery_client.instance_id()
    }

    /// Shut down the runtime and the KV store backend.
    pub fn shutdown(&self) {
        self.runtime.shutdown();
        self.store.shutdown();
    }

    /// Create (or look up) a namespace handle scoped to this runtime.
    pub fn namespace(&self, name: impl Into<String>) -> Result<Namespace> {
        Namespace::new(self.clone(), name.into(), self.is_static)
    }

    pub fn discovery(&self) -> Arc<dyn Discovery> {
        self.discovery_client.clone()
    }

    /// A NATS-backed service client, if a NATS connection exists.
    pub(crate) fn service_client(&self) -> Option<ServiceClient> {
        self.nats_client().map(|nc| ServiceClient::new(nc.clone()))
    }

    /// Lazily start (once) and return the shared TCP stream server.
    pub async fn tcp_server(&self) -> Result<Arc<tcp::server::TcpStreamServer>> {
        Ok(self
            .tcp_server
            .get_or_try_init(async move {
                let options = tcp::server::ServerOptions::default();
                let server = tcp::server::TcpStreamServer::new(options).await?;
                Ok::<_, PipelineError>(server)
            })
            .await?
            .clone())
    }

    /// Lazily create (once) and return the shared network manager.
    pub async fn network_manager(
        &self,
    ) -> Result<Arc<crate::pipeline::network::manager::NetworkManager>> {
        use crate::pipeline::network::manager::NetworkManager;
        let manager = self
            .network_manager
            .get_or_try_init(async {
                let nats_client = self.nats_client().map(|c| c.client().clone());
                anyhow::Ok(NetworkManager::new(
                    self.child_token(),
                    nats_client,
                    self.component_registry.clone(),
                ))
            })
            .await?;
        Ok(manager.clone())
    }

    /// Transport-agnostic request-plane server from the network manager.
    pub async fn request_plane_server(
        &self,
    ) -> Result<Arc<dyn crate::pipeline::network::ingress::unified_server::RequestPlaneServer>>
    {
        let manager = self.network_manager().await?;
        manager.server().await
    }

    /// Deprecated shim: always returns an error directing callers to
    /// `request_plane_server()`.
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn http_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::http_endpoint::SharedHttpServer>> {
        let _server = self.request_plane_server().await?;
        anyhow::bail!(
            "http_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        )
    }

    /// Deprecated shim: always returns an error directing callers to
    /// `request_plane_server()`.
    #[deprecated(note = "Use request_plane_server() or network_manager().server() instead")]
    pub async fn shared_tcp_server(
        &self,
    ) -> Result<Arc<crate::pipeline::network::ingress::shared_tcp_endpoint::SharedTcpServer>> {
        let _server = self.request_plane_server().await?;
        anyhow::bail!(
            "shared_tcp_server() is deprecated. Use request_plane_server() instead, which returns a trait object that works with all transport types."
        )
    }

    pub fn nats_client(&self) -> Option<&nats::Client> {
        self.nats_client.as_ref()
    }

    /// Info for the system status server, if it was started.
    pub fn system_status_server_info(
        &self,
    ) -> Option<Arc<crate::system_status_server::SystemStatusServerInfo>> {
        self.system_status_server.get().cloned()
    }

    /// The etcd client, if the etcd KV backend is in use.
    pub fn etcd_client(&self) -> Option<etcd::Client> {
        self.etcd_client.clone()
    }

    pub fn store(&self) -> &KeyValueStoreManager {
        &self.store
    }

    /// Child cancellation token tied to the runtime's lifetime.
    pub fn child_token(&self) -> CancellationToken {
        self.runtime.child_token()
    }

    pub(crate) fn graceful_shutdown_tracker(&self) -> Arc<GracefulShutdownTracker> {
        self.runtime.graceful_shutdown_tracker()
    }

    pub fn instance_sources(&self) -> Arc<Mutex<HashMap<Endpoint, Weak<InstanceSource>>>> {
        self.instance_sources.clone()
    }
}
/// Construction options for [`DistributedRuntime`].
///
/// NOTE: `DistributedRuntime::new()` destructures this via `dissolve()`,
/// which yields fields in declaration order — keep the field order in sync
/// with that call site.
#[derive(Dissolve)]
pub struct DistributedConfig {
    // Which key-value store backend to use (etcd, file, or memory).
    pub store_backend: KeyValueStoreSelect,
    // NATS connection options.
    pub nats_config: nats::ClientOptions,
    // True for static (discovery-less) operation.
    pub is_static: bool,
}
impl DistributedConfig {
    /// Standard configuration: etcd-backed store with default options and a
    /// default NATS client, with the caller choosing static mode.
    pub fn from_settings(is_static: bool) -> DistributedConfig {
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::default()),
            nats_config: nats::ClientOptions::default(),
            is_static,
        }
    }

    /// Configuration for short-lived CLI invocations: etcd without a lease
    /// attached, never static.
    pub fn for_cli() -> DistributedConfig {
        DistributedConfig {
            store_backend: KeyValueStoreSelect::Etcd(Box::new(etcd::ClientOptions {
                attach_lease: false,
                ..Default::default()
            })),
            nats_config: nats::ClientOptions::default(),
            is_static: false,
        }
    }
}
pub mod distributed_test_utils {
    /// Build a [`crate::DistributedRuntime`] in static mode for integration
    /// tests, panicking on any setup failure.
    #[cfg(feature = "integration")]
    pub async fn create_test_drt_async() -> crate::DistributedRuntime {
        let runtime = crate::Runtime::from_current().unwrap();
        let drt = crate::DistributedRuntime::from_settings_without_discovery(runtime).await;
        drt.unwrap()
    }
}
#[cfg(all(test, feature = "integration"))]
mod tests {
    use super::distributed_test_utils::create_test_drt_async;

    /// Uptime must advance even when the system status server is disabled
    /// (DYN_SYSTEM_PORT unset), since uptime tracking is independent of it.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_disabled() {
        // temp_env scopes the env var change to this async block.
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", None::<&str>)], async {
            let drt = create_test_drt_async().await;
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system disabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }

    /// Same as above, but with the system status server enabled on a fixed
    /// port; uptime behavior must be identical.
    #[tokio::test]
    async fn test_drt_uptime_after_delay_system_enabled() {
        temp_env::async_with_vars([("DYN_SYSTEM_PORT", Some("8081"))], async {
            let drt = create_test_drt_async().await;
            tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
            let uptime = drt.system_health.lock().uptime();
            assert!(
                uptime >= std::time::Duration::from_millis(50),
                "Expected uptime to be at least 50ms, but got {:?}",
                uptime
            );
            println!(
                "✓ DRT uptime test passed (system enabled): uptime = {:?}",
                uptime
            );
        })
        .await;
    }
}