use crate::graph::{AuthorityGraph, NodeId, NodeKind};
use crate::propagation::PropagationPath;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::path::PathBuf;
/// How serious a finding is, from `Critical` (worst) down to `Info`.
///
/// Ordering is implemented manually via `rank` so that more severe
/// variants compare as *smaller* — an ascending sort lists the most
/// severe findings first. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
    Critical,
    High,
    Medium,
    Low,
    Info,
}
impl Severity {
fn rank(self) -> u8 {
match self {
Severity::Critical => 0,
Severity::High => 1,
Severity::Medium => 2,
Severity::Low => 3,
Severity::Info => 4,
}
}
}
// Total order over severities: `Critical` compares as the smallest value,
// so an ascending sort puts the most severe findings first.
impl Ord for Severity {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        self.rank().cmp(&other.rank())
    }
}
// `PartialOrd` delegates to `Ord` (the pattern the standard library
// recommends) so the two implementations can never disagree.
impl PartialOrd for Severity {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
/// The rule family a finding belongs to.
///
/// Serialized in `snake_case`; for built-in findings the serialized name
/// doubles as the rule id (see `category_rule_id`). Variant names suggest
/// coverage of GitHub Actions, Azure DevOps, and GitLab CI patterns —
/// the detection logic itself lives elsewhere.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FindingCategory {
    AuthorityPropagation,
    OverPrivilegedIdentity,
    UnpinnedAction,
    UntrustedWithAuthority,
    ArtifactBoundaryCrossing,
    FloatingImage,
    LongLivedCredential,
    PersistedCredential,
    TriggerContextMismatch,
    CrossWorkflowAuthorityChain,
    AuthorityCycle,
    UpliftWithoutAttestation,
    SelfMutatingPipeline,
    CheckoutSelfPrExposure,
    VariableGroupInPrJob,
    SelfHostedPoolPrHijack,
    SharedSelfHostedPoolNoIsolation,
    ServiceConnectionScopeMismatch,
    TemplateExtendsUnpinnedBranch,
    TemplateRepoRefIsFeatureBranch,
    VmRemoteExecViaPipelineSecret,
    ShortLivedSasInCommandLine,
    SecretToInlineScriptEnvExport,
    SecretMaterialisedToWorkspaceFile,
    // Explicit rename: default snake_case would split "KeyVault" as "key_vault"
    // but the wire name must keep "keyvault" as one word.
    #[serde(rename = "keyvault_secret_to_plaintext")]
    KeyVaultSecretToPlaintext,
    TerraformAutoApproveInProd,
    AddSpnWithInlineScript,
    ParameterInterpolationIntoShell,
    RuntimeScriptFetchedFromFloatingUrl,
    PrTriggerWithFloatingActionRef,
    UntrustedApiResponseToEnvSink,
    PrBuildPushesImageWithFloatingCredentials,
    SecretViaEnvGateToUntrustedConsumer,
    NoWorkflowLevelPermissionsBlock,
    ProdDeployJobNoEnvironmentGate,
    LongLivedSecretWithoutOidcRecommendation,
    PullRequestWorkflowInconsistentForkCheck,
    GitlabDeployJobMissingProtectedBranchOnly,
    TerraformOutputViaSetvariableShellExpansion,
    RiskyTriggerWithAuthority,
    SensitiveValueInJobOutput,
    ManualDispatchInputToUrlOrCommand,
    SecretsInheritOverscopedPassthrough,
    UnsafePrArtifactInWorkflowRunConsumer,
    ScriptInjectionViaUntrustedContext,
    InteractiveDebugActionInAuthorityWorkflow,
    PrSpecificCacheKeyInDefaultBranchConsumer,
    GhCliWithDefaultTokenEscalating,
    CiJobTokenToExternalApi,
    IdTokenAudienceOverscoped,
    UntrustedCiVarInShellInterpolation,
    UnpinnedIncludeRemoteOrBranchRef,
    DindServiceGrantsHostAuthority,
    SecurityJobSilentlySkipped,
    ChildPipelineTriggerInheritsAuthority,
    CacheKeyCrossesTrustBoundary,
    PatEmbeddedInGitRemoteUrl,
    CiTokenTriggersDownstreamWithVariablePassthrough,
    DotenvArtifactFlowsToPrivilegedDeployment,
    SetvariableIssecretFalse,
    HomoglyphInActionRef,
    // The `#[doc(hidden)]` variants below still serialize/deserialize
    // normally; they are only hidden from rustdoc output.
    #[doc(hidden)]
    EgressBlindspot,
    #[doc(hidden)]
    MissingAuditTrail,
}
/// Suggested remediation attached to a finding.
///
/// Serialized internally tagged: `{"type": "pin_action", "current": …, …}`
/// with the tag in `snake_case`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Recommendation {
    // NOTE(review): "tsafe"/"cellos" appear to be external remediation
    // tools — confirm their exact semantics with the consumers of this type.
    TsafeRemediation {
        command: String,
        explanation: String,
    },
    CellosRemediation {
        reason: String,
        spec_hint: String,
    },
    // Replace a floating ref (`current`) with a pinned one (`pinned`).
    PinAction {
        current: String,
        pinned: String,
    },
    // Shrink a permission grant from `current` to the `minimum` needed.
    ReducePermissions {
        current: String,
        minimum: String,
    },
    // Swap a static secret for OIDC federation with the named provider.
    FederateIdentity {
        static_secret: String,
        oidc_provider: String,
    },
    // Free-text fallback when no automated fix applies.
    Manual {
        action: String,
    },
}
/// Where a finding's rule came from.
///
/// `BuiltIn` is the default and serializes as the bare string
/// `"built-in"` (note the hyphen — an explicit rename, not the module's
/// usual `snake_case`). `Custom` carries the path of the policy file that
/// defined the rule and serializes as
/// `{"custom": {"source_file": "..."}}`.
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FindingSource {
    #[default]
    #[serde(rename = "built-in")]
    BuiltIn,
    Custom { source_file: PathBuf },
}
impl FindingSource {
    /// True when the finding comes from a built-in rule rather than a
    /// user-supplied custom policy file.
    pub fn is_built_in(&self) -> bool {
        match self {
            FindingSource::BuiltIn => true,
            FindingSource::Custom { .. } => false,
        }
    }
}
/// Rough estimate of the effort needed to fix a finding,
/// from `Trivial` to `Large`. Serialized in `snake_case`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FixEffort {
    Trivial,
    Small,
    Medium,
    Large,
}
/// Optional metadata attached to a finding.
///
/// Every field has a default and is skipped during serialization when
/// empty/unset, so serialized findings stay compact and legacy JSON
/// without these fields still deserializes.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct FindingExtras {
    // Deterministic group id shared by related findings; produced from a
    // fingerprint by `compute_finding_group_id` (a UUID-v5-shaped string).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub finding_group_id: Option<String>,
    // Estimated effort to fix (see `Finding::with_time_to_fix`).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub time_to_fix: Option<FixEffort>,
    // Mitigations already in place; each one added via
    // `Finding::with_compensating_control` also downgrades the severity.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub compensating_controls: Vec<String>,
    // True when the finding has been suppressed; omitted when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub suppressed: bool,
    // Severity before any compensating-control downgrade was applied.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_severity: Option<Severity>,
    // Free-text justification recorded alongside a suppression.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub suppression_reason: Option<String>,
}
/// Serde `skip_serializing_if` helper: true when the flag is unset.
/// The `&bool` signature is required by serde, hence the clippy allow.
#[allow(clippy::trivially_copy_pass_by_ref)]
fn is_false(b: &bool) -> bool {
    matches!(b, false)
}
/// A single security finding raised against an authority graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Finding {
    // Current severity; may have been downgraded — the pre-downgrade value
    // lives in `extras.original_severity`.
    pub severity: Severity,
    pub category: FindingCategory,
    // Propagation path demonstrating the issue, when one exists.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub path: Option<PropagationPath>,
    // Graph nodes involved, in order; order is significant for
    // fingerprinting (source vs. sink roles must be distinguishable).
    pub nodes_involved: Vec<NodeId>,
    // Human-readable description. A leading `[rule_id]` prefix marks a
    // custom rule and overrides the category-derived rule id.
    pub message: String,
    pub recommendation: Recommendation,
    // Defaults to `BuiltIn` so legacy JSON without this field still parses.
    #[serde(default)]
    pub source: FindingSource,
    // Flattened: extras serialize as top-level fields of the finding.
    #[serde(flatten, default)]
    pub extras: FindingExtras,
}
impl Finding {
    /// Builder-style setter for the estimated fix effort.
    pub fn with_time_to_fix(mut self, effort: FixEffort) -> Self {
        self.extras.time_to_fix = Some(effort);
        self
    }

    /// Records a compensating control and downgrades the severity one step.
    ///
    /// The pre-downgrade severity is preserved in
    /// `extras.original_severity` the first time a control is added;
    /// subsequent controls keep downgrading but never overwrite it.
    pub fn with_compensating_control(mut self, control: impl Into<String>) -> Self {
        self.extras.compensating_controls.push(control.into());
        // Capture the original severity only once, before this downgrade.
        self.extras.original_severity.get_or_insert(self.severity);
        self.severity = downgrade_severity(self.severity);
        self
    }
}
pub fn downgrade_severity(s: Severity) -> Severity {
match s {
Severity::Critical => Severity::High,
Severity::High => Severity::Medium,
Severity::Medium => Severity::Low,
Severity::Low => Severity::Info,
Severity::Info => Severity::Info,
}
}
/// Derives a deterministic, UUID-v5-shaped group id from a fingerprint.
///
/// Follows the RFC 4122 name-based scheme: SHA-1 over a fixed namespace
/// plus the fingerprint bytes, truncated to 16 bytes, with the version
/// nibble forced to 5 and the variant bits to `10`.
pub fn compute_finding_group_id(fingerprint: &str) -> String {
    // Fixed application namespace (plays the role of a UUID v5 namespace id).
    const NAMESPACE: [u8; 16] = [
        0x6c, 0x6f, 0xd0, 0xa3, 0x82, 0x44, 0x4f, 0x29, 0xb1, 0x9a, 0x09, 0xc8, 0x7e, 0x49, 0x55,
        0x21,
    ];
    use sha1::{Digest as Sha1Digest, Sha1};
    let digest = {
        let mut hasher = Sha1::new();
        hasher.update(NAMESPACE);
        hasher.update(fingerprint.as_bytes());
        hasher.finalize()
    };
    let mut uuid = [0u8; 16];
    uuid.copy_from_slice(&digest[..16]);
    uuid[6] = (uuid[6] & 0x0f) | 0x50; // version 5 marker
    uuid[8] = (uuid[8] & 0x3f) | 0x80; // RFC 4122 variant bits
    // Render as 8-4-4-4-12 lowercase hex (36 chars including hyphens).
    use std::fmt::Write;
    let mut out = String::with_capacity(36);
    for (i, byte) in uuid.iter().enumerate() {
        if matches!(i, 4 | 6 | 8 | 10) {
            out.push('-');
        }
        let _ = write!(&mut out, "{byte:02x}");
    }
    out
}
/// Extracts a custom rule id from a `[rule_id] …` message prefix.
///
/// Returns `None` when the message does not start with `[`, has no
/// closing `]`, or the brackets are empty (`[]`).
fn extract_custom_rule_id(message: &str) -> Option<&str> {
    let rest = message.strip_prefix('[')?;
    let end = rest.find(']')?;
    let id = &rest[..end];
    (!id.is_empty()).then_some(id)
}
/// Serialized `snake_case` name of the category, used as the built-in
/// rule id (e.g. `AuthorityPropagation` → `"authority_propagation"`).
fn category_rule_id(category: &FindingCategory) -> String {
    match serde_json::to_value(category) {
        Ok(serde_json::Value::String(name)) => name,
        // A unit-variant enum should always serialize to a string, but
        // fall back to a sentinel rather than panicking.
        _ => "unknown".to_string(),
    }
}
/// Rule id for a finding: a `[custom]` prefix in the message wins,
/// otherwise the serialized category name is used.
pub fn rule_id_for(finding: &Finding) -> String {
    match extract_custom_rule_id(&finding.message) {
        Some(id) => id.to_string(),
        None => category_rule_id(&finding.category),
    }
}
pub fn compute_fingerprint(finding: &Finding, graph: &AuthorityGraph) -> String {
let rule_id = extract_custom_rule_id(&finding.message)
.map(str::to_string)
.unwrap_or_else(|| category_rule_id(&finding.category));
let category = category_rule_id(&finding.category);
let file = graph.source.file.as_str();
let root_authority: String = finding
.nodes_involved
.iter()
.filter_map(|id| graph.node(*id))
.find(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
.map(|n| n.name.clone())
.unwrap_or_default();
let nodes_ordered: String = finding
.nodes_involved
.iter()
.filter_map(|id| graph.node(*id))
.map(|n| n.name.as_str())
.collect::<Vec<_>>()
.join(",");
let canonical = format!(
"v2\x1frule={rule_id}\x1ffile={file}\x1fcategory={category}\x1froot={root_authority}\x1fnodes={nodes_ordered}"
);
let digest = Sha256::digest(canonical.as_bytes());
let mut out = String::with_capacity(16);
for byte in &digest[..8] {
use std::fmt::Write;
let _ = write!(&mut out, "{byte:02x}");
}
out
}
#[cfg(test)]
mod fingerprint_tests {
    //! Pins the fingerprint/group-id contract: stability across calls,
    //! sensitivity to file/rule/node identity, and insensitivity to
    //! message wording.
    use super::*;
    use crate::graph::{AuthorityGraph, NodeKind, PipelineSource, TrustZone};

    // Minimal pipeline source with only the file path populated.
    fn source(file: &str) -> PipelineSource {
        PipelineSource {
            file: file.to_string(),
            repo: None,
            git_ref: None,
            commit_sha: None,
        }
    }

    // Finding fixture with fixed severity/recommendation/source so only
    // category, message, and nodes vary between tests.
    fn make_finding(category: FindingCategory, msg: &str, nodes: Vec<NodeId>) -> Finding {
        Finding {
            severity: Severity::High,
            category,
            path: None,
            nodes_involved: nodes,
            message: msg.to_string(),
            recommendation: Recommendation::Manual {
                action: "fix it".to_string(),
            },
            source: FindingSource::BuiltIn,
            extras: FindingExtras::default(),
        }
    }

    // Determinism: same finding + graph → same 16-hex-char fingerprint.
    #[test]
    fn fingerprint_is_stable_across_repeat_calls() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
        let f = make_finding(
            FindingCategory::AuthorityPropagation,
            "AWS_KEY reaches third party",
            vec![s],
        );
        let a = compute_fingerprint(&f, &graph);
        let b = compute_fingerprint(&f, &graph);
        assert_eq!(a, b, "same finding must hash identically across calls");
        assert_eq!(a.len(), 16, "fingerprint is 16 hex chars");
        assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
    }

    // The source file is part of the identity.
    #[test]
    fn different_files_produce_different_fingerprints() {
        let mut g_a = AuthorityGraph::new(source("workflows/a.yml"));
        let mut g_b = AuthorityGraph::new(source("workflows/b.yml"));
        let s_a = g_a.add_node(NodeKind::Secret, "TOKEN", TrustZone::FirstParty);
        let s_b = g_b.add_node(NodeKind::Secret, "TOKEN", TrustZone::FirstParty);
        let f_a = make_finding(FindingCategory::UnpinnedAction, "msg", vec![s_a]);
        let f_b = make_finding(FindingCategory::UnpinnedAction, "msg", vec![s_b]);
        assert_ne!(
            compute_fingerprint(&f_a, &g_a),
            compute_fingerprint(&f_b, &g_b)
        );
    }

    // The rule (via category) is part of the identity.
    #[test]
    fn different_rules_produce_different_fingerprints() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
        let f1 = make_finding(FindingCategory::AuthorityPropagation, "msg", vec![s]);
        let f2 = make_finding(FindingCategory::UntrustedWithAuthority, "msg", vec![s]);
        assert_ne!(
            compute_fingerprint(&f1, &graph),
            compute_fingerprint(&f2, &graph)
        );
    }

    // Message wording is deliberately excluded from the fingerprint.
    #[test]
    fn message_changes_do_not_affect_fingerprint() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
        let f1 = make_finding(
            FindingCategory::AuthorityPropagation,
            "old phrasing of the message",
            vec![s],
        );
        let f2 = make_finding(
            FindingCategory::AuthorityPropagation,
            "completely different new phrasing",
            vec![s],
        );
        assert_eq!(
            compute_fingerprint(&f1, &graph),
            compute_fingerprint(&f2, &graph)
        );
    }

    // Different sinks on the same secret must hash differently.
    #[test]
    fn per_hop_findings_against_same_authority_are_distinct() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let secret = graph.add_node(NodeKind::Secret, "DEPLOY_TOKEN", TrustZone::FirstParty);
        let step_a = graph.add_node(NodeKind::Step, "deploy[0]", TrustZone::Untrusted);
        let step_b = graph.add_node(NodeKind::Step, "deploy[1]", TrustZone::Untrusted);
        let f_a = make_finding(
            FindingCategory::AuthorityPropagation,
            "DEPLOY_TOKEN reaches deploy[0]",
            vec![secret, step_a],
        );
        let f_b = make_finding(
            FindingCategory::AuthorityPropagation,
            "DEPLOY_TOKEN reaches deploy[1]",
            vec![secret, step_b],
        );
        assert_ne!(
            compute_fingerprint(&f_a, &graph),
            compute_fingerprint(&f_b, &graph),
            "per-hop findings against one secret must produce distinct \
            fingerprints — sink identity is part of the issue"
        );
    }

    #[test]
    fn same_secret_same_sink_remains_stable_across_calls() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let secret = graph.add_node(NodeKind::Secret, "DEPLOY_TOKEN", TrustZone::FirstParty);
        let step = graph.add_node(NodeKind::Step, "deploy[0]", TrustZone::Untrusted);
        let f = make_finding(
            FindingCategory::AuthorityPropagation,
            "msg",
            vec![secret, step],
        );
        assert_eq!(
            compute_fingerprint(&f, &graph),
            compute_fingerprint(&f, &graph)
        );
    }

    // Regression: a shared secret name alone must not cause collisions.
    #[test]
    fn r2_attack2_two_files_same_secret_name_distinct_fingerprints() {
        let mut g_a = AuthorityGraph::new(source("workflows/a.yml"));
        let mut g_b = AuthorityGraph::new(source("workflows/b.yml"));
        let s_a = g_a.add_node(NodeKind::Secret, "MY_SECRET", TrustZone::FirstParty);
        let sink_a = g_a.add_node(NodeKind::Step, "evil/action", TrustZone::Untrusted);
        let s_b = g_b.add_node(NodeKind::Secret, "MY_SECRET", TrustZone::FirstParty);
        let sink_b = g_b.add_node(
            NodeKind::Step,
            "different-evil/action",
            TrustZone::Untrusted,
        );
        let f_a = make_finding(
            FindingCategory::AuthorityPropagation,
            "MY_SECRET reaches evil/action",
            vec![s_a, sink_a],
        );
        let f_b = make_finding(
            FindingCategory::AuthorityPropagation,
            "MY_SECRET reaches different-evil/action",
            vec![s_b, sink_b],
        );
        assert_ne!(
            compute_fingerprint(&f_a, &g_a),
            compute_fingerprint(&f_b, &g_b),
            "two genuinely different findings must not share a fingerprint \
            just because the secret name overlaps"
        );
    }

    // With no secret/identity node the root segment is empty but still
    // present, so node names alone must keep findings distinct.
    #[test]
    fn root_authority_segment_is_always_present_even_when_empty() {
        let mut g = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let img_a = g.add_node(NodeKind::Image, "alpine:latest", TrustZone::ThirdParty);
        let img_b = g.add_node(NodeKind::Image, "ubuntu:22.04", TrustZone::ThirdParty);
        let f_a = make_finding(FindingCategory::FloatingImage, "msg-a", vec![img_a]);
        let f_b = make_finding(FindingCategory::FloatingImage, "msg-b", vec![img_b]);
        let fp_a = compute_fingerprint(&f_a, &g);
        let fp_b = compute_fingerprint(&f_b, &g);
        assert_ne!(
            fp_a, fp_b,
            "two distinct floating-image findings must not collide"
        );
        assert_eq!(fp_a.len(), 16);
        assert_eq!(fp_b.len(), 16);
    }

    // Ordering of nodes_involved changes the hash (role swap detection).
    #[test]
    fn node_order_is_significant() {
        let mut g = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = g.add_node(NodeKind::Secret, "K", TrustZone::FirstParty);
        let step = g.add_node(NodeKind::Step, "use", TrustZone::Untrusted);
        let forward = make_finding(FindingCategory::AuthorityPropagation, "x", vec![s, step]);
        let reverse = make_finding(FindingCategory::AuthorityPropagation, "x", vec![step, s]);
        assert_ne!(
            compute_fingerprint(&forward, &g),
            compute_fingerprint(&reverse, &g),
            "node order must influence the fingerprint so role swap is detectable"
        );
    }

    // A `[custom_rule]` message prefix replaces the category-derived id.
    #[test]
    fn custom_rule_id_in_message_is_used() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = graph.add_node(NodeKind::Secret, "X", TrustZone::FirstParty);
        let f_custom = make_finding(
            FindingCategory::UnpinnedAction,
            "[my_custom_rule] something happened",
            vec![s],
        );
        let f_plain = make_finding(FindingCategory::UnpinnedAction, "no prefix here", vec![s]);
        assert_ne!(
            compute_fingerprint(&f_custom, &graph),
            compute_fingerprint(&f_plain, &graph),
            "custom rule id must distinguish from category fallback"
        );
    }

    // Group ids look like UUID v5: 36 chars, version nibble '5' at
    // position 14, RFC 4122 variant nibble at position 19.
    #[test]
    fn finding_group_id_is_deterministic_uuid_v5() {
        let g1 = compute_finding_group_id("5edb30f4db3b5fa3");
        let g2 = compute_finding_group_id("5edb30f4db3b5fa3");
        assert_eq!(g1, g2);
        assert_eq!(g1.len(), 36);
        assert_eq!(
            g1.chars().nth(14),
            Some('5'),
            "expected v5 marker, got {g1}"
        );
        let variant = g1.chars().nth(19).unwrap();
        assert!(
            matches!(variant, '8' | '9' | 'a' | 'b'),
            "expected RFC 4122 variant, got {variant}"
        );
        assert_ne!(g1, compute_finding_group_id("a3c8d9e1f2b4c5d6"));
    }

    #[test]
    fn with_time_to_fix_attaches_effort() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = graph.add_node(NodeKind::Secret, "X", TrustZone::FirstParty);
        let f = make_finding(FindingCategory::UnpinnedAction, "msg", vec![s])
            .with_time_to_fix(FixEffort::Trivial);
        assert_eq!(f.extras.time_to_fix, Some(FixEffort::Trivial));
    }

    // Adding a control downgrades High → Medium and records the original.
    #[test]
    fn with_compensating_control_downgrades_and_records_original() {
        let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let s = graph.add_node(NodeKind::Secret, "X", TrustZone::FirstParty);
        let f = make_finding(FindingCategory::TriggerContextMismatch, "msg", vec![s])
            .with_compensating_control("fork check present");
        assert_eq!(f.severity, Severity::Medium);
        assert_eq!(f.extras.original_severity, Some(Severity::High));
        assert_eq!(f.extras.compensating_controls.len(), 1);
    }

    // Edge case: no nodes at all still yields a well-formed fingerprint.
    #[test]
    fn empty_node_list_still_produces_fingerprint() {
        let graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
        let f = make_finding(FindingCategory::UnpinnedAction, "no nodes here", vec![]);
        let fp = compute_fingerprint(&f, &graph);
        assert_eq!(fp.len(), 16);
        assert!(fp.chars().all(|c| c.is_ascii_hexdigit()));
    }
}
#[cfg(test)]
mod source_tests {
    //! Pins the serde wire format of `FindingSource`, including the
    //! legacy-compatibility default when the field is missing.
    use super::*;

    // `BuiltIn` serializes as the bare string "built-in" (with hyphen).
    #[test]
    fn built_in_serializes_as_string() {
        let s = FindingSource::BuiltIn;
        let v = serde_json::to_value(&s).unwrap();
        assert_eq!(v, serde_json::json!("built-in"));
    }

    // `Custom` serializes externally tagged with the policy-file path.
    #[test]
    fn custom_serializes_with_path_payload() {
        let s = FindingSource::Custom {
            source_file: PathBuf::from("/policies/no_prod_pat.yml"),
        };
        let v = serde_json::to_value(&s).unwrap();
        assert_eq!(
            v,
            serde_json::json!({"custom": {"source_file": "/policies/no_prod_pat.yml"}})
        );
    }

    #[test]
    fn finding_round_trip_preserves_built_in_source() {
        let f = Finding {
            severity: Severity::High,
            category: FindingCategory::AuthorityPropagation,
            path: None,
            nodes_involved: vec![],
            message: "x".into(),
            recommendation: Recommendation::Manual {
                action: "fix".into(),
            },
            source: FindingSource::BuiltIn,
            extras: FindingExtras::default(),
        };
        let s = serde_json::to_string(&f).unwrap();
        assert!(
            s.contains("\"source\":\"built-in\""),
            "built-in source must serialise as \"built-in\": {s}"
        );
        let f2: Finding = serde_json::from_str(&s).unwrap();
        assert_eq!(f2.source, FindingSource::BuiltIn);
    }

    #[test]
    fn finding_round_trip_preserves_custom_source_with_path() {
        let path = PathBuf::from("/work/invariants/no_prod_pat.yml");
        let f = Finding {
            severity: Severity::Critical,
            category: FindingCategory::AuthorityPropagation,
            path: None,
            nodes_involved: vec![],
            message: "[no_prod_pat] hit".into(),
            recommendation: Recommendation::Manual {
                action: "fix".into(),
            },
            source: FindingSource::Custom {
                source_file: path.clone(),
            },
            extras: FindingExtras::default(),
        };
        let s = serde_json::to_string(&f).unwrap();
        assert!(
            s.contains("\"custom\""),
            "custom source must serialise with `custom` key: {s}"
        );
        assert!(
            s.contains("/work/invariants/no_prod_pat.yml"),
            "custom source must include the loader path: {s}"
        );
        let f2: Finding = serde_json::from_str(&s).unwrap();
        assert_eq!(
            f2.source,
            FindingSource::Custom { source_file: path },
            "round-trip must preserve custom source path"
        );
    }

    // Backward compatibility: JSON written before the `source` field
    // existed must still deserialize, defaulting to BuiltIn.
    #[test]
    fn missing_source_field_deserializes_as_built_in() {
        let json = r#"{
"severity": "high",
"category": "authority_propagation",
"nodes_involved": [],
"message": "old-format finding",
"recommendation": {"type": "manual", "action": "review"}
}"#;
        let f: Finding = serde_json::from_str(json).expect("legacy JSON must parse");
        assert_eq!(f.source, FindingSource::BuiltIn);
    }
}