use crate::findings::{
Finding, RecommendedAction, RootCauseGroup, SignalClass, ThreatCategory, VerdictCalibrationNote,
};
/// Rule IDs that the calibration pipeline in this file can exclude from a
/// root-cause group.
///
/// Every entry here appears in the `rule_ids` list of some
/// `CALIBRATION_PIPELINE` entry; keep the two in sync when adding a rule.
// NOTE(review): consumers of this list live outside this file — confirm how
// it is used before reordering or removing entries.
pub(crate) const CALIBRATED_RULE_IDS: &[&str] = &[
"DECLARED_PERMISSION_NETWORK_ACCESS",
"CAPABILITY_PERMISSION_MISMATCH",
"INTERNAL_NETWORK_ACCESS",
"MCP_NO_AUTH_MODEL",
"OFFICIAL_MCP_NO_AUTH_REMOTE_ENDPOINT",
];
/// Output of `calibrate_verdict_inputs`: the calibrated groups plus the
/// bookkeeping that explains what calibration changed.
#[derive(Debug, Clone)]
pub(crate) struct VerdictCalibration {
/// Calibrated copies of the input groups. Groups whose `finding_count` is
/// zero after calibration are removed.
pub(crate) root_cause_groups: Vec<RootCauseGroup>,
/// Sum of the `risk_delta` values of rules that lowered a group's strongest
/// action; each rule (keyed by its `note_rule_id`) contributes at most once.
pub(crate) risk_adjustment: i32,
/// Notes emitted by the applied rules, sorted and deduplicated.
pub(crate) notes: Vec<VerdictCalibrationNote>,
}
/// One entry of `CALIBRATION_PIPELINE`: describes which findings are rolled
/// back from a group and how the rollback is reported.
struct CalibrationRule {
/// The rule applies to a group only if the group contains a finding whose
/// rule ID is in this list.
trigger_rule_ids: &'static [&'static str],
/// Rule IDs excluded from the group when the rule applies.
rule_ids: &'static [&'static str],
/// Risk delta reported when applying the rule lowers the group's strongest
/// action; otherwise the rule reports a delta of zero.
risk_delta: i32,
/// When true, the group may be reclassified to `SignalClass::ReviewSignal`
/// after its action has been lowered.
reclassify_signal: bool,
/// Note `effect` used when this rule lowered the group's strongest action.
effect_downgraded: &'static str,
/// Note `effect` used when the action was not lowered and the group was not
/// reclassified.
effect_unchanged: &'static str,
/// Explanation copied verbatim into the emitted note.
rationale: &'static str,
/// Rule ID recorded on the emitted note; also the key that ensures the risk
/// delta is counted only once across all groups.
note_rule_id: &'static str,
}
// Risk deltas used by the entries of `CALIBRATION_PIPELINE`. All are negative,
// so applying a rule lowers the overall risk adjustment.

/// Risk delta for the `DECLARED_PERMISSION_NETWORK_ACCESS` pipeline entry.
const TIER1_DECLARED_NETWORK_ROLLBACK: i32 = -10;
/// Risk delta for the `CAPABILITY_PERMISSION_MISMATCH` pipeline entry.
const TIER2_CAPABILITY_MISMATCH_ROLLBACK: i32 = -8;
/// Risk delta for the `INTERNAL_NETWORK_ACCESS` pipeline entry.
const TIER3_INTERNAL_NETWORK_ROLLBACK: i32 = -12;
/// Risk delta for the remote-MCP-without-auth pipeline entry.
const TIER4_REMOTE_MCP_NO_AUTH_ROLLBACK: i32 = -6;
/// Ordered list of calibration rules.
///
/// Entries are paired positionally with the gates returned by
/// `compute_calibration_gates` (the two sequences are zipped in
/// `apply_calibration_rules`), so the order and length here must match the
/// order and length of that gate vector.
const CALIBRATION_PIPELINE: &[CalibrationRule] = &[
// Tier 1: declared network permission.
CalibrationRule {
trigger_rule_ids: &["DECLARED_PERMISSION_NETWORK_ACCESS"],
rule_ids: &["DECLARED_PERMISSION_NETWORK_ACCESS"],
risk_delta: TIER1_DECLARED_NETWORK_ROLLBACK,
reclassify_signal: false,
effect_downgraded: "downgraded_to_context",
effect_unchanged: "remains_context_only",
rationale: "Declared network access remains useful for blast-radius reporting, but it no longer drives package escalation without corroborating behavior.",
note_rule_id: "DECLARED_PERMISSION_NETWORK_ACCESS",
},
// Tier 2: capability/permission mismatch.
CalibrationRule {
trigger_rule_ids: &["CAPABILITY_PERMISSION_MISMATCH"],
rule_ids: &["CAPABILITY_PERMISSION_MISMATCH"],
risk_delta: TIER2_CAPABILITY_MISMATCH_ROLLBACK,
reclassify_signal: false,
effect_downgraded: "downgraded_to_context",
effect_unchanged: "remains_context_only",
rationale: "Capability mismatch is retained as an explainability signal, but it no longer escalates verdicts without stronger intent or behavioral evidence.",
note_rule_id: "CAPABILITY_PERMISSION_MISMATCH",
},
// Tier 3: internal/loopback network access; may reclassify the group.
CalibrationRule {
trigger_rule_ids: &["INTERNAL_NETWORK_ACCESS"],
rule_ids: &["INTERNAL_NETWORK_ACCESS"],
risk_delta: TIER3_INTERNAL_NETWORK_ROLLBACK,
reclassify_signal: true,
effect_downgraded: "downgraded_to_review_only",
effect_unchanged: "remains_review_only",
rationale: "Internal or loopback network targets are treated as review-only unless paired with fetch, execution, exfiltration, or metadata-service behavior.",
note_rule_id: "INTERNAL_NETWORK_ACCESS",
},
// Tier 4: remote MCP without auth; either rule ID triggers the rule and both
// are excluded together. Notes are reported under `MCP_NO_AUTH_MODEL`.
CalibrationRule {
trigger_rule_ids: &["MCP_NO_AUTH_MODEL", "OFFICIAL_MCP_NO_AUTH_REMOTE_ENDPOINT"],
rule_ids: &["MCP_NO_AUTH_MODEL", "OFFICIAL_MCP_NO_AUTH_REMOTE_ENDPOINT"],
risk_delta: TIER4_REMOTE_MCP_NO_AUTH_ROLLBACK,
reclassify_signal: true,
effect_downgraded: "downgraded_to_context",
effect_unchanged: "remains_context_only",
rationale: "Remote MCP without auth is still risky, but it is not treated as standalone malicious behavior unless it widens into command or transport execution semantics.",
note_rule_id: "MCP_NO_AUTH_MODEL",
},
];
/// Returns `true` when `rule_id` is a declared-permission rule (as decided by
/// `crate::findings::is_declared_permission_rule`) or is exactly
/// `CAPABILITY_PERMISSION_MISMATCH`.
fn is_permission_model_rule(rule_id: &str) -> bool {
    if crate::findings::is_declared_permission_rule(rule_id) {
        return true;
    }
    matches!(rule_id, "CAPABILITY_PERMISSION_MISMATCH")
}
/// Runs the calibration pipeline over a copy of `root_cause_groups`.
///
/// The input groups are cloned and never mutated. The returned
/// [`VerdictCalibration`] holds the calibrated copies (with groups that ended
/// up with zero findings removed), the accumulated risk adjustment, and the
/// sorted, deduplicated calibration notes.
pub(crate) fn calibrate_verdict_inputs(
    findings: &[Finding],
    root_cause_groups: &[RootCauseGroup],
) -> VerdictCalibration {
    let gates = compute_calibration_gates(findings);

    let mut calibrated: Vec<RootCauseGroup> = root_cause_groups.to_vec();

    // Capture each group's action and signal class before any rule touches it.
    let mut snapshots: Vec<(RecommendedAction, SignalClass)> =
        Vec::with_capacity(calibrated.len());
    for group in &calibrated {
        snapshots.push((group.strongest_action, group.signal_class));
    }

    let (risk_adjustment, mut notes) =
        apply_calibration_rules(&mut calibrated, findings, &gates, &snapshots);

    // A group whose findings were all excluded no longer represents anything.
    calibrated.retain(|group| group.finding_count != 0);
    dedup_notes(&mut notes);

    VerdictCalibration {
        root_cause_groups: calibrated,
        risk_adjustment,
        notes,
    }
}
/// Computes one gate per `CALIBRATION_PIPELINE` entry, in pipeline order.
///
/// A gate is `true` when its rule is allowed to run, i.e. when no
/// corroborating evidence was found among `findings`. Only findings whose
/// recommended action is not `Log` count as evidence:
///
/// * gates 0 and 1 close when a non-calibrated finding is classified as
///   suspicious-package or malicious behavior;
/// * gate 2 close when a known network-chain rule fires, or a finding falls in
///   the remote-exec, data-exfiltration, or credential-exposure categories;
/// * gate 3 closes when a remote-MCP execution/transport rule fires.
fn compute_calibration_gates(findings: &[Finding]) -> Vec<bool> {
    let has_stronger_behavior = findings
        .iter()
        .filter(|finding| finding.recommended_action != RecommendedAction::Log)
        .any(|finding| {
            let rule_id = finding.rule_id.as_str();
            // Rules that the pipeline itself calibrates never count as
            // corroborating behavior.
            let is_calibrated_rule = is_permission_model_rule(rule_id)
                || matches!(
                    rule_id,
                    "INTERNAL_NETWORK_ACCESS"
                        | "MCP_NO_AUTH_MODEL"
                        | "OFFICIAL_MCP_NO_AUTH_REMOTE_ENDPOINT"
                );
            let is_behavioral = matches!(
                finding.signal_class,
                SignalClass::SuspiciousPackageBehavior | SignalClass::MaliciousBehavior
            );
            !is_calibrated_rule && is_behavioral
        });

    let has_network_chain = findings
        .iter()
        .filter(|finding| finding.recommended_action != RecommendedAction::Log)
        .any(|finding| {
            let is_known_chain_rule = matches!(
                finding.rule_id.as_str(),
                "ARTIFACT_TAINT_SECRET_TO_EXTERNAL_NETWORK"
                    | "ARTIFACT_TAINT_DOWNLOAD_TO_EXECUTION"
                    | "SSRF_LIKE_FETCH"
                    | "SKILL_REMOTE_EXEC_CURL_BASH"
                    | "SKILL_REMOTE_EXEC_POWERSHELL_IEX"
                    | "OFFICIAL_REMOTE_FETCH_EXEC_POLYGLOT"
                    | "OFFICIAL_SECRET_EXFIL_WEBHOOK"
            );
            is_known_chain_rule
                || matches!(
                    finding.category,
                    ThreatCategory::RemoteExec
                        | ThreatCategory::DataExfiltration
                        | ThreatCategory::CredentialExposure
                )
        });

    let has_remote_mcp_exec_pair = findings
        .iter()
        .filter(|finding| finding.recommended_action != RecommendedAction::Log)
        .any(|finding| {
            matches!(
                finding.rule_id.as_str(),
                "MCP_REMOTE_EXEC_SURFACE"
                    | "MCP_TOOLING_TRANSPORT_DECLARED"
                    | "OFFICIAL_MCP_REMOTE_TUNNEL_WITH_EXEC"
                    | "OFFICIAL_MCP_REMOTE_BRIDGE_WITH_COMMAND"
            )
        });

    // The first two pipeline entries share the same gate.
    let permission_gate = !has_stronger_behavior;
    vec![
        permission_gate,
        permission_gate,
        !has_network_chain,
        !has_remote_mcp_exec_pair,
    ]
}
/// Per-group bookkeeping carried across the rules of the pipeline while a
/// single group is being calibrated.
struct GroupCalibrationState<'f> {
/// Signal class the group had before calibration. Findings are matched to
/// the group using this value, even after the group has been reclassified.
original_signal_class: SignalClass,
/// Strongest action the group had before any rule was applied.
original_action: RecommendedAction,
/// Rule IDs excluded so far; grows with every rule that applies to the group.
accumulated_exclusions: Vec<&'f str>,
/// Strongest action of the group after the most recently applied rule
/// (initially the group's original action).
pre_rule_action: RecommendedAction,
}
/// Applies every pipeline rule, in order, to every group.
///
/// `gates` is paired positionally with `CALIBRATION_PIPELINE`, and
/// `original_snapshots` is paired positionally with `groups`. Returns the
/// total risk adjustment and all notes emitted along the way. A rule's risk
/// delta is added at most once, no matter how many groups it lowers.
fn apply_calibration_rules<'f>(
    groups: &mut [RootCauseGroup],
    findings: &'f [Finding],
    gates: &[bool],
    original_snapshots: &[(RecommendedAction, SignalClass)],
) -> (i32, Vec<VerdictCalibrationNote>) {
    use std::collections::HashSet;

    debug_assert_eq!(
        CALIBRATION_PIPELINE.len(),
        gates.len(),
        "gate count must match pipeline length"
    );

    // One state record per group, seeded from the pre-calibration values.
    let mut states: Vec<GroupCalibrationState<'f>> = Vec::with_capacity(groups.len());
    for (group, &(_, snapshot_signal_class)) in groups.iter().zip(original_snapshots) {
        states.push(GroupCalibrationState {
            original_signal_class: snapshot_signal_class,
            original_action: group.strongest_action,
            accumulated_exclusions: Vec::new(),
            pre_rule_action: group.strongest_action,
        });
    }

    let mut total_adjustment = 0_i32;
    let mut collected_notes = Vec::new();
    let mut rules_already_counted: HashSet<&'static str> = HashSet::new();

    for (group_index, group) in groups.iter_mut().enumerate() {
        for (rule, gate) in CALIBRATION_PIPELINE.iter().zip(gates.iter().copied()) {
            let (delta, note) = apply_single_rule_to_group(
                group,
                rule,
                gate,
                findings,
                &mut states[group_index],
            );

            if let Some(note) = note {
                collected_notes.push(note);
            }

            if delta == 0 {
                continue;
            }
            // Count each rule's delta only the first time it lowers any group.
            if rules_already_counted.insert(rule.note_rule_id) {
                total_adjustment += delta;
            }
        }
    }

    (total_adjustment, collected_notes)
}
/// Applies one calibration rule to one group.
///
/// Returns `(0, None)` when the gate is closed or the group holds no finding
/// that triggers the rule. Otherwise the rule's `rule_ids` are added to the
/// group's exclusions, the group's strongest action, finding count, and
/// representative rules are recomputed from the surviving findings, and a
/// note is returned. The returned delta is the rule's `risk_delta` only when
/// this rule lowered the strongest action; it is zero otherwise.
///
/// Findings belong to the group when their scope and category match the
/// group's and their signal class matches the group's *original* signal
/// class, so reclassification never changes group membership.
fn apply_single_rule_to_group<'f>(
    group: &mut RootCauseGroup,
    rule: &CalibrationRule,
    gate: bool,
    findings: &'f [Finding],
    state: &mut GroupCalibrationState<'f>,
) -> (i32, Option<VerdictCalibrationNote>) {
    if !gate {
        return (0, None);
    }

    let scope = group.scope;
    let category = group.category;
    let original_class = state.original_signal_class;
    let in_group = |finding: &Finding| {
        finding.artifact_scope == scope
            && finding.category == category
            && finding.signal_class == original_class
    };

    let triggered = findings.iter().any(|finding| {
        in_group(finding) && rule.trigger_rule_ids.contains(&finding.rule_id.as_str())
    });
    if !triggered {
        return (0, None);
    }

    // Exclusions accumulate, so earlier rules' rollbacks stay in effect.
    state
        .accumulated_exclusions
        .extend_from_slice(rule.rule_ids);

    let (recalculated_action, surviving_count) = recalculate_group_action_excluding(
        findings,
        group,
        original_class,
        &state.accumulated_exclusions,
    );
    group.strongest_action = recalculated_action;
    group.finding_count = surviving_count;

    // Rebuild the representative rules from the findings that survived.
    let mut surviving_rule_ids: Vec<String> = Vec::new();
    for finding in findings {
        let excluded = state
            .accumulated_exclusions
            .contains(&finding.rule_id.as_str());
        if in_group(finding) && !excluded {
            surviving_rule_ids.push(finding.rule_id.clone());
        }
    }
    surviving_rule_ids.sort();
    surviving_rule_ids.dedup();
    surviving_rule_ids.truncate(super::verdict::MAX_REPRESENTATIVE_RULES);
    group.representative_rules = surviving_rule_ids;

    let lowered_by_this_rule = group.strongest_action < state.pre_rule_action;
    // True when the group is now at `Log` but started out above it, even if
    // an earlier rule was the one that lowered it.
    let lowered_to_log_overall = state.original_action > RecommendedAction::Log
        && group.strongest_action == RecommendedAction::Log;

    if rule.reclassify_signal && (lowered_by_this_rule || lowered_to_log_overall) {
        group.signal_class = SignalClass::ReviewSignal;
    }
    state.pre_rule_action = group.strongest_action;

    let effect = if lowered_by_this_rule {
        rule.effect_downgraded
    } else if rule.reclassify_signal && group.signal_class != original_class {
        "reclassified_only"
    } else {
        rule.effect_unchanged
    };

    let risk_delta = if lowered_by_this_rule {
        rule.risk_delta
    } else {
        0
    };

    let note = VerdictCalibrationNote {
        rule_id: rule.note_rule_id.to_string(),
        effect: effect.to_string(),
        rationale: rule.rationale.to_string(),
        scope,
        category,
        signal_class: group.signal_class,
    };
    (risk_delta, Some(note))
}
/// Recomputes a group's strongest action while ignoring excluded rules.
///
/// A finding counts when its scope and category match `group`, its signal
/// class equals `original_signal_class`, and its rule ID is not in
/// `excluded_rule_ids`. Returns the maximum recommended action among those
/// findings (`RecommendedAction::Log` when none remain) and how many remain.
fn recalculate_group_action_excluding(
    findings: &[Finding],
    group: &RootCauseGroup,
    original_signal_class: SignalClass,
    excluded_rule_ids: &[&str],
) -> (RecommendedAction, usize) {
    let mut strongest = RecommendedAction::Log;
    let mut surviving = 0_usize;

    for finding in findings {
        let belongs_to_group = finding.artifact_scope == group.scope
            && finding.category == group.category
            && finding.signal_class == original_signal_class;
        if !belongs_to_group || excluded_rule_ids.contains(&finding.rule_id.as_str()) {
            continue;
        }
        strongest = strongest.max(finding.recommended_action);
        surviving += 1;
    }

    (strongest, surviving)
}
/// Sorts `notes` and removes exact duplicates in place.
///
/// Notes are ordered lexicographically by scope, category, rule ID, effect,
/// rationale, and signal class. Two notes are duplicates only when all six
/// fields are equal.
fn dedup_notes(notes: &mut Vec<VerdictCalibrationNote>) {
    notes.sort_by(|left, right| {
        let left_key = (
            left.scope,
            left.category,
            &left.rule_id,
            &left.effect,
            &left.rationale,
            left.signal_class,
        );
        let right_key = (
            right.scope,
            right.category,
            &right.rule_id,
            &right.effect,
            &right.rationale,
            right.signal_class,
        );
        left_key.cmp(&right_key)
    });

    // Sorting placed identical notes next to each other, so one adjacent pass
    // removes every duplicate.
    notes.dedup_by(|candidate, kept| {
        (
            candidate.scope,
            candidate.category,
            &candidate.rule_id,
            &candidate.effect,
            &candidate.rationale,
            candidate.signal_class,
        ) == (
            kept.scope,
            kept.category,
            &kept.rule_id,
            &kept.effect,
            &kept.rationale,
            kept.signal_class,
        )
    });
}
// Unit tests for this module live in a sibling file and are compiled only for
// test builds.
#[cfg(test)]
#[path = "verdict_calibration_tests.rs"]
mod verdict_calibration_tests;