use crate::graph::{AuthorityGraph, NodeId, NodeKind};
use crate::propagation::PropagationPath;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum Severity {
Critical,
High,
Medium,
Low,
Info,
}
impl Severity {
fn rank(self) -> u8 {
match self {
Severity::Critical => 0,
Severity::High => 1,
Severity::Medium => 2,
Severity::Low => 3,
Severity::Info => 4,
}
}
}
impl Ord for Severity {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.rank().cmp(&other.rank())
}
}
impl PartialOrd for Severity {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FindingCategory {
AuthorityPropagation,
OverPrivilegedIdentity,
UnpinnedAction,
UntrustedWithAuthority,
ArtifactBoundaryCrossing,
FloatingImage,
LongLivedCredential,
PersistedCredential,
TriggerContextMismatch,
CrossWorkflowAuthorityChain,
AuthorityCycle,
UpliftWithoutAttestation,
SelfMutatingPipeline,
CheckoutSelfPrExposure,
VariableGroupInPrJob,
SelfHostedPoolPrHijack,
ServiceConnectionScopeMismatch,
TemplateExtendsUnpinnedBranch,
TemplateRepoRefIsFeatureBranch,
VmRemoteExecViaPipelineSecret,
ShortLivedSasInCommandLine,
SecretToInlineScriptEnvExport,
SecretMaterialisedToWorkspaceFile,
#[serde(rename = "keyvault_secret_to_plaintext")]
KeyVaultSecretToPlaintext,
TerraformAutoApproveInProd,
AddSpnWithInlineScript,
ParameterInterpolationIntoShell,
RuntimeScriptFetchedFromFloatingUrl,
PrTriggerWithFloatingActionRef,
UntrustedApiResponseToEnvSink,
PrBuildPushesImageWithFloatingCredentials,
#[doc(hidden)]
EgressBlindspot,
#[doc(hidden)]
MissingAuditTrail,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Recommendation {
TsafeRemediation {
command: String,
explanation: String,
},
CellosRemediation {
reason: String,
spec_hint: String,
},
PinAction {
current: String,
pinned: String,
},
ReducePermissions {
current: String,
minimum: String,
},
FederateIdentity {
static_secret: String,
oidc_provider: String,
},
Manual {
action: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Finding {
pub severity: Severity,
pub category: FindingCategory,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<PropagationPath>,
pub nodes_involved: Vec<NodeId>,
pub message: String,
pub recommendation: Recommendation,
}
fn extract_custom_rule_id(message: &str) -> Option<&str> {
if !message.starts_with('[') {
return None;
}
let end = message.find(']')?;
let id = &message[1..end];
if id.is_empty() {
None
} else {
Some(id)
}
}
fn category_rule_id(category: &FindingCategory) -> String {
serde_json::to_value(category)
.ok()
.and_then(|v| v.as_str().map(str::to_string))
.unwrap_or_else(|| "unknown".to_string())
}
pub fn compute_fingerprint(finding: &Finding, graph: &AuthorityGraph) -> String {
let rule_id = extract_custom_rule_id(&finding.message)
.map(str::to_string)
.unwrap_or_else(|| category_rule_id(&finding.category));
let category = category_rule_id(&finding.category);
let file = graph.source.file.as_str();
let root_authority: Option<&str> = finding
.nodes_involved
.iter()
.filter_map(|id| graph.node(*id))
.find(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
.map(|n| n.name.as_str());
let node_segment: String = match root_authority {
Some(name) => name.to_string(),
None => {
let mut names: Vec<&str> = finding
.nodes_involved
.iter()
.filter_map(|id| graph.node(*id))
.map(|n| n.name.as_str())
.collect();
names.sort_unstable();
names.dedup();
names.join(",")
}
};
let canonical = format!(
"v1\x1frule={rule_id}\x1ffile={file}\x1fcategory={category}\x1fnodes={node_segment}"
);
let digest = Sha256::digest(canonical.as_bytes());
let mut out = String::with_capacity(16);
for byte in &digest[..8] {
use std::fmt::Write;
let _ = write!(&mut out, "{byte:02x}");
}
out
}
#[cfg(test)]
mod fingerprint_tests {
use super::*;
use crate::graph::{AuthorityGraph, NodeKind, PipelineSource, TrustZone};
fn source(file: &str) -> PipelineSource {
PipelineSource {
file: file.to_string(),
repo: None,
git_ref: None,
commit_sha: None,
}
}
fn make_finding(category: FindingCategory, msg: &str, nodes: Vec<NodeId>) -> Finding {
Finding {
severity: Severity::High,
category,
path: None,
nodes_involved: nodes,
message: msg.to_string(),
recommendation: Recommendation::Manual {
action: "fix it".to_string(),
},
}
}
#[test]
fn fingerprint_is_stable_across_repeat_calls() {
let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
let s = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
let f = make_finding(
FindingCategory::AuthorityPropagation,
"AWS_KEY reaches third party",
vec![s],
);
let a = compute_fingerprint(&f, &graph);
let b = compute_fingerprint(&f, &graph);
assert_eq!(a, b, "same finding must hash identically across calls");
assert_eq!(a.len(), 16, "fingerprint is 16 hex chars");
assert!(a.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn different_files_produce_different_fingerprints() {
let mut g_a = AuthorityGraph::new(source("workflows/a.yml"));
let mut g_b = AuthorityGraph::new(source("workflows/b.yml"));
let s_a = g_a.add_node(NodeKind::Secret, "TOKEN", TrustZone::FirstParty);
let s_b = g_b.add_node(NodeKind::Secret, "TOKEN", TrustZone::FirstParty);
let f_a = make_finding(FindingCategory::UnpinnedAction, "msg", vec![s_a]);
let f_b = make_finding(FindingCategory::UnpinnedAction, "msg", vec![s_b]);
assert_ne!(
compute_fingerprint(&f_a, &g_a),
compute_fingerprint(&f_b, &g_b)
);
}
#[test]
fn different_rules_produce_different_fingerprints() {
let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
let s = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
let f1 = make_finding(FindingCategory::AuthorityPropagation, "msg", vec![s]);
let f2 = make_finding(FindingCategory::UntrustedWithAuthority, "msg", vec![s]);
assert_ne!(
compute_fingerprint(&f1, &graph),
compute_fingerprint(&f2, &graph)
);
}
#[test]
fn message_changes_do_not_affect_fingerprint() {
let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
let s = graph.add_node(NodeKind::Secret, "AWS_KEY", TrustZone::FirstParty);
let f1 = make_finding(
FindingCategory::AuthorityPropagation,
"old phrasing of the message",
vec![s],
);
let f2 = make_finding(
FindingCategory::AuthorityPropagation,
"completely different new phrasing",
vec![s],
);
assert_eq!(
compute_fingerprint(&f1, &graph),
compute_fingerprint(&f2, &graph)
);
}
#[test]
fn per_hop_findings_against_same_authority_collapse() {
let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
let secret = graph.add_node(NodeKind::Secret, "DEPLOY_TOKEN", TrustZone::FirstParty);
let step_a = graph.add_node(NodeKind::Step, "deploy[0]", TrustZone::Untrusted);
let step_b = graph.add_node(NodeKind::Step, "deploy[1]", TrustZone::Untrusted);
let f_a = make_finding(
FindingCategory::AuthorityPropagation,
"DEPLOY_TOKEN reaches deploy[0]",
vec![secret, step_a],
);
let f_b = make_finding(
FindingCategory::AuthorityPropagation,
"DEPLOY_TOKEN reaches deploy[1]",
vec![secret, step_b],
);
assert_eq!(
compute_fingerprint(&f_a, &graph),
compute_fingerprint(&f_b, &graph),
"per-hop findings against one secret must share a fingerprint"
);
}
#[test]
fn custom_rule_id_in_message_is_used() {
let mut graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
let s = graph.add_node(NodeKind::Secret, "X", TrustZone::FirstParty);
let f_custom = make_finding(
FindingCategory::UnpinnedAction,
"[my_custom_rule] something happened",
vec![s],
);
let f_plain = make_finding(FindingCategory::UnpinnedAction, "no prefix here", vec![s]);
assert_ne!(
compute_fingerprint(&f_custom, &graph),
compute_fingerprint(&f_plain, &graph),
"custom rule id must distinguish from category fallback"
);
}
#[test]
fn empty_node_list_still_produces_fingerprint() {
let graph = AuthorityGraph::new(source(".github/workflows/ci.yml"));
let f = make_finding(FindingCategory::UnpinnedAction, "no nodes here", vec![]);
let fp = compute_fingerprint(&f, &graph);
assert_eq!(fp.len(), 16);
assert!(fp.chars().all(|c| c.is_ascii_hexdigit()));
}
}