Skip to main content

secrets_broker/
lib.rs

1pub mod auth;
2pub mod config;
3pub mod error;
4pub mod http;
5pub mod models;
6pub mod nats;
7pub mod path;
8pub mod rotate;
9pub mod state;
10pub mod telemetry;
11
12use std::net::SocketAddr;
13use std::sync::Arc;
14
15use anyhow::Context;
16use auth::Authorizer;
17use greentic_config_types::{
18    NetworkConfig, PathsConfig, SecretsBackendRefConfig, TelemetryConfig, TelemetryExporterKind,
19};
20use secrets_core::SecretsBroker;
21use secrets_core::crypto::dek_cache::DekCache;
22use secrets_core::crypto::envelope::EnvelopeService;
23use secrets_core::types::EncryptionAlgorithm;
24use tokio::net::TcpListener;
25use tokio::sync::Mutex;
26use tracing::{info, warn};
27
28pub use state::AppState;
29pub use telemetry::CorrelationId;
30
31#[derive(Clone)]
32pub struct BrokerRuntimeConfig {
33    pub http_addr: SocketAddr,
34    pub nats_url: Option<String>,
35    pub network: NetworkConfig,
36    pub telemetry: TelemetryConfig,
37    pub paths: PathsConfig,
38    pub secrets: SecretsBackendRefConfig,
39}
40
41fn effective_backend(config: &SecretsBackendRefConfig) -> SecretsBackendRefConfig {
42    if config.kind == "none" && std::env::var("GREENTIC_DEV_SECRETS_PATH").is_ok() {
43        return SecretsBackendRefConfig {
44            kind: "dev".to_string(),
45            reference: None,
46        };
47    }
48    config.clone()
49}
50
51pub async fn run(config: BrokerRuntimeConfig) -> anyhow::Result<()> {
52    apply_network_env(&config.network);
53    apply_telemetry_env(&config.telemetry);
54
55    let state = build_state_with_backend(&config.secrets).await?;
56
57    let http_listener = TcpListener::bind(config.http_addr).await.with_context(|| {
58        format!(
59            "failed to bind http listener on {addr}",
60            addr = config.http_addr
61        )
62    })?;
63
64    let http_addr = http_listener.local_addr()?;
65    info!(%http_addr, "http server listening");
66
67    let http_router = http::router(state.clone());
68    let http_server = tokio::spawn(async move {
69        axum::serve(http_listener, http_router)
70            .with_graceful_shutdown(shutdown_signal())
71            .await
72            .map_err(anyhow::Error::from)
73    });
74
75    let maybe_nats = if let Some(url) = &config.nats_url {
76        info!(nats_url = %url, "connecting to nats");
77        let client = async_nats::connect(url)
78            .await
79            .with_context(|| "failed to connect to nats")?;
80        Some(tokio::spawn(nats::run(client, state.clone())))
81    } else {
82        warn!("nats disabled; BROKER__NATS_URL not set");
83        None
84    };
85
86    if let Some(nats_task) = maybe_nats {
87        let (http_result, nats_result) = tokio::try_join!(http_server, nats_task)?;
88        http_result?;
89        nats_result?;
90    } else {
91        http_server.await??;
92    }
93
94    Ok(())
95}
96
97pub async fn build_state() -> anyhow::Result<AppState> {
98    build_state_with_backend(&SecretsBackendRefConfig::default()).await
99}
100
101pub async fn build_state_with_backend(
102    backend: &SecretsBackendRefConfig,
103) -> anyhow::Result<AppState> {
104    let backend = effective_backend(backend);
105    let authorizer = Authorizer::from_env().await?;
106    let components = config::load_backend_components(&backend.kind).await?;
107    let crypto = EnvelopeService::new(
108        components.key_provider,
109        DekCache::from_env(),
110        EncryptionAlgorithm::Aes256Gcm,
111    );
112    let broker = SecretsBroker::new(components.backend, crypto);
113    Ok(AppState::new(
114        Arc::new(Mutex::new(broker)),
115        Arc::new(authorizer),
116    ))
117}
118
119fn apply_network_env(network: &NetworkConfig) {
120    if let Some(proxy) = &network.proxy_url {
121        unsafe {
122            std::env::set_var("HTTPS_PROXY", proxy);
123            std::env::set_var("HTTP_PROXY", proxy);
124        }
125    }
126    let _ = network.tls_mode;
127}
128
129fn apply_telemetry_env(telemetry: &TelemetryConfig) {
130    if !telemetry.enabled {
131        unsafe {
132            std::env::set_var("OTEL_TRACES_EXPORTER", "none");
133            std::env::set_var("OTEL_METRICS_EXPORTER", "none");
134        }
135        return;
136    }
137
138    if let Some(endpoint) = &telemetry.endpoint {
139        unsafe {
140            std::env::set_var("OTEL_EXPORTER_OTLP_ENDPOINT", endpoint);
141        }
142    }
143    if telemetry.sampling != 1.0 {
144        unsafe {
145            std::env::set_var("OTEL_TRACES_SAMPLER", "parentbased_traceidratio");
146            std::env::set_var("OTEL_TRACES_SAMPLER_ARG", telemetry.sampling.to_string());
147        }
148    }
149    match telemetry.exporter {
150        TelemetryExporterKind::Otlp => unsafe {
151            std::env::set_var("OTEL_TRACES_EXPORTER", "otlp");
152            std::env::set_var("OTEL_METRICS_EXPORTER", "otlp");
153        },
154        TelemetryExporterKind::Stdout => unsafe {
155            std::env::set_var("OTEL_TRACES_EXPORTER", "stdout");
156            std::env::set_var("OTEL_METRICS_EXPORTER", "none");
157        },
158        TelemetryExporterKind::None => unsafe {
159            std::env::set_var("OTEL_TRACES_EXPORTER", "none");
160            std::env::set_var("OTEL_METRICS_EXPORTER", "none");
161        },
162    }
163}
164
165async fn shutdown_signal() {
166    let ctrl_c = async {
167        if let Err(err) = tokio::signal::ctrl_c().await {
168            warn!(?err, "failed to install ctrl-c handler");
169        }
170    };
171
172    #[cfg(unix)]
173    let terminate = async {
174        use tokio::signal::unix::{SignalKind, signal};
175        match signal(SignalKind::terminate()) {
176            Ok(mut stream) => {
177                stream.recv().await;
178            }
179            Err(err) => warn!(?err, "failed to install sigterm handler"),
180        }
181    };
182
183    #[cfg(not(unix))]
184    let terminate = std::future::pending::<()>();
185
186    tokio::select! {
187        _ = ctrl_c => {},
188        _ = terminate => {},
189    }
190}