// rsigma 0.17.0
//
// CLI for parsing, validating, linting and evaluating Sigma detection rules
//! DeTT&CT administration-file emitters.
//!
//! Two typed serde documents, both seeded for analyst review rather than
//! emitted as authoritative:
//!
//! * a data-source administration file (per ATT&CK data source: products,
//!   `available_for_data_analytics`, and a seed `data_quality` block), and
//! * a technique-administration file (per technique: a `visibility`
//!   `score_logbook` carrying the rolled-up 0-to-4 score).
//!
//! The detection half of the technique-administration schema is left to
//! `rule coverage`; an analyst can merge the two in DeTT&CT.

use serde::Serialize;

use super::analysis::{VisibilityAnalysis, level_name};
use super::navigator::DOMAIN;

/// Schema version of the DeTT&CT administration files produced here.
///
/// Written to the `version` field of both the data-source and the
/// technique administration documents.
const DETTECT_VERSION: f32 = 1.2_f32;

/// Comment banner written ahead of each emitted DeTT&CT file to flag the
/// scores as seeds awaiting review. It lives outside the serde structs so the
/// serialized YAML body carries no extra fields.
pub(crate) const SEED_HEADER: &str = concat!(
    "# Generated by rsigma rule visibility. Scores are conservative seeds derived\n",
    "# from observed field coverage; review and tune them in DeTT&CT before use.\n",
);

// ---------------------------------------------------------------------------
// Data-source administration
// ---------------------------------------------------------------------------

/// Top-level DeTT&CT data-source administration document.
///
/// Constructed by [`build_data_source_admin`]. No serde renames are applied,
/// so the serialized keys are the field names below.
#[derive(Debug, Serialize)]
pub(crate) struct DataSourceAdmin {
    /// Schema version; the builder sets it from `DETTECT_VERSION`.
    version: f32,
    /// Document discriminator; the builder sets it to
    /// `"data-source-administration"`.
    file_type: &'static str,
    /// Caller-supplied document name.
    name: String,
    /// One entry per data source in the analysis.
    data_sources: Vec<DataSourceEntry>,
}

/// A single data source inside a [`DataSourceAdmin`] document.
#[derive(Debug, Serialize)]
struct DataSourceEntry {
    /// Name of the data source, copied from the analysis.
    data_source_name: String,
    /// Left as `None` by the builder.
    date_registered: Option<String>,
    /// Left as `None` by the builder.
    date_connected: Option<String>,
    /// Products associated with the data source, copied from the analysis.
    products: Vec<String>,
    /// Whether the analysis reports the data source as available.
    available_for_data_analytics: bool,
    /// Seed note: level name, contributing logsources, and the count of
    /// observed versus mapped rule fields.
    comment: String,
    /// Seeded data-quality block for this data source.
    data_quality: DataQuality,
}

/// DeTT&CT data-quality dimensions (0-5). Seeded with the source's visibility
/// score; analysts tune them. Never fabricated beyond the observed signal.
///
/// [`build_data_source_admin`] writes the same score into all five fields, so
/// a freshly emitted block is uniform until an analyst edits it.
#[derive(Debug, Serialize)]
struct DataQuality {
    /// Seeded from the data source's score.
    device_completeness: u8,
    /// Seeded from the data source's score.
    data_field_completeness: u8,
    /// Seeded from the data source's score.
    timeliness: u8,
    /// Seeded from the data source's score.
    consistency: u8,
    /// Seeded from the data source's score.
    retention: u8,
}

/// Assemble the DeTT&CT data-source administration document from `analysis`.
///
/// Each element of `analysis.data_sources` yields one entry: its score seeds
/// all five data-quality dimensions, both date fields stay unset, and the
/// comment records the level name, the contributing logsources, and how many
/// of the mapped rule fields were observed. `name` is copied into the
/// document header.
pub(crate) fn build_data_source_admin(
    analysis: &VisibilityAnalysis,
    name: &str,
) -> DataSourceAdmin {
    let mut entries = Vec::with_capacity(analysis.data_sources.len());

    for source in &analysis.data_sources {
        // A single seed value feeds every quality dimension.
        let seed = source.score;
        let logsource_list = source.logsources.join(", ");
        let observed = source.observed_fields.len();
        let mapped = source.mapped_fields.len();

        entries.push(DataSourceEntry {
            data_source_name: source.data_source.clone(),
            date_registered: None,
            date_connected: None,
            products: source.products.clone(),
            available_for_data_analytics: source.available(),
            comment: format!(
                "seed ({}): logsources [{}]; observed {} of {} mapped rule fields",
                level_name(seed),
                logsource_list,
                observed,
                mapped,
            ),
            data_quality: DataQuality {
                device_completeness: seed,
                data_field_completeness: seed,
                timeliness: seed,
                consistency: seed,
                retention: seed,
            },
        });
    }

    DataSourceAdmin {
        version: DETTECT_VERSION,
        file_type: "data-source-administration",
        name: name.to_owned(),
        data_sources: entries,
    }
}

// ---------------------------------------------------------------------------
// Technique administration
// ---------------------------------------------------------------------------

/// Top-level DeTT&CT technique-administration document (visibility axis).
///
/// Constructed by [`build_technique_admin`]. No serde renames are applied,
/// so the serialized keys are the field names below.
#[derive(Debug, Serialize)]
pub(crate) struct TechniqueAdmin {
    /// Schema version; the builder sets it from `DETTECT_VERSION`.
    version: f32,
    /// Document discriminator; the builder sets it to
    /// `"technique-administration"`.
    file_type: &'static str,
    /// Caller-supplied document name.
    name: String,
    /// ATT&CK domain; the builder sets it from the navigator's `DOMAIN`.
    domain: &'static str,
    /// One entry per technique in the analysis.
    techniques: Vec<TechniqueEntry>,
}

/// A single technique inside a [`TechniqueAdmin`] document.
#[derive(Debug, Serialize)]
struct TechniqueEntry {
    /// Technique identifier, copied from the analysis.
    technique_id: String,
    /// Visibility scoring for this technique.
    visibility: VisibilityBlock,
}

/// The `visibility` section of a [`TechniqueEntry`].
#[derive(Debug, Serialize)]
struct VisibilityBlock {
    /// Score history; [`build_technique_admin`] emits exactly one seed entry.
    score_logbook: Vec<ScoreLog>,
}

/// One entry in a technique's visibility `score_logbook`.
#[derive(Debug, Serialize)]
struct ScoreLog {
    /// Left as `None` by the builder.
    date: Option<String>,
    /// The technique's rolled-up visibility score, copied from the analysis.
    score: u8,
    /// Seed note: level name and the contributing data sources.
    comment: String,
}

/// Assemble the DeTT&CT technique-administration document from `analysis`.
///
/// Only the visibility axis is filled in: each element of
/// `analysis.techniques` yields one entry whose `score_logbook` holds a
/// single undated seed carrying the technique's score and a comment listing
/// the level name and contributing data sources. `name` is copied into the
/// document header.
pub(crate) fn build_technique_admin(analysis: &VisibilityAnalysis, name: &str) -> TechniqueAdmin {
    let mut entries = Vec::with_capacity(analysis.techniques.len());

    for technique in &analysis.techniques {
        let seed = technique.score;
        let source_list = technique.data_sources.join(", ");

        let seed_log = ScoreLog {
            date: None,
            score: seed,
            comment: format!(
                "seed ({}): data sources [{}]",
                level_name(seed),
                source_list,
            ),
        };

        entries.push(TechniqueEntry {
            technique_id: technique.technique_id.clone(),
            visibility: VisibilityBlock {
                score_logbook: vec![seed_log],
            },
        });
    }

    TechniqueAdmin {
        version: DETTECT_VERSION,
        file_type: "technique-administration",
        name: name.to_owned(),
        domain: DOMAIN,
        techniques: entries,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::visibility::analysis::{
        DataSourceVisibility, TechniqueVisibility, VisibilityAnalysis,
    };

    /// Fixture: one fully observed `Process` data source (score 4) and one
    /// technique, `T1059`, that relies on it.
    fn sample() -> VisibilityAnalysis {
        let process = DataSourceVisibility {
            data_source: "Process".into(),
            score: 4,
            data_components: vec!["Process Creation".into()],
            products: vec!["Windows".into()],
            logsources: vec!["process_creation/windows".into()],
            mapped_fields: vec!["CommandLine".into(), "Image".into()],
            observed_fields: vec!["CommandLine".into(), "Image".into()],
            blind_spot: false,
        };
        let t1059 = TechniqueVisibility {
            technique_id: "T1059".into(),
            score: 4,
            data_sources: vec!["Process".into()],
        };

        VisibilityAnalysis {
            data_sources: vec![process],
            techniques: vec![t1059],
            untapped: Vec::new(),
            unmapped_logsources: Vec::new(),
            rules_total: 1,
            logsources_total: 1,
            events_observed: 10,
            observed_unique_keys: 2,
            has_observed: true,
        }
    }

    /// Serialize a document to YAML, panicking if serialization fails.
    fn render(doc: &impl serde::Serialize) -> String {
        yaml_serde::to_string(doc).unwrap()
    }

    #[test]
    fn data_source_admin_serializes_dettect_shape() {
        let analysis = sample();
        let rendered = render(&build_data_source_admin(&analysis, "rsigma visibility"));

        assert!(rendered.contains("file_type: data-source-administration"));
        assert!(rendered.contains("data_source_name: Process"));
        assert!(rendered.contains("available_for_data_analytics: true"));
        assert!(rendered.contains("device_completeness: 4"));
    }

    #[test]
    fn technique_admin_serializes_score_logbook() {
        let analysis = sample();
        let rendered = render(&build_technique_admin(&analysis, "rsigma visibility"));

        assert!(rendered.contains("file_type: technique-administration"));
        assert!(rendered.contains("domain: enterprise-attack"));
        assert!(rendered.contains("technique_id: T1059"));
        assert!(rendered.contains("score_logbook:"));
    }
}