Skip to main content

difflore_core/skills/
sweep.rs

1//! Maintenance sweeps for stale or weakly grounded skills.
2//!
3//! The corpus has accumulated a long tail of `conv-review-*` skills that
4//! never get accepted, never get served past the first injection, and
5//! still pay full freight in the top-5 retrieval ranking. Two
6//! complementary sweeps live here:
7//!
8//! 1. [`sweep_stale_skills`] multiplicatively decays `confidence_score`
9//!    on skills that were installed long enough ago to have a track
10//!    record, have NOT been served in the recent window, and have NEVER
11//!    earned an accepted fix. Confidence is a multiplier in the recall
12//!    ranker (`context::retrieval::rules`), so halving it ~halves the
13//!    chunk's fused score and naturally drops it out of the top 5 over
14//!    two sweeps without deleting anything.
15//!
16//! 2. [`quarantine_unguided_conv_reviews`] flips `conv-review-*` skills
17//!    that have neither `file_patterns` nor a `trigger` to `status =
18//!    'pending'`. These are the worst offenders: imported from a PR
19//!    comment with no grounding so the embedding has to do all the
20//!    work, and the embedding usually loses to a real rule. Pending
21//!    status keeps the row (it can be promoted back by a human or an
22//!    accept event) but removes it from active recall.
23//!
24//! Hard constraints: no schema changes, no DELETEs, all writes wrapped
25//! in a single sqlx transaction so a partial sweep can't half-decay the
26//! corpus.
27
28use serde::Serialize;
29use sqlx::SqlitePool;
30
31use crate::Result;
32
/// Tuning knobs for [`sweep_stale_skills`].
///
/// Defaults (see the `Default` impl): 14-day install and serve windows,
/// ×0.5 decay per sweep, 0.05 confidence floor, and a real (non-dry-run)
/// pass.
#[derive(Debug, Clone, Copy)]
pub struct SweepOpts {
    /// Skip skills installed within the last `stale_install_days` —
    /// they haven't had a fair chance to be served yet.
    pub stale_install_days: u32,
    /// A skill counts as "active" (and is left alone) if any
    /// `mcp_rule_serves` row inside the last `stale_serve_days` lists its
    /// id in `rule_ids_json`. An accepted `fix_outcomes` row also marks a
    /// skill active, regardless of this window.
    pub stale_serve_days: u32,
    /// Multiplier applied to `confidence_score` per sweep. The default of
    /// 0.5 takes a 0.6 base to 0.3 after one sweep and to exactly 0.15
    /// (0.6 × 0.5 × 0.5) after two. That is *down to* the 0.15 cutoff, not
    /// strictly below it. If that cutoff is exclusive, a third sweep is
    /// needed.
    ///
    /// The sweep does not validate this value. Anything above 1.0 would
    /// *raise* confidence, and results are always clamped to `min_floor`.
    pub decay_factor: f32,
    /// When true, compute the decay plan and report without writing.
    pub dry_run: bool,
    /// Confidence floor. Decayed values are clamped to at least this, and
    /// skills already at or below it are skipped. Rule eviction is a
    /// separate concern handled by other tooling.
    pub min_floor: f32,
}
52
53impl Default for SweepOpts {
54    fn default() -> Self {
55        Self {
56            stale_install_days: 14,
57            stale_serve_days: 14,
58            decay_factor: 0.5,
59            dry_run: false,
60            min_floor: 0.05,
61        }
62    }
63}
64
65/// Outcome of one [`sweep_stale_skills`] pass. Counts are best-effort
66/// snapshots taken during the same transaction as the writes (or the
67/// dry-run preview, if `dry_run` was set).
68#[derive(Debug, Clone, Serialize)]
69#[serde(rename_all = "camelCase")]
70pub struct SweepReport {
71    /// Total skills considered by the sweep (`installed_at < cutoff`).
72    pub examined: u64,
73    /// Skills whose confidence was actually multiplied by `decay_factor`.
74    pub decayed: u64,
75    /// Skills skipped because they were served inside the recent window
76    /// or had an accepted `fix_outcomes` row.
77    pub skipped_because_active: u64,
78    /// Skills already at or below `min_floor` — left untouched so the
79    /// floor stays meaningful.
80    pub skipped_because_already_at_floor: u64,
81    /// Echo of [`SweepOpts::dry_run`] so JSON readers can confirm
82    /// whether the report represents a real or simulated pass.
83    pub dry_run: bool,
84}
85
86/// Decay `confidence_score` on truly-stale skills. See module docs.
87pub async fn sweep_stale_skills(pool: &SqlitePool, opts: SweepOpts) -> Result<SweepReport> {
88    let install_window = format!("-{} days", opts.stale_install_days);
89    let serve_window = format!("-{} days", opts.stale_serve_days);
90
91    // Single SQL pass identifies the decay candidates. We compute the
92    // bucket counts on the same snapshot so the report numbers reconcile
93    // with what we'd actually UPDATE. We avoid sqlx::query! macros here
94    // because the compile-time cache would have to be regenerated.
95    //
96    // Bucket logic, evaluated per stale-installed skill:
97    //   - "active" → served in the last `stale_serve_days` OR has an
98    //     accepted fix_outcomes row
99    //   - "at_floor" → confidence_score <= min_floor (cheap short-circuit)
100    //   - "decay" → everything else: the actual targets
101    // Cross-join expands rule_ids_json (a JSON array of skill ids) into
102    // one row per id; SELECT value pulls the scalar element out. Matches
103    // the json_each idiom used elsewhere (e.g. mcp_server::server,
104    // context::index_db::queries) so SQLite's json1 builds the right
105    // implicit text-affinity comparison against skills.id.
106    let stale_serve_subquery = "id IN (\
107        SELECT value FROM mcp_rule_serves, json_each(rule_ids_json) \
108        WHERE served_at > datetime('now', ?2)\
109    )";
110    let accepted_subquery =
111        "id IN (SELECT rule_id FROM fix_outcomes WHERE rule_id IS NOT NULL AND accepted = 1)";
112
113    // We materialise the candidate ids so the dry-run preview and the
114    // real UPDATE both see the same snapshot. Local DBs we've measured
115    // have ~2K stale candidates so a fetch_all is fine.
116    let candidates_sql = format!(
117        "SELECT id, confidence_score FROM skills \
118         WHERE installed_at < datetime('now', ?1) \
119           AND NOT ({stale_serve_subquery}) \
120           AND NOT ({accepted_subquery})"
121    );
122
123    let examined: i64 =
124        sqlx::query_scalar("SELECT COUNT(*) FROM skills WHERE installed_at < datetime('now', ?1)")
125            .bind(&install_window)
126            .fetch_one(pool)
127            .await?;
128
129    let rows: Vec<(String, f64)> = sqlx::query_as(&candidates_sql)
130        .bind(&install_window)
131        .bind(&serve_window)
132        .fetch_all(pool)
133        .await?;
134
135    // Partition: anything above floor is a decay target; floor-or-below
136    // is a skip. Cast to f64 because sqlx hands back the column as f64.
137    let floor = f64::from(opts.min_floor);
138    let to_decay: Vec<&(String, f64)> = rows.iter().filter(|(_, c)| *c > floor).collect();
139    let at_floor_len = rows.len() - to_decay.len();
140
141    let decayed_count = u64::try_from(to_decay.len()).unwrap_or(u64::MAX);
142    let at_floor_count = u64::try_from(at_floor_len).unwrap_or(u64::MAX);
143    let examined_u64 = u64::try_from(examined).unwrap_or(0);
144    // "active" = stale-installed minus the candidates we did pull.
145    let skipped_active = examined_u64.saturating_sub(decayed_count + at_floor_count);
146
147    let report = SweepReport {
148        examined: examined_u64,
149        decayed: decayed_count,
150        skipped_because_active: skipped_active,
151        skipped_because_already_at_floor: at_floor_count,
152        dry_run: opts.dry_run,
153    };
154
155    if opts.dry_run || to_decay.is_empty() {
156        return Ok(report);
157    }
158
159    // Real write path: single transaction so a mid-batch failure leaves
160    // the corpus consistent.
161    let factor = f64::from(opts.decay_factor);
162    let mut tx = pool.begin().await?;
163    for (id, conf) in &to_decay {
164        let new_conf = (conf * factor).max(floor);
165        sqlx::query(
166            "UPDATE skills SET confidence_score = ?1, updated_at = datetime('now') WHERE id = ?2",
167        )
168        .bind(new_conf)
169        .bind(id)
170        .execute(&mut *tx)
171        .await?;
172    }
173    tx.commit().await?;
174
175    Ok(report)
176}
177
178/// Outcome of [`quarantine_unguided_conv_reviews`]. `flipped_ids` is the
179/// exact list of skill ids that moved from `active` to `pending` —
180/// useful for both audit logs and undo scripts.
181#[derive(Debug, Clone, Serialize)]
182#[serde(rename_all = "camelCase")]
183pub struct QuarantineReport {
184    pub examined: u64,
185    pub flipped: u64,
186    pub flipped_ids: Vec<String>,
187    pub dry_run: bool,
188}
189
190/// Move `conv-review-*` skills with neither `file_patterns` nor a
191/// `trigger` to `status='pending'`. See module docs for the rationale.
192pub async fn quarantine_unguided_conv_reviews(
193    pool: &SqlitePool,
194    dry_run: bool,
195) -> Result<QuarantineReport> {
196    // file_patterns is TEXT NULLable — treat NULL, "", and "[]" as
197    // "unguided". trigger is TEXT NULLable — treat NULL and "" the same.
198    let candidates: Vec<String> = sqlx::query_scalar(
199        "SELECT id FROM skills \
200         WHERE id LIKE 'conv-review-%' \
201           AND status = 'active' \
202           AND (file_patterns IS NULL OR file_patterns = '' OR file_patterns = '[]') \
203           AND (trigger IS NULL OR trigger = '')",
204    )
205    .fetch_all(pool)
206    .await?;
207
208    let examined_total: i64 = sqlx::query_scalar(
209        "SELECT COUNT(*) FROM skills WHERE id LIKE 'conv-review-%' AND status = 'active'",
210    )
211    .fetch_one(pool)
212    .await?;
213
214    let report = QuarantineReport {
215        examined: u64::try_from(examined_total).unwrap_or(0),
216        flipped: u64::try_from(candidates.len()).unwrap_or(u64::MAX),
217        flipped_ids: candidates.clone(),
218        dry_run,
219    };
220
221    if dry_run || candidates.is_empty() {
222        return Ok(report);
223    }
224
225    let mut tx = pool.begin().await?;
226    for id in &candidates {
227        sqlx::query(
228            "UPDATE skills SET status = 'pending', updated_at = datetime('now') WHERE id = ?1",
229        )
230        .bind(id)
231        .execute(&mut *tx)
232        .await?;
233    }
234    tx.commit().await?;
235
236    Ok(report)
237}
238
#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::sqlite::SqlitePoolOptions;

    /// Build a single-connection in-memory pool carrying a minimal schema:
    /// only the tables and columns the two sweeps read or write.
    async fn fresh_pool() -> SqlitePool {
        let pool = SqlitePoolOptions::new()
            .max_connections(1)
            .connect("sqlite::memory:")
            .await
            .unwrap();
        // Minimal schema covering only what the sweep touches.
        sqlx::query(
            "CREATE TABLE skills (
                id TEXT PRIMARY KEY NOT NULL,
                name TEXT NOT NULL DEFAULT '',
                source TEXT NOT NULL DEFAULT '',
                directory TEXT NOT NULL DEFAULT '',
                version TEXT NOT NULL DEFAULT '',
                description TEXT NOT NULL DEFAULT '',
                type TEXT NOT NULL DEFAULT 'skill',
                engines TEXT NOT NULL DEFAULT '[]',
                tags TEXT NOT NULL DEFAULT '[]',
                trigger TEXT,
                check_prompt TEXT,
                repo_owner TEXT,
                repo_name TEXT,
                repo_branch TEXT,
                readme_url TEXT,
                source_repo TEXT,
                enabled_for_codex INTEGER NOT NULL DEFAULT 0,
                enabled_for_claude INTEGER NOT NULL DEFAULT 0,
                enabled_for_gemini INTEGER NOT NULL DEFAULT 0,
                enabled_for_cursor INTEGER NOT NULL DEFAULT 0,
                confidence_score REAL NOT NULL DEFAULT 0.7,
                file_patterns TEXT,
                origin TEXT NOT NULL DEFAULT 'manual',
                content_hash TEXT,
                hash_created_at INTEGER,
                status TEXT NOT NULL DEFAULT 'active',
                installed_at TEXT NOT NULL DEFAULT (datetime('now')),
                updated_at TEXT NOT NULL DEFAULT (datetime('now'))
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TABLE mcp_rule_serves (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                rule_ids_json TEXT NOT NULL DEFAULT '[]',
                served_at TEXT NOT NULL DEFAULT (datetime('now'))
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        sqlx::query(
            "CREATE TABLE fix_outcomes (
                id TEXT PRIMARY KEY NOT NULL,
                rule_id TEXT,
                rule_name TEXT NOT NULL DEFAULT '',
                accepted INTEGER NOT NULL DEFAULT 0,
                created_at TEXT NOT NULL DEFAULT (datetime('now'))
            )",
        )
        .execute(&pool)
        .await
        .unwrap();
        pool
    }

    /// Insert a skill with `installed_at = datetime('now', age_modifier)`.
    /// `age_modifier` is a SQLite modifier like "-20 days" or "-2 days".
    async fn insert_skill(
        pool: &SqlitePool,
        id: &str,
        confidence: f64,
        age_modifier: &str,
        file_patterns: Option<&str>,
        trigger: Option<&str>,
    ) {
        sqlx::query(
            "INSERT INTO skills (id, name, confidence_score, installed_at, file_patterns, trigger) \
             VALUES (?1, ?1, ?2, datetime('now', ?3), ?4, ?5)",
        )
        .bind(id)
        .bind(confidence)
        .bind(age_modifier)
        .bind(file_patterns)
        .bind(trigger)
        .execute(pool)
        .await
        .unwrap();
    }

    /// Seed the five-skill fixture shared by the sweep tests:
    /// 1. `fresh` — installed 2 days ago, outside the stale window (not examined).
    /// 2. `stale-quiet` — 20 days old, never served, no accept: the only decay target.
    /// 3. `stale-served` — 20 days old but served 5 days ago: "active".
    /// 4. `stale-accepted` — 20 days old with an accepted fix: "active".
    /// 5. `stale-floor` — 20 days old, already at the 0.05 floor.
    async fn seed_mixed_corpus(pool: &SqlitePool) {
        insert_skill(pool, "fresh", 0.7, "-2 days", None, None).await;
        insert_skill(pool, "stale-quiet", 0.7, "-20 days", None, None).await;
        insert_skill(pool, "stale-served", 0.7, "-20 days", None, None).await;
        sqlx::query(
            "INSERT INTO mcp_rule_serves (rule_ids_json, served_at) \
             VALUES (?1, datetime('now', '-5 days'))",
        )
        .bind(r#"["stale-served"]"#)
        .execute(pool)
        .await
        .unwrap();
        insert_skill(pool, "stale-accepted", 0.7, "-20 days", None, None).await;
        sqlx::query(
            "INSERT INTO fix_outcomes (id, rule_id, rule_name, accepted) \
             VALUES ('fo-1', 'stale-accepted', 'stale-accepted', 1)",
        )
        .execute(pool)
        .await
        .unwrap();
        insert_skill(pool, "stale-floor", 0.05, "-20 days", None, None).await;
    }

    async fn confidence(pool: &SqlitePool, id: &str) -> f64 {
        sqlx::query_scalar("SELECT confidence_score FROM skills WHERE id = ?1")
            .bind(id)
            .fetch_one(pool)
            .await
            .unwrap()
    }

    /// Tolerant float comparison so clippy's `float_cmp` lint stays
    /// satisfied while keeping the assertion intent obvious.
    fn approx_eq(a: f64, b: f64) -> bool {
        (a - b).abs() < 1e-6
    }

    async fn status(pool: &SqlitePool, id: &str) -> String {
        sqlx::query_scalar("SELECT status FROM skills WHERE id = ?1")
            .bind(id)
            .fetch_one(pool)
            .await
            .unwrap()
    }

    fn opts() -> SweepOpts {
        SweepOpts::default()
    }

    #[tokio::test]
    async fn sweep_only_decays_stale_never_served_with_no_accept() {
        let pool = fresh_pool().await;
        seed_mixed_corpus(&pool).await;

        let report = sweep_stale_skills(&pool, opts()).await.unwrap();

        assert!(approx_eq(confidence(&pool, "fresh").await, 0.7));
        // 0.7 * 0.5 = 0.35
        assert!(approx_eq(confidence(&pool, "stale-quiet").await, 0.35));
        assert!(approx_eq(confidence(&pool, "stale-served").await, 0.7));
        assert!(approx_eq(confidence(&pool, "stale-accepted").await, 0.7));
        assert!(approx_eq(confidence(&pool, "stale-floor").await, 0.05));

        assert_eq!(report.decayed, 1);
        assert_eq!(report.skipped_because_already_at_floor, 1);
        // examined = 4 stale skills (fresh excluded by install window)
        assert_eq!(report.examined, 4);
        // 4 examined - 1 decayed - 1 at_floor = 2 active
        assert_eq!(report.skipped_because_active, 2);
        assert!(!report.dry_run);
    }

    #[tokio::test]
    async fn sweep_dry_run_does_not_commit() {
        let pool = fresh_pool().await;
        seed_mixed_corpus(&pool).await;

        let dry = SweepOpts {
            dry_run: true,
            ..SweepOpts::default()
        };
        let report = sweep_stale_skills(&pool, dry).await.unwrap();

        // The report still reflects what *would* happen…
        assert_eq!(report.decayed, 1);
        assert_eq!(report.examined, 4);
        assert_eq!(report.skipped_because_already_at_floor, 1);
        assert_eq!(report.skipped_because_active, 2);
        assert!(report.dry_run);
        // …but NO row was actually updated.
        assert!(approx_eq(confidence(&pool, "fresh").await, 0.7));
        assert!(approx_eq(confidence(&pool, "stale-quiet").await, 0.7));
        assert!(approx_eq(confidence(&pool, "stale-served").await, 0.7));
        assert!(approx_eq(confidence(&pool, "stale-accepted").await, 0.7));
        assert!(approx_eq(confidence(&pool, "stale-floor").await, 0.05));
    }

    #[tokio::test]
    async fn quarantine_flips_only_unguided_conv_reviews() {
        let pool = fresh_pool().await;
        // 1. conv-review with file_patterns: keep active.
        insert_skill(
            &pool,
            "conv-review-1",
            0.6,
            "-1 days",
            Some(r#"["**/*.rs"]"#),
            None,
        )
        .await;
        // 2. conv-review with neither: quarantine.
        insert_skill(&pool, "conv-review-2", 0.6, "-1 days", None, None).await;
        // 3. conv-review with trigger only: keep active.
        insert_skill(
            &pool,
            "conv-review-3",
            0.6,
            "-1 days",
            None,
            Some("when editing"),
        )
        .await;

        let report = quarantine_unguided_conv_reviews(&pool, false)
            .await
            .unwrap();

        // All three are active conv-reviews, so all three are examined.
        assert_eq!(report.examined, 3);
        assert_eq!(report.flipped, 1);
        assert_eq!(report.flipped_ids, vec!["conv-review-2".to_owned()]);
        assert!(!report.dry_run);
        assert_eq!(status(&pool, "conv-review-1").await, "active");
        assert_eq!(status(&pool, "conv-review-2").await, "pending");
        assert_eq!(status(&pool, "conv-review-3").await, "active");
    }

    #[tokio::test]
    async fn quarantine_treats_empty_string_and_empty_array_as_unguided() {
        let pool = fresh_pool().await;
        insert_skill(&pool, "conv-review-empty", 0.6, "-1 days", Some(""), Some("")).await;
        insert_skill(&pool, "conv-review-brackets", 0.6, "-1 days", Some("[]"), None).await;
        // Not a conv-review id: never examined or flipped, even though unguided.
        insert_skill(&pool, "manual-rule", 0.6, "-1 days", None, None).await;

        let report = quarantine_unguided_conv_reviews(&pool, false)
            .await
            .unwrap();

        assert_eq!(report.examined, 2);
        assert_eq!(report.flipped, 2);
        assert_eq!(status(&pool, "conv-review-empty").await, "pending");
        assert_eq!(status(&pool, "conv-review-brackets").await, "pending");
        assert_eq!(status(&pool, "manual-rule").await, "active");
    }

    #[tokio::test]
    async fn quarantine_dry_run_reports_without_flipping() {
        let pool = fresh_pool().await;
        insert_skill(&pool, "conv-review-2", 0.6, "-1 days", None, None).await;

        let report = quarantine_unguided_conv_reviews(&pool, true)
            .await
            .unwrap();

        // The report names the row that *would* move…
        assert!(report.dry_run);
        assert_eq!(report.flipped, 1);
        assert_eq!(report.flipped_ids, vec!["conv-review-2".to_owned()]);
        // …but its status is unchanged.
        assert_eq!(status(&pool, "conv-review-2").await, "active");
    }

    #[tokio::test]
    async fn decay_is_bounded_by_min_floor() {
        let pool = fresh_pool().await;
        // Confidence just above floor — one decay step would push it
        // below if we didn't clamp.
        insert_skill(&pool, "just-above-floor", 0.06, "-20 days", None, None).await;

        let report = sweep_stale_skills(&pool, opts()).await.unwrap();
        assert_eq!(report.decayed, 1);

        let new_conf = confidence(&pool, "just-above-floor").await;
        // 0.06 * 0.5 = 0.03, clamped to 0.05 floor.
        assert!(
            (new_conf - 0.05).abs() < 1e-6,
            "expected floor clamp at 0.05, got {new_conf}"
        );
        // And running again should hit the at_floor short-circuit.
        let report2 = sweep_stale_skills(&pool, opts()).await.unwrap();
        assert_eq!(report2.decayed, 0);
        assert_eq!(report2.skipped_because_already_at_floor, 1);
    }
}