Skip to main content

zeph_memory/store/
skills.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
/// Aggregated usage statistics for a single skill, returned by [`SqliteStore::load_skill_usage`].
///
/// Each value mirrors one row of the `skill_usage` table. The type derives
/// `Clone`, `PartialEq` and `Eq` so rows can be copied and compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillUsageRow {
    /// The skill's unique name.
    pub skill_name: String,
    /// Total number of times this skill has been invoked.
    pub invocation_count: i64,
    /// Timestamp of the most recent invocation, as written by the database
    /// (`CURRENT_TIMESTAMP` at the time of the last recorded use).
    pub last_used_at: String,
}
19
/// Per-skill outcome metrics (success/failure counts) returned by
/// [`SqliteStore::load_skill_outcome_stats`] and [`SqliteStore::skill_metrics`].
///
/// The type derives `Clone`, `PartialEq` and `Eq` so rows can be copied and
/// compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillMetricsRow {
    /// The skill's unique name.
    pub skill_name: String,
    /// The version ID this row refers to, if tracked per-version.
    pub version_id: Option<i64>,
    /// Total number of recorded outcomes.
    pub total: i64,
    /// Number of outcomes whose value is `'success'`.
    pub successes: i64,
    /// Number of remaining outcomes (`total - successes`).
    pub failures: i64,
}
34
/// A single persisted skill version, returned by [`SqliteStore::load_skill_versions`] and related queries.
///
/// The type derives `Clone`, `PartialEq` and `Eq` so rows can be copied and
/// compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillVersionRow {
    /// Primary key assigned by the database.
    pub id: i64,
    /// The skill's unique name.
    pub skill_name: String,
    /// Monotonically increasing version number within the skill.
    pub version: i64,
    /// Full SKILL.md body for this version.
    pub body: String,
    /// Human-readable description extracted from the skill body.
    pub description: String,
    /// Origin of the version: `"manual"`, `"auto"`, etc.
    pub source: String,
    /// Whether this version is currently the active one for the skill.
    pub is_active: bool,
    /// Number of successful tool invocations recorded against this version.
    pub success_count: i64,
    /// Number of failed tool invocations recorded against this version.
    pub failure_count: i64,
    /// Timestamp when this version was created, as stored by the database.
    pub created_at: String,
}
59
60type SkillVersionTuple = (
61    i64,
62    String,
63    i64,
64    String,
65    String,
66    String,
67    i64,
68    i64,
69    i64,
70    String,
71);
72
73fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
74    SkillVersionRow {
75        id: t.0,
76        skill_name: t.1,
77        version: t.2,
78        body: t.3,
79        description: t.4,
80        source: t.5,
81        is_active: t.6 != 0,
82        success_count: t.7,
83        failure_count: t.8,
84        created_at: t.9,
85    }
86}
87
88impl SqliteStore {
89    /// Record usage of skills (UPSERT: increment count and update timestamp).
90    ///
91    /// All UPSERTs are issued inside a single write transaction to avoid N
92    /// round-trips and WAL contention under concurrent callers.
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if the database operation fails.
97    #[tracing::instrument(skip_all, name = "memory.skills.record_skill_usage")]
98    pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
99        let mut tx = begin_write(&self.pool).await?;
100        for name in skill_names {
101            zeph_db::query(sql!(
102                "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
103                 VALUES (?, 1, CURRENT_TIMESTAMP) \
104                 ON CONFLICT(skill_name) DO UPDATE SET \
105                 invocation_count = invocation_count + 1, \
106                 last_used_at = CURRENT_TIMESTAMP"
107            ))
108            .bind(name)
109            .execute(&mut *tx)
110            .await?;
111        }
112        tx.commit().await?;
113        Ok(())
114    }
115
116    /// Load all skill usage statistics.
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the query fails.
121    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_usage")]
122    pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
123        let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
124            "SELECT skill_name, invocation_count, last_used_at \
125             FROM skill_usage ORDER BY invocation_count DESC"
126        ))
127        .fetch_all(&self.pool)
128        .await?;
129
130        Ok(rows
131            .into_iter()
132            .map(
133                |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
134                    skill_name,
135                    invocation_count,
136                    last_used_at,
137                },
138            )
139            .collect())
140    }
141
142    /// Record a skill outcome event.
143    ///
144    /// # Errors
145    ///
146    /// Returns an error if the insert fails.
147    #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcome")]
148    pub async fn record_skill_outcome(
149        &self,
150        skill_name: &str,
151        version_id: Option<i64>,
152        conversation_id: Option<crate::types::ConversationId>,
153        outcome: &str,
154        error_context: Option<&str>,
155        outcome_detail: Option<&str>,
156    ) -> Result<(), MemoryError> {
157        zeph_db::query(sql!(
158            "INSERT INTO skill_outcomes \
159             (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
160             VALUES (?, ?, ?, ?, ?, ?)"
161        ))
162        .bind(skill_name)
163        .bind(version_id)
164        .bind(conversation_id)
165        .bind(outcome)
166        .bind(error_context)
167        .bind(outcome_detail)
168        .execute(&self.pool)
169        .await?;
170        Ok(())
171    }
172
173    /// Record outcomes for multiple skills in a single transaction.
174    ///
175    /// # Errors
176    ///
177    /// Returns an error if any insert fails (whole batch is rolled back).
178    #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcomes_batch")]
179    pub async fn record_skill_outcomes_batch(
180        &self,
181        skill_names: &[String],
182        conversation_id: Option<crate::types::ConversationId>,
183        outcome: &str,
184        error_context: Option<&str>,
185        outcome_detail: Option<&str>,
186    ) -> Result<(), MemoryError> {
187        // Acquire the write lock up front to avoid DEFERRED read->write upgrades
188        // failing with SQLITE_BUSY_SNAPSHOT under concurrent WAL writers.
189        let mut tx = begin_write(&self.pool).await?;
190
191        let mut version_map: std::collections::HashMap<String, Option<i64>> =
192            std::collections::HashMap::new();
193        for name in skill_names {
194            let vid: Option<(i64,)> = zeph_db::query_as(sql!(
195                "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
196            ))
197            .bind(name)
198            .fetch_optional(&mut *tx)
199            .await?;
200            version_map.insert(name.clone(), vid.map(|r| r.0));
201        }
202
203        for name in skill_names {
204            let version_id = version_map.get(name.as_str()).copied().flatten();
205            zeph_db::query(sql!(
206                "INSERT INTO skill_outcomes \
207                 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
208                 VALUES (?, ?, ?, ?, ?, ?)"
209            ))
210            .bind(name)
211            .bind(version_id)
212            .bind(conversation_id)
213            .bind(outcome)
214            .bind(error_context)
215            .bind(outcome_detail)
216            .execute(&mut *tx)
217            .await?;
218        }
219        tx.commit().await?;
220        Ok(())
221    }
222
223    /// Load metrics for a skill (latest version group).
224    ///
225    /// # Errors
226    ///
227    /// Returns an error if the query fails.
228    #[tracing::instrument(skip_all, name = "memory.skills.skill_metrics")]
229    pub async fn skill_metrics(
230        &self,
231        skill_name: &str,
232    ) -> Result<Option<SkillMetricsRow>, MemoryError> {
233        let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234            "SELECT skill_name, version_id, \
235             COUNT(*) as total, \
236             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238             FROM skill_outcomes WHERE skill_name = ? \
239             AND outcome NOT IN ('user_approval', 'user_rejection') \
240             GROUP BY skill_name, version_id \
241             ORDER BY version_id DESC LIMIT 1"
242        ))
243        .bind(skill_name)
244        .fetch_optional(&self.pool)
245        .await?;
246
247        Ok(row.map(
248            |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249                skill_name,
250                version_id,
251                total,
252                successes,
253                failures,
254            },
255        ))
256    }
257
258    /// Load all skill outcome stats grouped by skill name.
259    ///
260    /// # Errors
261    ///
262    /// Returns an error if the query fails.
263    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_outcome_stats")]
264    pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
265        let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
266            "SELECT skill_name, version_id, \
267             COUNT(*) as total, \
268             SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
269             COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
270             FROM skill_outcomes \
271             GROUP BY skill_name \
272             ORDER BY total DESC"
273        ))
274        .fetch_all(&self.pool)
275        .await?;
276
277        Ok(rows
278            .into_iter()
279            .map(
280                |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
281                    skill_name,
282                    version_id,
283                    total,
284                    successes,
285                    failures,
286                },
287            )
288            .collect())
289    }
290
291    /// Save a new skill version and return its ID.
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the insert fails.
296    #[allow(clippy::too_many_arguments)]
297    // function with many required inputs; a *Params struct would be more verbose without simplifying the call site
298    #[tracing::instrument(skip_all, name = "memory.skills.save_skill_version")]
299    pub async fn save_skill_version(
300        &self,
301        skill_name: &str,
302        version: i64,
303        body: &str,
304        description: &str,
305        source: &str,
306        error_context: Option<&str>,
307        predecessor_id: Option<i64>,
308    ) -> Result<i64, MemoryError> {
309        let row: (i64,) = zeph_db::query_as(sql!(
310            "INSERT INTO skill_versions \
311             (skill_name, version, body, description, source, error_context, predecessor_id) \
312             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
313        ))
314        .bind(skill_name)
315        .bind(version)
316        .bind(body)
317        .bind(description)
318        .bind(source)
319        .bind(error_context)
320        .bind(predecessor_id)
321        .fetch_one(&self.pool)
322        .await?;
323        Ok(row.0)
324    }
325
326    /// Save a new skill version and atomically activate it within a single `BEGIN IMMEDIATE`
327    /// transaction, preventing orphaned saved-but-inactive versions on crash or concurrent access.
328    ///
329    /// Equivalent to calling [`Self::save_skill_version`] followed by [`Self::activate_skill_version`] but
330    /// without the window between the two calls where the DB can be left in an inconsistent state.
331    ///
332    /// # Errors
333    ///
334    /// Returns an error if the insert, deactivate, or activate queries fail. The transaction is
335    /// rolled back automatically on error.
336    #[allow(clippy::too_many_arguments)]
337    #[tracing::instrument(skip_all, name = "memory.skills.save_and_activate_skill_version")]
338    pub async fn save_and_activate_skill_version(
339        &self,
340        skill_name: &str,
341        version: i64,
342        body: &str,
343        description: &str,
344        source: &str,
345        error_context: Option<&str>,
346        predecessor_id: Option<i64>,
347    ) -> Result<i64, MemoryError> {
348        let mut tx = begin_write(&self.pool).await?;
349
350        let row: (i64,) = zeph_db::query_as(sql!(
351            "INSERT INTO skill_versions \
352             (skill_name, version, body, description, source, error_context, predecessor_id) \
353             VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
354        ))
355        .bind(skill_name)
356        .bind(version)
357        .bind(body)
358        .bind(description)
359        .bind(source)
360        .bind(error_context)
361        .bind(predecessor_id)
362        .fetch_one(&mut *tx)
363        .await?;
364        let id = row.0;
365
366        zeph_db::query(sql!(
367            "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
368        ))
369        .bind(skill_name)
370        .execute(&mut *tx)
371        .await?;
372
373        zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
374            .bind(id)
375            .execute(&mut *tx)
376            .await?;
377
378        tx.commit().await?;
379        Ok(id)
380    }
381
382    /// Count the number of distinct conversation sessions in which a skill produced an outcome.
383    ///
384    /// Uses `COUNT(DISTINCT conversation_id)` from `skill_outcomes`. Rows where
385    /// `conversation_id IS NULL` are excluded (legacy rows without session tracking).
386    ///
387    /// # Errors
388    ///
389    /// Returns an error if the query fails.
390    #[tracing::instrument(skip_all, name = "memory.skills.distinct_session_count")]
391    pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
392        let row: (i64,) = zeph_db::query_as(sql!(
393            "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
394             WHERE skill_name = ? AND conversation_id IS NOT NULL"
395        ))
396        .bind(skill_name)
397        .fetch_one(&self.pool)
398        .await?;
399        Ok(row.0)
400    }
401
402    /// Load the active version for a skill.
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if the query fails.
407    #[tracing::instrument(skip_all, name = "memory.skills.active_skill_version")]
408    pub async fn active_skill_version(
409        &self,
410        skill_name: &str,
411    ) -> Result<Option<SkillVersionRow>, MemoryError> {
412        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
413            "SELECT id, skill_name, version, body, description, source, \
414                 is_active, success_count, failure_count, created_at \
415                 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
416        ))
417        .bind(skill_name)
418        .fetch_optional(&self.pool)
419        .await?;
420
421        Ok(row.map(skill_version_from_tuple))
422    }
423
424    /// Activate a specific version (deactivates others for the same skill).
425    ///
426    /// # Errors
427    ///
428    /// Returns an error if the update fails.
429    #[tracing::instrument(skip_all, name = "memory.skills.activate_skill_version")]
430    pub async fn activate_skill_version(
431        &self,
432        skill_name: &str,
433        version_id: i64,
434    ) -> Result<(), MemoryError> {
435        let mut tx = begin_write(&self.pool).await?;
436
437        zeph_db::query(sql!(
438            "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
439        ))
440        .bind(skill_name)
441        .execute(&mut *tx)
442        .await?;
443
444        zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
445            .bind(version_id)
446            .execute(&mut *tx)
447            .await?;
448
449        tx.commit().await?;
450        Ok(())
451    }
452
453    /// Get the next version number for a skill.
454    ///
455    /// # Errors
456    ///
457    /// Returns an error if the query fails.
458    #[tracing::instrument(skip_all, name = "memory.skills.next_skill_version")]
459    pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
460        let row: (i64,) = zeph_db::query_as(sql!(
461            "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
462        ))
463        .bind(skill_name)
464        .fetch_one(&self.pool)
465        .await?;
466        Ok(row.0)
467    }
468
469    /// Get the latest auto-generated version's `created_at` for cooldown check.
470    ///
471    /// # Errors
472    ///
473    /// Returns an error if the query fails.
474    #[tracing::instrument(skip_all, name = "memory.skills.last_improvement_time")]
475    pub async fn last_improvement_time(
476        &self,
477        skill_name: &str,
478    ) -> Result<Option<String>, MemoryError> {
479        let row: Option<(String,)> = zeph_db::query_as(sql!(
480            "SELECT created_at FROM skill_versions \
481             WHERE skill_name = ? AND source = 'auto' \
482             ORDER BY id DESC LIMIT 1"
483        ))
484        .bind(skill_name)
485        .fetch_optional(&self.pool)
486        .await?;
487        Ok(row.map(|r| r.0))
488    }
489
490    /// Ensure a base (v1 manual) version exists for a skill. Idempotent.
491    ///
492    /// The check-then-insert-then-activate sequence runs inside a single write
493    /// transaction so that two concurrent callers cannot both observe
494    /// `existing.is_none()` and race to insert duplicate v1 rows.
495    ///
496    /// # Errors
497    ///
498    /// Returns an error if the DB operation fails.
499    #[tracing::instrument(skip_all, name = "memory.skills.ensure_skill_version_exists")]
500    pub async fn ensure_skill_version_exists(
501        &self,
502        skill_name: &str,
503        body: &str,
504        description: &str,
505    ) -> Result<(), MemoryError> {
506        let mut tx = begin_write(&self.pool).await?;
507
508        let existing: Option<(i64,)> = zeph_db::query_as(sql!(
509            "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
510        ))
511        .bind(skill_name)
512        .fetch_optional(&mut *tx)
513        .await?;
514
515        if existing.is_none() {
516            let row: (i64,) = zeph_db::query_as(sql!(
517                "INSERT INTO skill_versions \
518                 (skill_name, version, body, description, source, error_context, predecessor_id) \
519                 VALUES (?, 1, ?, ?, 'manual', NULL, NULL) RETURNING id"
520            ))
521            .bind(skill_name)
522            .bind(body)
523            .bind(description)
524            .fetch_one(&mut *tx)
525            .await?;
526            let id = row.0;
527
528            zeph_db::query(sql!(
529                "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
530            ))
531            .bind(skill_name)
532            .execute(&mut *tx)
533            .await?;
534
535            zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
536                .bind(id)
537                .execute(&mut *tx)
538                .await?;
539        }
540
541        tx.commit().await?;
542        Ok(())
543    }
544
545    /// Load all versions for a skill, ordered by version number.
546    ///
547    /// # Errors
548    ///
549    /// Returns an error if the query fails.
550    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_versions")]
551    pub async fn load_skill_versions(
552        &self,
553        skill_name: &str,
554    ) -> Result<Vec<SkillVersionRow>, MemoryError> {
555        let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
556            "SELECT id, skill_name, version, body, description, source, \
557                 is_active, success_count, failure_count, created_at \
558                 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
559        ))
560        .bind(skill_name)
561        .fetch_all(&self.pool)
562        .await?;
563
564        Ok(rows.into_iter().map(skill_version_from_tuple).collect())
565    }
566
567    /// Count auto-generated versions for a skill.
568    ///
569    /// # Errors
570    ///
571    /// Returns an error if the query fails.
572    #[tracing::instrument(skip_all, name = "memory.skills.count_auto_versions")]
573    pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
574        let row: (i64,) = zeph_db::query_as(sql!(
575            "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
576        ))
577        .bind(skill_name)
578        .fetch_one(&self.pool)
579        .await?;
580        Ok(row.0)
581    }
582
583    /// Delete oldest non-active auto versions exceeding max limit.
584    /// Returns the number of pruned versions.
585    ///
586    /// # Errors
587    ///
588    /// Returns an error if the delete fails.
589    #[cfg(not(feature = "postgres"))]
590    #[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
591    pub async fn prune_skill_versions(
592        &self,
593        skill_name: &str,
594        max_versions: u32,
595    ) -> Result<u32, MemoryError> {
596        let result = zeph_db::query(sql!(
597            "DELETE FROM skill_versions WHERE id IN (\
598                SELECT id FROM skill_versions \
599                WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
600                ORDER BY id ASC \
601                LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
602                    WHERE skill_name = ? AND source = 'auto') - ?)\
603            )"
604        ))
605        .bind(skill_name)
606        .bind(skill_name)
607        .bind(max_versions)
608        .execute(&self.pool)
609        .await?;
610        Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
611    }
612
/// Prune old auto-generated skill versions, keeping only the most recent `max_versions`.
///
/// Among the skill's inactive `source = 'auto'` rows, ordered newest first by
/// `id`, the first `max_versions` are skipped (`OFFSET`) and every remaining
/// row is deleted. The active version is never a candidate. Returns the
/// number of rows deleted.
///
/// If the affected-row count reported by the database does not fit in `u32`,
/// the result saturates at `u32::MAX` rather than reporting that nothing was
/// pruned.
///
/// # Errors
///
/// Returns an error if the database query fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
pub async fn prune_skill_versions(
    &self,
    skill_name: &str,
    max_versions: u32,
) -> Result<u32, MemoryError> {
    let result = zeph_db::query(sql!(
        "DELETE FROM skill_versions WHERE id IN (\
            SELECT id FROM skill_versions \
            WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
            ORDER BY id DESC \
            OFFSET ?\
        )"
    ))
    .bind(skill_name)
    .bind(i64::from(max_versions))
    .execute(&self.pool)
    .await?;

    // Conversion only fails when the count is too large for `u32`, so saturate.
    Ok(u32::try_from(result.rows_affected()).unwrap_or(u32::MAX))
}
639
640    /// Get the predecessor version for rollback.
641    ///
642    /// # Errors
643    ///
644    /// Returns an error if the query fails.
645    #[tracing::instrument(skip_all, name = "memory.skills.predecessor_version")]
646    pub async fn predecessor_version(
647        &self,
648        version_id: i64,
649    ) -> Result<Option<SkillVersionRow>, MemoryError> {
650        let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
651            "SELECT predecessor_id FROM skill_versions WHERE id = ?"
652        ))
653        .bind(version_id)
654        .fetch_optional(&self.pool)
655        .await?;
656
657        let Some((Some(pid),)) = pred_id else {
658            return Ok(None);
659        };
660
661        let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
662            "SELECT id, skill_name, version, body, description, source, \
663                 is_active, success_count, failure_count, created_at \
664                 FROM skill_versions WHERE id = ?"
665        ))
666        .bind(pid)
667        .fetch_optional(&self.pool)
668        .await?;
669
670        Ok(row.map(skill_version_from_tuple))
671    }
672
673    /// Return the skill names for all currently active auto-generated versions.
674    ///
675    /// Used to check rollback eligibility at the start of each agent turn.
676    ///
677    /// # Errors
678    /// Returns [`MemoryError`] on `SQLite` query failure.
679    #[tracing::instrument(skip_all, name = "memory.skills.list_active_auto_versions")]
680    pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
681        let rows: Vec<(String,)> = zeph_db::query_as(sql!(
682            "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
683        ))
684        .fetch_all(&self.pool)
685        .await?;
686        Ok(rows.into_iter().map(|(name,)| name).collect())
687    }
688
689    // --- STEM: skill_usage_log queries ---
690
691    /// Insert a tool usage log entry.
692    ///
693    /// `tool_sequence` must be a normalized compact JSON array (see `stem::normalize_tool_sequence`).
694    /// `sequence_hash` is the 16-char blake3 hex of `tool_sequence`.
695    ///
696    /// # Errors
697    /// Returns [`MemoryError`] on insert failure.
698    #[tracing::instrument(skip_all, name = "memory.skills.insert_tool_usage_log")]
699    pub async fn insert_tool_usage_log(
700        &self,
701        tool_sequence: &str,
702        sequence_hash: &str,
703        context_hash: &str,
704        outcome: &str,
705        conversation_id: Option<crate::types::ConversationId>,
706    ) -> Result<(), MemoryError> {
707        zeph_db::query(sql!(
708            "INSERT INTO skill_usage_log \
709             (tool_sequence, sequence_hash, context_hash, outcome, conversation_id) \
710             VALUES (?, ?, ?, ?, ?)"
711        ))
712        .bind(tool_sequence)
713        .bind(sequence_hash)
714        .bind(context_hash)
715        .bind(outcome)
716        .bind(conversation_id)
717        .execute(&self.pool)
718        .await?;
719        Ok(())
720    }
721
722    /// Find tool sequences that have been seen at least `min_count` times within the last
723    /// `window_days` days.
724    ///
725    /// Returns `(tool_sequence, sequence_hash, occurrence_count, success_count)` tuples.
726    ///
727    /// # Errors
728    /// Returns [`MemoryError`] on query failure.
729    #[cfg(not(feature = "postgres"))]
730    #[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
731    pub async fn find_recurring_patterns(
732        &self,
733        min_count: u32,
734        window_days: u32,
735    ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
736        let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
737            "SELECT tool_sequence, sequence_hash, \
738                    COUNT(*) as occurrence_count, \
739                    SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
740             FROM skill_usage_log \
741             WHERE created_at > datetime('now', '-' || ? || ' days') \
742             GROUP BY sequence_hash \
743             HAVING occurrence_count >= ? \
744             ORDER BY occurrence_count DESC \
745             LIMIT 10"
746        ))
747        .bind(window_days)
748        .bind(min_count)
749        .fetch_all(&self.pool)
750        .await?;
751
752        Ok(rows
753            .into_iter()
754            .map(|(seq, hash, occ, suc)| {
755                (
756                    seq,
757                    hash,
758                    u32::try_from(occ).unwrap_or(u32::MAX),
759                    u32::try_from(suc).unwrap_or(0),
760                )
761            })
762            .collect())
763    }
764
/// Find tool sequences logged at least `min_count` times during the last
/// `window_days` days (`postgres` build).
///
/// Log rows are grouped by `sequence_hash` and `tool_sequence`; at most 10
/// groups are returned, most frequent first.
///
/// Returns `(tool_sequence, sequence_hash, occurrence_count, success_count)` tuples.
/// `occurrence_count` saturates at `u32::MAX`; `success_count` falls back to
/// `0` if the database value does not fit in `u32`.
///
/// # Errors
///
/// Returns an error if the database query fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
pub async fn find_recurring_patterns(
    &self,
    min_count: u32,
    window_days: u32,
) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
    let fetched: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
        "SELECT tool_sequence, sequence_hash, \
                COUNT(*) as occurrence_count, \
                SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
         FROM skill_usage_log \
         WHERE created_at > NOW() - INTERVAL '1 day' * ? \
         GROUP BY sequence_hash, tool_sequence \
         HAVING COUNT(*) >= ? \
         ORDER BY occurrence_count DESC \
         LIMIT 10"
    ))
    .bind(i64::from(window_days))
    .bind(i64::from(min_count))
    .fetch_all(&self.pool)
    .await?;

    let mut patterns = Vec::with_capacity(fetched.len());
    for (tool_sequence, sequence_hash, occurrences, successes) in fetched {
        let occurrences = u32::try_from(occurrences).unwrap_or(u32::MAX);
        let successes = u32::try_from(successes).unwrap_or(0);
        patterns.push((tool_sequence, sequence_hash, occurrences, successes));
    }
    Ok(patterns)
}
805
806    /// Delete `skill_usage_log` rows older than `retention_days` days.
807    ///
808    /// # Errors
809    /// Returns [`MemoryError`] on delete failure.
810    #[cfg(not(feature = "postgres"))]
811    #[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
812    pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
813        let result = zeph_db::query(sql!(
814            "DELETE FROM skill_usage_log \
815             WHERE created_at < datetime('now', '-' || ? || ' days')"
816        ))
817        .bind(retention_days)
818        .execute(&self.pool)
819        .await?;
820        Ok(result.rows_affected())
821    }
822
/// Remove `skill_usage_log` entries whose `created_at` is more than
/// `retention_days` days in the past, returning how many rows were deleted
/// (`postgres` build).
///
/// # Errors
///
/// Returns an error if the database query fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
    let deleted = zeph_db::query(sql!(
        "DELETE FROM skill_usage_log \
         WHERE created_at < NOW() - INTERVAL '1 day' * ?"
    ))
    .bind(i64::from(retention_days))
    .execute(&self.pool)
    .await?
    .rows_affected();

    Ok(deleted)
}
840
841    // --- ERL: skill_heuristics queries ---
842
843    /// Insert a new heuristic (no dedup — caller must check first).
844    ///
845    /// # Errors
846    /// Returns [`MemoryError`] on insert failure.
847    #[tracing::instrument(skip_all, name = "memory.skills.insert_skill_heuristic")]
848    pub async fn insert_skill_heuristic(
849        &self,
850        skill_name: Option<&str>,
851        heuristic_text: &str,
852        confidence: f64,
853    ) -> Result<i64, MemoryError> {
854        let row: (i64,) = zeph_db::query_as(sql!(
855            "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) \
856             VALUES (?, ?, ?) RETURNING id"
857        ))
858        .bind(skill_name)
859        .bind(heuristic_text)
860        .bind(confidence)
861        .fetch_one(&self.pool)
862        .await?;
863        Ok(row.0)
864    }
865
866    /// Increment `use_count` and update `updated_at` for an existing heuristic by ID.
867    ///
868    /// # Errors
869    /// Returns [`MemoryError`] on update failure.
870    #[cfg(not(feature = "postgres"))]
871    #[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
872    pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
873        zeph_db::query(sql!(
874            "UPDATE skill_heuristics \
875             SET use_count = use_count + 1, updated_at = datetime('now') \
876             WHERE id = ?"
877        ))
878        .bind(id)
879        .execute(&self.pool)
880        .await?;
881        Ok(())
882    }
883
/// Bump `use_count` by one and refresh `updated_at` for the heuristic with the given `id`
/// (Postgres variant).
///
/// `updated_at` is set to `CURRENT_TIMESTAMP` as evaluated by the database.
///
/// # Errors
/// Returns [`MemoryError`] on update failure.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
    // Only success or failure matters to the caller; the query result is discarded.
    let _outcome = zeph_db::query(sql!(
        "UPDATE skill_heuristics \
         SET use_count = use_count + 1, updated_at = CURRENT_TIMESTAMP \
         WHERE id = $1"
    ))
    .bind(id)
    .execute(&self.pool)
    .await?;
    Ok(())
}
901
902    /// Load heuristics for a given skill (exact match + NULL/general), ordered by confidence DESC.
903    ///
904    /// Returns `(id, heuristic_text, confidence, use_count)` tuples.
905    /// At most `limit` rows are returned.
906    ///
907    /// # Errors
908    /// Returns [`MemoryError`] on query failure.
909    #[tracing::instrument(skip_all, name = "memory.skills.load_skill_heuristics")]
910    pub async fn load_skill_heuristics(
911        &self,
912        skill_name: &str,
913        min_confidence: f64,
914        limit: u32,
915    ) -> Result<Vec<(i64, String, f64, i64)>, MemoryError> {
916        let rows: Vec<(i64, String, f64, i64)> = zeph_db::query_as(sql!(
917            "SELECT id, heuristic_text, confidence, use_count \
918             FROM skill_heuristics \
919             WHERE (skill_name = ? OR skill_name IS NULL) \
920               AND confidence >= ? \
921             ORDER BY confidence DESC \
922             LIMIT ?"
923        ))
924        .bind(skill_name)
925        .bind(min_confidence)
926        .bind(i64::from(limit))
927        .fetch_all(&self.pool)
928        .await?;
929        Ok(rows)
930    }
931
932    /// Load all heuristics for a skill (for dedup checks), without confidence filter.
933    ///
934    /// Returns `(id, heuristic_text)` tuples.
935    ///
936    /// # Errors
937    /// Returns [`MemoryError`] on query failure.
938    #[tracing::instrument(skip_all, name = "memory.skills.load_all_heuristics_for_skill")]
939    pub async fn load_all_heuristics_for_skill(
940        &self,
941        skill_name: Option<&str>,
942    ) -> Result<Vec<(i64, String)>, MemoryError> {
943        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
944            "SELECT id, heuristic_text FROM skill_heuristics \
945             WHERE (skill_name = ? OR (? IS NULL AND skill_name IS NULL))"
946        ))
947        .bind(skill_name)
948        .bind(skill_name)
949        .fetch_all(&self.pool)
950        .await?;
951        Ok(rows)
952    }
953
954    // --- D2Skill: step-level error corrections ---
955
956    /// Find matching step corrections for a tool failure.
957    ///
958    /// Returns `(id, hint)` pairs ordered by `success_count DESC, use_count DESC`.
959    /// Uses `INSTR` instead of LIKE to avoid wildcard injection from `error_context`.
960    ///
961    /// # Errors
962    ///
963    /// Returns [`MemoryError`] on query failure.
964    #[cfg(not(feature = "postgres"))]
965    #[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
966    pub async fn find_step_corrections(
967        &self,
968        skill_name: &str,
969        failure_kind: &str,
970        error_context: &str,
971        tool_name: &str,
972        limit: u32,
973    ) -> Result<Vec<(i64, String)>, MemoryError> {
974        let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
975            "SELECT id, hint FROM step_corrections \
976             WHERE skill_name = ? \
977               AND (failure_kind = '' OR failure_kind = ?) \
978               AND (error_substring = '' OR INSTR(?, error_substring) > 0) \
979               AND (tool_name = '' OR tool_name = ?) \
980             ORDER BY success_count DESC, use_count DESC \
981             LIMIT ?"
982        ))
983        .bind(skill_name)
984        .bind(failure_kind)
985        .bind(error_context)
986        .bind(tool_name)
987        .bind(limit)
988        .fetch_all(&self.pool)
989        .await?;
990        Ok(rows)
991    }
992
/// Look up step corrections applicable to a tool failure (Postgres variant).
///
/// A stored correction matches when its `skill_name` equals the argument and each of its
/// `failure_kind`, `error_substring`, and `tool_name` columns is either the empty string
/// or matches the corresponding argument; `error_substring` matches when `strpos` finds it
/// inside `error_context`.
///
/// Returns `(id, hint)` pairs ordered by `success_count DESC, use_count DESC`, at most
/// `limit` of them.
///
/// # Errors
///
/// Returns an error if the database query fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
pub async fn find_step_corrections(
    &self,
    skill_name: &str,
    failure_kind: &str,
    error_context: &str,
    tool_name: &str,
    limit: u32,
) -> Result<Vec<(i64, String)>, MemoryError> {
    let max_rows = i64::from(limit);
    let matches: Vec<(i64, String)> = zeph_db::query_as(sql!(
        "SELECT id, hint FROM step_corrections \
         WHERE skill_name = ? \
           AND (failure_kind = '' OR failure_kind = ?) \
           AND (error_substring = '' OR strpos(?, error_substring) > 0) \
           AND (tool_name = '' OR tool_name = ?) \
         ORDER BY success_count DESC, use_count DESC \
         LIMIT ?"
    ))
    .bind(skill_name)
    .bind(failure_kind)
    .bind(error_context)
    .bind(tool_name)
    .bind(max_rows)
    .fetch_all(&self.pool)
    .await?;
    Ok(matches)
}
1026
1027    /// Insert a new step correction (from ARISE trace analysis).
1028    ///
1029    /// Duplicate `(skill_name, failure_kind, error_substring, tool_name)` tuples are silently
1030    /// ignored (handled by `ON CONFLICT IGNORE` in the schema).
1031    ///
1032    /// # Errors
1033    ///
1034    /// Returns [`MemoryError`] on query failure.
1035    #[tracing::instrument(skip_all, name = "memory.skills.insert_step_correction")]
1036    pub async fn insert_step_correction(
1037        &self,
1038        skill_name: &str,
1039        failure_kind: &str,
1040        error_substring: &str,
1041        tool_name: &str,
1042        hint: &str,
1043    ) -> Result<(), MemoryError> {
1044        zeph_db::query(sql!(
1045            "INSERT INTO step_corrections \
1046             (skill_name, failure_kind, error_substring, tool_name, hint) \
1047             VALUES (?, ?, ?, ?, ?) \
1048             ON CONFLICT (skill_name, failure_kind, error_substring, tool_name) DO NOTHING"
1049        ))
1050        .bind(skill_name)
1051        .bind(failure_kind)
1052        .bind(error_substring)
1053        .bind(tool_name)
1054        .bind(hint)
1055        .execute(&self.pool)
1056        .await?;
1057        Ok(())
1058    }
1059
1060    /// Increment `use_count` and optionally `success_count` for a step correction.
1061    ///
1062    /// # Errors
1063    ///
1064    /// Returns [`MemoryError`] on query failure.
1065    #[tracing::instrument(skip_all, name = "memory.skills.record_correction_usage")]
1066    pub async fn record_correction_usage(
1067        &self,
1068        correction_id: i64,
1069        was_successful: bool,
1070    ) -> Result<(), MemoryError> {
1071        if was_successful {
1072            zeph_db::query(sql!(
1073                "UPDATE step_corrections \
1074                 SET use_count = use_count + 1, success_count = success_count + 1 \
1075                 WHERE id = ?"
1076            ))
1077            .bind(correction_id)
1078            .execute(&self.pool)
1079            .await?;
1080        } else {
1081            zeph_db::query(sql!(
1082                "UPDATE step_corrections SET use_count = use_count + 1 WHERE id = ?"
1083            ))
1084            .bind(correction_id)
1085            .execute(&self.pool)
1086            .await?;
1087        }
1088        Ok(())
1089    }
1090
1091    // --- SkillOrchestra: RL routing head weight persistence ---
1092
1093    /// Load routing head weights blob from the singleton row.
1094    ///
1095    /// Returns `(embed_dim, weights_bytes, baseline, update_count)` or `None` if not yet trained.
1096    ///
1097    /// # Errors
1098    ///
1099    /// Returns [`MemoryError`] on query failure.
1100    #[tracing::instrument(skip_all, name = "memory.skills.load_routing_head_weights")]
1101    #[allow(clippy::type_complexity)]
1102    pub async fn load_routing_head_weights(
1103        &self,
1104    ) -> Result<Option<(i64, Vec<u8>, f64, i64)>, MemoryError> {
1105        let row: Option<(i64, Vec<u8>, f64, i64)> = zeph_db::query_as(sql!(
1106            "SELECT embed_dim, weights, baseline, update_count \
1107             FROM routing_head_weights WHERE id = 1"
1108        ))
1109        .fetch_optional(&self.pool)
1110        .await?;
1111        Ok(row)
1112    }
1113
1114    /// Persist routing head weights (upsert singleton row).
1115    ///
1116    /// # Errors
1117    ///
1118    /// Returns [`MemoryError`] on query failure.
1119    #[cfg(not(feature = "postgres"))]
1120    #[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
1121    pub async fn save_routing_head_weights(
1122        &self,
1123        embed_dim: i64,
1124        weights: &[u8],
1125        baseline: f64,
1126        update_count: i64,
1127    ) -> Result<(), MemoryError> {
1128        zeph_db::query(sql!(
1129            "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
1130             VALUES (1, ?, ?, ?, ?, datetime('now')) \
1131             ON CONFLICT(id) DO UPDATE SET \
1132               embed_dim = excluded.embed_dim, \
1133               weights = excluded.weights, \
1134               baseline = excluded.baseline, \
1135               update_count = excluded.update_count, \
1136               updated_at = datetime('now')"
1137        ))
1138        .bind(embed_dim)
1139        .bind(weights)
1140        .bind(baseline)
1141        .bind(update_count)
1142        .execute(&self.pool)
1143        .await?;
1144        Ok(())
1145    }
1146
/// Write the routing head weights into the singleton row (`id = 1`), Postgres variant.
///
/// The row is inserted if missing; otherwise every stored column is overwritten with the
/// supplied values and `updated_at` is refreshed to `CURRENT_TIMESTAMP`.
///
/// # Errors
///
/// Returns [`MemoryError`] on query failure.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
pub async fn save_routing_head_weights(
    &self,
    embed_dim: i64,
    weights: &[u8],
    baseline: f64,
    update_count: i64,
) -> Result<(), MemoryError> {
    // Only success or failure matters to the caller; the query result is discarded.
    let _outcome = zeph_db::query(sql!(
        "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
         VALUES (1, $1, $2, $3, $4, CURRENT_TIMESTAMP) \
         ON CONFLICT(id) DO UPDATE SET \
           embed_dim = excluded.embed_dim, \
           weights = excluded.weights, \
           baseline = excluded.baseline, \
           update_count = excluded.update_count, \
           updated_at = CURRENT_TIMESTAMP"
    ))
    .bind(embed_dim)
    .bind(weights)
    .bind(baseline)
    .bind(update_count)
    .execute(&self.pool)
    .await?;
    Ok(())
}
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183    use std::time::Duration;
1184
1185    use tokio::time::sleep;
1186
1187    use super::*;
1188
1189    async fn test_store() -> SqliteStore {
1190        SqliteStore::new(":memory:").await.unwrap()
1191    }
1192
1193    #[tokio::test]
1194    async fn record_skill_usage_increments() {
1195        let store = test_store().await;
1196
1197        store.record_skill_usage(&["git"]).await.unwrap();
1198        store.record_skill_usage(&["git"]).await.unwrap();
1199
1200        let usage = store.load_skill_usage().await.unwrap();
1201        assert_eq!(usage.len(), 1);
1202        assert_eq!(usage[0].skill_name, "git");
1203        assert_eq!(usage[0].invocation_count, 2);
1204    }
1205
1206    #[tokio::test]
1207    async fn load_skill_usage_returns_all() {
1208        let store = test_store().await;
1209
1210        store.record_skill_usage(&["git", "docker"]).await.unwrap();
1211        store.record_skill_usage(&["git"]).await.unwrap();
1212
1213        let usage = store.load_skill_usage().await.unwrap();
1214        assert_eq!(usage.len(), 2);
1215        assert_eq!(usage[0].skill_name, "git");
1216        assert_eq!(usage[0].invocation_count, 2);
1217        assert_eq!(usage[1].skill_name, "docker");
1218        assert_eq!(usage[1].invocation_count, 1);
1219    }
1220
1221    #[tokio::test]
1222    async fn migration_005_creates_tables() {
1223        let store = test_store().await;
1224        let pool = store.pool();
1225
1226        let versions: (i64,) = zeph_db::query_as(sql!(
1227            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
1228        ))
1229        .fetch_one(pool)
1230        .await
1231        .unwrap();
1232        assert_eq!(versions.0, 1);
1233
1234        let outcomes: (i64,) = zeph_db::query_as(sql!(
1235            "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
1236        ))
1237        .fetch_one(pool)
1238        .await
1239        .unwrap();
1240        assert_eq!(outcomes.0, 1);
1241    }
1242
1243    #[tokio::test]
1244    async fn record_skill_outcome_inserts() {
1245        let store = test_store().await;
1246
1247        store
1248            .record_skill_outcome(
1249                "git",
1250                None,
1251                Some(crate::types::ConversationId(1)),
1252                "success",
1253                None,
1254                None,
1255            )
1256            .await
1257            .unwrap();
1258        store
1259            .record_skill_outcome(
1260                "git",
1261                None,
1262                Some(crate::types::ConversationId(1)),
1263                "tool_failure",
1264                Some("exit code 1"),
1265                None,
1266            )
1267            .await
1268            .unwrap();
1269
1270        let metrics = store.skill_metrics("git").await.unwrap().unwrap();
1271        assert_eq!(metrics.total, 2);
1272        assert_eq!(metrics.successes, 1);
1273        assert_eq!(metrics.failures, 1);
1274    }
1275
1276    #[tokio::test]
1277    async fn skill_metrics_none_for_unknown() {
1278        let store = test_store().await;
1279        let m = store.skill_metrics("nonexistent").await.unwrap();
1280        assert!(m.is_none());
1281    }
1282
1283    #[tokio::test]
1284    async fn load_skill_outcome_stats_grouped() {
1285        let store = test_store().await;
1286
1287        store
1288            .record_skill_outcome("git", None, None, "success", None, None)
1289            .await
1290            .unwrap();
1291        store
1292            .record_skill_outcome("git", None, None, "tool_failure", None, None)
1293            .await
1294            .unwrap();
1295        store
1296            .record_skill_outcome("docker", None, None, "success", None, None)
1297            .await
1298            .unwrap();
1299
1300        let stats = store.load_skill_outcome_stats().await.unwrap();
1301        assert_eq!(stats.len(), 2);
1302        assert_eq!(stats[0].skill_name, "git");
1303        assert_eq!(stats[0].total, 2);
1304        assert_eq!(stats[1].skill_name, "docker");
1305        assert_eq!(stats[1].total, 1);
1306    }
1307
1308    #[tokio::test]
1309    async fn save_and_load_skill_version() {
1310        let store = test_store().await;
1311
1312        let id = store
1313            .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
1314            .await
1315            .unwrap();
1316        assert!(id > 0);
1317
1318        store.activate_skill_version("git", id).await.unwrap();
1319
1320        let active = store.active_skill_version("git").await.unwrap().unwrap();
1321        assert_eq!(active.version, 1);
1322        assert_eq!(active.body, "body v1");
1323        assert!(active.is_active);
1324    }
1325
1326    #[tokio::test]
1327    async fn activate_deactivates_previous() {
1328        let store = test_store().await;
1329
1330        let v1 = store
1331            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1332            .await
1333            .unwrap();
1334        store.activate_skill_version("git", v1).await.unwrap();
1335
1336        let v2 = store
1337            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1338            .await
1339            .unwrap();
1340        store.activate_skill_version("git", v2).await.unwrap();
1341
1342        let versions = store.load_skill_versions("git").await.unwrap();
1343        assert_eq!(versions.len(), 2);
1344        assert!(!versions[0].is_active);
1345        assert!(versions[1].is_active);
1346    }
1347
1348    #[tokio::test]
1349    async fn next_skill_version_increments() {
1350        let store = test_store().await;
1351
1352        let next = store.next_skill_version("git").await.unwrap();
1353        assert_eq!(next, 1);
1354
1355        store
1356            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1357            .await
1358            .unwrap();
1359        let next = store.next_skill_version("git").await.unwrap();
1360        assert_eq!(next, 2);
1361    }
1362
1363    #[tokio::test]
1364    async fn last_improvement_time_returns_auto_only() {
1365        let store = test_store().await;
1366
1367        store
1368            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1369            .await
1370            .unwrap();
1371
1372        let t = store.last_improvement_time("git").await.unwrap();
1373        assert!(t.is_none());
1374
1375        store
1376            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1377            .await
1378            .unwrap();
1379
1380        let t = store.last_improvement_time("git").await.unwrap();
1381        assert!(t.is_some());
1382    }
1383
1384    #[tokio::test]
1385    async fn ensure_skill_version_exists_idempotent() {
1386        let store = test_store().await;
1387
1388        store
1389            .ensure_skill_version_exists("git", "body", "Git helper")
1390            .await
1391            .unwrap();
1392        store
1393            .ensure_skill_version_exists("git", "body2", "Git helper 2")
1394            .await
1395            .unwrap();
1396
1397        let versions = store.load_skill_versions("git").await.unwrap();
1398        assert_eq!(versions.len(), 1);
1399        assert_eq!(versions[0].body, "body");
1400    }
1401
1402    #[tokio::test]
1403    async fn load_skill_versions_ordered() {
1404        let store = test_store().await;
1405
1406        let v1 = store
1407            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1408            .await
1409            .unwrap();
1410        store
1411            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1412            .await
1413            .unwrap();
1414
1415        let versions = store.load_skill_versions("git").await.unwrap();
1416        assert_eq!(versions.len(), 2);
1417        assert_eq!(versions[0].version, 1);
1418        assert_eq!(versions[1].version, 2);
1419    }
1420
1421    #[tokio::test]
1422    async fn count_auto_versions_only() {
1423        let store = test_store().await;
1424
1425        store
1426            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1427            .await
1428            .unwrap();
1429        store
1430            .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1431            .await
1432            .unwrap();
1433        store
1434            .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
1435            .await
1436            .unwrap();
1437
1438        let count = store.count_auto_versions("git").await.unwrap();
1439        assert_eq!(count, 2);
1440    }
1441
1442    #[tokio::test]
1443    async fn prune_preserves_manual_and_active() {
1444        let store = test_store().await;
1445
1446        let v1 = store
1447            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1448            .await
1449            .unwrap();
1450        store.activate_skill_version("git", v1).await.unwrap();
1451
1452        for i in 2..=5 {
1453            store
1454                .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
1455                .await
1456                .unwrap();
1457        }
1458
1459        let pruned = store.prune_skill_versions("git", 2).await.unwrap();
1460        assert_eq!(pruned, 2);
1461
1462        let versions = store.load_skill_versions("git").await.unwrap();
1463        assert!(versions.iter().any(|v| v.source == "manual"));
1464        let auto_count = versions.iter().filter(|v| v.source == "auto").count();
1465        assert_eq!(auto_count, 2);
1466    }
1467
1468    #[tokio::test]
1469    async fn predecessor_version_returns_parent() {
1470        let store = test_store().await;
1471
1472        let v1 = store
1473            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1474            .await
1475            .unwrap();
1476        let v2 = store
1477            .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1478            .await
1479            .unwrap();
1480
1481        let pred = store.predecessor_version(v2).await.unwrap().unwrap();
1482        assert_eq!(pred.id, v1);
1483        assert_eq!(pred.version, 1);
1484    }
1485
1486    #[tokio::test]
1487    async fn predecessor_version_none_for_root() {
1488        let store = test_store().await;
1489
1490        let v1 = store
1491            .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1492            .await
1493            .unwrap();
1494
1495        let pred = store.predecessor_version(v1).await.unwrap();
1496        assert!(pred.is_none());
1497    }
1498
1499    #[tokio::test]
1500    async fn active_skill_version_none_for_unknown() {
1501        let store = test_store().await;
1502        let active = store.active_skill_version("nonexistent").await.unwrap();
1503        assert!(active.is_none());
1504    }
1505
1506    #[tokio::test]
1507    async fn load_skill_outcome_stats_empty() {
1508        let store = test_store().await;
1509        let stats = store.load_skill_outcome_stats().await.unwrap();
1510        assert!(stats.is_empty());
1511    }
1512
1513    #[tokio::test]
1514    async fn load_skill_versions_empty() {
1515        let store = test_store().await;
1516        let versions = store.load_skill_versions("nonexistent").await.unwrap();
1517        assert!(versions.is_empty());
1518    }
1519
1520    #[tokio::test]
1521    async fn count_auto_versions_zero_for_unknown() {
1522        let store = test_store().await;
1523        let count = store.count_auto_versions("nonexistent").await.unwrap();
1524        assert_eq!(count, 0);
1525    }
1526
1527    #[tokio::test]
1528    async fn prune_nothing_when_below_limit() {
1529        let store = test_store().await;
1530
1531        store
1532            .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
1533            .await
1534            .unwrap();
1535
1536        let pruned = store.prune_skill_versions("git", 5).await.unwrap();
1537        assert_eq!(pruned, 0);
1538    }
1539
1540    #[tokio::test]
1541    async fn record_skill_outcome_with_error_context() {
1542        let store = test_store().await;
1543
1544        store
1545            .record_skill_outcome(
1546                "docker",
1547                None,
1548                Some(crate::types::ConversationId(1)),
1549                "tool_failure",
1550                Some("container not found"),
1551                None,
1552            )
1553            .await
1554            .unwrap();
1555
1556        let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
1557        assert_eq!(metrics.total, 1);
1558        assert_eq!(metrics.failures, 1);
1559    }
1560
1561    #[tokio::test]
1562    async fn save_skill_version_with_error_context() {
1563        let store = test_store().await;
1564
1565        let id = store
1566            .save_skill_version(
1567                "git",
1568                1,
1569                "improved body",
1570                "Git helper",
1571                "auto",
1572                Some("exit code 128"),
1573                None,
1574            )
1575            .await
1576            .unwrap();
1577        assert!(id > 0);
1578    }
1579
1580    #[tokio::test]
1581    async fn record_skill_outcomes_batch_resolves_version_id() {
1582        let store = test_store().await;
1583
1584        let vid = store
1585            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1586            .await
1587            .unwrap();
1588        store.activate_skill_version("git", vid).await.unwrap();
1589
1590        store
1591            .record_skill_outcomes_batch(
1592                &["git".to_string()],
1593                None,
1594                "tool_failure",
1595                Some("exit code 1"),
1596                Some("exit_nonzero"),
1597            )
1598            .await
1599            .unwrap();
1600
1601        let pool = store.pool();
1602        let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
1603            "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
1604        ))
1605        .fetch_one(pool)
1606        .await
1607        .unwrap();
1608        assert_eq!(
1609            row.0,
1610            Some(vid),
1611            "version_id should be resolved to active version"
1612        );
1613        assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
1614    }
1615
1616    #[tokio::test]
1617    async fn record_skill_outcome_stores_outcome_detail() {
1618        let store = test_store().await;
1619
1620        store
1621            .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
1622            .await
1623            .unwrap();
1624
1625        let pool = store.pool();
1626        let row: (Option<String>,) = zeph_db::query_as(sql!(
1627            "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
1628        ))
1629        .fetch_one(pool)
1630        .await
1631        .unwrap();
1632        assert_eq!(row.0.as_deref(), Some("timeout"));
1633    }
1634
1635    #[tokio::test]
1636    async fn record_skill_outcomes_batch_waits_for_active_writer() {
1637        let file = tempfile::NamedTempFile::new().expect("tempfile");
1638        let path = file.path().to_str().expect("valid path").to_owned();
1639        let store = SqliteStore::with_pool_size(&path, 2)
1640            .await
1641            .expect("with_pool_size");
1642
1643        let vid = store
1644            .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1645            .await
1646            .unwrap();
1647        store.activate_skill_version("git", vid).await.unwrap();
1648
1649        let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1650        zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1651            .execute(&mut *writer_tx)
1652            .await
1653            .expect("hold write lock");
1654
1655        let batch_store = store.clone();
1656        let batch = tokio::spawn(async move {
1657            batch_store
1658                .record_skill_outcomes_batch(
1659                    &["git".to_string()],
1660                    None,
1661                    "success",
1662                    None,
1663                    Some("waited_for_writer"),
1664                )
1665                .await
1666        });
1667
1668        sleep(Duration::from_millis(100)).await;
1669        writer_tx.commit().await.expect("commit writer");
1670
1671        batch
1672            .await
1673            .expect("join batch task")
1674            .expect("record outcomes");
1675
1676        let count: i64 = zeph_db::query_scalar(sql!(
1677            "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1678        ))
1679        .fetch_one(store.pool())
1680        .await
1681        .unwrap();
1682        assert_eq!(
1683            count, 1,
1684            "expected batch insert to succeed after writer commits"
1685        );
1686    }
1687
1688    #[tokio::test]
1689    async fn distinct_session_count_empty() {
1690        let store = test_store().await;
1691        let count = store.distinct_session_count("unknown-skill").await.unwrap();
1692        assert_eq!(count, 0);
1693    }
1694
1695    #[tokio::test]
1696    async fn distinct_session_count_single_session() {
1697        let store = test_store().await;
1698        let cid = crate::types::ConversationId(1);
1699        store
1700            .record_skill_outcome("git", None, Some(cid), "success", None, None)
1701            .await
1702            .unwrap();
1703        store
1704            .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1705            .await
1706            .unwrap();
1707        let count = store.distinct_session_count("git").await.unwrap();
1708        assert_eq!(count, 1);
1709    }
1710
1711    #[tokio::test]
1712    async fn distinct_session_count_multiple_sessions() {
1713        let store = test_store().await;
1714        for i in 0..3i64 {
1715            store
1716                .record_skill_outcome(
1717                    "git",
1718                    None,
1719                    Some(crate::types::ConversationId(i)),
1720                    "success",
1721                    None,
1722                    None,
1723                )
1724                .await
1725                .unwrap();
1726        }
1727        let count = store.distinct_session_count("git").await.unwrap();
1728        assert_eq!(count, 3);
1729    }
1730
1731    #[tokio::test]
1732    async fn distinct_session_count_null_conversation_ids_excluded() {
1733        let store = test_store().await;
1734        // Insert outcomes with NULL conversation_id (legacy rows).
1735        store
1736            .record_skill_outcome("git", None, None, "success", None, None)
1737            .await
1738            .unwrap();
1739        store
1740            .record_skill_outcome("git", None, None, "success", None, None)
1741            .await
1742            .unwrap();
1743        let count = store.distinct_session_count("git").await.unwrap();
1744        assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1745    }
1746
1747    // --- STEM: skill_usage_log ---
1748
1749    #[tokio::test]
1750    async fn insert_and_find_recurring_patterns() {
1751        let store = test_store().await;
1752        let seq = r#"["shell","web_scrape"]"#;
1753        let hash = "abcdef0123456789";
1754        let ctx = "ctxhash000000000";
1755
1756        for _ in 0..3 {
1757            store
1758                .insert_tool_usage_log(seq, hash, ctx, "success", None)
1759                .await
1760                .unwrap();
1761        }
1762        store
1763            .insert_tool_usage_log(seq, hash, ctx, "failure", None)
1764            .await
1765            .unwrap();
1766
1767        let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1768        assert_eq!(patterns.len(), 1);
1769        let (s, h, occ, suc) = &patterns[0];
1770        assert_eq!(s, seq);
1771        assert_eq!(h, hash);
1772        assert_eq!(*occ, 4);
1773        assert_eq!(*suc, 3);
1774    }
1775
1776    #[tokio::test]
1777    async fn find_recurring_patterns_below_threshold_returns_empty() {
1778        let store = test_store().await;
1779        let seq = r#"["shell"]"#;
1780        let hash = "0000000000000001";
1781        let ctx = "0000000000000001";
1782
1783        store
1784            .insert_tool_usage_log(seq, hash, ctx, "success", None)
1785            .await
1786            .unwrap();
1787
1788        let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1789        assert!(patterns.is_empty());
1790    }
1791
1792    #[tokio::test]
1793    async fn prune_tool_usage_log_removes_old_rows() {
1794        let store = test_store().await;
1795        // Insert a row with an artificially old timestamp so it falls within 0-day window.
1796        zeph_db::query(sql!(
1797            "INSERT INTO skill_usage_log \
1798             (tool_sequence, sequence_hash, context_hash, outcome, created_at) \
1799             VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
1800        ))
1801        .bind(r#"["shell"]"#)
1802        .bind("hash0000000000001")
1803        .bind("ctx00000000000001")
1804        .bind("success")
1805        .execute(store.pool())
1806        .await
1807        .unwrap();
1808
1809        // Prune rows older than 1 day — the row above is 2 days old so it must be removed.
1810        let removed = store.prune_tool_usage_log(1).await.unwrap();
1811        assert_eq!(removed, 1);
1812    }
1813
1814    // --- ERL: skill_heuristics ---
1815
1816    #[tokio::test]
1817    async fn insert_and_load_skill_heuristics() {
1818        let store = test_store().await;
1819
1820        let id = store
1821            .insert_skill_heuristic(Some("git"), "always commit in small chunks", 0.9)
1822            .await
1823            .unwrap();
1824        assert!(id > 0);
1825
1826        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1827        assert_eq!(rows.len(), 1);
1828        assert_eq!(rows[0].1, "always commit in small chunks");
1829        assert!((rows[0].2 - 0.9).abs() < 1e-6);
1830    }
1831
1832    #[tokio::test]
1833    async fn load_skill_heuristics_includes_general() {
1834        let store = test_store().await;
1835
1836        store
1837            .insert_skill_heuristic(None, "general tip", 0.7)
1838            .await
1839            .unwrap();
1840        store
1841            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1842            .await
1843            .unwrap();
1844
1845        // querying for "git" should include both the git-specific and the general (NULL) heuristic
1846        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1847        assert_eq!(rows.len(), 2);
1848    }
1849
1850    #[tokio::test]
1851    async fn load_skill_heuristics_filters_by_min_confidence() {
1852        let store = test_store().await;
1853
1854        store
1855            .insert_skill_heuristic(Some("git"), "low confidence tip", 0.3)
1856            .await
1857            .unwrap();
1858        store
1859            .insert_skill_heuristic(Some("git"), "high confidence tip", 0.9)
1860            .await
1861            .unwrap();
1862
1863        let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1864        assert_eq!(rows.len(), 1);
1865        assert_eq!(rows[0].1, "high confidence tip");
1866    }
1867
1868    #[tokio::test]
1869    async fn increment_heuristic_use_count_works() {
1870        let store = test_store().await;
1871
1872        let id = store
1873            .insert_skill_heuristic(Some("git"), "tip", 0.8)
1874            .await
1875            .unwrap();
1876
1877        store.increment_heuristic_use_count(id).await.unwrap();
1878        store.increment_heuristic_use_count(id).await.unwrap();
1879
1880        let rows = store.load_skill_heuristics("git", 0.0, 10).await.unwrap();
1881        assert_eq!(rows[0].3, 2); // use_count
1882    }
1883
1884    #[tokio::test]
1885    async fn load_all_heuristics_for_skill_exact_match() {
1886        let store = test_store().await;
1887
1888        store
1889            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1890            .await
1891            .unwrap();
1892        store
1893            .insert_skill_heuristic(Some("docker"), "docker tip", 0.8)
1894            .await
1895            .unwrap();
1896
1897        let rows = store
1898            .load_all_heuristics_for_skill(Some("git"))
1899            .await
1900            .unwrap();
1901        assert_eq!(rows.len(), 1);
1902        assert_eq!(rows[0].1, "git tip");
1903    }
1904
1905    #[tokio::test]
1906    async fn load_all_heuristics_for_skill_null() {
1907        let store = test_store().await;
1908
1909        store
1910            .insert_skill_heuristic(None, "general", 0.8)
1911            .await
1912            .unwrap();
1913        store
1914            .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1915            .await
1916            .unwrap();
1917
1918        let rows = store.load_all_heuristics_for_skill(None).await.unwrap();
1919        assert_eq!(rows.len(), 1);
1920        assert_eq!(rows[0].1, "general");
1921    }
1922
1923    #[tokio::test]
1924    async fn skill_trust_default_is_quarantined() {
1925        // Verify the DB schema default for skill_trust.trust_level is 'quarantined'.
1926        // ARISE-generated versions do not call set_skill_trust_level, so they inherit
1927        // this default when the trust row is first created by the scanner.
1928        let store = test_store().await;
1929
1930        // Insert a trust row without specifying trust_level to exercise the DEFAULT.
1931        zeph_db::query(sql!(
1932            "INSERT INTO skill_trust (skill_name, blake3_hash) VALUES ('test-arise', 'abc')"
1933        ))
1934        .execute(store.pool())
1935        .await
1936        .unwrap();
1937
1938        let trust: (String,) = zeph_db::query_as(sql!(
1939            "SELECT trust_level FROM skill_trust WHERE skill_name = 'test-arise'"
1940        ))
1941        .fetch_one(store.pool())
1942        .await
1943        .unwrap();
1944
1945        assert_eq!(
1946            trust.0, "quarantined",
1947            "schema default for skill_trust.trust_level must be 'quarantined'"
1948        );
1949    }
1950
1951    #[tokio::test]
1952    async fn save_and_activate_skill_version_activates_new() {
1953        let store = test_store().await;
1954
1955        let id = store
1956            .save_and_activate_skill_version(
1957                "git",
1958                1,
1959                "body v1",
1960                "Git helper",
1961                "manual",
1962                None,
1963                None,
1964            )
1965            .await
1966            .unwrap();
1967        assert!(id > 0);
1968
1969        let active = store.active_skill_version("git").await.unwrap().unwrap();
1970        assert_eq!(active.id, id);
1971        assert_eq!(active.version, 1);
1972        assert_eq!(active.body, "body v1");
1973        assert!(active.is_active);
1974    }
1975
1976    #[tokio::test]
1977    async fn save_and_activate_skill_version_deactivates_previous() {
1978        let store = test_store().await;
1979
1980        let v1 = store
1981            .save_and_activate_skill_version("git", 1, "v1", "desc", "manual", None, None)
1982            .await
1983            .unwrap();
1984
1985        let v2 = store
1986            .save_and_activate_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1987            .await
1988            .unwrap();
1989
1990        let versions = store.load_skill_versions("git").await.unwrap();
1991        assert_eq!(versions.len(), 2);
1992
1993        let old = versions.iter().find(|v| v.id == v1).unwrap();
1994        let new = versions.iter().find(|v| v.id == v2).unwrap();
1995        assert!(!old.is_active, "previous version must be deactivated");
1996        assert!(new.is_active, "new version must be active");
1997    }
1998
1999    #[tokio::test]
2000    async fn save_and_activate_skill_version_only_one_active() {
2001        let store = test_store().await;
2002
2003        let mut last_id = 0i64;
2004        for i in 1i64..=4 {
2005            last_id = store
2006                .save_and_activate_skill_version(
2007                    "git",
2008                    i,
2009                    &format!("v{i}"),
2010                    "desc",
2011                    "auto",
2012                    None,
2013                    None,
2014                )
2015                .await
2016                .unwrap();
2017        }
2018
2019        let versions = store.load_skill_versions("git").await.unwrap();
2020        let active_count = versions.iter().filter(|v| v.is_active).count();
2021        assert_eq!(active_count, 1, "exactly one version must be active");
2022        assert_eq!(
2023            versions.iter().find(|v| v.is_active).unwrap().id,
2024            last_id,
2025            "the last saved version must be the active one"
2026        );
2027    }
2028}