zeph_tools/compression/
store.rs1use std::sync::Arc;
7
8use zeph_db::DbPool;
9
10#[derive(Debug, Clone, sqlx::FromRow)]
12pub struct CompressionRule {
13 pub id: String,
15 pub tool_glob: Option<String>,
17 pub pattern: String,
19 pub replacement_template: String,
21 pub hit_count: i64,
23 pub source: String,
25 pub created_at: String,
27}
28
29#[derive(Clone)]
35pub struct CompressionRuleStore {
36 pool: Arc<DbPool>,
37}
38
39impl CompressionRuleStore {
40 #[must_use]
42 pub fn new(pool: Arc<DbPool>) -> Self {
43 Self { pool }
44 }
45
46 pub async fn list_active(&self) -> Result<Vec<CompressionRule>, zeph_db::SqlxError> {
52 sqlx::query_as(zeph_db::sql!(
53 "SELECT id, tool_glob, pattern, replacement_template, hit_count, source, created_at \
54 FROM compression_rules ORDER BY hit_count ASC"
55 ))
56 .fetch_all(self.pool.as_ref())
57 .await
58 }
59
60 pub async fn upsert(&self, rule: &CompressionRule) -> Result<(), zeph_db::SqlxError> {
66 sqlx::query(zeph_db::sql!(
67 "INSERT INTO compression_rules \
68 (id, tool_glob, pattern, replacement_template, hit_count, source, created_at) \
69 VALUES (?, ?, ?, ?, ?, ?, ?) \
70 ON CONFLICT(tool_glob, pattern) DO UPDATE SET \
71 replacement_template = excluded.replacement_template, \
72 source = excluded.source"
73 ))
74 .bind(&rule.id)
75 .bind(&rule.tool_glob)
76 .bind(&rule.pattern)
77 .bind(&rule.replacement_template)
78 .bind(rule.hit_count)
79 .bind(&rule.source)
80 .bind(&rule.created_at)
81 .execute(self.pool.as_ref())
82 .await?;
83 Ok(())
84 }
85
86 pub async fn increment_hits(&self, batch: &[(String, u64)]) -> Result<(), zeph_db::SqlxError> {
96 for (id, delta) in batch {
97 sqlx::query(zeph_db::sql!(
98 "UPDATE compression_rules SET hit_count = hit_count + ? WHERE id = ?"
99 ))
100 .bind((*delta).cast_signed())
101 .bind(id.as_str())
102 .execute(self.pool.as_ref())
103 .await?;
104 }
105 Ok(())
106 }
107
108 pub async fn delete(&self, id: &str) -> Result<(), zeph_db::SqlxError> {
114 sqlx::query(zeph_db::sql!("DELETE FROM compression_rules WHERE id = ?"))
115 .bind(id)
116 .execute(self.pool.as_ref())
117 .await?;
118 Ok(())
119 }
120
121 pub async fn prune_lowest_hits(&self, max_rules: u32) -> Result<u64, zeph_db::SqlxError> {
129 let count: i64 =
130 sqlx::query_scalar(zeph_db::sql!("SELECT COUNT(*) FROM compression_rules"))
131 .fetch_one(self.pool.as_ref())
132 .await?;
133
134 if count <= i64::from(max_rules) {
135 return Ok(0);
136 }
137
138 let to_delete = count - i64::from(max_rules);
139 let result = sqlx::query(zeph_db::sql!(
140 "DELETE FROM compression_rules WHERE id IN \
141 (SELECT id FROM compression_rules ORDER BY hit_count ASC LIMIT ?)"
142 ))
143 .bind(to_delete)
144 .execute(self.pool.as_ref())
145 .await?;
146
147 Ok(result.rows_affected())
148 }
149}