// innate_core/kb/recall.rs

1use super::*;
2
/// Parameters for [`KnowledgeBase::recall`].
///
/// Borrowed, `Default`-able: construct with `RecallParams { query, budget, source, ..Default::default() }`.
/// Empty-string defaults are normalized inside `recall`: `expand_deps` empty → `"false"`,
/// `refine_mode` empty → `"off"`.
#[derive(Debug, Clone, Default)]
pub struct RecallParams<'a> {
    /// Free-text query. It feeds the embedding channels and the lexical channel, and is
    /// matched against each chunk's anti-trigger description.
    pub query: &'a str,
    /// Budget compared against `block_cost` while packing (and during density refill /
    /// `"adapt"` refinement).
    pub budget: usize,
    /// When `true`, persist a recall trace (episodic log row, plus per-chunk usage
    /// events unless the result is empty or `session_only` is set).
    pub trace: bool,
    /// When `true`, also return matching sparks alongside the packed knowledge.
    pub include_sparks: bool,
    /// Optional cap passed to `limit_knowledge` after packing/refill.
    pub top: Option<usize>,
    /// Event-source label; checked by `validate_source` and recorded in the trace.
    pub source: &'a str,
    /// Dependency expansion mode: `"false"` (no expansion), `"direct"` (the seed's direct
    /// hard deps) or `"closure"` (transitive hard deps, depth-limited). Any other
    /// non-empty value behaves like `"false"`.
    pub expand_deps: &'a str,
    /// If `true`, invoke `Refiner::trim` when a block doesn't fit the remaining budget.
    pub allow_trim: bool,
    /// `"off"` | `"trim"` | `"adapt"`. Only `"adapt"` changes the result (the packed
    /// knowledge is passed through `Refiner::refine`); any value other than `"off"` is
    /// also recorded in the trace.
    pub refine_mode: &'a str,
    /// Relevance gate: drop candidates whose fused score is below this value
    /// **before** packing/trace, so the trace only records knowledge that was
    /// actually surfaced. `None` disables the gate. Used by always-on hooks
    /// (UserPromptSubmit / SessionStart) to stay high-frequency without noise.
    pub min_score: Option<f64>,
    /// Session trace mode: open an episodic log for later record-correlation but
    /// write **no** per-chunk `retrieved`/`selected` usage events. Used by the
    /// daemon, which recalls only to obtain a `trace_id` and discards the
    /// knowledge without ever placing it in a model context. `selected` must
    /// strictly mean "entered the model context", so a caller that does not
    /// inject the result must set this. Defaults to `false` (full injection).
    pub session_only: bool,
    /// Part (d) — opt-in offline LLM rerank of the candidate shortlist. Off by
    /// default so the hot hook path stays no-LLM; only set it in latency-tolerant
    /// callers ("deep recall"). No-op unless a reranker was injected (LLM configured),
    /// and non-fatal: a reranker error falls back to the fused order.
    pub rerank: bool,
}
37
38impl KnowledgeBase {
39    pub fn recall(&self, params: RecallParams<'_>) -> Result<RecallResult> {
40        let RecallParams {
41            query,
42            budget,
43            trace,
44            include_sparks,
45            top,
46            source,
47            expand_deps,
48            allow_trim,
49            refine_mode,
50            min_score,
51            session_only,
52            rerank,
53        } = params;
54        let expand_deps = if expand_deps.is_empty() {
55            "false"
56        } else {
57            expand_deps
58        };
59        let refine_mode = if refine_mode.is_empty() {
60            "off"
61        } else {
62            refine_mode
63        };
64        validate_source(source)?;
65        let trace_id = gen_uuid();
66        let now = utc_now_iso();
67
68        // Calibration path: derive the context_key from a Situation. A bare query degrades
69        // exactly to the legacy `content_hash(normalize_query(query))`, so recall stays
70        // zero-regression while sharing one key derivation with appraise (Spec §2.2).
71        let situation = Situation::from_query(query);
72        let context_key = situation.context_key(&self.situation_coarse_keys);
73
74        // Part (c) — query-embedding granularity. When enabled, anchor the vector
75        // query on the normalized situation signature in addition to the raw words.
76        // The lexical channel still matches the raw query (it indexes chunk text).
77        let embed_query = if self.embed_situation_signature {
78            let sig = situation.coarse_signature(&self.situation_coarse_keys);
79            if signature_has_signal(&sig) {
80                format!("{sig}\n{query}")
81            } else {
82                query.to_string()
83            }
84        } else {
85            query.to_string()
86        };
87
88        let (q_content, q_trigger) = self
89            .embedding
90            .embed_both(&embed_query)
91            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
92
93        // ANN candidates (non-spark) — vector channels + lexical/BM25 (hybrid).
94        let mut candidates = self.ann_candidates(&q_content, &q_trigger, query)?;
95        self.apply_soft_dep_bonus(&mut candidates)?;
96
97        // Score + anti-trigger penalty
98        let mut scored = self.score_candidates(candidates, query, &context_key, &now)?;
99
100        // Part (d) — opt-in offline rerank of the shortlist before packing. Non-fatal:
101        // a reranker error or empty result leaves the fused order untouched.
102        if rerank {
103            self.apply_rerank(query, &mut scored);
104        }
105
106        // Relevance gate — drop sub-threshold candidates before packing/trace so the
107        // trace records only what was actually surfaced (keeps selected→used stats clean).
108        if let Some(min) = min_score {
109            scored.retain(|(fused, _)| *fused >= min);
110        }
111
112        // First-fit pack with dep expansion
113        let (selected, skipped, skipped_reasons) =
114            self.pack(&scored, budget, expand_deps, allow_trim, query)?;
115
116        let depth_skipped: Vec<String> = skipped_reasons
117            .iter()
118            .filter(|(_, r)| r.as_str() == "dep_depth_limit")
119            .map(|(id, _)| id.clone())
120            .collect();
121
122        // Density refill
123        let mut selected = selected;
124        if self.density_refill {
125            selected = self.density_refill(selected, &skipped, budget);
126        }
127
128        let limited = limit_knowledge(selected, top);
129        let visible = if refine_mode == "adapt" {
130            self.refiner
131                .refine(limited.clone(), Some(budget))
132                .unwrap_or(limited)
133        } else {
134            limited
135        };
136
137        // Sparks
138        let sparks = if include_sparks {
139            self.recall_sparks(&q_content, &q_trigger)?
140        } else {
141            vec![]
142        };
143
144        if trace {
145            self.write_recall_trace(
146                &trace_id,
147                query,
148                &context_key,
149                &scored,
150                &visible,
151                &sparks,
152                &depth_skipped,
153                &skipped_reasons,
154                refine_mode,
155                source,
156                &now,
157                session_only,
158            )?;
159        }
160
161        let empty = visible.is_empty() && sparks.is_empty();
162        Ok(RecallResult {
163            knowledge: visible,
164            sparks,
165            trace_id,
166            empty,
167            depth_skipped,
168            skipped_reasons,
169        })
170    }
171
172    pub(super) fn ann_candidates(
173        &self,
174        q_content: &[f32],
175        q_trigger: &[f32],
176        query: &str,
177    ) -> Result<HashMap<String, CandidateInfo>> {
178        let embed_version = self
179            .storage
180            .get_meta("embed_version")?
181            .and_then(|v| v.parse::<i64>().ok())
182            .unwrap_or(1);
183
184        let content_res = self
185            .storage
186            .search_vec_content(q_content, self.top_k_candidates * 2)?;
187        let trigger_res = self
188            .storage
189            .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
190        // Hybrid 检索 — lexical/BM25 channel. Recovers exact-term matches (error
191        // codes, flags, symbol names) that embedding similarity blurs away. Empty
192        // when the query has no usable tokens, so vector-only behaviour is preserved.
193        let lexical_res = self
194            .storage
195            .search_lexical(query, self.top_k_candidates * 2)?;
196
197        // Collect unique ids across all three channels and batch-fetch in one query.
198        let all_ids: Vec<&str> = {
199            let mut seen = HashSet::new();
200            content_res
201                .iter()
202                .chain(trigger_res.iter())
203                .chain(lexical_res.iter())
204                .map(|(id, _)| id.as_str())
205                .filter(|id| seen.insert(*id))
206                .collect()
207        };
208        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
209
210        let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
211        for (cid, sim) in &content_res {
212            if let Some(chunk) = chunks.get(cid) {
213                if chunk_is_valid_for_recall(chunk, embed_version) {
214                    let e = candidates.entry(cid.clone()).or_insert_with(|| new_candidate(chunk));
215                    e.sim_content = e.sim_content.max(*sim);
216                }
217            }
218        }
219        for (cid, sim) in &trigger_res {
220            if let Some(chunk) = chunks.get(cid) {
221                if chunk_is_valid_for_recall(chunk, embed_version) {
222                    let e = candidates.entry(cid.clone()).or_insert_with(|| new_candidate(chunk));
223                    e.sim_trigger = e.sim_trigger.max(*sim);
224                }
225            }
226        }
227        for (cid, sim) in &lexical_res {
228            if let Some(chunk) = chunks.get(cid) {
229                if chunk_is_valid_for_recall(chunk, embed_version) {
230                    let e = candidates.entry(cid.clone()).or_insert_with(|| new_candidate(chunk));
231                    e.sim_lexical = e.sim_lexical.max(*sim);
232                }
233            }
234        }
235        Ok(candidates)
236    }
237
238    pub(super) fn apply_soft_dep_bonus(
239        &self,
240        candidates: &mut HashMap<String, CandidateInfo>,
241    ) -> Result<()> {
242        // Collect non-spark candidate ids and batch-fetch their outgoing deps
243        // in a single query (was one get_deps per candidate).
244        let src_ids: Vec<String> = candidates
245            .iter()
246            .filter(|(_, info)| info.chunk.get("origin").and_then(Value::as_str) != Some("spark"))
247            .map(|(cid, _)| cid.clone())
248            .collect();
249        if src_ids.is_empty() {
250            return Ok(());
251        }
252        let src_refs: Vec<&str> = src_ids.iter().map(String::as_str).collect();
253        let deps_map = self.storage.get_deps_batch(&src_refs)?;
254
255        // Gather distinct soft-dep targets and batch-fetch them in one query
256        // (was one get_chunk per soft edge).
257        let mut target_ids: Vec<String> = Vec::new();
258        let mut seen: HashSet<String> = HashSet::new();
259        for deps in deps_map.values() {
260            for (dst, kind, _) in deps {
261                if kind == "soft" && seen.insert(dst.clone()) {
262                    target_ids.push(dst.clone());
263                }
264            }
265        }
266        if target_ids.is_empty() {
267            return Ok(());
268        }
269        let target_refs: Vec<&str> = target_ids.iter().map(String::as_str).collect();
270        let targets = self.storage.get_chunks_by_ids(&target_refs)?;
271
272        for src in &src_ids {
273            let Some(deps) = deps_map.get(src) else {
274                continue;
275            };
276            for (dst, kind, _) in deps {
277                if kind != "soft" {
278                    continue;
279                }
280                let Some(target) = targets.get(dst) else {
281                    continue;
282                };
283                if target.get("state").and_then(Value::as_str) == Some("archived") {
284                    continue;
285                }
286                if target.get("origin").and_then(Value::as_str) == Some("spark") {
287                    continue;
288                }
289                let e = candidates
290                    .entry(dst.clone())
291                    .or_insert_with(|| new_candidate(target));
292                e.sim_content = (e.sim_content + 0.05).min(1.0);
293            }
294        }
295        Ok(())
296    }
297
298    fn score_candidates(
299        &self,
300        candidates: HashMap<String, CandidateInfo>,
301        query: &str,
302        context_key: &str,
303        now: &str,
304    ) -> Result<Vec<(f64, Value)>> {
305        // Batch-fetch context scores for all candidates in one query
306        // (was one context_score lookup per candidate).
307        let cand_ids: Vec<String> = candidates
308            .values()
309            .filter_map(|info| {
310                info.chunk
311                    .get("id")
312                    .and_then(Value::as_str)
313                    .map(str::to_string)
314            })
315            .collect();
316        let cand_refs: Vec<&str> = cand_ids.iter().map(String::as_str).collect();
317        // 方案 D 与 recall 解耦:recall 恒用中性 Laplace 先验,不读 intuition.* 旋钮。
318        let ctx_scores = self.storage.context_scores_batch(
319            &cand_refs,
320            context_key,
321            RECALL_PRIOR_M,
322            RECALL_BASE_RATE,
323        )?;
324
325        let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
326        for info in candidates.into_values() {
327            let conf = info
328                .chunk
329                .get("confidence")
330                .and_then(Value::as_f64)
331                .unwrap_or(0.5);
332            let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
333            let context_score = ctx_scores.get(chunk_id).copied().unwrap_or(0.0);
334            // ACT-R base-level activation: recency × frequency from usage history.
335            // Zero for never-used chunks, so freshly-added knowledge is unaffected.
336            let used_count = info
337                .chunk
338                .get("used_count")
339                .and_then(Value::as_i64)
340                .unwrap_or(0);
341            let last_used_at = info.chunk.get("last_used_at").and_then(Value::as_str);
342            let activation = actr_activation(used_count, last_used_at, now);
343            let mut fused = self.w_content * info.sim_content as f64
344                + self.w_trigger * info.sim_trigger as f64
345                + self.w_lexical * info.sim_lexical as f64
346                + self.w_confidence * conf
347                + self.w_context * context_score
348                + self.w_activation * activation;
349            if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
350                fused *= PENDING_RECALL_PENALTY;
351            }
352            let anti = info
353                .chunk
354                .get("anti_trigger_desc")
355                .and_then(Value::as_str)
356                .unwrap_or("");
357            if !anti.is_empty() && anti_trigger_hit(query, anti) {
358                fused *= self.anti_trigger_penalty;
359            }
360            let mut chunk = info.chunk;
361            chunk["_context_score"] = json!(context_score);
362            chunk["_activation"] = json!(activation);
363            chunk["_sim_lexical"] = json!(info.sim_lexical);
364            chunk["_fused_score"] = json!(fused);
365            scored.push((fused, chunk));
366        }
367        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
368        scored.truncate(self.top_k_candidates);
369        Ok(scored)
370    }
371
372    /// Part (d) — reorder the scored shortlist by the injected reranker. Stable:
373    /// reranked ids move to the front in the reranker's order; anything the reranker
374    /// omits keeps its fused-relative position. Non-fatal — a reranker error or empty
375    /// result leaves `scored` untouched, so retrieval never depends on the LLM.
376    fn apply_rerank(&self, query: &str, scored: &mut [(f64, Value)]) {
377        let chunks: Vec<Value> = scored.iter().map(|(_, c)| c.clone()).collect();
378        let order = match self.reranker.rerank(query, &chunks) {
379            Ok(order) if !order.is_empty() => order,
380            _ => return,
381        };
382        let rank: HashMap<&str, usize> = order
383            .iter()
384            .enumerate()
385            .map(|(i, id)| (id.as_str(), i))
386            .collect();
387        scored.sort_by_key(|(_, c)| {
388            rank.get(c["id"].as_str().unwrap_or(""))
389                .copied()
390                .unwrap_or(usize::MAX)
391        });
392    }
393
394    fn pack(
395        &self,
396        scored: &[(f64, Value)],
397        budget: usize,
398        expand_deps: &str,
399        allow_trim: bool,
400        query: &str,
401    ) -> Result<PackResult> {
402        let mut selected: Vec<Value> = vec![];
403        let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
404        let mut skipped_reasons: HashMap<String, String> = HashMap::new();
405        let mut used_ids: HashSet<String> = HashSet::new();
406        let mut used_tokens: usize = 0;
407
408        for (fused, chunk) in scored {
409            let cid = chunk["id"].as_str().unwrap_or("").to_string();
410            if used_ids.contains(&cid) {
411                continue;
412            }
413
414            // Build block with dep expansion; fail-closed on dep issues.
415            let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
416            if let Some(reason) = dep_skip_reason {
417                skipped_reasons.insert(cid, reason);
418                continue;
419            }
420
421            let new_block: Vec<Value> = block
422                .iter()
423                .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
424                .cloned()
425                .collect();
426            let cost = block_cost(&new_block);
427
428            if used_tokens + cost <= budget {
429                for b in &block {
430                    let bid = b["id"].as_str().unwrap_or("").to_string();
431                    if !used_ids.contains(&bid) {
432                        let mut b = b.clone();
433                        b["_fused_score"] = json!(fused);
434                        selected.push(b);
435                        used_ids.insert(bid);
436                    }
437                }
438                used_tokens += cost;
439            } else if allow_trim {
440                // Attempt refiner trim — NullRefiner returns None (no-op).
441                if let Some(trimmed) =
442                    self.refiner
443                        .trim(&block, query, budget.saturating_sub(used_tokens))
444                {
445                    let trim_cost = block_cost(&trimmed);
446                    if used_tokens + trim_cost <= budget {
447                        for b in &trimmed {
448                            let bid = b["id"].as_str().unwrap_or("").to_string();
449                            if !used_ids.contains(&bid) {
450                                let mut b = b.clone();
451                                b["_fused_score"] = json!(fused);
452                                b["_trimmed"] = json!(true);
453                                selected.push(b);
454                                used_ids.insert(bid);
455                            }
456                        }
457                        used_tokens += trim_cost;
458                        continue;
459                    }
460                }
461                skipped.push((block, *fused, cost));
462            } else {
463                skipped.push((block, *fused, cost));
464            }
465        }
466        Ok((selected, skipped, skipped_reasons))
467    }
468
469    /// Expand a seed chunk into a block according to `expand_deps`.
470    /// Returns `(block, Some(skip_reason))` if the block should be discarded (fail-closed).
471    fn build_dep_block(
472        &self,
473        seed: &Value,
474        expand_deps: &str,
475    ) -> Result<(Vec<Value>, Option<String>)> {
476        if expand_deps == "false" || expand_deps.is_empty() {
477            return Ok((vec![seed.clone()], None));
478        }
479        let seed_id = seed["id"].as_str().unwrap_or("");
480        match expand_deps {
481            "direct" => {
482                let deps = self.storage.get_deps(seed_id)?;
483                let mut block = vec![seed.clone()];
484                for (dep_id, kind, _) in &deps {
485                    if kind != "hard" {
486                        continue;
487                    }
488                    match self.validate_hard_dep(dep_id)? {
489                        Some(chunk) => block.push(chunk),
490                        None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
491                    }
492                }
493                Ok((block, None))
494            }
495            "closure" => {
496                let mut block = vec![seed.clone()];
497                let mut visited: HashSet<String> = [seed_id.to_string()].into();
498                match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
499                    Some(reason) => Ok((vec![], Some(reason))),
500                    None => Ok((block, None)),
501                }
502            }
503            _ => Ok((vec![seed.clone()], None)),
504        }
505    }
506
507    /// Returns the chunk if the hard dep is usable, None if it should cause fail-closed.
508    fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
509        match self.storage.get_chunk(dep_id)? {
510            None => Ok(None),
511            Some(chunk) => {
512                let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
513                let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
514                let embed_v = chunk
515                    .get("embed_version")
516                    .and_then(Value::as_i64)
517                    .unwrap_or(0);
518                if state == "archived" || origin == "spark" || embed_v == 0 {
519                    Ok(None)
520                } else {
521                    Ok(Some(chunk))
522                }
523            }
524        }
525    }
526
527    /// BFS hard-dep expansion up to `max_depth`. Returns Some(reason) on fail-closed.
528    fn expand_hard_closure(
529        &self,
530        id: &str,
531        visited: &mut HashSet<String>,
532        block: &mut Vec<Value>,
533        depth: usize,
534        max_depth: usize,
535    ) -> Result<Option<String>> {
536        if depth >= max_depth {
537            return Ok(Some("dep_depth_limit".to_string()));
538        }
539        let deps = self.storage.get_deps(id)?;
540        for (dep_id, kind, _) in &deps {
541            if kind != "hard" {
542                continue;
543            }
544            if visited.contains(dep_id) {
545                continue;
546            } // cycle guard
547            visited.insert(dep_id.clone());
548            match self.validate_hard_dep(dep_id)? {
549                None => return Ok(Some("hard_dep_unavailable".to_string())),
550                Some(chunk) => {
551                    block.push(chunk);
552                    if let Some(reason) =
553                        self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
554                    {
555                        return Ok(Some(reason));
556                    }
557                }
558            }
559        }
560        Ok(None)
561    }
562
563    fn density_refill(
564        &self,
565        mut selected: Vec<Value>,
566        skipped: &[(Vec<Value>, f64, usize)],
567        budget: usize,
568    ) -> Vec<Value> {
569        let used_tokens = block_cost(&selected);
570        if used_tokens >= budget {
571            return selected;
572        }
573
574        let selected_ids: HashSet<String> = selected
575            .iter()
576            .filter_map(|c| c["id"].as_str().map(str::to_string))
577            .collect();
578
579        let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
580            .iter()
581            .filter_map(|(block, fscore, _)| {
582                let block: Vec<Value> = block
583                    .iter()
584                    .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
585                    .cloned()
586                    .collect();
587                if block.is_empty() {
588                    return None;
589                }
590                let cost = block_cost(&block);
591                let density = fscore / cost.max(1) as f64;
592                Some((density, block, cost))
593            })
594            .collect();
595        density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
596
597        let mut used_tokens = block_cost(&selected);
598        let mut added_ids: HashSet<String> = selected_ids;
599        for (_, block, cost) in density_items {
600            if used_tokens + cost <= budget {
601                for b in block {
602                    let bid = b["id"].as_str().unwrap_or("").to_string();
603                    if !added_ids.contains(&bid) {
604                        selected.push(b);
605                        added_ids.insert(bid);
606                    }
607                }
608                used_tokens += cost;
609            }
610        }
611        selected
612    }
613
614    fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
615        let embed_version = self
616            .storage
617            .get_meta("embed_version")?
618            .and_then(|v| v.parse::<i64>().ok())
619            .unwrap_or(1);
620
621        let content_res = self
622            .storage
623            .search_vec_content(q_content, self.top_k_candidates)?;
624        let trigger_res = self
625            .storage
626            .search_vec_trigger(q_trigger, self.top_k_candidates)?;
627
628        // Batch-fetch all candidate chunk IDs (mirrors the pattern in ann_candidates).
629        let all_ids: Vec<&str> = {
630            let mut seen = HashSet::new();
631            content_res
632                .iter()
633                .chain(trigger_res.iter())
634                .map(|(id, _)| id.as_str())
635                .filter(|id| seen.insert(*id))
636                .collect()
637        };
638        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
639
640        let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
641        for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
642            if let Some(chunk) = chunks.get(cid) {
643                if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
644                    continue;
645                }
646                if chunk.get("state").and_then(Value::as_str) == Some("archived") {
647                    continue;
648                }
649                let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
650                if maturity == "promoted" || maturity == "dropped" {
651                    continue;
652                }
653                let ev = chunk
654                    .get("embed_version")
655                    .and_then(Value::as_i64)
656                    .unwrap_or(1);
657                if ev < embed_version {
658                    continue;
659                }
660                let entry = spark_scores
661                    .entry(cid.clone())
662                    .or_insert_with(|| (*sim, chunk.clone()));
663                if *sim > entry.0 {
664                    *entry = (*sim, chunk.clone());
665                }
666            }
667        }
668        let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
669        sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
670        Ok(sparks
671            .into_iter()
672            .take(self.top_k_candidates)
673            .map(|(_, c)| c)
674            .collect())
675    }
676
677    #[allow(clippy::too_many_arguments)]
678    fn write_recall_trace(
679        &self,
680        trace_id: &str,
681        query: &str,
682        context_key: &str,
683        scored: &[(f64, Value)],
684        visible: &[Value],
685        sparks: &[Value],
686        depth_skipped: &[String],
687        skipped_reasons: &HashMap<String, String>,
688        refine_mode: &str,
689        source: &str,
690        now: &str,
691        session_only: bool,
692    ) -> Result<()> {
693        let lib_id = self.storage.lib_id()?;
694        // `selected` must strictly mean "entered the model context". Record the
695        // per-chunk retrieved/selected/refined events only when the result is
696        // actually surfaced: skip them for empty results (nothing surfaced) and
697        // for session-only recalls (daemon discards the knowledge). The episodic
698        // log is still written in both cases — an empty result as a terminal
699        // `known_none`/`discarded` row (no-answer telemetry, never `open`), a
700        // session recall as an `open` row for later record-correlation.
701        let is_empty = visible.is_empty() && sparks.is_empty();
702        let record_selection = !is_empty && !session_only;
703        self.storage.begin_immediate()?;
704        let result = (|| -> Result<()> {
705            if record_selection {
706                for (rank, (_, chunk)) in scored.iter().enumerate() {
707                    let cid = chunk["id"].as_str().unwrap_or("");
708                    let sim = chunk.get("_fused_score").and_then(Value::as_f64);
709                    // For dep-skipped seeds, record their skip reason as refine_mode.
710                    let rm = skipped_reasons
711                        .get(cid)
712                        .map(|r| format!("skipped:{r}"))
713                        .or_else(|| {
714                            if refine_mode != "off" && !refine_mode.is_empty() {
715                                Some(refine_mode.to_string())
716                            } else {
717                                None
718                            }
719                        });
720                    self.storage.insert_usage_trace(
721                        trace_id,
722                        Some(cid),
723                        "retrieved",
724                        1.0,
725                        sim,
726                        rm.as_deref(),
727                        None,
728                        Some((rank + 1) as i64),
729                        None,
730                        source,
731                        now,
732                    )?;
733                }
734                for (rank, chunk) in visible.iter().enumerate() {
735                    let cid = chunk["id"].as_str().unwrap_or("");
736                    self.storage.insert_usage_trace(
737                        trace_id,
738                        Some(cid),
739                        "selected",
740                        1.0,
741                        None,
742                        None,
743                        None,
744                        Some((rank + 1) as i64),
745                        None,
746                        source,
747                        now,
748                    )?;
749                    // Write 'refined' event for chunks that came through the trim path.
750                    if chunk
751                        .get("_trimmed")
752                        .and_then(Value::as_bool)
753                        .unwrap_or(false)
754                    {
755                        self.storage.insert_usage_trace(
756                            trace_id,
757                            Some(cid),
758                            "refined",
759                            1.0,
760                            None,
761                            Some("trim"),
762                            None,
763                            Some((rank + 1) as i64),
764                            None,
765                            source,
766                            now,
767                        )?;
768                    }
769                }
770                // Write 'retrieved' events for sparks (for recurring-spark count tracking).
771                for (rank, chunk) in sparks.iter().enumerate() {
772                    let cid = chunk["id"].as_str().unwrap_or("");
773                    self.storage.insert_usage_trace(
774                        trace_id,
775                        Some(cid),
776                        "retrieved",
777                        1.0,
778                        None,
779                        Some("spark"),
780                        None,
781                        Some((rank + 1) as i64),
782                        None,
783                        source,
784                        now,
785                    )?;
786                }
787            }
788            // The snapshot mirrors what was surfaced: empty for known_none and
789            // session-only recalls so no chunk is credited with a selection.
790            let snapshot = json!({
791                "retrieved": if record_selection { scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
792                "selected": if record_selection { visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
793                "sparks": if record_selection { sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
794                "depth_skipped": depth_skipped,
795                "skipped_reasons": skipped_reasons,
796                "session_only": session_only,
797            });
798            // Empty recall → terminal known_none/discarded (kept out of the `open`
799            // pool that feeds trace-completion stats). Otherwise `open` for the
800            // record() outcome transition (incl. session-only daemon traces).
801            // `usage_state='known_none'` is the no-answer signal; `task_state`
802            // stays 'recalled' (both bounded by schema CHECK constraints).
803            let (usage_state, distill_state) = if is_empty {
804                ("known_none", "discarded")
805            } else {
806                ("unknown", "open")
807            };
808            let log = EpisodicLogRow {
809                id: gen_uuid(),
810                trace_id: trace_id.to_string(),
811                lib_id,
812                ts: now.to_string(),
813                query: Some(query.to_string()),
814                recall_snapshot: Some(snapshot.to_string()),
815                event_source: source.to_string(),
816                agent: agent_source(),
817                task_state: "recalled".to_string(),
818                usage_state: usage_state.to_string(),
819                context_key: Some(context_key.to_string()),
820                distill_state: distill_state.to_string(),
821                ..Default::default()
822            };
823            self.storage.upsert_episodic_log(&log)?;
824            self.storage.commit()
825        })();
826        if result.is_err() {
827            let _ = self.storage.rollback();
828        }
829        result
830    }
831}