use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
use parking_lot::RwLock;
use crate::gateway::kumiho_client::{ItemResponse, KumihoClient, KumihoError};
use crate::skills::effectiveness::{EffectivenessScore, SkillEffectivenessProvider};
/// How often the background refresh task re-pulls outcome data (see
/// `EffectivenessCache::spawn_refresh_task`).
pub const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Rolling success rate below which a skill is flagged as an improvement
/// candidate (`improvement_candidates`).
pub const DEFAULT_IMPROVEMENT_THRESHOLD: f64 = 0.4;
/// Minimum classified outcomes before the improvement threshold applies;
/// skills with fewer samples are never flagged.
pub const DEFAULT_IMPROVEMENT_MIN_SAMPLES: u32 = 10;
/// Minimum rate drop (previous revision rate minus current revision rate)
/// that counts as a regression (`regression_candidates`).
pub const DEFAULT_REGRESSION_DROP: f64 = 0.15;
/// Minimum classified outcomes on the *current* revision before regression
/// detection applies.
pub const DEFAULT_REGRESSION_MIN_SAMPLES: u32 = 10;
/// A skill whose rolling success rate fell below the improvement threshold
/// with enough samples to be meaningful.
#[derive(Debug, Clone)]
pub struct SkillImprovementCandidate {
    /// Name of the underperforming skill.
    pub skill_name: String,
    /// Rolling success rate (successes / classified outcomes).
    pub rate: f64,
    /// Total classified outcomes backing `rate`.
    pub total: u32,
}
/// A skill whose currently published revision underperforms its predecessor
/// by at least the regression drop threshold.
#[derive(Debug, Clone)]
pub struct SkillRegressionCandidate {
    /// Name of the regressed skill.
    pub skill_name: String,
    /// Kref of the currently published (regressed) revision.
    pub current_revision_kref: String,
    /// Success rate of the current revision.
    pub current_rate: f64,
    /// Classified outcomes recorded against the current revision.
    pub current_total: u32,
    /// Kref of the previously published revision it is compared against.
    pub previous_revision_kref: String,
    /// Success rate of the previous revision.
    pub previous_rate: f64,
    /// Classified outcomes recorded against the previous revision.
    pub previous_total: u32,
}
/// Process-wide singleton; installed once via [`set_global`].
static GLOBAL_CACHE: OnceLock<Arc<EffectivenessCache>> = OnceLock::new();

/// Installs `cache` as the process-wide effectiveness cache.
///
/// Fails (without replacing anything) when a cache was already installed;
/// the global can only ever be set once.
pub fn set_global(cache: Arc<EffectivenessCache>) -> Result<(), &'static str> {
    match GLOBAL_CACHE.set(cache) {
        Ok(()) => Ok(()),
        Err(_) => Err("effectiveness_cache global already installed"),
    }
}

/// Returns the installed global cache, or `None` before installation.
pub fn global() -> Option<&'static Arc<EffectivenessCache>> {
    GLOBAL_CACHE.get()
}

/// Returns the installed global cache as a `SkillEffectivenessProvider`
/// trait object, or `None` before installation.
pub fn global_provider() -> Option<&'static dyn SkillEffectivenessProvider> {
    let cache = GLOBAL_CACHE.get()?;
    Some(cache.as_ref() as &dyn SkillEffectivenessProvider)
}
/// Per-skill revision bookkeeping used for regression detection.
#[derive(Default)]
struct RevisionState {
    /// Kref of the revision currently tagged as published, when the tag
    /// lookup succeeded this refresh cycle.
    current_kref: Option<String>,
    /// Kref of the previously published revision, when resolvable.
    previous_kref: Option<String>,
    /// Outcome scores bucketed per concrete revision kref.
    per_revision: HashMap<String, EffectivenessScore>,
}
/// Cache of per-skill effectiveness scores, refreshed periodically from the
/// Kumiho memory service.
///
/// Each map is guarded by its own `parking_lot::RwLock`; refreshes swap
/// whole maps in under a short write lock, so readers see either the old or
/// the new snapshot, never a partial one.
pub struct EffectivenessCache {
    /// Aggregate score per skill name.
    scores: RwLock<HashMap<String, EffectivenessScore>>,
    /// Per-skill revision bookkeeping for regression detection.
    revision_state: RwLock<HashMap<String, RevisionState>>,
    /// Instant of the last successful `replace_scores`; `None` until then.
    last_refresh: RwLock<Option<Instant>>,
}
impl EffectivenessCache {
    /// Creates an empty cache wrapped in an `Arc`; scores appear only after
    /// the first refresh (or an explicit `replace_scores`).
    pub fn new() -> Arc<Self> {
        Arc::new(Self {
            scores: RwLock::new(HashMap::new()),
            revision_state: RwLock::new(HashMap::new()),
            last_refresh: RwLock::new(None),
        })
    }

    /// Number of skills currently holding a cached score.
    pub fn len(&self) -> usize {
        self.scores.read().len()
    }

    /// True when the score map is empty.
    pub fn is_empty(&self) -> bool {
        self.scores.read().is_empty()
    }

    /// Point-in-time clone of the full score map.
    pub fn snapshot(&self) -> HashMap<String, EffectivenessScore> {
        self.scores.read().clone()
    }

    /// Time elapsed since the last successful refresh, or `None` before the
    /// first one.
    pub fn age(&self) -> Option<Duration> {
        self.last_refresh.read().map(|t| t.elapsed())
    }

    /// Replaces the whole score map in one write-lock acquisition and stamps
    /// the refresh instant, so readers never observe a half-updated map.
    pub fn replace_scores(&self, scores: HashMap<String, EffectivenessScore>) {
        *self.scores.write() = scores;
        *self.last_refresh.write() = Some(Instant::now());
    }

    /// Replaces the per-skill revision bookkeeping wholesale.
    fn replace_revision_state(&self, state: HashMap<String, RevisionState>) {
        *self.revision_state.write() = state;
    }

    /// Test helper: installs a fully specified `RevisionState` for one skill.
    #[cfg(test)]
    fn install_test_revision_state(
        &self,
        skill_name: &str,
        current_kref: &str,
        previous_kref: &str,
        per_revision: HashMap<String, EffectivenessScore>,
    ) {
        let mut map = self.revision_state.write();
        map.insert(
            skill_name.to_string(),
            RevisionState {
                current_kref: Some(current_kref.to_string()),
                previous_kref: Some(previous_kref.to_string()),
                per_revision,
            },
        );
    }

    /// Rebuilds scores and revision state for `skill_names` from Kumiho.
    ///
    /// For each skill this lists the `/{memory_project}/Skills/<name>/Outcomes`
    /// space, classifies its items in aggregate and per revision, and resolves
    /// the `published` / `previous_published` tag pointers for the skill item.
    /// A 404 on the outcomes space is treated as "no data yet"; any other
    /// per-skill error logs a warning and excludes that skill from this
    /// snapshot. Both maps are then swapped in, and current improvement /
    /// regression candidates are logged at `warn`.
    ///
    /// # Errors
    /// As written this always returns `Ok(())`: per-skill failures are logged
    /// and skipped rather than propagated.
    pub async fn refresh_for_skills(
        &self,
        client: &KumihoClient,
        memory_project: &str,
        skill_names: &[String],
    ) -> Result<(), KumihoError> {
        let mut new_scores: HashMap<String, EffectivenessScore> = HashMap::new();
        let mut new_revision_state: HashMap<String, RevisionState> = HashMap::new();
        for name in skill_names {
            let safe = sanitize_skill_name(name);
            let space_path = format!("/{memory_project}/Skills/{safe}/Outcomes");
            let items = match client.list_items(&space_path, false).await {
                Ok(items) => items,
                // Outcomes space not created yet: score the skill as "no data".
                Err(KumihoError::Api { status: 404, .. }) => {
                    Vec::new()
                }
                // Any other failure: keep the rest of the refresh going, but
                // drop this skill from the snapshot rather than cache stale or
                // partial numbers.
                Err(e) => {
                    tracing::warn!(
                        skill = %name,
                        space = %space_path,
                        error = %e,
                        "skill effectiveness refresh failed for one skill; \
                         excluding from this snapshot",
                    );
                    continue;
                }
            };
            new_scores.insert(name.clone(), classify_outcomes(&items));
            let item_kref = format!(
                "kref://{memory_project}/Skills/{safe}.{kind}",
                kind = crate::skills::registration::SKILL_ITEM_KIND,
            );
            let mut state = RevisionState {
                per_revision: classify_outcomes_per_revision(&items),
                ..Default::default()
            };
            match client
                .get_revision_by_tag(&item_kref, crate::skills::registration::PUBLISHED_TAG)
                .await
            {
                Ok(rev) => state.current_kref = Some(rev.kref),
                // No published tag yet: leave current_kref as None so this
                // skill is skipped by regression_candidates.
                Err(KumihoError::Api { status: 404, .. }) => {
                }
                Err(e) => {
                    tracing::debug!(
                        skill = %name,
                        item_kref = %item_kref,
                        error = %e,
                        "regression refresh: published tag lookup failed; \
                         per-revision bucketing skipped this cycle",
                    );
                }
            }
            match client
                .get_revision_by_tag(
                    &item_kref,
                    crate::skills::registration::PREVIOUS_PUBLISHED_TAG,
                )
                .await
            {
                Ok(rev) => state.previous_kref = Some(rev.kref),
                // No prior publish (first release): nothing to compare against.
                Err(KumihoError::Api { status: 404, .. }) => {
                }
                Err(e) => {
                    tracing::debug!(
                        skill = %name,
                        item_kref = %item_kref,
                        error = %e,
                        "regression refresh: previous_published tag lookup failed",
                    );
                }
            }
            new_revision_state.insert(name.clone(), state);
        }
        // Publish both maps; scores first (stamps last_refresh), then the
        // revision state used by regression detection.
        self.replace_scores(new_scores);
        self.replace_revision_state(new_revision_state);
        for cand in self.improvement_candidates() {
            tracing::warn!(
                skill = %cand.skill_name,
                rate = cand.rate,
                total = cand.total,
                "skill effectiveness: rolling success rate below threshold; \
                 candidate for SkillImprover",
            );
        }
        for cand in self.regression_candidates() {
            tracing::warn!(
                skill = %cand.skill_name,
                current_revision = %cand.current_revision_kref,
                current_rate = cand.current_rate,
                current_total = cand.current_total,
                previous_revision = %cand.previous_revision_kref,
                previous_rate = cand.previous_rate,
                previous_total = cand.previous_total,
                "skill effectiveness: revision regressed against predecessor; \
                 candidate for auto-rollback",
            );
        }
        Ok(())
    }

    /// Skills whose aggregate rate is below `DEFAULT_IMPROVEMENT_THRESHOLD`
    /// with at least `DEFAULT_IMPROVEMENT_MIN_SAMPLES` classified outcomes,
    /// sorted worst rate first. Skills with no rate (no data) never qualify.
    pub fn improvement_candidates(&self) -> Vec<SkillImprovementCandidate> {
        let scores = self.scores.read();
        let mut candidates: Vec<SkillImprovementCandidate> = scores
            .iter()
            .filter_map(|(name, score)| {
                let rate = score.rate?;
                if score.total < DEFAULT_IMPROVEMENT_MIN_SAMPLES {
                    return None;
                }
                if rate >= DEFAULT_IMPROVEMENT_THRESHOLD {
                    return None;
                }
                Some(SkillImprovementCandidate {
                    skill_name: name.clone(),
                    rate,
                    total: score.total,
                })
            })
            .collect();
        // Ascending by rate (worst first); falls back to Equal if rates are
        // ever not comparable (NaN).
        candidates.sort_by(|a, b| {
            a.rate
                .partial_cmp(&b.rate)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        candidates
    }

    /// Skills whose current published revision's rate dropped by at least
    /// `DEFAULT_REGRESSION_DROP` versus the previously published revision,
    /// with at least `DEFAULT_REGRESSION_MIN_SAMPLES` outcomes on the current
    /// revision. Sorted largest drop first.
    ///
    /// NOTE(review): only the current revision has a minimum-sample guard; a
    /// previous revision with very few outcomes still participates as the
    /// baseline — confirm that is intended.
    pub fn regression_candidates(&self) -> Vec<SkillRegressionCandidate> {
        let revision_state = self.revision_state.read();
        let mut candidates: Vec<SkillRegressionCandidate> = revision_state
            .iter()
            .filter_map(|(name, state)| {
                let current_kref = state.current_kref.as_deref()?;
                let previous_kref = state.previous_kref.as_deref()?;
                // Same revision on both tags (e.g. right after a rollback):
                // nothing to compare.
                if current_kref == previous_kref {
                    return None;
                }
                let current_score = state.per_revision.get(current_kref)?;
                let previous_score = state.per_revision.get(previous_kref)?;
                if current_score.total < DEFAULT_REGRESSION_MIN_SAMPLES {
                    return None;
                }
                let current_rate = current_score.rate?;
                let previous_rate = previous_score.rate?;
                if previous_rate - current_rate < DEFAULT_REGRESSION_DROP {
                    return None;
                }
                Some(SkillRegressionCandidate {
                    skill_name: name.clone(),
                    current_revision_kref: current_kref.to_string(),
                    current_rate,
                    current_total: current_score.total,
                    previous_revision_kref: previous_kref.to_string(),
                    previous_rate,
                    previous_total: previous_score.total,
                })
            })
            .collect();
        // Descending by drop size (worst regression first).
        candidates.sort_by(|a, b| {
            let drop_a = a.previous_rate - a.current_rate;
            let drop_b = b.previous_rate - b.current_rate;
            drop_b
                .partial_cmp(&drop_a)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        candidates
    }

    /// Spawns a background task that refreshes the cache every `interval`.
    ///
    /// `tokio::time::interval` fires its first tick immediately, so one
    /// refresh runs right after spawning; missed ticks are delayed rather
    /// than bursted. The task loops forever — abort it via the returned
    /// `JoinHandle`. Refresh errors are logged and the loop continues.
    pub fn spawn_refresh_task(
        self: Arc<Self>,
        client: Arc<KumihoClient>,
        memory_project: String,
        skill_names: Vec<String>,
        interval: Duration,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(interval);
            ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
            loop {
                ticker.tick().await;
                if let Err(e) = self
                    .refresh_for_skills(&client, &memory_project, &skill_names)
                    .await
                {
                    tracing::warn!(
                        error = %e,
                        "effectiveness cache refresh cycle failed; will retry",
                    );
                }
            }
        })
    }
}
/// Read-only effectiveness lookups backed by the cached score map.
impl SkillEffectivenessProvider for EffectivenessCache {
    /// Returns the cached score for `skill_name`, or `None` when the skill
    /// has no entry in the current snapshot.
    fn score(&self, skill_name: &str) -> Option<EffectivenessScore> {
        let scores = self.scores.read();
        scores.get(skill_name).copied()
    }
}
/// Makes a skill name safe for use as a single path segment by turning every
/// `/` into `-`; all other characters pass through unchanged.
fn sanitize_skill_name(name: &str) -> String {
    name.chars()
        .map(|c| if c == '/' { '-' } else { c })
        .collect()
}
/// Tallies outcome items into an aggregate `EffectivenessScore`.
///
/// Items whose `item_name` starts with `"ok-"` count as successes and
/// `"fail-"` as failures; anything else is ignored. `rate` is `None` when no
/// prefixed items were seen, so "no data" is distinguishable from 0%.
pub(crate) fn classify_outcomes(items: &[ItemResponse]) -> EffectivenessScore {
    let (successes, failures) = items.iter().fold((0u32, 0u32), |(ok, bad), it| {
        if it.item_name.starts_with("ok-") {
            (ok.saturating_add(1), bad)
        } else if it.item_name.starts_with("fail-") {
            (ok, bad.saturating_add(1))
        } else {
            (ok, bad)
        }
    });
    let total = successes.saturating_add(failures);
    let rate = (total > 0).then(|| f64::from(successes) / f64::from(total));
    EffectivenessScore { rate, total }
}
/// Buckets outcome items by their `skill_kref` metadata value and scores each
/// bucket with the same ok-/fail- prefix rules as `classify_outcomes`.
///
/// Items without a `skill_kref` entry, and items whose kref is a tag pointer
/// (`?t=...`) rather than a concrete revision, are dropped.
pub(crate) fn classify_outcomes_per_revision(
    items: &[ItemResponse],
) -> HashMap<String, EffectivenessScore> {
    // (successes, failures) per concrete revision kref.
    let mut counts: HashMap<String, (u32, u32)> = HashMap::new();
    for item in items {
        let kref = match item.metadata.get("skill_kref") {
            Some(k) if !is_tag_pointer_kref(k) => k,
            _ => continue,
        };
        let slot = counts.entry(kref.clone()).or_default();
        if item.item_name.starts_with("ok-") {
            slot.0 = slot.0.saturating_add(1);
        } else if item.item_name.starts_with("fail-") {
            slot.1 = slot.1.saturating_add(1);
        }
    }
    counts
        .into_iter()
        .map(|(kref, (ok, bad))| {
            let total = ok.saturating_add(bad);
            let rate = (total > 0).then(|| f64::from(ok) / f64::from(total));
            (kref, EffectivenessScore { rate, total })
        })
        .collect()
}
/// Returns `true` when `kref` carries a `t=` (tag) query parameter, meaning
/// it points at a movable tag rather than a concrete revision.
fn is_tag_pointer_kref(kref: &str) -> bool {
    match kref.split_once('?') {
        None => false,
        Some((_, query)) => query.split('&').any(|param| param.starts_with("t=")),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal outcome item with the given name and no metadata.
    fn item(item_name: &str) -> ItemResponse {
        ItemResponse {
            kref: format!("kref://test/{item_name}"),
            name: item_name.to_string(),
            item_name: item_name.to_string(),
            kind: "skill_outcome".to_string(),
            deprecated: false,
            created_at: None,
            metadata: HashMap::new(),
        }
    }

    // --- classify_outcomes -------------------------------------------------

    #[test]
    fn classify_outcomes_counts_ok_and_fail_prefixes() {
        let items = vec![
            item("ok-skill-on-mar-1-aa"),
            item("ok-skill-on-mar-2-bb"),
            item("ok-skill-on-mar-3-cc"),
            item("fail-skill-on-mar-4-dd"),
            item("fail-skill-on-mar-5-ee"),
        ];
        let score = classify_outcomes(&items);
        assert_eq!(score.total, 5);
        assert_eq!(score.rate, Some(0.6));
    }

    #[test]
    fn classify_outcomes_ignores_unprefixed_items() {
        let items = vec![
            item("ok-skill-aa"),
            item("legacy-undated-bb"),
            item("fail-skill-cc"),
        ];
        let score = classify_outcomes(&items);
        assert_eq!(score.total, 2);
        assert_eq!(score.rate, Some(0.5));
    }

    #[test]
    fn classify_outcomes_returns_none_rate_for_empty() {
        let score = classify_outcomes(&[]);
        assert_eq!(score.total, 0);
        assert_eq!(score.rate, None);
    }

    // --- cache basics ------------------------------------------------------

    #[test]
    fn cache_score_returns_none_before_refresh() {
        let cache = EffectivenessCache::new();
        assert!(cache.is_empty());
        assert!(cache.score("anything").is_none());
        assert!(cache.age().is_none());
    }

    #[test]
    fn cache_replace_scores_publishes_atomically() {
        let cache = EffectivenessCache::new();
        let mut scores = HashMap::new();
        scores.insert(
            "alpha".to_string(),
            EffectivenessScore {
                rate: Some(0.9),
                total: 10,
            },
        );
        scores.insert(
            "beta".to_string(),
            EffectivenessScore {
                rate: Some(0.1),
                total: 5,
            },
        );
        cache.replace_scores(scores);
        let alpha = cache.score("alpha").expect("alpha present");
        assert_eq!(alpha.rate, Some(0.9));
        assert_eq!(alpha.total, 10);
        let beta = cache.score("beta").expect("beta present");
        assert_eq!(beta.total, 5);
        assert_eq!(cache.len(), 2);
        assert!(cache.age().is_some());
    }

    #[test]
    fn cache_implements_skill_effectiveness_provider() {
        let cache = EffectivenessCache::new();
        let mut scores = HashMap::new();
        scores.insert(
            "foo".to_string(),
            EffectivenessScore {
                rate: Some(0.75),
                total: 4,
            },
        );
        cache.replace_scores(scores);
        // Exercise the lookup through the trait object, as callers of
        // global_provider() would.
        let provider: &dyn SkillEffectivenessProvider = &*cache;
        assert!(provider.score("foo").is_some());
        assert!(provider.score("missing").is_none());
    }

    // --- improvement candidates --------------------------------------------

    #[test]
    fn improvement_candidates_flags_below_threshold_with_enough_samples() {
        let cache = EffectivenessCache::new();
        let mut scores = HashMap::new();
        scores.insert(
            "regressed".to_string(),
            EffectivenessScore {
                rate: Some(0.3),
                total: 20,
            },
        );
        scores.insert(
            "healthy".to_string(),
            EffectivenessScore {
                rate: Some(0.6),
                total: 20,
            },
        );
        cache.replace_scores(scores);
        let candidates = cache.improvement_candidates();
        assert_eq!(candidates.len(), 1);
        assert_eq!(candidates[0].skill_name, "regressed");
        assert!((candidates[0].rate - 0.3).abs() < 1e-9);
        assert_eq!(candidates[0].total, 20);
    }

    #[test]
    fn improvement_candidates_skips_low_sample_skills() {
        let cache = EffectivenessCache::new();
        let mut scores = HashMap::new();
        scores.insert(
            "noisy".to_string(),
            EffectivenessScore {
                rate: Some(0.0),
                total: 5,
            },
        );
        cache.replace_scores(scores);
        assert!(cache.improvement_candidates().is_empty());
    }

    #[test]
    fn improvement_candidates_skips_no_data_skills() {
        let cache = EffectivenessCache::new();
        let mut scores = HashMap::new();
        scores.insert(
            "fresh".to_string(),
            EffectivenessScore {
                rate: None,
                total: 0,
            },
        );
        cache.replace_scores(scores);
        assert!(cache.improvement_candidates().is_empty());
    }

    #[test]
    fn improvement_candidates_sorted_worst_first() {
        let cache = EffectivenessCache::new();
        let mut scores = HashMap::new();
        scores.insert(
            "mid".to_string(),
            EffectivenessScore {
                rate: Some(0.3),
                total: 50,
            },
        );
        scores.insert(
            "worst".to_string(),
            EffectivenessScore {
                rate: Some(0.05),
                total: 30,
            },
        );
        scores.insert(
            "barely".to_string(),
            EffectivenessScore {
                rate: Some(0.39),
                total: 100,
            },
        );
        cache.replace_scores(scores);
        let candidates = cache.improvement_candidates();
        assert_eq!(candidates.len(), 3);
        assert_eq!(candidates[0].skill_name, "worst");
        assert_eq!(candidates[1].skill_name, "mid");
        assert_eq!(candidates[2].skill_name, "barely");
    }

    #[test]
    fn sanitize_skill_name_replaces_slashes() {
        assert_eq!(sanitize_skill_name("plain"), "plain");
        assert_eq!(sanitize_skill_name("with/slash"), "with-slash");
        assert_eq!(sanitize_skill_name("a/b/c"), "a-b-c");
    }

    /// Builds an outcome item whose metadata carries the given `skill_kref`.
    fn item_with_kref(item_name: &str, skill_kref: &str) -> ItemResponse {
        let mut metadata = HashMap::new();
        metadata.insert("skill_kref".to_string(), skill_kref.to_string());
        ItemResponse {
            kref: format!("kref://test/outcomes/{item_name}"),
            name: item_name.to_string(),
            item_name: item_name.to_string(),
            kind: "skill_outcome".to_string(),
            deprecated: false,
            created_at: None,
            metadata,
        }
    }

    // --- per-revision classification ---------------------------------------

    #[test]
    fn classify_outcomes_per_revision_buckets_by_skill_kref() {
        let r1 = "kref://m/Skills/foo.skilldef?r=1";
        let r2 = "kref://m/Skills/foo.skilldef?r=2";
        let items = vec![
            item_with_kref("ok-a", r1),
            item_with_kref("ok-b", r1),
            item_with_kref("fail-c", r1),
            item_with_kref("ok-d", r2),
            item_with_kref("fail-e", r2),
            item_with_kref("fail-f", r2),
        ];
        let scores = classify_outcomes_per_revision(&items);
        assert_eq!(scores.len(), 2);
        let s1 = scores.get(r1).expect("r1 present");
        assert_eq!(s1.total, 3);
        assert!((s1.rate.unwrap() - (2.0 / 3.0)).abs() < 1e-9);
        let s2 = scores.get(r2).expect("r2 present");
        assert_eq!(s2.total, 3);
        assert!((s2.rate.unwrap() - (1.0 / 3.0)).abs() < 1e-9);
    }

    #[test]
    fn classify_outcomes_per_revision_drops_items_without_skill_kref() {
        let r1 = "kref://m/Skills/foo.skilldef?r=1";
        let items = vec![
            item_with_kref("ok-a", r1),
            item("ok-legacy"),
        ];
        let scores = classify_outcomes_per_revision(&items);
        assert_eq!(scores.len(), 1);
        assert_eq!(scores.get(r1).unwrap().total, 1);
    }

    #[test]
    fn classify_outcomes_per_revision_drops_tag_pointer_krefs() {
        let tag_pointer = "kref://m/Skills/foo.skilldef?t=published";
        let r1 = "kref://m/Skills/foo.skilldef?r=1";
        let items = vec![
            item_with_kref("ok-a", tag_pointer),
            item_with_kref("fail-b", tag_pointer),
            item_with_kref("ok-c", r1),
        ];
        let scores = classify_outcomes_per_revision(&items);
        assert_eq!(scores.len(), 1);
        assert!(scores.contains_key(r1));
        assert!(!scores.contains_key(tag_pointer));
    }

    #[test]
    fn is_tag_pointer_kref_recognises_t_query() {
        assert!(is_tag_pointer_kref(
            "kref://m/Skills/foo.skilldef?t=published"
        ));
        assert!(is_tag_pointer_kref(
            "kref://m/Skills/foo.skilldef?r=1&t=stable"
        ));
        assert!(!is_tag_pointer_kref("kref://m/Skills/foo.skilldef?r=1"));
        assert!(!is_tag_pointer_kref("kref://m/Skills/foo.skilldef"));
    }

    /// Shorthand for an `EffectivenessScore` with a known rate.
    fn rev_score(rate: f64, total: u32) -> EffectivenessScore {
        EffectivenessScore {
            rate: Some(rate),
            total,
        }
    }

    // --- regression candidates ---------------------------------------------

    #[test]
    fn regression_candidates_flags_drop_above_threshold() {
        let cache = EffectivenessCache::new();
        let mut per_rev = HashMap::new();
        per_rev.insert("kref://r/2".to_string(), rev_score(0.30, 12));
        per_rev.insert("kref://r/1".to_string(), rev_score(0.80, 50));
        cache.install_test_revision_state("regressed", "kref://r/2", "kref://r/1", per_rev);
        let candidates = cache.regression_candidates();
        assert_eq!(candidates.len(), 1);
        let c = &candidates[0];
        assert_eq!(c.skill_name, "regressed");
        assert_eq!(c.current_revision_kref, "kref://r/2");
        assert!((c.current_rate - 0.30).abs() < 1e-9);
        assert_eq!(c.previous_revision_kref, "kref://r/1");
        assert!((c.previous_rate - 0.80).abs() < 1e-9);
    }

    #[test]
    fn regression_candidates_skips_when_drop_below_threshold() {
        let cache = EffectivenessCache::new();
        let mut per_rev = HashMap::new();
        per_rev.insert("kref://r/2".to_string(), rev_score(0.55, 30));
        per_rev.insert("kref://r/1".to_string(), rev_score(0.60, 50));
        cache.install_test_revision_state("flat", "kref://r/2", "kref://r/1", per_rev);
        assert!(cache.regression_candidates().is_empty());
    }

    #[test]
    fn regression_candidates_skips_when_current_below_min_samples() {
        let cache = EffectivenessCache::new();
        let mut per_rev = HashMap::new();
        per_rev.insert("kref://r/2".to_string(), rev_score(0.0, 5));
        per_rev.insert("kref://r/1".to_string(), rev_score(0.9, 100));
        cache.install_test_revision_state("noisy", "kref://r/2", "kref://r/1", per_rev);
        assert!(cache.regression_candidates().is_empty());
    }

    #[test]
    fn regression_candidates_skips_when_no_previous_published() {
        let cache = EffectivenessCache::new();
        let mut per_rev = HashMap::new();
        per_rev.insert("kref://r/1".to_string(), rev_score(0.10, 50));
        // Built by hand (not via the helper) because previous_kref must be None.
        let mut state_map = cache.revision_state.write();
        state_map.insert(
            "first-publish".to_string(),
            RevisionState {
                current_kref: Some("kref://r/1".to_string()),
                previous_kref: None,
                per_revision: per_rev,
            },
        );
        drop(state_map);
        assert!(cache.regression_candidates().is_empty());
    }

    #[test]
    fn regression_candidates_skips_when_current_equals_previous() {
        let cache = EffectivenessCache::new();
        let mut per_rev = HashMap::new();
        per_rev.insert("kref://r/1".to_string(), rev_score(0.10, 50));
        cache.install_test_revision_state("post-rollback", "kref://r/1", "kref://r/1", per_rev);
        assert!(cache.regression_candidates().is_empty());
    }

    #[test]
    fn regression_candidates_sorted_worst_drop_first() {
        let cache = EffectivenessCache::new();
        let mut per_rev_a = HashMap::new();
        per_rev_a.insert("kref://a/2".to_string(), rev_score(0.50, 20));
        per_rev_a.insert("kref://a/1".to_string(), rev_score(0.70, 50));
        cache.install_test_revision_state("a", "kref://a/2", "kref://a/1", per_rev_a);
        let mut per_rev_b = HashMap::new();
        per_rev_b.insert("kref://b/2".to_string(), rev_score(0.30, 20));
        per_rev_b.insert("kref://b/1".to_string(), rev_score(0.80, 50));
        cache.install_test_revision_state("b", "kref://b/2", "kref://b/1", per_rev_b);
        let candidates = cache.regression_candidates();
        assert_eq!(candidates.len(), 2);
        // b dropped 0.50, a dropped 0.20 — largest drop sorts first.
        assert_eq!(candidates[0].skill_name, "b");
        assert_eq!(candidates[1].skill_name, "a");
    }
}