1#![allow(dead_code)]
7#![allow(unused_imports)]
8
9use std::{
10 collections::HashMap,
11 sync::{Arc, OnceLock, Weak},
12};
13
14pub use anyhow::{
15 Context as ErrorContext, Error, Ok as OK, Result, anyhow as error, bail as raise,
16};
17
18use async_once_cell::OnceCell;
19
20mod config;
21pub use config::RuntimeConfig;
22
23pub mod component;
24pub mod compute;
25pub mod discovery;
26pub mod engine;
27pub mod health_check;
28pub mod system_status_server;
29pub use system_status_server::SystemStatusServerInfo;
30pub mod instances;
31pub mod logging;
32pub mod metrics;
33pub mod pipeline;
34pub mod prelude;
35pub mod protocols;
36pub mod runnable;
37pub mod runtime;
38pub mod service;
39pub mod slug;
40pub mod storage;
41pub mod system_health;
42pub mod traits;
43pub mod transports;
44pub mod utils;
45pub mod worker;
46
47pub mod distributed;
48pub use distributed::distributed_test_utils;
49pub use futures::stream;
50pub use system_health::{HealthCheckTarget, SystemHealth};
51pub use tokio_util::sync::CancellationToken;
52pub use worker::Worker;
53
54use crate::{
55 metrics::prometheus_names::distributed_runtime, storage::key_value_store::KeyValueStore,
56};
57
58use component::{Endpoint, InstanceSource};
59use utils::GracefulShutdownTracker;
60
61use config::HealthStatus;
62
63#[derive(Clone)]
65enum RuntimeType {
66 Shared(Arc<tokio::runtime::Runtime>),
67 External(tokio::runtime::Handle),
68}
69
/// Node-local runtime: bundles the executors, cancellation tokens and shutdown
/// bookkeeping that the rest of the crate builds on.
///
/// `Clone` is derived; every `Arc` field is shared between clones rather than
/// deep-copied.
///
/// NOTE: field declaration order determines both the derived `Debug` output
/// and the drop order, so do not reorder fields casually.
/// NOTE(review): `RuntimeType` does not derive `Debug` where it is declared,
/// so the `Debug` derive here relies on an impl provided elsewhere in the
/// crate — confirm.
#[derive(Debug, Clone)]
pub struct Runtime {
    /// Identifier of this runtime; the `Arc` lets all clones share one string.
    id: Arc<String>,
    /// Primary executor.
    /// NOTE(review): how work is split between `primary` and `secondary` is
    /// decided outside this file — confirm.
    primary: RuntimeType,
    /// Secondary executor (see the note on `primary`).
    secondary: RuntimeType,
    /// Top-level cancellation token for this runtime.
    cancellation_token: CancellationToken,
    /// A separate token from `cancellation_token`; presumably lets endpoints be
    /// stopped independently of full cancellation — confirm against the
    /// shutdown path.
    endpoint_shutdown_token: CancellationToken,
    /// Tracker used by graceful shutdown, shared between clones.
    graceful_shutdown_tracker: Arc<GracefulShutdownTracker>,
    /// Optional compute pool; `None` presumably means no pool was configured.
    compute_pool: Option<Arc<compute::ComputePool>>,
    /// Optional semaphore; presumably bounds concurrent `block_in_place`
    /// usage — confirm.
    block_in_place_permits: Option<Arc<tokio::sync::Semaphore>>,
}
82
83type PrometheusUpdateCallback = Arc<dyn Fn() -> anyhow::Result<()> + Send + Sync + 'static>;
92
93type PrometheusExpositionFormatCallback =
95 Arc<dyn Fn() -> anyhow::Result<String> + Send + Sync + 'static>;
96
/// Metrics state for one key of the runtime's metrics hierarchy: a Prometheus
/// registry plus the callbacks associated with it.
///
/// NOTE: field declaration order is drop order.
pub struct MetricsRegistryEntry {
    /// Prometheus registry for this entry.
    pub prometheus_registry: prometheus::Registry,
    /// Callbacks run, in insertion order, by `execute_prometheus_update_callbacks`.
    pub prometheus_update_callbacks: Vec<PrometheusUpdateCallback>,
    /// Callbacks whose text output is concatenated, in insertion order, by
    /// `execute_prometheus_expfmt_callbacks`.
    pub prometheus_expfmt_callbacks: Vec<PrometheusExpositionFormatCallback>,
}
106
107impl MetricsRegistryEntry {
108 pub fn new() -> Self {
110 Self {
111 prometheus_registry: prometheus::Registry::new(),
112 prometheus_update_callbacks: Vec::new(),
113 prometheus_expfmt_callbacks: Vec::new(),
114 }
115 }
116
117 pub fn add_prometheus_update_callback(&mut self, callback: PrometheusUpdateCallback) {
119 self.prometheus_update_callbacks.push(callback);
120 }
121
122 pub fn add_prometheus_expfmt_callback(&mut self, callback: PrometheusExpositionFormatCallback) {
124 self.prometheus_expfmt_callbacks.push(callback);
125 }
126
127 pub fn execute_prometheus_update_callbacks(&self) -> Vec<anyhow::Result<()>> {
129 self.prometheus_update_callbacks
130 .iter()
131 .map(|callback| callback())
132 .collect()
133 }
134
135 pub fn execute_prometheus_expfmt_callbacks(&self) -> String {
137 let mut result = String::new();
138 for callback in &self.prometheus_expfmt_callbacks {
139 match callback() {
140 Ok(text) => {
141 if !text.is_empty() {
142 if !result.is_empty() && !result.ends_with('\n') {
143 result.push('\n');
144 }
145 result.push_str(&text);
146 }
147 }
148 Err(e) => {
149 tracing::error!("Error executing exposition text callback: {}", e);
150 }
151 }
152 }
153 result
154 }
155
156 pub fn has_metric_named(&self, metric_name: &str) -> bool {
158 self.prometheus_registry
159 .gather()
160 .iter()
161 .any(|mf| mf.name() == metric_name)
162 }
163}
164
165impl Default for MetricsRegistryEntry {
166 fn default() -> Self {
167 Self::new()
168 }
169}
170
171impl Clone for MetricsRegistryEntry {
172 fn clone(&self) -> Self {
173 Self {
174 prometheus_registry: self.prometheus_registry.clone(),
175 prometheus_update_callbacks: Vec::new(), prometheus_expfmt_callbacks: Vec::new(), }
178 }
179}
180
/// Distributed runtime: wraps the node-local [`Runtime`] together with the
/// transport clients, servers and shared registries visible below.
///
/// `Clone` is derived. Fields wrapped in `Arc` are shared between clones, so
/// clones observe the same servers, instance map, health state and metrics
/// registries.
/// NOTE: field declaration order is drop order.
#[derive(Clone)]
pub struct DistributedRuntime {
    /// Node-local runtime (executors, cancellation tokens, shutdown tracking).
    runtime: Runtime,

    /// Optional etcd client.
    /// NOTE(review): presumably `None` when running without etcd (e.g. in
    /// static mode) — confirm against construction.
    etcd_client: Option<transports::etcd::Client>,
    /// NATS client; always present (not an `Option`).
    nats_client: transports::nats::Client,
    /// Key-value store backend, used through the `KeyValueStore` trait object.
    store: Arc<dyn KeyValueStore>,
    /// TCP stream server, initialised at most once through an async `OnceCell`
    /// and shared between clones.
    tcp_server: Arc<OnceCell<Arc<transports::tcp::server::TcpStreamServer>>>,
    /// System status server info, set at most once (`OnceLock`) and shared
    /// between clones.
    system_status_server: Arc<OnceLock<Arc<system_status_server::SystemStatusServerInfo>>>,

    /// Component registry.
    component_registry: component::Registry,

    /// NOTE(review): presumably marks static (no dynamic discovery) mode —
    /// confirm.
    is_static: bool,

    /// Instance sources keyed by endpoint, guarded by an async (`tokio`) mutex.
    /// Values are `Weak`, so this map does not keep a source alive; a lookup
    /// must upgrade and may find the source already dropped.
    instance_sources: Arc<tokio::sync::Mutex<HashMap<Endpoint, Weak<InstanceSource>>>>,

    /// Shared system health state behind a blocking `std::sync::Mutex`.
    system_health: Arc<std::sync::Mutex<SystemHealth>>,

    /// Metrics registry entries keyed by hierarchy string, behind a blocking
    /// `std::sync::RwLock`.
    /// NOTE(review): the key format is defined by callers outside this file —
    /// confirm.
    hierarchy_to_metricsregistry: Arc<std::sync::RwLock<HashMap<String, MetricsRegistryEntry>>>,
}