Skip to main content

kaizen/core_loop/
rules.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2use crate::core_loop::{AlertSeverity, LocalRule, RuleAction, TraceHit};
3use crate::store::Store;
4use anyhow::{Result, anyhow};
5use rusqlite::{OptionalExtension, params};
6
7#[derive(Debug, Clone, serde::Serialize)]
8pub struct RuleRun {
9    pub rule_id: String,
10    pub hits: usize,
11    pub actions: usize,
12}
13
14pub fn create(
15    store: &Store,
16    name: &str,
17    filter: &str,
18    action: RuleAction,
19    now_ms: u64,
20) -> Result<LocalRule> {
21    let rule = LocalRule {
22        id: uuid::Uuid::now_v7().to_string(),
23        name: name.into(),
24        filter: filter.into(),
25        action,
26        enabled: true,
27        created_at_ms: now_ms,
28    };
29    store.conn().execute(
30        "INSERT INTO rules (id, name, filter, action_json, enabled, created_at_ms)
31         VALUES (?1, ?2, ?3, ?4, 1, ?5)",
32        params![
33            rule.id,
34            rule.name,
35            rule.filter,
36            serde_json::to_string(&rule.action)?,
37            rule.created_at_ms as i64
38        ],
39    )?;
40    Ok(rule)
41}
42
43pub fn list(store: &Store) -> Result<Vec<LocalRule>> {
44    let mut stmt = store.conn().prepare("SELECT id, name, filter, action_json, enabled, created_at_ms FROM rules ORDER BY created_at_ms DESC")?;
45    let rows = stmt.query_map([], row)?;
46    rows.map(|r| r.map_err(anyhow::Error::from)).collect()
47}
48
49pub fn set_enabled(store: &Store, id: &str, enabled: bool) -> Result<()> {
50    store.conn().execute(
51        "UPDATE rules SET enabled = ?2 WHERE id = ?1",
52        params![id, enabled as i64],
53    )?;
54    Ok(())
55}
56
57pub fn run_enabled(
58    store: &Store,
59    workspace: &str,
60    start_ms: u64,
61    now_ms: u64,
62    dry_run: bool,
63) -> Result<Vec<RuleRun>> {
64    list(store)?
65        .into_iter()
66        .filter(|r| r.enabled)
67        .map(|r| run_one(store, workspace, start_ms, now_ms, dry_run, r))
68        .collect()
69}
70
71fn run_one(
72    store: &Store,
73    workspace: &str,
74    start_ms: u64,
75    now_ms: u64,
76    dry_run: bool,
77    rule: LocalRule,
78) -> Result<RuleRun> {
79    let hits = crate::core_loop::query::run(store, workspace, &rule.filter, start_ms, 100)?;
80    let actions = if dry_run {
81        0
82    } else {
83        apply_all(store, &rule, &hits, now_ms)?
84    };
85    Ok(RuleRun {
86        rule_id: rule.id,
87        hits: hits.len(),
88        actions,
89    })
90}
91
92fn apply_all(store: &Store, rule: &LocalRule, hits: &[TraceHit], now_ms: u64) -> Result<usize> {
93    hits.iter()
94        .map(|h| apply_one(store, rule, h, now_ms))
95        .try_fold(0, |n, r| r.map(|_| n + 1))
96}
97
98fn apply_one(store: &Store, rule: &LocalRule, hit: &TraceHit, now_ms: u64) -> Result<()> {
99    match &rule.action {
100        RuleAction::CreateCase { label } => case_action(store, rule, hit, label.clone(), now_ms),
101        RuleAction::QueueReview { title } => review_action(store, rule, hit, title.clone(), now_ms),
102        RuleAction::EmitAlert { severity } => alert_action(store, rule, hit, *severity, now_ms),
103    }
104}
105
106fn case_action(
107    store: &Store,
108    rule: &LocalRule,
109    hit: &TraceHit,
110    label: Option<String>,
111    now_ms: u64,
112) -> Result<()> {
113    let s = store
114        .get_session(&hit.session_id)?
115        .ok_or_else(|| anyhow!("session not found"))?;
116    let key = format!("rule:{}:case:{}", rule.id, hit_key(hit));
117    let rec = crate::core_loop::cases::create_case(
118        store,
119        &s,
120        &key,
121        &format!("rule:{}", rule.name),
122        label,
123        now_ms,
124    )?;
125    crate::core_loop::cases::add_ref(store, &rec.id, "hit", &hit_key(hit))
126}
127
128fn review_action(
129    store: &Store,
130    rule: &LocalRule,
131    hit: &TraceHit,
132    title: Option<String>,
133    now_ms: u64,
134) -> Result<()> {
135    let title = title.unwrap_or_else(|| format!("Review {}", rule.name));
136    let key = format!("rule:{}:review:{}", rule.id, hit_key(hit));
137    crate::core_loop::review::create(store, &key, &hit.session_id, &title, now_ms)?;
138    Ok(())
139}
140
141fn alert_action(
142    store: &Store,
143    rule: &LocalRule,
144    hit: &TraceHit,
145    severity: AlertSeverity,
146    now_ms: u64,
147) -> Result<()> {
148    let key = format!("rule:{}:alert:{}", rule.id, hit_key(hit));
149    crate::core_loop::alerts::emit(
150        store,
151        &key,
152        &rule.name,
153        severity,
154        &hit.summary,
155        Some(&hit.session_id),
156        now_ms,
157    )?;
158    Ok(())
159}
160
161fn hit_key(hit: &TraceHit) -> String {
162    hit.seq
163        .map(|s| format!("{}:{s}", hit.session_id))
164        .unwrap_or_else(|| hit.session_id.clone())
165}
166
167fn row(r: &rusqlite::Row<'_>) -> rusqlite::Result<LocalRule> {
168    let action_json: String = r.get(3)?;
169    Ok(LocalRule {
170        id: r.get(0)?,
171        name: r.get(1)?,
172        filter: r.get(2)?,
173        action: serde_json::from_str(&action_json).unwrap_or(RuleAction::EmitAlert {
174            severity: AlertSeverity::Warning,
175        }),
176        enabled: r.get::<_, i64>(4)? != 0,
177        created_at_ms: r.get::<_, i64>(5)? as u64,
178    })
179}
180
181pub fn get(store: &Store, id: &str) -> Result<LocalRule> {
182    let sql =
183        "SELECT id, name, filter, action_json, enabled, created_at_ms FROM rules WHERE id = ?1";
184    store
185        .conn()
186        .query_row(sql, params![id], row)
187        .optional()?
188        .ok_or_else(|| anyhow!("rule not found: {id}"))
189}