use crate::finding::{Finding, FindingCategory, Recommendation, Severity};
use crate::graph::{AuthorityGraph, NodeKind, TrustZone};
use crate::propagation::PropagationPath;
use serde::de::{self, MapAccess, Visitor};
use serde::{Deserialize, Deserializer};
use std::collections::HashMap;
use std::fmt;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
/// A user-defined rule, deserialized from one YAML document.
///
/// `evaluate_custom_rules` emits one `Finding` per match, carrying this
/// rule's `severity` and `category`.
#[derive(Debug, Clone, Deserialize)]
pub struct CustomRule {
/// Rule identifier; appears in brackets at the start of every finding message.
pub id: String,
/// Human-readable rule name; appears in the finding message after the id.
pub name: String,
/// Optional free text. When non-empty it becomes the finding's manual
/// recommendation; when empty a generic "Review custom rule" action is used.
#[serde(default)]
pub description: String,
/// Severity copied onto every finding produced by this rule.
pub severity: Severity,
/// Category copied onto every finding produced by this rule.
pub category: FindingCategory,
/// Match conditions, read from the YAML key `match`; defaults to an
/// all-default `MatchSpec` when the key is absent.
#[serde(rename = "match", default)]
pub match_spec: MatchSpec,
}
/// The `match:` section of a custom rule. Every field is optional in YAML.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct MatchSpec {
/// Conditions on the source node of a propagation path.
#[serde(default)]
pub source: NodeMatcher,
/// Conditions on the sink node of a propagation path.
#[serde(default)]
pub sink: NodeMatcher,
/// Conditions on the propagation path itself.
#[serde(default)]
pub path: PathMatcher,
/// Conditions on the graph-level metadata map; when they fail the rule
/// produces no findings at all (path mode and standalone mode alike).
#[serde(default)]
pub graph_metadata: MetadataMatcher,
/// When present, the rule is evaluated once per graph node using this
/// matcher, and `source`, `sink` and `path` are not consulted.
#[serde(default)]
pub standalone: Option<NodeMatcher>,
}
/// Either a single value or a list of values.
///
/// Deserialized untagged, so the serialized form may be a bare value or a
/// sequence of values; `One` is tried before `Many`.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum OneOrMany<T> {
/// A single value.
One(T),
/// A list of values.
Many(Vec<T>),
}
impl<T: PartialEq> OneOrMany<T> {
fn contains(&self, value: &T) -> bool {
match self {
OneOrMany::One(v) => v == value,
OneOrMany::Many(vs) => vs.iter().any(|v| v == value),
}
}
}
/// A condition on one metadata value.
///
/// Deserialized untagged: a bare string is an equality test, otherwise the
/// value is read as a `MetadataOp` operator map.
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
pub enum MetadataPredicate {
/// The metadata value must be present and equal to this string.
Equals(String),
/// The metadata value must satisfy every operator set in the map.
Op(MetadataOp),
}
/// Operator form of a metadata predicate. Unknown keys are rejected at
/// parse time; every operator that is set must hold for the predicate to match.
#[derive(Debug, Clone, Default, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct MetadataOp {
/// Value must be present and equal to this string.
#[serde(default)]
pub equals: Option<String>,
/// Value must not equal this string; an absent value satisfies this.
#[serde(default)]
pub not_equals: Option<String>,
/// Value must be present and contain this substring.
#[serde(default)]
pub contains: Option<String>,
/// Value must be present and equal to one of these strings (YAML key `in`).
#[serde(default, rename = "in")]
pub in_: Option<Vec<String>>,
}
impl MetadataOp {
    /// Tests `actual` (the metadata value, `None` when the key is absent)
    /// against every operator that is set; unset operators impose nothing.
    ///
    /// With an absent value, `equals`, `contains` and `in` fail while
    /// `not_equals` passes.
    fn matches(&self, actual: Option<&String>) -> bool {
        let actual = actual.map(String::as_str);

        let equals_ok = self
            .equals
            .as_deref()
            .map_or(true, |want| actual == Some(want));
        let not_equals_ok = self
            .not_equals
            .as_deref()
            .map_or(true, |banned| actual != Some(banned));
        let contains_ok = self
            .contains
            .as_deref()
            .map_or(true, |needle| actual.map_or(false, |s| s.contains(needle)));
        let in_ok = self.in_.as_deref().map_or(true, |allowed| {
            actual.map_or(false, |s| allowed.iter().any(|a| a.as_str() == s))
        });

        equals_ok && not_equals_ok && contains_ok && in_ok
    }
}
impl MetadataPredicate {
    /// Tests `actual` (the metadata value, `None` when the key is absent).
    ///
    /// The bare-string form requires a present, equal value; the operator
    /// form delegates to `MetadataOp::matches`.
    fn matches(&self, actual: Option<&String>) -> bool {
        match self {
            MetadataPredicate::Op(operators) => operators.matches(actual),
            MetadataPredicate::Equals(expected) => {
                matches!(actual, Some(found) if found == expected)
            }
        }
    }
}
/// A set of per-key predicates over a string-to-string metadata map, with an
/// optional negated sub-matcher.
#[derive(Debug, Clone, Default)]
pub struct MetadataMatcher {
/// Predicates keyed by metadata field name; all of them must hold.
pub fields: HashMap<String, MetadataPredicate>,
/// When present, the metadata must NOT satisfy this nested matcher.
pub not: Option<Box<MetadataMatcher>>,
}
impl MetadataMatcher {
    /// Returns `true` when every field predicate holds for `metadata` and
    /// the nested `not` matcher (if any) does not match it.
    fn matches(&self, metadata: &HashMap<String, String>) -> bool {
        let every_field_holds = self
            .fields
            .iter()
            .all(|(name, predicate)| predicate.matches(metadata.get(name)));

        every_field_holds
            && self
                .not
                .as_deref()
                .map_or(true, |negated| !negated.matches(metadata))
    }

    /// Returns `true` when the matcher has no field predicates and no `not`
    /// clause, i.e. it constrains nothing.
    fn is_empty(&self) -> bool {
        self.not.is_none() && self.fields.is_empty()
    }
}
impl<'de> Deserialize<'de> for MetadataMatcher {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
struct MetadataMatcherVisitor;
impl<'de> Visitor<'de> for MetadataMatcherVisitor {
type Value = MetadataMatcher;
fn expecting(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.write_str("a metadata predicate map (field -> string|operator) with optional `not:` sub-map")
}
fn visit_map<M>(self, mut map: M) -> Result<MetadataMatcher, M::Error>
where
M: MapAccess<'de>,
{
let mut fields: HashMap<String, MetadataPredicate> = HashMap::new();
let mut not: Option<Box<MetadataMatcher>> = None;
while let Some(key) = map.next_key::<String>()? {
if key == "not" {
if not.is_some() {
return Err(de::Error::duplicate_field("not"));
}
let inner: MetadataMatcher = map.next_value()?;
not = Some(Box::new(inner));
} else {
let value: MetadataPredicate = map.next_value()?;
if fields.insert(key.clone(), value).is_some() {
return Err(de::Error::custom(format!(
"duplicate metadata field `{key}`"
)));
}
}
}
Ok(MetadataMatcher { fields, not })
}
}
deserializer.deserialize_map(MetadataMatcherVisitor)
}
}
/// Conditions on a single graph node. Every condition that is set must hold;
/// an all-default matcher accepts any node.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct NodeMatcher {
/// Accepted node kinds (single value or list); `None` accepts any kind.
#[serde(default)]
pub node_type: Option<OneOrMany<NodeKind>>,
/// Accepted trust zones (single value or list); `None` accepts any zone.
#[serde(default)]
pub trust_zone: Option<OneOrMany<TrustZone>>,
/// Predicates over the node's metadata map.
#[serde(default)]
pub metadata: MetadataMatcher,
/// When present, the node must NOT satisfy this nested matcher.
#[serde(default)]
pub not: Option<Box<NodeMatcher>>,
}
/// Conditions on a propagation path.
#[derive(Debug, Clone, Default, Deserialize)]
pub struct PathMatcher {
/// When non-empty, the path must record a boundary crossing whose
/// destination zone is in this list; an empty list accepts any path.
#[serde(default)]
pub crosses_to: Vec<TrustZone>,
}
/// Failure while loading custom rules from disk; each variant carries the
/// path that was being processed.
#[derive(Debug)]
pub enum CustomRuleError {
/// The rules directory or a rule file could not be read.
FileRead(PathBuf, io::Error),
/// A rule file was read but its YAML could not be parsed into rules.
YamlParse(PathBuf, serde_yaml::Error),
}
impl fmt::Display for CustomRuleError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
CustomRuleError::FileRead(path, err) => {
write!(
f,
"failed to read custom rule file {}: {err}",
path.display()
)
}
CustomRuleError::YamlParse(path, err) => {
write!(
f,
"failed to parse custom rule file {}: {err}",
path.display()
)
}
}
}
}
impl std::error::Error for CustomRuleError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
CustomRuleError::FileRead(_, err) => Some(err),
CustomRuleError::YamlParse(_, err) => Some(err),
}
}
}
/// Loads every custom rule found in the `.yml` / `.yaml` files directly
/// inside `dir`.
///
/// Files are processed in sorted path order and each file may hold several
/// YAML documents (see `parse_rules_multi_doc`). Entries that are not regular
/// files, or that carry another extension, are ignored.
///
/// # Errors
///
/// Returns every failure encountered rather than stopping at the first:
/// - `FileRead` with `dir` when the directory cannot be opened, or when an
///   entry cannot be read while iterating it (such entries used to be
///   dropped silently, which could hide a rule file);
/// - `FileRead` with the file path when a rule file cannot be read;
/// - `YamlParse` with the file path when a rule file does not parse.
///
/// When any error is collected, no rules are returned.
pub fn load_rules_dir(dir: &Path) -> Result<Vec<CustomRule>, Vec<CustomRuleError>> {
    let read_dir = match fs::read_dir(dir) {
        Ok(rd) => rd,
        Err(err) => return Err(vec![CustomRuleError::FileRead(dir.to_path_buf(), err)]),
    };

    let mut errors = Vec::new();
    let mut entries: Vec<PathBuf> = Vec::new();
    for entry in read_dir {
        let path = match entry {
            Ok(entry) => entry.path(),
            Err(err) => {
                // A failed entry has no path of its own; attribute it to the directory.
                errors.push(CustomRuleError::FileRead(dir.to_path_buf(), err));
                continue;
            }
        };
        if !path.is_file() {
            continue;
        }
        if matches!(
            path.extension().and_then(|ext| ext.to_str()),
            Some("yml") | Some("yaml")
        ) {
            entries.push(path);
        }
    }
    // Directory iteration order is platform-dependent; sort for stable rule order.
    entries.sort();

    let mut rules = Vec::new();
    for path in entries {
        match fs::read_to_string(&path) {
            Ok(content) => match parse_rules_multi_doc(&content) {
                Ok(parsed) => rules.extend(parsed),
                Err(err) => errors.push(CustomRuleError::YamlParse(path, err)),
            },
            Err(err) => errors.push(CustomRuleError::FileRead(path, err)),
        }
    }

    if errors.is_empty() {
        Ok(rules)
    } else {
        Err(errors)
    }
}
/// Parses `content` as a stream of YAML documents, one `CustomRule` per
/// document, in document order.
///
/// Documents whose value is null (for example an empty document between
/// `---` separators) are skipped.
///
/// # Errors
///
/// Returns the first YAML or rule-shape error encountered; later documents
/// are not examined.
pub fn parse_rules_multi_doc(content: &str) -> Result<Vec<CustomRule>, serde_yaml::Error> {
    serde_yaml::Deserializer::from_str(content)
        .filter_map(|doc| match serde_yaml::Value::deserialize(doc) {
            Ok(value) if value.is_null() => None,
            Ok(value) => Some(serde_yaml::from_value::<CustomRule>(value)),
            Err(err) => Some(Err(err)),
        })
        .collect()
}
impl NodeMatcher {
    /// Returns `true` when `node` satisfies every condition that is set:
    /// kind, trust zone, metadata predicates, and the negated sub-matcher.
    fn matches(&self, node: &crate::graph::Node) -> bool {
        let kind_ok = || {
            self.node_type
                .as_ref()
                .map_or(true, |kinds| kinds.contains(&node.kind))
        };
        let zone_ok = || {
            self.trust_zone
                .as_ref()
                .map_or(true, |zones| zones.contains(&node.trust_zone))
        };
        let not_excluded = || {
            self.not
                .as_ref()
                .map_or(true, |negated| !negated.matches(node))
        };

        // Closures keep the original check order and short-circuiting.
        kind_ok() && zone_ok() && self.metadata.matches(&node.metadata) && not_excluded()
    }

    /// Returns `true` when no condition is set, so the matcher accepts any node.
    // Not referenced by the code visible in this file, hence the allow.
    #[allow(dead_code)]
    fn is_wildcard(&self) -> bool {
        let kind_and_zone_unset = self.node_type.is_none() && self.trust_zone.is_none();
        kind_and_zone_unset && self.metadata.is_empty() && self.not.is_none()
    }
}
impl PathMatcher {
    /// Returns `true` when `crosses_to` is empty, or when `path` records a
    /// boundary crossing whose destination zone is listed in `crosses_to`.
    fn matches(&self, path: &PropagationPath) -> bool {
        self.crosses_to.is_empty()
            || matches!(
                &path.boundary_crossing,
                Some((_, destination)) if self.crosses_to.contains(destination)
            )
    }
}
pub fn evaluate_custom_rules(
graph: &AuthorityGraph,
paths: &[PropagationPath],
rules: &[CustomRule],
) -> Vec<Finding> {
let mut findings = Vec::new();
for rule in rules {
if let Some(matcher) = &rule.match_spec.standalone {
if !rule.match_spec.graph_metadata.matches(&graph.metadata) {
continue;
}
for node in &graph.nodes {
if !matcher.matches(node) {
continue;
}
findings.push(Finding {
severity: rule.severity,
category: rule.category,
nodes_involved: vec![node.id],
message: format!("[{}] {}: {}", rule.id, rule.name, node.name),
recommendation: Recommendation::Manual {
action: if rule.description.is_empty() {
format!("Review custom rule '{}'", rule.id)
} else {
rule.description.clone()
},
},
path: None,
});
}
continue;
}
if !rule.match_spec.graph_metadata.matches(&graph.metadata) {
continue;
}
for path in paths {
let source_node = match graph.node(path.source) {
Some(n) => n,
None => continue,
};
let sink_node = match graph.node(path.sink) {
Some(n) => n,
None => continue,
};
if !rule.match_spec.source.matches(source_node) {
continue;
}
if !rule.match_spec.sink.matches(sink_node) {
continue;
}
if !rule.match_spec.path.matches(path) {
continue;
}
findings.push(Finding {
severity: rule.severity,
category: rule.category,
nodes_involved: vec![path.source, path.sink],
message: format!(
"[{}] {}: {} -> {}",
rule.id, rule.name, source_node.name, sink_node.name
),
recommendation: Recommendation::Manual {
action: if rule.description.is_empty() {
format!("Review custom rule '{}'", rule.id)
} else {
rule.description.clone()
},
},
path: Some(path.clone()),
});
}
}
findings
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::{AuthorityGraph, EdgeKind, PipelineSource};
use crate::propagation::{propagation_analysis, DEFAULT_MAX_HOPS};
/// Fixture: a `PipelineSource` for `test.yml` with no repo, ref or commit.
fn source() -> PipelineSource {
PipelineSource {
file: "test.yml".into(),
repo: None,
git_ref: None,
commit_sha: None,
}
}
/// Fixture: a first-party secret, a first-party step with access to it, and
/// an untrusted step the first-party step delegates to, plus the propagation
/// paths computed over that graph.
fn build_graph_with_paths() -> (AuthorityGraph, Vec<PropagationPath>) {
let mut g = AuthorityGraph::new(source());
let secret = g.add_node(NodeKind::Secret, "API_KEY", TrustZone::FirstParty);
let trusted = g.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
let untrusted = g.add_node(NodeKind::Step, "third-party", TrustZone::Untrusted);
g.add_edge(trusted, secret, EdgeKind::HasAccessTo);
g.add_edge(trusted, untrusted, EdgeKind::DelegatesTo);
let paths = propagation_analysis(&g, DEFAULT_MAX_HOPS);
(g, paths)
}
/// Shorthand for `Some(OneOrMany::One(v))` when building matchers by hand.
fn one<T>(v: T) -> Option<OneOrMany<T>> {
Some(OneOrMany::One(v))
}
/// A hand-built rule (first-party source, untrusted sink) must yield exactly
/// one critical finding whose message contains the rule id.
#[test]
fn custom_rule_fires_on_matching_path() {
let (graph, paths) = build_graph_with_paths();
let rule = CustomRule {
id: "secret_to_untrusted".into(),
name: "Secret reaching untrusted step".into(),
description: "Custom policy".into(),
severity: Severity::Critical,
category: FindingCategory::AuthorityPropagation,
match_spec: MatchSpec {
source: NodeMatcher {
node_type: None,
trust_zone: one(TrustZone::FirstParty),
metadata: MetadataMatcher::default(),
not: None,
},
sink: NodeMatcher {
node_type: None,
trust_zone: one(TrustZone::Untrusted),
metadata: MetadataMatcher::default(),
not: None,
},
path: PathMatcher::default(),
graph_metadata: MetadataMatcher::default(),
standalone: None,
},
};
let findings = evaluate_custom_rules(&graph, &paths, &[rule]);
assert_eq!(findings.len(), 1);
assert_eq!(findings[0].severity, Severity::Critical);
assert!(findings[0].message.contains("secret_to_untrusted"));
}
/// A rule requiring an untrusted source must produce no findings on the
/// fixture graph.
#[test]
fn custom_rule_does_not_fire_when_predicates_miss() {
let (graph, paths) = build_graph_with_paths();
let rule = CustomRule {
id: "miss".into(),
name: "Untrusted source".into(),
description: String::new(),
severity: Severity::Critical,
category: FindingCategory::AuthorityPropagation,
match_spec: MatchSpec {
source: NodeMatcher {
node_type: None,
trust_zone: one(TrustZone::Untrusted),
metadata: MetadataMatcher::default(),
not: None,
},
sink: NodeMatcher::default(),
path: PathMatcher::default(),
graph_metadata: MetadataMatcher::default(),
standalone: None,
},
};
let findings = evaluate_custom_rules(&graph, &paths, &[rule]);
assert!(findings.is_empty());
}
/// Parses a rule written in YAML and checks the id, severity, single-value
/// `node_type` / `trust_zone` forms and the `crosses_to` list.
#[test]
fn yaml_round_trip_loads_full_rule() {
let yaml = r#"
id: my_secret_to_untrusted
name: Secret reaching untrusted step
description: "Custom policy: secrets must not reach untrusted steps"
severity: critical
category: authority_propagation
match:
source:
node_type: secret
trust_zone: first_party
sink:
node_type: step
trust_zone: untrusted
path:
crosses_to: [untrusted]
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml must parse");
assert_eq!(rule.id, "my_secret_to_untrusted");
assert_eq!(rule.severity, Severity::Critical);
assert!(matches!(
rule.match_spec.source.node_type,
Some(OneOrMany::One(NodeKind::Secret))
));
assert!(matches!(
rule.match_spec.sink.trust_zone,
Some(OneOrMany::One(TrustZone::Untrusted))
));
assert_eq!(rule.match_spec.path.crosses_to, vec![TrustZone::Untrusted]);
}
/// A source metadata predicate `kind == "deploy"` must fire on a secret
/// carrying that metadata, and `kind == "build"` must not.
#[test]
fn metadata_predicate_must_match_all_keys() {
let mut g = AuthorityGraph::new(source());
let mut meta = HashMap::new();
meta.insert("kind".to_string(), "deploy".to_string());
let secret =
g.add_node_with_metadata(NodeKind::Secret, "TOKEN", TrustZone::FirstParty, meta);
let sink = g.add_node(NodeKind::Step, "remote", TrustZone::Untrusted);
let step = g.add_node(NodeKind::Step, "use", TrustZone::FirstParty);
g.add_edge(step, secret, EdgeKind::HasAccessTo);
g.add_edge(step, sink, EdgeKind::DelegatesTo);
let paths = propagation_analysis(&g, DEFAULT_MAX_HOPS);
let mut want_fields = HashMap::new();
want_fields.insert(
"kind".to_string(),
MetadataPredicate::Equals("deploy".to_string()),
);
let hit = CustomRule {
id: "hit".into(),
name: "n".into(),
description: String::new(),
severity: Severity::High,
category: FindingCategory::AuthorityPropagation,
match_spec: MatchSpec {
source: NodeMatcher {
node_type: one(NodeKind::Secret),
trust_zone: None,
metadata: MetadataMatcher {
fields: want_fields,
not: None,
},
not: None,
},
sink: NodeMatcher::default(),
path: PathMatcher::default(),
graph_metadata: MetadataMatcher::default(),
standalone: None,
},
};
assert_eq!(evaluate_custom_rules(&g, &paths, &[hit]).len(), 1);
let mut wrong_fields = HashMap::new();
wrong_fields.insert(
"kind".to_string(),
MetadataPredicate::Equals("build".to_string()),
);
let miss = CustomRule {
id: "miss".into(),
name: "n".into(),
description: String::new(),
severity: Severity::High,
category: FindingCategory::AuthorityPropagation,
match_spec: MatchSpec {
source: NodeMatcher {
node_type: one(NodeKind::Secret),
trust_zone: None,
metadata: MetadataMatcher {
fields: wrong_fields,
not: None,
},
not: None,
},
sink: NodeMatcher::default(),
path: PathMatcher::default(),
graph_metadata: MetadataMatcher::default(),
standalone: None,
},
};
assert!(evaluate_custom_rules(&g, &paths, &[miss]).is_empty());
}
/// `load_rules_dir` must load both `.yml` and `.yaml` files, ignore a `.txt`
/// file, and return the rules in sorted file order.
#[test]
fn load_rules_dir_reads_yml_and_yaml() {
// Process-id suffix keeps this scratch directory distinct per test run.
let tmp = std::env::temp_dir().join(format!("taudit-custom-rules-{}", std::process::id()));
fs::create_dir_all(&tmp).unwrap();
let yml_path = tmp.join("a.yml");
let yaml_path = tmp.join("b.yaml");
let other_path = tmp.join("c.txt");
fs::write(
&yml_path,
"id: a\nname: a\nseverity: high\ncategory: authority_propagation\n",
)
.unwrap();
fs::write(
&yaml_path,
"id: b\nname: b\nseverity: medium\ncategory: unpinned_action\n",
)
.unwrap();
fs::write(&other_path, "ignored").unwrap();
let rules = load_rules_dir(&tmp).expect("load must succeed");
assert_eq!(rules.len(), 2);
assert_eq!(rules[0].id, "a");
assert_eq!(rules[1].id, "b");
// Best-effort cleanup; a failure here does not fail the test.
let _ = fs::remove_dir_all(&tmp);
}
/// A rule file with an invalid severity must make `load_rules_dir` fail with
/// a single error whose message names the offending file.
#[test]
fn load_rules_dir_reports_yaml_errors_with_path() {
let tmp =
std::env::temp_dir().join(format!("taudit-custom-rules-bad-{}", std::process::id()));
fs::create_dir_all(&tmp).unwrap();
let bad = tmp.join("bad.yml");
fs::write(&bad, "id: x\nseverity: not-a-real-severity\n").unwrap();
let errs = load_rules_dir(&tmp).expect_err("should fail");
assert_eq!(errs.len(), 1);
let msg = errs[0].to_string();
assert!(msg.contains("bad.yml"), "error must mention path: {msg}");
// Best-effort cleanup; a failure here does not fail the test.
let _ = fs::remove_dir_all(&tmp);
}
/// Fixture: a first-party identity node with `oidc`, `permissions` and `role`
/// metadata, a first-party step with access to it, and an untrusted step the
/// first-party step delegates to, plus the computed propagation paths.
fn simple_first_to_untrusted_graph() -> (AuthorityGraph, Vec<PropagationPath>) {
let mut g = AuthorityGraph::new(source());
let mut meta = HashMap::new();
meta.insert("oidc".to_string(), "true".to_string());
meta.insert("permissions".to_string(), "contents: write".to_string());
meta.insert("role".to_string(), "admin".to_string());
let secret =
g.add_node_with_metadata(NodeKind::Identity, "GH_TOKEN", TrustZone::FirstParty, meta);
let step = g.add_node(NodeKind::Step, "use-it", TrustZone::FirstParty);
let untrusted = g.add_node(NodeKind::Step, "third-party", TrustZone::Untrusted);
g.add_edge(step, secret, EdgeKind::HasAccessTo);
g.add_edge(step, untrusted, EdgeKind::DelegatesTo);
let paths = propagation_analysis(&g, DEFAULT_MAX_HOPS);
(g, paths)
}
/// A rule that negates `trust_zone: untrusted` must produce no findings on
/// the first-party-to-untrusted fixture.
#[test]
fn negation_on_trust_zone_inverts_match() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
sink:
not:
trust_zone: untrusted
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
}
/// Negating a `node_type` list: excluding `[secret, identity]` must yield no
/// findings on the fixture, while excluding `[step]` must yield some.
#[test]
fn negation_on_node_type_list_matches_other_kinds() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
not:
node_type: [secret, identity]
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
let yaml2 = r#"
id: r2
name: r2
severity: high
category: authority_propagation
match:
source:
not:
node_type: [step]
"#;
let rule2: CustomRule = serde_yaml::from_str(yaml2).expect("yaml parses");
assert!(!evaluate_custom_rules(&graph, &paths, &[rule2]).is_empty());
}
/// A metadata `not:` clause on `oidc: "true"` must produce no findings on the
/// fixture, whose identity node carries `oidc = "true"`.
#[test]
fn metadata_negation_matches_absent_or_other_value() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
not:
oidc: "true"
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
}
/// The `contains` operator: a needle equal to the fixture's `permissions`
/// value yields one finding; a needle that is absent from it yields none.
#[test]
fn metadata_contains_does_substring_match() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
permissions:
contains: "contents: write"
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule]).len(), 1);
let yaml_miss = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
permissions:
contains: "actions: write"
"#;
let rule_miss: CustomRule = serde_yaml::from_str(yaml_miss).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule_miss]).is_empty());
}
/// The `in` operator: a list containing the fixture's `role` value (`admin`)
/// yields one finding; a list without it yields none.
#[test]
fn metadata_in_matches_any_of_allowed_values() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
role:
in: [admin, owner, write]
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule]).len(), 1);
let yaml_miss = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
role:
in: [reader, none]
"#;
let rule_miss: CustomRule = serde_yaml::from_str(yaml_miss).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule_miss]).is_empty());
}
/// The `not_equals` operator: `not_equals: admin` yields no findings on the
/// fixture (role is `admin`); `not_equals: reader` yields one.
#[test]
fn metadata_not_equals_excludes_specific_value() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
role:
not_equals: admin
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
let yaml_hit = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
role:
not_equals: reader
"#;
let rule_hit: CustomRule = serde_yaml::from_str(yaml_hit).expect("yaml parses");
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule_hit]).len(), 1);
}
/// A doubly negated `trust_zone: first_party` condition must still produce
/// findings on the fixture.
#[test]
fn nested_not_collapses_to_inner_condition() {
let (graph, paths) = simple_first_to_untrusted_graph();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
not:
not:
trust_zone: first_party
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(!evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
}
/// Single-value `node_type` / `trust_zone` and bare-string metadata values
/// must parse into the `One` / `Equals` forms and yield one finding on the
/// fixture.
#[test]
fn node_type_accepts_single_value_back_compat() {
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
node_type: identity
trust_zone: first_party
metadata:
oidc: "true"
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("v0.4 form must still parse");
assert!(matches!(
rule.match_spec.source.node_type,
Some(OneOrMany::One(NodeKind::Identity))
));
assert!(matches!(
rule.match_spec.source.trust_zone,
Some(OneOrMany::One(TrustZone::FirstParty))
));
let pred = rule
.match_spec
.source
.metadata
.fields
.get("oidc")
.expect("oidc predicate");
assert!(matches!(pred, MetadataPredicate::Equals(v) if v == "true"));
let (graph, paths) = simple_first_to_untrusted_graph();
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule]).len(), 1);
}
/// List-valued `node_type` / `trust_zone` must parse into the `Many` form,
/// preserving order, and yield one finding on the fixture.
#[test]
fn node_type_accepts_list_form() {
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
node_type: [secret, identity]
trust_zone: [first_party, third_party]
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("list form must parse");
match &rule.match_spec.source.node_type {
Some(OneOrMany::Many(v)) => {
assert_eq!(v, &vec![NodeKind::Secret, NodeKind::Identity]);
}
other => panic!("expected list form, got {other:?}"),
}
let (graph, paths) = simple_first_to_untrusted_graph();
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule]).len(), 1);
}
/// Fixture: a first-party secret tagged `variable_group = "true"`, a
/// first-party step with access to it, and an untrusted step it delegates to.
/// Each `(key, value)` pair in `meta` is inserted into the graph-level
/// metadata before propagation paths are computed.
fn pr_context_graph_with_meta(meta: &[(&str, &str)]) -> (AuthorityGraph, Vec<PropagationPath>) {
let mut g = AuthorityGraph::new(source());
let mut secret_meta = HashMap::new();
secret_meta.insert("variable_group".to_string(), "true".to_string());
let secret = g.add_node_with_metadata(
NodeKind::Secret,
"VG_SECRET",
TrustZone::FirstParty,
secret_meta,
);
let step = g.add_node(NodeKind::Step, "use", TrustZone::FirstParty);
let untrusted = g.add_node(NodeKind::Step, "third-party", TrustZone::Untrusted);
g.add_edge(step, secret, crate::graph::EdgeKind::HasAccessTo);
g.add_edge(step, untrusted, crate::graph::EdgeKind::DelegatesTo);
for (k, v) in meta {
g.metadata.insert((*k).to_string(), (*v).to_string());
}
let paths = propagation_analysis(&g, DEFAULT_MAX_HOPS);
(g, paths)
}
/// With graph metadata `trigger = "pr"`, a rule gated on
/// `graph_metadata.trigger equals pr` must yield one finding.
#[test]
fn graph_metadata_equals_matches_when_value_present() {
let (graph, paths) = pr_context_graph_with_meta(&[("trigger", "pr")]);
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
graph_metadata:
trigger:
equals: pr
source:
metadata:
variable_group: "true"
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule]).len(), 1);
}
/// With graph metadata `trigger = "merge_request_event"`, a rule gated on an
/// `in` list containing that value must yield findings.
#[test]
fn graph_metadata_in_matches_any_of_listed_values() {
let (graph, paths) = pr_context_graph_with_meta(&[("trigger", "merge_request_event")]);
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
graph_metadata:
trigger:
in: [pull_request_target, pr, merge_request_event]
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(!evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
}
/// A negated graph-metadata gate on `trigger equals push` must suppress the
/// rule on a `push` graph and let it fire on a `pr` graph.
#[test]
fn graph_metadata_negation_excludes_unwanted_trigger() {
let (graph, paths) = pr_context_graph_with_meta(&[("trigger", "push")]);
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
graph_metadata:
not:
trigger:
equals: push
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(evaluate_custom_rules(&graph, &paths, &[rule]).is_empty());
let (graph2, paths2) = pr_context_graph_with_meta(&[("trigger", "pr")]);
let rule2: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert!(!evaluate_custom_rules(&graph2, &paths2, &[rule2]).is_empty());
}
/// When the graph has no `trigger` metadata key, a rule gated on
/// `trigger equals pr` must yield no findings (and must not panic).
#[test]
fn graph_metadata_missing_key_does_not_match_no_crash() {
let (graph, paths) = pr_context_graph_with_meta(&[]);
assert!(!graph.metadata.contains_key("trigger"));
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
graph_metadata:
trigger:
equals: pr
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
let findings = evaluate_custom_rules(&graph, &paths, &[rule]);
assert!(findings.is_empty(), "missing key must yield no findings");
}
/// A rule with no `graph_metadata` section must still yield one finding
/// regardless of the graph's `trigger` value.
#[test]
fn rules_without_graph_metadata_remain_backward_compatible() {
let (graph, paths) = pr_context_graph_with_meta(&[("trigger", "anything")]);
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
variable_group: "true"
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert_eq!(evaluate_custom_rules(&graph, &paths, &[rule]).len(), 1);
}
/// Fixture: a first-party identity, a first-party step with access to it,
/// and an untrusted image node the step uses, plus the computed propagation
/// paths.
fn graph_with_image_sink() -> (AuthorityGraph, Vec<PropagationPath>) {
let mut g = AuthorityGraph::new(source());
let identity = g.add_node(NodeKind::Identity, "GH_TOKEN", TrustZone::FirstParty);
let step = g.add_node(NodeKind::Step, "publish", TrustZone::FirstParty);
let image = g.add_node(
NodeKind::Image,
"third-party/deploy@v1",
TrustZone::Untrusted,
);
g.add_edge(step, identity, crate::graph::EdgeKind::HasAccessTo);
g.add_edge(step, image, crate::graph::EdgeKind::UsesImage);
let paths = propagation_analysis(&g, DEFAULT_MAX_HOPS);
(g, paths)
}
/// A rule whose sink is an untrusted image must produce at least one finding
/// on the image-sink fixture.
#[test]
fn sink_node_type_image_matches_image_path_endpoint() {
let (graph, paths) = graph_with_image_sink();
let yaml = r#"
id: r
name: r
severity: high
category: untrusted_with_authority
match:
sink:
node_type: image
trust_zone: untrusted
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
let findings = evaluate_custom_rules(&graph, &paths, &[rule]);
assert!(
!findings.is_empty(),
"Image-as-sink must produce at least one finding"
);
}
/// A standalone rule for image nodes without a `sha256:` digest must fire
/// once for each of the two unpinned images and skip the pinned one, with no
/// propagation paths supplied.
#[test]
fn standalone_matches_every_floating_image_in_graph() {
let mut g = AuthorityGraph::new(source());
let _step = g.add_node(NodeKind::Step, "build", TrustZone::FirstParty);
let _floating1 = g.add_node(NodeKind::Image, "alpine:latest", TrustZone::ThirdParty);
let _floating2 = g.add_node(NodeKind::Image, "ubuntu:22.04", TrustZone::ThirdParty);
let mut pinned_meta = HashMap::new();
pinned_meta.insert("digest".to_string(), "sha256:abc".to_string());
let _pinned = g.add_node_with_metadata(
NodeKind::Image,
"alpine@sha256:abc",
TrustZone::ThirdParty,
pinned_meta,
);
let paths: Vec<PropagationPath> = Vec::new();
let yaml = r#"
id: floating_image_standalone
name: Floating image
severity: medium
category: unpinned_action
match:
standalone:
node_type: image
not:
metadata:
digest:
contains: "sha256:"
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
let findings = evaluate_custom_rules(&g, &paths, &[rule]);
assert_eq!(
findings.len(),
2,
"standalone must fire once per floating Image node"
);
}
/// A standalone rule using the `in` operator on `self_hosted` must match only
/// the image node carrying `self_hosted = "true"`.
#[test]
fn standalone_supports_in_operator() {
let mut g = AuthorityGraph::new(source());
let mut self_hosted_meta = HashMap::new();
self_hosted_meta.insert("self_hosted".to_string(), "true".to_string());
let _pool = g.add_node_with_metadata(
NodeKind::Image,
"self-pool",
TrustZone::FirstParty,
self_hosted_meta,
);
let _hosted = g.add_node(NodeKind::Image, "ubuntu-latest", TrustZone::ThirdParty);
let paths: Vec<PropagationPath> = Vec::new();
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
standalone:
node_type: image
metadata:
self_hosted:
in: ["true", "yes"]
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
let findings = evaluate_custom_rules(&g, &paths, &[rule]);
assert_eq!(findings.len(), 1, "in:[\"true\",\"yes\"] matches one node");
}
/// A standalone rule gated on `trigger equals pr` must fire on a graph whose
/// trigger is `pr` and stay silent on one whose trigger is `push`.
#[test]
fn standalone_still_honors_graph_metadata_gate() {
let mut g_pr = AuthorityGraph::new(source());
g_pr.metadata.insert("trigger".into(), "pr".into());
g_pr.add_node(NodeKind::Image, "alpine:latest", TrustZone::ThirdParty);
let mut g_push = AuthorityGraph::new(source());
g_push.metadata.insert("trigger".into(), "push".into());
g_push.add_node(NodeKind::Image, "alpine:latest", TrustZone::ThirdParty);
let yaml = r#"
id: r
name: r
severity: low
category: unpinned_action
match:
graph_metadata:
trigger:
equals: pr
standalone:
node_type: image
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
assert_eq!(
evaluate_custom_rules(&g_pr, &[], std::slice::from_ref(&rule)).len(),
1,
"fires on PR graph"
);
assert!(
evaluate_custom_rules(&g_push, &[], std::slice::from_ref(&rule)).is_empty(),
"graph_metadata gate must suppress on push graph"
);
}
/// When `standalone` is set, a `source` condition that cannot match anything
/// in the graph must not prevent the standalone finding.
#[test]
fn standalone_ignores_source_sink_path_fields() {
let mut g = AuthorityGraph::new(source());
let _img = g.add_node(NodeKind::Image, "alpine:latest", TrustZone::ThirdParty);
let paths: Vec<PropagationPath> = Vec::new();
let yaml = r#"
id: r
name: r
severity: low
category: unpinned_action
match:
source:
node_type: secret # would never match anything in this graph
standalone:
node_type: image
"#;
let rule: CustomRule = serde_yaml::from_str(yaml).expect("yaml parses");
let findings = evaluate_custom_rules(&g, &paths, &[rule]);
assert_eq!(findings.len(), 1);
}
/// Three `---`-separated YAML documents must parse into three rules, in
/// document order.
#[test]
fn multi_doc_yaml_loads_each_document_as_separate_rule() {
let yaml = r#"
id: rule_a
name: First rule
severity: high
category: authority_propagation
match:
source:
node_type: secret
---
id: rule_b
name: Second rule
severity: critical
category: untrusted_with_authority
match:
sink:
trust_zone: untrusted
---
id: rule_c
name: Third rule
severity: medium
category: unpinned_action
"#;
let rules = parse_rules_multi_doc(yaml).expect("multi-doc must parse");
assert_eq!(rules.len(), 3, "expected 3 rules from 3-doc YAML");
assert_eq!(rules[0].id, "rule_a");
assert_eq!(rules[1].id, "rule_b");
assert_eq!(rules[2].id, "rule_c");
assert_eq!(rules[1].severity, Severity::Critical);
}
/// A single YAML document must parse into exactly one rule through the
/// multi-document parser.
#[test]
fn single_doc_yaml_still_loads_identically() {
let yaml = r#"
id: solo
name: Solo rule
severity: high
category: authority_propagation
"#;
let rules = parse_rules_multi_doc(yaml).expect("single-doc must parse");
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].id, "solo");
}
/// An empty leading document (two consecutive `---` markers) must be skipped,
/// leaving only the one real rule.
#[test]
fn multi_doc_with_empty_leading_document_is_skipped() {
let yaml = r#"---
---
id: only
name: only
severity: low
category: authority_propagation
"#;
let rules = parse_rules_multi_doc(yaml).expect("must parse");
assert_eq!(rules.len(), 1);
assert_eq!(rules[0].id, "only");
}
/// A single file bundling three YAML documents must load as three rules via
/// `load_rules_dir`.
#[test]
fn load_rules_dir_loads_multi_doc_files() {
let tmp =
std::env::temp_dir().join(format!("taudit-custom-rules-multi-{}", std::process::id()));
fs::create_dir_all(&tmp).unwrap();
let path = tmp.join("bundle.yml");
fs::write(
&path,
r#"
id: a
name: a
severity: high
category: authority_propagation
---
id: b
name: b
severity: medium
category: unpinned_action
---
id: c
name: c
severity: low
category: authority_propagation
"#,
)
.unwrap();
let rules = load_rules_dir(&tmp).expect("multi-doc file must load");
assert_eq!(rules.len(), 3, "expected 3 rules from one bundled file");
// Best-effort cleanup; a failure here does not fail the test.
let _ = fs::remove_dir_all(&tmp);
}
/// A metadata operator key that is not defined (`starts_with`) must make
/// parsing fail, with an error message mentioning `metadata` or `variant`.
#[test]
fn unknown_metadata_operator_is_rejected() {
let yaml = r#"
id: r
name: r
severity: high
category: authority_propagation
match:
source:
metadata:
role:
starts_with: adm
"#;
let err = serde_yaml::from_str::<CustomRule>(yaml)
.expect_err("unknown operator must be rejected");
let msg = err.to_string();
assert!(
msg.contains("metadata") || msg.contains("variant"),
"parse should fail with a meaningful location: {msg}"
);
}
}