1use super::SqliteStore;
5use crate::error::MemoryError;
6#[allow(unused_imports)]
7use zeph_db::{begin_write, sql};
8
/// One row of the `skill_usage` table, as produced by `SqliteStore::load_skill_usage`.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so rows can be copied and
/// compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillUsageRow {
    /// Name of the skill (`skill_name` column).
    pub skill_name: String,
    /// Number of recorded invocations (`invocation_count` column).
    pub invocation_count: i64,
    /// Time of the most recent invocation, as text read from the `last_used_at` column.
    pub last_used_at: String,
}
19
/// Aggregated outcome counters for a skill, computed from the `skill_outcomes` table.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so rows can be copied and
/// compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillMetricsRow {
    /// Name of the skill the counters belong to.
    pub skill_name: String,
    /// Skill version the counters are attributed to; `None` when the stored value is NULL.
    pub version_id: Option<i64>,
    /// Number of outcome rows that were counted.
    pub total: i64,
    /// Number of counted rows whose outcome is `'success'`.
    pub successes: i64,
    /// Number of counted rows whose outcome is anything other than `'success'`.
    pub failures: i64,
}
34
/// One row of the `skill_versions` table.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so rows can be copied and
/// compared directly (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SkillVersionRow {
    /// Row id (`id` column).
    pub id: i64,
    /// Name of the skill this version belongs to.
    pub skill_name: String,
    /// Version number of this row within the skill.
    pub version: i64,
    /// Stored skill body text.
    pub body: String,
    /// Stored skill description.
    pub description: String,
    /// Origin label of the version; the queries in this file use `'manual'` and `'auto'`.
    pub source: String,
    /// Whether this row is flagged as the active version (`is_active != 0`).
    pub is_active: bool,
    /// Value of the `success_count` column.
    pub success_count: i64,
    /// Value of the `failure_count` column.
    pub failure_count: i64,
    /// Creation time, as text read from the `created_at` column.
    pub created_at: String,
}
59
60type SkillVersionTuple = (
61 i64,
62 String,
63 i64,
64 String,
65 String,
66 String,
67 i64,
68 i64,
69 i64,
70 String,
71);
72
73fn skill_version_from_tuple(t: SkillVersionTuple) -> SkillVersionRow {
74 SkillVersionRow {
75 id: t.0,
76 skill_name: t.1,
77 version: t.2,
78 body: t.3,
79 description: t.4,
80 source: t.5,
81 is_active: t.6 != 0,
82 success_count: t.7,
83 failure_count: t.8,
84 created_at: t.9,
85 }
86}
87
88impl SqliteStore {
89 #[tracing::instrument(skip_all, name = "memory.skills.record_skill_usage")]
98 pub async fn record_skill_usage(&self, skill_names: &[&str]) -> Result<(), MemoryError> {
99 let mut tx = begin_write(&self.pool).await?;
100 for name in skill_names {
101 zeph_db::query(sql!(
102 "INSERT INTO skill_usage (skill_name, invocation_count, last_used_at) \
103 VALUES (?, 1, CURRENT_TIMESTAMP) \
104 ON CONFLICT(skill_name) DO UPDATE SET \
105 invocation_count = invocation_count + 1, \
106 last_used_at = CURRENT_TIMESTAMP"
107 ))
108 .bind(name)
109 .execute(&mut *tx)
110 .await?;
111 }
112 tx.commit().await?;
113 Ok(())
114 }
115
116 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_usage")]
122 pub async fn load_skill_usage(&self) -> Result<Vec<SkillUsageRow>, MemoryError> {
123 let rows: Vec<(String, i64, String)> = zeph_db::query_as(sql!(
124 "SELECT skill_name, invocation_count, last_used_at \
125 FROM skill_usage ORDER BY invocation_count DESC"
126 ))
127 .fetch_all(&self.pool)
128 .await?;
129
130 Ok(rows
131 .into_iter()
132 .map(
133 |(skill_name, invocation_count, last_used_at)| SkillUsageRow {
134 skill_name,
135 invocation_count,
136 last_used_at,
137 },
138 )
139 .collect())
140 }
141
142 #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcome")]
148 pub async fn record_skill_outcome(
149 &self,
150 skill_name: &str,
151 version_id: Option<i64>,
152 conversation_id: Option<crate::types::ConversationId>,
153 outcome: &str,
154 error_context: Option<&str>,
155 outcome_detail: Option<&str>,
156 ) -> Result<(), MemoryError> {
157 zeph_db::query(sql!(
158 "INSERT INTO skill_outcomes \
159 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
160 VALUES (?, ?, ?, ?, ?, ?)"
161 ))
162 .bind(skill_name)
163 .bind(version_id)
164 .bind(conversation_id)
165 .bind(outcome)
166 .bind(error_context)
167 .bind(outcome_detail)
168 .execute(&self.pool)
169 .await?;
170 Ok(())
171 }
172
173 #[tracing::instrument(skip_all, name = "memory.skills.record_skill_outcomes_batch")]
179 pub async fn record_skill_outcomes_batch(
180 &self,
181 skill_names: &[String],
182 conversation_id: Option<crate::types::ConversationId>,
183 outcome: &str,
184 error_context: Option<&str>,
185 outcome_detail: Option<&str>,
186 ) -> Result<(), MemoryError> {
187 let mut tx = begin_write(&self.pool).await?;
190
191 let mut version_map: std::collections::HashMap<String, Option<i64>> =
192 std::collections::HashMap::new();
193 for name in skill_names {
194 let vid: Option<(i64,)> = zeph_db::query_as(sql!(
195 "SELECT id FROM skill_versions WHERE skill_name = ? AND is_active = 1"
196 ))
197 .bind(name)
198 .fetch_optional(&mut *tx)
199 .await?;
200 version_map.insert(name.clone(), vid.map(|r| r.0));
201 }
202
203 for name in skill_names {
204 let version_id = version_map.get(name.as_str()).copied().flatten();
205 zeph_db::query(sql!(
206 "INSERT INTO skill_outcomes \
207 (skill_name, version_id, conversation_id, outcome, error_context, outcome_detail) \
208 VALUES (?, ?, ?, ?, ?, ?)"
209 ))
210 .bind(name)
211 .bind(version_id)
212 .bind(conversation_id)
213 .bind(outcome)
214 .bind(error_context)
215 .bind(outcome_detail)
216 .execute(&mut *tx)
217 .await?;
218 }
219 tx.commit().await?;
220 Ok(())
221 }
222
223 #[tracing::instrument(skip_all, name = "memory.skills.skill_metrics")]
229 pub async fn skill_metrics(
230 &self,
231 skill_name: &str,
232 ) -> Result<Option<SkillMetricsRow>, MemoryError> {
233 let row: Option<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
234 "SELECT skill_name, version_id, \
235 COUNT(*) as total, \
236 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
237 COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
238 FROM skill_outcomes WHERE skill_name = ? \
239 AND outcome NOT IN ('user_approval', 'user_rejection') \
240 GROUP BY skill_name, version_id \
241 ORDER BY version_id DESC LIMIT 1"
242 ))
243 .bind(skill_name)
244 .fetch_optional(&self.pool)
245 .await?;
246
247 Ok(row.map(
248 |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
249 skill_name,
250 version_id,
251 total,
252 successes,
253 failures,
254 },
255 ))
256 }
257
258 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_outcome_stats")]
264 pub async fn load_skill_outcome_stats(&self) -> Result<Vec<SkillMetricsRow>, MemoryError> {
265 let rows: Vec<(String, Option<i64>, i64, i64, i64)> = zeph_db::query_as(sql!(
266 "SELECT skill_name, version_id, \
267 COUNT(*) as total, \
268 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as successes, \
269 COUNT(*) - SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as failures \
270 FROM skill_outcomes \
271 GROUP BY skill_name \
272 ORDER BY total DESC"
273 ))
274 .fetch_all(&self.pool)
275 .await?;
276
277 Ok(rows
278 .into_iter()
279 .map(
280 |(skill_name, version_id, total, successes, failures)| SkillMetricsRow {
281 skill_name,
282 version_id,
283 total,
284 successes,
285 failures,
286 },
287 )
288 .collect())
289 }
290
291 #[allow(clippy::too_many_arguments)]
297 #[tracing::instrument(skip_all, name = "memory.skills.save_skill_version")]
299 pub async fn save_skill_version(
300 &self,
301 skill_name: &str,
302 version: i64,
303 body: &str,
304 description: &str,
305 source: &str,
306 error_context: Option<&str>,
307 predecessor_id: Option<i64>,
308 ) -> Result<i64, MemoryError> {
309 let row: (i64,) = zeph_db::query_as(sql!(
310 "INSERT INTO skill_versions \
311 (skill_name, version, body, description, source, error_context, predecessor_id) \
312 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
313 ))
314 .bind(skill_name)
315 .bind(version)
316 .bind(body)
317 .bind(description)
318 .bind(source)
319 .bind(error_context)
320 .bind(predecessor_id)
321 .fetch_one(&self.pool)
322 .await?;
323 Ok(row.0)
324 }
325
326 #[allow(clippy::too_many_arguments)]
337 #[tracing::instrument(skip_all, name = "memory.skills.save_and_activate_skill_version")]
338 pub async fn save_and_activate_skill_version(
339 &self,
340 skill_name: &str,
341 version: i64,
342 body: &str,
343 description: &str,
344 source: &str,
345 error_context: Option<&str>,
346 predecessor_id: Option<i64>,
347 ) -> Result<i64, MemoryError> {
348 let mut tx = begin_write(&self.pool).await?;
349
350 let row: (i64,) = zeph_db::query_as(sql!(
351 "INSERT INTO skill_versions \
352 (skill_name, version, body, description, source, error_context, predecessor_id) \
353 VALUES (?, ?, ?, ?, ?, ?, ?) RETURNING id"
354 ))
355 .bind(skill_name)
356 .bind(version)
357 .bind(body)
358 .bind(description)
359 .bind(source)
360 .bind(error_context)
361 .bind(predecessor_id)
362 .fetch_one(&mut *tx)
363 .await?;
364 let id = row.0;
365
366 zeph_db::query(sql!(
367 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
368 ))
369 .bind(skill_name)
370 .execute(&mut *tx)
371 .await?;
372
373 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
374 .bind(id)
375 .execute(&mut *tx)
376 .await?;
377
378 tx.commit().await?;
379 Ok(id)
380 }
381
382 #[tracing::instrument(skip_all, name = "memory.skills.distinct_session_count")]
391 pub async fn distinct_session_count(&self, skill_name: &str) -> Result<i64, MemoryError> {
392 let row: (i64,) = zeph_db::query_as(sql!(
393 "SELECT COUNT(DISTINCT conversation_id) FROM skill_outcomes \
394 WHERE skill_name = ? AND conversation_id IS NOT NULL"
395 ))
396 .bind(skill_name)
397 .fetch_one(&self.pool)
398 .await?;
399 Ok(row.0)
400 }
401
402 #[tracing::instrument(skip_all, name = "memory.skills.active_skill_version")]
408 pub async fn active_skill_version(
409 &self,
410 skill_name: &str,
411 ) -> Result<Option<SkillVersionRow>, MemoryError> {
412 let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
413 "SELECT id, skill_name, version, body, description, source, \
414 is_active, success_count, failure_count, created_at \
415 FROM skill_versions WHERE skill_name = ? AND is_active = 1 LIMIT 1"
416 ))
417 .bind(skill_name)
418 .fetch_optional(&self.pool)
419 .await?;
420
421 Ok(row.map(skill_version_from_tuple))
422 }
423
424 #[tracing::instrument(skip_all, name = "memory.skills.activate_skill_version")]
430 pub async fn activate_skill_version(
431 &self,
432 skill_name: &str,
433 version_id: i64,
434 ) -> Result<(), MemoryError> {
435 let mut tx = begin_write(&self.pool).await?;
436
437 zeph_db::query(sql!(
438 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
439 ))
440 .bind(skill_name)
441 .execute(&mut *tx)
442 .await?;
443
444 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
445 .bind(version_id)
446 .execute(&mut *tx)
447 .await?;
448
449 tx.commit().await?;
450 Ok(())
451 }
452
453 #[tracing::instrument(skip_all, name = "memory.skills.next_skill_version")]
459 pub async fn next_skill_version(&self, skill_name: &str) -> Result<i64, MemoryError> {
460 let row: (i64,) = zeph_db::query_as(sql!(
461 "SELECT COALESCE(MAX(version), 0) + 1 FROM skill_versions WHERE skill_name = ?"
462 ))
463 .bind(skill_name)
464 .fetch_one(&self.pool)
465 .await?;
466 Ok(row.0)
467 }
468
469 #[tracing::instrument(skip_all, name = "memory.skills.last_improvement_time")]
475 pub async fn last_improvement_time(
476 &self,
477 skill_name: &str,
478 ) -> Result<Option<String>, MemoryError> {
479 let row: Option<(String,)> = zeph_db::query_as(sql!(
480 "SELECT created_at FROM skill_versions \
481 WHERE skill_name = ? AND source = 'auto' \
482 ORDER BY id DESC LIMIT 1"
483 ))
484 .bind(skill_name)
485 .fetch_optional(&self.pool)
486 .await?;
487 Ok(row.map(|r| r.0))
488 }
489
490 #[tracing::instrument(skip_all, name = "memory.skills.ensure_skill_version_exists")]
500 pub async fn ensure_skill_version_exists(
501 &self,
502 skill_name: &str,
503 body: &str,
504 description: &str,
505 ) -> Result<(), MemoryError> {
506 let mut tx = begin_write(&self.pool).await?;
507
508 let existing: Option<(i64,)> = zeph_db::query_as(sql!(
509 "SELECT id FROM skill_versions WHERE skill_name = ? LIMIT 1"
510 ))
511 .bind(skill_name)
512 .fetch_optional(&mut *tx)
513 .await?;
514
515 if existing.is_none() {
516 let row: (i64,) = zeph_db::query_as(sql!(
517 "INSERT INTO skill_versions \
518 (skill_name, version, body, description, source, error_context, predecessor_id) \
519 VALUES (?, 1, ?, ?, 'manual', NULL, NULL) RETURNING id"
520 ))
521 .bind(skill_name)
522 .bind(body)
523 .bind(description)
524 .fetch_one(&mut *tx)
525 .await?;
526 let id = row.0;
527
528 zeph_db::query(sql!(
529 "UPDATE skill_versions SET is_active = 0 WHERE skill_name = ? AND is_active = 1"
530 ))
531 .bind(skill_name)
532 .execute(&mut *tx)
533 .await?;
534
535 zeph_db::query(sql!("UPDATE skill_versions SET is_active = 1 WHERE id = ?"))
536 .bind(id)
537 .execute(&mut *tx)
538 .await?;
539 }
540
541 tx.commit().await?;
542 Ok(())
543 }
544
545 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_versions")]
551 pub async fn load_skill_versions(
552 &self,
553 skill_name: &str,
554 ) -> Result<Vec<SkillVersionRow>, MemoryError> {
555 let rows: Vec<SkillVersionTuple> = zeph_db::query_as(sql!(
556 "SELECT id, skill_name, version, body, description, source, \
557 is_active, success_count, failure_count, created_at \
558 FROM skill_versions WHERE skill_name = ? ORDER BY version ASC"
559 ))
560 .bind(skill_name)
561 .fetch_all(&self.pool)
562 .await?;
563
564 Ok(rows.into_iter().map(skill_version_from_tuple).collect())
565 }
566
567 #[tracing::instrument(skip_all, name = "memory.skills.count_auto_versions")]
573 pub async fn count_auto_versions(&self, skill_name: &str) -> Result<i64, MemoryError> {
574 let row: (i64,) = zeph_db::query_as(sql!(
575 "SELECT COUNT(*) FROM skill_versions WHERE skill_name = ? AND source = 'auto'"
576 ))
577 .bind(skill_name)
578 .fetch_one(&self.pool)
579 .await?;
580 Ok(row.0)
581 }
582
583 #[cfg(not(feature = "postgres"))]
590 #[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
591 pub async fn prune_skill_versions(
592 &self,
593 skill_name: &str,
594 max_versions: u32,
595 ) -> Result<u32, MemoryError> {
596 let result = zeph_db::query(sql!(
597 "DELETE FROM skill_versions WHERE id IN (\
598 SELECT id FROM skill_versions \
599 WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
600 ORDER BY id ASC \
601 LIMIT max(0, (SELECT COUNT(*) FROM skill_versions \
602 WHERE skill_name = ? AND source = 'auto') - ?)\
603 )"
604 ))
605 .bind(skill_name)
606 .bind(skill_name)
607 .bind(max_versions)
608 .execute(&self.pool)
609 .await?;
610 Ok(u32::try_from(result.rows_affected()).unwrap_or(0))
611 }
612
/// Deletes old inactive `'auto'` versions of `skill_name` (Postgres build).
///
/// Inactive `'auto'` rows are ordered newest first by id; the first `max_versions` of
/// them are skipped and the rest are deleted. Returns the number of deleted rows, or
/// `0` if that number does not fit in a `u32`.
///
/// # Errors
///
/// Returns [`MemoryError`] if the delete fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.prune_skill_versions")]
pub async fn prune_skill_versions(
    &self,
    skill_name: &str,
    max_versions: u32,
) -> Result<u32, MemoryError> {
    let outcome = zeph_db::query(sql!(
        "DELETE FROM skill_versions WHERE id IN (\
         SELECT id FROM skill_versions \
         WHERE skill_name = ? AND source = 'auto' AND is_active = 0 \
         ORDER BY id DESC \
         OFFSET ?\
         )"
    ))
    .bind(skill_name)
    .bind(i64::from(max_versions))
    .execute(&self.pool)
    .await?;

    let deleted = outcome.rows_affected();
    match u32::try_from(deleted) {
        Ok(count) => Ok(count),
        Err(_) => Ok(0),
    }
}
639
640 #[tracing::instrument(skip_all, name = "memory.skills.predecessor_version")]
646 pub async fn predecessor_version(
647 &self,
648 version_id: i64,
649 ) -> Result<Option<SkillVersionRow>, MemoryError> {
650 let pred_id: Option<(Option<i64>,)> = zeph_db::query_as(sql!(
651 "SELECT predecessor_id FROM skill_versions WHERE id = ?"
652 ))
653 .bind(version_id)
654 .fetch_optional(&self.pool)
655 .await?;
656
657 let Some((Some(pid),)) = pred_id else {
658 return Ok(None);
659 };
660
661 let row: Option<SkillVersionTuple> = zeph_db::query_as(sql!(
662 "SELECT id, skill_name, version, body, description, source, \
663 is_active, success_count, failure_count, created_at \
664 FROM skill_versions WHERE id = ?"
665 ))
666 .bind(pid)
667 .fetch_optional(&self.pool)
668 .await?;
669
670 Ok(row.map(skill_version_from_tuple))
671 }
672
673 #[tracing::instrument(skip_all, name = "memory.skills.list_active_auto_versions")]
680 pub async fn list_active_auto_versions(&self) -> Result<Vec<String>, MemoryError> {
681 let rows: Vec<(String,)> = zeph_db::query_as(sql!(
682 "SELECT skill_name FROM skill_versions WHERE is_active = 1 AND source = 'auto'"
683 ))
684 .fetch_all(&self.pool)
685 .await?;
686 Ok(rows.into_iter().map(|(name,)| name).collect())
687 }
688
689 #[tracing::instrument(skip_all, name = "memory.skills.insert_tool_usage_log")]
699 pub async fn insert_tool_usage_log(
700 &self,
701 tool_sequence: &str,
702 sequence_hash: &str,
703 context_hash: &str,
704 outcome: &str,
705 conversation_id: Option<crate::types::ConversationId>,
706 ) -> Result<(), MemoryError> {
707 zeph_db::query(sql!(
708 "INSERT INTO skill_usage_log \
709 (tool_sequence, sequence_hash, context_hash, outcome, conversation_id) \
710 VALUES (?, ?, ?, ?, ?)"
711 ))
712 .bind(tool_sequence)
713 .bind(sequence_hash)
714 .bind(context_hash)
715 .bind(outcome)
716 .bind(conversation_id)
717 .execute(&self.pool)
718 .await?;
719 Ok(())
720 }
721
722 #[cfg(not(feature = "postgres"))]
730 #[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
731 pub async fn find_recurring_patterns(
732 &self,
733 min_count: u32,
734 window_days: u32,
735 ) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
736 let rows: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
737 "SELECT tool_sequence, sequence_hash, \
738 COUNT(*) as occurrence_count, \
739 SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
740 FROM skill_usage_log \
741 WHERE created_at > datetime('now', '-' || ? || ' days') \
742 GROUP BY sequence_hash \
743 HAVING occurrence_count >= ? \
744 ORDER BY occurrence_count DESC \
745 LIMIT 10"
746 ))
747 .bind(window_days)
748 .bind(min_count)
749 .fetch_all(&self.pool)
750 .await?;
751
752 Ok(rows
753 .into_iter()
754 .map(|(seq, hash, occ, suc)| {
755 (
756 seq,
757 hash,
758 u32::try_from(occ).unwrap_or(u32::MAX),
759 u32::try_from(suc).unwrap_or(0),
760 )
761 })
762 .collect())
763 }
764
/// Finds tool sequences that recur in `skill_usage_log` (Postgres build).
///
/// Only rows created within the last `window_days` days are considered. They are
/// grouped by `(sequence_hash, tool_sequence)`; groups with fewer than `min_count` rows
/// are dropped and at most 10 groups are returned, most frequent first.
///
/// Each tuple is `(tool_sequence, sequence_hash, occurrence_count, success_count)`. An
/// occurrence count that does not fit in a `u32` becomes `u32::MAX`; a success count
/// that does not fit becomes `0`.
///
/// # Errors
///
/// Returns [`MemoryError`] if the query fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.find_recurring_patterns")]
pub async fn find_recurring_patterns(
    &self,
    min_count: u32,
    window_days: u32,
) -> Result<Vec<(String, String, u32, u32)>, MemoryError> {
    let grouped: Vec<(String, String, i64, i64)> = zeph_db::query_as(sql!(
        "SELECT tool_sequence, sequence_hash, \
         COUNT(*) as occurrence_count, \
         SUM(CASE WHEN outcome = 'success' THEN 1 ELSE 0 END) as success_count \
         FROM skill_usage_log \
         WHERE created_at > NOW() - INTERVAL '1 day' * ? \
         GROUP BY sequence_hash, tool_sequence \
         HAVING COUNT(*) >= ? \
         ORDER BY occurrence_count DESC \
         LIMIT 10"
    ))
    .bind(i64::from(window_days))
    .bind(i64::from(min_count))
    .fetch_all(&self.pool)
    .await?;

    let mut patterns = Vec::with_capacity(grouped.len());
    for (tool_sequence, sequence_hash, occurrences, successes) in grouped {
        let occurrences = u32::try_from(occurrences).unwrap_or(u32::MAX);
        let successes = u32::try_from(successes).unwrap_or(0);
        patterns.push((tool_sequence, sequence_hash, occurrences, successes));
    }
    Ok(patterns)
}
805
806 #[cfg(not(feature = "postgres"))]
811 #[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
812 pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
813 let result = zeph_db::query(sql!(
814 "DELETE FROM skill_usage_log \
815 WHERE created_at < datetime('now', '-' || ? || ' days')"
816 ))
817 .bind(retention_days)
818 .execute(&self.pool)
819 .await?;
820 Ok(result.rows_affected())
821 }
822
/// Deletes `skill_usage_log` rows older than `retention_days` days (Postgres build).
///
/// Returns the number of deleted rows.
///
/// # Errors
///
/// Returns [`MemoryError`] if the delete fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.prune_tool_usage_log")]
pub async fn prune_tool_usage_log(&self, retention_days: u32) -> Result<u64, MemoryError> {
    let outcome = zeph_db::query(sql!(
        "DELETE FROM skill_usage_log \
         WHERE created_at < NOW() - INTERVAL '1 day' * ?"
    ))
    .bind(i64::from(retention_days))
    .execute(&self.pool)
    .await?;

    let deleted = outcome.rows_affected();
    Ok(deleted)
}
840
841 #[tracing::instrument(skip_all, name = "memory.skills.insert_skill_heuristic")]
848 pub async fn insert_skill_heuristic(
849 &self,
850 skill_name: Option<&str>,
851 heuristic_text: &str,
852 confidence: f64,
853 ) -> Result<i64, MemoryError> {
854 let row: (i64,) = zeph_db::query_as(sql!(
855 "INSERT INTO skill_heuristics (skill_name, heuristic_text, confidence) \
856 VALUES (?, ?, ?) RETURNING id"
857 ))
858 .bind(skill_name)
859 .bind(heuristic_text)
860 .bind(confidence)
861 .fetch_one(&self.pool)
862 .await?;
863 Ok(row.0)
864 }
865
866 #[cfg(not(feature = "postgres"))]
871 #[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
872 pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
873 zeph_db::query(sql!(
874 "UPDATE skill_heuristics \
875 SET use_count = use_count + 1, updated_at = datetime('now') \
876 WHERE id = ?"
877 ))
878 .bind(id)
879 .execute(&self.pool)
880 .await?;
881 Ok(())
882 }
883
/// Increments `use_count` of heuristic `id` and refreshes its `updated_at`
/// (Postgres build).
///
/// # Errors
///
/// Returns [`MemoryError`] if the update fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.increment_heuristic_use_count")]
pub async fn increment_heuristic_use_count(&self, id: i64) -> Result<(), MemoryError> {
    let _updated = zeph_db::query(sql!(
        "UPDATE skill_heuristics \
         SET use_count = use_count + 1, updated_at = CURRENT_TIMESTAMP \
         WHERE id = $1"
    ))
    .bind(id)
    .execute(&self.pool)
    .await?;

    Ok(())
}
901
902 #[tracing::instrument(skip_all, name = "memory.skills.load_skill_heuristics")]
910 pub async fn load_skill_heuristics(
911 &self,
912 skill_name: &str,
913 min_confidence: f64,
914 limit: u32,
915 ) -> Result<Vec<(i64, String, f64, i64)>, MemoryError> {
916 let rows: Vec<(i64, String, f64, i64)> = zeph_db::query_as(sql!(
917 "SELECT id, heuristic_text, confidence, use_count \
918 FROM skill_heuristics \
919 WHERE (skill_name = ? OR skill_name IS NULL) \
920 AND confidence >= ? \
921 ORDER BY confidence DESC \
922 LIMIT ?"
923 ))
924 .bind(skill_name)
925 .bind(min_confidence)
926 .bind(i64::from(limit))
927 .fetch_all(&self.pool)
928 .await?;
929 Ok(rows)
930 }
931
932 #[tracing::instrument(skip_all, name = "memory.skills.load_all_heuristics_for_skill")]
939 pub async fn load_all_heuristics_for_skill(
940 &self,
941 skill_name: Option<&str>,
942 ) -> Result<Vec<(i64, String)>, MemoryError> {
943 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
944 "SELECT id, heuristic_text FROM skill_heuristics \
945 WHERE (skill_name = ? OR (? IS NULL AND skill_name IS NULL))"
946 ))
947 .bind(skill_name)
948 .bind(skill_name)
949 .fetch_all(&self.pool)
950 .await?;
951 Ok(rows)
952 }
953
954 #[cfg(not(feature = "postgres"))]
965 #[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
966 pub async fn find_step_corrections(
967 &self,
968 skill_name: &str,
969 failure_kind: &str,
970 error_context: &str,
971 tool_name: &str,
972 limit: u32,
973 ) -> Result<Vec<(i64, String)>, MemoryError> {
974 let rows: Vec<(i64, String)> = zeph_db::query_as(sql!(
975 "SELECT id, hint FROM step_corrections \
976 WHERE skill_name = ? \
977 AND (failure_kind = '' OR failure_kind = ?) \
978 AND (error_substring = '' OR INSTR(?, error_substring) > 0) \
979 AND (tool_name = '' OR tool_name = ?) \
980 ORDER BY success_count DESC, use_count DESC \
981 LIMIT ?"
982 ))
983 .bind(skill_name)
984 .bind(failure_kind)
985 .bind(error_context)
986 .bind(tool_name)
987 .bind(limit)
988 .fetch_all(&self.pool)
989 .await?;
990 Ok(rows)
991 }
992
/// Finds stored step corrections matching a failure (Postgres build).
///
/// A row of `step_corrections` matches when its `skill_name` equals the argument and
/// each of its `failure_kind`, `error_substring` and `tool_name` columns is either the
/// empty string or agrees with the corresponding argument. For `error_substring`,
/// agreeing means it occurs inside `error_context` (`strpos`). Results are ordered by
/// `success_count`, then `use_count`, both descending, and capped at `limit` rows. Each
/// tuple is `(id, hint)`.
///
/// # Errors
///
/// Returns [`MemoryError`] if the query fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.find_step_corrections")]
pub async fn find_step_corrections(
    &self,
    skill_name: &str,
    failure_kind: &str,
    error_context: &str,
    tool_name: &str,
    limit: u32,
) -> Result<Vec<(i64, String)>, MemoryError> {
    let row_cap = i64::from(limit);

    let corrections: Vec<(i64, String)> = zeph_db::query_as(sql!(
        "SELECT id, hint FROM step_corrections \
         WHERE skill_name = ? \
         AND (failure_kind = '' OR failure_kind = ?) \
         AND (error_substring = '' OR strpos(?, error_substring) > 0) \
         AND (tool_name = '' OR tool_name = ?) \
         ORDER BY success_count DESC, use_count DESC \
         LIMIT ?"
    ))
    .bind(skill_name)
    .bind(failure_kind)
    .bind(error_context)
    .bind(tool_name)
    .bind(row_cap)
    .fetch_all(&self.pool)
    .await?;

    Ok(corrections)
}
1026
1027 #[tracing::instrument(skip_all, name = "memory.skills.insert_step_correction")]
1036 pub async fn insert_step_correction(
1037 &self,
1038 skill_name: &str,
1039 failure_kind: &str,
1040 error_substring: &str,
1041 tool_name: &str,
1042 hint: &str,
1043 ) -> Result<(), MemoryError> {
1044 zeph_db::query(sql!(
1045 "INSERT INTO step_corrections \
1046 (skill_name, failure_kind, error_substring, tool_name, hint) \
1047 VALUES (?, ?, ?, ?, ?) \
1048 ON CONFLICT (skill_name, failure_kind, error_substring, tool_name) DO NOTHING"
1049 ))
1050 .bind(skill_name)
1051 .bind(failure_kind)
1052 .bind(error_substring)
1053 .bind(tool_name)
1054 .bind(hint)
1055 .execute(&self.pool)
1056 .await?;
1057 Ok(())
1058 }
1059
1060 #[tracing::instrument(skip_all, name = "memory.skills.record_correction_usage")]
1066 pub async fn record_correction_usage(
1067 &self,
1068 correction_id: i64,
1069 was_successful: bool,
1070 ) -> Result<(), MemoryError> {
1071 if was_successful {
1072 zeph_db::query(sql!(
1073 "UPDATE step_corrections \
1074 SET use_count = use_count + 1, success_count = success_count + 1 \
1075 WHERE id = ?"
1076 ))
1077 .bind(correction_id)
1078 .execute(&self.pool)
1079 .await?;
1080 } else {
1081 zeph_db::query(sql!(
1082 "UPDATE step_corrections SET use_count = use_count + 1 WHERE id = ?"
1083 ))
1084 .bind(correction_id)
1085 .execute(&self.pool)
1086 .await?;
1087 }
1088 Ok(())
1089 }
1090
1091 #[tracing::instrument(skip_all, name = "memory.skills.load_routing_head_weights")]
1101 #[allow(clippy::type_complexity)]
1102 pub async fn load_routing_head_weights(
1103 &self,
1104 ) -> Result<Option<(i64, Vec<u8>, f64, i64)>, MemoryError> {
1105 let row: Option<(i64, Vec<u8>, f64, i64)> = zeph_db::query_as(sql!(
1106 "SELECT embed_dim, weights, baseline, update_count \
1107 FROM routing_head_weights WHERE id = 1"
1108 ))
1109 .fetch_optional(&self.pool)
1110 .await?;
1111 Ok(row)
1112 }
1113
1114 #[cfg(not(feature = "postgres"))]
1120 #[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
1121 pub async fn save_routing_head_weights(
1122 &self,
1123 embed_dim: i64,
1124 weights: &[u8],
1125 baseline: f64,
1126 update_count: i64,
1127 ) -> Result<(), MemoryError> {
1128 zeph_db::query(sql!(
1129 "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
1130 VALUES (1, ?, ?, ?, ?, datetime('now')) \
1131 ON CONFLICT(id) DO UPDATE SET \
1132 embed_dim = excluded.embed_dim, \
1133 weights = excluded.weights, \
1134 baseline = excluded.baseline, \
1135 update_count = excluded.update_count, \
1136 updated_at = datetime('now')"
1137 ))
1138 .bind(embed_dim)
1139 .bind(weights)
1140 .bind(baseline)
1141 .bind(update_count)
1142 .execute(&self.pool)
1143 .await?;
1144 Ok(())
1145 }
1146
/// Upserts the routing head weights into the row with `id = 1` (Postgres build).
///
/// If the row already exists, `embed_dim`, `weights`, `baseline` and `update_count` are
/// overwritten with the new values; `updated_at` is set to the current time either way.
///
/// # Errors
///
/// Returns [`MemoryError`] if the upsert fails.
#[cfg(feature = "postgres")]
#[tracing::instrument(skip_all, name = "memory.skills.save_routing_head_weights")]
pub async fn save_routing_head_weights(
    &self,
    embed_dim: i64,
    weights: &[u8],
    baseline: f64,
    update_count: i64,
) -> Result<(), MemoryError> {
    let _saved = zeph_db::query(sql!(
        "INSERT INTO routing_head_weights (id, embed_dim, weights, baseline, update_count, updated_at) \
         VALUES (1, $1, $2, $3, $4, CURRENT_TIMESTAMP) \
         ON CONFLICT(id) DO UPDATE SET \
         embed_dim = excluded.embed_dim, \
         weights = excluded.weights, \
         baseline = excluded.baseline, \
         update_count = excluded.update_count, \
         updated_at = CURRENT_TIMESTAMP"
    ))
    .bind(embed_dim)
    .bind(weights)
    .bind(baseline)
    .bind(update_count)
    .execute(&self.pool)
    .await?;

    Ok(())
}
1179}
1180
1181#[cfg(test)]
1182mod tests {
1183 use std::time::Duration;
1184
1185 use tokio::time::sleep;
1186
1187 use super::*;
1188
1189 async fn test_store() -> SqliteStore {
1190 SqliteStore::new(":memory:").await.unwrap()
1191 }
1192
1193 #[tokio::test]
1194 async fn record_skill_usage_increments() {
1195 let store = test_store().await;
1196
1197 store.record_skill_usage(&["git"]).await.unwrap();
1198 store.record_skill_usage(&["git"]).await.unwrap();
1199
1200 let usage = store.load_skill_usage().await.unwrap();
1201 assert_eq!(usage.len(), 1);
1202 assert_eq!(usage[0].skill_name, "git");
1203 assert_eq!(usage[0].invocation_count, 2);
1204 }
1205
1206 #[tokio::test]
1207 async fn load_skill_usage_returns_all() {
1208 let store = test_store().await;
1209
1210 store.record_skill_usage(&["git", "docker"]).await.unwrap();
1211 store.record_skill_usage(&["git"]).await.unwrap();
1212
1213 let usage = store.load_skill_usage().await.unwrap();
1214 assert_eq!(usage.len(), 2);
1215 assert_eq!(usage[0].skill_name, "git");
1216 assert_eq!(usage[0].invocation_count, 2);
1217 assert_eq!(usage[1].skill_name, "docker");
1218 assert_eq!(usage[1].invocation_count, 1);
1219 }
1220
1221 #[tokio::test]
1222 async fn migration_005_creates_tables() {
1223 let store = test_store().await;
1224 let pool = store.pool();
1225
1226 let versions: (i64,) = zeph_db::query_as(sql!(
1227 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_versions'"
1228 ))
1229 .fetch_one(pool)
1230 .await
1231 .unwrap();
1232 assert_eq!(versions.0, 1);
1233
1234 let outcomes: (i64,) = zeph_db::query_as(sql!(
1235 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='skill_outcomes'"
1236 ))
1237 .fetch_one(pool)
1238 .await
1239 .unwrap();
1240 assert_eq!(outcomes.0, 1);
1241 }
1242
1243 #[tokio::test]
1244 async fn record_skill_outcome_inserts() {
1245 let store = test_store().await;
1246
1247 store
1248 .record_skill_outcome(
1249 "git",
1250 None,
1251 Some(crate::types::ConversationId(1)),
1252 "success",
1253 None,
1254 None,
1255 )
1256 .await
1257 .unwrap();
1258 store
1259 .record_skill_outcome(
1260 "git",
1261 None,
1262 Some(crate::types::ConversationId(1)),
1263 "tool_failure",
1264 Some("exit code 1"),
1265 None,
1266 )
1267 .await
1268 .unwrap();
1269
1270 let metrics = store.skill_metrics("git").await.unwrap().unwrap();
1271 assert_eq!(metrics.total, 2);
1272 assert_eq!(metrics.successes, 1);
1273 assert_eq!(metrics.failures, 1);
1274 }
1275
1276 #[tokio::test]
1277 async fn skill_metrics_none_for_unknown() {
1278 let store = test_store().await;
1279 let m = store.skill_metrics("nonexistent").await.unwrap();
1280 assert!(m.is_none());
1281 }
1282
1283 #[tokio::test]
1284 async fn load_skill_outcome_stats_grouped() {
1285 let store = test_store().await;
1286
1287 store
1288 .record_skill_outcome("git", None, None, "success", None, None)
1289 .await
1290 .unwrap();
1291 store
1292 .record_skill_outcome("git", None, None, "tool_failure", None, None)
1293 .await
1294 .unwrap();
1295 store
1296 .record_skill_outcome("docker", None, None, "success", None, None)
1297 .await
1298 .unwrap();
1299
1300 let stats = store.load_skill_outcome_stats().await.unwrap();
1301 assert_eq!(stats.len(), 2);
1302 assert_eq!(stats[0].skill_name, "git");
1303 assert_eq!(stats[0].total, 2);
1304 assert_eq!(stats[1].skill_name, "docker");
1305 assert_eq!(stats[1].total, 1);
1306 }
1307
1308 #[tokio::test]
1309 async fn save_and_load_skill_version() {
1310 let store = test_store().await;
1311
1312 let id = store
1313 .save_skill_version("git", 1, "body v1", "Git helper", "manual", None, None)
1314 .await
1315 .unwrap();
1316 assert!(id > 0);
1317
1318 store.activate_skill_version("git", id).await.unwrap();
1319
1320 let active = store.active_skill_version("git").await.unwrap().unwrap();
1321 assert_eq!(active.version, 1);
1322 assert_eq!(active.body, "body v1");
1323 assert!(active.is_active);
1324 }
1325
1326 #[tokio::test]
1327 async fn activate_deactivates_previous() {
1328 let store = test_store().await;
1329
1330 let v1 = store
1331 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1332 .await
1333 .unwrap();
1334 store.activate_skill_version("git", v1).await.unwrap();
1335
1336 let v2 = store
1337 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1338 .await
1339 .unwrap();
1340 store.activate_skill_version("git", v2).await.unwrap();
1341
1342 let versions = store.load_skill_versions("git").await.unwrap();
1343 assert_eq!(versions.len(), 2);
1344 assert!(!versions[0].is_active);
1345 assert!(versions[1].is_active);
1346 }
1347
1348 #[tokio::test]
1349 async fn next_skill_version_increments() {
1350 let store = test_store().await;
1351
1352 let next = store.next_skill_version("git").await.unwrap();
1353 assert_eq!(next, 1);
1354
1355 store
1356 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1357 .await
1358 .unwrap();
1359 let next = store.next_skill_version("git").await.unwrap();
1360 assert_eq!(next, 2);
1361 }
1362
1363 #[tokio::test]
1364 async fn last_improvement_time_returns_auto_only() {
1365 let store = test_store().await;
1366
1367 store
1368 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1369 .await
1370 .unwrap();
1371
1372 let t = store.last_improvement_time("git").await.unwrap();
1373 assert!(t.is_none());
1374
1375 store
1376 .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1377 .await
1378 .unwrap();
1379
1380 let t = store.last_improvement_time("git").await.unwrap();
1381 assert!(t.is_some());
1382 }
1383
1384 #[tokio::test]
1385 async fn ensure_skill_version_exists_idempotent() {
1386 let store = test_store().await;
1387
1388 store
1389 .ensure_skill_version_exists("git", "body", "Git helper")
1390 .await
1391 .unwrap();
1392 store
1393 .ensure_skill_version_exists("git", "body2", "Git helper 2")
1394 .await
1395 .unwrap();
1396
1397 let versions = store.load_skill_versions("git").await.unwrap();
1398 assert_eq!(versions.len(), 1);
1399 assert_eq!(versions[0].body, "body");
1400 }
1401
1402 #[tokio::test]
1403 async fn load_skill_versions_ordered() {
1404 let store = test_store().await;
1405
1406 let v1 = store
1407 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1408 .await
1409 .unwrap();
1410 store
1411 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1412 .await
1413 .unwrap();
1414
1415 let versions = store.load_skill_versions("git").await.unwrap();
1416 assert_eq!(versions.len(), 2);
1417 assert_eq!(versions[0].version, 1);
1418 assert_eq!(versions[1].version, 2);
1419 }
1420
1421 #[tokio::test]
1422 async fn count_auto_versions_only() {
1423 let store = test_store().await;
1424
1425 store
1426 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1427 .await
1428 .unwrap();
1429 store
1430 .save_skill_version("git", 2, "v2", "desc", "auto", None, None)
1431 .await
1432 .unwrap();
1433 store
1434 .save_skill_version("git", 3, "v3", "desc", "auto", None, None)
1435 .await
1436 .unwrap();
1437
1438 let count = store.count_auto_versions("git").await.unwrap();
1439 assert_eq!(count, 2);
1440 }
1441
1442 #[tokio::test]
1443 async fn prune_preserves_manual_and_active() {
1444 let store = test_store().await;
1445
1446 let v1 = store
1447 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1448 .await
1449 .unwrap();
1450 store.activate_skill_version("git", v1).await.unwrap();
1451
1452 for i in 2..=5 {
1453 store
1454 .save_skill_version("git", i, &format!("v{i}"), "desc", "auto", None, None)
1455 .await
1456 .unwrap();
1457 }
1458
1459 let pruned = store.prune_skill_versions("git", 2).await.unwrap();
1460 assert_eq!(pruned, 2);
1461
1462 let versions = store.load_skill_versions("git").await.unwrap();
1463 assert!(versions.iter().any(|v| v.source == "manual"));
1464 let auto_count = versions.iter().filter(|v| v.source == "auto").count();
1465 assert_eq!(auto_count, 2);
1466 }
1467
1468 #[tokio::test]
1469 async fn predecessor_version_returns_parent() {
1470 let store = test_store().await;
1471
1472 let v1 = store
1473 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1474 .await
1475 .unwrap();
1476 let v2 = store
1477 .save_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1478 .await
1479 .unwrap();
1480
1481 let pred = store.predecessor_version(v2).await.unwrap().unwrap();
1482 assert_eq!(pred.id, v1);
1483 assert_eq!(pred.version, 1);
1484 }
1485
1486 #[tokio::test]
1487 async fn predecessor_version_none_for_root() {
1488 let store = test_store().await;
1489
1490 let v1 = store
1491 .save_skill_version("git", 1, "v1", "desc", "manual", None, None)
1492 .await
1493 .unwrap();
1494
1495 let pred = store.predecessor_version(v1).await.unwrap();
1496 assert!(pred.is_none());
1497 }
1498
1499 #[tokio::test]
1500 async fn active_skill_version_none_for_unknown() {
1501 let store = test_store().await;
1502 let active = store.active_skill_version("nonexistent").await.unwrap();
1503 assert!(active.is_none());
1504 }
1505
1506 #[tokio::test]
1507 async fn load_skill_outcome_stats_empty() {
1508 let store = test_store().await;
1509 let stats = store.load_skill_outcome_stats().await.unwrap();
1510 assert!(stats.is_empty());
1511 }
1512
1513 #[tokio::test]
1514 async fn load_skill_versions_empty() {
1515 let store = test_store().await;
1516 let versions = store.load_skill_versions("nonexistent").await.unwrap();
1517 assert!(versions.is_empty());
1518 }
1519
1520 #[tokio::test]
1521 async fn count_auto_versions_zero_for_unknown() {
1522 let store = test_store().await;
1523 let count = store.count_auto_versions("nonexistent").await.unwrap();
1524 assert_eq!(count, 0);
1525 }
1526
1527 #[tokio::test]
1528 async fn prune_nothing_when_below_limit() {
1529 let store = test_store().await;
1530
1531 store
1532 .save_skill_version("git", 1, "v1", "desc", "auto", None, None)
1533 .await
1534 .unwrap();
1535
1536 let pruned = store.prune_skill_versions("git", 5).await.unwrap();
1537 assert_eq!(pruned, 0);
1538 }
1539
1540 #[tokio::test]
1541 async fn record_skill_outcome_with_error_context() {
1542 let store = test_store().await;
1543
1544 store
1545 .record_skill_outcome(
1546 "docker",
1547 None,
1548 Some(crate::types::ConversationId(1)),
1549 "tool_failure",
1550 Some("container not found"),
1551 None,
1552 )
1553 .await
1554 .unwrap();
1555
1556 let metrics = store.skill_metrics("docker").await.unwrap().unwrap();
1557 assert_eq!(metrics.total, 1);
1558 assert_eq!(metrics.failures, 1);
1559 }
1560
1561 #[tokio::test]
1562 async fn save_skill_version_with_error_context() {
1563 let store = test_store().await;
1564
1565 let id = store
1566 .save_skill_version(
1567 "git",
1568 1,
1569 "improved body",
1570 "Git helper",
1571 "auto",
1572 Some("exit code 128"),
1573 None,
1574 )
1575 .await
1576 .unwrap();
1577 assert!(id > 0);
1578 }
1579
1580 #[tokio::test]
1581 async fn record_skill_outcomes_batch_resolves_version_id() {
1582 let store = test_store().await;
1583
1584 let vid = store
1585 .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1586 .await
1587 .unwrap();
1588 store.activate_skill_version("git", vid).await.unwrap();
1589
1590 store
1591 .record_skill_outcomes_batch(
1592 &["git".to_string()],
1593 None,
1594 "tool_failure",
1595 Some("exit code 1"),
1596 Some("exit_nonzero"),
1597 )
1598 .await
1599 .unwrap();
1600
1601 let pool = store.pool();
1602 let row: (Option<i64>, Option<String>) = zeph_db::query_as(sql!(
1603 "SELECT version_id, outcome_detail FROM skill_outcomes WHERE skill_name = 'git' LIMIT 1"
1604 ))
1605 .fetch_one(pool)
1606 .await
1607 .unwrap();
1608 assert_eq!(
1609 row.0,
1610 Some(vid),
1611 "version_id should be resolved to active version"
1612 );
1613 assert_eq!(row.1.as_deref(), Some("exit_nonzero"));
1614 }
1615
1616 #[tokio::test]
1617 async fn record_skill_outcome_stores_outcome_detail() {
1618 let store = test_store().await;
1619
1620 store
1621 .record_skill_outcome("docker", None, None, "tool_failure", None, Some("timeout"))
1622 .await
1623 .unwrap();
1624
1625 let pool = store.pool();
1626 let row: (Option<String>,) = zeph_db::query_as(sql!(
1627 "SELECT outcome_detail FROM skill_outcomes WHERE skill_name = 'docker' LIMIT 1"
1628 ))
1629 .fetch_one(pool)
1630 .await
1631 .unwrap();
1632 assert_eq!(row.0.as_deref(), Some("timeout"));
1633 }
1634
1635 #[tokio::test]
1636 async fn record_skill_outcomes_batch_waits_for_active_writer() {
1637 let file = tempfile::NamedTempFile::new().expect("tempfile");
1638 let path = file.path().to_str().expect("valid path").to_owned();
1639 let store = SqliteStore::with_pool_size(&path, 2)
1640 .await
1641 .expect("with_pool_size");
1642
1643 let vid = store
1644 .save_skill_version("git", 1, "body", "desc", "manual", None, None)
1645 .await
1646 .unwrap();
1647 store.activate_skill_version("git", vid).await.unwrap();
1648
1649 let mut writer_tx = begin_write(store.pool()).await.expect("begin immediate");
1650 zeph_db::query(sql!("INSERT INTO conversations DEFAULT VALUES"))
1651 .execute(&mut *writer_tx)
1652 .await
1653 .expect("hold write lock");
1654
1655 let batch_store = store.clone();
1656 let batch = tokio::spawn(async move {
1657 batch_store
1658 .record_skill_outcomes_batch(
1659 &["git".to_string()],
1660 None,
1661 "success",
1662 None,
1663 Some("waited_for_writer"),
1664 )
1665 .await
1666 });
1667
1668 sleep(Duration::from_millis(100)).await;
1669 writer_tx.commit().await.expect("commit writer");
1670
1671 batch
1672 .await
1673 .expect("join batch task")
1674 .expect("record outcomes");
1675
1676 let count: i64 = zeph_db::query_scalar(sql!(
1677 "SELECT COUNT(*) FROM skill_outcomes WHERE skill_name = 'git'"
1678 ))
1679 .fetch_one(store.pool())
1680 .await
1681 .unwrap();
1682 assert_eq!(
1683 count, 1,
1684 "expected batch insert to succeed after writer commits"
1685 );
1686 }
1687
1688 #[tokio::test]
1689 async fn distinct_session_count_empty() {
1690 let store = test_store().await;
1691 let count = store.distinct_session_count("unknown-skill").await.unwrap();
1692 assert_eq!(count, 0);
1693 }
1694
1695 #[tokio::test]
1696 async fn distinct_session_count_single_session() {
1697 let store = test_store().await;
1698 let cid = crate::types::ConversationId(1);
1699 store
1700 .record_skill_outcome("git", None, Some(cid), "success", None, None)
1701 .await
1702 .unwrap();
1703 store
1704 .record_skill_outcome("git", None, Some(cid), "tool_failure", None, None)
1705 .await
1706 .unwrap();
1707 let count = store.distinct_session_count("git").await.unwrap();
1708 assert_eq!(count, 1);
1709 }
1710
1711 #[tokio::test]
1712 async fn distinct_session_count_multiple_sessions() {
1713 let store = test_store().await;
1714 for i in 0..3i64 {
1715 store
1716 .record_skill_outcome(
1717 "git",
1718 None,
1719 Some(crate::types::ConversationId(i)),
1720 "success",
1721 None,
1722 None,
1723 )
1724 .await
1725 .unwrap();
1726 }
1727 let count = store.distinct_session_count("git").await.unwrap();
1728 assert_eq!(count, 3);
1729 }
1730
1731 #[tokio::test]
1732 async fn distinct_session_count_null_conversation_ids_excluded() {
1733 let store = test_store().await;
1734 store
1736 .record_skill_outcome("git", None, None, "success", None, None)
1737 .await
1738 .unwrap();
1739 store
1740 .record_skill_outcome("git", None, None, "success", None, None)
1741 .await
1742 .unwrap();
1743 let count = store.distinct_session_count("git").await.unwrap();
1744 assert_eq!(count, 0, "NULL conversation_ids must not be counted");
1745 }
1746
1747 #[tokio::test]
1750 async fn insert_and_find_recurring_patterns() {
1751 let store = test_store().await;
1752 let seq = r#"["shell","web_scrape"]"#;
1753 let hash = "abcdef0123456789";
1754 let ctx = "ctxhash000000000";
1755
1756 for _ in 0..3 {
1757 store
1758 .insert_tool_usage_log(seq, hash, ctx, "success", None)
1759 .await
1760 .unwrap();
1761 }
1762 store
1763 .insert_tool_usage_log(seq, hash, ctx, "failure", None)
1764 .await
1765 .unwrap();
1766
1767 let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1768 assert_eq!(patterns.len(), 1);
1769 let (s, h, occ, suc) = &patterns[0];
1770 assert_eq!(s, seq);
1771 assert_eq!(h, hash);
1772 assert_eq!(*occ, 4);
1773 assert_eq!(*suc, 3);
1774 }
1775
1776 #[tokio::test]
1777 async fn find_recurring_patterns_below_threshold_returns_empty() {
1778 let store = test_store().await;
1779 let seq = r#"["shell"]"#;
1780 let hash = "0000000000000001";
1781 let ctx = "0000000000000001";
1782
1783 store
1784 .insert_tool_usage_log(seq, hash, ctx, "success", None)
1785 .await
1786 .unwrap();
1787
1788 let patterns = store.find_recurring_patterns(3, 90).await.unwrap();
1789 assert!(patterns.is_empty());
1790 }
1791
1792 #[tokio::test]
1793 async fn prune_tool_usage_log_removes_old_rows() {
1794 let store = test_store().await;
1795 zeph_db::query(sql!(
1797 "INSERT INTO skill_usage_log \
1798 (tool_sequence, sequence_hash, context_hash, outcome, created_at) \
1799 VALUES (?, ?, ?, ?, datetime('now', '-2 days'))"
1800 ))
1801 .bind(r#"["shell"]"#)
1802 .bind("hash0000000000001")
1803 .bind("ctx00000000000001")
1804 .bind("success")
1805 .execute(store.pool())
1806 .await
1807 .unwrap();
1808
1809 let removed = store.prune_tool_usage_log(1).await.unwrap();
1811 assert_eq!(removed, 1);
1812 }
1813
1814 #[tokio::test]
1817 async fn insert_and_load_skill_heuristics() {
1818 let store = test_store().await;
1819
1820 let id = store
1821 .insert_skill_heuristic(Some("git"), "always commit in small chunks", 0.9)
1822 .await
1823 .unwrap();
1824 assert!(id > 0);
1825
1826 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1827 assert_eq!(rows.len(), 1);
1828 assert_eq!(rows[0].1, "always commit in small chunks");
1829 assert!((rows[0].2 - 0.9).abs() < 1e-6);
1830 }
1831
1832 #[tokio::test]
1833 async fn load_skill_heuristics_includes_general() {
1834 let store = test_store().await;
1835
1836 store
1837 .insert_skill_heuristic(None, "general tip", 0.7)
1838 .await
1839 .unwrap();
1840 store
1841 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1842 .await
1843 .unwrap();
1844
1845 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1847 assert_eq!(rows.len(), 2);
1848 }
1849
1850 #[tokio::test]
1851 async fn load_skill_heuristics_filters_by_min_confidence() {
1852 let store = test_store().await;
1853
1854 store
1855 .insert_skill_heuristic(Some("git"), "low confidence tip", 0.3)
1856 .await
1857 .unwrap();
1858 store
1859 .insert_skill_heuristic(Some("git"), "high confidence tip", 0.9)
1860 .await
1861 .unwrap();
1862
1863 let rows = store.load_skill_heuristics("git", 0.5, 10).await.unwrap();
1864 assert_eq!(rows.len(), 1);
1865 assert_eq!(rows[0].1, "high confidence tip");
1866 }
1867
1868 #[tokio::test]
1869 async fn increment_heuristic_use_count_works() {
1870 let store = test_store().await;
1871
1872 let id = store
1873 .insert_skill_heuristic(Some("git"), "tip", 0.8)
1874 .await
1875 .unwrap();
1876
1877 store.increment_heuristic_use_count(id).await.unwrap();
1878 store.increment_heuristic_use_count(id).await.unwrap();
1879
1880 let rows = store.load_skill_heuristics("git", 0.0, 10).await.unwrap();
1881 assert_eq!(rows[0].3, 2); }
1883
1884 #[tokio::test]
1885 async fn load_all_heuristics_for_skill_exact_match() {
1886 let store = test_store().await;
1887
1888 store
1889 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1890 .await
1891 .unwrap();
1892 store
1893 .insert_skill_heuristic(Some("docker"), "docker tip", 0.8)
1894 .await
1895 .unwrap();
1896
1897 let rows = store
1898 .load_all_heuristics_for_skill(Some("git"))
1899 .await
1900 .unwrap();
1901 assert_eq!(rows.len(), 1);
1902 assert_eq!(rows[0].1, "git tip");
1903 }
1904
1905 #[tokio::test]
1906 async fn load_all_heuristics_for_skill_null() {
1907 let store = test_store().await;
1908
1909 store
1910 .insert_skill_heuristic(None, "general", 0.8)
1911 .await
1912 .unwrap();
1913 store
1914 .insert_skill_heuristic(Some("git"), "git tip", 0.8)
1915 .await
1916 .unwrap();
1917
1918 let rows = store.load_all_heuristics_for_skill(None).await.unwrap();
1919 assert_eq!(rows.len(), 1);
1920 assert_eq!(rows[0].1, "general");
1921 }
1922
1923 #[tokio::test]
1924 async fn skill_trust_default_is_quarantined() {
1925 let store = test_store().await;
1929
1930 zeph_db::query(sql!(
1932 "INSERT INTO skill_trust (skill_name, blake3_hash) VALUES ('test-arise', 'abc')"
1933 ))
1934 .execute(store.pool())
1935 .await
1936 .unwrap();
1937
1938 let trust: (String,) = zeph_db::query_as(sql!(
1939 "SELECT trust_level FROM skill_trust WHERE skill_name = 'test-arise'"
1940 ))
1941 .fetch_one(store.pool())
1942 .await
1943 .unwrap();
1944
1945 assert_eq!(
1946 trust.0, "quarantined",
1947 "schema default for skill_trust.trust_level must be 'quarantined'"
1948 );
1949 }
1950
1951 #[tokio::test]
1952 async fn save_and_activate_skill_version_activates_new() {
1953 let store = test_store().await;
1954
1955 let id = store
1956 .save_and_activate_skill_version(
1957 "git",
1958 1,
1959 "body v1",
1960 "Git helper",
1961 "manual",
1962 None,
1963 None,
1964 )
1965 .await
1966 .unwrap();
1967 assert!(id > 0);
1968
1969 let active = store.active_skill_version("git").await.unwrap().unwrap();
1970 assert_eq!(active.id, id);
1971 assert_eq!(active.version, 1);
1972 assert_eq!(active.body, "body v1");
1973 assert!(active.is_active);
1974 }
1975
1976 #[tokio::test]
1977 async fn save_and_activate_skill_version_deactivates_previous() {
1978 let store = test_store().await;
1979
1980 let v1 = store
1981 .save_and_activate_skill_version("git", 1, "v1", "desc", "manual", None, None)
1982 .await
1983 .unwrap();
1984
1985 let v2 = store
1986 .save_and_activate_skill_version("git", 2, "v2", "desc", "auto", None, Some(v1))
1987 .await
1988 .unwrap();
1989
1990 let versions = store.load_skill_versions("git").await.unwrap();
1991 assert_eq!(versions.len(), 2);
1992
1993 let old = versions.iter().find(|v| v.id == v1).unwrap();
1994 let new = versions.iter().find(|v| v.id == v2).unwrap();
1995 assert!(!old.is_active, "previous version must be deactivated");
1996 assert!(new.is_active, "new version must be active");
1997 }
1998
1999 #[tokio::test]
2000 async fn save_and_activate_skill_version_only_one_active() {
2001 let store = test_store().await;
2002
2003 let mut last_id = 0i64;
2004 for i in 1i64..=4 {
2005 last_id = store
2006 .save_and_activate_skill_version(
2007 "git",
2008 i,
2009 &format!("v{i}"),
2010 "desc",
2011 "auto",
2012 None,
2013 None,
2014 )
2015 .await
2016 .unwrap();
2017 }
2018
2019 let versions = store.load_skill_versions("git").await.unwrap();
2020 let active_count = versions.iter().filter(|v| v.is_active).count();
2021 assert_eq!(active_count, 1, "exactly one version must be active");
2022 assert_eq!(
2023 versions.iter().find(|v| v.is_active).unwrap().id,
2024 last_id,
2025 "the last saved version must be the active one"
2026 );
2027 }
2028}