use std::collections::HashSet;
use std::fs;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock};
use anyhow::{Context, Result, anyhow, bail};
use futures::StreamExt as _;
use tokio::sync::Semaphore;
use crate::agent::memory::{MemDocTier, MemoryDoc, MemoryStore};
use crate::provider::registry::ProviderRegistry;
use crate::provider::{
LlmProvider, LlmRequest, Message, MessageContent, Role, StreamEvent,
};
/// Global one-at-a-time gate for LLM distillation calls.
///
/// Lazily initialised on first use; the semaphore holds a single permit,
/// so at most one distillation request is in flight per process.
fn distill_lock() -> &'static Semaphore {
    static SEMAPHORE: OnceLock<Semaphore> = OnceLock::new();
    SEMAPHORE.get_or_init(|| Semaphore::new(1))
}
/// Process-wide registry of cluster fingerprints currently being distilled.
///
/// Used by `try_claim_cluster` / `ClusterGuard` to stop two concurrent
/// tasks from crystallizing the same cluster.
fn inflight_clusters() -> &'static Mutex<HashSet<u64>> {
    static INFLIGHT: OnceLock<Mutex<HashSet<u64>>> = OnceLock::new();
    INFLIGHT.get_or_init(Default::default)
}
/// Order-invariant fingerprint for a set of document ids.
///
/// Ids are sorted before hashing so the same cluster always maps to the
/// same `u64` regardless of the order in which its docs were discovered.
pub fn cluster_fingerprint(doc_ids: &[String]) -> u64 {
    let mut ordered: Vec<&str> = doc_ids.iter().map(String::as_str).collect();
    ordered.sort_unstable();
    ordered
        .iter()
        .fold(DefaultHasher::new(), |mut hasher, id| {
            id.hash(&mut hasher);
            hasher
        })
        .finish()
}
/// RAII guard over an in-flight crystallization claim.
///
/// While the guard is alive, the cluster's fingerprint stays registered in
/// the `inflight_clusters` set; dropping the guard releases the claim.
pub struct ClusterGuard {
    // Order-invariant hash of the claimed cluster's doc ids
    // (produced by `cluster_fingerprint`).
    fingerprint: u64,
}
impl Drop for ClusterGuard {
    /// Release the cluster claim when the guard goes out of scope.
    fn drop(&mut self) {
        // A poisoned mutex means another thread panicked while holding it;
        // skip the cleanup silently rather than panicking inside `drop`.
        match inflight_clusters().lock() {
            Ok(mut claimed) => {
                claimed.remove(&self.fingerprint);
            }
            Err(_) => {}
        }
    }
}
/// Attempt to claim exclusive crystallization rights over `doc_ids`.
///
/// Returns `Some(guard)` when this caller is the first to claim the
/// cluster; `None` when the same cluster is already in flight (or the
/// registry mutex is poisoned).
pub fn try_claim_cluster(doc_ids: &[String]) -> Option<ClusterGuard> {
    let fingerprint = cluster_fingerprint(doc_ids);
    let newly_claimed = inflight_clusters().lock().ok()?.insert(fingerprint);
    // Build the guard lazily: constructing it on the failure path would run
    // its Drop immediately and release a claim held by someone else.
    newly_claimed.then(|| ClusterGuard { fingerprint })
}
/// Wait for the global distillation permit.
///
/// # Errors
/// Fails only if the semaphore has been closed, which should never happen
/// for the process-lifetime semaphore returned by [`distill_lock`].
pub async fn acquire_distill_permit() -> Result<tokio::sync::SemaphorePermit<'static>> {
    match distill_lock().acquire().await {
        Ok(permit) => Ok(permit),
        Err(e) => Err(anyhow!("distill semaphore closed: {e}")),
    }
}
/// Find the near-duplicate cluster around `doc_id`, or `None` when the doc
/// is not eligible for crystallization.
///
/// A cluster is returned only when the source doc is `Core` tier, lives in
/// `scope`, is not yet tagged `crystallized`, and enough similarly-eligible
/// neighbours exist to reach the configured minimum cluster size. The
/// source doc is always first in the returned vector.
///
/// # Errors
/// Fails when the source doc is missing from the store or the
/// near-duplicate search itself fails.
pub fn find_cluster(
    store: &MemoryStore,
    doc_id: &str,
    scope: &str,
) -> Result<Option<Vec<MemoryDoc>>> {
    // Tag check without allocating a temporary String per call
    // (the original `contains(&"crystallized".to_string())` allocated
    // on every comparison).
    fn is_crystallized(doc: &MemoryDoc) -> bool {
        doc.tags.iter().any(|t| t == "crystallized")
    }
    let evo = crate::agent::evolution::evolution_config();
    let source = store
        .get_sync(doc_id)
        .context("source doc not found in store")?
        .clone();
    if source.tier != MemDocTier::Core || source.scope != scope || is_crystallized(&source) {
        return Ok(None);
    }
    let neighbours = store.find_near_duplicates(
        doc_id,
        Some(scope),
        evo.cluster.similarity_threshold,
    )?;
    // Keep only neighbours that are themselves eligible, then lead with the
    // source doc so the distillation prompt starts from it.
    let mut cluster: Vec<MemoryDoc> = neighbours
        .into_iter()
        .filter(|(doc, _sim)| {
            doc.tier == MemDocTier::Core && doc.scope == scope && !is_crystallized(doc)
        })
        .map(|(doc, _sim)| doc)
        .collect();
    cluster.insert(0, source);
    if cluster.len() < evo.cluster.min_size {
        return Ok(None);
    }
    Ok(Some(cluster))
}
/// Render the LLM prompt that asks for a single distilled SKILL.md from a
/// cluster of related memory documents.
///
/// The prompt embeds the skill-creator standard (frontmatter and body
/// rules), then each memory document with its access count, and closes
/// with an instruction to emit only the SKILL.md content.
pub fn build_distill_prompt(cluster: &[MemoryDoc]) -> String {
    // Headroom for the ~2 KiB fixed instructions plus per-doc text.
    let mut prompt = String::with_capacity(8192);
    prompt.push_str(
        "You are a skill-engineering expert. Below are related memory documents \
         from an AI agent's long-term memory store. Distill them into a single \
         SKILL.md file following the Anthropic skill-creator standard.\n\n\
         \
         ## SKILL.md Standard\n\
         \
         **Frontmatter** (required fields):\n\
         ```yaml\n\
         ---\n\
         name: skill-name-in-kebab-case\n\
         description: >\n\
         What the skill does AND when to invoke it. Be slightly pushy so the\n\
         agent does not undertrigger. Example: \"How to do X. Use this skill\n\
         whenever the user asks about X, Y, or Z, even if not phrased explicitly.\"\n\
         ---\n\
         ```\n\n\
         \
         **Body** (Markdown, imperative language, under 500 lines):\n\
         - Use numbered steps or headers to structure the workflow.\n\
         - Explain *why* each step matters, not just *what* to do.\n\
         - Include a short example (Input / Output) where it helps.\n\
         - If the skill needs a reusable helper script, note it as:\n\
         `See scripts/helper.py — run with: python scripts/helper.py <args>`\n\
         (do NOT write the script here; the caller will create it separately)\n\
         - If the skill references large external docs, note them as:\n\
         `See references/guide.md for detailed field descriptions`\n\n\
         \
         **Rules**:\n\
         - Do not invent information beyond what the memory documents contain.\n\
         - Merge overlapping facts; prefer the most-accessed version.\n\
         - Use imperative voice: \"Check the config\", not \"You should check\".\n\
         - Avoid ALL-CAPS MUST/NEVER; explain reasoning instead.\n\
         - Keep total length under 300 lines unless complexity demands more.\n\n\
         \
         === MEMORY DOCUMENTS ===\n\n",
    );
    // One numbered section per doc; access_count is surfaced so the model
    // can prefer the most-used version when merging overlapping facts.
    for (i, doc) in cluster.iter().enumerate() {
        prompt.push_str(&format!(
            "--- Memory {} (access_count={}) ---\nKind: {}\nText:\n{}\n\n",
            i + 1,
            doc.access_count,
            doc.kind,
            doc.text,
        ));
    }
    prompt.push_str(
        "=== END OF MEMORIES ===\n\n\
         Produce ONLY the SKILL.md content — frontmatter + body. \
         No explanation, no commentary outside the file.",
    );
    prompt
}
pub fn write_skill(skills_dir: &Path, slug: &str, content: &str) -> Result<PathBuf> {
let mut attempt = 1u32;
let (dir, final_slug) = loop {
let candidate_slug = if attempt == 1 {
slug.to_owned()
} else {
format!("{slug}-{attempt}")
};
let candidate_dir = skills_dir.join(&candidate_slug);
if !candidate_dir.join("SKILL.md").exists() {
break (candidate_dir, candidate_slug);
}
attempt += 1;
if attempt > 99 {
bail!("write_skill: too many slug collisions for '{slug}'");
}
};
fs::create_dir_all(&dir)
.with_context(|| format!("failed to create skill directory: {}", dir.display()))?;
let path = dir.join("SKILL.md");
fs::write(&path, content)
.with_context(|| format!("failed to write SKILL.md at {}", path.display()))?;
if final_slug != slug {
tracing::info!(
requested = slug,
actual = %final_slug,
"skill slug collided, suffix appended"
);
}
Ok(path)
}
/// Convert an arbitrary skill name into a kebab-case slug.
///
/// Lowercases the input, maps every non-alphanumeric character to `-`,
/// collapses hyphen runs, and strips leading/trailing hyphens. Returns
/// `"unnamed-skill"` when nothing usable remains.
pub fn slugify(name: &str) -> String {
    let mut slug = String::with_capacity(name.len());
    for ch in name.to_lowercase().chars() {
        if ch.is_alphanumeric() {
            slug.push(ch);
        } else if !slug.is_empty() && !slug.ends_with('-') {
            // Separator: emit at most one hyphen, and never at the start.
            slug.push('-');
        }
    }
    // A separator right before the end leaves one trailing hyphen to trim.
    if slug.ends_with('-') {
        slug.pop();
    }
    if slug.is_empty() {
        "unnamed-skill".to_owned()
    } else {
        slug
    }
}
pub async fn distill_with_llm(
prompt: &str,
provider: Arc<dyn LlmProvider>,
model: String,
) -> Result<String> {
let req = LlmRequest {
model,
messages: vec![Message {
role: Role::User,
content: MessageContent::Text(prompt.to_owned()),
}],
tools: Vec::new(),
system: None,
max_tokens: Some(4096),
temperature: Some(0.3),
frequency_penalty: None,
thinking_budget: None,
kv_cache_mode: 0,
session_key: None,
};
let mut stream = provider
.stream(req)
.await
.context("distill: provider stream failed")?;
let mut output = String::new();
while let Some(event) = stream.next().await {
match event {
Ok(StreamEvent::TextDelta(d)) => output.push_str(&d),
Ok(StreamEvent::ReasoningDelta(_)) => {}
Ok(StreamEvent::Done { .. }) => break,
Ok(StreamEvent::Error(msg)) => bail!("distill: provider error: {msg}"),
Ok(StreamEvent::ToolCall { .. }) => {} Err(e) => return Err(anyhow!("distill stream error: {e:#}")),
}
}
if output.trim().is_empty() {
bail!("distill: empty output from LLM");
}
Ok(output)
}
/// Validate that `content` is a SKILL.md with a YAML frontmatter block
/// containing non-empty `name` and `description` fields.
///
/// Returns the trimmed `(name, description)` pair on success.
///
/// # Errors
/// Fails when the opening fence is missing, the frontmatter is unclosed,
/// the YAML does not parse, or either required field is absent or blank.
pub fn validate_skill_md(content: &str) -> Result<(String, String)> {
    let trimmed = content.trim_start();
    let after_fence = trimmed
        .strip_prefix("---\n")
        .or_else(|| trimmed.strip_prefix("---\r\n"))
        .ok_or_else(|| anyhow!("SKILL.md must start with '---' frontmatter fence"))?;
    let close_idx = after_fence
        .find("\n---")
        .ok_or_else(|| anyhow!("SKILL.md frontmatter has no closing '---'"))?;
    let parsed: serde_yaml_ng::Value = serde_yaml_ng::from_str(&after_fence[..close_idx])
        .map_err(|e| anyhow!("SKILL.md frontmatter YAML invalid: {e}"))?;
    // Shared extraction for required, non-blank string fields.
    let required = |key: &str| -> Option<String> {
        parsed
            .get(key)
            .and_then(|v| v.as_str())
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .map(str::to_owned)
    };
    let name = required("name")
        .ok_or_else(|| anyhow!("SKILL.md frontmatter missing non-empty 'name'"))?;
    let description = required("description")
        .ok_or_else(|| anyhow!("SKILL.md frontmatter missing non-empty 'description'"))?;
    Ok((name, description))
}
/// End-to-end crystallization of one memory doc: find its cluster, claim
/// it, distill the cluster into a SKILL.md via the flash model, write the
/// file under `skills_dir`, and tag the source docs as `crystallized`.
///
/// Returns `Ok(None)` whenever crystallization is skipped (evolution
/// disabled, no model, no cluster, cluster already in flight, provider
/// unavailable, or unusable LLM output); returns the written path on
/// success.
///
/// # Errors
/// Only filesystem failures from `write_skill` propagate as `Err`; all
/// LLM/provider problems are logged and reported as `Ok(None)`.
pub async fn crystallize_one(
    store: &Arc<tokio::sync::Mutex<MemoryStore>>,
    doc_id: &str,
    scope: &str,
    providers: &Arc<ProviderRegistry>,
    flash_model: &str,
    skills_dir: &Path,
) -> Result<Option<PathBuf>> {
    if !crate::agent::evolution::evolution_config().enabled {
        return Ok(None);
    }
    // Cheap precondition hoisted before any store work: without a model we
    // could never distill, so don't lock the store or claim the cluster.
    if flash_model.is_empty() {
        tracing::debug!("crystallization: no flash model resolved, skipping");
        return Ok(None);
    }
    let cluster = {
        let s = store.lock().await;
        match find_cluster(&s, doc_id, scope) {
            Ok(Some(c)) => c,
            Ok(None) => return Ok(None),
            Err(e) => {
                tracing::debug!(doc_id, "crystallization check failed: {e:#}");
                return Ok(None);
            }
        }
    };
    let ids: Vec<String> = cluster.iter().map(|d| d.id.clone()).collect();
    // Claim the cluster so concurrent tasks don't distill the same docs;
    // the guard releases the claim on every exit path below.
    let _cluster_guard = match try_claim_cluster(&ids) {
        Some(g) => g,
        None => {
            tracing::debug!(n = ids.len(), "cluster already in flight, skipping");
            return Ok(None);
        }
    };
    let prompt = build_distill_prompt(&cluster);
    let (provider_name, model_id) = providers.resolve_model(flash_model);
    let provider_arc = match providers.get(provider_name) {
        Ok(p) => p,
        Err(e) => {
            tracing::warn!(
                provider = provider_name,
                "crystallization: provider not registered: {e:#}"
            );
            return Ok(None);
        }
    };
    // Serialize distillation globally: at most one LLM call at a time.
    let _permit = match acquire_distill_permit().await {
        Ok(p) => p,
        Err(e) => {
            tracing::warn!("crystallization: failed to acquire permit: {e:#}");
            return Ok(None);
        }
    };
    let skill_md = match distill_with_llm(&prompt, provider_arc, model_id.to_owned()).await {
        Ok(md) => md,
        Err(e) => {
            tracing::warn!("crystallization: LLM distillation failed: {e:#}");
            return Ok(None);
        }
    };
    if let Err(e) = validate_skill_md(&skill_md) {
        tracing::warn!("crystallization: invalid SKILL.md output: {e:#}");
        return Ok(None);
    }
    // Char-based truncation: the previous byte slice `&doc_id[..8]` would
    // panic when byte 8 falls inside a multi-byte UTF-8 character.
    let id_prefix: String = doc_id.chars().take(8).collect();
    let fallback = format!("auto-skill-{id_prefix}");
    let slug = extract_skill_slug(&skill_md, &fallback);
    let path = write_skill(skills_dir, &slug, &skill_md)
        .with_context(|| format!("write_skill failed for slug '{slug}'"))?;
    tracing::info!(?path, slug = %slug, n = ids.len(), "crystallized memories into skill");
    let mut s = store.lock().await;
    for id in &ids {
        // Best-effort tagging: a failed tag only risks re-crystallization.
        if let Err(e) = s.tag_doc(id, "crystallized").await {
            tracing::debug!(id, "tag_doc failed: {e:#}");
        }
    }
    Ok(Some(path))
}
/// Pull the `name:` field out of the SKILL.md frontmatter and slugify it,
/// falling back to `slugify(fallback)` when no usable name is found.
///
/// Only lines between the first and second `---` delimiters are scanned;
/// a `name:` appearing in the body (after the closing fence) is ignored.
pub fn extract_skill_slug(skill_md: &str, fallback: &str) -> String {
    let mut lines = skill_md.lines().map(str::trim);
    // Skip everything up to and including the opening `---` fence.
    if lines.by_ref().any(|line| line == "---") {
        for line in lines {
            if line == "---" {
                // Closing fence: stop scanning, anything after is body.
                break;
            }
            if let Some(value) = line.strip_prefix("name:") {
                let cleaned = value.trim().trim_matches('"').trim_matches('\'').trim();
                if !cleaned.is_empty() {
                    return slugify(cleaned);
                }
            }
        }
    }
    slugify(fallback)
}
/// Unit tests for the pure helpers in this module: slug generation,
/// frontmatter extraction/validation, cluster fingerprinting and claiming,
/// and on-disk skill writing with collision handling.
#[cfg(test)]
mod tests {
    use super::*;

    // --- slugify ---------------------------------------------------------

    #[test]
    fn slugify_basic() {
        assert_eq!(slugify("Web Search Pattern"), "web-search-pattern");
    }
    #[test]
    fn slugify_special_chars() {
        // Leading/trailing separators are trimmed; hyphen runs collapse.
        assert_eq!(slugify("  LLM--Retry Logic! "), "llm-retry-logic");
    }
    #[test]
    fn slugify_already_clean() {
        assert_eq!(slugify("hello-world"), "hello-world");
    }
    #[test]
    fn slugify_empty() {
        assert_eq!(slugify(""), "unnamed-skill");
    }

    // --- extract_skill_slug ----------------------------------------------

    #[test]
    fn extract_slug_from_frontmatter_unquoted() {
        let md = "---\nname: web-search-helper\ndescription: foo\n---\nbody";
        assert_eq!(extract_skill_slug(md, "fallback"), "web-search-helper");
    }
    #[test]
    fn extract_slug_from_frontmatter_quoted() {
        // Quotes around the name are stripped before slugifying.
        let md = "---\nname: \"Order Extractor\"\n---\nbody";
        assert_eq!(extract_skill_slug(md, "fallback"), "order-extractor");
    }
    #[test]
    fn extract_slug_falls_back_when_no_name() {
        let md = "---\ndescription: foo\n---\nbody";
        assert_eq!(extract_skill_slug(md, "Auto Skill"), "auto-skill");
    }
    #[test]
    fn extract_slug_falls_back_when_no_frontmatter() {
        let md = "just body, no frontmatter";
        assert_eq!(extract_skill_slug(md, "Fallback Name"), "fallback-name");
    }
    #[test]
    fn extract_slug_ignores_name_after_closing_delimiter() {
        // `name:` in the body (after the closing fence) must not be used.
        let md = "---\ndescription: foo\n---\nname: not-a-real-name\n";
        assert_eq!(extract_skill_slug(md, "real"), "real");
    }

    // --- validate_skill_md -----------------------------------------------

    #[test]
    fn validate_accepts_well_formed() {
        let md = "---\nname: my-skill\ndescription: Does X. Use when Y.\n---\nbody\n";
        let (n, d) = validate_skill_md(md).expect("should be valid");
        assert_eq!(n, "my-skill");
        assert_eq!(d, "Does X. Use when Y.");
    }
    #[test]
    fn validate_rejects_no_frontmatter() {
        assert!(validate_skill_md("just body\n").is_err());
    }
    #[test]
    fn validate_rejects_unclosed_frontmatter() {
        assert!(validate_skill_md("---\nname: foo\n").is_err());
    }
    #[test]
    fn validate_rejects_missing_name() {
        let md = "---\ndescription: foo\n---\nbody";
        assert!(validate_skill_md(md).is_err());
    }
    #[test]
    fn validate_rejects_empty_name() {
        let md = "---\nname: \"\"\ndescription: foo\n---\nbody";
        assert!(validate_skill_md(md).is_err());
    }
    #[test]
    fn validate_rejects_missing_description() {
        let md = "---\nname: foo\n---\nbody";
        assert!(validate_skill_md(md).is_err());
    }
    #[test]
    fn validate_rejects_empty_description() {
        // Whitespace-only values count as empty after trimming.
        let md = "---\nname: foo\ndescription: \" \"\n---\nbody";
        assert!(validate_skill_md(md).is_err());
    }
    #[test]
    fn validate_rejects_invalid_yaml() {
        let md = "---\nname: foo\ndescription: [unclosed\n---\nbody";
        assert!(validate_skill_md(md).is_err());
    }

    // --- cluster_fingerprint / try_claim_cluster --------------------------

    #[test]
    fn cluster_fingerprint_is_order_invariant() {
        let a = vec!["doc-1".to_owned(), "doc-2".to_owned(), "doc-3".to_owned()];
        let b = vec!["doc-3".to_owned(), "doc-1".to_owned(), "doc-2".to_owned()];
        assert_eq!(cluster_fingerprint(&a), cluster_fingerprint(&b));
    }
    #[test]
    fn cluster_fingerprint_distinguishes_different_clusters() {
        let a = vec!["doc-1".to_owned(), "doc-2".to_owned()];
        let b = vec!["doc-1".to_owned(), "doc-3".to_owned()];
        assert_ne!(cluster_fingerprint(&a), cluster_fingerprint(&b));
    }
    #[test]
    fn try_claim_cluster_blocks_duplicates_and_releases_on_drop() {
        // Ids unique to this test so the shared in-flight set can't collide
        // with other tests running in parallel.
        let ids = vec![
            "claim-test-aaa".to_owned(),
            "claim-test-bbb".to_owned(),
            "claim-test-ccc".to_owned(),
        ];
        let g1 = try_claim_cluster(&ids).expect("first claim should win");
        let g2 = try_claim_cluster(&ids);
        assert!(g2.is_none(), "second claim while first held should fail");
        drop(g1);
        let g3 = try_claim_cluster(&ids).expect("claim should work after drop");
        drop(g3);
    }

    // --- write_skill ------------------------------------------------------

    #[test]
    fn write_skill_appends_suffix_on_collision() {
        let tmp = tempfile::tempdir().expect("tempdir");
        let dir = tmp.path();
        let p1 = write_skill(dir, "shared-name", "first\n").expect("first write");
        let p2 = write_skill(dir, "shared-name", "second\n").expect("second write");
        let p3 = write_skill(dir, "shared-name", "third\n").expect("third write");
        assert_eq!(p1, dir.join("shared-name").join("SKILL.md"));
        assert_eq!(p2, dir.join("shared-name-2").join("SKILL.md"));
        assert_eq!(p3, dir.join("shared-name-3").join("SKILL.md"));
        assert_eq!(fs::read_to_string(&p1).unwrap(), "first\n");
        assert_eq!(fs::read_to_string(&p2).unwrap(), "second\n");
        assert_eq!(fs::read_to_string(&p3).unwrap(), "third\n");
    }
}