use serde::Serialize;
use crate::ast::SigmaRule;
/// Custom-attribute key whose boolean value marks a rule as exempt
/// (read by `is_exempt`).
pub const EXEMPT_KEY: &str = "rsigma.ads.exempt";
/// Prefix shared by the ADS custom-attribute keys used in this file,
/// for example `rsigma.ads.strategy` and `rsigma.ads.exempt`.
pub const ADS_PREFIX: &str = "rsigma.ads.";
/// The nine sections of an ADS document for a rule.
///
/// Variants serialize as snake_case strings (for example `blind_spots`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum AdsSection {
/// What the detection is trying to catch.
Goal,
/// The ATT&CK categorization, carried by `attack.*` tags.
Categorization,
/// A one-paragraph abstract of the detection approach.
Strategy,
/// The data source, fields, and environment knowledge the detection needs.
TechnicalContext,
/// How an attacker could evade the detection, and what it assumes.
BlindSpots,
/// Known benign triggers.
FalsePositives,
/// A recipe that produces a true-positive event the detection fires on.
Validation,
/// Why the detection's level is what it is.
Priority,
/// What an analyst should do when the detection fires.
Response,
}
/// Where a section's content is stored on a rule.
///
/// The payload of either variant is the field name or attribute key.
/// Serialized adjacently tagged: the snake_case variant name goes under
/// `kind` and the payload under `field`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case", tag = "kind", content = "field")]
pub enum AdsCarrier {
/// Content lives in a standard rule field such as `description` or `tags`.
StandardField(&'static str),
/// Content lives under a key in the rule's custom attributes.
CustomAttribute(&'static str),
}
impl AdsCarrier {
    /// Returns the wrapped field name or attribute key, whichever variant
    /// this carrier is.
    pub fn name(&self) -> &'static str {
        match *self {
            AdsCarrier::StandardField(field) => field,
            AdsCarrier::CustomAttribute(key) => key,
        }
    }
}
/// Static catalogue entry describing one ADS section.
#[derive(Debug, Clone, Copy, Serialize)]
pub struct AdsSectionInfo {
/// The section this entry describes.
pub section: AdsSection,
/// String identifier of the section (for example `blind_spots`); this is
/// what `AdsSection::from_id` matches against.
pub id: &'static str,
/// Where the section's content is read from on a rule.
pub carrier: AdsCarrier,
/// Whether the section is required by default.
pub default_required: bool,
/// One-line human-readable description of the section.
pub description: &'static str,
}
// Generates the section catalogue from a single table of
// `Variant => (id, carrier, required, description)` rows.
//
// The expansion defines two private items:
//   * `ALL_ADS_SECTIONS` - every listed variant, in table order;
//   * `describe` - maps a variant to its `AdsSectionInfo`.
//
// `describe` is a `match` with one arm per row and no wildcard, so an
// `AdsSection` variant that is missing from the table is a compile error.
macro_rules! ads_catalogue {
// One or more rows separated by commas; a trailing comma is accepted.
($($variant:ident => ($id:expr, $carrier:expr, $required:expr, $desc:expr)),+ $(,)?) => {
const ALL_ADS_SECTIONS: &[AdsSection] = &[$(AdsSection::$variant),+];
fn describe(section: AdsSection) -> AdsSectionInfo {
match section {
$(AdsSection::$variant => AdsSectionInfo {
section: AdsSection::$variant,
id: $id,
carrier: $carrier,
default_required: $required,
description: $desc,
}),+
}
}
};
}
use AdsCarrier::{CustomAttribute, StandardField};
// The catalogue table. Each row is
// `Variant => (id, carrier, required by default, description)`.
// Row order is the order of `ALL_ADS_SECTIONS`, and therefore of
// `AdsSection::all()` and `ads_catalogue()`.
ads_catalogue! {
Goal => ("goal", StandardField("description"), true,
"What the detection is trying to catch."),
Categorization => ("categorization", StandardField("tags"), true,
"The ATT&CK categorization, carried by attack.* tags."),
Strategy => ("strategy", CustomAttribute("rsigma.ads.strategy"), true,
"A one-paragraph abstract of the detection approach."),
TechnicalContext => ("technical_context", CustomAttribute("rsigma.ads.technical_context"), true,
"The data source, fields, and environment knowledge the detection needs."),
BlindSpots => ("blind_spots", CustomAttribute("rsigma.ads.blind_spots"), true,
"How an attacker could evade the detection, and what it assumes."),
FalsePositives => ("false_positives", StandardField("falsepositives"), true,
"Known benign triggers, carried by falsepositives."),
Validation => ("validation", CustomAttribute("rsigma.ads.validation"), true,
"A recipe that produces a true-positive event the detection fires on."),
Priority => ("priority", CustomAttribute("rsigma.ads.priority"), true,
"Why the detection's level is what it is (the priority rationale)."),
Response => ("response", CustomAttribute("rsigma.ads.response"), true,
"What an analyst should do when the detection fires."),
}
pub fn ads_catalogue() -> Vec<AdsSectionInfo> {
ALL_ADS_SECTIONS.iter().map(|&s| describe(s)).collect()
}
impl AdsSection {
pub fn all() -> &'static [AdsSection] {
ALL_ADS_SECTIONS
}
pub fn from_id(id: &str) -> Option<AdsSection> {
ALL_ADS_SECTIONS.iter().copied().find(|s| s.info().id == id)
}
pub fn info(&self) -> AdsSectionInfo {
describe(*self)
}
pub fn id(&self) -> &'static str {
self.info().id
}
pub fn carrier(&self) -> AdsCarrier {
self.info().carrier
}
pub fn carrier_field(&self) -> &'static str {
self.info().carrier.name()
}
pub fn default_required(&self) -> bool {
self.info().default_required
}
pub fn content(&self, rule: &SigmaRule) -> Option<AdsContent> {
match self {
AdsSection::Goal => rule
.description
.as_deref()
.and_then(non_blank)
.map(|s| AdsContent::Text(s.to_string())),
AdsSection::Categorization => {
let tags: Vec<String> = attack_tags(rule).map(str::to_string).collect();
if tags.is_empty() {
None
} else {
Some(AdsContent::List(tags))
}
}
AdsSection::FalsePositives => {
let items: Vec<String> = rule
.falsepositives
.iter()
.filter_map(|s| non_blank(s).map(str::to_string))
.collect();
if items.is_empty() {
None
} else {
Some(AdsContent::List(items))
}
}
other => {
let key = other.carrier_field();
rule.custom_attributes.get(key).and_then(content_from_value)
}
}
}
pub fn is_present(&self, rule: &SigmaRule) -> bool {
self.content(rule).is_some()
}
}
/// The content of one ADS section: a single text or a list of strings.
///
/// Serialized untagged, i.e. as a bare string or a bare list of strings.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[serde(untagged)]
pub enum AdsContent {
/// A single piece of text.
Text(String),
/// A list of items.
List(Vec<String>),
}
impl AdsContent {
    /// Renders the content as one string: text is returned as is, list
    /// items are joined with newlines.
    pub fn as_text(&self) -> String {
        match self {
            AdsContent::List(lines) => lines.join("\n"),
            AdsContent::Text(text) => text.to_owned(),
        }
    }

    /// Returns the content as a list of items; a text value becomes a
    /// one-element list.
    pub fn items(&self) -> Vec<String> {
        match self {
            AdsContent::List(entries) => entries.to_vec(),
            AdsContent::Text(text) => vec![text.to_owned()],
        }
    }
}
/// Reports whether `rule` carries the exemption flag.
///
/// Only a boolean `true` stored under [`EXEMPT_KEY`] in the rule's custom
/// attributes counts; a missing key or a value for which `as_bool()` gives
/// `None` is treated as not exempt.
pub fn is_exempt(rule: &SigmaRule) -> bool {
    let flag = rule.custom_attributes.get(EXEMPT_KEY);
    matches!(flag.and_then(|value| value.as_bool()), Some(true))
}
/// Iterates over the rule's tags that start with `attack.`, in their
/// original order and borrowed from `rule`.
pub fn attack_tags(rule: &SigmaRule) -> impl Iterator<Item = &str> {
    rule.tags
        .iter()
        .filter(|tag| tag.starts_with("attack."))
        .map(|tag| tag.as_str())
}
/// Reports whether `rule` has at least one categorization tag.
///
/// A tag counts when it has the `namespace.name` shape and its namespace
/// (the text before the first `.`) is `attack` or one of
/// `extra_namespaces`.
///
/// Tags without a `.` separator are ignored. A bare `attack` or a bare
/// extra-namespace name carries no name part, so it does not categorize
/// the rule. This keeps the function consistent with `attack_tags`, which
/// only yields tags starting with `attack.`.
pub fn has_categorization(rule: &SigmaRule, extra_namespaces: &[String]) -> bool {
    rule.tags
        .iter()
        // `split_once` yields nothing for dot-less tags, unlike
        // `split('.').next()`, which would return the whole tag.
        .filter_map(|tag| tag.split_once('.'))
        .any(|(namespace, _name)| {
            namespace == "attack" || extra_namespaces.iter().any(|extra| extra == namespace)
        })
}
/// The state of one ADS section for a particular rule.
#[derive(Debug, Clone, Serialize)]
pub struct AdsSectionStatus {
/// The section's string id.
pub id: &'static str,
/// Whether the section is required.
pub required: bool,
/// Whether content was found for the section.
pub present: bool,
/// Name of the rule field or custom-attribute key that carries the section.
pub carrier: &'static str,
/// The extracted content; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub content: Option<AdsContent>,
}
/// The ADS view of a rule: one status entry per section.
#[derive(Debug, Clone, Serialize)]
pub struct AdsDocument {
/// Per-section status entries.
pub sections: Vec<AdsSectionStatus>,
}
impl AdsDocument {
    /// Builds the document for `rule`, with one entry per section in the
    /// order of `AdsSection::all()`.
    ///
    /// Each entry's `required` flag is the section's catalogue default.
    pub fn from_rule(rule: &SigmaRule) -> Self {
        let all = AdsSection::all();
        let mut sections = Vec::with_capacity(all.len());
        for section in all {
            // Extract once; `present` and `content` are derived from the
            // same lookup.
            let content = section.content(rule);
            sections.push(AdsSectionStatus {
                id: section.id(),
                required: section.default_required(),
                present: content.is_some(),
                carrier: section.carrier_field(),
                content,
            });
        }
        Self { sections }
    }

    /// Ids of the sections that are required but not present, in document
    /// order.
    pub fn missing_required(&self) -> Vec<&'static str> {
        let mut missing = Vec::new();
        for status in &self.sections {
            if status.required && !status.present {
                missing.push(status.id);
            }
        }
        missing
    }
}
/// A placeholder to add to a rule for one missing section, as produced by
/// `scaffold_missing`.
#[derive(Debug, Clone, Serialize)]
pub struct AdsScaffoldEntry {
/// The custom-attribute key the placeholder belongs under.
pub key: &'static str,
/// The placeholder content for that key.
pub placeholder: AdsContent,
}
/// Produces a placeholder entry for every custom-attribute section that
/// `rule` does not have yet, in catalogue order.
///
/// Sections carried by standard rule fields are skipped, as are sections
/// that are already present.
pub fn scaffold_missing(rule: &SigmaRule) -> Vec<AdsScaffoldEntry> {
    let mut entries = Vec::new();
    for &section in AdsSection::all() {
        if !matches!(section.carrier(), AdsCarrier::CustomAttribute(_)) {
            continue;
        }
        if section.is_present(rule) {
            continue;
        }
        entries.push(AdsScaffoldEntry {
            key: section.carrier_field(),
            placeholder: placeholder_for(section),
        });
    }
    entries
}
/// Returns the `TODO` placeholder content for `section`.
///
/// Strategy, technical context, validation and priority get a single text;
/// blind spots and response get a two-item list. The sections carried by
/// standard fields (goal, categorization, false positives) get an empty
/// text.
fn placeholder_for(section: AdsSection) -> AdsContent {
    /// Wraps a literal as text content.
    fn text(body: &str) -> AdsContent {
        AdsContent::Text(body.to_owned())
    }
    /// Wraps literals as list content.
    fn list(items: &[&str]) -> AdsContent {
        AdsContent::List(items.iter().map(|item| (*item).to_owned()).collect())
    }

    match section {
        AdsSection::Goal | AdsSection::Categorization | AdsSection::FalsePositives => text(""),
        AdsSection::Strategy => text(
            "TODO: a one-paragraph abstract of what this detection does and the approach it takes.",
        ),
        AdsSection::TechnicalContext => text(
            "TODO: the data source, fields, and environment knowledge needed to understand this \
             detection.",
        ),
        AdsSection::BlindSpots => list(&[
            "TODO: a way an attacker could evade this detection.",
            "TODO: an assumption this detection relies on.",
        ]),
        AdsSection::Validation => text(
            "TODO: the steps to generate a true-positive event that triggers this detection.",
        ),
        AdsSection::Priority => text(
            "TODO: why this detection's level is set as it is, and what it implies for response \
             urgency.",
        ),
        AdsSection::Response => list(&[
            "TODO: the first triage step when this detection fires.",
            "TODO: the escalation or containment action.",
        ]),
    }
}
/// Trims `s` and returns the trimmed slice, or `None` when nothing but
/// whitespace remains.
fn non_blank(s: &str) -> Option<&str> {
    Some(s.trim()).filter(|trimmed| !trimmed.is_empty())
}
/// Converts a YAML value into section content.
///
/// A sequence becomes a list of its entries that `scalar_text` accepts;
/// entries it rejects are dropped, and a sequence with no accepted entries
/// gives `None`. Any other value becomes text if `scalar_text` accepts it.
fn content_from_value(v: &yaml_serde::Value) -> Option<AdsContent> {
    use yaml_serde::Value;
    if let Value::Sequence(entries) = v {
        let items: Vec<String> = entries.iter().filter_map(scalar_text).collect();
        if items.is_empty() {
            return None;
        }
        return Some(AdsContent::List(items));
    }
    scalar_text(v).map(AdsContent::Text)
}
/// Renders a scalar YAML value as text.
///
/// Strings are trimmed and rejected when blank; booleans and numbers are
/// formatted with `to_string`. Every other kind of value gives `None`.
fn scalar_text(v: &yaml_serde::Value) -> Option<String> {
    use yaml_serde::Value;
    let rendered = match v {
        Value::String(raw) => non_blank(raw)?.to_owned(),
        Value::Number(number) => number.to_string(),
        Value::Bool(flag) => flag.to_string(),
        _ => return None,
    };
    Some(rendered)
}
// Unit tests for the ADS catalogue, content extraction, and scaffolding.
#[cfg(test)]
mod tests {
use super::*;
use crate::parse_sigma_yaml;
// Parses `yaml` and returns the last rule of the result; panics when
// parsing fails or no rule was produced.
fn rule(yaml: &str) -> SigmaRule {
parse_sigma_yaml(yaml).unwrap().rules.pop().unwrap()
}
// Fixture that supplies every ADS section: description, attack.* tags,
// falsepositives, and all six rsigma.ads.* custom attributes.
const FULL_RULE: &str = r#"
title: Whoami execution
description: Detects whoami execution, a common discovery step.
status: stable
logsource:
category: process_creation
product: windows
detection:
selection:
CommandLine|contains: whoami
condition: selection
level: medium
falsepositives:
- Legitimate administrators enumerating their own privileges
tags:
- attack.execution
- attack.t1059
custom_attributes:
rsigma.ads.strategy: Watch for the whoami binary in process creation events.
rsigma.ads.technical_context: Requires process_creation telemetry with CommandLine.
rsigma.ads.blind_spots:
- Renamed whoami binaries evade the image match.
- Assumes CommandLine logging is enabled.
rsigma.ads.validation: Run `whoami` in a lab and confirm the rule fires.
rsigma.ads.priority: Medium because discovery is mid-kill-chain.
rsigma.ads.response:
- Confirm the user and host.
- Correlate with other discovery activity.
"#;
// The public catalogue and the generated section list both have nine entries.
#[test]
fn catalogue_has_nine_sections() {
let cat = ads_catalogue();
assert_eq!(cat.len(), 9);
assert_eq!(ALL_ADS_SECTIONS.len(), 9);
}
// Every section id is distinct and maps back to its section via `from_id`.
#[test]
fn ids_are_unique_and_round_trip() {
use std::collections::HashSet;
let mut seen = HashSet::new();
for &s in AdsSection::all() {
let id = s.id();
assert!(seen.insert(id), "duplicate ADS section id: {id}");
assert_eq!(AdsSection::from_id(id), Some(s));
}
assert_eq!(AdsSection::from_id("nope"), None);
}
// Spot-checks carrier names and carrier kinds against the catalogue table.
#[test]
fn carriers_match_the_schema() {
assert_eq!(AdsSection::Goal.carrier_field(), "description");
assert_eq!(AdsSection::Categorization.carrier_field(), "tags");
assert_eq!(AdsSection::FalsePositives.carrier_field(), "falsepositives");
assert_eq!(AdsSection::Strategy.carrier_field(), "rsigma.ads.strategy");
assert!(matches!(
AdsSection::Goal.carrier(),
AdsCarrier::StandardField(_)
));
assert!(matches!(
AdsSection::Response.carrier(),
AdsCarrier::CustomAttribute(_)
));
}
// The full fixture leaves no required section missing.
#[test]
fn full_rule_has_every_section_present() {
let rule = rule(FULL_RULE);
let doc = AdsDocument::from_rule(&rule);
assert!(doc.missing_required().is_empty(), "{doc:?}");
for s in AdsSection::all() {
assert!(s.is_present(&rule), "{} should be present", s.id());
}
}
// Sections carried by standard fields are satisfied by those fields.
#[test]
fn reused_fields_satisfy_their_sections() {
let rule = rule(FULL_RULE);
assert!(AdsSection::Goal.is_present(&rule));
assert!(AdsSection::Categorization.is_present(&rule));
assert!(AdsSection::FalsePositives.is_present(&rule));
}
// A YAML sequence attribute is extracted as a list with all its items.
#[test]
fn list_content_preserves_items() {
let rule = rule(FULL_RULE);
match AdsSection::BlindSpots.content(&rule).unwrap() {
AdsContent::List(items) => assert_eq!(items.len(), 2),
other => panic!("expected list, got {other:?}"),
}
}
// A rule with no description, tags, falsepositives, or custom attributes
// is missing all nine sections, since every section is required by default.
#[test]
fn bare_rule_is_missing_custom_sections() {
let rule = rule(
r#"
title: Bare
status: stable
logsource:
category: test
detection:
selection:
field: value
condition: selection
"#,
);
let doc = AdsDocument::from_rule(&rule);
let missing = doc.missing_required();
assert_eq!(missing.len(), 9);
}
// Only custom-attribute sections are scaffolded: strategy is already
// present here, so five of the six custom sections get placeholders.
#[test]
fn scaffold_fills_only_missing_custom_sections() {
let rule = rule(
r#"
title: Partly documented
description: Has a goal already.
status: stable
logsource:
category: test
detection:
selection:
field: value
condition: selection
custom_attributes:
rsigma.ads.strategy: Already written.
"#,
);
let entries = scaffold_missing(&rule);
let keys: Vec<&str> = entries.iter().map(|e| e.key).collect();
assert!(!keys.contains(&"rsigma.ads.strategy"));
assert!(keys.contains(&"rsigma.ads.validation"));
assert!(keys.contains(&"rsigma.ads.response"));
assert_eq!(entries.len(), 5);
}
// A private namespace counts as categorization only when passed as an
// extra namespace; the Categorization section itself looks at attack.* only.
#[test]
fn categorization_honours_extra_namespaces() {
let rule = rule(
r#"
title: Private taxonomy
status: stable
logsource:
category: test
detection:
selection:
field: value
condition: selection
tags:
- myorg.technique
"#,
);
assert!(!AdsSection::Categorization.is_present(&rule));
assert!(!has_categorization(&rule, &[]));
assert!(has_categorization(&rule, &["myorg".to_string()]));
}
// A boolean `true` under the exempt key marks the rule as exempt.
#[test]
fn exempt_flag_is_read() {
let rule = rule(
r#"
title: Vendor import
status: stable
logsource:
category: test
detection:
selection:
field: value
condition: selection
custom_attributes:
rsigma.ads.exempt: true
"#,
);
assert!(is_exempt(&rule));
}
}