1pub mod auth;
2pub mod config;
3pub mod error;
4pub mod http;
5pub mod models;
6pub mod nats;
7pub mod path;
8pub mod rotate;
9pub mod state;
10pub mod telemetry;
11
12use std::net::SocketAddr;
13use std::sync::Arc;
14
15use anyhow::Context;
16use auth::Authorizer;
17use greentic_config_types::{
18 NetworkConfig, PathsConfig, SecretsBackendRefConfig, TelemetryConfig, TelemetryExporterKind,
19};
20use secrets_core::SecretsBroker;
21use secrets_core::crypto::dek_cache::DekCache;
22use secrets_core::crypto::envelope::EnvelopeService;
23use secrets_core::types::EncryptionAlgorithm;
24use tokio::net::TcpListener;
25use tokio::sync::Mutex;
26use tracing::{info, warn};
27
28pub use state::AppState;
29pub use telemetry::CorrelationId;
30
31#[derive(Clone)]
32pub struct BrokerRuntimeConfig {
33 pub http_addr: SocketAddr,
34 pub nats_url: Option<String>,
35 pub network: NetworkConfig,
36 pub telemetry: TelemetryConfig,
37 pub paths: PathsConfig,
38 pub secrets: SecretsBackendRefConfig,
39}
40
41fn effective_backend(config: &SecretsBackendRefConfig) -> SecretsBackendRefConfig {
42 if config.kind == "none" && std::env::var("GREENTIC_DEV_SECRETS_PATH").is_ok() {
43 return SecretsBackendRefConfig {
44 kind: "dev".to_string(),
45 reference: None,
46 };
47 }
48 config.clone()
49}
50
51pub async fn run(config: BrokerRuntimeConfig) -> anyhow::Result<()> {
52 apply_network_env(&config.network);
53 apply_telemetry_env(&config.telemetry);
54
55 let state = build_state_with_backend(&config.secrets).await?;
56
57 let http_listener = TcpListener::bind(config.http_addr).await.with_context(|| {
58 format!(
59 "failed to bind http listener on {addr}",
60 addr = config.http_addr
61 )
62 })?;
63
64 let http_addr = http_listener.local_addr()?;
65 info!(%http_addr, "http server listening");
66
67 let http_router = http::router(state.clone());
68 let http_server = tokio::spawn(async move {
69 axum::serve(http_listener, http_router)
70 .with_graceful_shutdown(shutdown_signal())
71 .await
72 .map_err(anyhow::Error::from)
73 });
74
75 let maybe_nats = if let Some(url) = &config.nats_url {
76 info!(nats_url = %url, "connecting to nats");
77 let client = async_nats::connect(url)
78 .await
79 .with_context(|| "failed to connect to nats")?;
80 Some(tokio::spawn(nats::run(client, state.clone())))
81 } else {
82 warn!("nats disabled; BROKER__NATS_URL not set");
83 None
84 };
85
86 if let Some(nats_task) = maybe_nats {
87 let (http_result, nats_result) = tokio::try_join!(http_server, nats_task)?;
88 http_result?;
89 nats_result?;
90 } else {
91 http_server.await??;
92 }
93
94 Ok(())
95}
96
97pub async fn build_state() -> anyhow::Result<AppState> {
98 build_state_with_backend(&SecretsBackendRefConfig::default()).await
99}
100
101pub async fn build_state_with_backend(
102 backend: &SecretsBackendRefConfig,
103) -> anyhow::Result<AppState> {
104 let backend = effective_backend(backend);
105 let authorizer = Authorizer::from_env().await?;
106 let components = config::load_backend_components(&backend.kind).await?;
107 let crypto = EnvelopeService::new(
108 components.key_provider,
109 DekCache::from_env(),
110 EncryptionAlgorithm::Aes256Gcm,
111 );
112 let broker = SecretsBroker::new(components.backend, crypto);
113 Ok(AppState::new(
114 Arc::new(Mutex::new(broker)),
115 Arc::new(authorizer),
116 ))
117}
118
119fn apply_network_env(network: &NetworkConfig) {
120 if let Some(proxy) = &network.proxy_url {
121 unsafe {
122 std::env::set_var("HTTPS_PROXY", proxy);
123 std::env::set_var("HTTP_PROXY", proxy);
124 }
125 }
126 let _ = network.tls_mode;
127}
128
129fn apply_telemetry_env(telemetry: &TelemetryConfig) {
130 if !telemetry.enabled {
131 unsafe {
132 std::env::set_var("OTEL_TRACES_EXPORTER", "none");
133 std::env::set_var("OTEL_METRICS_EXPORTER", "none");
134 }
135 return;
136 }
137
138 if let Some(endpoint) = &telemetry.endpoint {
139 unsafe {
140 std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint);
141 }
142 }
143 if telemetry.sampling != 1.0 {
144 unsafe {
145 std::env::set_var("OTEL_TRACES_SAMPLER", "parentbased_traceidratio");
146 std::env::set_var("OTEL_TRACES_SAMPLER_ARG", telemetry.sampling.to_string());
147 }
148 }
149 match telemetry.exporter {
150 TelemetryExporterKind::Otlp => unsafe {
151 std::env::set_var("OTEL_TRACES_EXPORTER", "otlp");
152 std::env::set_var("OTEL_METRICS_EXPORTER", "otlp");
153 },
154 TelemetryExporterKind::Stdout => unsafe {
155 std::env::set_var("OTEL_TRACES_EXPORTER", "stdout");
156 std::env::set_var("OTEL_METRICS_EXPORTER", "none");
157 },
158 TelemetryExporterKind::None => unsafe {
159 std::env::set_var("OTEL_TRACES_EXPORTER", "none");
160 std::env::set_var("OTEL_METRICS_EXPORTER", "none");
161 },
162 }
163}
164
165async fn shutdown_signal() {
166 let ctrl_c = async {
167 if let Err(err) = tokio::signal::ctrl_c().await {
168 warn!(?err, "failed to install ctrl-c handler");
169 }
170 };
171
172 #[cfg(unix)]
173 let terminate = async {
174 use tokio::signal::unix::{SignalKind, signal};
175 match signal(SignalKind::terminate()) {
176 Ok(mut stream) => {
177 stream.recv().await;
178 }
179 Err(err) => warn!(?err, "failed to install sigterm handler"),
180 }
181 };
182
183 #[cfg(not(unix))]
184 let terminate = std::future::pending::<()>();
185
186 tokio::select! {
187 _ = ctrl_c => {},
188 _ = terminate => {},
189 }
190}