use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use anyhow::{Context, Result};
use serde::Serialize;
use super::heuristic::extraction::{self, ExtractionKind, ExtractionSignal};
use super::heuristic::self_tag::{self, SelfTagSignal};
use super::redact;
use super::transcript::{self, TranscriptEntry};
use crate::distill_queue;
use crate::domain::MemoryScope;
use crate::lifecycle_service::LifecycleService;
use crate::lifecycle_store::{ProposeMemoryRequest, RecordMemoryRequest, TransitionMetadata};
use crate::vault_writer;
/// Context inferred once per distill run and copied onto the memory requests built in this file.
#[derive(Debug, Clone, Default)]
struct ContextSignals {
/// Default entity list for a request; `infer_context_signals` pushes the project name here.
entities: Vec<String>,
/// Extra tags appended after the per-kind tags; not populated by any code visible in this file.
tags: Vec<String>,
// Neither written nor read in the code visible here, hence the lint allowance.
#[allow(dead_code)]
triggers: Vec<String>,
/// File-like tokens found in user transcript entries (at most 10, in sorted order).
related_files: Vec<String>,
/// Project name derived from the working directory.
applies_to: Vec<String>,
/// Caller-supplied project id; when present, self-tag and extraction records are project-scoped.
project_id: Option<String>,
}
/// One memory candidate parsed from the sampling (LLM) JSON response.
#[derive(Debug, Clone)]
struct SamplingCandidate {
/// Kind parsed from the `kind` field; anything other than "behavior"/"decision" maps to incident.
kind: ExtractionKind,
/// Trimmed, non-empty `summary` text (not yet redacted).
summary: String,
/// Optional `entities` array from the response; empty when absent.
entities: Vec<String>,
/// Optional `tags` array from the response; empty when absent.
tags: Vec<String>,
/// Optional `triggers` array from the response; empty when absent.
triggers: Vec<String>,
}
/// Builds the [`ContextSignals`] shared by every record persisted in one run.
///
/// * `cwd` — working directory; its project name (if any) becomes both the
///   single `entities` entry and the single `applies_to` entry.
/// * `transcript_path` — optional transcript; file-like tokens mentioned in
///   the user entries among its last 300 entries become `related_files`
///   (deduplicated, sorted, capped at 10). An unreadable transcript simply
///   yields no related files.
/// * `project_id` — copied through unchanged.
fn infer_context_signals(
    cwd: &Path,
    transcript_path: Option<&Path>,
    project_id: Option<&str>,
) -> ContextSignals {
    let project_name = extract_project_name(cwd);

    let related_files: Vec<String> = transcript_path
        .and_then(|path| transcript::read_tail(path, 300).ok())
        .map(|entries| {
            // BTreeSet gives de-duplication plus a stable (sorted) order.
            let mentioned: BTreeSet<String> = entries
                .iter()
                .filter(|entry| matches!(entry, TranscriptEntry::User { .. }))
                .flat_map(|entry| extract_file_paths(&entry.text()))
                .collect();
            mentioned.into_iter().take(10).collect()
        })
        .unwrap_or_default();

    ContextSignals {
        entities: project_name.iter().cloned().collect(),
        applies_to: project_name.into_iter().collect(),
        related_files,
        project_id: project_id.map(str::to_string),
        ..Default::default()
    }
}
/// Guesses a project name from `cwd`.
///
/// Walks the path components from the innermost outwards and returns the
/// first normal component that is valid UTF-8, non-empty, not a dot-directory
/// and not one of the generic build/source directory names. Returns `None`
/// when no component qualifies.
fn extract_project_name(cwd: &Path) -> Option<String> {
    const GENERIC_DIRS: &[&str] = &[
        "src",
        "lib",
        "bin",
        "target",
        "build",
        "dist",
        "node_modules",
    ];

    cwd.components()
        .rev()
        .filter_map(|component| match component {
            // Non-UTF-8 names cannot be used as a project name; skip them.
            std::path::Component::Normal(os_name) => os_name.to_str(),
            _ => None,
        })
        .find(|name| {
            !name.is_empty() && !name.starts_with('.') && !GENERIC_DIRS.contains(name)
        })
        .map(str::to_owned)
}
/// Extracts whitespace-separated tokens of `text` that look like file paths.
///
/// Each token is stripped of surrounding quoting and sentence punctuation
/// before being checked with `looks_like_file_path`, so mentions such as
/// `` `src/main.rs`, `` / `(src/lib.rs)` / `Cargo.toml.` are recognised and
/// stored without the decoration. Leading dots are preserved so that
/// `./foo.rs` and `.github/ci.yml` stay intact; only trailing dots (sentence
/// full stops) are removed.
///
/// Returns the cleaned tokens in order of appearance; duplicates are kept.
fn extract_file_paths(text: &str) -> Vec<String> {
    /// Characters that commonly wrap or follow a path in prose.
    fn is_wrapper(c: char) -> bool {
        matches!(
            c,
            '`' | '\'' | '"' | ',' | '(' | ')' | '[' | ']' | ':' | ';'
        )
    }

    text.split_whitespace()
        .map(|word| {
            word.trim_start_matches(is_wrapper)
                .trim_end_matches(|c: char| is_wrapper(c) || c == '.')
        })
        .filter(|candidate| looks_like_file_path(candidate))
        .map(str::to_string)
        .collect()
}
/// Heuristic check for whether a token is a file path.
///
/// A token qualifies when all of the following hold:
/// * its byte length is between 4 and 200 inclusive;
/// * it contains a `/` or ends with one of the known source/config extensions;
/// * it is not an `http://` / `https://` URL;
/// * every character is alphanumeric or one of `/`, `_`, `-`, `.`.
fn looks_like_file_path(s: &str) -> bool {
    const KNOWN_EXTENSIONS: &[&str] = &[
        ".rs", ".ts", ".tsx", ".js", ".jsx", ".py", ".go", ".toml", ".yaml", ".yml", ".json",
        ".md",
    ];

    let plausible_length = (4..=200).contains(&s.len());
    let path_shaped =
        s.contains('/') || KNOWN_EXTENSIONS.iter().any(|ext| s.ends_with(*ext));
    let is_url = s.starts_with("http://") || s.starts_with("https://");

    plausible_length
        && path_shaped
        && !is_url
        && s.chars()
            .all(|c| c.is_alphanumeric() || matches!(c, '/' | '_' | '-' | '.'))
}
/// Returns the single default tag associated with an extraction kind.
fn tags_for_kind(kind: ExtractionKind) -> Vec<String> {
    let tag = match kind {
        ExtractionKind::Decision => "architecture",
        ExtractionKind::Incident => "debugging",
        ExtractionKind::BehaviorPattern => "workflow",
    };
    vec![String::from(tag)]
}
/// Derives up to three lowercase retrieval keywords from `summary`.
///
/// Words are split on whitespace and stripped of leading/trailing
/// non-alphanumeric characters. A word is kept when it has at least three
/// characters, is not a stop word, and has not already been selected — so a
/// repeated word cannot use up more than one of the three slots.
///
/// Returns the keywords in order of first appearance; empty for an empty or
/// all-stop-word summary.
fn triggers_from_summary(summary: &str) -> Vec<String> {
    const STOP_WORDS: &[&str] = &[
        "the", "and", "for", "that", "this", "with", "from", "are", "was", "were", "been", "have",
        "has", "had", "not", "but", "all", "can", "will", "just", "more", "when", "what", "how",
        "use", "used", "using",
    ];
    const MAX_TRIGGERS: usize = 3;

    let mut triggers: Vec<String> = Vec::with_capacity(MAX_TRIGGERS);
    for word in summary.split_whitespace() {
        let trimmed = word.trim_matches(|c: char| !c.is_alphanumeric());
        // Length is measured in characters so CJK words are not penalised.
        if trimmed.chars().count() < 3 {
            continue;
        }
        // Lowercase once and reuse it for the stop-word test, the duplicate
        // test and the stored value.
        let lowered = trimmed.to_lowercase();
        if STOP_WORDS.contains(&lowered.as_str()) || triggers.contains(&lowered) {
            continue;
        }
        triggers.push(lowered);
        if triggers.len() == MAX_TRIGGERS {
            break;
        }
    }
    triggers
}
pub use crate::sampling::{NoopSamplingClient, SamplingClient, SamplingError, SamplingFuture};
/// Inputs for one distill run.
#[derive(Debug, Clone)]
pub struct DistillRequest {
    /// Path of the spool configuration file.
    pub config_path: PathBuf,
    /// Working directory of the session being distilled.
    pub cwd: PathBuf,
    /// Optional transcript to scan for signals.
    pub transcript_path: Option<PathBuf>,
    /// Optional project id attached to persisted records.
    pub project_id: Option<String>,
    /// Actor name recorded in transition metadata.
    pub actor: String,
    /// `source_ref` used for self-tag records.
    pub source_ref_self_tag: String,
    /// `source_ref` used for extraction and sampling candidates.
    pub source_ref_extraction: String,
}

impl DistillRequest {
    /// Creates a request with no project id, actor `"spool-distill"` and the
    /// default `distill:*` source refs.
    pub fn new(config_path: PathBuf, cwd: PathBuf, transcript_path: Option<PathBuf>) -> Self {
        Self {
            config_path,
            cwd,
            transcript_path,
            project_id: None,
            actor: String::from("spool-distill"),
            source_ref_self_tag: String::from("distill:self-tag"),
            source_ref_extraction: String::from("distill:extraction"),
        }
    }

    /// Replaces the project id (pass `None` to clear it).
    pub fn with_project_id(self, project_id: Option<String>) -> Self {
        Self { project_id, ..self }
    }

    /// Replaces the actor name.
    pub fn with_actor(self, actor: impl Into<String>) -> Self {
        Self {
            actor: actor.into(),
            ..self
        }
    }

    /// Replaces both source refs: `self_tag` for self-tag records and
    /// `extraction` for extraction/sampling candidates.
    pub fn with_source_refs(
        self,
        self_tag: impl Into<String>,
        extraction: impl Into<String>,
    ) -> Self {
        Self {
            source_ref_self_tag: self_tag.into(),
            source_ref_extraction: extraction.into(),
            ..self
        }
    }
}
/// Serializable summary of what one distill run detected, dropped and persisted.
#[derive(Debug, Clone, Default, Serialize)]
pub struct DistillReport {
/// Transcript path copied from the request.
pub transcript_path: Option<PathBuf>,
/// Number of queued tool-use signals drained from the runtime dir.
pub queue_drained: usize,
/// Number of self-tag signals detected in the transcript.
pub signals_detected: usize,
/// Self-tag signals dropped because redaction was not clean or a suppress rule matched.
pub signals_redacted_dropped: usize,
/// Self-tag signals dropped as duplicates of existing or just-written summaries.
pub signals_duplicate_dropped: usize,
/// Record ids of persisted self-tag signals.
pub signals_persisted: Vec<String>,
/// Number of candidates seen (sampling candidates plus heuristic extraction candidates).
pub candidates_detected: usize,
/// Candidates dropped because redaction was not clean.
pub candidates_redacted_dropped: usize,
/// Candidates dropped as duplicates.
pub candidates_duplicate_dropped: usize,
/// Record ids of persisted candidates.
pub candidates_persisted: Vec<String>,
/// True when the sampling client reported itself available and was called.
pub sampling_attempted: bool,
/// Label of the path taken, e.g. "tier1_heuristic" or "sampling+tier1_combined".
pub fallback_used: String,
}
/// Synchronous entry point: runs the distill pipeline without sampling.
///
/// Builds a current-thread Tokio runtime and drives [`run_with_sampling`]
/// with the [`NoopSamplingClient`].
///
/// # Errors
/// Fails when the runtime cannot be built or when the pipeline itself fails.
pub fn run(request: DistillRequest) -> Result<DistillReport> {
    let runtime = {
        let mut builder = tokio::runtime::Builder::new_current_thread();
        builder
            .build()
            .context("building distill pipeline runtime")?
    };
    runtime.block_on(run_with_sampling(request, &NoopSamplingClient))
}
/// Runs the full distill pipeline, optionally consulting a sampling client.
///
/// Steps, in order:
/// 1. Ensure the runtime dir exists and drain the queued tool-use signals.
/// 2. Infer context signals from `cwd`, the transcript and the project id.
/// 3. If `sampling` is available, build a prompt (queue + transcript
///    excerpt), ask for candidates and persist them; a sampling error is
///    recorded in `fallback_used` and does not abort the run.
/// 4. Detect self-tag signals in the transcript and persist the clean,
///    non-suppressed, non-duplicate ones.
/// 5. Detect heuristic extraction candidates and persist those that do not
///    duplicate a self-tag or sampling summary written in this run.
///
/// # Errors
/// Only a failure to create the runtime dir is returned as an error;
/// per-record persistence failures are logged to stderr and skipped.
pub async fn run_with_sampling(
    request: DistillRequest,
    sampling: &(dyn SamplingClient + Send),
) -> Result<DistillReport> {
    let runtime_dir = ensure_runtime_dir(&request.cwd)?;
    // Best-effort: a queue that cannot be drained is treated as empty.
    let drained = distill_queue::drain_all(&runtime_dir).unwrap_or_default();
    let mut report = DistillReport {
        transcript_path: request.transcript_path.clone(),
        queue_drained: drained.len(),
        sampling_attempted: false,
        fallback_used: "tier1_heuristic".to_string(),
        ..Default::default()
    };
    let context = infer_context_signals(
        &request.cwd,
        request.transcript_path.as_deref(),
        request.project_id.as_deref(),
    );

    let mut sampling_summaries: Vec<String> = Vec::new();
    if sampling.is_available() {
        report.sampling_attempted = true;
        // The excerpt is only needed for the prompt, so it is read and
        // redacted here rather than on every run.
        let transcript_excerpt = request
            .transcript_path
            .as_deref()
            .map(transcript_excerpt_for_sampling)
            .unwrap_or_default();
        let prompt = build_sampling_prompt(&drained, &transcript_excerpt);
        match sampling.create_message(&prompt).await {
            Ok(response_text) => {
                let parsed = parse_sampling_candidates(&response_text);
                let written = persist_sampling_candidates(
                    &request.config_path,
                    parsed,
                    &request.actor,
                    &request.source_ref_extraction,
                    &context,
                    &mut report,
                );
                sampling_summaries.extend(written);
                report.fallback_used = if sampling_summaries.is_empty() {
                    "sampling_no_candidates+tier1_combined".to_string()
                } else {
                    "sampling+tier1_combined".to_string()
                };
            }
            Err(err) => {
                report.fallback_used = format!("sampling_failed:{err}+tier1_fallback");
            }
        }
    }

    // User rules live next to the config file.
    let spool_root = request
        .config_path
        .parent()
        .unwrap_or_else(|| Path::new("."));
    let user_rules = crate::rules::load(spool_root);
    let signals = match request.transcript_path.as_deref() {
        Some(path) => collect_user_self_tags(path, &user_rules.extraction),
        None => Vec::new(),
    };
    report.signals_detected = signals.len();

    let mut self_tag_summaries: Vec<String> = Vec::new();
    if !signals.is_empty() {
        let service = LifecycleService::new();
        let existing_summaries = load_existing_wakeup_summaries(service, &request.config_path);
        // Lowercased summaries written during this run, for in-run de-duplication.
        let mut written: Vec<String> = Vec::new();
        for signal in signals {
            let redacted = redact::redact(&signal.content);
            if !redacted.is_clean() {
                report.signals_redacted_dropped += 1;
                continue;
            }
            // Suppressed signals share the "redacted dropped" counter; the
            // report has no dedicated field for them.
            if is_suppressed(&redacted.redacted, &user_rules.suppress) {
                report.signals_redacted_dropped += 1;
                continue;
            }
            let summary_lower = redacted.redacted.to_lowercase();
            if existing_summaries
                .iter()
                .any(|s| s.eq_ignore_ascii_case(&summary_lower))
                || written
                    .iter()
                    .any(|s| s.eq_ignore_ascii_case(&summary_lower))
            {
                report.signals_duplicate_dropped += 1;
                continue;
            }
            match persist_self_tag(
                service,
                &request.config_path,
                &signal,
                &redacted.redacted,
                &request.actor,
                &request.source_ref_self_tag,
                &context,
            ) {
                Ok(record_id) => {
                    written.push(summary_lower);
                    self_tag_summaries.push(redacted.redacted);
                    report.signals_persisted.push(record_id);
                }
                Err(err) => {
                    eprintln!(
                        "[spool distill] failed to persist self-tag '{}': {:#}",
                        signal.trigger, err
                    );
                }
            }
        }
    }

    if let Some(path) = request.transcript_path.as_deref() {
        let candidates = collect_user_extraction(path);
        report.candidates_detected += candidates.len();
        if !candidates.is_empty() {
            // Anything already written as a self-tag or sampling candidate in
            // this run must not be proposed again by the heuristic tier.
            let mut suppress = self_tag_summaries.clone();
            suppress.extend(sampling_summaries.iter().cloned());
            persist_candidates(
                &request.config_path,
                candidates,
                &suppress,
                &request.actor,
                &request.source_ref_extraction,
                &context,
                &mut report,
            );
        }
    }
    Ok(report)
}
/// Assembles the prompt sent to the sampling client.
///
/// The prompt has four parts: a fixed introduction, a numbered list of the
/// drained tool-use signals (each payload cut to 200 characters, or
/// `(none)`), the transcript excerpt (or a placeholder when empty), and the
/// fixed output-schema instructions.
fn build_sampling_prompt(
    drained: &[distill_queue::DistillSignal],
    transcript_excerpt: &str,
) -> String {
    const ROLE_LINE: &str = "You are spool-distill, a memory-extraction helper.\n";
    const TASK_LINE: &str = "Your job is to pull out user-relevant *memories* (decisions, \
         preferences, recurring incidents, durable facts) from the \
         working session below, and return them as JSON.\n\n";
    const OUTPUT_SCHEMA: &str = "\n## Output schema\n\
         Return a JSON array (no prose, no markdown fences). Each \
         element must be:\n\
         {\n \"kind\": \"behavior\"|\"incident\"|\"decision\",\n\
         \"summary\": string, // <= 200 chars\n\
         \"entities\": [string], // tools, libraries, concepts, APIs mentioned (optional)\n\
         \"tags\": [string], // semantic categories: database, testing, deployment, etc. (optional)\n\
         \"triggers\": [string] // keywords that should trigger retrieval of this memory (optional)\n\
         }\n\
         If you have nothing worth extracting, return [].\n\
         Do not include API keys, tokens, or secrets in the summary.\n";

    let mut prompt = String::with_capacity(2048);
    prompt.push_str(ROLE_LINE);
    prompt.push_str(TASK_LINE);

    prompt.push_str("## Pending tool-use signals\n");
    match drained {
        [] => prompt.push_str("(none)\n"),
        signals => {
            for (ordinal, signal) in (1..).zip(signals) {
                let tool = signal.tool_name.as_deref().unwrap_or("?");
                let head = first_chars(signal.payload.as_deref().unwrap_or(""), 200);
                prompt.push_str(&format!("{}. [{}] {}\n", ordinal, tool, head));
            }
        }
    }

    prompt.push_str("\n## Recent session excerpt\n");
    if transcript_excerpt.is_empty() {
        prompt.push_str("(no transcript provided)\n");
    } else {
        prompt.push_str(transcript_excerpt);
        // Keep the next section header on its own line.
        if !transcript_excerpt.ends_with('\n') {
            prompt.push('\n');
        }
    }

    prompt.push_str(OUTPUT_SCHEMA);
    prompt
}
/// Builds a redacted, chronological excerpt of the most recent conversation
/// turns for the sampling prompt.
///
/// Reads up to `MAX_TURNS * 3` tail entries, keeps only user/assistant
/// entries, and then takes the newest `MAX_TURNS` of those — so entries of
/// other kinds no longer use up turn slots. Each turn is redacted, cut to
/// 400 characters and rendered as `[role] text`. Rendering stops after the
/// first turn that pushes the output past `MAX_CHARS` bytes.
///
/// Returns an empty string when the transcript cannot be read.
fn transcript_excerpt_for_sampling(path: &Path) -> String {
    const MAX_TURNS: usize = 12;
    const MAX_CHARS: usize = 4000;

    let entries = match transcript::read_tail(path, MAX_TURNS * 3) {
        Ok(entries) => entries,
        Err(_) => return String::new(),
    };

    // Filter by role BEFORE applying the turn limit; walking newest-first
    // ensures the limit keeps the most recent turns.
    let mut turns: Vec<(&'static str, &TranscriptEntry)> = entries
        .iter()
        .rev()
        .filter_map(|entry| match entry {
            TranscriptEntry::User { .. } => Some(("user", entry)),
            TranscriptEntry::Assistant { .. } => Some(("assistant", entry)),
            _ => None,
        })
        .take(MAX_TURNS)
        .collect();
    // Restore chronological order for the prompt.
    turns.reverse();

    let mut out = String::new();
    for (role, entry) in turns {
        let text = entry.text();
        let redacted = redact::redact(&text).redacted;
        let head = first_chars(&redacted, 400);
        out.push_str(&format!("[{role}] {head}\n"));
        // NOTE(review): the budget is checked after appending and in
        // chronological order, so when it trips the newest turns are the ones
        // left out — confirm this is intended.
        if out.len() > MAX_CHARS {
            break;
        }
    }
    out
}
/// Parses the sampling response into candidates.
///
/// The response (optionally wrapped in a markdown code fence) must be a JSON
/// array; anything else yields an empty list. Array items without a
/// non-blank string `summary` are skipped. `kind` maps `"behavior"` and
/// `"decision"` to their kinds and everything else — including a missing or
/// non-string value — to incident. `entities`, `tags` and `triggers` are
/// optional string arrays.
fn parse_sampling_candidates(response: &str) -> Vec<SamplingCandidate> {
    let payload = strip_code_fence(response.trim());
    let items = match serde_json::from_str::<Vec<serde_json::Value>>(payload) {
        Ok(items) => items,
        Err(_) => return Vec::new(),
    };

    items
        .iter()
        .filter_map(|item| {
            let summary = item
                .get("summary")
                .and_then(serde_json::Value::as_str)
                .map(str::trim)
                .filter(|text| !text.is_empty())?;
            let kind = match item.get("kind").and_then(serde_json::Value::as_str) {
                Some("behavior") => ExtractionKind::BehaviorPattern,
                Some("decision") => ExtractionKind::Decision,
                _ => ExtractionKind::Incident,
            };
            Some(SamplingCandidate {
                kind,
                summary: summary.to_string(),
                entities: parse_string_array_from_json(item, "entities"),
                tags: parse_string_array_from_json(item, "tags"),
                triggers: parse_string_array_from_json(item, "triggers"),
            })
        })
        .collect()
}
/// Reads `value[key]` as an array of strings.
///
/// Non-string elements are ignored, each string is trimmed, and blank
/// strings are dropped. Returns an empty vector when the key is missing or
/// is not an array.
fn parse_string_array_from_json(value: &serde_json::Value, key: &str) -> Vec<String> {
    match value.get(key).and_then(serde_json::Value::as_array) {
        Some(items) => items
            .iter()
            .filter_map(|item| {
                let text = item.as_str()?.trim();
                if text.is_empty() {
                    None
                } else {
                    Some(text.to_string())
                }
            })
            .collect(),
        None => Vec::new(),
    }
}
/// Removes a surrounding markdown code fence from `s`, if present.
///
/// Handles an opening fence with any info string on its own line
/// (```` ```json ````, ```` ```JSON ````, ```` ```jsonc ````, or none), as
/// well as the single-line forms ```` ```json[...]``` ```` and
/// ```` ```[...]``` ````. Text without an opening fence is returned trimmed
/// and otherwise untouched.
fn strip_code_fence(s: &str) -> &str {
    /// True when `line` is empty or looks like a fence info string (a
    /// language tag) rather than payload.
    fn is_info_string(line: &str) -> bool {
        line.trim()
            .chars()
            .all(|c| c.is_ascii_alphanumeric() || matches!(c, '_' | '-' | '+' | '.'))
    }

    let s = s.trim();
    let after_fence = match s.strip_prefix("```") {
        Some(rest) => rest,
        None => return s,
    };
    let body = match after_fence.split_once('\n') {
        // Proper fence: the rest of the opening line is the language tag.
        Some((info, rest)) if is_info_string(info) => rest,
        // Single-line fence (or payload on the opening line): only the
        // literal `json` tag can be told apart from payload.
        _ => after_fence.strip_prefix("json").unwrap_or(after_fence),
    };
    body.trim_start().trim_end_matches("```").trim()
}
/// Persists sampling candidates as AI proposals and returns the summaries
/// that were written.
///
/// For each candidate: count it in `report.candidates_detected`, drop it if
/// redaction is not clean, drop it if its summary duplicates a
/// pending-review summary or one written earlier in this call, then propose
/// it. Fields the model left empty fall back to inferred values: `entities`
/// from `context`, `tags` from the kind plus `context.tags`, `triggers` from
/// the summary.
///
/// The project id from `context` is forwarded to the request, matching the
/// heuristic persist paths. The scope stays `Project`, exactly as before, so
/// runs without a project id behave as they always did.
///
/// Persistence failures are logged to stderr and skipped.
fn persist_sampling_candidates(
    config_path: &Path,
    parsed: Vec<SamplingCandidate>,
    actor: &str,
    source_ref: &str,
    context: &ContextSignals,
    report: &mut DistillReport,
) -> Vec<String> {
    if parsed.is_empty() {
        return Vec::new();
    }
    let service = LifecycleService::new();
    let existing_pending = load_existing_pending_summaries(service, config_path);
    // Lowercased summaries written so far, for in-call de-duplication.
    let mut written: Vec<String> = Vec::new();
    let mut written_summaries: Vec<String> = Vec::new();
    for candidate in parsed {
        report.candidates_detected += 1;
        let redacted = redact::redact(&candidate.summary);
        if !redacted.is_clean() {
            report.candidates_redacted_dropped += 1;
            continue;
        }
        let summary_lower = redacted.redacted.to_lowercase();
        let dup = existing_pending
            .iter()
            .any(|s| s.eq_ignore_ascii_case(&summary_lower))
            || written
                .iter()
                .any(|s| s.eq_ignore_ascii_case(&summary_lower));
        if dup {
            report.candidates_duplicate_dropped += 1;
            continue;
        }

        // The title borrows the whole candidate, so build it before the
        // owned vectors are moved out below.
        let title = build_sampling_title(&candidate);
        let kind = candidate.kind;
        let entities = if candidate.entities.is_empty() {
            context.entities.clone()
        } else {
            candidate.entities
        };
        let tags = if candidate.tags.is_empty() {
            let mut inferred = tags_for_kind(kind);
            inferred.extend(context.tags.iter().cloned());
            inferred
        } else {
            candidate.tags
        };
        let triggers = if candidate.triggers.is_empty() {
            triggers_from_summary(&redacted.redacted)
        } else {
            candidate.triggers
        };

        let request = ProposeMemoryRequest {
            title,
            summary: redacted.redacted.clone(),
            memory_type: kind.memory_type().to_string(),
            scope: MemoryScope::Project,
            source_ref: source_ref.to_string(),
            project_id: context.project_id.clone(),
            user_id: None,
            sensitivity: None,
            metadata: TransitionMetadata {
                actor: Some(actor.to_string()),
                reason: Some(format!("sampling extraction ({:?})", kind)),
                evidence_refs: Vec::new(),
            },
            entities,
            tags,
            triggers,
            related_files: context.related_files.clone(),
            related_records: Vec::new(),
            supersedes: None,
            applies_to: context.applies_to.clone(),
            valid_until: None,
        };
        match service.propose_ai(config_path, request) {
            Ok(result) => {
                vault_writer::writeback_from_config(config_path, &result.entry);
                written.push(summary_lower);
                written_summaries.push(redacted.redacted);
                report.candidates_persisted.push(result.entry.record_id);
            }
            Err(err) => {
                eprintln!(
                    "[spool distill] failed to persist sampling candidate ({:?}): {:#}",
                    kind, err
                );
            }
        }
    }
    written_summaries
}
/// Formats a sampling candidate's title as `[memory_type] <first 60 chars of summary>`.
fn build_sampling_title(candidate: &SamplingCandidate) -> String {
    format!(
        "[{}] {}",
        candidate.kind.memory_type(),
        first_chars(&candidate.summary, 60)
    )
}
/// Returns `<cwd>/.spool`, creating the directory (and parents) when the
/// path does not exist yet.
///
/// # Errors
/// Fails with context naming the directory when creation fails.
fn ensure_runtime_dir(cwd: &Path) -> Result<PathBuf> {
    let runtime_dir = cwd.join(".spool");
    if runtime_dir.exists() {
        return Ok(runtime_dir);
    }
    std::fs::create_dir_all(&runtime_dir)
        .with_context(|| format!("creating runtime dir {}", runtime_dir.display()))?;
    Ok(runtime_dir)
}
/// Detects self-tag signals in the user-authored entries among the last 500
/// transcript entries, applying `user_rules` on top of the built-in
/// detection.
///
/// A transcript that cannot be read is logged to stderr and yields no
/// signals.
fn collect_user_self_tags(
    path: &Path,
    user_rules: &[crate::rules::ExtractionRule],
) -> Vec<SelfTagSignal> {
    let entries = match transcript::read_tail(path, 500) {
        Ok(entries) => entries,
        Err(err) => {
            eprintln!(
                "[spool distill] failed to read transcript {}: {:#}",
                path.display(),
                err
            );
            return Vec::new();
        }
    };

    entries
        .iter()
        .filter(|entry| matches!(entry, TranscriptEntry::User { .. }))
        .flat_map(|entry| self_tag::detect_with_user_rules(entry.authored_text(), user_rules))
        .collect()
}
/// Runs the extraction heuristics over the user-authored entries among the
/// last 500 transcript entries.
///
/// Returns no candidates when the transcript cannot be read.
fn collect_user_extraction(path: &Path) -> Vec<ExtractionSignal> {
    match transcript::read_tail(path, 500) {
        Err(_) => Vec::new(),
        Ok(entries) => {
            let mut authored: Vec<&str> = Vec::new();
            for entry in &entries {
                if let TranscriptEntry::User { .. } = entry {
                    authored.push(entry.authored_text());
                }
            }
            extraction::detect(&authored)
        }
    }
}
/// Persists heuristic extraction candidates as AI proposals.
///
/// A candidate is dropped (and counted in `report`) when redaction is not
/// clean, or when its summary case-insensitively equals a pending-review
/// summary, a summary written earlier in this call, or one of
/// `self_tag_summaries` (summaries already written by earlier tiers of the
/// same run). Persistence failures are logged to stderr and skipped.
fn persist_candidates(
    config_path: &Path,
    candidates: Vec<ExtractionSignal>,
    self_tag_summaries: &[String],
    actor: &str,
    source_ref: &str,
    context: &ContextSignals,
    report: &mut DistillReport,
) {
    let service = LifecycleService::new();
    let pending = load_existing_pending_summaries(service, config_path);
    let mut persisted_lower: Vec<String> = Vec::new();

    for signal in candidates {
        let outcome = redact::redact(&signal.summary);
        if !outcome.is_clean() {
            report.candidates_redacted_dropped += 1;
            continue;
        }

        let lowered = outcome.redacted.to_lowercase();
        let already_known = pending
            .iter()
            .chain(persisted_lower.iter())
            .chain(self_tag_summaries.iter())
            .any(|known| known.eq_ignore_ascii_case(&lowered));
        if already_known {
            report.candidates_duplicate_dropped += 1;
            continue;
        }

        let persisted = persist_extraction(
            service,
            config_path,
            &signal,
            &outcome.redacted,
            actor,
            source_ref,
            context,
        );
        match persisted {
            Err(err) => {
                eprintln!(
                    "[spool distill] failed to persist candidate ({:?}): {:#}",
                    signal.kind, err
                );
            }
            Ok(record_id) => {
                persisted_lower.push(lowered);
                report.candidates_persisted.push(record_id);
            }
        }
    }
}
/// Returns true when `text` matches any non-empty suppress rule.
///
/// Matching is case-insensitive in both modes:
/// * a pattern that compiles as a regex is matched case-insensitively, so
///   patterns containing uppercase letters (e.g. `TODO`) can match — they
///   previously never could, because the text was lowercased while the
///   pattern was not;
/// * a pattern that is not a valid regex falls back to a lowercase substring
///   test.
///
/// Patterns are compiled on every call; caching would require changing the
/// rule type, which is defined outside this file.
fn is_suppressed(text: &str, suppress_rules: &[crate::rules::SuppressRule]) -> bool {
    if suppress_rules.is_empty() {
        return false;
    }
    let text_lower = text.to_lowercase();
    suppress_rules
        .iter()
        .filter(|rule| !rule.pattern.is_empty())
        .any(|rule| {
            let compiled = regex::RegexBuilder::new(&rule.pattern)
                .case_insensitive(true)
                .build();
            match compiled {
                // Still matched against the lowercased text, so every
                // pattern that matched before continues to match.
                Ok(re) => re.is_match(&text_lower),
                Err(_) => text_lower.contains(&rule.pattern.to_lowercase()),
            }
        })
}
/// Returns the lowercased summaries of all wakeup-ready records in the
/// workbench.
///
/// A workbench that cannot be loaded is logged to stderr and treated as
/// empty.
fn load_existing_wakeup_summaries(service: LifecycleService, config_path: &Path) -> Vec<String> {
    let snapshot = match service.load_workbench(config_path) {
        Ok(snapshot) => snapshot,
        Err(err) => {
            eprintln!(
                "[spool distill] failed to load wakeup-ready snapshot: {:#}",
                err
            );
            return Vec::new();
        }
    };

    let mut summaries: Vec<String> = Vec::new();
    for entry in snapshot.wakeup_ready {
        summaries.push(entry.record.summary.to_lowercase());
    }
    summaries
}
/// Returns the lowercased summaries of all pending-review records in the
/// workbench.
///
/// A workbench that cannot be loaded is logged to stderr and treated as
/// empty.
fn load_existing_pending_summaries(service: LifecycleService, config_path: &Path) -> Vec<String> {
    let snapshot = match service.load_workbench(config_path) {
        Ok(snapshot) => snapshot,
        Err(err) => {
            eprintln!(
                "[spool distill] failed to load pending-review snapshot: {:#}",
                err
            );
            return Vec::new();
        }
    };

    let mut summaries: Vec<String> = Vec::new();
    for entry in snapshot.pending_review {
        summaries.push(entry.record.summary.to_lowercase());
    }
    summaries
}
/// Records one self-tag signal as a manual memory and returns its record id.
///
/// `summary` is the already-redacted text. Tags come from the signal kind
/// (decision → decision tags, anything else → behavior-pattern tags) followed
/// by `context.tags`; triggers are derived from the summary. The record is
/// project-scoped when `context` carries a project id, user-scoped otherwise.
///
/// # Errors
/// Propagates the error from `record_manual`.
fn persist_self_tag(
    service: LifecycleService,
    config_path: &Path,
    signal: &SelfTagSignal,
    summary: &str,
    actor: &str,
    source_ref: &str,
    context: &ContextSignals,
) -> Result<String> {
    let tag_kind = match signal.kind {
        super::heuristic::self_tag::SelfTagKind::Decision => ExtractionKind::Decision,
        _ => ExtractionKind::BehaviorPattern,
    };
    let tags: Vec<String> = tags_for_kind(tag_kind)
        .into_iter()
        .chain(context.tags.iter().cloned())
        .collect();
    let scope = match context.project_id {
        Some(_) => MemoryScope::Project,
        None => MemoryScope::User,
    };
    let metadata = TransitionMetadata {
        actor: Some(actor.to_string()),
        reason: Some(format!("self-tag detected: {}", signal.trigger)),
        evidence_refs: Vec::new(),
    };

    let request = RecordMemoryRequest {
        title: build_self_tag_title(signal),
        summary: summary.to_string(),
        memory_type: signal.kind.memory_type().to_string(),
        scope,
        source_ref: source_ref.to_string(),
        project_id: context.project_id.clone(),
        user_id: None,
        sensitivity: None,
        metadata,
        entities: context.entities.clone(),
        tags,
        triggers: triggers_from_summary(summary),
        related_files: context.related_files.clone(),
        related_records: Vec::new(),
        supersedes: None,
        applies_to: context.applies_to.clone(),
        valid_until: None,
    };

    let recorded = service.record_manual(config_path, request)?;
    vault_writer::writeback_from_config(config_path, &recorded.entry);
    Ok(recorded.entry.record_id)
}
/// Proposes one heuristic extraction signal as an AI memory and returns its
/// record id.
///
/// `summary` is the already-redacted text. Tags come from the signal kind
/// followed by `context.tags`; triggers are derived from the summary. The
/// proposal is project-scoped when `context` carries a project id,
/// user-scoped otherwise.
///
/// # Errors
/// Propagates the error from `propose_ai`.
fn persist_extraction(
    service: LifecycleService,
    config_path: &Path,
    signal: &ExtractionSignal,
    summary: &str,
    actor: &str,
    source_ref: &str,
    context: &ContextSignals,
) -> Result<String> {
    let tags: Vec<String> = tags_for_kind(signal.kind)
        .into_iter()
        .chain(context.tags.iter().cloned())
        .collect();
    let scope = match context.project_id {
        Some(_) => MemoryScope::Project,
        None => MemoryScope::User,
    };
    let reason = format!(
        "extraction heuristic ({:?}) hits={}",
        signal.kind,
        signal.evidence_indices.len()
    );

    let request = ProposeMemoryRequest {
        title: build_extraction_title(signal),
        summary: summary.to_string(),
        memory_type: signal.kind.memory_type().to_string(),
        scope,
        source_ref: source_ref.to_string(),
        project_id: context.project_id.clone(),
        user_id: None,
        sensitivity: None,
        metadata: TransitionMetadata {
            actor: Some(actor.to_string()),
            reason: Some(reason),
            evidence_refs: Vec::new(),
        },
        entities: context.entities.clone(),
        tags,
        triggers: triggers_from_summary(summary),
        related_files: context.related_files.clone(),
        related_records: Vec::new(),
        supersedes: None,
        applies_to: context.applies_to.clone(),
        valid_until: None,
    };

    let proposed = service.propose_ai(config_path, request)?;
    vault_writer::writeback_from_config(config_path, &proposed.entry);
    Ok(proposed.entry.record_id)
}
/// Formats a self-tag title as `[trigger] <first 60 chars of content>`.
fn build_self_tag_title(signal: &SelfTagSignal) -> String {
    format!("[{}] {}", signal.trigger, first_chars(&signal.content, 60))
}
/// Formats an extraction title as `[memory_type] <first 60 chars of summary>`.
fn build_extraction_title(signal: &ExtractionSignal) -> String {
    format!(
        "[{}] {}",
        signal.kind.memory_type(),
        first_chars(&signal.summary, 60)
    )
}
/// Returns the first `max` characters of `s`, followed by `…` when `s` has
/// more characters than that.
///
/// Counts Unicode scalar values, not bytes, so multi-byte text is never
/// split mid-character.
fn first_chars(s: &str, max: usize) -> String {
    let mut remaining = s.chars();
    let mut head: String = remaining.by_ref().take(max).collect();
    // Anything left over means the input was cut short.
    if remaining.next().is_some() {
        head.push('…');
    }
    head
}
#[cfg(test)]
mod tests {
use super::*;
use serde_json::{Value, json};
use std::fs;
use tempfile::tempdir;
fn fixture_config(temp: &tempfile::TempDir) -> PathBuf {
let cfg = temp.path().join("spool.toml");
fs::write(&cfg, "[vault]\nroot = \"/tmp\"\n").unwrap();
cfg
}
fn write_transcript(path: &Path, entries: &[Value]) {
let mut body = String::new();
for e in entries {
body.push_str(&e.to_string());
body.push('\n');
}
fs::write(path, body).unwrap();
}
#[test]
fn run_without_transcript_returns_empty_report() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let report = run(DistillRequest::new(cfg, cwd, None)).unwrap();
assert_eq!(report.signals_detected, 0);
assert_eq!(report.candidates_detected, 0);
assert!(!report.sampling_attempted);
assert_eq!(report.fallback_used, "tier1_heuristic");
}
#[test]
fn run_persists_self_tag_signal_with_custom_source_ref() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(
&transcript,
&[json!({
"type": "user",
"message": {"role": "user", "content": "记一下: cargo install 是默认"}
})],
);
let request = DistillRequest::new(cfg.clone(), cwd, Some(transcript))
.with_actor("test-actor")
.with_source_refs("test:self-tag", "test:extraction");
let report = run(request).unwrap();
assert_eq!(report.signals_persisted.len(), 1);
let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
assert_eq!(snap.wakeup_ready.len(), 1);
assert_eq!(
snap.wakeup_ready[0].record.origin.source_ref,
"test:self-tag"
);
assert_eq!(
snap.wakeup_ready[0].metadata.actor.as_deref(),
Some("test-actor")
);
}
#[test]
fn run_persists_extraction_candidate_with_custom_source_ref() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(
&transcript,
&[
json!({"type":"user","message":{"role":"user","content":"试一下"}}),
json!({"type":"user","message":{"role":"user","content":"还是错了"}}),
json!({"type":"user","message":{"role":"user","content":"又失败了"}}),
],
);
let request = DistillRequest::new(cfg.clone(), cwd, Some(transcript))
.with_actor("mcp")
.with_source_refs("mcp:self-tag", "mcp:extraction");
let report = run(request).unwrap();
assert_eq!(report.candidates_persisted.len(), 1);
let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
assert_eq!(snap.pending_review.len(), 1);
assert_eq!(
snap.pending_review[0].record.origin.source_ref,
"mcp:extraction"
);
}
#[test]
fn run_drops_self_tag_with_secret_and_keeps_pipeline_alive() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(
&transcript,
&[json!({
"type": "user",
"message": {
"role": "user",
"content": "记一下: prod token sk-abcDEF1234567890ABCDEFGHIJ stays here"
}
})],
);
let report = run(DistillRequest::new(cfg.clone(), cwd, Some(transcript))).unwrap();
assert_eq!(report.signals_detected, 1);
assert_eq!(report.signals_redacted_dropped, 1);
assert!(report.signals_persisted.is_empty());
let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
assert!(snap.wakeup_ready.is_empty());
}
#[test]
fn run_drains_queue_even_without_transcript() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let runtime_dir = cwd.join(".spool");
fs::create_dir_all(&runtime_dir).unwrap();
distill_queue::append(
&runtime_dir,
&distill_queue::DistillSignal {
recorded_at: 1,
tool_name: Some("Bash".into()),
cwd: cwd.display().to_string(),
payload: Some("ls".into()),
},
distill_queue::DEFAULT_LRU_CAP,
)
.unwrap();
let report = run(DistillRequest::new(cfg, cwd, None)).unwrap();
assert_eq!(report.queue_drained, 1);
assert!(distill_queue::peek_all(&runtime_dir).unwrap().is_empty());
}
struct FakeSamplingClient {
available: bool,
outcome: std::sync::Mutex<Result<String, SamplingError>>,
}
impl SamplingClient for FakeSamplingClient {
fn is_available(&self) -> bool {
self.available
}
fn create_message<'a>(&'a self, _prompt: &'a str) -> SamplingFuture<'a> {
let outcome = self.outcome.lock().unwrap().clone();
Box::pin(async move { outcome })
}
}
fn block_on<F: std::future::Future>(fut: F) -> F::Output {
tokio::runtime::Builder::new_current_thread()
.build()
.unwrap()
.block_on(fut)
}
#[test]
fn noop_sampling_client_reports_unavailable_and_keeps_tier1_label() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let report = block_on(run_with_sampling(
DistillRequest::new(cfg, cwd, None),
&NoopSamplingClient,
))
.unwrap();
assert!(!report.sampling_attempted);
assert_eq!(report.fallback_used, "tier1_heuristic");
}
#[test]
fn run_with_sampling_records_failure_and_falls_back_to_tier1() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(
&transcript,
&[json!({
"type": "user",
"message": {"role": "user", "content": "记一下: sampling fallback path"}
})],
);
let fake = FakeSamplingClient {
available: true,
outcome: std::sync::Mutex::new(Err(SamplingError::Rejected("user denied".into()))),
};
let report = block_on(run_with_sampling(
DistillRequest::new(cfg.clone(), cwd, Some(transcript)),
&fake,
))
.unwrap();
assert!(report.sampling_attempted);
assert!(
report.fallback_used.contains("sampling_failed"),
"fallback label must mention sampling failure: {}",
report.fallback_used
);
assert!(
report.fallback_used.contains("rejected"),
"fallback label must surface rejection cause: {}",
report.fallback_used
);
assert_eq!(report.signals_persisted.len(), 1);
}
#[test]
fn run_with_sampling_writes_candidates_from_llm_response() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(
&transcript,
&[json!({
"type": "user",
"message": {"role": "user", "content": "we keep retrying after failures"}
})],
);
let llm_json = r#"[
{"kind":"behavior","summary":"prefers cargo install + ~/.cargo/bin"},
{"kind":"decision","summary":"all sampling writes start as candidate"},
{"kind":"incident","summary":"target/debug binary kept getting wiped"}
]"#;
let fake = FakeSamplingClient {
available: true,
outcome: std::sync::Mutex::new(Ok(llm_json.to_string())),
};
let report = block_on(run_with_sampling(
DistillRequest::new(cfg.clone(), cwd, Some(transcript)),
&fake,
))
.unwrap();
assert!(report.sampling_attempted);
assert_eq!(
report.candidates_persisted.len(),
3,
"all 3 LLM candidates must be persisted: {:?}",
report.candidates_persisted
);
assert!(
report.fallback_used.starts_with("sampling+tier1_combined"),
"fallback label must reflect successful sampling: {}",
report.fallback_used
);
let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
assert_eq!(snap.pending_review.len(), 3);
let memory_types: Vec<&str> = snap
.pending_review
.iter()
.map(|e| e.record.memory_type.as_str())
.collect();
assert!(memory_types.contains(&"behavior_pattern"));
assert!(memory_types.contains(&"decision"));
assert!(memory_types.contains(&"incident"));
}
#[test]
fn run_with_sampling_tolerates_fenced_json_response() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(&transcript, &[]);
let fake = FakeSamplingClient {
available: true,
outcome: std::sync::Mutex::new(Ok(
"```json\n[{\"kind\":\"decision\",\"summary\":\"fenced\"}]\n```".to_string(),
)),
};
let report = block_on(run_with_sampling(
DistillRequest::new(cfg, cwd, Some(transcript)),
&fake,
))
.unwrap();
assert_eq!(report.candidates_persisted.len(), 1);
}
#[test]
fn run_with_sampling_drops_secret_carrying_candidates() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("repo");
fs::create_dir_all(&cwd).unwrap();
let llm_json =
r#"[{"kind":"decision","summary":"prod token sk-abcDEF1234567890ABCDEFGHIJ leaked"}]"#;
let fake = FakeSamplingClient {
available: true,
outcome: std::sync::Mutex::new(Ok(llm_json.to_string())),
};
let report = block_on(run_with_sampling(
DistillRequest::new(cfg, cwd, None),
&fake,
))
.unwrap();
assert!(report.sampling_attempted);
assert_eq!(report.candidates_persisted.len(), 0);
assert_eq!(report.candidates_redacted_dropped, 1);
}
#[test]
fn self_tag_should_populate_context_signals_from_cwd() {
let temp = tempdir().unwrap();
let cfg = fixture_config(&temp);
let cwd = temp.path().join("my-project");
fs::create_dir_all(&cwd).unwrap();
let transcript = temp.path().join("session.jsonl");
write_transcript(
&transcript,
&[json!({
"type": "user",
"message": {"role": "user", "content": "记一下: 用 cargo test 跑测试"}
})],
);
let report = run(DistillRequest::new(cfg.clone(), cwd, Some(transcript))).unwrap();
assert_eq!(report.signals_persisted.len(), 1);
let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
let record = &snap.wakeup_ready[0].record;
assert!(
record.applies_to.iter().any(|a| a == "my-project"),
"applies_to should contain project name: {:?}",
record.applies_to
);
assert!(
!record.triggers.is_empty(),
"triggers should be populated from summary"
);
assert!(
record.entities.iter().any(|e| e == "my-project"),
"entities should contain project name: {:?}",
record.entities
);
}
/// Checks that when the sampled reply supplies `entities`, `tags` and
/// `triggers`, the first `pending_review` record stores exactly those values.
#[test]
fn sampling_should_use_llm_provided_structured_fields() {
    let workspace = tempdir().unwrap();
    let config = fixture_config(&workspace);
    let repo_dir = workspace.path().join("repo");
    fs::create_dir_all(&repo_dir).unwrap();

    // The transcript is written with no entries.
    let transcript_path = workspace.path().join("session.jsonl");
    write_transcript(&transcript_path, &[]);

    // Canned model reply carrying all three structured fields.
    let canned_reply = r#"[{
"kind": "decision",
"summary": "use SQLite for local storage",
"entities": ["SQLite", "rusqlite"],
"tags": ["database", "storage"],
"triggers": ["sqlite", "local storage", "database"]
}]"#;
    let sampler = FakeSamplingClient {
        available: true,
        outcome: std::sync::Mutex::new(Ok(canned_reply.to_owned())),
    };

    let request = DistillRequest::new(config.clone(), repo_dir, Some(transcript_path));
    let outcome = block_on(run_with_sampling(request, &sampler)).unwrap();
    assert_eq!(outcome.candidates_persisted.len(), 1);

    let workbench = LifecycleService::new().load_workbench(&config).unwrap();
    let stored = &workbench.pending_review[0].record;
    assert_eq!(stored.entities, vec!["SQLite", "rusqlite"]);
    assert_eq!(stored.tags, vec!["database", "storage"]);
    assert_eq!(stored.triggers, vec!["sqlite", "local storage", "database"]);
}
/// Checks that when the sampled reply has only `kind` and `summary`, the
/// first `pending_review` record still ends up with the working directory
/// name (`spool`) in `entities` and `applies_to`, a `workflow` tag, and
/// non-empty `triggers`.
#[test]
fn sampling_should_fallback_to_inferred_fields_when_llm_omits_them() {
    let workspace = tempdir().unwrap();
    let config = fixture_config(&workspace);
    let project_dir = workspace.path().join("spool");
    fs::create_dir_all(&project_dir).unwrap();

    // The transcript is written with no entries.
    let transcript_path = workspace.path().join("session.jsonl");
    write_transcript(&transcript_path, &[]);

    // Canned model reply without entities/tags/triggers.
    let canned_reply = r#"[{"kind":"behavior","summary":"prefers cargo install for binaries"}]"#;
    let sampler = FakeSamplingClient {
        available: true,
        outcome: std::sync::Mutex::new(Ok(canned_reply.to_owned())),
    };

    let request = DistillRequest::new(config.clone(), project_dir, Some(transcript_path));
    let outcome = block_on(run_with_sampling(request, &sampler)).unwrap();
    assert_eq!(outcome.candidates_persisted.len(), 1);

    let workbench = LifecycleService::new().load_workbench(&config).unwrap();
    let stored = &workbench.pending_review[0].record;
    assert!(
        stored.entities.iter().any(|entity| entity == "spool"),
        "entities should be inferred from cwd: {:?}",
        stored.entities
    );
    assert!(
        stored.tags.iter().any(|tag| tag == "workflow"),
        "tags should be inferred from kind: {:?}",
        stored.tags
    );
    assert!(
        !stored.triggers.is_empty(),
        "triggers should be inferred from summary"
    );
    assert!(
        stored.applies_to.iter().any(|target| target == "spool"),
        "applies_to should contain project name: {:?}",
        stored.applies_to
    );
}
/// Runs the non-sampling pipeline on three user messages (the first one
/// mentions `src/main.rs`) and checks that the single persisted candidate
/// carries the working directory name (`my-app`) in `applies_to`, a
/// `debugging` tag, and `src/main.rs` in `related_files`.
#[test]
fn extraction_candidate_should_populate_context_signals() {
    let workspace = tempdir().unwrap();
    let config = fixture_config(&workspace);
    let project_dir = workspace.path().join("my-app");
    fs::create_dir_all(&project_dir).unwrap();

    // Builds one user-role transcript entry with the given message content.
    let user_says =
        |content: &str| json!({"type":"user","message":{"role":"user","content":content}});
    let transcript_path = workspace.path().join("session.jsonl");
    write_transcript(
        &transcript_path,
        &[
            user_says("试一下 src/main.rs"),
            user_says("还是错了"),
            user_says("又失败了"),
        ],
    );

    let request = DistillRequest::new(config.clone(), project_dir, Some(transcript_path));
    let outcome = run(request).unwrap();
    assert_eq!(outcome.candidates_persisted.len(), 1);

    let workbench = LifecycleService::new().load_workbench(&config).unwrap();
    let stored = &workbench.pending_review[0].record;
    assert!(
        stored.applies_to.iter().any(|target| target == "my-app"),
        "applies_to should contain project name: {:?}",
        stored.applies_to
    );
    assert!(
        stored.tags.iter().any(|tag| tag == "debugging"),
        "tags should include debugging for incident: {:?}",
        stored.tags
    );
    assert!(
        stored.related_files.iter().any(|file| file == "src/main.rs"),
        "related_files should contain src/main.rs: {:?}",
        stored.related_files
    );
}
/// Checks that, with no transcript and no project id, the last component of
/// the working directory is reported in both `applies_to` and `entities`.
#[test]
fn infer_context_signals_should_extract_project_name() {
    let working_dir = Path::new("/Users/dev/Work/my-project");
    let inferred = infer_context_signals(working_dir, None, None);

    assert_eq!(inferred.applies_to, vec!["my-project"]);
    assert_eq!(inferred.entities, vec!["my-project"]);
}
/// Checks that a dot-prefixed final path component is passed over when
/// inferring the project name: for `/Users/dev/.hidden` the inferred name is
/// `dev`.
///
/// `infer_context_signals` records the inferred project name in both
/// `applies_to` and `entities`, so both fields are asserted here.
#[test]
fn infer_context_signals_should_skip_dot_directories() {
    let signals = infer_context_signals(Path::new("/Users/dev/.hidden"), None, None);
    assert_eq!(signals.applies_to, vec!["dev"]);
    assert_eq!(signals.entities, vec!["dev"]);
}
/// Checks that relative source paths embedded in free text are returned by
/// `extract_file_paths`.
#[test]
fn extract_file_paths_should_find_source_paths() {
    let found = extract_file_paths("check src/engine/scorer.rs and also lib/utils.ts for the fix");

    // Both paths mentioned in the sentence must be present in the result.
    for expected in ["src/engine/scorer.rs", "lib/utils.ts"] {
        assert!(found.contains(&expected.to_string()));
    }
}
/// Checks that a path appearing only as part of an `https://` URL yields no
/// extracted file paths.
#[test]
fn extract_file_paths_should_reject_urls() {
    let sentence = "see https://example.com/path/to/file.rs for docs";
    let found = extract_file_paths(sentence);

    assert!(found.is_empty());
}
/// Checks that `triggers_from_summary` keeps the words `prefer`, `cargo` and
/// `install` from the summary and leaves out `for`.
#[test]
fn triggers_from_summary_should_extract_significant_words() {
    let extracted = triggers_from_summary("prefer cargo install for binary distribution");

    for kept in ["prefer", "cargo", "install"] {
        assert!(extracted.contains(&kept.to_string()));
    }
    assert!(!extracted.contains(&"for".to_string()));
}
/// Checks that a reply object carrying `entities`, `tags` and `triggers`
/// arrays is parsed into one candidate exposing exactly those values.
#[test]
fn parse_sampling_candidates_should_handle_new_format_with_structured_fields() {
    let reply = r#"[{
"kind": "decision",
"summary": "use tokio for async",
"entities": ["tokio", "async-std"],
"tags": ["async", "runtime"],
"triggers": ["tokio", "async runtime"]
}]"#;

    let parsed = parse_sampling_candidates(reply);
    assert_eq!(parsed.len(), 1);

    let only = &parsed[0];
    assert_eq!(only.entities, vec!["tokio", "async-std"]);
    assert_eq!(only.tags, vec!["async", "runtime"]);
    assert_eq!(only.triggers, vec!["tokio", "async runtime"]);
}
/// Checks that a reply object with only `kind` and `summary` still parses
/// into one candidate, with `entities`, `tags` and `triggers` left empty.
#[test]
fn parse_sampling_candidates_should_handle_old_format_without_structured_fields() {
    let reply = r#"[{"kind": "incident", "summary": "build failed twice"}]"#;

    let parsed = parse_sampling_candidates(reply);
    assert_eq!(parsed.len(), 1);

    let only = &parsed[0];
    assert!(only.entities.is_empty());
    assert!(only.tags.is_empty());
    assert!(only.triggers.is_empty());
}
}