Skip to main content

spool/distill/
pipeline.rs

1//! Distill pipeline service — the shared core that turns a session
2//! transcript + a queue of post-tool-use signals into ledger writes.
3//!
4//! ## Why a separate service module?
5//! Two surfaces want this exact pipeline:
6//! 1. **Stop hook** (`hook_runtime::stop`) — fires automatically at
7//!    session end.
8//! 2. **MCP `memory_distill_pending` tool** (R4a) — fires on demand
9//!    when the AI client asks spool to drain pending signals into
10//!    the ledger.
11//!
12//! Both surfaces need identical semantics: same heuristics, same
13//! redaction, same dedupe, same accepted/candidate split. We extract
14//! the pipeline here so the two surfaces stay in lockstep — diverging
15//! distill behavior between Stop hook and MCP would surprise users
16//! ("why did saying X create a memory yesterday but not today?").
17//!
18//! ## Pipeline
19//! Same flow as documented in `hook_runtime::stop`:
20//! 1. Drain `<cwd>/.spool/distill-pending.queue` (R4b will start
21//!    consuming the drained payloads; R4a still discards them).
22//! 2. Read transcript jsonl at `request.transcript_path`. Skip the
23//!    heuristic step entirely when the path is missing — produces an
24//!    empty report.
25//! 3. Run self-tag heuristics over user-authored entries → write
26//!    `accepted` records via `LifecycleService::record_manual`.
27//!    Drop signals that contain secrets or duplicate existing
28//!    wakeup-ready summaries.
29//! 4. Run extraction heuristics over the same user stream → write
30//!    `candidate` records via `LifecycleService::propose_ai`. Skip
31//!    candidates whose summary already became a self-tag this
32//!    session, or that match an existing pending-review record.
33//!
34//! ## R4 forward-compatibility
35//! [`DistillReport`] carries `sampling_attempted` + `fallback_used`
36//! fields that R4a always sets to `false` / `"tier1_heuristic"`.
37//! R4b will populate them from the real `sampling/createMessage`
38//! reverse-call result.
39
40use std::collections::BTreeSet;
41use std::path::{Path, PathBuf};
42
43use anyhow::{Context, Result};
44use serde::Serialize;
45
46use super::heuristic::extraction::{self, ExtractionKind, ExtractionSignal};
47use super::heuristic::self_tag::{self, SelfTagSignal};
48use super::redact;
49use super::transcript::{self, TranscriptEntry};
50use crate::distill_queue;
51use crate::domain::MemoryScope;
52use crate::lifecycle_service::LifecycleService;
53use crate::lifecycle_store::{ProposeMemoryRequest, RecordMemoryRequest, TransitionMetadata};
54use crate::vault_writer;
55
56// ---- Context signals inference ----
57
58/// Structured context signals inferred from the working session's cwd,
59/// transcript, and signal content. Computed once per pipeline run and
60/// threaded into all persist calls so every distilled memory carries
61/// useful retrieval metadata from day one.
62#[derive(Debug, Clone, Default)]
63struct ContextSignals {
64    entities: Vec<String>,
65    tags: Vec<String>,
66    #[allow(dead_code)]
67    triggers: Vec<String>,
68    related_files: Vec<String>,
69    applies_to: Vec<String>,
70    /// Project ID from config, used to bind distilled memories to the
71    /// current project so they surface in project-scoped retrieval.
72    project_id: Option<String>,
73}
74
75/// Sampling-derived candidate with optional structured fields parsed
76/// from the LLM response. Falls back to empty vecs when the LLM
77/// returns the old `{kind, summary}` format.
78#[derive(Debug, Clone)]
79struct SamplingCandidate {
80    kind: ExtractionKind,
81    summary: String,
82    entities: Vec<String>,
83    tags: Vec<String>,
84    triggers: Vec<String>,
85}
86
87/// Infer context signals from the working directory and transcript.
88/// Simple heuristics — no LLM, no network.
89fn infer_context_signals(
90    cwd: &Path,
91    transcript_path: Option<&Path>,
92    project_id: Option<&str>,
93) -> ContextSignals {
94    let mut signals = ContextSignals {
95        project_id: project_id.map(|s| s.to_string()),
96        ..Default::default()
97    };
98
99    // applies_to: extract project name from cwd (last meaningful segment)
100    if let Some(project_name) = extract_project_name(cwd) {
101        signals.applies_to.push(project_name);
102    }
103
104    // related_files: scan transcript user messages for file paths
105    if let Some(path) = transcript_path
106        && let Ok(entries) = transcript::read_tail(path, 300)
107    {
108        let mut file_set: BTreeSet<String> = BTreeSet::new();
109        for entry in &entries {
110            if !matches!(entry, TranscriptEntry::User { .. }) {
111                continue;
112            }
113            let text = entry.text();
114            for file_path in extract_file_paths(&text) {
115                file_set.insert(file_path);
116            }
117        }
118        signals.related_files = file_set.into_iter().take(10).collect();
119    }
120
121    // entities: extract from cwd segments
122    if let Some(project_name) = extract_project_name(cwd) {
123        signals.entities.push(project_name);
124    }
125
126    signals
127}
128
129/// Extract the project name from a cwd path. Takes the last meaningful
130/// directory segment, skipping common non-informative names.
131fn extract_project_name(cwd: &Path) -> Option<String> {
132    let skip_names = [
133        "src",
134        "lib",
135        "bin",
136        "target",
137        "build",
138        "dist",
139        "node_modules",
140    ];
141    for component in cwd.components().rev() {
142        if let std::path::Component::Normal(name) = component {
143            let name_str = name.to_str().unwrap_or("");
144            if !name_str.is_empty() && !name_str.starts_with('.') && !skip_names.contains(&name_str)
145            {
146                return Some(name_str.to_string());
147            }
148        }
149    }
150    None
151}
152
153/// Extract file paths from text using simple heuristics. Looks for
154/// patterns like `src/...`, `*.rs`, `*.ts`, paths with `/` that look
155/// like source files.
156fn extract_file_paths(text: &str) -> Vec<String> {
157    let mut paths = Vec::new();
158    for word in text.split_whitespace() {
159        let cleaned = word.trim_matches(|c: char| c == '`' || c == '\'' || c == '"' || c == ',');
160        if looks_like_file_path(cleaned) {
161            paths.push(cleaned.to_string());
162        }
163    }
164    paths
165}
166
167/// Heuristic: does this token look like a source file path?
168fn looks_like_file_path(s: &str) -> bool {
169    if s.len() < 4 || s.len() > 200 {
170        return false;
171    }
172    // Must contain a slash or a known extension
173    let has_slash = s.contains('/');
174    let has_extension = s.ends_with(".rs")
175        || s.ends_with(".ts")
176        || s.ends_with(".tsx")
177        || s.ends_with(".js")
178        || s.ends_with(".jsx")
179        || s.ends_with(".py")
180        || s.ends_with(".go")
181        || s.ends_with(".toml")
182        || s.ends_with(".yaml")
183        || s.ends_with(".yml")
184        || s.ends_with(".json")
185        || s.ends_with(".md");
186    if !has_slash && !has_extension {
187        return false;
188    }
189    // Reject URLs
190    if s.starts_with("http://") || s.starts_with("https://") {
191        return false;
192    }
193    // Must look path-like (alphanumeric + common path chars)
194    s.chars()
195        .all(|c| c.is_alphanumeric() || "/_-./".contains(c))
196}
197
198/// Infer tags from memory_type. Maps extraction kinds to semantic
199/// categories useful for retrieval.
200fn tags_for_kind(kind: ExtractionKind) -> Vec<String> {
201    match kind {
202        ExtractionKind::Decision => vec!["architecture".to_string()],
203        ExtractionKind::Incident => vec!["debugging".to_string()],
204        ExtractionKind::BehaviorPattern => vec!["workflow".to_string()],
205    }
206}
207
208/// Extract trigger keywords from a summary. Takes the first few
209/// significant words (>= 3 chars, not stop words).
210fn triggers_from_summary(summary: &str) -> Vec<String> {
211    let stop_words = [
212        "the", "and", "for", "that", "this", "with", "from", "are", "was", "were", "been", "have",
213        "has", "had", "not", "but", "all", "can", "will", "just", "more", "when", "what", "how",
214        "use", "used", "using",
215    ];
216    summary
217        .split_whitespace()
218        .map(|w| w.trim_matches(|c: char| !c.is_alphanumeric()))
219        .filter(|w| w.chars().count() >= 3)
220        .filter(|w| !stop_words.contains(&w.to_lowercase().as_str()))
221        .take(3)
222        .map(|w| w.to_lowercase())
223        .collect()
224}
225
226/// Boxed future returned by [`SamplingClient::create_message`]. We
227/// hand-roll the box (rather than pulling in `async-trait`) so the
228/// trait stays dyn-compatible — the pipeline always takes a
229/// `&dyn SamplingClient` so it can swap Noop / Mcp / Fake clients
230/// behind the same call site.
231pub use crate::sampling::{NoopSamplingClient, SamplingClient, SamplingError, SamplingFuture};
232
233/// Distill pipeline inputs. Caller is responsible for resolving cwd /
234/// transcript_path before calling.
235#[derive(Debug, Clone)]
236pub struct DistillRequest {
237    pub config_path: PathBuf,
238    pub cwd: PathBuf,
239    /// Optional. When `None`, the heuristic step is skipped (queue
240    /// is still drained).
241    pub transcript_path: Option<PathBuf>,
242    /// Project ID resolved from config for the current cwd. When set,
243    /// distilled memories are bound to this project so they surface in
244    /// project-scoped retrieval. When `None`, memories are written with
245    /// `scope=User` so they still appear as cross-project candidates.
246    pub project_id: Option<String>,
247    /// Identifies which surface invoked the pipeline. Recorded in
248    /// the lifecycle metadata `actor` field for audit.
249    pub actor: String,
250    /// Recorded in lifecycle metadata `source_ref`. e.g.
251    /// `"hook:stop:self-tag"` (hook) vs. `"mcp:memory_distill_pending"`
252    /// (MCP tool).
253    pub source_ref_self_tag: String,
254    pub source_ref_extraction: String,
255}
256
257impl DistillRequest {
258    pub fn new(config_path: PathBuf, cwd: PathBuf, transcript_path: Option<PathBuf>) -> Self {
259        Self {
260            config_path,
261            cwd,
262            transcript_path,
263            project_id: None,
264            actor: "spool-distill".to_string(),
265            source_ref_self_tag: "distill:self-tag".to_string(),
266            source_ref_extraction: "distill:extraction".to_string(),
267        }
268    }
269
270    pub fn with_project_id(mut self, project_id: Option<String>) -> Self {
271        self.project_id = project_id;
272        self
273    }
274
275    pub fn with_actor(mut self, actor: impl Into<String>) -> Self {
276        self.actor = actor.into();
277        self
278    }
279
280    pub fn with_source_refs(
281        mut self,
282        self_tag: impl Into<String>,
283        extraction: impl Into<String>,
284    ) -> Self {
285        self.source_ref_self_tag = self_tag.into();
286        self.source_ref_extraction = extraction.into();
287        self
288    }
289}
290
291#[derive(Debug, Clone, Default, Serialize)]
292pub struct DistillReport {
293    pub transcript_path: Option<PathBuf>,
294    pub queue_drained: usize,
295    pub signals_detected: usize,
296    pub signals_redacted_dropped: usize,
297    pub signals_duplicate_dropped: usize,
298    pub signals_persisted: Vec<String>,
299    pub candidates_detected: usize,
300    pub candidates_redacted_dropped: usize,
301    pub candidates_duplicate_dropped: usize,
302    pub candidates_persisted: Vec<String>,
303    /// True when the pipeline attempted MCP sampling reverse-call
304    /// (R4b). Always false for R4a.
305    pub sampling_attempted: bool,
306    /// Tier label that produced the candidates / accepted records.
307    /// `"tier1_heuristic"` for R4a; R4b may emit
308    /// `"sampling+tier1_fallback"` etc.
309    pub fallback_used: String,
310}
311
312/// Service entrypoint — pure function over the lifecycle
313/// service + queue + transcript. Caller threads `LifecycleService`
314/// (currently a unit struct, but we keep it threaded for future
315/// dependency injection).
316///
317/// Synchronous shim for surfaces with no live MCP session (Stop hook,
318/// CLI). Internally builds a single-thread tokio runtime and drives
319/// [`run_with_sampling`] with [`NoopSamplingClient`]. Surfaces that
320/// already run inside a tokio runtime (the MCP `tools/call` path)
321/// must call [`run_with_sampling`] directly to avoid nested
322/// `block_on` panics.
323pub fn run(request: DistillRequest) -> Result<DistillReport> {
324    let runtime = tokio::runtime::Builder::new_current_thread()
325        .build()
326        .context("building distill pipeline runtime")?;
327    runtime.block_on(run_with_sampling(request, &NoopSamplingClient))
328}
329
330/// Service entrypoint with explicit MCP sampling reverse-call
331/// client. Pipeline order:
332///
333/// 1. Drain `<cwd>/.spool/distill-pending.queue` — drained signals
334///    feed both the sampling prompt context and the report counter.
335/// 2. If `sampling.is_available()`, issue a reverse-call with the
336///    structured prompt. On success, parse the LLM's JSON response
337///    into `ExtractionSignal`-shaped candidates and persist them via
338///    `LifecycleService::propose_ai`. On failure, label
339///    `fallback_used = "sampling_failed:<reason>+tier1_fallback"` and
340///    fall through to Tier 1.
341/// 3. Run Tier 1 self-tag heuristics → `accepted` records.
342/// 4. Run Tier 1 extraction heuristics → `candidate` records (skipped
343///    for summaries already produced by sampling in step 2 to avoid
344///    double-write).
345///
346/// Tier 1 always runs underneath sampling — an unhelpful or partial
347/// LLM response cannot suppress the baseline heuristic signal.
348pub async fn run_with_sampling(
349    request: DistillRequest,
350    sampling: &(dyn SamplingClient + Send),
351) -> Result<DistillReport> {
352    let runtime_dir = ensure_runtime_dir(&request.cwd)?;
353    let drained = distill_queue::drain_all(&runtime_dir).unwrap_or_default();
354
355    let mut report = DistillReport {
356        transcript_path: request.transcript_path.clone(),
357        queue_drained: drained.len(),
358        sampling_attempted: false,
359        fallback_used: "tier1_heuristic".to_string(),
360        ..Default::default()
361    };
362
363    // Compute context signals once for the entire pipeline run.
364    let context = infer_context_signals(
365        &request.cwd,
366        request.transcript_path.as_deref(),
367        request.project_id.as_deref(),
368    );
369
370    let transcript_excerpt = request
371        .transcript_path
372        .as_deref()
373        .map(transcript_excerpt_for_sampling)
374        .unwrap_or_default();
375
376    // Step 2 — sampling reverse-call. Records candidates *before*
377    // Tier 1 so the Tier 1 dedupe can suppress duplicates.
378    let mut sampling_summaries: Vec<String> = Vec::new();
379    if sampling.is_available() {
380        report.sampling_attempted = true;
381        let prompt = build_sampling_prompt(&drained, &transcript_excerpt);
382        match sampling.create_message(&prompt).await {
383            Ok(response_text) => {
384                let parsed = parse_sampling_candidates(&response_text);
385                let written = persist_sampling_candidates(
386                    &request.config_path,
387                    parsed,
388                    &request.actor,
389                    &request.source_ref_extraction,
390                    &context,
391                    &mut report,
392                );
393                sampling_summaries.extend(written);
394                report.fallback_used = if sampling_summaries.is_empty() {
395                    "sampling_no_candidates+tier1_combined".to_string()
396                } else {
397                    "sampling+tier1_combined".to_string()
398                };
399            }
400            Err(err) => {
401                report.fallback_used = format!("sampling_failed:{err}+tier1_fallback");
402            }
403        }
404    }
405
406    // Step 3 — Tier 1 self-tag.
407    let spool_root = request
408        .config_path
409        .parent()
410        .unwrap_or_else(|| Path::new("."));
411    let user_rules = crate::rules::load(spool_root);
412    let signals = match request.transcript_path.as_deref() {
413        Some(path) => collect_user_self_tags(path, &user_rules.extraction),
414        None => Vec::new(),
415    };
416    report.signals_detected = signals.len();
417
418    let mut self_tag_summaries: Vec<String> = Vec::new();
419    if !signals.is_empty() {
420        let service = LifecycleService::new();
421        let existing_summaries = load_existing_wakeup_summaries(service, &request.config_path);
422        let mut written: Vec<String> = Vec::new();
423        for signal in signals {
424            let redacted = redact::redact(&signal.content);
425            if !redacted.is_clean() {
426                report.signals_redacted_dropped += 1;
427                continue;
428            }
429            if is_suppressed(&redacted.redacted, &user_rules.suppress) {
430                report.signals_redacted_dropped += 1;
431                continue;
432            }
433            let summary_lower = redacted.redacted.to_lowercase();
434            if existing_summaries
435                .iter()
436                .any(|s| s.eq_ignore_ascii_case(&summary_lower))
437                || written
438                    .iter()
439                    .any(|s| s.eq_ignore_ascii_case(&summary_lower))
440            {
441                report.signals_duplicate_dropped += 1;
442                continue;
443            }
444            match persist_self_tag(
445                service,
446                &request.config_path,
447                &signal,
448                &redacted.redacted,
449                &request.actor,
450                &request.source_ref_self_tag,
451                &context,
452            ) {
453                Ok(record_id) => {
454                    written.push(summary_lower);
455                    self_tag_summaries.push(redacted.redacted);
456                    report.signals_persisted.push(record_id);
457                }
458                Err(err) => {
459                    eprintln!(
460                        "[spool distill] failed to persist self-tag '{}': {:#}",
461                        signal.trigger, err
462                    );
463                }
464            }
465        }
466    }
467
468    // Step 4 — Tier 1 extraction. Sampling-produced summaries are
469    // additionally treated as duplicates so we don't double-write.
470    if let Some(path) = request.transcript_path.as_deref() {
471        let candidates = collect_user_extraction(path);
472        report.candidates_detected += candidates.len();
473        if !candidates.is_empty() {
474            let mut suppress = self_tag_summaries.clone();
475            suppress.extend(sampling_summaries.iter().cloned());
476            persist_candidates(
477                &request.config_path,
478                candidates,
479                &suppress,
480                &request.actor,
481                &request.source_ref_extraction,
482                &context,
483                &mut report,
484            );
485        }
486    }
487
488    Ok(report)
489}
490
491/// Render the sampling reverse-call prompt sent to the client's LLM.
492///
493/// Contract sent to the LLM:
494/// 1. Bullet list of pending tool-use signals (drained queue).
495/// 2. Tail excerpt of recent transcript turns (kept short — R4b
496///    targets a small context window, not full transcript replay).
497/// 3. Strict JSON-array output schema with kind + summary fields so
498///    [`parse_sampling_candidates`] can parse it.
499///
500/// We deliberately **do not** ask the LLM for self-tag detection
501/// (`accepted` records). PRD §B.14 reserves accepted writes to
502/// explicit user self-tag heuristics so users always see why their
503/// memory was promoted — sampling-derived candidates always start
504/// at `candidate` state and require user review.
505fn build_sampling_prompt(
506    drained: &[distill_queue::DistillSignal],
507    transcript_excerpt: &str,
508) -> String {
509    let mut buf = String::with_capacity(2048);
510    buf.push_str("You are spool-distill, a memory-extraction helper.\n");
511    buf.push_str(
512        "Your job is to pull out user-relevant *memories* (decisions, \
513         preferences, recurring incidents, durable facts) from the \
514         working session below, and return them as JSON.\n\n",
515    );
516
517    buf.push_str("## Pending tool-use signals\n");
518    if drained.is_empty() {
519        buf.push_str("(none)\n");
520    } else {
521        for (i, sig) in drained.iter().enumerate() {
522            let payload = sig.payload.as_deref().unwrap_or("");
523            let tool = sig.tool_name.as_deref().unwrap_or("?");
524            let head = first_chars(payload, 200);
525            buf.push_str(&format!("{}. [{}] {}\n", i + 1, tool, head));
526        }
527    }
528
529    buf.push_str("\n## Recent session excerpt\n");
530    if transcript_excerpt.is_empty() {
531        buf.push_str("(no transcript provided)\n");
532    } else {
533        buf.push_str(transcript_excerpt);
534        if !transcript_excerpt.ends_with('\n') {
535            buf.push('\n');
536        }
537    }
538
539    buf.push_str(
540        "\n## Output schema\n\
541         Return a JSON array (no prose, no markdown fences). Each \
542         element must be:\n\
543         {\n  \"kind\": \"behavior\"|\"incident\"|\"decision\",\n\
544         \"summary\": string,  // <= 200 chars\n\
545         \"entities\": [string],  // tools, libraries, concepts, APIs mentioned (optional)\n\
546         \"tags\": [string],  // semantic categories: database, testing, deployment, etc. (optional)\n\
547         \"triggers\": [string]  // keywords that should trigger retrieval of this memory (optional)\n\
548         }\n\
549         If you have nothing worth extracting, return [].\n\
550         Do not include API keys, tokens, or secrets in the summary.\n",
551    );
552    buf
553}
554
555/// Render a short transcript excerpt for the sampling prompt. We
556/// only keep the trailing user/assistant turns up to a soft char
557/// budget; secrets are redacted before they leave the process.
558fn transcript_excerpt_for_sampling(path: &Path) -> String {
559    const MAX_TURNS: usize = 12;
560    const MAX_CHARS: usize = 4000;
561
562    let entries = match transcript::read_tail(path, MAX_TURNS * 3) {
563        Ok(e) => e,
564        Err(_) => return String::new(),
565    };
566    let tail: Vec<&TranscriptEntry> = entries.iter().rev().take(MAX_TURNS).collect();
567    let mut out = String::new();
568    for entry in tail.into_iter().rev() {
569        let role = match entry {
570            TranscriptEntry::User { .. } => "user",
571            TranscriptEntry::Assistant { .. } => "assistant",
572            // Tool exchanges are noise for memory extraction; skip
573            // them rather than feeding tool I/O to the LLM. Other
574            // raw entries are skipped for the same reason — we only
575            // want human-authored or assistant-authored prose.
576            _ => continue,
577        };
578        let text = entry.text();
579        let redacted = redact::redact(&text).redacted;
580        let head = first_chars(&redacted, 400);
581        out.push_str(&format!("[{role}] {head}\n"));
582        if out.len() > MAX_CHARS {
583            break;
584        }
585    }
586    out
587}
588
589/// Parse the LLM-produced text back into structured candidates. The
590/// response is expected to be a JSON array of `{kind, summary, ...}`.
591/// We tolerate:
592/// - leading/trailing whitespace,
593/// - a single fenced ```json ...``` block (the LLM sometimes wraps
594///   even after we ask it not to),
595/// - unknown `kind` values (default to `Incident`),
596/// - missing entities/tags/triggers fields (default to empty vecs).
597///
598/// On any unrecoverable parse error we return an empty vec — the
599/// caller will fall back to Tier 1 heuristics, which is the safe
600/// behavior contract per PRD §C.13.
601fn parse_sampling_candidates(response: &str) -> Vec<SamplingCandidate> {
602    let trimmed = strip_code_fence(response.trim());
603    let parsed: Result<Vec<serde_json::Value>, _> = serde_json::from_str(trimmed);
604    let array = match parsed {
605        Ok(arr) => arr,
606        Err(_) => return Vec::new(),
607    };
608    let mut out = Vec::with_capacity(array.len());
609    for item in array {
610        let summary = item
611            .get("summary")
612            .and_then(serde_json::Value::as_str)
613            .unwrap_or("")
614            .trim();
615        if summary.is_empty() {
616            continue;
617        }
618        let kind_str = item
619            .get("kind")
620            .and_then(serde_json::Value::as_str)
621            .unwrap_or("incident");
622        let kind = match kind_str {
623            "behavior" => ExtractionKind::BehaviorPattern,
624            "decision" => ExtractionKind::Decision,
625            _ => ExtractionKind::Incident,
626        };
627        let entities = parse_string_array_from_json(&item, "entities");
628        let tags = parse_string_array_from_json(&item, "tags");
629        let triggers = parse_string_array_from_json(&item, "triggers");
630        out.push(SamplingCandidate {
631            kind,
632            summary: summary.to_string(),
633            entities,
634            tags,
635            triggers,
636        });
637    }
638    out
639}
640
641/// Parse an optional string array field from a JSON value. Returns
642/// empty vec if the field is missing or not an array.
643fn parse_string_array_from_json(value: &serde_json::Value, key: &str) -> Vec<String> {
644    value
645        .get(key)
646        .and_then(serde_json::Value::as_array)
647        .map(|arr| {
648            arr.iter()
649                .filter_map(serde_json::Value::as_str)
650                .map(|s| s.trim().to_string())
651                .filter(|s| !s.is_empty())
652                .collect()
653        })
654        .unwrap_or_default()
655}
656
657fn strip_code_fence(s: &str) -> &str {
658    let s = s.trim();
659    if let Some(rest) = s.strip_prefix("```json") {
660        return rest.trim_start().trim_end_matches("```").trim();
661    }
662    if let Some(rest) = s.strip_prefix("```") {
663        return rest.trim_start().trim_end_matches("```").trim();
664    }
665    s
666}
667
668/// Persist sampling-derived candidates to the ledger as `candidate`
669/// state. Returns the redacted summaries that were actually written
670/// so the caller can dedupe Tier 1 against them.
671fn persist_sampling_candidates(
672    config_path: &Path,
673    parsed: Vec<SamplingCandidate>,
674    actor: &str,
675    source_ref: &str,
676    context: &ContextSignals,
677    report: &mut DistillReport,
678) -> Vec<String> {
679    if parsed.is_empty() {
680        return Vec::new();
681    }
682    let service = LifecycleService::new();
683    let existing_pending = load_existing_pending_summaries(service, config_path);
684    let mut written: Vec<String> = Vec::new();
685    let mut written_summaries: Vec<String> = Vec::new();
686
687    for candidate in parsed {
688        report.candidates_detected += 1;
689        let redacted = redact::redact(&candidate.summary);
690        if !redacted.is_clean() {
691            report.candidates_redacted_dropped += 1;
692            continue;
693        }
694        let summary_lower = redacted.redacted.to_lowercase();
695        let dup = existing_pending
696            .iter()
697            .any(|s| s.eq_ignore_ascii_case(&summary_lower))
698            || written
699                .iter()
700                .any(|s| s.eq_ignore_ascii_case(&summary_lower));
701        if dup {
702            report.candidates_duplicate_dropped += 1;
703            continue;
704        }
705        // Merge LLM-provided fields with context-inferred signals.
706        // LLM fields take priority; context fills gaps.
707        let entities = if candidate.entities.is_empty() {
708            context.entities.clone()
709        } else {
710            candidate.entities.clone()
711        };
712        let tags = if candidate.tags.is_empty() {
713            let mut t = tags_for_kind(candidate.kind);
714            t.extend(context.tags.iter().cloned());
715            t
716        } else {
717            candidate.tags.clone()
718        };
719        let triggers = if candidate.triggers.is_empty() {
720            triggers_from_summary(&redacted.redacted)
721        } else {
722            candidate.triggers.clone()
723        };
724
725        let title = build_sampling_title(&candidate);
726        let request = ProposeMemoryRequest {
727            title,
728            summary: redacted.redacted.clone(),
729            memory_type: candidate.kind.memory_type().to_string(),
730            scope: MemoryScope::Project,
731            source_ref: source_ref.to_string(),
732            project_id: None,
733            user_id: None,
734            sensitivity: None,
735            metadata: TransitionMetadata {
736                actor: Some(actor.to_string()),
737                reason: Some(format!("sampling extraction ({:?})", candidate.kind)),
738                evidence_refs: Vec::new(),
739            },
740            entities,
741            tags,
742            triggers,
743            related_files: context.related_files.clone(),
744            related_records: Vec::new(),
745            supersedes: None,
746            applies_to: context.applies_to.clone(),
747            valid_until: None,
748        };
749        match service.propose_ai(config_path, request) {
750            Ok(result) => {
751                vault_writer::writeback_from_config(config_path, &result.entry);
752                written.push(summary_lower);
753                written_summaries.push(redacted.redacted);
754                report.candidates_persisted.push(result.entry.record_id);
755            }
756            Err(err) => {
757                eprintln!(
758                    "[spool distill] failed to persist sampling candidate ({:?}): {:#}",
759                    candidate.kind, err
760                );
761            }
762        }
763    }
764    written_summaries
765}
766
767fn build_sampling_title(candidate: &SamplingCandidate) -> String {
768    let head = first_chars(&candidate.summary, 60);
769    format!("[{}] {}", candidate.kind.memory_type(), head)
770}
771
772fn ensure_runtime_dir(cwd: &Path) -> Result<PathBuf> {
773    let dir = cwd.join(".spool");
774    if !dir.exists() {
775        std::fs::create_dir_all(&dir)
776            .with_context(|| format!("creating runtime dir {}", dir.display()))?;
777    }
778    Ok(dir)
779}
780
781fn collect_user_self_tags(
782    path: &Path,
783    user_rules: &[crate::rules::ExtractionRule],
784) -> Vec<SelfTagSignal> {
785    let entries = match transcript::read_tail(path, 500) {
786        Ok(e) => e,
787        Err(err) => {
788            eprintln!(
789                "[spool distill] failed to read transcript {}: {:#}",
790                path.display(),
791                err
792            );
793            return Vec::new();
794        }
795    };
796    let mut signals = Vec::new();
797    for entry in entries {
798        if !matches!(entry, TranscriptEntry::User { .. }) {
799            continue;
800        }
801        signals.extend(self_tag::detect_with_user_rules(
802            entry.authored_text(),
803            user_rules,
804        ));
805    }
806    signals
807}
808
809fn collect_user_extraction(path: &Path) -> Vec<ExtractionSignal> {
810    let entries = match transcript::read_tail(path, 500) {
811        Ok(e) => e,
812        Err(_) => return Vec::new(),
813    };
814    let user_texts: Vec<&str> = entries
815        .iter()
816        .filter(|e| matches!(e, TranscriptEntry::User { .. }))
817        .map(|e| e.authored_text())
818        .collect();
819    extraction::detect(&user_texts)
820}
821
822fn persist_candidates(
823    config_path: &Path,
824    candidates: Vec<ExtractionSignal>,
825    self_tag_summaries: &[String],
826    actor: &str,
827    source_ref: &str,
828    context: &ContextSignals,
829    report: &mut DistillReport,
830) {
831    let service = LifecycleService::new();
832    let existing_pending = load_existing_pending_summaries(service, config_path);
833    let mut written: Vec<String> = Vec::new();
834
835    for signal in candidates {
836        let redacted = redact::redact(&signal.summary);
837        if !redacted.is_clean() {
838            report.candidates_redacted_dropped += 1;
839            continue;
840        }
841        let summary_lower = redacted.redacted.to_lowercase();
842        let dup = existing_pending
843            .iter()
844            .any(|s| s.eq_ignore_ascii_case(&summary_lower))
845            || written
846                .iter()
847                .any(|s| s.eq_ignore_ascii_case(&summary_lower))
848            || self_tag_summaries
849                .iter()
850                .any(|s| s.eq_ignore_ascii_case(&summary_lower));
851        if dup {
852            report.candidates_duplicate_dropped += 1;
853            continue;
854        }
855        match persist_extraction(
856            service,
857            config_path,
858            &signal,
859            &redacted.redacted,
860            actor,
861            source_ref,
862            context,
863        ) {
864            Ok(record_id) => {
865                written.push(summary_lower);
866                report.candidates_persisted.push(record_id);
867            }
868            Err(err) => {
869                eprintln!(
870                    "[spool distill] failed to persist candidate ({:?}): {:#}",
871                    signal.kind, err
872                );
873            }
874        }
875    }
876}
877
878fn is_suppressed(text: &str, suppress_rules: &[crate::rules::SuppressRule]) -> bool {
879    if suppress_rules.is_empty() {
880        return false;
881    }
882    let text_lower = text.to_lowercase();
883    for rule in suppress_rules {
884        if rule.pattern.is_empty() {
885            continue;
886        }
887        match regex::Regex::new(&rule.pattern) {
888            Ok(re) => {
889                if re.is_match(&text_lower) {
890                    return true;
891                }
892            }
893            Err(_) => {
894                if text_lower.contains(&rule.pattern.to_lowercase()) {
895                    return true;
896                }
897            }
898        }
899    }
900    false
901}
902
903fn load_existing_wakeup_summaries(service: LifecycleService, config_path: &Path) -> Vec<String> {
904    match service.load_workbench(config_path) {
905        Ok(snap) => snap
906            .wakeup_ready
907            .into_iter()
908            .map(|e| e.record.summary.to_lowercase())
909            .collect(),
910        Err(err) => {
911            eprintln!(
912                "[spool distill] failed to load wakeup-ready snapshot: {:#}",
913                err
914            );
915            Vec::new()
916        }
917    }
918}
919
920fn load_existing_pending_summaries(service: LifecycleService, config_path: &Path) -> Vec<String> {
921    match service.load_workbench(config_path) {
922        Ok(snap) => snap
923            .pending_review
924            .into_iter()
925            .map(|e| e.record.summary.to_lowercase())
926            .collect(),
927        Err(err) => {
928            eprintln!(
929                "[spool distill] failed to load pending-review snapshot: {:#}",
930                err
931            );
932            Vec::new()
933        }
934    }
935}
936
937fn persist_self_tag(
938    service: LifecycleService,
939    config_path: &Path,
940    signal: &SelfTagSignal,
941    summary: &str,
942    actor: &str,
943    source_ref: &str,
944    context: &ContextSignals,
945) -> Result<String> {
946    let title = build_self_tag_title(signal);
947    let triggers = triggers_from_summary(summary);
948    let mut tags = tags_for_kind(match signal.kind {
949        super::heuristic::self_tag::SelfTagKind::Decision => ExtractionKind::Decision,
950        _ => ExtractionKind::BehaviorPattern,
951    });
952    tags.extend(context.tags.iter().cloned());
953    let request = RecordMemoryRequest {
954        title,
955        summary: summary.to_string(),
956        memory_type: signal.kind.memory_type().to_string(),
957        scope: if context.project_id.is_some() {
958            MemoryScope::Project
959        } else {
960            MemoryScope::User
961        },
962        source_ref: source_ref.to_string(),
963        project_id: context.project_id.clone(),
964        user_id: None,
965        sensitivity: None,
966        metadata: TransitionMetadata {
967            actor: Some(actor.to_string()),
968            reason: Some(format!("self-tag detected: {}", signal.trigger)),
969            evidence_refs: Vec::new(),
970        },
971        entities: context.entities.clone(),
972        tags,
973        triggers,
974        related_files: context.related_files.clone(),
975        related_records: Vec::new(),
976        supersedes: None,
977        applies_to: context.applies_to.clone(),
978        valid_until: None,
979    };
980    let result = service.record_manual(config_path, request)?;
981    vault_writer::writeback_from_config(config_path, &result.entry);
982    Ok(result.entry.record_id)
983}
984
985fn persist_extraction(
986    service: LifecycleService,
987    config_path: &Path,
988    signal: &ExtractionSignal,
989    summary: &str,
990    actor: &str,
991    source_ref: &str,
992    context: &ContextSignals,
993) -> Result<String> {
994    let title = build_extraction_title(signal);
995    let triggers = triggers_from_summary(summary);
996    let mut tags = tags_for_kind(signal.kind);
997    tags.extend(context.tags.iter().cloned());
998    let request = ProposeMemoryRequest {
999        title,
1000        summary: summary.to_string(),
1001        memory_type: signal.kind.memory_type().to_string(),
1002        scope: if context.project_id.is_some() {
1003            MemoryScope::Project
1004        } else {
1005            MemoryScope::User
1006        },
1007        source_ref: source_ref.to_string(),
1008        project_id: context.project_id.clone(),
1009        user_id: None,
1010        sensitivity: None,
1011        metadata: TransitionMetadata {
1012            actor: Some(actor.to_string()),
1013            reason: Some(format!(
1014                "extraction heuristic ({:?}) hits={}",
1015                signal.kind,
1016                signal.evidence_indices.len()
1017            )),
1018            evidence_refs: Vec::new(),
1019        },
1020        entities: context.entities.clone(),
1021        tags,
1022        triggers,
1023        related_files: context.related_files.clone(),
1024        related_records: Vec::new(),
1025        supersedes: None,
1026        applies_to: context.applies_to.clone(),
1027        valid_until: None,
1028    };
1029    let result = service.propose_ai(config_path, request)?;
1030    vault_writer::writeback_from_config(config_path, &result.entry);
1031    Ok(result.entry.record_id)
1032}
1033
1034fn build_self_tag_title(signal: &SelfTagSignal) -> String {
1035    let head = first_chars(&signal.content, 60);
1036    format!("[{}] {}", signal.trigger, head)
1037}
1038
1039fn build_extraction_title(signal: &ExtractionSignal) -> String {
1040    let head = first_chars(&signal.summary, 60);
1041    format!("[{}] {}", signal.kind.memory_type(), head)
1042}
1043
1044fn first_chars(s: &str, max: usize) -> String {
1045    let mut out = String::new();
1046    for (i, ch) in s.chars().enumerate() {
1047        if i >= max {
1048            out.push('…');
1049            break;
1050        }
1051        out.push(ch);
1052    }
1053    out
1054}
1055
1056#[cfg(test)]
1057mod tests {
1058    use super::*;
1059    use serde_json::{Value, json};
1060    use std::fs;
1061    use tempfile::tempdir;
1062
1063    fn fixture_config(temp: &tempfile::TempDir) -> PathBuf {
1064        let cfg = temp.path().join("spool.toml");
1065        fs::write(&cfg, "[vault]\nroot = \"/tmp\"\n").unwrap();
1066        cfg
1067    }
1068
1069    fn write_transcript(path: &Path, entries: &[Value]) {
1070        let mut body = String::new();
1071        for e in entries {
1072            body.push_str(&e.to_string());
1073            body.push('\n');
1074        }
1075        fs::write(path, body).unwrap();
1076    }
1077
1078    #[test]
1079    fn run_without_transcript_returns_empty_report() {
1080        let temp = tempdir().unwrap();
1081        let cfg = fixture_config(&temp);
1082        let cwd = temp.path().join("repo");
1083        fs::create_dir_all(&cwd).unwrap();
1084
1085        let report = run(DistillRequest::new(cfg, cwd, None)).unwrap();
1086        assert_eq!(report.signals_detected, 0);
1087        assert_eq!(report.candidates_detected, 0);
1088        assert!(!report.sampling_attempted);
1089        assert_eq!(report.fallback_used, "tier1_heuristic");
1090    }
1091
1092    #[test]
1093    fn run_persists_self_tag_signal_with_custom_source_ref() {
1094        let temp = tempdir().unwrap();
1095        let cfg = fixture_config(&temp);
1096        let cwd = temp.path().join("repo");
1097        fs::create_dir_all(&cwd).unwrap();
1098        let transcript = temp.path().join("session.jsonl");
1099        write_transcript(
1100            &transcript,
1101            &[json!({
1102                "type": "user",
1103                "message": {"role": "user", "content": "记一下: cargo install 是默认"}
1104            })],
1105        );
1106
1107        let request = DistillRequest::new(cfg.clone(), cwd, Some(transcript))
1108            .with_actor("test-actor")
1109            .with_source_refs("test:self-tag", "test:extraction");
1110        let report = run(request).unwrap();
1111
1112        assert_eq!(report.signals_persisted.len(), 1);
1113        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1114        assert_eq!(snap.wakeup_ready.len(), 1);
1115        assert_eq!(
1116            snap.wakeup_ready[0].record.origin.source_ref,
1117            "test:self-tag"
1118        );
1119        assert_eq!(
1120            snap.wakeup_ready[0].metadata.actor.as_deref(),
1121            Some("test-actor")
1122        );
1123    }
1124
1125    #[test]
1126    fn run_persists_extraction_candidate_with_custom_source_ref() {
1127        let temp = tempdir().unwrap();
1128        let cfg = fixture_config(&temp);
1129        let cwd = temp.path().join("repo");
1130        fs::create_dir_all(&cwd).unwrap();
1131        let transcript = temp.path().join("session.jsonl");
1132        write_transcript(
1133            &transcript,
1134            &[
1135                json!({"type":"user","message":{"role":"user","content":"试一下"}}),
1136                json!({"type":"user","message":{"role":"user","content":"还是错了"}}),
1137                json!({"type":"user","message":{"role":"user","content":"又失败了"}}),
1138            ],
1139        );
1140
1141        let request = DistillRequest::new(cfg.clone(), cwd, Some(transcript))
1142            .with_actor("mcp")
1143            .with_source_refs("mcp:self-tag", "mcp:extraction");
1144        let report = run(request).unwrap();
1145
1146        assert_eq!(report.candidates_persisted.len(), 1);
1147        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1148        assert_eq!(snap.pending_review.len(), 1);
1149        assert_eq!(
1150            snap.pending_review[0].record.origin.source_ref,
1151            "mcp:extraction"
1152        );
1153    }
1154
1155    #[test]
1156    fn run_drops_self_tag_with_secret_and_keeps_pipeline_alive() {
1157        let temp = tempdir().unwrap();
1158        let cfg = fixture_config(&temp);
1159        let cwd = temp.path().join("repo");
1160        fs::create_dir_all(&cwd).unwrap();
1161        let transcript = temp.path().join("session.jsonl");
1162        write_transcript(
1163            &transcript,
1164            &[json!({
1165                "type": "user",
1166                "message": {
1167                    "role": "user",
1168                    "content": "记一下: prod token sk-abcDEF1234567890ABCDEFGHIJ stays here"
1169                }
1170            })],
1171        );
1172
1173        let report = run(DistillRequest::new(cfg.clone(), cwd, Some(transcript))).unwrap();
1174        assert_eq!(report.signals_detected, 1);
1175        assert_eq!(report.signals_redacted_dropped, 1);
1176        assert!(report.signals_persisted.is_empty());
1177        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1178        assert!(snap.wakeup_ready.is_empty());
1179    }
1180
1181    #[test]
1182    fn run_drains_queue_even_without_transcript() {
1183        let temp = tempdir().unwrap();
1184        let cfg = fixture_config(&temp);
1185        let cwd = temp.path().join("repo");
1186        fs::create_dir_all(&cwd).unwrap();
1187        let runtime_dir = cwd.join(".spool");
1188        fs::create_dir_all(&runtime_dir).unwrap();
1189        distill_queue::append(
1190            &runtime_dir,
1191            &distill_queue::DistillSignal {
1192                recorded_at: 1,
1193                tool_name: Some("Bash".into()),
1194                cwd: cwd.display().to_string(),
1195                payload: Some("ls".into()),
1196            },
1197            distill_queue::DEFAULT_LRU_CAP,
1198        )
1199        .unwrap();
1200
1201        let report = run(DistillRequest::new(cfg, cwd, None)).unwrap();
1202        assert_eq!(report.queue_drained, 1);
1203        assert!(distill_queue::peek_all(&runtime_dir).unwrap().is_empty());
1204    }
1205
1206    /// FakeSamplingClient with configurable result, used to exercise
1207    /// the R4b sampling branch in `run_with_sampling` without any
1208    /// real MCP reverse-call infrastructure. We use a `Mutex` instead
1209    /// of `RefCell` because the trait now requires `Sync` (boxed
1210    /// futures need to be sendable across the runtime).
1211    struct FakeSamplingClient {
1212        available: bool,
1213        outcome: std::sync::Mutex<Result<String, SamplingError>>,
1214    }
1215    impl SamplingClient for FakeSamplingClient {
1216        fn is_available(&self) -> bool {
1217            self.available
1218        }
1219        fn create_message<'a>(&'a self, _prompt: &'a str) -> SamplingFuture<'a> {
1220            let outcome = self.outcome.lock().unwrap().clone();
1221            Box::pin(async move { outcome })
1222        }
1223    }
1224
1225    fn block_on<F: std::future::Future>(fut: F) -> F::Output {
1226        tokio::runtime::Builder::new_current_thread()
1227            .build()
1228            .unwrap()
1229            .block_on(fut)
1230    }
1231
1232    #[test]
1233    fn noop_sampling_client_reports_unavailable_and_keeps_tier1_label() {
1234        // Contract: NoopSamplingClient never trips the sampling
1235        // branch — `is_available` is false so neither
1236        // `sampling_attempted` nor `fallback_used` move. The Stop
1237        // hook and CLI rely on this so they stay on Tier 1 without
1238        // any client-capability negotiation.
1239        let temp = tempdir().unwrap();
1240        let cfg = fixture_config(&temp);
1241        let cwd = temp.path().join("repo");
1242        fs::create_dir_all(&cwd).unwrap();
1243
1244        let report = block_on(run_with_sampling(
1245            DistillRequest::new(cfg, cwd, None),
1246            &NoopSamplingClient,
1247        ))
1248        .unwrap();
1249        assert!(!report.sampling_attempted);
1250        assert_eq!(report.fallback_used, "tier1_heuristic");
1251    }
1252
1253    #[test]
1254    fn run_with_sampling_records_failure_and_falls_back_to_tier1() {
1255        // Contract: when the client *does* advertise sampling but the
1256        // reverse-call fails (rejected, timeout, transport, …), the
1257        // pipeline must (1) flag `sampling_attempted=true`, (2) embed
1258        // the failure reason in `fallback_used`, and (3) still run
1259        // Tier 1 so the baseline self-tag / extraction signal isn't
1260        // lost.
1261        let temp = tempdir().unwrap();
1262        let cfg = fixture_config(&temp);
1263        let cwd = temp.path().join("repo");
1264        fs::create_dir_all(&cwd).unwrap();
1265        let transcript = temp.path().join("session.jsonl");
1266        write_transcript(
1267            &transcript,
1268            &[json!({
1269                "type": "user",
1270                "message": {"role": "user", "content": "记一下: sampling fallback path"}
1271            })],
1272        );
1273
1274        let fake = FakeSamplingClient {
1275            available: true,
1276            outcome: std::sync::Mutex::new(Err(SamplingError::Rejected("user denied".into()))),
1277        };
1278        let report = block_on(run_with_sampling(
1279            DistillRequest::new(cfg.clone(), cwd, Some(transcript)),
1280            &fake,
1281        ))
1282        .unwrap();
1283        assert!(report.sampling_attempted);
1284        assert!(
1285            report.fallback_used.contains("sampling_failed"),
1286            "fallback label must mention sampling failure: {}",
1287            report.fallback_used
1288        );
1289        assert!(
1290            report.fallback_used.contains("rejected"),
1291            "fallback label must surface rejection cause: {}",
1292            report.fallback_used
1293        );
1294        // Tier 1 still ran underneath: the self-tag was persisted.
1295        assert_eq!(report.signals_persisted.len(), 1);
1296    }
1297
1298    #[test]
1299    fn run_with_sampling_writes_candidates_from_llm_response() {
1300        // Contract for R4b-2: when sampling succeeds and returns a
1301        // valid JSON array, every well-formed entry must land in the
1302        // ledger as a `candidate` (never accepted) with kind drawn
1303        // from the `kind` field. Tier 1 still runs underneath.
1304        let temp = tempdir().unwrap();
1305        let cfg = fixture_config(&temp);
1306        let cwd = temp.path().join("repo");
1307        fs::create_dir_all(&cwd).unwrap();
1308        let transcript = temp.path().join("session.jsonl");
1309        write_transcript(
1310            &transcript,
1311            &[json!({
1312                "type": "user",
1313                "message": {"role": "user", "content": "we keep retrying after failures"}
1314            })],
1315        );
1316
1317        let llm_json = r#"[
1318            {"kind":"behavior","summary":"prefers cargo install + ~/.cargo/bin"},
1319            {"kind":"decision","summary":"all sampling writes start as candidate"},
1320            {"kind":"incident","summary":"target/debug binary kept getting wiped"}
1321        ]"#;
1322        let fake = FakeSamplingClient {
1323            available: true,
1324            outcome: std::sync::Mutex::new(Ok(llm_json.to_string())),
1325        };
1326        let report = block_on(run_with_sampling(
1327            DistillRequest::new(cfg.clone(), cwd, Some(transcript)),
1328            &fake,
1329        ))
1330        .unwrap();
1331        assert!(report.sampling_attempted);
1332        assert_eq!(
1333            report.candidates_persisted.len(),
1334            3,
1335            "all 3 LLM candidates must be persisted: {:?}",
1336            report.candidates_persisted
1337        );
1338        assert!(
1339            report.fallback_used.starts_with("sampling+tier1_combined"),
1340            "fallback label must reflect successful sampling: {}",
1341            report.fallback_used
1342        );
1343
1344        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1345        // 3 candidates from sampling are pending review; no accepted
1346        // memory should appear because LLM never produces self-tags.
1347        assert_eq!(snap.pending_review.len(), 3);
1348        let memory_types: Vec<&str> = snap
1349            .pending_review
1350            .iter()
1351            .map(|e| e.record.memory_type.as_str())
1352            .collect();
1353        assert!(memory_types.contains(&"behavior_pattern"));
1354        assert!(memory_types.contains(&"decision"));
1355        assert!(memory_types.contains(&"incident"));
1356    }
1357
1358    #[test]
1359    fn run_with_sampling_tolerates_fenced_json_response() {
1360        // The LLM occasionally wraps even after we ask it not to.
1361        // Strip the fence and proceed.
1362        let temp = tempdir().unwrap();
1363        let cfg = fixture_config(&temp);
1364        let cwd = temp.path().join("repo");
1365        fs::create_dir_all(&cwd).unwrap();
1366        let transcript = temp.path().join("session.jsonl");
1367        write_transcript(&transcript, &[]);
1368
1369        let fake = FakeSamplingClient {
1370            available: true,
1371            outcome: std::sync::Mutex::new(Ok(
1372                "```json\n[{\"kind\":\"decision\",\"summary\":\"fenced\"}]\n```".to_string(),
1373            )),
1374        };
1375        let report = block_on(run_with_sampling(
1376            DistillRequest::new(cfg, cwd, Some(transcript)),
1377            &fake,
1378        ))
1379        .unwrap();
1380        assert_eq!(report.candidates_persisted.len(), 1);
1381    }
1382
1383    #[test]
1384    fn run_with_sampling_drops_secret_carrying_candidates() {
1385        let temp = tempdir().unwrap();
1386        let cfg = fixture_config(&temp);
1387        let cwd = temp.path().join("repo");
1388        fs::create_dir_all(&cwd).unwrap();
1389
1390        let llm_json =
1391            r#"[{"kind":"decision","summary":"prod token sk-abcDEF1234567890ABCDEFGHIJ leaked"}]"#;
1392        let fake = FakeSamplingClient {
1393            available: true,
1394            outcome: std::sync::Mutex::new(Ok(llm_json.to_string())),
1395        };
1396        let report = block_on(run_with_sampling(
1397            DistillRequest::new(cfg, cwd, None),
1398            &fake,
1399        ))
1400        .unwrap();
1401        assert!(report.sampling_attempted);
1402        assert_eq!(report.candidates_persisted.len(), 0);
1403        assert_eq!(report.candidates_redacted_dropped, 1);
1404    }
1405
1406    #[test]
1407    fn self_tag_should_populate_context_signals_from_cwd() {
1408        let temp = tempdir().unwrap();
1409        let cfg = fixture_config(&temp);
1410        // Use a meaningful project name in the cwd path
1411        let cwd = temp.path().join("my-project");
1412        fs::create_dir_all(&cwd).unwrap();
1413        let transcript = temp.path().join("session.jsonl");
1414        write_transcript(
1415            &transcript,
1416            &[json!({
1417                "type": "user",
1418                "message": {"role": "user", "content": "记一下: 用 cargo test 跑测试"}
1419            })],
1420        );
1421
1422        let report = run(DistillRequest::new(cfg.clone(), cwd, Some(transcript))).unwrap();
1423        assert_eq!(report.signals_persisted.len(), 1);
1424
1425        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1426        let record = &snap.wakeup_ready[0].record;
1427        // applies_to should contain the project name from cwd
1428        assert!(
1429            record.applies_to.iter().any(|a| a == "my-project"),
1430            "applies_to should contain project name: {:?}",
1431            record.applies_to
1432        );
1433        // triggers should be populated from summary
1434        assert!(
1435            !record.triggers.is_empty(),
1436            "triggers should be populated from summary"
1437        );
1438        // entities should contain the project name
1439        assert!(
1440            record.entities.iter().any(|e| e == "my-project"),
1441            "entities should contain project name: {:?}",
1442            record.entities
1443        );
1444    }
1445
1446    #[test]
1447    fn sampling_should_use_llm_provided_structured_fields() {
1448        let temp = tempdir().unwrap();
1449        let cfg = fixture_config(&temp);
1450        let cwd = temp.path().join("repo");
1451        fs::create_dir_all(&cwd).unwrap();
1452        let transcript = temp.path().join("session.jsonl");
1453        write_transcript(&transcript, &[]);
1454
1455        let llm_json = r#"[{
1456            "kind": "decision",
1457            "summary": "use SQLite for local storage",
1458            "entities": ["SQLite", "rusqlite"],
1459            "tags": ["database", "storage"],
1460            "triggers": ["sqlite", "local storage", "database"]
1461        }]"#;
1462        let fake = FakeSamplingClient {
1463            available: true,
1464            outcome: std::sync::Mutex::new(Ok(llm_json.to_string())),
1465        };
1466        let report = block_on(run_with_sampling(
1467            DistillRequest::new(cfg.clone(), cwd, Some(transcript)),
1468            &fake,
1469        ))
1470        .unwrap();
1471        assert_eq!(report.candidates_persisted.len(), 1);
1472
1473        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1474        let record = &snap.pending_review[0].record;
1475        // LLM-provided fields should be used directly
1476        assert_eq!(record.entities, vec!["SQLite", "rusqlite"]);
1477        assert_eq!(record.tags, vec!["database", "storage"]);
1478        assert_eq!(record.triggers, vec!["sqlite", "local storage", "database"]);
1479    }
1480
1481    #[test]
1482    fn sampling_should_fallback_to_inferred_fields_when_llm_omits_them() {
1483        let temp = tempdir().unwrap();
1484        let cfg = fixture_config(&temp);
1485        let cwd = temp.path().join("spool");
1486        fs::create_dir_all(&cwd).unwrap();
1487        let transcript = temp.path().join("session.jsonl");
1488        write_transcript(&transcript, &[]);
1489
1490        // Old format without entities/tags/triggers
1491        let llm_json = r#"[{"kind":"behavior","summary":"prefers cargo install for binaries"}]"#;
1492        let fake = FakeSamplingClient {
1493            available: true,
1494            outcome: std::sync::Mutex::new(Ok(llm_json.to_string())),
1495        };
1496        let report = block_on(run_with_sampling(
1497            DistillRequest::new(cfg.clone(), cwd, Some(transcript)),
1498            &fake,
1499        ))
1500        .unwrap();
1501        assert_eq!(report.candidates_persisted.len(), 1);
1502
1503        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1504        let record = &snap.pending_review[0].record;
1505        // Should have inferred entities from cwd
1506        assert!(
1507            record.entities.iter().any(|e| e == "spool"),
1508            "entities should be inferred from cwd: {:?}",
1509            record.entities
1510        );
1511        // Should have inferred tags from kind (behavior → workflow)
1512        assert!(
1513            record.tags.iter().any(|t| t == "workflow"),
1514            "tags should be inferred from kind: {:?}",
1515            record.tags
1516        );
1517        // Should have inferred triggers from summary
1518        assert!(
1519            !record.triggers.is_empty(),
1520            "triggers should be inferred from summary"
1521        );
1522        // applies_to should contain project name
1523        assert!(
1524            record.applies_to.iter().any(|a| a == "spool"),
1525            "applies_to should contain project name: {:?}",
1526            record.applies_to
1527        );
1528    }
1529
1530    #[test]
1531    fn extraction_candidate_should_populate_context_signals() {
1532        let temp = tempdir().unwrap();
1533        let cfg = fixture_config(&temp);
1534        let cwd = temp.path().join("my-app");
1535        fs::create_dir_all(&cwd).unwrap();
1536        let transcript = temp.path().join("session.jsonl");
1537        write_transcript(
1538            &transcript,
1539            &[
1540                json!({"type":"user","message":{"role":"user","content":"试一下 src/main.rs"}}),
1541                json!({"type":"user","message":{"role":"user","content":"还是错了"}}),
1542                json!({"type":"user","message":{"role":"user","content":"又失败了"}}),
1543            ],
1544        );
1545
1546        let report = run(DistillRequest::new(cfg.clone(), cwd, Some(transcript))).unwrap();
1547        assert_eq!(report.candidates_persisted.len(), 1);
1548
1549        let snap = LifecycleService::new().load_workbench(&cfg).unwrap();
1550        let record = &snap.pending_review[0].record;
1551        // applies_to from cwd
1552        assert!(
1553            record.applies_to.iter().any(|a| a == "my-app"),
1554            "applies_to should contain project name: {:?}",
1555            record.applies_to
1556        );
1557        // tags should include debugging (incident kind)
1558        assert!(
1559            record.tags.iter().any(|t| t == "debugging"),
1560            "tags should include debugging for incident: {:?}",
1561            record.tags
1562        );
1563        // related_files should pick up src/main.rs from transcript
1564        assert!(
1565            record.related_files.iter().any(|f| f == "src/main.rs"),
1566            "related_files should contain src/main.rs: {:?}",
1567            record.related_files
1568        );
1569    }
1570
1571    #[test]
1572    fn infer_context_signals_should_extract_project_name() {
1573        let signals = infer_context_signals(Path::new("/Users/dev/Work/my-project"), None, None);
1574        assert_eq!(signals.applies_to, vec!["my-project"]);
1575        assert_eq!(signals.entities, vec!["my-project"]);
1576    }
1577
1578    #[test]
1579    fn infer_context_signals_should_skip_dot_directories() {
1580        let signals = infer_context_signals(Path::new("/Users/dev/.hidden"), None, None);
1581        // Should skip .hidden and use "dev" instead
1582        assert_eq!(signals.applies_to, vec!["dev"]);
1583    }
1584
1585    #[test]
1586    fn extract_file_paths_should_find_source_paths() {
1587        let text = "check src/engine/scorer.rs and also lib/utils.ts for the fix";
1588        let paths = extract_file_paths(text);
1589        assert!(paths.contains(&"src/engine/scorer.rs".to_string()));
1590        assert!(paths.contains(&"lib/utils.ts".to_string()));
1591    }
1592
1593    #[test]
1594    fn extract_file_paths_should_reject_urls() {
1595        let text = "see https://example.com/path/to/file.rs for docs";
1596        let paths = extract_file_paths(text);
1597        assert!(paths.is_empty());
1598    }
1599
1600    #[test]
1601    fn triggers_from_summary_should_extract_significant_words() {
1602        let triggers = triggers_from_summary("prefer cargo install for binary distribution");
1603        assert!(triggers.contains(&"prefer".to_string()));
1604        assert!(triggers.contains(&"cargo".to_string()));
1605        assert!(triggers.contains(&"install".to_string()));
1606        // Should not contain stop words
1607        assert!(!triggers.contains(&"for".to_string()));
1608    }
1609
1610    #[test]
1611    fn parse_sampling_candidates_should_handle_new_format_with_structured_fields() {
1612        let response = r#"[{
1613            "kind": "decision",
1614            "summary": "use tokio for async",
1615            "entities": ["tokio", "async-std"],
1616            "tags": ["async", "runtime"],
1617            "triggers": ["tokio", "async runtime"]
1618        }]"#;
1619        let candidates = parse_sampling_candidates(response);
1620        assert_eq!(candidates.len(), 1);
1621        assert_eq!(candidates[0].entities, vec!["tokio", "async-std"]);
1622        assert_eq!(candidates[0].tags, vec!["async", "runtime"]);
1623        assert_eq!(candidates[0].triggers, vec!["tokio", "async runtime"]);
1624    }
1625
1626    #[test]
1627    fn parse_sampling_candidates_should_handle_old_format_without_structured_fields() {
1628        let response = r#"[{"kind": "incident", "summary": "build failed twice"}]"#;
1629        let candidates = parse_sampling_candidates(response);
1630        assert_eq!(candidates.len(), 1);
1631        assert!(candidates[0].entities.is_empty());
1632        assert!(candidates[0].tags.is_empty());
1633        assert!(candidates[0].triggers.is_empty());
1634    }
1635}