1use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
9#[derive(Debug)]
11pub struct SkillUsageRow {
12 pub skill_name: String,
14 pub invocation_count: i64,
16 pub last_used_at: String,
18}
19
20#[derive(Debug)]
22pub struct SkillMetricsRow {
23 pub skill_name: String,
25 pub version_id: Option<i64>,
27 pub total: i64,
29 pub successes: i64,
31 pub failures: i64,
33}
34
35#[derive(Debug)]
37pub struct SkillVersionRow {
38 pub id: i64,
40 pub skill_name: String,
42 pub version: i64,
44 pub body: String,
46 pub description: String,
48 pub source: String,
50 pub is_active: bool,
52 pub success_count: i64,
54 pub failure_count: i64,
56 pub created_at: String,
58}
59
60type SkillVersionTuple = (
61 i64,
62 String,
63 i64,
64 String,
65 String,
66 String,
67 i64,
68 i64,
69 i64,
70 String,
71);
72
73fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
74 SkillVersionRow {
75 id: t.0,
76 skill_name: t.1,
77 version: t.2,
78 body: t.3,
79 description: t.4,
80 source: t.5,
81 is_active: t.6 != 0,
82 success_count: t.7,
83 failure_count: t.8,
84 created_at: t.9,
85 }
86}
87
88impl SqliteStore {
89 #[tracing::instrument(skip_all, name = "memory.skills.record_skill_usage")]
98 pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
99 let mut tx = begin_write(&self.pool).await?;
100 for name in skill_names {
101 zeph_db::query(sql!(
102 "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
103 VALUES (?, 1, CURRENT_TIMESTAMP) \
104 ON CONFLICT(skill_name) DO UPDATE SET \
105 invocation_count = invocation_count + 1, \
106 last_used_at = CURRENT_TIMESTAMP"
107 ))
108 .bind(name)
109 .execute(&mut *tx)
110 .await?;
111 }
112 tx.commit().await?;
113 Ok(())
114 }
115
116 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_usage")]
122 pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
123 let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
124 "SELECT skill_name, invocation_count, last_used_at \
125 FROM skill_usage ORDER BY invocation_count DESC"
126 ))
127 .fetch_all(&self.pool)
128 .await?;
129
130 Ok(rows
131 .into_iter()
132 .map(
133 |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
134 skill_name,
135 invocation_count,
136 last_used_at,
137 },
138 )
139 .collect())
140 }
141
142 #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcome")]
148 pub async fn record_skill_outcome(
149 &self,
150 skill_name: &str,
151 version_id: Option<i64>,
152 conversation_id: Option<crate::types::ConversationId>,
153 outcome: &str,
154 error_context: Option<&str>,
155 outcome_detail: Option<&str>,
156 ) -> Result<(), MemoryError> {
157 zeph_db::query(sql!(
158 "INSERT INTO skill_outcomes \
159 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
160 VALUES (?, ?, ?, ?, ?, ?)"
161 ))
162 .bind(skill_name)
163 .bind(version_id)
164 .bind(conversation_id)
165 .bind(outcome)
166 .bind(error_context)
167 .bind(outcome_detail)
168 .execute(&self.pool)
169 .await?;
170 Ok(())
171 }
172
173 #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcomes_batch")]
179 pub async fn record_skill_outcomes_batch(
180 &self,
181 skill_names: &[String],
182 conversation_id: Option<crate::types::ConversationId>,
183 outcome: &str,
184 error_context: Option<&str>,
185 outcome_detail: Option<&str>,
186 ) -> Result<(), MemoryError> {
187 let mut tx = begin_write(&self.pool).await?;
190
191 let mut version_map: std::collections::HashMap<String, Option<i64>> =
192 std::collections::HashMap::new();
193 for name in skill_names {
194 let vid: Option<(i64,)> = zeph_db::query_as(sql!(
195 "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
196 ))
197 .bind(name)
198 .fetch_optional(&mut *tx)
199 .await?;
200 version_map.insert(name.clone(), vid.map(|r| r.0));
201 }
202
203 for name in skill_names {
204 let version_id = version_map.get(name.as_str()).copied().flatten();
205 zeph_db::query(sql!(
206 "INSERT INTO skill_outcomes \
207 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
208 VALUES (?, ?, ?, ?, ?, ?)"
209 ))
210 .bind(name)
211 .bind(version_id)
212 .bind(conversation_id)
213 .bind(outcome)
214 .bind(error_context)
215 .bind(outcome_detail)
216 .execute(&mut *tx)
217 .await?;
218 }
219 tx.commit().await?;
220 Ok(())
221 }
222
223 #[tracing::instrument(skip_all, name = "memory.skills.skill_metrics")]
229 pub async fn skill_metrics(
230 &self,
231 skill_name: &str,
232 ) -> Result<Option<SkillMetricsRow>, MemoryError> {
233 let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234 "SELECT skill_name, version_id, \
235 COUNT(*) as total, \
236 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237 COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238 FROM skill_outcomes WHERE skill_name = ? \
239 AND outcome NOT IN ('user_approval', 'user_rejection') \
240 GROUP BY skill_name, version_id \
241 ORDER BY version_id DESC LIMIT 1"
242 ))
243 .bind(skill_name)
244 .fetch_optional(&self.pool)
245 .await?;
246
247 Ok(row.map(
248 |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249 skill_name,
250 version_id,
251 total,
252 successes,
253 failures,
254 },
255 ))
256 }
257
258 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_outcome_stats")]
264 pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
265 let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
266 "SELECT skill_name, version_id, \
267 COUNT(*) as total, \
268 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
269 COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
270 FROM skill_outcomes \
271 GROUP BY skill_name \
272 ORDER BY total DESC"
273 ))
274 .fetch_all(&self.pool)
275 .await?;
276
277 Ok(rows
278 .into_iter()
279 .map(
280 |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
281 skill_name,
282 version_id,
283 total,
284 successes,
285 failures,
286 },
287 )
288 .collect())
289 }
290
291 #[allow(clippy::too_many_arguments)]
297 #[tracing::instrument(skip_all, name = "memory.skills.save_skill_version")]
299 pub async fn save_skill_version(
300 &self,
301 skill_name: &str,
302 version: i64,
303 body: &str,
304 description: &str,
305 source: &str,
306 error_context: Option<&str>,
307 predecessor_id: Option<i64>,
308 ) -> Result<i64, MemoryError> {
309 let row: (i64,) = zeph_db::query_as(sql!(
310 "INSERT INTO skill_versions \
311 (skill_name, version, body, description, source, error_context, predecessor_id) \
312 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
313 ))
314 .bind(skill_name)
315 .bind(version)
316 .bind(body)
317 .bind(description)
318 .bind(source)
319 .bind(error_context)
320 .bind(predecessor_id)
321 .fetch_one(&self.pool)
322 .await?;
323 Ok(row.0)
324 }
325
326 #[allow(clippy::too_many_arguments)]
337 #[tracing::instrument(skip_all, name = "memory.skills.save_and_activate_skill_version")]
338 pub async fn save_and_activate_skill_version(
339 &self,
340 skill_name: &str,
341 version: i64,
342 body: &str,
343 description: &str,
344 source: &str,
345 error_context: Option<&str>,
346 predecessor_id: Option<i64>,
347 ) -> Result<i64, MemoryError> {
348 let mut tx = begin_write(&self.pool).await?;
349
350 let row: (i64,) = zeph_db::query_as(sql!(
351 "INSERT INTO skill_versions \
352 (skill_name, version, body, description, source, error_context, predecessor_id) \
353 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
354 ))
355 .bind(skill_name)
356 .bind(version)
357 .bind(body)
358 .bind(description)
359 .bind(source)
360 .bind(error_context)
361 .bind(predecessor_id)
362 .fetch_one(&mut *tx)
363 .await?;
364 let id = row.0;
365
366 zeph_db::query(sql!(
367 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
368 ))
369 .bind(skill_name)
370 .execute(&mut *tx)
371 .await?;
372
373 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
374 .bind(id)
375 .execute(&mut *tx)
376 .await?;
377
378 tx.commit().await?;
379 Ok(id)
380 }
381
382 #[tracing::instrument(skip_all, name = "memory.skills.distinct_session_count")]
391 pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
392 let row: (i64,) = zeph_db::query_as(sql!(
393 "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
394 WHERE skill_name = ? AND conversation_id IS NOT NULL"
395 ))
396 .bind(skill_name)
397 .fetch_one(&self.pool)
398 .await?;
399 Ok(row.0)
400 }
401
402 #[tracing::instrument(skip_all, name = "memory.skills.active_skill_version")]
408 pub async fn active_skill_version(
409 &self,
410 skill_name: &str,
411 ) -> Result<Option<SkillVersionRow>, MemoryError> {
412 let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
413 "SELECT id, skill_name, version, body, description, source, \
414 is_active, success_count, failure_count, created_at \
415 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
416 ))
417 .bind(skill_name)
418 .fetch_optional(&self.pool)
419 .await?;
420
421 Ok(row.map(skill_version_from_tuple))
422 }
423
424 #[tracing::instrument(skip_all, name = "memory.skills.activate_skill_version")]
430 pub async fn activate_skill_version(
431 &self,
432 skill_name: &str,
433 version_id: i64,
434 ) -> Result<(), MemoryError> {
435 let mut tx = begin_write(&self.pool).await?;
436
437 zeph_db::query(sql!(
438 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
439 ))
440 .bind(skill_name)
441 .execute(&mut *tx)
442 .await?;
443
444 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
445 .bind(version_id)
446 .execute(&mut *tx)
447 .await?;
448
449 tx.commit().await?;
450 Ok(())
451 }
452
453 #[tracing::instrument(skip_all, name = "memory.skills.next_skill_version")]
459 pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
460 let row: (i64,) = zeph_db::query_as(sql!(
461 "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
462 ))
463 .bind(skill_name)
464 .fetch_one(&self.pool)
465 .await?;
466 Ok(row.0)
467 }
468
469 #[tracing::instrument(skip_all, name = "memory.skills.last_improvement_time")]
475 pub async fn last_improvement_time(
476 &self,
477 skill_name: &str,
478 ) -> Result<Option<String>, MemoryError> {
479 let row: Option<(String,)> = zeph_db::query_as(sql!(
480 "SELECT created_at FROM skill_versions \
481 WHERE skill_name = ? AND source = 'auto' \
482 ORDER BY id DESC LIMIT 1"
483 ))
484 .bind(skill_name)
485 .fetch_optional(&self.pool)
486 .await?;
487 Ok(row.map(|r| r.0))
488 }
489
490 #[tracing::instrument(skip_all, name = "memory.skills.ensure_skill_version_exists")]
500 pub async fn ensure_skill_version_exists(
501 &self,
502 skill_name: &str,
503 body: &str,
504 description: &str,
505 ) -> Result<(), MemoryError> {
506 let mut tx = begin_write(&self.pool).await?;
507
508 let existing: Option<(i64,)> = zeph_db::query_as(sql!(
509 "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
510 ))
511 .bind(skill_name)
512 .fetch_optional(&mut *tx)
513 .await?;
514
515 if existing.is_none() {
516 let row: (i64,) = zeph_db::query_as(sql!(
517 "INSERT INTO skill_versions \
518 (skill_name, version, body, description, source, error_context, predecessor_id) \
519 VALUES (?, 1, ?, ?, 'manual', NULL, NULL) RETURNING id"
520 ))
521 .bind(skill_name)
522 .bind(body)
523 .bind(description)
524 .fetch_one(&mut *tx)
525 .await?;
526 let id = row.0;
527
528 zeph_db::query(sql!(
529 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
530 ))
531 .bind(skill_name)
532 .execute(&mut *tx)
533 .await?;
534
535 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
536 .bind(id)
537 .execute(&mut *tx)
538 .await?;
539 }
540
541 tx.commit().await?;
542 Ok(())
543 }
544
545 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_versions")]
551 pub async fn load_skill_versions(
552 &self,
553 skill_name: &str,
554 ) -> Result<Vec<SkillVersionRow>, MemoryError> {
555 let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
556 "SELECT id, skill_name, version, body, description, source, \
557 is_active, success_count, failure_count, created_at \
558 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
559 ))
560 .bind(skill_name)
561 .fetch_all(&self.pool)
562 .await?;
563
564 Ok(rows.into_iter().map(skill_version_from_tuple).collect())
565 }
566
567 #[tracing::instrument(skip_all, name = "memory.skills.count_auto_versions")]
573 pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
574 let row: (i64,) = zeph_db::query_as(sql!(
575 "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
576 ))
577 .bind(skill_name)
578 .fetch_one(&self.pool)
579 .await?;
580 Ok(row.0)
581 }
582
583 #[cfg(not(feature = "postgres"))]
590 #[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
591 pub async fn prune_skill_versions(
592 &self,
593 skill_name: &str,
594 max_versions: u32,
595 ) -> Result<u32, MemoryError> {
596 let result = zeph_db::query(sql!(
597 "DELETE FROM skill_versions WHERE id IN (\
598 SELECT id FROM skill_versions \
599 WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
600 ORDER BY id ASC \
601 LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
602 WHERE skill_name = ? AND source = 'auto') - ?)\
603 )"
604 ))
605 .bind(skill_name)
606 .bind(skill_name)
607 .bind(max_versions)
608 .execute(&self.pool)
609 .await?;
610 Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
611 }
612
613 #[cfg(feature = "postgres")]
619 #[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
620 pub async fn prune_skill_versions(
621 &self,
622 skill_name: &str,
623 max_versions: u32,
624 ) -> Result<u32, MemoryError> {
625 let result = zeph_db::query(sql!(
626 "DELETE FROM skill_versions WHERE id IN (\
627 SELECT id FROM skill_versions \
628 WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
629 ORDER BY id DESC \
630 OFFSET ?\
631 )"
632 ))
633 .bind(skill_name)
634 .bind(i64::from(max_versions))
635 .execute(&self.pool)
636 .await?;
637 Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
638 }
639
640 #[tracing::instrument(skip_all, name = "memory.skills.predecessor_version")]
646 pub async fn predecessor_version(
647 &self,
648 version_id: i64,
649 ) -> Result<Option<SkillVersionRow>, MemoryError> {
650 let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
651 "SELECT predecessor_id FROM skill_versions WHERE id = ?"
652 ))
653 .bind(version_id)
654 .fetch_optional(&self.pool)
655 .await?;
656
657 let Some((Some(pid),)) = pred_id else {
658 return Ok(None);
659 };
660
661 let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
662 "SELECT id, skill_name, version, body, description, source, \
663 is_active, success_count, failure_count, created_at \
664 FROM skill_versions WHERE id = ?"
665 ))
666 .bind(pid)
667 .fetch_optional(&self.pool)
668 .await?;
669
670 Ok(row.map(skill_version_from_tuple))
671 }
672
673 #[tracing::instrument(skip_all, name = "memory.skills.list_active_auto_versions")]
680 pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
681 let rows: Vec<(String,)> = zeph_db::query_as(sql!(
682 "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
683 ))
684 .fetch_all(&self.pool)
685 .await?;
686 Ok(rows.into_iter().map(|(name,)| name).collect())
687 }
688
689 #[tracing::instrument(skip_all, name = "memory.skills.insert_tool_usage_log")]
699 pub async fn insert_tool_usage_log(
700 &self,
701 tool_sequence: &str,
702 sequence_hash: &str,
703 context_hash: &str,
704 outcome: &str,
705 conversation_id: Option<crate::types::ConversationId>,
706 ) -> Result<(), MemoryError> {
707 zeph_db::query(sql!(
708 "INSERT INTO skill_usage_log \
709 (tool_sequence, sequence_hash, context_hash, outcome, conversation_id) \
710 VALUES (?, ?, ?, ?, ?)"
711 ))
712 .bind(tool_sequence)
713 .bind(sequence_hash)
714 .bind(context_hash)
715 .bind(outcome)
716 .bind(conversation_id)
717 .execute(&self.pool)
718 .await?;
719 Ok(())
720 }
721
722 #[cfg(not(feature = "postgres"))]
730 #[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
731 pub async fn find_recurring_patterns(
732 &self,
733 min_count: u32,
734 window_days: u32,
735 ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
736 let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
737 "SELECT tool_sequence, sequence_hash, \
738 COUNT(*) as occurrence_count, \
739 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
740 FROM skill_usage_log \
741 WHERE created_at > datetime('now', '-' || ? || ' days') \
742 GROUP BY sequence_hash \
743 HAVING occurrence_count >= ? \
744 ORDER BY occurrence_count DESC \
745 LIMIT 10"
746 ))
747 .bind(window_days)
748 .bind(min_count)
749 .fetch_all(&self.pool)
750 .await?;
751
752 Ok(rows
753 .into_iter()
754 .map(|(seq, hash, occ, suc)| {
755 (
756 seq,
757 hash,
758 u32::try_from(occ).unwrap_or(u32::MAX),
759 u32::try_from(suc).unwrap_or(0),
760 )
761 })
762 .collect())
763 }
764
765 #[cfg(feature = "postgres")]
771 #[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
772 pub async fn find_recurring_patterns(
773 &self,
774 min_count: u32,
775 window_days: u32,
776 ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
777 let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
778 "SELECT tool_sequence, sequence_hash, \
779 COUNT(*) as occurrence_count, \
780 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
781 FROM skill_usage_log \
782 WHERE created_at > NOW() - INTERVAL '1 day' * ? \
783 GROUP BY sequence_hash, tool_sequence \
784 HAVING COUNT(*) >= ? \
785 ORDER BY occurrence_count DESC \
786 LIMIT 10"
787 ))
788 .bind(i64::from(window_days))
789 .bind(i64::from(min_count))
790 .fetch_all(&self.pool)
791 .await?;
792
793 Ok(rows
794 .into_iter()
795 .map(|(seq, hash, occ, suc)| {
796 (
797 seq,
798 hash,
799 u32::try_from(occ).unwrap_or(u32::MAX),
800 u32::try_from(suc).unwrap_or(0),
801 )
802 })
803 .collect())
804 }
805
806 #[cfg(not(feature = "postgres"))]
811 #[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
812 pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
813 let result = zeph_db::query(sql!(
814 "DELETE FROM skill_usage_log \
815 WHERE created_at < datetime('now', '-' || ? || ' days')"
816 ))
817 .bind(retention_days)
818 .execute(&self.pool)
819 .await?;
820 Ok(result.rows_affected())
821 }
822
823 #[cfg(feature = "postgres")]
829 #[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
830 pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
831 let result = zeph_db::query(sql!(
832 "DELETE FROM skill_usage_log \
833 WHERE created_at < NOW() - INTERVAL '1 day' * ?"
834 ))
835 .bind(i64::from(retention_days))
836 .execute(&self.pool)
837 .await?;
838 Ok(result.rows_affected())
839 }
840
841 #[tracing::instrument(skip_all, name = "memory.skills.insert_skill_heuristic")]
848 pub async fn insert_skill_heuristic(
849 &self,
850 skill_name: Option<&str>,
851 heuristic_text: &str,
852 confidence: f64,
853 ) -> Result<i64, MemoryError> {
854 let row: (i64,) = zeph_db::query_as(sql!(
855 "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) \
856 VALUES (?, ?, ?) RETURNING id"
857 ))
858 .bind(skill_name)
859 .bind(heuristic_text)
860 .bind(confidence)
861 .fetch_one(&self.pool)
862 .await?;
863 Ok(row.0)
864 }
865
866 #[cfg(not(feature = "postgres"))]
871 #[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
872 pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
873 zeph_db::query(sql!(
874 "UPDATE skill_heuristics \
875 SET use_count = use_count + 1, updated_at = datetime('now') \
876 WHERE id = ?"
877 ))
878 .bind(id)
879 .execute(&self.pool)
880 .await?;
881 Ok(())
882 }
883
884 #[cfg(feature = "postgres")]
889 #[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
890 pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
891 zeph_db::query(sql!(
892 "UPDATE skill_heuristics \
893 SET use_count = use_count + 1, updated_at = CURRENT_TIMESTAMP \
894 WHERE id = $1"
895 ))
896 .bind(id)
897 .execute(&self.pool)
898 .await?;
899 Ok(())
900 }
901
902 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_heuristics")]
910 pub async fn load_skill_heuristics(
911 &self,
912 skill_name: &str,
913 min_confidence: f64,
914 limit: u32,
915 ) -> Result<Vec<(i64, String, f64, i64)>, MemoryError> {
916 let rows: Vec<(i64, String, f64, i64)> = zeph_db::query_as(sql!(
917 "SELECT id, heuristic_text, confidence, use_count \
918 FROM skill_heuristics \
919 WHERE (skill_name = ? OR skill_name IS NULL) \
920 AND confidence >= ? \
921 ORDER BY confidence DESC \
922 LIMIT ?"
923 ))
924 .bind(skill_name)
925 .bind(min_confidence)
926 .bind(i64::from(limit))
927 .fetch_all(&self.pool)
928 .await?;
929 Ok(rows)
930 }
931
932 #[tracing::instrument(skip_all, name = "memory.skills.load_all_heuristics_for_skill")]
939 pub async fn load_all_heuristics_for_skill(
940 &self,
941 skill_name: Option<&str>,
942 ) -> Result<Vec<(i64, String)>, MemoryError> {
943 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
944 "SELECT id, heuristic_text FROM skill_heuristics \
945 WHERE (skill_name = ? OR (? IS NULL AND skill_name IS NULL))"
946 ))
947 .bind(skill_name)
948 .bind(skill_name)
949 .fetch_all(&self.pool)
950 .await?;
951 Ok(rows)
952 }
953
954 #[cfg(not(feature = "postgres"))]
965 #[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
966 pub async fn find_step_corrections(
967 &self,
968 skill_name: &str,
969 failure_kind: &str,
970 error_context: &str,
971 tool_name: &str,
972 limit: u32,
973 ) -> Result<Vec<(i64, String)>, MemoryError> {
974 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
975 "SELECT id, hint FROM step_corrections \
976 WHERE skill_name = ? \
977 AND (failure_kind = '' OR failure_kind = ?) \
978 AND (error_substring = '' OR INSTR(?, error_substring) > 0) \
979 AND (tool_name = '' OR tool_name = ?) \
980 ORDER BY success_count DESC, use_count DESC \
981 LIMIT ?"
982 ))
983 .bind(skill_name)
984 .bind(failure_kind)
985 .bind(error_context)
986 .bind(tool_name)
987 .bind(limit)
988 .fetch_all(&self.pool)
989 .await?;
990 Ok(rows)
991 }
992
993 #[cfg(feature = "postgres")]
999 #[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
1000 pub async fn find_step_corrections(
1001 &self,
1002 skill_name: &str,
1003 failure_kind: &str,
1004 error_context: &str,
1005 tool_name: &str,
1006 limit: u32,
1007 ) -> Result<Vec<(i64, String)>, MemoryError> {
1008 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
1009 "SELECT id, hint FROM step_corrections \
1010 WHERE skill_name = ? \
1011 AND (failure_kind = '' OR failure_kind = ?) \
1012 AND (error_substring = '' OR strpos(?, error_substring) > 0) \
1013 AND (tool_name = '' OR tool_name = ?) \
1014 ORDER BY success_count DESC, use_count DESC \
1015 LIMIT ?"
1016 ))
1017 .bind(skill_name)
1018 .bind(failure_kind)
1019 .bind(error_context)
1020 .bind(tool_name)
1021 .bind(i64::from(limit))
1022 .fetch_all(&self.pool)
1023 .await?;
1024 Ok(rows)
1025 }
1026
1027 #[tracing::instrument(skip_all, name = "memory.skills.insert_step_correction")]
1036 pub async fn insert_step_correction(
1037 &self,
1038 skill_name: &str,
1039 failure_kind: &str,
1040 error_substring: &str,
1041 tool_name: &str,
1042 hint: &str,
1043 ) -> Result<(), MemoryError> {
1044 zeph_db::query(sql!(
1045 "INSERT INTO step_corrections \
1046 (skill_name, failure_kind, error_substring, tool_name, hint) \
1047 VALUES (?, ?, ?, ?, ?) \
1048 ON CONFLICT (skill_name, failure_kind, error_substring, tool_name) DO NOTHING"
1049 ))
1050 .bind(skill_name)
1051 .bind(failure_kind)
1052 .bind(error_substring)
1053 .bind(tool_name)
1054 .bind(hint)
1055 .execute(&self.pool)
1056 .await?;
1057 Ok(())
1058 }
1059
1060 #[tracing::instrument(skip_all, name = "memory.skills.record_correction_usage")]
1066 pub async fn record_correction_usage(
1067 &self,
1068 correction_id: i64,
1069 was_successful: bool,
1070 ) -> Result<(), MemoryError> {
1071 if was_successful {
1072 zeph_db::query(sql!(
1073 "UPDATE step_corrections \
1074 SET use_count = use_count + 1, success_count = success_count + 1 \
1075 WHERE id = ?"
1076 ))
1077 .bind(correction_id)
1078 .execute(&self.pool)
1079 .await?;
1080 } else {
1081 zeph_db::query(sql!(
1082 "UPDATE step_corrections SET use_count = use_count + 1 WHERE id = ?"
1083 ))
1084 .bind(correction_id)
1085 .execute(&self.pool)
1086 .await?;
1087 }
1088 Ok(())
1089 }
1090
1091 #[tracing::instrument(skip_all, name = "memory.skills.load_routing_head_weights")]
1101 #[allow(clippy::type_complexity)]
1102 pub async fn load_routing_head_weights(
1103 &self,
1104 ) -> Result<Option<(i64, Vec<u8>, f64, i64)>, MemoryError> {
1105 let row: Option<(i64, Vec<u8>, f64, i64)> = zeph_db::query_as(sql!(
1106 "SELECT embed_dim, weights, baseline, update_count \
1107 FROM routing_head_weights WHERE id = 1"
1108 ))
1109 .fetch_optional(&self.pool)
1110 .await?;
1111 Ok(row)
1112 }
1113
1114 #[cfg(not(feature = "postgres"))]
1120 #[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
1121 pub async fn save_routing_head_weights(
1122 &self,
1123 embed_dim: i64,
1124 weights: &[u8],
1125 baseline: f64,
1126 update_count: i64,
1127 ) -> Result<(), MemoryError> {
1128 zeph_db::query(sql!(
1129 "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
1130 VALUES (1, ?, ?, ?, ?, datetime('now')) \
1131 ON CONFLICT(id) DO UPDATE SET \
1132 embed_dim = excluded.embed_dim, \
1133 weights = excluded.weights, \
1134 baseline = excluded.baseline, \
1135 update_count = excluded.update_count, \
1136 updated_at = datetime('now')"
1137 ))
1138 .bind(embed_dim)
1139 .bind(weights)
1140 .bind(baseline)
1141 .bind(update_count)
1142 .execute(&self.pool)
1143 .await?;
1144 Ok(())
1145 }
1146
1147 #[cfg(feature = "postgres")]
1153 #[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
1154 pub async fn save_routing_head_weights(
1155 &self,
1156 embed_dim: i64,
1157 weights: &[u8],
1158 baseline: f64,
1159 update_count: i64,
1160 ) -> Result<(), MemoryError> {
1161 zeph_db::query(sql!(
1162 "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
1163 VALUES (1, $1, $2, $3, $4, CURRENT_TIMESTAMP) \
1164 ON CONFLICT(id) DO UPDATE SET \
1165 embed_dim = excluded.embed_dim, \
1166 weights = excluded.weights, \
1167 baseline = excluded.baseline, \
1168 update_count = excluded.update_count, \
1169 updated_at = CURRENT_TIMESTAMP"
1170 ))
1171 .bind(embed_dim)
1172 .bind(weights)
1173 .bind(baseline)
1174 .bind(update_count)
1175 .execute(&self.pool)
1176 .await?;
1177 Ok(())
1178 }
1179
1180 #[tracing::instrument(skip_all, name = "memory.skills.count_heuristics_by_skill")]
1191 pub async fn count_heuristics_by_skill(
1192 &self,
1193 min_confidence: f64,
1194 min_count: u32,
1195 ) -> Result<Vec<(String, i64)>, MemoryError> {
1196 let rows: Vec<(String, i64)> = zeph_db::query_as(sql!(
1197 "SELECT skill_name, COUNT(*) AS cnt \
1198 FROM skill_heuristics \
1199 WHERE confidence >= ? AND skill_name IS NOT NULL \
1200 GROUP BY skill_name \
1201 HAVING cnt >= ?"
1202 ))
1203 .bind(min_confidence)
1204 .bind(i64::from(min_count))
1205 .fetch_all(&self.pool)
1206 .await?;
1207 Ok(rows)
1208 }
1209
1210 #[tracing::instrument(skip_all, name = "memory.skills.load_heuristic_texts_for_promotion")]
1218 pub async fn load_heuristic_texts_for_promotion(
1219 &self,
1220 skill_name: &str,
1221 min_confidence: f64,
1222 ) -> Result<Vec<String>, MemoryError> {
1223 let rows: Vec<(String,)> = zeph_db::query_as(sql!(
1224 "SELECT heuristic_text \
1225 FROM skill_heuristics \
1226 WHERE skill_name = ? AND confidence >= ? \
1227 ORDER BY heuristic_text ASC"
1228 ))
1229 .bind(skill_name)
1230 .bind(min_confidence)
1231 .fetch_all(&self.pool)
1232 .await?;
1233 Ok(rows.into_iter().map(|r| r.0).collect())
1234 }
1235
1236 #[tracing::instrument(skip_all, name = "memory.skills.promotion_already_evaluated")]
1244 pub async fn promotion_already_evaluated(
1245 &self,
1246 skill_name: &str,
1247 batch_hash: &str,
1248 ) -> Result<bool, MemoryError> {
1249 let count: i64 = zeph_db::query_scalar(sql!(
1250 "SELECT COUNT(*) FROM skill_heuristic_promotions \
1251 WHERE skill_name = ? AND batch_hash = ?"
1252 ))
1253 .bind(skill_name)
1254 .bind(batch_hash)
1255 .fetch_one(&self.pool)
1256 .await?;
1257 Ok(count > 0)
1258 }
1259
1260 #[tracing::instrument(skip_all, name = "memory.skills.record_promotion_evaluation")]
1269 pub async fn record_promotion_evaluation(
1270 &self,
1271 skill_name: &str,
1272 batch_hash: &str,
1273 recommendation: &str,
1274 draft_skill_name: Option<&str>,
1275 ) -> Result<(), MemoryError> {
1276 let now = i64::try_from(
1277 std::time::SystemTime::now()
1278 .duration_since(std::time::UNIX_EPOCH)
1279 .unwrap_or_default()
1280 .as_secs(),
1281 )
1282 .unwrap_or(i64::MAX);
1283 zeph_db::query(sql!(
1284 "INSERT OR IGNORE INTO skill_heuristic_promotions \
1285 (skill_name, batch_hash, evaluated_at, recommendation, draft_skill_name) \
1286 VALUES (?, ?, ?, ?, ?)"
1287 ))
1288 .bind(skill_name)
1289 .bind(batch_hash)
1290 .bind(now)
1291 .bind(recommendation)
1292 .bind(draft_skill_name)
1293 .execute(&self.pool)
1294 .await?;
1295 Ok(())
1296 }
1297}
1298
1299#[cfg(test)]
1300mod tests {
1301 use std::time::Duration;
1302
1303 use tokio::time::sleep;
1304
1305 use super::*;
1306
1307 async fn test_store() -> SqliteStore {
1308 SqliteStore::new(":memory:").await.unwrap()
1309 }
1310
1311 #[tokio::test]
1312 async fn record_skill_usage_increments() {
1313 let store = test_store().await;
1314
1315 store.record_skill_usage(&["git"]).await.unwrap();
1316 store.record_skill_usage(&["git"]).await.unwrap();
1317
1318 let usage = store.load_skill_usage().await.unwrap();
1319 assert_eq!(usage.len(), 1);
1320 assert_eq!(usage[0].skill_name, "git");
1321 assert_eq!(usage[0].invocation_count, 2);
1322 }
1323
1324 #[tokio::test]
1325 async fn load_skill_usage_returns_all() {
1326 let store = test_store().await;
1327
1328 store.record_skill_usage(&["git", "docker"]).await.unwrap();
1329 store.record_skill_usage(&["git"]).await.unwrap();
1330
1331 let usage = store.load_skill_usage().await.unwrap();
1332 assert_eq!(usage.len(), 2);
1333 assert_eq!(usage[0].skill_name, "git");
1334 assert_eq!(usage[0].invocation_count, 2);
1335 assert_eq!(usage[1].skill_name, "docker");
1336 assert_eq!(usage[1].invocation_count, 1);
1337 }
1338
1339 #[tokio::test]
1340 async fn migration_005_creates_tables() {
1341 let store = test_store().await;
1342 let pool = store.pool();
1343
1344 let versions: (i64,) = zeph_db::query_as(sql!(
1345 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
1346 ))
1347 .fetch_one(pool)
1348 .await
1349 .unwrap();
1350 assert_eq!(versions.0, 1);
1351
1352 let outcomes: (i64,) = zeph_db::query_as(sql!(
1353 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
1354 ))
1355 .fetch_one(pool)
1356 .await
1357 .unwrap();
1358 assert_eq!(outcomes.0, 1);
1359 }
1360
1361 #[tokio::test]
1362 async fn record_skill_outcome_inserts() {
1363 let store = test_store().await;
1364
1365 store
1366 .record_skill_outcome(
1367 "git",
1368 None,
1369 Some(crate::types::ConversationId(1)),
1370 "success",
1371 None,
1372 None,
1373 )
1374 .await
1375 .unwrap();
1376 store
1377 .record_skill_outcome(
1378 "git",
1379 None,
1380 Some(crate::types::ConversationId(1)),
1381 "tool_failure",
1382 Some("exit code 1"),
1383 None,
1384 )
1385 .await
1386 .unwrap();
1387
1388 let metrics = store.skill_metrics("git").await.unwrap().unwrap();
1389 assert_eq!(metrics.total, 2);
1390 assert_eq!(metrics.successes, 1);
1391 assert_eq!(metrics.failures, 1);
1392 }
1393
1394 #[tokio::test]
1395 async fn skill_metrics_none_for_unknown() {
1396 let store = test_store().await;
1397 let m = store.skill_metrics("nonexistent").await.unwrap();
1398 assert!(m.is_none());
1399 }
1400
1401 #[tokio::test]
1402 async fn load_skill_outcome_stats_grouped() {
1403 let store = test_store().await;
1404
1405 store
1406 .record_skill_outcome("git", None, None, "success", None, None)
1407 .await
1408 .unwrap();
1409 store
1410 .record_skill_outcome("git", None, None, "tool_failure", None, None)
1411 .await
1412 .unwrap();
1413 store
1414 .record_skill_outcome("docker", None, None, "success", None, None)
1415 .await
1416 .unwrap();
1417
1418 let stats = store.load_skill_outcome_stats().await.unwrap();
1419 assert_eq!(stats.len(), 2);
1420 assert_eq!(stats[0].skill_name, "git");
1421 assert_eq!(stats[0].total, 2);
1422 assert_eq!(stats[1].skill_name, "docker");
1423 assert_eq!(stats[1].total, 1);
1424 }
1425
1426 #[tokio::test]
1427 async fn save_and_load_skill_version() {
1428 let store = test_store().await;
1429
1430 let id = store
1431 .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
1432 .await
1433 .unwrap();
1434 assert!(id > 0);
1435
1436 store.activate_skill_version("git", id).await.unwrap();
1437
1438 let active = store.active_skill_version("git").await.unwrap().unwrap();
1439 assert_eq!(active.version, 1);
1440 assert_eq!(active.body, "body v1");
1441 assert!(active.is_active);
1442 }
1443
1444 #[tokio::test]
1445 async fn activate_deactivates_previous() {
1446 let store = test_store().await;
1447
1448 let v1 = store
1449 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1450 .await
1451 .unwrap();
1452 store.activate_skill_version("git", v1).await.unwrap();
1453
1454 let v2 = store
1455 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1456 .await
1457 .unwrap();
1458 store.activate_skill_version("git", v2).await.unwrap();
1459
1460 let versions = store.load_skill_versions("git").await.unwrap();
1461 assert_eq!(versions.len(), 2);
1462 assert!(!versions[0].is_active);
1463 assert!(versions[1].is_active);
1464 }
1465
1466 #[tokio::test]
1467 async fn next_skill_version_increments() {
1468 let store = test_store().await;
1469
1470 let next = store.next_skill_version("git").await.unwrap();
1471 assert_eq!(next, 1);
1472
1473 store
1474 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1475 .await
1476 .unwrap();
1477 let next = store.next_skill_version("git").await.unwrap();
1478 assert_eq!(next, 2);
1479 }
1480
1481 #[tokio::test]
1482 async fn last_improvement_time_returns_auto_only() {
1483 let store = test_store().await;
1484
1485 store
1486 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1487 .await
1488 .unwrap();
1489
1490 let t = store.last_improvement_time("git").await.unwrap();
1491 assert!(t.is_none());
1492
1493 store
1494 .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1495 .await
1496 .unwrap();
1497
1498 let t = store.last_improvement_time("git").await.unwrap();
1499 assert!(t.is_some());
1500 }
1501
1502 #[tokio::test]
1503 async fn ensure_skill_version_exists_idempotent() {
1504 let store = test_store().await;
1505
1506 store
1507 .ensure_skill_version_exists("git", "body", "Git helper")
1508 .await
1509 .unwrap();
1510 store
1511 .ensure_skill_version_exists("git", "body2", "Git helper 2")
1512 .await
1513 .unwrap();
1514
1515 let versions = store.load_skill_versions("git").await.unwrap();
1516 assert_eq!(versions.len(), 1);
1517 assert_eq!(versions[0].body, "body");
1518 }
1519
1520 #[tokio::test]
1521 async fn load_skill_versions_ordered() {
1522 let store = test_store().await;
1523
1524 let v1 = store
1525 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1526 .await
1527 .unwrap();
1528 store
1529 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1530 .await
1531 .unwrap();
1532
1533 let versions = store.load_skill_versions("git").await.unwrap();
1534 assert_eq!(versions.len(), 2);
1535 assert_eq!(versions[0].version, 1);
1536 assert_eq!(versions[1].version, 2);
1537 }
1538
1539 #[tokio::test]
1540 async fn count_auto_versions_only() {
1541 let store = test_store().await;
1542
1543 store
1544 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1545 .await
1546 .unwrap();
1547 store
1548 .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1549 .await
1550 .unwrap();
1551 store
1552 .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
1553 .await
1554 .unwrap();
1555
1556 let count = store.count_auto_versions("git").await.unwrap();
1557 assert_eq!(count, 2);
1558 }
1559
1560 #[tokio::test]
1561 async fn prune_preserves_manual_and_active() {
1562 let store = test_store().await;
1563
1564 let v1 = store
1565 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1566 .await
1567 .unwrap();
1568 store.activate_skill_version("git", v1).await.unwrap();
1569
1570 for i in 2..=5 {
1571 store
1572 .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
1573 .await
1574 .unwrap();
1575 }
1576
1577 let pruned = store.prune_skill_versions("git", 2).await.unwrap();
1578 assert_eq!(pruned, 2);
1579
1580 let versions = store.load_skill_versions("git").await.unwrap();
1581 assert!(versions.iter().any(|v| v.source == "manual"));
1582 let auto_count = versions.iter().filter(|v| v.source == "auto").count();
1583 assert_eq!(auto_count, 2);
1584 }
1585
1586 #[tokio::test]
1587 async fn predecessor_version_returns_parent() {
1588 let store = test_store().await;
1589
1590 let v1 = store
1591 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1592 .await
1593 .unwrap();
1594 let v2 = store
1595 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1596 .await
1597 .unwrap();
1598
1599 let pred = store.predecessor_version(v2).await.unwrap().unwrap();
1600 assert_eq!(pred.id, v1);
1601 assert_eq!(pred.version, 1);
1602 }
1603
1604 #[tokio::test]
1605 async fn predecessor_version_none_for_root() {
1606 let store = test_store().await;
1607
1608 let v1 = store
1609 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1610 .await
1611 .unwrap();
1612
1613 let pred = store.predecessor_version(v1).await.unwrap();
1614 assert!(pred.is_none());
1615 }
1616
1617 #[tokio::test]
1618 async fn active_skill_version_none_for_unknown() {
1619 let store = test_store().await;
1620 let active = store.active_skill_version("nonexistent").await.unwrap();
1621 assert!(active.is_none());
1622 }
1623
1624 #[tokio::test]
1625 async fn load_skill_outcome_stats_empty() {
1626 let store = test_store().await;
1627 let stats = store.load_skill_outcome_stats().await.unwrap();
1628 assert!(stats.is_empty());
1629 }
1630
1631 #[tokio::test]
1632 async fn load_skill_versions_empty() {
1633 let store = test_store().await;
1634 let versions = store.load_skill_versions("nonexistent").await.unwrap();
1635 assert!(versions.is_empty());
1636 }
1637
1638 #[tokio::test]
1639 async fn count_auto_versions_zero_for_unknown() {
1640 let store = test_store().await;
1641 let count = store.count_auto_versions("nonexistent").await.unwrap();
1642 assert_eq!(count, 0);
1643 }
1644
1645 #[tokio::test]
1646 async fn prune_nothing_when_below_limit() {
1647 let store = test_store().await;
1648
1649 store
1650 .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
1651 .await
1652 .unwrap();
1653
1654 let pruned = store.prune_skill_versions("git", 5).await.unwrap();
1655 assert_eq!(pruned, 0);
1656 }
1657
1658 #[tokio::test]
1659 async fn record_skill_outcome_with_error_context() {
1660 let store = test_store().await;
1661
1662 store
1663 .record_skill_outcome(
1664 "docker",
1665 None,
1666 Some(crate::types::ConversationId(1)),
1667 "tool_failure",
1668 Some("container not found"),
1669 None,
1670 )
1671 .await
1672 .unwrap();
1673
1674 let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
1675 assert_eq!(metrics.total, 1);
1676 assert_eq!(metrics.failures, 1);
1677 }
1678
1679 #[tokio::test]
1680 async fn save_skill_version_with_error_context() {
1681 let store = test_store().await;
1682
1683 let id = store
1684 .save_skill_version(
1685 "git",
1686 1,
1687 "improved body",
1688 "Git helper",
1689 "auto",
1690 Some("exit code 128"),
1691 None,
1692 )
1693 .await
1694 .unwrap();
1695 assert!(id > 0);
1696 }
1697
1698 #[tokio::test]
1699 async fn record_skill_outcomes_batch_resolves_version_id() {
1700 let store = test_store().await;
1701
1702 let vid = store
1703 .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1704 .await
1705 .unwrap();
1706 store.activate_skill_version("git", vid).await.unwrap();
1707
1708 store
1709 .record_skill_outcomes_batch(
1710 &["git".to_string()],
1711 None,
1712 "tool_failure",
1713 Some("exit code 1"),
1714 Some("exit_nonzero"),
1715 )
1716 .await
1717 .unwrap();
1718
1719 let pool = store.pool();
1720 let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
1721 "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
1722 ))
1723 .fetch_one(pool)
1724 .await
1725 .unwrap();
1726 assert_eq!(
1727 row.0,
1728 Some(vid),
1729 "version_id should be resolved to active version"
1730 );
1731 assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
1732 }
1733
1734 #[tokio::test]
1735 async fn record_skill_outcome_stores_outcome_detail() {
1736 let store = test_store().await;
1737
1738 store
1739 .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
1740 .await
1741 .unwrap();
1742
1743 let pool = store.pool();
1744 let row: (Option<String>,) = zeph_db::query_as(sql!(
1745 "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
1746 ))
1747 .fetch_one(pool)
1748 .await
1749 .unwrap();
1750 assert_eq!(row.0.as_deref(), Some("timeout"));
1751 }
1752
1753 #[tokio::test]
1754 async fn record_skill_outcomes_batch_waits_for_active_writer() {
1755 let file = tempfile::NamedTempFile::new().expect("tempfile");
1756 let path = file.path().to_str().expect("valid path").to_owned();
1757 let store = SqliteStore::with_pool_size(&path, 2)
1758 .await
1759 .expect("with_pool_size");
1760
1761 let vid = store
1762 .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1763 .await
1764 .unwrap();
1765 store.activate_skill_version("git", vid).await.unwrap();
1766
1767 let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1768 zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1769 .execute(&mut *writer_tx)
1770 .await
1771 .expect("hold write lock");
1772
1773 let batch_store = store.clone();
1774 let batch = tokio::spawn(async move {
1775 batch_store
1776 .record_skill_outcomes_batch(
1777 &["git".to_string()],
1778 None,
1779 "success",
1780 None,
1781 Some("waited_for_writer"),
1782 )
1783 .await
1784 });
1785
1786 sleep(Duration::from_millis(100)).await;
1787 writer_tx.commit().await.expect("commit writer");
1788
1789 batch
1790 .await
1791 .expect("join batch task")
1792 .expect("record outcomes");
1793
1794 let count: i64 = zeph_db::query_scalar(sql!(
1795 "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1796 ))
1797 .fetch_one(store.pool())
1798 .await
1799 .unwrap();
1800 assert_eq!(
1801 count, 1,
1802 "expected batch insert to succeed after writer commits"
1803 );
1804 }
1805
1806 #[tokio::test]
1807 async fn distinct_session_count_empty() {
1808 let store = test_store().await;
1809 let count = store.distinct_session_count("unknown-skill").await.unwrap();
1810 assert_eq!(count, 0);
1811 }
1812
1813 #[tokio::test]
1814 async fn distinct_session_count_single_session() {
1815 let store = test_store().await;
1816 let cid = crate::types::ConversationId(1);
1817 store
1818 .record_skill_outcome("git", None, Some(cid), "success", None, None)
1819 .await
1820 .unwrap();
1821 store
1822 .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1823 .await
1824 .unwrap();
1825 let count = store.distinct_session_count("git").await.unwrap();
1826 assert_eq!(count, 1);
1827 }
1828
1829 #[tokio::test]
1830 async fn distinct_session_count_multiple_sessions() {
1831 let store = test_store().await;
1832 for i in 0..3i64 {
1833 store
1834 .record_skill_outcome(
1835 "git",
1836 None,
1837 Some(crate::types::ConversationId(i)),
1838 "success",
1839 None,
1840 None,
1841 )
1842 .await
1843 .unwrap();
1844 }
1845 let count = store.distinct_session_count("git").await.unwrap();
1846 assert_eq!(count, 3);
1847 }
1848
1849 #[tokio::test]
1850 async fn distinct_session_count_null_conversation_ids_excluded() {
1851 let store = test_store().await;
1852 store
1854 .record_skill_outcome("git", None, None, "success", None, None)
1855 .await
1856 .unwrap();
1857 store
1858 .record_skill_outcome("git", None, None, "success", None, None)
1859 .await
1860 .unwrap();
1861 let count = store.distinct_session_count("git").await.unwrap();
1862 assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1863 }
1864
1865 #[tokio::test]
1868 async fn insert_and_find_recurring_patterns() {
1869 let store = test_store().await;
1870 let seq = r#"["shell","web_scrape"]"#;
1871 let hash = "abcdef0123456789";
1872 let ctx = "ctxhash000000000";
1873
1874 for _ in 0..3 {
1875 store
1876 .insert_tool_usage_log(seq, hash, ctx, "success", None)
1877 .await
1878 .unwrap();
1879 }
1880 store
1881 .insert_tool_usage_log(seq, hash, ctx, "failure", None)
1882 .await
1883 .unwrap();
1884
1885 let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1886 assert_eq!(patterns.len(), 1);
1887 let (s, h, occ, suc) = &patterns[0];
1888 assert_eq!(s, seq);
1889 assert_eq!(h, hash);
1890 assert_eq!(*occ, 4);
1891 assert_eq!(*suc, 3);
1892 }
1893
1894 #[tokio::test]
1895 async fn find_recurring_patterns_below_threshold_returns_empty() {
1896 let store = test_store().await;
1897 let seq = r#"["shell"]"#;
1898 let hash = "0000000000000001";
1899 let ctx = "0000000000000001";
1900
1901 store
1902 .insert_tool_usage_log(seq, hash, ctx, "success", None)
1903 .await
1904 .unwrap();
1905
1906 let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1907 assert!(patterns.is_empty());
1908 }
1909
1910 #[tokio::test]
1911 async fn prune_tool_usage_log_removes_old_rows() {
1912 let store = test_store().await;
1913 zeph_db::query(sql!(
1915 "INSERT INTO skill_usage_log \
1916 (tool_sequence, sequence_hash, context_hash, outcome, created_at) \
1917 VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
1918 ))
1919 .bind(r#"["shell"]"#)
1920 .bind("hash0000000000001")
1921 .bind("ctx00000000000001")
1922 .bind("success")
1923 .execute(store.pool())
1924 .await
1925 .unwrap();
1926
1927 let removed = store.prune_tool_usage_log(1).await.unwrap();
1929 assert_eq!(removed, 1);
1930 }
1931
1932 #[tokio::test]
1935 async fn insert_and_load_skill_heuristics() {
1936 let store = test_store().await;
1937
1938 let id = store
1939 .insert_skill_heuristic(Some("git"), "always commit in small chunks", 0.9)
1940 .await
1941 .unwrap();
1942 assert!(id > 0);
1943
1944 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1945 assert_eq!(rows.len(), 1);
1946 assert_eq!(rows[0].1, "always commit in small chunks");
1947 assert!((rows[0].2 - 0.9).abs() < 1e-6);
1948 }
1949
1950 #[tokio::test]
1951 async fn load_skill_heuristics_includes_general() {
1952 let store = test_store().await;
1953
1954 store
1955 .insert_skill_heuristic(None, "general tip", 0.7)
1956 .await
1957 .unwrap();
1958 store
1959 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1960 .await
1961 .unwrap();
1962
1963 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1965 assert_eq!(rows.len(), 2);
1966 }
1967
1968 #[tokio::test]
1969 async fn load_skill_heuristics_filters_by_min_confidence() {
1970 let store = test_store().await;
1971
1972 store
1973 .insert_skill_heuristic(Some("git"), "low confidence tip", 0.3)
1974 .await
1975 .unwrap();
1976 store
1977 .insert_skill_heuristic(Some("git"), "high confidence tip", 0.9)
1978 .await
1979 .unwrap();
1980
1981 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1982 assert_eq!(rows.len(), 1);
1983 assert_eq!(rows[0].1, "high confidence tip");
1984 }
1985
1986 #[tokio::test]
1987 async fn increment_heuristic_use_count_works() {
1988 let store = test_store().await;
1989
1990 let id = store
1991 .insert_skill_heuristic(Some("git"), "tip", 0.8)
1992 .await
1993 .unwrap();
1994
1995 store.increment_heuristic_use_count(id).await.unwrap();
1996 store.increment_heuristic_use_count(id).await.unwrap();
1997
1998 let rows = store.load_skill_heuristics("git", 0.0, 10).await.unwrap();
1999 assert_eq!(rows[0].3, 2); }
2001
2002 #[tokio::test]
2003 async fn load_all_heuristics_for_skill_exact_match() {
2004 let store = test_store().await;
2005
2006 store
2007 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
2008 .await
2009 .unwrap();
2010 store
2011 .insert_skill_heuristic(Some("docker"), "docker tip", 0.8)
2012 .await
2013 .unwrap();
2014
2015 let rows = store
2016 .load_all_heuristics_for_skill(Some("git"))
2017 .await
2018 .unwrap();
2019 assert_eq!(rows.len(), 1);
2020 assert_eq!(rows[0].1, "git tip");
2021 }
2022
2023 #[tokio::test]
2024 async fn load_all_heuristics_for_skill_null() {
2025 let store = test_store().await;
2026
2027 store
2028 .insert_skill_heuristic(None, "general", 0.8)
2029 .await
2030 .unwrap();
2031 store
2032 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
2033 .await
2034 .unwrap();
2035
2036 let rows = store.load_all_heuristics_for_skill(None).await.unwrap();
2037 assert_eq!(rows.len(), 1);
2038 assert_eq!(rows[0].1, "general");
2039 }
2040
2041 #[tokio::test]
2042 async fn skill_trust_default_is_quarantined() {
2043 let store = test_store().await;
2047
2048 zeph_db::query(sql!(
2050 "INSERT INTO skill_trust (skill_name, blake3_hash) VALUES ('test-arise', 'abc')"
2051 ))
2052 .execute(store.pool())
2053 .await
2054 .unwrap();
2055
2056 let trust: (String,) = zeph_db::query_as(sql!(
2057 "SELECT trust_level FROM skill_trust WHERE skill_name = 'test-arise'"
2058 ))
2059 .fetch_one(store.pool())
2060 .await
2061 .unwrap();
2062
2063 assert_eq!(
2064 trust.0, "quarantined",
2065 "schema default for skill_trust.trust_level must be 'quarantined'"
2066 );
2067 }
2068
2069 #[tokio::test]
2070 async fn save_and_activate_skill_version_activates_new() {
2071 let store = test_store().await;
2072
2073 let id = store
2074 .save_and_activate_skill_version(
2075 "git",
2076 1,
2077 "body v1",
2078 "Git helper",
2079 "manual",
2080 None,
2081 None,
2082 )
2083 .await
2084 .unwrap();
2085 assert!(id > 0);
2086
2087 let active = store.active_skill_version("git").await.unwrap().unwrap();
2088 assert_eq!(active.id, id);
2089 assert_eq!(active.version, 1);
2090 assert_eq!(active.body, "body v1");
2091 assert!(active.is_active);
2092 }
2093
2094 #[tokio::test]
2095 async fn save_and_activate_skill_version_deactivates_previous() {
2096 let store = test_store().await;
2097
2098 let v1 = store
2099 .save_and_activate_skill_version("git", 1, "v1", "desc", "manual", None, None)
2100 .await
2101 .unwrap();
2102
2103 let v2 = store
2104 .save_and_activate_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
2105 .await
2106 .unwrap();
2107
2108 let versions = store.load_skill_versions("git").await.unwrap();
2109 assert_eq!(versions.len(), 2);
2110
2111 let old = versions.iter().find(|v| v.id == v1).unwrap();
2112 let new = versions.iter().find(|v| v.id == v2).unwrap();
2113 assert!(!old.is_active, "previous version must be deactivated");
2114 assert!(new.is_active, "new version must be active");
2115 }
2116
2117 #[tokio::test]
2118 async fn save_and_activate_skill_version_only_one_active() {
2119 let store = test_store().await;
2120
2121 let mut last_id = 0i64;
2122 for i in 1i64..=4 {
2123 last_id = store
2124 .save_and_activate_skill_version(
2125 "git",
2126 i,
2127 &format!("v{i}"),
2128 "desc",
2129 "auto",
2130 None,
2131 None,
2132 )
2133 .await
2134 .unwrap();
2135 }
2136
2137 let versions = store.load_skill_versions("git").await.unwrap();
2138 let active_count = versions.iter().filter(|v| v.is_active).count();
2139 assert_eq!(active_count, 1, "exactly one version must be active");
2140 assert_eq!(
2141 versions.iter().find(|v| v.is_active).unwrap().id,
2142 last_id,
2143 "the last saved version must be the active one"
2144 );
2145 }
2146
2147 async fn insert_heuristic(store: &SqliteStore, skill: &str, text: &str, confidence: f64) {
2150 zeph_db::query(sql!(
2151 "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) VALUES (?, ?, ?)"
2152 ))
2153 .bind(skill)
2154 .bind(text)
2155 .bind(confidence)
2156 .execute(store.pool())
2157 .await
2158 .unwrap();
2159 }
2160
2161 #[tokio::test]
2164 async fn count_heuristics_by_skill_excludes_null_skill_name() {
2165 let store = test_store().await;
2166
2167 store
2169 .insert_skill_heuristic(None, "general tip 1", 0.9)
2170 .await
2171 .unwrap();
2172 store
2173 .insert_skill_heuristic(None, "general tip 2", 0.8)
2174 .await
2175 .unwrap();
2176 store
2177 .insert_skill_heuristic(None, "general tip 3", 0.7)
2178 .await
2179 .unwrap();
2180
2181 let results = store.count_heuristics_by_skill(0.5, 2).await.unwrap();
2183 assert!(
2184 results.is_empty(),
2185 "NULL skill_name heuristics must be excluded from count_heuristics_by_skill results"
2186 );
2187
2188 insert_heuristic(&store, "git", "use --no-pager", 0.8).await;
2190 insert_heuristic(&store, "git", "check status first", 0.9).await;
2191
2192 let results = store.count_heuristics_by_skill(0.5, 2).await.unwrap();
2193 assert_eq!(results.len(), 1, "only git should qualify");
2194 assert_eq!(results[0].0, "git");
2195 assert_eq!(results[0].1, 2, "git has exactly 2 qualifying heuristics");
2196 }
2197
2198 #[tokio::test]
2199 async fn count_heuristics_by_skill_threshold_and_confidence() {
2200 let store = test_store().await;
2201
2202 insert_heuristic(&store, "git", "use --no-pager", 0.8).await;
2204 insert_heuristic(&store, "git", "check status first", 0.9).await;
2205 insert_heuristic(&store, "git", "low confidence hint", 0.3).await;
2206
2207 insert_heuristic(&store, "docker", "use --rm", 0.7).await;
2209
2210 let results = store.count_heuristics_by_skill(0.5, 2).await.unwrap();
2211 assert_eq!(results.len(), 1, "only git meets threshold");
2212 assert_eq!(results[0].0, "git");
2213 assert_eq!(results[0].1, 2, "only heuristics >= 0.5 confidence counted");
2214 }
2215
2216 #[tokio::test]
2217 async fn load_heuristic_texts_sorted_for_deterministic_hash() {
2218 let store = test_store().await;
2219
2220 insert_heuristic(&store, "git", "zebra", 0.8).await;
2221 insert_heuristic(&store, "git", "alpha", 0.9).await;
2222 insert_heuristic(&store, "git", "middle", 0.7).await;
2223 insert_heuristic(&store, "git", "skipped", 0.2).await; let texts = store
2226 .load_heuristic_texts_for_promotion("git", 0.5)
2227 .await
2228 .unwrap();
2229
2230 assert_eq!(texts.len(), 3);
2231 assert_eq!(
2232 texts,
2233 vec!["alpha", "middle", "zebra"],
2234 "must be sorted ASC"
2235 );
2236 }
2237
2238 #[tokio::test]
2239 async fn promotion_already_evaluated_idempotency() {
2240 let store = test_store().await;
2241
2242 let found = store
2244 .promotion_already_evaluated("git", "abc123")
2245 .await
2246 .unwrap();
2247 assert!(!found);
2248
2249 store
2250 .record_promotion_evaluation("git", "abc123", "none", None)
2251 .await
2252 .unwrap();
2253
2254 let found = store
2256 .promotion_already_evaluated("git", "abc123")
2257 .await
2258 .unwrap();
2259 assert!(found);
2260 }
2261
2262 #[tokio::test]
2263 async fn record_promotion_evaluation_insert_or_ignore() {
2264 let store = test_store().await;
2265
2266 store
2268 .record_promotion_evaluation("git", "hash1", "body_enrichment", Some("git-v2"))
2269 .await
2270 .unwrap();
2271
2272 store
2274 .record_promotion_evaluation("git", "hash1", "new_skill", Some("git-extra"))
2275 .await
2276 .unwrap();
2277
2278 let rec: (String,) = zeph_db::query_as(sql!(
2280 "SELECT recommendation FROM skill_heuristic_promotions WHERE skill_name = ? AND batch_hash = ?"
2281 ))
2282 .bind("git")
2283 .bind("hash1")
2284 .fetch_one(store.pool())
2285 .await
2286 .unwrap();
2287 assert_eq!(
2288 rec.0, "body_enrichment",
2289 "first insert wins (INSERT OR IGNORE)"
2290 );
2291 }
2292}