1use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
9#[derive(Debug)]
10pub struct SkillUsageRow {
11 pub skill_name: String,
12 pub invocation_count: i64,
13 pub last_used_at: String,
14}
15
16#[derive(Debug)]
17pub struct SkillMetricsRow {
18 pub skill_name: String,
19 pub version_id: Option<i64>,
20 pub total: i64,
21 pub successes: i64,
22 pub failures: i64,
23}
24
25#[derive(Debug)]
26pub struct SkillVersionRow {
27 pub id: i64,
28 pub skill_name: String,
29 pub version: i64,
30 pub body: String,
31 pub description: String,
32 pub source: String,
33 pub is_active: bool,
34 pub success_count: i64,
35 pub failure_count: i64,
36 pub created_at: String,
37}
38
39type SkillVersionTuple = (
40 i64,
41 String,
42 i64,
43 String,
44 String,
45 String,
46 i64,
47 i64,
48 i64,
49 String,
50);
51
52fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
53 SkillVersionRow {
54 id: t.0,
55 skill_name: t.1,
56 version: t.2,
57 body: t.3,
58 description: t.4,
59 source: t.5,
60 is_active: t.6 != 0,
61 success_count: t.7,
62 failure_count: t.8,
63 created_at: t.9,
64 }
65}
66
67impl SqliteStore {
68 pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
74 for name in skill_names {
75 zeph_db::query(sql!(
76 "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
77 VALUES (?, 1, CURRENT_TIMESTAMP) \
78 ON CONFLICT(skill_name) DO UPDATE SET \
79 invocation_count = invocation_count + 1, \
80 last_used_at = CURRENT_TIMESTAMP"
81 ))
82 .bind(name)
83 .execute(&self.pool)
84 .await?;
85 }
86 Ok(())
87 }
88
89 pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
95 let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
96 "SELECT skill_name, invocation_count, last_used_at \
97 FROM skill_usage ORDER BY invocation_count DESC"
98 ))
99 .fetch_all(&self.pool)
100 .await?;
101
102 Ok(rows
103 .into_iter()
104 .map(
105 |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
106 skill_name,
107 invocation_count,
108 last_used_at,
109 },
110 )
111 .collect())
112 }
113
114 pub async fn record_skill_outcome(
120 &self,
121 skill_name: &str,
122 version_id: Option<i64>,
123 conversation_id: Option<crate::types::ConversationId>,
124 outcome: &str,
125 error_context: Option<&str>,
126 outcome_detail: Option<&str>,
127 ) -> Result<(), MemoryError> {
128 zeph_db::query(sql!(
129 "INSERT INTO skill_outcomes \
130 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
131 VALUES (?, ?, ?, ?, ?, ?)"
132 ))
133 .bind(skill_name)
134 .bind(version_id)
135 .bind(conversation_id)
136 .bind(outcome)
137 .bind(error_context)
138 .bind(outcome_detail)
139 .execute(&self.pool)
140 .await?;
141 Ok(())
142 }
143
144 pub async fn record_skill_outcomes_batch(
150 &self,
151 skill_names: &[String],
152 conversation_id: Option<crate::types::ConversationId>,
153 outcome: &str,
154 error_context: Option<&str>,
155 outcome_detail: Option<&str>,
156 ) -> Result<(), MemoryError> {
157 let mut tx = begin_write(&self.pool).await?;
160
161 let mut version_map: std::collections::HashMap<String, Option<i64>> =
162 std::collections::HashMap::new();
163 for name in skill_names {
164 let vid: Option<(i64,)> = zeph_db::query_as(sql!(
165 "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
166 ))
167 .bind(name)
168 .fetch_optional(&mut *tx)
169 .await?;
170 version_map.insert(name.clone(), vid.map(|r| r.0));
171 }
172
173 for name in skill_names {
174 let version_id = version_map.get(name.as_str()).copied().flatten();
175 zeph_db::query(sql!(
176 "INSERT INTO skill_outcomes \
177 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
178 VALUES (?, ?, ?, ?, ?, ?)"
179 ))
180 .bind(name)
181 .bind(version_id)
182 .bind(conversation_id)
183 .bind(outcome)
184 .bind(error_context)
185 .bind(outcome_detail)
186 .execute(&mut *tx)
187 .await?;
188 }
189 tx.commit().await?;
190 Ok(())
191 }
192
193 pub async fn skill_metrics(
199 &self,
200 skill_name: &str,
201 ) -> Result<Option<SkillMetricsRow>, MemoryError> {
202 let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
203 "SELECT skill_name, version_id, \
204 COUNT(*) as total, \
205 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
206 COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
207 FROM skill_outcomes WHERE skill_name = ? \
208 AND outcome NOT IN ('user_approval', 'user_rejection') \
209 GROUP BY skill_name, version_id \
210 ORDER BY version_id DESC LIMIT 1"
211 ))
212 .bind(skill_name)
213 .fetch_optional(&self.pool)
214 .await?;
215
216 Ok(row.map(
217 |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
218 skill_name,
219 version_id,
220 total,
221 successes,
222 failures,
223 },
224 ))
225 }
226
227 pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
233 let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234 "SELECT skill_name, version_id, \
235 COUNT(*) as total, \
236 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237 COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238 FROM skill_outcomes \
239 GROUP BY skill_name \
240 ORDER BY total DESC"
241 ))
242 .fetch_all(&self.pool)
243 .await?;
244
245 Ok(rows
246 .into_iter()
247 .map(
248 |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249 skill_name,
250 version_id,
251 total,
252 successes,
253 failures,
254 },
255 )
256 .collect())
257 }
258
259 #[allow(clippy::too_many_arguments)]
265 pub async fn save_skill_version(
266 &self,
267 skill_name: &str,
268 version: i64,
269 body: &str,
270 description: &str,
271 source: &str,
272 error_context: Option<&str>,
273 predecessor_id: Option<i64>,
274 ) -> Result<i64, MemoryError> {
275 let row: (i64,) = zeph_db::query_as(sql!(
276 "INSERT INTO skill_versions \
277 (skill_name, version, body, description, source, error_context, predecessor_id) \
278 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
279 ))
280 .bind(skill_name)
281 .bind(version)
282 .bind(body)
283 .bind(description)
284 .bind(source)
285 .bind(error_context)
286 .bind(predecessor_id)
287 .fetch_one(&self.pool)
288 .await?;
289 Ok(row.0)
290 }
291
292 pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
301 let row: (i64,) = zeph_db::query_as(sql!(
302 "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
303 WHERE skill_name = ? AND conversation_id IS NOT NULL"
304 ))
305 .bind(skill_name)
306 .fetch_one(&self.pool)
307 .await?;
308 Ok(row.0)
309 }
310
311 pub async fn active_skill_version(
317 &self,
318 skill_name: &str,
319 ) -> Result<Option<SkillVersionRow>, MemoryError> {
320 let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
321 "SELECT id, skill_name, version, body, description, source, \
322 is_active, success_count, failure_count, created_at \
323 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
324 ))
325 .bind(skill_name)
326 .fetch_optional(&self.pool)
327 .await?;
328
329 Ok(row.map(skill_version_from_tuple))
330 }
331
332 pub async fn activate_skill_version(
338 &self,
339 skill_name: &str,
340 version_id: i64,
341 ) -> Result<(), MemoryError> {
342 let mut tx = begin_write(&self.pool).await?;
343
344 zeph_db::query(sql!(
345 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
346 ))
347 .bind(skill_name)
348 .execute(&mut *tx)
349 .await?;
350
351 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
352 .bind(version_id)
353 .execute(&mut *tx)
354 .await?;
355
356 tx.commit().await?;
357 Ok(())
358 }
359
360 pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
366 let row: (i64,) = zeph_db::query_as(sql!(
367 "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
368 ))
369 .bind(skill_name)
370 .fetch_one(&self.pool)
371 .await?;
372 Ok(row.0)
373 }
374
375 pub async fn last_improvement_time(
381 &self,
382 skill_name: &str,
383 ) -> Result<Option<String>, MemoryError> {
384 let row: Option<(String,)> = zeph_db::query_as(sql!(
385 "SELECT created_at FROM skill_versions \
386 WHERE skill_name = ? AND source = 'auto' \
387 ORDER BY id DESC LIMIT 1"
388 ))
389 .bind(skill_name)
390 .fetch_optional(&self.pool)
391 .await?;
392 Ok(row.map(|r| r.0))
393 }
394
395 pub async fn ensure_skill_version_exists(
401 &self,
402 skill_name: &str,
403 body: &str,
404 description: &str,
405 ) -> Result<(), MemoryError> {
406 let existing: Option<(i64,)> = zeph_db::query_as(sql!(
407 "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
408 ))
409 .bind(skill_name)
410 .fetch_optional(&self.pool)
411 .await?;
412
413 if existing.is_none() {
414 let id = self
415 .save_skill_version(skill_name, 1, body, description, "manual", None, None)
416 .await?;
417 self.activate_skill_version(skill_name, id).await?;
418 }
419 Ok(())
420 }
421
422 pub async fn load_skill_versions(
428 &self,
429 skill_name: &str,
430 ) -> Result<Vec<SkillVersionRow>, MemoryError> {
431 let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
432 "SELECT id, skill_name, version, body, description, source, \
433 is_active, success_count, failure_count, created_at \
434 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
435 ))
436 .bind(skill_name)
437 .fetch_all(&self.pool)
438 .await?;
439
440 Ok(rows.into_iter().map(skill_version_from_tuple).collect())
441 }
442
443 pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
449 let row: (i64,) = zeph_db::query_as(sql!(
450 "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
451 ))
452 .bind(skill_name)
453 .fetch_one(&self.pool)
454 .await?;
455 Ok(row.0)
456 }
457
458 pub async fn prune_skill_versions(
465 &self,
466 skill_name: &str,
467 max_versions: u32,
468 ) -> Result<u32, MemoryError> {
469 let result = zeph_db::query(sql!(
470 "DELETE FROM skill_versions WHERE id IN (\
471 SELECT id FROM skill_versions \
472 WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
473 ORDER BY id ASC \
474 LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
475 WHERE skill_name = ? AND source = 'auto') - ?)\
476 )"
477 ))
478 .bind(skill_name)
479 .bind(skill_name)
480 .bind(max_versions)
481 .execute(&self.pool)
482 .await?;
483 Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
484 }
485
486 pub async fn predecessor_version(
492 &self,
493 version_id: i64,
494 ) -> Result<Option<SkillVersionRow>, MemoryError> {
495 let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
496 "SELECT predecessor_id FROM skill_versions WHERE id = ?"
497 ))
498 .bind(version_id)
499 .fetch_optional(&self.pool)
500 .await?;
501
502 let Some((Some(pid),)) = pred_id else {
503 return Ok(None);
504 };
505
506 let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
507 "SELECT id, skill_name, version, body, description, source, \
508 is_active, success_count, failure_count, created_at \
509 FROM skill_versions WHERE id = ?"
510 ))
511 .bind(pid)
512 .fetch_optional(&self.pool)
513 .await?;
514
515 Ok(row.map(skill_version_from_tuple))
516 }
517
518 pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
525 let rows: Vec<(String,)> = zeph_db::query_as(sql!(
526 "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
527 ))
528 .fetch_all(&self.pool)
529 .await?;
530 Ok(rows.into_iter().map(|(name,)| name).collect())
531 }
532
533 pub async fn insert_tool_usage_log(
543 &self,
544 tool_sequence: &str,
545 sequence_hash: &str,
546 context_hash: &str,
547 outcome: &str,
548 conversation_id: Option<crate::types::ConversationId>,
549 ) -> Result<(), MemoryError> {
550 zeph_db::query(sql!(
551 "INSERT INTO skill_usage_log \
552 (tool_sequence, sequence_hash, context_hash, outcome, conversation_id) \
553 VALUES (?, ?, ?, ?, ?)"
554 ))
555 .bind(tool_sequence)
556 .bind(sequence_hash)
557 .bind(context_hash)
558 .bind(outcome)
559 .bind(conversation_id)
560 .execute(&self.pool)
561 .await?;
562 Ok(())
563 }
564
565 pub async fn find_recurring_patterns(
573 &self,
574 min_count: u32,
575 window_days: u32,
576 ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
577 let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
578 "SELECT tool_sequence, sequence_hash, \
579 COUNT(*) as occurrence_count, \
580 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
581 FROM skill_usage_log \
582 WHERE created_at > datetime('now', '-' || ? || ' days') \
583 GROUP BY sequence_hash \
584 HAVING occurrence_count >= ? \
585 ORDER BY occurrence_count DESC \
586 LIMIT 10"
587 ))
588 .bind(window_days)
589 .bind(min_count)
590 .fetch_all(&self.pool)
591 .await?;
592
593 Ok(rows
594 .into_iter()
595 .map(|(seq, hash, occ, suc)| {
596 (
597 seq,
598 hash,
599 u32::try_from(occ).unwrap_or(u32::MAX),
600 u32::try_from(suc).unwrap_or(0),
601 )
602 })
603 .collect())
604 }
605
606 pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
611 let result = zeph_db::query(sql!(
612 "DELETE FROM skill_usage_log \
613 WHERE created_at < datetime('now', '-' || ? || ' days')"
614 ))
615 .bind(retention_days)
616 .execute(&self.pool)
617 .await?;
618 Ok(result.rows_affected())
619 }
620
621 pub async fn insert_skill_heuristic(
628 &self,
629 skill_name: Option<&str>,
630 heuristic_text: &str,
631 confidence: f64,
632 ) -> Result<i64, MemoryError> {
633 let row: (i64,) = zeph_db::query_as(sql!(
634 "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) \
635 VALUES (?, ?, ?) RETURNING id"
636 ))
637 .bind(skill_name)
638 .bind(heuristic_text)
639 .bind(confidence)
640 .fetch_one(&self.pool)
641 .await?;
642 Ok(row.0)
643 }
644
645 pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
650 zeph_db::query(sql!(
651 "UPDATE skill_heuristics \
652 SET use_count = use_count + 1, updated_at = datetime('now') \
653 WHERE id = ?"
654 ))
655 .bind(id)
656 .execute(&self.pool)
657 .await?;
658 Ok(())
659 }
660
661 pub async fn load_skill_heuristics(
669 &self,
670 skill_name: &str,
671 min_confidence: f64,
672 limit: u32,
673 ) -> Result<Vec<(i64, String, f64, i64)>, MemoryError> {
674 let rows: Vec<(i64, String, f64, i64)> = zeph_db::query_as(sql!(
675 "SELECT id, heuristic_text, confidence, use_count \
676 FROM skill_heuristics \
677 WHERE (skill_name = ? OR skill_name IS NULL) \
678 AND confidence >= ? \
679 ORDER BY confidence DESC \
680 LIMIT ?"
681 ))
682 .bind(skill_name)
683 .bind(min_confidence)
684 .bind(limit)
685 .fetch_all(&self.pool)
686 .await?;
687 Ok(rows)
688 }
689
690 pub async fn load_all_heuristics_for_skill(
697 &self,
698 skill_name: Option<&str>,
699 ) -> Result<Vec<(i64, String)>, MemoryError> {
700 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
701 "SELECT id, heuristic_text FROM skill_heuristics \
702 WHERE (skill_name = ? OR (? IS NULL AND skill_name IS NULL))"
703 ))
704 .bind(skill_name)
705 .bind(skill_name)
706 .fetch_all(&self.pool)
707 .await?;
708 Ok(rows)
709 }
710
711 pub async fn find_step_corrections(
722 &self,
723 skill_name: &str,
724 failure_kind: &str,
725 error_context: &str,
726 tool_name: &str,
727 limit: u32,
728 ) -> Result<Vec<(i64, String)>, MemoryError> {
729 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
730 "SELECT id, hint FROM step_corrections \
731 WHERE skill_name = ? \
732 AND (failure_kind = '' OR failure_kind = ?) \
733 AND (error_substring = '' OR INSTR(?, error_substring) > 0) \
734 AND (tool_name = '' OR tool_name = ?) \
735 ORDER BY success_count DESC, use_count DESC \
736 LIMIT ?"
737 ))
738 .bind(skill_name)
739 .bind(failure_kind)
740 .bind(error_context)
741 .bind(tool_name)
742 .bind(limit)
743 .fetch_all(&self.pool)
744 .await?;
745 Ok(rows)
746 }
747
748 pub async fn insert_step_correction(
757 &self,
758 skill_name: &str,
759 failure_kind: &str,
760 error_substring: &str,
761 tool_name: &str,
762 hint: &str,
763 ) -> Result<(), MemoryError> {
764 zeph_db::query(sql!(
765 "INSERT OR IGNORE INTO step_corrections \
766 (skill_name, failure_kind, error_substring, tool_name, hint) \
767 VALUES (?, ?, ?, ?, ?)"
768 ))
769 .bind(skill_name)
770 .bind(failure_kind)
771 .bind(error_substring)
772 .bind(tool_name)
773 .bind(hint)
774 .execute(&self.pool)
775 .await?;
776 Ok(())
777 }
778
779 pub async fn record_correction_usage(
785 &self,
786 correction_id: i64,
787 was_successful: bool,
788 ) -> Result<(), MemoryError> {
789 if was_successful {
790 zeph_db::query(sql!(
791 "UPDATE step_corrections \
792 SET use_count = use_count + 1, success_count = success_count + 1 \
793 WHERE id = ?"
794 ))
795 .bind(correction_id)
796 .execute(&self.pool)
797 .await?;
798 } else {
799 zeph_db::query(sql!(
800 "UPDATE step_corrections SET use_count = use_count + 1 WHERE id = ?"
801 ))
802 .bind(correction_id)
803 .execute(&self.pool)
804 .await?;
805 }
806 Ok(())
807 }
808
809 pub async fn load_routing_head_weights(
819 &self,
820 ) -> Result<Option<(i64, Vec<u8>, f64, i64)>, MemoryError> {
821 let row: Option<(i64, Vec<u8>, f64, i64)> = zeph_db::query_as(sql!(
822 "SELECT embed_dim, weights, baseline, update_count \
823 FROM routing_head_weights WHERE id = 1"
824 ))
825 .fetch_optional(&self.pool)
826 .await?;
827 Ok(row)
828 }
829
830 pub async fn save_routing_head_weights(
836 &self,
837 embed_dim: i64,
838 weights: &[u8],
839 baseline: f64,
840 update_count: i64,
841 ) -> Result<(), MemoryError> {
842 zeph_db::query(sql!(
843 "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
844 VALUES (1, ?, ?, ?, ?, datetime('now')) \
845 ON CONFLICT(id) DO UPDATE SET \
846 embed_dim = excluded.embed_dim, \
847 weights = excluded.weights, \
848 baseline = excluded.baseline, \
849 update_count = excluded.update_count, \
850 updated_at = datetime('now')"
851 ))
852 .bind(embed_dim)
853 .bind(weights)
854 .bind(baseline)
855 .bind(update_count)
856 .execute(&self.pool)
857 .await?;
858 Ok(())
859 }
860}
861
862#[cfg(test)]
863mod tests {
864 use std::time::Duration;
865
866 use tokio::time::sleep;
867
868 use super::*;
869
870 async fn test_store() -> SqliteStore {
871 SqliteStore::new(":memory:").await.unwrap()
872 }
873
874 #[tokio::test]
875 async fn record_skill_usage_increments() {
876 let store = test_store().await;
877
878 store.record_skill_usage(&["git"]).await.unwrap();
879 store.record_skill_usage(&["git"]).await.unwrap();
880
881 let usage = store.load_skill_usage().await.unwrap();
882 assert_eq!(usage.len(), 1);
883 assert_eq!(usage[0].skill_name, "git");
884 assert_eq!(usage[0].invocation_count, 2);
885 }
886
887 #[tokio::test]
888 async fn load_skill_usage_returns_all() {
889 let store = test_store().await;
890
891 store.record_skill_usage(&["git", "docker"]).await.unwrap();
892 store.record_skill_usage(&["git"]).await.unwrap();
893
894 let usage = store.load_skill_usage().await.unwrap();
895 assert_eq!(usage.len(), 2);
896 assert_eq!(usage[0].skill_name, "git");
897 assert_eq!(usage[0].invocation_count, 2);
898 assert_eq!(usage[1].skill_name, "docker");
899 assert_eq!(usage[1].invocation_count, 1);
900 }
901
902 #[tokio::test]
903 async fn migration_005_creates_tables() {
904 let store = test_store().await;
905 let pool = store.pool();
906
907 let versions: (i64,) = zeph_db::query_as(sql!(
908 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
909 ))
910 .fetch_one(pool)
911 .await
912 .unwrap();
913 assert_eq!(versions.0, 1);
914
915 let outcomes: (i64,) = zeph_db::query_as(sql!(
916 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
917 ))
918 .fetch_one(pool)
919 .await
920 .unwrap();
921 assert_eq!(outcomes.0, 1);
922 }
923
924 #[tokio::test]
925 async fn record_skill_outcome_inserts() {
926 let store = test_store().await;
927
928 store
929 .record_skill_outcome(
930 "git",
931 None,
932 Some(crate::types::ConversationId(1)),
933 "success",
934 None,
935 None,
936 )
937 .await
938 .unwrap();
939 store
940 .record_skill_outcome(
941 "git",
942 None,
943 Some(crate::types::ConversationId(1)),
944 "tool_failure",
945 Some("exit code 1"),
946 None,
947 )
948 .await
949 .unwrap();
950
951 let metrics = store.skill_metrics("git").await.unwrap().unwrap();
952 assert_eq!(metrics.total, 2);
953 assert_eq!(metrics.successes, 1);
954 assert_eq!(metrics.failures, 1);
955 }
956
957 #[tokio::test]
958 async fn skill_metrics_none_for_unknown() {
959 let store = test_store().await;
960 let m = store.skill_metrics("nonexistent").await.unwrap();
961 assert!(m.is_none());
962 }
963
964 #[tokio::test]
965 async fn load_skill_outcome_stats_grouped() {
966 let store = test_store().await;
967
968 store
969 .record_skill_outcome("git", None, None, "success", None, None)
970 .await
971 .unwrap();
972 store
973 .record_skill_outcome("git", None, None, "tool_failure", None, None)
974 .await
975 .unwrap();
976 store
977 .record_skill_outcome("docker", None, None, "success", None, None)
978 .await
979 .unwrap();
980
981 let stats = store.load_skill_outcome_stats().await.unwrap();
982 assert_eq!(stats.len(), 2);
983 assert_eq!(stats[0].skill_name, "git");
984 assert_eq!(stats[0].total, 2);
985 assert_eq!(stats[1].skill_name, "docker");
986 assert_eq!(stats[1].total, 1);
987 }
988
989 #[tokio::test]
990 async fn save_and_load_skill_version() {
991 let store = test_store().await;
992
993 let id = store
994 .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
995 .await
996 .unwrap();
997 assert!(id > 0);
998
999 store.activate_skill_version("git", id).await.unwrap();
1000
1001 let active = store.active_skill_version("git").await.unwrap().unwrap();
1002 assert_eq!(active.version, 1);
1003 assert_eq!(active.body, "body v1");
1004 assert!(active.is_active);
1005 }
1006
1007 #[tokio::test]
1008 async fn activate_deactivates_previous() {
1009 let store = test_store().await;
1010
1011 let v1 = store
1012 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1013 .await
1014 .unwrap();
1015 store.activate_skill_version("git", v1).await.unwrap();
1016
1017 let v2 = store
1018 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1019 .await
1020 .unwrap();
1021 store.activate_skill_version("git", v2).await.unwrap();
1022
1023 let versions = store.load_skill_versions("git").await.unwrap();
1024 assert_eq!(versions.len(), 2);
1025 assert!(!versions[0].is_active);
1026 assert!(versions[1].is_active);
1027 }
1028
1029 #[tokio::test]
1030 async fn next_skill_version_increments() {
1031 let store = test_store().await;
1032
1033 let next = store.next_skill_version("git").await.unwrap();
1034 assert_eq!(next, 1);
1035
1036 store
1037 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1038 .await
1039 .unwrap();
1040 let next = store.next_skill_version("git").await.unwrap();
1041 assert_eq!(next, 2);
1042 }
1043
1044 #[tokio::test]
1045 async fn last_improvement_time_returns_auto_only() {
1046 let store = test_store().await;
1047
1048 store
1049 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1050 .await
1051 .unwrap();
1052
1053 let t = store.last_improvement_time("git").await.unwrap();
1054 assert!(t.is_none());
1055
1056 store
1057 .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1058 .await
1059 .unwrap();
1060
1061 let t = store.last_improvement_time("git").await.unwrap();
1062 assert!(t.is_some());
1063 }
1064
1065 #[tokio::test]
1066 async fn ensure_skill_version_exists_idempotent() {
1067 let store = test_store().await;
1068
1069 store
1070 .ensure_skill_version_exists("git", "body", "Git helper")
1071 .await
1072 .unwrap();
1073 store
1074 .ensure_skill_version_exists("git", "body2", "Git helper 2")
1075 .await
1076 .unwrap();
1077
1078 let versions = store.load_skill_versions("git").await.unwrap();
1079 assert_eq!(versions.len(), 1);
1080 assert_eq!(versions[0].body, "body");
1081 }
1082
1083 #[tokio::test]
1084 async fn load_skill_versions_ordered() {
1085 let store = test_store().await;
1086
1087 let v1 = store
1088 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1089 .await
1090 .unwrap();
1091 store
1092 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1093 .await
1094 .unwrap();
1095
1096 let versions = store.load_skill_versions("git").await.unwrap();
1097 assert_eq!(versions.len(), 2);
1098 assert_eq!(versions[0].version, 1);
1099 assert_eq!(versions[1].version, 2);
1100 }
1101
1102 #[tokio::test]
1103 async fn count_auto_versions_only() {
1104 let store = test_store().await;
1105
1106 store
1107 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1108 .await
1109 .unwrap();
1110 store
1111 .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1112 .await
1113 .unwrap();
1114 store
1115 .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
1116 .await
1117 .unwrap();
1118
1119 let count = store.count_auto_versions("git").await.unwrap();
1120 assert_eq!(count, 2);
1121 }
1122
1123 #[tokio::test]
1124 async fn prune_preserves_manual_and_active() {
1125 let store = test_store().await;
1126
1127 let v1 = store
1128 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1129 .await
1130 .unwrap();
1131 store.activate_skill_version("git", v1).await.unwrap();
1132
1133 for i in 2..=5 {
1134 store
1135 .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
1136 .await
1137 .unwrap();
1138 }
1139
1140 let pruned = store.prune_skill_versions("git", 2).await.unwrap();
1141 assert_eq!(pruned, 2);
1142
1143 let versions = store.load_skill_versions("git").await.unwrap();
1144 assert!(versions.iter().any(|v| v.source == "manual"));
1145 let auto_count = versions.iter().filter(|v| v.source == "auto").count();
1146 assert_eq!(auto_count, 2);
1147 }
1148
1149 #[tokio::test]
1150 async fn predecessor_version_returns_parent() {
1151 let store = test_store().await;
1152
1153 let v1 = store
1154 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1155 .await
1156 .unwrap();
1157 let v2 = store
1158 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1159 .await
1160 .unwrap();
1161
1162 let pred = store.predecessor_version(v2).await.unwrap().unwrap();
1163 assert_eq!(pred.id, v1);
1164 assert_eq!(pred.version, 1);
1165 }
1166
1167 #[tokio::test]
1168 async fn predecessor_version_none_for_root() {
1169 let store = test_store().await;
1170
1171 let v1 = store
1172 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1173 .await
1174 .unwrap();
1175
1176 let pred = store.predecessor_version(v1).await.unwrap();
1177 assert!(pred.is_none());
1178 }
1179
1180 #[tokio::test]
1181 async fn active_skill_version_none_for_unknown() {
1182 let store = test_store().await;
1183 let active = store.active_skill_version("nonexistent").await.unwrap();
1184 assert!(active.is_none());
1185 }
1186
1187 #[tokio::test]
1188 async fn load_skill_outcome_stats_empty() {
1189 let store = test_store().await;
1190 let stats = store.load_skill_outcome_stats().await.unwrap();
1191 assert!(stats.is_empty());
1192 }
1193
1194 #[tokio::test]
1195 async fn load_skill_versions_empty() {
1196 let store = test_store().await;
1197 let versions = store.load_skill_versions("nonexistent").await.unwrap();
1198 assert!(versions.is_empty());
1199 }
1200
1201 #[tokio::test]
1202 async fn count_auto_versions_zero_for_unknown() {
1203 let store = test_store().await;
1204 let count = store.count_auto_versions("nonexistent").await.unwrap();
1205 assert_eq!(count, 0);
1206 }
1207
1208 #[tokio::test]
1209 async fn prune_nothing_when_below_limit() {
1210 let store = test_store().await;
1211
1212 store
1213 .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
1214 .await
1215 .unwrap();
1216
1217 let pruned = store.prune_skill_versions("git", 5).await.unwrap();
1218 assert_eq!(pruned, 0);
1219 }
1220
1221 #[tokio::test]
1222 async fn record_skill_outcome_with_error_context() {
1223 let store = test_store().await;
1224
1225 store
1226 .record_skill_outcome(
1227 "docker",
1228 None,
1229 Some(crate::types::ConversationId(1)),
1230 "tool_failure",
1231 Some("container not found"),
1232 None,
1233 )
1234 .await
1235 .unwrap();
1236
1237 let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
1238 assert_eq!(metrics.total, 1);
1239 assert_eq!(metrics.failures, 1);
1240 }
1241
1242 #[tokio::test]
1243 async fn save_skill_version_with_error_context() {
1244 let store = test_store().await;
1245
1246 let id = store
1247 .save_skill_version(
1248 "git",
1249 1,
1250 "improved body",
1251 "Git helper",
1252 "auto",
1253 Some("exit code 128"),
1254 None,
1255 )
1256 .await
1257 .unwrap();
1258 assert!(id > 0);
1259 }
1260
1261 #[tokio::test]
1262 async fn record_skill_outcomes_batch_resolves_version_id() {
1263 let store = test_store().await;
1264
1265 let vid = store
1266 .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1267 .await
1268 .unwrap();
1269 store.activate_skill_version("git", vid).await.unwrap();
1270
1271 store
1272 .record_skill_outcomes_batch(
1273 &["git".to_string()],
1274 None,
1275 "tool_failure",
1276 Some("exit code 1"),
1277 Some("exit_nonzero"),
1278 )
1279 .await
1280 .unwrap();
1281
1282 let pool = store.pool();
1283 let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
1284 "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
1285 ))
1286 .fetch_one(pool)
1287 .await
1288 .unwrap();
1289 assert_eq!(
1290 row.0,
1291 Some(vid),
1292 "version_id should be resolved to active version"
1293 );
1294 assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
1295 }
1296
1297 #[tokio::test]
1298 async fn record_skill_outcome_stores_outcome_detail() {
1299 let store = test_store().await;
1300
1301 store
1302 .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
1303 .await
1304 .unwrap();
1305
1306 let pool = store.pool();
1307 let row: (Option<String>,) = zeph_db::query_as(sql!(
1308 "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
1309 ))
1310 .fetch_one(pool)
1311 .await
1312 .unwrap();
1313 assert_eq!(row.0.as_deref(), Some("timeout"));
1314 }
1315
1316 #[tokio::test]
1317 async fn record_skill_outcomes_batch_waits_for_active_writer() {
1318 let file = tempfile::NamedTempFile::new().expect("tempfile");
1319 let path = file.path().to_str().expect("valid path").to_owned();
1320 let store = SqliteStore::with_pool_size(&path, 2)
1321 .await
1322 .expect("with_pool_size");
1323
1324 let vid = store
1325 .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1326 .await
1327 .unwrap();
1328 store.activate_skill_version("git", vid).await.unwrap();
1329
1330 let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1331 zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1332 .execute(&mut *writer_tx)
1333 .await
1334 .expect("hold write lock");
1335
1336 let batch_store = store.clone();
1337 let batch = tokio::spawn(async move {
1338 batch_store
1339 .record_skill_outcomes_batch(
1340 &["git".to_string()],
1341 None,
1342 "success",
1343 None,
1344 Some("waited_for_writer"),
1345 )
1346 .await
1347 });
1348
1349 sleep(Duration::from_millis(100)).await;
1350 writer_tx.commit().await.expect("commit writer");
1351
1352 batch
1353 .await
1354 .expect("join batch task")
1355 .expect("record outcomes");
1356
1357 let count: i64 = zeph_db::query_scalar(sql!(
1358 "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1359 ))
1360 .fetch_one(store.pool())
1361 .await
1362 .unwrap();
1363 assert_eq!(
1364 count, 1,
1365 "expected batch insert to succeed after writer commits"
1366 );
1367 }
1368
1369 #[tokio::test]
1370 async fn distinct_session_count_empty() {
1371 let store = test_store().await;
1372 let count = store.distinct_session_count("unknown-skill").await.unwrap();
1373 assert_eq!(count, 0);
1374 }
1375
1376 #[tokio::test]
1377 async fn distinct_session_count_single_session() {
1378 let store = test_store().await;
1379 let cid = crate::types::ConversationId(1);
1380 store
1381 .record_skill_outcome("git", None, Some(cid), "success", None, None)
1382 .await
1383 .unwrap();
1384 store
1385 .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1386 .await
1387 .unwrap();
1388 let count = store.distinct_session_count("git").await.unwrap();
1389 assert_eq!(count, 1);
1390 }
1391
1392 #[tokio::test]
1393 async fn distinct_session_count_multiple_sessions() {
1394 let store = test_store().await;
1395 for i in 0..3i64 {
1396 store
1397 .record_skill_outcome(
1398 "git",
1399 None,
1400 Some(crate::types::ConversationId(i)),
1401 "success",
1402 None,
1403 None,
1404 )
1405 .await
1406 .unwrap();
1407 }
1408 let count = store.distinct_session_count("git").await.unwrap();
1409 assert_eq!(count, 3);
1410 }
1411
1412 #[tokio::test]
1413 async fn distinct_session_count_null_conversation_ids_excluded() {
1414 let store = test_store().await;
1415 store
1417 .record_skill_outcome("git", None, None, "success", None, None)
1418 .await
1419 .unwrap();
1420 store
1421 .record_skill_outcome("git", None, None, "success", None, None)
1422 .await
1423 .unwrap();
1424 let count = store.distinct_session_count("git").await.unwrap();
1425 assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1426 }
1427
1428 #[tokio::test]
1431 async fn insert_and_find_recurring_patterns() {
1432 let store = test_store().await;
1433 let seq = r#"["shell","web_scrape"]"#;
1434 let hash = "abcdef0123456789";
1435 let ctx = "ctxhash000000000";
1436
1437 for _ in 0..3 {
1438 store
1439 .insert_tool_usage_log(seq, hash, ctx, "success", None)
1440 .await
1441 .unwrap();
1442 }
1443 store
1444 .insert_tool_usage_log(seq, hash, ctx, "failure", None)
1445 .await
1446 .unwrap();
1447
1448 let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1449 assert_eq!(patterns.len(), 1);
1450 let (s, h, occ, suc) = &patterns[0];
1451 assert_eq!(s, seq);
1452 assert_eq!(h, hash);
1453 assert_eq!(*occ, 4);
1454 assert_eq!(*suc, 3);
1455 }
1456
1457 #[tokio::test]
1458 async fn find_recurring_patterns_below_threshold_returns_empty() {
1459 let store = test_store().await;
1460 let seq = r#"["shell"]"#;
1461 let hash = "0000000000000001";
1462 let ctx = "0000000000000001";
1463
1464 store
1465 .insert_tool_usage_log(seq, hash, ctx, "success", None)
1466 .await
1467 .unwrap();
1468
1469 let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1470 assert!(patterns.is_empty());
1471 }
1472
1473 #[tokio::test]
1474 async fn prune_tool_usage_log_removes_old_rows() {
1475 let store = test_store().await;
1476 zeph_db::query(sql!(
1478 "INSERT INTO skill_usage_log \
1479 (tool_sequence, sequence_hash, context_hash, outcome, created_at) \
1480 VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
1481 ))
1482 .bind(r#"["shell"]"#)
1483 .bind("hash0000000000001")
1484 .bind("ctx00000000000001")
1485 .bind("success")
1486 .execute(store.pool())
1487 .await
1488 .unwrap();
1489
1490 let removed = store.prune_tool_usage_log(1).await.unwrap();
1492 assert_eq!(removed, 1);
1493 }
1494
1495 #[tokio::test]
1498 async fn insert_and_load_skill_heuristics() {
1499 let store = test_store().await;
1500
1501 let id = store
1502 .insert_skill_heuristic(Some("git"), "always commit in small chunks", 0.9)
1503 .await
1504 .unwrap();
1505 assert!(id > 0);
1506
1507 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1508 assert_eq!(rows.len(), 1);
1509 assert_eq!(rows[0].1, "always commit in small chunks");
1510 assert!((rows[0].2 - 0.9).abs() < 1e-6);
1511 }
1512
1513 #[tokio::test]
1514 async fn load_skill_heuristics_includes_general() {
1515 let store = test_store().await;
1516
1517 store
1518 .insert_skill_heuristic(None, "general tip", 0.7)
1519 .await
1520 .unwrap();
1521 store
1522 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1523 .await
1524 .unwrap();
1525
1526 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1528 assert_eq!(rows.len(), 2);
1529 }
1530
1531 #[tokio::test]
1532 async fn load_skill_heuristics_filters_by_min_confidence() {
1533 let store = test_store().await;
1534
1535 store
1536 .insert_skill_heuristic(Some("git"), "low confidence tip", 0.3)
1537 .await
1538 .unwrap();
1539 store
1540 .insert_skill_heuristic(Some("git"), "high confidence tip", 0.9)
1541 .await
1542 .unwrap();
1543
1544 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1545 assert_eq!(rows.len(), 1);
1546 assert_eq!(rows[0].1, "high confidence tip");
1547 }
1548
1549 #[tokio::test]
1550 async fn increment_heuristic_use_count_works() {
1551 let store = test_store().await;
1552
1553 let id = store
1554 .insert_skill_heuristic(Some("git"), "tip", 0.8)
1555 .await
1556 .unwrap();
1557
1558 store.increment_heuristic_use_count(id).await.unwrap();
1559 store.increment_heuristic_use_count(id).await.unwrap();
1560
1561 let rows = store.load_skill_heuristics("git", 0.0, 10).await.unwrap();
1562 assert_eq!(rows[0].3, 2); }
1564
1565 #[tokio::test]
1566 async fn load_all_heuristics_for_skill_exact_match() {
1567 let store = test_store().await;
1568
1569 store
1570 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1571 .await
1572 .unwrap();
1573 store
1574 .insert_skill_heuristic(Some("docker"), "docker tip", 0.8)
1575 .await
1576 .unwrap();
1577
1578 let rows = store
1579 .load_all_heuristics_for_skill(Some("git"))
1580 .await
1581 .unwrap();
1582 assert_eq!(rows.len(), 1);
1583 assert_eq!(rows[0].1, "git tip");
1584 }
1585
1586 #[tokio::test]
1587 async fn load_all_heuristics_for_skill_null() {
1588 let store = test_store().await;
1589
1590 store
1591 .insert_skill_heuristic(None, "general", 0.8)
1592 .await
1593 .unwrap();
1594 store
1595 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1596 .await
1597 .unwrap();
1598
1599 let rows = store.load_all_heuristics_for_skill(None).await.unwrap();
1600 assert_eq!(rows.len(), 1);
1601 assert_eq!(rows[0].1, "general");
1602 }
1603
1604 #[tokio::test]
1605 async fn skill_trust_default_is_quarantined() {
1606 let store = test_store().await;
1610
1611 zeph_db::query(sql!(
1613 "INSERT INTO skill_trust (skill_name, blake3_hash) VALUES ('test-arise', 'abc')"
1614 ))
1615 .execute(store.pool())
1616 .await
1617 .unwrap();
1618
1619 let trust: (String,) = zeph_db::query_as(sql!(
1620 "SELECT trust_level FROM skill_trust WHERE skill_name = 'test-arise'"
1621 ))
1622 .fetch_one(store.pool())
1623 .await
1624 .unwrap();
1625
1626 assert_eq!(
1627 trust.0, "quarantined",
1628 "schema default for skill_trust.trust_level must be 'quarantined'"
1629 );
1630 }
1631}