Skip to main content

innate_core/kb/
recall.rs

1use super::*;
2
3/// Parameters for [`KnowledgeBase::recall`].
4///
5/// Borrowed, `Default`-able: construct with `RecallParams { query, budget, source, ..Default::default() }`.
6/// Empty-string defaults are normalized inside `recall`: `expand_deps` empty → `"false"`,
7/// `refine_mode` empty → `"off"`.
8#[derive(Debug, Clone, Default)]
9pub struct RecallParams<'a> {
10    pub query: &'a str,
11    pub budget: usize,
12    pub trace: bool,
13    pub include_sparks: bool,
14    pub top: Option<usize>,
15    pub source: &'a str,
16    pub expand_deps: &'a str, // "false" | "direct" | "closure"
17    pub allow_trim: bool,     // if true, invoke Refiner::trim when block doesn't fit
18    pub refine_mode: &'a str, // "off" | "trim" | "adapt" — recorded in trace
19    /// Relevance gate: drop candidates whose fused score is below this value
20    /// **before** packing/trace, so the trace only records knowledge that was
21    /// actually surfaced. `None` disables the gate. Used by always-on hooks
22    /// (UserPromptSubmit / SessionStart) to stay high-frequency without noise.
23    pub min_score: Option<f64>,
24    /// Session trace mode: open an episodic log for later record-correlation but
25    /// write **no** per-chunk `retrieved`/`selected` usage events. Used by the
26    /// daemon, which recalls only to obtain a `trace_id` and discards the
27    /// knowledge without ever placing it in a model context. `selected` must
28    /// strictly mean "entered the model context", so a caller that does not
29    /// inject the result must set this. Defaults to `false` (full injection).
30    pub session_only: bool,
31}
32
33impl KnowledgeBase {
34    pub fn recall(&self, params: RecallParams<'_>) -> Result<RecallResult> {
35        let RecallParams {
36            query,
37            budget,
38            trace,
39            include_sparks,
40            top,
41            source,
42            expand_deps,
43            allow_trim,
44            refine_mode,
45            min_score,
46            session_only,
47        } = params;
48        let expand_deps = if expand_deps.is_empty() {
49            "false"
50        } else {
51            expand_deps
52        };
53        let refine_mode = if refine_mode.is_empty() {
54            "off"
55        } else {
56            refine_mode
57        };
58        validate_source(source)?;
59        let trace_id = gen_uuid();
60        let now = utc_now_iso();
61
62        // Calibration path: derive the context_key from a Situation. A bare query degrades
63        // exactly to the legacy `content_hash(normalize_query(query))`, so recall stays
64        // zero-regression while sharing one key derivation with appraise (Spec §2.2).
65        let situation = Situation::from_query(query);
66        let context_key = situation.context_key(&self.situation_coarse_keys);
67
68        let (q_content, q_trigger) = self
69            .embedding
70            .embed_both(query)
71            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
72
73        // ANN candidates (non-spark)
74        let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
75        self.apply_soft_dep_bonus(&mut candidates)?;
76
77        // Score + anti-trigger penalty
78        let mut scored = self.score_candidates(candidates, query, &context_key, &now)?;
79
80        // Relevance gate — drop sub-threshold candidates before packing/trace so the
81        // trace records only what was actually surfaced (keeps selected→used stats clean).
82        if let Some(min) = min_score {
83            scored.retain(|(fused, _)| *fused >= min);
84        }
85
86        // First-fit pack with dep expansion
87        let (selected, skipped, skipped_reasons) =
88            self.pack(&scored, budget, expand_deps, allow_trim, query)?;
89
90        let depth_skipped: Vec<String> = skipped_reasons
91            .iter()
92            .filter(|(_, r)| r.as_str() == "dep_depth_limit")
93            .map(|(id, _)| id.clone())
94            .collect();
95
96        // Density refill
97        let mut selected = selected;
98        if self.density_refill {
99            selected = self.density_refill(selected, &skipped, budget);
100        }
101
102        let limited = limit_knowledge(selected, top);
103        let visible = if refine_mode == "adapt" {
104            self.refiner
105                .refine(limited.clone(), Some(budget))
106                .unwrap_or(limited)
107        } else {
108            limited
109        };
110
111        // Sparks
112        let sparks = if include_sparks {
113            self.recall_sparks(&q_content, &q_trigger)?
114        } else {
115            vec![]
116        };
117
118        if trace {
119            self.write_recall_trace(
120                &trace_id,
121                query,
122                &context_key,
123                &scored,
124                &visible,
125                &sparks,
126                &depth_skipped,
127                &skipped_reasons,
128                refine_mode,
129                source,
130                &now,
131                session_only,
132            )?;
133        }
134
135        let empty = visible.is_empty() && sparks.is_empty();
136        Ok(RecallResult {
137            knowledge: visible,
138            sparks,
139            trace_id,
140            empty,
141            depth_skipped,
142            skipped_reasons,
143        })
144    }
145
146    pub(super) fn ann_candidates(
147        &self,
148        q_content: &[f32],
149        q_trigger: &[f32],
150    ) -> Result<HashMap<String, CandidateInfo>> {
151        let embed_version = self
152            .storage
153            .get_meta("embed_version")?
154            .and_then(|v| v.parse::<i64>().ok())
155            .unwrap_or(1);
156
157        let content_res = self
158            .storage
159            .search_vec_content(q_content, self.top_k_candidates * 2)?;
160        let trigger_res = self
161            .storage
162            .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
163
164        // Collect unique ids and batch-fetch all chunks in two queries instead of N individual ones.
165        let all_ids: Vec<&str> = {
166            let mut seen = HashSet::new();
167            content_res
168                .iter()
169                .chain(trigger_res.iter())
170                .map(|(id, _)| id.as_str())
171                .filter(|id| seen.insert(*id))
172                .collect()
173        };
174        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
175
176        let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
177
178        for (cid, sim) in &content_res {
179            if let Some(chunk) = chunks.get(cid) {
180                if chunk_is_valid_for_recall(chunk, embed_version) {
181                    let e = candidates
182                        .entry(cid.clone())
183                        .or_insert_with(|| CandidateInfo {
184                            chunk: chunk.clone(),
185                            sim_content: 0.0,
186                            sim_trigger: 0.0,
187                        });
188                    e.sim_content = e.sim_content.max(*sim);
189                }
190            }
191        }
192        for (cid, sim) in &trigger_res {
193            if let Some(chunk) = chunks.get(cid) {
194                if chunk_is_valid_for_recall(chunk, embed_version) {
195                    let e = candidates
196                        .entry(cid.clone())
197                        .or_insert_with(|| CandidateInfo {
198                            chunk: chunk.clone(),
199                            sim_content: 0.0,
200                            sim_trigger: 0.0,
201                        });
202                    e.sim_trigger = e.sim_trigger.max(*sim);
203                }
204            }
205        }
206        Ok(candidates)
207    }
208
209    pub(super) fn apply_soft_dep_bonus(
210        &self,
211        candidates: &mut HashMap<String, CandidateInfo>,
212    ) -> Result<()> {
213        // Collect non-spark candidate ids and batch-fetch their outgoing deps
214        // in a single query (was one get_deps per candidate).
215        let src_ids: Vec<String> = candidates
216            .iter()
217            .filter(|(_, info)| info.chunk.get("origin").and_then(Value::as_str) != Some("spark"))
218            .map(|(cid, _)| cid.clone())
219            .collect();
220        if src_ids.is_empty() {
221            return Ok(());
222        }
223        let src_refs: Vec<&str> = src_ids.iter().map(String::as_str).collect();
224        let deps_map = self.storage.get_deps_batch(&src_refs)?;
225
226        // Gather distinct soft-dep targets and batch-fetch them in one query
227        // (was one get_chunk per soft edge).
228        let mut target_ids: Vec<String> = Vec::new();
229        let mut seen: HashSet<String> = HashSet::new();
230        for deps in deps_map.values() {
231            for (dst, kind, _) in deps {
232                if kind == "soft" && seen.insert(dst.clone()) {
233                    target_ids.push(dst.clone());
234                }
235            }
236        }
237        if target_ids.is_empty() {
238            return Ok(());
239        }
240        let target_refs: Vec<&str> = target_ids.iter().map(String::as_str).collect();
241        let targets = self.storage.get_chunks_by_ids(&target_refs)?;
242
243        for src in &src_ids {
244            let Some(deps) = deps_map.get(src) else {
245                continue;
246            };
247            for (dst, kind, _) in deps {
248                if kind != "soft" {
249                    continue;
250                }
251                let Some(target) = targets.get(dst) else {
252                    continue;
253                };
254                if target.get("state").and_then(Value::as_str) == Some("archived") {
255                    continue;
256                }
257                if target.get("origin").and_then(Value::as_str) == Some("spark") {
258                    continue;
259                }
260                let e = candidates
261                    .entry(dst.clone())
262                    .or_insert_with(|| CandidateInfo {
263                        chunk: target.clone(),
264                        sim_content: 0.0,
265                        sim_trigger: 0.0,
266                    });
267                e.sim_content = (e.sim_content + 0.05).min(1.0);
268            }
269        }
270        Ok(())
271    }
272
273    fn score_candidates(
274        &self,
275        candidates: HashMap<String, CandidateInfo>,
276        query: &str,
277        context_key: &str,
278        now: &str,
279    ) -> Result<Vec<(f64, Value)>> {
280        // Batch-fetch context scores for all candidates in one query
281        // (was one context_score lookup per candidate).
282        let cand_ids: Vec<String> = candidates
283            .values()
284            .filter_map(|info| {
285                info.chunk
286                    .get("id")
287                    .and_then(Value::as_str)
288                    .map(str::to_string)
289            })
290            .collect();
291        let cand_refs: Vec<&str> = cand_ids.iter().map(String::as_str).collect();
292        // 方案 D 与 recall 解耦:recall 恒用中性 Laplace 先验,不读 intuition.* 旋钮。
293        let ctx_scores = self.storage.context_scores_batch(
294            &cand_refs,
295            context_key,
296            RECALL_PRIOR_M,
297            RECALL_BASE_RATE,
298        )?;
299
300        let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
301        for info in candidates.into_values() {
302            let conf = info
303                .chunk
304                .get("confidence")
305                .and_then(Value::as_f64)
306                .unwrap_or(0.5);
307            let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
308            let context_score = ctx_scores.get(chunk_id).copied().unwrap_or(0.0);
309            // ACT-R base-level activation: recency × frequency from usage history.
310            // Zero for never-used chunks, so freshly-added knowledge is unaffected.
311            let used_count = info
312                .chunk
313                .get("used_count")
314                .and_then(Value::as_i64)
315                .unwrap_or(0);
316            let last_used_at = info.chunk.get("last_used_at").and_then(Value::as_str);
317            let activation = actr_activation(used_count, last_used_at, now);
318            let mut fused = self.w_content * info.sim_content as f64
319                + self.w_trigger * info.sim_trigger as f64
320                + self.w_confidence * conf
321                + self.w_context * context_score
322                + self.w_activation * activation;
323            if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
324                fused *= PENDING_RECALL_PENALTY;
325            }
326            let anti = info
327                .chunk
328                .get("anti_trigger_desc")
329                .and_then(Value::as_str)
330                .unwrap_or("");
331            if !anti.is_empty() && anti_trigger_hit(query, anti) {
332                fused *= self.anti_trigger_penalty;
333            }
334            let mut chunk = info.chunk;
335            chunk["_context_score"] = json!(context_score);
336            chunk["_activation"] = json!(activation);
337            chunk["_fused_score"] = json!(fused);
338            scored.push((fused, chunk));
339        }
340        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
341        scored.truncate(self.top_k_candidates);
342        Ok(scored)
343    }
344
345    fn pack(
346        &self,
347        scored: &[(f64, Value)],
348        budget: usize,
349        expand_deps: &str,
350        allow_trim: bool,
351        query: &str,
352    ) -> Result<PackResult> {
353        let mut selected: Vec<Value> = vec![];
354        let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
355        let mut skipped_reasons: HashMap<String, String> = HashMap::new();
356        let mut used_ids: HashSet<String> = HashSet::new();
357        let mut used_tokens: usize = 0;
358
359        for (fused, chunk) in scored {
360            let cid = chunk["id"].as_str().unwrap_or("").to_string();
361            if used_ids.contains(&cid) {
362                continue;
363            }
364
365            // Build block with dep expansion; fail-closed on dep issues.
366            let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
367            if let Some(reason) = dep_skip_reason {
368                skipped_reasons.insert(cid, reason);
369                continue;
370            }
371
372            let new_block: Vec<Value> = block
373                .iter()
374                .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
375                .cloned()
376                .collect();
377            let cost = block_cost(&new_block);
378
379            if used_tokens + cost <= budget {
380                for b in &block {
381                    let bid = b["id"].as_str().unwrap_or("").to_string();
382                    if !used_ids.contains(&bid) {
383                        let mut b = b.clone();
384                        b["_fused_score"] = json!(fused);
385                        selected.push(b);
386                        used_ids.insert(bid);
387                    }
388                }
389                used_tokens += cost;
390            } else if allow_trim {
391                // Attempt refiner trim — NullRefiner returns None (no-op).
392                if let Some(trimmed) =
393                    self.refiner
394                        .trim(&block, query, budget.saturating_sub(used_tokens))
395                {
396                    let trim_cost = block_cost(&trimmed);
397                    if used_tokens + trim_cost <= budget {
398                        for b in &trimmed {
399                            let bid = b["id"].as_str().unwrap_or("").to_string();
400                            if !used_ids.contains(&bid) {
401                                let mut b = b.clone();
402                                b["_fused_score"] = json!(fused);
403                                b["_trimmed"] = json!(true);
404                                selected.push(b);
405                                used_ids.insert(bid);
406                            }
407                        }
408                        used_tokens += trim_cost;
409                        continue;
410                    }
411                }
412                skipped.push((block, *fused, cost));
413            } else {
414                skipped.push((block, *fused, cost));
415            }
416        }
417        Ok((selected, skipped, skipped_reasons))
418    }
419
420    /// Expand a seed chunk into a block according to `expand_deps`.
421    /// Returns `(block, Some(skip_reason))` if the block should be discarded (fail-closed).
422    fn build_dep_block(
423        &self,
424        seed: &Value,
425        expand_deps: &str,
426    ) -> Result<(Vec<Value>, Option<String>)> {
427        if expand_deps == "false" || expand_deps.is_empty() {
428            return Ok((vec![seed.clone()], None));
429        }
430        let seed_id = seed["id"].as_str().unwrap_or("");
431        match expand_deps {
432            "direct" => {
433                let deps = self.storage.get_deps(seed_id)?;
434                let mut block = vec![seed.clone()];
435                for (dep_id, kind, _) in &deps {
436                    if kind != "hard" {
437                        continue;
438                    }
439                    match self.validate_hard_dep(dep_id)? {
440                        Some(chunk) => block.push(chunk),
441                        None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
442                    }
443                }
444                Ok((block, None))
445            }
446            "closure" => {
447                let mut block = vec![seed.clone()];
448                let mut visited: HashSet<String> = [seed_id.to_string()].into();
449                match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
450                    Some(reason) => Ok((vec![], Some(reason))),
451                    None => Ok((block, None)),
452                }
453            }
454            _ => Ok((vec![seed.clone()], None)),
455        }
456    }
457
458    /// Returns the chunk if the hard dep is usable, None if it should cause fail-closed.
459    fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
460        match self.storage.get_chunk(dep_id)? {
461            None => Ok(None),
462            Some(chunk) => {
463                let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
464                let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
465                let embed_v = chunk
466                    .get("embed_version")
467                    .and_then(Value::as_i64)
468                    .unwrap_or(0);
469                if state == "archived" || origin == "spark" || embed_v == 0 {
470                    Ok(None)
471                } else {
472                    Ok(Some(chunk))
473                }
474            }
475        }
476    }
477
478    /// BFS hard-dep expansion up to `max_depth`. Returns Some(reason) on fail-closed.
479    fn expand_hard_closure(
480        &self,
481        id: &str,
482        visited: &mut HashSet<String>,
483        block: &mut Vec<Value>,
484        depth: usize,
485        max_depth: usize,
486    ) -> Result<Option<String>> {
487        if depth >= max_depth {
488            return Ok(Some("dep_depth_limit".to_string()));
489        }
490        let deps = self.storage.get_deps(id)?;
491        for (dep_id, kind, _) in &deps {
492            if kind != "hard" {
493                continue;
494            }
495            if visited.contains(dep_id) {
496                continue;
497            } // cycle guard
498            visited.insert(dep_id.clone());
499            match self.validate_hard_dep(dep_id)? {
500                None => return Ok(Some("hard_dep_unavailable".to_string())),
501                Some(chunk) => {
502                    block.push(chunk);
503                    if let Some(reason) =
504                        self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
505                    {
506                        return Ok(Some(reason));
507                    }
508                }
509            }
510        }
511        Ok(None)
512    }
513
514    fn density_refill(
515        &self,
516        mut selected: Vec<Value>,
517        skipped: &[(Vec<Value>, f64, usize)],
518        budget: usize,
519    ) -> Vec<Value> {
520        let used_tokens = block_cost(&selected);
521        if used_tokens >= budget {
522            return selected;
523        }
524
525        let selected_ids: HashSet<String> = selected
526            .iter()
527            .filter_map(|c| c["id"].as_str().map(str::to_string))
528            .collect();
529
530        let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
531            .iter()
532            .filter_map(|(block, fscore, _)| {
533                let block: Vec<Value> = block
534                    .iter()
535                    .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
536                    .cloned()
537                    .collect();
538                if block.is_empty() {
539                    return None;
540                }
541                let cost = block_cost(&block);
542                let density = fscore / cost.max(1) as f64;
543                Some((density, block, cost))
544            })
545            .collect();
546        density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
547
548        let mut used_tokens = block_cost(&selected);
549        let mut added_ids: HashSet<String> = selected_ids;
550        for (_, block, cost) in density_items {
551            if used_tokens + cost <= budget {
552                for b in block {
553                    let bid = b["id"].as_str().unwrap_or("").to_string();
554                    if !added_ids.contains(&bid) {
555                        selected.push(b);
556                        added_ids.insert(bid);
557                    }
558                }
559                used_tokens += cost;
560            }
561        }
562        selected
563    }
564
565    fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
566        let embed_version = self
567            .storage
568            .get_meta("embed_version")?
569            .and_then(|v| v.parse::<i64>().ok())
570            .unwrap_or(1);
571
572        let content_res = self
573            .storage
574            .search_vec_content(q_content, self.top_k_candidates)?;
575        let trigger_res = self
576            .storage
577            .search_vec_trigger(q_trigger, self.top_k_candidates)?;
578
579        // Batch-fetch all candidate chunk IDs (mirrors the pattern in ann_candidates).
580        let all_ids: Vec<&str> = {
581            let mut seen = HashSet::new();
582            content_res
583                .iter()
584                .chain(trigger_res.iter())
585                .map(|(id, _)| id.as_str())
586                .filter(|id| seen.insert(*id))
587                .collect()
588        };
589        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
590
591        let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
592        for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
593            if let Some(chunk) = chunks.get(cid) {
594                if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
595                    continue;
596                }
597                if chunk.get("state").and_then(Value::as_str) == Some("archived") {
598                    continue;
599                }
600                let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
601                if maturity == "promoted" || maturity == "dropped" {
602                    continue;
603                }
604                let ev = chunk
605                    .get("embed_version")
606                    .and_then(Value::as_i64)
607                    .unwrap_or(1);
608                if ev < embed_version {
609                    continue;
610                }
611                let entry = spark_scores
612                    .entry(cid.clone())
613                    .or_insert_with(|| (*sim, chunk.clone()));
614                if *sim > entry.0 {
615                    *entry = (*sim, chunk.clone());
616                }
617            }
618        }
619        let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
620        sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
621        Ok(sparks
622            .into_iter()
623            .take(self.top_k_candidates)
624            .map(|(_, c)| c)
625            .collect())
626    }
627
628    #[allow(clippy::too_many_arguments)]
629    fn write_recall_trace(
630        &self,
631        trace_id: &str,
632        query: &str,
633        context_key: &str,
634        scored: &[(f64, Value)],
635        visible: &[Value],
636        sparks: &[Value],
637        depth_skipped: &[String],
638        skipped_reasons: &HashMap<String, String>,
639        refine_mode: &str,
640        source: &str,
641        now: &str,
642        session_only: bool,
643    ) -> Result<()> {
644        let lib_id = self.storage.lib_id()?;
645        // `selected` must strictly mean "entered the model context". Record the
646        // per-chunk retrieved/selected/refined events only when the result is
647        // actually surfaced: skip them for empty results (nothing surfaced) and
648        // for session-only recalls (daemon discards the knowledge). The episodic
649        // log is still written in both cases — an empty result as a terminal
650        // `known_none`/`discarded` row (no-answer telemetry, never `open`), a
651        // session recall as an `open` row for later record-correlation.
652        let is_empty = visible.is_empty() && sparks.is_empty();
653        let record_selection = !is_empty && !session_only;
654        self.storage.begin_immediate()?;
655        let result = (|| -> Result<()> {
656            if record_selection {
657                for (rank, (_, chunk)) in scored.iter().enumerate() {
658                    let cid = chunk["id"].as_str().unwrap_or("");
659                    let sim = chunk.get("_fused_score").and_then(Value::as_f64);
660                    // For dep-skipped seeds, record their skip reason as refine_mode.
661                    let rm = skipped_reasons
662                        .get(cid)
663                        .map(|r| format!("skipped:{r}"))
664                        .or_else(|| {
665                            if refine_mode != "off" && !refine_mode.is_empty() {
666                                Some(refine_mode.to_string())
667                            } else {
668                                None
669                            }
670                        });
671                    self.storage.insert_usage_trace(
672                        trace_id,
673                        Some(cid),
674                        "retrieved",
675                        1.0,
676                        sim,
677                        rm.as_deref(),
678                        None,
679                        Some((rank + 1) as i64),
680                        None,
681                        source,
682                        now,
683                    )?;
684                }
685                for (rank, chunk) in visible.iter().enumerate() {
686                    let cid = chunk["id"].as_str().unwrap_or("");
687                    self.storage.insert_usage_trace(
688                        trace_id,
689                        Some(cid),
690                        "selected",
691                        1.0,
692                        None,
693                        None,
694                        None,
695                        Some((rank + 1) as i64),
696                        None,
697                        source,
698                        now,
699                    )?;
700                    // Write 'refined' event for chunks that came through the trim path.
701                    if chunk
702                        .get("_trimmed")
703                        .and_then(Value::as_bool)
704                        .unwrap_or(false)
705                    {
706                        self.storage.insert_usage_trace(
707                            trace_id,
708                            Some(cid),
709                            "refined",
710                            1.0,
711                            None,
712                            Some("trim"),
713                            None,
714                            Some((rank + 1) as i64),
715                            None,
716                            source,
717                            now,
718                        )?;
719                    }
720                }
721                // Write 'retrieved' events for sparks (for recurring-spark count tracking).
722                for (rank, chunk) in sparks.iter().enumerate() {
723                    let cid = chunk["id"].as_str().unwrap_or("");
724                    self.storage.insert_usage_trace(
725                        trace_id,
726                        Some(cid),
727                        "retrieved",
728                        1.0,
729                        None,
730                        Some("spark"),
731                        None,
732                        Some((rank + 1) as i64),
733                        None,
734                        source,
735                        now,
736                    )?;
737                }
738            }
739            // The snapshot mirrors what was surfaced: empty for known_none and
740            // session-only recalls so no chunk is credited with a selection.
741            let snapshot = json!({
742                "retrieved": if record_selection { scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
743                "selected": if record_selection { visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
744                "sparks": if record_selection { sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
745                "depth_skipped": depth_skipped,
746                "skipped_reasons": skipped_reasons,
747                "session_only": session_only,
748            });
749            // Empty recall → terminal known_none/discarded (kept out of the `open`
750            // pool that feeds trace-completion stats). Otherwise `open` for the
751            // record() outcome transition (incl. session-only daemon traces).
752            // `usage_state='known_none'` is the no-answer signal; `task_state`
753            // stays 'recalled' (both bounded by schema CHECK constraints).
754            let (usage_state, distill_state) = if is_empty {
755                ("known_none", "discarded")
756            } else {
757                ("unknown", "open")
758            };
759            let log = EpisodicLogRow {
760                id: gen_uuid(),
761                trace_id: trace_id.to_string(),
762                lib_id,
763                ts: now.to_string(),
764                query: Some(query.to_string()),
765                recall_snapshot: Some(snapshot.to_string()),
766                event_source: source.to_string(),
767                task_state: "recalled".to_string(),
768                usage_state: usage_state.to_string(),
769                context_key: Some(context_key.to_string()),
770                distill_state: distill_state.to_string(),
771                ..Default::default()
772            };
773            self.storage.upsert_episodic_log(&log)?;
774            self.storage.commit()
775        })();
776        if result.is_err() {
777            let _ = self.storage.rollback();
778        }
779        result
780    }
781}