1mod correlation;
10#[cfg(test)]
11mod tests;
12
13use std::collections::HashMap;
14use std::sync::LazyLock;
15
16use regex::Regex;
17use rsigma_eval::pipeline::state::PipelineState;
18use rsigma_parser::*;
19
20use crate::backend::*;
21use crate::condition::convert_condition_expr;
22use crate::convert::{default_convert_detection, default_convert_detection_item};
23use crate::error::{ConvertError, Result};
24use crate::state::{ConversionState, ConvertResult};
25
26fn validate_sql_identifier(s: &str) -> Result<()> {
27 static RE: LazyLock<Regex> =
28 LazyLock::new(|| Regex::new(r"^[A-Za-z_][A-Za-z0-9_$]*$").unwrap());
29 if RE.is_match(s) {
30 Ok(())
31 } else {
32 Err(ConvertError::InvalidIdentifier(s.to_string()))
33 }
34}
35
/// Text-query configuration for the PostgreSQL backend.
///
/// `PostgresBackend::new` points its `config` field at this value. Several of
/// the `Backend` methods for `PostgresBackend` format SQL themselves rather than
/// going through these templates. NOTE(review): verify which entries are
/// actually consumed by the shared text helpers (e.g. `text_escape_and_quote_field`,
/// `text_convert_condition_and`) before relying on a change here.
pub static POSTGRES_CONFIG: TextQueryConfig = TextQueryConfig {
    // SQL operator precedence is NOT > AND > OR; this tuple presumably lists
    // highest precedence first — confirm against `TextQueryConfig`.
    precedence: (TokenType::NOT, TokenType::AND, TokenType::OR),
    group_expression: "({expr})",
    token_separator: " ",

    and_token: "AND",
    or_token: "OR",
    not_token: "NOT",
    eq_token: " = ",

    // `<>` is the standard SQL inequality operator.
    not_eq_token: Some(" <> "),
    eq_expression: None,
    not_eq_expression: None,
    convert_not_as_not_eq: false,

    // PostgreSQL LIKE/ILIKE wildcards: `%` = any run of characters, `_` = one character.
    wildcard_multi: "%",
    wildcard_single: "_",

    // Single-quoted string literals; the escape character is itself `'`,
    // presumably meaning an embedded quote is doubled (`''`).
    str_quote: "'",
    str_quote_pattern: None,
    str_quote_pattern_negation: false,
    escape_char: "'",
    add_escaped: &[],
    filter_chars: &[],

    // Identifiers are double-quoted. With the negation flag set, presumably only
    // names that do NOT match the plain lowercase-identifier pattern get quoted.
    // NOTE(review): `field_escape` is None, so an embedded `"` in a field name
    // may not be doubled — confirm the quoting helper handles it.
    field_quote: Some("\""),
    field_quote_pattern: Some(r"^[a-z_][a-z0-9_]*$"),
    field_quote_pattern_negation: true,
    field_escape: None,
    field_escape_pattern: None,

    // Sigma string matching defaults to case-insensitive, so the substring
    // modifiers use ILIKE (PostgreSQL's case-insensitive LIKE).
    startswith_expression: Some("{field} ILIKE {value}"),
    not_startswith_expression: Some("{field} NOT ILIKE {value}"),
    startswith_expression_allow_special: false,
    endswith_expression: Some("{field} ILIKE {value}"),
    not_endswith_expression: Some("{field} NOT ILIKE {value}"),
    endswith_expression_allow_special: false,
    contains_expression: Some("{field} ILIKE {value}"),
    not_contains_expression: Some("{field} NOT ILIKE {value}"),
    contains_expression_allow_special: false,
    wildcard_match_expression: Some("{field} ILIKE {value}"),

    // With the `cased` modifier, plain LIKE (case-sensitive) is used instead.
    case_sensitive_match_expression: Some("{field} LIKE {value}"),
    case_sensitive_startswith_expression: Some("{field} LIKE {value}"),
    case_sensitive_endswith_expression: Some("{field} LIKE {value}"),
    case_sensitive_contains_expression: Some("{field} LIKE {value}"),

    // `~*` / `!~*`: PostgreSQL case-insensitive POSIX regex match / non-match.
    re_expression: Some("{field} ~* {regex}"),
    not_re_expression: Some("{field} !~* {regex}"),
    re_escape_char: None,
    re_escape: &[],
    re_escape_escape_char: None,

    // `<<=`: "is contained within or equals" for inet/cidr values.
    cidr_expression: Some("({field})::inet <<= {value}::cidr"),
    not_cidr_expression: Some("NOT (({field})::inet <<= {value}::cidr)"),

    field_null_expression: "{field} IS NULL",
    field_exists_expression: Some("{field} IS NOT NULL"),
    field_not_exists_expression: Some("{field} IS NULL"),

    // Maps the Sigma `gt`/`gte`/`lt`/`lte` modifiers to SQL comparison operators.
    compare_op_expression: Some("{field} {op} {value}"),
    compare_ops: &[("gt", ">"), ("gte", ">="), ("lt", "<"), ("lte", "<=")],

    // OR-ed values of one field may be collapsed into `field IN (...)`;
    // values containing wildcards are excluded from IN lists.
    convert_or_as_in: true,
    convert_and_as_in: false,
    in_expressions_allow_wildcards: false,
    field_in_list_expression: Some("{field} {op} ({list})"),
    or_in_operator: Some("IN"),
    and_in_operator: None,
    list_separator: ", ",

    // No templates for field-less (keyword) values here.
    unbound_value_str_expression: None,
    unbound_value_num_expression: None,
    unbound_value_re_expression: None,

    field_eq_field_expression: Some("{field1} = {field2}"),
    field_eq_field_escaping_quoting: true,

    deferred_start: None,
    deferred_separator: None,
    deferred_only_query: "",

    // PostgreSQL boolean literals.
    bool_true: "true",
    bool_false: "false",
    query_expression: "SELECT * FROM {table} WHERE {query}",
    // Same default table name that `PostgresBackend::new` uses.
    state_defaults: &[("table", "security_events")],
};
127
/// Backend that converts Sigma rules into PostgreSQL SQL.
///
/// Detection rules become `SELECT` statements (or views / TimescaleDB
/// continuous aggregates, depending on the output format); correlation rules
/// become aggregate queries.
pub struct PostgresBackend {
    /// Shared text-query configuration (`new()` sets this to `POSTGRES_CONFIG`).
    pub config: &'static TextQueryConfig,
    /// Fallback table name, used when neither the rule's `postgres.table`
    /// custom attribute nor a `table` pipeline-state entry is present.
    pub table: String,
    /// Timestamp column; used in `time_bucket('1 hour', …)` for the TimescaleDB
    /// formats and passed to the correlation query builders.
    pub timestamp_field: String,
    /// When set, field names are rendered as JSON extractions from this column
    /// (`col->'a'->>'b'` for field `a.b`) instead of plain column references.
    pub json_field: Option<String>,
    /// When `true`, regex matches use the case-sensitive `~` operator even
    /// without the `cased` modifier (otherwise `~*`).
    pub case_sensitive_re: bool,
    /// Schema qualifier for table names; lowest priority after the rule's
    /// `postgres.schema` attribute and the `schema` pipeline-state entry.
    pub schema: Option<String>,
    /// Database name from the `database` option.
    /// NOTE(review): not read in this part of the file — confirm where it is used.
    pub database: Option<String>,
    /// TimescaleDB flag (defaults to `false`).
    /// NOTE(review): query shape is chosen from the output format in this part
    /// of the file; confirm where this flag is consulted.
    pub timescaledb: bool,
}
150
151impl PostgresBackend {
152 pub fn new() -> Self {
153 Self {
154 config: &POSTGRES_CONFIG,
155 table: "security_events".to_string(),
156 timestamp_field: "time".to_string(),
157 json_field: None,
158 case_sensitive_re: false,
159 schema: None,
160 database: None,
161 timescaledb: false,
162 }
163 }
164
165 pub fn from_options(options: &HashMap<String, String>) -> Self {
172 let mut backend = Self::new();
173 if let Some(v) = options.get("table") {
174 backend.table = v.clone();
175 }
176 if let Some(v) = options.get("schema") {
177 backend.schema = Some(v.clone());
178 }
179 if let Some(v) = options.get("database") {
180 backend.database = Some(v.clone());
181 }
182 if let Some(v) = options.get("timestamp_field") {
183 backend.timestamp_field = v.clone();
184 }
185 if let Some(v) = options.get("json_field") {
186 backend.json_field = Some(v.clone());
187 }
188 if let Some(v) = options.get("case_sensitive_re") {
189 backend.case_sensitive_re = v == "true";
190 }
191 backend
192 }
193
194 fn resolve_table(
200 &self,
201 custom_attrs: &HashMap<String, yaml_serde::Value>,
202 state: &HashMap<String, serde_json::Value>,
203 ) -> Result<String> {
204 let table = custom_attrs
205 .get("postgres.table")
206 .and_then(|v| v.as_str())
207 .or(state.get("table").and_then(|v| v.as_str()))
208 .unwrap_or(&self.table);
209 validate_sql_identifier(table)?;
210
211 let schema = custom_attrs
212 .get("postgres.schema")
213 .and_then(|v| v.as_str())
214 .or(state.get("schema").and_then(|v| v.as_str()))
215 .or(self.schema.as_deref())
216 .filter(|s| !s.is_empty());
217
218 if let Some(s) = schema {
219 validate_sql_identifier(s)?;
220 Ok(format!("{s}.{table}"))
221 } else {
222 Ok(table.to_string())
223 }
224 }
225
226 fn qualify_table_name(
229 &self,
230 table: &str,
231 state: &HashMap<String, serde_json::Value>,
232 per_rule_schema: Option<&str>,
233 ) -> Result<String> {
234 validate_sql_identifier(table)?;
235
236 let schema = per_rule_schema
237 .or(state.get("schema").and_then(|v| v.as_str()))
238 .or(self.schema.as_deref())
239 .filter(|s| !s.is_empty());
240
241 if let Some(s) = schema {
242 validate_sql_identifier(s)?;
243 Ok(format!("{s}.{table}"))
244 } else {
245 Ok(table.to_string())
246 }
247 }
248
249 fn field_expr(&self, field: &str) -> Result<String> {
250 match &self.json_field {
251 Some(json_col) if field.contains('.') => {
252 let parts: Vec<&str> = field.split('.').collect();
253 let last = parts.len() - 1;
254 let mut expr = json_col.clone();
255 for (i, part) in parts.iter().enumerate() {
256 validate_sql_identifier(part)?;
257 let escaped = part.replace('\'', "''");
258 if i == last {
259 expr.push_str(&format!("->>'{escaped}'"));
260 } else {
261 expr.push_str(&format!("->'{escaped}'"));
262 }
263 }
264 Ok(expr)
265 }
266 Some(json_col) => {
267 validate_sql_identifier(field)?;
268 let escaped = field.replace('\'', "''");
269 Ok(format!("{json_col}->>'{escaped}'"))
270 }
271 None => Ok(text_escape_and_quote_field(self.config, field)),
272 }
273 }
274
275 fn escape_sql_str(&self, s: &str) -> String {
278 s.replace('\'', "''")
279 }
280
281 fn build_like_value(&self, value: &SigmaString) -> String {
284 let mut result = String::with_capacity(value.original.len() + 2);
285 result.push('\'');
286 for part in &value.parts {
287 match part {
288 StringPart::Plain(s) => {
289 for ch in s.chars() {
290 match ch {
291 '\'' => result.push_str("''"),
292 '%' => result.push_str("\\%"),
293 '_' => result.push_str("\\_"),
294 '\\' => result.push_str("\\\\"),
295 _ => result.push(ch),
296 }
297 }
298 }
299 StringPart::Special(SpecialChar::WildcardMulti) => result.push('%'),
300 StringPart::Special(SpecialChar::WildcardSingle) => result.push('_'),
301 }
302 }
303 result.push('\'');
304 result
305 }
306
307 fn wrap_like_wildcards(
310 &self,
311 quoted: &str,
312 is_contains: bool,
313 is_startswith: bool,
314 is_endswith: bool,
315 ) -> String {
316 if !is_contains && !is_startswith && !is_endswith {
317 return quoted.to_string();
318 }
319 let inner = "ed[1..quoted.len() - 1];
320 let prefix = if is_contains || is_endswith { "%" } else { "" };
321 let suffix = if is_contains || is_startswith {
322 "%"
323 } else {
324 ""
325 };
326 format!("'{prefix}{inner}{suffix}'")
327 }
328
329 fn build_plain_value(&self, value: &SigmaString) -> String {
331 let plain = value.as_plain().unwrap_or_else(|| value.original.clone());
332 format!("'{}'", self.escape_sql_str(&plain))
333 }
334}
335
336impl Default for PostgresBackend {
337 fn default() -> Self {
338 Self::new()
339 }
340}
341
342impl Backend for PostgresBackend {
343 fn name(&self) -> &str {
344 "postgres"
345 }
346
347 fn formats(&self) -> &[(&str, &str)] {
348 &[
349 ("default", "Plain PostgreSQL SQL"),
350 ("view", "CREATE OR REPLACE VIEW for each rule"),
351 (
352 "timescaledb",
353 "TimescaleDB-optimized queries with time_bucket()",
354 ),
355 (
356 "continuous_aggregate",
357 "CREATE MATERIALIZED VIEW ... WITH (timescaledb.continuous)",
358 ),
359 (
360 "sliding_window",
361 "Correlation queries using window functions for per-row sliding detection",
362 ),
363 ]
364 }
365
366 fn requires_pipeline(&self) -> bool {
367 false
368 }
369
370 fn convert_rule(
373 &self,
374 rule: &SigmaRule,
375 output_format: &str,
376 pipeline_state: &PipelineState,
377 ) -> Result<Vec<String>> {
378 let mut queries = Vec::new();
379 for (idx, cond_expr) in rule.detection.conditions.iter().enumerate() {
380 let mut state = ConversionState::new(pipeline_state.state.clone());
381 state
382 .processing_state
383 .insert("_output_format".to_string(), output_format.into());
384 let query = self.convert_condition(cond_expr, &rule.detection.named, &mut state)?;
385 let finished = self.finish_query(rule, query, &state)?;
386 let finalized = self.finalize_query(rule, finished, idx, &state, output_format)?;
387 queries.push(finalized);
388 }
389 Ok(queries)
390 }
391
392 fn convert_condition(
395 &self,
396 expr: &ConditionExpr,
397 detections: &HashMap<String, Detection>,
398 state: &mut ConversionState,
399 ) -> Result<String> {
400 convert_condition_expr(self, expr, detections, state)
401 }
402
403 fn convert_condition_and(&self, exprs: &[String]) -> Result<String> {
404 Ok(text_convert_condition_and(self.config, exprs))
405 }
406
407 fn convert_condition_or(&self, exprs: &[String]) -> Result<String> {
408 Ok(text_convert_condition_or(self.config, exprs))
409 }
410
411 fn convert_condition_not(&self, expr: &str) -> Result<String> {
412 Ok(text_convert_condition_not(self.config, expr))
413 }
414
415 fn convert_detection(&self, det: &Detection, state: &mut ConversionState) -> Result<String> {
418 default_convert_detection(self, det, state)
419 }
420
421 fn convert_detection_item(
422 &self,
423 item: &DetectionItem,
424 state: &mut ConversionState,
425 ) -> Result<String> {
426 default_convert_detection_item(self, item, state)
427 }
428
429 fn escape_and_quote_field(&self, field: &str) -> String {
432 self.field_expr(field)
433 .unwrap_or_else(|_| text_escape_and_quote_field(self.config, field))
434 }
435
436 fn convert_value_str(&self, value: &SigmaString, _state: &ConversionState) -> String {
437 self.build_like_value(value)
438 }
439
440 fn convert_value_re(&self, regex: &str, _state: &ConversionState) -> String {
441 format!("'{}'", self.escape_sql_str(regex))
442 }
443
444 fn convert_field_eq_str(
447 &self,
448 field: &str,
449 value: &SigmaString,
450 modifiers: &[Modifier],
451 _state: &mut ConversionState,
452 ) -> Result<ConvertResult> {
453 let f = self.field_expr(field)?;
454 let is_cased = modifiers.contains(&Modifier::Cased);
455 let is_contains = modifiers.contains(&Modifier::Contains);
456 let is_startswith = modifiers.contains(&Modifier::StartsWith);
457 let is_endswith = modifiers.contains(&Modifier::EndsWith);
458 let has_wildcards = value.contains_wildcards();
459
460 let like_op = if is_cased { "LIKE" } else { "ILIKE" };
461
462 if is_contains || is_startswith || is_endswith || has_wildcards {
463 let inner = self.build_like_value(value);
464 let val = self.wrap_like_wildcards(&inner, is_contains, is_startswith, is_endswith);
465 return Ok(ConvertResult::Query(format!("{f} {like_op} {val}")));
466 }
467
468 let val = self.build_plain_value(value);
469 Ok(ConvertResult::Query(format!("{f} = {val}")))
470 }
471
472 fn convert_field_eq_str_case_sensitive(
473 &self,
474 field: &str,
475 value: &SigmaString,
476 modifiers: &[Modifier],
477 state: &mut ConversionState,
478 ) -> Result<ConvertResult> {
479 let mut mods = modifiers.to_vec();
480 if !mods.contains(&Modifier::Cased) {
481 mods.push(Modifier::Cased);
482 }
483 self.convert_field_eq_str(field, value, &mods, state)
484 }
485
486 fn convert_field_eq_num(
487 &self,
488 field: &str,
489 value: f64,
490 _state: &mut ConversionState,
491 ) -> Result<String> {
492 let f = self.field_expr(field)?;
493 if value.fract() == 0.0 && (i64::MIN as f64..=i64::MAX as f64).contains(&value) {
494 Ok(format!("{f} = {}", value as i64))
495 } else {
496 Ok(format!("{f} = {value}"))
497 }
498 }
499
500 fn convert_field_eq_bool(
501 &self,
502 field: &str,
503 value: bool,
504 _state: &mut ConversionState,
505 ) -> Result<String> {
506 let f = self.field_expr(field)?;
507 let v = if value {
508 self.config.bool_true
509 } else {
510 self.config.bool_false
511 };
512 Ok(format!("{f} = {v}"))
513 }
514
515 fn convert_field_eq_null(&self, field: &str, _state: &mut ConversionState) -> Result<String> {
516 let f = self.field_expr(field)?;
517 Ok(format!("{f} IS NULL"))
518 }
519
520 fn convert_field_eq_re(
521 &self,
522 field: &str,
523 pattern: &str,
524 flags: &[Modifier],
525 _state: &mut ConversionState,
526 ) -> Result<ConvertResult> {
527 let f = self.field_expr(field)?;
528 let escaped_pattern = self.escape_sql_str(pattern);
529 let is_cased = flags.contains(&Modifier::Cased) || self.case_sensitive_re;
530 let op = if is_cased { "~" } else { "~*" };
531 Ok(ConvertResult::Query(format!(
532 "{f} {op} '{escaped_pattern}'"
533 )))
534 }
535
536 fn convert_field_eq_cidr(
537 &self,
538 field: &str,
539 cidr: &str,
540 _state: &mut ConversionState,
541 ) -> Result<ConvertResult> {
542 let f = self.field_expr(field)?;
543 Ok(ConvertResult::Query(format!(
544 "({f})::inet <<= '{cidr}'::cidr"
545 )))
546 }
547
548 fn convert_field_compare(
549 &self,
550 field: &str,
551 op: &Modifier,
552 value: f64,
553 _state: &mut ConversionState,
554 ) -> Result<String> {
555 let f = self.field_expr(field)?;
556 let op_token = match op {
557 Modifier::Lt => "<",
558 Modifier::Lte => "<=",
559 Modifier::Gt => ">",
560 Modifier::Gte => ">=",
561 _ => {
562 return Err(ConvertError::UnsupportedModifier(format!(
563 "compare op {op:?}"
564 )));
565 }
566 };
567 let val_str =
568 if value.fract() == 0.0 && (i64::MIN as f64..=i64::MAX as f64).contains(&value) {
569 (value as i64).to_string()
570 } else {
571 value.to_string()
572 };
573 Ok(format!("{f} {op_token} {val_str}"))
574 }
575
576 fn convert_field_exists(
577 &self,
578 field: &str,
579 exists: bool,
580 _state: &mut ConversionState,
581 ) -> Result<String> {
582 let f = self.field_expr(field)?;
583 if exists {
584 Ok(format!("{f} IS NOT NULL"))
585 } else {
586 Ok(format!("{f} IS NULL"))
587 }
588 }
589
590 fn convert_field_eq_query_expr(
591 &self,
592 field: &str,
593 expr: &str,
594 _id: &str,
595 _state: &mut ConversionState,
596 ) -> Result<String> {
597 let f = self.field_expr(field)?;
598 let resolved = expr.replace("{field}", &f);
599 Ok(resolved)
600 }
601
602 fn convert_field_ref(
603 &self,
604 field1: &str,
605 field2: &str,
606 _state: &mut ConversionState,
607 ) -> Result<ConvertResult> {
608 let f1 = self.field_expr(field1)?;
609 let f2 = self.field_expr(field2)?;
610 Ok(ConvertResult::Query(format!("{f1} = {f2}")))
611 }
612
613 fn convert_keyword(&self, value: &SigmaValue, _state: &mut ConversionState) -> Result<String> {
614 let search_target = match &self.json_field {
615 Some(json_col) => format!("{json_col}::text"),
616 None => "ROW(*)::text".to_string(),
617 };
618 match value {
619 SigmaValue::String(s) => {
620 let plain = s.as_plain().unwrap_or_else(|| s.original.clone());
621 if plain.is_empty() {
622 return Err(ConvertError::UnsupportedKeyword);
623 }
624 let escaped = self.escape_sql_str(&plain);
625 Ok(format!(
626 "to_tsvector('simple', {search_target}) @@ plainto_tsquery('simple', '{escaped}')"
627 ))
628 }
629 SigmaValue::Integer(n) => Ok(format!(
630 "to_tsvector('simple', {search_target}) @@ plainto_tsquery('simple', '{n}')"
631 )),
632 SigmaValue::Float(f) => Ok(format!(
633 "to_tsvector('simple', {search_target}) @@ plainto_tsquery('simple', '{f}')"
634 )),
635 _ => Err(ConvertError::UnsupportedKeyword),
636 }
637 }
638
639 fn convert_condition_as_in_expression(
640 &self,
641 field: &str,
642 values: &[&SigmaValue],
643 is_or: bool,
644 _state: &mut ConversionState,
645 ) -> Result<String> {
646 if !is_or {
647 return Err(ConvertError::UnsupportedModifier(
648 "AND IN-list not supported for PostgreSQL".into(),
649 ));
650 }
651 let f = self.field_expr(field)?;
652 let items: Vec<String> = values
653 .iter()
654 .map(|v| match v {
655 SigmaValue::String(s) => self.build_plain_value(s),
656 SigmaValue::Integer(n) => n.to_string(),
657 SigmaValue::Float(f) => f.to_string(),
658 _ => String::new(),
659 })
660 .collect();
661 let list = items.join(", ");
662 Ok(format!("{f} IN ({list})"))
663 }
664
665 fn finish_query(
668 &self,
669 rule: &SigmaRule,
670 query: String,
671 state: &ConversionState,
672 ) -> Result<String> {
673 let qualified = self.resolve_table(&rule.custom_attributes, &state.processing_state)?;
674
675 let is_timescaledb = state
676 .get_state_str("_output_format")
677 .is_some_and(|f| f == "timescaledb" || f == "continuous_aggregate");
678
679 let base_cols = if rule.fields.is_empty() {
680 "*".to_string()
681 } else {
682 rule.fields
683 .iter()
684 .map(|f| self.format_select_field(f))
685 .collect::<Result<Vec<_>>>()?
686 .join(", ")
687 };
688
689 let select_cols = if is_timescaledb {
690 format!(
691 "time_bucket('1 hour', {}) AS bucket, {}",
692 self.timestamp_field, base_cols
693 )
694 } else {
695 base_cols
696 };
697
698 let custom_tmpl = state.get_state_str("query_expression_template");
699
700 let effective_tmpl = match custom_tmpl {
701 Some(t) => t.to_string(),
702 None => format!("SELECT {select_cols} FROM {{table}} WHERE {{query}}"),
703 };
704
705 let mut result = effective_tmpl.replace("{query}", &query);
706 result = result.replace("{table}", &qualified);
707 result = result.replace("{rule.title}", &rule.title);
708 if let Some(id) = &rule.id {
709 result = result.replace("{rule.id}", id);
710 }
711
712 Ok(result)
713 }
714
715 fn finalize_query(
716 &self,
717 rule: &SigmaRule,
718 query: String,
719 _index: usize,
720 _state: &ConversionState,
721 output_format: &str,
722 ) -> Result<String> {
723 let view_name = || {
724 let raw = match &rule.id {
725 Some(id) => id.replace('-', "_"),
726 None => rule.title.to_lowercase().replace([' ', '-'], "_"),
727 };
728 let sanitized: String = raw
729 .chars()
730 .filter(|c| c.is_ascii_alphanumeric() || *c == '_')
731 .collect();
732 if sanitized.is_empty() {
733 "sigma_rule".to_string()
734 } else {
735 format!("sigma_{sanitized}")
736 }
737 };
738
739 match output_format {
740 "default" | "timescaledb" => Ok(query),
741 "view" => Ok(format!("CREATE OR REPLACE VIEW {} AS {query}", view_name())),
742 "continuous_aggregate" => Ok(format!(
743 "CREATE MATERIALIZED VIEW {} \
744 WITH (timescaledb.continuous) AS {query} \
745 WITH NO DATA",
746 view_name()
747 )),
748 other => Err(ConvertError::RuleConversion(format!(
749 "unknown output format: {other}"
750 ))),
751 }
752 }
753
754 fn finalize_output(&self, queries: Vec<String>, output_format: &str) -> Result<String> {
755 let sep = match output_format {
756 "view" | "continuous_aggregate" => ";\n\n",
757 _ => "\n",
758 };
759 Ok(queries.join(sep))
760 }
761
762 fn supports_correlation(&self) -> bool {
765 true
766 }
767
768 fn convert_correlation_rule(
769 &self,
770 rule: &CorrelationRule,
771 output_format: &str,
772 pipeline_state: &PipelineState,
773 ) -> Result<Vec<String>> {
774 let table = self.resolve_table(&rule.custom_attributes, &pipeline_state.state)?;
775 let ts = &self.timestamp_field;
776 let use_time_bucket =
777 output_format == "timescaledb" || output_format == "continuous_aggregate";
778
779 let mut group_by_cols: Vec<String> = rule
780 .group_by
781 .iter()
782 .map(|g| self.field_expr(g))
783 .collect::<Result<_>>()?;
784 if use_time_bucket {
785 group_by_cols.insert(0, format!("time_bucket('1 hour', {ts})"));
786 }
787 let group_by_clause = if group_by_cols.is_empty() {
788 String::new()
789 } else {
790 format!(" GROUP BY {}", group_by_cols.join(", "))
791 };
792 let group_by_select = if group_by_cols.is_empty() {
793 String::new()
794 } else {
795 format!("{}, ", group_by_cols.join(", "))
796 };
797
798 let window_secs = rule.timespan.seconds;
799 let having_clause = self.build_having_clause(&rule.condition)?;
800
801 let field_from_condition = match &rule.condition {
802 CorrelationCondition::Threshold { field, .. } => {
803 field.as_ref().and_then(|f| f.first().cloned())
804 }
805 _ => None,
806 };
807 let value_field = field_from_condition.as_deref().or_else(|| {
808 rule.aliases
809 .first()
810 .and_then(|a| a.mapping.values().next().map(|s| s.as_str()))
811 });
812
813 let rule_tables: HashMap<String, String> = pipeline_state
815 .state
816 .get("_rule_tables")
817 .and_then(|v| serde_json::from_value(v.clone()).ok())
818 .unwrap_or_default();
819
820 let rule_queries: HashMap<String, String> = pipeline_state
822 .state
823 .get("_rule_queries")
824 .and_then(|v| serde_json::from_value(v.clone()).ok())
825 .unwrap_or_default();
826
827 let (cte_prefix, source_table, time_filter) =
828 self.build_correlation_source(&rule.rules, &rule_queries, &table, ts, window_secs);
829
830 let query = match rule.correlation_type {
831 CorrelationType::EventCount if output_format == "sliding_window" => self
832 .build_sliding_window_query(
833 &cte_prefix,
834 &source_table,
835 &time_filter,
836 &rule.group_by,
837 ts,
838 window_secs,
839 &rule.condition,
840 )?,
841 CorrelationType::EventCount => {
842 format!(
843 "{cte_prefix}SELECT {group_by_select}COUNT(*) AS event_count \
844 FROM {source_table}\
845 {time_filter}\
846 {group_by_clause} \
847 HAVING {having_clause}",
848 having_clause = having_clause.replace("{agg}", "COUNT(*)")
849 )
850 }
851 CorrelationType::ValueCount => {
852 let field = match value_field {
853 Some(f) => self.field_expr(f)?,
854 None => "'unknown_field'".to_string(),
855 };
856 let agg = format!("COUNT(DISTINCT {field})");
857 format!(
858 "{cte_prefix}SELECT {group_by_select}{agg} AS value_count \
859 FROM {source_table}\
860 {time_filter}\
861 {group_by_clause} \
862 HAVING {having_clause}",
863 having_clause = having_clause.replace("{agg}", &agg)
864 )
865 }
866 CorrelationType::Temporal | CorrelationType::TemporalOrdered => self
867 .build_temporal_query(
868 rule,
869 &table,
870 ts,
871 window_secs,
872 &group_by_select,
873 &group_by_clause,
874 &having_clause,
875 &rule_tables,
876 pipeline_state,
877 )?,
878 CorrelationType::ValueSum => {
879 let field = match value_field {
880 Some(f) => self.field_expr(f)?,
881 None => "'unknown_field'".to_string(),
882 };
883 let agg = format!("SUM({field})");
884 format!(
885 "{cte_prefix}SELECT {group_by_select}{agg} AS value_sum \
886 FROM {source_table}\
887 {time_filter}\
888 {group_by_clause} \
889 HAVING {having_clause}",
890 having_clause = having_clause.replace("{agg}", &agg)
891 )
892 }
893 CorrelationType::ValueAvg => {
894 let field = match value_field {
895 Some(f) => self.field_expr(f)?,
896 None => "'unknown_field'".to_string(),
897 };
898 let agg = format!("AVG({field})");
899 format!(
900 "{cte_prefix}SELECT {group_by_select}{agg} AS value_avg \
901 FROM {source_table}\
902 {time_filter}\
903 {group_by_clause} \
904 HAVING {having_clause}",
905 having_clause = having_clause.replace("{agg}", &agg)
906 )
907 }
908 CorrelationType::ValuePercentile | CorrelationType::ValueMedian => {
909 let field = match value_field {
910 Some(f) => self.field_expr(f)?,
911 None => "'unknown_field'".to_string(),
912 };
913 let percentile = if rule.correlation_type == CorrelationType::ValueMedian {
914 0.5
915 } else {
916 match &rule.condition {
917 CorrelationCondition::Threshold { percentile, .. } => {
918 percentile.map(|p| p as f64 / 100.0).unwrap_or(0.95)
919 }
920 _ => 0.95,
921 }
922 };
923 let agg = format!("PERCENTILE_CONT({percentile}) WITHIN GROUP (ORDER BY {field})");
924 format!(
925 "{cte_prefix}SELECT {group_by_select}\
926 {agg} AS pct_value \
927 FROM {source_table}\
928 {time_filter}\
929 {group_by_clause} \
930 HAVING {having_clause}",
931 having_clause = having_clause.replace("{agg}", &agg)
932 )
933 }
934 };
935
936 Ok(vec![query])
937 }
938}