use crate::config::Config;
use crate::correlate;
use crate::detect;
use crate::detect::{Confidence, DetectConfig};
use crate::event::SpanEvent;
use crate::normalize;
use crate::report::{Analysis, Report};
use crate::score;
/// Runs the batch analysis pipeline over `events` and returns only the report.
///
/// This is a thin wrapper around [`analyze_with_traces`] that discards the
/// correlated traces returned alongside the report.
#[must_use]
pub fn analyze(events: Vec<SpanEvent>, config: &Config) -> Report {
    let (report, _traces) = analyze_with_traces(events, config);
    report
}
#[must_use]
pub fn analyze_with_traces(
events: Vec<SpanEvent>,
config: &Config,
) -> (Report, Vec<correlate::Trace>) {
let start = std::time::Instant::now();
let event_count = events.len();
let normalized = normalize::normalize_all(events);
let traces = correlate::correlate(normalized);
let trace_count = traces.len();
let detect_config = DetectConfig::from(config);
let findings = detect::run_full_detection(&traces, &detect_config);
let (mut findings, green_summary, per_endpoint_io_ops) = if config.green.enabled {
let carbon_ctx = config.carbon_context();
score::score_green(&traces, findings, Some(&carbon_ctx))
} else {
let total_io_ops = traces.iter().map(|t| t.spans.len()).sum();
let per_endpoint_io_ops = crate::report::compute_per_endpoint_io_ops(&traces);
(
findings,
crate::report::GreenSummary::disabled(total_io_ops),
per_endpoint_io_ops,
)
};
detect::sort_findings(&mut findings);
detect::apply_confidence(&mut findings, Confidence::CiBatch);
crate::acknowledgments::enrich_with_signatures(&mut findings);
let quality_gate = crate::quality_gate::evaluate(&findings, &green_summary, config);
let report = Report {
analysis: Analysis {
duration_ms: start.elapsed().as_millis() as u64,
events_processed: event_count,
traces_analyzed: trace_count,
},
findings,
green_summary,
quality_gate,
per_endpoint_io_ops,
correlations: vec![],
warnings: vec![],
warning_details: vec![],
acknowledged_findings: vec![],
binary_version: env!("CARGO_PKG_VERSION").to_string(),
};
(report, traces)
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::event::SpanEvent;
/// With no input events the report has no findings, zeroed analysis counters,
/// and a passing quality gate.
#[test]
fn empty_pipeline_produces_empty_report() {
    let report = analyze(Vec::new(), &Config::default());

    assert!(report.findings.is_empty());
    let counters = &report.analysis;
    assert_eq!(counters.events_processed, 0);
    assert_eq!(counters.traces_analyzed, 0);
    assert!(report.quality_gate.passed);
}
/// A five-event SQL series plus two extra identical queries in `trace-1` must
/// report exactly 6 avoidable I/O ops.
#[test]
fn waste_dedup_no_double_count() {
    use crate::test_helpers::{make_sql_event, make_sql_series_events};

    let mut events: Vec<SpanEvent> = make_sql_series_events(5);
    // Append spans 6 and 7, both issuing the same literal query.
    events.extend((6..=7).map(|idx| {
        make_sql_event(
            "trace-1",
            &format!("span-{idx}"),
            "SELECT * FROM order_item WHERE order_id = 1",
            &format!("2025-07-10T14:32:01.{:03}Z", idx * 40),
        )
    }));

    let report = analyze(events, &Config::default());

    assert!(!report.findings.is_empty());
    assert_eq!(report.green_summary.avoidable_io_ops, 6);
}
/// With no events the green summary reports a zero waste ratio and zero
/// total / avoidable I/O ops.
#[test]
fn zero_events_waste_ratio_is_zero() {
    let report = analyze(Vec::new(), &Config::default());
    let summary = &report.green_summary;

    assert!((summary.io_waste_ratio - 0.0).abs() < f64::EPSILON);
    assert_eq!(summary.total_io_ops, 0);
    assert_eq!(summary.avoidable_io_ops, 0);
}
/// Four distinct statements in one trace produce no findings: all 4 I/O ops
/// are counted, none is avoidable, and the waste ratio is zero.
#[test]
fn clean_events_zero_waste_ratio() {
    use crate::test_helpers::make_sql_event;

    // (span id, SQL statement, timestamp) for each event of `trace-1`.
    let fixtures = [
        (
            "span-1",
            "SELECT * FROM users WHERE id = 1",
            "2025-07-10T14:32:01.000Z",
        ),
        (
            "span-2",
            "SELECT * FROM orders WHERE id = 2",
            "2025-07-10T14:32:01.050Z",
        ),
        (
            "span-3",
            "SELECT * FROM products WHERE id = 3",
            "2025-07-10T14:32:01.100Z",
        ),
        (
            "span-4",
            "INSERT INTO logs (msg) VALUES ('ok')",
            "2025-07-10T14:32:01.150Z",
        ),
    ];
    let events: Vec<SpanEvent> = fixtures
        .iter()
        .map(|&(span_id, sql, timestamp)| make_sql_event("trace-1", span_id, sql, timestamp))
        .collect();

    let report = analyze(events, &Config::default());
    let summary = &report.green_summary;

    assert!(report.findings.is_empty());
    assert_eq!(summary.total_io_ops, 4);
    assert_eq!(summary.avoidable_io_ops, 0);
    assert!((summary.io_waste_ratio - 0.0).abs() < f64::EPSILON);
}
/// The N+1 fixture yields findings and a green summary with 5 avoidable ops
/// out of 6 total (waste ratio 5/6).
#[test]
fn pipeline_with_findings_computes_green_summary() {
    use crate::test_helpers::make_n_plus_one_events;

    let report = analyze(make_n_plus_one_events(), &Config::default());
    assert!(!report.findings.is_empty());

    let summary = &report.green_summary;
    let expected_ratio = 5.0_f64 / 6.0;
    assert_eq!(summary.avoidable_io_ops, 5);
    assert!((summary.io_waste_ratio - expected_ratio).abs() < f64::EPSILON);
    assert_eq!(summary.total_io_ops, 6);
}
/// Two traces, each repeating its own query three times, give 4 avoidable
/// I/O ops out of 6 total.
#[test]
fn dedup_across_traces() {
    use crate::test_helpers::make_sql_event;

    // Three identical queries in trace-A ...
    let trace_a = (1..=3).map(|n| {
        make_sql_event(
            "trace-A",
            &format!("span-a{n}"),
            "SELECT * FROM order_item WHERE order_id = 42",
            &format!("2025-07-10T14:32:01.{:03}Z", n * 50),
        )
    });
    // ... followed by three identical (but different) queries in trace-B.
    let trace_b = (1..=3).map(|n| {
        make_sql_event(
            "trace-B",
            &format!("span-b{n}"),
            "SELECT * FROM orders WHERE user_id = 7",
            &format!("2025-07-10T14:32:02.{:03}Z", n * 50),
        )
    });
    let events: Vec<SpanEvent> = trace_a.chain(trace_b).collect();

    let report = analyze(events, &Config::default());

    assert_eq!(report.green_summary.avoidable_io_ops, 4);
    assert_eq!(report.green_summary.total_io_ops, 6);
}
/// With `default_region` set, the green summary carries a CO2 estimate whose
/// total and avoidable mid values are both positive.
#[test]
fn pipeline_with_green_default_region_produces_co2() {
    use crate::config::GreenConfig;
    use crate::test_helpers::make_n_plus_one_events;

    let green = GreenConfig {
        default_region: Some("eu-west-3".to_string()),
        ..GreenConfig::default()
    };
    let config = Config {
        green,
        ..Config::default()
    };

    let report = analyze(make_n_plus_one_events(), &config);

    let co2 = report
        .green_summary
        .co2
        .as_ref()
        .expect("co2 should be Some when default_region is configured");
    assert!(co2.total.mid > 0.0);
    assert!(co2.avoidable.mid > 0.0);
}
/// With no events there is no CO2 estimate and no region breakdown.
#[test]
fn pipeline_empty_traces_no_co2() {
    let report = analyze(Vec::new(), &Config::default());
    let summary = &report.green_summary;

    assert!(summary.co2.is_none(), "co2 should be None for empty traces");
    assert!(summary.regions.is_empty());
}
/// With green scoring disabled, findings are still reported but carry no green
/// impact, and the summary holds only the raw total I/O op count.
#[test]
fn green_disabled_skips_scoring() {
    use crate::config::GreenConfig;
    use crate::test_helpers::make_n_plus_one_events;

    let green = GreenConfig {
        enabled: false,
        ..GreenConfig::default()
    };
    let config = Config {
        green,
        ..Config::default()
    };

    let report = analyze(make_n_plus_one_events(), &config);
    assert!(!report.findings.is_empty());

    let summary = &report.green_summary;
    assert_eq!(summary.avoidable_io_ops, 0);
    assert!((summary.io_waste_ratio - 0.0).abs() < f64::EPSILON);
    assert!(summary.top_offenders.is_empty());
    assert!(summary.co2.is_none());
    assert!(summary.regions.is_empty());
    assert_eq!(summary.total_io_ops, 6);

    // No finding may have been annotated with a green impact.
    report
        .findings
        .iter()
        .for_each(|finding| assert!(finding.green_impact.is_none()));
}
/// A configured `default_region` does not produce a CO2 estimate while green
/// scoring is disabled.
#[test]
fn green_disabled_with_region_still_no_co2() {
    use crate::config::GreenConfig;

    let green = GreenConfig {
        enabled: false,
        default_region: Some("eu-west-3".to_string()),
        ..GreenConfig::default()
    };
    let config = Config {
        green,
        ..Config::default()
    };

    let report = analyze(Vec::new(), &config);

    assert!(report.green_summary.co2.is_none());
}
/// Batch analysis stamps every finding with `Confidence::CiBatch`, even when
/// the daemon environment is configured as `Production`.
#[test]
fn batch_analyze_stamps_ci_batch_confidence() {
    use crate::config::{DaemonConfig, DaemonEnvironment};
    use crate::test_helpers::make_n_plus_one_events;

    let daemon = DaemonConfig {
        environment: DaemonEnvironment::Production,
        ..DaemonConfig::default()
    };
    let config = Config {
        daemon,
        ..Config::default()
    };

    let report = analyze(make_n_plus_one_events(), &config);

    assert!(!report.findings.is_empty());
    report
        .findings
        .iter()
        .for_each(|finding| assert_eq!(finding.confidence, Confidence::CiBatch));
}
/// Partitions events across shards by hashing the trace id, analyzes each
/// shard independently, and checks that the resulting (trace id, finding type)
/// pairs match a single pass over all events. `SlowSql` and `SlowHttp`
/// findings are excluded from the comparison.
#[test]
fn sharded_detection_matches_single_instance() {
    use std::collections::{HashMap, HashSet};

    const NUM_SHARDS: u64 = 2;

    // Four traces, each owned by its own service and issuing six queries that
    // differ only in the `id` literal.
    let traces_data = [
        ("trace-A", "svc-alpha"),
        ("trace-B", "svc-beta"),
        ("trace-C", "svc-gamma"),
        ("trace-D", "svc-delta"),
    ];
    let all_events: Vec<SpanEvent> = traces_data
        .iter()
        .flat_map(|&(trace_id, service)| {
            (0..6).map(move |i| {
                let mut ev = crate::test_helpers::make_sql_event(
                    trace_id,
                    &format!("span-{trace_id}-{i}"),
                    &format!("SELECT * FROM orders WHERE id = {}", 100 + i),
                    &format!("2025-07-10T14:32:01.{i:03}Z"),
                );
                ev.service = Arc::from(service);
                ev
            })
        })
        .collect();

    // Reference run: everything through one pipeline instance.
    let config = Config::default();
    let baseline = analyze(all_events.clone(), &config);
    assert!(
        baseline.findings.len() >= 4,
        "expected at least 4 findings (one N+1 per trace), got {}",
        baseline.findings.len()
    );

    // Route each event to a shard chosen by the FNV-1a hash of its trace id.
    let mut shards: Vec<Vec<SpanEvent>> = (0..NUM_SHARDS).map(|_| Vec::new()).collect();
    for event in &all_events {
        let bucket = (fnv1a_hash(event.trace_id.as_bytes()) % NUM_SHARDS) as usize;
        shards[bucket].push(event.clone());
    }
    shards.iter().enumerate().for_each(|(i, shard)| {
        assert!(
            !shard.is_empty(),
            "shard {i} is empty, hash distribution failed"
        );
    });

    // Every trace must live in exactly one shard.
    let mut trace_to_shard: HashMap<String, usize> = HashMap::new();
    for (i, shard) in shards.iter().enumerate() {
        for ev in shard {
            let prev = *trace_to_shard.entry(ev.trace_id.clone()).or_insert(i);
            assert_eq!(
                prev, i,
                "trace {} split across shards {prev} and {i}",
                ev.trace_id
            );
        }
    }

    // Analyze each shard on its own and pool the findings.
    let sharded_findings: Vec<_> = shards
        .into_iter()
        .flat_map(|shard| analyze(shard, &config).findings)
        .collect();

    let baseline_set: HashSet<(String, String)> = baseline
        .findings
        .iter()
        .filter_map(|f| match f.finding_type {
            detect::FindingType::SlowSql | detect::FindingType::SlowHttp => None,
            _ => Some((f.trace_id.clone(), f.finding_type.as_str().to_string())),
        })
        .collect();
    let sharded_set: HashSet<(String, String)> = sharded_findings
        .iter()
        .filter_map(|f| match f.finding_type {
            detect::FindingType::SlowSql | detect::FindingType::SlowHttp => None,
            _ => Some((f.trace_id.clone(), f.finding_type.as_str().to_string())),
        })
        .collect();

    assert_eq!(
        baseline_set, sharded_set,
        "sharded findings differ from baseline.\n\
         baseline: {baseline_set:?}\n\
         sharded: {sharded_set:?}"
    );
}
/// 64-bit FNV-1a hash of `bytes`: starting from the offset basis, each byte is
/// XORed into the state, which is then multiplied by the FNV prime with
/// wrapping arithmetic.
fn fnv1a_hash(bytes: &[u8]) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0100_0000_01b3;

    bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
        (state ^ u64::from(byte)).wrapping_mul(PRIME)
    })
}
}