use std::collections::BTreeMap;
use crate::correlate::Trace;
use super::carbon::{
CarbonContext, CarbonReport, ENERGY_PER_IO_OP_KWH, GENERIC_PUE, IntensitySource,
RegionBreakdown, energy_coefficient, extract_hostname, hourly_profile_for_region_lower,
is_valid_region_id, lookup_region_lower, per_op_gco2, resolve_region,
};
use super::carbon_profiles;
use super::region_breakdown::{
RegionAccumulator, build_region_breakdowns, finalize_carbon_report, select_co2_model_tag,
sort_regions_by_co2_desc,
};
const MAX_REGIONS: usize = 256;
#[derive(Default)]
struct CarbonRunState {
per_region: BTreeMap<String, RegionAccumulator>,
unknown_ops: usize,
overflow_warned: bool,
total_transport_gco2: f64,
multi_region_active: bool,
}
struct SpanRegionContext<'a> {
region_key: Option<String>,
region_ref: &'a str,
custom_profile: Option<&'a carbon_profiles::HourlyProfile>,
embedded_profile: Option<carbon_profiles::HourlyProfileRef<'static>>,
annual_intensity: f64,
pue: f64,
}
pub(super) fn compute_carbon_report(
traces: &[Trace],
ctx: &CarbonContext,
total_io_ops: usize,
avoidable_io_ops: usize,
) -> (Option<CarbonReport>, Vec<RegionBreakdown>, bool) {
let mut state = CarbonRunState {
multi_region_active: !ctx.service_regions.is_empty(),
..Default::default()
};
if traces.is_empty() {
return (None, Vec::new(), state.multi_region_active);
}
for trace in traces {
for span in &trace.spans {
process_span_for_carbon(&mut state, span, ctx);
}
}
let (mut regions, flags, operational_gco2) =
build_region_breakdowns(state.per_region, state.unknown_ops);
let model = select_co2_model_tag(flags);
let report = finalize_carbon_report(
traces.len(),
operational_gco2,
state.total_transport_gco2,
total_io_ops,
avoidable_io_ops,
state.unknown_ops,
ctx.embodied_per_request_gco2,
model,
);
sort_regions_by_co2_desc(&mut regions);
(Some(report), regions, state.multi_region_active)
}
fn process_span_for_carbon(
state: &mut CarbonRunState,
span: &crate::normalize::NormalizedEvent,
ctx: &CarbonContext,
) {
if span.event.cloud_region.is_some() {
state.multi_region_active = true;
}
let Some(region_ctx) = resolve_span_region(span, ctx, state) else {
return;
};
let (intensity_used, span_source) = resolve_span_intensity(span, ®ion_ctx, ctx);
let (energy_kwh, measured_model, calibrated) = resolve_span_energy(span, ctx);
let op_co2 = per_op_gco2(energy_kwh, intensity_used, region_ctx.pue);
let region_ref = region_ctx.region_ref;
let pue = region_ctx.pue;
accumulate_span_into_region(
&mut state.per_region,
region_ctx,
op_co2,
intensity_used,
span_source,
measured_model,
calibrated,
);
state.total_transport_gco2 +=
network_transport_contribution(span, region_ref, intensity_used, pue, ctx);
}
fn resolve_span_region<'a>(
span: &'a crate::normalize::NormalizedEvent,
ctx: &'a CarbonContext,
state: &mut CarbonRunState,
) -> Option<SpanRegionContext<'a>> {
let Some(region_ref) = resolve_region(&span.event, ctx) else {
state.unknown_ops += 1;
return None;
};
debug_assert!(
is_valid_region_id(region_ref),
"unvalidated region '{region_ref}' reached compute_carbon_report; \
ingestion boundary should have sanitized it"
);
let needs_lowercase = region_ref.bytes().any(|b| b.is_ascii_uppercase());
let region_key: Option<String> = if needs_lowercase {
Some(region_ref.to_ascii_lowercase())
} else {
None
};
let region_key_borrow: &str = region_key.as_deref().unwrap_or(region_ref);
if state.per_region.len() >= MAX_REGIONS && !state.per_region.contains_key(region_key_borrow) {
state.unknown_ops += 1;
if !state.overflow_warned {
tracing::debug!(
"Region cardinality cap ({MAX_REGIONS}) exceeded; \
additional distinct regions folded into 'unknown'."
);
state.overflow_warned = true;
}
return None;
}
let custom_profile = ctx
.custom_hourly_profiles
.as_ref()
.and_then(|m| m.get(region_key_borrow));
let has_realtime = ctx
.real_time_intensity
.as_ref()
.is_some_and(|rt| rt.contains_key(region_key_borrow));
let (annual_intensity, pue) = lookup_region_lower(region_key_borrow).unwrap_or_else(|| {
let fallback_pue = if custom_profile.is_some() || has_realtime {
GENERIC_PUE
} else {
0.0
};
(0.0, fallback_pue)
});
let embedded_profile = if custom_profile.is_none() {
hourly_profile_for_region_lower(region_key_borrow)
} else {
None
};
Some(SpanRegionContext {
region_key,
region_ref,
custom_profile,
embedded_profile,
annual_intensity,
pue,
})
}
fn resolve_span_intensity(
span: &crate::normalize::NormalizedEvent,
region_ctx: &SpanRegionContext<'_>,
ctx: &CarbonContext,
) -> (f64, IntensitySource) {
let region_key_borrow: &str = region_ctx
.region_key
.as_deref()
.unwrap_or(region_ctx.region_ref);
let real_time_val = ctx
.real_time_intensity
.as_ref()
.and_then(|rt| rt.get(region_key_borrow));
if let Some(&rt_intensity) = real_time_val {
return (rt_intensity, IntensitySource::RealTime);
}
let region_has_hourly = ctx.use_hourly_profiles
&& (region_ctx.custom_profile.is_some() || region_ctx.embedded_profile.is_some());
if !region_has_hourly {
return (region_ctx.annual_intensity, IntensitySource::Annual);
}
let Some(hour) = crate::time::parse_utc_hour(&span.event.timestamp) else {
return (region_ctx.annual_intensity, IntensitySource::Annual);
};
let month_opt = crate::time::parse_utc_month(&span.event.timestamp);
if let Some(cp) = region_ctx.custom_profile {
let val = cp.intensity_at(hour, month_opt);
let src = if cp.is_monthly() {
IntensitySource::MonthlyHourly
} else {
IntensitySource::Hourly
};
return (val, src);
}
if let Some(ep) = region_ctx.embedded_profile {
let val = ep.intensity_at(hour, month_opt);
let src = if ep.is_monthly() {
IntensitySource::MonthlyHourly
} else {
IntensitySource::Hourly
};
return (val, src);
}
debug_assert!(false, "region_has_hourly was true but no profile found");
(region_ctx.annual_intensity, IntensitySource::Annual)
}
fn resolve_span_energy(
span: &crate::normalize::NormalizedEvent,
ctx: &CarbonContext,
) -> (f64, Option<&'static str>, bool) {
let mut proxy_energy_kwh = if ctx.per_operation_coefficients {
ENERGY_PER_IO_OP_KWH * energy_coefficient(&span.event)
} else {
ENERGY_PER_IO_OP_KWH
};
let calibrated = if let Some(ref cal) = ctx.calibration {
if let Some(factor) = cal.factor_for(&span.event.service) {
proxy_energy_kwh *= factor;
true
} else {
false
}
} else {
false
};
let (energy_kwh, measured_model) = match &ctx.energy_snapshot {
Some(snapshot) => match snapshot.get(&span.event.service) {
Some(entry) => (entry.energy_per_op_kwh, Some(entry.model_tag)),
None => (proxy_energy_kwh, None),
},
None => (proxy_energy_kwh, None),
};
(energy_kwh, measured_model, calibrated)
}
fn accumulate_span_into_region(
per_region: &mut BTreeMap<String, RegionAccumulator>,
region_ctx: SpanRegionContext<'_>,
op_co2: f64,
intensity_used: f64,
span_source: IntensitySource,
measured_model: Option<&'static str>,
calibrated: bool,
) {
let SpanRegionContext {
region_key,
region_ref,
..
} = region_ctx;
let acc = if let Some(lowered) = region_key {
per_region.entry(lowered).or_default()
} else if let Some(existing) = per_region.get_mut(region_ref) {
existing
} else {
per_region.entry(region_ref.to_string()).or_default()
};
acc.co2_gco2 += op_co2;
acc.total_ops += 1;
acc.intensity_sum_per_op += intensity_used;
if span_source > acc.max_intensity_source {
acc.max_intensity_source = span_source;
}
match measured_model {
Some(super::carbon::CO2_MODEL_SCAPHANDRE) => acc.any_scaphandre = true,
Some(super::carbon::CO2_MODEL_CLOUD_SPECPOWER) => acc.any_cloud_specpower = true,
_ => {
if calibrated {
acc.any_calibrated = true;
}
}
}
}
fn network_transport_contribution(
span: &crate::normalize::NormalizedEvent,
caller_region: &str,
intensity_used: f64,
pue: f64,
ctx: &CarbonContext,
) -> f64 {
if !ctx.include_network_transport || span.event.event_type != crate::event::EventType::HttpOut {
return 0.0;
}
let Some(bytes) = span.event.response_size_bytes else {
return 0.0;
};
let callee_region = extract_hostname(&span.event.target)
.and_then(|host| {
if host.bytes().any(|b| b.is_ascii_uppercase()) {
ctx.service_regions.get(&host.to_ascii_lowercase())
} else {
ctx.service_regions.get(host)
}
})
.map(String::as_str);
let Some(callee) = callee_region else {
return 0.0;
};
if caller_region.eq_ignore_ascii_case(callee) {
return 0.0;
}
let transport_energy = bytes as f64 * ctx.network_energy_per_byte_kwh;
transport_energy * intensity_used * pue
}