Skip to main content

zeph_memory/store/
skills.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
/// One row of the `skill_usage` table, as returned by `SqliteStore::load_skill_usage`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillUsageRow {
    /// Name of the skill (the conflict key of the usage upsert).
    pub skill_name: String,
    /// Number of recorded invocations; starts at 1 and grows by one per recorded use.
    pub invocation_count: i64,
    /// Timestamp of the most recent recorded use, as stored by the database
    /// (written from `CURRENT_TIMESTAMP`).
    pub last_used_at: String,
}
15
/// Aggregated outcome counters for a skill, computed from the `skill_outcomes` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillMetricsRow {
    /// Name of the skill the counters belong to.
    pub skill_name: String,
    /// Version the counters are attributed to; `None` when the outcomes carry no version.
    pub version_id: Option<i64>,
    /// Number of outcome rows that were aggregated.
    pub total: i64,
    /// Number of aggregated rows whose outcome is `'success'`.
    pub successes: i64,
    /// `total - successes`, i.e. every aggregated row that is not a `'success'`.
    pub failures: i64,
}
24
/// One row of the `skill_versions` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillVersionRow {
    /// Primary key of the version row.
    pub id: i64,
    /// Name of the skill this version belongs to.
    pub skill_name: String,
    /// Per-skill version number.
    pub version: i64,
    /// Skill body text stored for this version.
    pub body: String,
    /// Skill description stored for this version.
    pub description: String,
    /// Origin label of the version; this module uses `"manual"` and `"auto"`.
    pub source: String,
    /// `true` when the stored `is_active` flag is non-zero.
    pub is_active: bool,
    /// Value of the `success_count` column.
    pub success_count: i64,
    /// Value of the `failure_count` column.
    pub failure_count: i64,
    /// Creation timestamp exactly as stored by the database.
    pub created_at: String,
}
38
39type SkillVersionTuple = (
40    i64,
41    String,
42    i64,
43    String,
44    String,
45    String,
46    i64,
47    i64,
48    i64,
49    String,
50);
51
52fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
53    SkillVersionRow {
54        id: t.0,
55        skill_name: t.1,
56        version: t.2,
57        body: t.3,
58        description: t.4,
59        source: t.5,
60        is_active: t.6 != 0,
61        success_count: t.7,
62        failure_count: t.8,
63        created_at: t.9,
64    }
65}
66
67impl SqliteStore {
68    /// Record usage of skills (UPSERT: increment count and update timestamp).
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the database operation fails.
73    pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
74        for name in skill_names {
75            zeph_db::query(sql!(
76                "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
77                 VALUES (?, 1, CURRENT_TIMESTAMP) \
78                 ON CONFLICT(skill_name) DO UPDATE SET \
79                 invocation_count = invocation_count + 1, \
80                 last_used_at = CURRENT_TIMESTAMP"
81            ))
82            .bind(name)
83            .execute(&self.pool)
84            .await?;
85        }
86        Ok(())
87    }
88
89    /// Load all skill usage statistics.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the query fails.
94    pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
95        let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
96            "SELECT skill_name, invocation_count, last_used_at \
97             FROM skill_usage ORDER BY invocation_count DESC"
98        ))
99        .fetch_all(&self.pool)
100        .await?;
101
102        Ok(rows
103            .into_iter()
104            .map(
105                |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
106                    skill_name,
107                    invocation_count,
108                    last_used_at,
109                },
110            )
111            .collect())
112    }
113
114    /// Record a skill outcome event.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the insert fails.
119    pub async fn record_skill_outcome(
120        &self,
121        skill_name: &str,
122        version_id: Option<i64>,
123        conversation_id: Option<crate::types::ConversationId>,
124        outcome: &str,
125        error_context: Option<&str>,
126        outcome_detail: Option<&str>,
127    ) -> Result<(), MemoryError> {
128        zeph_db::query(sql!(
129            "INSERT INTO skill_outcomes \
130             (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
131             VALUES (?, ?, ?, ?, ?, ?)"
132        ))
133        .bind(skill_name)
134        .bind(version_id)
135        .bind(conversation_id)
136        .bind(outcome)
137        .bind(error_context)
138        .bind(outcome_detail)
139        .execute(&self.pool)
140        .await?;
141        Ok(())
142    }
143
144    /// Record outcomes for multiple skills in a single transaction.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if any insert fails (whole batch is rolled back).
149    pub async fn record_skill_outcomes_batch(
150        &self,
151        skill_names: &[String],
152        conversation_id: Option<crate::types::ConversationId>,
153        outcome: &str,
154        error_context: Option<&str>,
155        outcome_detail: Option<&str>,
156    ) -> Result<(), MemoryError> {
157        // Acquire the write lock up front to avoid DEFERRED read->write upgrades
158        // failing with SQLITE_BUSY_SNAPSHOT under concurrent WAL writers.
159        let mut tx = begin_write(&self.pool).await?;
160
161        let mut version_map: std::collections::HashMap<String, Option<i64>> =
162            std::collections::HashMap::new();
163        for name in skill_names {
164            let vid: Option<(i64,)> = zeph_db::query_as(sql!(
165                "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
166            ))
167            .bind(name)
168            .fetch_optional(&mut *tx)
169            .await?;
170            version_map.insert(name.clone(), vid.map(|r| r.0));
171        }
172
173        for name in skill_names {
174            let version_id = version_map.get(name.as_str()).copied().flatten();
175            zeph_db::query(sql!(
176                "INSERT INTO skill_outcomes \
177                 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
178                 VALUES (?, ?, ?, ?, ?, ?)"
179            ))
180            .bind(name)
181            .bind(version_id)
182            .bind(conversation_id)
183            .bind(outcome)
184            .bind(error_context)
185            .bind(outcome_detail)
186            .execute(&mut *tx)
187            .await?;
188        }
189        tx.commit().await?;
190        Ok(())
191    }
192
193    /// Load metrics for a skill (latest version group).
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the query fails.
198    pub async fn skill_metrics(
199        &self,
200        skill_name: &str,
201    ) -> Result<Option<SkillMetricsRow>, MemoryError> {
202        let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
203            "SELECT skill_name, version_id, \
204             COUNT(*) as total, \
205             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
206             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
207             FROM skill_outcomes WHERE skill_name = ? \
208             AND outcome NOT IN ('user_approval', 'user_rejection') \
209             GROUP BY skill_name, version_id \
210             ORDER BY version_id DESC LIMIT 1"
211        ))
212        .bind(skill_name)
213        .fetch_optional(&self.pool)
214        .await?;
215
216        Ok(row.map(
217            |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
218                skill_name,
219                version_id,
220                total,
221                successes,
222                failures,
223            },
224        ))
225    }
226
227    /// Load all skill outcome stats grouped by skill name.
228    ///
229    /// # Errors
230    ///
231    /// Returns an error if the query fails.
232    pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
233        let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234            "SELECT skill_name, version_id, \
235             COUNT(*) as total, \
236             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238             FROM skill_outcomes \
239             GROUP BY skill_name \
240             ORDER BY total DESC"
241        ))
242        .fetch_all(&self.pool)
243        .await?;
244
245        Ok(rows
246            .into_iter()
247            .map(
248                |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249                    skill_name,
250                    version_id,
251                    total,
252                    successes,
253                    failures,
254                },
255            )
256            .collect())
257    }
258
259    /// Save a new skill version and return its ID.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the insert fails.
264    #[allow(clippy::too_many_arguments)]
265    pub async fn save_skill_version(
266        &self,
267        skill_name: &str,
268        version: i64,
269        body: &str,
270        description: &str,
271        source: &str,
272        error_context: Option<&str>,
273        predecessor_id: Option<i64>,
274    ) -> Result<i64, MemoryError> {
275        let row: (i64,) = zeph_db::query_as(sql!(
276            "INSERT INTO skill_versions \
277             (skill_name, version, body, description, source, error_context, predecessor_id) \
278             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
279        ))
280        .bind(skill_name)
281        .bind(version)
282        .bind(body)
283        .bind(description)
284        .bind(source)
285        .bind(error_context)
286        .bind(predecessor_id)
287        .fetch_one(&self.pool)
288        .await?;
289        Ok(row.0)
290    }
291
292    /// Count the number of distinct conversation sessions in which a skill produced an outcome.
293    ///
294    /// Uses `COUNT(DISTINCT conversation_id)` from `skill_outcomes`. Rows where
295    /// `conversation_id IS NULL` are excluded (legacy rows without session tracking).
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the query fails.
300    pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
301        let row: (i64,) = zeph_db::query_as(sql!(
302            "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
303             WHERE skill_name = ? AND conversation_id IS NOT NULL"
304        ))
305        .bind(skill_name)
306        .fetch_one(&self.pool)
307        .await?;
308        Ok(row.0)
309    }
310
311    /// Load the active version for a skill.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the query fails.
316    pub async fn active_skill_version(
317        &self,
318        skill_name: &str,
319    ) -> Result<Option<SkillVersionRow>, MemoryError> {
320        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
321            "SELECT id, skill_name, version, body, description, source, \
322                 is_active, success_count, failure_count, created_at \
323                 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
324        ))
325        .bind(skill_name)
326        .fetch_optional(&self.pool)
327        .await?;
328
329        Ok(row.map(skill_version_from_tuple))
330    }
331
332    /// Activate a specific version (deactivates others for the same skill).
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the update fails.
337    pub async fn activate_skill_version(
338        &self,
339        skill_name: &str,
340        version_id: i64,
341    ) -> Result<(), MemoryError> {
342        let mut tx = begin_write(&self.pool).await?;
343
344        zeph_db::query(sql!(
345            "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
346        ))
347        .bind(skill_name)
348        .execute(&mut *tx)
349        .await?;
350
351        zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
352            .bind(version_id)
353            .execute(&mut *tx)
354            .await?;
355
356        tx.commit().await?;
357        Ok(())
358    }
359
360    /// Get the next version number for a skill.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the query fails.
365    pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
366        let row: (i64,) = zeph_db::query_as(sql!(
367            "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
368        ))
369        .bind(skill_name)
370        .fetch_one(&self.pool)
371        .await?;
372        Ok(row.0)
373    }
374
375    /// Get the latest auto-generated version's `created_at` for cooldown check.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the query fails.
380    pub async fn last_improvement_time(
381        &self,
382        skill_name: &str,
383    ) -> Result<Option<String>, MemoryError> {
384        let row: Option<(String,)> = zeph_db::query_as(sql!(
385            "SELECT created_at FROM skill_versions \
386             WHERE skill_name = ? AND source = 'auto' \
387             ORDER BY id DESC LIMIT 1"
388        ))
389        .bind(skill_name)
390        .fetch_optional(&self.pool)
391        .await?;
392        Ok(row.map(|r| r.0))
393    }
394
395    /// Ensure a base (v1 manual) version exists for a skill. Idempotent.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the DB operation fails.
400    pub async fn ensure_skill_version_exists(
401        &self,
402        skill_name: &str,
403        body: &str,
404        description: &str,
405    ) -> Result<(), MemoryError> {
406        let existing: Option<(i64,)> = zeph_db::query_as(sql!(
407            "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
408        ))
409        .bind(skill_name)
410        .fetch_optional(&self.pool)
411        .await?;
412
413        if existing.is_none() {
414            let id = self
415                .save_skill_version(skill_name, 1, body, description, "manual", None, None)
416                .await?;
417            self.activate_skill_version(skill_name, id).await?;
418        }
419        Ok(())
420    }
421
422    /// Load all versions for a skill, ordered by version number.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if the query fails.
427    pub async fn load_skill_versions(
428        &self,
429        skill_name: &str,
430    ) -> Result<Vec<SkillVersionRow>, MemoryError> {
431        let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
432            "SELECT id, skill_name, version, body, description, source, \
433                 is_active, success_count, failure_count, created_at \
434                 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
435        ))
436        .bind(skill_name)
437        .fetch_all(&self.pool)
438        .await?;
439
440        Ok(rows.into_iter().map(skill_version_from_tuple).collect())
441    }
442
443    /// Count auto-generated versions for a skill.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if the query fails.
448    pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
449        let row: (i64,) = zeph_db::query_as(sql!(
450            "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
451        ))
452        .bind(skill_name)
453        .fetch_one(&self.pool)
454        .await?;
455        Ok(row.0)
456    }
457
458    /// Delete oldest non-active auto versions exceeding max limit.
459    /// Returns the number of pruned versions.
460    ///
461    /// # Errors
462    ///
463    /// Returns an error if the delete fails.
464    pub async fn prune_skill_versions(
465        &self,
466        skill_name: &str,
467        max_versions: u32,
468    ) -> Result<u32, MemoryError> {
469        let result = zeph_db::query(sql!(
470            "DELETE FROM skill_versions WHERE id IN (\
471                SELECT id FROM skill_versions \
472                WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
473                ORDER BY id ASC \
474                LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
475                    WHERE skill_name = ? AND source = 'auto') - ?)\
476            )"
477        ))
478        .bind(skill_name)
479        .bind(skill_name)
480        .bind(max_versions)
481        .execute(&self.pool)
482        .await?;
483        Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
484    }
485
486    /// Get the predecessor version for rollback.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the query fails.
491    pub async fn predecessor_version(
492        &self,
493        version_id: i64,
494    ) -> Result<Option<SkillVersionRow>, MemoryError> {
495        let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
496            "SELECT predecessor_id FROM skill_versions WHERE id = ?"
497        ))
498        .bind(version_id)
499        .fetch_optional(&self.pool)
500        .await?;
501
502        let Some((Some(pid),)) = pred_id else {
503            return Ok(None);
504        };
505
506        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
507            "SELECT id, skill_name, version, body, description, source, \
508                 is_active, success_count, failure_count, created_at \
509                 FROM skill_versions WHERE id = ?"
510        ))
511        .bind(pid)
512        .fetch_optional(&self.pool)
513        .await?;
514
515        Ok(row.map(skill_version_from_tuple))
516    }
517
518    /// Return the skill names for all currently active auto-generated versions.
519    ///
520    /// Used to check rollback eligibility at the start of each agent turn.
521    ///
522    /// # Errors
523    /// Returns [`MemoryError`] on `SQLite` query failure.
524    pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
525        let rows: Vec<(String,)> = zeph_db::query_as(sql!(
526            "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
527        ))
528        .fetch_all(&self.pool)
529        .await?;
530        Ok(rows.into_iter().map(|(name,)| name).collect())
531    }
532
533    // --- STEM: skill_usage_log queries ---
534
535    /// Insert a tool usage log entry.
536    ///
537    /// `tool_sequence` must be a normalized compact JSON array (see `stem::normalize_tool_sequence`).
538    /// `sequence_hash` is the 16-char blake3 hex of `tool_sequence`.
539    ///
540    /// # Errors
541    /// Returns [`MemoryError`] on insert failure.
542    pub async fn insert_tool_usage_log(
543        &self,
544        tool_sequence: &str,
545        sequence_hash: &str,
546        context_hash: &str,
547        outcome: &str,
548        conversation_id: Option<crate::types::ConversationId>,
549    ) -> Result<(), MemoryError> {
550        zeph_db::query(sql!(
551            "INSERT INTO skill_usage_log \
552             (tool_sequence, sequence_hash, context_hash, outcome, conversation_id) \
553             VALUES (?, ?, ?, ?, ?)"
554        ))
555        .bind(tool_sequence)
556        .bind(sequence_hash)
557        .bind(context_hash)
558        .bind(outcome)
559        .bind(conversation_id)
560        .execute(&self.pool)
561        .await?;
562        Ok(())
563    }
564
565    /// Find tool sequences that have been seen at least `min_count` times within the last
566    /// `window_days` days.
567    ///
568    /// Returns `(tool_sequence, sequence_hash, occurrence_count, success_count)` tuples.
569    ///
570    /// # Errors
571    /// Returns [`MemoryError`] on query failure.
572    pub async fn find_recurring_patterns(
573        &self,
574        min_count: u32,
575        window_days: u32,
576    ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
577        let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
578            "SELECT tool_sequence, sequence_hash, \
579                    COUNT(*) as occurrence_count, \
580                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
581             FROM skill_usage_log \
582             WHERE created_at > datetime('now', '-' || ? || ' days') \
583             GROUP BY sequence_hash \
584             HAVING occurrence_count >= ? \
585             ORDER BY occurrence_count DESC \
586             LIMIT 10"
587        ))
588        .bind(window_days)
589        .bind(min_count)
590        .fetch_all(&self.pool)
591        .await?;
592
593        Ok(rows
594            .into_iter()
595            .map(|(seq, hash, occ, suc)| {
596                (
597                    seq,
598                    hash,
599                    u32::try_from(occ).unwrap_or(u32::MAX),
600                    u32::try_from(suc).unwrap_or(0),
601                )
602            })
603            .collect())
604    }
605
606    /// Delete `skill_usage_log` rows older than `retention_days` days.
607    ///
608    /// # Errors
609    /// Returns [`MemoryError`] on delete failure.
610    pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
611        let result = zeph_db::query(sql!(
612            "DELETE FROM skill_usage_log \
613             WHERE created_at < datetime('now', '-' || ? || ' days')"
614        ))
615        .bind(retention_days)
616        .execute(&self.pool)
617        .await?;
618        Ok(result.rows_affected())
619    }
620
621    // --- ERL: skill_heuristics queries ---
622
623    /// Insert a new heuristic (no dedup — caller must check first).
624    ///
625    /// # Errors
626    /// Returns [`MemoryError`] on insert failure.
627    pub async fn insert_skill_heuristic(
628        &self,
629        skill_name: Option<&str>,
630        heuristic_text: &str,
631        confidence: f64,
632    ) -> Result<i64, MemoryError> {
633        let row: (i64,) = zeph_db::query_as(sql!(
634            "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) \
635             VALUES (?, ?, ?) RETURNING id"
636        ))
637        .bind(skill_name)
638        .bind(heuristic_text)
639        .bind(confidence)
640        .fetch_one(&self.pool)
641        .await?;
642        Ok(row.0)
643    }
644
645    /// Increment `use_count` and update `updated_at` for an existing heuristic by ID.
646    ///
647    /// # Errors
648    /// Returns [`MemoryError`] on update failure.
649    pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
650        zeph_db::query(sql!(
651            "UPDATE skill_heuristics \
652             SET use_count = use_count + 1, updated_at = datetime('now') \
653             WHERE id = ?"
654        ))
655        .bind(id)
656        .execute(&self.pool)
657        .await?;
658        Ok(())
659    }
660
661    /// Load heuristics for a given skill (exact match + NULL/general), ordered by confidence DESC.
662    ///
663    /// Returns `(id, heuristic_text, confidence, use_count)` tuples.
664    /// At most `limit` rows are returned.
665    ///
666    /// # Errors
667    /// Returns [`MemoryError`] on query failure.
668    pub async fn load_skill_heuristics(
669        &self,
670        skill_name: &str,
671        min_confidence: f64,
672        limit: u32,
673    ) -> Result<Vec<(i64, String, f64, i64)>, MemoryError> {
674        let rows: Vec<(i64, String, f64, i64)> = zeph_db::query_as(sql!(
675            "SELECT id, heuristic_text, confidence, use_count \
676             FROM skill_heuristics \
677             WHERE (skill_name = ? OR skill_name IS NULL) \
678               AND confidence >= ? \
679             ORDER BY confidence DESC \
680             LIMIT ?"
681        ))
682        .bind(skill_name)
683        .bind(min_confidence)
684        .bind(limit)
685        .fetch_all(&self.pool)
686        .await?;
687        Ok(rows)
688    }
689
690    /// Load all heuristics for a skill (for dedup checks), without confidence filter.
691    ///
692    /// Returns `(id, heuristic_text)` tuples.
693    ///
694    /// # Errors
695    /// Returns [`MemoryError`] on query failure.
696    pub async fn load_all_heuristics_for_skill(
697        &self,
698        skill_name: Option<&str>,
699    ) -> Result<Vec<(i64, String)>, MemoryError> {
700        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
701            "SELECT id, heuristic_text FROM skill_heuristics \
702             WHERE (skill_name = ? OR (? IS NULL AND skill_name IS NULL))"
703        ))
704        .bind(skill_name)
705        .bind(skill_name)
706        .fetch_all(&self.pool)
707        .await?;
708        Ok(rows)
709    }
710}
711
712#[cfg(test)]
713mod tests {
714    use std::time::Duration;
715
716    use tokio::time::sleep;
717
718    use super::*;
719
720    async fn test_store() -> SqliteStore {
721        SqliteStore::new(":memory:").await.unwrap()
722    }
723
724    #[tokio::test]
725    async fn record_skill_usage_increments() {
726        let store = test_store().await;
727
728        store.record_skill_usage(&["git"]).await.unwrap();
729        store.record_skill_usage(&["git"]).await.unwrap();
730
731        let usage = store.load_skill_usage().await.unwrap();
732        assert_eq!(usage.len(), 1);
733        assert_eq!(usage[0].skill_name, "git");
734        assert_eq!(usage[0].invocation_count, 2);
735    }
736
737    #[tokio::test]
738    async fn load_skill_usage_returns_all() {
739        let store = test_store().await;
740
741        store.record_skill_usage(&["git", "docker"]).await.unwrap();
742        store.record_skill_usage(&["git"]).await.unwrap();
743
744        let usage = store.load_skill_usage().await.unwrap();
745        assert_eq!(usage.len(), 2);
746        assert_eq!(usage[0].skill_name, "git");
747        assert_eq!(usage[0].invocation_count, 2);
748        assert_eq!(usage[1].skill_name, "docker");
749        assert_eq!(usage[1].invocation_count, 1);
750    }
751
752    #[tokio::test]
753    async fn migration_005_creates_tables() {
754        let store = test_store().await;
755        let pool = store.pool();
756
757        let versions: (i64,) = zeph_db::query_as(sql!(
758            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
759        ))
760        .fetch_one(pool)
761        .await
762        .unwrap();
763        assert_eq!(versions.0, 1);
764
765        let outcomes: (i64,) = zeph_db::query_as(sql!(
766            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
767        ))
768        .fetch_one(pool)
769        .await
770        .unwrap();
771        assert_eq!(outcomes.0, 1);
772    }
773
774    #[tokio::test]
775    async fn record_skill_outcome_inserts() {
776        let store = test_store().await;
777
778        store
779            .record_skill_outcome(
780                "git",
781                None,
782                Some(crate::types::ConversationId(1)),
783                "success",
784                None,
785                None,
786            )
787            .await
788            .unwrap();
789        store
790            .record_skill_outcome(
791                "git",
792                None,
793                Some(crate::types::ConversationId(1)),
794                "tool_failure",
795                Some("exit code 1"),
796                None,
797            )
798            .await
799            .unwrap();
800
801        let metrics = store.skill_metrics("git").await.unwrap().unwrap();
802        assert_eq!(metrics.total, 2);
803        assert_eq!(metrics.successes, 1);
804        assert_eq!(metrics.failures, 1);
805    }
806
807    #[tokio::test]
808    async fn skill_metrics_none_for_unknown() {
809        let store = test_store().await;
810        let m = store.skill_metrics("nonexistent").await.unwrap();
811        assert!(m.is_none());
812    }
813
814    #[tokio::test]
815    async fn load_skill_outcome_stats_grouped() {
816        let store = test_store().await;
817
818        store
819            .record_skill_outcome("git", None, None, "success", None, None)
820            .await
821            .unwrap();
822        store
823            .record_skill_outcome("git", None, None, "tool_failure", None, None)
824            .await
825            .unwrap();
826        store
827            .record_skill_outcome("docker", None, None, "success", None, None)
828            .await
829            .unwrap();
830
831        let stats = store.load_skill_outcome_stats().await.unwrap();
832        assert_eq!(stats.len(), 2);
833        assert_eq!(stats[0].skill_name, "git");
834        assert_eq!(stats[0].total, 2);
835        assert_eq!(stats[1].skill_name, "docker");
836        assert_eq!(stats[1].total, 1);
837    }
838
839    #[tokio::test]
840    async fn save_and_load_skill_version() {
841        let store = test_store().await;
842
843        let id = store
844            .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
845            .await
846            .unwrap();
847        assert!(id > 0);
848
849        store.activate_skill_version("git", id).await.unwrap();
850
851        let active = store.active_skill_version("git").await.unwrap().unwrap();
852        assert_eq!(active.version, 1);
853        assert_eq!(active.body, "body v1");
854        assert!(active.is_active);
855    }
856
857    #[tokio::test]
858    async fn activate_deactivates_previous() {
859        let store = test_store().await;
860
861        let v1 = store
862            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
863            .await
864            .unwrap();
865        store.activate_skill_version("git", v1).await.unwrap();
866
867        let v2 = store
868            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
869            .await
870            .unwrap();
871        store.activate_skill_version("git", v2).await.unwrap();
872
873        let versions = store.load_skill_versions("git").await.unwrap();
874        assert_eq!(versions.len(), 2);
875        assert!(!versions[0].is_active);
876        assert!(versions[1].is_active);
877    }
878
879    #[tokio::test]
880    async fn next_skill_version_increments() {
881        let store = test_store().await;
882
883        let next = store.next_skill_version("git").await.unwrap();
884        assert_eq!(next, 1);
885
886        store
887            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
888            .await
889            .unwrap();
890        let next = store.next_skill_version("git").await.unwrap();
891        assert_eq!(next, 2);
892    }
893
894    #[tokio::test]
895    async fn last_improvement_time_returns_auto_only() {
896        let store = test_store().await;
897
898        store
899            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
900            .await
901            .unwrap();
902
903        let t = store.last_improvement_time("git").await.unwrap();
904        assert!(t.is_none());
905
906        store
907            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
908            .await
909            .unwrap();
910
911        let t = store.last_improvement_time("git").await.unwrap();
912        assert!(t.is_some());
913    }
914
915    #[tokio::test]
916    async fn ensure_skill_version_exists_idempotent() {
917        let store = test_store().await;
918
919        store
920            .ensure_skill_version_exists("git", "body", "Git helper")
921            .await
922            .unwrap();
923        store
924            .ensure_skill_version_exists("git", "body2", "Git helper 2")
925            .await
926            .unwrap();
927
928        let versions = store.load_skill_versions("git").await.unwrap();
929        assert_eq!(versions.len(), 1);
930        assert_eq!(versions[0].body, "body");
931    }
932
933    #[tokio::test]
934    async fn load_skill_versions_ordered() {
935        let store = test_store().await;
936
937        let v1 = store
938            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
939            .await
940            .unwrap();
941        store
942            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
943            .await
944            .unwrap();
945
946        let versions = store.load_skill_versions("git").await.unwrap();
947        assert_eq!(versions.len(), 2);
948        assert_eq!(versions[0].version, 1);
949        assert_eq!(versions[1].version, 2);
950    }
951
952    #[tokio::test]
953    async fn count_auto_versions_only() {
954        let store = test_store().await;
955
956        store
957            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
958            .await
959            .unwrap();
960        store
961            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
962            .await
963            .unwrap();
964        store
965            .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
966            .await
967            .unwrap();
968
969        let count = store.count_auto_versions("git").await.unwrap();
970        assert_eq!(count, 2);
971    }
972
973    #[tokio::test]
974    async fn prune_preserves_manual_and_active() {
975        let store = test_store().await;
976
977        let v1 = store
978            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
979            .await
980            .unwrap();
981        store.activate_skill_version("git", v1).await.unwrap();
982
983        for i in 2..=5 {
984            store
985                .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
986                .await
987                .unwrap();
988        }
989
990        let pruned = store.prune_skill_versions("git", 2).await.unwrap();
991        assert_eq!(pruned, 2);
992
993        let versions = store.load_skill_versions("git").await.unwrap();
994        assert!(versions.iter().any(|v| v.source == "manual"));
995        let auto_count = versions.iter().filter(|v| v.source == "auto").count();
996        assert_eq!(auto_count, 2);
997    }
998
999    #[tokio::test]
1000    async fn predecessor_version_returns_parent() {
1001        let store = test_store().await;
1002
1003        let v1 = store
1004            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1005            .await
1006            .unwrap();
1007        let v2 = store
1008            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1009            .await
1010            .unwrap();
1011
1012        let pred = store.predecessor_version(v2).await.unwrap().unwrap();
1013        assert_eq!(pred.id, v1);
1014        assert_eq!(pred.version, 1);
1015    }
1016
1017    #[tokio::test]
1018    async fn predecessor_version_none_for_root() {
1019        let store = test_store().await;
1020
1021        let v1 = store
1022            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1023            .await
1024            .unwrap();
1025
1026        let pred = store.predecessor_version(v1).await.unwrap();
1027        assert!(pred.is_none());
1028    }
1029
1030    #[tokio::test]
1031    async fn active_skill_version_none_for_unknown() {
1032        let store = test_store().await;
1033        let active = store.active_skill_version("nonexistent").await.unwrap();
1034        assert!(active.is_none());
1035    }
1036
1037    #[tokio::test]
1038    async fn load_skill_outcome_stats_empty() {
1039        let store = test_store().await;
1040        let stats = store.load_skill_outcome_stats().await.unwrap();
1041        assert!(stats.is_empty());
1042    }
1043
1044    #[tokio::test]
1045    async fn load_skill_versions_empty() {
1046        let store = test_store().await;
1047        let versions = store.load_skill_versions("nonexistent").await.unwrap();
1048        assert!(versions.is_empty());
1049    }
1050
1051    #[tokio::test]
1052    async fn count_auto_versions_zero_for_unknown() {
1053        let store = test_store().await;
1054        let count = store.count_auto_versions("nonexistent").await.unwrap();
1055        assert_eq!(count, 0);
1056    }
1057
1058    #[tokio::test]
1059    async fn prune_nothing_when_below_limit() {
1060        let store = test_store().await;
1061
1062        store
1063            .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
1064            .await
1065            .unwrap();
1066
1067        let pruned = store.prune_skill_versions("git", 5).await.unwrap();
1068        assert_eq!(pruned, 0);
1069    }
1070
1071    #[tokio::test]
1072    async fn record_skill_outcome_with_error_context() {
1073        let store = test_store().await;
1074
1075        store
1076            .record_skill_outcome(
1077                "docker",
1078                None,
1079                Some(crate::types::ConversationId(1)),
1080                "tool_failure",
1081                Some("container not found"),
1082                None,
1083            )
1084            .await
1085            .unwrap();
1086
1087        let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
1088        assert_eq!(metrics.total, 1);
1089        assert_eq!(metrics.failures, 1);
1090    }
1091
1092    #[tokio::test]
1093    async fn save_skill_version_with_error_context() {
1094        let store = test_store().await;
1095
1096        let id = store
1097            .save_skill_version(
1098                "git",
1099                1,
1100                "improved body",
1101                "Git helper",
1102                "auto",
1103                Some("exit code 128"),
1104                None,
1105            )
1106            .await
1107            .unwrap();
1108        assert!(id > 0);
1109    }
1110
1111    #[tokio::test]
1112    async fn record_skill_outcomes_batch_resolves_version_id() {
1113        let store = test_store().await;
1114
1115        let vid = store
1116            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1117            .await
1118            .unwrap();
1119        store.activate_skill_version("git", vid).await.unwrap();
1120
1121        store
1122            .record_skill_outcomes_batch(
1123                &["git".to_string()],
1124                None,
1125                "tool_failure",
1126                Some("exit code 1"),
1127                Some("exit_nonzero"),
1128            )
1129            .await
1130            .unwrap();
1131
1132        let pool = store.pool();
1133        let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
1134            "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
1135        ))
1136        .fetch_one(pool)
1137        .await
1138        .unwrap();
1139        assert_eq!(
1140            row.0,
1141            Some(vid),
1142            "version_id should be resolved to active version"
1143        );
1144        assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
1145    }
1146
1147    #[tokio::test]
1148    async fn record_skill_outcome_stores_outcome_detail() {
1149        let store = test_store().await;
1150
1151        store
1152            .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
1153            .await
1154            .unwrap();
1155
1156        let pool = store.pool();
1157        let row: (Option<String>,) = zeph_db::query_as(sql!(
1158            "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
1159        ))
1160        .fetch_one(pool)
1161        .await
1162        .unwrap();
1163        assert_eq!(row.0.as_deref(), Some("timeout"));
1164    }
1165
1166    #[tokio::test]
1167    async fn record_skill_outcomes_batch_waits_for_active_writer() {
1168        let file = tempfile::NamedTempFile::new().expect("tempfile");
1169        let path = file.path().to_str().expect("valid path").to_owned();
1170        let store = SqliteStore::with_pool_size(&path, 2)
1171            .await
1172            .expect("with_pool_size");
1173
1174        let vid = store
1175            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1176            .await
1177            .unwrap();
1178        store.activate_skill_version("git", vid).await.unwrap();
1179
1180        let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1181        zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1182            .execute(&mut *writer_tx)
1183            .await
1184            .expect("hold write lock");
1185
1186        let batch_store = store.clone();
1187        let batch = tokio::spawn(async move {
1188            batch_store
1189                .record_skill_outcomes_batch(
1190                    &["git".to_string()],
1191                    None,
1192                    "success",
1193                    None,
1194                    Some("waited_for_writer"),
1195                )
1196                .await
1197        });
1198
1199        sleep(Duration::from_millis(100)).await;
1200        writer_tx.commit().await.expect("commit writer");
1201
1202        batch
1203            .await
1204            .expect("join batch task")
1205            .expect("record outcomes");
1206
1207        let count: i64 = zeph_db::query_scalar(sql!(
1208            "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1209        ))
1210        .fetch_one(store.pool())
1211        .await
1212        .unwrap();
1213        assert_eq!(
1214            count, 1,
1215            "expected batch insert to succeed after writer commits"
1216        );
1217    }
1218
1219    #[tokio::test]
1220    async fn distinct_session_count_empty() {
1221        let store = test_store().await;
1222        let count = store.distinct_session_count("unknown-skill").await.unwrap();
1223        assert_eq!(count, 0);
1224    }
1225
1226    #[tokio::test]
1227    async fn distinct_session_count_single_session() {
1228        let store = test_store().await;
1229        let cid = crate::types::ConversationId(1);
1230        store
1231            .record_skill_outcome("git", None, Some(cid), "success", None, None)
1232            .await
1233            .unwrap();
1234        store
1235            .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1236            .await
1237            .unwrap();
1238        let count = store.distinct_session_count("git").await.unwrap();
1239        assert_eq!(count, 1);
1240    }
1241
1242    #[tokio::test]
1243    async fn distinct_session_count_multiple_sessions() {
1244        let store = test_store().await;
1245        for i in 0..3i64 {
1246            store
1247                .record_skill_outcome(
1248                    "git",
1249                    None,
1250                    Some(crate::types::ConversationId(i)),
1251                    "success",
1252                    None,
1253                    None,
1254                )
1255                .await
1256                .unwrap();
1257        }
1258        let count = store.distinct_session_count("git").await.unwrap();
1259        assert_eq!(count, 3);
1260    }
1261
1262    #[tokio::test]
1263    async fn distinct_session_count_null_conversation_ids_excluded() {
1264        let store = test_store().await;
1265        // Insert outcomes with NULL conversation_id (legacy rows).
1266        store
1267            .record_skill_outcome("git", None, None, "success", None, None)
1268            .await
1269            .unwrap();
1270        store
1271            .record_skill_outcome("git", None, None, "success", None, None)
1272            .await
1273            .unwrap();
1274        let count = store.distinct_session_count("git").await.unwrap();
1275        assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1276    }
1277
1278    // --- STEM: skill_usage_log ---
1279
1280    #[tokio::test]
1281    async fn insert_and_find_recurring_patterns() {
1282        let store = test_store().await;
1283        let seq = r#"["shell","web_scrape"]"#;
1284        let hash = "abcdef0123456789";
1285        let ctx = "ctxhash000000000";
1286
1287        for _ in 0..3 {
1288            store
1289                .insert_tool_usage_log(seq, hash, ctx, "success", None)
1290                .await
1291                .unwrap();
1292        }
1293        store
1294            .insert_tool_usage_log(seq, hash, ctx, "failure", None)
1295            .await
1296            .unwrap();
1297
1298        let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1299        assert_eq!(patterns.len(), 1);
1300        let (s, h, occ, suc) = &patterns[0];
1301        assert_eq!(s, seq);
1302        assert_eq!(h, hash);
1303        assert_eq!(*occ, 4);
1304        assert_eq!(*suc, 3);
1305    }
1306
1307    #[tokio::test]
1308    async fn find_recurring_patterns_below_threshold_returns_empty() {
1309        let store = test_store().await;
1310        let seq = r#"["shell"]"#;
1311        let hash = "0000000000000001";
1312        let ctx = "0000000000000001";
1313
1314        store
1315            .insert_tool_usage_log(seq, hash, ctx, "success", None)
1316            .await
1317            .unwrap();
1318
1319        let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1320        assert!(patterns.is_empty());
1321    }
1322
1323    #[tokio::test]
1324    async fn prune_tool_usage_log_removes_old_rows() {
1325        let store = test_store().await;
1326        // Insert a row with an artificially old timestamp so it falls within 0-day window.
1327        zeph_db::query(sql!(
1328            "INSERT INTO skill_usage_log \
1329             (tool_sequence, sequence_hash, context_hash, outcome, created_at) \
1330             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
1331        ))
1332        .bind(r#"["shell"]"#)
1333        .bind("hash0000000000001")
1334        .bind("ctx00000000000001")
1335        .bind("success")
1336        .execute(store.pool())
1337        .await
1338        .unwrap();
1339
1340        // Prune rows older than 1 day — the row above is 2 days old so it must be removed.
1341        let removed = store.prune_tool_usage_log(1).await.unwrap();
1342        assert_eq!(removed, 1);
1343    }
1344
1345    // --- ERL: skill_heuristics ---
1346
1347    #[tokio::test]
1348    async fn insert_and_load_skill_heuristics() {
1349        let store = test_store().await;
1350
1351        let id = store
1352            .insert_skill_heuristic(Some("git"), "always commit in small chunks", 0.9)
1353            .await
1354            .unwrap();
1355        assert!(id > 0);
1356
1357        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1358        assert_eq!(rows.len(), 1);
1359        assert_eq!(rows[0].1, "always commit in small chunks");
1360        assert!((rows[0].2 - 0.9).abs() < 1e-6);
1361    }
1362
1363    #[tokio::test]
1364    async fn load_skill_heuristics_includes_general() {
1365        let store = test_store().await;
1366
1367        store
1368            .insert_skill_heuristic(None, "general tip", 0.7)
1369            .await
1370            .unwrap();
1371        store
1372            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1373            .await
1374            .unwrap();
1375
1376        // querying for "git" should include both the git-specific and the general (NULL) heuristic
1377        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1378        assert_eq!(rows.len(), 2);
1379    }
1380
1381    #[tokio::test]
1382    async fn load_skill_heuristics_filters_by_min_confidence() {
1383        let store = test_store().await;
1384
1385        store
1386            .insert_skill_heuristic(Some("git"), "low confidence tip", 0.3)
1387            .await
1388            .unwrap();
1389        store
1390            .insert_skill_heuristic(Some("git"), "high confidence tip", 0.9)
1391            .await
1392            .unwrap();
1393
1394        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1395        assert_eq!(rows.len(), 1);
1396        assert_eq!(rows[0].1, "high confidence tip");
1397    }
1398
1399    #[tokio::test]
1400    async fn increment_heuristic_use_count_works() {
1401        let store = test_store().await;
1402
1403        let id = store
1404            .insert_skill_heuristic(Some("git"), "tip", 0.8)
1405            .await
1406            .unwrap();
1407
1408        store.increment_heuristic_use_count(id).await.unwrap();
1409        store.increment_heuristic_use_count(id).await.unwrap();
1410
1411        let rows = store.load_skill_heuristics("git", 0.0, 10).await.unwrap();
1412        assert_eq!(rows[0].3, 2); // use_count
1413    }
1414
1415    #[tokio::test]
1416    async fn load_all_heuristics_for_skill_exact_match() {
1417        let store = test_store().await;
1418
1419        store
1420            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1421            .await
1422            .unwrap();
1423        store
1424            .insert_skill_heuristic(Some("docker"), "docker tip", 0.8)
1425            .await
1426            .unwrap();
1427
1428        let rows = store
1429            .load_all_heuristics_for_skill(Some("git"))
1430            .await
1431            .unwrap();
1432        assert_eq!(rows.len(), 1);
1433        assert_eq!(rows[0].1, "git tip");
1434    }
1435
1436    #[tokio::test]
1437    async fn load_all_heuristics_for_skill_null() {
1438        let store = test_store().await;
1439
1440        store
1441            .insert_skill_heuristic(None, "general", 0.8)
1442            .await
1443            .unwrap();
1444        store
1445            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1446            .await
1447            .unwrap();
1448
1449        let rows = store.load_all_heuristics_for_skill(None).await.unwrap();
1450        assert_eq!(rows.len(), 1);
1451        assert_eq!(rows[0].1, "general");
1452    }
1453
1454    #[tokio::test]
1455    async fn skill_trust_default_is_quarantined() {
1456        // Verify the DB schema default for skill_trust.trust_level is 'quarantined'.
1457        // ARISE-generated versions do not call set_skill_trust_level, so they inherit
1458        // this default when the trust row is first created by the scanner.
1459        let store = test_store().await;
1460
1461        // Insert a trust row without specifying trust_level to exercise the DEFAULT.
1462        zeph_db::query(sql!(
1463            "INSERT INTO skill_trust (skill_name, blake3_hash) VALUES ('test-arise', 'abc')"
1464        ))
1465        .execute(store.pool())
1466        .await
1467        .unwrap();
1468
1469        let trust: (String,) = zeph_db::query_as(sql!(
1470            "SELECT trust_level FROM skill_trust WHERE skill_name = 'test-arise'"
1471        ))
1472        .fetch_one(store.pool())
1473        .await
1474        .unwrap();
1475
1476        assert_eq!(
1477            trust.0, "quarantined",
1478            "schema default for skill_trust.trust_level must be 'quarantined'"
1479        );
1480    }
1481}