use std::sync::Arc;
use tokio::sync::{Mutex, RwLock, mpsc};
use tokio::time::{Duration, interval};
use crate::correlate::Trace;
use crate::correlate::window::TraceWindow;
use crate::detect;
#[cfg(test)]
use crate::detect::sanitizer_aware::SanitizerAwareMode;
use crate::detect::{Confidence, DetectConfig};
use crate::event::SpanEvent;
use crate::normalize;
use crate::report::GreenSummary;
use crate::report::metrics::MetricsState;
use crate::score;
use crate::score::cloud_energy::CloudEnergyState;
use crate::score::electricity_maps::ElectricityMapsState;
use crate::score::kepler::KeplerState;
use crate::score::redfish::RedfishState;
use crate::score::scaphandre::ScaphandreState;
use super::findings_store;
use super::sampling::apply_sampling;
/// Plain-value knobs for the ingest/analysis loop; copied by value into it.
#[derive(Copy, Clone)]
pub(super) struct EventLoopConfig {
    /// When true each batch is scored with `score::score_green`; otherwise a
    /// `GreenSummary::disabled` summary is produced.
    pub(super) green_enabled: bool,
    /// Rate handed to `apply_sampling` for every incoming span batch.
    pub(super) sampling_rate: f64,
    /// Period of the TTL-eviction ticker in milliseconds (the loop clamps it
    /// to at least 100 ms).
    pub(super) evict_ms: u64,
    /// Confidence applied to findings through `detect::apply_confidence`.
    pub(super) confidence: Confidence,
    /// Capacity of the bounded channel feeding the analysis worker; batches
    /// that do not fit are shed rather than awaited.
    pub(super) analysis_queue_capacity: usize,
}
/// Borrowed task handles that the event loop aborts once it leaves its main
/// `select!` loop (both on graceful shutdown and on worker failure).
pub(super) struct ShutdownTargets<'a> {
    /// Energy scraper tasks; absent scrapers are `None`.
    pub(super) energy: EnergyScraperHandles<'a>,
    /// Span-ingestion listener tasks.
    pub(super) listeners: ListenerHandles<'a>,
}
/// Optional handles of the background energy scraper tasks.
///
/// `None` entries are skipped when the loop aborts tasks at shutdown.
#[derive(Copy, Clone)]
pub(super) struct EnergyScraperHandles<'a> {
    /// Scaphandre scraper task, if one was spawned.
    pub(super) scaphandre: Option<&'a tokio::task::JoinHandle<()>>,
    /// Kepler scraper task, if one was spawned.
    pub(super) kepler: Option<&'a tokio::task::JoinHandle<()>>,
    /// Redfish scraper task, if one was spawned.
    pub(super) redfish: Option<&'a tokio::task::JoinHandle<()>>,
    /// Cloud energy scraper task, if one was spawned.
    pub(super) cloud: Option<&'a tokio::task::JoinHandle<()>>,
    /// Electricity Maps scraper task, if one was spawned.
    pub(super) emaps: Option<&'a tokio::task::JoinHandle<()>>,
}
/// Handles of the span-ingestion listener tasks aborted at shutdown.
#[derive(Copy, Clone)]
pub(super) struct ListenerHandles<'a> {
    /// gRPC listener task (always present).
    pub(super) grpc: &'a tokio::task::JoinHandle<()>,
    /// HTTP listener task (always present).
    pub(super) http: &'a tokio::task::JoinHandle<()>,
    /// JSON socket listener task, when one was spawned.
    pub(super) json_socket: Option<&'a tokio::task::JoinHandle<()>>,
}
/// Inputs used to build the carbon context attached to each analysis batch.
///
/// Each `*_state` is an optional scraper state (skipped when `None`); the
/// matching `*_staleness_ms` is the maximum age passed to that state's
/// snapshot call.
pub(super) struct EnergySources<'a> {
    /// Context used as-is when no scraper contributes fresh data.
    pub(super) base_carbon_ctx: Arc<score::carbon::CarbonContext>,
    pub(super) scaphandre_state: Option<&'a ScaphandreState>,
    pub(super) scaphandre_staleness_ms: u64,
    pub(super) kepler_state: Option<&'a KeplerState>,
    pub(super) kepler_staleness_ms: u64,
    pub(super) redfish_state: Option<&'a RedfishState>,
    pub(super) redfish_staleness_ms: u64,
    pub(super) cloud_state: Option<&'a CloudEnergyState>,
    pub(super) cloud_staleness_ms: u64,
    pub(super) emaps_state: Option<&'a ElectricityMapsState>,
    pub(super) emaps_staleness_ms: u64,
}
/// A unit of work for the analysis worker: completed traces plus the carbon
/// context captured when the batch was enqueued.
struct AnalysisBatch {
    /// `(trace_id, spans)` pairs removed from the trace window.
    traces: Vec<(String, Vec<normalize::NormalizedEvent>)>,
    /// Carbon context snapshot built at enqueue time.
    carbon_ctx: Arc<score::carbon::CarbonContext>,
}
impl AnalysisBatch {
    /// Wraps `traces` together with a carbon context derived from `sources`.
    fn new(
        traces: Vec<(String, Vec<normalize::NormalizedEvent>)>,
        sources: &EnergySources<'_>,
    ) -> Self {
        let carbon_ctx = build_owned_tick_ctx(sources);
        Self { traces, carbon_ctx }
    }
}
/// State owned by the spawned analysis worker for its whole lifetime.
///
/// Per batch, the worker lends these fields to `process_traces` through a
/// `ProcessTracesCtx`.
struct AnalysisWorkerCtx {
    detect_config: DetectConfig,
    green_enabled: bool,
    confidence: Confidence,
    metrics: Arc<MetricsState>,
    findings_store: Arc<findings_store::FindingsStore>,
    /// Cross-trace correlator, when correlation is configured.
    correlator: Option<Arc<Mutex<detect::correlate_cross::CrossTraceCorrelator>>>,
    /// Shared cell overwritten with each batch's green summary.
    green_summary_cell: Arc<RwLock<GreenSummary>>,
    /// Archive channel, when archiving is configured.
    archive_tx: Option<mpsc::Sender<super::archive::OwnedArchive>>,
}
/// Spawns the analysis worker, then runs the ingest/evict loop until the
/// process shutdown signal fires or the worker dies.
///
/// Ownership of the findings store, correlator, green summary cell and
/// archive sender moves into the worker; the loop keeps a shared handle to
/// `metrics`.
///
/// # Errors
/// Returns `DaemonError::AnalysisWorkerStopped` when the worker task exits
/// before shutdown was requested.
// Every argument is a distinct daemon subsystem wired in by the caller.
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_event_loop(
    rx: &mut mpsc::Receiver<Vec<SpanEvent>>,
    window: &Arc<Mutex<TraceWindow>>,
    metrics: Arc<MetricsState>,
    findings_store: Arc<findings_store::FindingsStore>,
    correlator: Option<Arc<Mutex<detect::correlate_cross::CrossTraceCorrelator>>>,
    detect_config: &DetectConfig,
    energy_sources: &EnergySources<'_>,
    shutdown: ShutdownTargets<'_>,
    loop_cfg: EventLoopConfig,
    green_summary_cell: Arc<RwLock<GreenSummary>>,
    archive_tx: Option<mpsc::Sender<super::archive::OwnedArchive>>,
) -> Result<(), super::DaemonError> {
    let (work_tx, work_rx) = mpsc::channel::<AnalysisBatch>(loop_cfg.analysis_queue_capacity);
    let worker_ctx = AnalysisWorkerCtx {
        detect_config: detect_config.clone(),
        green_enabled: loop_cfg.green_enabled,
        confidence: loop_cfg.confidence,
        metrics: Arc::clone(&metrics),
        findings_store,
        correlator,
        green_summary_cell,
        archive_tx,
    };
    let worker = tokio::spawn(run_analysis_worker(work_rx, worker_ctx));
    let shutdown_fut = crate::shutdown::shutdown_signal();
    drive_event_loop(
        rx,
        window,
        &metrics,
        energy_sources,
        shutdown,
        loop_cfg,
        work_tx,
        worker,
        shutdown_fut,
    )
    .await
}
#[allow(clippy::too_many_arguments)]
async fn drive_event_loop(
rx: &mut mpsc::Receiver<Vec<SpanEvent>>,
window: &Arc<Mutex<TraceWindow>>,
metrics: &MetricsState,
energy_sources: &EnergySources<'_>,
shutdown: ShutdownTargets<'_>,
loop_cfg: EventLoopConfig,
work_tx: mpsc::Sender<AnalysisBatch>,
mut worker: tokio::task::JoinHandle<()>,
shutdown_fut: impl Future<Output = ()>,
) -> Result<(), super::DaemonError> {
let mut ticker = interval(Duration::from_millis(loop_cfg.evict_ms.max(100)));
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut service_meter = ServiceMeter {
known_services: std::collections::HashMap::new(),
max_service_cardinality: MAX_SERVICE_CARDINALITY,
service_cap_warned: false,
};
tokio::pin!(shutdown_fut);
let graceful = loop {
tokio::select! {
Some(events) = rx.recv() => {
let lru_evicted = ingest_event_batch(
events,
loop_cfg.sampling_rate,
window,
metrics,
&mut service_meter,
).await;
enqueue_for_analysis(lru_evicted, energy_sources, &work_tx, metrics);
}
_ = ticker.tick() => {
let expired = evict_expired_traces(window, metrics).await;
enqueue_for_analysis(expired, energy_sources, &work_tx, metrics);
}
() = &mut shutdown_fut => {
tracing::info!("Shutting down daemon, processing remaining traces...");
break true;
}
res = &mut worker => {
tracing::error!(result = ?res, "analysis worker stopped unexpectedly; daemon exiting for restart");
break false;
}
}
};
shutdown_listeners(shutdown.energy, shutdown.listeners);
if !graceful {
return Err(super::DaemonError::AnalysisWorkerStopped);
}
drain_to_worker_and_join(window, energy_sources, work_tx, worker, metrics).await;
Ok(())
}
async fn run_analysis_worker(mut work_rx: mpsc::Receiver<AnalysisBatch>, wctx: AnalysisWorkerCtx) {
while let Some(batch) = work_rx.recv().await {
wctx.metrics.analysis_queue_depth.dec();
process_traces(
batch.traces,
ProcessTracesCtx {
detect_config: &wctx.detect_config,
green_enabled: wctx.green_enabled,
carbon_ctx: batch.carbon_ctx.as_ref(),
metrics: &wctx.metrics,
confidence: wctx.confidence,
findings_store: &wctx.findings_store,
correlator: wctx.correlator.as_deref(),
green_summary_cell: &wctx.green_summary_cell,
archive_tx: wctx.archive_tx.as_ref(),
},
)
.await;
}
}
/// Maximum number of distinct services that get their own labelled I/O op
/// counter; ops from services beyond this go to an overflow counter.
pub(crate) const MAX_SERVICE_CARDINALITY: usize = 1 << 10;
/// Counts I/O ops per service while bounding the number of label values.
struct ServiceMeter {
    /// Cached counter children keyed by service name, so repeat services
    /// skip the label lookup.
    known_services: std::collections::HashMap<String, prometheus::Counter>,
    /// Maximum number of services that get a dedicated counter.
    max_service_cardinality: usize,
    /// Set once the cap warning has been logged, so it is logged only once.
    service_cap_warned: bool,
}
impl ServiceMeter {
    /// Records one I/O op for `service`.
    ///
    /// Known services bump their cached counter. A new service gets a counter
    /// while under the cap. Past the cap, the op lands in
    /// `service_io_ops_overflow_total` and a warning is logged the first time.
    fn record(&mut self, service: &str, metrics: &MetricsState) {
        if let Some(cached) = self.known_services.get(service) {
            cached.inc();
            return;
        }
        if self.known_services.len() >= self.max_service_cardinality {
            metrics.service_io_ops_overflow_total.inc();
            if !self.service_cap_warned {
                self.service_cap_warned = true;
                tracing::warn!(
                    cap = self.max_service_cardinality,
                    "Service cardinality cap reached; new services will \
                     not have per-service I/O op counters"
                );
            }
            return;
        }
        let fresh = metrics.service_io_ops_total.with_label_values(&[service]);
        fresh.inc();
        self.known_services.insert(service.to_owned(), fresh);
    }
}
/// Samples, normalizes and pushes one span batch into the trace window.
///
/// Per-service op counters are updated before the window lock is taken.
/// `active_traces` is refreshed while the lock is held, and
/// `events_processed_total` grows by the post-sampling event count.
///
/// Returns the traces the window evicted while absorbing the batch.
async fn ingest_event_batch(
    events: Vec<SpanEvent>,
    sampling_rate: f64,
    window: &Arc<Mutex<TraceWindow>>,
    metrics: &MetricsState,
    service_meter: &mut ServiceMeter,
) -> Vec<(String, Vec<normalize::NormalizedEvent>)> {
    let sampled = apply_sampling(events, sampling_rate);
    let sampled_count = sampled.len();
    let normalized: Vec<_> = sampled.into_iter().map(normalize::normalize).collect();
    for ev in &normalized {
        service_meter.record(ev.event.service.as_ref(), metrics);
    }
    let now_ms = current_time_ms();
    let evicted = {
        let mut guard = window.lock().await;
        let pushed_out: Vec<_> = normalized
            .into_iter()
            .filter_map(|ev| guard.push(ev, now_ms))
            .collect();
        metrics.active_traces.set(guard.active_traces() as f64);
        pushed_out
    };
    metrics.events_processed_total.inc_by(sampled_count as f64);
    evicted
}
/// Removes TTL-expired traces from the window and refreshes `active_traces`.
///
/// The current wall-clock time is read before the lock is acquired; the gauge
/// is updated while the lock is still held.
async fn evict_expired_traces(
    window: &Arc<Mutex<TraceWindow>>,
    metrics: &MetricsState,
) -> Vec<(String, Vec<normalize::NormalizedEvent>)> {
    let cutoff_ms = current_time_ms();
    let mut guard = window.lock().await;
    let expired = guard.evict_expired(cutoff_ms);
    let still_active = guard.active_traces();
    metrics.active_traces.set(still_active as f64);
    expired
}
/// Builds a shareable carbon context for the current moment.
///
/// When no scraper contributed data, the base context is reused via a cheap
/// `Arc` clone; otherwise the freshly merged context is wrapped in a new `Arc`.
fn build_owned_tick_ctx(sources: &EnergySources<'_>) -> Arc<score::carbon::CarbonContext> {
    let tick_ctx = build_tick_ctx(
        &sources.base_carbon_ctx,
        sources.scaphandre_state,
        sources.scaphandre_staleness_ms,
        sources.kepler_state,
        sources.kepler_staleness_ms,
        sources.redfish_state,
        sources.redfish_staleness_ms,
        sources.cloud_state,
        sources.cloud_staleness_ms,
        sources.emaps_state,
        sources.emaps_staleness_ms,
    );
    if let std::borrow::Cow::Owned(fresh) = tick_ctx {
        Arc::new(fresh)
    } else {
        Arc::clone(&sources.base_carbon_ctx)
    }
}
fn enqueue_for_analysis(
traces: Vec<(String, Vec<normalize::NormalizedEvent>)>,
sources: &EnergySources<'_>,
work_tx: &mpsc::Sender<AnalysisBatch>,
metrics: &MetricsState,
) {
if traces.is_empty() {
return;
}
let trace_count = traces.len();
match work_tx.try_reserve() {
Ok(permit) => {
metrics.analysis_queue_depth.inc();
permit.send(AnalysisBatch::new(traces, sources));
}
Err(mpsc::error::TrySendError::Full(())) => {
metrics.record_shed(trace_count);
tracing::warn!(traces = trace_count, "analysis queue full, shedding batch");
}
Err(mpsc::error::TrySendError::Closed(())) => {
metrics.record_shed(trace_count);
tracing::error!(
traces = trace_count,
"analysis worker stopped, shedding batch"
);
}
}
}
async fn drain_to_worker_and_join(
window: &Arc<Mutex<TraceWindow>>,
sources: &EnergySources<'_>,
work_tx: mpsc::Sender<AnalysisBatch>,
worker: tokio::task::JoinHandle<()>,
metrics: &MetricsState,
) {
let remaining = {
let mut w = window.lock().await;
w.drain_all()
};
if !remaining.is_empty() {
let trace_count = remaining.len();
let batch = AnalysisBatch::new(remaining, sources);
if work_tx.send(batch).await.is_ok() {
metrics.analysis_queue_depth.inc();
} else {
metrics.record_shed(trace_count);
tracing::error!(
traces = trace_count,
"analysis worker stopped before shutdown drain"
);
}
}
drop(work_tx);
let _ = worker.await;
}
/// Aborts every scraper and listener task that is present.
///
/// Order: emaps, cloud, redfish, kepler, scaphandre, then gRPC, HTTP and the
/// optional JSON socket listener. `abort` only requests cancellation; it does
/// not wait for the tasks to stop.
fn shutdown_listeners(energy: EnergyScraperHandles<'_>, listeners: ListenerHandles<'_>) {
    let scrapers = [
        energy.emaps,
        energy.cloud,
        energy.redfish,
        energy.kepler,
        energy.scaphandre,
    ];
    let ingest = [
        Some(listeners.grpc),
        Some(listeners.http),
        listeners.json_socket,
    ];
    for task in scrapers.into_iter().chain(ingest).flatten() {
        task.abort();
    }
}
#[allow(clippy::too_many_arguments)]
fn build_tick_ctx<'a>(
base: &'a score::carbon::CarbonContext,
scaphandre_state: Option<&ScaphandreState>,
scaphandre_staleness_ms: u64,
kepler_state: Option<&KeplerState>,
kepler_staleness_ms: u64,
redfish_state: Option<&RedfishState>,
redfish_staleness_ms: u64,
cloud_state: Option<&CloudEnergyState>,
cloud_staleness_ms: u64,
emaps_state: Option<&ElectricityMapsState>,
emaps_staleness_ms: u64,
) -> std::borrow::Cow<'a, score::carbon::CarbonContext> {
let now = score::scaphandre::monotonic_ms();
let cloud_snap = cloud_state
.map(|s| s.snapshot(now, cloud_staleness_ms))
.unwrap_or_default();
let redfish_snap = redfish_state
.map(|s| s.snapshot(now, redfish_staleness_ms))
.unwrap_or_default();
let kepler_snap = kepler_state
.map(|s| s.snapshot(now, kepler_staleness_ms))
.unwrap_or_default();
let scaph_snap = scaphandre_state
.map(|s| s.snapshot(now, scaphandre_staleness_ms))
.unwrap_or_default();
let emaps_snap = emaps_state
.map(|s| s.snapshot_with_metadata(now, emaps_staleness_ms))
.unwrap_or_default();
if cloud_snap.is_empty()
&& redfish_snap.is_empty()
&& kepler_snap.is_empty()
&& scaph_snap.is_empty()
&& emaps_snap.is_empty()
{
return std::borrow::Cow::Borrowed(base);
}
let mut merged: std::collections::HashMap<String, score::carbon::EnergyEntry> =
std::collections::HashMap::with_capacity(
cloud_snap.len() + redfish_snap.len() + kepler_snap.len() + scaph_snap.len(),
);
for (service, energy_kwh) in cloud_snap {
merged.insert(service, score::carbon::EnergyEntry::cloud(energy_kwh));
}
for (service, energy_kwh) in redfish_snap {
merged.insert(service, score::carbon::EnergyEntry::redfish(energy_kwh));
}
for (service, energy_kwh) in kepler_snap {
merged.insert(service, score::carbon::EnergyEntry::kepler(energy_kwh));
}
for (service, energy_kwh) in scaph_snap {
merged.insert(service, score::carbon::EnergyEntry::scaphandre(energy_kwh));
}
let mut ctx = base.clone();
ctx.energy_snapshot = if merged.is_empty() {
None
} else {
Some(merged)
};
if !emaps_snap.is_empty() {
ctx.real_time_intensity = Some(emaps_snap);
}
std::borrow::Cow::Owned(ctx)
}
/// Observes the duration, in seconds, of every span slower than the
/// configured threshold into the `sql` or `http_out` histogram child.
///
/// Both label children are created up front, whether or not any span in the
/// batch crosses the threshold.
fn record_slow_durations(traces: &[Trace], detect_config: &DetectConfig, metrics: &MetricsState) {
    let threshold_us = detect_config.slow_threshold_ms.saturating_mul(1000);
    let sql_hist = metrics.slow_duration_seconds.with_label_values(&["sql"]);
    let http_hist = metrics
        .slow_duration_seconds
        .with_label_values(&["http_out"]);
    let slow_spans = traces
        .iter()
        .flat_map(|trace| trace.spans.iter())
        .filter(|span| span.event.duration_us > threshold_us);
    for span in slow_spans {
        let target_hist = match span.event.event_type {
            crate::event::EventType::Sql => &sql_hist,
            crate::event::EventType::HttpOut => &http_hist,
        };
        target_hist.observe(span.event.duration_us as f64 / 1_000_000.0);
    }
}
/// Updates the batch-level metrics and writes each finding to stdout as one
/// JSON object per line.
///
/// Counters (`traces_analyzed_total`, `total_io_ops`, `avoidable_io_ops`)
/// accumulate across batches. `io_waste_ratio` is recomputed from the
/// cumulative counters. `energy_kwh` and `carbon_gco2` are set to this
/// batch's values. Stdout output is best-effort: write errors are ignored.
fn emit_findings_and_update_metrics(
    trace_count: usize,
    findings: &[detect::Finding],
    green_summary: &GreenSummary,
    metrics: &MetricsState,
) {
    use std::io::Write;
    metrics.traces_analyzed_total.inc_by(trace_count as f64);
    metrics
        .total_io_ops
        .inc_by(green_summary.total_io_ops as f64);
    metrics
        .avoidable_io_ops
        .inc_by(green_summary.avoidable_io_ops as f64);
    let cumulative_total = metrics.total_io_ops.get();
    if cumulative_total > 0.0 {
        metrics
            .io_waste_ratio
            .set(metrics.avoidable_io_ops.get() / cumulative_total);
    }
    metrics.energy_kwh.set(green_summary.energy_kwh);
    metrics
        .carbon_gco2
        .set(green_summary.regions.iter().map(|r| r.co2_gco2).sum());
    metrics.record_exemplars(findings, green_summary);

    let mut out = std::io::stdout().lock();
    // Reused scratch buffer: each finding is serialized in full before any
    // byte reaches stdout. Streaming straight into the line-buffered lock
    // would leave a partial JSON fragment without a newline on failure, and
    // that fragment would then merge into the next finding's line.
    let mut line = Vec::new();
    for finding in findings {
        metrics
            .findings_total
            .with_label_values(&[finding.finding_type.as_str(), finding.severity.as_str()])
            .inc();
        line.clear();
        if serde_json::to_writer(&mut line, finding).is_ok() {
            line.push(b'\n');
            let _ = out.write_all(&line);
        }
    }
}
/// Borrowed dependencies for analyzing a single batch in `process_traces`.
struct ProcessTracesCtx<'a> {
    detect_config: &'a DetectConfig,
    /// Chooses between `score::score_green` and a disabled green summary.
    green_enabled: bool,
    /// Carbon context used for green scoring of this batch.
    carbon_ctx: &'a score::carbon::CarbonContext,
    metrics: &'a MetricsState,
    confidence: Confidence,
    findings_store: &'a findings_store::FindingsStore,
    /// Cross-trace correlator fed with every batch's findings, when present.
    correlator: Option<&'a Mutex<detect::correlate_cross::CrossTraceCorrelator>>,
    /// Shared cell overwritten with this batch's green summary.
    green_summary_cell: &'a Arc<RwLock<GreenSummary>>,
    /// Destination for the per-batch archive report, when archiving is on.
    archive_tx: Option<&'a mpsc::Sender<super::archive::OwnedArchive>>,
}
/// Analyzes one batch of completed traces and publishes every derived output.
///
/// The steps run in this order:
/// 1. Run detection and record slow-span histograms.
/// 2. Score the batch, or build a disabled summary from the raw span count
///    when green scoring is off.
/// 3. Overwrite `ctx.green_summary_cell` with the summary.
/// 4. Apply confidence and acknowledgment signatures.
/// 5. Persist non-empty findings to the store.
/// 6. Feed the correlator, when present.
/// 7. Emit metrics and NDJSON.
/// 8. Send an archive report, when `ctx.archive_tx` is set.
///
/// An empty batch is a no-op.
async fn process_traces(
    traces: Vec<(String, Vec<normalize::NormalizedEvent>)>,
    ctx: ProcessTracesCtx<'_>,
) {
    let trace_count = traces.len();
    if trace_count == 0 {
        return;
    }
    let trace_structs: Vec<Trace> = traces
        .into_iter()
        .map(|(trace_id, spans)| Trace { trace_id, spans })
        .collect();

    let detected = detect::run_full_detection(&trace_structs, ctx.detect_config);
    record_slow_durations(&trace_structs, ctx.detect_config, ctx.metrics);

    let (mut findings, green_summary, per_endpoint_io_ops) = if ctx.green_enabled {
        score::score_green(&trace_structs, detected, Some(ctx.carbon_ctx))
    } else {
        let total_io_ops = trace_structs.iter().map(|t| t.spans.len()).sum();
        (detected, GreenSummary::disabled(total_io_ops), Vec::new())
    };

    // Readers of the shared cell always see the latest batch's summary.
    {
        let mut published = ctx.green_summary_cell.write().await;
        published.clone_from(&green_summary);
    }

    detect::apply_confidence(&mut findings, ctx.confidence);
    crate::acknowledgments::enrich_with_signatures(&mut findings);

    let now_ms = current_time_ms();
    if !findings.is_empty() {
        ctx.findings_store.push_batch(&findings, now_ms).await;
        #[allow(clippy::cast_precision_loss)]
        let stored = ctx.findings_store.len().await as f64;
        ctx.metrics.stored_findings.set(stored);
    }

    let evicted = match ctx.correlator {
        Some(correlator) => correlator.lock().await.ingest(&findings, now_ms),
        None => 0,
    };
    if evicted > 0 {
        // Warn once per process; the counter carries the running total.
        static CAP_WARNED: std::sync::atomic::AtomicBool =
            std::sync::atomic::AtomicBool::new(false);
        ctx.metrics
            .correlator_pairs_evicted_total
            .inc_by(evicted as u64);
        if !CAP_WARNED.swap(true, std::sync::atomic::Ordering::Relaxed) {
            tracing::warn!(
                evicted,
                "correlator pair cap reached, dropping pairs (see \
                 perf_sentinel_correlator_pairs_evicted_total)"
            );
        }
    }

    emit_findings_and_update_metrics(trace_count, &findings, &green_summary, ctx.metrics);

    let Some(archive_tx) = ctx.archive_tx else {
        return;
    };
    let events_processed = trace_structs.iter().map(|t| t.spans.len()).sum();
    let disclosure_waste = if green_summary.co2.is_some() {
        Some(score::canonical::compute_disclosure_waste(
            &trace_structs,
            &green_summary,
            ctx.detect_config,
        ))
    } else {
        None
    };
    let report = crate::report::Report {
        analysis: crate::report::Analysis {
            duration_ms: 0,
            events_processed,
            traces_analyzed: trace_count,
        },
        findings,
        green_summary,
        quality_gate: crate::report::QualityGate {
            passed: true,
            rules: vec![],
        },
        per_endpoint_io_ops,
        correlations: vec![],
        warnings: vec![],
        warning_details: vec![],
        acknowledged_findings: vec![],
        binary_version: env!("CARGO_PKG_VERSION").to_string(),
        disclosure_waste,
    };
    super::archive::try_send(
        archive_tx,
        super::archive::OwnedArchive {
            ts: chrono::Utc::now(),
            report,
        },
    );
}
/// Milliseconds since the Unix epoch from the wall clock.
///
/// Saturates at `u64::MAX`. If the clock reads earlier than the epoch, logs a
/// warning and returns 0.
fn current_time_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX),
        Err(_) => {
            tracing::warn!(
                "System clock is before Unix epoch; using 0 as current_time_ms. \
                 Check system time configuration."
            );
            0
        }
    }
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::correlate::window::WindowConfig;
use crate::event::{EventSource, EventType, SpanEvent};
use core::assert_matches;
fn make_normalized(trace_id: &str, target: &str) -> normalize::NormalizedEvent {
make_normalized_for_service(trace_id, "test", target)
}
fn make_normalized_for_service(
trace_id: &str,
service: &str,
target: &str,
) -> normalize::NormalizedEvent {
let mut event = crate::test_helpers::make_sql_event_with_duration(
trace_id,
"s1",
target,
"2025-07-10T14:32:01.123Z",
100,
);
event.service = Arc::from(service);
normalize::normalize(event)
}
fn default_detect_config() -> DetectConfig {
DetectConfig {
n_plus_one_threshold: 5,
window_ms: 500,
slow_threshold_ms: 500,
slow_min_occurrences: 3,
max_fanout: 20,
chatty_service_min_calls: 15,
pool_saturation_concurrent_threshold: 10,
serialized_min_sequential: 3,
sanitizer_aware_classification: SanitizerAwareMode::default(),
}
}
fn empty_carbon_ctx() -> score::carbon::CarbonContext {
score::carbon::CarbonContext::default()
}
fn test_ctx<'a>(
detect_config: &'a DetectConfig,
carbon_ctx: &'a score::carbon::CarbonContext,
metrics: &'a MetricsState,
findings_store: &'a findings_store::FindingsStore,
green_enabled: bool,
green_summary_cell: &'a Arc<RwLock<GreenSummary>>,
) -> ProcessTracesCtx<'a> {
ProcessTracesCtx {
detect_config,
green_enabled,
carbon_ctx,
metrics,
confidence: Confidence::DaemonStaging,
findings_store,
correlator: None,
green_summary_cell,
archive_tx: None,
}
}
fn fresh_green_cell() -> Arc<RwLock<GreenSummary>> {
Arc::new(RwLock::new(GreenSummary::disabled(0)))
}
#[tokio::test]
async fn process_traces_empty_does_nothing() {
let metrics = MetricsState::new();
let ctx = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
process_traces(
vec![],
test_ctx(&detect_config, &ctx, &metrics, &store, true, &cell),
)
.await;
}
#[tokio::test]
async fn process_traces_with_n_plus_one() {
let events: Vec<_> = (1..=6)
.map(|i| {
make_normalized(
"t1",
&format!("SELECT * FROM order_item WHERE order_id = {i}"),
)
})
.collect();
let metrics = MetricsState::new();
let ctx = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
process_traces(
vec![("t1".to_string(), events)],
test_ctx(&detect_config, &ctx, &metrics, &store, true, &cell),
)
.await;
}
#[tokio::test]
async fn process_traces_clean_no_finding() {
let events = vec![
make_normalized("t1", "SELECT * FROM users WHERE id = 1"),
make_normalized("t1", "SELECT * FROM orders WHERE id = 2"),
];
let metrics = MetricsState::new();
let ctx = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
process_traces(
vec![("t1".to_string(), events)],
test_ctx(&detect_config, &ctx, &metrics, &store, true, &cell),
)
.await;
}
#[test]
fn current_time_ms_returns_nonzero() {
let ms = current_time_ms();
assert!(ms > 0, "current_time_ms should return a positive value");
}
#[test]
fn evict_expired_returns_traces() {
let config = WindowConfig {
trace_ttl_ms: 100,
..Default::default()
};
let mut w = TraceWindow::new(config);
let event = normalize::normalize(SpanEvent {
timestamp: "2025-07-10T14:32:01.123Z".to_string(),
trace_id: "t1".to_string(),
span_id: "s1".to_string(),
parent_span_id: None,
service: Arc::from("test"),
cloud_region: None,
event_type: EventType::Sql,
operation: "SELECT".to_string(),
target: "SELECT 1".to_string(),
duration_us: 100,
source: EventSource {
endpoint: "GET /test".to_string(),
method: "Test::test".to_string(),
},
status_code: None,
response_size_bytes: None,
code_function: None,
code_filepath: None,
code_lineno: None,
code_namespace: None,
instrumentation_scopes: Vec::new(),
});
w.push(event, 0);
assert_eq!(w.active_traces(), 1);
let expired = w.evict_expired(50);
assert!(expired.is_empty());
assert_eq!(w.active_traces(), 1);
let expired = w.evict_expired(150);
assert_eq!(expired.len(), 1);
assert_eq!(expired[0].0, "t1");
assert_eq!(expired[0].1.len(), 1);
assert_eq!(w.active_traces(), 0);
}
#[tokio::test]
async fn process_traces_updates_metrics() {
let events: Vec<_> = (1..=6)
.map(|i| {
make_normalized(
"t1",
&format!("SELECT * FROM order_item WHERE order_id = {i}"),
)
})
.collect();
let metrics = MetricsState::new();
let ctx = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
process_traces(
vec![("t1".to_string(), events)],
test_ctx(&detect_config, &ctx, &metrics, &store, true, &cell),
)
.await;
let output = metrics.render();
assert!(output.contains("perf_sentinel_traces_analyzed_total"));
assert!(output.contains("perf_sentinel_findings_total"));
}
#[tokio::test]
async fn process_traces_green_disabled() {
let events: Vec<_> = (1..=6)
.map(|i| {
make_normalized(
"t1",
&format!("SELECT * FROM order_item WHERE order_id = {i}"),
)
})
.collect();
let metrics = MetricsState::new();
let ctx = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
process_traces(
vec![("t1".to_string(), events)],
test_ctx(&detect_config, &ctx, &metrics, &store, false, &cell),
)
.await;
assert!((metrics.avoidable_io_ops.get() - 0.0).abs() < f64::EPSILON);
assert!(metrics.total_io_ops.get() > 0.0);
}
#[tokio::test]
async fn process_traces_publishes_green_summary_to_cell() {
let events: Vec<_> = (1..=6)
.map(|i| {
make_normalized(
"t1",
&format!("SELECT * FROM order_item WHERE order_id = {i}"),
)
})
.collect();
let metrics = MetricsState::new();
let ctx = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
process_traces(
vec![("t1".to_string(), events)],
test_ctx(&detect_config, &ctx, &metrics, &store, true, &cell),
)
.await;
let snapshot = cell.read().await.clone();
assert!(snapshot.total_io_ops > 0, "cell should reflect the batch");
}
#[test]
fn build_tick_ctx_no_scrapers_yields_borrowed_cow() {
let base = score::carbon::CarbonContext::default();
let ctx = build_tick_ctx(&base, None, 0, None, 0, None, 0, None, 0, None, 0);
assert_matches!(ctx, std::borrow::Cow::Borrowed(_));
assert!(ctx.energy_snapshot.is_none());
}
#[test]
fn build_tick_ctx_scaphandre_only() {
let base = score::carbon::CarbonContext::default();
let scaph = ScaphandreState::new();
scaph.insert_for_test("svc-a".into(), 1e-7, 100);
let ctx = build_tick_ctx(&base, Some(&scaph), 500, None, 0, None, 0, None, 0, None, 0);
let snap = ctx.energy_snapshot.as_ref().unwrap();
assert_eq!(snap.len(), 1);
assert_eq!(snap["svc-a"].model_tag, "scaphandre_rapl");
}
#[test]
fn build_tick_ctx_cloud_only() {
let base = score::carbon::CarbonContext::default();
let cloud = CloudEnergyState::new();
cloud.insert_for_test("svc-b".into(), 2e-7, 100);
let ctx = build_tick_ctx(&base, None, 0, None, 0, None, 0, Some(&cloud), 500, None, 0);
let snap = ctx.energy_snapshot.as_ref().unwrap();
assert_eq!(snap.len(), 1);
assert_eq!(snap["svc-b"].model_tag, "cloud_specpower");
}
#[test]
fn build_tick_ctx_kepler_only() {
let base = score::carbon::CarbonContext::default();
let kepler = KeplerState::new();
kepler.insert_for_test("svc-k".into(), 4e-7, 100);
let ctx = build_tick_ctx(
&base,
None,
0,
Some(&kepler),
500,
None,
0,
None,
0,
None,
0,
);
let snap = ctx.energy_snapshot.as_ref().unwrap();
assert_eq!(snap.len(), 1);
assert_eq!(snap["svc-k"].model_tag, "kepler_ebpf");
}
#[test]
fn build_tick_ctx_redfish_only() {
let base = score::carbon::CarbonContext::default();
let redfish = RedfishState::new();
redfish.insert_for_test("svc-r".into(), 6e-7, 100);
let ctx = build_tick_ctx(
&base,
None,
0,
None,
0,
Some(&redfish),
500,
None,
0,
None,
0,
);
let snap = ctx.energy_snapshot.as_ref().unwrap();
assert_eq!(snap.len(), 1);
assert_eq!(snap["svc-r"].model_tag, "redfish_bmc");
}
#[test]
fn build_tick_ctx_scaphandre_overrides_kepler_overrides_cloud_for_same_service() {
let base = score::carbon::CarbonContext::default();
let scaph = ScaphandreState::new();
scaph.insert_for_test("svc-a".into(), 1e-7, 100);
let kepler = KeplerState::new();
kepler.insert_for_test("svc-a".into(), 2e-7, 100);
kepler.insert_for_test("svc-k".into(), 4e-7, 100);
let cloud = CloudEnergyState::new();
cloud.insert_for_test("svc-a".into(), 5e-7, 100);
cloud.insert_for_test("svc-b".into(), 3e-7, 100);
let ctx = build_tick_ctx(
&base,
Some(&scaph),
500,
Some(&kepler),
500,
None,
0,
Some(&cloud),
500,
None,
0,
);
let snap = ctx.energy_snapshot.as_ref().unwrap();
assert_eq!(snap.len(), 3);
assert_eq!(snap["svc-a"].model_tag, "scaphandre_rapl");
assert!((snap["svc-a"].energy_per_op_kwh - 1e-7).abs() < 1e-15);
assert_eq!(snap["svc-k"].model_tag, "kepler_ebpf");
assert_eq!(snap["svc-b"].model_tag, "cloud_specpower");
}
#[test]
fn build_tick_ctx_stale_entries_filtered() {
let scaph = ScaphandreState::new();
scaph.insert_for_test("stale-svc".into(), 1e-7, 0);
let snap = scaph.snapshot(100, 1);
assert!(
snap.is_empty(),
"entry at time 0 should be stale when now=100, staleness=1"
);
scaph.insert_for_test("fresh-svc".into(), 2e-7, 99);
let snap2 = scaph.snapshot(100, 50);
assert!(snap2.contains_key("fresh-svc"));
assert!(!snap2.contains_key("stale-svc"));
}
fn no_scrapers(base: &Arc<score::carbon::CarbonContext>) -> EnergySources<'_> {
EnergySources {
base_carbon_ctx: base.clone(),
scaphandre_state: None,
scaphandre_staleness_ms: 0,
kepler_state: None,
kepler_staleness_ms: 0,
redfish_state: None,
redfish_staleness_ms: 0,
cloud_state: None,
cloud_staleness_ms: 0,
emaps_state: None,
emaps_staleness_ms: 0,
}
}
fn one_trace_batch(id: &str) -> Vec<(String, Vec<normalize::NormalizedEvent>)> {
vec![(id.to_string(), vec![make_normalized(id, "SELECT 1")])]
}
fn test_window() -> Arc<Mutex<TraceWindow>> {
Arc::new(Mutex::new(TraceWindow::new(WindowConfig {
max_events_per_trace: 1000,
trace_ttl_ms: 30_000,
max_active_traces: std::num::NonZeroUsize::new(10_000).expect("nonzero"),
})))
}
fn test_worker_ctx(
metrics: &Arc<MetricsState>,
findings_store: &Arc<findings_store::FindingsStore>,
green_summary_cell: &Arc<RwLock<GreenSummary>>,
) -> AnalysisWorkerCtx {
AnalysisWorkerCtx {
detect_config: default_detect_config(),
green_enabled: true,
confidence: Confidence::DaemonStaging,
metrics: metrics.clone(),
findings_store: findings_store.clone(),
correlator: None,
green_summary_cell: green_summary_cell.clone(),
archive_tx: None,
}
}
#[tokio::test]
async fn ingestion_not_head_of_line_blocked_by_slow_analysis() {
let metrics = MetricsState::new();
let base = Arc::new(empty_carbon_ctx());
let sources = no_scrapers(&base);
let (work_tx, _work_rx) = mpsc::channel::<AnalysisBatch>(2);
for i in 0..10u32 {
enqueue_for_analysis(
one_trace_batch(&format!("t{i}")),
&sources,
&work_tx,
&metrics,
);
}
assert_eq!(metrics.analysis_queue_depth.get(), 2);
assert_eq!(metrics.analysis_shed_batches_total.get(), 8);
assert_eq!(metrics.analysis_shed_traces_total.get(), 8);
}
#[tokio::test]
async fn saturated_queue_sheds_and_increments_metric() {
let metrics = MetricsState::new();
let base = Arc::new(empty_carbon_ctx());
let sources = no_scrapers(&base);
let (work_tx, _work_rx) = mpsc::channel::<AnalysisBatch>(1);
enqueue_for_analysis(one_trace_batch("t1"), &sources, &work_tx, &metrics);
assert_eq!(metrics.analysis_queue_depth.get(), 1);
assert_eq!(metrics.analysis_shed_batches_total.get(), 0);
let batch = vec![
("t2".to_string(), vec![make_normalized("t2", "SELECT 1")]),
("t3".to_string(), vec![make_normalized("t3", "SELECT 1")]),
("t4".to_string(), vec![make_normalized("t4", "SELECT 1")]),
];
enqueue_for_analysis(batch, &sources, &work_tx, &metrics);
assert_eq!(metrics.analysis_shed_batches_total.get(), 1);
assert_eq!(metrics.analysis_shed_traces_total.get(), 3);
assert_eq!(metrics.analysis_queue_depth.get(), 1);
}
#[tokio::test]
async fn stopped_worker_counts_as_shed() {
let metrics = MetricsState::new();
let base = Arc::new(empty_carbon_ctx());
let sources = no_scrapers(&base);
let (work_tx, work_rx) = mpsc::channel::<AnalysisBatch>(4);
drop(work_rx);
let batch = vec![
("t1".to_string(), vec![make_normalized("t1", "SELECT 1")]),
("t2".to_string(), vec![make_normalized("t2", "SELECT 1")]),
];
enqueue_for_analysis(batch, &sources, &work_tx, &metrics);
assert_eq!(metrics.analysis_shed_batches_total.get(), 1);
assert_eq!(metrics.analysis_shed_traces_total.get(), 2);
assert_eq!(metrics.analysis_queue_depth.get(), 0);
}
#[tokio::test]
async fn correlator_pair_evictions_recorded_in_metrics() {
let metrics = MetricsState::new();
let carbon = empty_carbon_ctx();
let store = findings_store::FindingsStore::new(100);
let detect_config = default_detect_config();
let cell = fresh_green_cell();
let correlator = Mutex::new(detect::correlate_cross::CrossTraceCorrelator::new(
detect::correlate_cross::CorrelationConfig {
enabled: true,
max_tracked_pairs: 1,
lag_threshold_ms: 100_000,
min_co_occurrences: 1,
min_confidence: 0.0,
..Default::default()
},
));
let mut ctx = test_ctx(&detect_config, &carbon, &metrics, &store, true, &cell);
ctx.correlator = Some(&correlator);
let traces: Vec<_> = ["svc-a", "svc-b", "svc-c"]
.iter()
.enumerate()
.map(|(i, svc)| {
let trace_id = format!("t{i}");
let events: Vec<_> = (1..=6)
.map(|p| {
make_normalized_for_service(
&trace_id,
svc,
&format!("SELECT * FROM order_item WHERE order_id = {p}"),
)
})
.collect();
(trace_id, events)
})
.collect();
process_traces(traces, ctx).await;
assert!(
metrics.correlator_pairs_evicted_total.get() > 0,
"pair cap evictions must reach the metric"
);
}
#[test]
fn service_meter_overflow_counts_unattributed_ops() {
let metrics = MetricsState::new();
let mut meter = ServiceMeter {
known_services: std::collections::HashMap::new(),
max_service_cardinality: 2,
service_cap_warned: false,
};
for service in ["svc-a", "svc-b", "svc-c"] {
meter.record(service, &metrics);
meter.record(service, &metrics);
}
assert_eq!(metrics.service_io_ops_overflow_total.get(), 2);
for service in ["svc-a", "svc-b"] {
let count = metrics
.service_io_ops_total
.with_label_values(&[service])
.get();
assert!((count - 2.0).abs() < f64::EPSILON);
}
assert!(meter.service_cap_warned);
}
/// Verifies that a trace shed at enqueue time never reaches analysis.
///
/// The shed trace is neither counted in `traces_analyzed_total` nor stored in
/// the findings store. The trace accepted alongside it is analyzed and stored.
#[tokio::test]
async fn shed_traces_are_excluded_from_analysis_outputs() {
    let metrics = Arc::new(MetricsState::new());
    let store = Arc::new(findings_store::FindingsStore::new(100));
    let cell = fresh_green_cell();
    let base = Arc::new(empty_carbon_ctx());
    let sources = no_scrapers(&base);
    // Capacity 1 and no worker consuming yet: the first batch occupies the only
    // slot, so the second batch is expected to be shed.
    let (work_tx, work_rx) = mpsc::channel::<AnalysisBatch>(1);

    // Six queries that differ only in the literal order id.
    let n_plus_one = |trace_id: &str| -> Vec<normalize::NormalizedEvent> {
        let mut events = Vec::new();
        for order_id in 1..=6 {
            events.push(make_normalized(
                trace_id,
                &format!("SELECT * FROM order_item WHERE order_id = {order_id}"),
            ));
        }
        events
    };

    for trace_id in ["kept", "shed"] {
        let batch = vec![(trace_id.to_string(), n_plus_one(trace_id))];
        enqueue_for_analysis(batch, &sources, &work_tx, &metrics);
    }
    assert_eq!(metrics.analysis_shed_batches_total.get(), 1);
    assert_eq!(metrics.analysis_shed_traces_total.get(), 1);

    let worker = tokio::spawn(run_analysis_worker(
        work_rx,
        test_worker_ctx(&metrics, &store, &cell),
    ));
    // Dropping the only sender closes the queue so the worker can finish.
    drop(work_tx);
    worker.await.expect("worker should drain and exit");

    assert!((metrics.traces_analyzed_total.get() - 1.0).abs() < f64::EPSILON);
    assert!(
        !store.by_trace_id("kept").await.is_empty(),
        "kept trace must reach the findings store"
    );
    assert!(
        store.by_trace_id("shed").await.is_empty(),
        "shed trace must never reach analysis outputs"
    );
}
/// Verifies that `drain_to_worker_and_join` processes everything pending at
/// shutdown.
///
/// Two traces are already queued for the worker and three more are still
/// buffered in the trace window. All five must be counted as analyzed, and the
/// queue-depth gauge must return to zero.
#[tokio::test]
async fn shutdown_drains_window_and_inflight_queue() {
    let metrics = Arc::new(MetricsState::new());
    let store = Arc::new(findings_store::FindingsStore::new(100));
    let cell = fresh_green_cell();
    let base = Arc::new(empty_carbon_ctx());
    let sources = no_scrapers(&base);
    let (work_tx, work_rx) = mpsc::channel::<AnalysisBatch>(4);
    let worker = tokio::spawn(run_analysis_worker(
        work_rx,
        test_worker_ctx(&metrics, &store, &cell),
    ));

    // Two traces handed to the work queue before shutdown begins.
    let inflight: Vec<_> = ["q1", "q2"]
        .iter()
        .map(|&trace_id| (trace_id.to_string(), vec![make_normalized(trace_id, "SELECT 1")]))
        .collect();
    enqueue_for_analysis(inflight, &sources, &work_tx, &metrics);

    // Three more traces still sitting in the window (timestamp 0).
    let window = test_window();
    {
        let mut guard = window.lock().await;
        for trace_id in ["w1", "w2", "w3"] {
            guard.push(make_normalized(trace_id, "SELECT 1"), 0);
        }
    }

    drain_to_worker_and_join(&window, &sources, work_tx, worker, &metrics).await;

    assert!((metrics.traces_analyzed_total.get() - 5.0).abs() < f64::EPSILON);
    assert_eq!(metrics.analysis_queue_depth.get(), 0);
}
/// Builds a `ShutdownTargets` for tests.
///
/// Every energy-scraper handle and the JSON-socket listener are `None`; only
/// the given gRPC and HTTP listener tasks are referenced.
fn dummy_shutdown<'a>(
    grpc: &'a tokio::task::JoinHandle<()>,
    http: &'a tokio::task::JoinHandle<()>,
) -> ShutdownTargets<'a> {
    let energy = EnergyScraperHandles {
        scaphandre: None,
        kepler: None,
        redfish: None,
        cloud: None,
        emaps: None,
    };
    let listeners = ListenerHandles {
        grpc,
        http,
        json_socket: None,
    };
    ShutdownTargets { energy, listeners }
}
/// Event-loop settings shared by the `drive_event_loop` tests.
///
/// - green scoring enabled;
/// - `sampling_rate` 1.0 (presumably keeps every trace);
/// - `evict_ms` 60 000 ms;
/// - `Confidence::DaemonStaging`;
/// - analysis queue capacity 1024.
fn test_loop_cfg() -> EventLoopConfig {
    EventLoopConfig {
        analysis_queue_capacity: 1024,
        confidence: Confidence::DaemonStaging,
        evict_ms: 60_000,
        green_enabled: true,
        sampling_rate: 1.0,
    }
}
/// Verifies that `drive_event_loop` returns
/// `Err(DaemonError::AnalysisWorkerStopped)` when the analysis worker task has
/// already exited.
///
/// No ingest traffic is sent and the shutdown future never resolves.
#[tokio::test]
async fn fail_loud_returns_error_when_worker_dies() {
    let metrics = MetricsState::new();
    let base = Arc::new(empty_carbon_ctx());
    let sources = no_scrapers(&base);
    let window = test_window();
    // Underscore-prefixed (not bare `_`) bindings keep the opposite channel
    // ends alive for the whole test, so neither channel is closed underneath
    // the loop.
    let (_ingest_tx, mut ingest_rx) = mpsc::channel::<Vec<SpanEvent>>(16);
    let (work_tx, _work_rx) = mpsc::channel::<AnalysisBatch>(4);
    // A worker that completes immediately stands in for one that died.
    let dead_worker = tokio::spawn(async {});
    let grpc = tokio::spawn(std::future::pending::<()>());
    let http = tokio::spawn(std::future::pending::<()>());
    // Shutdown is never signalled.
    let never_shutdown = std::future::pending::<()>();

    let outcome = drive_event_loop(
        &mut ingest_rx,
        &window,
        &metrics,
        &sources,
        dummy_shutdown(&grpc, &http),
        test_loop_cfg(),
        work_tx,
        dead_worker,
        never_shutdown,
    )
    .await;

    assert!(matches!(
        outcome,
        Err(crate::DaemonError::AnalysisWorkerStopped)
    ));
}
/// Verifies that a shutdown signal makes `drive_event_loop` return `Ok`.
///
/// The three traces buffered in the window beforehand must all be counted as
/// analyzed.
#[tokio::test]
async fn graceful_shutdown_drains_window_and_returns_ok() {
    let metrics = Arc::new(MetricsState::new());
    let store = Arc::new(findings_store::FindingsStore::new(100));
    let cell = fresh_green_cell();
    let base = Arc::new(empty_carbon_ctx());
    let sources = no_scrapers(&base);

    let window = test_window();
    {
        // NOTE(review): traces are stamped with the current time, presumably so
        // they are still considered fresh when the loop starts — confirm.
        let mut guard = window.lock().await;
        for trace_id in ["w1", "w2", "w3"] {
            guard.push(make_normalized(trace_id, "SELECT 1"), current_time_ms());
        }
    }

    let (_ingest_tx, mut ingest_rx) = mpsc::channel::<Vec<SpanEvent>>(16);
    let (work_tx, work_rx) = mpsc::channel::<AnalysisBatch>(4);
    let worker = tokio::spawn(run_analysis_worker(
        work_rx,
        test_worker_ctx(&metrics, &store, &cell),
    ));
    let grpc = tokio::spawn(std::future::pending::<()>());
    let http = tokio::spawn(std::future::pending::<()>());

    // The signal is sent before the loop starts, so it is already pending.
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
    shutdown_tx.send(()).expect("receiver alive");
    let shutdown_signal = async move {
        let _ = shutdown_rx.await;
    };

    let outcome = drive_event_loop(
        &mut ingest_rx,
        &window,
        &metrics,
        &sources,
        dummy_shutdown(&grpc, &http),
        test_loop_cfg(),
        work_tx,
        worker,
        shutdown_signal,
    )
    .await;

    assert!(outcome.is_ok());
    assert!((metrics.traces_analyzed_total.get() - 3.0).abs() < f64::EPSILON);
}
}