Skip to main content

innate_core/kb/
inspection.rs

1use super::*;
2
3impl KnowledgeBase {
4    pub fn inspect(&self) -> Result<Value> {
5        let total: i64 = count_query(
6            &self.storage,
7            "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
8        )?;
9        let active: i64 = count_query(
10            &self.storage,
11            "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
12        )?;
13        let pending: i64 = count_query(
14            &self.storage,
15            "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
16        )?;
17        let archived: i64 = count_query(
18            &self.storage,
19            "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
20        )?;
21        let sparks: i64 = count_query(
22            &self.storage,
23            "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
24        )?;
25        let open_logs: i64 = count_query(
26            &self.storage,
27            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
28        )?;
29        let new_logs: i64 = count_query(
30            &self.storage,
31            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
32        )?;
33        let embed_rebuild: i64 = count_query(&self.storage,
34            "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
35        let schema_version = self.storage.get_meta_or("schema_version", "?");
36        let lib_id = self.storage.get_meta_or("lib_id", "?");
37        let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
38
39        let metric_window_start = days_ago(&utc_now_iso(), 30);
40        let trace_metrics = self.storage.query_chunks_params(
41            "SELECT COUNT(*) AS total,
42                    SUM(CASE WHEN task_state='completed' THEN 1 ELSE 0 END) AS completed,
43                    SUM(CASE WHEN task_state='timed_out' THEN 1 ELSE 0 END) AS timed_out,
44                    SUM(CASE WHEN task_state='completed' AND usage_state!='unknown'
45                             THEN 1 ELSE 0 END) AS usage_known,
46                    SUM(CASE WHEN task_state='completed' AND usage_state='known_some'
47                             THEN 1 ELSE 0 END) AS usage_some,
48                    SUM(CASE WHEN task_state='completed'
49                                  AND outcome IN ('ok','fail')
50                             THEN 1 ELSE 0 END) AS outcome_known,
51                    SUM(CASE WHEN outcome='ok' THEN 1 ELSE 0 END) AS succeeded
52             FROM episodic_log WHERE ts >= ?",
53            rusqlite::params![metric_window_start],
54        )?;
55        let trace_row = trace_metrics.first();
56        let trace_total = trace_row
57            .and_then(|row| row.get("total"))
58            .and_then(Value::as_i64)
59            .unwrap_or(0);
60        let trace_completed = trace_row
61            .and_then(|row| row.get("completed"))
62            .and_then(Value::as_i64)
63            .unwrap_or(0);
64        let trace_timed_out = trace_row
65            .and_then(|row| row.get("timed_out"))
66            .and_then(Value::as_i64)
67            .unwrap_or(0);
68        let usage_known = trace_row
69            .and_then(|row| row.get("usage_known"))
70            .and_then(Value::as_i64)
71            .unwrap_or(0);
72        let usage_some = trace_row
73            .and_then(|row| row.get("usage_some"))
74            .and_then(Value::as_i64)
75            .unwrap_or(0);
76        let succeeded = trace_row
77            .and_then(|row| row.get("succeeded"))
78            .and_then(Value::as_i64)
79            .unwrap_or(0);
80        let outcome_known = trace_row
81            .and_then(|row| row.get("outcome_known"))
82            .and_then(Value::as_i64)
83            .unwrap_or(0);
84        let usage_rows = self.storage.query_chunks_params(
85            "SELECT recall_snapshot, used_ids FROM episodic_log
86             WHERE task_state='completed'
87               AND usage_state!='unknown' AND used_complete=1
88               AND recall_snapshot IS NOT NULL AND used_ids IS NOT NULL
89               AND ts >= ?",
90            rusqlite::params![metric_window_start],
91        )?;
92        let mut selected_total = 0_i64;
93        let mut selected_used = 0_i64;
94        for row in usage_rows {
95            let selected: HashSet<String> = row
96                .get("recall_snapshot")
97                .and_then(Value::as_str)
98                .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
99                .and_then(|snapshot| snapshot.get("selected").cloned())
100                .and_then(|value| serde_json::from_value::<Vec<String>>(value).ok())
101                .unwrap_or_default()
102                .into_iter()
103                .collect();
104            let used: HashSet<String> = row
105                .get("used_ids")
106                .and_then(Value::as_str)
107                .and_then(|raw| serde_json::from_str::<Vec<String>>(raw).ok())
108                .unwrap_or_default()
109                .into_iter()
110                .collect();
111            selected_total += selected.len() as i64;
112            selected_used += selected.intersection(&used).count() as i64;
113        }
114        let feedback_count = count_query_params(
115            &self.storage,
116            "SELECT COUNT(*) FROM feedback_events WHERE ts >= ?",
117            rusqlite::params![metric_window_start],
118        )?;
119        let feedback_traces = count_query_params(
120            &self.storage,
121            "SELECT COUNT(DISTINCT f.trace_id)
122             FROM feedback_events f
123             JOIN episodic_log e ON e.trace_id=f.trace_id
124             WHERE f.ts >= ? AND e.ts >= ? AND e.task_state='completed'",
125            rusqlite::params![metric_window_start, metric_window_start],
126        )?;
127        let pending_evolve = count_query(
128            &self.storage,
129            "SELECT COUNT(*) FROM evolve_requests WHERE state IN ('pending','running')",
130        )?;
131        let governance_pending = count_query(
132            &self.storage,
133            "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
134        )?;
135        let failed_evolve = count_query_params(
136            &self.storage,
137            "SELECT COUNT(*) FROM evolve_requests
138             WHERE last_failed_at >= ?",
139            rusqlite::params![metric_window_start],
140        )?;
141        let failed_distill = count_query_params(
142            &self.storage,
143            "SELECT COUNT(*) FROM episodic_log
144             WHERE distill_last_failed_at >= ?",
145            rusqlite::params![metric_window_start],
146        )?;
147        let confidence_buckets = self.storage.query_chunks(&format!(
148            "SELECT
149               SUM(CASE WHEN confidence < 0.25 THEN 1 ELSE 0 END) AS low,
150               SUM(CASE WHEN confidence >= 0.25 AND confidence < {0} THEN 1 ELSE 0 END) AS medium,
151               SUM(CASE WHEN confidence >= {0} THEN 1 ELSE 0 END) AS high
152             FROM chunks WHERE origin!='spark' AND state!='archived'",
153            self.promote_confidence_min
154        ))?;
155        let confidence_row = confidence_buckets.first();
156
157        // P3-A: oldest pending chunk timestamp — surfaces long-lived pending debt.
158        let pending_oldest_ts = self.storage.query_chunks(
159            "SELECT MIN(created_at) AS oldest FROM chunks WHERE state='pending' AND origin!='spark'",
160        )?.into_iter().next()
161            .and_then(|r| r.get("oldest").cloned())
162            .and_then(|v| if v.is_null() { None } else { Some(v) });
163
164        // Health signal 1: knowledge debt ratio.
165        // Zombie = active chunks with middling confidence (stuck, neither good nor bad)
166        // that are at least 14d old and have been used at least once.
167        // "never-recalled old" chunks are handled by curate 3c (never_used archive).
168        let zombie_cutoff = days_ago(&utc_now_iso(), 14);
169        let zombie: i64 = count_query_params(
170            &self.storage,
171            "SELECT COUNT(*) FROM chunks
172             WHERE origin!='spark' AND state='active'
173               AND confidence >= 0.4 AND confidence <= 0.6
174               AND last_used_at IS NOT NULL
175               AND created_at < ?",
176            rusqlite::params![zombie_cutoff],
177        )?;
178        let debt_numerator = pending + zombie;
179        let debt_denominator = active.max(1);
180        let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
181
182        // Health signal 5: stale screening count
183        let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
184        let stale_screening: i64 = count_query_params(
185            &self.storage,
186            "SELECT COUNT(*) FROM episodic_log
187             WHERE distill_state='screening' AND distill_locked_at < ?",
188            rusqlite::params![screening_cutoff],
189        )?;
190
191        // Health signal 4: actual Distill cost within the configured rolling window.
192        let distill_period_start = self.distill_token_period_start(&utc_now_iso())?;
193        let distill_cost = self.storage.query_chunks_params(
194            "SELECT COALESCE(SUM(prompt_tokens),0) AS pt,
195                    COALESCE(SUM(completion_tokens),0) AS ct
196             FROM distill_token_usage
197             WHERE accounted_at >= ?",
198            rusqlite::params![distill_period_start],
199        )?;
200        let prompt_tokens = distill_cost
201            .first()
202            .and_then(|r| r.get("pt"))
203            .and_then(Value::as_i64)
204            .unwrap_or(0);
205        let completion_tokens = distill_cost
206            .first()
207            .and_then(|r| r.get("ct"))
208            .and_then(Value::as_i64)
209            .unwrap_or(0);
210
211        // Health signal 2: sparks that have been recalled often (soft incubation threshold = 5)
212        let spark_threshold: i64 = self
213            .storage
214            .get_meta("curate.soft_mature_threshold")
215            .ok()
216            .flatten()
217            .and_then(|v| v.parse::<i64>().ok())
218            .unwrap_or(5);
219        let recurring_sparks = self.storage.query_chunks_params(
220            "SELECT ut.chunk_id, COUNT(*) AS cnt,
221                    c.content, c.trigger_desc, c.maturity
222             FROM usage_trace ut
223             JOIN chunks c ON c.id = ut.chunk_id
224             WHERE ut.event='retrieved'
225               AND c.origin='spark'
226             GROUP BY ut.chunk_id HAVING cnt >= ?",
227            rusqlite::params![spark_threshold],
228        )?;
229        let recurring_spark_ids: Vec<Value> = recurring_sparks
230            .iter()
231            .map(|r| {
232                json!({
233                    "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
234                    "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
235                    "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
236                    "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
237                        .chars().take(80).collect::<String>(),
238                })
239            })
240            .collect();
241
242        let mut suggestions: Vec<Value> = Vec::new();
243        if embed_rebuild > 0 {
244            suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
245        }
246        if new_logs > 0 {
247            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
248        }
249        if pending > 0 {
250            suggestions.push(json!({"action": "innate approve <id>  # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
251        }
252        if !recurring_spark_ids.is_empty() {
253            suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
254        }
255        if stale_screening > 0 {
256            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
257        }
258        if governance_pending > 0 {
259            suggestions.push(json!({
260                "action": "review governance_proposals",
261                "reason": format!("{governance_pending} chunk(s) have repeated negative feedback")
262            }));
263        }
264
265        // Intuition honesty (PRD §4): does high strength actually predict success, and is
266        // the critic crying wolf? Only nudge once enough appraisals carry an outcome.
267        let intuition = self.intuition_calibration(&metric_window_start)?;
268        let appraisals = intuition
269            .get("appraisals")
270            .and_then(Value::as_i64)
271            .unwrap_or(0);
272        let mono_gap = intuition
273            .get("monotonicity_gap")
274            .and_then(Value::as_f64)
275            .unwrap_or(0.0);
276        let false_alarm = intuition
277            .get("false_alarm_rate")
278            .and_then(Value::as_f64)
279            .unwrap_or(0.0);
280        if appraisals >= 20 && mono_gap <= 0.0 {
281            suggestions.push(json!({
282                "action": "tune recall.w_* / situation.coarse_keys",
283                "reason": "appraise strength may be noise — strong tier does not beat weak on task_ok"
284            }));
285        }
286        if appraisals >= 20 && false_alarm >= 0.5 {
287            suggestions.push(json!({
288                "action": "review caution chunks / raise appraise.tier_strong",
289                "reason": format!("intuition false-alarm rate {false_alarm} — strong cautions often end ok")
290            }));
291        }
292
293        // Storage growth metrics — trace/log bloat is driven by recall/record
294        // activity over time, independent of chunk count, so it is surfaced here
295        // for monitoring before it becomes a problem.
296        let usage_trace_total = count_query(&self.storage, "SELECT COUNT(*) FROM usage_trace")?;
297        let episodic_log_total = count_query(&self.storage, "SELECT COUNT(*) FROM episodic_log")?;
298        let page_count = count_query(&self.storage, "PRAGMA page_count")?;
299        let page_size = count_query(&self.storage, "PRAGMA page_size")?;
300        let db_size_bytes = page_count * page_size;
301
302        Ok(json!({
303            "schema_version": schema_version,
304            "lib_id": lib_id,
305            "last_agg_ts": last_agg,
306            "chunks": {
307                "total": total, "active": active, "pending": pending, "archived": archived,
308                "pending_oldest_ts": pending_oldest_ts,
309            },
310            "storage": {
311                "usage_trace_rows": usage_trace_total,
312                "episodic_log_rows": episodic_log_total,
313                "db_size_bytes": db_size_bytes,
314                "db_size_mb": (db_size_bytes as f64 / 1_048_576.0 * 100.0).round() / 100.0,
315            },
316            "sparks": sparks,
317            "episodic_log": {"open": open_logs, "new": new_logs},
318            "embed_rebuild_queue": embed_rebuild,
319            "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
320            "stale_screening_count": stale_screening,
321            "feedback_loop": {
322                "trace_completion_rate": ratio(trace_completed, trace_total),
323                "usage_annotation_rate": ratio(usage_known, trace_completed),
324                "trace_use_rate": ratio(usage_some, usage_known),
325                "selected_to_used_rate": ratio(selected_used, selected_total),
326                "task_success_rate": ratio(succeeded, outcome_known),
327                "feedback_coverage": ratio(feedback_traces, trace_completed),
328                "feedback_events": feedback_count,
329                "timed_out_traces": trace_timed_out,
330                "pending_evolve_requests": pending_evolve,
331                "failed_evolve_requests_30d": failed_evolve,
332                "failed_distill_logs_30d": failed_distill,
333                "pending_governance_proposals": governance_pending,
334                "window_days": 30,
335                "confidence_distribution": {
336                    "low": confidence_row.and_then(|row| row.get("low")).and_then(Value::as_i64).unwrap_or(0),
337                    "medium": confidence_row.and_then(|row| row.get("medium")).and_then(Value::as_i64).unwrap_or(0),
338                    "high": confidence_row.and_then(|row| row.get("high")).and_then(Value::as_i64).unwrap_or(0),
339                }
340            },
341            "intuition_calibration": intuition,
342            "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
343            "recurring_sparks": recurring_sparks.len(),
344            "recurring_spark_ids": recurring_spark_ids,
345            "params": {
346                "recall.w_content": self.w_content,
347                "recall.w_trigger": self.w_trigger,
348                "recall.w_context": self.w_context,
349                "recall.w_activation": self.w_activation,
350                "recall.top_k_candidates": self.top_k_candidates,
351                "curate.low_conf_threshold": self.low_conf_threshold,
352                "curate.low_conf_idle_days": self.low_conf_idle_days,
353                "curate.repeat_select_min": self.repeat_select_min,
354                "curate.never_used_age_days": self.never_used_age_days,
355                "curate.promote_used_success_min": self.promote_used_success_min,
356                "curate.promote_confidence_min": self.promote_confidence_min,
357                "curate.screening_timeout_minutes": self.screening_timeout_minutes,
358                "curate.open_ttl_days": self.open_ttl_days,
359                "curate.log_compact_days": self.log_compact_days,
360                "evolve.schedule_interval_hours": self.evolve_schedule_interval_hours,
361            },
362            "suggestions": suggestions
363        }))
364    }
365
366    // ------------------------------------------------------------------
367    // Intuition honesty metrics (PRD §4 / Spec §7)
368    //
369    // The core KPI is not recall but discrimination quality: "loud when it should
370    // be, silent when it shouldn't." All inputs already exist — appraise persists
371    // {valence, tier, strength} into episodic_log.recall_snapshot, and record fills
372    // in `outcome`. We bucket appraisals by tier and check the actual task_ok rate.
373    // ------------------------------------------------------------------
374
375    fn intuition_calibration(&self, window_start: &str) -> Result<Value> {
376        let rows = self.storage.query_chunks_params(
377            "SELECT recall_snapshot, outcome FROM episodic_log
378             WHERE ts >= ? AND recall_snapshot LIKE '%\"appraise\"%'",
379            rusqlite::params![window_start],
380        )?;
381
382        // Per-tier accumulators: (n_total, n_with_outcome, ok, sum_strength_with_outcome).
383        let mut buckets: std::collections::BTreeMap<String, [f64; 4]> =
384            std::collections::BTreeMap::new();
385        for tier in ["weak", "medium", "strong"] {
386            buckets.insert(tier.to_string(), [0.0; 4]);
387        }
388        let mut total = 0_i64;
389        let mut silent = 0_i64;
390        let mut caution_strong = 0_i64;
391        let mut caution_strong_false = 0_i64;
392
393        for row in &rows {
394            let snapshot = row
395                .get("recall_snapshot")
396                .and_then(Value::as_str)
397                .and_then(|raw| serde_json::from_str::<Value>(raw).ok());
398            let Some(appraise) = snapshot.as_ref().and_then(|s| s.get("appraise")) else {
399                continue;
400            };
401            let tier = appraise
402                .get("tier")
403                .and_then(Value::as_str)
404                .unwrap_or("weak");
405            let valence = appraise
406                .get("valence")
407                .and_then(Value::as_str)
408                .unwrap_or("neutral");
409            let strength = appraise
410                .get("strength")
411                .and_then(Value::as_f64)
412                .unwrap_or(0.0);
413            let outcome = row.get("outcome").and_then(Value::as_str);
414
415            total += 1;
416            if tier == "weak" || valence == "neutral" {
417                silent += 1;
418            }
419            let has_outcome = matches!(outcome, Some("ok") | Some("fail"));
420            let is_ok = outcome == Some("ok");
421            if let Some(b) = buckets.get_mut(tier) {
422                b[0] += 1.0;
423                if has_outcome {
424                    b[1] += 1.0;
425                    b[3] += strength;
426                    if is_ok {
427                        b[2] += 1.0;
428                    }
429                }
430            }
431            if valence == "caution" && tier == "strong" && has_outcome {
432                caution_strong += 1;
433                if is_ok {
434                    caution_strong_false += 1;
435                }
436            }
437        }
438
439        let hit_rate = |b: &[f64; 4]| if b[1] > 0.0 { b[2] / b[1] } else { 0.0 };
440        let weak = buckets.get("weak").copied().unwrap_or([0.0; 4]);
441        let strong = buckets.get("strong").copied().unwrap_or([0.0; 4]);
442        let monotonicity_gap = hit_rate(&strong) - hit_rate(&weak);
443
444        // ECE: evidence-weighted gap between mean strength and actual hit rate per bucket.
445        let outcome_total: f64 = buckets.values().map(|b| b[1]).sum();
446        let ece = if outcome_total > 0.0 {
447            buckets
448                .values()
449                .filter(|b| b[1] > 0.0)
450                .map(|b| {
451                    let avg_strength = b[3] / b[1];
452                    (b[1] / outcome_total) * (avg_strength - hit_rate(b)).abs()
453                })
454                .sum::<f64>()
455        } else {
456            0.0
457        };
458
459        let bucket_detail: Vec<Value> = ["weak", "medium", "strong"]
460            .iter()
461            .map(|tier| {
462                let b = buckets.get(*tier).copied().unwrap_or([0.0; 4]);
463                json!({
464                    "tier": tier,
465                    "n": b[0] as i64,
466                    "n_with_outcome": b[1] as i64,
467                    "avg_strength": if b[1] > 0.0 { (b[3] / b[1] * 1000.0).round() / 1000.0 } else { 0.0 },
468                    "actual_hit_rate": (hit_rate(&b) * 1000.0).round() / 1000.0,
469                })
470            })
471            .collect();
472
473        // 方案 B —— verdict_log 仪表盘:可证伪的 ECE / 弃权率(头号体检指标)。
474        // 与上面基于 recall_snapshot 的 tier-bucket 指标互补:verdict_log 直接用
475        // emitted_conf 分桶 + observed 回填算 ECE,且把弃权率作为一等健康信号。
476        let (vl_total, vl_abstained, vl_observed) =
477            self.storage.verdict_log_overview().unwrap_or((0, 0, 0));
478        let samples = self.storage.verdict_calibration_samples().unwrap_or_default();
479        let bins = self.calibration_bins.max(2);
480        let mut vhit = vec![0.0_f64; bins as usize];
481        let mut vtot = vec![0.0_f64; bins as usize];
482        // ECE 按 **emitted_conf** 分桶:衡量「声称置信度」的真实兑现率(校准映射重算
483        // 则按 strength 分桶,见 curate::recompute_calibration_map —— 两者域不同)。
484        for (_strength, conf, h) in &samples {
485            let b = ((conf * bins as f64).floor() as i64).clamp(0, bins - 1) as usize;
486            vtot[b] += 1.0;
487            vhit[b] += *h;
488        }
489        let n_obs: f64 = vtot.iter().sum();
490        let verdict_ece = if n_obs > 0.0 {
491            (0..bins as usize)
492                .filter(|&b| vtot[b] > 0.0)
493                .map(|b| {
494                    let claimed = (b as f64 + 0.5) / bins as f64;
495                    let actual = vhit[b] / vtot[b];
496                    (vtot[b] / n_obs) * (claimed - actual).abs()
497                })
498                .sum::<f64>()
499        } else {
500            0.0
501        };
502
503        Ok(json!({
504            "appraisals": total,
505            "monotonicity_gap": (monotonicity_gap * 1000.0).round() / 1000.0,
506            "ece": (ece * 1000.0).round() / 1000.0,
507            "false_alarm_rate": ratio(caution_strong_false, caution_strong),
508            "silence_rate": ratio(silent, total),
509            "buckets": bucket_detail,
510            // 方案 B verdict_log 仪表盘
511            "verdict_log": {
512                "total": vl_total,
513                "abstained": vl_abstained,
514                "abstain_rate": ratio(vl_abstained, vl_total),
515                "observed": vl_observed,
516                "ece": (verdict_ece * 1000.0).round() / 1000.0,
517            },
518        }))
519    }
520
521    // ------------------------------------------------------------------
522    // Public: rebuild_embeddings (evolve --rebuild-embeddings)
523    // ------------------------------------------------------------------
524
525    pub fn rebuild_embeddings(&self) -> Result<usize> {
526        let meta_version = self
527            .storage
528            .get_meta("embed_version")?
529            .and_then(|v| v.parse::<i64>().ok())
530            .unwrap_or(1);
531        // Fetch chunks with embed_version=0 (failed writes) or below current meta version.
532        let stale = self.storage.query_chunks_params(
533            "SELECT id, content, trigger_desc, state_reason FROM chunks
534             WHERE embed_version = 0 OR embed_version < ?",
535            rusqlite::params![meta_version],
536        )?;
537        // Bulk re-embed: drop the warm cache once so the per-row in-place upserts
538        // stay no-ops (cold) and the loop runs O(N) instead of O(N²). The next
539        // search reloads the rebuilt vectors from disk.
540        self.storage.invalidate_vector_caches();
541        let mut count = 0;
542        for row in &stale {
543            let id = match row.get("id").and_then(Value::as_str) {
544                Some(v) => v,
545                None => continue,
546            };
547            let content = row.get("content").and_then(Value::as_str).unwrap_or("");
548            let trigger = row
549                .get("trigger_desc")
550                .and_then(Value::as_str)
551                .unwrap_or(content);
552            let state_reason = row
553                .get("state_reason")
554                .and_then(Value::as_str)
555                .unwrap_or("");
556
557            let cvec = match self.embedding.embed_content(content) {
558                Ok(v) => v,
559                Err(_) => continue,
560            };
561            let tvec = match self.embedding.embed_trigger(trigger) {
562                Ok(v) => v,
563                Err(_) => continue,
564            };
565
566            self.storage.begin_immediate()?;
567            let r = (|| -> Result<()> {
568                self.store_vec_content(id, &cvec)?;
569                self.store_vec_trigger(id, &tvec)?;
570                // Restore intended state if encoded in state_reason.
571                let new_reason = if state_reason.starts_with("embedding_pending:target=") {
572                    let target_state = state_reason.trim_start_matches("embedding_pending:target=");
573                    let now = utc_now_iso();
574                    self.storage.update_chunk_state(
575                        id,
576                        target_state,
577                        Some("embedding_rebuilt"),
578                        &now,
579                    )?;
580                    "embedding_rebuilt".to_string()
581                } else {
582                    "embedding_rebuilt".to_string()
583                };
584                let now = utc_now_iso();
585                self.storage.conn_execute(
586                    "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
587                    rusqlite::params![meta_version, new_reason, now, id],
588                )?;
589                self.storage.commit()
590            })();
591            if r.is_err() {
592                let _ = self.storage.rollback();
593            } else {
594                count += 1;
595            }
596        }
597        Ok(count)
598    }
599
600    // ------------------------------------------------------------------
601    // Public: inspect_id (inspect <chunk_id> or <trace_id>)
602    // ------------------------------------------------------------------
603
604    pub fn inspect_id(&self, id: &str) -> Result<Value> {
605        // Try as chunk_id first, then as trace_id.
606        if let Some(chunk) = self.storage.get_chunk(id)? {
607            let traces = self.storage.query_chunks_params(
608                "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
609                rusqlite::params![id],
610            )?;
611            let derived = self.storage.query_chunks_params(
612                "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
613                   SELECT id FROM episodic_log WHERE trace_id IN (
614                     SELECT trace_id FROM usage_trace WHERE chunk_id=?
615                   )
616                 ) LIMIT 10",
617                rusqlite::params![id],
618            )?;
619            return Ok(json!({
620                "kind": "chunk",
621                "chunk": chunk,
622                "recent_traces": traces,
623                "derived_chunks": derived,
624            }));
625        }
626        // Try as trace_id.
627        if let Some(log) = self.storage.get_episodic_log(id)? {
628            let traces = self.storage.query_chunks_params(
629                "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
630                rusqlite::params![id],
631            )?;
632            return Ok(json!({
633                "kind": "trace",
634                "episodic_log": log,
635                "usage_traces": traces,
636            }));
637        }
638        Err(InnateError::ChunkNotFound(id.to_string()))
639    }
640
641    // ------------------------------------------------------------------
642    // Sanitize
643    // ------------------------------------------------------------------
644
645    pub(super) fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
646        self.sanitizer.sanitize(content)
647    }
648}