Skip to main content

rsigma_parser/parser/
detection.rs

1use std::collections::HashMap;
2
3use yaml_serde::Value;
4
5use crate::ast::*;
6use crate::condition::parse_condition;
7use crate::error::{Result, SigmaParserError};
8use crate::value::SigmaValue;
9
10use super::{
11    collect_custom_attributes, get_str, get_str_list, parse_logsource, parse_related, val_key,
12};
13
14// =============================================================================
15// Detection Rule Parsing
16// =============================================================================
17
18/// Parse a detection rule from a YAML value.
19///
20/// Reference: pySigma rule.py SigmaRule.from_yaml / from_dict
21pub(super) fn parse_detection_rule(value: &Value) -> Result<SigmaRule> {
22    let m = value
23        .as_mapping()
24        .ok_or_else(|| SigmaParserError::InvalidRule("Expected a YAML mapping".into()))?;
25
26    let title = get_str(m, "title")
27        .ok_or_else(|| SigmaParserError::MissingField("title".into()))?
28        .to_string();
29
30    let detection_val = m
31        .get(val_key("detection"))
32        .ok_or_else(|| SigmaParserError::MissingField("detection".into()))?;
33    let detection = parse_detections(detection_val)?;
34
35    let logsource = m
36        .get(val_key("logsource"))
37        .map(parse_logsource)
38        .transpose()?
39        .unwrap_or_default();
40
41    // Custom attributes: merge arbitrary top-level keys and the entries of the
42    // dedicated `custom_attributes:` mapping. Entries in `custom_attributes:`
43    // win over a top-level key of the same name (last-write-wins).
44    // Mirrors pySigma's `SigmaRule.custom_attributes` dict.
45    let standard_rule_keys: &[&str] = &[
46        "title",
47        "id",
48        "related",
49        "name",
50        "taxonomy",
51        "status",
52        "description",
53        "license",
54        "author",
55        "references",
56        "date",
57        "modified",
58        "logsource",
59        "detection",
60        "fields",
61        "falsepositives",
62        "level",
63        "tags",
64        "scope",
65        "custom_attributes",
66    ];
67    let custom_attributes = collect_custom_attributes(m, standard_rule_keys);
68
69    Ok(SigmaRule {
70        title,
71        logsource,
72        detection,
73        id: get_str(m, "id").map(|s| s.to_string()),
74        name: get_str(m, "name").map(|s| s.to_string()),
75        related: parse_related(m.get(val_key("related"))),
76        taxonomy: get_str(m, "taxonomy").map(|s| s.to_string()),
77        status: get_str(m, "status").and_then(|s| s.parse().ok()),
78        description: get_str(m, "description").map(|s| s.to_string()),
79        license: get_str(m, "license").map(|s| s.to_string()),
80        author: get_str(m, "author").map(|s| s.to_string()),
81        references: get_str_list(m, "references"),
82        date: get_str(m, "date").map(|s| s.to_string()),
83        modified: get_str(m, "modified").map(|s| s.to_string()),
84        fields: get_str_list(m, "fields"),
85        falsepositives: get_str_list(m, "falsepositives"),
86        level: get_str(m, "level").and_then(|s| s.parse().ok()),
87        tags: get_str_list(m, "tags"),
88        scope: get_str_list(m, "scope"),
89        custom_attributes,
90    })
91}
92
93// =============================================================================
94// Detection Section Parsing
95// =============================================================================
96
97/// Parse the `detection:` section of a rule.
98///
99/// The detection section contains:
100/// - `condition`: string or list of strings
101/// - `timeframe`: optional duration string
102/// - Everything else: named detection identifiers
103///
104/// Reference: pySigma rule/detection.py SigmaDetections.from_dict
105pub(super) fn parse_detections(value: &Value) -> Result<Detections> {
106    let m = value.as_mapping().ok_or_else(|| {
107        SigmaParserError::InvalidDetection("Detection section must be a mapping".into())
108    })?;
109
110    // Extract condition (required)
111    let condition_val = m
112        .get(val_key("condition"))
113        .ok_or_else(|| SigmaParserError::MissingField("condition".into()))?;
114
115    let condition_strings = match condition_val {
116        Value::String(s) => vec![s.clone()],
117        Value::Sequence(seq) => {
118            let mut strings = Vec::with_capacity(seq.len());
119            for v in seq {
120                match v.as_str() {
121                    Some(s) => strings.push(s.to_string()),
122                    None => {
123                        return Err(SigmaParserError::InvalidDetection(format!(
124                            "condition list items must be strings, got: {v:?}"
125                        )));
126                    }
127                }
128            }
129            strings
130        }
131        _ => {
132            return Err(SigmaParserError::InvalidDetection(
133                "condition must be a string or list of strings".into(),
134            ));
135        }
136    };
137
138    // Parse each condition string
139    let conditions: Vec<ConditionExpr> = condition_strings
140        .iter()
141        .map(|s| parse_condition(s))
142        .collect::<Result<Vec<_>>>()?;
143
144    // Extract optional timeframe
145    let timeframe = get_str(m, "timeframe").map(|s| s.to_string());
146
147    // Parse all named detections (everything except condition and timeframe)
148    let mut named = HashMap::new();
149    for (key, val) in m {
150        let key_str = key.as_str().unwrap_or("");
151        if key_str == "condition" || key_str == "timeframe" {
152            continue;
153        }
154        named.insert(key_str.to_string(), parse_detection(val)?);
155    }
156
157    Ok(Detections {
158        named,
159        conditions,
160        condition_strings,
161        timeframe,
162    })
163}
164
165/// Parse a single named detection definition.
166///
167/// A detection can be:
168/// 1. A mapping (key-value pairs, AND-linked)
169/// 2. A list of plain values (keyword detection)
170/// 3. A list of mappings (OR-linked sub-detections)
171///
172/// Reference: pySigma rule/detection.py SigmaDetection.from_definition
173fn parse_detection(value: &Value) -> Result<Detection> {
174    match value {
175        Value::Mapping(m) => {
176            // Case 1: key-value mapping → AND-linked detection items
177            let items: Vec<DetectionItem> = m
178                .iter()
179                .map(|(k, v)| parse_detection_item(k.as_str().unwrap_or(""), v))
180                .collect::<Result<Vec<_>>>()?;
181            Ok(Detection::AllOf(items))
182        }
183        Value::Sequence(seq) => {
184            // Check if all items are plain values (strings/numbers/etc.)
185            let all_plain = seq.iter().all(|v| !v.is_mapping() && !v.is_sequence());
186            if all_plain {
187                // Case 2: list of plain values → keyword detection
188                let values = seq.iter().map(SigmaValue::from_yaml).collect();
189                Ok(Detection::Keywords(values))
190            } else {
191                // Case 3: list of mappings → OR-linked sub-detections
192                let subs: Vec<Detection> = seq
193                    .iter()
194                    .map(parse_detection)
195                    .collect::<Result<Vec<_>>>()?;
196                Ok(Detection::AnyOf(subs))
197            }
198        }
199        // Plain value → single keyword
200        _ => Ok(Detection::Keywords(vec![SigmaValue::from_yaml(value)])),
201    }
202}
203
204/// Parse a single detection item from a key-value pair.
205///
206/// The key contains the field name and optional modifiers separated by `|`:
207/// - `EventType` → field="EventType", no modifiers
208/// - `TargetObject|endswith` → field="TargetObject", modifiers=[EndsWith]
209/// - `Destination|contains|all` → field="Destination", modifiers=[Contains, All]
210///
211/// Reference: pySigma rule/detection.py SigmaDetectionItem.from_mapping
212fn parse_detection_item(key: &str, value: &Value) -> Result<DetectionItem> {
213    let field = parse_field_spec(key)?;
214
215    let values = match value {
216        Value::Sequence(seq) => seq.iter().map(|v| to_sigma_value(v, &field)).collect(),
217        _ => vec![to_sigma_value(value, &field)],
218    };
219
220    Ok(DetectionItem { field, values })
221}
222
223/// Convert a YAML value to a SigmaValue, respecting field modifiers.
224///
225/// When the `re` modifier is present, strings are treated as raw (no wildcard parsing).
226fn to_sigma_value(v: &Value, field: &FieldSpec) -> SigmaValue {
227    if field.has_modifier(Modifier::Re)
228        && let Value::String(s) = v
229    {
230        return SigmaValue::from_raw_string(s);
231    }
232    SigmaValue::from_yaml(v)
233}
234
235/// Parse a field specification string like `"TargetObject|endswith"`.
236///
237/// Reference: pySigma rule/detection.py — `field, *modifier_ids = key.split("|")`
238pub fn parse_field_spec(key: &str) -> Result<FieldSpec> {
239    if key.is_empty() {
240        return Ok(FieldSpec::new(None, Vec::new()));
241    }
242
243    let parts: Vec<&str> = key.split('|').collect();
244    let field_name = parts[0];
245    let field = if field_name.is_empty() {
246        None
247    } else {
248        Some(field_name.to_string())
249    };
250
251    let mut modifiers = Vec::new();
252    for &mod_str in &parts[1..] {
253        // Sigma reserves `not` for condition expressions; it is not a value
254        // modifier. Catch this idiom up front so the diagnostic explains
255        // the workaround instead of just saying "unknown modifier".
256        if mod_str == "not" {
257            return Err(SigmaParserError::NotIsNotAModifier);
258        }
259        let m = mod_str
260            .parse::<Modifier>()
261            .map_err(|_| SigmaParserError::UnknownModifier(mod_str.to_string()))?;
262        modifiers.push(m);
263    }
264
265    Ok(FieldSpec::new(field, modifiers))
266}