Skip to main content

rsigma_convert/backends/postgres/
mod.rs

1//! PostgreSQL/TimescaleDB backend for Sigma rule conversion.
2//!
3//! Converts Sigma detection rules into PostgreSQL SQL queries, leveraging
4//! PostgreSQL-native features: `ILIKE` for case-insensitive matching,
5//! `~*`/`~` for regex, `inet`/`cidr` for network address matching,
6//! `tsvector`/`tsquery` for full-text keyword search, and JSONB for
7//! semi-structured event data.
8
9mod correlation;
10#[cfg(test)]
11mod tests;
12
13use std::collections::HashMap;
14use std::sync::LazyLock;
15
16use regex::Regex;
17use rsigma_eval::pipeline::state::PipelineState;
18use rsigma_parser::*;
19
20use crate::backend::*;
21use crate::condition::convert_condition_expr;
22use crate::convert::{default_convert_detection, default_convert_detection_item};
23use crate::error::{ConvertError, Result};
24use crate::state::{ConversionState, ConvertResult};
25
26fn validate_sql_identifier(s: &str) -> Result<()> {
27    static RE: LazyLock<Regex> =
28        LazyLock::new(|| Regex::new(r"^[A-Za-z_][A-Za-z0-9_$]*$").unwrap());
29    if RE.is_match(s) {
30        Ok(())
31    } else {
32        Err(ConvertError::InvalidIdentifier(s.to_string()))
33    }
34}
35
36// =============================================================================
37// PostgreSQL TextQueryConfig
38// =============================================================================
39
/// Token and expression templates used when rendering Sigma conditions as
/// PostgreSQL text.
///
/// `PostgresBackend::new` points its `config` field at this static. Several
/// `PostgresBackend` methods in this file build their SQL directly (for
/// example `finish_query` assembles its own `SELECT` template), so not every
/// entry below is necessarily consulted on every code path.
pub static POSTGRES_CONFIG: TextQueryConfig = TextQueryConfig {
    // Boolean operator precedence and grouping.
    precedence: (TokenType::NOT, TokenType::AND, TokenType::OR),
    group_expression: "({expr})",
    token_separator: " ",

    // Boolean and equality tokens.
    and_token: "AND",
    or_token: "OR",
    not_token: "NOT",
    eq_token: " = ",

    // Inequality uses the SQL-standard `<>` operator.
    not_eq_token: Some(" <> "),
    eq_expression: None,
    not_eq_expression: None,
    convert_not_as_not_eq: false,

    // Sigma wildcards map onto the LIKE metacharacters.
    wildcard_multi: "%",
    wildcard_single: "_",

    // String literals: the single quote is both delimiter and escape
    // character (a quote inside a literal is written `''`).
    str_quote: "'",
    str_quote_pattern: None,
    str_quote_pattern_negation: false,
    escape_char: "'",
    add_escaped: &[],
    filter_chars: &[],

    // Field names: double-quoted, governed by the lowercase-identifier
    // pattern below together with its negation flag.
    // NOTE(review): presumably "quote only names that do NOT match" - confirm
    // against the TextQueryConfig implementation.
    field_quote: Some("\""),
    field_quote_pattern: Some(r"^[a-z_][a-z0-9_]*$"),
    field_quote_pattern_negation: true,
    field_escape: None,
    field_escape_pattern: None,

    // Case-insensitive pattern matching templates (`ILIKE`).
    startswith_expression: Some("{field} ILIKE {value}"),
    not_startswith_expression: Some("{field} NOT ILIKE {value}"),
    startswith_expression_allow_special: false,
    endswith_expression: Some("{field} ILIKE {value}"),
    not_endswith_expression: Some("{field} NOT ILIKE {value}"),
    endswith_expression_allow_special: false,
    contains_expression: Some("{field} ILIKE {value}"),
    not_contains_expression: Some("{field} NOT ILIKE {value}"),
    contains_expression_allow_special: false,
    wildcard_match_expression: Some("{field} ILIKE {value}"),

    // Case-sensitive pattern matching templates (`LIKE`).
    case_sensitive_match_expression: Some("{field} LIKE {value}"),
    case_sensitive_startswith_expression: Some("{field} LIKE {value}"),
    case_sensitive_endswith_expression: Some("{field} LIKE {value}"),
    case_sensitive_contains_expression: Some("{field} LIKE {value}"),

    // Regular expressions: `~*` / `!~*` are the case-insensitive POSIX
    // match operators.
    re_expression: Some("{field} ~* {regex}"),
    not_re_expression: Some("{field} !~* {regex}"),
    re_escape_char: None,
    re_escape: &[],
    re_escape_escape_char: None,

    // CIDR containment: cast the field to `inet` and test it with `<<=`
    // against the value cast to `cidr`.
    cidr_expression: Some("({field})::inet <<= {value}::cidr"),
    not_cidr_expression: Some("NOT (({field})::inet <<= {value}::cidr)"),

    // NULL / existence checks.
    field_null_expression: "{field} IS NULL",
    field_exists_expression: Some("{field} IS NOT NULL"),
    field_not_exists_expression: Some("{field} IS NULL"),

    // Numeric comparison modifiers and their SQL operators.
    compare_op_expression: Some("{field} {op} {value}"),
    compare_ops: &[("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<=")],

    // OR-lists are folded into `field IN (...)`; AND-lists are not.
    convert_or_as_in: true,
    convert_and_as_in: false,
    in_expressions_allow_wildcards: false,
    field_in_list_expression: Some("{field} {op} ({list})"),
    or_in_operator: Some("IN"),
    and_in_operator: None,
    list_separator: ", ",

    // No templates for unbound (keyword) values here; `PostgresBackend`
    // implements `convert_keyword` itself.
    unbound_value_str_expression: None,
    unbound_value_num_expression: None,
    unbound_value_re_expression: None,

    // Field-to-field equality.
    field_eq_field_expression: Some("{field1} = {field2}"),
    field_eq_field_escaping_quoting: true,

    // Deferred query parts are not configured.
    deferred_start: None,
    deferred_separator: None,
    deferred_only_query: "",

    // Literals, the whole-query template, and default state values.
    bool_true: "true",
    bool_false: "false",
    query_expression: "SELECT * FROM {table} WHERE {query}",
    state_defaults: &[("table", "security_events")],
};
127
128// =============================================================================
129// PostgresBackend
130// =============================================================================
131
/// PostgreSQL/TimescaleDB backend for Sigma rule conversion.
///
/// Holds the rendering configuration plus the knobs that decide which table
/// is queried and how event fields are addressed.
pub struct PostgresBackend {
    /// Text rendering configuration; `new` points this at `POSTGRES_CONFIG`.
    pub config: &'static TextQueryConfig,
    /// Default table name. `resolve_table` prefers the rule's
    /// `postgres.table` custom attribute, then the pipeline state `table`
    /// key, and only then this value.
    pub table: String,
    /// Timestamp column name for time-windowed queries (used in the
    /// `time_bucket(...)` expressions and passed to the correlation builders).
    pub timestamp_field: String,
    /// If set, fields are accessed via JSONB extraction (`metadata->>'fieldName'`);
    /// dotted field names become a `->` chain ending in `->>`.
    pub json_field: Option<String>,
    /// Use case-sensitive regex (`~`) instead of case-insensitive (`~*`).
    pub case_sensitive_re: bool,
    /// PostgreSQL schema name (e.g. `public`). Lowest-precedence source for
    /// the schema prefix, after the rule attribute and pipeline state.
    pub schema: Option<String>,
    /// PostgreSQL database name (connection-level metadata, not used in queries).
    pub database: Option<String>,
    /// Enable TimescaleDB-specific features.
    // NOTE(review): nothing visible in this file reads this flag; the
    // TimescaleDB paths shown here key off the output format instead.
    pub timescaledb: bool,
}
150
151impl PostgresBackend {
152    pub fn new() -> Self {
153        Self {
154            config: &POSTGRES_CONFIG,
155            table: "security_events".to_string(),
156            timestamp_field: "time".to_string(),
157            json_field: None,
158            case_sensitive_re: false,
159            schema: None,
160            database: None,
161            timescaledb: false,
162        }
163    }
164
165    /// Create a backend from CLI-style key=value option pairs.
166    ///
167    /// Recognized keys: `table`, `schema`, `database`, `timestamp_field`,
168    /// `json_field`, `case_sensitive_re` (true/false).
169    /// Unknown keys are silently ignored so forward-compatible options can be
170    /// added without breaking existing invocations.
171    pub fn from_options(options: &HashMap<String, String>) -> Self {
172        let mut backend = Self::new();
173        if let Some(v) = options.get("table") {
174            backend.table = v.clone();
175        }
176        if let Some(v) = options.get("schema") {
177            backend.schema = Some(v.clone());
178        }
179        if let Some(v) = options.get("database") {
180            backend.database = Some(v.clone());
181        }
182        if let Some(v) = options.get("timestamp_field") {
183            backend.timestamp_field = v.clone();
184        }
185        if let Some(v) = options.get("json_field") {
186            backend.json_field = Some(v.clone());
187        }
188        if let Some(v) = options.get("case_sensitive_re") {
189            backend.case_sensitive_re = v == "true";
190        }
191        backend
192    }
193
194    /// Resolve the fully qualified table name `[schema.]table` using this
195    /// precedence for each component:
196    ///
197    /// **table**: `custom_attributes["postgres.table"]` > `state["table"]` > `self.table`
198    /// **schema**: `custom_attributes["postgres.schema"]` > `state["schema"]` > `self.schema`
199    fn resolve_table(
200        &self,
201        custom_attrs: &HashMap<String, yaml_serde::Value>,
202        state: &HashMap<String, serde_json::Value>,
203    ) -> Result<String> {
204        let table = custom_attrs
205            .get("postgres.table")
206            .and_then(|v| v.as_str())
207            .or(state.get("table").and_then(|v| v.as_str()))
208            .unwrap_or(&self.table);
209        validate_sql_identifier(table)?;
210
211        let schema = custom_attrs
212            .get("postgres.schema")
213            .and_then(|v| v.as_str())
214            .or(state.get("schema").and_then(|v| v.as_str()))
215            .or(self.schema.as_deref())
216            .filter(|s| !s.is_empty());
217
218        if let Some(s) = schema {
219            validate_sql_identifier(s)?;
220            Ok(format!("{s}.{table}"))
221        } else {
222            Ok(table.to_string())
223        }
224    }
225
226    /// Qualify a raw table name with schema. Precedence:
227    /// `per_rule_schema` > `state["schema"]` > `self.schema` > none.
228    fn qualify_table_name(
229        &self,
230        table: &str,
231        state: &HashMap<String, serde_json::Value>,
232        per_rule_schema: Option<&str>,
233    ) -> Result<String> {
234        validate_sql_identifier(table)?;
235
236        let schema = per_rule_schema
237            .or(state.get("schema").and_then(|v| v.as_str()))
238            .or(self.schema.as_deref())
239            .filter(|s| !s.is_empty());
240
241        if let Some(s) = schema {
242            validate_sql_identifier(s)?;
243            Ok(format!("{s}.{table}"))
244        } else {
245            Ok(table.to_string())
246        }
247    }
248
249    fn field_expr(&self, field: &str) -> Result<String> {
250        match &self.json_field {
251            Some(json_col) if field.contains('.') => {
252                let parts: Vec<&str> = field.split('.').collect();
253                let last = parts.len() - 1;
254                let mut expr = json_col.clone();
255                for (i, part) in parts.iter().enumerate() {
256                    validate_sql_identifier(part)?;
257                    let escaped = part.replace('\'', "''");
258                    if i == last {
259                        expr.push_str(&format!("->>'{escaped}'"));
260                    } else {
261                        expr.push_str(&format!("->'{escaped}'"));
262                    }
263                }
264                Ok(expr)
265            }
266            Some(json_col) => {
267                validate_sql_identifier(field)?;
268                let escaped = field.replace('\'', "''");
269                Ok(format!("{json_col}->>'{escaped}'"))
270            }
271            None => Ok(text_escape_and_quote_field(self.config, field)),
272        }
273    }
274
275    /// Escape a string value for use in a SQL single-quoted literal.
276    /// PostgreSQL uses `''` to escape single quotes inside string literals.
277    fn escape_sql_str(&self, s: &str) -> String {
278        s.replace('\'', "''")
279    }
280
281    /// Build a SigmaString value into a SQL string literal with proper escaping
282    /// and wildcard translation for LIKE/ILIKE.
283    fn build_like_value(&self, value: &SigmaString) -> String {
284        let mut result = String::with_capacity(value.original.len() + 2);
285        result.push('\'');
286        for part in &value.parts {
287            match part {
288                StringPart::Plain(s) => {
289                    for ch in s.chars() {
290                        match ch {
291                            '\'' => result.push_str("''"),
292                            '%' => result.push_str("\\%"),
293                            '_' => result.push_str("\\_"),
294                            '\\' => result.push_str("\\\\"),
295                            _ => result.push(ch),
296                        }
297                    }
298                }
299                StringPart::Special(SpecialChar::WildcardMulti) => result.push('%'),
300                StringPart::Special(SpecialChar::WildcardSingle) => result.push('_'),
301            }
302        }
303        result.push('\'');
304        result
305    }
306
307    /// Add `%` wildcards to a LIKE value based on modifier semantics.
308    /// The value is already a quoted `'...'` string from `build_like_value`.
309    fn wrap_like_wildcards(
310        &self,
311        quoted: &str,
312        is_contains: bool,
313        is_startswith: bool,
314        is_endswith: bool,
315    ) -> String {
316        if !is_contains && !is_startswith && !is_endswith {
317            return quoted.to_string();
318        }
319        let inner = &quoted[1..quoted.len() - 1];
320        let prefix = if is_contains || is_endswith { "%" } else { "" };
321        let suffix = if is_contains || is_startswith {
322            "%"
323        } else {
324            ""
325        };
326        format!("'{prefix}{inner}{suffix}'")
327    }
328
329    /// Build a plain SQL string literal from a SigmaString (no wildcards).
330    fn build_plain_value(&self, value: &SigmaString) -> String {
331        let plain = value.as_plain().unwrap_or_else(|| value.original.clone());
332        format!("'{}'", self.escape_sql_str(&plain))
333    }
334}
335
336impl Default for PostgresBackend {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
342impl Backend for PostgresBackend {
    /// Identifier of this backend: always `"postgres"`.
    fn name(&self) -> &str {
        "postgres"
    }
346
    /// Output formats advertised by this backend, as `(name, description)`
    /// pairs.
    fn formats(&self) -> &[(&str, &str)] {
        &[
            ("default", "Plain PostgreSQL SQL"),
            ("view", "CREATE OR REPLACE VIEW for each rule"),
            (
                "timescaledb",
                "TimescaleDB-optimized queries with time_bucket()",
            ),
            (
                "continuous_aggregate",
                "CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)",
            ),
            (
                "sliding_window",
                "Correlation queries using window functions for per-row sliding detection",
            ),
        ]
    }
365
    /// This backend reports that it does not require a processing pipeline.
    fn requires_pipeline(&self) -> bool {
        false
    }
369
370    // --- Detection rule conversion ---
371
372    fn convert_rule(
373        &self,
374        rule: &SigmaRule,
375        output_format: &str,
376        pipeline_state: &PipelineState,
377    ) -> Result<Vec<String>> {
378        let mut queries = Vec::new();
379        for (idx, cond_expr) in rule.detection.conditions.iter().enumerate() {
380            let mut state = ConversionState::new(pipeline_state.state.clone());
381            state
382                .processing_state
383                .insert("_output_format".to_string(), output_format.into());
384            let query = self.convert_condition(cond_expr, &rule.detection.named, &mut state)?;
385            let finished = self.finish_query(rule, query, &state)?;
386            let finalized = self.finalize_query(rule, finished, idx, &state, output_format)?;
387            queries.push(finalized);
388        }
389        Ok(queries)
390    }
391
392    // --- Condition tree dispatch ---
393
    /// Convert a condition expression tree by handing it to the shared
    /// `convert_condition_expr` driver with this backend as the renderer.
    fn convert_condition(
        &self,
        expr: &ConditionExpr,
        detections: &HashMap<String, Detection>,
        state: &mut ConversionState,
    ) -> Result<String> {
        convert_condition_expr(self, expr, detections, state)
    }
402
    /// Combine already-rendered sub-expressions as a conjunction, using the
    /// generic text helper with this backend's config.
    fn convert_condition_and(&self, exprs: &[String]) -> Result<String> {
        Ok(text_convert_condition_and(self.config, exprs))
    }
406
    /// Combine already-rendered sub-expressions as a disjunction, using the
    /// generic text helper with this backend's config.
    fn convert_condition_or(&self, exprs: &[String]) -> Result<String> {
        Ok(text_convert_condition_or(self.config, exprs))
    }
410
    /// Negate an already-rendered expression, using the generic text helper
    /// with this backend's config.
    fn convert_condition_not(&self, expr: &str) -> Result<String> {
        Ok(text_convert_condition_not(self.config, expr))
    }
414
415    // --- Detection ---
416
    /// Convert a named detection via the crate's default implementation.
    fn convert_detection(&self, det: &Detection, state: &mut ConversionState) -> Result<String> {
        default_convert_detection(self, det, state)
    }
420
    /// Convert a single detection item via the crate's default
    /// implementation.
    fn convert_detection_item(
        &self,
        item: &DetectionItem,
        state: &mut ConversionState,
    ) -> Result<String> {
        default_convert_detection_item(self, item, state)
    }
428
429    // --- Field/value escaping ---
430
431    fn escape_and_quote_field(&self, field: &str) -> String {
432        self.field_expr(field)
433            .unwrap_or_else(|_| text_escape_and_quote_field(self.config, field))
434    }
435
    /// Render a string value as a quoted, LIKE-escaped literal (see
    /// `build_like_value`). The conversion state is not consulted.
    fn convert_value_str(&self, value: &SigmaString, _state: &ConversionState) -> String {
        self.build_like_value(value)
    }
439
440    fn convert_value_re(&self, regex: &str, _state: &ConversionState) -> String {
441        format!("'{}'", self.escape_sql_str(regex))
442    }
443
444    // --- Value-type-specific methods ---
445
446    fn convert_field_eq_str(
447        &self,
448        field: &str,
449        value: &SigmaString,
450        modifiers: &[Modifier],
451        _state: &mut ConversionState,
452    ) -> Result<ConvertResult> {
453        let f = self.field_expr(field)?;
454        let is_cased = modifiers.contains(&Modifier::Cased);
455        let is_contains = modifiers.contains(&Modifier::Contains);
456        let is_startswith = modifiers.contains(&Modifier::StartsWith);
457        let is_endswith = modifiers.contains(&Modifier::EndsWith);
458        let has_wildcards = value.contains_wildcards();
459
460        let like_op = if is_cased { "LIKE" } else { "ILIKE" };
461
462        if is_contains || is_startswith || is_endswith || has_wildcards {
463            let inner = self.build_like_value(value);
464            let val = self.wrap_like_wildcards(&inner, is_contains, is_startswith, is_endswith);
465            return Ok(ConvertResult::Query(format!("{f} {like_op} {val}")));
466        }
467
468        let val = self.build_plain_value(value);
469        Ok(ConvertResult::Query(format!("{f} = {val}")))
470    }
471
472    fn convert_field_eq_str_case_sensitive(
473        &self,
474        field: &str,
475        value: &SigmaString,
476        modifiers: &[Modifier],
477        state: &mut ConversionState,
478    ) -> Result<ConvertResult> {
479        let mut mods = modifiers.to_vec();
480        if !mods.contains(&Modifier::Cased) {
481            mods.push(Modifier::Cased);
482        }
483        self.convert_field_eq_str(field, value, &mods, state)
484    }
485
486    fn convert_field_eq_num(
487        &self,
488        field: &str,
489        value: f64,
490        _state: &mut ConversionState,
491    ) -> Result<String> {
492        let f = self.field_expr(field)?;
493        if value.fract() == 0.0 && (i64::MIN as f64..=i64::MAX as f64).contains(&value) {
494            Ok(format!("{f} = {}", value as i64))
495        } else {
496            Ok(format!("{f} = {value}"))
497        }
498    }
499
500    fn convert_field_eq_bool(
501        &self,
502        field: &str,
503        value: bool,
504        _state: &mut ConversionState,
505    ) -> Result<String> {
506        let f = self.field_expr(field)?;
507        let v = if value {
508            self.config.bool_true
509        } else {
510            self.config.bool_false
511        };
512        Ok(format!("{f} = {v}"))
513    }
514
515    fn convert_field_eq_null(&self, field: &str, _state: &mut ConversionState) -> Result<String> {
516        let f = self.field_expr(field)?;
517        Ok(format!("{f} IS NULL"))
518    }
519
520    fn convert_field_eq_re(
521        &self,
522        field: &str,
523        pattern: &str,
524        flags: &[Modifier],
525        _state: &mut ConversionState,
526    ) -> Result<ConvertResult> {
527        let f = self.field_expr(field)?;
528        let escaped_pattern = self.escape_sql_str(pattern);
529        let is_cased = flags.contains(&Modifier::Cased) || self.case_sensitive_re;
530        let op = if is_cased { "~" } else { "~*" };
531        Ok(ConvertResult::Query(format!(
532            "{f} {op} '{escaped_pattern}'"
533        )))
534    }
535
536    fn convert_field_eq_cidr(
537        &self,
538        field: &str,
539        cidr: &str,
540        _state: &mut ConversionState,
541    ) -> Result<ConvertResult> {
542        let f = self.field_expr(field)?;
543        Ok(ConvertResult::Query(format!(
544            "({f})::inet <<= '{cidr}'::cidr"
545        )))
546    }
547
548    fn convert_field_compare(
549        &self,
550        field: &str,
551        op: &Modifier,
552        value: f64,
553        _state: &mut ConversionState,
554    ) -> Result<String> {
555        let f = self.field_expr(field)?;
556        let op_token = match op {
557            Modifier::Lt => "<",
558            Modifier::Lte => "<=",
559            Modifier::Gt => ">",
560            Modifier::Gte => ">=",
561            _ => {
562                return Err(ConvertError::UnsupportedModifier(format!(
563                    "compare op {op:?}"
564                )));
565            }
566        };
567        let val_str =
568            if value.fract() == 0.0 && (i64::MIN as f64..=i64::MAX as f64).contains(&value) {
569                (value as i64).to_string()
570            } else {
571                value.to_string()
572            };
573        Ok(format!("{f} {op_token} {val_str}"))
574    }
575
576    fn convert_field_exists(
577        &self,
578        field: &str,
579        exists: bool,
580        _state: &mut ConversionState,
581    ) -> Result<String> {
582        let f = self.field_expr(field)?;
583        if exists {
584            Ok(format!("{f} IS NOT NULL"))
585        } else {
586            Ok(format!("{f} IS NULL"))
587        }
588    }
589
590    fn convert_field_eq_query_expr(
591        &self,
592        field: &str,
593        expr: &str,
594        _id: &str,
595        _state: &mut ConversionState,
596    ) -> Result<String> {
597        let f = self.field_expr(field)?;
598        let resolved = expr.replace("{field}", &f);
599        Ok(resolved)
600    }
601
602    fn convert_field_ref(
603        &self,
604        field1: &str,
605        field2: &str,
606        _state: &mut ConversionState,
607    ) -> Result<ConvertResult> {
608        let f1 = self.field_expr(field1)?;
609        let f2 = self.field_expr(field2)?;
610        Ok(ConvertResult::Query(format!("{f1} = {f2}")))
611    }
612
613    fn convert_keyword(&self, value: &SigmaValue, _state: &mut ConversionState) -> Result<String> {
614        let search_target = match &self.json_field {
615            Some(json_col) => format!("{json_col}::text"),
616            None => "ROW(*)::text".to_string(),
617        };
618        match value {
619            SigmaValue::String(s) => {
620                let plain = s.as_plain().unwrap_or_else(|| s.original.clone());
621                if plain.is_empty() {
622                    return Err(ConvertError::UnsupportedKeyword);
623                }
624                let escaped = self.escape_sql_str(&plain);
625                Ok(format!(
626                    "to_tsvector('simple', {search_target}) @@ plainto_tsquery('simple', '{escaped}')"
627                ))
628            }
629            SigmaValue::Integer(n) => Ok(format!(
630                "to_tsvector('simple', {search_target}) @@ plainto_tsquery('simple', '{n}')"
631            )),
632            SigmaValue::Float(f) => Ok(format!(
633                "to_tsvector('simple', {search_target}) @@ plainto_tsquery('simple', '{f}')"
634            )),
635            _ => Err(ConvertError::UnsupportedKeyword),
636        }
637    }
638
639    fn convert_condition_as_in_expression(
640        &self,
641        field: &str,
642        values: &[&SigmaValue],
643        is_or: bool,
644        _state: &mut ConversionState,
645    ) -> Result<String> {
646        if !is_or {
647            return Err(ConvertError::UnsupportedModifier(
648                "AND IN-list not supported for PostgreSQL".into(),
649            ));
650        }
651        let f = self.field_expr(field)?;
652        let items: Vec<String> = values
653            .iter()
654            .map(|v| match v {
655                SigmaValue::String(s) => self.build_plain_value(s),
656                SigmaValue::Integer(n) => n.to_string(),
657                SigmaValue::Float(f) => f.to_string(),
658                _ => String::new(),
659            })
660            .collect();
661        let list = items.join(", ");
662        Ok(format!("{f} IN ({list})"))
663    }
664
665    // --- Query finalization ---
666
667    fn finish_query(
668        &self,
669        rule: &SigmaRule,
670        query: String,
671        state: &ConversionState,
672    ) -> Result<String> {
673        let qualified = self.resolve_table(&rule.custom_attributes, &state.processing_state)?;
674
675        let is_timescaledb = state
676            .get_state_str("_output_format")
677            .is_some_and(|f| f == "timescaledb" || f == "continuous_aggregate");
678
679        let base_cols = if rule.fields.is_empty() {
680            "*".to_string()
681        } else {
682            rule.fields
683                .iter()
684                .map(|f| self.format_select_field(f))
685                .collect::<Result<Vec<_>>>()?
686                .join(", ")
687        };
688
689        let select_cols = if is_timescaledb {
690            format!(
691                "time_bucket('1 hour', {}) AS bucket, {}",
692                self.timestamp_field, base_cols
693            )
694        } else {
695            base_cols
696        };
697
698        let custom_tmpl = state.get_state_str("query_expression_template");
699
700        let effective_tmpl = match custom_tmpl {
701            Some(t) => t.to_string(),
702            None => format!("SELECT {select_cols} FROM {{table}} WHERE {{query}}"),
703        };
704
705        let mut result = effective_tmpl.replace("{query}", &query);
706        result = result.replace("{table}", &qualified);
707        result = result.replace("{rule.title}", &rule.title);
708        if let Some(id) = &rule.id {
709            result = result.replace("{rule.id}", id);
710        }
711
712        Ok(result)
713    }
714
715    fn finalize_query(
716        &self,
717        rule: &SigmaRule,
718        query: String,
719        _index: usize,
720        _state: &ConversionState,
721        output_format: &str,
722    ) -> Result<String> {
723        let view_name = || {
724            let raw = match &rule.id {
725                Some(id) => id.replace('-', "_"),
726                None => rule.title.to_lowercase().replace([' ', '-'], "_"),
727            };
728            let sanitized: String = raw
729                .chars()
730                .filter(|c| c.is_ascii_alphanumeric() || *c == '_')
731                .collect();
732            if sanitized.is_empty() {
733                "sigma_rule".to_string()
734            } else {
735                format!("sigma_{sanitized}")
736            }
737        };
738
739        match output_format {
740            "default" | "timescaledb" => Ok(query),
741            "view" => Ok(format!("CREATE OR REPLACE VIEW {} AS {query}", view_name())),
742            "continuous_aggregate" => Ok(format!(
743                "CREATE MATERIALIZED VIEW {} \
744                 WITH (timescaledb.continuous) AS {query} \
745                 WITH NO DATA",
746                view_name()
747            )),
748            other => Err(ConvertError::RuleConversion(format!(
749                "unknown output format: {other}"
750            ))),
751        }
752    }
753
754    fn finalize_output(&self, queries: Vec<String>, output_format: &str) -> Result<String> {
755        let sep = match output_format {
756            "view" | "continuous_aggregate" => ";\n\n",
757            _ => "\n",
758        };
759        Ok(queries.join(sep))
760    }
761
762    // --- Correlation ---
763
    /// This backend reports that it can convert correlation rules (see
    /// `convert_correlation_rule`).
    fn supports_correlation(&self) -> bool {
        true
    }
767
768    fn convert_correlation_rule(
769        &self,
770        rule: &CorrelationRule,
771        output_format: &str,
772        pipeline_state: &PipelineState,
773    ) -> Result<Vec<String>> {
774        let table = self.resolve_table(&rule.custom_attributes, &pipeline_state.state)?;
775        let ts = &self.timestamp_field;
776        let use_time_bucket =
777            output_format == "timescaledb" || output_format == "continuous_aggregate";
778
779        let mut group_by_cols: Vec<String> = rule
780            .group_by
781            .iter()
782            .map(|g| self.field_expr(g))
783            .collect::<Result<_>>()?;
784        if use_time_bucket {
785            group_by_cols.insert(0, format!("time_bucket('1 hour', {ts})"));
786        }
787        let group_by_clause = if group_by_cols.is_empty() {
788            String::new()
789        } else {
790            format!(" GROUP BY {}", group_by_cols.join(", "))
791        };
792        let group_by_select = if group_by_cols.is_empty() {
793            String::new()
794        } else {
795            format!("{}, ", group_by_cols.join(", "))
796        };
797
798        let window_secs = rule.timespan.seconds;
799        let having_clause = self.build_having_clause(&rule.condition)?;
800
801        let field_from_condition = match &rule.condition {
802            CorrelationCondition::Threshold { field, .. } => {
803                field.as_ref().and_then(|f| f.first().cloned())
804            }
805            _ => None,
806        };
807        let value_field = field_from_condition.as_deref().or_else(|| {
808            rule.aliases
809                .first()
810                .and_then(|a| a.mapping.values().next().map(|s| s.as_str()))
811        });
812
813        // Build per-rule table mapping from _rule_tables injected by convert_collection
814        let rule_tables: HashMap<String, String> = pipeline_state
815            .state
816            .get("_rule_tables")
817            .and_then(|v| serde_json::from_value(v.clone()).ok())
818            .unwrap_or_default();
819
820        // Per-rule converted queries for CTE-based pre-filtering
821        let rule_queries: HashMap<String, String> = pipeline_state
822            .state
823            .get("_rule_queries")
824            .and_then(|v| serde_json::from_value(v.clone()).ok())
825            .unwrap_or_default();
826
827        let (cte_prefix, source_table, time_filter) =
828            self.build_correlation_source(&rule.rules, &rule_queries, &table, ts, window_secs);
829
830        let query = match rule.correlation_type {
831            CorrelationType::EventCount if output_format == "sliding_window" => self
832                .build_sliding_window_query(
833                    &cte_prefix,
834                    &source_table,
835                    &time_filter,
836                    &rule.group_by,
837                    ts,
838                    window_secs,
839                    &rule.condition,
840                )?,
841            CorrelationType::EventCount => {
842                format!(
843                    "{cte_prefix}SELECT {group_by_select}COUNT(*) AS event_count \
844                     FROM {source_table}\
845                     {time_filter}\
846                     {group_by_clause} \
847                     HAVING {having_clause}",
848                    having_clause = having_clause.replace("{agg}", "COUNT(*)")
849                )
850            }
851            CorrelationType::ValueCount => {
852                let field = match value_field {
853                    Some(f) => self.field_expr(f)?,
854                    None => "'unknown_field'".to_string(),
855                };
856                let agg = format!("COUNT(DISTINCT {field})");
857                format!(
858                    "{cte_prefix}SELECT {group_by_select}{agg} AS value_count \
859                     FROM {source_table}\
860                     {time_filter}\
861                     {group_by_clause} \
862                     HAVING {having_clause}",
863                    having_clause = having_clause.replace("{agg}", &agg)
864                )
865            }
866            CorrelationType::Temporal | CorrelationType::TemporalOrdered => self
867                .build_temporal_query(
868                    rule,
869                    &table,
870                    ts,
871                    window_secs,
872                    &group_by_select,
873                    &group_by_clause,
874                    &having_clause,
875                    &rule_tables,
876                    pipeline_state,
877                )?,
878            CorrelationType::ValueSum => {
879                let field = match value_field {
880                    Some(f) => self.field_expr(f)?,
881                    None => "'unknown_field'".to_string(),
882                };
883                let agg = format!("SUM({field})");
884                format!(
885                    "{cte_prefix}SELECT {group_by_select}{agg} AS value_sum \
886                     FROM {source_table}\
887                     {time_filter}\
888                     {group_by_clause} \
889                     HAVING {having_clause}",
890                    having_clause = having_clause.replace("{agg}", &agg)
891                )
892            }
893            CorrelationType::ValueAvg => {
894                let field = match value_field {
895                    Some(f) => self.field_expr(f)?,
896                    None => "'unknown_field'".to_string(),
897                };
898                let agg = format!("AVG({field})");
899                format!(
900                    "{cte_prefix}SELECT {group_by_select}{agg} AS value_avg \
901                     FROM {source_table}\
902                     {time_filter}\
903                     {group_by_clause} \
904                     HAVING {having_clause}",
905                    having_clause = having_clause.replace("{agg}", &agg)
906                )
907            }
908            CorrelationType::ValuePercentile | CorrelationType::ValueMedian => {
909                let field = match value_field {
910                    Some(f) => self.field_expr(f)?,
911                    None => "'unknown_field'".to_string(),
912                };
913                let percentile = if rule.correlation_type == CorrelationType::ValueMedian {
914                    0.5
915                } else {
916                    match &rule.condition {
917                        CorrelationCondition::Threshold { percentile, .. } => {
918                            percentile.map(|p| p as f64 / 100.0).unwrap_or(0.95)
919                        }
920                        _ => 0.95,
921                    }
922                };
923                let agg = format!("PERCENTILE_CONT({percentile}) WITHIN GROUP (ORDER BY {field})");
924                format!(
925                    "{cte_prefix}SELECT {group_by_select}\
926                     {agg} AS pct_value \
927                     FROM {source_table}\
928                     {time_filter}\
929                     {group_by_clause} \
930                     HAVING {having_clause}",
931                    having_clause = having_clause.replace("{agg}", &agg)
932                )
933            }
934        };
935
936        Ok(vec![query])
937    }
938}