use crate::finding::{Finding, FindingCategory, Recommendation, Severity};
use crate::graph::{
is_docker_digest_pinned, is_sha_pinned, AuthorityCompleteness, AuthorityGraph, EdgeKind,
IdentityScope, NodeId, NodeKind, TrustZone, META_ADD_SPN_TO_ENV, META_ATTESTS,
META_CHECKOUT_SELF, META_CLI_FLAG_EXPOSED, META_CONTAINER, META_DIGEST, META_ENV_APPROVAL,
META_IDENTITY_SCOPE, META_IMPLICIT, META_OIDC, META_PERMISSIONS, META_REPOSITORIES,
META_SCRIPT_BODY, META_SELF_HOSTED, META_SERVICE_CONNECTION, META_SERVICE_CONNECTION_NAME,
META_TERRAFORM_AUTO_APPROVE, META_TRIGGER, META_VARIABLE_GROUP, META_WRITES_ENV_GATE,
};
use crate::propagation;
/// Clamps `severity` to `max_severity`.
///
/// When `severity` compares as less than `max_severity` under `Severity`'s
/// ordering, the cap is returned; otherwise `severity` passes through unchanged.
fn cap_severity(severity: Severity, max_severity: Severity) -> Severity {
    let beyond_cap = severity < max_severity;
    if beyond_cap {
        return max_severity;
    }
    severity
}
/// Caps every finding at `Severity::High` when the graph's completeness is
/// `AuthorityCompleteness::Partial`; any other completeness leaves the findings untouched.
fn apply_confidence_cap(graph: &AuthorityGraph, findings: &mut [Finding]) {
    if graph.completeness == AuthorityCompleteness::Partial {
        findings.iter_mut().for_each(|finding| {
            finding.severity = cap_severity(finding.severity, Severity::High);
        });
    }
}
pub fn authority_propagation(graph: &AuthorityGraph, max_hops: usize) -> Vec<Finding> {
let paths = propagation::propagation_analysis(graph, max_hops);
paths
.into_iter()
.filter(|p| p.crossed_boundary)
.map(|path| {
let source_name = graph
.node(path.source)
.map(|n| n.name.as_str())
.unwrap_or("?");
let sink_name = graph
.node(path.sink)
.map(|n| n.name.as_str())
.unwrap_or("?");
let sink_is_pinned = graph
.node(path.sink)
.map(|n| {
n.trust_zone == TrustZone::ThirdParty && n.metadata.contains_key(META_DIGEST)
})
.unwrap_or(false);
let source_is_constrained = graph
.node(path.source)
.and_then(|n| n.metadata.get(META_IDENTITY_SCOPE))
.map(|s| s == "constrained")
.unwrap_or(false);
let source_is_oidc = graph
.node(path.source)
.and_then(|n| n.metadata.get(META_OIDC))
.map(|v| v == "true")
.unwrap_or(false);
let base_severity = if sink_is_pinned && source_is_constrained && !source_is_oidc {
Severity::Medium
} else if sink_is_pinned && !source_is_oidc {
Severity::High
} else {
Severity::Critical
};
let crosses_approval_gate = path_crosses_env_approval(graph, &path);
let (severity, message_suffix) = if crosses_approval_gate {
(
downgrade_one_step(base_severity),
" (mitigated: environment approval gate)",
)
} else {
(base_severity, "")
};
Finding {
severity,
category: FindingCategory::AuthorityPropagation,
nodes_involved: vec![path.source, path.sink],
message: format!(
"{source_name} propagated to {sink_name} across trust boundary{message_suffix}"
),
recommendation: Recommendation::TsafeRemediation {
command: "tsafe exec --ns <scoped-namespace> -- <command>".to_string(),
explanation: format!("Scope {source_name} to only the steps that need it"),
},
path: Some(path),
}
})
.collect()
}
/// Returns `true` when any node touched by `path` carries `META_ENV_APPROVAL == "true"`.
///
/// The path's source and sink are checked first, then both endpoints of every edge on
/// the path. Edge ids that the graph cannot resolve are ignored.
fn path_crosses_env_approval(graph: &AuthorityGraph, path: &propagation::PropagationPath) -> bool {
    let gated = |id: NodeId| -> bool {
        matches!(
            graph.node(id).and_then(|n| n.metadata.get(META_ENV_APPROVAL)),
            Some(v) if v == "true"
        )
    };

    gated(path.source)
        || gated(path.sink)
        || path
            .edges
            .iter()
            .filter_map(|&edge_id| graph.edge(edge_id))
            .any(|edge| gated(edge.from) || gated(edge.to))
}
/// Lowers a severity by one level: Critical → High → Medium → Low.
///
/// `Low` stays `Low` and `Info` stays `Info`; a downgrade never turns a finding into `Info`.
fn downgrade_one_step(severity: Severity) -> Severity {
    match severity {
        Severity::Critical => Severity::High,
        Severity::High => Severity::Medium,
        Severity::Medium | Severity::Low => Severity::Low,
        Severity::Info => Severity::Info,
    }
}
/// Flags identity nodes whose scope is broad or unknown and that at least one node
/// reaches through a `HasAccessTo` edge.
///
/// The scope is taken from `META_IDENTITY_SCOPE` when it holds "broad", "constrained"
/// or "unknown"; otherwise it is derived from the `META_PERMISSIONS` string via
/// `IdentityScope::from_permissions`. Broad identities are reported as `High`,
/// unknown ones as `Medium`, constrained ones are skipped.
pub fn over_privileged_identity(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();

    for identity in graph.nodes_of_kind(NodeKind::Identity) {
        let granted_scope = identity
            .metadata
            .get(META_PERMISSIONS)
            .cloned()
            .unwrap_or_default();

        let declared = identity
            .metadata
            .get(META_IDENTITY_SCOPE)
            .map(|s| s.as_str());
        let scope = match declared {
            Some("broad") => IdentityScope::Broad,
            Some("constrained") => IdentityScope::Constrained,
            Some("unknown") => IdentityScope::Unknown,
            // Absent or unrecognised marker: fall back to the permissions string.
            _ => IdentityScope::from_permissions(&granted_scope),
        };

        let (severity, scope_label) = match scope {
            IdentityScope::Broad => (Severity::High, "broad"),
            IdentityScope::Unknown => (Severity::Medium, "unknown (treat as risky)"),
            IdentityScope::Constrained => continue,
        };

        // Identity first, then every node that holds an access edge to it.
        let mut nodes_involved = vec![identity.id];
        nodes_involved.extend(
            graph
                .edges_to(identity.id)
                .filter(|e| e.kind == EdgeKind::HasAccessTo)
                .filter_map(|e| graph.node(e.from))
                .map(|n| n.id),
        );
        if nodes_involved.len() == 1 {
            // Nothing accesses this identity, so there is nothing to report.
            continue;
        }

        let message = format!(
            "{} has {} scope (permissions: '{}') — likely broader than needed",
            identity.name, scope_label, granted_scope
        );
        findings.push(Finding {
            severity,
            category: FindingCategory::OverPrivilegedIdentity,
            path: None,
            nodes_involved,
            message,
            recommendation: Recommendation::ReducePermissions {
                current: granted_scope,
                minimum: "{ contents: read }".into(),
            },
        });
    }

    findings
}
/// Reports non-first-party, non-container image nodes that are not pinned.
///
/// A node counts as pinned when it carries `META_DIGEST` metadata or when
/// `is_sha_pinned` accepts its name. Each distinct name is evaluated only once
/// (the first node seen with that name decides).
pub fn unpinned_action(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    let mut reported_names = std::collections::HashSet::new();

    for image in graph.nodes_of_kind(NodeKind::Image) {
        let is_first_party = image.trust_zone == TrustZone::FirstParty;
        let is_container = matches!(image.metadata.get(META_CONTAINER), Some(v) if v == "true");
        if is_first_party || is_container {
            continue;
        }
        // Deduplicate by name before the pin check, so a name is judged once.
        if !reported_names.insert(&image.name) {
            continue;
        }
        let pinned = image.metadata.contains_key(META_DIGEST) || is_sha_pinned(&image.name);
        if pinned {
            continue;
        }

        // Drop any existing `@ref` suffix before suggesting the pinned form.
        let base_ref = image.name.split('@').next().unwrap_or(&image.name);
        findings.push(Finding {
            severity: Severity::Medium,
            category: FindingCategory::UnpinnedAction,
            path: None,
            nodes_involved: vec![image.id],
            message: format!("{} is not pinned to a SHA digest", image.name),
            recommendation: Recommendation::PinAction {
                current: image.name.clone(),
                pinned: format!("{}@<sha256-digest>", base_ref),
            },
        });
    }

    findings
}
/// Reports untrusted steps that hold a `HasAccessTo` edge to a secret or identity.
///
/// Targets marked `META_IMPLICIT == "true"` are reported as `Info`; every other
/// target is `Critical`. Targets marked `META_CLI_FLAG_EXPOSED == "true"` get a
/// log-exposure note in the message and, for secrets, a recommendation to move the
/// value out of the `-var` flag.
pub fn untrusted_with_authority(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();

    for step in graph.nodes_in_zone(TrustZone::Untrusted) {
        if step.kind != NodeKind::Step {
            continue;
        }

        for edge in graph.edges_from(step.id) {
            if edge.kind != EdgeKind::HasAccessTo {
                continue;
            }
            let Some(target) = graph.node(edge.to) else {
                continue;
            };
            let kind_label = match target.kind {
                NodeKind::Secret => "secret",
                NodeKind::Identity => "identity",
                _ => continue,
            };
            let is_secret = target.kind == NodeKind::Secret;

            // Reads a "true"/absent boolean marker off the target node.
            let flag = |key: &str| -> bool {
                target
                    .metadata
                    .get(key)
                    .map(|v| v == "true")
                    .unwrap_or(false)
            };
            let cli_flag_exposed = flag(META_CLI_FLAG_EXPOSED);
            let is_implicit = flag(META_IMPLICIT);

            let recommendation = if is_secret && cli_flag_exposed {
                Recommendation::Manual {
                    action: format!(
                        "Move '{}' from -var flag to TF_VAR_{} env var — \
                         -var values appear in pipeline logs and Terraform plan output",
                        target.name, target.name
                    ),
                }
            } else if is_secret {
                Recommendation::CellosRemediation {
                    reason: format!(
                        "Untrusted step '{}' has direct access to secret '{}'",
                        step.name, target.name
                    ),
                    spec_hint: format!(
                        "cellos run --network deny-all --broker env:{}",
                        target.name
                    ),
                }
            } else {
                let minimum = if is_implicit {
                    "minimal required scope — or use CellOS deny-all egress as a compensating control to limit exfiltration of the injected token".into()
                } else {
                    "minimal required scope".into()
                };
                Recommendation::ReducePermissions {
                    current: target
                        .metadata
                        .get(META_PERMISSIONS)
                        .cloned()
                        .unwrap_or_else(|| "unknown".into()),
                    minimum,
                }
            };

            let log_exposure_note = if cli_flag_exposed {
                " (passed as -var flag — value visible in pipeline logs)"
            } else {
                ""
            };

            let (severity, message) = if is_implicit {
                (
                    Severity::Info,
                    format!(
                        "Untrusted step '{}' has structural access to implicit {} '{}' \
                         (platform-injected — all tasks receive this token by design){}",
                        step.name, kind_label, target.name, log_exposure_note,
                    ),
                )
            } else {
                (
                    Severity::Critical,
                    format!(
                        "Untrusted step '{}' has direct access to {} '{}'{}",
                        step.name, kind_label, target.name, log_exposure_note,
                    ),
                )
            };

            findings.push(Finding {
                severity,
                category: FindingCategory::UntrustedWithAuthority,
                path: None,
                nodes_involved: vec![step.id, target.id],
                message,
                recommendation,
            });
        }
    }

    findings
}
pub fn artifact_boundary_crossing(graph: &AuthorityGraph) -> Vec<Finding> {
let mut findings = Vec::new();
for artifact in graph.nodes_of_kind(NodeKind::Artifact) {
let producers: Vec<_> = graph
.edges_to(artifact.id)
.filter(|e| e.kind == EdgeKind::Produces)
.filter_map(|e| graph.node(e.from))
.collect();
let consumers: Vec<_> = graph
.edges_from(artifact.id)
.filter(|e| e.kind == EdgeKind::Consumes)
.filter_map(|e| graph.node(e.to))
.collect();
for producer in &producers {
let producer_has_authority = graph.edges_from(producer.id).any(|e| {
e.kind == EdgeKind::HasAccessTo
&& graph
.node(e.to)
.map(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
.unwrap_or(false)
});
if !producer_has_authority {
continue;
}
for consumer in &consumers {
if consumer.trust_zone.is_lower_than(&producer.trust_zone) {
findings.push(Finding {
severity: Severity::High,
category: FindingCategory::ArtifactBoundaryCrossing,
path: None,
nodes_involved: vec![producer.id, artifact.id, consumer.id],
message: format!(
"Artifact '{}' produced by privileged step '{}' consumed by '{}' ({:?} -> {:?})",
artifact.name,
producer.name,
consumer.name,
producer.trust_zone,
consumer.trust_zone
),
recommendation: Recommendation::TsafeRemediation {
command: format!(
"tsafe exec --ns {} -- <build-command>",
producer.name
),
explanation: format!(
"Scope secrets to '{}' only; artifact '{}' should not carry authority",
producer.name, artifact.name
),
},
});
}
}
}
}
findings
}
/// Flags secrets whose upper-cased name contains one of a fixed set of substrings
/// typical of static credentials (API keys, passwords, private keys, ...).
///
/// Each match yields a `Low` finding recommending identity federation.
pub fn long_lived_credential(graph: &AuthorityGraph) -> Vec<Finding> {
    const STATIC_PATTERNS: &[&str] = &[
        "AWS_ACCESS_KEY",
        "AWS_SECRET_ACCESS_KEY",
        "_API_KEY",
        "_APIKEY",
        "_PASSWORD",
        "_PASSWD",
        "_PRIVATE_KEY",
        "_SECRET_KEY",
        "_SERVICE_ACCOUNT",
        "_SIGNING_KEY",
    ];

    graph
        .nodes_of_kind(NodeKind::Secret)
        .filter(|secret| {
            // Patterns are upper-case, so compare against the upper-cased name.
            let upper = secret.name.to_uppercase();
            STATIC_PATTERNS.iter().any(|pattern| upper.contains(pattern))
        })
        .map(|secret| Finding {
            severity: Severity::Low,
            category: FindingCategory::LongLivedCredential,
            path: None,
            nodes_involved: vec![secret.id],
            message: format!(
                "'{}' looks like a long-lived static credential",
                secret.name
            ),
            recommendation: Recommendation::FederateIdentity {
                static_secret: secret.name.clone(),
                oidc_provider: "GitHub Actions OIDC (id-token: write)".into(),
            },
        })
        .collect()
}
/// Reports container images (`META_CONTAINER == "true"`) whose reference is not
/// pinned to a digest according to `is_docker_digest_pinned`.
///
/// Each distinct image name is evaluated once. The recommendation suggests the
/// repository part of the reference followed by `@sha256:<digest>`.
pub fn floating_image(graph: &AuthorityGraph) -> Vec<Finding> {
    /// Strips a trailing `@...` part and/or `:tag` from an image reference while
    /// leaving a registry host, optional port, and repository path intact.
    fn repository_of(reference: &str) -> &str {
        let without_digest = reference.split('@').next().unwrap_or(reference);
        match without_digest.rfind(':') {
            // Only a colon after the last '/' separates the tag; an earlier colon
            // belongs to a `host:port` registry prefix and must be kept.
            Some(colon) if !without_digest[colon..].contains('/') => &without_digest[..colon],
            _ => without_digest,
        }
    }

    let mut findings = Vec::new();
    let mut seen = std::collections::HashSet::new();
    for image in graph.nodes_of_kind(NodeKind::Image) {
        let is_container = image
            .metadata
            .get(META_CONTAINER)
            .map(|v| v == "true")
            .unwrap_or(false);
        if !is_container {
            continue;
        }
        // Deduplicate by name so the same image yields at most one finding.
        if !seen.insert(image.name.as_str()) {
            continue;
        }
        if is_docker_digest_pinned(&image.name) {
            continue;
        }
        findings.push(Finding {
            severity: Severity::Medium,
            category: FindingCategory::FloatingImage,
            path: None,
            nodes_involved: vec![image.id],
            message: format!("Container image '{}' is not pinned to a digest", image.name),
            recommendation: Recommendation::PinAction {
                current: image.name.clone(),
                pinned: format!("{}@sha256:<digest>", repository_of(&image.name)),
            },
        });
    }
    findings
}
/// Emits one `High` finding for every `PersistsTo` edge whose endpoints both resolve
/// to nodes in the graph.
pub fn persisted_credential(graph: &AuthorityGraph) -> Vec<Finding> {
    graph
        .edges
        .iter()
        .filter(|edge| edge.kind == EdgeKind::PersistsTo)
        // Edges with a dangling endpoint are skipped.
        .filter_map(|edge| Some((graph.node(edge.from)?, graph.node(edge.to)?)))
        .map(|(step, target)| Finding {
            severity: Severity::High,
            category: FindingCategory::PersistedCredential,
            path: None,
            nodes_involved: vec![step.id, target.id],
            message: format!(
                "'{}' persists '{}' to disk via persistCredentials: true — \
                 credential remains in .git/config and is accessible to all subsequent steps",
                step.name, target.name
            ),
            recommendation: Recommendation::Manual {
                action: "Remove persistCredentials: true from the checkout step. \
                         Pass credentials explicitly only to steps that need them."
                    .into(),
            },
        })
        .collect()
}
/// Emits a single finding when the graph's trigger is `pull_request_target`
/// (`Critical`) or `pr` (`High`) and at least one step has a `HasAccessTo` edge to a
/// secret or identity.
///
/// `nodes_involved` lists the authority-holding steps first, followed by the
/// distinct secrets/identities they reach, both in discovery order.
pub fn trigger_context_mismatch(graph: &AuthorityGraph) -> Vec<Finding> {
    let Some(trigger) = graph.metadata.get(META_TRIGGER).cloned() else {
        return Vec::new();
    };
    let severity = match trigger.as_str() {
        "pull_request_target" => Severity::Critical,
        "pr" => Severity::High,
        _ => return Vec::new(),
    };

    let mut steps_with_authority: Vec<NodeId> = Vec::new();
    let mut authority_targets: Vec<NodeId> = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let reachable_authority = graph
            .edges_from(step.id)
            .filter(|e| e.kind == EdgeKind::HasAccessTo)
            .filter_map(|e| graph.node(e.to))
            .filter(|t| matches!(t.kind, NodeKind::Secret | NodeKind::Identity));

        let mut held = 0usize;
        for target in reachable_authority {
            held += 1;
            if !authority_targets.contains(&target.id) {
                authority_targets.push(target.id);
            }
        }
        if held > 0 {
            steps_with_authority.push(step.id);
        }
    }

    if steps_with_authority.is_empty() {
        return Vec::new();
    }

    let n = steps_with_authority.len();
    let mut nodes_involved = steps_with_authority;
    nodes_involved.extend(authority_targets);

    vec![Finding {
        severity,
        category: FindingCategory::TriggerContextMismatch,
        path: None,
        nodes_involved,
        message: format!(
            "Workflow triggered by {trigger} with secret/identity access — {n} step(s) hold authority that attacker-controlled code could reach"
        ),
        recommendation: Recommendation::Manual {
            action: "Use a separate workflow triggered by workflow_run (not pull_request_target) for privileged operations, or ensure no checkout of the PR head ref occurs before secret use".into(),
        },
    }]
}
/// Reports steps that hold authority (a `HasAccessTo` edge to a secret or identity)
/// and also delegate, via a `DelegatesTo` edge, to an image node outside the
/// first-party trust zone.
///
/// Delegation to an untrusted target is `Critical`, to a third-party target `High`.
pub fn cross_workflow_authority_chain(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();

    for step in graph.nodes_of_kind(NodeKind::Step) {
        let authority_nodes: Vec<&_> = graph
            .edges_from(step.id)
            .filter(|e| e.kind == EdgeKind::HasAccessTo)
            .filter_map(|e| graph.node(e.to))
            .filter(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
            .collect();
        if authority_nodes.is_empty() {
            continue;
        }
        // Same label for every delegation of this step, so build it once.
        let authority_label = authority_nodes
            .iter()
            .map(|n| n.name.as_str())
            .collect::<Vec<_>>()
            .join(", ");

        let delegated_images = graph
            .edges_from(step.id)
            .filter(|e| e.kind == EdgeKind::DelegatesTo)
            .filter_map(|e| graph.node(e.to))
            .filter(|t| t.kind == NodeKind::Image);

        for target in delegated_images {
            let severity = match target.trust_zone {
                TrustZone::Untrusted => Severity::Critical,
                TrustZone::ThirdParty => Severity::High,
                TrustZone::FirstParty => continue,
            };

            let mut nodes_involved = vec![step.id, target.id];
            nodes_involved.extend(authority_nodes.iter().map(|n| n.id));

            findings.push(Finding {
                severity,
                category: FindingCategory::CrossWorkflowAuthorityChain,
                path: None,
                nodes_involved,
                message: format!(
                    "'{}' delegates to '{}' ({:?}) while holding authority ({}) — authority chain extends into opaque external workflow",
                    step.name, target.name, target.trust_zone, authority_label
                ),
                recommendation: Recommendation::Manual {
                    action: format!(
                        "Pin '{}' to a full SHA digest; audit what authority the called workflow receives",
                        target.name
                    ),
                },
            });
        }
    }

    findings
}
/// Detects cycles formed by `DelegatesTo` edges and reports all nodes found on them
/// in a single `High` finding.
///
/// Uses an iterative depth-first search with three node states. Reaching a node that
/// is still on the current DFS stack means a back edge; every stack entry from that
/// node upward is recorded. Edges whose endpoints fall outside `0..nodes.len()` are
/// ignored.
pub fn authority_cycle(graph: &AuthorityGraph) -> Vec<Finding> {
    const UNVISITED: u8 = 0;
    const ON_STACK: u8 = 1;
    const DONE: u8 = 2;

    let node_count = graph.nodes.len();
    if node_count == 0 {
        return Vec::new();
    }

    // Adjacency list restricted to delegation edges.
    let mut delegates_to: Vec<Vec<NodeId>> = vec![Vec::new(); node_count];
    for edge in &graph.edges {
        let in_range = edge.from < node_count && edge.to < node_count;
        if edge.kind == EdgeKind::DelegatesTo && in_range {
            delegates_to[edge.from].push(edge.to);
        }
    }

    let mut color: Vec<u8> = vec![UNVISITED; node_count];
    let mut cycle_nodes: std::collections::BTreeSet<NodeId> = std::collections::BTreeSet::new();

    for start in 0..node_count {
        if color[start] != UNVISITED {
            continue;
        }
        color[start] = ON_STACK;
        // Each frame is (node, index of the next outgoing edge to explore).
        let mut stack: Vec<(NodeId, usize)> = vec![(start, 0)];

        while let Some(&(node_id, edge_idx)) = stack.last() {
            match delegates_to[node_id].get(edge_idx).copied() {
                Some(neighbor) => {
                    // Advance this frame's cursor before descending.
                    if let Some(frame) = stack.last_mut() {
                        frame.1 += 1;
                    }
                    match color[neighbor] {
                        ON_STACK => {
                            let cycle_start = stack
                                .iter()
                                .position(|&(id, _)| id == neighbor)
                                .unwrap_or(0);
                            cycle_nodes.extend(stack[cycle_start..].iter().map(|&(id, _)| id));
                        }
                        UNVISITED => {
                            color[neighbor] = ON_STACK;
                            stack.push((neighbor, 0));
                        }
                        _ => {}
                    }
                }
                None => {
                    // All outgoing edges explored: retire the node.
                    color[node_id] = DONE;
                    stack.pop();
                }
            }
        }
    }

    if cycle_nodes.is_empty() {
        return Vec::new();
    }

    vec![Finding {
        severity: Severity::High,
        category: FindingCategory::AuthorityCycle,
        path: None,
        nodes_involved: cycle_nodes.into_iter().collect(),
        message:
            "Circular delegation detected — workflow calls itself transitively, creating unbounded privilege escalation paths"
                .into(),
        recommendation: Recommendation::Manual {
            action: "Break the delegation cycle — a workflow must not directly or transitively call itself".into(),
        },
    }]
}
/// Emits one `Info` finding when some node accesses an OIDC identity
/// (`META_OIDC == "true"`) but no node in the graph is marked
/// `META_ATTESTS == "true"`.
///
/// `nodes_involved` lists the distinct accessing nodes (in edge order) followed by
/// the OIDC identities.
pub fn uplift_without_attestation(graph: &AuthorityGraph) -> Vec<Finding> {
    let oidc_identity_ids: Vec<NodeId> = graph
        .nodes_of_kind(NodeKind::Identity)
        .filter(|n| matches!(n.metadata.get(META_OIDC), Some(v) if v == "true"))
        .map(|n| n.id)
        .collect();
    if oidc_identity_ids.is_empty() {
        return Vec::new();
    }

    // Any attesting node anywhere in the graph suppresses the finding.
    let has_attestation = graph
        .nodes
        .iter()
        .any(|n| matches!(n.metadata.get(META_ATTESTS), Some(v) if v == "true"));
    if has_attestation {
        return Vec::new();
    }

    let mut steps_using_oidc: Vec<NodeId> = Vec::new();
    for edge in &graph.edges {
        let reaches_oidc =
            edge.kind == EdgeKind::HasAccessTo && oidc_identity_ids.contains(&edge.to);
        if reaches_oidc && !steps_using_oidc.contains(&edge.from) {
            steps_using_oidc.push(edge.from);
        }
    }
    if steps_using_oidc.is_empty() {
        return Vec::new();
    }

    let n = steps_using_oidc.len();
    let mut nodes_involved = steps_using_oidc;
    nodes_involved.extend(oidc_identity_ids);

    vec![Finding {
        severity: Severity::Info,
        category: FindingCategory::UpliftWithoutAttestation,
        path: None,
        nodes_involved,
        message: format!(
            "{n} step(s) use OIDC/federated identity but no provenance attestation step was detected — artifact integrity cannot be verified"
        ),
        recommendation: Recommendation::Manual {
            action: "Add 'actions/attest-build-provenance' after your build step (GHA) to provide SLSA provenance. See https://docs.github.com/en/actions/security-guides/using-artifact-attestations".into(),
        },
    }]
}
/// Reports steps marked `META_WRITES_ENV_GATE == "true"`.
///
/// Severity is `Critical` for untrusted steps, `High` for steps that hold a
/// `HasAccessTo` edge to a secret or identity, and `Medium` otherwise. The message
/// wording follows the same three-way split.
pub fn self_mutating_pipeline(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();

    for step in graph.nodes_of_kind(NodeKind::Step) {
        let writes_gate = matches!(step.metadata.get(META_WRITES_ENV_GATE), Some(v) if v == "true");
        if !writes_gate {
            continue;
        }

        let authority_nodes: Vec<&_> = graph
            .edges_from(step.id)
            .filter(|e| e.kind == EdgeKind::HasAccessTo)
            .filter_map(|e| graph.node(e.to))
            .filter(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
            .collect();

        // Untrusted takes precedence over held authority.
        let (severity, message) = if step.trust_zone == TrustZone::Untrusted {
            (
                Severity::Critical,
                format!(
                    "Untrusted step '{}' writes to the environment gate — attacker-controlled values can inject into subsequent steps' environment",
                    step.name
                ),
            )
        } else if !authority_nodes.is_empty() {
            let authority_label: Vec<String> =
                authority_nodes.iter().map(|n| n.name.clone()).collect();
            (
                Severity::High,
                format!(
                    "Step '{}' writes to the environment gate while holding authority ({}) — secrets may leak into pipeline environment",
                    step.name,
                    authority_label.join(", ")
                ),
            )
        } else {
            (
                Severity::Medium,
                format!(
                    "Step '{}' writes to the environment gate — values can propagate into subsequent steps' environment",
                    step.name
                ),
            )
        };

        let mut nodes_involved = vec![step.id];
        nodes_involved.extend(authority_nodes.iter().map(|n| n.id));

        findings.push(Finding {
            severity,
            category: FindingCategory::SelfMutatingPipeline,
            path: None,
            nodes_involved,
            message,
            recommendation: Recommendation::Manual {
                action: "Avoid writing secrets or attacker-controlled values to $GITHUB_ENV / $GITHUB_PATH / pipeline variables. Use explicit step outputs with narrow scoping instead.".into(),
            },
        });
    }

    findings
}
/// In graphs whose trigger is `pr` or `pull_request_target`, emits one `High`
/// finding per step marked `META_CHECKOUT_SELF == "true"`.
pub fn checkout_self_pr_exposure(graph: &AuthorityGraph) -> Vec<Finding> {
    let trigger = graph.metadata.get(META_TRIGGER).map(|s| s.as_str());
    if !matches!(trigger, Some("pr") | Some("pull_request_target")) {
        return vec![];
    }

    graph
        .nodes_of_kind(NodeKind::Step)
        .filter(|step| matches!(step.metadata.get(META_CHECKOUT_SELF), Some(v) if v == "true"))
        .map(|step| Finding {
            category: FindingCategory::CheckoutSelfPrExposure,
            severity: Severity::High,
            message: format!(
                "PR-triggered pipeline checks out the repository at step '{}' — \
                 attacker-controlled code from the fork lands on the runner and is \
                 readable by all subsequent steps",
                step.name
            ),
            path: None,
            nodes_involved: vec![step.id],
            recommendation: Recommendation::Manual {
                action: "Use `persist-credentials: false` and avoid reading workspace \
                         files in subsequent privileged steps. Consider `checkout: none` \
                         for jobs that only need pipeline config, not source code."
                    .into(),
            },
        })
        .collect()
}
/// In PR-triggered graphs (`pull_request_target` or `pr`), reports each step that has
/// a `HasAccessTo` edge to a secret or identity marked `META_VARIABLE_GROUP == "true"`.
///
/// One `Critical` finding is emitted per step, naming every such group it reaches.
pub fn variable_group_in_pr_job(graph: &AuthorityGraph) -> Vec<Finding> {
    let trigger = graph.metadata.get(META_TRIGGER).map(|s| s.as_str());
    if !matches!(trigger, Some("pull_request_target") | Some("pr")) {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let accessed_var_groups: Vec<&_> = graph
            .edges_from(step.id)
            .filter(|e| e.kind == EdgeKind::HasAccessTo)
            .filter_map(|e| graph.node(e.to))
            .filter(|n| matches!(n.kind, NodeKind::Secret | NodeKind::Identity))
            .filter(|n| matches!(n.metadata.get(META_VARIABLE_GROUP), Some(v) if v == "true"))
            .collect();
        if accessed_var_groups.is_empty() {
            continue;
        }

        let group_names = accessed_var_groups
            .iter()
            .map(|n| n.name.as_str())
            .collect::<Vec<_>>()
            .join(", ");

        let mut nodes_involved = vec![step.id];
        nodes_involved.extend(accessed_var_groups.iter().map(|n| n.id));

        findings.push(Finding {
            severity: Severity::Critical,
            category: FindingCategory::VariableGroupInPrJob,
            path: None,
            nodes_involved,
            message: format!(
                "PR-triggered step '{}' accesses variable group(s) [{}] — secrets cross into untrusted PR execution context",
                step.name, group_names
            ),
            recommendation: Recommendation::CellosRemediation {
                reason: format!(
                    "PR-triggered step '{}' can exfiltrate variable group secrets via untrusted code",
                    step.name
                ),
                spec_hint: "cellos run --network deny-all --policy requireEgressDeclared,requireRuntimeSecretDelivery".into(),
            },
        });
    }
    findings
}
/// Emits one `Critical` finding when a PR-triggered graph (`pull_request_target` or
/// `pr`) contains both an image node marked `META_SELF_HOSTED == "true"` and a step
/// marked `META_CHECKOUT_SELF == "true"`.
///
/// `nodes_involved` lists the self-hosted pool nodes first, then the checkout steps.
pub fn self_hosted_pool_pr_hijack(graph: &AuthorityGraph) -> Vec<Finding> {
    let trigger = graph.metadata.get(META_TRIGGER).map(|s| s.as_str());
    if !matches!(trigger, Some("pull_request_target") | Some("pr")) {
        return Vec::new();
    }

    let mut nodes_involved: Vec<NodeId> = graph
        .nodes_of_kind(NodeKind::Image)
        .filter(|n| matches!(n.metadata.get(META_SELF_HOSTED), Some(v) if v == "true"))
        .map(|n| n.id)
        .collect();
    if nodes_involved.is_empty() {
        return Vec::new();
    }

    let pool_count = nodes_involved.len();
    nodes_involved.extend(
        graph
            .nodes_of_kind(NodeKind::Step)
            .filter(|n| matches!(n.metadata.get(META_CHECKOUT_SELF), Some(v) if v == "true"))
            .map(|n| n.id),
    );
    if nodes_involved.len() == pool_count {
        // No checkout step was appended, so the condition is not met.
        return Vec::new();
    }

    vec![Finding {
        severity: Severity::Critical,
        category: FindingCategory::SelfHostedPoolPrHijack,
        path: None,
        nodes_involved,
        message:
            "PR-triggered pipeline uses self-hosted agent pool with checkout:self — enables git hook injection persisting across pipeline runs on the shared runner"
                .into(),
        recommendation: Recommendation::Manual {
            action: "Run PR pipelines on Microsoft-hosted (ephemeral) agents, or disable checkout:self for PR-triggered jobs on self-hosted pools".into(),
        },
    }]
}
/// In PR-triggered graphs (`pull_request_target` or `pr`), reports every
/// (step, identity) pair where the step has a `HasAccessTo` edge to an identity that
/// is a service connection (`META_SERVICE_CONNECTION == "true"`), is not OIDC
/// (`META_OIDC != "true"`), and whose `META_IDENTITY_SCOPE` is "broad", "Broad", or
/// absent.
///
/// NOTE(review): an explicit scope value of "unknown" is not matched here even though
/// the message mentions "broad/unknown" — confirm whether that is intended.
pub fn service_connection_scope_mismatch(graph: &AuthorityGraph) -> Vec<Finding> {
    let trigger = graph.metadata.get(META_TRIGGER).map(|s| s.as_str());
    if !matches!(trigger, Some("pull_request_target") | Some("pr")) {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let reachable = graph
            .edges_from(step.id)
            .filter(|e| e.kind == EdgeKind::HasAccessTo)
            .filter_map(|e| graph.node(e.to));

        for sc in reachable {
            if sc.kind != NodeKind::Identity {
                continue;
            }
            let is_service_connection =
                matches!(sc.metadata.get(META_SERVICE_CONNECTION), Some(v) if v == "true");
            let uses_oidc = matches!(sc.metadata.get(META_OIDC), Some(v) if v == "true");
            let scope = sc.metadata.get(META_IDENTITY_SCOPE).map(|s| s.as_str());
            // A missing scope marker is treated the same as a broad one.
            let broad_or_unset = matches!(scope, Some("broad") | Some("Broad") | None);
            if !is_service_connection || uses_oidc || !broad_or_unset {
                continue;
            }

            findings.push(Finding {
                severity: Severity::High,
                category: FindingCategory::ServiceConnectionScopeMismatch,
                path: None,
                nodes_involved: vec![step.id, sc.id],
                message: format!(
                    "PR-triggered step '{}' accesses service connection '{}' with broad/unknown scope and no OIDC federation — static credential may have subscription-wide Azure RBAC",
                    step.name, sc.name
                ),
                recommendation: Recommendation::CellosRemediation {
                    reason: "Broad-scope service connection reachable from PR code — CellOS egress isolation limits lateral movement even when connection cannot be immediately rescoped".into(),
                    spec_hint: "cellos run --network deny-all --policy requireEgressDeclared".into(),
                },
            });
        }
    }
    findings
}
/// Reports repository entries from the graph's `META_REPOSITORIES` JSON array whose
/// `ref` is not classified as pinned by `classify_repository_ref`.
///
/// Entries without an `alias` are skipped. Entries that resolve to the default branch
/// are reported only when their `used` flag is true; mutable-branch refs are always
/// reported. Missing or unparseable metadata yields no findings.
pub fn template_extends_unpinned_branch(graph: &AuthorityGraph) -> Vec<Finding> {
    let Some(raw) = graph.metadata.get(META_REPOSITORIES) else {
        return Vec::new();
    };
    let Ok(entries) = serde_json::from_str::<Vec<serde_json::Value>>(raw) else {
        return Vec::new();
    };

    let mut findings = Vec::new();
    for entry in &entries {
        let Some(alias) = entry.get("alias").and_then(|v| v.as_str()) else {
            continue;
        };
        // `name` falls back to the alias, `repo_type` to "git".
        let name = entry.get("name").and_then(|v| v.as_str()).unwrap_or(alias);
        let repo_type = entry
            .get("repo_type")
            .and_then(|v| v.as_str())
            .unwrap_or("git");
        let ref_value = entry.get("ref").and_then(|v| v.as_str());
        let used = entry.get("used").and_then(|v| v.as_bool()).unwrap_or(false);

        let resolved = match classify_repository_ref(ref_value) {
            RepositoryRefClass::Pinned => continue,
            RepositoryRefClass::DefaultBranch if !used => continue,
            RepositoryRefClass::DefaultBranch => "default branch (no ref:)".to_string(),
            RepositoryRefClass::MutableBranch(b) => format!("mutable branch '{b}'"),
        };

        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::TemplateExtendsUnpinnedBranch,
            path: None,
            nodes_involved: Vec::new(),
            message: format!(
                "ADO resources.repositories alias '{alias}' (type: {repo_type}, name: {name}) resolves to {resolved} — \
                 whoever owns that branch can inject steps at the next pipeline run"
            ),
            recommendation: Recommendation::PinAction {
                current: ref_value.unwrap_or("(default branch)").to_string(),
                pinned: format!("ref: <40-char-sha>  # commit on {name}"),
            },
        });
    }
    findings
}
/// Reports repository entries from the graph's `META_REPOSITORIES` JSON array whose
/// `ref` is a mutable branch that `is_feature_class_branch` accepts.
///
/// Entries without an `alias`, pinned refs, and default-branch refs are skipped.
/// Missing or unparseable metadata yields no findings.
pub fn template_repo_ref_is_feature_branch(graph: &AuthorityGraph) -> Vec<Finding> {
    let Some(raw) = graph.metadata.get(META_REPOSITORIES) else {
        return Vec::new();
    };
    let Ok(entries) = serde_json::from_str::<Vec<serde_json::Value>>(raw) else {
        return Vec::new();
    };

    let mut findings = Vec::new();
    for entry in &entries {
        let Some(alias) = entry.get("alias").and_then(|v| v.as_str()) else {
            continue;
        };
        // `name` falls back to the alias, `repo_type` to "git".
        let name = entry.get("name").and_then(|v| v.as_str()).unwrap_or(alias);
        let repo_type = entry
            .get("repo_type")
            .and_then(|v| v.as_str())
            .unwrap_or("git");
        let ref_value = entry.get("ref").and_then(|v| v.as_str());

        let RepositoryRefClass::MutableBranch(branch) = classify_repository_ref(ref_value) else {
            continue;
        };
        if !is_feature_class_branch(&branch) {
            continue;
        }

        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::TemplateRepoRefIsFeatureBranch,
            path: None,
            nodes_involved: Vec::new(),
            message: format!(
                "ADO resources.repositories alias '{alias}' (type: {repo_type}, name: {name}) is pinned to feature-class branch '{branch}' — \
                 weaker than even an unpinned trunk pin: any developer with write access to that branch can inject pipeline steps without a code review on main"
            ),
            recommendation: Recommendation::PinAction {
                current: ref_value.unwrap_or("(default branch)").to_string(),
                pinned: format!("ref: <40-char-sha>  # commit on {name}"),
            },
        });
    }
    findings
}
/// Returns `true` when `branch` is neither empty, nor `main`/`master`, nor a
/// release/hotfix-style branch.
///
/// The name is trimmed, stripped of leading `refs/heads/` (case-sensitively), and
/// ASCII-lower-cased before comparison. `release`, `releases`, `hotfix`, `hotfixes`
/// and anything under those prefixes count as trunk-class.
fn is_feature_class_branch(branch: &str) -> bool {
    const TRUNK_PREFIXES: &[&str] = &["release/", "releases/", "hotfix/", "hotfixes/"];

    let short_name = branch
        .trim()
        .trim_start_matches("refs/heads/")
        .to_ascii_lowercase();

    match short_name.as_str() {
        "" | "main" | "master" => false,
        other => !TRUNK_PREFIXES.iter().any(|prefix| {
            // Match both the bare prefix name ("release") and anything beneath it.
            other == prefix.trim_end_matches('/') || other.starts_with(prefix)
        }),
    }
}
/// Lower-case substrings naming VM remote-execution commands (PowerShell cmdlets and
/// `az vm` CLI forms). `body_uses_vm_remote_exec` tests a lower-cased script body for
/// any of them with `contains`.
const VM_REMOTE_EXEC_TOKENS: &[&str] = &[
"set-azvmextension",
"invoke-azvmruncommand",
"az vm run-command",
"az vm extension set",
];
/// Lower-case substrings naming commands that generate a storage SAS token
/// (PowerShell cmdlets and `az storage ... generate-sas` CLI forms). Used by
/// `body_mints_sas` and `powershell_sas_assignments`.
const SAS_MINT_TOKENS: &[&str] = &[
"new-azstoragecontainersastoken",
"new-azstorageblobsastoken",
"new-azstorageaccountsastoken",
"az storage container generate-sas",
"az storage blob generate-sas",
"az storage account generate-sas",
];
/// Lower-case substrings of parameter/flag names that `body_has_cmdline_sink` looks
/// for in a lower-cased script body.
const COMMAND_LINE_SINK_TOKENS: &[&str] = &[
"commandtoexecute",
"scriptarguments",
"--arguments",
"-argumentlist",
"--scripts",
"-scriptstring",
];
/// Collects the names of all secret nodes that `step_id` reaches via `HasAccessTo`
/// edges, borrowed from the graph and in edge order.
fn step_secret_var_names(graph: &AuthorityGraph, step_id: NodeId) -> Vec<&str> {
    let mut names = Vec::new();
    for edge in graph.edges_from(step_id) {
        if edge.kind != EdgeKind::HasAccessTo {
            continue;
        }
        let Some(node) = graph.node(edge.to) else {
            continue;
        };
        if node.kind == NodeKind::Secret {
            names.push(node.name.as_str());
        }
    }
    names
}
/// Collects owned copies of the names of all secret nodes that `step_id` reaches via
/// `HasAccessTo` edges, in edge order.
fn step_secret_names(graph: &AuthorityGraph, step_id: NodeId) -> Vec<String> {
    let mut names = Vec::new();
    for edge in graph.edges_from(step_id) {
        if edge.kind != EdgeKind::HasAccessTo {
            continue;
        }
        let Some(node) = graph.node(edge.to) else {
            continue;
        };
        if node.kind == NodeKind::Secret {
            names.push(node.name.clone());
        }
    }
    names
}
/// Returns `true` when `script_body` references `var_name` as `$(name)` or as `$name`
/// not immediately followed by an ASCII alphanumeric character or `_`.
///
/// Matching is case-insensitive (both inputs are lower-cased). An empty `var_name`
/// never matches.
fn body_interpolates_var(script_body: &str, var_name: &str) -> bool {
    if var_name.is_empty() {
        return false;
    }
    let body = script_body.to_lowercase();
    let name = var_name.to_lowercase();

    if body.contains(&format!("$({name})")) {
        return true;
    }

    let needle = format!("${name}");
    body.match_indices(needle.as_str()).any(|(start, hit)| {
        // Reject hits that are only a prefix of a longer identifier (`$token` in `$tokenized`).
        let following = body.as_bytes().get(start + hit.len()).copied();
        !matches!(following, Some(c) if c.is_ascii_alphanumeric() || c == b'_')
    })
}
/// Returns `true` when some line of `script` contains `$(secret)` and the text before
/// its first occurrence on that line (leading whitespace removed) is accepted by
/// `matches_bash_assignment` or `matches_powershell_assignment`.
fn script_assigns_secret_to_shell_var(script: &str, secret: &str) -> bool {
    let needle = format!("$({secret})");
    script.lines().any(|line| match line.find(&needle) {
        Some(pos) => {
            // Only the left-hand side of the first occurrence is inspected.
            let lhs = line[..pos].trim_start();
            matches_bash_assignment(lhs) || matches_powershell_assignment(lhs)
        }
        None => false,
    })
}
/// Returns `true` when `body_lower` contains any entry of `SAS_MINT_TOKENS`.
///
/// The tokens are lower-case, so the caller is expected to pass a lower-cased body.
fn body_mints_sas(body_lower: &str) -> bool {
    for token in SAS_MINT_TOKENS {
        if body_lower.contains(token) {
            return true;
        }
    }
    false
}
/// Returns `true` when `body_lower` contains any entry of `VM_REMOTE_EXEC_TOKENS`.
///
/// The tokens are lower-case, so the caller is expected to pass a lower-cased body.
fn body_uses_vm_remote_exec(body_lower: &str) -> bool {
    for token in VM_REMOTE_EXEC_TOKENS {
        if body_lower.contains(token) {
            return true;
        }
    }
    false
}
/// Returns `true` when `body_lower` contains any entry of `COMMAND_LINE_SINK_TOKENS`.
///
/// The tokens are lower-case, so the caller is expected to pass a lower-cased body.
fn body_has_cmdline_sink(body_lower: &str) -> bool {
    for token in COMMAND_LINE_SINK_TOKENS {
        if body_lower.contains(token) {
            return true;
        }
    }
    false
}
/// Finds PowerShell-style assignments `$name = <rhs>` whose right-hand side (up to the
/// end of the line) contains one of `SAS_MINT_TOKENS`, and returns the variable names.
///
/// Names are returned with the casing used in `body`, in order of first appearance,
/// de-duplicated ASCII-case-insensitively. Token matching is ASCII-case-insensitive.
fn powershell_sas_assignments(body: &str) -> Vec<String> {
    let mut out: Vec<String> = Vec::new();
    // ASCII-only folding keeps every byte offset identical to `body`, so offsets found
    // in `lower` can be used to slice `body`. Full Unicode lower-casing can change byte
    // lengths and would shift the offsets.
    let lower = body.to_ascii_lowercase();
    let bytes = lower.as_bytes();
    let mut i = 0usize;
    while i < bytes.len() {
        if bytes[i] != b'$' {
            i += 1;
            continue;
        }
        // Scan the identifier following '$'.
        let name_start = i + 1;
        let mut j = name_start;
        while j < bytes.len() && (bytes[j].is_ascii_alphanumeric() || bytes[j] == b'_') {
            j += 1;
        }
        if j == name_start {
            // A bare '$' with no identifier after it.
            i += 1;
            continue;
        }
        // Optional blanks, then '=' is required for this to be an assignment.
        let mut k = j;
        while k < bytes.len() && (bytes[k] == b' ' || bytes[k] == b'\t') {
            k += 1;
        }
        if k >= bytes.len() || bytes[k] != b'=' {
            i = j;
            continue;
        }
        k += 1;
        while k < bytes.len() && (bytes[k] == b' ' || bytes[k] == b'\t') {
            k += 1;
        }
        let line_end = lower[k..].find('\n').map(|p| k + p).unwrap_or(bytes.len());
        let rhs = &lower[k..line_end];
        if SAS_MINT_TOKENS.iter().any(|t| rhs.contains(t)) {
            // `name_start..j` spans ASCII bytes only, so it is a valid slice of `body`.
            let name = &body[name_start..j];
            if !out.iter().any(|seen| seen.eq_ignore_ascii_case(name)) {
                out.push(name.to_string());
            }
        }
        i = j;
    }
    out
}
/// Flags steps whose inline script uses a VM remote-execution token
/// (`VM_REMOTE_EXEC_TOKENS`) and either interpolates one of the step's secret
/// variables or mints a SAS token in the same script.
///
/// Each finding is `High` and lists the step plus every `Secret` node the step
/// has a `HasAccessTo` edge to.
pub fn vm_remote_exec_via_pipeline_secret(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let Some(body) = step
            .metadata
            .get(META_SCRIPT_BODY)
            .filter(|b| !b.is_empty())
        else {
            continue;
        };
        let body_lower = body.to_lowercase();
        if !body_uses_vm_remote_exec(&body_lower) {
            continue;
        }

        let secret_interpolated = step_secret_var_names(graph, step.id)
            .iter()
            .any(|name| body_interpolates_var(body, name));
        if !(secret_interpolated || body_mints_sas(&body_lower)) {
            continue;
        }

        // First matching token names the tool in the message.
        let tool = VM_REMOTE_EXEC_TOKENS
            .iter()
            .find(|t| body_lower.contains(*t))
            .copied()
            .unwrap_or("Set-AzVMExtension");
        // Secret interpolation takes precedence over SAS minting when both apply.
        let trigger = match secret_interpolated {
            true => "interpolating a pipeline secret into the executed command line",
            false => "embedding a freshly-minted SAS token into the executed command line",
        };

        let mut nodes_involved = vec![step.id];
        nodes_involved.extend(
            graph
                .edges_from(step.id)
                .filter(|e| e.kind == EdgeKind::HasAccessTo)
                .filter_map(|e| graph.node(e.to))
                .filter(|n| n.kind == NodeKind::Secret)
                .map(|n| n.id),
        );

        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::VmRemoteExecViaPipelineSecret,
            path: None,
            nodes_involved,
            message: format!(
                "Step '{}' uses {} {} — pipeline-to-VM RCE primitive; credential is logged on the VM and in ARM extension status",
                step.name, tool, trigger
            ),
            recommendation: Recommendation::Manual {
                action: "Stage the script on the VM and pass the SAS via env var or protectedSettings (encrypted, not logged); avoid embedding secrets in commandToExecute".into(),
            },
        });
    }
    findings
}
/// Returns `true` when `lhs` looks like the left side of a bash assignment:
/// an optional `export`/`declare`/`local`/`readonly` keyword, an identifier of
/// `[A-Za-z0-9_]` not starting with a digit, `=`, then optional opening quotes.
fn matches_bash_assignment(lhs: &str) -> bool {
    const DECL_KEYWORDS: [&str; 4] = ["export ", "declare ", "local ", "readonly "];

    let rest = match strip_one_of(lhs, &DECL_KEYWORDS) {
        Some(stripped) => stripped,
        None => lhs,
    };
    // Drop any opening quote(s) that sit between `=` and the value.
    let rest = rest.trim_start().trim_end_matches(['"', '\'']);

    match rest.strip_suffix('=') {
        None | Some("") => false,
        Some(ident) => {
            let leading_digit = ident.starts_with(|c: char| c.is_ascii_digit());
            !leading_digit && ident.chars().all(is_shell_var_char)
        }
    }
}
/// Returns `true` when `lhs` looks like the left side of a PowerShell
/// assignment. Two shapes are accepted:
///
/// * `$name =` (optionally followed by an opening quote), where the text
///   before `=` starts with `$`
/// * a `Set-Variable ... -Value` invocation
///
/// The cmdlet and parameter names are compared ASCII-case-insensitively
/// because PowerShell itself is case-insensitive
/// (`set-variable -value` is the same command).
fn matches_powershell_assignment(lhs: &str) -> bool {
    let trimmed = lhs.trim_end().trim_end_matches(['"', '\'']).trim_end();

    let is_variable_assignment = trimmed
        .strip_suffix('=')
        .map_or(false, |target| target.trim_end().starts_with('$'));
    if is_variable_assignment {
        return true;
    }

    let lowered = trimmed.to_ascii_lowercase();
    lowered.contains("set-variable") && lowered.contains("-value")
}
/// Returns `true` for the characters accepted in an identifier here:
/// ASCII letters, ASCII digits and `_`.
fn is_shell_var_char(c: char) -> bool {
    matches!(c, '_' | '0'..='9' | 'a'..='z' | 'A'..='Z')
}
/// Strips the first entry of `prefixes` that `s` starts with and returns the
/// remainder, or `None` when no prefix matches.
fn strip_one_of<'a>(s: &'a str, prefixes: &[&str]) -> Option<&'a str> {
    prefixes.iter().find_map(|prefix| s.strip_prefix(*prefix))
}
/// Flags steps whose inline script assigns a `$(SECRET)` macro to a shell
/// variable (see `script_assigns_secret_to_shell_var`).
///
/// One `High` finding is produced per step. The message previews at most three
/// secrets, and `nodes_involved` holds the step followed by the matching
/// `Secret` nodes.
pub fn secret_to_inline_script_env_export(graph: &AuthorityGraph) -> Vec<Finding> {
    const PREVIEW_LIMIT: usize = 3;

    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let script = match step.metadata.get(META_SCRIPT_BODY) {
            Some(s) if !s.is_empty() => s,
            _ => continue,
        };

        // Keep only the secrets that the script binds to a variable.
        let mut exposed = step_secret_names(graph, step.id);
        exposed.retain(|name| script_assigns_secret_to_shell_var(script, name));
        if exposed.is_empty() {
            continue;
        }

        let shown: Vec<String> = exposed
            .iter()
            .take(PREVIEW_LIMIT)
            .map(|name| format!("$({name})"))
            .collect();
        let preview = shown.join(", ");
        let suffix = if exposed.len() > PREVIEW_LIMIT {
            format!(", and {} more", exposed.len() - PREVIEW_LIMIT)
        } else {
            String::new()
        };

        let mut nodes_involved = vec![step.id];
        nodes_involved.extend(
            graph
                .edges_from(step.id)
                .filter(|e| e.kind == EdgeKind::HasAccessTo)
                .filter_map(|e| graph.node(e.to))
                .filter(|n| n.kind == NodeKind::Secret && exposed.contains(&n.name))
                .map(|n| n.id),
        );

        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::SecretToInlineScriptEnvExport,
            path: None,
            nodes_involved,
            message: format!(
                "Step '{}' assigns pipeline secret(s) {preview}{suffix} to shell variables inside an inline script — once bound to a variable the value bypasses ADO's $(SECRET) log mask and will appear in any transcript (Start-Transcript, bash -x, terraform/az --debug)",
                step.name
            ),
            recommendation: Recommendation::TsafeRemediation {
                command: "tsafe exec --ns <scoped-namespace> -- <command>".to_string(),
                explanation: "Inject the secret as an env var on the step itself (ADO `env:` block) instead of materialising it inside the script body. The value still reaches the process but never travels through a shell variable assignment that transcripts can capture.".to_string(),
            },
        });
    }
    findings
}
/// Result of classifying a repository ref string; produced by
/// `classify_repository_ref`.
enum RepositoryRefClass {
/// The ref is a hex SHA of at least 40 characters, a non-empty
/// `refs/tags/...` ref, or `refs/heads/<hex sha>`.
Pinned,
/// No ref was given, or it was empty / whitespace only.
DefaultBranch,
/// Anything else. Carries the branch name with any `refs/heads/` prefix
/// removed, otherwise the trimmed ref as written.
MutableBranch(String),
}
/// Classifies a repository ref string.
///
/// * missing / blank            → `DefaultBranch`
/// * hex SHA (see `is_hex_sha`) → `Pinned`
/// * `refs/tags/<non-empty>`    → `Pinned`
/// * `refs/heads/<hex sha>`     → `Pinned`
/// * `refs/heads/<name>`        → `MutableBranch(<name>)`
/// * anything else              → `MutableBranch(<trimmed ref>)`
fn classify_repository_ref(ref_value: Option<&str>) -> RepositoryRefClass {
    let raw = ref_value.map(str::trim).unwrap_or("");
    if raw.is_empty() {
        return RepositoryRefClass::DefaultBranch;
    }
    if is_hex_sha(raw) {
        return RepositoryRefClass::Pinned;
    }

    let names_a_tag = raw
        .strip_prefix("refs/tags/")
        .map_or(false, |tag| !tag.is_empty());
    if names_a_tag {
        return RepositoryRefClass::Pinned;
    }

    match raw.strip_prefix("refs/heads/") {
        Some(branch) if is_hex_sha(branch) => RepositoryRefClass::Pinned,
        Some(branch) => RepositoryRefClass::MutableBranch(branch.to_string()),
        None => RepositoryRefClass::MutableBranch(raw.to_string()),
    }
}
/// Returns `true` when `s` is at least 40 bytes long and consists solely of
/// ASCII hex digits (either case).
fn is_hex_sha(s: &str) -> bool {
    if s.len() < 40 {
        return false;
    }
    // Any non-ASCII character contributes bytes >= 0x80, which are never hex digits.
    s.bytes().all(|b| b.is_ascii_hexdigit())
}
/// Flags steps whose inline script both mints a SAS token and contains a
/// command-line sink token.
///
/// When a SAS-assigned PowerShell variable is also interpolated in the script,
/// the first such variable is named in the message; otherwise a generic
/// co-occurrence note is used. Each finding is `Medium`.
pub fn short_lived_sas_in_command_line(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let Some(body) = step
            .metadata
            .get(META_SCRIPT_BODY)
            .filter(|b| !b.is_empty())
        else {
            continue;
        };
        let body_lower = body.to_lowercase();
        if !(body_mints_sas(&body_lower) && body_has_cmdline_sink(&body_lower)) {
            continue;
        }

        let sas_vars = powershell_sas_assignments(body);
        let first_interpolated = sas_vars
            .iter()
            .find(|candidate| body_interpolates_var(body, candidate.as_str()));
        let evidence = match first_interpolated {
            Some(v) => format!("$ {v} interpolated into argv"),
            None => "SAS-mint and command-line sink in same script".to_string(),
        };

        findings.push(Finding {
            severity: Severity::Medium,
            category: FindingCategory::ShortLivedSasInCommandLine,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' mints a SAS token and passes it on the command line ({}) — argv lands in /proc, ETW, and ARM extension status for the token's lifetime",
                step.name, evidence
            ),
            recommendation: Recommendation::Manual {
                action: "Pass the SAS via env var, stdin, or VM-extension protectedSettings; never put SAS tokens in commandToExecute / --arguments / -ArgumentList".into(),
            },
        });
    }
    findings
}
/// Heuristic: does this script line write to a file?
///
/// Shell redirect / `tee` markers are matched case-sensitively; the
/// PowerShell / .NET writer names are matched against the lower-cased line.
fn line_writes_to_file(line: &str) -> bool {
    const SHELL_MARKERS: [&str; 6] = [" > ", " >> ", ">/", ">>/", "| tee ", "| tee -"];
    const WRITER_NAMES: [&str; 5] = [
        "out-file",
        "set-content",
        "add-content",
        "writealltext",
        "writealllines",
    ];

    if line.starts_with("tee ") || SHELL_MARKERS.iter().any(|marker| line.contains(*marker)) {
        return true;
    }
    let lowered = line.to_lowercase();
    WRITER_NAMES.iter().any(|writer| lowered.contains(*writer))
}
/// Heuristic: does this script line mention a pipeline directory macro or a
/// file extension from the risky list below? Matching is case-insensitive and
/// purely substring based.
fn line_references_workspace_path(line: &str) -> bool {
    const DIRECTORY_MACROS: [&str; 5] = [
        "$(system.defaultworkingdirectory)",
        "$(build.sourcesdirectory)",
        "$(pipeline.workspace)",
        "$(agent.builddirectory)",
        "$(agent.tempdirectory)",
    ];
    const RISKY_EXT: [&str; 11] = [
        ".tfvars",
        ".env",
        ".hcl",
        ".pfx",
        ".key",
        ".pem",
        ".crt",
        ".p12",
        ".kubeconfig",
        ".jks",
        ".keystore",
    ];

    let lowered = line.to_lowercase();
    DIRECTORY_MACROS
        .iter()
        .chain(RISKY_EXT.iter())
        .any(|marker| lowered.contains(*marker))
}
/// Heuristic: does `script` write `$(secret)` into a workspace file?
///
/// Two shapes are detected:
/// 1. Direct — a single line contains `$(secret)`, writes to a file and
///    references a workspace path.
/// 2. Indirect — a line that (trimmed) starts with `$`, contains `=` and
///    contains `$(secret)` binds the secret to a variable; any later line
///    that writes to a workspace file then counts.
fn script_materialises_secret_to_file(script: &str, secret: &str) -> bool {
    let macro_ref = format!("$({secret})");
    let writes_into_workspace =
        |line: &str| line_writes_to_file(line) && line_references_workspace_path(line);

    // Shape 1: secret and file write on the same line.
    if script
        .lines()
        .any(|line| line.contains(&macro_ref) && writes_into_workspace(line))
    {
        return true;
    }

    // Shape 2: consume lines up to and including the first binding line,
    // then look for a workspace write among the remaining lines.
    let mut remaining = script.lines();
    let bound_to_var = remaining.by_ref().any(|line| {
        let trimmed = line.trim();
        trimmed.contains(&macro_ref) && trimmed.starts_with('$') && trimmed.contains('=')
    });
    bound_to_var && remaining.any(|line| writes_into_workspace(line))
}
/// Flags steps whose inline script writes one of the step's secrets to a
/// workspace file (see `script_materialises_secret_to_file`).
///
/// One `High` finding is produced per step. The message previews at most three
/// secrets, and `nodes_involved` holds the step followed by the matching
/// `Secret` nodes.
pub fn secret_materialised_to_workspace_file(graph: &AuthorityGraph) -> Vec<Finding> {
    const PREVIEW_LIMIT: usize = 3;

    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let script = match step.metadata.get(META_SCRIPT_BODY) {
            Some(s) if !s.is_empty() => s,
            _ => continue,
        };

        let mut materialised = step_secret_names(graph, step.id);
        materialised.retain(|name| script_materialises_secret_to_file(script, name));
        if materialised.is_empty() {
            continue;
        }

        let shown: Vec<String> = materialised
            .iter()
            .take(PREVIEW_LIMIT)
            .map(|name| format!("$({name})"))
            .collect();
        let preview = shown.join(", ");
        let suffix = if materialised.len() > PREVIEW_LIMIT {
            format!(", and {} more", materialised.len() - PREVIEW_LIMIT)
        } else {
            String::new()
        };

        let mut nodes_involved = vec![step.id];
        nodes_involved.extend(
            graph
                .edges_from(step.id)
                .filter(|e| e.kind == EdgeKind::HasAccessTo)
                .filter_map(|e| graph.node(e.to))
                .filter(|n| n.kind == NodeKind::Secret && materialised.contains(&n.name))
                .map(|n| n.id),
        );

        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::SecretMaterialisedToWorkspaceFile,
            path: None,
            nodes_involved,
            message: format!(
                "Step '{}' writes pipeline secret(s) {preview}{suffix} to a file under the agent workspace — the file persists for the rest of the job, is readable by every subsequent step, and may be uploaded by PublishPipelineArtifact",
                step.name
            ),
            recommendation: Recommendation::Manual {
                action: "Replace inline secret materialisation with the `secureFile` task (downloaded to a temp dir with 0600 perms and auto-deleted), or pass the secret to the consuming tool over stdin / an env var instead of via a workspace file. If a file is unavoidable, write under `$(Agent.TempDirectory)` and `chmod 600` immediately.".into(),
            },
        });
    }
    findings
}
/// Heuristic (case-insensitive, substring based): does `script` turn a Key
/// Vault secret or a SecureString into plaintext?
///
/// Matches when the script contains either
/// * `Get-AzKeyVaultSecret` together with `-AsPlainText`, `.SecretValueText`
///   or `PtrToStringAuto`, or
/// * `ConvertFrom-SecureString` together with `-AsPlainText`.
fn script_extracts_keyvault_to_plaintext(script: &str) -> bool {
    let lowered = script.to_lowercase();
    let mentions = |needle: &str| lowered.contains(needle);

    let as_plain_text = mentions("-asplaintext");
    if mentions("get-azkeyvaultsecret") {
        as_plain_text || mentions(".secretvaluetext") || mentions("ptrtostringauto")
    } else {
        as_plain_text && mentions("convertfrom-securestring")
    }
}
/// Emits one `Medium` finding for every step whose non-empty inline script
/// matches `script_extracts_keyvault_to_plaintext`.
pub fn keyvault_secret_to_plaintext(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let extracts_plaintext = match step.metadata.get(META_SCRIPT_BODY) {
            Some(script) if !script.is_empty() => script_extracts_keyvault_to_plaintext(script),
            _ => false,
        };
        if !extracts_plaintext {
            continue;
        }
        findings.push(Finding {
            severity: Severity::Medium,
            category: FindingCategory::KeyVaultSecretToPlaintext,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' extracts a Key Vault secret as plaintext inside an inline script (-AsPlainText / .SecretValueText) — value bypasses ADO variable-group masking and is printed by Az verbose logging or any error stack trace",
                step.name
            ),
            recommendation: Recommendation::Manual {
                action: "Keep the secret as a `SecureString`: drop `-AsPlainText`, pass the SecureString directly to cmdlets that accept it (e.g. `New-PSCredential`, `Connect-AzAccount -ServicePrincipal -Credential ...`), and only convert to plaintext at the moment of consumption, scoped to a single expression. For values that must be plaintext (REST calls, env vars) prefer ADO variable groups linked to Key Vault — the value then participates in pipeline log masking.".into(),
            },
        });
    }
    findings
}
/// Heuristic: does a service-connection name denote production?
///
/// The name is lower-cased and split on every non-alphanumeric character;
/// it matches when any resulting token is exactly `prod`, `production` or
/// `prd`. Names using spaces, dots or mixed separators are therefore
/// recognised (`Azure Production`, `contoso.prod.weu`, `azure-prod_eu`),
/// while embedded substrings are not (`reproduce`, `nonprod`).
fn looks_like_prod_connection(name: &str) -> bool {
    const PROD_TOKENS: [&str; 3] = ["prod", "production", "prd"];
    name.to_lowercase()
        .split(|c: char| !c.is_alphanumeric())
        .any(|token| PROD_TOKENS.contains(&token))
}
/// Heuristic (case-insensitive): does the script contain a
/// `##vso[task.setvariable` logging command and also mention one of the
/// service-principal / OIDC markers listed below?
fn script_launders_spn_token(s: &str) -> bool {
    const SPN_MARKERS: [&str; 8] = [
        "$env:idtoken",
        "$env:serviceprincipalkey",
        "$env:serviceprincipalid",
        "$env:tenantid",
        "arm_oidc_token",
        "arm_client_id",
        "arm_client_secret",
        "arm_tenant_id",
    ];

    let lowered = s.to_lowercase();
    lowered.contains("##vso[task.setvariable")
        && SPN_MARKERS.iter().any(|marker| lowered.contains(*marker))
}
/// Flags steps marked with `META_TERRAFORM_AUTO_APPROVE = "true"` that use a
/// production-looking service connection and are not marked with
/// `META_ENV_APPROVAL = "true"`.
///
/// The connection name comes from the step's `META_SERVICE_CONNECTION_NAME`
/// metadata when present. Otherwise it is the first `Identity` node reachable
/// over `HasAccessTo` whose `META_SERVICE_CONNECTION` is `"true"`. Each finding
/// is `Critical`.
pub fn terraform_auto_approve_in_prod(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let step_flag = |key: &str| step.metadata.get(key).map_or(false, |v| v == "true");

        if !step_flag(META_TERRAFORM_AUTO_APPROVE) {
            continue;
        }

        // Step metadata wins; the graph edge is only consulted as a fallback.
        let candidate = step
            .metadata
            .get(META_SERVICE_CONNECTION_NAME)
            .cloned()
            .or_else(|| {
                graph
                    .edges_from(step.id)
                    .filter(|e| e.kind == EdgeKind::HasAccessTo)
                    .filter_map(|e| graph.node(e.to))
                    .find(|n| {
                        n.kind == NodeKind::Identity
                            && n.metadata
                                .get(META_SERVICE_CONNECTION)
                                .map(|v| v == "true")
                                .unwrap_or(false)
                    })
                    .map(|n| n.name.clone())
            });
        let Some(conn_name) = candidate.filter(|name| looks_like_prod_connection(name)) else {
            continue;
        };

        if step_flag(META_ENV_APPROVAL) {
            continue;
        }

        findings.push(Finding {
            severity: Severity::Critical,
            category: FindingCategory::TerraformAutoApproveInProd,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' runs `terraform apply -auto-approve` against production service connection '{}' with no environment approval gate — any committer can rewrite prod infrastructure",
                step.name, conn_name
            ),
            recommendation: Recommendation::Manual {
                action: "Move the apply step into a deployment job whose `environment:` is configured with required approvers in ADO, OR remove `-auto-approve` and run apply behind a manual checkpoint task. Combine with a non-shared agent pool so committers cannot pre-stage payloads.".into(),
            },
        });
    }
    findings
}
/// Flags steps marked with `META_ADD_SPN_TO_ENV = "true"` that also carry a
/// non-blank inline script body.
///
/// Each finding is `High`. When `script_launders_spn_token` matches the body,
/// the message gains a token-laundering suffix.
pub fn addspn_with_inline_script(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let spn_exposed = step
            .metadata
            .get(META_ADD_SPN_TO_ENV)
            .map_or(false, |v| v == "true");
        if !spn_exposed {
            continue;
        }
        let Some(body) = step
            .metadata
            .get(META_SCRIPT_BODY)
            .filter(|b| !b.trim().is_empty())
        else {
            continue;
        };

        let suffix = match script_launders_spn_token(body) {
            true => " — explicit token laundering detected (##vso[task.setvariable] writes federated token material)",
            false => "",
        };

        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::AddSpnWithInlineScript,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' runs an inline script with addSpnToEnvironment:true — the federated SPN (idToken/servicePrincipalKey/tenantId) is exposed to script-controlled code and can be exfiltrated via setvariable{}",
                step.name, suffix
            ),
            recommendation: Recommendation::Manual {
                action: "Replace the inline script with `scriptPath:` pointing to a reviewed file in-repo, OR drop `addSpnToEnvironment: true` and use the task's first-class auth surface. Never emit federated token material via `##vso[task.setvariable]` — those values are inherited by every downstream task and may appear in logs.".into(),
            },
        });
    }
    findings
}
/// Flags steps whose inline script contains a `${{ parameters.<name> }}`
/// template expression (with or without the inner spaces) for a parameter
/// that has no values allowlist and whose type is empty or `string`
/// (ASCII-case-insensitive).
///
/// One `Medium` finding is produced per step, listing the matching parameter
/// names sorted and de-duplicated.
pub fn parameter_interpolation_into_shell(graph: &AuthorityGraph) -> Vec<Finding> {
    if graph.parameters.is_empty() {
        return Vec::new();
    }

    // Parameters an attacker could fill with arbitrary text.
    let free_form: Vec<&str> = graph
        .parameters
        .iter()
        .filter_map(|(name, spec)| {
            let string_like =
                spec.param_type.is_empty() || spec.param_type.eq_ignore_ascii_case("string");
            (!spec.has_values_allowlist && string_like).then(|| name.as_str())
        })
        .collect();
    if free_form.is_empty() {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let Some(body) = step
            .metadata
            .get(META_SCRIPT_BODY)
            .filter(|b| !b.is_empty())
        else {
            continue;
        };

        let mut hits: Vec<&str> = free_form
            .iter()
            .copied()
            .filter(|name| {
                let spaced = format!("${{{{ parameters.{name} }}}}");
                let tight = format!("${{{{parameters.{name}}}}}");
                body.contains(&spaced) || body.contains(&tight)
            })
            .collect();
        if hits.is_empty() {
            continue;
        }
        hits.sort();
        hits.dedup();
        let names = hits.join(", ");

        findings.push(Finding {
            severity: Severity::Medium,
            category: FindingCategory::ParameterInterpolationIntoShell,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' interpolates free-form string parameter(s) [{}] into an inline script — anyone with 'queue build' permission can inject shell commands",
                step.name, names
            ),
            recommendation: Recommendation::Manual {
                action: "Add a `values:` allowlist to the parameter declaration to constrain accepted inputs, OR pass the parameter through the step's `env:` block so the runtime quotes it as a shell variable instead of YAML-interpolating raw text.".into(),
            },
        });
    }
    findings
}
/// Heuristic: does the script download-and-execute code from a mutable URL?
///
/// A cheap whole-body gate runs first: the body must either mention
/// `curl`/`wget` and contain a pipe into a shell, or contain a remote
/// `deno run http(s)://` invocation. After that, the body matches when a
/// single line both (a) pipes into a shell or runs remote deno and
/// (b) contains a mutable path marker (see `line_url_is_mutable`).
///
/// "Pipe into a shell" means `| bash`, `|bash`, `| sh` or `|sh` not directly
/// followed by `[A-Za-z0-9_]`, or a `<(curl` / `<(wget` process substitution.
/// The boundary check stops `| sha256sum`, `| shasum` or `| shellcheck` from
/// being mistaken for `| sh`. Matching is case-sensitive.
fn body_has_pipe_to_shell_with_floating_url(body: &str) -> bool {
    /// True when `text` pipes into `sh`/`bash` (as a whole word) or uses
    /// process substitution around curl/wget.
    fn pipes_into_shell(text: &str) -> bool {
        const SHELL_PIPES: [&str; 4] = ["| bash", "|bash", "| sh", "|sh"];
        if text.contains("<(curl") || text.contains("<(wget") {
            return true;
        }
        SHELL_PIPES.iter().any(|marker| {
            text.match_indices(*marker).any(|(start, matched)| {
                let next = text.as_bytes().get(start + matched.len());
                !matches!(next, Some(b) if b.is_ascii_alphanumeric() || *b == b'_')
            })
        })
    }

    /// True when `text` runs a remote module through `deno run`.
    fn runs_remote_deno(text: &str) -> bool {
        text.contains("deno run http://") || text.contains("deno run https://")
    }

    let mentions_downloader = body.contains("curl") || body.contains("wget");
    if !((mentions_downloader && pipes_into_shell(body)) || runs_remote_deno(body)) {
        return false;
    }

    body.lines()
        .any(|line| (pipes_into_shell(line) || runs_remote_deno(line)) && line_url_is_mutable(line))
}

/// Returns `true` when `line` contains a path segment that names a moving
/// target (a branch ref, `HEAD`, a common default-branch name, or `latest`).
/// Matching is case-sensitive.
fn line_url_is_mutable(line: &str) -> bool {
    const MUTABLE_PATHS: &[&str] = &[
        "refs/heads/",
        "/HEAD/",
        "/main/",
        "/master/",
        "/develop/",
        "/trunk/",
        "/latest/",
    ];
    MUTABLE_PATHS.iter().any(|marker| line.contains(*marker))
}
/// Emits one `High` finding for every step whose non-empty inline script
/// matches `body_has_pipe_to_shell_with_floating_url`.
pub fn runtime_script_fetched_from_floating_url(graph: &AuthorityGraph) -> Vec<Finding> {
    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let fetches_floating_script = step
            .metadata
            .get(META_SCRIPT_BODY)
            .filter(|body| !body.is_empty())
            .map_or(false, |body| body_has_pipe_to_shell_with_floating_url(body));
        if !fetches_floating_script {
            continue;
        }
        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::RuntimeScriptFetchedFromFloatingUrl,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' downloads and executes a script from a mutable URL (curl|bash, wget|sh, or `deno run` against a branch ref) — whoever controls that branch executes arbitrary code on the runner",
                step.name
            ),
            recommendation: Recommendation::Manual {
                action: "Pin the URL to a release tag or commit SHA (e.g. .../v1.2.3/install.sh) and verify the download against a known checksum before executing it. Avoid `curl … | bash` entirely where possible — fetch to a file, inspect, then run.".into(),
            },
        });
    }
    findings
}
/// Returns `true` when the comma-separated `trigger` list contains
/// `pull_request_target`, `issue_comment` or `workflow_run` (entries are
/// trimmed and compared exactly).
fn trigger_is_privileged_pr_class(trigger: &str) -> bool {
    const PRIVILEGED_EVENTS: [&str; 3] = ["pull_request_target", "issue_comment", "workflow_run"];
    trigger
        .split(',')
        .map(str::trim)
        .any(|event| PRIVILEGED_EVENTS.contains(&event))
}
/// When the graph's `META_TRIGGER` is in the privileged PR class, flags every
/// `Image` node that is not first-party, not a container, not marked
/// self-hosted and not SHA-pinned.
///
/// Findings are `Critical` and de-duplicated by image name.
pub fn pr_trigger_with_floating_action_ref(graph: &AuthorityGraph) -> Vec<Finding> {
    let Some(trigger) = graph.metadata.get(META_TRIGGER).map(|t| t.as_str()) else {
        return Vec::new();
    };
    if !trigger_is_privileged_pr_class(trigger) {
        return Vec::new();
    }

    let mut findings = Vec::new();
    let mut seen = std::collections::HashSet::new();
    for image in graph.nodes_of_kind(NodeKind::Image) {
        let is_container = image
            .metadata
            .get(META_CONTAINER)
            .map_or(false, |v| v == "true");
        let out_of_scope = image.trust_zone == TrustZone::FirstParty
            || is_container
            || image.metadata.contains_key(META_SELF_HOSTED)
            || is_sha_pinned(&image.name);
        if out_of_scope {
            continue;
        }
        // Report each distinct action reference once.
        if !seen.insert(&image.name) {
            continue;
        }

        let bare_name = image.name.split('@').next().unwrap_or(&image.name);
        findings.push(Finding {
            severity: Severity::Critical,
            category: FindingCategory::PrTriggerWithFloatingActionRef,
            path: None,
            nodes_involved: vec![image.id],
            message: format!(
                "Workflow trigger '{trigger}' runs in privileged base-repo context and step uses unpinned action '{}' — anyone who can push to that action's branch executes arbitrary code with full repo write token",
                image.name
            ),
            recommendation: Recommendation::PinAction {
                current: image.name.clone(),
                pinned: format!("{}@<sha256-digest>", bare_name),
            },
        });
    }
    findings
}
/// Heuristic: does the script write to `$GITHUB_ENV` / `$GITHUB_OUTPUT` /
/// `$GITHUB_PATH` on the same line as a GitHub API call, or within six lines
/// after the most recent one?
///
/// API calls are recognised by the substrings `gh pr view`, `gh pr list`,
/// `gh api `, `gh issue view` and `api.github.com`. Matching is
/// case-sensitive.
fn body_writes_api_response_to_env_sink(body: &str) -> bool {
    const ENV_SINKS: [&str; 6] = [
        "$GITHUB_ENV",
        "${GITHUB_ENV}",
        "$GITHUB_OUTPUT",
        "${GITHUB_OUTPUT}",
        "$GITHUB_PATH",
        "${GITHUB_PATH}",
    ];
    const API_CALLS: [&str; 5] = [
        "gh pr view",
        "gh pr list",
        "gh api ",
        "gh issue view",
        "api.github.com",
    ];
    // Largest allowed distance, in lines, between an API call and a sink write.
    const MAX_LINE_GAP: usize = 6;

    fn mentions_any(text: &str, markers: &[&str]) -> bool {
        markers.iter().any(|marker| text.contains(*marker))
    }

    // Cheap whole-body gate before the per-line scan.
    if !mentions_any(body, &ENV_SINKS) || !mentions_any(body, &API_CALLS) {
        return false;
    }

    // A sink write on the API line itself has distance 0, so this single scan
    // also covers the same-line case.
    let mut last_api_line: Option<usize> = None;
    for (idx, line) in body.lines().enumerate() {
        if mentions_any(line, &API_CALLS) {
            last_api_line = Some(idx);
        }
        if !mentions_any(line, &ENV_SINKS) {
            continue;
        }
        if let Some(api_idx) = last_api_line {
            if idx - api_idx <= MAX_LINE_GAP {
                return true;
            }
        }
    }
    false
}
/// When the graph's `META_TRIGGER` list contains `workflow_run`,
/// `pull_request_target` or `issue_comment`, emits one `High` finding for
/// every step whose non-empty inline script matches
/// `body_writes_api_response_to_env_sink`.
pub fn untrusted_api_response_to_env_sink(graph: &AuthorityGraph) -> Vec<Finding> {
    let Some(trigger) = graph.metadata.get(META_TRIGGER).map(|t| t.as_str()) else {
        return Vec::new();
    };
    let trigger_in_scope = trigger.split(',').map(str::trim).any(|event| {
        matches!(
            event,
            "workflow_run" | "pull_request_target" | "issue_comment"
        )
    });
    if !trigger_in_scope {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        let writes_sink = step
            .metadata
            .get(META_SCRIPT_BODY)
            .filter(|body| !body.is_empty())
            .map_or(false, |body| body_writes_api_response_to_env_sink(body));
        if !writes_sink {
            continue;
        }
        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::UntrustedApiResponseToEnvSink,
            path: None,
            nodes_involved: vec![step.id],
            message: format!(
                "Step '{}' captures a GitHub API response (gh CLI or api.github.com) into the GHA env gate ($GITHUB_ENV/$GITHUB_OUTPUT/$GITHUB_PATH) under trigger '{trigger}' — attacker-influenced fields (branch name, PR title) can inject environment variables for every subsequent step in the same job",
                step.name
            ),
            recommendation: Recommendation::Manual {
                action: "Validate the API field with a strict regex before redirecting (e.g. only `[0-9]+` for a PR number), or write only known-numeric fields. Never pipe free-form fields like branch name or PR title directly into $GITHUB_ENV.".into(),
            },
        });
    }
    findings
}
/// Returns `true` when `action` (with any `@ref` suffix removed) is one of the
/// known registry / cloud login actions, or ends with one of the recognised
/// login-action path suffixes. Comparison is case-sensitive.
fn is_registry_login_action(action: &str) -> bool {
    const KNOWN_ACTIONS: [&str; 7] = [
        "docker/login-action",
        "aws-actions/amazon-ecr-login",
        "aws-actions/configure-aws-credentials",
        "azure/docker-login",
        "azure/login",
        "google-github-actions/auth",
        "google-github-actions/setup-gcloud",
    ];
    const LOGIN_SUFFIXES: [&str; 4] = [
        "/login-to-gar",
        "/dockerhub-login",
        "/login-to-ecr",
        "/login-to-acr",
    ];

    // Everything before the first `@` is the action path.
    let bare = match action.split_once('@') {
        Some((path, _)) => path,
        None => action,
    };
    KNOWN_ACTIONS.contains(&bare) || LOGIN_SUFFIXES.iter().any(|suffix| bare.ends_with(*suffix))
}
/// Returns `true` when the comma-separated `trigger` list contains
/// `pull_request` or `pull_request_target` (entries are trimmed and compared
/// exactly).
fn trigger_includes_pull_request(trigger: &str) -> bool {
    trigger
        .split(',')
        .map(str::trim)
        .any(|event| matches!(event, "pull_request" | "pull_request_target"))
}
/// When the graph's `META_TRIGGER` includes a pull-request event, flags every
/// `Image` node that is a registry-login action, is not first-party, is not a
/// container and is not SHA-pinned.
///
/// Findings are `High` and de-duplicated by image name.
pub fn pr_build_pushes_image_with_floating_credentials(graph: &AuthorityGraph) -> Vec<Finding> {
    let Some(trigger) = graph.metadata.get(META_TRIGGER).map(|t| t.as_str()) else {
        return Vec::new();
    };
    if !trigger_includes_pull_request(trigger) {
        return Vec::new();
    }

    let mut findings = Vec::new();
    let mut seen = std::collections::HashSet::new();
    for image in graph.nodes_of_kind(NodeKind::Image) {
        let is_container = image
            .metadata
            .get(META_CONTAINER)
            .map_or(false, |v| v == "true");
        let out_of_scope = image.trust_zone == TrustZone::FirstParty
            || is_container
            || !is_registry_login_action(&image.name)
            || is_sha_pinned(&image.name);
        if out_of_scope {
            continue;
        }
        // Report each distinct action reference once.
        if !seen.insert(&image.name) {
            continue;
        }

        let bare_name = image.name.split('@').next().unwrap_or(&image.name);
        findings.push(Finding {
            severity: Severity::High,
            category: FindingCategory::PrBuildPushesImageWithFloatingCredentials,
            path: None,
            nodes_involved: vec![image.id],
            message: format!(
                "PR-triggered workflow ('{trigger}') uses unpinned registry-login action '{}' — a compromise of that action's branch exfiltrates registry credentials or OIDC tokens, and any PR-controlled image content then reaches a shared registry",
                image.name
            ),
            recommendation: Recommendation::PinAction {
                current: image.name.clone(),
                pinned: format!("{}@<sha256-digest>", bare_name),
            },
        });
    }
    findings
}
/// Runs every rule in this module against `graph` and returns the combined
/// findings.
///
/// `max_hops` is forwarded to `authority_propagation` only; every other rule
/// takes the graph alone. After all rules have run, `apply_confidence_cap`
/// adjusts severities when the graph's completeness is `Partial`, and the
/// list is sorted by `Severity` (per that type's `Ord`).
pub fn run_all_rules(graph: &AuthorityGraph, max_hops: usize) -> Vec<Finding> {
let mut findings = Vec::new();
// Rule order matters for output: the final sort is stable, so findings of
// equal severity keep the order in which the rules are listed here.
findings.extend(authority_propagation(graph, max_hops));
findings.extend(over_privileged_identity(graph));
findings.extend(unpinned_action(graph));
findings.extend(untrusted_with_authority(graph));
findings.extend(artifact_boundary_crossing(graph));
findings.extend(long_lived_credential(graph));
findings.extend(floating_image(graph));
findings.extend(persisted_credential(graph));
findings.extend(trigger_context_mismatch(graph));
findings.extend(cross_workflow_authority_chain(graph));
findings.extend(authority_cycle(graph));
findings.extend(uplift_without_attestation(graph));
findings.extend(self_mutating_pipeline(graph));
findings.extend(checkout_self_pr_exposure(graph));
findings.extend(variable_group_in_pr_job(graph));
findings.extend(self_hosted_pool_pr_hijack(graph));
findings.extend(service_connection_scope_mismatch(graph));
findings.extend(template_extends_unpinned_branch(graph));
findings.extend(template_repo_ref_is_feature_branch(graph));
findings.extend(vm_remote_exec_via_pipeline_secret(graph));
findings.extend(short_lived_sas_in_command_line(graph));
findings.extend(secret_to_inline_script_env_export(graph));
findings.extend(secret_materialised_to_workspace_file(graph));
findings.extend(keyvault_secret_to_plaintext(graph));
findings.extend(terraform_auto_approve_in_prod(graph));
findings.extend(addspn_with_inline_script(graph));
findings.extend(parameter_interpolation_into_shell(graph));
findings.extend(runtime_script_fetched_from_floating_url(graph));
findings.extend(pr_trigger_with_floating_action_ref(graph));
findings.extend(untrusted_api_response_to_env_sink(graph));
findings.extend(pr_build_pushes_image_with_floating_credentials(graph));
// The cap must run before sorting so the order reflects the adjusted severities.
apply_confidence_cap(graph, &mut findings);
findings.sort_by_key(|f| f.severity);
findings
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::*;
/// Test helper: builds a `PipelineSource` for `file` with no repo, git ref or
/// commit SHA set.
fn source(file: &str) -> PipelineSource {
PipelineSource {
file: file.into(),
repo: None,
git_ref: None,
commit_sha: None,
}
}
/// A third-party image referenced by tag (`@v4`) yields exactly one
/// `UnpinnedAction` finding.
#[test]
fn unpinned_third_party_action_flagged() {
let mut g = AuthorityGraph::new(source("ci.yml"));
g.add_node(
NodeKind::Image,
"actions/checkout@v4",
TrustZone::ThirdParty,
);
let findings = unpinned_action(&g);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].category, FindingCategory::UnpinnedAction);
}
/// A third-party image referenced by a 40-character hex SHA produces no
/// `unpinned_action` findings.
#[test]
fn pinned_action_not_flagged() {
let mut g = AuthorityGraph::new(source("ci.yml"));
g.add_node(
NodeKind::Image,
"actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29",
TrustZone::ThirdParty,
);
let findings = unpinned_action(&g);
assert!(findings.is_empty());
}
/// An untrusted step with a `HasAccessTo` edge to a secret yields a single
/// `Critical` finding from `untrusted_with_authority`.
#[test]
fn untrusted_step_with_secret_is_critical() {
let mut g = AuthorityGraph::new(source("ci.yml"));
let step = g.add_node(NodeKind::Step, "evil-action", TrustZone::Untrusted);
let secret = g.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
g.add_edge(step, secret, EdgeKind::HasAccessTo);
let findings = untrusted_with_authority(&g);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].severity, Severity::Critical);
}
/// An untrusted step whose only authority is an identity marked
/// `META_IMPLICIT = "true"` is reported at `Info`, and the message mentions
/// "platform-injected".
#[test]
fn implicit_identity_downgrades_to_info() {
let mut g = AuthorityGraph::new(source("ci.yml"));
let step = g.add_node(NodeKind::Step, "AzureCLI@2", TrustZone::Untrusted);
let mut meta = std::collections::HashMap::new();
meta.insert(META_IMPLICIT.into(), "true".into());
// The identity scope is "broad", yet the expected severity is still Info.
meta.insert(META_IDENTITY_SCOPE.into(), "broad".into());
let token = g.add_node_with_metadata(
NodeKind::Identity,
"System.AccessToken",
TrustZone::FirstParty,
meta,
);
g.add_edge(step, token, EdgeKind::HasAccessTo);
let findings = untrusted_with_authority(&g);
assert_eq!(findings.len(), 1);
assert_eq!(
findings[0].severity,
Severity::Info,
"implicit token must be Info not Critical"
);
assert!(findings[0].message.contains("platform-injected"));
}
/// When an untrusted step has access to both an implicit identity and an
/// explicit secret, two findings are expected: `Info` for the implicit token
/// and `Critical` naming the secret.
#[test]
fn explicit_secret_remains_critical_despite_implicit_token() {
let mut g = AuthorityGraph::new(source("ci.yml"));
let step = g.add_node(NodeKind::Step, "AzureCLI@2", TrustZone::Untrusted);
let mut meta = std::collections::HashMap::new();
meta.insert(META_IMPLICIT.into(), "true".into());
let token = g.add_node_with_metadata(
NodeKind::Identity,
"System.AccessToken",
TrustZone::FirstParty,
meta,
);
let secret = g.add_node(NodeKind::Secret, "ARM_CLIENT_SECRET", TrustZone::FirstParty);
g.add_edge(step, token, EdgeKind::HasAccessTo);
g.add_edge(step, secret, EdgeKind::HasAccessTo);
let findings = untrusted_with_authority(&g);
assert_eq!(findings.len(), 2);
// Findings are looked up by severity, so the test does not depend on their order.
let info = findings
.iter()
.find(|f| f.severity == Severity::Info)
.unwrap();
let crit = findings
.iter()
.find(|f| f.severity == Severity::Critical)
.unwrap();
assert!(info.message.contains("platform-injected"));
assert!(crit.message.contains("ARM_CLIENT_SECRET"));
}
/// A first-party step that has access to a secret produces an artifact that
/// a third-party step consumes; exactly one `ArtifactBoundaryCrossing`
/// finding is expected.
#[test]
fn artifact_crossing_detected() {
let mut g = AuthorityGraph::new(source("ci.yml"));
let secret = g.add_node(NodeKind::Secret, "KEY", TrustZone::FirstParty);
let build = g.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
let artifact = g.add_node(NodeKind::Artifact, "dist.zip", TrustZone::FirstParty);
let deploy = g.add_node(NodeKind::Step, "deploy", TrustZone::ThirdParty);
// build --HasAccessTo--> secret; build --Produces--> artifact --Consumes--> deploy
g.add_edge(build, secret, EdgeKind::HasAccessTo);
g.add_edge(build, artifact, EdgeKind::Produces);
g.add_edge(artifact, deploy, EdgeKind::Consumes);
let findings = artifact_boundary_crossing(&g);
assert_eq!(findings.len(), 1);
assert_eq!(
findings[0].category,
FindingCategory::ArtifactBoundaryCrossing
);
}
/// A non-OIDC identity that propagates to a third-party image carrying a
/// digest must be rated `High`, not `Critical`.
#[test]
fn propagation_to_sha_pinned_is_high_not_critical() {
    let mut g = AuthorityGraph::new(source("ci.yml"));

    // Key the digest with the shared `META_DIGEST` constant (the same key the
    // propagation rule looks up) instead of a hand-typed "digest" literal, so
    // the test cannot silently drift from the rule if the key is ever renamed.
    let mut meta = std::collections::HashMap::new();
    meta.insert(
        META_DIGEST.into(),
        "a5ac7e51b41094c92402da3b24376905380afc29".into(),
    );

    let identity = g.add_node(NodeKind::Identity, "GITHUB_TOKEN", TrustZone::FirstParty);
    let step = g.add_node(NodeKind::Step, "checkout", TrustZone::ThirdParty);
    let image = g.add_node_with_metadata(
        NodeKind::Image,
        "actions/checkout@a5ac7e51b41094c92402da3b24376905380afc29",
        TrustZone::ThirdParty,
        meta,
    );
    g.add_edge(step, identity, EdgeKind::HasAccessTo);
    g.add_edge(step, image, EdgeKind::UsesImage);

    let findings = authority_propagation(&g, 4);

    // Only findings that involve the pinned image are relevant here.
    let image_findings: Vec<_> = findings
        .iter()
        .filter(|f| f.nodes_involved.contains(&image))
        .collect();
    assert!(!image_findings.is_empty());
    assert_eq!(image_findings[0].severity, Severity::High);
}
/// An identity flagged with `META_OIDC` that propagates to a third-party image
/// must stay `Critical` even though the image carries a digest.
#[test]
fn oidc_identity_to_pinned_third_party_is_critical() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    // OIDC-flagged identity.
    let oidc_meta = std::collections::HashMap::from([(META_OIDC.into(), "true".into())]);
    let role = graph.add_node_with_metadata(
        NodeKind::Identity,
        "AWS_OIDC_ROLE",
        TrustZone::FirstParty,
        oidc_meta,
    );

    // Third-party image with a digest recorded in its metadata.
    let digest_meta = std::collections::HashMap::from([(
        META_DIGEST.into(),
        "a5ac7e51b41094c92402da3b24376905380afc29".into(),
    )]);
    let pinned_image = graph.add_node_with_metadata(
        NodeKind::Image,
        "aws-actions/configure-aws-credentials@a5ac7e51b41094c92402da3b24376905380afc29",
        TrustZone::ThirdParty,
        digest_meta,
    );

    let configure_step = graph.add_node(
        NodeKind::Step,
        "configure-aws-credentials",
        TrustZone::ThirdParty,
    );
    graph.add_edge(configure_step, role, EdgeKind::HasAccessTo);
    graph.add_edge(configure_step, pinned_image, EdgeKind::UsesImage);

    let findings = authority_propagation(&graph, 4);
    let touching_image: Vec<_> = findings
        .iter()
        .filter(|f| f.nodes_involved.contains(&pinned_image))
        .collect();
    assert!(
        !touching_image.is_empty(),
        "expected OIDC→pinned propagation finding"
    );
    assert_eq!(touching_image[0].severity, Severity::Critical);
}
/// An identity reachable from an untrusted step that uses an untrusted image
/// must produce a `Critical` propagation finding involving that image.
#[test]
fn propagation_to_untrusted_is_critical() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let token = graph.add_node(NodeKind::Identity, "GITHUB_TOKEN", TrustZone::FirstParty);
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::Untrusted);
    let evil_image = graph.add_node(NodeKind::Image, "evil/action@main", TrustZone::Untrusted);
    graph.add_edge(deploy, token, EdgeKind::HasAccessTo);
    graph.add_edge(deploy, evil_image, EdgeKind::UsesImage);

    let findings = authority_propagation(&graph, 4);

    // Restrict attention to findings that mention the untrusted image.
    let touching_image: Vec<_> = findings
        .iter()
        .filter(|f| f.nodes_involved.contains(&evil_image))
        .collect();
    assert!(!touching_image.is_empty());
    assert_eq!(touching_image[0].severity, Severity::Critical);
}
/// Registers four secrets and expects `long_lived_credential` to report
/// exactly two findings, all in the `LongLivedCredential` category.
#[test]
fn long_lived_credential_detected() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    // NOTE(review): which two of these names the rule matches is decided by
    // the rule itself and is not visible from this test.
    for secret_name in [
        "AWS_ACCESS_KEY_ID",
        "NPM_TOKEN",
        "DEPLOY_API_KEY",
        "CACHE_TTL",
    ] {
        graph.add_node(NodeKind::Secret, secret_name, TrustZone::FirstParty);
    }

    let findings = long_lived_credential(&graph);
    assert_eq!(findings.len(), 2);
    assert!(findings
        .iter()
        .all(|f| f.category == FindingCategory::LongLivedCredential));
}
/// Three untrusted image nodes, two of which share the same name, must yield
/// only two `unpinned_action` findings.
#[test]
fn duplicate_unpinned_actions_deduplicated() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    // "actions/checkout@v4" is deliberately registered twice.
    for action in [
        "actions/checkout@v4",
        "actions/checkout@v4",
        "actions/setup-node@v3",
    ] {
        graph.add_node(NodeKind::Image, action, TrustZone::Untrusted);
    }

    let findings = unpinned_action(&graph);
    assert_eq!(findings.len(), 2);
}
/// An identity whose scope metadata is "broad" must be reported by
/// `over_privileged_identity` as a single `High` finding mentioning "broad".
#[test]
fn broad_identity_scope_flagged_as_high() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let token_meta = std::collections::HashMap::from([
        (META_PERMISSIONS.into(), "write-all".into()),
        (META_IDENTITY_SCOPE.into(), "broad".into()),
    ]);
    let token = graph.add_node_with_metadata(
        NodeKind::Identity,
        "GITHUB_TOKEN",
        TrustZone::FirstParty,
        token_meta,
    );
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, token, EdgeKind::HasAccessTo);

    let findings = over_privileged_identity(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::High);
    assert!(reported.message.contains("broad"));
}
/// An identity whose scope metadata is "unknown" must be reported by
/// `over_privileged_identity` as a single `Medium` finding mentioning "unknown".
#[test]
fn unknown_identity_scope_flagged_as_medium() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let mut token_meta = std::collections::HashMap::new();
    token_meta.insert(META_PERMISSIONS.into(), "custom-scope".into());
    token_meta.insert(META_IDENTITY_SCOPE.into(), "unknown".into());

    let token = graph.add_node_with_metadata(
        NodeKind::Identity,
        "GITHUB_TOKEN",
        TrustZone::FirstParty,
        token_meta,
    );
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, token, EdgeKind::HasAccessTo);

    let findings = over_privileged_identity(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Medium);
    assert!(reported.message.contains("unknown"));
}
/// A container image (flagged with `META_CONTAINER`) named by tag must produce
/// one `Medium` `FloatingImage` finding.
#[test]
fn floating_image_unpinned_container_flagged() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let container_meta =
        std::collections::HashMap::from([(META_CONTAINER.into(), "true".into())]);
    graph.add_node_with_metadata(
        NodeKind::Image,
        "ubuntu:22.04",
        TrustZone::Untrusted,
        container_meta,
    );

    let findings = floating_image(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.category, FindingCategory::FloatingImage);
    assert_eq!(reported.severity, Severity::Medium);
}
/// On a graph marked partial, `run_all_rules` must still emit the propagation
/// and untrusted-authority findings, but none of them may be `Critical`.
#[test]
fn partial_graph_caps_critical_findings_at_high() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    graph.mark_partial("matrix strategy hides some authority paths");

    let token = graph.add_node(NodeKind::Identity, "GITHUB_TOKEN", TrustZone::FirstParty);
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::Untrusted);
    let evil_image = graph.add_node(NodeKind::Image, "evil/action@main", TrustZone::Untrusted);
    graph.add_edge(deploy, token, EdgeKind::HasAccessTo);
    graph.add_edge(deploy, evil_image, EdgeKind::UsesImage);

    let findings = run_all_rules(&graph, 4);

    // Both rules must have fired at all.
    let has_category = |wanted: FindingCategory| findings.iter().any(|f| f.category == wanted);
    assert!(has_category(FindingCategory::AuthorityPropagation));
    assert!(has_category(FindingCategory::UntrustedWithAuthority));

    // This file's `cap_severity` treats `severity < max` as "more severe than
    // the cap", so `>= High` here means "High or less severe".
    assert!(findings.iter().all(|f| f.severity >= Severity::High));
    assert!(!findings.iter().any(|f| f.severity == Severity::Critical));
}
/// Counterpart to the partial-graph test: the same topology without
/// `mark_partial` must still surface at least one `Critical` finding.
#[test]
fn complete_graph_keeps_critical_findings() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let token = graph.add_node(NodeKind::Identity, "GITHUB_TOKEN", TrustZone::FirstParty);
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::Untrusted);
    let evil_image = graph.add_node(NodeKind::Image, "evil/action@main", TrustZone::Untrusted);
    graph.add_edge(deploy, token, EdgeKind::HasAccessTo);
    graph.add_edge(deploy, evil_image, EdgeKind::UsesImage);

    let findings = run_all_rules(&graph, 4);
    assert!(findings.iter().any(|f| f.severity == Severity::Critical));
}
/// A container image referenced as `name@sha256:<hex>` must not be reported by
/// `floating_image`.
#[test]
fn floating_image_digest_pinned_container_not_flagged() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let mut container_meta = std::collections::HashMap::new();
    container_meta.insert(META_CONTAINER.into(), "true".into());
    graph.add_node_with_metadata(
        NodeKind::Image,
        "ubuntu@sha256:a5ac7e51b41094c92402da3b24376905380afc29a5ac7e51b41094c92402da3b",
        TrustZone::ThirdParty,
        container_meta,
    );

    let findings = floating_image(&graph);
    assert!(
        findings.is_empty(),
        "digest-pinned container should not be flagged"
    );
}
/// `unpinned_action` must ignore image nodes flagged with `META_CONTAINER`.
#[test]
fn unpinned_action_does_not_flag_container_images() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let container_meta =
        std::collections::HashMap::from([(META_CONTAINER.into(), "true".into())]);
    graph.add_node_with_metadata(
        NodeKind::Image,
        "ubuntu:22.04",
        TrustZone::Untrusted,
        container_meta,
    );

    let findings = unpinned_action(&graph);
    assert!(
        findings.is_empty(),
        "unpinned_action must skip container images to avoid double-flagging"
    );
}
/// `floating_image` must not report an image node that lacks container
/// metadata (here, a step action reference).
#[test]
fn floating_image_ignores_action_images() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    graph.add_node(NodeKind::Image, "actions/checkout@v4", TrustZone::Untrusted);

    let findings = floating_image(&graph);
    assert!(
        findings.is_empty(),
        "floating_image should not flag step actions"
    );
}
/// A `PersistsTo` edge from a step to an identity must produce one `High`
/// `PersistedCredential` finding whose message mentions "persistCredentials".
#[test]
fn persisted_credential_rule_fires_on_persists_to_edge() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let access_token = graph.add_node(
        NodeKind::Identity,
        "System.AccessToken",
        TrustZone::FirstParty,
    );
    let checkout_step = graph.add_node(NodeKind::Step, "checkout", TrustZone::FirstParty);
    graph.add_edge(checkout_step, access_token, EdgeKind::PersistsTo);

    let findings = persisted_credential(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.category, FindingCategory::PersistedCredential);
    assert_eq!(reported.severity, Severity::High);
    assert!(reported.message.contains("persistCredentials"));
}
/// When an untrusted step can reach a secret flagged with
/// `META_CLI_FLAG_EXPOSED`, the finding's message must mention "-var flag" and
/// its recommendation must be the `Manual` variant.
#[test]
fn untrusted_with_cli_flag_exposed_secret_notes_log_exposure() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let terraform_step = graph.add_node(NodeKind::Step, "TerraformCLI@0", TrustZone::Untrusted);

    let exposed_meta =
        std::collections::HashMap::from([(META_CLI_FLAG_EXPOSED.into(), "true".into())]);
    let password = graph.add_node_with_metadata(
        NodeKind::Secret,
        "db_password",
        TrustZone::FirstParty,
        exposed_meta,
    );
    graph.add_edge(terraform_step, password, EdgeKind::HasAccessTo);

    let findings = untrusted_with_authority(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert!(
        reported.message.contains("-var flag"),
        "message should note -var flag log exposure"
    );
    assert!(matches!(
        reported.recommendation,
        Recommendation::Manual { .. }
    ));
}
/// An identity whose scope metadata is "constrained" must not be reported by
/// `over_privileged_identity`.
#[test]
fn constrained_identity_scope_not_flagged() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let mut token_meta = std::collections::HashMap::new();
    token_meta.insert(META_PERMISSIONS.into(), "{ contents: read }".into());
    token_meta.insert(META_IDENTITY_SCOPE.into(), "constrained".into());

    let token = graph.add_node_with_metadata(
        NodeKind::Identity,
        "GITHUB_TOKEN",
        TrustZone::FirstParty,
        token_meta,
    );
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, token, EdgeKind::HasAccessTo);

    let findings = over_privileged_identity(&graph);
    assert!(
        findings.is_empty(),
        "constrained scope should not be flagged"
    );
}
/// With graph trigger metadata set to "pull_request_target" and a step that
/// can reach a secret, `trigger_context_mismatch` must report one `Critical`
/// `TriggerContextMismatch` finding.
#[test]
fn trigger_context_mismatch_fires_on_pull_request_target_with_secret() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    graph
        .metadata
        .insert(META_TRIGGER.into(), "pull_request_target".into());

    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, deploy_key, EdgeKind::HasAccessTo);

    let findings = trigger_context_mismatch(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Critical);
    assert_eq!(
        reported.category,
        FindingCategory::TriggerContextMismatch
    );
}
/// Without any `META_TRIGGER` entry on the graph, `trigger_context_mismatch`
/// must stay silent even when a step can reach a secret.
#[test]
fn trigger_context_mismatch_no_fire_without_trigger_metadata() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, deploy_key, EdgeKind::HasAccessTo);

    let findings = trigger_context_mismatch(&graph);
    assert!(findings.is_empty(), "no trigger metadata → no finding");
}
/// A secret-holding step that delegates to an untrusted external workflow must
/// produce one `Critical` `CrossWorkflowAuthorityChain` finding.
#[test]
fn cross_workflow_authority_chain_detected() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    let remote_workflow = graph.add_node(
        NodeKind::Image,
        "evil/workflow.yml@main",
        TrustZone::Untrusted,
    );
    graph.add_edge(deploy, deploy_key, EdgeKind::HasAccessTo);
    graph.add_edge(deploy, remote_workflow, EdgeKind::DelegatesTo);

    let findings = cross_workflow_authority_chain(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Critical);
    assert_eq!(
        reported.category,
        FindingCategory::CrossWorkflowAuthorityChain
    );
}
/// Delegation to a first-party target must not trigger
/// `cross_workflow_authority_chain`.
#[test]
fn cross_workflow_authority_chain_no_fire_if_local_delegation() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    let local_action = graph.add_node(NodeKind::Image, "./local-action", TrustZone::FirstParty);
    graph.add_edge(deploy, deploy_key, EdgeKind::HasAccessTo);
    graph.add_edge(deploy, local_action, EdgeKind::DelegatesTo);

    let findings = cross_workflow_authority_chain(&graph);
    assert!(
        findings.is_empty(),
        "FirstParty delegation should not be flagged"
    );
}
/// Two steps that delegate to each other must yield one `High`
/// `AuthorityCycle` finding.
#[test]
fn authority_cycle_detected() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let first = graph.add_node(NodeKind::Step, "A", TrustZone::FirstParty);
    let second = graph.add_node(NodeKind::Step, "B", TrustZone::FirstParty);

    // A → B → A
    graph.add_edge(first, second, EdgeKind::DelegatesTo);
    graph.add_edge(second, first, EdgeKind::DelegatesTo);

    let findings = authority_cycle(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.category, FindingCategory::AuthorityCycle);
    assert_eq!(reported.severity, Severity::High);
}
/// A straight delegation chain A → B → C must not be reported as a cycle.
#[test]
fn authority_cycle_no_fire_for_acyclic_graph() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let first = graph.add_node(NodeKind::Step, "A", TrustZone::FirstParty);
    let second = graph.add_node(NodeKind::Step, "B", TrustZone::FirstParty);
    let third = graph.add_node(NodeKind::Step, "C", TrustZone::FirstParty);
    graph.add_edge(first, second, EdgeKind::DelegatesTo);
    graph.add_edge(second, third, EdgeKind::DelegatesTo);

    let findings = authority_cycle(&graph);
    assert!(findings.is_empty(), "acyclic graph must not fire");
}
/// An OIDC-flagged identity used by a step, with no attesting step in the
/// graph, must produce one `Info` `UpliftWithoutAttestation` finding.
#[test]
fn uplift_without_attestation_fires_when_oidc_no_attests() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let oidc_meta = std::collections::HashMap::from([(META_OIDC.into(), "true".into())]);
    let role = graph.add_node_with_metadata(
        NodeKind::Identity,
        "AWS/deploy-role",
        TrustZone::FirstParty,
        oidc_meta,
    );
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, role, EdgeKind::HasAccessTo);

    let findings = uplift_without_attestation(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Info);
    assert_eq!(
        reported.category,
        FindingCategory::UpliftWithoutAttestation
    );
}
/// When the graph contains a step flagged with `META_ATTESTS`,
/// `uplift_without_attestation` must not fire for an OIDC identity.
#[test]
fn uplift_without_attestation_no_fire_when_attests_present() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let mut oidc_meta = std::collections::HashMap::new();
    oidc_meta.insert(META_OIDC.into(), "true".into());
    let role = graph.add_node_with_metadata(
        NodeKind::Identity,
        "AWS/deploy-role",
        TrustZone::FirstParty,
        oidc_meta,
    );

    // The attesting step only has to exist in the graph; this test gives it no
    // edges, so its node id is not needed.
    let mut attest_meta = std::collections::HashMap::new();
    attest_meta.insert(META_ATTESTS.into(), "true".into());
    graph.add_node_with_metadata(NodeKind::Step, "attest", TrustZone::FirstParty, attest_meta);

    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, role, EdgeKind::HasAccessTo);

    let findings = uplift_without_attestation(&graph);
    assert!(findings.is_empty(), "attestation present → no finding");
}
/// A broad identity without the `META_OIDC` flag must not trigger
/// `uplift_without_attestation`.
#[test]
fn uplift_without_attestation_no_fire_without_oidc() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let token_meta = std::collections::HashMap::from([
        (META_PERMISSIONS.into(), "write-all".into()),
        (META_IDENTITY_SCOPE.into(), "broad".into()),
    ]);
    let token = graph.add_node_with_metadata(
        NodeKind::Identity,
        "GITHUB_TOKEN",
        TrustZone::FirstParty,
        token_meta,
    );
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, token, EdgeKind::HasAccessTo);

    let findings = uplift_without_attestation(&graph);
    assert!(
        findings.is_empty(),
        "broad identity without OIDC must not fire"
    );
}
/// An untrusted step flagged with `META_WRITES_ENV_GATE` must produce one
/// `Critical` `SelfMutatingPipeline` finding.
#[test]
fn self_mutating_pipeline_untrusted_is_critical() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let gate_meta =
        std::collections::HashMap::from([(META_WRITES_ENV_GATE.into(), "true".into())]);
    graph.add_node_with_metadata(NodeKind::Step, "fork-step", TrustZone::Untrusted, gate_meta);

    let findings = self_mutating_pipeline(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Critical);
    assert_eq!(reported.category, FindingCategory::SelfMutatingPipeline);
}
/// A first-party step flagged with `META_WRITES_ENV_GATE` that can also reach
/// a secret must be rated `High`.
#[test]
fn self_mutating_pipeline_privileged_step_is_high() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));

    let mut gate_meta = std::collections::HashMap::new();
    gate_meta.insert(META_WRITES_ENV_GATE.into(), "true".into());
    let build =
        graph.add_node_with_metadata(NodeKind::Step, "build", TrustZone::FirstParty, gate_meta);
    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    graph.add_edge(build, deploy_key, EdgeKind::HasAccessTo);

    let findings = self_mutating_pipeline(&graph);
    assert_eq!(findings.len(), 1);
    assert_eq!(findings[0].severity, Severity::High);
}
/// With the trigger metadata set to "pr" and a step that can reach a secret,
/// `trigger_context_mismatch` must report one `High` finding.
#[test]
fn trigger_context_mismatch_fires_on_ado_pr_with_secret_as_high() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    let build = graph.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
    graph.add_edge(build, deploy_key, EdgeKind::HasAccessTo);

    let findings = trigger_context_mismatch(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::High);
    assert_eq!(
        reported.category,
        FindingCategory::TriggerContextMismatch
    );
}
/// Delegation from a secret-holding step to a `ThirdParty` target must be
/// rated `High` by `cross_workflow_authority_chain`.
#[test]
fn cross_workflow_authority_chain_third_party_is_high() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);
    let remote_workflow = graph.add_node(
        NodeKind::Image,
        "org/repo/.github/workflows/deploy.yml@a5ac7e51b41094c92402da3b24376905380afc29",
        TrustZone::ThirdParty,
    );
    graph.add_edge(deploy, deploy_key, EdgeKind::HasAccessTo);
    graph.add_edge(deploy, remote_workflow, EdgeKind::DelegatesTo);

    let findings = cross_workflow_authority_chain(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(
        reported.severity,
        Severity::High,
        "ThirdParty delegation target should be High (Critical reserved for Untrusted)"
    );
    assert_eq!(
        reported.category,
        FindingCategory::CrossWorkflowAuthorityChain
    );
}
/// A first-party step flagged with `META_WRITES_ENV_GATE` and no edges to any
/// secret or identity must be rated `Medium`.
#[test]
fn self_mutating_pipeline_first_party_no_authority_is_medium() {
    let mut graph = AuthorityGraph::new(source("ci.yml"));
    let gate_meta =
        std::collections::HashMap::from([(META_WRITES_ENV_GATE.into(), "true".into())]);
    graph.add_node_with_metadata(
        NodeKind::Step,
        "set-version",
        TrustZone::FirstParty,
        gate_meta,
    );

    let findings = self_mutating_pipeline(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Medium);
    assert_eq!(reported.category, FindingCategory::SelfMutatingPipeline);
}
/// For the three-node cycle A → B → C → A, the single `AuthorityCycle` finding
/// must list all three nodes in `nodes_involved`.
#[test]
fn authority_cycle_3node_cycle_includes_all_members() {
    let mut graph = AuthorityGraph::new(source("test.yml"));
    let a = graph.add_node(NodeKind::Step, "A", TrustZone::FirstParty);
    let b = graph.add_node(NodeKind::Step, "B", TrustZone::FirstParty);
    let c = graph.add_node(NodeKind::Step, "C", TrustZone::FirstParty);
    graph.add_edge(a, b, EdgeKind::DelegatesTo);
    graph.add_edge(b, c, EdgeKind::DelegatesTo);
    graph.add_edge(c, a, EdgeKind::DelegatesTo);

    let findings = authority_cycle(&graph);
    assert_eq!(findings.len(), 1);

    let cycle = &findings[0];
    assert_eq!(cycle.category, FindingCategory::AuthorityCycle);

    // Each cycle member is checked in turn, with its own failure message.
    let expected_members = [
        (a, "A must be in nodes_involved"),
        (
            b,
            "B must be in nodes_involved — middle of A→B→C→A cycle",
        ),
        (c, "C must be in nodes_involved"),
    ];
    for (member, why) in expected_members {
        assert!(cycle.nodes_involved.contains(&member), "{}", why);
    }
}
/// With trigger "pr" and a step that can reach a secret flagged with
/// `META_VARIABLE_GROUP`, `variable_group_in_pr_job` must report one `Critical`
/// finding that names the variable group.
#[test]
fn variable_group_in_pr_job_fires_on_pr_trigger_with_var_group() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    let group_meta =
        std::collections::HashMap::from([(META_VARIABLE_GROUP.into(), "true".into())]);
    let var_group = graph.add_node_with_metadata(
        NodeKind::Secret,
        "prod-deploy-secrets",
        TrustZone::FirstParty,
        group_meta,
    );
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    graph.add_edge(deploy, var_group, EdgeKind::HasAccessTo);

    let findings = variable_group_in_pr_job(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Critical);
    assert_eq!(reported.category, FindingCategory::VariableGroupInPrJob);
    assert!(reported.message.contains("prod-deploy-secrets"));
}
/// Same topology as the firing case but with no trigger metadata:
/// `variable_group_in_pr_job` must return nothing.
#[test]
fn variable_group_in_pr_job_no_fire_without_pr_trigger() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));

    let mut group_meta = std::collections::HashMap::new();
    group_meta.insert(META_VARIABLE_GROUP.into(), "true".into());
    let var_group = graph.add_node_with_metadata(
        NodeKind::Secret,
        "prod-deploy-secrets",
        TrustZone::FirstParty,
        group_meta,
    );
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    graph.add_edge(deploy, var_group, EdgeKind::HasAccessTo);

    let findings = variable_group_in_pr_job(&graph);
    assert!(
        findings.is_empty(),
        "no PR trigger → variable_group_in_pr_job must not fire"
    );
}
/// The three factors here are: trigger "pr", an image flagged with
/// `META_SELF_HOSTED`, and a step flagged with `META_CHECKOUT_SELF`. Together
/// they must produce one `Critical` `SelfHostedPoolPrHijack` finding whose
/// message mentions "self-hosted".
#[test]
fn self_hosted_pool_pr_hijack_fires_when_all_three_factors_present() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));

    // Factor 1: PR trigger.
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    // Factor 2: self-hosted pool.
    let pool_meta = std::collections::HashMap::from([(META_SELF_HOSTED.into(), "true".into())]);
    graph.add_node_with_metadata(
        NodeKind::Image,
        "self-hosted-pool",
        TrustZone::FirstParty,
        pool_meta,
    );

    // Factor 3: checkout of the pipeline's own repository.
    let checkout_meta =
        std::collections::HashMap::from([(META_CHECKOUT_SELF.into(), "true".into())]);
    graph.add_node_with_metadata(
        NodeKind::Step,
        "checkout",
        TrustZone::FirstParty,
        checkout_meta,
    );

    let findings = self_hosted_pool_pr_hijack(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::Critical);
    assert_eq!(
        reported.category,
        FindingCategory::SelfHostedPoolPrHijack
    );
    assert!(reported.message.contains("self-hosted"));
}
/// A self-hosted pool plus a self-checkout step, but no trigger metadata:
/// `self_hosted_pool_pr_hijack` must return nothing.
#[test]
fn self_hosted_pool_pr_hijack_no_fire_without_pr_trigger() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));

    let mut pool_meta = std::collections::HashMap::new();
    pool_meta.insert(META_SELF_HOSTED.into(), "true".into());
    graph.add_node_with_metadata(
        NodeKind::Image,
        "self-hosted-pool",
        TrustZone::FirstParty,
        pool_meta,
    );

    let mut checkout_meta = std::collections::HashMap::new();
    checkout_meta.insert(META_CHECKOUT_SELF.into(), "true".into());
    graph.add_node_with_metadata(
        NodeKind::Step,
        "checkout",
        TrustZone::FirstParty,
        checkout_meta,
    );

    let findings = self_hosted_pool_pr_hijack(&graph);
    assert!(
        findings.is_empty(),
        "no PR trigger → self_hosted_pool_pr_hijack must not fire"
    );
}
/// With trigger "pr" and a step that can reach a broad-scoped service
/// connection lacking the OIDC flag, `service_connection_scope_mismatch` must
/// report one `High` finding naming the connection.
#[test]
fn service_connection_scope_mismatch_fires_on_pr_broad_non_oidc() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    let connection_meta = std::collections::HashMap::from([
        (META_SERVICE_CONNECTION.into(), "true".into()),
        (META_IDENTITY_SCOPE.into(), "broad".into()),
    ]);
    let connection = graph.add_node_with_metadata(
        NodeKind::Identity,
        "prod-azure-sc",
        TrustZone::FirstParty,
        connection_meta,
    );
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    graph.add_edge(deploy, connection, EdgeKind::HasAccessTo);

    let findings = service_connection_scope_mismatch(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(reported.severity, Severity::High);
    assert_eq!(
        reported.category,
        FindingCategory::ServiceConnectionScopeMismatch
    );
    assert!(reported.message.contains("prod-azure-sc"));
}
/// Same broad service connection as the firing case but with no trigger
/// metadata: `service_connection_scope_mismatch` must return nothing.
#[test]
fn service_connection_scope_mismatch_no_fire_without_pr_trigger() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));

    let mut connection_meta = std::collections::HashMap::new();
    connection_meta.insert(META_SERVICE_CONNECTION.into(), "true".into());
    connection_meta.insert(META_IDENTITY_SCOPE.into(), "broad".into());
    let connection = graph.add_node_with_metadata(
        NodeKind::Identity,
        "prod-azure-sc",
        TrustZone::FirstParty,
        connection_meta,
    );
    let deploy = graph.add_node(NodeKind::Step, "deploy", TrustZone::FirstParty);
    graph.add_edge(deploy, connection, EdgeKind::HasAccessTo);

    let findings = service_connection_scope_mismatch(&graph);
    assert!(
        findings.is_empty(),
        "no PR trigger → service_connection_scope_mismatch must not fire"
    );
}
/// With trigger "pr" and a step flagged with `META_CHECKOUT_SELF`,
/// `checkout_self_pr_exposure` must report one `High` finding.
#[test]
fn checkout_self_pr_exposure_fires_on_pr_trigger() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    let checkout_meta =
        std::collections::HashMap::from([(META_CHECKOUT_SELF.into(), "true".into())]);
    graph.add_node_with_metadata(
        NodeKind::Step,
        "checkout",
        TrustZone::FirstParty,
        checkout_meta,
    );

    let findings = checkout_self_pr_exposure(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(
        reported.category,
        FindingCategory::CheckoutSelfPrExposure
    );
    assert_eq!(reported.severity, Severity::High);
}
/// A self-checkout step without any trigger metadata must not be reported by
/// `checkout_self_pr_exposure`.
#[test]
fn checkout_self_pr_exposure_no_fire_without_pr_trigger() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));

    let mut checkout_meta = std::collections::HashMap::new();
    checkout_meta.insert(META_CHECKOUT_SELF.into(), "true".into());
    graph.add_node_with_metadata(
        NodeKind::Step,
        "checkout",
        TrustZone::FirstParty,
        checkout_meta,
    );

    let findings = checkout_self_pr_exposure(&graph);
    assert!(
        findings.is_empty(),
        "no PR trigger → checkout_self_pr_exposure must not fire"
    );
}
/// The first finding from `variable_group_in_pr_job` must carry a
/// `Recommendation::CellosRemediation`.
#[test]
fn variable_group_in_pr_job_uses_cellos_remediation() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    let group_meta =
        std::collections::HashMap::from([(META_VARIABLE_GROUP.into(), "true".into())]);
    let var_group = graph.add_node_with_metadata(
        NodeKind::Secret,
        "prod-secret",
        TrustZone::FirstParty,
        group_meta,
    );
    let deploy = graph.add_node(NodeKind::Step, "deploy step", TrustZone::Untrusted);
    graph.add_edge(deploy, var_group, EdgeKind::HasAccessTo);

    let findings = variable_group_in_pr_job(&graph);
    assert!(!findings.is_empty());

    let uses_cellos = matches!(
        findings[0].recommendation,
        Recommendation::CellosRemediation { .. }
    );
    assert!(
        uses_cellos,
        "variable_group_in_pr_job must recommend CellosRemediation"
    );
}
/// The first finding from `service_connection_scope_mismatch` must carry a
/// `Recommendation::CellosRemediation`.
#[test]
fn service_connection_scope_mismatch_uses_cellos_remediation() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    graph.metadata.insert(META_TRIGGER.into(), "pr".into());

    let connection_meta = std::collections::HashMap::from([
        (META_SERVICE_CONNECTION.into(), "true".into()),
        (META_IDENTITY_SCOPE.into(), "broad".into()),
    ]);
    let connection = graph.add_node_with_metadata(
        NodeKind::Identity,
        "sub-conn",
        TrustZone::FirstParty,
        connection_meta,
    );
    let deploy = graph.add_node(NodeKind::Step, "azure deploy", TrustZone::Untrusted);
    graph.add_edge(deploy, connection, EdgeKind::HasAccessTo);

    let findings = service_connection_scope_mismatch(&graph);
    assert!(!findings.is_empty());

    let uses_cellos = matches!(
        findings[0].recommendation,
        Recommendation::CellosRemediation { .. }
    );
    assert!(
        uses_cellos,
        "service_connection_scope_mismatch must recommend CellosRemediation"
    );
}
/// Builds a fixture graph: secret ← "deploy-prod" step → artifact → third-party
/// step.
///
/// When `gated` is true the "deploy-prod" step carries
/// `META_ENV_APPROVAL = "true"`; otherwise its metadata map is empty.
fn build_env_approval_graph(gated: bool) -> AuthorityGraph {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    let deploy_key = graph.add_node(NodeKind::Secret, "DEPLOY_KEY", TrustZone::FirstParty);

    // The approval marker is the only difference between the two variants.
    let deploy_meta = if gated {
        std::collections::HashMap::from([(META_ENV_APPROVAL.into(), "true".into())])
    } else {
        std::collections::HashMap::new()
    };
    let deploy_step = graph.add_node_with_metadata(
        NodeKind::Step,
        "deploy-prod",
        TrustZone::FirstParty,
        deploy_meta,
    );

    let release = graph.add_node(NodeKind::Artifact, "release.tar", TrustZone::FirstParty);
    let uploader = graph.add_node(
        NodeKind::Step,
        "third-party/uploader",
        TrustZone::ThirdParty,
    );

    graph.add_edge(deploy_step, deploy_key, EdgeKind::HasAccessTo);
    graph.add_edge(deploy_step, release, EdgeKind::Produces);
    graph.add_edge(release, uploader, EdgeKind::Consumes);
    graph
}
/// Compares the ungated and gated fixture graphs: the ungated propagation
/// finding must be `Critical` with no mitigation note, while the gated one must
/// be `High` and annotated with "(mitigated: environment approval gate)".
#[test]
fn env_approval_gate_reduces_propagation_severity() {
    // Ungated baseline.
    let ungated_graph = build_env_approval_graph(false);
    let ungated_findings = authority_propagation(&ungated_graph, 4);
    let ungated = ungated_findings
        .iter()
        .find(|f| f.category == FindingCategory::AuthorityPropagation)
        .expect("baseline must produce an AuthorityPropagation finding");
    assert_eq!(ungated.severity, Severity::Critical);
    assert!(!ungated.message.contains("environment approval gate"));

    // Same topology with the approval marker on the middle step.
    let gated_graph = build_env_approval_graph(true);
    let gated_findings = authority_propagation(&gated_graph, 4);
    let gated = gated_findings
        .iter()
        .find(|f| f.category == FindingCategory::AuthorityPropagation)
        .expect("gated must produce an AuthorityPropagation finding");
    assert_eq!(
        gated.severity,
        Severity::High,
        "Critical must downgrade to High when path crosses an env-approval gate"
    );
    assert!(
        gated
            .message
            .contains("(mitigated: environment approval gate)"),
        "gated finding must annotate the mitigation in its message"
    );
}
/// Table-driven check of `downgrade_one_step`: each level drops by one step,
/// while `Low` and `Info` map to themselves.
#[test]
fn downgrade_one_step_table() {
    let table = [
        (Severity::Critical, Severity::High),
        (Severity::High, Severity::Medium),
        (Severity::Medium, Severity::Low),
        (Severity::Low, Severity::Low),
        (Severity::Info, Severity::Info),
    ];
    for (input, expected) in table {
        assert_eq!(downgrade_one_step(input), expected);
    }
}
fn graph_with_repo(
alias: &str,
repo_type: &str,
name: &str,
git_ref: Option<&str>,
used: bool,
) -> AuthorityGraph {
let mut g = AuthorityGraph::new(source("azure-pipelines.yml"));
let mut obj = serde_json::Map::new();
obj.insert("alias".into(), serde_json::Value::String(alias.into()));
obj.insert(
"repo_type".into(),
serde_json::Value::String(repo_type.into()),
);
obj.insert("name".into(), serde_json::Value::String(name.into()));
if let Some(r) = git_ref {
obj.insert("ref".into(), serde_json::Value::String(r.into()));
}
obj.insert("used".into(), serde_json::Value::Bool(used));
let arr = serde_json::Value::Array(vec![serde_json::Value::Object(obj)]);
g.metadata.insert(
META_REPOSITORIES.into(),
serde_json::to_string(&arr).unwrap(),
);
g
}
/// Builds a graph with one first-party step named "deploy-vm" whose
/// `META_SCRIPT_BODY` is `body`.
///
/// When `secret_name` is `Some`, a secret node with that name is added after
/// the step and linked to it with a `HasAccessTo` edge.
fn graph_with_script_step(body: &str, secret_name: Option<&str>) -> AuthorityGraph {
    let mut graph = AuthorityGraph::new(source("ado.yml"));

    let script_meta = std::collections::HashMap::from([(META_SCRIPT_BODY.into(), body.into())]);
    let script_step = graph.add_node_with_metadata(
        NodeKind::Step,
        "deploy-vm",
        TrustZone::FirstParty,
        script_meta,
    );

    if let Some(secret) = secret_name {
        let secret_id = graph.add_node(NodeKind::Secret, secret, TrustZone::FirstParty);
        graph.add_edge(script_step, secret_id, EdgeKind::HasAccessTo);
    }
    graph
}
/// Builds a graph with a secret named `secret_name` and a first-party "deploy"
/// step whose `META_SCRIPT_BODY` is `script`; the step has access to the
/// secret. The secret node is created before the step.
fn build_step_with_script(secret_name: &str, script: &str) -> AuthorityGraph {
    let mut graph = AuthorityGraph::new(source("ado.yml"));
    let secret_id = graph.add_node(NodeKind::Secret, secret_name, TrustZone::FirstParty);

    let script_meta = std::collections::HashMap::from([(META_SCRIPT_BODY.into(), script.into())]);
    let deploy = graph.add_node_with_metadata(
        NodeKind::Step,
        "deploy",
        TrustZone::FirstParty,
        script_meta,
    );

    graph.add_edge(deploy, secret_id, EdgeKind::HasAccessTo);
    graph
}
/// A used repository declared without any `ref` must produce one `High`
/// `TemplateExtendsUnpinnedBranch` finding mentioning "default branch".
#[test]
fn template_extends_unpinned_branch_fires_on_missing_ref() {
    let no_ref: Option<&str> = None;
    let graph = graph_with_repo(
        "template-library",
        "git",
        "Template Library/Library",
        no_ref,
        true,
    );

    let findings = template_extends_unpinned_branch(&graph);
    assert_eq!(findings.len(), 1);

    let reported = &findings[0];
    assert_eq!(
        reported.category,
        FindingCategory::TemplateExtendsUnpinnedBranch
    );
    assert_eq!(reported.severity, Severity::High);
    assert!(reported.message.contains("default branch"));
}
/// A repository pinned to `refs/heads/main` must be reported, with the message
/// naming the branch as "mutable branch 'main'".
#[test]
fn template_extends_unpinned_branch_fires_on_refs_heads_main() {
    let branch_ref = Some("refs/heads/main");
    let graph = graph_with_repo("templates", "git", "org/templates", branch_ref, true);

    let findings = template_extends_unpinned_branch(&graph);
    assert_eq!(findings.len(), 1);
    assert!(findings[0].message.contains("mutable branch 'main'"));
}
/// A repository whose ref is `refs/tags/v1.0.0` must not be reported.
#[test]
fn template_extends_unpinned_branch_skips_tag_pinned() {
    let tag_ref = Some("refs/tags/v1.0.0");
    let graph = graph_with_repo("templates", "github", "org/templates", tag_ref, true);

    let findings = template_extends_unpinned_branch(&graph);
    assert!(
        findings.is_empty(),
        "refs/tags/v1.0.0 must be treated as pinned"
    );
}
/// A repository whose ref is a 40-character hex string must not be reported.
#[test]
fn template_extends_unpinned_branch_skips_sha_pinned() {
    let commit_sha = "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0";
    // Guard the fixture itself: the value has to be exactly 40 characters long.
    assert_eq!(commit_sha.len(), 40);

    let graph = graph_with_repo("templates", "git", "org/templates", Some(commit_sha), true);

    let findings = template_extends_unpinned_branch(&graph);
    assert!(
        findings.is_empty(),
        "40-char hex SHA must be treated as pinned"
    );
}
/// A repository built with no ref and the final `graph_with_repo` flag set to
/// `false` (presumably "not consumed in this file" — confirm against the
/// helper) must not produce a finding.
#[test]
fn template_extends_unpinned_branch_skips_unreferenced_repo_with_no_ref() {
    let no_ref = None;
    let graph = graph_with_repo("templates", "git", "org/templates", no_ref, false);
    let reported = template_extends_unpinned_branch(&graph);
    assert!(
        reported.is_empty(),
        "repo declared with no ref and no consumer must not fire"
    );
}
/// An explicit `refs/heads/...` branch ref must fire even when the final
/// `graph_with_repo` flag is `false`; the message names the branch.
#[test]
fn template_extends_unpinned_branch_fires_on_explicit_branch_even_without_in_file_consumer() {
    let branch_ref = Some("refs/heads/adf_publish");
    let graph = graph_with_repo(
        "adf_publish",
        "git",
        "org/finance-reporting",
        branch_ref,
        false,
    );
    let reported = template_extends_unpinned_branch(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("mutable branch 'adf_publish'"));
}
/// A freshly constructed graph with nothing added must yield no findings.
#[test]
fn template_extends_unpinned_branch_skips_when_metadata_absent() {
    let empty_graph = AuthorityGraph::new(source("ci.yml"));
    let reported = template_extends_unpinned_branch(&empty_graph);
    assert!(reported.is_empty());
}
/// A bare branch name (`main`, without the `refs/heads/` prefix) must fire
/// once and be reported as the mutable branch `main`.
#[test]
fn template_extends_unpinned_branch_handles_bare_branch_name() {
    let bare_branch = Some("main");
    let graph = graph_with_repo(
        "template-library",
        "git",
        "Template Library/Library",
        bare_branch,
        true,
    );
    let reported = template_extends_unpinned_branch(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("mutable branch 'main'"));
}
/// A bare `feature/...` ref must yield one High-severity
/// `TemplateRepoRefIsFeatureBranch` finding whose message contains both the
/// branch name and the phrase `feature-class`.
#[test]
fn template_repo_ref_is_feature_branch_fires_on_bare_feature_branch() {
    let feature_ref = Some("feature/maps-network");
    let graph = graph_with_repo(
        "templateLibRepo",
        "git",
        "Template Library/Template Library",
        feature_ref,
        true,
    );
    let reported = template_repo_ref_is_feature_branch(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(
        first.category,
        FindingCategory::TemplateRepoRefIsFeatureBranch
    );
    assert_eq!(first.severity, Severity::High);
    assert!(first.message.contains("feature/maps-network"));
    assert!(first.message.contains("feature-class"));
}
/// A `refs/heads/feature/...` ref must fire once, with the message containing
/// the branch name `feature/wip`.
#[test]
fn template_repo_ref_is_feature_branch_fires_on_refs_heads_feature() {
    let feature_ref = Some("refs/heads/feature/wip");
    let graph = graph_with_repo("templates", "git", "org/templates", feature_ref, true);
    let reported = template_repo_ref_is_feature_branch(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("feature/wip"));
}
/// `refs/heads/develop` must be reported by the feature-branch rule exactly once.
#[test]
fn template_repo_ref_is_feature_branch_fires_on_develop_branch() {
    let develop_ref = Some("refs/heads/develop");
    let graph = graph_with_repo("templates", "git", "org/templates", develop_ref, true);
    let hit_count = template_repo_ref_is_feature_branch(&graph).len();
    assert_eq!(hit_count, 1);
}
/// `refs/heads/main` must not trigger the feature-branch rule, while
/// `template_extends_unpinned_branch` still fires once on the same graph.
#[test]
fn template_repo_ref_is_feature_branch_skips_main_branch() {
    let main_ref = Some("refs/heads/main");
    let graph = graph_with_repo("templates", "git", "org/templates", main_ref, true);
    let refinement = template_repo_ref_is_feature_branch(&graph);
    assert!(refinement.is_empty());
    let parent = template_extends_unpinned_branch(&graph);
    assert_eq!(parent.len(), 1);
}
/// `master`, `release(s)/...` and `hotfix/...` refs — bare or under
/// `refs/heads/` — must never be reported as feature-class.
#[test]
fn template_repo_ref_is_feature_branch_skips_master_release_hotfix() {
    let non_feature_refs = [
        "master",
        "refs/heads/master",
        "release/v1.4",
        "refs/heads/release/2026-q2",
        "releases/2026-04",
        "hotfix/CVE-2026-0001",
        "refs/heads/hotfix/CVE-2026-0002",
    ];
    for ref_value in non_feature_refs {
        let graph = graph_with_repo("t", "git", "org/t", Some(ref_value), true);
        let reported = template_repo_ref_is_feature_branch(&graph);
        assert!(
            reported.is_empty(),
            "ref {ref_value:?} must not fire as feature-class"
        );
    }
}
/// Pinned refs — a bare 40-hex SHA, a `refs/tags/` ref, and a SHA under
/// `refs/heads/` — must not be reported by the feature-branch rule.
#[test]
fn template_repo_ref_is_feature_branch_skips_pinned_refs() {
    let sha = "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6e7f8a9b0";
    let pinned_refs = [
        String::from(sha),
        String::from("refs/tags/v1.4.2"),
        format!("refs/heads/{sha}"),
    ];
    for ref_value in pinned_refs {
        let graph = graph_with_repo("templates", "git", "org/t", Some(ref_value.as_str()), true);
        let reported = template_repo_ref_is_feature_branch(&graph);
        assert!(
            reported.is_empty(),
            "pinned ref {ref_value:?} must not fire"
        );
    }
}
/// With no ref supplied at all, the feature-branch rule must stay silent.
#[test]
fn template_repo_ref_is_feature_branch_skips_when_ref_absent() {
    let graph = graph_with_repo("templates", "git", "org/templates", None, true);
    let reported = template_repo_ref_is_feature_branch(&graph);
    assert!(reported.is_empty());
}
/// On a feature-branch ref both `template_extends_unpinned_branch` and
/// `template_repo_ref_is_feature_branch` fire once each, under different
/// finding categories.
#[test]
fn template_repo_ref_is_feature_branch_cofires_with_parent_rule() {
    let feature_ref = Some("feature/maps-network");
    let graph = graph_with_repo(
        "templateLibRepo",
        "git",
        "Template Library/Template Library",
        feature_ref,
        true,
    );
    let parent_hits = template_extends_unpinned_branch(&graph);
    let refinement_hits = template_repo_ref_is_feature_branch(&graph);
    assert_eq!(parent_hits.len(), 1, "parent rule must still fire");
    assert_eq!(refinement_hits.len(), 1, "refinement must fire alongside");
    let (parent, refinement) = (&parent_hits[0], &refinement_hits[0]);
    assert_ne!(parent.category, refinement.category);
}
/// Table-driven check of `is_feature_class_branch`: trunk-like names must
/// return `false`, feature-like names must return `true`, and empty or
/// whitespace-only input must return `false`.
#[test]
fn is_feature_class_branch_classification() {
    let trunk_like = [
        "main",
        "MAIN",
        "master",
        "refs/heads/main",
        "release/v1",
        "release/",
        "release",
        "releases/2026",
        "hotfix/x",
        "hotfix",
        "hotfixes/y",
        " refs/heads/main ",
    ];
    for b in trunk_like {
        let is_feature = is_feature_class_branch(b);
        assert!(!is_feature, "{b:?} must be trunk");
    }

    let feature_like = [
        "feature/foo",
        "topic/bar",
        "dev/wip",
        "wip/x",
        "develop",
        "users/alice/spike",
        "personal-branch",
        "refs/heads/feature/x",
        "main-staging",
    ];
    for b in feature_like {
        let is_feature = is_feature_class_branch(b);
        assert!(is_feature, "{b:?} must be feature-class");
    }

    // Degenerate inputs are expected to be rejected.
    assert!(!is_feature_class_branch(""));
    assert!(!is_feature_class_branch(" "));
}
/// A 40-hex SHA placed under `refs/heads/` must not be reported as an
/// unpinned branch.
#[test]
fn template_extends_unpinned_branch_skips_refs_heads_with_sha() {
    let sha = "0123456789abcdef0123456789abcdef01234567";
    let sha_under_heads = format!("refs/heads/{sha}");
    let graph = graph_with_repo(
        "templates",
        "git",
        "org/templates",
        Some(sha_under_heads.as_str()),
        true,
    );
    let reported = template_extends_unpinned_branch(&graph);
    assert!(reported.is_empty());
}
/// A script that mints a SAS token and passes it to `Set-AzVMExtension` must
/// yield one High-severity `VmRemoteExecViaPipelineSecret` finding, even with
/// no secret node attached to the step.
#[test]
fn vm_remote_exec_fires_on_set_azvmextension_with_minted_sas() {
    let script = r#"
$sastokenpackages = New-AzStorageContainerSASToken -Container $packagecontainer -Context $ctx -Permission r -ExpiryTime (Get-Date).AddHours(3)
Set-AzVMExtension -ResourceGroupName $vmRG -VMName $vm.name -Name 'customScript' `
-Publisher 'Microsoft.Compute' -ExtensionType 'CustomScriptExtension' `
-Settings @{ "commandToExecute" = "powershell -File install.ps1 -saskey `"$sastokenpackages`"" }
"#;
    let graph = graph_with_script_step(script, None);
    let reported = vm_remote_exec_via_pipeline_secret(&graph);
    assert_eq!(reported.len(), 1, "should fire once");
    let first = &reported[0];
    assert_eq!(
        first.category,
        FindingCategory::VmRemoteExecViaPipelineSecret
    );
    assert_eq!(first.severity, Severity::High);
}
/// `Invoke-AzVMRunCommand` with a `$(DOMAIN_JOIN_PASSWORD)` reference, on a
/// step that has access to that secret, must fire once with a message about
/// interpolating a pipeline secret.
#[test]
fn vm_remote_exec_fires_on_invoke_azvmruncommand_with_pipeline_secret() {
    let script = r#"
Invoke-AzVMRunCommand -ResourceGroupName rg -VMName vm `
-CommandId RunPowerShellScript -ScriptString "Add-LocalGroupMember -Member admin -Password $(DOMAIN_JOIN_PASSWORD)"
"#;
    let graph = graph_with_script_step(script, Some("DOMAIN_JOIN_PASSWORD"));
    let reported = vm_remote_exec_via_pipeline_secret(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("interpolating a pipeline secret"));
}
/// Minting a SAS token without any remote-exec call in the script must not
/// produce a finding.
#[test]
fn vm_remote_exec_does_not_fire_without_remote_exec_call() {
    let script = r#"
$sas = New-AzStorageContainerSASToken -Container c -Context $ctx -Permission r -ExpiryTime (Get-Date).AddHours(1)
Write-Host "sas length is $($sas.Length)"
"#;
    let graph = graph_with_script_step(script, None);
    let reported = vm_remote_exec_via_pipeline_secret(&graph);
    assert!(reported.is_empty());
}
/// A `Set-AzVMExtension` call with neither a minted SAS nor a secret
/// interpolation must not produce a finding.
#[test]
fn vm_remote_exec_does_not_fire_when_remote_exec_has_no_secret_or_sas() {
    let script = r#"
Set-AzVMExtension -ResourceGroupName rg -VMName vm -Name diag `
-Publisher Microsoft.Azure.Diagnostics -ExtensionType IaaSDiagnostics `
-Settings @{ "xmlCfg" = "<wadcfg/>" }
"#;
    let graph = graph_with_script_step(script, None);
    let reported = vm_remote_exec_via_pipeline_secret(&graph);
    assert!(
        reported.is_empty(),
        "no SAS-mint and no secret interpolation → no finding"
    );
}
/// `az vm run-command invoke` interpolating `$(DB_PASSWORD)` must fire once,
/// and the message must mention `az vm run-command`.
#[test]
fn vm_remote_exec_fires_on_az_cli_run_command() {
    let script = r#"
az vm run-command invoke --resource-group rg --name vm `
--command-id RunShellScript --scripts "echo $(DB_PASSWORD) > /tmp/x"
"#;
    let graph = graph_with_script_step(script, Some("DB_PASSWORD"));
    let reported = vm_remote_exec_via_pipeline_secret(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("az vm run-command"));
}
/// A minted SAS variable interpolated into a `commandToExecute` string must
/// yield one Medium-severity `ShortLivedSasInCommandLine` finding that names
/// the variable.
#[test]
fn sas_in_cmdline_fires_on_minted_sas_interpolated_into_command_to_execute() {
    let script = r#"
$sastokenpackages = New-AzStorageContainerSASToken -Container c -Context $ctx -Permission r -ExpiryTime (Get-Date).AddHours(3)
$settings = @{ "commandToExecute" = "powershell install.ps1 -sas `"$sastokenpackages`"" }
"#;
    let graph = graph_with_script_step(script, None);
    let reported = short_lived_sas_in_command_line(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.category, FindingCategory::ShortLivedSasInCommandLine);
    assert_eq!(first.severity, Severity::Medium);
    assert!(first.message.contains("sastokenpackages"));
}
/// A minted SAS that is only appended to a URL passed to `Invoke-WebRequest`
/// must not be reported.
#[test]
fn sas_in_cmdline_does_not_fire_when_sas_is_only_uploaded_to_blob() {
    let script = r#"
$sas = New-AzStorageContainerSASToken -Container c -Context $ctx -Permission r -ExpiryTime (Get-Date).AddHours(1)
$url = "https://acct.blob.core.windows.net/c/?" + $sas
Invoke-WebRequest -Uri $url -OutFile foo.zip
"#;
    let graph = graph_with_script_step(script, None);
    let reported = short_lived_sas_in_command_line(&graph);
    assert!(reported.is_empty(), "no command-line sink → no finding");
}
/// A `commandToExecute` setting with no SAS mint in the script must not fire.
#[test]
fn sas_in_cmdline_does_not_fire_without_sas_mint() {
    let script = r#"
$settings = @{ "commandToExecute" = "powershell -File foo.ps1" }
"#;
    let graph = graph_with_script_step(script, None);
    let reported = short_lived_sas_in_command_line(&graph);
    assert!(reported.is_empty());
}
/// A shell variable assigned from `az storage container generate-sas` and
/// then used inside a `commandToExecute` setting must fire once.
#[test]
fn sas_in_cmdline_fires_on_az_cli_generate_sas_with_arguments() {
    let script = r#"
sas=$(az storage container generate-sas --name c --account-name acct --permissions r --expiry 2099-01-01 -o tsv)
az vm extension set --vm-name vm --resource-group rg --name CustomScript --publisher Microsoft.Compute \
--settings "{ \"commandToExecute\": \"curl https://acct.blob.core.windows.net/c/foo?$sas\" }"
"#;
    let graph = graph_with_script_step(script, None);
    let hit_count = short_lived_sas_in_command_line(&graph).len();
    assert_eq!(hit_count, 1);
}
/// One script that mints a SAS and passes it through `Set-AzVMExtension`'s
/// `commandToExecute` must trigger both `vm_remote_exec_via_pipeline_secret`
/// and `short_lived_sas_in_command_line`, once each.
#[test]
fn co_fire_on_solarwinds_pattern() {
    let script = r#"
$sastokenpackages = New-AzStorageContainerSASToken -Container $pc -Context $ctx -Permission r -ExpiryTime (Get-Date).AddHours(3)
Set-AzVMExtension -ResourceGroupName $rg -VMName $vm `
-Publisher 'Microsoft.Compute' -ExtensionType 'CustomScriptExtension' `
-Settings @{ "commandToExecute" = "powershell -File install.ps1 -sas `"$sastokenpackages`"" }
"#;
    let graph = graph_with_script_step(script, None);
    let remote_exec_hits = vm_remote_exec_via_pipeline_secret(&graph);
    let sas_cmdline_hits = short_lived_sas_in_command_line(&graph);
    assert_eq!(
        remote_exec_hits.len(),
        1,
        "rule 6 must fire on solarwinds shape"
    );
    assert_eq!(
        sas_cmdline_hits.len(),
        1,
        "rule 7 must fire on solarwinds shape"
    );
}
/// `body_interpolates_var` must match the full variable name only: a prefix
/// of `$sastokenpackages` is rejected, the full name is accepted, and the
/// `$(NAME)` form is recognised.
#[test]
fn body_interpolates_var_does_not_match_prefix() {
    let powershell_line = "Write-Host $sastokenpackages";
    let prefix_matches = body_interpolates_var(powershell_line, "sas");
    assert!(!prefix_matches);
    let full_name_matches = body_interpolates_var(powershell_line, "sastokenpackages");
    assert!(full_name_matches);
    let macro_form_matches = body_interpolates_var("echo $(SECRET)", "SECRET");
    assert!(macro_form_matches);
}
/// `powershell_sas_assignments` must return the variables assigned from the
/// two SAS-token cmdlets (`a`, `sasBlob`) and omit the one assigned from
/// `Get-Date` (`b`); names are compared ASCII-case-insensitively.
#[test]
fn powershell_sas_assignments_extracts_var_names() {
    let script = r#"
$a = New-AzStorageContainerSASToken -Container c -Context $ctx -Permission r
$b = Get-Date
$sasBlob = New-AzStorageBlobSASToken -Container c -Blob foo -Context $ctx -Permission r
"#;
    let extracted = powershell_sas_assignments(script);
    let found_a = extracted.iter().any(|n| n.eq_ignore_ascii_case("a"));
    let found_sas_blob = extracted.iter().any(|n| n.eq_ignore_ascii_case("sasBlob"));
    let found_b = extracted.iter().any(|n| n.eq_ignore_ascii_case("b"));
    assert!(found_a);
    assert!(found_sas_blob);
    assert!(!found_b);
}
/// A bash `export VAR="$(TF_TOKEN)"` line must yield one High-severity
/// finding whose message quotes `$(TF_TOKEN)`.
#[test]
fn bash_export_of_pipeline_secret_flagged() {
    let script = "echo init\nexport TF_TOKEN_app_terraform_io=\"$(TF_TOKEN)\"\nterraform init";
    let graph = build_step_with_script("TF_TOKEN", script);
    let reported = secret_to_inline_script_env_export(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::High);
    assert!(first.message.contains("$(TF_TOKEN)"));
}
/// A PowerShell assignment of `"$(AppContainerDBPassword)"` to a variable
/// must fire once, and the message must quote the secret reference.
#[test]
fn powershell_assignment_of_pipeline_secret_flagged() {
    let script = "$AppContainerDBPassword = \"$(AppContainerDBPassword)\"\n$x = 1";
    let graph = build_step_with_script("AppContainerDBPassword", script);
    let reported = secret_to_inline_script_env_export(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("$(AppContainerDBPassword)"));
}
/// Passing `$(TF_TOKEN)` as a command argument (no export or assignment) must
/// not trip the env-export rule.
#[test]
fn secret_passed_as_command_argument_not_flagged() {
    let script = "terraform plan -var \"token=$(TF_TOKEN)\"";
    let graph = build_step_with_script("TF_TOKEN", script);
    let reported = secret_to_inline_script_env_export(&graph);
    assert!(
        reported.is_empty(),
        "command-arg use of $(SECRET) must not trip env-export rule"
    );
}
/// A step with access to a secret but no script-body metadata must not be
/// reported by the env-export rule.
#[test]
fn step_without_script_body_not_flagged() {
    let mut graph = AuthorityGraph::new(source("ado.yml"));
    let secret_id = graph.add_node(NodeKind::Secret, "TF_TOKEN", TrustZone::FirstParty);
    // Plain `add_node`: the step deliberately carries no script body.
    let step_id = graph.add_node(NodeKind::Step, "task", TrustZone::FirstParty);
    graph.add_edge(step_id, secret_id, EdgeKind::HasAccessTo);
    let reported = secret_to_inline_script_env_export(&graph);
    assert!(reported.is_empty());
}
/// A PowerShell script that binds a pipeline secret to a variable, substitutes
/// it into file content, and pipes that to `Out-File` under
/// `$(System.DefaultWorkingDirectory)` must yield one High-severity finding.
#[test]
fn powershell_outfile_of_secret_to_workspace_flagged() {
    // Four script lines joined by '\n'; there is no trailing newline.
    let script = concat!(
        "$AppContainerDBPassword = \"$(AppContainerDBPassword)\"\n",
        "$TFfile = Get-Content $(System.DefaultWorkingDirectory)/in.tfvars\n",
        "$TFfile = $TFfile.Replace(\"x\", $AppContainerDBPassword)\n",
        "$TFfile | Out-File $(System.DefaultWorkingDirectory)/envVars/tffile.tfvars",
    );
    let graph = build_step_with_script("AppContainerDBPassword", script);
    let reported = secret_materialised_to_workspace_file(&graph);
    assert_eq!(
        reported.len(),
        1,
        "Out-File of bound secret to workspace must fire"
    );
    let first = &reported[0];
    assert_eq!(first.severity, Severity::High);
}
/// A bash `echo ... $(TF_TOKEN) ... > $(Build.SourcesDirectory)/secrets.tfvars`
/// redirect must fire once.
#[test]
fn bash_redirect_of_secret_to_tfvars_flagged() {
    let graph = build_step_with_script(
        "TF_TOKEN",
        "echo \"token = \\\"$(TF_TOKEN)\\\"\" > $(Build.SourcesDirectory)/secrets.tfvars",
    );
    let hit_count = secret_materialised_to_workspace_file(&graph).len();
    assert_eq!(hit_count, 1);
}
/// Echoing a secret to stdout, with no file redirect, must not trip the
/// materialisation rule.
#[test]
fn echoing_secret_to_stdout_not_flagged_by_materialisation_rule() {
    let script = "echo using $(TF_TOKEN)\nterraform init";
    let graph = build_step_with_script("TF_TOKEN", script);
    let reported = secret_materialised_to_workspace_file(&graph);
    assert!(
        reported.is_empty(),
        "stdout echo (no file sink) must not trip materialisation rule"
    );
}
/// Redirecting a secret to `/var/tmp/ignore.log` must not be reported by the
/// workspace-file materialisation rule.
#[test]
fn write_to_unrelated_path_not_flagged() {
    let graph = build_step_with_script("MY_SECRET", "echo $(MY_SECRET) > /var/tmp/ignore.log");
    let reported = secret_materialised_to_workspace_file(&graph);
    assert!(reported.is_empty());
}
/// `Get-AzKeyVaultSecret ... -AsPlainText` must yield one Medium-severity
/// finding.
#[test]
fn keyvault_asplaintext_flagged() {
    let script = concat!(
        "$pass = Get-AzKeyVaultSecret -VaultName foo -Name bar -AsPlainText\n",
        "Write-Host done",
    );
    let graph = build_step_with_script("UNUSED", script);
    let reported = keyvault_secret_to_plaintext(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::Medium);
}
/// Reading `.SecretValueText` from a `Get-AzKeyVaultSecret` result must fire
/// once.
#[test]
fn keyvault_secretvaluetext_legacy_pattern_flagged() {
    let graph = build_step_with_script(
        "UNUSED",
        "$pwd = (Get-AzKeyVaultSecret -VaultName foo -Name bar).SecretValueText",
    );
    let hit_count = keyvault_secret_to_plaintext(&graph).len();
    assert_eq!(hit_count, 1);
}
/// `ConvertFrom-SecureString ... -AsPlainText` must fire once.
#[test]
fn convertfrom_securestring_asplaintext_flagged() {
    let graph = build_step_with_script(
        "UNUSED",
        "$plain = ConvertFrom-SecureString $sec -AsPlainText",
    );
    let hit_count = keyvault_secret_to_plaintext(&graph).len();
    assert_eq!(hit_count, 1);
}
/// Fetching a Key Vault secret and using only its `.SecretValue` (no
/// plaintext conversion) must not be reported.
#[test]
fn keyvault_securestring_handling_not_flagged() {
    let script = concat!(
        "$sec = Get-AzKeyVaultSecret -VaultName foo -Name bar\n",
        "$cred = New-Object PSCredential 'svc', $sec.SecretValue",
    );
    let graph = build_step_with_script("UNUSED", script);
    let reported = keyvault_secret_to_plaintext(&graph);
    assert!(
        reported.is_empty(),
        "SecureString-only handling is the recommended pattern and must not fire"
    );
}
/// Adds a first-party `Step` node named `name` to `g`, carrying every
/// `(key, value)` pair of `meta` as owned metadata, and returns the new
/// node's id. If a key repeats, the later pair wins.
fn step_with_meta(g: &mut AuthorityGraph, name: &str, meta: &[(&str, &str)]) -> NodeId {
    let owned_meta: std::collections::HashMap<String, String> = meta
        .iter()
        .map(|(key, value)| (String::from(*key), String::from(*value)))
        .collect();
    g.add_node_with_metadata(NodeKind::Step, name, TrustZone::FirstParty, owned_meta)
}
/// An auto-approve step bound to a prod-named service connection must yield
/// one Critical `TerraformAutoApproveInProd` finding that names the connection.
#[test]
fn terraform_auto_approve_against_prod_connection_fires() {
    let connection = "sharedservice-w365-prod-sc";
    let step_meta = [
        (META_TERRAFORM_AUTO_APPROVE, "true"),
        (META_SERVICE_CONNECTION_NAME, connection),
    ];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "Terraform : Apply", &step_meta);
    let reported = terraform_auto_approve_in_prod(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::Critical);
    assert_eq!(first.category, FindingCategory::TerraformAutoApproveInProd);
    assert!(
        first.message.contains("sharedservice-w365-prod-sc"),
        "message should name the connection, got: {}",
        first.message
    );
}
/// When the auto-approve step has no connection-name metadata but holds a
/// `HasAccessTo` edge to a service-connection identity with a prod-looking
/// name, the rule must fire once and name that identity.
#[test]
fn terraform_auto_approve_via_edge_to_service_connection_identity() {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    let apply_step = step_with_meta(
        &mut graph,
        "Terraform : Apply",
        &[(META_TERRAFORM_AUTO_APPROVE, "true")],
    );

    // Identity node flagged as a service connection via metadata.
    let mut identity_meta = std::collections::HashMap::new();
    identity_meta.insert(META_SERVICE_CONNECTION.into(), "true".into());
    let connection_id = graph.add_node_with_metadata(
        NodeKind::Identity,
        "alz-infra-sc-prd-uks",
        TrustZone::FirstParty,
        identity_meta,
    );
    graph.add_edge(apply_step, connection_id, EdgeKind::HasAccessTo);

    let reported = terraform_auto_approve_in_prod(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(first.message.contains("alz-infra-sc-prd-uks"));
}
/// An auto-approve step on a prod connection that also carries the
/// environment-approval marker must not be reported.
#[test]
fn terraform_auto_approve_with_env_gate_does_not_fire() {
    let step_meta = [
        (META_TERRAFORM_AUTO_APPROVE, "true"),
        (META_SERVICE_CONNECTION_NAME, "platform-prod-sc"),
        (META_ENV_APPROVAL, "true"),
    ];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "Terraform : Apply", &step_meta);
    let reported = terraform_auto_approve_in_prod(&graph);
    assert!(
        reported.is_empty(),
        "env-gated apply must not fire — gate is the change-control"
    );
}
/// An auto-approve step whose connection name is `platform-dev-sc` must not
/// be reported.
#[test]
fn terraform_auto_approve_against_non_prod_does_not_fire() {
    let step_meta = [
        (META_TERRAFORM_AUTO_APPROVE, "true"),
        (META_SERVICE_CONNECTION_NAME, "platform-dev-sc"),
    ];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "Terraform : Apply", &step_meta);
    let reported = terraform_auto_approve_in_prod(&graph);
    assert!(reported.is_empty(), "dev connection must not match prod");
}
/// A step on a prod connection without the auto-approve marker must not be
/// reported.
#[test]
fn terraform_apply_without_auto_approve_does_not_fire() {
    let step_meta = [(META_SERVICE_CONNECTION_NAME, "platform-prod-sc")];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "Terraform : Apply", &step_meta);
    let reported = terraform_auto_approve_in_prod(&graph);
    assert!(reported.is_empty());
}
/// `looks_like_prod_connection` must accept names containing a `prod`/`prd`
/// token and reject names where `prod` only appears inside another word
/// (`approver`, `reproducer`) or not at all.
#[test]
fn looks_like_prod_connection_matches_real_world_names() {
    let prod_names = [
        "sharedservice-w365-prod-sc",
        "alz-infra-sc-prd",
        "prod-tenant-arm",
        "PROD",
        "my_prod_arm",
    ];
    for name in prod_names {
        assert!(looks_like_prod_connection(name));
    }

    let non_prod_names = ["approver-sc", "reproducer-sc", "dev-sc", "staging"];
    for name in non_prod_names {
        assert!(!looks_like_prod_connection(name));
    }
}
/// A step with the add-SPN marker and an inline script must yield one
/// High-severity finding; with a benign script the message must not carry the
/// token-laundering escalation text.
#[test]
fn addspn_with_inline_script_fires_with_basic_body() {
    let step_meta = [
        (META_ADD_SPN_TO_ENV, "true"),
        (META_SCRIPT_BODY, "az account show --query id -o tsv"),
    ];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "ado : azure : login (federated)", &step_meta);
    let reported = addspn_with_inline_script(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::High);
    let escalated = first
        .message
        .contains("explicit token laundering detected");
    assert!(!escalated);
}
/// When the inline script writes `$env:idToken` into a `##vso[task.setvariable]`
/// command, the finding's message must include the token-laundering
/// escalation text.
#[test]
fn addspn_with_inline_script_escalates_message_on_token_laundering() {
    let laundering_script =
        "Write-Output \"##vso[task.setvariable variable=ARM_OIDC_TOKEN]$env:idToken\"";
    let step_meta = [
        (META_ADD_SPN_TO_ENV, "true"),
        (META_SCRIPT_BODY, laundering_script),
    ];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "ado : azure : login (federated)", &step_meta);
    let reported = addspn_with_inline_script(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert!(
        first
            .message
            .contains("explicit token laundering detected"),
        "message should escalate, got: {}",
        first.message
    );
}
/// The add-SPN marker alone, with no script body, must not be reported.
#[test]
fn addspn_without_inline_script_does_not_fire() {
    let step_meta = [(META_ADD_SPN_TO_ENV, "true")];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "AzureCLI scriptPath", &step_meta);
    let reported = addspn_with_inline_script(&graph);
    assert!(reported.is_empty());
}
/// An inline script on a step without the add-SPN marker must not be reported.
#[test]
fn inline_script_without_addspn_does_not_fire() {
    let step_meta = [(META_SCRIPT_BODY, "az account show")];
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    step_with_meta(&mut graph, "az account show", &step_meta);
    let reported = addspn_with_inline_script(&graph);
    assert!(reported.is_empty());
}
/// `script_launders_spn_token` must return `true` only when a
/// `##vso[task.setvariable]` command carries an SPN token variable
/// (`$env:idToken`, `$env:servicePrincipalKey`). A setvariable with a plain
/// value, or a bare token reference with no setvariable, must return `false`.
#[test]
fn script_launders_spn_token_recognises_known_markers() {
    let laundering_scripts = [
        "Write-Output \"##vso[task.setvariable variable=ARM_OIDC_TOKEN]$env:idToken\"",
        "echo \"##vso[task.setvariable variable=X]$env:servicePrincipalKey\"",
    ];
    for script in laundering_scripts {
        assert!(script_launders_spn_token(script));
    }

    let benign_scripts = [
        "echo \"##vso[task.setvariable variable=X]hello\"",
        "$env:idToken",
    ];
    for script in benign_scripts {
        assert!(!script_launders_spn_token(script));
    }
}
/// Builds an otherwise empty `azure-pipelines.yml` graph whose `parameters`
/// map contains `spec` registered under `name`.
fn graph_with_param(spec: ParamSpec, name: &str) -> AuthorityGraph {
    let mut graph = AuthorityGraph::new(source("azure-pipelines.yml"));
    let param_name = name.to_owned();
    graph.parameters.insert(param_name, spec);
    graph
}
/// A `string` parameter without a values allowlist, interpolated into an
/// inline script via `${{ parameters.appName }}`, must yield one
/// Medium-severity finding that names the parameter.
#[test]
fn parameter_interpolation_fires_on_free_form_string_in_inline_script() {
    let free_form = ParamSpec {
        param_type: "string".into(),
        has_values_allowlist: false,
    };
    let mut graph = graph_with_param(free_form, "appName");
    let step_meta = [(
        META_SCRIPT_BODY,
        "terraform workspace select -or-create ${{ parameters.appName }}",
    )];
    step_with_meta(&mut graph, "terraform workspace", &step_meta);
    let reported = parameter_interpolation_into_shell(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::Medium);
    assert!(first.message.contains("appName"));
}
/// A `string` parameter flagged as having a values allowlist must not be
/// reported even when interpolated into an inline script.
#[test]
fn parameter_interpolation_with_values_allowlist_does_not_fire() {
    let allowlisted = ParamSpec {
        param_type: "string".into(),
        has_values_allowlist: true,
    };
    let mut graph = graph_with_param(allowlisted, "location");
    let step_meta = [(
        META_SCRIPT_BODY,
        "terraform plan -var=\"location=${{ parameters.location }}\"",
    )];
    step_with_meta(&mut graph, "Terraform Plan", &step_meta);
    let reported = parameter_interpolation_into_shell(&graph);
    assert!(
        reported.is_empty(),
        "values: allowlist must suppress the finding"
    );
}
/// A parameter whose `param_type` is the empty string must be handled like a
/// `string` parameter and therefore fire once.
#[test]
fn parameter_interpolation_default_type_is_treated_as_string() {
    let untyped = ParamSpec {
        param_type: "".into(),
        has_values_allowlist: false,
    };
    let mut graph = graph_with_param(untyped, "appName");
    let step_meta = [(
        META_SCRIPT_BODY,
        "terraform plan -var \"appName=${{ parameters.appName }}\"",
    )];
    step_with_meta(&mut graph, "Terraform : Plan", &step_meta);
    let reported = parameter_interpolation_into_shell(&graph);
    assert_eq!(reported.len(), 1, "missing type: must default to string");
}
/// A `boolean` parameter interpolated into an inline script must not be
/// reported.
#[test]
fn parameter_interpolation_skips_non_string_params() {
    let boolean_param = ParamSpec {
        param_type: "boolean".into(),
        has_values_allowlist: false,
    };
    let mut graph = graph_with_param(boolean_param, "enabled");
    let step_meta = [(META_SCRIPT_BODY, "echo ${{ parameters.enabled }}")];
    step_with_meta(&mut graph, "step", &step_meta);
    let reported = parameter_interpolation_into_shell(&graph);
    assert!(reported.is_empty(), "boolean params can't carry shell");
}
/// The compact `${{parameters.x}}` form (no inner spaces) must be detected
/// just like the spaced form.
#[test]
fn parameter_interpolation_no_spaces_form_also_matches() {
    let free_form = ParamSpec {
        param_type: "string".into(),
        has_values_allowlist: false,
    };
    let mut graph = graph_with_param(free_form, "x");
    let step_meta = [(META_SCRIPT_BODY, "echo ${{parameters.x}}")];
    step_with_meta(&mut graph, "step", &step_meta);
    let hit_count = parameter_interpolation_into_shell(&graph).len();
    assert_eq!(hit_count, 1);
}
/// A step with no script-body metadata must not be reported, even though a
/// free-form string parameter is declared on the graph.
#[test]
fn parameter_interpolation_skips_step_without_script_body() {
    let free_form = ParamSpec {
        param_type: "string".into(),
        has_values_allowlist: false,
    };
    let mut graph = graph_with_param(free_form, "appName");
    // Plain `add_node`: the step has no metadata at all.
    graph.add_node(NodeKind::Step, "task-step", TrustZone::Untrusted);
    let reported = parameter_interpolation_into_shell(&graph);
    assert!(reported.is_empty());
}
/// Builds a `ci.yml` graph containing a single first-party `install` step
/// whose script-body metadata is `body`.
///
/// The metadata is attached at node-creation time through
/// `add_node_with_metadata`, the same way `build_step_with_script` does it.
/// The previous add-then-`get_mut` form silently produced a step with no
/// script body if the lookup failed, which would let "must not fire" tests
/// pass vacuously.
fn step_with_body(body: &str) -> AuthorityGraph {
    let mut g = AuthorityGraph::new(source("ci.yml"));
    let meta =
        std::collections::HashMap::from([(META_SCRIPT_BODY.to_string(), body.to_string())]);
    g.add_node_with_metadata(NodeKind::Step, "install", TrustZone::FirstParty, meta);
    g
}
/// `curl ... /master/... | bash` must yield one High-severity
/// `RuntimeScriptFetchedFromFloatingUrl` finding.
#[test]
fn floating_curl_pipe_bash_master_is_flagged() {
    let script = "curl -fsSL https://raw.githubusercontent.com/tilt-dev/tilt/master/scripts/install.sh | bash";
    let graph = step_with_body(script);
    let reported = runtime_script_fetched_from_floating_url(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::High);
    assert_eq!(
        first.category,
        FindingCategory::RuntimeScriptFetchedFromFloatingUrl
    );
}
/// `deno run` against a `refs/heads/main` raw URL must fire once.
#[test]
fn floating_deno_run_main_is_flagged() {
    let script = "deno run https://raw.githubusercontent.com/denoland/deno/refs/heads/main/tools/verify_pr_title.js \"$PR_TITLE\"";
    let graph = step_with_body(script);
    let hit_count = runtime_script_fetched_from_floating_url(&graph).len();
    assert_eq!(hit_count, 1);
}
/// A `curl | bash` whose URL path uses a version tag (`v0.33.10`) must not be
/// reported.
#[test]
fn pinned_curl_url_with_tag_not_flagged() {
    let script = "curl -fsSL https://raw.githubusercontent.com/tilt-dev/tilt/v0.33.10/scripts/install.sh | bash";
    let graph = step_with_body(script);
    let reported = runtime_script_fetched_from_floating_url(&graph);
    assert!(reported.is_empty(), "tag-pinned URL must not fire");
}
/// Downloading from a `master` URL without piping into a shell must not be
/// reported.
#[test]
fn curl_without_pipe_to_shell_not_flagged() {
    let script = "curl -sSLO https://raw.githubusercontent.com/rust-lang/rust/master/src/tools/linkchecker/linkcheck.sh";
    let graph = step_with_body(script);
    let reported = runtime_script_fetched_from_floating_url(&graph);
    assert!(reported.is_empty(), "download-only must not fire");
}
/// `bash <(curl ... /main/...)` process substitution must fire once.
#[test]
fn bash_process_substitution_curl_main_is_flagged() {
    let script = "bash <(curl -s https://raw.githubusercontent.com/some/repo/main/install.sh)";
    let graph = step_with_body(script);
    let hit_count = runtime_script_fetched_from_floating_url(&graph).len();
    assert_eq!(hit_count, 1);
}
/// Builds a `pr.yml` graph whose trigger metadata is `trigger` and which
/// contains one third-party `Image` node named `action`.
fn graph_with_trigger_and_action(trigger: &str, action: &str) -> AuthorityGraph {
    let mut graph = AuthorityGraph::new(source("pr.yml"));
    let trigger_key = META_TRIGGER.into();
    graph.metadata.insert(trigger_key, trigger.into());
    graph.add_node(NodeKind::Image, action, TrustZone::ThirdParty);
    graph
}
/// `pull_request_target` combined with an action referenced at `@main` must
/// yield one Critical `PrTriggerWithFloatingActionRef` finding.
#[test]
fn pull_request_target_with_floating_main_action_flagged_critical() {
    let graph = graph_with_trigger_and_action("pull_request_target", "actions/checkout@main");
    let reported = pr_trigger_with_floating_action_ref(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::Critical);
    assert_eq!(
        first.category,
        FindingCategory::PrTriggerWithFloatingActionRef
    );
}
/// `pull_request_target` with an action pinned to a 40-hex SHA must not be
/// reported.
#[test]
fn pull_request_target_with_sha_pinned_action_not_flagged() {
    let pinned_action = "denoland/setup-deno@667a34cdef165d8d2b2e98dde39547c9daac7282";
    let graph = graph_with_trigger_and_action("pull_request_target", pinned_action);
    let reported = pr_trigger_with_floating_action_ref(&graph);
    assert!(reported.is_empty());
}
/// The `issue_comment` trigger with an action referenced at `@v1` must fire
/// once.
#[test]
fn issue_comment_with_floating_action_flagged() {
    let graph = graph_with_trigger_and_action("issue_comment", "foo/bar@v1");
    let hit_count = pr_trigger_with_floating_action_ref(&graph).len();
    assert_eq!(hit_count, 1);
}
/// A plain `pull_request` trigger with a floating action ref must not be
/// reported by the compound rule.
#[test]
fn pull_request_only_does_not_trigger_critical_compound_rule() {
    let graph = graph_with_trigger_and_action("pull_request", "foo/bar@main");
    let reported = pr_trigger_with_floating_action_ref(&graph);
    assert!(
        reported.is_empty(),
        "pull_request alone must not produce a critical compound finding"
    );
}
/// A comma-separated trigger list that includes `pull_request_target` must
/// fire once for a floating action ref.
#[test]
fn comma_separated_trigger_with_pull_request_target_flagged() {
    let triggers = "pull_request_target,push,workflow_dispatch";
    let graph = graph_with_trigger_and_action(triggers, "foo/bar@main");
    let hit_count = pr_trigger_with_floating_action_ref(&graph).len();
    assert_eq!(hit_count, 1);
}
/// Builds a `consumer.yml` graph whose trigger metadata is `trigger` and which
/// contains a single first-party `capture` step with script-body metadata
/// `body`.
///
/// The script body is attached at node-creation time through
/// `add_node_with_metadata`. The previous add-then-`get_mut` form silently
/// produced a step with no script body if the lookup failed, which would let
/// "not flagged" tests pass vacuously.
fn graph_with_trigger_and_step_body(trigger: &str, body: &str) -> AuthorityGraph {
    let mut g = AuthorityGraph::new(source("consumer.yml"));
    g.metadata.insert(META_TRIGGER.into(), trigger.into());
    let meta =
        std::collections::HashMap::from([(META_SCRIPT_BODY.to_string(), body.to_string())]);
    g.add_node_with_metadata(NodeKind::Step, "capture", TrustZone::FirstParty, meta);
    g
}
/// Under `workflow_run`, redirecting `gh pr view` output into `$GITHUB_ENV`
/// must yield one High-severity finding.
#[test]
fn workflow_run_gh_pr_view_to_github_env_flagged() {
    let script = "gh pr view --repo \"$REPO\" \"$PR_BRANCH\" --json 'number' --jq '\"PR_NUMBER=\\(.number)\"' >> $GITHUB_ENV";
    let graph = graph_with_trigger_and_step_body("workflow_run", script);
    let reported = untrusted_api_response_to_env_sink(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::High);
}
/// Under `workflow_run`, a `gh pr view` call with no `$GITHUB_ENV` write must
/// not be reported.
#[test]
fn workflow_run_without_env_sink_not_flagged() {
    let script = "gh pr view --repo \"$REPO\" \"$PR_BRANCH\" --json number";
    let graph = graph_with_trigger_and_step_body("workflow_run", script);
    let reported = untrusted_api_response_to_env_sink(&graph);
    assert!(reported.is_empty());
}
/// Under a `push` trigger, writing `gh pr view` output to `$GITHUB_ENV` must
/// not be reported.
#[test]
fn push_trigger_writing_to_env_not_flagged() {
    let script = "gh pr view --json number --jq .number >> $GITHUB_ENV";
    let graph = graph_with_trigger_and_step_body("push", script);
    let reported = untrusted_api_response_to_env_sink(&graph);
    assert!(reported.is_empty());
}
/// Under `workflow_run`, capturing `gh api` output into a variable on one
/// line and echoing it into `$GITHUB_ENV` on the next must fire once.
#[test]
fn workflow_run_multiline_capture_then_write_flagged() {
    let script = "VAL=$(gh api repos/foo/bar/pulls/$PR --jq .head.ref)\necho \"BRANCH=$VAL\" >> $GITHUB_ENV";
    let graph = graph_with_trigger_and_step_body("workflow_run", script);
    let hit_count = untrusted_api_response_to_env_sink(&graph).len();
    assert_eq!(hit_count, 1);
}
/// Builds a `pr-build.yml` graph whose trigger metadata is `trigger` and
/// which contains one third-party `Image` node named `action`.
fn graph_pr_with_login_action(trigger: &str, action: &str) -> AuthorityGraph {
    let mut graph = AuthorityGraph::new(source("pr-build.yml"));
    let trigger_key = META_TRIGGER.into();
    graph.metadata.insert(trigger_key, trigger.into());
    graph.add_node(NodeKind::Image, action, TrustZone::ThirdParty);
    graph
}
/// A `pull_request` workflow using the `login-to-gar` action at `@main` must
/// yield one High-severity `PrBuildPushesImageWithFloatingCredentials` finding.
#[test]
fn pr_with_floating_login_to_gar_flagged() {
    let floating_login = "grafana/shared-workflows/actions/login-to-gar@main";
    let graph = graph_pr_with_login_action("pull_request", floating_login);
    let reported = pr_build_pushes_image_with_floating_credentials(&graph);
    assert_eq!(reported.len(), 1);
    let first = &reported[0];
    assert_eq!(first.severity, Severity::High);
    assert_eq!(
        first.category,
        FindingCategory::PrBuildPushesImageWithFloatingCredentials
    );
}
/// A `pull_request` workflow using `docker/login-action@v3` must fire once.
#[test]
fn pr_with_floating_docker_login_action_flagged() {
    let graph = graph_pr_with_login_action("pull_request", "docker/login-action@v3");
    let hit_count = pr_build_pushes_image_with_floating_credentials(&graph).len();
    assert_eq!(hit_count, 1);
}
/// A `pull_request` workflow using `docker/login-action` pinned to a 40-hex
/// SHA must not be reported.
#[test]
fn pr_with_sha_pinned_docker_login_not_flagged() {
    let pinned_login = "docker/login-action@343f7c4344506bcbf9b4de18042ae17996df046d";
    let graph = graph_pr_with_login_action("pull_request", pinned_login);
    let reported = pr_build_pushes_image_with_floating_credentials(&graph);
    assert!(reported.is_empty());
}
/// Under a `push` trigger, a floating `docker/login-action@v3` must not be
/// reported by this rule.
#[test]
fn push_trigger_with_floating_login_action_not_flagged() {
    let graph = graph_pr_with_login_action("push", "docker/login-action@v3");
    let reported = pr_build_pushes_image_with_floating_credentials(&graph);
    assert!(reported.is_empty());
}
/// A `pull_request` workflow whose only floating action is
/// `actions/checkout@v4` must not be reported.
#[test]
fn pr_with_unrelated_unpinned_action_not_flagged() {
    let graph = graph_pr_with_login_action("pull_request", "actions/checkout@v4");
    let reported = pr_build_pushes_image_with_floating_credentials(&graph);
    assert!(reported.is_empty());
}
}