use crate::graph::{
AuthorityGraph, EdgeKind, Node, NodeId, NodeKind, META_GHA_ACTION, META_GHA_WITH_INPUTS,
META_JOB_NAME, META_OIDC, META_SCRIPT_BODY,
};
use serde::Serialize;
use std::fmt::Write as _;
/// Options shared by the exploit-view renderers and exporter in this module.
#[derive(Debug, Clone, Default)]
pub struct ExploitGraphOptions<'a> {
/// When `Some`, only steps whose `META_JOB_NAME` metadata equals this value are
/// considered; `None` considers every step.
pub job: Option<&'a str>,
}
/// Serializable top-level document produced by `build_export`.
#[derive(Debug, Clone, Serialize)]
pub struct ExploitGraphExport {
/// Version of the export schema (`build_export` sets `"1.0.0"`).
pub schema_version: &'static str,
/// URI identifying the export schema.
pub schema_uri: &'static str,
/// Name of the view (`build_export` sets `"exploit"`).
pub view: &'static str,
/// Copy of the analysed graph's `source`.
pub source: crate::graph::PipelineSource,
/// One entry per matched exploit path.
pub paths: Vec<ExploitPathExport>,
/// Aggregate counts over `paths`.
pub summary: ExploitGraphSummary,
}
/// Aggregate counts computed by `build_export` over the exported paths.
#[derive(Debug, Clone, Serialize)]
pub struct ExploitGraphSummary {
/// Total number of exported paths.
pub path_count: usize,
/// Number of paths with at least one edge whose `observed` is `Some(true)`.
pub observed_path_count: usize,
/// Number of paths with at least one edge marked `authority_bearing`.
pub authority_path_count: usize,
}
/// Serializable description of a single matched exploit path.
#[derive(Debug, Clone, Serialize)]
pub struct ExploitPathExport {
/// Transport-specific rule id chosen by `transport_rule_id`.
pub rule_id: &'static str,
/// Id of the `ExploitRule` that matched.
pub umbrella_rule_id: &'static str,
/// Scope of the `ExploitRule` that matched.
pub rule_scope: &'static str,
/// Label of the rule's mutable channel (e.g. `"GITHUB_PATH"`).
pub mutable_channel: &'static str,
/// Helper name taken from the matching catalog entry.
pub helper: &'static str,
/// String form of the catalog entry's `HelperResolution`.
pub helper_resolution: &'static str,
/// String forms of the matched authority pattern's transports, in pattern order.
pub authority_transport: Vec<&'static str>,
/// String form of the matched authority pattern's origin.
pub authority_origin: &'static str,
/// Nodes of this path as produced by `path_nodes`.
pub nodes: Vec<ExploitNodeExport>,
/// Edges of this path as produced by `path_edges`.
pub edges: Vec<ExploitEdgeExport>,
}
/// Serializable node of an exploit path.
#[derive(Debug, Clone, Serialize)]
pub struct ExploitNodeExport {
/// Path-local identifier (see `PathIds`), e.g. `p0_step`.
pub id: String,
/// Node category string such as `"Step"` or `"MutableState"`.
pub kind: &'static str,
/// Human-readable label shown by the renderers.
pub label: String,
/// Id of the backing node in the authority graph, when there is one; omitted from
/// serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub source_node_id: Option<NodeId>,
/// Optional extra text; the DOT renderer appends it as an additional label line.
/// Omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub note: Option<String>,
}
/// Serializable edge of an exploit path.
#[derive(Debug, Clone, Serialize)]
pub struct ExploitEdgeExport {
/// Path-local id of the source node.
pub from: String,
/// Path-local id of the target node.
pub to: String,
/// Relationship name, e.g. `"mutates_state"`.
pub kind: &'static str,
/// Confidence tag; the renderers special-case `"observed"` and `"inferred"`.
pub confidence: &'static str,
/// Whether the edge is treated as carrying authority (drawn red in DOT output).
pub authority_bearing: bool,
/// Observation flag; omitted from serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub observed: Option<bool>,
}
/// Channel of mutable state tracked by exploit rules.
///
/// The label is the text searched for in script bodies by `prior_mutable_writer`
/// and the label shown on the `MutableState` node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MutableChannel {
    /// The `GITHUB_PATH` channel.
    GithubPath,
}

impl MutableChannel {
    /// Returns the channel's textual name.
    fn label(self) -> &'static str {
        match self {
            MutableChannel::GithubPath => "GITHUB_PATH",
        }
    }
}
/// How a cataloged action locates the helper binary it runs.
// Not every variant is constructed by the catalog in this file, hence the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum HelperResolution {
    BareCommand,
    ShellString,
    ToolkitWhich,
    AbsolutePath,
    ToolcachePath,
    ActionOwnedPath,
    UserSuppliedAbsolutePath,
    AmbientPathByExplicitMode,
    Unknown,
}

impl HelperResolution {
    /// Returns the snake_case identifier used in exported output.
    fn as_str(self) -> &'static str {
        match self {
            HelperResolution::BareCommand => "bare_command",
            HelperResolution::ShellString => "shell_string",
            HelperResolution::ToolkitWhich => "toolkit_which",
            HelperResolution::AbsolutePath => "absolute_path",
            HelperResolution::ToolcachePath => "toolcache_path",
            HelperResolution::ActionOwnedPath => "action_owned_path",
            HelperResolution::UserSuppliedAbsolutePath => "user_supplied_absolute_path",
            HelperResolution::AmbientPathByExplicitMode => "ambient_path_by_explicit_mode",
            HelperResolution::Unknown => "unknown",
        }
    }

    /// True for the resolutions that count as "PATH-selected" for rule matching.
    fn is_path_selected(self) -> bool {
        // Exhaustive on purpose: a new variant must be classified explicitly.
        match self {
            Self::BareCommand | Self::ShellString | Self::ToolkitWhich => true,
            Self::AbsolutePath
            | Self::ToolcachePath
            | Self::ActionOwnedPath
            | Self::UserSuppliedAbsolutePath
            | Self::AmbientPathByExplicitMode
            | Self::Unknown => false,
        }
    }

    /// True for the resolutions that cause `match_exploit_rule` to drop the step.
    fn is_downgrade_or_suppress(self) -> bool {
        match self {
            Self::AbsolutePath
            | Self::ToolcachePath
            | Self::ActionOwnedPath
            | Self::UserSuppliedAbsolutePath
            | Self::AmbientPathByExplicitMode => true,
            Self::BareCommand | Self::ShellString | Self::ToolkitWhich | Self::Unknown => false,
        }
    }
}
/// Way in which authority material reaches the helper process.
// Not every variant is referenced by the transport tables in this file, hence the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthorityTransport {
    Argv,
    Stdin,
    Env,
    CredentialFilePath,
    ConfigFilePath,
    WorkspaceFile,
    OidcRequestEnv,
}

impl AuthorityTransport {
    /// Returns the snake_case identifier used in exported output and notes.
    fn as_str(self) -> &'static str {
        match self {
            AuthorityTransport::Argv => "argv",
            AuthorityTransport::Stdin => "stdin",
            AuthorityTransport::Env => "env",
            AuthorityTransport::CredentialFilePath => "credential_file_path",
            AuthorityTransport::ConfigFilePath => "config_file_path",
            AuthorityTransport::WorkspaceFile => "workspace_file",
            AuthorityTransport::OidcRequestEnv => "oidc_request_env",
        }
    }
}
/// Where the authority handled by an action comes from.
// Not every variant is referenced by the pattern tables in this file, hence the allow.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum AuthorityOrigin {
    CallerProvidedSecret,
    ActionInputSecret,
    GitHubToken,
    OidcRequestCapability,
    CloudCredentialMintedByAction,
    RegistryCredentialMintedByAction,
    GeneratedCredentialFile,
    DerivedSecretPayload,
}

impl AuthorityOrigin {
    /// Returns the snake_case identifier used in exported output and notes.
    fn as_str(self) -> &'static str {
        match self {
            AuthorityOrigin::CallerProvidedSecret => "caller_provided_secret",
            AuthorityOrigin::ActionInputSecret => "action_input_secret",
            AuthorityOrigin::GitHubToken => "github_token",
            AuthorityOrigin::OidcRequestCapability => "oidc_request_capability",
            AuthorityOrigin::CloudCredentialMintedByAction => {
                "cloud_credential_minted_by_action"
            }
            AuthorityOrigin::RegistryCredentialMintedByAction => {
                "registry_credential_minted_by_action"
            }
            AuthorityOrigin::GeneratedCredentialFile => "generated_credential_file",
            AuthorityOrigin::DerivedSecretPayload => "derived_secret_payload",
        }
    }
}
/// Declarative description of one exploit-path rule evaluated by `match_exploit_rule`.
#[derive(Debug, Clone, Copy)]
struct ExploitRule {
// Exported as `umbrella_rule_id`.
id: &'static str,
// Exported as `rule_scope`.
scope: &'static str,
// Channel an earlier step must write to; also labels the MutableState node.
mutable_channel: MutableChannel,
// When true, an earlier same-job step writing to the channel is required;
// when false, the action step itself is used as the writer.
require_prior_mutation: bool,
// When true, the catalog entry's helper resolution must be PATH-selected.
require_path_selected_helper: bool,
// When true, `authority_materialization` must find a matching pattern;
// when false, the catalog entry's first pattern is used.
require_authority_materialization: bool,
}
/// Static knowledge about one third-party action: which helper it runs and which
/// authority it handles.
#[derive(Debug, Clone, Copy)]
struct ActionCatalogEntry {
// Action identifier; compared against the lowercased `META_GHA_ACTION` metadata,
// so it must be written in lowercase.
action: &'static str,
// Display name used as the ThirdPartyAction node label.
action_label: &'static str,
// Name of the helper binary the action invokes.
helper: &'static str,
// How the action locates that helper.
helper_resolution: HelperResolution,
// Candidate authority patterns, tried in order.
authority_patterns: &'static [AuthorityPattern],
// Optional input condition that must hold for the entry to apply.
mode_gate: Option<ModeGate>,
// When true, the entry never produces an exploit path.
suppress: bool,
}
/// One way an action can materialize authority for its helper.
#[derive(Debug, Clone, Copy)]
struct AuthorityPattern {
// Label of the AuthorityArtifact node.
artifact_label: &'static str,
// When `Some`, an AuthorityEnv node and an `exposes_env` edge are emitted with this label.
env_label: Option<&'static str>,
// Transports; they select the rule id and the helper-to-artifact edge kind.
transports: &'static [AuthorityTransport],
// Origin of the authority.
origin: AuthorityOrigin,
// Input keys whose presence with a non-empty value in `META_GHA_WITH_INPUTS`
// signals this pattern.
input_keys: &'static [&'static str],
// When true, an authority node reachable from the step in the graph is enough
// to match this pattern.
accepts_graph_authority: bool,
}
/// Input condition a step must satisfy before a catalog entry applies
/// (evaluated by `mode_gate_matches`).
#[derive(Debug, Clone, Copy)]
struct ModeGate {
// Input key looked up case-insensitively in `META_GHA_WITH_INPUTS`.
input_key: &'static str,
// Required state of that input.
expected: TruthyMode,
}
/// Expected state of a gated input.
#[derive(Debug, Clone, Copy)]
enum TruthyMode {
// The input must be set to a truthy value as defined by `with_truthy`.
Truthy,
}
/// Result of matching an authority pattern against a step.
#[derive(Debug, Clone, Copy)]
struct AuthorityMaterialization<'a> {
// The pattern that matched.
pattern: AuthorityPattern,
// Authority node found for the step in the graph, if any; it is recorded even
// when the pattern matched through another signal.
graph_authority: Option<&'a Node>,
}
// Scope string shared by the exploit-path rules.
const RULE_SCOPE_EXPLOIT_PATH: &str = "exploit_path";
// Umbrella rule: requires a prior GITHUB_PATH write, a PATH-selected helper and
// materialized authority.
const RULE_HELPER_AUTHORITY: ExploitRule = ExploitRule {
id: "EXPLOIT_PATH_HELPER_AUTHORITY",
scope: RULE_SCOPE_EXPLOIT_PATH,
mutable_channel: MutableChannel::GithubPath,
require_prior_mutation: true,
require_path_selected_helper: true,
require_authority_materialization: true,
};
// Every rule evaluated for each step by `exploit_paths`.
const EXPLOIT_RULES: &[ExploitRule] = &[RULE_HELPER_AUTHORITY];
// Reusable transport sets referenced by the authority patterns below.
// When a set has several transports, `transport_rule_id` and
// `helper_authority_edge_kind` pick by a fixed priority (file path, then stdin,
// then argv, then env), not by order within the slice.
const TRANSPORT_CREDENTIAL_FILE_ENV: &[AuthorityTransport] = &[
AuthorityTransport::CredentialFilePath,
AuthorityTransport::Env,
];
const TRANSPORT_ENV_STDIN: &[AuthorityTransport] =
&[AuthorityTransport::Env, AuthorityTransport::Stdin];
const TRANSPORT_STDIN: &[AuthorityTransport] = &[AuthorityTransport::Stdin];
const TRANSPORT_ARGV: &[AuthorityTransport] = &[AuthorityTransport::Argv];
const TRANSPORT_ENV: &[AuthorityTransport] = &[AuthorityTransport::Env];
const TRANSPORT_CONFIG_OIDC_ENV: &[AuthorityTransport] = &[
AuthorityTransport::ConfigFilePath,
AuthorityTransport::OidcRequestEnv,
];
// Input keys that signal authority for each action; a key counts only when it is
// present with a non-empty value (see `contains_input_key`). Keys are matched
// case-insensitively.
const INPUT_FIREBASE_SERVICE_ACCOUNT: &[&str] = &["firebaseServiceAccount"];
const INPUT_AZURE_CREDS: &[&str] = &["creds"];
const INPUT_CLOUDFLARE_AUTHORITY: &[&str] = &["apiToken", "secrets", "command"];
const INPUT_DOCKER_LOGIN_AUTHORITY: &[&str] = &["password", "registry"];
const INPUT_NPM_PUBLISH_AUTHORITY: &[&str] = &["token", "access"];
// Used by patterns that have no input-based signal.
const INPUT_EMPTY: &[&str] = &[];
// Authority patterns, one table per cataloged action. Each table is referenced by
// the matching `ACTION_CATALOG` entry.

// Pattern for the Firebase hosting deploy action.
const FIREBASE_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "service-account credential file",
env_label: Some("GOOGLE_APPLICATION_CREDENTIALS"),
transports: TRANSPORT_CREDENTIAL_FILE_ENV,
origin: AuthorityOrigin::GeneratedCredentialFile,
input_keys: INPUT_FIREBASE_SERVICE_ACCOUNT,
accepts_graph_authority: true,
}];
// Pattern for the Azure login action.
const AZURE_LOGIN_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "Azure service principal credential payload",
env_label: None,
transports: TRANSPORT_ARGV,
origin: AuthorityOrigin::ActionInputSecret,
input_keys: INPUT_AZURE_CREDS,
accepts_graph_authority: true,
}];
// Pattern for the Cloudflare wrangler action.
const CLOUDFLARE_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "Wrangler deploy secret payload",
env_label: Some("CLOUDFLARE_API_TOKEN / SECRET_ALPHA"),
transports: TRANSPORT_ENV_STDIN,
origin: AuthorityOrigin::ActionInputSecret,
input_keys: INPUT_CLOUDFLARE_AUTHORITY,
accepts_graph_authority: true,
}];
// Pattern for the Docker login action.
const DOCKER_LOGIN_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "registry password payload",
env_label: None,
transports: TRANSPORT_STDIN,
origin: AuthorityOrigin::CallerProvidedSecret,
input_keys: INPUT_DOCKER_LOGIN_AUTHORITY,
accepts_graph_authority: true,
}];
// Pattern for the Amazon ECR login action; its origin matches without any input
// (see `origin_materializes_without_input`).
const ECR_LOGIN_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "AWS-minted ECR password",
env_label: None,
transports: TRANSPORT_ARGV,
origin: AuthorityOrigin::RegistryCredentialMintedByAction,
input_keys: INPUT_EMPTY,
accepts_graph_authority: true,
}];
// Pattern for the npm publish action.
const NPM_PUBLISH_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: ".npmrc / package publish token",
env_label: Some("NODE_AUTH_TOKEN"),
transports: TRANSPORT_ENV,
origin: AuthorityOrigin::ActionInputSecret,
input_keys: INPUT_NPM_PUBLISH_AUTHORITY,
accepts_graph_authority: true,
}];
// Pattern for the Teleport database tunnel action; its origin matches without any input.
const TELEPORT_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "Teleport bot config / identity material",
env_label: Some("GitHub OIDC request env"),
transports: TRANSPORT_CONFIG_OIDC_ENV,
origin: AuthorityOrigin::OidcRequestCapability,
input_keys: INPUT_EMPTY,
accepts_graph_authority: true,
}];
// Pattern for the setup-gcloud action; its origin matches without any input.
const SETUP_GCLOUD_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "ambient gcloud credential context",
env_label: Some("Google Cloud auth env"),
transports: TRANSPORT_ARGV,
origin: AuthorityOrigin::CloudCredentialMintedByAction,
input_keys: INPUT_EMPTY,
accepts_graph_authority: true,
}];
// Pattern for the GoReleaser action; with no input keys it can only match through
// graph authority.
const GORELEASER_AUTHORITY: &[AuthorityPattern] = &[AuthorityPattern {
artifact_label: "GoReleaser token context",
env_label: None,
transports: TRANSPORT_ENV,
origin: AuthorityOrigin::ActionInputSecret,
input_keys: INPUT_EMPTY,
accepts_graph_authority: true,
}];
// Catalog of known third-party actions, looked up by `catalog_entry`.
// The `action` field must be lowercase because the lookup lowercases the step's
// action name before comparing.
const ACTION_CATALOG: &[ActionCatalogEntry] = &[
ActionCatalogEntry {
action: "firebaseextended/action-hosting-deploy",
action_label: "FirebaseExtended/action-hosting-deploy",
helper: "npx",
helper_resolution: HelperResolution::BareCommand,
authority_patterns: FIREBASE_AUTHORITY,
mode_gate: None,
suppress: false,
},
ActionCatalogEntry {
action: "azure/login",
action_label: "Azure/login",
helper: "az",
helper_resolution: HelperResolution::BareCommand,
authority_patterns: AZURE_LOGIN_AUTHORITY,
mode_gate: None,
suppress: false,
},
ActionCatalogEntry {
action: "cloudflare/wrangler-action",
action_label: "cloudflare/wrangler-action",
helper: "npx",
helper_resolution: HelperResolution::ShellString,
authority_patterns: CLOUDFLARE_AUTHORITY,
mode_gate: None,
suppress: false,
},
ActionCatalogEntry {
action: "docker/login-action",
action_label: "docker/login-action",
helper: "docker",
helper_resolution: HelperResolution::BareCommand,
authority_patterns: DOCKER_LOGIN_AUTHORITY,
mode_gate: None,
suppress: false,
},
ActionCatalogEntry {
action: "aws-actions/amazon-ecr-login",
action_label: "aws-actions/amazon-ecr-login",
helper: "docker",
helper_resolution: HelperResolution::BareCommand,
authority_patterns: ECR_LOGIN_AUTHORITY,
mode_gate: None,
suppress: false,
},
ActionCatalogEntry {
action: "js-devtools/npm-publish",
action_label: "JS-DevTools/npm-publish",
helper: "npm",
helper_resolution: HelperResolution::BareCommand,
authority_patterns: NPM_PUBLISH_AUTHORITY,
mode_gate: None,
suppress: false,
},
ActionCatalogEntry {
action: "teleport-actions/database-tunnel",
action_label: "teleport-actions/database-tunnel",
helper: "tbot",
helper_resolution: HelperResolution::ToolkitWhich,
authority_patterns: TELEPORT_AUTHORITY,
mode_gate: None,
suppress: false,
},
// Applies only when the `skip_install` input is truthy.
// NOTE(review): `AmbientPathByExplicitMode` is classified as downgrade-or-suppress,
// so `match_exploit_rule` currently never emits a path for this entry — confirm
// that is intended.
ActionCatalogEntry {
action: "google-github-actions/setup-gcloud",
action_label: "google-github-actions/setup-gcloud",
helper: "gcloud",
helper_resolution: HelperResolution::AmbientPathByExplicitMode,
authority_patterns: SETUP_GCLOUD_AUTHORITY,
mode_gate: Some(ModeGate {
input_key: "skip_install",
expected: TruthyMode::Truthy,
}),
suppress: false,
},
// Explicitly suppressed: never produces an exploit path.
ActionCatalogEntry {
action: "goreleaser/goreleaser-action",
action_label: "goreleaser/goreleaser-action",
helper: "goreleaser",
helper_resolution: HelperResolution::ToolcachePath,
authority_patterns: GORELEASER_AUTHORITY,
mode_gate: None,
suppress: true,
},
];
/// A matched exploit path, borrowing its nodes from the authority graph.
#[derive(Debug)]
struct ExploitPath<'a> {
// The rule that matched.
rule: ExploitRule,
// Transport-specific rule id derived from the matched pattern's transports.
rule_id: &'static str,
// Step treated as the writer of the mutable channel.
writer: &'a Node,
// Step that uses the cataloged action.
action_step: &'a Node,
// Catalog entry for that action.
catalog: ActionCatalogEntry,
// Authority pattern (and optional graph node) matched for the action step.
authority: AuthorityMaterialization<'a>,
}
pub fn render_dot(graph: &AuthorityGraph, options: ExploitGraphOptions<'_>) -> String {
let paths = exploit_paths(graph, options.job);
let mut out = String::new();
out.push_str("digraph taudit_exploit {\n");
out.push_str(" rankdir=LR;\n");
out.push_str(" graph [fontname=\"Helvetica\"];\n");
out.push_str(" node [fontname=\"Helvetica\" style=\"rounded\"];\n");
out.push_str(" edge [fontname=\"Helvetica\"];\n");
let title = format!("taudit exploit view\nsource: {}", graph.source.file);
out.push_str(&format!(" label=\"{}\";\n", dot_escape(&title)));
out.push_str(" labelloc=\"t\";\n");
legend(&mut out);
for (idx, path) in paths.iter().enumerate() {
render_path(&mut out, idx, path);
}
if paths.is_empty() {
out.push_str(
" \"empty\" [label=\"No exploit path found\\nrequires prior mutable state and later PATH-resolved helper authority\" shape=note color=gray];\n",
);
}
out.push_str("}\n");
out
}
/// Renders the exploit view of `graph` as a Mermaid `flowchart LR` document.
///
/// Node shape depends on the node kind and arrow style depends on the edge
/// confidence. When no path matches, the chart contains a single placeholder
/// node. `options.job` restricts matching to one job.
pub fn render_mermaid(graph: &AuthorityGraph, options: ExploitGraphOptions<'_>) -> String {
    let paths = exploit_paths(graph, options.job);
    let mut out = String::from("flowchart LR\n");
    if paths.is_empty() {
        out.push_str(" empty[\"No exploit path found\"]\n");
        return out;
    }

    for (idx, path) in paths.iter().enumerate() {
        for node in path_nodes(idx, path) {
            // Delimiters wrapped around the quoted label select the Mermaid shape.
            let (open, close) = match node.kind {
                "Step" => ("(", ")"),
                "MutableState" | "AuthorityEnv" => ("[[", "]]"),
                "ResolvedHelper" | "ObservedSink" => ("{", "}"),
                _ => ("[", "]"),
            };
            let label = mermaid_escape(&format!("{}: {}", node.kind, node.label));
            let _ = writeln!(out, " {}{}\"{}\"{}", node.id, open, label, close);
        }

        for edge in path_edges(idx, path) {
            let arrow = match edge.confidence {
                "observed" => "==>",
                "inferred" => "-.->",
                _ => "-->",
            };
            let label = mermaid_escape(edge.kind);
            let _ = writeln!(out, " {} {}|{}| {}", edge.from, arrow, label, edge.to);
        }
    }
    out
}
pub fn build_export(
graph: &AuthorityGraph,
options: ExploitGraphOptions<'_>,
) -> ExploitGraphExport {
let paths: Vec<ExploitPathExport> = exploit_paths(graph, options.job)
.iter()
.enumerate()
.map(|(idx, path)| ExploitPathExport {
rule_id: path.rule_id,
umbrella_rule_id: path.rule.id,
rule_scope: path.rule.scope,
mutable_channel: path.rule.mutable_channel.label(),
helper: path.catalog.helper,
helper_resolution: path.catalog.helper_resolution.as_str(),
authority_transport: path
.authority
.pattern
.transports
.iter()
.map(|transport| transport.as_str())
.collect(),
authority_origin: path.authority.pattern.origin.as_str(),
nodes: path_nodes(idx, path),
edges: path_edges(idx, path),
})
.collect();
let observed_path_count = paths
.iter()
.filter(|path| path.edges.iter().any(|edge| edge.observed == Some(true)))
.count();
let authority_path_count = paths
.iter()
.filter(|path| path.edges.iter().any(|edge| edge.authority_bearing))
.count();
ExploitGraphExport {
schema_version: "1.0.0",
schema_uri: "https://taudit.dev/schemas/exploit-graph.v1.json",
view: "exploit",
source: graph.source.clone(),
summary: ExploitGraphSummary {
path_count: paths.len(),
observed_path_count,
authority_path_count,
},
paths,
}
}
/// Serializes the full exploit-view export of `graph` as pretty-printed JSON.
///
/// # Errors
///
/// Returns the `serde_json` error if serialization fails.
pub fn render_json_pretty(
    graph: &AuthorityGraph,
    options: ExploitGraphOptions<'_>,
) -> Result<String, serde_json::Error> {
    let export = build_export(graph, options);
    serde_json::to_string_pretty(&export)
}
pub fn render_summary_pretty(
graph: &AuthorityGraph,
options: ExploitGraphOptions<'_>,
) -> Result<String, serde_json::Error> {
let export = build_export(graph, options);
serde_json::to_string_pretty(&serde_json::json!({
"schema_version": export.schema_version,
"schema_uri": export.schema_uri,
"view": export.view,
"source": export.source,
"summary": export.summary,
}))
}
/// Collects every exploit path in `graph`, optionally restricted to one job.
///
/// A graph whose `platform` metadata is present and is not `github-actions`
/// yields no paths; a graph without that key is still scanned. Each step
/// (filtered by `job_filter` when given) is checked against every rule in
/// `EXPLOIT_RULES`.
fn exploit_paths<'a>(graph: &'a AuthorityGraph, job_filter: Option<&str>) -> Vec<ExploitPath<'a>> {
    let platform = graph.metadata.get("platform").map(String::as_str);
    if matches!(platform, Some(name) if name != "github-actions") {
        return Vec::new();
    }

    let mut found = Vec::new();
    for step in graph.nodes_of_kind(NodeKind::Step) {
        if let Some(wanted) = job_filter {
            let job = step.metadata.get(META_JOB_NAME).map(String::as_str);
            if job != Some(wanted) {
                continue;
            }
        }
        found.extend(
            EXPLOIT_RULES
                .iter()
                .copied()
                .filter_map(|rule| match_exploit_rule(graph, step, rule)),
        );
    }
    found
}
/// Tries to match `rule` against `step`, returning the exploit path on success.
///
/// Returns `None` when the step's action is not cataloged (or its mode gate
/// fails), when the catalog entry is suppressed or uses a downgraded helper
/// resolution, or when any requirement enabled on the rule is not met.
fn match_exploit_rule<'a>(
    graph: &'a AuthorityGraph,
    step: &'a Node,
    rule: ExploitRule,
) -> Option<ExploitPath<'a>> {
    let catalog = catalog_entry(step)?;
    let resolution = catalog.helper_resolution;

    let suppressed = catalog.suppress || resolution.is_downgrade_or_suppress();
    let helper_rejected = rule.require_path_selected_helper && !resolution.is_path_selected();
    if suppressed || helper_rejected {
        return None;
    }

    // Without the prior-mutation requirement the action step doubles as the writer.
    let writer = if !rule.require_prior_mutation {
        step
    } else {
        prior_mutable_writer(graph, step, rule.mutable_channel)?
    };

    // Without the materialization requirement the first cataloged pattern is used.
    let authority = if !rule.require_authority_materialization {
        let pattern = catalog.authority_patterns.first().copied()?;
        AuthorityMaterialization {
            pattern,
            graph_authority: None,
        }
    } else {
        authority_materialization(graph, step, catalog)?
    };

    let rule_id = transport_rule_id(authority.pattern.transports);
    Some(ExploitPath {
        rule,
        rule_id,
        writer,
        action_step: step,
        catalog,
        authority,
    })
}
/// Looks up the catalog entry for the action used by `step`.
///
/// The step's `META_GHA_ACTION` metadata is lowercased before comparison.
/// Returns `None` when the step has no action metadata, the action is not
/// cataloged, or the entry's mode gate does not hold for this step.
fn catalog_entry(step: &Node) -> Option<ActionCatalogEntry> {
    let wanted = step.metadata.get(META_GHA_ACTION)?.to_ascii_lowercase();
    ACTION_CATALOG
        .iter()
        .find(|candidate| candidate.action == wanted)
        .copied()
        .filter(|candidate| mode_gate_matches(step, candidate.mode_gate))
}
/// Finds the first authority pattern of `catalog` that applies to `step`.
///
/// A pattern applies when any of these holds:
/// - it accepts graph authority and the step has an authority node in the graph;
/// - one of its input keys is present with a non-empty value;
/// - its origin materializes without input.
///
/// The authority node found in the graph (if any) is attached to the result
/// whichever of these signals matched.
fn authority_materialization<'a>(
    graph: &'a AuthorityGraph,
    step: &Node,
    catalog: ActionCatalogEntry,
) -> Option<AuthorityMaterialization<'a>> {
    let graph_authority = first_authority_node(graph, step.id);
    let has_graph_authority = graph_authority.is_some();

    for pattern in catalog.authority_patterns.iter().copied() {
        let applies = (pattern.accepts_graph_authority && has_graph_authority)
            || pattern_input_signal(step, pattern)
            || origin_materializes_without_input(pattern.origin);
        if applies {
            return Some(AuthorityMaterialization {
                pattern,
                graph_authority,
            });
        }
    }
    None
}
/// Returns true when any of the pattern's input keys is present with a
/// non-empty value in the step's `META_GHA_WITH_INPUTS` metadata.
fn pattern_input_signal(step: &Node, pattern: AuthorityPattern) -> bool {
    // A step without inputs metadata cannot carry any input signal.
    let Some(inputs) = step.metadata.get(META_GHA_WITH_INPUTS) else {
        return false;
    };
    pattern
        .input_keys
        .iter()
        .any(|key| contains_input_key(inputs, key))
}
/// Returns true for origins that count as materialized even when no input key
/// or graph authority is present for the step.
fn origin_materializes_without_input(origin: AuthorityOrigin) -> bool {
    // Exhaustive on purpose: a new origin must be classified explicitly.
    match origin {
        AuthorityOrigin::CloudCredentialMintedByAction
        | AuthorityOrigin::RegistryCredentialMintedByAction
        | AuthorityOrigin::OidcRequestCapability
        | AuthorityOrigin::GeneratedCredentialFile
        | AuthorityOrigin::DerivedSecretPayload => true,
        AuthorityOrigin::CallerProvidedSecret
        | AuthorityOrigin::ActionInputSecret
        | AuthorityOrigin::GitHubToken => false,
    }
}
/// Returns true when `step` satisfies `gate`; an absent gate always matches.
fn mode_gate_matches(step: &Node, gate: Option<ModeGate>) -> bool {
    let Some(gate) = gate else {
        return true;
    };
    match gate.expected {
        TruthyMode::Truthy => with_truthy(step, gate.input_key),
    }
}
/// Returns true when `inputs` has a `key=value` line for `key` with a value that
/// is not blank.
///
/// Each line is split at its first `=`; lines without `=` are ignored. The key
/// is compared ASCII-case-insensitively and is not trimmed, while the value is
/// trimmed before the emptiness check.
fn contains_input_key(inputs: &str, key: &str) -> bool {
    inputs
        .lines()
        .filter_map(|line| line.split_once('='))
        .any(|(name, value)| name.eq_ignore_ascii_case(key) && !value.trim().is_empty())
}
/// Returns true when the step's `META_GHA_WITH_INPUTS` metadata has a
/// `key=value` line for `key` whose trimmed value is `true`, `yes` or `1`
/// (ASCII case ignored).
///
/// A step without inputs metadata is never truthy.
fn with_truthy(step: &Node, key: &str) -> bool {
    const TRUTHY: [&str; 3] = ["true", "yes", "1"];

    step.metadata
        .get(META_GHA_WITH_INPUTS)
        .is_some_and(|inputs| {
            inputs
                .lines()
                .filter_map(|line| line.split_once('='))
                .any(|(name, value)| {
                    let value = value.trim();
                    name.eq_ignore_ascii_case(key)
                        && TRUTHY
                            .iter()
                            .any(|accepted| value.eq_ignore_ascii_case(accepted))
                })
        })
}
/// Finds the latest step before `step` in the same job whose script body writes
/// to `channel`.
///
/// "Before" means a smaller node id. Returns `None` when `step` has no job name
/// or no such writer exists.
fn prior_mutable_writer<'a>(
    graph: &'a AuthorityGraph,
    step: &Node,
    channel: MutableChannel,
) -> Option<&'a Node> {
    let job = step.metadata.get(META_JOB_NAME)?;
    let channel_name = channel.label();

    let mut latest = None;
    for candidate in graph.nodes_of_kind(NodeKind::Step) {
        let is_earlier = candidate.id < step.id;
        let same_job = candidate.metadata.get(META_JOB_NAME) == Some(job);
        if !(is_earlier && same_job) {
            continue;
        }
        let writes = candidate
            .metadata
            .get(META_SCRIPT_BODY)
            .is_some_and(|body| writes_mutable_channel(body, channel_name));
        if writes {
            // Keep overwriting so the last qualifying step in iteration order wins.
            latest = Some(candidate);
        }
    }
    latest
}
/// Heuristically decides whether a script body appends to `channel`.
///
/// The body must mention the channel name and contain at least one append
/// marker (`>>`, `tee -a`, `Out-File`, `Add-Content`) anywhere in the text. The
/// marker is not required to be on the same line as the channel name.
fn writes_mutable_channel(body: &str, channel: &str) -> bool {
    const APPEND_MARKERS: [&str; 4] = [">>", "tee -a", "Out-File", "Add-Content"];

    if !body.contains(channel) {
        return false;
    }
    APPEND_MARKERS.iter().any(|marker| body.contains(*marker))
}
/// Returns the first authority node that `step_id` has a `HasAccessTo` edge to.
///
/// Secrets always qualify. An identity qualifies when it is marked as OIDC or
/// when it is not named `GITHUB_TOKEN`. Other node kinds never qualify.
fn first_authority_node(graph: &AuthorityGraph, step_id: NodeId) -> Option<&Node> {
    /// Decides whether a node counts as authority for the exploit view.
    fn is_authority(candidate: &Node) -> bool {
        match candidate.kind {
            NodeKind::Secret => true,
            NodeKind::Identity => {
                let is_oidc = candidate
                    .metadata
                    .get(META_OIDC)
                    .is_some_and(|flag| flag == "true");
                is_oidc || candidate.name != "GITHUB_TOKEN"
            }
            _ => false,
        }
    }

    graph
        .edges_from(step_id)
        .filter(|edge| edge.kind == EdgeKind::HasAccessTo)
        .filter_map(|edge| graph.node(edge.to))
        .find(|candidate| is_authority(candidate))
}
/// Appends the DOT statements for one exploit path to `out`: all nodes first,
/// then all edges.
fn render_path(out: &mut String, idx: usize, path: &ExploitPath<'_>) {
    let nodes = path_nodes(idx, path);
    for item in &nodes {
        let (shape, color) = dot_node_style(item.kind);
        let note = item.note.as_deref();
        node(out, &item.id, item.kind, &item.label, shape, color, note);
    }

    let edges = path_edges(idx, path);
    for item in &edges {
        let (style, color, pen) = dot_edge_style(item);
        edge(out, &item.from, &item.to, item.kind, style, color, pen);
    }
}
fn path_nodes(idx: usize, path: &ExploitPath<'_>) -> Vec<ExploitNodeExport> {
let ids = PathIds::new(idx);
let mut nodes = vec![
ExploitNodeExport {
id: ids.step,
kind: "Step",
label: path.writer.name.clone(),
source_node_id: Some(path.writer.id),
note: None,
},
ExploitNodeExport {
id: ids.state,
kind: "MutableState",
label: path.rule.mutable_channel.label().into(),
source_node_id: None,
note: None,
},
ExploitNodeExport {
id: ids.helper,
kind: "ResolvedHelper",
label: format!("PATH-selected {}", path.catalog.helper),
source_node_id: None,
note: None,
},
ExploitNodeExport {
id: ids.boundary,
kind: "ThirdPartyAction",
label: path.catalog.action_label.into(),
source_node_id: Some(path.action_step.id),
note: Some(path.action_step.name.clone()),
},
ExploitNodeExport {
id: ids.artifact,
kind: "AuthorityArtifact",
label: authority_artifact_label(path),
source_node_id: path.authority.graph_authority.map(|n| n.id),
note: Some(authority_note(path.authority.pattern)),
},
];
if let Some(env_name) = path.authority.pattern.env_label {
nodes.push(ExploitNodeExport {
id: ids.env,
kind: "AuthorityEnv",
label: env_name.into(),
source_node_id: path.authority.graph_authority.map(|n| n.id),
note: None,
});
}
nodes
}
/// Builds the exported edges of one exploit path.
///
/// The order is: `mutates_state`, `influences_resolution`, `uses_action`,
/// `creates_authority_artifact`, `exposes_env` (only when the pattern names an
/// env label), `invokes_helper`, and finally the helper-to-artifact edge whose
/// kind depends on the pattern's transports. No edge carries an `observed` flag.
fn path_edges(idx: usize, path: &ExploitPath<'_>) -> Vec<ExploitEdgeExport> {
    /// Shorthand for an edge without an `observed` flag.
    fn link(
        from: &str,
        to: &str,
        kind: &'static str,
        confidence: &'static str,
        authority_bearing: bool,
    ) -> ExploitEdgeExport {
        edge_export(from, to, kind, confidence, authority_bearing, None)
    }

    let ids = PathIds::new(idx);
    let pattern = path.authority.pattern;

    let mut edges = Vec::with_capacity(7);
    edges.push(link(&ids.step, &ids.state, "mutates_state", "static", false));
    edges.push(link(
        &ids.state,
        &ids.helper,
        "influences_resolution",
        "inferred",
        false,
    ));
    edges.push(link(&ids.step, &ids.boundary, "uses_action", "static", false));
    edges.push(link(
        &ids.boundary,
        &ids.artifact,
        "creates_authority_artifact",
        "static",
        true,
    ));
    if pattern.env_label.is_some() {
        edges.push(link(&ids.artifact, &ids.env, "exposes_env", "static", true));
    }
    edges.push(link(
        &ids.boundary,
        &ids.helper,
        "invokes_helper",
        "static",
        true,
    ));
    edges.push(link(
        &ids.helper,
        &ids.artifact,
        helper_authority_edge_kind(pattern.transports),
        "inferred",
        true,
    ));
    edges
}
/// Picks the kind of the helper-to-artifact edge from the pattern's transports.
///
/// Priority is: credential/config file path, then stdin, then argv, then
/// env/OIDC request env. If none of these is present, the result is
/// `"receives_authority"`.
fn helper_authority_edge_kind(transports: &[AuthorityTransport]) -> &'static str {
    // Ordered by priority; the first transport that is present decides the kind.
    const PRIORITY: [(AuthorityTransport, &str); 6] = [
        (AuthorityTransport::CredentialFilePath, "reads_artifact"),
        (AuthorityTransport::ConfigFilePath, "reads_artifact"),
        (AuthorityTransport::Stdin, "receives_stdin"),
        (AuthorityTransport::Argv, "receives_argv"),
        (AuthorityTransport::Env, "inherits_env"),
        (AuthorityTransport::OidcRequestEnv, "inherits_env"),
    ];

    PRIORITY
        .iter()
        .find(|entry| transports.contains(&entry.0))
        .map_or("receives_authority", |entry| entry.1)
}
/// Picks the transport-specific rule id from the pattern's transports.
///
/// Priority is: credential/config file path, then stdin, then argv, then
/// env/OIDC request env. If none of these is present, the result is the umbrella
/// id `"EXPLOIT_PATH_HELPER_AUTHORITY"`.
fn transport_rule_id(transports: &[AuthorityTransport]) -> &'static str {
    // Ordered by priority; the first transport that is present decides the id.
    const PRIORITY: [(AuthorityTransport, &str); 6] = [
        (
            AuthorityTransport::CredentialFilePath,
            "EXPLOIT_PATH_HELPER_CREDENTIAL_FILE",
        ),
        (
            AuthorityTransport::ConfigFilePath,
            "EXPLOIT_PATH_HELPER_CREDENTIAL_FILE",
        ),
        (
            AuthorityTransport::Stdin,
            "EXPLOIT_PATH_HELPER_STDIN_AUTHORITY",
        ),
        (
            AuthorityTransport::Argv,
            "EXPLOIT_PATH_HELPER_ARGV_AUTHORITY",
        ),
        (AuthorityTransport::Env, "EXPLOIT_PATH_HELPER_ENV_AUTHORITY"),
        (
            AuthorityTransport::OidcRequestEnv,
            "EXPLOIT_PATH_HELPER_ENV_AUTHORITY",
        ),
    ];

    PRIORITY
        .iter()
        .find(|entry| transports.contains(&entry.0))
        .map_or("EXPLOIT_PATH_HELPER_AUTHORITY", |entry| entry.1)
}
/// Builds an [`ExploitEdgeExport`], copying the endpoint ids into owned strings.
fn edge_export(
    from: &str,
    to: &str,
    kind: &'static str,
    confidence: &'static str,
    authority_bearing: bool,
    observed: Option<bool>,
) -> ExploitEdgeExport {
    let from = from.to_owned();
    let to = to.to_owned();
    ExploitEdgeExport {
        from,
        to,
        kind,
        confidence,
        authority_bearing,
        observed,
    }
}
/// Path-local node identifiers, all derived from the path index.
struct PathIds {
    step: String,
    state: String,
    helper: String,
    boundary: String,
    artifact: String,
    env: String,
}

impl PathIds {
    /// Builds the ids for path `idx`; each has the form `p<idx>_<suffix>`.
    fn new(idx: usize) -> Self {
        let id = |suffix: &str| format!("p{idx}_{suffix}");
        Self {
            step: id("step"),
            state: id("state"),
            helper: id("helper"),
            // The boundary node is the third-party action, hence the "action" suffix.
            boundary: id("action"),
            artifact: id("artifact"),
            env: id("env"),
        }
    }
}
/// Maps a node kind to its DOT `(shape, color)` pair; unknown kinds get a gray box.
fn dot_node_style(kind: &str) -> (&'static str, &'static str) {
    // (kind, shape, color)
    const STYLES: [(&str, &str, &str); 7] = [
        ("Step", "ellipse", "green"),
        ("MutableState", "folder", "orange"),
        ("ResolvedHelper", "component", "red"),
        ("ThirdPartyAction", "box3d", "goldenrod"),
        ("AuthorityArtifact", "hexagon", "red"),
        ("AuthorityEnv", "note", "red"),
        ("ObservedSink", "doubleoctagon", "red"),
    ];

    STYLES
        .iter()
        .find(|entry| entry.0 == kind)
        .map_or(("box", "gray"), |entry| (entry.1, entry.2))
}
/// Maps an edge to its DOT `(style, color, penwidth)` triple.
///
/// Observed edges are always bold red with width 3. Otherwise the line style is
/// dashed for inferred edges and solid for everything else, and authority-bearing
/// edges are red with width 2 while the rest are black with width 1.
fn dot_edge_style(edge: &ExploitEdgeExport) -> (&'static str, &'static str, u8) {
    if edge.confidence == "observed" {
        return ("bold", "red", 3);
    }

    let style = if edge.confidence == "inferred" {
        "dashed"
    } else {
        "solid"
    };
    let (color, pen) = if edge.authority_bearing {
        ("red", 2)
    } else {
        ("black", 1)
    };
    (style, color, pen)
}
/// Returns the label of the path's authority artifact node, taken from the
/// matched pattern.
fn authority_artifact_label(path: &ExploitPath<'_>) -> String {
    let pattern = &path.authority.pattern;
    pattern.artifact_label.to_owned()
}
/// Formats the note shown on an authority artifact node:
/// `transport: <comma-separated transports>; origin: <origin>`.
fn authority_note(pattern: AuthorityPattern) -> String {
    let mut transports = String::new();
    for (position, transport) in pattern.transports.iter().enumerate() {
        if position > 0 {
            transports.push_str(", ");
        }
        transports.push_str(transport.as_str());
    }
    let origin = pattern.origin.as_str();
    format!("transport: {transports}; origin: {origin}")
}
/// Appends one DOT node statement to `out`.
///
/// The label is `kind`, `label` and (when present) `note`, one per line. The id
/// and label are DOT-escaped; `shape` and `color` are written verbatim.
fn node(
    out: &mut String,
    id: &str,
    kind: &str,
    label: &str,
    shape: &str,
    color: &str,
    note: Option<&str>,
) {
    let mut text = format!("{kind}\n{label}");
    if let Some(extra) = note {
        text.push('\n');
        text.push_str(extra);
    }

    let id = dot_escape(id);
    let text = dot_escape(&text);
    let _ = writeln!(
        out,
        " \"{id}\" [label=\"{text}\" shape={shape} color={color}];"
    );
}
/// Appends one DOT edge statement to `out`.
///
/// The endpoint ids and the label are DOT-escaped; `style`, `color` and `pen`
/// are written verbatim.
fn edge(out: &mut String, from: &str, to: &str, label: &str, style: &str, color: &str, pen: u8) {
    let from = dot_escape(from);
    let to = dot_escape(to);
    let label = dot_escape(label);
    let _ = writeln!(
        out,
        " \"{from}\" -> \"{to}\" [label=\"{label}\" style={style} color={color} penwidth={pen}];"
    );
}
/// Appends the edge-confidence legend cluster to `out`.
///
/// The legend shows one sample edge for each of the static, inferred and
/// observed confidence levels.
fn legend(out: &mut String) {
    const LINES: [&str; 13] = [
        " subgraph cluster_legend {\n",
        " label=\"edge confidence\";\n",
        " color=gray;\n",
        " \"legend_static_a\" [label=\"static/source\" shape=plaintext];\n",
        " \"legend_static_b\" [label=\"solid\" shape=plaintext];\n",
        " \"legend_static_a\" -> \"legend_static_b\" [style=solid label=\"static\"];\n",
        " \"legend_inferred_a\" [label=\"inferred taint\" shape=plaintext];\n",
        " \"legend_inferred_b\" [label=\"dashed\" shape=plaintext];\n",
        " \"legend_inferred_a\" -> \"legend_inferred_b\" [style=dashed label=\"inferred\"];\n",
        " \"legend_observed_a\" [label=\"hosted witness\" shape=plaintext];\n",
        " \"legend_observed_b\" [label=\"bold red\" shape=plaintext];\n",
        " \"legend_observed_a\" -> \"legend_observed_b\" [style=bold color=red penwidth=3 label=\"observed\"];\n",
        " }\n",
    ];

    for line in LINES {
        out.push_str(line);
    }
}
/// Escapes `s` for use inside a double-quoted DOT string.
///
/// Backslashes and double quotes are backslash-escaped. Every `\n` and every
/// `\r` becomes the two-character DOT line break `\n`, so a CRLF pair yields two
/// line breaks.
fn dot_escape(s: &str) -> String {
    s.chars()
        .fold(String::with_capacity(s.len()), |mut escaped, ch| {
            match ch {
                '\\' => escaped.push_str(r"\\"),
                '"' => escaped.push_str("\\\""),
                '\n' | '\r' => escaped.push_str(r"\n"),
                other => escaped.push(other),
            }
            escaped
        })
}
/// Escapes `s` for use inside a quoted Mermaid label or an edge label.
///
/// - Ampersand, angle brackets and the double quote become named HTML entities
///   (`amp`, `lt`, `gt`, `quot`).
/// - Pipe and square brackets become decimal HTML entities (`#124`, `#91`, `#93`).
/// - `\n` and `\r` each become a single space.
/// - Curly braces become a middle dot (`·`).
/// - Every other character is copied unchanged.
fn mermaid_escape(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        // Entities are assembled from pieces rather than written as literals:
        // an HTML-unescaping pass over this source file once collapsed the
        // literals back into the raw characters, which disabled the escaping.
        let entity = match c {
            '&' => "amp",
            '<' => "lt",
            '>' => "gt",
            '"' => "quot",
            '|' => "#124",
            '[' => "#91",
            ']' => "#93",
            '\n' | '\r' => {
                out.push(' ');
                continue;
            }
            '{' | '}' => {
                out.push('·');
                continue;
            }
            _ => {
                out.push(c);
                continue;
            }
        };
        out.push('&');
        out.push_str(entity);
        out.push(';');
    }
    out
}
// Unit tests for the exploit-view renderers.
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::PipelineSource;
use crate::graph::*;
use std::collections::HashMap;
// Builds a pipeline source that only carries a file name.
fn source(file: &str) -> PipelineSource {
PipelineSource {
file: file.into(),
repo: None,
git_ref: None,
commit_sha: None,
}
}
// End-to-end check: a step appending to GITHUB_PATH followed by the Firebase
// deploy action in the same job must yield one exploit path in DOT and JSON output.
#[test]
fn firebase_exploit_path_dot_shows_red_authority_path() {
let mut g = AuthorityGraph::new(source("deploy.yml"));
g.metadata
.insert(META_PLATFORM.into(), "github-actions".into());
// Writer step: its script body appends a directory to GITHUB_PATH.
let mut writer_meta = HashMap::new();
writer_meta.insert(META_JOB_NAME.into(), "deploy".into());
writer_meta.insert(
META_SCRIPT_BODY.into(),
"mkdir -p /tmp/fake\necho /tmp/fake >> $GITHUB_PATH".into(),
);
g.add_node_with_metadata(
NodeKind::Step,
"Create fake npx and persist PATH mutation",
TrustZone::FirstParty,
writer_meta,
);
// Action step: added after the writer, in the same job; the mixed-case action
// name exercises the lowercase catalog lookup.
let mut action_meta = HashMap::new();
action_meta.insert(META_JOB_NAME.into(), "deploy".into());
action_meta.insert(
META_GHA_ACTION.into(),
"FirebaseExtended/action-hosting-deploy".into(),
);
action_meta.insert(
META_GHA_WITH_INPUTS.into(),
"firebaseServiceAccount=${{ secrets.FIREBASE_SERVICE_ACCOUNT }}".into(),
);
let action = g.add_node_with_metadata(
NodeKind::Step,
"Run Firebase Hosting witness",
TrustZone::ThirdParty,
action_meta,
);
// Secret the action step has access to; this is the graph authority node.
let secret = g.add_node(
NodeKind::Secret,
"FIREBASE_SERVICE_ACCOUNT",
TrustZone::FirstParty,
);
g.add_edge(action, secret, EdgeKind::HasAccessTo);
let dot = render_dot(&g, ExploitGraphOptions { job: None });
// Node labels are "<kind>\n<label>" with the newline DOT-escaped.
assert!(dot.contains("MutableState\\nGITHUB_PATH"));
assert!(dot.contains("ResolvedHelper\\nPATH-selected npx"));
assert!(dot.contains("ThirdPartyAction\\nFirebaseExtended/action-hosting-deploy"));
assert!(dot.contains("AuthorityArtifact\\nservice-account credential file"));
assert!(dot.contains("AuthorityEnv\\nGOOGLE_APPLICATION_CREDENTIALS"));
assert!(dot.contains("label=\"influences_resolution\" style=dashed"));
assert!(dot.contains("label=\"creates_authority_artifact\" style=solid color=red"));
// No observation data is attached, so nothing observed may be rendered.
assert!(!dot.contains("ObservedSink"));
assert!(!dot.contains("observed_by_witness"));
let json = render_json_pretty(&g, ExploitGraphOptions { job: None }).unwrap();
assert!(json.contains("\"view\": \"exploit\""));
assert!(json.contains("\"kind\": \"MutableState\""));
assert!(json.contains("\"authority_bearing\": true"));
}
}