use std::collections::BTreeMap;
use crate::correlate::Trace;
use super::carbon::{
CarbonContext, CarbonReport, ENERGY_PER_IO_OP_KWH, GENERIC_PUE, IntensitySource,
RegionBreakdown, energy_coefficient, extract_hostname, hourly_profile_for_region_lower,
is_valid_region_id, lookup_region_lower, per_op_gco2, resolve_region,
};
use super::carbon_profiles;
use super::region_breakdown::{
RegionAccumulator, build_region_breakdowns, finalize_carbon_report, select_co2_model_tag,
sort_regions_by_co2_desc,
};
/// Upper bound on the number of distinct region keys tracked in
/// `CarbonRunState::per_region`. Once the map holds this many entries, spans
/// for any region not already present are counted as unknown operations
/// instead of creating a new entry.
const MAX_REGIONS: usize = 256;
/// Mutable state threaded through one `compute_carbon_report` run while spans
/// are processed one at a time.
#[derive(Default)]
struct CarbonRunState {
/// Per-region accumulators, keyed by the region id without ASCII uppercase
/// letters (lowercased when the resolved id contained any).
per_region: BTreeMap<String, RegionAccumulator>,
/// Per-service accumulators, keyed by the span's service name.
per_service: BTreeMap<String, ServiceCarbonAccumulator>,
/// Number of spans whose region could not be resolved, plus spans dropped
/// because the `MAX_REGIONS` cap was reached.
unknown_ops: usize,
/// Set after the cardinality-cap debug message has been emitted, so it is
/// logged at most once per run.
overflow_warned: bool,
/// Running sum of `network_transport_contribution` over processed spans.
total_transport_gco2: f64,
/// Starts as "`ctx.service_regions` is non-empty" and is switched on when a
/// span carrying a `cloud_region` is seen.
multi_region_active: bool,
}
/// Per-service totals gathered while spans are processed.
#[derive(Default)]
pub(super) struct ServiceCarbonAccumulator {
/// Sum of the per-span energy values attributed to this service.
pub energy_kwh: f64,
/// Sum of the per-span operational CO2 values attributed to this service.
pub operational_gco2: f64,
/// Region key of the first span accumulated for this service; it is set
/// when the entry is created and not updated afterwards.
pub region: String,
/// Highest-ranked measured model tag seen for this service, or `None` when
/// no span had a measured-energy entry.
pub measured_model: Option<&'static str>,
/// Number of accumulated spans that had a measured model tag.
pub measured_ops: u64,
/// Number of spans accumulated for this service.
pub total_ops: u64,
}
/// Picks which of two measured-model tags to keep.
///
/// When only one side is present it wins; when neither is present the result
/// is `None`. When both are present the tag with the higher rank is kept
/// (Scaphandre over cloud SPECpower over anything else), and on equal rank the
/// `current` tag is retained.
#[inline]
fn higher_fidelity_measured(
    current: Option<&'static str>,
    incoming: Option<&'static str>,
) -> Option<&'static str> {
    /// Numeric rank of a model tag; unrecognised tags rank lowest.
    #[inline]
    fn fidelity(tag: &str) -> u8 {
        if tag == super::carbon::CO2_MODEL_SCAPHANDRE {
            2
        } else if tag == super::carbon::CO2_MODEL_CLOUD_SPECPOWER {
            1
        } else {
            0
        }
    }

    let (Some(held), Some(candidate)) = (current, incoming) else {
        // At most one side is present: keep whichever one exists.
        return current.or(incoming);
    };
    // Only a strictly higher rank displaces the tag already held.
    Some(if fidelity(candidate) > fidelity(held) {
        candidate
    } else {
        held
    })
}
/// Region-related lookups resolved once per span by `resolve_span_region`.
struct SpanRegionContext<'a> {
/// Lowercased copy of `region_ref`, present only when `region_ref`
/// contained ASCII uppercase bytes; `None` means `region_ref` is itself
/// usable as the map key.
region_key: Option<String>,
/// Region identifier as returned by `resolve_region`.
region_ref: &'a str,
/// Entry from `ctx.custom_hourly_profiles` for this region, if any.
custom_profile: Option<&'a carbon_profiles::HourlyProfile>,
/// Result of `hourly_profile_for_region_lower`; only looked up when no
/// custom profile exists.
embedded_profile: Option<carbon_profiles::HourlyProfileRef<'static>>,
/// First element of the `lookup_region_lower` result, or `0.0` when that
/// lookup returns `None`.
annual_intensity: f64,
/// Second element of the `lookup_region_lower` result. When that lookup
/// returns `None`, this is `GENERIC_PUE` if a custom profile or real-time
/// entry exists for the region and `0.0` otherwise.
pue: f64,
}
/// Everything `compute_carbon_report` hands back to its caller.
pub(super) struct CarbonComputeOutputs {
/// The finalized report; `None` when no traces were supplied.
pub report: Option<CarbonReport>,
/// Per-region breakdowns, ordered by `sort_regions_by_co2_desc`
/// (empty when no traces were supplied).
pub regions: Vec<RegionBreakdown>,
/// True when `ctx.service_regions` was non-empty or a processed span
/// carried a `cloud_region`.
pub multi_region_active: bool,
/// Per-service accumulators, keyed by service name.
pub per_service: BTreeMap<String, ServiceCarbonAccumulator>,
/// Model tag chosen by `select_co2_model_tag`; the empty string when no
/// traces were supplied.
pub window_model: &'static str,
}
/// Walks every span of every trace, accumulates per-region and per-service
/// carbon figures, and assembles the final report.
///
/// With an empty `traces` slice no report is produced: the result carries
/// `report: None`, empty collections, an empty `window_model`, and
/// `multi_region_active` derived solely from `ctx.service_regions`.
///
/// `total_io_ops` and `avoidable_io_ops` are passed through unchanged to
/// `finalize_carbon_report`.
pub(super) fn compute_carbon_report(
    traces: &[Trace],
    ctx: &CarbonContext,
    total_io_ops: usize,
    avoidable_io_ops: usize,
) -> CarbonComputeOutputs {
    let configured_multi_region = !ctx.service_regions.is_empty();

    if traces.is_empty() {
        return CarbonComputeOutputs {
            report: None,
            regions: Vec::new(),
            multi_region_active: configured_multi_region,
            per_service: BTreeMap::new(),
            window_model: "",
        };
    }

    let mut state = CarbonRunState {
        multi_region_active: configured_multi_region,
        ..Default::default()
    };
    for span in traces.iter().flat_map(|trace| &trace.spans) {
        process_span_for_carbon(&mut state, span, ctx);
    }

    let CarbonRunState {
        per_region,
        per_service,
        unknown_ops,
        total_transport_gco2,
        multi_region_active,
        ..
    } = state;

    let (mut regions, flags, operational_gco2) = build_region_breakdowns(per_region, unknown_ops);
    let window_model = select_co2_model_tag(flags);
    let report = finalize_carbon_report(
        traces.len(),
        operational_gco2,
        total_transport_gco2,
        total_io_ops,
        avoidable_io_ops,
        unknown_ops,
        ctx.embodied_per_request_gco2,
        window_model,
    );
    sort_regions_by_co2_desc(&mut regions);

    CarbonComputeOutputs {
        report: Some(report),
        regions,
        multi_region_active,
        per_service,
        window_model,
    }
}
/// Folds a single span into the running carbon state.
///
/// Steps, in order:
/// 1. Mark the run as multi-region if the span carries a `cloud_region`.
/// 2. Resolve the span's region; if that fails (`resolve_span_region` has
///    already bumped `unknown_ops`) the span contributes nothing further.
/// 3. Resolve the intensity and energy for the span and derive its
///    operational CO2 via `per_op_gco2`.
/// 4. Update the per-service accumulator, then the per-region accumulator.
/// 5. Add the span's network-transport contribution to the running total.
fn process_span_for_carbon(
    state: &mut CarbonRunState,
    span: &crate::normalize::NormalizedEvent,
    ctx: &CarbonContext,
) {
    if span.event.cloud_region.is_some() {
        state.multi_region_active = true;
    }
    let Some(region_ctx) = resolve_span_region(span, ctx, state) else {
        return;
    };
    let intensity = resolve_span_intensity(span, &region_ctx, ctx);
    let (energy_kwh, measured_model, calibrated) = resolve_span_energy(span, ctx);
    let op_co2 = per_op_gco2(energy_kwh, intensity.value, region_ctx.pue);

    // Copy out the plain values needed after `region_ctx` is moved into
    // `accumulate_span_into_region` below.
    let region_ref = region_ctx.region_ref;
    let pue = region_ctx.pue;
    let intensity_value = intensity.value;

    // The per-service and per-region maps are independent, so the service is
    // updated first while `region_ctx` can still be borrowed. That lets the
    // region string be built only when a service is seen for the first time,
    // rather than allocated for every span.
    let service_key = span.event.service.as_ref();
    let svc = state
        .per_service
        .entry(service_key.to_string())
        .or_insert_with(|| ServiceCarbonAccumulator {
            region: region_ctx
                .region_key
                .as_deref()
                .unwrap_or(region_ref)
                .to_string(),
            ..Default::default()
        });
    svc.energy_kwh += energy_kwh;
    svc.operational_gco2 += op_co2;
    svc.measured_model = higher_fidelity_measured(svc.measured_model, measured_model);
    svc.total_ops += 1;
    if measured_model.is_some() {
        svc.measured_ops += 1;
    }

    accumulate_span_into_region(
        &mut state.per_region,
        region_ctx,
        op_co2,
        &intensity,
        measured_model,
        calibrated,
    );

    state.total_transport_gco2 +=
        network_transport_contribution(span, region_ref, intensity_value, pue, ctx);
}
/// Resolves the region a span belongs to and gathers the region-level data
/// needed to price it.
///
/// Returns `None`, after incrementing `state.unknown_ops`, when either
/// `resolve_region` yields nothing or the span's region is new and
/// `state.per_region` already holds `MAX_REGIONS` entries. In the latter case
/// a debug message is logged the first time it happens in a run.
///
/// Map lookups use the region id lowercased (ASCII) when it contains uppercase
/// bytes; otherwise the id is used as-is without allocating.
fn resolve_span_region<'a>(
    span: &'a crate::normalize::NormalizedEvent,
    ctx: &'a CarbonContext,
    state: &mut CarbonRunState,
) -> Option<SpanRegionContext<'a>> {
    let region_ref = match resolve_region(&span.event, ctx) {
        Some(resolved) => resolved,
        None => {
            state.unknown_ops += 1;
            return None;
        }
    };
    debug_assert!(
        is_valid_region_id(region_ref),
        "unvalidated region '{region_ref}' reached compute_carbon_report; \
         ingestion boundary should have sanitized it"
    );

    // Allocate a lowercased copy only when the id actually has uppercase bytes.
    let region_key: Option<String> = region_ref
        .bytes()
        .any(|b| b.is_ascii_uppercase())
        .then(|| region_ref.to_ascii_lowercase());
    let lookup_key: &str = match region_key.as_deref() {
        Some(lowered) => lowered,
        None => region_ref,
    };

    // Cardinality guard: never grow the per-region map past the cap.
    let at_capacity = state.per_region.len() >= MAX_REGIONS;
    if at_capacity && !state.per_region.contains_key(lookup_key) {
        state.unknown_ops += 1;
        if !state.overflow_warned {
            tracing::debug!(
                "Region cardinality cap ({MAX_REGIONS}) exceeded; \
                 additional distinct regions folded into 'unknown'."
            );
            state.overflow_warned = true;
        }
        return None;
    }

    let custom_profile = ctx
        .custom_hourly_profiles
        .as_ref()
        .and_then(|profiles| profiles.get(lookup_key));
    let has_realtime = match ctx.real_time_intensity.as_ref() {
        Some(live) => live.contains_key(lookup_key),
        None => false,
    };

    // Regions missing from the lookup table get zero annual intensity; they
    // get the generic PUE only if a custom profile or a real-time entry exists.
    let (annual_intensity, pue) = match lookup_region_lower(lookup_key) {
        Some(known) => known,
        None if custom_profile.is_some() || has_realtime => (0.0, GENERIC_PUE),
        None => (0.0, 0.0),
    };

    // The embedded profile is only consulted when no custom profile exists.
    let embedded_profile = match custom_profile {
        Some(_) => None,
        None => hourly_profile_for_region_lower(lookup_key),
    };

    Some(SpanRegionContext {
        region_key,
        region_ref,
        custom_profile,
        embedded_profile,
        annual_intensity,
        pue,
    })
}
/// Intensity chosen for one span, together with where it came from.
struct SpanIntensity<'a> {
/// The intensity value used for the span (passed to `per_op_gco2`).
value: f64,
/// Which source supplied `value` (real-time, hourly, monthly-hourly, or
/// annual).
source: IntensitySource,
/// Copied from the real-time entry's `is_estimated`; `None` for every
/// non-real-time source.
realtime_estimated: Option<bool>,
/// Borrowed from the real-time entry's `estimation_method`; `None` for
/// every non-real-time source.
realtime_estimation_method: Option<&'a str>,
}
/// Chooses the intensity to apply to a span.
///
/// Precedence:
/// 1. A real-time entry for the region in `ctx.real_time_intensity`.
/// 2. An hourly profile (custom first, then embedded), but only when
///    `ctx.use_hourly_profiles` is set and the span timestamp yields a UTC
///    hour. The source is reported as monthly-hourly when the profile says it
///    is monthly.
/// 3. The region's annual intensity.
fn resolve_span_intensity<'a>(
    span: &crate::normalize::NormalizedEvent,
    region_ctx: &SpanRegionContext<'_>,
    ctx: &'a CarbonContext,
) -> SpanIntensity<'a> {
    let lookup_key: &str = match region_ctx.region_key.as_deref() {
        Some(lowered) => lowered,
        None => region_ctx.region_ref,
    };

    let live_entry = ctx
        .real_time_intensity
        .as_ref()
        .and_then(|live| live.get(lookup_key));
    if let Some(entry) = live_entry {
        return SpanIntensity {
            value: entry.gco2_per_kwh,
            source: IntensitySource::RealTime,
            realtime_estimated: entry.is_estimated,
            realtime_estimation_method: entry.estimation_method.as_deref(),
        };
    }

    // Every remaining fallback path returns this same annual value.
    let annual = SpanIntensity {
        value: region_ctx.annual_intensity,
        source: IntensitySource::Annual,
        realtime_estimated: None,
        realtime_estimation_method: None,
    };

    let profile_available =
        region_ctx.custom_profile.is_some() || region_ctx.embedded_profile.is_some();
    if !(ctx.use_hourly_profiles && profile_available) {
        return annual;
    }

    let Some(hour) = crate::time::parse_utc_hour(&span.event.timestamp) else {
        return annual;
    };
    let month_opt = crate::time::parse_utc_month(&span.event.timestamp);

    // Custom profile wins over the embedded one; record (value, is_monthly).
    let profiled = if let Some(cp) = region_ctx.custom_profile {
        Some((cp.intensity_at(hour, month_opt), cp.is_monthly()))
    } else if let Some(ep) = region_ctx.embedded_profile {
        Some((ep.intensity_at(hour, month_opt), ep.is_monthly()))
    } else {
        None
    };

    match profiled {
        Some((value, monthly)) => SpanIntensity {
            value,
            source: if monthly {
                IntensitySource::MonthlyHourly
            } else {
                IntensitySource::Hourly
            },
            realtime_estimated: None,
            realtime_estimation_method: None,
        },
        None => {
            debug_assert!(false, "region_has_hourly was true but no profile found");
            annual
        }
    }
}
fn resolve_span_energy(
span: &crate::normalize::NormalizedEvent,
ctx: &CarbonContext,
) -> (f64, Option<&'static str>, bool) {
let mut proxy_energy_kwh = if ctx.per_operation_coefficients {
ENERGY_PER_IO_OP_KWH * energy_coefficient(&span.event)
} else {
ENERGY_PER_IO_OP_KWH
};
let calibrated = if let Some(ref cal) = ctx.calibration {
if let Some(factor) = cal.factor_for(&span.event.service) {
proxy_energy_kwh *= factor;
true
} else {
false
}
} else {
false
};
let (energy_kwh, measured_model) = match &ctx.energy_snapshot {
Some(snapshot) => match snapshot.get(span.event.service.as_ref()) {
Some(entry) => (entry.energy_per_op_kwh, Some(entry.model_tag)),
None => (proxy_energy_kwh, None),
},
None => (proxy_energy_kwh, None),
};
(energy_kwh, measured_model, calibrated)
}
/// Adds one span's contribution to its region's accumulator, creating the
/// accumulator on first use.
///
/// The map key is the lowercased region key when `region_ctx` carries one;
/// otherwise it is `region_ref`, which is only copied into a `String` when no
/// entry exists yet.
///
/// Besides the CO2, op-count and intensity sums, this tracks the greatest
/// intensity source seen, the latest real-time estimation metadata, and which
/// energy models contributed. The calibrated flag is raised only for spans
/// that did not match the Scaphandre or cloud-SPECpower model tags.
fn accumulate_span_into_region(
    per_region: &mut BTreeMap<String, RegionAccumulator>,
    region_ctx: SpanRegionContext<'_>,
    op_co2: f64,
    intensity: &SpanIntensity<'_>,
    measured_model: Option<&'static str>,
    calibrated: bool,
) {
    let region_ref = region_ctx.region_ref;
    let acc = if let Some(lowered) = region_ctx.region_key {
        // Reuse the lowercased string that has already been allocated.
        per_region.entry(lowered).or_default()
    } else if let Some(existing) = per_region.get_mut(region_ref) {
        // Fast path: the region is already tracked, no allocation needed.
        existing
    } else {
        per_region.entry(region_ref.to_string()).or_default()
    };

    acc.co2_gco2 += op_co2;
    acc.total_ops += 1;
    acc.intensity_sum_per_op += intensity.value;

    if acc.max_intensity_source < intensity.source {
        acc.max_intensity_source = intensity.source;
    }

    if matches!(intensity.source, IntensitySource::RealTime) {
        acc.realtime_estimated = intensity.realtime_estimated;
        let incoming_method = intensity.realtime_estimation_method;
        // Replace the stored method only when it actually differs.
        if acc.realtime_estimation_method.as_deref() != incoming_method {
            acc.realtime_estimation_method = incoming_method.map(String::from);
        }
    }

    if measured_model == Some(super::carbon::CO2_MODEL_SCAPHANDRE) {
        acc.any_scaphandre = true;
    } else if measured_model == Some(super::carbon::CO2_MODEL_CLOUD_SPECPOWER) {
        acc.any_cloud_specpower = true;
    } else if calibrated {
        acc.any_calibrated = true;
    }
}
/// Computes the network-transport CO2 attributed to a span.
///
/// The result is `0.0` unless all of the following hold:
/// - `ctx.include_network_transport` is set and the span is an `HttpOut` event;
/// - the span has a `response_size_bytes` value;
/// - a hostname can be extracted from the span target and `ctx.service_regions`
///   has an entry for it (looked up lowercased when the host contains ASCII
///   uppercase bytes);
/// - the callee region differs from `caller_region`, ignoring ASCII case.
///
/// Otherwise the result is
/// `bytes * ctx.network_energy_per_byte_kwh * intensity_used * pue`.
fn network_transport_contribution(
    span: &crate::normalize::NormalizedEvent,
    caller_region: &str,
    intensity_used: f64,
    pue: f64,
    ctx: &CarbonContext,
) -> f64 {
    let eligible = ctx.include_network_transport
        && span.event.event_type == crate::event::EventType::HttpOut;
    if !eligible {
        return 0.0;
    }
    let Some(bytes) = span.event.response_size_bytes else {
        return 0.0;
    };
    let Some(host) = extract_hostname(&span.event.target) else {
        return 0.0;
    };

    // Lowercase the host only when needed, to avoid allocating on the common path.
    let mapped_region = if host.bytes().any(|b| b.is_ascii_uppercase()) {
        ctx.service_regions.get(&host.to_ascii_lowercase())
    } else {
        ctx.service_regions.get(host)
    };
    let Some(callee) = mapped_region.map(String::as_str) else {
        return 0.0;
    };

    // Calls that stay within the caller's region contribute nothing.
    if caller_region.eq_ignore_ascii_case(callee) {
        return 0.0;
    }

    let transport_energy = bytes as f64 * ctx.network_energy_per_byte_kwh;
    transport_energy * intensity_used * pue
}