pub mod ack;
pub mod archive;
pub mod findings_store;
pub mod health;
pub mod query_api;
mod event_loop;
#[cfg(unix)]
mod json_socket;
mod listeners;
mod sampling;
mod tls;
use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, mpsc};
use crate::config::Config;
use crate::correlate::window::{TraceWindow, WindowConfig};
use crate::detect::DetectConfig;
use crate::event::SpanEvent;
use crate::report::GreenSummary;
use crate::report::metrics::MetricsState;
use event_loop::{
EnergyScraperHandles, EnergySources, EventLoopConfig, ListenerHandles, ShutdownTargets,
run_event_loop,
};
use listeners::{
setup_cloud_scraper, setup_correlator, setup_emaps_scraper, setup_scaphandre_scraper,
spawn_listeners,
};
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum DaemonError {
#[error("invalid listen address: {0}")]
InvalidAddr(#[from] std::net::AddrParseError),
#[error("failed to bind HTTP listener: {0}")]
HttpBind(std::io::Error),
#[error("failed to bind gRPC listener: {0}")]
GrpcBind(std::io::Error),
#[error("TLS configuration error: {0}")]
TlsConfig(#[source] TlsConfigError),
#[error("failed to load acknowledgments TOML at '{path}'")]
AckTomlLoad {
path: String,
#[source]
source: crate::acknowledgments::AcknowledgmentLoadError,
},
#[error("failed to initialize ack store at '{path}'")]
AckStoreInit {
path: String,
#[source]
source: ack::AckError,
},
#[error("[reporting] official intent is misconfigured:\n - {}", errors.join("\n - "))]
ReportingValidation {
errors: Vec<String>,
},
#[error("failed to open report archive at '{path}'")]
ArchiveOpen {
path: String,
#[source]
source: archive::ArchiveError,
},
}
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum TlsConfigError {
#[error("failed to read TLS cert '{path}'")]
ReadCert {
path: String,
#[source]
source: std::io::Error,
},
#[error("failed to read TLS key '{path}'")]
ReadKey {
path: String,
#[source]
source: std::io::Error,
},
#[error("failed to parse TLS cert chain")]
ParseCerts(#[source] tokio_rustls::rustls::pki_types::pem::Error),
#[error("failed to parse TLS private key")]
ParseKey(#[source] tokio_rustls::rustls::pki_types::pem::Error),
#[error("rustls server config rejected the cert+key pair")]
ServerConfig(#[source] tokio_rustls::rustls::Error),
}
pub async fn run(config: Config) -> Result<(), DaemonError> {
validate_official_reporting(&config)?;
let (tx, mut rx) = mpsc::channel::<Vec<SpanEvent>>(1024);
let window = Arc::new(Mutex::new(TraceWindow::new(WindowConfig {
max_events_per_trace: config.daemon.max_events_per_trace,
trace_ttl_ms: config.daemon.trace_ttl_ms,
max_active_traces: std::num::NonZeroUsize::new(config.daemon.max_active_traces)
.expect("config validates max_active_traces >= 1"),
})));
let metrics = Arc::new(MetricsState::new());
let findings_store = Arc::new(findings_store::FindingsStore::new(
config.daemon.max_retained_findings,
));
let correlator = setup_correlator(&config);
let green_summary_cell = Arc::new(RwLock::new(GreenSummary::disabled(0)));
let (grpc_handle, http_handle, json_socket_handle) = spawn_listeners(
&config,
tx.clone(),
window.clone(),
findings_store.clone(),
correlator.clone(),
metrics.clone(),
green_summary_cell.clone(),
)
.await?;
let scaphandre = setup_scaphandre_scraper(&config, &metrics);
let cloud = setup_cloud_scraper(&config, &metrics);
let emaps = setup_emaps_scraper(&config);
let archive_handle = match &config.daemon.archive {
Some(cfg) => Some(
archive::spawn(cfg).map_err(|source| DaemonError::ArchiveOpen {
path: cfg.path.clone(),
source,
})?,
),
None => None,
};
let base_carbon_ctx = config.carbon_context();
let detect_config = DetectConfig::from(&config);
let energy_sources = EnergySources {
base_carbon_ctx: &base_carbon_ctx,
scaphandre_state: scaphandre.state.as_deref(),
scaphandre_staleness_ms: scaphandre.staleness_ms,
cloud_state: cloud.state.as_deref(),
cloud_staleness_ms: cloud.staleness_ms,
emaps_state: emaps.state.as_deref(),
emaps_staleness_ms: emaps.staleness_ms,
};
let shutdown = ShutdownTargets {
energy: EnergyScraperHandles {
scaphandre: scaphandre.handle.as_ref(),
cloud: cloud.handle.as_ref(),
emaps: emaps.handle.as_ref(),
},
listeners: ListenerHandles {
grpc: &grpc_handle,
http: &http_handle,
json_socket: json_socket_handle.as_ref(),
},
};
run_event_loop(
&mut rx,
&window,
&metrics,
&findings_store,
correlator.as_deref(),
&detect_config,
&energy_sources,
shutdown,
EventLoopConfig {
green_enabled: config.green.enabled,
sampling_rate: config.daemon.sampling_rate,
evict_ms: config.daemon.trace_ttl_ms / 2,
confidence: config.confidence(),
},
&green_summary_cell,
archive_handle.as_ref(),
)
.await;
if let Some(handle) = archive_handle {
drop(handle.tx);
let _ = handle.join.await;
}
#[cfg(unix)]
{
let _ = std::fs::remove_file(&config.daemon.json_socket);
}
Ok(())
}
fn validate_official_reporting(config: &Config) -> Result<(), DaemonError> {
use crate::report::periodic::org_config;
if config.reporting.intent.as_deref() == Some("audited") {
return Err(DaemonError::ReportingValidation {
errors: vec![
"[reporting] intent = \"audited\" is not yet implemented, refusing to start daemon"
.to_string(),
],
});
}
if config.reporting.intent.as_deref() != Some("official") {
return Ok(());
}
let mut errors: Vec<String> = Vec::new();
let loaded = match &config.reporting.org_config_path {
None => {
errors.push(
"[reporting] org_config_path is required when intent = \"official\"".to_string(),
);
None
}
Some(path) => match org_config::load_from_path(path) {
Ok(cfg) => Some(cfg),
Err(err) => {
errors.push(err.to_string());
None
}
},
};
if let Some(cfg) = loaded {
errors.extend(org_config::validate_for_official(&cfg));
}
if errors.is_empty() {
Ok(())
} else {
Err(DaemonError::ReportingValidation { errors })
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn daemon_run_rejects_invalid_listen_address() {
let config = Config {
daemon: crate::config::DaemonConfig {
listen_addr: "not an address".to_string(),
..crate::config::DaemonConfig::default()
},
..Config::default()
};
let err = run(config).await.expect_err("should fail");
assert!(matches!(err, DaemonError::InvalidAddr(_)));
}
#[tokio::test]
async fn daemon_run_refuses_official_without_org_config_path() {
let config = Config {
reporting: crate::config::ReportingConfig {
intent: Some("official".to_string()),
..Default::default()
},
..Config::default()
};
let err = run(config).await.expect_err("must refuse");
match err {
DaemonError::ReportingValidation { errors } => {
assert!(
errors.iter().any(|e| e.contains("org_config_path")),
"got {errors:?}"
);
}
other => panic!("expected ReportingValidation, got {other:?}"),
}
}
#[tokio::test]
async fn daemon_run_refuses_official_with_unreadable_org_config() {
let config = Config {
reporting: crate::config::ReportingConfig {
intent: Some("official".to_string()),
org_config_path: Some("/no/such/path/org.toml".to_string()),
..Default::default()
},
..Config::default()
};
let err = run(config).await.expect_err("must refuse");
assert!(matches!(err, DaemonError::ReportingValidation { .. }));
}
#[tokio::test]
async fn daemon_run_refuses_official_when_org_config_misses_fields() {
use std::io::Write;
let mut file = tempfile::NamedTempFile::new().unwrap();
writeln!(
file,
r#"
[organisation]
name = ""
country = "fr"
[methodology]
sci_specification = "ISO/IEC 21031:2024"
enabled_patterns = ["slow_sql"]
disabled_patterns = []
conformance = "partial"
[methodology.calibration]
carbon_intensity_source = "electricity_maps"
specpower_table_version = "2026-04-24"
[scope_manifest]
total_applications_declared = 1
"#
)
.unwrap();
let config = Config {
reporting: crate::config::ReportingConfig {
intent: Some("official".to_string()),
org_config_path: Some(file.path().display().to_string()),
..Default::default()
},
..Config::default()
};
let err = run(config).await.expect_err("must refuse");
match err {
DaemonError::ReportingValidation { errors } => {
assert!(errors.iter().any(|e| e.contains("organisation.name")));
assert!(errors.iter().any(|e| e.contains("country")));
assert!(errors.iter().any(|e| e.contains("n_plus_one_sql")));
}
other => panic!("expected ReportingValidation, got {other:?}"),
}
}
#[test]
fn daemon_error_display_is_informative() {
let e1: DaemonError = "not a socket"
.parse::<std::net::SocketAddr>()
.unwrap_err()
.into();
assert!(format!("{e1}").contains("invalid listen address"));
let e2 = DaemonError::HttpBind(std::io::Error::other("boom"));
assert!(format!("{e2}").contains("HTTP listener"));
let e3 = DaemonError::GrpcBind(std::io::Error::other("boom"));
assert!(format!("{e3}").contains("gRPC listener"));
}
#[cfg(unix)]
#[tokio::test]
async fn daemon_run_ingests_json_socket_events_and_exposes_metrics() {
use std::fmt::Write as _;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream, UnixStream};
use tokio::time::Duration;
let l1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let l2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let http_port = l1.local_addr().unwrap().port();
let grpc_port = l2.local_addr().unwrap().port();
drop(l1);
drop(l2);
let (_dir, socket_path) = json_socket::unique_socket_dir_and_path("daemon-run");
let socket_path_str = socket_path.to_string_lossy().into_owned();
let config = Config {
daemon: crate::config::DaemonConfig {
listen_addr: "127.0.0.1".to_string(),
listen_port: http_port,
listen_port_grpc: grpc_port,
json_socket: socket_path_str,
trace_ttl_ms: 200, max_active_traces: 10,
..crate::config::DaemonConfig::default()
},
..Config::default()
};
let daemon_handle = tokio::spawn(async move {
let _ = run(config).await;
});
let mut client = None;
for _ in 0..40 {
if let Ok(s) = UnixStream::connect(&socket_path).await {
client = Some(s);
break;
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
let mut client = client.expect("daemon Unix socket must bind within 1s");
let mut payload = String::from("[");
for i in 1..=6 {
if i > 1 {
payload.push(',');
}
let _ = write!(
payload,
r#"{{"timestamp":"2025-07-10T14:32:01.{i:03}Z","trace_id":"daemon-t1","span_id":"s{i}","service":"svc-e2e","type":"sql","operation":"SELECT","target":"SELECT * FROM users WHERE id = {i}","duration_us":100,"source":{{"endpoint":"GET /test","method":"m"}}}}"#
);
}
payload.push(']');
client.write_all(payload.as_bytes()).await.unwrap();
client.write_all(b"\n").await.unwrap();
client.shutdown().await.unwrap();
let metrics_addr = format!("127.0.0.1:{http_port}");
let mut observed_events = false;
for _ in 0..60 {
tokio::time::sleep(Duration::from_millis(25)).await;
let Ok(mut stream) = TcpStream::connect(&metrics_addr).await else {
continue;
};
let req = "GET /metrics HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n";
if stream.write_all(req.as_bytes()).await.is_err() {
continue;
}
let mut buf = Vec::with_capacity(8192);
if stream.read_to_end(&mut buf).await.is_err() {
continue;
}
let body = String::from_utf8_lossy(&buf);
if body.contains("perf_sentinel_events_processed_total")
&& body.lines().any(|l| {
l.starts_with("perf_sentinel_events_processed_total")
&& l.split_whitespace()
.last()
.and_then(|v| v.parse::<f64>().ok())
.is_some_and(|v| v > 0.0)
})
{
observed_events = true;
break;
}
}
daemon_handle.abort();
let _ = daemon_handle.await;
assert!(
observed_events,
"daemon should have processed the 6 events and surfaced a \
non-zero `perf_sentinel_events_processed_total` on /metrics"
);
}
#[cfg(unix)]
#[tokio::test]
async fn daemon_exposes_health_endpoint_even_when_query_api_disabled() {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::time::Duration;
let l1 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let l2 = TcpListener::bind("127.0.0.1:0").await.unwrap();
let http_port = l1.local_addr().unwrap().port();
let grpc_port = l2.local_addr().unwrap().port();
drop(l1);
drop(l2);
let (_dir, socket_path) = json_socket::unique_socket_dir_and_path("daemon-health");
let config = Config {
daemon: crate::config::DaemonConfig {
listen_addr: "127.0.0.1".to_string(),
listen_port: http_port,
listen_port_grpc: grpc_port,
json_socket: socket_path.to_string_lossy().into_owned(),
api_enabled: false, ..crate::config::DaemonConfig::default()
},
..Config::default()
};
let daemon_handle = tokio::spawn(async move {
let _ = run(config).await;
});
let addr = format!("127.0.0.1:{http_port}");
let mut body = String::new();
let mut ok = false;
for _ in 0..60 {
tokio::time::sleep(Duration::from_millis(25)).await;
let Ok(mut stream) = TcpStream::connect(&addr).await else {
continue;
};
let req = "GET /health HTTP/1.0\r\nHost: 127.0.0.1\r\n\r\n";
if stream.write_all(req.as_bytes()).await.is_err() {
continue;
}
let mut buf = Vec::with_capacity(1024);
if stream.read_to_end(&mut buf).await.is_err() {
continue;
}
body = String::from_utf8_lossy(&buf).into_owned();
if body.starts_with("HTTP/1.") && body.contains(" 200 ") {
ok = true;
break;
}
}
daemon_handle.abort();
let _ = daemon_handle.await;
assert!(ok, "/health should return 200 OK; got response: {body}");
assert!(
body.contains("\"status\":\"ok\""),
"response body should include status=ok; got: {body}"
);
assert!(
body.contains(env!("CARGO_PKG_VERSION")),
"response body should include current version; got: {body}"
);
}
}