zeph_tools/compression/store.rs

// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
// SPDX-License-Identifier: MIT OR Apache-2.0

//! SQLite/Postgres-backed storage for TACO compression rules.

use std::sync::Arc;

use zeph_db::DbPool;

/// A single compression rule stored in the database.
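///
/// # Example
///
/// A minimal, hand-built rule as a sketch; the `id` and `created_at` values
/// below are placeholders, not values the store generates:
///
/// ```ignore
/// let rule = CompressionRule {
///     id: "3f2c9c1e-0000-4000-8000-000000000000".into(), // placeholder UUID v4
///     tool_glob: Some("web_*".into()),
///     pattern: r"\s+".into(),          // collapse runs of whitespace
///     replacement_template: " ".into(),
///     hit_count: 0,
///     source: "operator".into(),
///     created_at: "2026-01-01T00:00:00Z".into(),
/// };
/// ```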
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct CompressionRule {
    /// UUID v4 string identifier.
    pub id: String,
    /// Optional glob pattern matching tool names (e.g., `"shell"`, `"web_*"`).
    pub tool_glob: Option<String>,
    /// Regex pattern applied to tool output.
    pub pattern: String,
    /// Replacement template (may reference capture groups, e.g. `"$1"`).
    pub replacement_template: String,
    /// Number of times this rule has matched. Updated by [`CompressionRuleStore::increment_hits`].
    pub hit_count: i64,
    /// Origin of this rule: `"operator"` (config-inserted) or `"llm-evolved"` (auto-generated).
    pub source: String,
    /// RFC 3339 creation timestamp.
    pub created_at: String,
}

/// Persistence layer for TACO compression rules.
///
/// All rules are loaded at startup via [`CompressionRuleStore::list_active`] and cached in
/// [`super::RuleBasedCompressor`]. Hit counts are flushed in batches via
/// [`CompressionRuleStore::increment_hits`] during the `maybe_autodream` maintenance pass.
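///
/// # Example
///
/// A minimal sketch of the startup path. Pool construction lives elsewhere in
/// `zeph_db`, so it is taken as a parameter here and the block is not compiled:
///
/// ```ignore
/// use std::sync::Arc;
/// use zeph_db::DbPool;
///
/// async fn load_rules(pool: Arc<DbPool>) -> Result<(), zeph_db::SqlxError> {
///     let store = CompressionRuleStore::new(pool);
///     // Rules arrive least-used first, which keeps pruning decisions cheap.
///     for rule in store.list_active().await? {
///         println!("{}: {} hits", rule.pattern, rule.hit_count);
///     }
///     Ok(())
/// }
/// ```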
#[derive(Clone)]
pub struct CompressionRuleStore {
    pool: Arc<DbPool>,
}

impl CompressionRuleStore {
    /// Construct a store backed by the given pool.
    #[must_use]
    pub fn new(pool: Arc<DbPool>) -> Self {
        Self { pool }
    }

    /// Return all rules, ordered by ascending hit count (least-used first for pruning).
    ///
    /// # Errors
    ///
    /// Returns a database error on failure.
    pub async fn list_active(&self) -> Result<Vec<CompressionRule>, zeph_db::SqlxError> {
        sqlx::query_as(zeph_db::sql!(
            "SELECT id, tool_glob, pattern, replacement_template, hit_count, source, created_at \
             FROM compression_rules ORDER BY hit_count ASC"
        ))
        .fetch_all(self.pool.as_ref())
        .await
    }

    /// Insert or update a rule (keyed by `(tool_glob, pattern)`).
    ///
    /// # Errors
    ///
    /// Returns a database error on failure.
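    ///
    /// # Example
    ///
    /// A sketch of seeding an operator-authored rule (`rule` is built as in
    /// the [`CompressionRule`] example):
    ///
    /// ```ignore
    /// store.upsert(&rule).await?;
    /// // A second call with the same (tool_glob, pattern) rewrites the
    /// // template and source in place instead of adding a duplicate row.
    /// ```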
    pub async fn upsert(&self, rule: &CompressionRule) -> Result<(), zeph_db::SqlxError> {
        sqlx::query(zeph_db::sql!(
            "INSERT INTO compression_rules \
             (id, tool_glob, pattern, replacement_template, hit_count, source, created_at) \
             VALUES (?, ?, ?, ?, ?, ?, ?) \
             ON CONFLICT(tool_glob, pattern) DO UPDATE SET \
             replacement_template = excluded.replacement_template, \
             source = excluded.source"
        ))
        .bind(&rule.id)
        .bind(&rule.tool_glob)
        .bind(&rule.pattern)
        .bind(&rule.replacement_template)
        .bind(rule.hit_count)
        .bind(&rule.source)
        .bind(&rule.created_at)
        .execute(self.pool.as_ref())
        .await?;
        Ok(())
    }

    /// Batch-increment hit counts for a set of rule IDs.
    ///
    /// Called during the `maybe_autodream` maintenance pass. Issues one UPDATE
    /// statement per rule rather than a single multi-row statement: batches are
    /// small, and per-row statements stay portable across database backends.
    ///
    /// # Errors
    ///
    /// Returns a database error on failure.
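    ///
    /// # Example
    ///
    /// A sketch of flushing counters accumulated in memory; the ID and delta
    /// are illustrative:
    ///
    /// ```ignore
    /// let batch: Vec<(String, u64)> = vec![
    ///     ("3f2c9c1e-0000-4000-8000-000000000000".into(), 3),
    /// ];
    /// store.increment_hits(&batch).await?;
    /// ```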
    pub async fn increment_hits(&self, batch: &[(String, u64)]) -> Result<(), zeph_db::SqlxError> {
        for (id, delta) in batch {
            sqlx::query(zeph_db::sql!(
                "UPDATE compression_rules SET hit_count = hit_count + ? WHERE id = ?"
            ))
            .bind((*delta).cast_signed())
            .bind(id.as_str())
            .execute(self.pool.as_ref())
            .await?;
        }
        Ok(())
    }

    /// Delete a rule by ID.
    ///
    /// # Errors
    ///
    /// Returns a database error on failure.
    pub async fn delete(&self, id: &str) -> Result<(), zeph_db::SqlxError> {
        sqlx::query(zeph_db::sql!("DELETE FROM compression_rules WHERE id = ?"))
            .bind(id)
            .execute(self.pool.as_ref())
            .await?;
        Ok(())
    }

    /// Prune the lowest-hit rules so the table holds at most `max_rules` rows.
    ///
    /// Returns the number of rules deleted.
    ///
    /// # Errors
    ///
    /// Returns a database error on failure.
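    ///
    /// # Example
    ///
    /// A sketch of capping the table; the `500` limit is illustrative:
    ///
    /// ```ignore
    /// let removed = store.prune_lowest_hits(500).await?;
    /// if removed > 0 {
    ///     println!("pruned {removed} low-hit compression rules");
    /// }
    /// ```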
    pub async fn prune_lowest_hits(&self, max_rules: u32) -> Result<u64, zeph_db::SqlxError> {
        let count: i64 =
            sqlx::query_scalar(zeph_db::sql!("SELECT COUNT(*) FROM compression_rules"))
                .fetch_one(self.pool.as_ref())
                .await?;

        if count <= i64::from(max_rules) {
            return Ok(0);
        }

        let to_delete = count - i64::from(max_rules);
        let result = sqlx::query(zeph_db::sql!(
            "DELETE FROM compression_rules WHERE id IN \
             (SELECT id FROM compression_rules ORDER BY hit_count ASC LIMIT ?)"
        ))
        .bind(to_delete)
        .execute(self.pool.as_ref())
        .await?;

        Ok(result.rows_affected())
    }
}