Skip to main content

zeph_memory/store/
skills.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
9/// Aggregated usage statistics for a single skill, returned by [`SqliteStore::load_skill_usage`].
10#[derive(Debug)]
11pub struct SkillUsageRow {
12    /// The skill's unique name.
13    pub skill_name: String,
14    /// Total number of times this skill has been invoked.
15    pub invocation_count: i64,
16    /// ISO-8601 timestamp of the most recent invocation.
17    pub last_used_at: String,
18}
19
20/// Per-skill outcome metrics (success/failure counts) returned by [`SqliteStore::load_skill_outcome_stats`].
21#[derive(Debug)]
22pub struct SkillMetricsRow {
23    /// The skill's unique name.
24    pub skill_name: String,
25    /// The version ID this row refers to, if tracked per-version.
26    pub version_id: Option<i64>,
27    /// Total number of recorded outcomes.
28    pub total: i64,
29    /// Number of successful outcomes.
30    pub successes: i64,
31    /// Number of failed outcomes.
32    pub failures: i64,
33}
34
35/// A single persisted skill version, returned by [`SqliteStore::load_skill_versions`] and related queries.
36#[derive(Debug)]
37pub struct SkillVersionRow {
38    /// Primary key assigned by the database.
39    pub id: i64,
40    /// The skill's unique name.
41    pub skill_name: String,
42    /// Monotonically increasing version number within the skill.
43    pub version: i64,
44    /// Full SKILL.md body for this version.
45    pub body: String,
46    /// Human-readable description extracted from the skill body.
47    pub description: String,
48    /// Origin of the version: `"manual"`, `"auto"`, etc.
49    pub source: String,
50    /// Whether this version is currently the active one for the skill.
51    pub is_active: bool,
52    /// Number of successful tool invocations recorded against this version.
53    pub success_count: i64,
54    /// Number of failed tool invocations recorded against this version.
55    pub failure_count: i64,
56    /// ISO-8601 timestamp when this version was created.
57    pub created_at: String,
58}
59
60type SkillVersionTuple = (
61    i64,
62    String,
63    i64,
64    String,
65    String,
66    String,
67    i64,
68    i64,
69    i64,
70    String,
71);
72
73fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
74    SkillVersionRow {
75        id: t.0,
76        skill_name: t.1,
77        version: t.2,
78        body: t.3,
79        description: t.4,
80        source: t.5,
81        is_active: t.6 != 0,
82        success_count: t.7,
83        failure_count: t.8,
84        created_at: t.9,
85    }
86}
87
88impl SqliteStore {
89    /// Record usage of skills (UPSERT: increment count and update timestamp).
90    ///
91    /// All UPSERTs are issued inside a single write transaction to avoid N
92    /// round-trips and WAL contention under concurrent callers.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the database operation fails.
97    #[tracing::instrument(skip_all, name = "memory.skills.record_skill_usage")]
98    pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
99        let mut tx = begin_write(&self.pool).await?;
100        for name in skill_names {
101            zeph_db::query(sql!(
102                "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
103                 VALUES (?, 1, CURRENT_TIMESTAMP) \
104                 ON CONFLICT(skill_name) DO UPDATE SET \
105                 invocation_count = invocation_count + 1, \
106                 last_used_at = CURRENT_TIMESTAMP"
107            ))
108            .bind(name)
109            .execute(&mut *tx)
110            .await?;
111        }
112        tx.commit().await?;
113        Ok(())
114    }
115
116    /// Load all skill usage statistics.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the query fails.
121    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_usage")]
122    pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
123        let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
124            "SELECT skill_name, invocation_count, last_used_at \
125             FROM skill_usage ORDER BY invocation_count DESC"
126        ))
127        .fetch_all(&self.pool)
128        .await?;
129
130        Ok(rows
131            .into_iter()
132            .map(
133                |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
134                    skill_name,
135                    invocation_count,
136                    last_used_at,
137                },
138            )
139            .collect())
140    }
141
142    /// Record a skill outcome event.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the insert fails.
147    #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcome")]
148    pub async fn record_skill_outcome(
149        &self,
150        skill_name: &str,
151        version_id: Option<i64>,
152        conversation_id: Option<crate::types::ConversationId>,
153        outcome: &str,
154        error_context: Option<&str>,
155        outcome_detail: Option<&str>,
156    ) -> Result<(), MemoryError> {
157        zeph_db::query(sql!(
158            "INSERT INTO skill_outcomes \
159             (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
160             VALUES (?, ?, ?, ?, ?, ?)"
161        ))
162        .bind(skill_name)
163        .bind(version_id)
164        .bind(conversation_id)
165        .bind(outcome)
166        .bind(error_context)
167        .bind(outcome_detail)
168        .execute(&self.pool)
169        .await?;
170        Ok(())
171    }
172
173    /// Record outcomes for multiple skills in a single transaction.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if any insert fails (whole batch is rolled back).
178    #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcomes_batch")]
179    pub async fn record_skill_outcomes_batch(
180        &self,
181        skill_names: &[String],
182        conversation_id: Option<crate::types::ConversationId>,
183        outcome: &str,
184        error_context: Option<&str>,
185        outcome_detail: Option<&str>,
186    ) -> Result<(), MemoryError> {
187        // Acquire the write lock up front to avoid DEFERRED read->write upgrades
188        // failing with SQLITE_BUSY_SNAPSHOT under concurrent WAL writers.
189        let mut tx = begin_write(&self.pool).await?;
190
191        let mut version_map: std::collections::HashMap<String, Option<i64>> =
192            std::collections::HashMap::new();
193        for name in skill_names {
194            let vid: Option<(i64,)> = zeph_db::query_as(sql!(
195                "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
196            ))
197            .bind(name)
198            .fetch_optional(&mut *tx)
199            .await?;
200            version_map.insert(name.clone(), vid.map(|r| r.0));
201        }
202
203        for name in skill_names {
204            let version_id = version_map.get(name.as_str()).copied().flatten();
205            zeph_db::query(sql!(
206                "INSERT INTO skill_outcomes \
207                 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
208                 VALUES (?, ?, ?, ?, ?, ?)"
209            ))
210            .bind(name)
211            .bind(version_id)
212            .bind(conversation_id)
213            .bind(outcome)
214            .bind(error_context)
215            .bind(outcome_detail)
216            .execute(&mut *tx)
217            .await?;
218        }
219        tx.commit().await?;
220        Ok(())
221    }
222
223    /// Load metrics for a skill (latest version group).
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the query fails.
228    #[tracing::instrument(skip_all, name = "memory.skills.skill_metrics")]
229    pub async fn skill_metrics(
230        &self,
231        skill_name: &str,
232    ) -> Result<Option<SkillMetricsRow>, MemoryError> {
233        let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234            "SELECT skill_name, version_id, \
235             COUNT(*) as total, \
236             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238             FROM skill_outcomes WHERE skill_name = ? \
239             AND outcome NOT IN ('user_approval', 'user_rejection') \
240             GROUP BY skill_name, version_id \
241             ORDER BY version_id DESC LIMIT 1"
242        ))
243        .bind(skill_name)
244        .fetch_optional(&self.pool)
245        .await?;
246
247        Ok(row.map(
248            |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249                skill_name,
250                version_id,
251                total,
252                successes,
253                failures,
254            },
255        ))
256    }
257
258    /// Load all skill outcome stats grouped by skill name.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the query fails.
263    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_outcome_stats")]
264    pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
265        let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
266            "SELECT skill_name, version_id, \
267             COUNT(*) as total, \
268             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
269             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
270             FROM skill_outcomes \
271             GROUP BY skill_name \
272             ORDER BY total DESC"
273        ))
274        .fetch_all(&self.pool)
275        .await?;
276
277        Ok(rows
278            .into_iter()
279            .map(
280                |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
281                    skill_name,
282                    version_id,
283                    total,
284                    successes,
285                    failures,
286                },
287            )
288            .collect())
289    }
290
291    /// Save a new skill version and return its ID.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the insert fails.
296    #[allow(clippy::too_many_arguments)]
297    // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
298    #[tracing::instrument(skip_all, name = "memory.skills.save_skill_version")]
299    pub async fn save_skill_version(
300        &self,
301        skill_name: &str,
302        version: i64,
303        body: &str,
304        description: &str,
305        source: &str,
306        error_context: Option<&str>,
307        predecessor_id: Option<i64>,
308    ) -> Result<i64, MemoryError> {
309        let row: (i64,) = zeph_db::query_as(sql!(
310            "INSERT INTO skill_versions \
311             (skill_name, version, body, description, source, error_context, predecessor_id) \
312             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
313        ))
314        .bind(skill_name)
315        .bind(version)
316        .bind(body)
317        .bind(description)
318        .bind(source)
319        .bind(error_context)
320        .bind(predecessor_id)
321        .fetch_one(&self.pool)
322        .await?;
323        Ok(row.0)
324    }
325
326    /// Save a new skill version and atomically activate it within a single `BEGIN IMMEDIATE`
327    /// transaction, preventing orphaned saved-but-inactive versions on crash or concurrent access.
328    ///
329    /// Equivalent to calling [`Self::save_skill_version`] followed by [`Self::activate_skill_version`] but
330    /// without the window between the two calls where the DB can be left in an inconsistent state.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if the insert, deactivate, or activate queries fail. The transaction is
335    /// rolled back automatically on error.
336    #[allow(clippy::too_many_arguments)]
337    #[tracing::instrument(skip_all, name = "memory.skills.save_and_activate_skill_version")]
338    pub async fn save_and_activate_skill_version(
339        &self,
340        skill_name: &str,
341        version: i64,
342        body: &str,
343        description: &str,
344        source: &str,
345        error_context: Option<&str>,
346        predecessor_id: Option<i64>,
347    ) -> Result<i64, MemoryError> {
348        let mut tx = begin_write(&self.pool).await?;
349
350        let row: (i64,) = zeph_db::query_as(sql!(
351            "INSERT INTO skill_versions \
352             (skill_name, version, body, description, source, error_context, predecessor_id) \
353             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
354        ))
355        .bind(skill_name)
356        .bind(version)
357        .bind(body)
358        .bind(description)
359        .bind(source)
360        .bind(error_context)
361        .bind(predecessor_id)
362        .fetch_one(&mut *tx)
363        .await?;
364        let id = row.0;
365
366        zeph_db::query(sql!(
367            "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
368        ))
369        .bind(skill_name)
370        .execute(&mut *tx)
371        .await?;
372
373        zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
374            .bind(id)
375            .execute(&mut *tx)
376            .await?;
377
378        tx.commit().await?;
379        Ok(id)
380    }
381
382    /// Count the number of distinct conversation sessions in which a skill produced an outcome.
383    ///
384    /// Uses `COUNT(DISTINCT conversation_id)` from `skill_outcomes`. Rows where
385    /// `conversation_id IS NULL` are excluded (legacy rows without session tracking).
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the query fails.
390    #[tracing::instrument(skip_all, name = "memory.skills.distinct_session_count")]
391    pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
392        let row: (i64,) = zeph_db::query_as(sql!(
393            "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
394             WHERE skill_name = ? AND conversation_id IS NOT NULL"
395        ))
396        .bind(skill_name)
397        .fetch_one(&self.pool)
398        .await?;
399        Ok(row.0)
400    }
401
402    /// Load the active version for a skill.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the query fails.
407    #[tracing::instrument(skip_all, name = "memory.skills.active_skill_version")]
408    pub async fn active_skill_version(
409        &self,
410        skill_name: &str,
411    ) -> Result<Option<SkillVersionRow>, MemoryError> {
412        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
413            "SELECT id, skill_name, version, body, description, source, \
414                 is_active, success_count, failure_count, created_at \
415                 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
416        ))
417        .bind(skill_name)
418        .fetch_optional(&self.pool)
419        .await?;
420
421        Ok(row.map(skill_version_from_tuple))
422    }
423
424    /// Activate a specific version (deactivates others for the same skill).
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the update fails.
429    #[tracing::instrument(skip_all, name = "memory.skills.activate_skill_version")]
430    pub async fn activate_skill_version(
431        &self,
432        skill_name: &str,
433        version_id: i64,
434    ) -> Result<(), MemoryError> {
435        let mut tx = begin_write(&self.pool).await?;
436
437        zeph_db::query(sql!(
438            "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
439        ))
440        .bind(skill_name)
441        .execute(&mut *tx)
442        .await?;
443
444        zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
445            .bind(version_id)
446            .execute(&mut *tx)
447            .await?;
448
449        tx.commit().await?;
450        Ok(())
451    }
452
453    /// Get the next version number for a skill.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if the query fails.
458    #[tracing::instrument(skip_all, name = "memory.skills.next_skill_version")]
459    pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
460        let row: (i64,) = zeph_db::query_as(sql!(
461            "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
462        ))
463        .bind(skill_name)
464        .fetch_one(&self.pool)
465        .await?;
466        Ok(row.0)
467    }
468
469    /// Get the latest auto-generated version's `created_at` for cooldown check.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if the query fails.
474    #[tracing::instrument(skip_all, name = "memory.skills.last_improvement_time")]
475    pub async fn last_improvement_time(
476        &self,
477        skill_name: &str,
478    ) -> Result<Option<String>, MemoryError> {
479        let row: Option<(String,)> = zeph_db::query_as(sql!(
480            "SELECT created_at FROM skill_versions \
481             WHERE skill_name = ? AND source = 'auto' \
482             ORDER BY id DESC LIMIT 1"
483        ))
484        .bind(skill_name)
485        .fetch_optional(&self.pool)
486        .await?;
487        Ok(row.map(|r| r.0))
488    }
489
490    /// Ensure a base (v1 manual) version exists for a skill. Idempotent.
491    ///
492    /// The check-then-insert-then-activate sequence runs inside a single write
493    /// transaction so that two concurrent callers cannot both observe
494    /// `existing.is_none()` and race to insert duplicate v1 rows.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if the DB operation fails.
499    #[tracing::instrument(skip_all, name = "memory.skills.ensure_skill_version_exists")]
500    pub async fn ensure_skill_version_exists(
501        &self,
502        skill_name: &str,
503        body: &str,
504        description: &str,
505    ) -> Result<(), MemoryError> {
506        let mut tx = begin_write(&self.pool).await?;
507
508        let existing: Option<(i64,)> = zeph_db::query_as(sql!(
509            "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
510        ))
511        .bind(skill_name)
512        .fetch_optional(&mut *tx)
513        .await?;
514
515        if existing.is_none() {
516            let row: (i64,) = zeph_db::query_as(sql!(
517                "INSERT INTO skill_versions \
518                 (skill_name, version, body, description, source, error_context, predecessor_id) \
519                 VALUES (?, 1, ?, ?, 'manual', NULL, NULL) RETURNING id"
520            ))
521            .bind(skill_name)
522            .bind(body)
523            .bind(description)
524            .fetch_one(&mut *tx)
525            .await?;
526            let id = row.0;
527
528            zeph_db::query(sql!(
529                "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
530            ))
531            .bind(skill_name)
532            .execute(&mut *tx)
533            .await?;
534
535            zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
536                .bind(id)
537                .execute(&mut *tx)
538                .await?;
539        }
540
541        tx.commit().await?;
542        Ok(())
543    }
544
545    /// Load all versions for a skill, ordered by version number.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if the query fails.
550    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_versions")]
551    pub async fn load_skill_versions(
552        &self,
553        skill_name: &str,
554    ) -> Result<Vec<SkillVersionRow>, MemoryError> {
555        let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
556            "SELECT id, skill_name, version, body, description, source, \
557                 is_active, success_count, failure_count, created_at \
558                 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
559        ))
560        .bind(skill_name)
561        .fetch_all(&self.pool)
562        .await?;
563
564        Ok(rows.into_iter().map(skill_version_from_tuple).collect())
565    }
566
567    /// Count auto-generated versions for a skill.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the query fails.
572    #[tracing::instrument(skip_all, name = "memory.skills.count_auto_versions")]
573    pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
574        let row: (i64,) = zeph_db::query_as(sql!(
575            "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
576        ))
577        .bind(skill_name)
578        .fetch_one(&self.pool)
579        .await?;
580        Ok(row.0)
581    }
582
583    /// Delete oldest non-active auto versions exceeding max limit.
584    /// Returns the number of pruned versions.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the delete fails.
589    #[cfg(not(feature = "postgres"))]
590    #[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
591    pub async fn prune_skill_versions(
592        &self,
593        skill_name: &str,
594        max_versions: u32,
595    ) -> Result<u32, MemoryError> {
596        let result = zeph_db::query(sql!(
597            "DELETE FROM skill_versions WHERE id IN (\
598                SELECT id FROM skill_versions \
599                WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
600                ORDER BY id ASC \
601                LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
602                    WHERE skill_name = ? AND source = 'auto') - ?)\
603            )"
604        ))
605        .bind(skill_name)
606        .bind(skill_name)
607        .bind(max_versions)
608        .execute(&self.pool)
609        .await?;
610        Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
611    }
612
613    /// Prune old auto-generated skill versions, keeping only the most recent `max_versions`.
614    ///
615    /// # Errors
616    ///
617    /// Returns an error if the database query fails.
618    #[cfg(feature = "postgres")]
619    #[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
620    pub async fn prune_skill_versions(
621        &self,
622        skill_name: &str,
623        max_versions: u32,
624    ) -> Result<u32, MemoryError> {
625        let result = zeph_db::query(sql!(
626            "DELETE FROM skill_versions WHERE id IN (\
627                SELECT id FROM skill_versions \
628                WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
629                ORDER BY id DESC \
630                OFFSET ?\
631            )"
632        ))
633        .bind(skill_name)
634        .bind(i64::from(max_versions))
635        .execute(&self.pool)
636        .await?;
637        Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
638    }
639
640    /// Get the predecessor version for rollback.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the query fails.
645    #[tracing::instrument(skip_all, name = "memory.skills.predecessor_version")]
646    pub async fn predecessor_version(
647        &self,
648        version_id: i64,
649    ) -> Result<Option<SkillVersionRow>, MemoryError> {
650        let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
651            "SELECT predecessor_id FROM skill_versions WHERE id = ?"
652        ))
653        .bind(version_id)
654        .fetch_optional(&self.pool)
655        .await?;
656
657        let Some((Some(pid),)) = pred_id else {
658            return Ok(None);
659        };
660
661        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
662            "SELECT id, skill_name, version, body, description, source, \
663                 is_active, success_count, failure_count, created_at \
664                 FROM skill_versions WHERE id = ?"
665        ))
666        .bind(pid)
667        .fetch_optional(&self.pool)
668        .await?;
669
670        Ok(row.map(skill_version_from_tuple))
671    }
672
673    /// Return the skill names for all currently active auto-generated versions.
674    ///
675    /// Used to check rollback eligibility at the start of each agent turn.
676    ///
677    /// # Errors
678    /// Returns [`MemoryError`] on `SQLite` query failure.
679    #[tracing::instrument(skip_all, name = "memory.skills.list_active_auto_versions")]
680    pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
681        let rows: Vec<(String,)> = zeph_db::query_as(sql!(
682            "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
683        ))
684        .fetch_all(&self.pool)
685        .await?;
686        Ok(rows.into_iter().map(|(name,)| name).collect())
687    }
688
689    // --- STEM: skill_usage_log queries ---
690
691    /// Insert a tool usage log entry.
692    ///
693    /// `tool_sequence` must be a normalized compact JSON array (see `stem::normalize_tool_sequence`).
694    /// `sequence_hash` is the 16-char blake3 hex of `tool_sequence`.
695    ///
696    /// # Errors
697    /// Returns [`MemoryError`] on insert failure.
698    #[tracing::instrument(skip_all, name = "memory.skills.insert_tool_usage_log")]
699    pub async fn insert_tool_usage_log(
700        &self,
701        tool_sequence: &str,
702        sequence_hash: &str,
703        context_hash: &str,
704        outcome: &str,
705        conversation_id: Option<crate::types::ConversationId>,
706    ) -> Result<(), MemoryError> {
707        zeph_db::query(sql!(
708            "INSERT INTO skill_usage_log \
709             (tool_sequence, sequence_hash, context_hash, outcome, conversation_id) \
710             VALUES (?, ?, ?, ?, ?)"
711        ))
712        .bind(tool_sequence)
713        .bind(sequence_hash)
714        .bind(context_hash)
715        .bind(outcome)
716        .bind(conversation_id)
717        .execute(&self.pool)
718        .await?;
719        Ok(())
720    }
721
722    /// Find tool sequences that have been seen at least `min_count` times within the last
723    /// `window_days` days.
724    ///
725    /// Returns `(tool_sequence, sequence_hash, occurrence_count, success_count)` tuples.
726    ///
727    /// # Errors
728    /// Returns [`MemoryError`] on query failure.
729    #[cfg(not(feature = "postgres"))]
730    #[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
731    pub async fn find_recurring_patterns(
732        &self,
733        min_count: u32,
734        window_days: u32,
735    ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
736        let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
737            "SELECT tool_sequence, sequence_hash, \
738                    COUNT(*) as occurrence_count, \
739                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
740             FROM skill_usage_log \
741             WHERE created_at > datetime('now', '-' || ? || ' days') \
742             GROUP BY sequence_hash \
743             HAVING occurrence_count >= ? \
744             ORDER BY occurrence_count DESC \
745             LIMIT 10"
746        ))
747        .bind(window_days)
748        .bind(min_count)
749        .fetch_all(&self.pool)
750        .await?;
751
752        Ok(rows
753            .into_iter()
754            .map(|(seq, hash, occ, suc)| {
755                (
756                    seq,
757                    hash,
758                    u32::try_from(occ).unwrap_or(u32::MAX),
759                    u32::try_from(suc).unwrap_or(0),
760                )
761            })
762            .collect())
763    }
764
765    /// Find tool+skill combinations used at least `min_count` times within `window_days` days.
766    ///
767    /// # Errors
768    ///
769    /// Returns an error if the database query fails.
770    #[cfg(feature = "postgres")]
771    #[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
772    pub async fn find_recurring_patterns(
773        &self,
774        min_count: u32,
775        window_days: u32,
776    ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
777        let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
778            "SELECT tool_sequence, sequence_hash, \
779                    COUNT(*) as occurrence_count, \
780                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
781             FROM skill_usage_log \
782             WHERE created_at > NOW() - INTERVAL '1 day' * ? \
783             GROUP BY sequence_hash, tool_sequence \
784             HAVING COUNT(*) >= ? \
785             ORDER BY occurrence_count DESC \
786             LIMIT 10"
787        ))
788        .bind(i64::from(window_days))
789        .bind(i64::from(min_count))
790        .fetch_all(&self.pool)
791        .await?;
792
793        Ok(rows
794            .into_iter()
795            .map(|(seq, hash, occ, suc)| {
796                (
797                    seq,
798                    hash,
799                    u32::try_from(occ).unwrap_or(u32::MAX),
800                    u32::try_from(suc).unwrap_or(0),
801                )
802            })
803            .collect())
804    }
805
806    /// Delete `skill_usage_log` rows older than `retention_days` days.
807    ///
808    /// # Errors
809    /// Returns [`MemoryError`] on delete failure.
810    #[cfg(not(feature = "postgres"))]
811    #[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
812    pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
813        let result = zeph_db::query(sql!(
814            "DELETE FROM skill_usage_log \
815             WHERE created_at < datetime('now', '-' || ? || ' days')"
816        ))
817        .bind(retention_days)
818        .execute(&self.pool)
819        .await?;
820        Ok(result.rows_affected())
821    }
822
823    /// Delete tool usage log entries older than `retention_days` days.
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the database query fails.
828    #[cfg(feature = "postgres")]
829    #[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
830    pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
831        let result = zeph_db::query(sql!(
832            "DELETE FROM skill_usage_log \
833             WHERE created_at < NOW() - INTERVAL '1 day' * ?"
834        ))
835        .bind(i64::from(retention_days))
836        .execute(&self.pool)
837        .await?;
838        Ok(result.rows_affected())
839    }
840
841    // --- ERL: skill_heuristics queries ---
842
843    /// Insert a new heuristic (no dedup — caller must check first).
844    ///
845    /// # Errors
846    /// Returns [`MemoryError`] on insert failure.
847    #[tracing::instrument(skip_all, name = "memory.skills.insert_skill_heuristic")]
848    pub async fn insert_skill_heuristic(
849        &self,
850        skill_name: Option<&str>,
851        heuristic_text: &str,
852        confidence: f64,
853    ) -> Result<i64, MemoryError> {
854        let row: (i64,) = zeph_db::query_as(sql!(
855            "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) \
856             VALUES (?, ?, ?) RETURNING id"
857        ))
858        .bind(skill_name)
859        .bind(heuristic_text)
860        .bind(confidence)
861        .fetch_one(&self.pool)
862        .await?;
863        Ok(row.0)
864    }
865
866    /// Increment `use_count` and update `updated_at` for an existing heuristic by ID.
867    ///
868    /// # Errors
869    /// Returns [`MemoryError`] on update failure.
870    #[cfg(not(feature = "postgres"))]
871    #[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
872    pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
873        zeph_db::query(sql!(
874            "UPDATE skill_heuristics \
875             SET use_count = use_count + 1, updated_at = datetime('now') \
876             WHERE id = ?"
877        ))
878        .bind(id)
879        .execute(&self.pool)
880        .await?;
881        Ok(())
882    }
883
884    /// Increment `use_count` and update `updated_at` for an existing heuristic by ID.
885    ///
886    /// # Errors
887    /// Returns [`MemoryError`] on update failure.
888    #[cfg(feature = "postgres")]
889    #[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
890    pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
891        zeph_db::query(sql!(
892            "UPDATE skill_heuristics \
893             SET use_count = use_count + 1, updated_at = CURRENT_TIMESTAMP \
894             WHERE id = $1"
895        ))
896        .bind(id)
897        .execute(&self.pool)
898        .await?;
899        Ok(())
900    }
901
902    /// Load heuristics for a given skill (exact match + NULL/general), ordered by confidence DESC.
903    ///
904    /// Returns `(id, heuristic_text, confidence, use_count)` tuples.
905    /// At most `limit` rows are returned.
906    ///
907    /// # Errors
908    /// Returns [`MemoryError`] on query failure.
909    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_heuristics")]
910    pub async fn load_skill_heuristics(
911        &self,
912        skill_name: &str,
913        min_confidence: f64,
914        limit: u32,
915    ) -> Result<Vec<(i64, String, f64, i64)>, MemoryError> {
916        let rows: Vec<(i64, String, f64, i64)> = zeph_db::query_as(sql!(
917            "SELECT id, heuristic_text, confidence, use_count \
918             FROM skill_heuristics \
919             WHERE (skill_name = ? OR skill_name IS NULL) \
920               AND confidence >= ? \
921             ORDER BY confidence DESC \
922             LIMIT ?"
923        ))
924        .bind(skill_name)
925        .bind(min_confidence)
926        .bind(i64::from(limit))
927        .fetch_all(&self.pool)
928        .await?;
929        Ok(rows)
930    }
931
932    /// Load all heuristics for a skill (for dedup checks), without confidence filter.
933    ///
934    /// Returns `(id, heuristic_text)` tuples.
935    ///
936    /// # Errors
937    /// Returns [`MemoryError`] on query failure.
938    #[tracing::instrument(skip_all, name = "memory.skills.load_all_heuristics_for_skill")]
939    pub async fn load_all_heuristics_for_skill(
940        &self,
941        skill_name: Option<&str>,
942    ) -> Result<Vec<(i64, String)>, MemoryError> {
943        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
944            "SELECT id, heuristic_text FROM skill_heuristics \
945             WHERE (skill_name = ? OR (? IS NULL AND skill_name IS NULL))"
946        ))
947        .bind(skill_name)
948        .bind(skill_name)
949        .fetch_all(&self.pool)
950        .await?;
951        Ok(rows)
952    }
953
954    // --- D2Skill: step-level error corrections ---
955
956    /// Find matching step corrections for a tool failure.
957    ///
958    /// Returns `(id, hint)` pairs ordered by `success_count DESC, use_count DESC`.
959    /// Uses `INSTR` instead of LIKE to avoid wildcard injection from `error_context`.
960    ///
961    /// # Errors
962    ///
963    /// Returns [`MemoryError`] on query failure.
964    #[cfg(not(feature = "postgres"))]
965    #[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
966    pub async fn find_step_corrections(
967        &self,
968        skill_name: &str,
969        failure_kind: &str,
970        error_context: &str,
971        tool_name: &str,
972        limit: u32,
973    ) -> Result<Vec<(i64, String)>, MemoryError> {
974        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
975            "SELECT id, hint FROM step_corrections \
976             WHERE skill_name = ? \
977               AND (failure_kind = '' OR failure_kind = ?) \
978               AND (error_substring = '' OR INSTR(?, error_substring) > 0) \
979               AND (tool_name = '' OR tool_name = ?) \
980             ORDER BY success_count DESC, use_count DESC \
981             LIMIT ?"
982        ))
983        .bind(skill_name)
984        .bind(failure_kind)
985        .bind(error_context)
986        .bind(tool_name)
987        .bind(limit)
988        .fetch_all(&self.pool)
989        .await?;
990        Ok(rows)
991    }
992
993    /// Find corrections that match the given skill, failure kind, error context, and tool name.
994    ///
995    /// # Errors
996    ///
997    /// Returns an error if the database query fails.
998    #[cfg(feature = "postgres")]
999    #[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
1000    pub async fn find_step_corrections(
1001        &self,
1002        skill_name: &str,
1003        failure_kind: &str,
1004        error_context: &str,
1005        tool_name: &str,
1006        limit: u32,
1007    ) -> Result<Vec<(i64, String)>, MemoryError> {
1008        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
1009            "SELECT id, hint FROM step_corrections \
1010             WHERE skill_name = ? \
1011               AND (failure_kind = '' OR failure_kind = ?) \
1012               AND (error_substring = '' OR strpos(?, error_substring) > 0) \
1013               AND (tool_name = '' OR tool_name = ?) \
1014             ORDER BY success_count DESC, use_count DESC \
1015             LIMIT ?"
1016        ))
1017        .bind(skill_name)
1018        .bind(failure_kind)
1019        .bind(error_context)
1020        .bind(tool_name)
1021        .bind(i64::from(limit))
1022        .fetch_all(&self.pool)
1023        .await?;
1024        Ok(rows)
1025    }
1026
1027    /// Insert a new step correction (from ARISE trace analysis).
1028    ///
1029    /// Duplicate `(skill_name, failure_kind, error_substring, tool_name)` tuples are silently
1030    /// ignored (handled by `ON CONFLICT IGNORE` in the schema).
1031    ///
1032    /// # Errors
1033    ///
1034    /// Returns [`MemoryError`] on query failure.
1035    #[tracing::instrument(skip_all, name = "memory.skills.insert_step_correction")]
1036    pub async fn insert_step_correction(
1037        &self,
1038        skill_name: &str,
1039        failure_kind: &str,
1040        error_substring: &str,
1041        tool_name: &str,
1042        hint: &str,
1043    ) -> Result<(), MemoryError> {
1044        zeph_db::query(sql!(
1045            "INSERT INTO step_corrections \
1046             (skill_name, failure_kind, error_substring, tool_name, hint) \
1047             VALUES (?, ?, ?, ?, ?) \
1048             ON CONFLICT (skill_name, failure_kind, error_substring, tool_name) DO NOTHING"
1049        ))
1050        .bind(skill_name)
1051        .bind(failure_kind)
1052        .bind(error_substring)
1053        .bind(tool_name)
1054        .bind(hint)
1055        .execute(&self.pool)
1056        .await?;
1057        Ok(())
1058    }
1059
1060    /// Increment `use_count` and optionally `success_count` for a step correction.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns [`MemoryError`] on query failure.
1065    #[tracing::instrument(skip_all, name = "memory.skills.record_correction_usage")]
1066    pub async fn record_correction_usage(
1067        &self,
1068        correction_id: i64,
1069        was_successful: bool,
1070    ) -> Result<(), MemoryError> {
1071        if was_successful {
1072            zeph_db::query(sql!(
1073                "UPDATE step_corrections \
1074                 SET use_count = use_count + 1, success_count = success_count + 1 \
1075                 WHERE id = ?"
1076            ))
1077            .bind(correction_id)
1078            .execute(&self.pool)
1079            .await?;
1080        } else {
1081            zeph_db::query(sql!(
1082                "UPDATE step_corrections SET use_count = use_count + 1 WHERE id = ?"
1083            ))
1084            .bind(correction_id)
1085            .execute(&self.pool)
1086            .await?;
1087        }
1088        Ok(())
1089    }
1090
1091    // --- SkillOrchestra: RL routing head weight persistence ---
1092
1093    /// Load routing head weights blob from the singleton row.
1094    ///
1095    /// Returns `(embed_dim, weights_bytes, baseline, update_count)` or `None` if not yet trained.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns [`MemoryError`] on query failure.
1100    #[tracing::instrument(skip_all, name = "memory.skills.load_routing_head_weights")]
1101    #[allow(clippy::type_complexity)]
1102    pub async fn load_routing_head_weights(
1103        &self,
1104    ) -> Result<Option<(i64, Vec<u8>, f64, i64)>, MemoryError> {
1105        let row: Option<(i64, Vec<u8>, f64, i64)> = zeph_db::query_as(sql!(
1106            "SELECT embed_dim, weights, baseline, update_count \
1107             FROM routing_head_weights WHERE id = 1"
1108        ))
1109        .fetch_optional(&self.pool)
1110        .await?;
1111        Ok(row)
1112    }
1113
1114    /// Persist routing head weights (upsert singleton row).
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns [`MemoryError`] on query failure.
1119    #[cfg(not(feature = "postgres"))]
1120    #[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
1121    pub async fn save_routing_head_weights(
1122        &self,
1123        embed_dim: i64,
1124        weights: &[u8],
1125        baseline: f64,
1126        update_count: i64,
1127    ) -> Result<(), MemoryError> {
1128        zeph_db::query(sql!(
1129            "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
1130             VALUES (1, ?, ?, ?, ?, datetime('now')) \
1131             ON CONFLICT(id) DO UPDATE SET \
1132               embed_dim = excluded.embed_dim, \
1133               weights = excluded.weights, \
1134               baseline = excluded.baseline, \
1135               update_count = excluded.update_count, \
1136               updated_at = datetime('now')"
1137        ))
1138        .bind(embed_dim)
1139        .bind(weights)
1140        .bind(baseline)
1141        .bind(update_count)
1142        .execute(&self.pool)
1143        .await?;
1144        Ok(())
1145    }
1146
1147    /// Persist routing head weights (upsert singleton row).
1148    ///
1149    /// # Errors
1150    ///
1151    /// Returns [`MemoryError`] on query failure.
1152    #[cfg(feature = "postgres")]
1153    #[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
1154    pub async fn save_routing_head_weights(
1155        &self,
1156        embed_dim: i64,
1157        weights: &[u8],
1158        baseline: f64,
1159        update_count: i64,
1160    ) -> Result<(), MemoryError> {
1161        zeph_db::query(sql!(
1162            "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
1163             VALUES (1, $1, $2, $3, $4, CURRENT_TIMESTAMP) \
1164             ON CONFLICT(id) DO UPDATE SET \
1165               embed_dim = excluded.embed_dim, \
1166               weights = excluded.weights, \
1167               baseline = excluded.baseline, \
1168               update_count = excluded.update_count, \
1169               updated_at = CURRENT_TIMESTAMP"
1170        ))
1171        .bind(embed_dim)
1172        .bind(weights)
1173        .bind(baseline)
1174        .bind(update_count)
1175        .execute(&self.pool)
1176        .await?;
1177        Ok(())
1178    }
1179
1180    // --- AutoSkill A6: Heuristic promotion helpers (spec 061) ---
1181
1182    /// Return skills where the count of heuristics (above `min_confidence`) meets
1183    /// `min_count`. Each entry is `(skill_name, heuristic_count)`.
1184    ///
1185    /// Used by the promotion background task to find qualifying skills.
1186    ///
1187    /// # Errors
1188    ///
1189    /// Returns an error if the query fails.
1190    #[tracing::instrument(skip_all, name = "memory.skills.count_heuristics_by_skill")]
1191    pub async fn count_heuristics_by_skill(
1192        &self,
1193        min_confidence: f64,
1194        min_count: u32,
1195    ) -> Result<Vec<(String, i64)>, MemoryError> {
1196        let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1197            "SELECT skill_name, COUNT(*) AS cnt \
1198             FROM skill_heuristics \
1199             WHERE confidence >= ? AND skill_name IS NOT NULL \
1200             GROUP BY skill_name \
1201             HAVING cnt >= ?"
1202        ))
1203        .bind(min_confidence)
1204        .bind(i64::from(min_count))
1205        .fetch_all(&self.pool)
1206        .await?;
1207        Ok(rows)
1208    }
1209
1210    /// Load all heuristic texts for a specific skill above `min_confidence`.
1211    ///
1212    /// Returns heuristic texts sorted alphabetically (deterministic batch hash).
1213    ///
1214    /// # Errors
1215    ///
1216    /// Returns an error if the query fails.
1217    #[tracing::instrument(skip_all, name = "memory.skills.load_heuristic_texts_for_promotion")]
1218    pub async fn load_heuristic_texts_for_promotion(
1219        &self,
1220        skill_name: &str,
1221        min_confidence: f64,
1222    ) -> Result<Vec<String>, MemoryError> {
1223        let rows: Vec<(String,)> = zeph_db::query_as(sql!(
1224            "SELECT heuristic_text \
1225             FROM skill_heuristics \
1226             WHERE skill_name = ? AND confidence >= ? \
1227             ORDER BY heuristic_text ASC"
1228        ))
1229        .bind(skill_name)
1230        .bind(min_confidence)
1231        .fetch_all(&self.pool)
1232        .await?;
1233        Ok(rows.into_iter().map(|r| r.0).collect())
1234    }
1235
1236    /// Check whether `(skill_name, batch_hash)` already exists in `skill_heuristic_promotions`.
1237    ///
1238    /// Returns `true` when the batch was already evaluated — the promotion loop should skip it.
1239    ///
1240    /// # Errors
1241    ///
1242    /// Returns an error if the query fails.
1243    #[tracing::instrument(skip_all, name = "memory.skills.promotion_already_evaluated")]
1244    pub async fn promotion_already_evaluated(
1245        &self,
1246        skill_name: &str,
1247        batch_hash: &str,
1248    ) -> Result<bool, MemoryError> {
1249        let count: i64 = zeph_db::query_scalar(sql!(
1250            "SELECT COUNT(*) FROM skill_heuristic_promotions \
1251             WHERE skill_name = ? AND batch_hash = ?"
1252        ))
1253        .bind(skill_name)
1254        .bind(batch_hash)
1255        .fetch_one(&self.pool)
1256        .await?;
1257        Ok(count > 0)
1258    }
1259
1260    /// Record a promotion evaluation result in `skill_heuristic_promotions`.
1261    ///
1262    /// Uses `INSERT OR IGNORE` so concurrent callers are safe (`batch_hash` is part of
1263    /// the primary key).
1264    ///
1265    /// # Errors
1266    ///
1267    /// Returns an error if the query fails.
1268    #[tracing::instrument(skip_all, name = "memory.skills.record_promotion_evaluation")]
1269    pub async fn record_promotion_evaluation(
1270        &self,
1271        skill_name: &str,
1272        batch_hash: &str,
1273        recommendation: &str,
1274        draft_skill_name: Option<&str>,
1275    ) -> Result<(), MemoryError> {
1276        let now = i64::try_from(
1277            std::time::SystemTime::now()
1278                .duration_since(std::time::UNIX_EPOCH)
1279                .unwrap_or_default()
1280                .as_secs(),
1281        )
1282        .unwrap_or(i64::MAX);
1283        zeph_db::query(sql!(
1284            "INSERT OR IGNORE INTO skill_heuristic_promotions \
1285             (skill_name, batch_hash, evaluated_at, recommendation, draft_skill_name) \
1286             VALUES (?, ?, ?, ?, ?)"
1287        ))
1288        .bind(skill_name)
1289        .bind(batch_hash)
1290        .bind(now)
1291        .bind(recommendation)
1292        .bind(draft_skill_name)
1293        .execute(&self.pool)
1294        .await?;
1295        Ok(())
1296    }
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301    use std::time::Duration;
1302
1303    use tokio::time::sleep;
1304
1305    use super::*;
1306
1307    async fn test_store() -> SqliteStore {
1308        SqliteStore::new(":memory:").await.unwrap()
1309    }
1310
1311    #[tokio::test]
1312    async fn record_skill_usage_increments() {
1313        let store = test_store().await;
1314
1315        store.record_skill_usage(&["git"]).await.unwrap();
1316        store.record_skill_usage(&["git"]).await.unwrap();
1317
1318        let usage = store.load_skill_usage().await.unwrap();
1319        assert_eq!(usage.len(), 1);
1320        assert_eq!(usage[0].skill_name, "git");
1321        assert_eq!(usage[0].invocation_count, 2);
1322    }
1323
1324    #[tokio::test]
1325    async fn load_skill_usage_returns_all() {
1326        let store = test_store().await;
1327
1328        store.record_skill_usage(&["git", "docker"]).await.unwrap();
1329        store.record_skill_usage(&["git"]).await.unwrap();
1330
1331        let usage = store.load_skill_usage().await.unwrap();
1332        assert_eq!(usage.len(), 2);
1333        assert_eq!(usage[0].skill_name, "git");
1334        assert_eq!(usage[0].invocation_count, 2);
1335        assert_eq!(usage[1].skill_name, "docker");
1336        assert_eq!(usage[1].invocation_count, 1);
1337    }
1338
1339    #[tokio::test]
1340    async fn migration_005_creates_tables() {
1341        let store = test_store().await;
1342        let pool = store.pool();
1343
1344        let versions: (i64,) = zeph_db::query_as(sql!(
1345            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
1346        ))
1347        .fetch_one(pool)
1348        .await
1349        .unwrap();
1350        assert_eq!(versions.0, 1);
1351
1352        let outcomes: (i64,) = zeph_db::query_as(sql!(
1353            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
1354        ))
1355        .fetch_one(pool)
1356        .await
1357        .unwrap();
1358        assert_eq!(outcomes.0, 1);
1359    }
1360
1361    #[tokio::test]
1362    async fn record_skill_outcome_inserts() {
1363        let store = test_store().await;
1364
1365        store
1366            .record_skill_outcome(
1367                "git",
1368                None,
1369                Some(crate::types::ConversationId(1)),
1370                "success",
1371                None,
1372                None,
1373            )
1374            .await
1375            .unwrap();
1376        store
1377            .record_skill_outcome(
1378                "git",
1379                None,
1380                Some(crate::types::ConversationId(1)),
1381                "tool_failure",
1382                Some("exit code 1"),
1383                None,
1384            )
1385            .await
1386            .unwrap();
1387
1388        let metrics = store.skill_metrics("git").await.unwrap().unwrap();
1389        assert_eq!(metrics.total, 2);
1390        assert_eq!(metrics.successes, 1);
1391        assert_eq!(metrics.failures, 1);
1392    }
1393
1394    #[tokio::test]
1395    async fn skill_metrics_none_for_unknown() {
1396        let store = test_store().await;
1397        let m = store.skill_metrics("nonexistent").await.unwrap();
1398        assert!(m.is_none());
1399    }
1400
1401    #[tokio::test]
1402    async fn load_skill_outcome_stats_grouped() {
1403        let store = test_store().await;
1404
1405        store
1406            .record_skill_outcome("git", None, None, "success", None, None)
1407            .await
1408            .unwrap();
1409        store
1410            .record_skill_outcome("git", None, None, "tool_failure", None, None)
1411            .await
1412            .unwrap();
1413        store
1414            .record_skill_outcome("docker", None, None, "success", None, None)
1415            .await
1416            .unwrap();
1417
1418        let stats = store.load_skill_outcome_stats().await.unwrap();
1419        assert_eq!(stats.len(), 2);
1420        assert_eq!(stats[0].skill_name, "git");
1421        assert_eq!(stats[0].total, 2);
1422        assert_eq!(stats[1].skill_name, "docker");
1423        assert_eq!(stats[1].total, 1);
1424    }
1425
1426    #[tokio::test]
1427    async fn save_and_load_skill_version() {
1428        let store = test_store().await;
1429
1430        let id = store
1431            .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
1432            .await
1433            .unwrap();
1434        assert!(id > 0);
1435
1436        store.activate_skill_version("git", id).await.unwrap();
1437
1438        let active = store.active_skill_version("git").await.unwrap().unwrap();
1439        assert_eq!(active.version, 1);
1440        assert_eq!(active.body, "body v1");
1441        assert!(active.is_active);
1442    }
1443
1444    #[tokio::test]
1445    async fn activate_deactivates_previous() {
1446        let store = test_store().await;
1447
1448        let v1 = store
1449            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1450            .await
1451            .unwrap();
1452        store.activate_skill_version("git", v1).await.unwrap();
1453
1454        let v2 = store
1455            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1456            .await
1457            .unwrap();
1458        store.activate_skill_version("git", v2).await.unwrap();
1459
1460        let versions = store.load_skill_versions("git").await.unwrap();
1461        assert_eq!(versions.len(), 2);
1462        assert!(!versions[0].is_active);
1463        assert!(versions[1].is_active);
1464    }
1465
1466    #[tokio::test]
1467    async fn next_skill_version_increments() {
1468        let store = test_store().await;
1469
1470        let next = store.next_skill_version("git").await.unwrap();
1471        assert_eq!(next, 1);
1472
1473        store
1474            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1475            .await
1476            .unwrap();
1477        let next = store.next_skill_version("git").await.unwrap();
1478        assert_eq!(next, 2);
1479    }
1480
1481    #[tokio::test]
1482    async fn last_improvement_time_returns_auto_only() {
1483        let store = test_store().await;
1484
1485        store
1486            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1487            .await
1488            .unwrap();
1489
1490        let t = store.last_improvement_time("git").await.unwrap();
1491        assert!(t.is_none());
1492
1493        store
1494            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1495            .await
1496            .unwrap();
1497
1498        let t = store.last_improvement_time("git").await.unwrap();
1499        assert!(t.is_some());
1500    }
1501
1502    #[tokio::test]
1503    async fn ensure_skill_version_exists_idempotent() {
1504        let store = test_store().await;
1505
1506        store
1507            .ensure_skill_version_exists("git", "body", "Git helper")
1508            .await
1509            .unwrap();
1510        store
1511            .ensure_skill_version_exists("git", "body2", "Git helper 2")
1512            .await
1513            .unwrap();
1514
1515        let versions = store.load_skill_versions("git").await.unwrap();
1516        assert_eq!(versions.len(), 1);
1517        assert_eq!(versions[0].body, "body");
1518    }
1519
1520    #[tokio::test]
1521    async fn load_skill_versions_ordered() {
1522        let store = test_store().await;
1523
1524        let v1 = store
1525            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1526            .await
1527            .unwrap();
1528        store
1529            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1530            .await
1531            .unwrap();
1532
1533        let versions = store.load_skill_versions("git").await.unwrap();
1534        assert_eq!(versions.len(), 2);
1535        assert_eq!(versions[0].version, 1);
1536        assert_eq!(versions[1].version, 2);
1537    }
1538
1539    #[tokio::test]
1540    async fn count_auto_versions_only() {
1541        let store = test_store().await;
1542
1543        store
1544            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1545            .await
1546            .unwrap();
1547        store
1548            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1549            .await
1550            .unwrap();
1551        store
1552            .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
1553            .await
1554            .unwrap();
1555
1556        let count = store.count_auto_versions("git").await.unwrap();
1557        assert_eq!(count, 2);
1558    }
1559
1560    #[tokio::test]
1561    async fn prune_preserves_manual_and_active() {
1562        let store = test_store().await;
1563
1564        let v1 = store
1565            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1566            .await
1567            .unwrap();
1568        store.activate_skill_version("git", v1).await.unwrap();
1569
1570        for i in 2..=5 {
1571            store
1572                .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
1573                .await
1574                .unwrap();
1575        }
1576
1577        let pruned = store.prune_skill_versions("git", 2).await.unwrap();
1578        assert_eq!(pruned, 2);
1579
1580        let versions = store.load_skill_versions("git").await.unwrap();
1581        assert!(versions.iter().any(|v| v.source == "manual"));
1582        let auto_count = versions.iter().filter(|v| v.source == "auto").count();
1583        assert_eq!(auto_count, 2);
1584    }
1585
1586    #[tokio::test]
1587    async fn predecessor_version_returns_parent() {
1588        let store = test_store().await;
1589
1590        let v1 = store
1591            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1592            .await
1593            .unwrap();
1594        let v2 = store
1595            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1596            .await
1597            .unwrap();
1598
1599        let pred = store.predecessor_version(v2).await.unwrap().unwrap();
1600        assert_eq!(pred.id, v1);
1601        assert_eq!(pred.version, 1);
1602    }
1603
1604    #[tokio::test]
1605    async fn predecessor_version_none_for_root() {
1606        let store = test_store().await;
1607
1608        let v1 = store
1609            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1610            .await
1611            .unwrap();
1612
1613        let pred = store.predecessor_version(v1).await.unwrap();
1614        assert!(pred.is_none());
1615    }
1616
1617    #[tokio::test]
1618    async fn active_skill_version_none_for_unknown() {
1619        let store = test_store().await;
1620        let active = store.active_skill_version("nonexistent").await.unwrap();
1621        assert!(active.is_none());
1622    }
1623
1624    #[tokio::test]
1625    async fn load_skill_outcome_stats_empty() {
1626        let store = test_store().await;
1627        let stats = store.load_skill_outcome_stats().await.unwrap();
1628        assert!(stats.is_empty());
1629    }
1630
1631    #[tokio::test]
1632    async fn load_skill_versions_empty() {
1633        let store = test_store().await;
1634        let versions = store.load_skill_versions("nonexistent").await.unwrap();
1635        assert!(versions.is_empty());
1636    }
1637
1638    #[tokio::test]
1639    async fn count_auto_versions_zero_for_unknown() {
1640        let store = test_store().await;
1641        let count = store.count_auto_versions("nonexistent").await.unwrap();
1642        assert_eq!(count, 0);
1643    }
1644
1645    #[tokio::test]
1646    async fn prune_nothing_when_below_limit() {
1647        let store = test_store().await;
1648
1649        store
1650            .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
1651            .await
1652            .unwrap();
1653
1654        let pruned = store.prune_skill_versions("git", 5).await.unwrap();
1655        assert_eq!(pruned, 0);
1656    }
1657
1658    #[tokio::test]
1659    async fn record_skill_outcome_with_error_context() {
1660        let store = test_store().await;
1661
1662        store
1663            .record_skill_outcome(
1664                "docker",
1665                None,
1666                Some(crate::types::ConversationId(1)),
1667                "tool_failure",
1668                Some("container not found"),
1669                None,
1670            )
1671            .await
1672            .unwrap();
1673
1674        let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
1675        assert_eq!(metrics.total, 1);
1676        assert_eq!(metrics.failures, 1);
1677    }
1678
1679    #[tokio::test]
1680    async fn save_skill_version_with_error_context() {
1681        let store = test_store().await;
1682
1683        let id = store
1684            .save_skill_version(
1685                "git",
1686                1,
1687                "improved body",
1688                "Git helper",
1689                "auto",
1690                Some("exit code 128"),
1691                None,
1692            )
1693            .await
1694            .unwrap();
1695        assert!(id > 0);
1696    }
1697
1698    #[tokio::test]
1699    async fn record_skill_outcomes_batch_resolves_version_id() {
1700        let store = test_store().await;
1701
1702        let vid = store
1703            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1704            .await
1705            .unwrap();
1706        store.activate_skill_version("git", vid).await.unwrap();
1707
1708        store
1709            .record_skill_outcomes_batch(
1710                &["git".to_string()],
1711                None,
1712                "tool_failure",
1713                Some("exit code 1"),
1714                Some("exit_nonzero"),
1715            )
1716            .await
1717            .unwrap();
1718
1719        let pool = store.pool();
1720        let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
1721            "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
1722        ))
1723        .fetch_one(pool)
1724        .await
1725        .unwrap();
1726        assert_eq!(
1727            row.0,
1728            Some(vid),
1729            "version_id should be resolved to active version"
1730        );
1731        assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
1732    }
1733
1734    #[tokio::test]
1735    async fn record_skill_outcome_stores_outcome_detail() {
1736        let store = test_store().await;
1737
1738        store
1739            .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
1740            .await
1741            .unwrap();
1742
1743        let pool = store.pool();
1744        let row: (Option<String>,) = zeph_db::query_as(sql!(
1745            "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
1746        ))
1747        .fetch_one(pool)
1748        .await
1749        .unwrap();
1750        assert_eq!(row.0.as_deref(), Some("timeout"));
1751    }
1752
1753    #[tokio::test]
1754    async fn record_skill_outcomes_batch_waits_for_active_writer() {
1755        let file = tempfile::NamedTempFile::new().expect("tempfile");
1756        let path = file.path().to_str().expect("valid path").to_owned();
1757        let store = SqliteStore::with_pool_size(&path, 2)
1758            .await
1759            .expect("with_pool_size");
1760
1761        let vid = store
1762            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1763            .await
1764            .unwrap();
1765        store.activate_skill_version("git", vid).await.unwrap();
1766
1767        let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1768        zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1769            .execute(&mut *writer_tx)
1770            .await
1771            .expect("hold write lock");
1772
1773        let batch_store = store.clone();
1774        let batch = tokio::spawn(async move {
1775            batch_store
1776                .record_skill_outcomes_batch(
1777                    &["git".to_string()],
1778                    None,
1779                    "success",
1780                    None,
1781                    Some("waited_for_writer"),
1782                )
1783                .await
1784        });
1785
1786        sleep(Duration::from_millis(100)).await;
1787        writer_tx.commit().await.expect("commit writer");
1788
1789        batch
1790            .await
1791            .expect("join batch task")
1792            .expect("record outcomes");
1793
1794        let count: i64 = zeph_db::query_scalar(sql!(
1795            "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1796        ))
1797        .fetch_one(store.pool())
1798        .await
1799        .unwrap();
1800        assert_eq!(
1801            count, 1,
1802            "expected batch insert to succeed after writer commits"
1803        );
1804    }
1805
1806    #[tokio::test]
1807    async fn distinct_session_count_empty() {
1808        let store = test_store().await;
1809        let count = store.distinct_session_count("unknown-skill").await.unwrap();
1810        assert_eq!(count, 0);
1811    }
1812
1813    #[tokio::test]
1814    async fn distinct_session_count_single_session() {
1815        let store = test_store().await;
1816        let cid = crate::types::ConversationId(1);
1817        store
1818            .record_skill_outcome("git", None, Some(cid), "success", None, None)
1819            .await
1820            .unwrap();
1821        store
1822            .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1823            .await
1824            .unwrap();
1825        let count = store.distinct_session_count("git").await.unwrap();
1826        assert_eq!(count, 1);
1827    }
1828
1829    #[tokio::test]
1830    async fn distinct_session_count_multiple_sessions() {
1831        let store = test_store().await;
1832        for i in 0..3i64 {
1833            store
1834                .record_skill_outcome(
1835                    "git",
1836                    None,
1837                    Some(crate::types::ConversationId(i)),
1838                    "success",
1839                    None,
1840                    None,
1841                )
1842                .await
1843                .unwrap();
1844        }
1845        let count = store.distinct_session_count("git").await.unwrap();
1846        assert_eq!(count, 3);
1847    }
1848
1849    #[tokio::test]
1850    async fn distinct_session_count_null_conversation_ids_excluded() {
1851        let store = test_store().await;
1852        // Insert outcomes with NULL conversation_id (legacy rows).
1853        store
1854            .record_skill_outcome("git", None, None, "success", None, None)
1855            .await
1856            .unwrap();
1857        store
1858            .record_skill_outcome("git", None, None, "success", None, None)
1859            .await
1860            .unwrap();
1861        let count = store.distinct_session_count("git").await.unwrap();
1862        assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1863    }
1864
1865    // --- STEM: skill_usage_log ---
1866
1867    #[tokio::test]
1868    async fn insert_and_find_recurring_patterns() {
1869        let store = test_store().await;
1870        let seq = r#"["shell","web_scrape"]"#;
1871        let hash = "abcdef0123456789";
1872        let ctx = "ctxhash000000000";
1873
1874        for _ in 0..3 {
1875            store
1876                .insert_tool_usage_log(seq, hash, ctx, "success", None)
1877                .await
1878                .unwrap();
1879        }
1880        store
1881            .insert_tool_usage_log(seq, hash, ctx, "failure", None)
1882            .await
1883            .unwrap();
1884
1885        let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1886        assert_eq!(patterns.len(), 1);
1887        let (s, h, occ, suc) = &patterns[0];
1888        assert_eq!(s, seq);
1889        assert_eq!(h, hash);
1890        assert_eq!(*occ, 4);
1891        assert_eq!(*suc, 3);
1892    }
1893
1894    #[tokio::test]
1895    async fn find_recurring_patterns_below_threshold_returns_empty() {
1896        let store = test_store().await;
1897        let seq = r#"["shell"]"#;
1898        let hash = "0000000000000001";
1899        let ctx = "0000000000000001";
1900
1901        store
1902            .insert_tool_usage_log(seq, hash, ctx, "success", None)
1903            .await
1904            .unwrap();
1905
1906        let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1907        assert!(patterns.is_empty());
1908    }
1909
1910    #[tokio::test]
1911    async fn prune_tool_usage_log_removes_old_rows() {
1912        let store = test_store().await;
1913        // Insert a row with an artificially old timestamp so it falls within 0-day window.
1914        zeph_db::query(sql!(
1915            "INSERT INTO skill_usage_log \
1916             (tool_sequence, sequence_hash, context_hash, outcome, created_at) \
1917             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
1918        ))
1919        .bind(r#"["shell"]"#)
1920        .bind("hash0000000000001")
1921        .bind("ctx00000000000001")
1922        .bind("success")
1923        .execute(store.pool())
1924        .await
1925        .unwrap();
1926
1927        // Prune rows older than 1 day — the row above is 2 days old so it must be removed.
1928        let removed = store.prune_tool_usage_log(1).await.unwrap();
1929        assert_eq!(removed, 1);
1930    }
1931
1932    // --- ERL: skill_heuristics ---
1933
1934    #[tokio::test]
1935    async fn insert_and_load_skill_heuristics() {
1936        let store = test_store().await;
1937
1938        let id = store
1939            .insert_skill_heuristic(Some("git"), "always commit in small chunks", 0.9)
1940            .await
1941            .unwrap();
1942        assert!(id > 0);
1943
1944        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1945        assert_eq!(rows.len(), 1);
1946        assert_eq!(rows[0].1, "always commit in small chunks");
1947        assert!((rows[0].2 - 0.9).abs() < 1e-6);
1948    }
1949
1950    #[tokio::test]
1951    async fn load_skill_heuristics_includes_general() {
1952        let store = test_store().await;
1953
1954        store
1955            .insert_skill_heuristic(None, "general tip", 0.7)
1956            .await
1957            .unwrap();
1958        store
1959            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1960            .await
1961            .unwrap();
1962
1963        // querying for "git" should include both the git-specific and the general (NULL) heuristic
1964        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1965        assert_eq!(rows.len(), 2);
1966    }
1967
1968    #[tokio::test]
1969    async fn load_skill_heuristics_filters_by_min_confidence() {
1970        let store = test_store().await;
1971
1972        store
1973            .insert_skill_heuristic(Some("git"), "low confidence tip", 0.3)
1974            .await
1975            .unwrap();
1976        store
1977            .insert_skill_heuristic(Some("git"), "high confidence tip", 0.9)
1978            .await
1979            .unwrap();
1980
1981        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1982        assert_eq!(rows.len(), 1);
1983        assert_eq!(rows[0].1, "high confidence tip");
1984    }
1985
1986    #[tokio::test]
1987    async fn increment_heuristic_use_count_works() {
1988        let store = test_store().await;
1989
1990        let id = store
1991            .insert_skill_heuristic(Some("git"), "tip", 0.8)
1992            .await
1993            .unwrap();
1994
1995        store.increment_heuristic_use_count(id).await.unwrap();
1996        store.increment_heuristic_use_count(id).await.unwrap();
1997
1998        let rows = store.load_skill_heuristics("git", 0.0, 10).await.unwrap();
1999        assert_eq!(rows[0].3, 2); // use_count
2000    }
2001
2002    #[tokio::test]
2003    async fn load_all_heuristics_for_skill_exact_match() {
2004        let store = test_store().await;
2005
2006        store
2007            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
2008            .await
2009            .unwrap();
2010        store
2011            .insert_skill_heuristic(Some("docker"), "docker tip", 0.8)
2012            .await
2013            .unwrap();
2014
2015        let rows = store
2016            .load_all_heuristics_for_skill(Some("git"))
2017            .await
2018            .unwrap();
2019        assert_eq!(rows.len(), 1);
2020        assert_eq!(rows[0].1, "git tip");
2021    }
2022
2023    #[tokio::test]
2024    async fn load_all_heuristics_for_skill_null() {
2025        let store = test_store().await;
2026
2027        store
2028            .insert_skill_heuristic(None, "general", 0.8)
2029            .await
2030            .unwrap();
2031        store
2032            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
2033            .await
2034            .unwrap();
2035
2036        let rows = store.load_all_heuristics_for_skill(None).await.unwrap();
2037        assert_eq!(rows.len(), 1);
2038        assert_eq!(rows[0].1, "general");
2039    }
2040
2041    #[tokio::test]
2042    async fn skill_trust_default_is_quarantined() {
2043        // Verify the DB schema default for skill_trust.trust_level is 'quarantined'.
2044        // ARISE-generated versions do not call set_skill_trust_level, so they inherit
2045        // this default when the trust row is first created by the scanner.
2046        let store = test_store().await;
2047
2048        // Insert a trust row without specifying trust_level to exercise the DEFAULT.
2049        zeph_db::query(sql!(
2050            "INSERT INTO skill_trust (skill_name, blake3_hash) VALUES ('test-arise', 'abc')"
2051        ))
2052        .execute(store.pool())
2053        .await
2054        .unwrap();
2055
2056        let trust: (String,) = zeph_db::query_as(sql!(
2057            "SELECT trust_level FROM skill_trust WHERE skill_name = 'test-arise'"
2058        ))
2059        .fetch_one(store.pool())
2060        .await
2061        .unwrap();
2062
2063        assert_eq!(
2064            trust.0, "quarantined",
2065            "schema default for skill_trust.trust_level must be 'quarantined'"
2066        );
2067    }
2068
2069    #[tokio::test]
2070    async fn save_and_activate_skill_version_activates_new() {
2071        let store = test_store().await;
2072
2073        let id = store
2074            .save_and_activate_skill_version(
2075                "git",
2076                1,
2077                "body v1",
2078                "Git helper",
2079                "manual",
2080                None,
2081                None,
2082            )
2083            .await
2084            .unwrap();
2085        assert!(id > 0);
2086
2087        let active = store.active_skill_version("git").await.unwrap().unwrap();
2088        assert_eq!(active.id, id);
2089        assert_eq!(active.version, 1);
2090        assert_eq!(active.body, "body v1");
2091        assert!(active.is_active);
2092    }
2093
2094    #[tokio::test]
2095    async fn save_and_activate_skill_version_deactivates_previous() {
2096        let store = test_store().await;
2097
2098        let v1 = store
2099            .save_and_activate_skill_version("git", 1, "v1", "desc", "manual", None, None)
2100            .await
2101            .unwrap();
2102
2103        let v2 = store
2104            .save_and_activate_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
2105            .await
2106            .unwrap();
2107
2108        let versions = store.load_skill_versions("git").await.unwrap();
2109        assert_eq!(versions.len(), 2);
2110
2111        let old = versions.iter().find(|v| v.id == v1).unwrap();
2112        let new = versions.iter().find(|v| v.id == v2).unwrap();
2113        assert!(!old.is_active, "previous version must be deactivated");
2114        assert!(new.is_active, "new version must be active");
2115    }
2116
2117    #[tokio::test]
2118    async fn save_and_activate_skill_version_only_one_active() {
2119        let store = test_store().await;
2120
2121        let mut last_id = 0i64;
2122        for i in 1i64..=4 {
2123            last_id = store
2124                .save_and_activate_skill_version(
2125                    "git",
2126                    i,
2127                    &format!("v{i}"),
2128                    "desc",
2129                    "auto",
2130                    None,
2131                    None,
2132                )
2133                .await
2134                .unwrap();
2135        }
2136
2137        let versions = store.load_skill_versions("git").await.unwrap();
2138        let active_count = versions.iter().filter(|v| v.is_active).count();
2139        assert_eq!(active_count, 1, "exactly one version must be active");
2140        assert_eq!(
2141            versions.iter().find(|v| v.is_active).unwrap().id,
2142            last_id,
2143            "the last saved version must be the active one"
2144        );
2145    }
2146
2147    // ── AutoSkill A6: heuristic promotion helpers ─────────────────────────
2148
2149    async fn insert_heuristic(store: &SqliteStore, skill: &str, text: &str, confidence: f64) {
2150        zeph_db::query(sql!(
2151            "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) VALUES (?, ?, ?)"
2152        ))
2153        .bind(skill)
2154        .bind(text)
2155        .bind(confidence)
2156        .execute(store.pool())
2157        .await
2158        .unwrap();
2159    }
2160
2161    /// Regression test for #4531: `NULL` `skill_name` heuristics must be excluded from
2162    /// `count_heuristics_by_skill` so sqlx does not encounter a non-null column mismatch.
2163    #[tokio::test]
2164    async fn count_heuristics_by_skill_excludes_null_skill_name() {
2165        let store = test_store().await;
2166
2167        // Insert heuristics with NULL skill_name (general/unattributed).
2168        store
2169            .insert_skill_heuristic(None, "general tip 1", 0.9)
2170            .await
2171            .unwrap();
2172        store
2173            .insert_skill_heuristic(None, "general tip 2", 0.8)
2174            .await
2175            .unwrap();
2176        store
2177            .insert_skill_heuristic(None, "general tip 3", 0.7)
2178            .await
2179            .unwrap();
2180
2181        // NULL rows alone must not produce any results (no skill_name to group by).
2182        let results = store.count_heuristics_by_skill(0.5, 2).await.unwrap();
2183        assert!(
2184            results.is_empty(),
2185            "NULL skill_name heuristics must be excluded from count_heuristics_by_skill results"
2186        );
2187
2188        // Verify that named skills are still counted correctly alongside NULL rows.
2189        insert_heuristic(&store, "git", "use --no-pager", 0.8).await;
2190        insert_heuristic(&store, "git", "check status first", 0.9).await;
2191
2192        let results = store.count_heuristics_by_skill(0.5, 2).await.unwrap();
2193        assert_eq!(results.len(), 1, "only git should qualify");
2194        assert_eq!(results[0].0, "git");
2195        assert_eq!(results[0].1, 2, "git has exactly 2 qualifying heuristics");
2196    }
2197
2198    #[tokio::test]
2199    async fn count_heuristics_by_skill_threshold_and_confidence() {
2200        let store = test_store().await;
2201
2202        // "git" has 3 heuristics: 2 above min_confidence, 1 below
2203        insert_heuristic(&store, "git", "use --no-pager", 0.8).await;
2204        insert_heuristic(&store, "git", "check status first", 0.9).await;
2205        insert_heuristic(&store, "git", "low confidence hint", 0.3).await;
2206
2207        // "docker" has 1 heuristic above confidence — below min_count=2
2208        insert_heuristic(&store, "docker", "use --rm", 0.7).await;
2209
2210        let results = store.count_heuristics_by_skill(0.5, 2).await.unwrap();
2211        assert_eq!(results.len(), 1, "only git meets threshold");
2212        assert_eq!(results[0].0, "git");
2213        assert_eq!(results[0].1, 2, "only heuristics >= 0.5 confidence counted");
2214    }
2215
2216    #[tokio::test]
2217    async fn load_heuristic_texts_sorted_for_deterministic_hash() {
2218        let store = test_store().await;
2219
2220        insert_heuristic(&store, "git", "zebra", 0.8).await;
2221        insert_heuristic(&store, "git", "alpha", 0.9).await;
2222        insert_heuristic(&store, "git", "middle", 0.7).await;
2223        insert_heuristic(&store, "git", "skipped", 0.2).await; // below min_confidence
2224
2225        let texts = store
2226            .load_heuristic_texts_for_promotion("git", 0.5)
2227            .await
2228            .unwrap();
2229
2230        assert_eq!(texts.len(), 3);
2231        assert_eq!(
2232            texts,
2233            vec!["alpha", "middle", "zebra"],
2234            "must be sorted ASC"
2235        );
2236    }
2237
2238    #[tokio::test]
2239    async fn promotion_already_evaluated_idempotency() {
2240        let store = test_store().await;
2241
2242        // Not yet evaluated.
2243        let found = store
2244            .promotion_already_evaluated("git", "abc123")
2245            .await
2246            .unwrap();
2247        assert!(!found);
2248
2249        store
2250            .record_promotion_evaluation("git", "abc123", "none", None)
2251            .await
2252            .unwrap();
2253
2254        // Now it should be found.
2255        let found = store
2256            .promotion_already_evaluated("git", "abc123")
2257            .await
2258            .unwrap();
2259        assert!(found);
2260    }
2261
2262    #[tokio::test]
2263    async fn record_promotion_evaluation_insert_or_ignore() {
2264        let store = test_store().await;
2265
2266        // First insert — succeeds.
2267        store
2268            .record_promotion_evaluation("git", "hash1", "body_enrichment", Some("git-v2"))
2269            .await
2270            .unwrap();
2271
2272        // Duplicate insert — should not error (INSERT OR IGNORE).
2273        store
2274            .record_promotion_evaluation("git", "hash1", "new_skill", Some("git-extra"))
2275            .await
2276            .unwrap();
2277
2278        // Only one row should exist; recommendation is from the first insert.
2279        let rec: (String,) = zeph_db::query_as(sql!(
2280            "SELECT recommendation FROM skill_heuristic_promotions WHERE skill_name = ? AND batch_hash = ?"
2281        ))
2282        .bind("git")
2283        .bind("hash1")
2284        .fetch_one(store.pool())
2285        .await
2286        .unwrap();
2287        assert_eq!(
2288            rec.0, "body_enrichment",
2289            "first insert wins (INSERT OR IGNORE)"
2290        );
2291    }
2292}