Skip to main content

zeph_memory/store/
skills.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
/// One row of the `skill_usage` table, as returned by [`SqliteStore::load_skill_usage`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillUsageRow {
    /// Name of the skill the counters belong to.
    pub skill_name: String,
    /// Number of recorded invocations: inserted as 1 and incremented by each
    /// later [`SqliteStore::record_skill_usage`] call for the same name.
    pub invocation_count: i64,
    /// `last_used_at` column as stored; written from `CURRENT_TIMESTAMP` on every usage.
    pub last_used_at: String,
}
15
/// Aggregated outcome counters for one skill, computed from `skill_outcomes` rows.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillMetricsRow {
    /// Name of the skill the counters were aggregated for.
    pub skill_name: String,
    /// `version_id` selected alongside the aggregates; `None` when the column is NULL.
    pub version_id: Option<i64>,
    /// Number of outcome rows in the group (`COUNT(*)`).
    pub total: i64,
    /// Number of rows in the group whose `outcome` is `'success'`.
    pub successes: i64,
    /// `total` minus `successes`.
    pub failures: i64,
}
24
/// One row of the `skill_versions` table.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillVersionRow {
    /// Primary key of the version row.
    pub id: i64,
    /// Name of the skill this version belongs to.
    pub skill_name: String,
    /// Version number within the skill (see [`SqliteStore::next_skill_version`]).
    pub version: i64,
    /// Skill body text stored for this version.
    pub body: String,
    /// Description stored for this version.
    pub description: String,
    /// `source` column; this module writes `"manual"` for base versions and
    /// filters on `'auto'` for auto-generated ones.
    pub source: String,
    /// `true` when the stored `is_active` integer is non-zero.
    pub is_active: bool,
    /// `success_count` column as stored.
    pub success_count: i64,
    /// `failure_count` column as stored.
    pub failure_count: i64,
    /// `created_at` column as stored.
    pub created_at: String,
}
38
/// Positional shape of a `skill_versions` row as selected by the version queries
/// in this module; converted into a [`SkillVersionRow`] by `skill_version_from_tuple`.
type SkillVersionTuple = (
    i64,    // id
    String, // skill_name
    i64,    // version
    String, // body
    String, // description
    String, // source
    i64,    // is_active (integer flag)
    i64,    // success_count
    i64,    // failure_count
    String, // created_at
);
51
52fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
53    SkillVersionRow {
54        id: t.0,
55        skill_name: t.1,
56        version: t.2,
57        body: t.3,
58        description: t.4,
59        source: t.5,
60        is_active: t.6 != 0,
61        success_count: t.7,
62        failure_count: t.8,
63        created_at: t.9,
64    }
65}
66
67impl SqliteStore {
68    /// Record usage of skills (UPSERT: increment count and update timestamp).
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the database operation fails.
73    pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
74        for name in skill_names {
75            zeph_db::query(sql!(
76                "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
77                 VALUES (?, 1, CURRENT_TIMESTAMP) \
78                 ON CONFLICT(skill_name) DO UPDATE SET \
79                 invocation_count = invocation_count + 1, \
80                 last_used_at = CURRENT_TIMESTAMP"
81            ))
82            .bind(name)
83            .execute(&self.pool)
84            .await?;
85        }
86        Ok(())
87    }
88
89    /// Load all skill usage statistics.
90    ///
91    /// # Errors
92    ///
93    /// Returns an error if the query fails.
94    pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
95        let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
96            "SELECT skill_name, invocation_count, last_used_at \
97             FROM skill_usage ORDER BY invocation_count DESC"
98        ))
99        .fetch_all(&self.pool)
100        .await?;
101
102        Ok(rows
103            .into_iter()
104            .map(
105                |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
106                    skill_name,
107                    invocation_count,
108                    last_used_at,
109                },
110            )
111            .collect())
112    }
113
114    /// Record a skill outcome event.
115    ///
116    /// # Errors
117    ///
118    /// Returns an error if the insert fails.
119    pub async fn record_skill_outcome(
120        &self,
121        skill_name: &str,
122        version_id: Option<i64>,
123        conversation_id: Option<crate::types::ConversationId>,
124        outcome: &str,
125        error_context: Option<&str>,
126        outcome_detail: Option<&str>,
127    ) -> Result<(), MemoryError> {
128        zeph_db::query(sql!(
129            "INSERT INTO skill_outcomes \
130             (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
131             VALUES (?, ?, ?, ?, ?, ?)"
132        ))
133        .bind(skill_name)
134        .bind(version_id)
135        .bind(conversation_id)
136        .bind(outcome)
137        .bind(error_context)
138        .bind(outcome_detail)
139        .execute(&self.pool)
140        .await?;
141        Ok(())
142    }
143
144    /// Record outcomes for multiple skills in a single transaction.
145    ///
146    /// # Errors
147    ///
148    /// Returns an error if any insert fails (whole batch is rolled back).
149    pub async fn record_skill_outcomes_batch(
150        &self,
151        skill_names: &[String],
152        conversation_id: Option<crate::types::ConversationId>,
153        outcome: &str,
154        error_context: Option<&str>,
155        outcome_detail: Option<&str>,
156    ) -> Result<(), MemoryError> {
157        // Acquire the write lock up front to avoid DEFERRED read->write upgrades
158        // failing with SQLITE_BUSY_SNAPSHOT under concurrent WAL writers.
159        let mut tx = begin_write(&self.pool).await?;
160
161        let mut version_map: std::collections::HashMap<String, Option<i64>> =
162            std::collections::HashMap::new();
163        for name in skill_names {
164            let vid: Option<(i64,)> = zeph_db::query_as(sql!(
165                "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
166            ))
167            .bind(name)
168            .fetch_optional(&mut *tx)
169            .await?;
170            version_map.insert(name.clone(), vid.map(|r| r.0));
171        }
172
173        for name in skill_names {
174            let version_id = version_map.get(name.as_str()).copied().flatten();
175            zeph_db::query(sql!(
176                "INSERT INTO skill_outcomes \
177                 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
178                 VALUES (?, ?, ?, ?, ?, ?)"
179            ))
180            .bind(name)
181            .bind(version_id)
182            .bind(conversation_id)
183            .bind(outcome)
184            .bind(error_context)
185            .bind(outcome_detail)
186            .execute(&mut *tx)
187            .await?;
188        }
189        tx.commit().await?;
190        Ok(())
191    }
192
193    /// Load metrics for a skill (latest version group).
194    ///
195    /// # Errors
196    ///
197    /// Returns an error if the query fails.
198    pub async fn skill_metrics(
199        &self,
200        skill_name: &str,
201    ) -> Result<Option<SkillMetricsRow>, MemoryError> {
202        let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
203            "SELECT skill_name, version_id, \
204             COUNT(*) as total, \
205             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
206             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
207             FROM skill_outcomes WHERE skill_name = ? \
208             AND outcome NOT IN ('user_approval', 'user_rejection') \
209             GROUP BY skill_name, version_id \
210             ORDER BY version_id DESC LIMIT 1"
211        ))
212        .bind(skill_name)
213        .fetch_optional(&self.pool)
214        .await?;
215
216        Ok(row.map(
217            |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
218                skill_name,
219                version_id,
220                total,
221                successes,
222                failures,
223            },
224        ))
225    }
226
227    /// Load all skill outcome stats grouped by skill name.
228    ///
229    /// # Errors
230    ///
231    /// Returns an error if the query fails.
232    pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
233        let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234            "SELECT skill_name, version_id, \
235             COUNT(*) as total, \
236             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238             FROM skill_outcomes \
239             GROUP BY skill_name \
240             ORDER BY total DESC"
241        ))
242        .fetch_all(&self.pool)
243        .await?;
244
245        Ok(rows
246            .into_iter()
247            .map(
248                |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249                    skill_name,
250                    version_id,
251                    total,
252                    successes,
253                    failures,
254                },
255            )
256            .collect())
257    }
258
259    /// Save a new skill version and return its ID.
260    ///
261    /// # Errors
262    ///
263    /// Returns an error if the insert fails.
264    #[allow(clippy::too_many_arguments)]
265    pub async fn save_skill_version(
266        &self,
267        skill_name: &str,
268        version: i64,
269        body: &str,
270        description: &str,
271        source: &str,
272        error_context: Option<&str>,
273        predecessor_id: Option<i64>,
274    ) -> Result<i64, MemoryError> {
275        let row: (i64,) = zeph_db::query_as(sql!(
276            "INSERT INTO skill_versions \
277             (skill_name, version, body, description, source, error_context, predecessor_id) \
278             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
279        ))
280        .bind(skill_name)
281        .bind(version)
282        .bind(body)
283        .bind(description)
284        .bind(source)
285        .bind(error_context)
286        .bind(predecessor_id)
287        .fetch_one(&self.pool)
288        .await?;
289        Ok(row.0)
290    }
291
292    /// Count the number of distinct conversation sessions in which a skill produced an outcome.
293    ///
294    /// Uses `COUNT(DISTINCT conversation_id)` from `skill_outcomes`. Rows where
295    /// `conversation_id IS NULL` are excluded (legacy rows without session tracking).
296    ///
297    /// # Errors
298    ///
299    /// Returns an error if the query fails.
300    pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
301        let row: (i64,) = zeph_db::query_as(sql!(
302            "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
303             WHERE skill_name = ? AND conversation_id IS NOT NULL"
304        ))
305        .bind(skill_name)
306        .fetch_one(&self.pool)
307        .await?;
308        Ok(row.0)
309    }
310
311    /// Load the active version for a skill.
312    ///
313    /// # Errors
314    ///
315    /// Returns an error if the query fails.
316    pub async fn active_skill_version(
317        &self,
318        skill_name: &str,
319    ) -> Result<Option<SkillVersionRow>, MemoryError> {
320        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
321            "SELECT id, skill_name, version, body, description, source, \
322                 is_active, success_count, failure_count, created_at \
323                 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
324        ))
325        .bind(skill_name)
326        .fetch_optional(&self.pool)
327        .await?;
328
329        Ok(row.map(skill_version_from_tuple))
330    }
331
332    /// Activate a specific version (deactivates others for the same skill).
333    ///
334    /// # Errors
335    ///
336    /// Returns an error if the update fails.
337    pub async fn activate_skill_version(
338        &self,
339        skill_name: &str,
340        version_id: i64,
341    ) -> Result<(), MemoryError> {
342        let mut tx = begin_write(&self.pool).await?;
343
344        zeph_db::query(sql!(
345            "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
346        ))
347        .bind(skill_name)
348        .execute(&mut *tx)
349        .await?;
350
351        zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
352            .bind(version_id)
353            .execute(&mut *tx)
354            .await?;
355
356        tx.commit().await?;
357        Ok(())
358    }
359
360    /// Get the next version number for a skill.
361    ///
362    /// # Errors
363    ///
364    /// Returns an error if the query fails.
365    pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
366        let row: (i64,) = zeph_db::query_as(sql!(
367            "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
368        ))
369        .bind(skill_name)
370        .fetch_one(&self.pool)
371        .await?;
372        Ok(row.0)
373    }
374
375    /// Get the latest auto-generated version's `created_at` for cooldown check.
376    ///
377    /// # Errors
378    ///
379    /// Returns an error if the query fails.
380    pub async fn last_improvement_time(
381        &self,
382        skill_name: &str,
383    ) -> Result<Option<String>, MemoryError> {
384        let row: Option<(String,)> = zeph_db::query_as(sql!(
385            "SELECT created_at FROM skill_versions \
386             WHERE skill_name = ? AND source = 'auto' \
387             ORDER BY id DESC LIMIT 1"
388        ))
389        .bind(skill_name)
390        .fetch_optional(&self.pool)
391        .await?;
392        Ok(row.map(|r| r.0))
393    }
394
395    /// Ensure a base (v1 manual) version exists for a skill. Idempotent.
396    ///
397    /// # Errors
398    ///
399    /// Returns an error if the DB operation fails.
400    pub async fn ensure_skill_version_exists(
401        &self,
402        skill_name: &str,
403        body: &str,
404        description: &str,
405    ) -> Result<(), MemoryError> {
406        let existing: Option<(i64,)> = zeph_db::query_as(sql!(
407            "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
408        ))
409        .bind(skill_name)
410        .fetch_optional(&self.pool)
411        .await?;
412
413        if existing.is_none() {
414            let id = self
415                .save_skill_version(skill_name, 1, body, description, "manual", None, None)
416                .await?;
417            self.activate_skill_version(skill_name, id).await?;
418        }
419        Ok(())
420    }
421
422    /// Load all versions for a skill, ordered by version number.
423    ///
424    /// # Errors
425    ///
426    /// Returns an error if the query fails.
427    pub async fn load_skill_versions(
428        &self,
429        skill_name: &str,
430    ) -> Result<Vec<SkillVersionRow>, MemoryError> {
431        let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
432            "SELECT id, skill_name, version, body, description, source, \
433                 is_active, success_count, failure_count, created_at \
434                 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
435        ))
436        .bind(skill_name)
437        .fetch_all(&self.pool)
438        .await?;
439
440        Ok(rows.into_iter().map(skill_version_from_tuple).collect())
441    }
442
443    /// Count auto-generated versions for a skill.
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if the query fails.
448    pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
449        let row: (i64,) = zeph_db::query_as(sql!(
450            "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
451        ))
452        .bind(skill_name)
453        .fetch_one(&self.pool)
454        .await?;
455        Ok(row.0)
456    }
457
458    /// Delete oldest non-active auto versions exceeding max limit.
459    /// Returns the number of pruned versions.
460    ///
461    /// # Errors
462    ///
463    /// Returns an error if the delete fails.
464    pub async fn prune_skill_versions(
465        &self,
466        skill_name: &str,
467        max_versions: u32,
468    ) -> Result<u32, MemoryError> {
469        let result = zeph_db::query(sql!(
470            "DELETE FROM skill_versions WHERE id IN (\
471                SELECT id FROM skill_versions \
472                WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
473                ORDER BY id ASC \
474                LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
475                    WHERE skill_name = ? AND source = 'auto') - ?)\
476            )"
477        ))
478        .bind(skill_name)
479        .bind(skill_name)
480        .bind(max_versions)
481        .execute(&self.pool)
482        .await?;
483        Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
484    }
485
486    /// Get the predecessor version for rollback.
487    ///
488    /// # Errors
489    ///
490    /// Returns an error if the query fails.
491    pub async fn predecessor_version(
492        &self,
493        version_id: i64,
494    ) -> Result<Option<SkillVersionRow>, MemoryError> {
495        let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
496            "SELECT predecessor_id FROM skill_versions WHERE id = ?"
497        ))
498        .bind(version_id)
499        .fetch_optional(&self.pool)
500        .await?;
501
502        let Some((Some(pid),)) = pred_id else {
503            return Ok(None);
504        };
505
506        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
507            "SELECT id, skill_name, version, body, description, source, \
508                 is_active, success_count, failure_count, created_at \
509                 FROM skill_versions WHERE id = ?"
510        ))
511        .bind(pid)
512        .fetch_optional(&self.pool)
513        .await?;
514
515        Ok(row.map(skill_version_from_tuple))
516    }
517
518    /// Return the skill names for all currently active auto-generated versions.
519    ///
520    /// Used to check rollback eligibility at the start of each agent turn.
521    ///
522    /// # Errors
523    /// Returns [`MemoryError`] on `SQLite` query failure.
524    pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
525        let rows: Vec<(String,)> = zeph_db::query_as(sql!(
526            "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
527        ))
528        .fetch_all(&self.pool)
529        .await?;
530        Ok(rows.into_iter().map(|(name,)| name).collect())
531    }
532}
533
534#[cfg(test)]
535mod tests {
536    use std::time::Duration;
537
538    use tokio::time::sleep;
539
540    use super::*;
541
542    async fn test_store() -> SqliteStore {
543        SqliteStore::new(":memory:").await.unwrap()
544    }
545
546    #[tokio::test]
547    async fn record_skill_usage_increments() {
548        let store = test_store().await;
549
550        store.record_skill_usage(&["git"]).await.unwrap();
551        store.record_skill_usage(&["git"]).await.unwrap();
552
553        let usage = store.load_skill_usage().await.unwrap();
554        assert_eq!(usage.len(), 1);
555        assert_eq!(usage[0].skill_name, "git");
556        assert_eq!(usage[0].invocation_count, 2);
557    }
558
559    #[tokio::test]
560    async fn load_skill_usage_returns_all() {
561        let store = test_store().await;
562
563        store.record_skill_usage(&["git", "docker"]).await.unwrap();
564        store.record_skill_usage(&["git"]).await.unwrap();
565
566        let usage = store.load_skill_usage().await.unwrap();
567        assert_eq!(usage.len(), 2);
568        assert_eq!(usage[0].skill_name, "git");
569        assert_eq!(usage[0].invocation_count, 2);
570        assert_eq!(usage[1].skill_name, "docker");
571        assert_eq!(usage[1].invocation_count, 1);
572    }
573
574    #[tokio::test]
575    async fn migration_005_creates_tables() {
576        let store = test_store().await;
577        let pool = store.pool();
578
579        let versions: (i64,) = zeph_db::query_as(sql!(
580            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
581        ))
582        .fetch_one(pool)
583        .await
584        .unwrap();
585        assert_eq!(versions.0, 1);
586
587        let outcomes: (i64,) = zeph_db::query_as(sql!(
588            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
589        ))
590        .fetch_one(pool)
591        .await
592        .unwrap();
593        assert_eq!(outcomes.0, 1);
594    }
595
596    #[tokio::test]
597    async fn record_skill_outcome_inserts() {
598        let store = test_store().await;
599
600        store
601            .record_skill_outcome(
602                "git",
603                None,
604                Some(crate::types::ConversationId(1)),
605                "success",
606                None,
607                None,
608            )
609            .await
610            .unwrap();
611        store
612            .record_skill_outcome(
613                "git",
614                None,
615                Some(crate::types::ConversationId(1)),
616                "tool_failure",
617                Some("exit code 1"),
618                None,
619            )
620            .await
621            .unwrap();
622
623        let metrics = store.skill_metrics("git").await.unwrap().unwrap();
624        assert_eq!(metrics.total, 2);
625        assert_eq!(metrics.successes, 1);
626        assert_eq!(metrics.failures, 1);
627    }
628
629    #[tokio::test]
630    async fn skill_metrics_none_for_unknown() {
631        let store = test_store().await;
632        let m = store.skill_metrics("nonexistent").await.unwrap();
633        assert!(m.is_none());
634    }
635
636    #[tokio::test]
637    async fn load_skill_outcome_stats_grouped() {
638        let store = test_store().await;
639
640        store
641            .record_skill_outcome("git", None, None, "success", None, None)
642            .await
643            .unwrap();
644        store
645            .record_skill_outcome("git", None, None, "tool_failure", None, None)
646            .await
647            .unwrap();
648        store
649            .record_skill_outcome("docker", None, None, "success", None, None)
650            .await
651            .unwrap();
652
653        let stats = store.load_skill_outcome_stats().await.unwrap();
654        assert_eq!(stats.len(), 2);
655        assert_eq!(stats[0].skill_name, "git");
656        assert_eq!(stats[0].total, 2);
657        assert_eq!(stats[1].skill_name, "docker");
658        assert_eq!(stats[1].total, 1);
659    }
660
661    #[tokio::test]
662    async fn save_and_load_skill_version() {
663        let store = test_store().await;
664
665        let id = store
666            .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
667            .await
668            .unwrap();
669        assert!(id > 0);
670
671        store.activate_skill_version("git", id).await.unwrap();
672
673        let active = store.active_skill_version("git").await.unwrap().unwrap();
674        assert_eq!(active.version, 1);
675        assert_eq!(active.body, "body v1");
676        assert!(active.is_active);
677    }
678
679    #[tokio::test]
680    async fn activate_deactivates_previous() {
681        let store = test_store().await;
682
683        let v1 = store
684            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
685            .await
686            .unwrap();
687        store.activate_skill_version("git", v1).await.unwrap();
688
689        let v2 = store
690            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
691            .await
692            .unwrap();
693        store.activate_skill_version("git", v2).await.unwrap();
694
695        let versions = store.load_skill_versions("git").await.unwrap();
696        assert_eq!(versions.len(), 2);
697        assert!(!versions[0].is_active);
698        assert!(versions[1].is_active);
699    }
700
701    #[tokio::test]
702    async fn next_skill_version_increments() {
703        let store = test_store().await;
704
705        let next = store.next_skill_version("git").await.unwrap();
706        assert_eq!(next, 1);
707
708        store
709            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
710            .await
711            .unwrap();
712        let next = store.next_skill_version("git").await.unwrap();
713        assert_eq!(next, 2);
714    }
715
716    #[tokio::test]
717    async fn last_improvement_time_returns_auto_only() {
718        let store = test_store().await;
719
720        store
721            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
722            .await
723            .unwrap();
724
725        let t = store.last_improvement_time("git").await.unwrap();
726        assert!(t.is_none());
727
728        store
729            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
730            .await
731            .unwrap();
732
733        let t = store.last_improvement_time("git").await.unwrap();
734        assert!(t.is_some());
735    }
736
737    #[tokio::test]
738    async fn ensure_skill_version_exists_idempotent() {
739        let store = test_store().await;
740
741        store
742            .ensure_skill_version_exists("git", "body", "Git helper")
743            .await
744            .unwrap();
745        store
746            .ensure_skill_version_exists("git", "body2", "Git helper 2")
747            .await
748            .unwrap();
749
750        let versions = store.load_skill_versions("git").await.unwrap();
751        assert_eq!(versions.len(), 1);
752        assert_eq!(versions[0].body, "body");
753    }
754
755    #[tokio::test]
756    async fn load_skill_versions_ordered() {
757        let store = test_store().await;
758
759        let v1 = store
760            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
761            .await
762            .unwrap();
763        store
764            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
765            .await
766            .unwrap();
767
768        let versions = store.load_skill_versions("git").await.unwrap();
769        assert_eq!(versions.len(), 2);
770        assert_eq!(versions[0].version, 1);
771        assert_eq!(versions[1].version, 2);
772    }
773
774    #[tokio::test]
775    async fn count_auto_versions_only() {
776        let store = test_store().await;
777
778        store
779            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
780            .await
781            .unwrap();
782        store
783            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
784            .await
785            .unwrap();
786        store
787            .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
788            .await
789            .unwrap();
790
791        let count = store.count_auto_versions("git").await.unwrap();
792        assert_eq!(count, 2);
793    }
794
795    #[tokio::test]
796    async fn prune_preserves_manual_and_active() {
797        let store = test_store().await;
798
799        let v1 = store
800            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
801            .await
802            .unwrap();
803        store.activate_skill_version("git", v1).await.unwrap();
804
805        for i in 2..=5 {
806            store
807                .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
808                .await
809                .unwrap();
810        }
811
812        let pruned = store.prune_skill_versions("git", 2).await.unwrap();
813        assert_eq!(pruned, 2);
814
815        let versions = store.load_skill_versions("git").await.unwrap();
816        assert!(versions.iter().any(|v| v.source == "manual"));
817        let auto_count = versions.iter().filter(|v| v.source == "auto").count();
818        assert_eq!(auto_count, 2);
819    }
820
821    #[tokio::test]
822    async fn predecessor_version_returns_parent() {
823        let store = test_store().await;
824
825        let v1 = store
826            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
827            .await
828            .unwrap();
829        let v2 = store
830            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
831            .await
832            .unwrap();
833
834        let pred = store.predecessor_version(v2).await.unwrap().unwrap();
835        assert_eq!(pred.id, v1);
836        assert_eq!(pred.version, 1);
837    }
838
839    #[tokio::test]
840    async fn predecessor_version_none_for_root() {
841        let store = test_store().await;
842
843        let v1 = store
844            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
845            .await
846            .unwrap();
847
848        let pred = store.predecessor_version(v1).await.unwrap();
849        assert!(pred.is_none());
850    }
851
852    #[tokio::test]
853    async fn active_skill_version_none_for_unknown() {
854        let store = test_store().await;
855        let active = store.active_skill_version("nonexistent").await.unwrap();
856        assert!(active.is_none());
857    }
858
859    #[tokio::test]
860    async fn load_skill_outcome_stats_empty() {
861        let store = test_store().await;
862        let stats = store.load_skill_outcome_stats().await.unwrap();
863        assert!(stats.is_empty());
864    }
865
866    #[tokio::test]
867    async fn load_skill_versions_empty() {
868        let store = test_store().await;
869        let versions = store.load_skill_versions("nonexistent").await.unwrap();
870        assert!(versions.is_empty());
871    }
872
873    #[tokio::test]
874    async fn count_auto_versions_zero_for_unknown() {
875        let store = test_store().await;
876        let count = store.count_auto_versions("nonexistent").await.unwrap();
877        assert_eq!(count, 0);
878    }
879
880    #[tokio::test]
881    async fn prune_nothing_when_below_limit() {
882        let store = test_store().await;
883
884        store
885            .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
886            .await
887            .unwrap();
888
889        let pruned = store.prune_skill_versions("git", 5).await.unwrap();
890        assert_eq!(pruned, 0);
891    }
892
893    #[tokio::test]
894    async fn record_skill_outcome_with_error_context() {
895        let store = test_store().await;
896
897        store
898            .record_skill_outcome(
899                "docker",
900                None,
901                Some(crate::types::ConversationId(1)),
902                "tool_failure",
903                Some("container not found"),
904                None,
905            )
906            .await
907            .unwrap();
908
909        let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
910        assert_eq!(metrics.total, 1);
911        assert_eq!(metrics.failures, 1);
912    }
913
914    #[tokio::test]
915    async fn save_skill_version_with_error_context() {
916        let store = test_store().await;
917
918        let id = store
919            .save_skill_version(
920                "git",
921                1,
922                "improved body",
923                "Git helper",
924                "auto",
925                Some("exit code 128"),
926                None,
927            )
928            .await
929            .unwrap();
930        assert!(id > 0);
931    }
932
933    #[tokio::test]
934    async fn record_skill_outcomes_batch_resolves_version_id() {
935        let store = test_store().await;
936
937        let vid = store
938            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
939            .await
940            .unwrap();
941        store.activate_skill_version("git", vid).await.unwrap();
942
943        store
944            .record_skill_outcomes_batch(
945                &["git".to_string()],
946                None,
947                "tool_failure",
948                Some("exit code 1"),
949                Some("exit_nonzero"),
950            )
951            .await
952            .unwrap();
953
954        let pool = store.pool();
955        let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
956            "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
957        ))
958        .fetch_one(pool)
959        .await
960        .unwrap();
961        assert_eq!(
962            row.0,
963            Some(vid),
964            "version_id should be resolved to active version"
965        );
966        assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
967    }
968
969    #[tokio::test]
970    async fn record_skill_outcome_stores_outcome_detail() {
971        let store = test_store().await;
972
973        store
974            .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
975            .await
976            .unwrap();
977
978        let pool = store.pool();
979        let row: (Option<String>,) = zeph_db::query_as(sql!(
980            "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
981        ))
982        .fetch_one(pool)
983        .await
984        .unwrap();
985        assert_eq!(row.0.as_deref(), Some("timeout"));
986    }
987
988    #[tokio::test]
989    async fn record_skill_outcomes_batch_waits_for_active_writer() {
990        let file = tempfile::NamedTempFile::new().expect("tempfile");
991        let path = file.path().to_str().expect("valid path").to_owned();
992        let store = SqliteStore::with_pool_size(&path, 2)
993            .await
994            .expect("with_pool_size");
995
996        let vid = store
997            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
998            .await
999            .unwrap();
1000        store.activate_skill_version("git", vid).await.unwrap();
1001
1002        let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1003        zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1004            .execute(&mut *writer_tx)
1005            .await
1006            .expect("hold write lock");
1007
1008        let batch_store = store.clone();
1009        let batch = tokio::spawn(async move {
1010            batch_store
1011                .record_skill_outcomes_batch(
1012                    &["git".to_string()],
1013                    None,
1014                    "success",
1015                    None,
1016                    Some("waited_for_writer"),
1017                )
1018                .await
1019        });
1020
1021        sleep(Duration::from_millis(100)).await;
1022        writer_tx.commit().await.expect("commit writer");
1023
1024        batch
1025            .await
1026            .expect("join batch task")
1027            .expect("record outcomes");
1028
1029        let count: i64 = zeph_db::query_scalar(sql!(
1030            "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1031        ))
1032        .fetch_one(store.pool())
1033        .await
1034        .unwrap();
1035        assert_eq!(
1036            count, 1,
1037            "expected batch insert to succeed after writer commits"
1038        );
1039    }
1040
1041    #[tokio::test]
1042    async fn distinct_session_count_empty() {
1043        let store = test_store().await;
1044        let count = store.distinct_session_count("unknown-skill").await.unwrap();
1045        assert_eq!(count, 0);
1046    }
1047
1048    #[tokio::test]
1049    async fn distinct_session_count_single_session() {
1050        let store = test_store().await;
1051        let cid = crate::types::ConversationId(1);
1052        store
1053            .record_skill_outcome("git", None, Some(cid), "success", None, None)
1054            .await
1055            .unwrap();
1056        store
1057            .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1058            .await
1059            .unwrap();
1060        let count = store.distinct_session_count("git").await.unwrap();
1061        assert_eq!(count, 1);
1062    }
1063
1064    #[tokio::test]
1065    async fn distinct_session_count_multiple_sessions() {
1066        let store = test_store().await;
1067        for i in 0..3i64 {
1068            store
1069                .record_skill_outcome(
1070                    "git",
1071                    None,
1072                    Some(crate::types::ConversationId(i)),
1073                    "success",
1074                    None,
1075                    None,
1076                )
1077                .await
1078                .unwrap();
1079        }
1080        let count = store.distinct_session_count("git").await.unwrap();
1081        assert_eq!(count, 3);
1082    }
1083
1084    #[tokio::test]
1085    async fn distinct_session_count_null_conversation_ids_excluded() {
1086        let store = test_store().await;
1087        // Insert outcomes with NULL conversation_id (legacy rows).
1088        store
1089            .record_skill_outcome("git", None, None, "success", None, None)
1090            .await
1091            .unwrap();
1092        store
1093            .record_skill_outcome("git", None, None, "success", None, None)
1094            .await
1095            .unwrap();
1096        let count = store.distinct_session_count("git").await.unwrap();
1097        assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1098    }
1099}