1#![allow(dead_code)]
7#![allow(unused_imports)]
8
9use std::{
10 collections::HashMap,
11 sync::{Arc, OnceLock, Weak},
12};
13
14pub use anyhow::{
15 Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
16};
17
18use async_once_cell::OnceCell;
19
20mod config;
21pub use config::RuntimeConfig;
22
23pub mod component;
24pub mod discovery;
25pub mod engine;
26pub mod health_check;
27pub mod system_status_server;
28pub use system_status_server::SystemStatusServerInfo;
29pub mod instances;
30pub mod logging;
31pub mod metrics;
32pub mod pipeline;
33pub mod prelude;
34pub mod protocols;
35pub mod runnable;
36pub mod runtime;
37pub mod service;
38pub mod slug;
39pub mod storage;
40pub mod system_health;
41pub mod traits;
42pub mod transports;
43pub mod utils;
44pub mod worker;
45
46pub mod distributed;
47pub use distributed::distributed_test_utils;
48pub use futures::stream;
49pub use system_health::{HealthCheckTarget, SystemHealth};
50pub use tokio_util::sync::CancellationToken;
51pub use worker::Worker;
52
53use crate::metrics::prometheus_names::distributed_runtime;
54
55use component::{Endpoint, InstanceSource};
56use utils::GracefulShutdownTracker;
57
58use config::HealthStatus;
59
/// The Tokio runtime backing one slot of a [`Runtime`].
///
/// Either variant is cheap to clone: one holds an `Arc`, the other a
/// `tokio::runtime::Handle`.
#[derive(Clone)]
enum RuntimeType {
    /// A `tokio::runtime::Runtime` held behind an `Arc`, so every clone of
    /// this value refers to the same runtime instance.
    Shared(Arc<tokio::runtime::Runtime>),
    /// A `tokio::runtime::Handle` only; no `Runtime` value is stored here.
    /// NOTE(review): presumably the runtime is created and owned by the
    /// caller — confirm where this variant is constructed.
    External(tokio::runtime::Handle),
}
66
/// Cloneable bundle of an id, two Tokio runtime slots, two cancellation
/// tokens and a graceful-shutdown tracker.
///
/// `Clone` is derived, so the `Arc`-wrapped fields (`id`,
/// `graceful_shutdown_tracker`) are shared between clones, not deep-copied.
/// `Debug` is derived as well, which requires a `Debug` impl for
/// [`RuntimeType`]; that impl is not in this part of the file.
#[derive(Debug, Clone)]
pub struct Runtime {
    /// Identifier string, shared between clones through the `Arc`.
    id: Arc<String>,
    /// First runtime slot.
    /// NOTE(review): presumably the runtime most work is scheduled on —
    /// confirm in the `runtime` module.
    primary: RuntimeType,
    /// Second runtime slot.
    /// NOTE(review): presumably reserved for background work — confirm.
    secondary: RuntimeType,
    /// Cancellation token for this runtime.
    /// NOTE(review): presumably the top-level shutdown signal — confirm.
    cancellation_token: CancellationToken,
    /// A separate cancellation token.
    /// NOTE(review): by its name, used to stop endpoints; its relationship
    /// to `cancellation_token` is not visible here — confirm.
    endpoint_shutdown_token: CancellationToken,
    /// Shared tracker from `utils`.
    /// NOTE(review): presumably counts in-flight work during shutdown —
    /// confirm.
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
}
77
/// A reference-counted callback that takes no arguments and returns
/// `anyhow::Result<()>`.
///
/// The `Send + Sync + 'static` bounds allow the callback to be shared across
/// threads and stored for an arbitrary duration.
type RuntimeCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
87
/// A Prometheus registry paired with a list of callbacks.
///
/// NOTE(review): presumably the callbacks refresh metric values before the
/// registry is scraped — confirm against the callers of `execute_callbacks`.
pub struct MetricsRegistryEntry {
    /// The Prometheus registry held by this entry.
    pub prometheus_registry: prometheus::Registry,
    /// Callbacks in the order they were added; `execute_callbacks` runs them
    /// in that order.
    pub runtime_callbacks: Vec<RuntimeCallback>,
}
95
96impl MetricsRegistryEntry {
97 pub fn new() -> Self {
99 Self {
100 prometheus_registry: prometheus::Registry::new(),
101 runtime_callbacks: Vec::new(),
102 }
103 }
104
105 pub fn add_callback(&mut self, callback: RuntimeCallback) {
107 self.runtime_callbacks.push(callback);
108 }
109
110 pub fn execute_callbacks(&self) -> Vec<anyhow::Result<()>> {
112 self.runtime_callbacks
113 .iter()
114 .map(|callback| callback())
115 .collect()
116 }
117
118 pub fn has_metric_named(&self, metric_name: &str) -> bool {
120 self.prometheus_registry
121 .gather()
122 .iter()
123 .any(|mf| mf.name() == metric_name)
124 }
125}
126
/// `Default` delegates to [`MetricsRegistryEntry::new`], so a default entry
/// has a fresh registry and no callbacks.
impl Default for MetricsRegistryEntry {
    /// Equivalent to `MetricsRegistryEntry::new()`.
    fn default() -> Self {
        Self::new()
    }
}
132
133impl Clone for MetricsRegistryEntry {
134 fn clone(&self) -> Self {
135 Self {
136 prometheus_registry: self.prometheus_registry.clone(),
137 runtime_callbacks: Vec::new(), }
139 }
140}
141
/// Cloneable handle combining a local [`Runtime`] with transport clients,
/// lazily-set servers, registries and health state.
///
/// `Clone` is derived; every field wrapped in an `Arc` is shared between
/// clones rather than copied.
#[derive(Clone)]
pub struct DistributedRuntime {
    /// The local runtime this distributed runtime is built on.
    runtime: Runtime,

    /// etcd client; `None` when no client is held.
    /// NOTE(review): presumably absent in static mode (see `is_static`) —
    /// confirm in the constructor.
    etcd_client: Option<transports::etcd::Client>,
    /// NATS client; always present.
    nats_client: transports::nats::Client,
    /// Slot for the TCP stream server, held in a shared
    /// `async_once_cell::OnceCell`.
    /// NOTE(review): presumably created on first use — confirm.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    /// Slot for system status server info, held in a shared
    /// `std::sync::OnceLock`, which can be set at most once.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    /// Registry from the `component` module.
    component_registry: component::Registry,

    /// Flag recording whether this runtime is "static".
    /// NOTE(review): presumably means no dynamic discovery — confirm.
    is_static: bool,

    /// Map from endpoint to a `Weak` reference to its instance source, behind
    /// an async (`tokio`) mutex. The `Weak` entries do not keep an
    /// `InstanceSource` alive.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    /// Shared health state behind a blocking (`std`) mutex.
    system_health: Arc<std::sync::Mutex<SystemHealth>>,

    /// Map from a string key to its metrics registry entry, behind a blocking
    /// (`std`) read-write lock.
    /// NOTE(review): the key is presumably a hierarchy path or name —
    /// confirm the key format at the insertion site.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
}