Skip to main content

innate_core/
kb.rs

1//! KnowledgeBase — all 8 Public APIs.
2
3/// Return type for pack(): (selected_chunks, skipped_groups, skip_reasons)
4type PackResult = (
5    Vec<Value>,
6    Vec<(Vec<Value>, f64, usize)>,
7    std::collections::HashMap<String, String>,
8);
9
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::embedding::{DummyEmbeddingProvider, EmbeddingProvider};
17use crate::errors::{InnateError, Result};
18use crate::refine::{
19    DefaultSanitizer, DistilledChunk, Distiller, HeuristicDistiller, NullRefiner, Refiner,
20    Sanitizer,
21};
22use crate::storage::{ChunkRow, EpisodicLogRow, Storage};
23use crate::utils::{
24    content_hash, estimate_tokens, gen_uuid, pack_embedding, utc_now_iso, SanitizeAction,
25};
26
27// ---------------------------------------------------------------------------
28// Tuning defaults
29// ---------------------------------------------------------------------------
30
// Compiled-in defaults. `KnowledgeBase::open_with` starts from these values and
// `load_params` then overrides them with whatever is stored under the meta key
// named on each line.
//
// NOTE(review): the four recall weights below sum to 1.05, not 1.0 — confirm
// that is intentional.

/// Weight of the content-vector similarity in the fused recall score (`recall.w_content`).
const W_CONTENT: f64 = 0.55;
/// Weight of the trigger-vector similarity in the fused recall score (`recall.w_trigger`).
const W_TRIGGER: f64 = 0.25;
/// Weight of the chunk's stored `confidence` in the fused recall score (`recall.w_confidence`).
const W_CONFIDENCE: f64 = 0.10;
/// Weight of the storage-provided context score in the fused recall score (`recall.w_context`).
const W_CONTEXT: f64 = 0.15;
/// Number of scored candidates kept by `score_candidates`; the vector searches in
/// `ann_candidates` request twice this many hits (`recall.top_k_candidates`).
const TOP_K_CANDIDATES: usize = 20;
/// Multiplier applied to the fused score when the query hits a chunk's
/// `anti_trigger_desc` (`recall.anti_trigger_penalty`).
const ANTI_TRIGGER_PENALTY: f64 = 0.6;
/// Whether `recall` runs the density-refill pass after packing (`recall.density_refill`).
const DENSITY_REFILL: bool = true;

// The curate/evolve defaults below are consumed outside the recall path; only the
// meta key each one backs is stated here.
/// Default for `curate.low_conf_threshold`.
const LOW_CONF_THRESHOLD: f64 = 0.25;
/// Default for `curate.low_conf_idle_days`.
const LOW_CONF_IDLE_DAYS: i64 = 60;
/// Default for `curate.repeat_select_min`.
const REPEAT_SELECT_MIN: i64 = 10;
/// Default for `curate.repeat_select_conf_max`.
const REPEAT_SELECT_CONF_MAX: f64 = 0.5;
/// Default for `curate.never_used_age_days`.
const NEVER_USED_AGE_DAYS: i64 = 30;
/// Default for `curate.open_ttl_days`.
const OPEN_TTL_DAYS: i64 = 14;
/// Default for `curate.screening_timeout_minutes`.
const SCREENING_TIMEOUT_MINUTES: i64 = 30;
/// Default for `curate.promote_used_success_min`.
const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
/// Default for `curate.promote_confidence_min`.
const PROMOTE_CONFIDENCE_MIN: f64 = 0.60;
/// Default for `curate.decay_floor`; `load_params` clamps the loaded value to `0.0..=0.4`.
const DECAY_FLOOR: f64 = 0.20;
/// Default for `evolve.threshold_new_count`.
const EVOLVE_THRESHOLD: i64 = 5;
/// Default for `evolve.distill_batch_size`.
const DISTILL_BATCH_SIZE: usize = 20;
/// Multiplier applied to the fused recall score of chunks whose `state` is
/// `"pending"`. Used directly by `score_candidates`; not loaded from meta.
const PENDING_RECALL_PENALTY: f64 = 0.60;
/// Default for `curate.governance_archive_threshold`.
const GOVERNANCE_ARCHIVE_THRESHOLD: i64 = 3;
/// Default for `curate.negative_feedback_archive_threshold`.
const NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD: i64 = 5;
/// Default for `evolve.governance_pending_threshold`.
const GOVERNANCE_EVOLVE_THRESHOLD: i64 = 3;
/// Default for `curate.failure_min_uses`.
const FAILURE_MIN_USES: i64 = 5;
/// Default for `curate.failure_max_success_rate`; clamped to `0.0..=1.0` on load.
const FAILURE_MAX_SUCCESS_RATE: f64 = 0.20;
/// Default for `curate.failure_confidence_max`; clamped to `0.0..=1.0` on load.
const FAILURE_CONFIDENCE_MAX: f64 = 0.35;
58
59// ---------------------------------------------------------------------------
60// Public result types
61// ---------------------------------------------------------------------------
62
/// Outcome of one `KnowledgeBase::recall` call.
#[derive(Debug, Default, Clone)]
pub struct RecallResult {
    /// Chunks returned to the caller: the packed selection after the optional
    /// density refill, the `top` limit and, in `"adapt"` mode, the refiner.
    pub knowledge: Vec<Value>,
    /// Result of `recall_sparks`; empty when sparks were not requested.
    pub sparks: Vec<Value>,
    /// Identifier generated by `gen_uuid()` for this call; also handed to the
    /// trace writer when tracing is enabled.
    pub trace_id: String,
    /// `true` when both `knowledge` and `sparks` are empty.
    pub empty: bool,
    /// Ids of seed chunks whose skip reason was `"dep_depth_limit"`.
    pub depth_skipped: Vec<String>,
    /// Seed chunk id → reason its block was dropped while packing.
    pub skipped_reasons: HashMap<String, String>,
}
72
/// Report produced by a `Curator::run` invocation.
///
/// NOTE(review): the code that fills these fields is not in this part of the
/// file; the field names suggest lists of affected chunk ids grouped by the
/// action taken, plus free-form warnings and statistics — confirm against the
/// curate implementation before relying on that reading.
#[derive(Debug, Default)]
pub struct CurateReport {
    pub archived: Vec<String>,
    pub deduped: Vec<String>,
    pub decayed: Vec<String>,
    pub cycles: Vec<Vec<String>>,
    pub orphans: Vec<String>,
    pub recovered: Vec<String>,
    pub warnings: Vec<String>,
    pub stats: HashMap<String, Value>,
}
84
/// Scope for a single Curate run — allows limiting governance to a subset of chunks.
///
/// The derived `Default` applies no filter (`origin` and `skill_name` are `None`)
/// and has `dry_run` set to `false`.
#[derive(Debug, Default, Clone)]
pub struct CurateScope {
    /// If set, only process chunks with this origin (e.g. "distilled").
    pub origin: Option<String>,
    /// If set, only process chunks belonging to this skill.
    pub skill_name: Option<String>,
    /// When true, compute the report but do not write any changes.
    pub dry_run: bool,
}
95
/// Replaceable governance interface (spec section "§二·六", i.e. §2.6).
/// Inject an implementation via `KnowledgeBase::open_with`.
/// Default implementation: `BuiltinCurator`.
///
/// Implementations must be `Send + Sync` because the knowledge base stores the
/// curator as an `Arc<dyn Curator>`.
pub trait Curator: Send + Sync {
    /// Runs one governance pass over `kb`, restricted by `scope`, and returns
    /// the resulting report.
    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport>;
}
101
/// Built-in curator — implements the full governance pipeline of spec section
/// "§四" (§4). Stateless; used when `open_with` is given no curator.
pub struct BuiltinCurator;

impl Curator for BuiltinCurator {
    /// Delegates to `KnowledgeBase::builtin_curate_impl` with the given scope.
    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport> {
        kb.builtin_curate_impl(scope)
    }
}
110
111// ---------------------------------------------------------------------------
112// KnowledgeBase
113// ---------------------------------------------------------------------------
114
/// The knowledge base: a `Storage` handle plus the pluggable components
/// (embedding, refiner, distiller, curator, sanitizer) and the tuning
/// parameters that drive recall and governance.
///
/// Construct with `KnowledgeBase::open` (all defaults) or
/// `KnowledgeBase::open_with` (inject components).
pub struct KnowledgeBase {
    /// Backing store; public so callers can reach it directly.
    pub storage: Storage,
    /// Produces the content and trigger vectors for queries.
    embedding: Arc<dyn EmbeddingProvider>,
    /// Trims blocks during packing and refines results in `"adapt"` mode.
    refiner: Arc<dyn Refiner>,
    distiller: Arc<dyn Distiller>,
    curator: Arc<dyn Curator>,
    sanitizer: Arc<dyn Sanitizer>,

    // Tuning params (loaded from meta at init)
    // Recall scoring: weights of the fused score and candidate handling.
    w_content: f64,
    w_trigger: f64,
    w_confidence: f64,
    w_context: f64,
    top_k_candidates: usize,
    anti_trigger_penalty: f64,
    density_refill: bool,

    // Governance / evolution knobs; each mirrors the `curate.*` or `evolve.*`
    // meta key read in `load_params`.
    low_conf_threshold: f64,
    low_conf_idle_days: i64,
    repeat_select_min: i64,
    repeat_select_conf_max: f64,
    never_used_age_days: i64,
    open_ttl_days: i64,
    screening_timeout_minutes: i64,
    promote_used_success_min: i64,
    promote_confidence_min: f64,
    decay_floor: f64,
    evolve_threshold: i64,
    distill_batch_size: usize,
    evolve_schedule_interval_hours: i64,
    governance_archive_threshold: i64,
    negative_feedback_archive_threshold: i64,
    governance_evolve_threshold: i64,
    governance_proposal_max_age_days: i64,
    failure_min_uses: i64,
    failure_max_success_rate: f64,
    failure_confidence_max: f64,
}
153
154impl KnowledgeBase {
    /// Opens the knowledge base at `db_path` with every pluggable component left
    /// at its default (equivalent to `open_with` with all options `None`).
    ///
    /// # Errors
    /// Returns whatever `open_with` returns.
    pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
        Self::open_with(db_path, None, None, None, None, None)
    }
158
159    pub fn open_with(
160        db_path: impl AsRef<Path>,
161        embedding: Option<Arc<dyn EmbeddingProvider>>,
162        refiner: Option<Arc<dyn Refiner>>,
163        distiller: Option<Arc<dyn Distiller>>,
164        curator: Option<Arc<dyn Curator>>,
165        sanitizer: Option<Arc<dyn Sanitizer>>,
166    ) -> Result<Self> {
167        let embedding = embedding.unwrap_or_else(|| Arc::new(DummyEmbeddingProvider::default()));
168        let refiner = refiner.unwrap_or_else(|| Arc::new(NullRefiner));
169        let distiller = distiller.unwrap_or_else(|| Arc::new(HeuristicDistiller));
170        let curator = curator.unwrap_or_else(|| Arc::new(BuiltinCurator));
171        let sanitizer = sanitizer.unwrap_or_else(|| Arc::new(DefaultSanitizer));
172
173        let storage = Storage::open(db_path, embedding.content_dim(), embedding.trigger_dim())?;
174
175        let mut kb = Self {
176            storage,
177            embedding,
178            refiner,
179            distiller,
180            curator,
181            sanitizer,
182            w_content: W_CONTENT,
183            w_trigger: W_TRIGGER,
184            w_confidence: W_CONFIDENCE,
185            w_context: W_CONTEXT,
186            top_k_candidates: TOP_K_CANDIDATES,
187            anti_trigger_penalty: ANTI_TRIGGER_PENALTY,
188            density_refill: DENSITY_REFILL,
189            low_conf_threshold: LOW_CONF_THRESHOLD,
190            low_conf_idle_days: LOW_CONF_IDLE_DAYS,
191            repeat_select_min: REPEAT_SELECT_MIN,
192            repeat_select_conf_max: REPEAT_SELECT_CONF_MAX,
193            never_used_age_days: NEVER_USED_AGE_DAYS,
194            open_ttl_days: OPEN_TTL_DAYS,
195            screening_timeout_minutes: SCREENING_TIMEOUT_MINUTES,
196            promote_used_success_min: PROMOTE_USED_SUCCESS_MIN,
197            promote_confidence_min: PROMOTE_CONFIDENCE_MIN,
198            decay_floor: DECAY_FLOOR,
199            evolve_threshold: EVOLVE_THRESHOLD,
200            distill_batch_size: DISTILL_BATCH_SIZE,
201            evolve_schedule_interval_hours: 6,
202            governance_archive_threshold: GOVERNANCE_ARCHIVE_THRESHOLD,
203            negative_feedback_archive_threshold: NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
204            governance_evolve_threshold: GOVERNANCE_EVOLVE_THRESHOLD,
205            governance_proposal_max_age_days: 30,
206            failure_min_uses: FAILURE_MIN_USES,
207            failure_max_success_rate: FAILURE_MAX_SUCCESS_RATE,
208            failure_confidence_max: FAILURE_CONFIDENCE_MAX,
209        };
210        kb.init_meta()?;
211        kb.load_params()?;
212        Ok(kb)
213    }
214
215    fn init_meta(&self) -> Result<()> {
216        let lib_id = gen_uuid();
217        let content_dim = self.embedding.content_dim().to_string();
218        let trigger_dim = self.embedding.trigger_dim().to_string();
219        let embed_model = self.embedding.model_name();
220
221        for (key, expected) in [
222            ("content_dim", self.embedding.content_dim()),
223            ("trigger_dim", self.embedding.trigger_dim()),
224        ] {
225            if let Some(stored) = self.storage.get_meta(key)? {
226                let actual = stored.parse::<usize>().map_err(|_| {
227                    InnateError::Other(format!("invalid {key} metadata value: {stored}"))
228                })?;
229                if actual != expected {
230                    return Err(InnateError::Other(format!(
231                        "{key} mismatch: database uses {actual}, embedding provider uses {expected}"
232                    )));
233                }
234            }
235        }
236
237        let defaults: &[(&str, &str)] = &[
238            ("lib_id", &lib_id),
239            ("lib_role", "personal"),
240            ("schema_version", "4.13"),
241            ("content_dim", &content_dim),
242            ("trigger_dim", &trigger_dim),
243            ("embed_model", embed_model),
244            ("embed_version", "1"),
245            ("vector_revision", "0"),
246            ("last_agg_ts", "1970-01-01T00:00:00.000Z"),
247            ("recall.w_content", "0.55"),
248            ("recall.w_trigger", "0.25"),
249            ("recall.w_confidence", "0.10"),
250            ("recall.w_context", "0.15"),
251            ("recall.top_k_candidates", "20"),
252            ("recall.anti_trigger_penalty", "0.6"),
253            ("recall.density_refill", "true"),
254            ("curate.low_conf_threshold", "0.25"),
255            ("curate.low_conf_idle_days", "60"),
256            ("curate.repeat_select_min", "10"),
257            ("curate.repeat_select_conf_max", "0.5"),
258            ("curate.never_used_age_days", "30"),
259            ("curate.open_ttl_days", "14"),
260            ("curate.screening_timeout_minutes", "30"),
261            ("curate.promote_used_success_min", "3"),
262            ("curate.promote_confidence_min", "0.60"),
263            ("curate.decay_floor", "0.20"),
264            ("evolve.threshold_new_count", "5"),
265            ("evolve.distill_batch_size", "20"),
266            ("evolve.schedule_interval_hours", "6"),
267            ("curate.soft_mature_threshold", "5"),
268            ("evolve.distill_token_window_hours", "24"),
269            ("curate.governance_archive_threshold", "3"),
270            ("curate.negative_feedback_archive_threshold", "5"),
271            ("evolve.governance_pending_threshold", "3"),
272            ("curate.governance_proposal_max_age_days", "30"),
273            ("curate.failure_min_uses", "5"),
274            ("curate.failure_max_success_rate", "0.20"),
275            ("curate.failure_confidence_max", "0.35"),
276        ];
277        self.storage.begin_immediate()?;
278        let result = (|| -> Result<()> {
279            for (k, v) in defaults {
280                if self.storage.get_meta(k)?.is_none() {
281                    self.storage.set_meta(k, v)?;
282                }
283            }
284            self.storage.commit()
285        })();
286        if result.is_err() {
287            let _ = self.storage.rollback();
288        }
289        result
290    }
291
292    fn load_params(&mut self) -> Result<()> {
293        let f = |k: &str, d: f64| -> f64 {
294            self.storage
295                .get_meta(k)
296                .ok()
297                .flatten()
298                .and_then(|v| v.parse().ok())
299                .unwrap_or(d)
300        };
301        let i = |k: &str, d: i64| -> i64 {
302            self.storage
303                .get_meta(k)
304                .ok()
305                .flatten()
306                .and_then(|v| v.parse().ok())
307                .unwrap_or(d)
308        };
309        let b = |k: &str, d: bool| -> bool {
310            self.storage
311                .get_meta(k)
312                .ok()
313                .flatten()
314                .map(|v| v.to_lowercase() == "true")
315                .unwrap_or(d)
316        };
317        self.w_content = f("recall.w_content", W_CONTENT);
318        self.w_trigger = f("recall.w_trigger", W_TRIGGER);
319        self.w_confidence = f("recall.w_confidence", W_CONFIDENCE);
320        self.w_context = f("recall.w_context", W_CONTEXT);
321        self.top_k_candidates =
322            i("recall.top_k_candidates", TOP_K_CANDIDATES as i64).max(1) as usize;
323        self.anti_trigger_penalty = f("recall.anti_trigger_penalty", ANTI_TRIGGER_PENALTY);
324        self.density_refill = b("recall.density_refill", DENSITY_REFILL);
325        self.low_conf_threshold = f("curate.low_conf_threshold", LOW_CONF_THRESHOLD);
326        self.low_conf_idle_days = i("curate.low_conf_idle_days", LOW_CONF_IDLE_DAYS);
327        self.repeat_select_min = i("curate.repeat_select_min", REPEAT_SELECT_MIN);
328        self.repeat_select_conf_max = f("curate.repeat_select_conf_max", REPEAT_SELECT_CONF_MAX);
329        self.never_used_age_days = i("curate.never_used_age_days", NEVER_USED_AGE_DAYS);
330        self.open_ttl_days = i("curate.open_ttl_days", OPEN_TTL_DAYS);
331        self.screening_timeout_minutes = i(
332            "curate.screening_timeout_minutes",
333            SCREENING_TIMEOUT_MINUTES,
334        );
335        self.promote_used_success_min =
336            i("curate.promote_used_success_min", PROMOTE_USED_SUCCESS_MIN);
337        self.promote_confidence_min = f("curate.promote_confidence_min", PROMOTE_CONFIDENCE_MIN);
338        self.decay_floor = f("curate.decay_floor", DECAY_FLOOR).clamp(0.0, 0.4);
339        self.evolve_threshold = i("evolve.threshold_new_count", EVOLVE_THRESHOLD);
340        self.distill_batch_size =
341            i("evolve.distill_batch_size", DISTILL_BATCH_SIZE as i64) as usize;
342        self.evolve_schedule_interval_hours = i("evolve.schedule_interval_hours", 6).max(1);
343        self.governance_archive_threshold =
344            i("curate.governance_archive_threshold", GOVERNANCE_ARCHIVE_THRESHOLD).max(1);
345        self.negative_feedback_archive_threshold = i(
346            "curate.negative_feedback_archive_threshold",
347            NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
348        )
349        .max(1);
350        self.governance_evolve_threshold =
351            i("evolve.governance_pending_threshold", GOVERNANCE_EVOLVE_THRESHOLD).max(1);
352        self.governance_proposal_max_age_days =
353            i("curate.governance_proposal_max_age_days", 30).max(1);
354        self.failure_min_uses = i("curate.failure_min_uses", FAILURE_MIN_USES).max(1);
355        self.failure_max_success_rate = f(
356            "curate.failure_max_success_rate",
357            FAILURE_MAX_SUCCESS_RATE,
358        )
359        .clamp(0.0, 1.0);
360        self.failure_confidence_max =
361            f("curate.failure_confidence_max", FAILURE_CONFIDENCE_MAX).clamp(0.0, 1.0);
362        Ok(())
363    }
364
365    // ------------------------------------------------------------------
366    // Public API 1: recall
367    // ------------------------------------------------------------------
368
369    #[allow(clippy::too_many_arguments)]
370    pub fn recall(
371        &self,
372        query: &str,
373        budget: usize,
374        trace: bool,
375        include_sparks: bool,
376        top: Option<usize>,
377        source: &str,
378        expand_deps: &str, // "false" | "direct" | "closure"
379        allow_trim: bool,  // if true, invoke Refiner::trim when block doesn't fit
380        refine_mode: &str, // "off" | "trim" | "adapt" — recorded in trace
381    ) -> Result<RecallResult> {
382        validate_source(source)?;
383        let trace_id = gen_uuid();
384        let now = utc_now_iso();
385
386        let q_content = self
387            .embedding
388            .embed_content(query)
389            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
390        let q_trigger = self
391            .embedding
392            .embed_trigger(query)
393            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
394
395        // ANN candidates (non-spark)
396        let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
397        self.apply_soft_dep_bonus(&mut candidates)?;
398
399        // Score + anti-trigger penalty
400        let scored = self.score_candidates(candidates, query)?;
401
402        // First-fit pack with dep expansion
403        let (selected, skipped, skipped_reasons) =
404            self.pack(&scored, budget, expand_deps, allow_trim, query)?;
405
406        let depth_skipped: Vec<String> = skipped_reasons
407            .iter()
408            .filter(|(_, r)| r.as_str() == "dep_depth_limit")
409            .map(|(id, _)| id.clone())
410            .collect();
411
412        // Density refill
413        let mut selected = selected;
414        if self.density_refill {
415            selected = self.density_refill(selected, &skipped, budget);
416        }
417
418        let limited = limit_knowledge(selected, top);
419        let visible = if refine_mode == "adapt" {
420            self.refiner
421                .refine(limited.clone(), Some(budget))
422                .unwrap_or(limited)
423        } else {
424            limited
425        };
426
427        // Sparks
428        let sparks = if include_sparks {
429            self.recall_sparks(&q_content, &q_trigger)?
430        } else {
431            vec![]
432        };
433
434        if trace {
435            self.write_recall_trace(
436                &trace_id,
437                query,
438                &scored,
439                &visible,
440                &sparks,
441                &depth_skipped,
442                &skipped_reasons,
443                refine_mode,
444                source,
445                &now,
446            )?;
447        }
448
449        let empty = visible.is_empty() && sparks.is_empty();
450        Ok(RecallResult {
451            knowledge: visible,
452            sparks,
453            trace_id,
454            empty,
455            depth_skipped,
456            skipped_reasons,
457        })
458    }
459
460    fn ann_candidates(
461        &self,
462        q_content: &[f32],
463        q_trigger: &[f32],
464    ) -> Result<HashMap<String, CandidateInfo>> {
465        let embed_version = self
466            .storage
467            .get_meta("embed_version")?
468            .and_then(|v| v.parse::<i64>().ok())
469            .unwrap_or(1);
470
471        let content_res = self
472            .storage
473            .search_vec_content(q_content, self.top_k_candidates * 2)?;
474        let trigger_res = self
475            .storage
476            .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
477
478        // Collect unique ids and batch-fetch all chunks in two queries instead of N individual ones.
479        let all_ids: Vec<&str> = {
480            let mut seen = HashSet::new();
481            content_res
482                .iter()
483                .chain(trigger_res.iter())
484                .map(|(id, _)| id.as_str())
485                .filter(|id| seen.insert(*id))
486                .collect()
487        };
488        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
489
490        let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
491
492        for (cid, sim) in &content_res {
493            if let Some(chunk) = chunks.get(cid) {
494                if chunk_is_valid_for_recall(chunk, embed_version) {
495                    let e = candidates
496                        .entry(cid.clone())
497                        .or_insert_with(|| CandidateInfo {
498                            chunk: chunk.clone(),
499                            sim_content: 0.0,
500                            sim_trigger: 0.0,
501                        });
502                    e.sim_content = e.sim_content.max(*sim);
503                }
504            }
505        }
506        for (cid, sim) in &trigger_res {
507            if let Some(chunk) = chunks.get(cid) {
508                if chunk_is_valid_for_recall(chunk, embed_version) {
509                    let e = candidates
510                        .entry(cid.clone())
511                        .or_insert_with(|| CandidateInfo {
512                            chunk: chunk.clone(),
513                            sim_content: 0.0,
514                            sim_trigger: 0.0,
515                        });
516                    e.sim_trigger = e.sim_trigger.max(*sim);
517                }
518            }
519        }
520        Ok(candidates)
521    }
522
523    fn apply_soft_dep_bonus(&self, candidates: &mut HashMap<String, CandidateInfo>) -> Result<()> {
524        let ids: Vec<String> = candidates.keys().cloned().collect();
525        for cid in ids {
526            if candidates[&cid].chunk.get("origin").and_then(Value::as_str) == Some("spark") {
527                continue;
528            }
529            let deps = self.storage.get_deps(&cid)?;
530            for (dst, kind, _) in &deps {
531                if kind != "soft" {
532                    continue;
533                }
534                if let Some(target) = self.storage.get_chunk(dst)? {
535                    if target.get("state").and_then(Value::as_str) == Some("archived") {
536                        continue;
537                    }
538                    if target.get("origin").and_then(Value::as_str) == Some("spark") {
539                        continue;
540                    }
541                    let e = candidates
542                        .entry(dst.clone())
543                        .or_insert_with(|| CandidateInfo {
544                            chunk: target,
545                            sim_content: 0.0,
546                            sim_trigger: 0.0,
547                        });
548                    e.sim_content = (e.sim_content + 0.05).min(1.0);
549                }
550            }
551        }
552        Ok(())
553    }
554
555    fn score_candidates(
556        &self,
557        candidates: HashMap<String, CandidateInfo>,
558        query: &str,
559    ) -> Result<Vec<(f64, Value)>> {
560        let context_key = content_hash(&normalize_query(query));
561        let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
562        for info in candidates.into_values() {
563            let conf = info
564                .chunk
565                .get("confidence")
566                .and_then(Value::as_f64)
567                .unwrap_or(0.5);
568            let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
569            let context_score = self.storage.context_score(chunk_id, &context_key)?;
570            let mut fused = self.w_content * info.sim_content as f64
571                + self.w_trigger * info.sim_trigger as f64
572                + self.w_confidence * conf
573                + self.w_context * context_score;
574            if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
575                fused *= PENDING_RECALL_PENALTY;
576            }
577            let anti = info
578                .chunk
579                .get("anti_trigger_desc")
580                .and_then(Value::as_str)
581                .unwrap_or("");
582            if !anti.is_empty() && anti_trigger_hit(query, anti) {
583                fused *= self.anti_trigger_penalty;
584            }
585            let mut chunk = info.chunk;
586            chunk["_context_score"] = json!(context_score);
587            chunk["_fused_score"] = json!(fused);
588            scored.push((fused, chunk));
589        }
590        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
591        scored.truncate(self.top_k_candidates);
592        Ok(scored)
593    }
594
595    fn pack(
596        &self,
597        scored: &[(f64, Value)],
598        budget: usize,
599        expand_deps: &str,
600        allow_trim: bool,
601        query: &str,
602    ) -> Result<PackResult> {
603        let mut selected: Vec<Value> = vec![];
604        let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
605        let mut skipped_reasons: HashMap<String, String> = HashMap::new();
606        let mut used_ids: HashSet<String> = HashSet::new();
607        let mut used_tokens: usize = 0;
608
609        for (fused, chunk) in scored {
610            let cid = chunk["id"].as_str().unwrap_or("").to_string();
611            if used_ids.contains(&cid) {
612                continue;
613            }
614
615            // Build block with dep expansion; fail-closed on dep issues.
616            let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
617            if let Some(reason) = dep_skip_reason {
618                skipped_reasons.insert(cid, reason);
619                continue;
620            }
621
622            let new_block: Vec<Value> = block
623                .iter()
624                .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
625                .cloned()
626                .collect();
627            let cost = block_cost(&new_block);
628
629            if used_tokens + cost <= budget {
630                for b in &block {
631                    let bid = b["id"].as_str().unwrap_or("").to_string();
632                    if !used_ids.contains(&bid) {
633                        let mut b = b.clone();
634                        b["_fused_score"] = json!(fused);
635                        selected.push(b);
636                        used_ids.insert(bid);
637                    }
638                }
639                used_tokens += cost;
640            } else if allow_trim {
641                // Attempt refiner trim — NullRefiner returns None (no-op).
642                if let Some(trimmed) =
643                    self.refiner
644                        .trim(&block, query, budget.saturating_sub(used_tokens))
645                {
646                    let trim_cost = block_cost(&trimmed);
647                    if used_tokens + trim_cost <= budget {
648                        for b in &trimmed {
649                            let bid = b["id"].as_str().unwrap_or("").to_string();
650                            if !used_ids.contains(&bid) {
651                                let mut b = b.clone();
652                                b["_fused_score"] = json!(fused);
653                                b["_trimmed"] = json!(true);
654                                selected.push(b);
655                                used_ids.insert(bid);
656                            }
657                        }
658                        used_tokens += trim_cost;
659                        continue;
660                    }
661                }
662                skipped.push((block, *fused, cost));
663            } else {
664                skipped.push((block, *fused, cost));
665            }
666        }
667        Ok((selected, skipped, skipped_reasons))
668    }
669
670    /// Expand a seed chunk into a block according to `expand_deps`.
671    /// Returns `(block, Some(skip_reason))` if the block should be discarded (fail-closed).
672    fn build_dep_block(
673        &self,
674        seed: &Value,
675        expand_deps: &str,
676    ) -> Result<(Vec<Value>, Option<String>)> {
677        if expand_deps == "false" || expand_deps.is_empty() {
678            return Ok((vec![seed.clone()], None));
679        }
680        let seed_id = seed["id"].as_str().unwrap_or("");
681        match expand_deps {
682            "direct" => {
683                let deps = self.storage.get_deps(seed_id)?;
684                let mut block = vec![seed.clone()];
685                for (dep_id, kind, _) in &deps {
686                    if kind != "hard" {
687                        continue;
688                    }
689                    match self.validate_hard_dep(dep_id)? {
690                        Some(chunk) => block.push(chunk),
691                        None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
692                    }
693                }
694                Ok((block, None))
695            }
696            "closure" => {
697                let mut block = vec![seed.clone()];
698                let mut visited: HashSet<String> = [seed_id.to_string()].into();
699                match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
700                    Some(reason) => Ok((vec![], Some(reason))),
701                    None => Ok((block, None)),
702                }
703            }
704            _ => Ok((vec![seed.clone()], None)),
705        }
706    }
707
708    /// Returns the chunk if the hard dep is usable, None if it should cause fail-closed.
709    fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
710        match self.storage.get_chunk(dep_id)? {
711            None => Ok(None),
712            Some(chunk) => {
713                let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
714                let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
715                let embed_v = chunk
716                    .get("embed_version")
717                    .and_then(Value::as_i64)
718                    .unwrap_or(0);
719                if state == "archived" || origin == "spark" || embed_v == 0 {
720                    Ok(None)
721                } else {
722                    Ok(Some(chunk))
723                }
724            }
725        }
726    }
727
728    /// BFS hard-dep expansion up to `max_depth`. Returns Some(reason) on fail-closed.
729    fn expand_hard_closure(
730        &self,
731        id: &str,
732        visited: &mut HashSet<String>,
733        block: &mut Vec<Value>,
734        depth: usize,
735        max_depth: usize,
736    ) -> Result<Option<String>> {
737        if depth >= max_depth {
738            return Ok(Some("dep_depth_limit".to_string()));
739        }
740        let deps = self.storage.get_deps(id)?;
741        for (dep_id, kind, _) in &deps {
742            if kind != "hard" {
743                continue;
744            }
745            if visited.contains(dep_id) {
746                continue;
747            } // cycle guard
748            visited.insert(dep_id.clone());
749            match self.validate_hard_dep(dep_id)? {
750                None => return Ok(Some("hard_dep_unavailable".to_string())),
751                Some(chunk) => {
752                    block.push(chunk);
753                    if let Some(reason) =
754                        self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
755                    {
756                        return Ok(Some(reason));
757                    }
758                }
759            }
760        }
761        Ok(None)
762    }
763
764    fn density_refill(
765        &self,
766        mut selected: Vec<Value>,
767        skipped: &[(Vec<Value>, f64, usize)],
768        budget: usize,
769    ) -> Vec<Value> {
770        let used_tokens = block_cost(&selected);
771        if used_tokens >= budget {
772            return selected;
773        }
774
775        let selected_ids: HashSet<String> = selected
776            .iter()
777            .filter_map(|c| c["id"].as_str().map(str::to_string))
778            .collect();
779
780        let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
781            .iter()
782            .filter_map(|(block, fscore, _)| {
783                let block: Vec<Value> = block
784                    .iter()
785                    .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
786                    .cloned()
787                    .collect();
788                if block.is_empty() {
789                    return None;
790                }
791                let cost = block_cost(&block);
792                let density = fscore / cost.max(1) as f64;
793                Some((density, block, cost))
794            })
795            .collect();
796        density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
797
798        let mut used_tokens = block_cost(&selected);
799        let mut added_ids: HashSet<String> = selected_ids;
800        for (_, block, cost) in density_items {
801            if used_tokens + cost <= budget {
802                for b in block {
803                    let bid = b["id"].as_str().unwrap_or("").to_string();
804                    if !added_ids.contains(&bid) {
805                        selected.push(b);
806                        added_ids.insert(bid);
807                    }
808                }
809                used_tokens += cost;
810            }
811        }
812        selected
813    }
814
815    fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
816        let embed_version = self
817            .storage
818            .get_meta("embed_version")?
819            .and_then(|v| v.parse::<i64>().ok())
820            .unwrap_or(1);
821
822        let content_res = self
823            .storage
824            .search_vec_content(q_content, self.top_k_candidates)?;
825        let trigger_res = self
826            .storage
827            .search_vec_trigger(q_trigger, self.top_k_candidates)?;
828
829        // Batch-fetch all candidate chunk IDs (mirrors the pattern in ann_candidates).
830        let all_ids: Vec<&str> = {
831            let mut seen = HashSet::new();
832            content_res
833                .iter()
834                .chain(trigger_res.iter())
835                .map(|(id, _)| id.as_str())
836                .filter(|id| seen.insert(*id))
837                .collect()
838        };
839        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
840
841        let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
842        for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
843            if let Some(chunk) = chunks.get(cid) {
844                if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
845                    continue;
846                }
847                if chunk.get("state").and_then(Value::as_str) == Some("archived") {
848                    continue;
849                }
850                let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
851                if maturity == "promoted" || maturity == "dropped" {
852                    continue;
853                }
854                let ev = chunk
855                    .get("embed_version")
856                    .and_then(Value::as_i64)
857                    .unwrap_or(1);
858                if ev < embed_version {
859                    continue;
860                }
861                let entry = spark_scores
862                    .entry(cid.clone())
863                    .or_insert_with(|| (*sim, chunk.clone()));
864                if *sim > entry.0 {
865                    *entry = (*sim, chunk.clone());
866                }
867            }
868        }
869        let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
870        sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
871        Ok(sparks
872            .into_iter()
873            .take(self.top_k_candidates)
874            .map(|(_, c)| c)
875            .collect())
876    }
877
878    #[allow(clippy::too_many_arguments)]
879    fn write_recall_trace(
880        &self,
881        trace_id: &str,
882        query: &str,
883        scored: &[(f64, Value)],
884        visible: &[Value],
885        sparks: &[Value],
886        depth_skipped: &[String],
887        skipped_reasons: &HashMap<String, String>,
888        refine_mode: &str,
889        source: &str,
890        now: &str,
891    ) -> Result<()> {
892        let lib_id = self.storage.lib_id()?;
893        self.storage.begin_immediate()?;
894        let result = (|| -> Result<()> {
895            for (rank, (_, chunk)) in scored.iter().enumerate() {
896                let cid = chunk["id"].as_str().unwrap_or("");
897                let sim = chunk.get("_fused_score").and_then(Value::as_f64);
898                // For dep-skipped seeds, record their skip reason as refine_mode.
899                let rm = skipped_reasons
900                    .get(cid)
901                    .map(|r| format!("skipped:{r}"))
902                    .or_else(|| {
903                        if refine_mode != "off" && !refine_mode.is_empty() {
904                            Some(refine_mode.to_string())
905                        } else {
906                            None
907                        }
908                    });
909                self.storage.insert_usage_trace(
910                    trace_id,
911                    Some(cid),
912                    "retrieved",
913                    1.0,
914                    sim,
915                    rm.as_deref(),
916                    None,
917                    Some((rank + 1) as i64),
918                    None,
919                    source,
920                    now,
921                )?;
922            }
923            for (rank, chunk) in visible.iter().enumerate() {
924                let cid = chunk["id"].as_str().unwrap_or("");
925                self.storage.insert_usage_trace(
926                    trace_id,
927                    Some(cid),
928                    "selected",
929                    1.0,
930                    None,
931                    None,
932                    None,
933                    Some((rank + 1) as i64),
934                    None,
935                    source,
936                    now,
937                )?;
938                // Write 'refined' event for chunks that came through the trim path.
939                if chunk
940                    .get("_trimmed")
941                    .and_then(Value::as_bool)
942                    .unwrap_or(false)
943                {
944                    self.storage.insert_usage_trace(
945                        trace_id,
946                        Some(cid),
947                        "refined",
948                        1.0,
949                        None,
950                        Some("trim"),
951                        None,
952                        Some((rank + 1) as i64),
953                        None,
954                        source,
955                        now,
956                    )?;
957                }
958            }
959            // Write 'retrieved' events for sparks (for recurring-spark count tracking).
960            for (rank, chunk) in sparks.iter().enumerate() {
961                let cid = chunk["id"].as_str().unwrap_or("");
962                self.storage.insert_usage_trace(
963                    trace_id,
964                    Some(cid),
965                    "retrieved",
966                    1.0,
967                    None,
968                    Some("spark"),
969                    None,
970                    Some((rank + 1) as i64),
971                    None,
972                    source,
973                    now,
974                )?;
975            }
976            let snapshot = json!({
977                "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
978                "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
979                "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
980                "depth_skipped": depth_skipped,
981                "skipped_reasons": skipped_reasons,
982            });
983            let log = EpisodicLogRow {
984                id: gen_uuid(),
985                trace_id: trace_id.to_string(),
986                lib_id,
987                ts: now.to_string(),
988                query: Some(query.to_string()),
989                recall_snapshot: Some(snapshot.to_string()),
990                event_source: source.to_string(),
991                task_state: "recalled".to_string(),
992                usage_state: "unknown".to_string(),
993                context_key: Some(content_hash(&normalize_query(query))),
994                distill_state: "open".to_string(),
995                ..Default::default()
996            };
997            self.storage.upsert_episodic_log(&log)?;
998            self.storage.commit()
999        })();
1000        if result.is_err() {
1001            let _ = self.storage.rollback();
1002        }
1003        result
1004    }
1005
1006    // ------------------------------------------------------------------
1007    // Public API 2: record
1008    // ------------------------------------------------------------------
1009
1010    #[allow(clippy::too_many_arguments)]
1011    pub fn record(
1012        &self,
1013        trace_id: &str,
1014        query: Option<&str>,
1015        output: Option<&str>,
1016        output_summary: Option<&str>,
1017        outcome: Option<&str>,
1018        used: Option<&[String]>,
1019        feedback_up: Option<&[String]>,
1020        feedback_down: Option<&[String]>,
1021        nomination: Option<&str>,
1022        priority: i64,
1023        source: &str,
1024    ) -> Result<()> {
1025        self.record_detailed(
1026            trace_id,
1027            query,
1028            output,
1029            output_summary,
1030            outcome,
1031            used,
1032            "explicit",
1033            true,
1034            feedback_up,
1035            feedback_down,
1036            "user",
1037            None,
1038            None,
1039            nomination,
1040            priority,
1041            None,
1042            source,
1043        )
1044    }
1045
1046    #[allow(clippy::too_many_arguments)]
1047    pub fn record_detailed(
1048        &self,
1049        trace_id: &str,
1050        query: Option<&str>,
1051        output: Option<&str>,
1052        output_summary: Option<&str>,
1053        outcome: Option<&str>,
1054        used: Option<&[String]>,
1055        used_attribution: &str,
1056        used_complete: bool,
1057        feedback_up: Option<&[String]>,
1058        feedback_down: Option<&[String]>,
1059        feedback_kind: &str,
1060        feedback_actor: Option<&str>,
1061        feedback_reason: Option<&str>,
1062        nomination: Option<&str>,
1063        priority: i64,
1064        task_state: Option<&str>,
1065        source: &str,
1066    ) -> Result<()> {
1067        let dedupe_ids = |ids: &[String]| {
1068            let mut seen = HashSet::new();
1069            ids.iter()
1070                .filter(|id| seen.insert((*id).clone()))
1071                .cloned()
1072                .collect::<Vec<_>>()
1073        };
1074        let normalized_used = used.map(dedupe_ids);
1075        let normalized_feedback_up = feedback_up.map(dedupe_ids);
1076        let normalized_feedback_down = feedback_down.map(dedupe_ids);
1077        let used = normalized_used.as_deref();
1078        let feedback_up = normalized_feedback_up.as_deref();
1079        let feedback_down = normalized_feedback_down.as_deref();
1080
1081        if let Some(o) = outcome {
1082            if !matches!(o, "ok" | "fail" | "unknown") {
1083                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
1084            }
1085        }
1086        if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
1087            return Err(InnateError::InvalidState(format!(
1088                "invalid used attribution: {used_attribution}"
1089            )));
1090        }
1091        if !matches!(feedback_kind, "user" | "judge") {
1092            return Err(InnateError::InvalidState(format!(
1093                "invalid feedback kind: {feedback_kind}"
1094            )));
1095        }
1096        if let Some(state) = task_state {
1097            if !matches!(
1098                state,
1099                "recalled" | "running" | "completed" | "abandoned" | "timed_out"
1100            ) {
1101                return Err(InnateError::InvalidState(format!(
1102                    "invalid task state: {state}"
1103                )));
1104            }
1105        }
1106        validate_source(source)?;
1107        if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
1108            let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
1109            if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
1110                return Err(InnateError::InvalidState(format!(
1111                    "conflicting feedback for chunk {chunk_id}"
1112                )));
1113            }
1114        }
1115        let effective_priority = if nomination.is_some() && priority == 0 {
1116            1
1117        } else {
1118            priority
1119        };
1120        let now = utc_now_iso();
1121        let lib_id = self.storage.lib_id()?;
1122
1123        self.storage.begin_immediate()?;
1124        let result = (|| -> Result<()> {
1125            let log = self.storage.get_episodic_log(trace_id)?;
1126            let mut is_fresh_insert = false;
1127            let log = match log {
1128                Some(l) => l,
1129                None => {
1130                    let used_ids = used.map(serde_json::to_string).transpose()?;
1131                    let row = EpisodicLogRow {
1132                        id: gen_uuid(),
1133                        trace_id: trace_id.to_string(),
1134                        lib_id,
1135                        ts: now.clone(),
1136                        query: query.map(str::to_string).or_else(|| Some(String::new())),
1137                        output: output.map(str::to_string),
1138                        output_summary: output_summary.map(str::to_string),
1139                        outcome: outcome.map(str::to_string),
1140                        event_source: source.to_string(),
1141                        task_state: if matches!(outcome, Some("ok") | Some("fail")) {
1142                            "completed".to_string()
1143                        } else {
1144                            task_state.unwrap_or("running").to_string()
1145                        },
1146                        completed_at: matches!(outcome, Some("ok") | Some("fail"))
1147                            .then(|| now.clone()),
1148                        usage_state: usage_state(used).to_string(),
1149                        used_ids,
1150                        used_attribution: used.map(|_| used_attribution.to_string()),
1151                        used_complete,
1152                        context_key: query.map(|q| content_hash(&normalize_query(q))),
1153                        nomination: nomination.map(str::to_string),
1154                        priority: effective_priority,
1155                        distill_state: "open".to_string(),
1156                        ..Default::default()
1157                    };
1158                    self.storage.upsert_episodic_log(&row)?;
1159                    is_fresh_insert = true;
1160                    self.storage.get_episodic_log(trace_id)?.unwrap()
1161                }
1162            };
1163            self.validate_trace_attribution(trace_id, used, "used")?;
1164            self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
1165            self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
1166
1167            let existing_outcome = log
1168                .get("outcome")
1169                .and_then(Value::as_str)
1170                .map(str::to_string);
1171            if let Some(new_outcome) = outcome {
1172                if let Some(ref ex) = existing_outcome {
1173                    if ex != "unknown" && ex != new_outcome {
1174                        return Err(InnateError::OutcomeConflict {
1175                            trace_id: trace_id.to_string(),
1176                            existing: ex.clone(),
1177                            requested: new_outcome.to_string(),
1178                        });
1179                    }
1180                }
1181            }
1182
1183            // usage_trace: used
1184            let effective_used_attribution = if used.is_some() {
1185                used_attribution
1186            } else {
1187                log.get("used_attribution")
1188                    .and_then(Value::as_str)
1189                    .unwrap_or(used_attribution)
1190            };
1191            let used_strength = match effective_used_attribution {
1192                "explicit" => 0.3,
1193                "cited" => 0.25,
1194                "inferred" => 0.15,
1195                _ => unreachable!(),
1196            };
1197            let existing_used_ids: Vec<String> = log
1198                .get("used_ids")
1199                .and_then(Value::as_str)
1200                .and_then(|raw| serde_json::from_str(raw).ok())
1201                .unwrap_or_default();
1202            let existing_used_complete = log
1203                .get("used_complete")
1204                .and_then(Value::as_i64)
1205                .unwrap_or(0)
1206                != 0;
1207            let effective_used_complete = used_complete || existing_used_complete;
1208            let effective_used_ids = used.map(|reported| {
1209                if used_complete {
1210                    reported.to_vec()
1211                } else {
1212                    let mut merged = existing_used_ids.clone();
1213                    let mut seen: HashSet<String> = merged.iter().cloned().collect();
1214                    merged.extend(
1215                        reported
1216                            .iter()
1217                            .filter(|id| seen.insert((*id).clone()))
1218                            .cloned(),
1219                    );
1220                    merged
1221                }
1222            });
1223            if let Some(used_ids) = effective_used_ids.as_deref() {
1224                let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
1225                if used_complete {
1226                    self.storage.replace_used_trace(
1227                        trace_id,
1228                        used_ids,
1229                        used_strength,
1230                        used_attribution,
1231                        source,
1232                        &now,
1233                    )?;
1234                } else if let Some(reported) = used {
1235                    self.storage.merge_used_trace(
1236                        trace_id,
1237                        reported,
1238                        used_strength,
1239                        used_attribution,
1240                        source,
1241                        &now,
1242                    )?;
1243                }
1244                let affected: HashSet<String> = previously_used
1245                    .into_iter()
1246                    .chain(used_ids.iter().cloned())
1247                    .collect();
1248                for cid in affected {
1249                    self.storage.refresh_chunk_last_used(&cid, &now)?;
1250                }
1251            }
1252
1253            // usage_trace: task_ok / task_fail
1254            if let Some(o) = outcome {
1255                if matches!(o, "ok" | "fail") {
1256                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
1257                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
1258                    self.storage.insert_usage_trace(
1259                        trace_id, None, event, strength, None, None, None, None, None, source, &now,
1260                    )?;
1261                }
1262            }
1263
1264            // Rebuild trace-derived evidence whenever either side of the pair arrives.
1265            // This makes `outcome → used` and `used → outcome` equivalent and lets a
1266            // later complete usage declaration replace an earlier one safely.
1267            let effective_outcome = outcome
1268                .filter(|value| *value != "unknown")
1269                .or(existing_outcome.as_deref().filter(|value| *value != "unknown"));
1270            if let Some(o @ ("ok" | "fail")) = effective_outcome {
1271                if used.is_some()
1272                    || (outcome.is_some_and(|value| value != "unknown")
1273                        && existing_outcome.as_deref() != outcome)
1274                {
1275                    let fallback_ids: Vec<String>;
1276                    let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
1277                        effective_used_ids.as_deref()
1278                    } else {
1279                        fallback_ids = log
1280                            .get("used_ids")
1281                            .and_then(Value::as_str)
1282                            .and_then(|s| serde_json::from_str(s).ok())
1283                            .unwrap_or_default();
1284                        if fallback_ids.is_empty() {
1285                            None
1286                        } else {
1287                            Some(&fallback_ids)
1288                        }
1289                    };
1290                    let effective_complete = if used.is_some() {
1291                        effective_used_complete
1292                    } else {
1293                        log.get("usage_state").and_then(Value::as_str) != Some("unknown")
1294                            && log
1295                                .get("used_complete")
1296                            .and_then(Value::as_i64)
1297                            .unwrap_or(1)
1298                            != 0
1299                    };
1300                    self.replace_outcome_evidence(
1301                        trace_id,
1302                        o,
1303                        effective_used,
1304                        effective_complete,
1305                        &now,
1306                    )?;
1307                }
1308            } else if used.is_some() && effective_used_complete {
1309                self.replace_selected_unused_evidence(
1310                    trace_id,
1311                    effective_used_ids.as_deref().unwrap_or_default(),
1312                    &now,
1313                )?;
1314            }
1315
1316            let context_key = log
1317                .get("context_key")
1318                .and_then(Value::as_str)
1319                .map(str::to_string)
1320                .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
1321            let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
1322
1323            // Persist feedback facts before reducing them into confidence.
1324            // INSERT OR IGNORE: skip all derived updates for duplicate (trace_id, chunk_id, signal).
1325            // Track affected chunks so we only rebuild their context_stats (not the full table).
1326            let mut context_affected: HashSet<String> = HashSet::new();
1327            if let Some(used_ids) = effective_used_ids.as_deref() {
1328                for cid in used_ids {
1329                    context_affected.insert(cid.clone());
1330                }
1331            }
1332            if let Some(ups) = feedback_up {
1333                for cid in ups {
1334                    let corrected = self.storage
1335                        .delete_feedback_event(trace_id, cid, "down")?;
1336                    self.storage
1337                        .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_down")?;
1338                    let inserted = self.storage.insert_feedback_event(
1339                        &gen_uuid(),
1340                        trace_id,
1341                        cid,
1342                        "up",
1343                        feedback_strength,
1344                        source,
1345                        feedback_actor,
1346                        feedback_reason,
1347                        context_key.as_deref(),
1348                        &now,
1349                    )?;
1350                    if inserted > 0 {
1351                        self.upsert_trace_confidence_evidence(
1352                            trace_id,
1353                            cid,
1354                            "feedback_up",
1355                            1.0,
1356                            feedback_strength,
1357                            if feedback_kind == "judge" {
1358                                "judge_up"
1359                            } else {
1360                                "user_up"
1361                            },
1362                            context_key.as_deref(),
1363                            &now,
1364                            true,
1365                        )?;
1366                        self.storage.update_chunk_last_used(cid, &now)?;
1367                        self.refresh_governance_evidence(cid, &now)?;
1368                        context_affected.insert(cid.clone());
1369                    } else if corrected > 0 {
1370                        self.recompute_chunk_confidence(cid, &now)?;
1371                        self.refresh_governance_evidence(cid, &now)?;
1372                        context_affected.insert(cid.clone());
1373                    }
1374                }
1375            }
1376            if let Some(downs) = feedback_down {
1377                for cid in downs {
1378                    let corrected = self.storage
1379                        .delete_feedback_event(trace_id, cid, "up")?;
1380                    self.storage
1381                        .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_up")?;
1382                    let inserted = self.storage.insert_feedback_event(
1383                        &gen_uuid(),
1384                        trace_id,
1385                        cid,
1386                        "down",
1387                        feedback_strength,
1388                        source,
1389                        feedback_actor,
1390                        feedback_reason,
1391                        context_key.as_deref(),
1392                        &now,
1393                    )?;
1394                    if inserted > 0 {
1395                        self.upsert_trace_confidence_evidence(
1396                            trace_id,
1397                            cid,
1398                            "feedback_down",
1399                            0.0,
1400                            feedback_strength,
1401                            if feedback_kind == "judge" {
1402                                "judge_down"
1403                            } else {
1404                                "user_down"
1405                            },
1406                            context_key.as_deref(),
1407                            &now,
1408                            true,
1409                        )?;
1410                        self.refresh_governance_evidence(cid, &now)?;
1411                        context_affected.insert(cid.clone());
1412                    } else if corrected > 0 {
1413                        self.recompute_chunk_confidence(cid, &now)?;
1414                        self.refresh_governance_evidence(cid, &now)?;
1415                        context_affected.insert(cid.clone());
1416                    }
1417                }
1418            }
1419            // Targeted rebuild — only update context_stats for chunks touched in this call.
1420            self.rebuild_context_stats_for(&context_affected, &now)?;
1421
1422            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
1423            if !is_fresh_insert {
1424                self.storage.patch_episodic_log_content(
1425                    trace_id,
1426                    query,
1427                    output,
1428                    output_summary,
1429                    nomination,
1430                    effective_priority,
1431                )?;
1432            }
1433
1434            let lifecycle_state = if effective_outcome.is_some() {
1435                "completed"
1436            } else {
1437                task_state.unwrap_or_else(|| {
1438                    log.get("task_state")
1439                        .and_then(Value::as_str)
1440                        .unwrap_or("running")
1441                })
1442            };
1443            let used_ids_json = effective_used_ids
1444                .as_deref()
1445                .map(serde_json::to_string)
1446                .transpose()?;
1447            self.storage.update_trace_lifecycle(
1448                trace_id,
1449                lifecycle_state,
1450                (lifecycle_state == "completed").then_some(now.as_str()),
1451                effective_used_ids
1452                    .as_deref()
1453                    .map(|ids| usage_state(Some(ids))),
1454                used_ids_json.as_deref(),
1455                used.map(|_| used_attribution),
1456                used.map(|_| effective_used_complete),
1457            )?;
1458
1459            // Update episodic log
1460            let current_state = log
1461                .get("distill_state")
1462                .and_then(Value::as_str)
1463                .unwrap_or("open");
1464            let lifecycle_completed = lifecycle_state == "completed";
1465            let new_state = if current_state == "open"
1466                && matches!(lifecycle_state, "abandoned" | "timed_out")
1467            {
1468                Some("discarded")
1469            } else if current_state == "open" && lifecycle_completed {
1470                let has_material = output_summary.is_some()
1471                    || nomination.is_some()
1472                    || output.is_some()
1473                    || log.get("output_summary").and_then(Value::as_str).is_some()
1474                    || log.get("nomination").and_then(Value::as_str).is_some()
1475                    || log.get("output").and_then(Value::as_str).is_some();
1476                if has_material {
1477                    Some("new")
1478                } else {
1479                    Some("discarded")
1480                }
1481            } else {
1482                None
1483            };
1484            if let Some(state) = new_state {
1485                let note = if state == "discarded" {
1486                    Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
1487                        lifecycle_state
1488                    } else {
1489                        "insufficient_material"
1490                    })
1491                } else {
1492                    None
1493                };
1494                let outcome_str = outcome.map(str::to_string);
1495                self.storage.update_episodic_log_state(
1496                    trace_id,
1497                    state,
1498                    note,
1499                    outcome_str.as_deref(),
1500                )?;
1501            } else if outcome.is_some() {
1502                let outcome_str = outcome.map(str::to_string);
1503                self.storage.update_episodic_log_state(
1504                    trace_id,
1505                    current_state,
1506                    None,
1507                    outcome_str.as_deref(),
1508                )?;
1509            }
1510
1511            self.storage.commit()
1512        })();
1513        if result.is_err() {
1514            let _ = self.storage.rollback();
1515        }
1516        result?;
1517        self.enqueue_evolve_if_needed(&now)?;
1518        Ok(())
1519    }
1520
1521    fn validate_trace_attribution(
1522        &self,
1523        trace_id: &str,
1524        chunk_ids: Option<&[String]>,
1525        field: &str,
1526    ) -> Result<()> {
1527        let Some(chunk_ids) = chunk_ids else {
1528            return Ok(());
1529        };
1530        if chunk_ids.is_empty() {
1531            return Ok(());
1532        }
1533
1534        let log = self.storage.get_episodic_log(trace_id)?.ok_or_else(|| {
1535            InnateError::InvalidState(format!(
1536                "{field} requires a trace created by recall: {trace_id}"
1537            ))
1538        })?;
1539        let mut attributable = HashSet::new();
1540        if let Some(raw) = log.get("recall_snapshot").and_then(Value::as_str) {
1541            if let Ok(snapshot) = serde_json::from_str::<Value>(raw) {
1542                if let Some(ids) = snapshot.get("selected").and_then(Value::as_array) {
1543                    attributable.extend(ids.iter().filter_map(Value::as_str).map(str::to_string));
1544                }
1545            }
1546        }
1547        let rows = self.storage.query_chunks_params(
1548            "SELECT DISTINCT chunk_id FROM usage_trace
1549             WHERE trace_id=? AND chunk_id IS NOT NULL
1550               AND event='selected'",
1551            rusqlite::params![trace_id],
1552        )?;
1553        attributable.extend(rows.iter().filter_map(|row| {
1554            row.get("chunk_id")
1555                .and_then(Value::as_str)
1556                .map(str::to_string)
1557        }));
1558
1559        for chunk_id in chunk_ids {
1560            if self.storage.get_chunk(chunk_id)?.is_none() || !attributable.contains(chunk_id) {
1561                return Err(InnateError::InvalidState(format!(
1562                    "{field} chunk {chunk_id} was not attributable to trace {trace_id}"
1563                )));
1564            }
1565        }
1566        Ok(())
1567    }
1568
1569    fn replace_selected_unused_evidence(
1570        &self,
1571        trace_id: &str,
1572        used_ids: &[String],
1573        now: &str,
1574    ) -> Result<()> {
1575        let old_rows = self.storage.query_chunks_params(
1576            "SELECT DISTINCT chunk_id FROM confidence_evidence
1577             WHERE trace_id=? AND kind='selected_unused'",
1578            rusqlite::params![trace_id],
1579        )?;
1580        let mut affected: HashSet<String> = old_rows
1581            .iter()
1582            .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1583            .collect();
1584        self.storage
1585            .delete_trace_confidence_evidence(trace_id, &["selected_unused"])?;
1586
1587        let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1588        let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1589            log.get("context_key")
1590                .and_then(Value::as_str)
1591                .map(str::to_string)
1592        });
1593        let selected_rows = self.storage.query_chunks_params(
1594            "SELECT chunk_id FROM usage_trace
1595             WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1596            rusqlite::params![trace_id],
1597        )?;
1598        for row in selected_rows {
1599            if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1600                if !used_set.contains(chunk_id) {
1601                    self.upsert_trace_confidence_evidence(
1602                        trace_id,
1603                        chunk_id,
1604                        "selected_unused",
1605                        0.0,
1606                        0.08,
1607                        "selected_unused",
1608                        context_key.as_deref(),
1609                        now,
1610                        false,
1611                    )?;
1612                    affected.insert(chunk_id.to_string());
1613                }
1614            }
1615        }
1616        for chunk_id in affected {
1617            self.recompute_chunk_confidence(&chunk_id, now)?;
1618        }
1619        Ok(())
1620    }
1621
1622    #[allow(clippy::too_many_arguments)]
1623    fn upsert_trace_confidence_evidence(
1624        &self,
1625        trace_id: &str,
1626        chunk_id: &str,
1627        kind: &str,
1628        target: f64,
1629        strength: f64,
1630        reason: &str,
1631        context_key: Option<&str>,
1632        now: &str,
1633        explicit: bool,
1634    ) -> Result<()> {
1635        let chunk = match self.storage.get_chunk(chunk_id)? {
1636            Some(chunk) => chunk,
1637            None => return Ok(()),
1638        };
1639        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1640            return Ok(());
1641        }
1642        let recency_weight = if explicit {
1643            const KAPPA: f64 = 0.5;
1644            const WINDOW_DAYS: f64 = 14.0;
1645            let gap_days = chunk
1646                .get("last_used_at")
1647                .and_then(Value::as_str)
1648                .map(|ts| iso_days_diff(now, ts) as f64)
1649                .unwrap_or(0.0);
1650            (1.0
1651                + KAPPA
1652                    * (-(gap_days / WINDOW_DAYS) * std::f64::consts::LN_2).exp())
1653            .min(1.5)
1654        } else {
1655            1.0
1656        };
1657        let alpha = (0.2 * strength * recency_weight).clamp(0.0, 1.0);
1658        self.storage.upsert_confidence_evidence(
1659            &gen_uuid(),
1660            Some(trace_id),
1661            chunk_id,
1662            kind,
1663            target,
1664            alpha,
1665            reason,
1666            context_key,
1667            now,
1668        )?;
1669        self.recompute_chunk_confidence(chunk_id, now)
1670    }
1671
1672    fn recompute_chunk_confidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1673        let Some(chunk) = self.storage.get_chunk(chunk_id)? else {
1674            return Ok(());
1675        };
1676        let mut confidence = chunk
1677            .get("confidence_base")
1678            .and_then(Value::as_f64)
1679            .unwrap_or_else(|| {
1680                chunk
1681                    .get("confidence")
1682                    .and_then(Value::as_f64)
1683                    .unwrap_or(0.5)
1684            });
1685        let mut reason = chunk
1686            .get("confidence_reason")
1687            .and_then(Value::as_str)
1688            .unwrap_or("base")
1689            .to_string();
1690        for evidence in self.storage.confidence_evidence_for_chunk(chunk_id)? {
1691            let target = evidence
1692                .get("target")
1693                .and_then(Value::as_f64)
1694                .unwrap_or(0.5);
1695            let alpha = evidence
1696                .get("alpha")
1697                .and_then(Value::as_f64)
1698                .unwrap_or(0.0)
1699                .clamp(0.0, 1.0);
1700            confidence = (confidence + alpha * (target - confidence)).clamp(0.0, 1.0);
1701            reason = evidence
1702                .get("reason")
1703                .and_then(Value::as_str)
1704                .unwrap_or("evidence")
1705                .to_string();
1706        }
1707        self.storage.conn_execute(
1708            "UPDATE chunks SET confidence=?, confidence_reason=?, updated_at=? WHERE id=?",
1709            rusqlite::params![confidence, reason, now, chunk_id],
1710        )
1711    }
1712
1713    #[allow(clippy::too_many_arguments)]
1714    fn replace_outcome_evidence(
1715        &self,
1716        trace_id: &str,
1717        outcome: &str,
1718        used: Option<&[String]>,
1719        used_complete: bool,
1720        now: &str,
1721    ) -> Result<()> {
1722        let old_rows = self.storage.query_chunks_params(
1723            "SELECT DISTINCT chunk_id FROM confidence_evidence
1724             WHERE trace_id=? AND kind IN ('outcome_ok','outcome_fail','selected_unused')",
1725            rusqlite::params![trace_id],
1726        )?;
1727        let mut affected: HashSet<String> = old_rows
1728            .iter()
1729            .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1730            .collect();
1731        self.storage.delete_trace_confidence_evidence(
1732            trace_id,
1733            &["outcome_ok", "outcome_fail", "selected_unused"],
1734        )?;
1735
1736        let used_ids = used.unwrap_or_default();
1737        let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1738        let attribution_divisor = used_ids.len().max(1) as f64;
1739        let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1740            log.get("context_key")
1741                .and_then(Value::as_str)
1742                .map(str::to_string)
1743        });
1744        for chunk_id in used_ids {
1745            let attribution = self
1746                .storage
1747                .query_chunks_params(
1748                    "SELECT strength, attribution FROM usage_trace
1749                     WHERE trace_id=? AND chunk_id=? AND event='used'",
1750                    rusqlite::params![trace_id, chunk_id],
1751                )?
1752                .into_iter()
1753                .next();
1754            let base_strength = attribution
1755                .as_ref()
1756                .and_then(|row| row.get("strength"))
1757                .and_then(Value::as_f64)
1758                .unwrap_or(0.15)
1759                / attribution_divisor;
1760            let attribution_reason = attribution
1761                .as_ref()
1762                .and_then(|row| row.get("attribution"))
1763                .and_then(Value::as_str)
1764                .unwrap_or("inferred");
1765            let (kind, target, strength, reason) = if outcome == "ok" {
1766                ("outcome_ok", 1.0, base_strength, attribution_reason)
1767            } else {
1768                ("outcome_fail", 0.0, base_strength * 0.5, "task_fail")
1769            };
1770            self.upsert_trace_confidence_evidence(
1771                trace_id,
1772                chunk_id,
1773                kind,
1774                target,
1775                strength,
1776                reason,
1777                context_key.as_deref(),
1778                now,
1779                false,
1780            )?;
1781            affected.insert(chunk_id.clone());
1782        }
1783
1784        if used_complete {
1785            let selected_rows = self.storage.query_chunks_params(
1786                "SELECT chunk_id FROM usage_trace
1787                 WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1788                rusqlite::params![trace_id],
1789            )?;
1790            for row in selected_rows {
1791                if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1792                    if !used_set.contains(chunk_id) {
1793                        self.upsert_trace_confidence_evidence(
1794                            trace_id,
1795                            chunk_id,
1796                            "selected_unused",
1797                            0.0,
1798                            0.08,
1799                            "selected_unused",
1800                            context_key.as_deref(),
1801                            now,
1802                            false,
1803                        )?;
1804                        affected.insert(chunk_id.to_string());
1805                    }
1806                }
1807            }
1808        }
1809
1810        for chunk_id in affected {
1811            self.recompute_chunk_confidence(&chunk_id, now)?;
1812        }
1813        Ok(())
1814    }
1815
1816    fn rebuild_context_stats(&self, now: &str) -> Result<()> {
1817        self.storage.conn_execute(
1818            "DELETE FROM chunk_context_stats",
1819            rusqlite::params![],
1820        )?;
1821        self.storage.conn_execute(
1822            "INSERT INTO chunk_context_stats
1823             (chunk_id, context_key, success_count, failure_count,
1824              positive_feedback, negative_feedback, last_updated_at)
1825             SELECT chunk_id, context_key, success_count, failure_count,
1826                    positive_feedback, negative_feedback, ?
1827             FROM chunk_context_stats_base",
1828            rusqlite::params![now],
1829        )?;
1830        self.storage.conn_execute(
1831            "INSERT INTO chunk_context_stats
1832             (chunk_id, context_key, success_count, failure_count,
1833              positive_feedback, negative_feedback, last_updated_at)
1834             SELECT chunk_id, context_key,
1835                    SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
1836                    SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
1837                    0, 0, ?
1838             FROM confidence_evidence
1839             WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
1840             GROUP BY chunk_id, context_key
1841             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1842               success_count=success_count+excluded.success_count,
1843               failure_count=failure_count+excluded.failure_count,
1844               last_updated_at=excluded.last_updated_at",
1845            rusqlite::params![now],
1846        )?;
1847        self.storage.conn_execute(
1848            "INSERT INTO chunk_context_stats
1849             (chunk_id, context_key, success_count, failure_count,
1850              positive_feedback, negative_feedback, last_updated_at)
1851             SELECT fe.chunk_id, fe.context_key, 0, 0,
1852                    SUM(CASE WHEN fe.signal='up' THEN 1 ELSE 0 END),
1853                    SUM(CASE WHEN fe.signal='down' THEN 1 ELSE 0 END), ?
1854             FROM feedback_events fe
1855             WHERE fe.context_key IS NOT NULL
1856               AND fe.ts > COALESCE((
1857                 SELECT c.state_updated_at FROM chunks c
1858                 WHERE c.id = fe.chunk_id AND c.state_reason = 'restore'
1859               ), '')
1860             GROUP BY fe.chunk_id, fe.context_key
1861             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1862               positive_feedback=positive_feedback+excluded.positive_feedback,
1863               negative_feedback=negative_feedback+excluded.negative_feedback,
1864               last_updated_at=excluded.last_updated_at",
1865            rusqlite::params![now],
1866        )
1867    }
1868
1869    /// Targeted context_stats rebuild for only the specified chunk_ids.
1870    /// Used by record() to avoid a full-table rebuild on every call.
1871    /// curate() still calls the full rebuild_context_stats() for periodic accuracy.
1872    fn rebuild_context_stats_for(&self, chunk_ids: &HashSet<String>, now: &str) -> Result<()> {
1873        if chunk_ids.is_empty() {
1874            return Ok(());
1875        }
1876        for chunk_id in chunk_ids {
1877            self.storage.conn_execute(
1878                "DELETE FROM chunk_context_stats WHERE chunk_id=?",
1879                rusqlite::params![chunk_id],
1880            )?;
1881            self.storage.conn_execute(
1882                "INSERT OR IGNORE INTO chunk_context_stats
1883                 (chunk_id, context_key, success_count, failure_count,
1884                  positive_feedback, negative_feedback, last_updated_at)
1885                 SELECT chunk_id, context_key, success_count, failure_count,
1886                        positive_feedback, negative_feedback, ?
1887                 FROM chunk_context_stats_base WHERE chunk_id=?",
1888                rusqlite::params![now, chunk_id],
1889            )?;
1890            self.storage.conn_execute(
1891                "INSERT INTO chunk_context_stats
1892                 (chunk_id, context_key, success_count, failure_count,
1893                  positive_feedback, negative_feedback, last_updated_at)
1894                 SELECT chunk_id, context_key,
1895                        SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
1896                        SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
1897                        0, 0, ?
1898                 FROM confidence_evidence
1899                 WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
1900                   AND chunk_id=?
1901                 GROUP BY chunk_id, context_key
1902                 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1903                   success_count=success_count+excluded.success_count,
1904                   failure_count=failure_count+excluded.failure_count,
1905                   last_updated_at=excluded.last_updated_at",
1906                rusqlite::params![now, chunk_id],
1907            )?;
1908            self.storage.conn_execute(
1909                "INSERT INTO chunk_context_stats
1910                 (chunk_id, context_key, success_count, failure_count,
1911                  positive_feedback, negative_feedback, last_updated_at)
1912                 SELECT chunk_id, context_key, 0, 0,
1913                        SUM(CASE WHEN signal='up' THEN 1 ELSE 0 END),
1914                        SUM(CASE WHEN signal='down' THEN 1 ELSE 0 END), ?
1915                 FROM feedback_events
1916                 WHERE context_key IS NOT NULL AND chunk_id=?
1917                   AND ts > COALESCE((
1918                     SELECT state_updated_at FROM chunks
1919                     WHERE id=? AND state_reason='restore'
1920                   ), '')
1921                 GROUP BY chunk_id, context_key
1922                 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1923                   positive_feedback=positive_feedback+excluded.positive_feedback,
1924                   negative_feedback=negative_feedback+excluded.negative_feedback,
1925                   last_updated_at=excluded.last_updated_at",
1926                rusqlite::params![now, chunk_id, chunk_id],
1927            )?;
1928        }
1929        Ok(())
1930    }
1931
1932    fn refresh_governance_evidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1933        let rows = self.storage.query_chunks_params(
1934            "SELECT COALESCE(actor, 'anonymous:' || source) AS actor_key,
1935                    signal, strength, ts
1936             FROM feedback_events
1937             WHERE chunk_id=?
1938               AND ts > COALESCE((
1939                 SELECT state_updated_at FROM chunks
1940                 WHERE id=? AND state_reason='restore'
1941               ), '')",
1942            rusqlite::params![chunk_id, chunk_id],
1943        )?;
1944        let mut actor_contributions: HashMap<String, f64> = HashMap::new();
1945        for row in rows {
1946            let actor = row
1947                .get("actor_key")
1948                .and_then(Value::as_str)
1949                .unwrap_or("anonymous")
1950                .to_string();
1951            let age_days = row
1952                .get("ts")
1953                .and_then(Value::as_str)
1954                .map(|ts| iso_days_diff(now, ts).max(0) as f64)
1955                .unwrap_or(0.0);
1956            let recency_weight = 0.5_f64.powf(age_days / 90.0);
1957            let strength = row.get("strength").and_then(Value::as_f64).unwrap_or(0.0);
1958            let signed = if row.get("signal").and_then(Value::as_str) == Some("down") {
1959                strength
1960            } else {
1961                -strength
1962            };
1963            *actor_contributions.entry(actor).or_default() += signed * recency_weight;
1964        }
1965        let mut score = 0.0_f64;
1966        let mut actor_count = 0_i64;
1967        for contribution in actor_contributions.values().copied() {
1968            let contribution = contribution.clamp(-1.0, 1.0);
1969            score += contribution;
1970            if contribution > 0.0 {
1971                actor_count += 1;
1972            }
1973        }
1974        let score = score.max(0.0);
1975        if score >= 2.0 && actor_count >= 2 {
1976            self.storage.upsert_governance_proposal(
1977                &gen_uuid(),
1978                chunk_id,
1979                "review_applicability",
1980                "Weighted negative feedback",
1981                score.ceil() as i64,
1982                score,
1983                actor_count,
1984                now,
1985            )?;
1986        } else {
1987            self.storage.conn_execute(
1988                "UPDATE governance_proposals
1989                 SET state='rejected', evidence_count=?, evidence_score=?, actor_count=?, updated_at=?
1990                 WHERE chunk_id=? AND state='pending'",
1991                rusqlite::params![score.ceil() as i64, score, actor_count, now, chunk_id],
1992            )?;
1993        }
1994        Ok(())
1995    }
1996
1997    fn enqueue_evolve_if_needed(&self, now: &str) -> Result<()> {
1998        let ready = count_query(
1999            &self.storage,
2000            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2001        )?;
2002        let oldest = self
2003            .storage
2004            .query_chunks("SELECT MIN(ts) AS oldest FROM episodic_log WHERE distill_state='new'")?
2005            .first()
2006            .and_then(|row| row.get("oldest"))
2007            .and_then(Value::as_str)
2008            .map(str::to_string);
2009        let age_due = oldest
2010            .as_deref()
2011            .is_some_and(|ts| ts <= hours_ago(now, self.evolve_schedule_interval_hours).as_str());
2012        let governance_pending = count_query(
2013            &self.storage,
2014            "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
2015        )?;
2016        // A chunk whose proposal already has enough evidence should trigger evolve
2017        // immediately — don't wait for 3 different pending proposals.
2018        let governance_ready = count_query_params(
2019            &self.storage,
2020            "SELECT COUNT(*) FROM governance_proposals
2021             WHERE state='pending'
2022               AND evidence_score >= ? AND actor_count >= 2",
2023            rusqlite::params![self.governance_archive_threshold as f64],
2024        )?;
2025        if ready >= self.evolve_threshold
2026            || (ready > 0 && age_due)
2027            || governance_pending >= self.governance_evolve_threshold
2028            || governance_ready > 0
2029        {
2030            let reason = if ready >= self.evolve_threshold {
2031                "threshold"
2032            } else if governance_ready > 0 {
2033                "governance_ready"
2034            } else if governance_pending >= self.governance_evolve_threshold {
2035                "governance"
2036            } else {
2037                "scheduled"
2038            };
2039            self.storage.request_evolve(&gen_uuid(), reason, now)?;
2040        }
2041        Ok(())
2042    }
2043
2044    // ------------------------------------------------------------------
2045    // Public API 3: add
2046    // ------------------------------------------------------------------
2047
2048    pub fn add(
2049        &self,
2050        content: &str,
2051        kind: &str,
2052        trigger_desc: Option<&str>,
2053        anti_trigger_desc: Option<&str>,
2054        source: &str,
2055        skill_name: Option<&str>,
2056    ) -> Result<String> {
2057        if !matches!(kind, "note" | "skill") {
2058            return Err(InnateError::InvalidState(format!("invalid kind: {kind}")));
2059        }
2060        if !matches!(source, "chat" | "manual" | "doc" | "agent") {
2061            return Err(InnateError::InvalidState(format!(
2062                "invalid source: {source}"
2063            )));
2064        }
2065
2066        let (content, action) = self.sanitize_content(content);
2067        if action == SanitizeAction::Discard {
2068            return Ok(String::new());
2069        }
2070
2071        let trigger_clean = trigger_desc.and_then(|t| {
2072            let (cleaned, act) = self.sanitizer.sanitize(t);
2073            if act == SanitizeAction::Discard {
2074                None
2075            } else {
2076                Some(cleaned)
2077            }
2078        });
2079        let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2080            let (cleaned, act) = self.sanitizer.sanitize(t);
2081            if act == SanitizeAction::Discard {
2082                None
2083            } else {
2084                Some(cleaned)
2085            }
2086        });
2087
2088        let h = content_hash(&content);
2089        if self.storage.is_hash_invalidated(&h)? {
2090            return Err(InnateError::InvalidState(
2091                "content hash is invalidated".into(),
2092            ));
2093        }
2094
2095        // Idempotency check
2096        let existing = self.storage.query_chunks_params(
2097            "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2098            rusqlite::params![h],
2099        )?;
2100        if let Some(e) = existing.first() {
2101            if let Some(id) = e.get("id").and_then(Value::as_str) {
2102                return Ok(id.to_string());
2103            }
2104        }
2105
2106        let now = utc_now_iso();
2107        let chunk_id = gen_uuid();
2108        let redacted = action == SanitizeAction::Redact;
2109
2110        let (origin, state, conf, prot, init_state_reason) = if source == "agent" {
2111            (
2112                "captured",
2113                "pending",
2114                if redacted { 0.4 } else { 0.60 },
2115                0,
2116                "init:captured_agent",
2117            )
2118        } else if kind == "skill" {
2119            (
2120                "installed",
2121                "active",
2122                if redacted { 0.4 } else { 0.85 },
2123                1,
2124                "init:installed",
2125            )
2126        } else {
2127            (
2128                "captured",
2129                "active",
2130                if redacted { 0.4 } else { 0.60 },
2131                0,
2132                "init:captured",
2133            )
2134        };
2135
2136        // Embedding — fall back to embedding_pending on failure.
2137        let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2138        let (cvec, tvec, embed_ver, final_state_reason) = match (
2139            self.embedding.embed_content(&content),
2140            self.embedding.embed_trigger(trigger_str),
2141        ) {
2142            (Ok(cv), Ok(tv)) => (cv, tv, 1i64, init_state_reason.to_string()),
2143            _ => (
2144                vec![],
2145                vec![],
2146                0i64,
2147                format!("embedding_pending:target={state}"),
2148            ),
2149        };
2150
2151        let tokens = estimate_tokens(&content) as i64;
2152        let row = ChunkRow {
2153            id: chunk_id.clone(),
2154            skill_name: skill_name.map(str::to_string),
2155            content: content.clone(),
2156            trigger_desc: trigger_clean.clone(),
2157            anti_trigger_desc: anti_trigger_clean.clone(),
2158            content_hash: h,
2159            token_count: Some(tokens),
2160            origin: origin.to_string(),
2161            source: Some(source.to_string()),
2162            protected: prot,
2163            state: state.to_string(),
2164            state_reason: Some(final_state_reason),
2165            confidence: conf,
2166            confidence_reason: Some(format!("init:{origin}")),
2167            version: 1,
2168            embed_version: embed_ver,
2169            created_at: now.clone(),
2170            updated_at: now.clone(),
2171            ..Default::default()
2172        };
2173
2174        self.storage.begin_immediate()?;
2175        let result = (|| -> Result<()> {
2176            self.storage.insert_chunk(&row)?;
2177            if embed_ver > 0 {
2178                self.storage
2179                    .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2180                self.storage
2181                    .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2182            }
2183            self.storage.commit()
2184        })();
2185        if result.is_err() {
2186            let _ = self.storage.rollback();
2187        }
2188        result?;
2189        Ok(chunk_id)
2190    }
2191
2192    // ------------------------------------------------------------------
2193    // Public API 4: spark
2194    // ------------------------------------------------------------------
2195
2196    pub fn spark(
2197        &self,
2198        content: &str,
2199        trigger_desc: Option<&str>,
2200        anti_trigger_desc: Option<&str>,
2201    ) -> Result<String> {
2202        let (content, action) = self.sanitize_content(content);
2203        if action == SanitizeAction::Discard {
2204            return Ok(String::new());
2205        }
2206
2207        let trigger_clean = trigger_desc.and_then(|t| {
2208            let (cleaned, act) = self.sanitizer.sanitize(t);
2209            if act == SanitizeAction::Discard {
2210                None
2211            } else {
2212                Some(cleaned)
2213            }
2214        });
2215        let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2216            let (cleaned, act) = self.sanitizer.sanitize(t);
2217            if act == SanitizeAction::Discard {
2218                None
2219            } else {
2220                Some(cleaned)
2221            }
2222        });
2223
2224        let h = content_hash(&content);
2225        if self.storage.is_hash_invalidated(&h)? {
2226            return Err(InnateError::InvalidState(
2227                "content hash is invalidated".into(),
2228            ));
2229        }
2230
2231        // Quick related recall (trace=false, no recursion risk)
2232        let related: Vec<String> = self
2233            .recall(
2234                &content,
2235                2000,
2236                false,
2237                false,
2238                Some(5),
2239                "sdk",
2240                "false",
2241                false,
2242                "off",
2243            )
2244            .map(|r| {
2245                r.knowledge
2246                    .iter()
2247                    .filter_map(|c| c["id"].as_str().map(str::to_string))
2248                    .collect()
2249            })
2250            .unwrap_or_default();
2251
2252        let now = utc_now_iso();
2253        let chunk_id = gen_uuid();
2254        let tokens = estimate_tokens(&content) as i64;
2255
2256        let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2257        let (cvec, tvec, embed_ver, state_reason) = match (
2258            self.embedding.embed_content(&content),
2259            self.embedding.embed_trigger(trigger_str),
2260        ) {
2261            (Ok(cv), Ok(tv)) => (cv, tv, 1i64, "init:spark".to_string()),
2262            _ => (
2263                vec![],
2264                vec![],
2265                0i64,
2266                "embedding_pending:target=active".to_string(),
2267            ),
2268        };
2269
2270        let row = ChunkRow {
2271            id: chunk_id.clone(),
2272            content: content.clone(),
2273            trigger_desc: trigger_clean.clone(),
2274            anti_trigger_desc: anti_trigger_clean.clone(),
2275            content_hash: h,
2276            token_count: Some(tokens),
2277            origin: "spark".to_string(),
2278            maturity: Some("seed".to_string()),
2279            related_ids: if related.is_empty() {
2280                None
2281            } else {
2282                Some(related.join(","))
2283            },
2284            state: "active".to_string(),
2285            state_reason: Some(state_reason),
2286            confidence: 0.5,
2287            version: 1,
2288            embed_version: embed_ver,
2289            created_at: now.clone(),
2290            updated_at: now.clone(),
2291            ..Default::default()
2292        };
2293
2294        self.storage.begin_immediate()?;
2295        let result = (|| -> Result<()> {
2296            self.storage.insert_chunk(&row)?;
2297            if embed_ver > 0 {
2298                self.storage
2299                    .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2300                self.storage
2301                    .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2302            }
2303            self.storage.commit()
2304        })();
2305        if result.is_err() {
2306            let _ = self.storage.rollback();
2307        }
2308        result?;
2309        Ok(chunk_id)
2310    }
2311
2312    // ------------------------------------------------------------------
2313    // Public API 5: mature_spark / promote_spark / drop_spark
2314    // ------------------------------------------------------------------
2315
2316    pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
2317        let chunk = self
2318            .storage
2319            .get_chunk(spark_id)?
2320            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2321        if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
2322            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2323        }
2324        let current = chunk
2325            .get("maturity")
2326            .and_then(Value::as_str)
2327            .unwrap_or("seed");
2328        let valid_next: &[&str] = match current {
2329            "seed" => &["sprouting"],
2330            "sprouting" => &["incubating"],
2331            _ => {
2332                return Err(InnateError::InvalidState(format!(
2333                    "spark {spark_id} already {current}"
2334                )))
2335            }
2336        };
2337        if current == to {
2338            return Ok(());
2339        }
2340        if !valid_next.contains(&to) {
2341            return Err(InnateError::InvalidState(format!(
2342                "invalid spark maturity transition: {current} -> {to}"
2343            )));
2344        }
2345        let now = utc_now_iso();
2346        self.storage.begin_immediate()?;
2347        let result = self
2348            .storage
2349            .query_chunks_params(
2350                "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
2351                rusqlite::params![to, now, spark_id],
2352            )
2353            .and_then(|_| self.storage.commit());
2354        if result.is_err() {
2355            let _ = self.storage.rollback();
2356        }
2357        result.map(|_| ())
2358    }
2359
2360    pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
2361        let spark = self
2362            .storage
2363            .get_chunk(spark_id)?
2364            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2365        if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2366            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2367        }
2368        let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2369        if maturity == "promoted" || maturity == "dropped" {
2370            return Err(InnateError::InvalidState(format!(
2371                "spark {spark_id} already {maturity}"
2372            )));
2373        }
2374        if !matches!(to, "note" | "skill") {
2375            return Err(InnateError::InvalidState(format!(
2376                "invalid spark promotion target: {to}"
2377            )));
2378        }
2379
2380        let content = spark.get("content").and_then(Value::as_str).unwrap_or("");
2381        let (content, action) = self.sanitize_content(content);
2382        if action == SanitizeAction::Discard {
2383            return Err(InnateError::InvalidState(
2384                "sanitize discard on promote".into(),
2385            ));
2386        }
2387
2388        let promoted_hash = content_hash(&content);
2389        if self.storage.is_hash_invalidated(&promoted_hash)? {
2390            return Err(InnateError::InvalidState(
2391                "spark content hash is invalidated".into(),
2392            ));
2393        }
2394
2395        let now = utc_now_iso();
2396
2397        // Idempotency: existing non-spark chunk with same hash
2398        let existing = self.storage.query_chunks_params(
2399            "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2400            rusqlite::params![promoted_hash],
2401        )?;
2402        if let Some(e) = existing.first() {
2403            if let Some(id) = e.get("id").and_then(Value::as_str) {
2404                let id = id.to_string();
2405                self.storage.begin_immediate()?;
2406                let result = self
2407                    .storage
2408                    .query_chunks_params(
2409                        "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2410                        rusqlite::params![now, spark_id],
2411                    )
2412                    .and_then(|_| self.storage.commit());
2413                if result.is_err() {
2414                    let _ = self.storage.rollback();
2415                    result?;
2416                }
2417                return Ok(id);
2418            }
2419        }
2420
2421        let (state, conf, prot, origin, state_reason) = if to == "skill" {
2422            ("active", 0.85, 1, "installed", "init:installed")
2423        } else {
2424            ("active", 0.60, 0, "captured", "init:captured")
2425        };
2426
2427        let conf = if action == SanitizeAction::Redact {
2428            0.4_f64
2429        } else {
2430            conf
2431        };
2432        let new_id = gen_uuid();
2433        let trigger = spark.get("trigger_desc").and_then(Value::as_str);
2434        let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
2435
2436        let row = ChunkRow {
2437            id: new_id.clone(),
2438            content: content.clone(),
2439            trigger_desc: trigger.map(str::to_string),
2440            anti_trigger_desc: anti.map(str::to_string),
2441            content_hash: promoted_hash,
2442            token_count: Some(estimate_tokens(&content) as i64),
2443            origin: origin.to_string(),
2444            source: Some("manual".to_string()),
2445            protected: prot,
2446            state: state.to_string(),
2447            state_reason: Some(state_reason.to_string()),
2448            confidence: conf,
2449            confidence_reason: Some("manual_set".to_string()),
2450            parent_id: Some(spark_id.to_string()),
2451            version: 1,
2452            embed_version: 1,
2453            created_at: now.clone(),
2454            updated_at: now.clone(),
2455            ..Default::default()
2456        };
2457
2458        let cvec = self.embedding.embed_content(&content)?;
2459        let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;
2460
2461        self.storage.begin_immediate()?;
2462        let result = (|| -> Result<()> {
2463            self.storage.insert_chunk(&row)?;
2464            self.storage
2465                .insert_vec_content(&new_id, &pack_embedding(&cvec))?;
2466            self.storage
2467                .insert_vec_trigger(&new_id, &pack_embedding(&tvec))?;
2468            self.storage.query_chunks_params(
2469                "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2470                rusqlite::params![now, spark_id],
2471            )?;
2472            self.storage.commit()
2473        })();
2474        if result.is_err() {
2475            let _ = self.storage.rollback();
2476        }
2477        result?;
2478        Ok(new_id)
2479    }
2480
2481    pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
2482        let spark = self
2483            .storage
2484            .get_chunk(spark_id)?
2485            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2486        if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2487            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2488        }
2489        let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2490        if maturity == "promoted" {
2491            return Err(InnateError::InvalidState(format!(
2492                "spark {spark_id} already promoted"
2493            )));
2494        }
2495        if maturity == "dropped" {
2496            return Ok(());
2497        }
2498        let now = utc_now_iso();
2499        let reason_str = if reason.is_empty() {
2500            "dropped".to_string()
2501        } else {
2502            format!("dropped:{reason}")
2503        };
2504        self.storage.begin_immediate()?;
2505        let result = self
2506            .storage
2507            .query_chunks_params(
2508                "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
2509                rusqlite::params![reason_str, now, spark_id],
2510            )
2511            .and_then(|_| self.storage.commit());
2512        if result.is_err() {
2513            let _ = self.storage.rollback();
2514        }
2515        result.map(|_| ())
2516    }
2517
2518    // ------------------------------------------------------------------
2519    // Public API 6: approve / archive / invalidate / restore
2520    // ------------------------------------------------------------------
2521
2522    pub fn approve(&self, chunk_id: &str) -> Result<()> {
2523        let chunk = self
2524            .storage
2525            .get_chunk(chunk_id)?
2526            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2527        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2528            return Err(InnateError::InvalidState(
2529                "spark lifecycle uses promote_spark() or invalidate()".into(),
2530            ));
2531        }
2532        if chunk.get("state").and_then(Value::as_str) == Some("active") {
2533            return Ok(());
2534        }
2535        if chunk.get("state").and_then(Value::as_str) != Some("pending") {
2536            return Err(InnateError::InvalidState(
2537                "approve requires pending chunk".into(),
2538            ));
2539        }
2540        let now = utc_now_iso();
2541        self.storage.begin_immediate()?;
2542        let result = (|| -> Result<()> {
2543            self.storage
2544                .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
2545            self.storage.query_chunks_params(
2546                "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
2547                rusqlite::params![now, chunk_id],
2548            )?;
2549            self.storage.commit()
2550        })();
2551        if result.is_err() {
2552            let _ = self.storage.rollback();
2553        }
2554        result
2555    }
2556
2557    pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
2558        let chunk = self
2559            .storage
2560            .get_chunk(chunk_id)?
2561            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2562        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2563            return Err(InnateError::InvalidState(
2564                "spark lifecycle uses drop_spark() or invalidate()".into(),
2565            ));
2566        }
2567        let now = utc_now_iso();
2568        self.storage.begin_immediate()?;
2569        let result = self
2570            .storage
2571            .update_chunk_state(chunk_id, "archived", Some(reason), &now)
2572            .and_then(|_| self.storage.commit());
2573        if result.is_err() {
2574            let _ = self.storage.rollback();
2575        }
2576        result
2577    }
2578
2579    pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
2580        let chunk = self
2581            .storage
2582            .get_chunk(chunk_id)?
2583            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2584        let h = chunk
2585            .get("content_hash")
2586            .and_then(Value::as_str)
2587            .unwrap_or("")
2588            .to_string();
2589        let now = utc_now_iso();
2590        let reason_str = if reason.is_empty() {
2591            "invalidated".to_string()
2592        } else {
2593            format!("invalidated:{reason}")
2594        };
2595
2596        self.storage.begin_immediate()?;
2597        let result = (|| -> Result<()> {
2598            self.storage.query_chunks_params(
2599                "UPDATE chunks
2600                 SET state='archived', confidence=0.0, confidence_base=0.0,
2601                     confidence_reason='invalidated', state_reason=?,
2602                     state_updated_at=?, updated_at=?
2603                 WHERE id=?",
2604                rusqlite::params![reason_str, now, now, chunk_id],
2605            )?;
2606            self.storage.query_chunks_params(
2607                "UPDATE chunks
2608                 SET state='archived', confidence=0.0, confidence_base=0.0,
2609                     confidence_reason='invalidated',
2610                     state_reason='invalidated:same_hash',
2611                     state_updated_at=?, updated_at=?
2612                 WHERE content_hash=? AND id!=?",
2613                rusqlite::params![now, now, h, chunk_id],
2614            )?;
2615            self.storage.conn_execute(
2616                "DELETE FROM confidence_evidence
2617                 WHERE chunk_id IN (SELECT id FROM chunks WHERE content_hash=?)",
2618                rusqlite::params![h],
2619            )?;
2620            self.storage
2621                .insert_invalidated_hash(&h, Some(reason), &now)?;
2622            self.storage.commit()
2623        })();
2624        if result.is_err() {
2625            let _ = self.storage.rollback();
2626        }
2627        result
2628    }
2629
2630    pub fn restore(&self, chunk_id: &str) -> Result<()> {
2631        let chunk = self
2632            .storage
2633            .get_chunk(chunk_id)?
2634            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2635        let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
2636        if state == "active" {
2637            return Ok(());
2638        }
2639        if state != "archived" {
2640            return Err(InnateError::InvalidState(
2641                "restore requires archived chunk".into(),
2642            ));
2643        }
2644        let was_invalidated = chunk
2645            .get("state_reason")
2646            .and_then(Value::as_str)
2647            .map(|r| r.starts_with("invalidated"))
2648            .unwrap_or(false);
2649        let h = chunk
2650            .get("content_hash")
2651            .and_then(Value::as_str)
2652            .unwrap_or("")
2653            .to_string();
2654        let now = utc_now_iso();
2655
2656        self.storage.begin_immediate()?;
2657        let result = (|| -> Result<()> {
2658            self.storage
2659                .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
2660            if was_invalidated {
2661                self.storage.query_chunks_params(
2662                    "DELETE FROM invalidated_hashes WHERE content_hash=?",
2663                    rusqlite::params![h],
2664                )?;
2665            }
2666            self.storage.query_chunks_params(
2667                "UPDATE chunks
2668                 SET confidence_base=CASE WHEN ? THEN 0.5 ELSE confidence_base END,
2669                     confidence=CASE WHEN ? THEN 0.5 ELSE confidence END,
2670                     confidence_reason='restore', updated_at=?
2671                 WHERE id=?",
2672                rusqlite::params![was_invalidated, was_invalidated, now, chunk_id],
2673            )?;
2674            self.storage.conn_execute(
2675                "DELETE FROM confidence_evidence
2676                 WHERE chunk_id=? AND kind IN ('feedback_up','feedback_down')",
2677                rusqlite::params![chunk_id],
2678            )?;
2679            self.storage.conn_execute(
2680                "UPDATE chunk_context_stats_base
2681                 SET positive_feedback=0, negative_feedback=0
2682                 WHERE chunk_id=?",
2683                rusqlite::params![chunk_id],
2684            )?;
2685            self.storage.conn_execute(
2686                "UPDATE governance_proposals
2687                 SET state='rejected', reason=reason || '; restored by user', updated_at=?
2688                 WHERE chunk_id=? AND state IN ('pending','accepted')",
2689                rusqlite::params![now, chunk_id],
2690            )?;
2691            self.recompute_chunk_confidence(chunk_id, &now)?;
2692            self.rebuild_context_stats_for(&HashSet::from([chunk_id.to_string()]), &now)?;
2693            self.storage.commit()
2694        })();
2695        if result.is_err() {
2696            let _ = self.storage.rollback();
2697        }
2698        result
2699    }
2700
2701    // ------------------------------------------------------------------
2702    // Public API 7: evolve
2703    // ------------------------------------------------------------------
2704
2705    pub fn evolve(&self, trigger: &str) -> Result<Value> {
2706        if !matches!(trigger, "manual" | "scheduled" | "threshold") {
2707            return Err(InnateError::InvalidState(format!(
2708                "invalid evolve trigger: {trigger}"
2709            )));
2710        }
2711        let evolve_started_at = utc_now_iso();
2712        let request_id = self.storage.claim_evolve_request(
2713            &evolve_started_at,
2714            &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
2715        )?;
2716        // Issue 7: scheduled without pending request → still run curate (time-based maintenance).
2717        if trigger == "scheduled" && request_id.is_none() {
2718            let curator = Arc::clone(&self.curator);
2719            let curate = curator.run(self, &CurateScope::default())?;
2720            return Ok(json!({
2721                "distilled": 0,
2722                "curate": self.format_curate_report(&curate),
2723                "skipped": "no_evolve_request"
2724            }));
2725        }
2726
2727        // Issue 8: threshold / token-limit gates should not suppress curate.
2728        if trigger == "threshold" {
2729            let rows = self.storage.query_chunks(
2730                "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
2731            )?;
2732            let cnt = rows
2733                .first()
2734                .and_then(|r| r.get("cnt"))
2735                .and_then(Value::as_i64)
2736                .unwrap_or(0);
2737            if cnt < self.evolve_threshold {
2738                let curator = Arc::clone(&self.curator);
2739                let curate = curator.run(self, &CurateScope::default())?;
2740                if let Some(ref id) = request_id {
2741                    self.storage.finish_evolve_request(
2742                        id, "completed", Some("below_threshold"), &utc_now_iso(),
2743                    )?;
2744                }
2745                return Ok(json!({
2746                    "distilled": 0,
2747                    "curate": self.format_curate_report(&curate),
2748                    "skipped": "below_threshold"
2749                }));
2750            }
2751
2752            if let Some(limit) = self
2753                .storage
2754                .get_meta("max_distill_tokens_per_period")?
2755                .and_then(|value| value.parse::<i64>().ok())
2756                .filter(|value| *value > 0)
2757            {
2758                let period_start = self.distill_token_period_start(&utc_now_iso())?;
2759                let rows = self.storage.query_chunks_params(
2760                    "SELECT COALESCE(SUM(distill_prompt_tokens),0)
2761                            + COALESCE(SUM(distill_completion_tokens),0) AS used
2762                     FROM episodic_log
2763                     WHERE distill_accounted_at >= ?",
2764                    rusqlite::params![period_start],
2765                )?;
2766                let used_tokens = rows
2767                    .first()
2768                    .and_then(|row| row.get("used"))
2769                    .and_then(Value::as_i64)
2770                    .unwrap_or(0);
2771                if used_tokens >= limit {
2772                    let curator = Arc::clone(&self.curator);
2773                    let curate = curator.run(self, &CurateScope::default())?;
2774                    if let Some(ref id) = request_id {
2775                        self.storage.finish_evolve_request(
2776                            id, "completed", Some("distill_token_limit"), &utc_now_iso(),
2777                        )?;
2778                    }
2779                    return Ok(json!({
2780                        "distilled": 0,
2781                        "curate": self.format_curate_report(&curate),
2782                        "skipped": "distill_token_limit",
2783                        "distill_tokens_used": used_tokens,
2784                        "distill_token_limit": limit,
2785                        "period_start": period_start,
2786                    }));
2787                }
2788            }
2789        }
2790
2791        let result = (|| -> Result<Value> {
2792            let retry_cutoff = minutes_ago(&utc_now_iso(), 5);
2793            self.storage.conn_execute(
2794                "UPDATE episodic_log
2795                 SET distill_state='new', distill_note='retry_failed',
2796                     distill_locked_at=NULL, distill_run_id=NULL
2797                 WHERE distill_state='failed'
2798                   AND distill_attempts < 3
2799                   AND COALESCE(distill_accounted_at, ts) < ?",
2800                rusqlite::params![retry_cutoff],
2801            )?;
2802            let distilled = self.distill_batch()?;
2803            let curator = Arc::clone(&self.curator);
2804            let curate = curator.run(self, &CurateScope::default())?;
2805            Ok(json!({
2806                "distilled": distilled,
2807                "curate": self.format_curate_report(&curate),
2808            }))
2809        })();
2810        if let Some(ref id) = request_id {
2811            let (state, note) = match &result {
2812                Ok(_) => ("completed", None),
2813                Err(error) => ("failed", Some(error.to_string())),
2814            };
2815            self.storage
2816                .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
2817        }
2818        if result.is_ok() {
2819            self.storage
2820                .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
2821        }
2822
2823        // Issue 9: if more 'new' logs remain after this batch, self-queue so the next evolve drains them.
2824        if result.is_ok() {
2825            let remaining = count_query(
2826                &self.storage,
2827                "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2828            )?;
2829            if remaining > 0 {
2830                let _ = self.storage.request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
2831            }
2832        }
2833        result
2834    }
2835
2836    fn format_curate_report(&self, curate: &CurateReport) -> Value {
2837        json!({
2838            "archived": curate.archived.len(),
2839            "deduped": curate.deduped.len(),
2840            "decayed": curate.decayed.len(),
2841            "recovered": curate.recovered.len(),
2842            "orphans": curate.orphans.len(),
2843            "warnings": curate.warnings,
2844        })
2845    }
2846
2847    fn distill_batch(&self) -> Result<usize> {
2848        let run_id = gen_uuid();
2849        let now = utc_now_iso();
2850
2851        // Atomically claim a batch of 'new' logs → mark 'screening'.
2852        self.storage.begin_immediate()?;
2853        let logs = match self
2854            .storage
2855            .claim_distill_batch(&run_id, self.distill_batch_size, &now)
2856        {
2857            Ok(l) => {
2858                self.storage.commit()?;
2859                l
2860            }
2861            Err(e) => {
2862                let _ = self.storage.rollback();
2863                return Err(e);
2864            }
2865        };
2866
2867        let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
2868        let mut failed_logs = HashSet::new();
2869        let mut distill_errors = Vec::new();
2870        for log in &logs {
2871            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
2872            match self.distiller.distill_with_context(log, &logs) {
2873                Ok(chunks) => {
2874                    if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
2875                        let error = "distiller returned a chunk for an unknown source log";
2876                        failed_logs.insert(log_id.to_string());
2877                        distill_errors.push(format!("{log_id}: {error}"));
2878                        self.finish_distill_log(
2879                            log_id,
2880                            "failed",
2881                            Some(&format!("distill_failed:{error}")),
2882                            estimate_distill_prompt_tokens(log, &logs),
2883                            0,
2884                        )?;
2885                        continue;
2886                    }
2887                    chunks_by_log.insert(log_id.to_string(), chunks);
2888                }
2889                Err(error) => {
2890                    let note = format!("distill_failed:{error}");
2891                    failed_logs.insert(log_id.to_string());
2892                    distill_errors.push(format!("{log_id}: {error}"));
2893                    self.finish_distill_log(
2894                        log_id,
2895                        "failed",
2896                        Some(&note),
2897                        estimate_distill_prompt_tokens(log, &logs),
2898                        0,
2899                    )?;
2900                }
2901            }
2902        }
2903
2904        let mut count = 0;
2905        let provenance = self.distiller.provenance();
2906        for log in &logs {
2907            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
2908            if failed_logs.contains(log_id) {
2909                continue;
2910            }
2911            let prompt_tokens = estimate_distill_prompt_tokens(log, &logs);
2912            let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
2913            let completion_tokens = chunks
2914                .iter()
2915                .map(estimate_distilled_chunk_tokens)
2916                .sum::<i64>();
2917            if chunks.is_empty() {
2918                self.finish_distill_log(
2919                    log_id,
2920                    "discarded",
2921                    Some("insufficient_material"),
2922                    prompt_tokens,
2923                    completion_tokens,
2924                )?;
2925                continue;
2926            }
2927
2928            // Prepare all chunks + embeddings outside the write transaction so
2929            // slow embedding calls do not hold an exclusive SQLite lock.
2930            // Supports N >= 1 chunks per log (e.g. multi-concept LLM distillation).
2931            // Bad individual chunks are skipped; valid siblings still survive.
2932            struct PreparedChunk {
2933                row: ChunkRow,
2934                cvec_bytes: Vec<u8>,
2935                tvec_bytes: Vec<u8>,
2936            }
2937            let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
2938            let mut embedding_failures = 0_usize;
2939            for dc in chunks {
2940                let (content, action) = self.sanitize_content(&dc.content);
2941                if action == SanitizeAction::Discard {
2942                    continue; // skip this chunk, try others
2943                }
2944                let h = content_hash(&content);
2945                if self.storage.is_hash_invalidated(&h)? {
2946                    continue; // skip invalidated content, try others
2947                }
2948                let redacted = action == SanitizeAction::Redact;
2949                let conf = if redacted { 0.4 } else { 0.55 };
2950                let now2 = utc_now_iso();
2951                let chunk_id = gen_uuid();
2952                let tokens = estimate_tokens(&content) as i64;
2953                let row = ChunkRow {
2954                    id: chunk_id,
2955                    content: content.clone(),
2956                    trigger_desc: dc.trigger_desc.clone(),
2957                    anti_trigger_desc: dc.anti_trigger_desc,
2958                    content_hash: h,
2959                    token_count: Some(tokens),
2960                    origin: "distilled".to_string(),
2961                    distilled_from: Some(dc.source_log_id),
2962                    distill_provider: provenance.provider.clone(),
2963                    distill_model: provenance.model.clone(),
2964                    distill_prompt_version: provenance.prompt_version.clone(),
2965                    state: "pending".to_string(),
2966                    state_reason: Some("init:distilled".to_string()),
2967                    confidence: conf,
2968                    confidence_reason: Some("init:distilled".to_string()),
2969                    version: 1,
2970                    embed_version: 1,
2971                    created_at: now2.clone(),
2972                    updated_at: now2,
2973                    ..Default::default()
2974                };
2975                let cvec = match self.embedding.embed_content(&content) {
2976                    Ok(v) => v,
2977                    Err(_) => {
2978                        embedding_failures += 1;
2979                        continue;
2980                    }
2981                };
2982                let tvec = match self
2983                    .embedding
2984                    .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
2985                {
2986                    Ok(v) => v,
2987                    Err(_) => {
2988                        embedding_failures += 1;
2989                        continue;
2990                    }
2991                };
2992                prepared.push(PreparedChunk {
2993                    row,
2994                    cvec_bytes: pack_embedding(&cvec),
2995                    tvec_bytes: pack_embedding(&tvec),
2996                });
2997            }
2998
2999            if prepared.is_empty() {
3000                let note = if embedding_failures > 0 {
3001                    "embedding_failed"
3002                } else {
3003                    "all_chunks_filtered"
3004                };
3005                self.finish_distill_log(
3006                    log_id,
3007                    if embedding_failures > 0 {
3008                        "failed"
3009                    } else {
3010                        "discarded"
3011                    },
3012                    Some(note),
3013                    prompt_tokens,
3014                    completion_tokens,
3015                )?;
3016                continue;
3017            }
3018
3019            // Write all chunks, vectors, token accounting, and terminal log state atomically.
3020            let accounted_at = utc_now_iso();
3021            self.storage.begin_immediate()?;
3022            let write_result = (|| -> Result<()> {
3023                for pc in &prepared {
3024                    self.storage.insert_chunk(&pc.row)?;
3025                    self.storage.insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
3026                    self.storage.insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
3027                }
3028                let note = (embedding_failures > 0)
3029                    .then(|| format!("partial_embedding_failures:{embedding_failures}"));
3030                self.storage.finish_distill_log(
3031                    log_id,
3032                    "distilled",
3033                    note.as_deref(),
3034                    prompt_tokens,
3035                    completion_tokens,
3036                    &accounted_at,
3037                )?;
3038                self.storage.commit()
3039            })();
3040            if let Err(error) = write_result {
3041                let _ = self.storage.rollback();
3042                let note = format!("distill_write_failed:{error}");
3043                self.finish_distill_log(
3044                    log_id,
3045                    "failed",
3046                    Some(&note),
3047                    prompt_tokens,
3048                    completion_tokens,
3049                )?;
3050                continue;
3051            }
3052            count += 1;
3053        }
3054        if !distill_errors.is_empty() {
3055            // Log failures but do not abort: successful chunks are already committed and
3056            // failed logs are marked 'failed' for bounded retry. Returning Ok preserves
3057            // evolve request state and allows finish_covered_evolve_requests to run.
3058            eprintln!(
3059                "[innate] distillation partial failure ({} log(s)): {}",
3060                distill_errors.len(),
3061                distill_errors.join("; ")
3062            );
3063        }
3064        Ok(count)
3065    }
3066
3067    fn finish_distill_log(
3068        &self,
3069        log_id: &str,
3070        state: &str,
3071        note: Option<&str>,
3072        prompt_tokens: i64,
3073        completion_tokens: i64,
3074    ) -> Result<()> {
3075        let accounted_at = utc_now_iso();
3076        self.storage.begin_immediate()?;
3077        let result = (|| -> Result<()> {
3078            self.storage.finish_distill_log(
3079                log_id,
3080                state,
3081                note,
3082                prompt_tokens,
3083                completion_tokens,
3084                &accounted_at,
3085            )?;
3086            self.storage.commit()
3087        })();
3088        if result.is_err() {
3089            let _ = self.storage.rollback();
3090        }
3091        result
3092    }
3093
3094    fn distill_token_period_start(&self, now: &str) -> Result<String> {
3095        let window_hours = self
3096            .storage
3097            .get_meta("evolve.distill_token_window_hours")?
3098            .and_then(|value| value.parse::<i64>().ok())
3099            .unwrap_or(24)
3100            .max(1);
3101        Ok(hours_ago(now, window_hours))
3102    }
3103
3104    pub(crate) fn builtin_curate_impl(&self, scope: &CurateScope) -> Result<CurateReport> {
3105        let mut report = CurateReport::default();
3106        let now_iso = utc_now_iso();
3107        if scope.dry_run {
3108            // dry_run: compute report without writing
3109            let archived_count: i64 = count_query(&self.storage,
3110                "SELECT COUNT(*) FROM chunks WHERE origin!='spark' AND protected=0 AND state='active'")?;
3111            report.stats.insert("dry_run".to_string(), json!(true));
3112            report
3113                .stats
3114                .insert("eligible_for_governance".to_string(), json!(archived_count));
3115            return Ok(report);
3116        }
3117
3118        // ── Step 1-4: aggregate (single BEGIN IMMEDIATE, half-open cutoff window) ──
3119        self.storage.begin_immediate()?;
3120        let agg_result = (|| -> Result<()> {
3121            let cutoff_ts = now_iso.clone();
3122
3123            // 1. Rebuild post-4.12 success facts from retained attribution events.
3124            self.storage.conn_execute(
3125                "DELETE FROM chunk_success_traces",
3126                rusqlite::params![],
3127            )?;
3128            self.storage.conn_execute(
3129                "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts)
3130                 SELECT ut.chunk_id, ut.trace_id, MAX(ut.ts)
3131                 FROM usage_trace ut
3132                 WHERE ut.event = 'used'
3133                   AND ut.chunk_id IS NOT NULL
3134                   AND (
3135                     EXISTS (SELECT 1 FROM usage_trace ok
3136                             WHERE ok.trace_id = ut.trace_id
3137                               AND ok.event = 'task_ok' AND ok.chunk_id IS NULL)
3138                     OR EXISTS (SELECT 1 FROM episodic_log el
3139                                WHERE el.trace_id = ut.trace_id AND el.outcome = 'ok')
3140                   )
3141                 GROUP BY ut.chunk_id, ut.trace_id",
3142                rusqlite::params![],
3143            )?;
3144
3145            // 2. Derive success counts from migration baseline + replayable facts.
3146            self.storage.conn_execute(
3147                "WITH cst AS (
3148                   SELECT chunk_id, COUNT(*) AS cnt, MAX(ts) AS max_ts
3149                   FROM chunk_success_traces
3150                   GROUP BY chunk_id
3151                 )
3152                 UPDATE chunks SET
3153                   used_success_count = used_success_count_base
3154                     + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3155                   success_trace_ids_count = used_success_count_base
3156                     + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3157                   last_success_at = COALESCE(
3158                     (SELECT max_ts FROM cst WHERE cst.chunk_id=chunks.id),
3159                     last_success_at
3160                   )
3161                 WHERE origin!='spark'",
3162                rusqlite::params![],
3163            )?;
3164
3165            // 3. Recompute selected/used counts and last-use from retained facts.
3166            self.storage.conn_execute(
3167                "UPDATE chunks SET
3168                   selected_count = selected_count_base + COALESCE(
3169                     (SELECT COUNT(*) FROM usage_trace
3170                      WHERE chunk_id = chunks.id AND event = 'selected'), 0),
3171                   used_count = used_count_base + COALESCE(
3172                     (SELECT COUNT(*) FROM usage_trace
3173                      WHERE chunk_id = chunks.id AND event = 'used'), 0),
3174                   last_used_at = COALESCE(
3175                     (SELECT MAX(ts) FROM usage_trace
3176                      WHERE chunk_id=chunks.id AND event='used'),
3177                     last_used_base
3178                   )
3179                 WHERE origin!='spark'",
3180                rusqlite::params![],
3181            )?;
3182
3183            // 4. Advance watermark and purge only verbose retrieval events.
3184            // Attributed facts remain replayable so repeated curate runs are idempotent
3185            // and a later correction can subtract the previous trace contribution.
3186            self.storage.set_meta("last_agg_ts", &cutoff_ts)?;
3187            self.storage.purge_usage_trace(&cutoff_ts)?;
3188            self.storage.commit()
3189        })();
3190        if agg_result.is_err() {
3191            let _ = self.storage.rollback();
3192            agg_result?;
3193        }
3194
3195        // ── Step 2: recover_logs ──
3196        self.storage.begin_immediate()?;
3197        let recover_result = (|| -> Result<()> {
3198            // Stale screening rows → 'failed' (not 'open'), note = 'screening_timeout:<run_id>'.
3199            let screening_cutoff = minutes_ago(&now_iso, self.screening_timeout_minutes);
3200            let stale = self.storage.query_chunks_params(
3201                "SELECT id, distill_run_id FROM episodic_log
3202                 WHERE distill_state='screening' AND distill_locked_at < ?",
3203                rusqlite::params![screening_cutoff],
3204            )?;
3205            for row in &stale {
3206                let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3207                let run_id = row
3208                    .get("distill_run_id")
3209                    .and_then(Value::as_str)
3210                    .unwrap_or("unknown");
3211                let note = format!("screening_timeout:{run_id}");
3212                self.storage.conn_execute(
3213                "UPDATE episodic_log
3214                 SET distill_state='failed', distill_note=?,
3215                     distill_attempts=distill_attempts+1,
3216                     distill_last_failed_at=?,
3217                     distill_run_id=NULL, distill_locked_at=NULL
3218                 WHERE id=?",
3219                    rusqlite::params![note, now_iso, id],
3220                )?;
3221                report.recovered.push(id.to_string());
3222                report
3223                    .warnings
3224                    .push(format!("stale screening recovered as failed: {id}"));
3225            }
3226
3227            // Open logs past TTL → discarded (insufficient material, never record'd).
3228            let open_ttl_cutoff = days_ago(&now_iso, self.open_ttl_days);
3229            self.storage.conn_execute(
3230                "UPDATE episodic_log
3231                 SET distill_state='discarded', distill_note='no_record_timeout',
3232                     task_state='timed_out', completed_at=?
3233                 WHERE distill_state='open' AND ts < ?",
3234                rusqlite::params![now_iso, open_ttl_cutoff],
3235            )?;
3236            self.storage.commit()
3237        })();
3238        if recover_result.is_err() {
3239            let _ = self.storage.rollback();
3240            recover_result?;
3241        }
3242
3243        // ── Steps 3-7: governance (archive, dedupe, decay, promote, cycle) ──
3244        let scope_origin = scope.origin.clone();
3245        let scope_skill = scope.skill_name.clone();
3246        self.storage.begin_immediate()?;
3247        let gov_result = (|| -> Result<()> {
3248            // New feedback refreshes its chunk synchronously in Record. Curate only needs
3249            // to age pending proposals; scanning every historical feedback row is unbounded.
3250            let governance_chunks = self
3251                .storage
3252                .query_chunks(
3253                    "SELECT DISTINCT chunk_id FROM governance_proposals WHERE state='pending'",
3254                )?;
3255            for row in governance_chunks {
3256                if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
3257                    self.refresh_governance_evidence(chunk_id, &now_iso)?;
3258                }
3259            }
3260
3261            // ── 3a. Archive: low_confidence — only blocks that HAVE been used ──
3262            let low_conf_cutoff = days_ago(&now_iso, self.low_conf_idle_days);
3263            let low_conf = self.storage.query_chunks_params(
3264                "SELECT id FROM chunks
3265                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3266                   AND last_used_at IS NOT NULL
3267                   AND confidence < ?
3268                   AND last_used_at < ?
3269                   AND (? IS NULL OR origin=?)
3270                   AND (? IS NULL OR skill_name=?)",
3271                rusqlite::params![
3272                    self.low_conf_threshold,
3273                    low_conf_cutoff,
3274                    scope_origin,
3275                    scope_origin,
3276                    scope_skill,
3277                    scope_skill
3278                ],
3279            )?;
3280            for c in &low_conf {
3281                if let Some(id) = c.get("id").and_then(Value::as_str) {
3282                    self.storage.update_chunk_state(
3283                        id,
3284                        "archived",
3285                        Some("low_confidence"),
3286                        &now_iso,
3287                    )?;
3288                    report.archived.push(id.to_string());
3289                }
3290            }
3291
3292            // ── 3b. Archive: repeated_selected_unused ──
3293            let rep_sel = self.storage.query_chunks_params(
3294                "SELECT id FROM chunks
3295                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3296                   AND selected_count >= ? AND used_count = 0 AND confidence < ?
3297                   AND (? IS NULL OR origin=?)
3298                   AND (? IS NULL OR skill_name=?)",
3299                rusqlite::params![
3300                    self.repeat_select_min,
3301                    self.repeat_select_conf_max,
3302                    scope_origin,
3303                    scope_origin,
3304                    scope_skill,
3305                    scope_skill
3306                ],
3307            )?;
3308            for c in &rep_sel {
3309                if let Some(id) = c.get("id").and_then(Value::as_str) {
3310                    if !report.archived.contains(&id.to_string()) {
3311                        self.storage.update_chunk_state(
3312                            id,
3313                            "archived",
3314                            Some("repeated_selected_unused"),
3315                            &now_iso,
3316                        )?;
3317                        report.archived.push(id.to_string());
3318                    }
3319                }
3320            }
3321
3322            // ── 3c. Archive: never_used — never entered context at all ──
3323            let never_used_cutoff = days_ago(&now_iso, self.never_used_age_days);
3324            let never_used = self.storage.query_chunks_params(
3325                "SELECT id FROM chunks
3326                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3327                   AND used_count = 0 AND selected_count = 0
3328                   AND created_at < ?
3329                   AND (? IS NULL OR origin=?)
3330                   AND (? IS NULL OR skill_name=?)",
3331                rusqlite::params![
3332                    never_used_cutoff,
3333                    scope_origin,
3334                    scope_origin,
3335                    scope_skill,
3336                    scope_skill
3337                ],
3338            )?;
3339            for c in &never_used {
3340                if let Some(id) = c.get("id").and_then(Value::as_str) {
3341                    if !report.archived.contains(&id.to_string()) {
3342                        self.storage.update_chunk_state(
3343                            id,
3344                            "archived",
3345                            Some("never_used"),
3346                            &now_iso,
3347                        )?;
3348                        report.archived.push(id.to_string());
3349                    }
3350                }
3351            }
3352
3353            // ── 3d. Archive: governance_proposal ──
3354            // Chunks whose pending governance proposals have accumulated enough evidence
3355            // are archived and the proposals accepted atomically.
3356            let gov_proposals = self.storage.query_chunks_params(
3357                "SELECT DISTINCT chunk_id FROM governance_proposals
3358                 WHERE state='pending'
3359                   AND evidence_score >= ? AND actor_count >= 2",
3360                rusqlite::params![self.governance_archive_threshold as f64],
3361            )?;
3362            for c in &gov_proposals {
3363                if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3364                    let already_archived = report.archived.contains(&cid.to_string());
3365                    let eligible = !already_archived && self.storage.get_chunk(cid)?.map(|ch| {
3366                        ch.get("origin").and_then(Value::as_str) != Some("spark")
3367                            && ch.get("protected").and_then(Value::as_i64).unwrap_or(0) == 0
3368                            && matches!(
3369                                ch.get("state").and_then(Value::as_str),
3370                                Some("active") | Some("pending")
3371                            )
3372                    }).unwrap_or(false);
3373                    if eligible {
3374                        self.storage.update_chunk_state(
3375                            cid,
3376                            "archived",
3377                            Some("governance_proposal"),
3378                            &now_iso,
3379                        )?;
3380                        report.archived.push(cid.to_string());
3381                        self.storage.conn_execute(
3382                            "UPDATE governance_proposals
3383                             SET state='accepted', updated_at=?
3384                             WHERE chunk_id=? AND state='pending'",
3385                            rusqlite::params![now_iso, cid],
3386                        )?;
3387                    } else {
3388                        self.storage.conn_execute(
3389                            "UPDATE governance_proposals
3390                             SET state='rejected', updated_at=?
3391                             WHERE chunk_id=? AND state='pending'",
3392                            rusqlite::params![now_iso, cid],
3393                        )?;
3394                    }
3395                }
3396            }
3397
3398            // ── 3d2. Expire stale governance proposals (insufficient evidence, too old) ──
3399            // Proposals that never accumulate enough evidence cause repeated evolve cycles.
3400            // Reject them after governance_proposal_max_age_days so they stop triggering.
3401            let proposal_expiry_cutoff = days_ago(&now_iso, self.governance_proposal_max_age_days);
3402            self.storage.conn_execute(
3403                "UPDATE governance_proposals
3404                 SET state='rejected', updated_at=?
3405                 WHERE state='pending'
3406                   AND evidence_score < ?
3407                   AND created_at < ?",
3408                rusqlite::params![
3409                    now_iso,
3410                    self.governance_archive_threshold as f64,
3411                    proposal_expiry_cutoff
3412                ],
3413            )?;
3414
3415            // ── 3e. Archive: sustained_negative_feedback ──
3416            // Chunks with too many negative feedback events are archived regardless of
3417            // how long they've been idle, giving feedback a direct archival path.
3418            let neg_feedback_chunks = self.storage.query_chunks_params(
3419                "SELECT p.chunk_id FROM governance_proposals p
3420                 JOIN chunks c ON c.id = p.chunk_id
3421                 WHERE c.origin!='spark' AND c.protected=0
3422                   AND c.state IN ('active','pending')
3423                   AND p.state='pending'
3424                   AND p.evidence_score >= ? AND p.actor_count >= 2
3425                   AND (? IS NULL OR c.origin=?)
3426                   AND (? IS NULL OR c.skill_name=?)
3427                 GROUP BY p.chunk_id",
3428                rusqlite::params![
3429                    self.negative_feedback_archive_threshold as f64,
3430                    scope_origin,
3431                    scope_origin,
3432                    scope_skill,
3433                    scope_skill
3434                ],
3435            )?;
3436            for c in &neg_feedback_chunks {
3437                if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3438                    if !report.archived.contains(&cid.to_string()) {
3439                        self.storage.update_chunk_state(
3440                            cid,
3441                            "archived",
3442                            Some("sustained_negative_feedback"),
3443                            &now_iso,
3444                        )?;
3445                        report.archived.push(cid.to_string());
3446                    }
3447                }
3448            }
3449
3450            // ── 3f. Archive: sustained_task_failure ──
3451            // Covers both active and pending chunks: a pending chunk recalled repeatedly
3452            // but never producing successful tasks also has no other archive path.
3453            let high_fail_chunks = self.storage.query_chunks_params(
3454                "SELECT id FROM chunks
3455                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3456                   AND used_count >= ?
3457                   AND CAST(used_success_count AS REAL) / CAST(used_count AS REAL) < ?
3458                   AND confidence < ?
3459                   AND (? IS NULL OR origin=?)
3460                   AND (? IS NULL OR skill_name=?)",
3461                rusqlite::params![
3462                    self.failure_min_uses,
3463                    self.failure_max_success_rate,
3464                    self.failure_confidence_max,
3465                    scope_origin, scope_origin, scope_skill, scope_skill
3466                ],
3467            )?;
3468            for c in &high_fail_chunks {
3469                if let Some(cid) = c.get("id").and_then(Value::as_str) {
3470                    if !report.archived.contains(&cid.to_string()) {
3471                        self.storage.update_chunk_state(
3472                            cid, "archived", Some("sustained_task_failure"), &now_iso,
3473                        )?;
3474                        report.archived.push(cid.to_string());
3475                    }
3476                }
3477            }
3478
3479            // ── 4. Dedupe: same content_hash — keep protected or highest confidence ──
3480            let dupes = self.storage.query_chunks_params(
3481                "SELECT content_hash FROM chunks
3482                 WHERE origin!='spark' AND state IN ('active','pending')
3483                   AND (? IS NULL OR origin=?)
3484                   AND (? IS NULL OR skill_name=?)
3485                 GROUP BY content_hash HAVING COUNT(*) > 1",
3486                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3487            )?;
3488            for d in &dupes {
3489                if let Some(h) = d.get("content_hash").and_then(Value::as_str) {
3490                    let group = self.storage.query_chunks_params(
3491                        "SELECT id, confidence, protected FROM chunks
3492                         WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending')
3493                           AND (? IS NULL OR origin=?)
3494                           AND (? IS NULL OR skill_name=?)
3495                         ORDER BY protected DESC, confidence DESC",
3496                        rusqlite::params![h, scope_origin, scope_origin, scope_skill, scope_skill],
3497                    )?;
3498                    let canonical_id = group
3499                        .first()
3500                        .and_then(|row| row.get("id"))
3501                        .and_then(Value::as_str)
3502                        .unwrap_or("");
3503                    for row in group.iter().skip(1) {
3504                        let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3505                        let reason = format!("duplicate:{canonical_id}");
3506                        self.storage
3507                            .update_chunk_state(id, "archived", Some(&reason), &now_iso)?;
3508                        self.storage.conn_execute(
3509                            "UPDATE chunks SET parent_id=?, updated_at=? WHERE id=?",
3510                            rusqlite::params![canonical_id, now_iso, id],
3511                        )?;
3512                        report.deduped.push(id.to_string());
3513                    }
3514                }
3515            }
3516
3517            // ── 5. Decay: confidence time-decay for idle non-spark non-protected active chunks ──
3518            // Issue 6: Use last_decayed_at as the delta reference to avoid compounding.
3519            // Each curate only applies the INCREMENTAL decay since the previous run, so the
3520            // 90-day half-life is preserved regardless of how often curate runs.
3521            let decay_candidates = self.storage.query_chunks_params(
3522                "SELECT id, confidence, last_used_at, last_decayed_at FROM chunks
3523                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3524                   AND last_used_at IS NOT NULL
3525                   AND (? IS NULL OR origin=?)
3526                   AND (? IS NULL OR skill_name=?)",
3527                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3528            )?;
3529            for c in &decay_candidates {
3530                let id = match c.get("id").and_then(Value::as_str) {
3531                    Some(v) => v,
3532                    None => continue,
3533                };
3534                let conf = c.get("confidence").and_then(Value::as_f64).unwrap_or(0.5);
3535                let last_used = c.get("last_used_at").and_then(Value::as_str).unwrap_or(&now_iso);
3536                // Use last_decayed_at (or last_used_at) as the reference for incremental delta.
3537                let decay_ref = c
3538                    .get("last_decayed_at")
3539                    .and_then(Value::as_str)
3540                    .filter(|s| *s > last_used)
3541                    .unwrap_or(last_used);
3542                let delta_days = iso_days_diff(&now_iso, decay_ref);
3543                if delta_days <= 0 {
3544                    continue;
3545                }
3546                let floor = self.decay_floor;
3547                let decay_alpha = 1.0 - 0.5_f64.powf(delta_days as f64 / 90.0);
3548                let new_conf = conf + decay_alpha * (floor - conf);
3549                if (new_conf - conf).abs() > 0.001 {
3550                    let note = format!("decay:{delta_days}d");
3551                    self.storage.upsert_confidence_evidence(
3552                        &gen_uuid(),
3553                        None,
3554                        id,
3555                        "decay",
3556                        floor,
3557                        decay_alpha,
3558                        &note,
3559                        None,
3560                        &now_iso,
3561                    )?;
3562                    self.recompute_chunk_confidence(id, &now_iso)?;
3563                    self.storage.update_chunk_last_decayed_at(id, &now_iso)?;
3564                    report.decayed.push(id.to_string());
3565                }
3566            }
3567
3568            // ── 6. Promote: pending → active when three-guard criteria met ──
3569            let promotable = self.storage.query_chunks_params(
3570                "SELECT id FROM chunks
3571                 WHERE state='pending' AND origin!='spark'
3572                   AND used_success_count >= ?
3573                   AND success_trace_ids_count >= 2
3574                   AND confidence >= ?
3575                   AND (? IS NULL OR origin=?)
3576                   AND (? IS NULL OR skill_name=?)",
3577                rusqlite::params![
3578                    self.promote_used_success_min,
3579                    self.promote_confidence_min,
3580                    scope_origin,
3581                    scope_origin,
3582                    scope_skill,
3583                    scope_skill
3584                ],
3585            )?;
3586            for c in &promotable {
3587                if let Some(id) = c.get("id").and_then(Value::as_str) {
3588                    self.storage.update_chunk_state(
3589                        id,
3590                        "active",
3591                        Some("repeated_success"),
3592                        &now_iso,
3593                    )?;
3594                }
3595            }
3596
3597            // ── 7. Cycle/orphan detection (report only, no auto-fix) ──
3598            let all_deps = self
3599                .storage
3600                .query_chunks("SELECT src, dst FROM deps WHERE kind='hard'")?;
3601            let cycles = detect_cycles(&all_deps);
3602            report.cycles = cycles;
3603            let orphan_rows = self.storage.query_chunks_params(
3604                "SELECT d.src, d.dst, s.id AS src_exists, t.id AS dst_exists
3605                 FROM deps d
3606                 LEFT JOIN chunks s ON s.id=d.src
3607                 LEFT JOIN chunks t ON t.id=d.dst
3608                 WHERE d.kind='hard'
3609                   AND (? IS NULL OR s.origin=?)
3610                   AND (? IS NULL OR s.skill_name=?)",
3611                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3612            )?;
3613            let mut orphans = HashSet::new();
3614            for row in orphan_rows {
3615                if row.get("src_exists").is_none_or(Value::is_null) {
3616                    if let Some(id) = row.get("src").and_then(Value::as_str) {
3617                        orphans.insert(id.to_string());
3618                    }
3619                }
3620                if row.get("dst_exists").is_none_or(Value::is_null) {
3621                    if let Some(id) = row.get("dst").and_then(Value::as_str) {
3622                        orphans.insert(id.to_string());
3623                    }
3624                }
3625            }
3626            report.orphans = orphans.into_iter().collect();
3627            report.orphans.sort();
3628
3629            // ── 8. Full context_stats rebuild — periodic correction pass.
3630            // record() only does targeted per-chunk updates; curate keeps the whole
3631            // table accurate by rebuilding from all sources once per cycle.
3632            self.rebuild_context_stats(&now_iso)?;
3633
3634            self.storage.commit()
3635        })();
3636        if gov_result.is_err() {
3637            let _ = self.storage.rollback();
3638            gov_result?;
3639        }
3640
3641        // ── Step 8: compact old terminal logs while preserving trace identity. ──
3642        // The compact row keeps attribution corrections and audit joins possible without
3643        // retaining potentially large raw outputs indefinitely.
3644        self.storage.begin_immediate()?;
3645        let purge_cutoff = days_ago(&now_iso, 30);
3646        let purge_result = self
3647            .storage
3648            .conn_execute(
3649                "UPDATE episodic_log
3650                 SET query=NULL, recall_snapshot=NULL, output=NULL, output_summary=NULL,
3651                     nomination=NULL,
3652                     distill_note=COALESCE(distill_note, 'compacted')
3653                 WHERE distill_state IN ('distilled','discarded','failed')
3654                   AND ts < ?",
3655                rusqlite::params![purge_cutoff],
3656            )
3657            .and_then(|_| self.storage.commit());
3658        if purge_result.is_err() {
3659            let _ = self.storage.rollback();
3660            purge_result?;
3661        }
3662
3663        // ── Step 9: prune completed/failed evolve_requests older than 30 days ──
3664        // Prevents unbounded table growth that would slow COUNT(*) queries.
3665        self.storage.begin_immediate()?;
3666        let evolve_req_cutoff = days_ago(&now_iso, 30);
3667        let prune_req_result = self
3668            .storage
3669            .conn_execute(
3670                "DELETE FROM evolve_requests
3671                 WHERE state IN ('completed','failed') AND requested_at < ?",
3672                rusqlite::params![evolve_req_cutoff],
3673            )
3674            .and_then(|_| self.storage.commit());
3675        if prune_req_result.is_err() {
3676            let _ = self.storage.rollback();
3677            prune_req_result?;
3678        }
3679
3680        Ok(report)
3681    }
3682
3683    // ------------------------------------------------------------------
3684    // Public API 8: inspect
3685    // ------------------------------------------------------------------
3686
3687    pub fn inspect(&self) -> Result<Value> {
3688        let total: i64 = count_query(
3689            &self.storage,
3690            "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
3691        )?;
3692        let active: i64 = count_query(
3693            &self.storage,
3694            "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
3695        )?;
3696        let pending: i64 = count_query(
3697            &self.storage,
3698            "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
3699        )?;
3700        let archived: i64 = count_query(
3701            &self.storage,
3702            "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
3703        )?;
3704        let sparks: i64 = count_query(
3705            &self.storage,
3706            "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
3707        )?;
3708        let open_logs: i64 = count_query(
3709            &self.storage,
3710            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
3711        )?;
3712        let new_logs: i64 = count_query(
3713            &self.storage,
3714            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
3715        )?;
3716        let embed_rebuild: i64 = count_query(&self.storage,
3717            "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
3718        let schema_version = self.storage.get_meta_or("schema_version", "?");
3719        let lib_id = self.storage.get_meta_or("lib_id", "?");
3720        let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
3721
3722        let metric_window_start = days_ago(&utc_now_iso(), 30);
3723        let trace_metrics = self.storage.query_chunks_params(
3724            "SELECT COUNT(*) AS total,
3725                    SUM(CASE WHEN task_state='completed' THEN 1 ELSE 0 END) AS completed,
3726                    SUM(CASE WHEN task_state='timed_out' THEN 1 ELSE 0 END) AS timed_out,
3727                    SUM(CASE WHEN task_state='completed' AND usage_state!='unknown'
3728                             THEN 1 ELSE 0 END) AS usage_known,
3729                    SUM(CASE WHEN task_state='completed' AND usage_state='known_some'
3730                             THEN 1 ELSE 0 END) AS usage_some,
3731                    SUM(CASE WHEN outcome='ok' THEN 1 ELSE 0 END) AS succeeded
3732             FROM episodic_log WHERE ts >= ?",
3733            rusqlite::params![metric_window_start],
3734        )?;
3735        let trace_row = trace_metrics.first();
3736        let trace_total = trace_row
3737            .and_then(|row| row.get("total"))
3738            .and_then(Value::as_i64)
3739            .unwrap_or(0);
3740        let trace_completed = trace_row
3741            .and_then(|row| row.get("completed"))
3742            .and_then(Value::as_i64)
3743            .unwrap_or(0);
3744        let trace_timed_out = trace_row
3745            .and_then(|row| row.get("timed_out"))
3746            .and_then(Value::as_i64)
3747            .unwrap_or(0);
3748        let usage_known = trace_row
3749            .and_then(|row| row.get("usage_known"))
3750            .and_then(Value::as_i64)
3751            .unwrap_or(0);
3752        let usage_some = trace_row
3753            .and_then(|row| row.get("usage_some"))
3754            .and_then(Value::as_i64)
3755            .unwrap_or(0);
3756        let succeeded = trace_row
3757            .and_then(|row| row.get("succeeded"))
3758            .and_then(Value::as_i64)
3759            .unwrap_or(0);
3760        let usage_rows = self.storage.query_chunks_params(
3761            "SELECT recall_snapshot, used_ids FROM episodic_log
3762             WHERE usage_state!='unknown' AND used_complete=1
3763               AND recall_snapshot IS NOT NULL AND used_ids IS NOT NULL
3764               AND ts >= ?",
3765            rusqlite::params![metric_window_start],
3766        )?;
3767        let mut selected_total = 0_i64;
3768        let mut selected_used = 0_i64;
3769        for row in usage_rows {
3770            let selected: HashSet<String> = row
3771                .get("recall_snapshot")
3772                .and_then(Value::as_str)
3773                .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
3774                .and_then(|snapshot| snapshot.get("selected").cloned())
3775                .and_then(|value| serde_json::from_value::<Vec<String>>(value).ok())
3776                .unwrap_or_default()
3777                .into_iter()
3778                .collect();
3779            let used: HashSet<String> = row
3780                .get("used_ids")
3781                .and_then(Value::as_str)
3782                .and_then(|raw| serde_json::from_str::<Vec<String>>(raw).ok())
3783                .unwrap_or_default()
3784                .into_iter()
3785                .collect();
3786            selected_total += selected.len() as i64;
3787            selected_used += selected.intersection(&used).count() as i64;
3788        }
3789        let feedback_count = count_query_params(
3790            &self.storage,
3791            "SELECT COUNT(*) FROM feedback_events WHERE ts >= ?",
3792            rusqlite::params![metric_window_start],
3793        )?;
3794        let feedback_traces = count_query_params(
3795            &self.storage,
3796            "SELECT COUNT(DISTINCT f.trace_id)
3797             FROM feedback_events f
3798             JOIN episodic_log e ON e.trace_id=f.trace_id
3799             WHERE f.ts >= ? AND e.ts >= ? AND e.task_state='completed'",
3800            rusqlite::params![metric_window_start, metric_window_start],
3801        )?;
3802        let pending_evolve = count_query(
3803            &self.storage,
3804            "SELECT COUNT(*) FROM evolve_requests WHERE state IN ('pending','running')",
3805        )?;
3806        let governance_pending = count_query(
3807            &self.storage,
3808            "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
3809        )?;
3810        let failed_evolve = count_query_params(
3811            &self.storage,
3812            "SELECT COUNT(*) FROM evolve_requests
3813             WHERE last_failed_at >= ?",
3814            rusqlite::params![metric_window_start],
3815        )?;
3816        let failed_distill = count_query_params(
3817            &self.storage,
3818            "SELECT COUNT(*) FROM episodic_log
3819             WHERE distill_last_failed_at >= ?",
3820            rusqlite::params![metric_window_start],
3821        )?;
3822        let confidence_buckets = self.storage.query_chunks(
3823            &format!("SELECT
3824               SUM(CASE WHEN confidence < 0.25 THEN 1 ELSE 0 END) AS low,
3825               SUM(CASE WHEN confidence >= 0.25 AND confidence < {0} THEN 1 ELSE 0 END) AS medium,
3826               SUM(CASE WHEN confidence >= {0} THEN 1 ELSE 0 END) AS high
3827             FROM chunks WHERE origin!='spark' AND state!='archived'",
3828                self.promote_confidence_min),
3829        )?;
3830        let confidence_row = confidence_buckets.first();
3831
3832        // P3-A: oldest pending chunk timestamp — surfaces long-lived pending debt.
3833        let pending_oldest_ts = self.storage.query_chunks(
3834            "SELECT MIN(created_at) AS oldest FROM chunks WHERE state='pending' AND origin!='spark'",
3835        )?.into_iter().next()
3836            .and_then(|r| r.get("oldest").cloned())
3837            .and_then(|v| if v.is_null() { None } else { Some(v) });
3838
3839        // Health signal 1: knowledge debt ratio.
3840        // Zombie = active chunks with middling confidence (stuck, neither good nor bad)
3841        // that are at least 14d old and have been used at least once.
3842        // "never-recalled old" chunks are handled by curate 3c (never_used archive).
3843        let zombie_cutoff = days_ago(&utc_now_iso(), 14);
3844        let zombie: i64 = count_query_params(
3845            &self.storage,
3846            "SELECT COUNT(*) FROM chunks
3847             WHERE origin!='spark' AND state='active'
3848               AND confidence >= 0.4 AND confidence <= 0.6
3849               AND last_used_at IS NOT NULL
3850               AND created_at < ?",
3851            rusqlite::params![zombie_cutoff],
3852        )?;
3853        let debt_numerator = pending + zombie;
3854        let debt_denominator = active.max(1);
3855        let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
3856
3857        // Health signal 5: stale screening count
3858        let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
3859        let stale_screening: i64 = count_query_params(
3860            &self.storage,
3861            "SELECT COUNT(*) FROM episodic_log
3862             WHERE distill_state='screening' AND distill_locked_at < ?",
3863            rusqlite::params![screening_cutoff],
3864        )?;
3865
3866        // Health signal 4: actual Distill cost within the configured rolling window.
3867        let distill_period_start = self.distill_token_period_start(&utc_now_iso())?;
3868        let distill_cost = self.storage.query_chunks_params(
3869            "SELECT COALESCE(SUM(distill_prompt_tokens),0) AS pt,
3870                    COALESCE(SUM(distill_completion_tokens),0) AS ct
3871             FROM episodic_log
3872             WHERE distill_accounted_at >= ?",
3873            rusqlite::params![distill_period_start],
3874        )?;
3875        let prompt_tokens = distill_cost
3876            .first()
3877            .and_then(|r| r.get("pt"))
3878            .and_then(Value::as_i64)
3879            .unwrap_or(0);
3880        let completion_tokens = distill_cost
3881            .first()
3882            .and_then(|r| r.get("ct"))
3883            .and_then(Value::as_i64)
3884            .unwrap_or(0);
3885
3886        // Health signal 2: sparks that have been recalled often (soft incubation threshold = 5)
3887        let spark_threshold: i64 = self
3888            .storage
3889            .get_meta("curate.soft_mature_threshold")
3890            .ok()
3891            .flatten()
3892            .and_then(|v| v.parse::<i64>().ok())
3893            .unwrap_or(5);
3894        let recurring_sparks = self.storage.query_chunks_params(
3895            "SELECT ut.chunk_id, COUNT(*) AS cnt,
3896                    c.content, c.trigger_desc, c.maturity
3897             FROM usage_trace ut
3898             JOIN chunks c ON c.id = ut.chunk_id
3899             WHERE ut.event='retrieved'
3900               AND c.origin='spark'
3901             GROUP BY ut.chunk_id HAVING cnt >= ?",
3902            rusqlite::params![spark_threshold],
3903        )?;
3904        let recurring_spark_ids: Vec<Value> = recurring_sparks
3905            .iter()
3906            .map(|r| {
3907                json!({
3908                    "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
3909                    "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
3910                    "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
3911                    "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
3912                        .chars().take(80).collect::<String>(),
3913                })
3914            })
3915            .collect();
3916
3917        let mut suggestions: Vec<Value> = Vec::new();
3918        if embed_rebuild > 0 {
3919            suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
3920        }
3921        if new_logs > 0 {
3922            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
3923        }
3924        if pending > 0 {
3925            suggestions.push(json!({"action": "innate approve <id>  # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
3926        }
3927        if !recurring_spark_ids.is_empty() {
3928            suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
3929        }
3930        if stale_screening > 0 {
3931            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
3932        }
3933        if governance_pending > 0 {
3934            suggestions.push(json!({
3935                "action": "review governance_proposals",
3936                "reason": format!("{governance_pending} chunk(s) have repeated negative feedback")
3937            }));
3938        }
3939
3940        Ok(json!({
3941            "schema_version": schema_version,
3942            "lib_id": lib_id,
3943            "last_agg_ts": last_agg,
3944            "chunks": {
3945                "total": total, "active": active, "pending": pending, "archived": archived,
3946                "pending_oldest_ts": pending_oldest_ts,
3947            },
3948            "sparks": sparks,
3949            "episodic_log": {"open": open_logs, "new": new_logs},
3950            "embed_rebuild_queue": embed_rebuild,
3951            "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
3952            "stale_screening_count": stale_screening,
3953            "feedback_loop": {
3954                "trace_completion_rate": ratio(trace_completed, trace_total),
3955                "usage_annotation_rate": ratio(usage_known, trace_completed),
3956                "trace_use_rate": ratio(usage_some, usage_known),
3957                "selected_to_used_rate": ratio(selected_used, selected_total),
3958                "task_success_rate": ratio(succeeded, trace_completed),
3959                "feedback_coverage": ratio(feedback_traces, trace_completed),
3960                "feedback_events": feedback_count,
3961                "timed_out_traces": trace_timed_out,
3962                "pending_evolve_requests": pending_evolve,
3963                "failed_evolve_requests_30d": failed_evolve,
3964                "failed_distill_logs_30d": failed_distill,
3965                "pending_governance_proposals": governance_pending,
3966                "window_days": 30,
3967                "confidence_distribution": {
3968                    "low": confidence_row.and_then(|row| row.get("low")).and_then(Value::as_i64).unwrap_or(0),
3969                    "medium": confidence_row.and_then(|row| row.get("medium")).and_then(Value::as_i64).unwrap_or(0),
3970                    "high": confidence_row.and_then(|row| row.get("high")).and_then(Value::as_i64).unwrap_or(0),
3971                }
3972            },
3973            "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
3974            "recurring_sparks": recurring_sparks.len(),
3975            "recurring_spark_ids": recurring_spark_ids,
3976            "params": {
3977                "recall.w_content": self.w_content,
3978                "recall.w_trigger": self.w_trigger,
3979                "recall.w_context": self.w_context,
3980                "recall.top_k_candidates": self.top_k_candidates,
3981                "curate.low_conf_threshold": self.low_conf_threshold,
3982                "curate.low_conf_idle_days": self.low_conf_idle_days,
3983                "curate.repeat_select_min": self.repeat_select_min,
3984                "curate.never_used_age_days": self.never_used_age_days,
3985                "curate.promote_used_success_min": self.promote_used_success_min,
3986                "curate.promote_confidence_min": self.promote_confidence_min,
3987                "curate.screening_timeout_minutes": self.screening_timeout_minutes,
3988                "curate.open_ttl_days": self.open_ttl_days,
3989                "evolve.schedule_interval_hours": self.evolve_schedule_interval_hours,
3990            },
3991            "suggestions": suggestions
3992        }))
3993    }
3994
3995    // ------------------------------------------------------------------
3996    // Public: rebuild_embeddings (evolve --rebuild-embeddings)
3997    // ------------------------------------------------------------------
3998
3999    pub fn rebuild_embeddings(&self) -> Result<usize> {
4000        let meta_version = self
4001            .storage
4002            .get_meta("embed_version")?
4003            .and_then(|v| v.parse::<i64>().ok())
4004            .unwrap_or(1);
4005        // Fetch chunks with embed_version=0 (failed writes) or below current meta version.
4006        let stale = self.storage.query_chunks_params(
4007            "SELECT id, content, trigger_desc, state_reason FROM chunks
4008             WHERE embed_version = 0 OR embed_version < ?",
4009            rusqlite::params![meta_version],
4010        )?;
4011        let mut count = 0;
4012        for row in &stale {
4013            let id = match row.get("id").and_then(Value::as_str) {
4014                Some(v) => v,
4015                None => continue,
4016            };
4017            let content = row.get("content").and_then(Value::as_str).unwrap_or("");
4018            let trigger = row
4019                .get("trigger_desc")
4020                .and_then(Value::as_str)
4021                .unwrap_or(content);
4022            let state_reason = row
4023                .get("state_reason")
4024                .and_then(Value::as_str)
4025                .unwrap_or("");
4026
4027            let cvec = match self.embedding.embed_content(content) {
4028                Ok(v) => v,
4029                Err(_) => continue,
4030            };
4031            let tvec = match self.embedding.embed_trigger(trigger) {
4032                Ok(v) => v,
4033                Err(_) => continue,
4034            };
4035
4036            self.storage.begin_immediate()?;
4037            let r = (|| -> Result<()> {
4038                self.storage
4039                    .insert_vec_content(id, &pack_embedding(&cvec))?;
4040                self.storage
4041                    .insert_vec_trigger(id, &pack_embedding(&tvec))?;
4042                // Restore intended state if encoded in state_reason.
4043                let new_reason = if state_reason.starts_with("embedding_pending:target=") {
4044                    let target_state = state_reason.trim_start_matches("embedding_pending:target=");
4045                    let now = utc_now_iso();
4046                    self.storage.update_chunk_state(
4047                        id,
4048                        target_state,
4049                        Some("embedding_rebuilt"),
4050                        &now,
4051                    )?;
4052                    "embedding_rebuilt".to_string()
4053                } else {
4054                    "embedding_rebuilt".to_string()
4055                };
4056                let now = utc_now_iso();
4057                self.storage.conn_execute(
4058                    "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
4059                    rusqlite::params![meta_version, new_reason, now, id],
4060                )?;
4061                self.storage.commit()
4062            })();
4063            if r.is_err() {
4064                let _ = self.storage.rollback();
4065            } else {
4066                count += 1;
4067            }
4068        }
4069        Ok(count)
4070    }
4071
4072    // ------------------------------------------------------------------
4073    // Public: inspect_id (inspect <chunk_id> or <trace_id>)
4074    // ------------------------------------------------------------------
4075
4076    pub fn inspect_id(&self, id: &str) -> Result<Value> {
4077        // Try as chunk_id first, then as trace_id.
4078        if let Some(chunk) = self.storage.get_chunk(id)? {
4079            let traces = self.storage.query_chunks_params(
4080                "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
4081                rusqlite::params![id],
4082            )?;
4083            let derived = self.storage.query_chunks_params(
4084                "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
4085                   SELECT id FROM episodic_log WHERE trace_id IN (
4086                     SELECT trace_id FROM usage_trace WHERE chunk_id=?
4087                   )
4088                 ) LIMIT 10",
4089                rusqlite::params![id],
4090            )?;
4091            return Ok(json!({
4092                "kind": "chunk",
4093                "chunk": chunk,
4094                "recent_traces": traces,
4095                "derived_chunks": derived,
4096            }));
4097        }
4098        // Try as trace_id.
4099        if let Some(log) = self.storage.get_episodic_log(id)? {
4100            let traces = self.storage.query_chunks_params(
4101                "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
4102                rusqlite::params![id],
4103            )?;
4104            return Ok(json!({
4105                "kind": "trace",
4106                "episodic_log": log,
4107                "usage_traces": traces,
4108            }));
4109        }
4110        Err(InnateError::ChunkNotFound(id.to_string()))
4111    }
4112
4113    // ------------------------------------------------------------------
4114    // Sanitize
4115    // ------------------------------------------------------------------
4116
4117    fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
4118        self.sanitizer.sanitize(content)
4119    }
4120}
4121
4122// ---------------------------------------------------------------------------
4123// Helpers
4124// ---------------------------------------------------------------------------
4125
4126struct CandidateInfo {
4127    chunk: Value,
4128    sim_content: f32,
4129    sim_trigger: f32,
4130}
4131
4132fn chunk_is_valid_for_recall(chunk: &Value, embed_version: i64) -> bool {
4133    chunk.get("state").and_then(Value::as_str) != Some("archived")
4134        && chunk.get("origin").and_then(Value::as_str) != Some("spark")
4135        && chunk
4136            .get("embed_version")
4137            .and_then(Value::as_i64)
4138            .unwrap_or(1)
4139            >= embed_version
4140}
4141
/// Normalize a query string before hashing into a context_key.
///
/// Goal: queries made of the same words accumulate statistics in the same
/// context_stat bucket regardless of capitalisation, spacing, punctuation,
/// word order or repetition.
///
/// Steps, in order:
/// 1. lowercase the whole query;
/// 2. replace every character that is neither alphanumeric nor whitespace
///    with a space;
/// 3. split on whitespace and drop a small fixed list of English stop words;
/// 4. sort the remaining tokens, remove duplicates and join them with single
///    spaces.
///
/// No stemming is performed. The canonical query guidance in SKILL.md handles
/// vocabulary consistency at the agent level. A query that is empty or
/// consists only of stop words and punctuation normalizes to the empty string.
fn normalize_query(query: &str) -> String {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "and", "for", "in", "of", "on", "the", "to", "with",
    ];
    // Lowercase the whole string first, then blank out punctuation/symbols so
    // they act as token separators.
    let cleaned: String = query
        .to_lowercase()
        .chars()
        .map(|ch| {
            if ch.is_alphanumeric() || ch.is_whitespace() {
                ch
            } else {
                ' '
            }
        })
        .collect();
    let mut tokens: Vec<&str> = cleaned
        .split_whitespace()
        .filter(|token| !STOP_WORDS.contains(token))
        .collect();
    // Sorting makes the key independent of word order and puts duplicates
    // next to each other so `dedup` removes all of them.
    tokens.sort_unstable();
    tokens.dedup();
    tokens.join(" ")
}
4173
4174fn estimate_distill_prompt_tokens(log: &Value, related_logs: &[Value]) -> i64 {
4175    let primary: i64 = [
4176        "query",
4177        "recall_snapshot",
4178        "output",
4179        "output_summary",
4180        "nomination",
4181    ]
4182    .iter()
4183    .filter_map(|key| log.get(*key).and_then(Value::as_str))
4184    .map(|text| estimate_tokens(text) as i64)
4185    .sum();
4186    let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
4187    let related: i64 = related_logs
4188        .iter()
4189        .filter(|other| other.get("id").and_then(Value::as_str).unwrap_or("") != log_id)
4190        .take(4)
4191        .flat_map(|other| {
4192            ["query", "output_summary", "outcome"]
4193                .into_iter()
4194                .filter_map(|key| other.get(key).and_then(Value::as_str))
4195        })
4196        .map(|text| estimate_tokens(text) as i64)
4197        .sum();
4198    primary + related
4199}
4200
4201fn estimate_distilled_chunk_tokens(chunk: &DistilledChunk) -> i64 {
4202    estimate_tokens(&chunk.content) as i64
4203        + chunk
4204            .trigger_desc
4205            .as_deref()
4206            .map(estimate_tokens)
4207            .unwrap_or(0) as i64
4208        + chunk
4209            .anti_trigger_desc
4210            .as_deref()
4211            .map(estimate_tokens)
4212            .unwrap_or(0) as i64
4213}
4214
/// Report whether `query` contains any phrase from the comma-separated `anti` list.
///
/// Matching is a case-insensitive substring test. Each phrase is trimmed of
/// surrounding whitespace and empty phrases are ignored, so an empty list
/// never matches.
fn anti_trigger_hit(query: &str, anti: &str) -> bool {
    let haystack = query.to_lowercase();
    let phrases = anti.to_lowercase();
    for phrase in phrases.split(',').map(str::trim) {
        if phrase.is_empty() {
            continue;
        }
        if haystack.contains(phrase) {
            return true;
        }
    }
    false
}
4222
4223fn block_cost(block: &[Value]) -> usize {
4224    block
4225        .iter()
4226        .map(|b| {
4227            b.get("token_count")
4228                .and_then(Value::as_u64)
4229                .map(|t| t as usize)
4230                .unwrap_or_else(|| {
4231                    estimate_tokens(b.get("content").and_then(Value::as_str).unwrap_or("")).max(100)
4232                })
4233        })
4234        .sum()
4235}
4236
4237fn limit_knowledge(knowledge: Vec<Value>, top: Option<usize>) -> Vec<Value> {
4238    match top {
4239        None => knowledge,
4240        Some(0) => vec![],
4241        Some(n) => knowledge.into_iter().take(n).collect(),
4242    }
4243}
4244
/// Classify a usage annotation.
///
/// Returns `"unknown"` when no list was supplied, `"known_none"` for an empty
/// list and `"known_some"` for a list with at least one id.
fn usage_state(used: Option<&[String]>) -> &'static str {
    let Some(ids) = used else {
        return "unknown";
    };
    if ids.is_empty() {
        "known_none"
    } else {
        "known_some"
    }
}
4252
/// `numerator / denominator` rounded to three decimal places.
///
/// Returns 0.0 when the denominator is zero or negative.
fn ratio(numerator: i64, denominator: i64) -> f64 {
    if denominator <= 0 {
        return 0.0;
    }
    let fraction = numerator as f64 / denominator as f64;
    (fraction * 1000.0).round() / 1000.0
}
4260
4261fn validate_source(source: &str) -> Result<()> {
4262    if !matches!(
4263        source,
4264        "mcp" | "sdk" | "cli" | "hook" | "daemon" | "augmented"
4265    ) {
4266        return Err(InnateError::InvalidState(format!(
4267            "invalid event source: {source}"
4268        )));
4269    }
4270    Ok(())
4271}
4272
4273fn count_query(storage: &Storage, sql: &str) -> Result<i64> {
4274    Ok(storage
4275        .query_chunks(sql)?
4276        .first()
4277        .and_then(|r| r.as_object())
4278        .and_then(|m| m.values().next())
4279        .and_then(Value::as_i64)
4280        .unwrap_or(0))
4281}
4282
4283fn count_query_params<P: rusqlite::Params>(storage: &Storage, sql: &str, p: P) -> Result<i64> {
4284    Ok(storage
4285        .query_chunks_params(sql, p)?
4286        .first()
4287        .and_then(|r| r.as_object())
4288        .and_then(|m| m.values().next())
4289        .and_then(Value::as_i64)
4290        .unwrap_or(0))
4291}
4292
4293fn days_ago(now_iso: &str, days: i64) -> String {
4294    use chrono::{DateTime, Duration, Utc};
4295    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4296        let cutoff = t - Duration::days(days);
4297        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4298    }
4299    now_iso.to_string()
4300}
4301
4302fn minutes_ago(now_iso: &str, minutes: i64) -> String {
4303    use chrono::{DateTime, Duration, Utc};
4304    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4305        let cutoff = t - Duration::minutes(minutes);
4306        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4307    }
4308    now_iso.to_string()
4309}
4310
4311fn hours_ago(now_iso: &str, hours: i64) -> String {
4312    use chrono::{DateTime, Duration, Utc};
4313    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4314        let cutoff = t - Duration::hours(hours);
4315        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4316    }
4317    now_iso.to_string()
4318}
4319
4320/// Return the number of whole days between two ISO timestamps (now - past; clamped ≥ 0).
4321fn iso_days_diff(now_iso: &str, past_iso: &str) -> i64 {
4322    use chrono::{DateTime, Utc};
4323    let parse = |s: &str| s.parse::<DateTime<Utc>>().ok();
4324    if let (Some(a), Some(b)) = (parse(now_iso), parse(past_iso)) {
4325        let diff = a - b;
4326        diff.num_days().max(0)
4327    } else {
4328        0
4329    }
4330}
4331
4332/// DFS-based cycle detection on the hard-dep graph. Returns list of cycles (each is a Vec of ids).
4333fn detect_cycles(deps: &[Value]) -> Vec<Vec<String>> {
4334    use std::collections::HashMap;
4335    let mut adj: HashMap<String, Vec<String>> = HashMap::new();
4336    for d in deps {
4337        let src = d
4338            .get("src")
4339            .and_then(Value::as_str)
4340            .unwrap_or("")
4341            .to_string();
4342        let dst = d
4343            .get("dst")
4344            .and_then(Value::as_str)
4345            .unwrap_or("")
4346            .to_string();
4347        if !src.is_empty() && !dst.is_empty() {
4348            adj.entry(src).or_default().push(dst);
4349        }
4350    }
4351    let nodes: Vec<String> = adj.keys().cloned().collect();
4352    let mut visited: HashSet<String> = HashSet::new();
4353    let mut on_stack: HashSet<String> = HashSet::new();
4354    let mut cycles: Vec<Vec<String>> = vec![];
4355
4356    fn dfs(
4357        node: &str,
4358        adj: &HashMap<String, Vec<String>>,
4359        visited: &mut HashSet<String>,
4360        on_stack: &mut HashSet<String>,
4361        path: &mut Vec<String>,
4362        cycles: &mut Vec<Vec<String>>,
4363    ) {
4364        if on_stack.contains(node) {
4365            // Found cycle — extract loop segment.
4366            let start = path.iter().position(|n| n == node).unwrap_or(0);
4367            cycles.push(path[start..].to_vec());
4368            return;
4369        }
4370        if visited.contains(node) {
4371            return;
4372        }
4373        visited.insert(node.to_string());
4374        on_stack.insert(node.to_string());
4375        path.push(node.to_string());
4376        if let Some(children) = adj.get(node) {
4377            for child in children {
4378                dfs(child, adj, visited, on_stack, path, cycles);
4379            }
4380        }
4381        path.pop();
4382        on_stack.remove(node);
4383    }
4384
4385    for node in nodes {
4386        let mut path = vec![];
4387        dfs(
4388            &node,
4389            &adj,
4390            &mut visited,
4391            &mut on_stack,
4392            &mut path,
4393            &mut cycles,
4394        );
4395    }
4396    cycles
4397}