Skip to main content

innate_core/
kb.rs

1//! KnowledgeBase — all 8 Public APIs.
2
3/// Return type for pack(): (selected_chunks, skipped_groups, skip_reasons)
4type PackResult = (
5    Vec<Value>,
6    Vec<(Vec<Value>, f64, usize)>,
7    std::collections::HashMap<String, String>,
8);
9
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::embedding::{DummyEmbeddingProvider, EmbeddingProvider};
17use crate::errors::{InnateError, Result};
18use crate::refine::{
19    DefaultSanitizer, DistilledChunk, Distiller, HeuristicDistiller, NullRefiner, Refiner,
20    Sanitizer,
21};
22use crate::storage::{ChunkRow, EpisodicLogRow, Storage};
23use crate::utils::{
24    content_hash, estimate_tokens, gen_uuid, pack_embedding, utc_now_iso, SanitizeAction,
25};
26
// ---------------------------------------------------------------------------
// Tuning defaults
//
// Fallback values for the tunable parameters. `load_params` prefers the value
// stored in the `meta` table and only falls back to these when a key is missing
// or unparseable. `init_meta` seeds that table from its own string literals, so
// keep both places in sync when changing a default.
// ---------------------------------------------------------------------------

// Recall: fused-score weights.
// NOTE(review): these four weights sum to 1.05 rather than 1.0 — confirm intended.
const W_CONTENT: f64 = 0.55;
const W_TRIGGER: f64 = 0.25;
const W_CONFIDENCE: f64 = 0.1;
const W_CONTEXT: f64 = 0.15;

// Recall: candidate pool, penalties and packing.
const TOP_K_CANDIDATES: usize = 20;
/// Multiplier applied to a candidate's fused score when the query hits its anti-trigger.
const ANTI_TRIGGER_PENALTY: f64 = 0.6;
/// Multiplier applied to the fused score of chunks in the `pending` state.
const PENDING_RECALL_PENALTY: f64 = 0.6;
const DENSITY_REFILL: bool = true;

// Curate thresholds (loaded into the matching `curate.*` fields by `load_params`).
const LOW_CONF_THRESHOLD: f64 = 0.25;
const LOW_CONF_IDLE_DAYS: i64 = 60;
const REPEAT_SELECT_MIN: i64 = 10;
const REPEAT_SELECT_CONF_MAX: f64 = 0.5;
const NEVER_USED_AGE_DAYS: i64 = 30;
const OPEN_TTL_DAYS: i64 = 14;
const SCREENING_TIMEOUT_MINUTES: i64 = 30;
const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
const PROMOTE_CONFIDENCE_MIN: f64 = 0.6;
const DECAY_FLOOR: f64 = 0.2;
const GOVERNANCE_ARCHIVE_THRESHOLD: i64 = 3;
const NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD: i64 = 5;

// Failure detection thresholds.
const FAILURE_MIN_USES: i64 = 5;
const FAILURE_MAX_SUCCESS_RATE: f64 = 0.2;
const FAILURE_CONFIDENCE_MAX: f64 = 0.35;

// Evolve / distill.
const EVOLVE_THRESHOLD: i64 = 5;
const DISTILL_BATCH_SIZE: usize = 20;
const GOVERNANCE_EVOLVE_THRESHOLD: i64 = 3;
53const NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD: i64 = 5;
54const GOVERNANCE_EVOLVE_THRESHOLD: i64 = 3;
55const FAILURE_MIN_USES: i64 = 5;
56const FAILURE_MAX_SUCCESS_RATE: f64 = 0.20;
57const FAILURE_CONFIDENCE_MAX: f64 = 0.35;
58
59// ---------------------------------------------------------------------------
60// Public result types
61// ---------------------------------------------------------------------------
62
63#[derive(Debug, Default, Clone)]
64pub struct RecallResult {
65    pub knowledge: Vec<Value>,
66    pub sparks: Vec<Value>,
67    pub trace_id: String,
68    pub empty: bool,
69    pub depth_skipped: Vec<String>,
70    pub skipped_reasons: HashMap<String, String>,
71}
72
73#[derive(Debug, Default)]
74pub struct CurateReport {
75    pub archived: Vec<String>,
76    pub deduped: Vec<String>,
77    pub decayed: Vec<String>,
78    pub cycles: Vec<Vec<String>>,
79    pub orphans: Vec<String>,
80    pub recovered: Vec<String>,
81    pub warnings: Vec<String>,
82    pub stats: HashMap<String, Value>,
83}
84
85#[derive(Debug, Default)]
86struct DistillBatchReport {
87    distilled: usize,
88    failed: usize,
89}
90
91/// Scope for a single Curate run — allows limiting governance to a subset of chunks.
92#[derive(Debug, Default, Clone)]
93pub struct CurateScope {
94    /// If set, only process chunks with this origin (e.g. "distilled").
95    pub origin: Option<String>,
96    /// If set, only process chunks belonging to this skill.
97    pub skill_name: Option<String>,
98    /// When true, compute the report but do not write any changes.
99    pub dry_run: bool,
100}
101
102/// Replaceable governance interface (§二·六). Inject via `KnowledgeBase::open_with`.
103/// Default implementation: `BuiltinCurator`.
104pub trait Curator: Send + Sync {
105    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport>;
106}
107
108/// Built-in curator — implements the full §四 governance pipeline.
109pub struct BuiltinCurator;
110
111impl Curator for BuiltinCurator {
112    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport> {
113        kb.builtin_curate_impl(scope)
114    }
115}
116
117// ---------------------------------------------------------------------------
118// KnowledgeBase
119// ---------------------------------------------------------------------------
120
121pub struct KnowledgeBase {
122    pub storage: Storage,
123    embedding: Arc<dyn EmbeddingProvider>,
124    refiner: Arc<dyn Refiner>,
125    distiller: Arc<dyn Distiller>,
126    curator: Arc<dyn Curator>,
127    sanitizer: Arc<dyn Sanitizer>,
128
129    // Tuning params (loaded from meta at init)
130    w_content: f64,
131    w_trigger: f64,
132    w_confidence: f64,
133    w_context: f64,
134    top_k_candidates: usize,
135    anti_trigger_penalty: f64,
136    density_refill: bool,
137
138    low_conf_threshold: f64,
139    low_conf_idle_days: i64,
140    repeat_select_min: i64,
141    repeat_select_conf_max: f64,
142    never_used_age_days: i64,
143    open_ttl_days: i64,
144    screening_timeout_minutes: i64,
145    promote_used_success_min: i64,
146    promote_confidence_min: f64,
147    decay_floor: f64,
148    evolve_threshold: i64,
149    distill_batch_size: usize,
150    evolve_schedule_interval_hours: i64,
151    governance_archive_threshold: i64,
152    negative_feedback_archive_threshold: i64,
153    governance_evolve_threshold: i64,
154    governance_proposal_max_age_days: i64,
155    failure_min_uses: i64,
156    failure_max_success_rate: f64,
157    failure_confidence_max: f64,
158}
159
160impl KnowledgeBase {
161    pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
162        Self::open_with(db_path, None, None, None, None, None)
163    }
164
165    pub fn open_with(
166        db_path: impl AsRef<Path>,
167        embedding: Option<Arc<dyn EmbeddingProvider>>,
168        refiner: Option<Arc<dyn Refiner>>,
169        distiller: Option<Arc<dyn Distiller>>,
170        curator: Option<Arc<dyn Curator>>,
171        sanitizer: Option<Arc<dyn Sanitizer>>,
172    ) -> Result<Self> {
173        let embedding = embedding.unwrap_or_else(|| Arc::new(DummyEmbeddingProvider::default()));
174        let refiner = refiner.unwrap_or_else(|| Arc::new(NullRefiner));
175        let distiller = distiller.unwrap_or_else(|| Arc::new(HeuristicDistiller));
176        let curator = curator.unwrap_or_else(|| Arc::new(BuiltinCurator));
177        let sanitizer = sanitizer.unwrap_or_else(|| Arc::new(DefaultSanitizer));
178
179        let storage = Storage::open(db_path, embedding.content_dim(), embedding.trigger_dim())?;
180
181        let mut kb = Self {
182            storage,
183            embedding,
184            refiner,
185            distiller,
186            curator,
187            sanitizer,
188            w_content: W_CONTENT,
189            w_trigger: W_TRIGGER,
190            w_confidence: W_CONFIDENCE,
191            w_context: W_CONTEXT,
192            top_k_candidates: TOP_K_CANDIDATES,
193            anti_trigger_penalty: ANTI_TRIGGER_PENALTY,
194            density_refill: DENSITY_REFILL,
195            low_conf_threshold: LOW_CONF_THRESHOLD,
196            low_conf_idle_days: LOW_CONF_IDLE_DAYS,
197            repeat_select_min: REPEAT_SELECT_MIN,
198            repeat_select_conf_max: REPEAT_SELECT_CONF_MAX,
199            never_used_age_days: NEVER_USED_AGE_DAYS,
200            open_ttl_days: OPEN_TTL_DAYS,
201            screening_timeout_minutes: SCREENING_TIMEOUT_MINUTES,
202            promote_used_success_min: PROMOTE_USED_SUCCESS_MIN,
203            promote_confidence_min: PROMOTE_CONFIDENCE_MIN,
204            decay_floor: DECAY_FLOOR,
205            evolve_threshold: EVOLVE_THRESHOLD,
206            distill_batch_size: DISTILL_BATCH_SIZE,
207            evolve_schedule_interval_hours: 6,
208            governance_archive_threshold: GOVERNANCE_ARCHIVE_THRESHOLD,
209            negative_feedback_archive_threshold: NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
210            governance_evolve_threshold: GOVERNANCE_EVOLVE_THRESHOLD,
211            governance_proposal_max_age_days: 30,
212            failure_min_uses: FAILURE_MIN_USES,
213            failure_max_success_rate: FAILURE_MAX_SUCCESS_RATE,
214            failure_confidence_max: FAILURE_CONFIDENCE_MAX,
215        };
216        kb.init_meta()?;
217        kb.load_params()?;
218        Ok(kb)
219    }
220
221    fn init_meta(&self) -> Result<()> {
222        let lib_id = gen_uuid();
223        let content_dim = self.embedding.content_dim().to_string();
224        let trigger_dim = self.embedding.trigger_dim().to_string();
225        let embed_model = self.embedding.model_name();
226
227        for (key, expected) in [
228            ("content_dim", self.embedding.content_dim()),
229            ("trigger_dim", self.embedding.trigger_dim()),
230        ] {
231            if let Some(stored) = self.storage.get_meta(key)? {
232                let actual = stored.parse::<usize>().map_err(|_| {
233                    InnateError::Other(format!("invalid {key} metadata value: {stored}"))
234                })?;
235                if actual != expected {
236                    return Err(InnateError::Other(format!(
237                        "{key} mismatch: database uses {actual}, embedding provider uses {expected}"
238                    )));
239                }
240            }
241        }
242
243        let defaults: &[(&str, &str)] = &[
244            ("lib_id", &lib_id),
245            ("lib_role", "personal"),
246            ("schema_version", "4.14"),
247            ("content_dim", &content_dim),
248            ("trigger_dim", &trigger_dim),
249            ("embed_model", embed_model),
250            ("embed_version", "1"),
251            ("vector_revision", "0"),
252            ("last_agg_ts", "1970-01-01T00:00:00.000Z"),
253            ("recall.w_content", "0.55"),
254            ("recall.w_trigger", "0.25"),
255            ("recall.w_confidence", "0.10"),
256            ("recall.w_context", "0.15"),
257            ("recall.top_k_candidates", "20"),
258            ("recall.anti_trigger_penalty", "0.6"),
259            ("recall.density_refill", "true"),
260            ("curate.low_conf_threshold", "0.25"),
261            ("curate.low_conf_idle_days", "60"),
262            ("curate.repeat_select_min", "10"),
263            ("curate.repeat_select_conf_max", "0.5"),
264            ("curate.never_used_age_days", "30"),
265            ("curate.open_ttl_days", "14"),
266            ("curate.screening_timeout_minutes", "30"),
267            ("curate.promote_used_success_min", "3"),
268            ("curate.promote_confidence_min", "0.60"),
269            ("curate.decay_floor", "0.20"),
270            ("evolve.threshold_new_count", "5"),
271            ("evolve.distill_batch_size", "20"),
272            ("evolve.schedule_interval_hours", "6"),
273            ("curate.soft_mature_threshold", "5"),
274            ("evolve.distill_token_window_hours", "24"),
275            ("curate.governance_archive_threshold", "3"),
276            ("curate.negative_feedback_archive_threshold", "5"),
277            ("evolve.governance_pending_threshold", "3"),
278            ("curate.governance_proposal_max_age_days", "30"),
279            ("curate.failure_min_uses", "5"),
280            ("curate.failure_max_success_rate", "0.20"),
281            ("curate.failure_confidence_max", "0.35"),
282        ];
283        self.storage.begin_immediate()?;
284        let result = (|| -> Result<()> {
285            for (k, v) in defaults {
286                if self.storage.get_meta(k)?.is_none() {
287                    self.storage.set_meta(k, v)?;
288                }
289            }
290            self.storage.commit()
291        })();
292        if result.is_err() {
293            let _ = self.storage.rollback();
294        }
295        result
296    }
297
298    fn load_params(&mut self) -> Result<()> {
299        let f = |k: &str, d: f64| -> f64 {
300            self.storage
301                .get_meta(k)
302                .ok()
303                .flatten()
304                .and_then(|v| v.parse().ok())
305                .unwrap_or(d)
306        };
307        let i = |k: &str, d: i64| -> i64 {
308            self.storage
309                .get_meta(k)
310                .ok()
311                .flatten()
312                .and_then(|v| v.parse().ok())
313                .unwrap_or(d)
314        };
315        let b = |k: &str, d: bool| -> bool {
316            self.storage
317                .get_meta(k)
318                .ok()
319                .flatten()
320                .map(|v| v.to_lowercase() == "true")
321                .unwrap_or(d)
322        };
323        self.w_content = f("recall.w_content", W_CONTENT);
324        self.w_trigger = f("recall.w_trigger", W_TRIGGER);
325        self.w_confidence = f("recall.w_confidence", W_CONFIDENCE);
326        self.w_context = f("recall.w_context", W_CONTEXT);
327        self.top_k_candidates =
328            i("recall.top_k_candidates", TOP_K_CANDIDATES as i64).max(1) as usize;
329        self.anti_trigger_penalty = f("recall.anti_trigger_penalty", ANTI_TRIGGER_PENALTY);
330        self.density_refill = b("recall.density_refill", DENSITY_REFILL);
331        self.low_conf_threshold = f("curate.low_conf_threshold", LOW_CONF_THRESHOLD);
332        self.low_conf_idle_days = i("curate.low_conf_idle_days", LOW_CONF_IDLE_DAYS);
333        self.repeat_select_min = i("curate.repeat_select_min", REPEAT_SELECT_MIN);
334        self.repeat_select_conf_max = f("curate.repeat_select_conf_max", REPEAT_SELECT_CONF_MAX);
335        self.never_used_age_days = i("curate.never_used_age_days", NEVER_USED_AGE_DAYS);
336        self.open_ttl_days = i("curate.open_ttl_days", OPEN_TTL_DAYS);
337        self.screening_timeout_minutes = i(
338            "curate.screening_timeout_minutes",
339            SCREENING_TIMEOUT_MINUTES,
340        );
341        self.promote_used_success_min =
342            i("curate.promote_used_success_min", PROMOTE_USED_SUCCESS_MIN);
343        self.promote_confidence_min = f("curate.promote_confidence_min", PROMOTE_CONFIDENCE_MIN);
344        self.decay_floor = f("curate.decay_floor", DECAY_FLOOR).clamp(0.0, 0.4);
345        self.evolve_threshold = i("evolve.threshold_new_count", EVOLVE_THRESHOLD);
346        self.distill_batch_size =
347            i("evolve.distill_batch_size", DISTILL_BATCH_SIZE as i64) as usize;
348        self.evolve_schedule_interval_hours = i("evolve.schedule_interval_hours", 6).max(1);
349        self.governance_archive_threshold =
350            i("curate.governance_archive_threshold", GOVERNANCE_ARCHIVE_THRESHOLD).max(1);
351        self.negative_feedback_archive_threshold = i(
352            "curate.negative_feedback_archive_threshold",
353            NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
354        )
355        .max(1);
356        self.governance_evolve_threshold =
357            i("evolve.governance_pending_threshold", GOVERNANCE_EVOLVE_THRESHOLD).max(1);
358        self.governance_proposal_max_age_days =
359            i("curate.governance_proposal_max_age_days", 30).max(1);
360        self.failure_min_uses = i("curate.failure_min_uses", FAILURE_MIN_USES).max(1);
361        self.failure_max_success_rate = f(
362            "curate.failure_max_success_rate",
363            FAILURE_MAX_SUCCESS_RATE,
364        )
365        .clamp(0.0, 1.0);
366        self.failure_confidence_max =
367            f("curate.failure_confidence_max", FAILURE_CONFIDENCE_MAX).clamp(0.0, 1.0);
368        Ok(())
369    }
370
371    // ------------------------------------------------------------------
372    // Public API 1: recall
373    // ------------------------------------------------------------------
374
375    #[allow(clippy::too_many_arguments)]
376    pub fn recall(
377        &self,
378        query: &str,
379        budget: usize,
380        trace: bool,
381        include_sparks: bool,
382        top: Option<usize>,
383        source: &str,
384        expand_deps: &str, // "false" | "direct" | "closure"
385        allow_trim: bool,  // if true, invoke Refiner::trim when block doesn't fit
386        refine_mode: &str, // "off" | "trim" | "adapt" — recorded in trace
387    ) -> Result<RecallResult> {
388        validate_source(source)?;
389        let trace_id = gen_uuid();
390        let now = utc_now_iso();
391
392        let q_content = self
393            .embedding
394            .embed_content(query)
395            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
396        let q_trigger = self
397            .embedding
398            .embed_trigger(query)
399            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
400
401        // ANN candidates (non-spark)
402        let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
403        self.apply_soft_dep_bonus(&mut candidates)?;
404
405        // Score + anti-trigger penalty
406        let scored = self.score_candidates(candidates, query)?;
407
408        // First-fit pack with dep expansion
409        let (selected, skipped, skipped_reasons) =
410            self.pack(&scored, budget, expand_deps, allow_trim, query)?;
411
412        let depth_skipped: Vec<String> = skipped_reasons
413            .iter()
414            .filter(|(_, r)| r.as_str() == "dep_depth_limit")
415            .map(|(id, _)| id.clone())
416            .collect();
417
418        // Density refill
419        let mut selected = selected;
420        if self.density_refill {
421            selected = self.density_refill(selected, &skipped, budget);
422        }
423
424        let limited = limit_knowledge(selected, top);
425        let visible = if refine_mode == "adapt" {
426            self.refiner
427                .refine(limited.clone(), Some(budget))
428                .unwrap_or(limited)
429        } else {
430            limited
431        };
432
433        // Sparks
434        let sparks = if include_sparks {
435            self.recall_sparks(&q_content, &q_trigger)?
436        } else {
437            vec![]
438        };
439
440        if trace {
441            self.write_recall_trace(
442                &trace_id,
443                query,
444                &scored,
445                &visible,
446                &sparks,
447                &depth_skipped,
448                &skipped_reasons,
449                refine_mode,
450                source,
451                &now,
452            )?;
453        }
454
455        let empty = visible.is_empty() && sparks.is_empty();
456        Ok(RecallResult {
457            knowledge: visible,
458            sparks,
459            trace_id,
460            empty,
461            depth_skipped,
462            skipped_reasons,
463        })
464    }
465
466    fn ann_candidates(
467        &self,
468        q_content: &[f32],
469        q_trigger: &[f32],
470    ) -> Result<HashMap<String, CandidateInfo>> {
471        let embed_version = self
472            .storage
473            .get_meta("embed_version")?
474            .and_then(|v| v.parse::<i64>().ok())
475            .unwrap_or(1);
476
477        let content_res = self
478            .storage
479            .search_vec_content(q_content, self.top_k_candidates * 2)?;
480        let trigger_res = self
481            .storage
482            .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
483
484        // Collect unique ids and batch-fetch all chunks in two queries instead of N individual ones.
485        let all_ids: Vec<&str> = {
486            let mut seen = HashSet::new();
487            content_res
488                .iter()
489                .chain(trigger_res.iter())
490                .map(|(id, _)| id.as_str())
491                .filter(|id| seen.insert(*id))
492                .collect()
493        };
494        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
495
496        let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
497
498        for (cid, sim) in &content_res {
499            if let Some(chunk) = chunks.get(cid) {
500                if chunk_is_valid_for_recall(chunk, embed_version) {
501                    let e = candidates
502                        .entry(cid.clone())
503                        .or_insert_with(|| CandidateInfo {
504                            chunk: chunk.clone(),
505                            sim_content: 0.0,
506                            sim_trigger: 0.0,
507                        });
508                    e.sim_content = e.sim_content.max(*sim);
509                }
510            }
511        }
512        for (cid, sim) in &trigger_res {
513            if let Some(chunk) = chunks.get(cid) {
514                if chunk_is_valid_for_recall(chunk, embed_version) {
515                    let e = candidates
516                        .entry(cid.clone())
517                        .or_insert_with(|| CandidateInfo {
518                            chunk: chunk.clone(),
519                            sim_content: 0.0,
520                            sim_trigger: 0.0,
521                        });
522                    e.sim_trigger = e.sim_trigger.max(*sim);
523                }
524            }
525        }
526        Ok(candidates)
527    }
528
529    fn apply_soft_dep_bonus(&self, candidates: &mut HashMap<String, CandidateInfo>) -> Result<()> {
530        let ids: Vec<String> = candidates.keys().cloned().collect();
531        for cid in ids {
532            if candidates[&cid].chunk.get("origin").and_then(Value::as_str) == Some("spark") {
533                continue;
534            }
535            let deps = self.storage.get_deps(&cid)?;
536            for (dst, kind, _) in &deps {
537                if kind != "soft" {
538                    continue;
539                }
540                if let Some(target) = self.storage.get_chunk(dst)? {
541                    if target.get("state").and_then(Value::as_str) == Some("archived") {
542                        continue;
543                    }
544                    if target.get("origin").and_then(Value::as_str) == Some("spark") {
545                        continue;
546                    }
547                    let e = candidates
548                        .entry(dst.clone())
549                        .or_insert_with(|| CandidateInfo {
550                            chunk: target,
551                            sim_content: 0.0,
552                            sim_trigger: 0.0,
553                        });
554                    e.sim_content = (e.sim_content + 0.05).min(1.0);
555                }
556            }
557        }
558        Ok(())
559    }
560
561    fn score_candidates(
562        &self,
563        candidates: HashMap<String, CandidateInfo>,
564        query: &str,
565    ) -> Result<Vec<(f64, Value)>> {
566        let context_key = content_hash(&normalize_query(query));
567        let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
568        for info in candidates.into_values() {
569            let conf = info
570                .chunk
571                .get("confidence")
572                .and_then(Value::as_f64)
573                .unwrap_or(0.5);
574            let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
575            let context_score = self.storage.context_score(chunk_id, &context_key)?;
576            let mut fused = self.w_content * info.sim_content as f64
577                + self.w_trigger * info.sim_trigger as f64
578                + self.w_confidence * conf
579                + self.w_context * context_score;
580            if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
581                fused *= PENDING_RECALL_PENALTY;
582            }
583            let anti = info
584                .chunk
585                .get("anti_trigger_desc")
586                .and_then(Value::as_str)
587                .unwrap_or("");
588            if !anti.is_empty() && anti_trigger_hit(query, anti) {
589                fused *= self.anti_trigger_penalty;
590            }
591            let mut chunk = info.chunk;
592            chunk["_context_score"] = json!(context_score);
593            chunk["_fused_score"] = json!(fused);
594            scored.push((fused, chunk));
595        }
596        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
597        scored.truncate(self.top_k_candidates);
598        Ok(scored)
599    }
600
601    fn pack(
602        &self,
603        scored: &[(f64, Value)],
604        budget: usize,
605        expand_deps: &str,
606        allow_trim: bool,
607        query: &str,
608    ) -> Result<PackResult> {
609        let mut selected: Vec<Value> = vec![];
610        let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
611        let mut skipped_reasons: HashMap<String, String> = HashMap::new();
612        let mut used_ids: HashSet<String> = HashSet::new();
613        let mut used_tokens: usize = 0;
614
615        for (fused, chunk) in scored {
616            let cid = chunk["id"].as_str().unwrap_or("").to_string();
617            if used_ids.contains(&cid) {
618                continue;
619            }
620
621            // Build block with dep expansion; fail-closed on dep issues.
622            let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
623            if let Some(reason) = dep_skip_reason {
624                skipped_reasons.insert(cid, reason);
625                continue;
626            }
627
628            let new_block: Vec<Value> = block
629                .iter()
630                .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
631                .cloned()
632                .collect();
633            let cost = block_cost(&new_block);
634
635            if used_tokens + cost <= budget {
636                for b in &block {
637                    let bid = b["id"].as_str().unwrap_or("").to_string();
638                    if !used_ids.contains(&bid) {
639                        let mut b = b.clone();
640                        b["_fused_score"] = json!(fused);
641                        selected.push(b);
642                        used_ids.insert(bid);
643                    }
644                }
645                used_tokens += cost;
646            } else if allow_trim {
647                // Attempt refiner trim — NullRefiner returns None (no-op).
648                if let Some(trimmed) =
649                    self.refiner
650                        .trim(&block, query, budget.saturating_sub(used_tokens))
651                {
652                    let trim_cost = block_cost(&trimmed);
653                    if used_tokens + trim_cost <= budget {
654                        for b in &trimmed {
655                            let bid = b["id"].as_str().unwrap_or("").to_string();
656                            if !used_ids.contains(&bid) {
657                                let mut b = b.clone();
658                                b["_fused_score"] = json!(fused);
659                                b["_trimmed"] = json!(true);
660                                selected.push(b);
661                                used_ids.insert(bid);
662                            }
663                        }
664                        used_tokens += trim_cost;
665                        continue;
666                    }
667                }
668                skipped.push((block, *fused, cost));
669            } else {
670                skipped.push((block, *fused, cost));
671            }
672        }
673        Ok((selected, skipped, skipped_reasons))
674    }
675
676    /// Expand a seed chunk into a block according to `expand_deps`.
677    /// Returns `(block, Some(skip_reason))` if the block should be discarded (fail-closed).
678    fn build_dep_block(
679        &self,
680        seed: &Value,
681        expand_deps: &str,
682    ) -> Result<(Vec<Value>, Option<String>)> {
683        if expand_deps == "false" || expand_deps.is_empty() {
684            return Ok((vec![seed.clone()], None));
685        }
686        let seed_id = seed["id"].as_str().unwrap_or("");
687        match expand_deps {
688            "direct" => {
689                let deps = self.storage.get_deps(seed_id)?;
690                let mut block = vec![seed.clone()];
691                for (dep_id, kind, _) in &deps {
692                    if kind != "hard" {
693                        continue;
694                    }
695                    match self.validate_hard_dep(dep_id)? {
696                        Some(chunk) => block.push(chunk),
697                        None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
698                    }
699                }
700                Ok((block, None))
701            }
702            "closure" => {
703                let mut block = vec![seed.clone()];
704                let mut visited: HashSet<String> = [seed_id.to_string()].into();
705                match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
706                    Some(reason) => Ok((vec![], Some(reason))),
707                    None => Ok((block, None)),
708                }
709            }
710            _ => Ok((vec![seed.clone()], None)),
711        }
712    }
713
714    /// Returns the chunk if the hard dep is usable, None if it should cause fail-closed.
715    fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
716        match self.storage.get_chunk(dep_id)? {
717            None => Ok(None),
718            Some(chunk) => {
719                let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
720                let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
721                let embed_v = chunk
722                    .get("embed_version")
723                    .and_then(Value::as_i64)
724                    .unwrap_or(0);
725                if state == "archived" || origin == "spark" || embed_v == 0 {
726                    Ok(None)
727                } else {
728                    Ok(Some(chunk))
729                }
730            }
731        }
732    }
733
734    /// BFS hard-dep expansion up to `max_depth`. Returns Some(reason) on fail-closed.
735    fn expand_hard_closure(
736        &self,
737        id: &str,
738        visited: &mut HashSet<String>,
739        block: &mut Vec<Value>,
740        depth: usize,
741        max_depth: usize,
742    ) -> Result<Option<String>> {
743        if depth >= max_depth {
744            return Ok(Some("dep_depth_limit".to_string()));
745        }
746        let deps = self.storage.get_deps(id)?;
747        for (dep_id, kind, _) in &deps {
748            if kind != "hard" {
749                continue;
750            }
751            if visited.contains(dep_id) {
752                continue;
753            } // cycle guard
754            visited.insert(dep_id.clone());
755            match self.validate_hard_dep(dep_id)? {
756                None => return Ok(Some("hard_dep_unavailable".to_string())),
757                Some(chunk) => {
758                    block.push(chunk);
759                    if let Some(reason) =
760                        self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
761                    {
762                        return Ok(Some(reason));
763                    }
764                }
765            }
766        }
767        Ok(None)
768    }
769
770    fn density_refill(
771        &self,
772        mut selected: Vec<Value>,
773        skipped: &[(Vec<Value>, f64, usize)],
774        budget: usize,
775    ) -> Vec<Value> {
776        let used_tokens = block_cost(&selected);
777        if used_tokens >= budget {
778            return selected;
779        }
780
781        let selected_ids: HashSet<String> = selected
782            .iter()
783            .filter_map(|c| c["id"].as_str().map(str::to_string))
784            .collect();
785
786        let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
787            .iter()
788            .filter_map(|(block, fscore, _)| {
789                let block: Vec<Value> = block
790                    .iter()
791                    .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
792                    .cloned()
793                    .collect();
794                if block.is_empty() {
795                    return None;
796                }
797                let cost = block_cost(&block);
798                let density = fscore / cost.max(1) as f64;
799                Some((density, block, cost))
800            })
801            .collect();
802        density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
803
804        let mut used_tokens = block_cost(&selected);
805        let mut added_ids: HashSet<String> = selected_ids;
806        for (_, block, cost) in density_items {
807            if used_tokens + cost <= budget {
808                for b in block {
809                    let bid = b["id"].as_str().unwrap_or("").to_string();
810                    if !added_ids.contains(&bid) {
811                        selected.push(b);
812                        added_ids.insert(bid);
813                    }
814                }
815                used_tokens += cost;
816            }
817        }
818        selected
819    }
820
821    fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
822        let embed_version = self
823            .storage
824            .get_meta("embed_version")?
825            .and_then(|v| v.parse::<i64>().ok())
826            .unwrap_or(1);
827
828        let content_res = self
829            .storage
830            .search_vec_content(q_content, self.top_k_candidates)?;
831        let trigger_res = self
832            .storage
833            .search_vec_trigger(q_trigger, self.top_k_candidates)?;
834
835        // Batch-fetch all candidate chunk IDs (mirrors the pattern in ann_candidates).
836        let all_ids: Vec<&str> = {
837            let mut seen = HashSet::new();
838            content_res
839                .iter()
840                .chain(trigger_res.iter())
841                .map(|(id, _)| id.as_str())
842                .filter(|id| seen.insert(*id))
843                .collect()
844        };
845        let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
846
847        let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
848        for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
849            if let Some(chunk) = chunks.get(cid) {
850                if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
851                    continue;
852                }
853                if chunk.get("state").and_then(Value::as_str) == Some("archived") {
854                    continue;
855                }
856                let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
857                if maturity == "promoted" || maturity == "dropped" {
858                    continue;
859                }
860                let ev = chunk
861                    .get("embed_version")
862                    .and_then(Value::as_i64)
863                    .unwrap_or(1);
864                if ev < embed_version {
865                    continue;
866                }
867                let entry = spark_scores
868                    .entry(cid.clone())
869                    .or_insert_with(|| (*sim, chunk.clone()));
870                if *sim > entry.0 {
871                    *entry = (*sim, chunk.clone());
872                }
873            }
874        }
875        let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
876        sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
877        Ok(sparks
878            .into_iter()
879            .take(self.top_k_candidates)
880            .map(|(_, c)| c)
881            .collect())
882    }
883
884    #[allow(clippy::too_many_arguments)]
885    fn write_recall_trace(
886        &self,
887        trace_id: &str,
888        query: &str,
889        scored: &[(f64, Value)],
890        visible: &[Value],
891        sparks: &[Value],
892        depth_skipped: &[String],
893        skipped_reasons: &HashMap<String, String>,
894        refine_mode: &str,
895        source: &str,
896        now: &str,
897    ) -> Result<()> {
898        let lib_id = self.storage.lib_id()?;
899        self.storage.begin_immediate()?;
900        let result = (|| -> Result<()> {
901            for (rank, (_, chunk)) in scored.iter().enumerate() {
902                let cid = chunk["id"].as_str().unwrap_or("");
903                let sim = chunk.get("_fused_score").and_then(Value::as_f64);
904                // For dep-skipped seeds, record their skip reason as refine_mode.
905                let rm = skipped_reasons
906                    .get(cid)
907                    .map(|r| format!("skipped:{r}"))
908                    .or_else(|| {
909                        if refine_mode != "off" && !refine_mode.is_empty() {
910                            Some(refine_mode.to_string())
911                        } else {
912                            None
913                        }
914                    });
915                self.storage.insert_usage_trace(
916                    trace_id,
917                    Some(cid),
918                    "retrieved",
919                    1.0,
920                    sim,
921                    rm.as_deref(),
922                    None,
923                    Some((rank + 1) as i64),
924                    None,
925                    source,
926                    now,
927                )?;
928            }
929            for (rank, chunk) in visible.iter().enumerate() {
930                let cid = chunk["id"].as_str().unwrap_or("");
931                self.storage.insert_usage_trace(
932                    trace_id,
933                    Some(cid),
934                    "selected",
935                    1.0,
936                    None,
937                    None,
938                    None,
939                    Some((rank + 1) as i64),
940                    None,
941                    source,
942                    now,
943                )?;
944                // Write 'refined' event for chunks that came through the trim path.
945                if chunk
946                    .get("_trimmed")
947                    .and_then(Value::as_bool)
948                    .unwrap_or(false)
949                {
950                    self.storage.insert_usage_trace(
951                        trace_id,
952                        Some(cid),
953                        "refined",
954                        1.0,
955                        None,
956                        Some("trim"),
957                        None,
958                        Some((rank + 1) as i64),
959                        None,
960                        source,
961                        now,
962                    )?;
963                }
964            }
965            // Write 'retrieved' events for sparks (for recurring-spark count tracking).
966            for (rank, chunk) in sparks.iter().enumerate() {
967                let cid = chunk["id"].as_str().unwrap_or("");
968                self.storage.insert_usage_trace(
969                    trace_id,
970                    Some(cid),
971                    "retrieved",
972                    1.0,
973                    None,
974                    Some("spark"),
975                    None,
976                    Some((rank + 1) as i64),
977                    None,
978                    source,
979                    now,
980                )?;
981            }
982            let snapshot = json!({
983                "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
984                "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
985                "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
986                "depth_skipped": depth_skipped,
987                "skipped_reasons": skipped_reasons,
988            });
989            let log = EpisodicLogRow {
990                id: gen_uuid(),
991                trace_id: trace_id.to_string(),
992                lib_id,
993                ts: now.to_string(),
994                query: Some(query.to_string()),
995                recall_snapshot: Some(snapshot.to_string()),
996                event_source: source.to_string(),
997                task_state: "recalled".to_string(),
998                usage_state: "unknown".to_string(),
999                context_key: Some(content_hash(&normalize_query(query))),
1000                distill_state: "open".to_string(),
1001                ..Default::default()
1002            };
1003            self.storage.upsert_episodic_log(&log)?;
1004            self.storage.commit()
1005        })();
1006        if result.is_err() {
1007            let _ = self.storage.rollback();
1008        }
1009        result
1010    }
1011
1012    // ------------------------------------------------------------------
1013    // Public API 2: record
1014    // ------------------------------------------------------------------
1015
1016    #[allow(clippy::too_many_arguments)]
1017    pub fn record(
1018        &self,
1019        trace_id: &str,
1020        query: Option<&str>,
1021        output: Option<&str>,
1022        output_summary: Option<&str>,
1023        outcome: Option<&str>,
1024        used: Option<&[String]>,
1025        feedback_up: Option<&[String]>,
1026        feedback_down: Option<&[String]>,
1027        nomination: Option<&str>,
1028        priority: i64,
1029        source: &str,
1030    ) -> Result<()> {
1031        self.record_detailed(
1032            trace_id,
1033            query,
1034            output,
1035            output_summary,
1036            outcome,
1037            used,
1038            "explicit",
1039            true,
1040            feedback_up,
1041            feedback_down,
1042            "user",
1043            None,
1044            None,
1045            nomination,
1046            priority,
1047            None,
1048            source,
1049        )
1050    }
1051
1052    #[allow(clippy::too_many_arguments)]
1053    pub fn record_detailed(
1054        &self,
1055        trace_id: &str,
1056        query: Option<&str>,
1057        output: Option<&str>,
1058        output_summary: Option<&str>,
1059        outcome: Option<&str>,
1060        used: Option<&[String]>,
1061        used_attribution: &str,
1062        used_complete: bool,
1063        feedback_up: Option<&[String]>,
1064        feedback_down: Option<&[String]>,
1065        feedback_kind: &str,
1066        feedback_actor: Option<&str>,
1067        feedback_reason: Option<&str>,
1068        nomination: Option<&str>,
1069        priority: i64,
1070        task_state: Option<&str>,
1071        source: &str,
1072    ) -> Result<()> {
1073        let dedupe_ids = |ids: &[String]| {
1074            let mut seen = HashSet::new();
1075            ids.iter()
1076                .filter(|id| seen.insert((*id).clone()))
1077                .cloned()
1078                .collect::<Vec<_>>()
1079        };
1080        let normalized_used = used.map(dedupe_ids);
1081        let normalized_feedback_up = feedback_up.map(dedupe_ids);
1082        let normalized_feedback_down = feedback_down.map(dedupe_ids);
1083        let used = normalized_used.as_deref();
1084        let feedback_up = normalized_feedback_up.as_deref();
1085        let feedback_down = normalized_feedback_down.as_deref();
1086
1087        if let Some(o) = outcome {
1088            if !matches!(o, "ok" | "fail" | "unknown") {
1089                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
1090            }
1091        }
1092        if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
1093            return Err(InnateError::InvalidState(format!(
1094                "invalid used attribution: {used_attribution}"
1095            )));
1096        }
1097        if !matches!(feedback_kind, "user" | "judge") {
1098            return Err(InnateError::InvalidState(format!(
1099                "invalid feedback kind: {feedback_kind}"
1100            )));
1101        }
1102        if let Some(state) = task_state {
1103            if !matches!(
1104                state,
1105                "recalled" | "running" | "completed" | "abandoned" | "timed_out"
1106            ) {
1107                return Err(InnateError::InvalidState(format!(
1108                    "invalid task state: {state}"
1109                )));
1110            }
1111        }
1112        validate_source(source)?;
1113        if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
1114            let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
1115            if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
1116                return Err(InnateError::InvalidState(format!(
1117                    "conflicting feedback for chunk {chunk_id}"
1118                )));
1119            }
1120        }
1121        let effective_priority = if nomination.is_some() && priority == 0 {
1122            1
1123        } else {
1124            priority
1125        };
1126        let now = utc_now_iso();
1127        let lib_id = self.storage.lib_id()?;
1128
1129        self.storage.begin_immediate()?;
1130        let result = (|| -> Result<()> {
1131            let log = self.storage.get_episodic_log(trace_id)?;
1132            let mut is_fresh_insert = false;
1133            let log = match log {
1134                Some(l) => l,
1135                None => {
1136                    let used_ids = used.map(serde_json::to_string).transpose()?;
1137                    let row = EpisodicLogRow {
1138                        id: gen_uuid(),
1139                        trace_id: trace_id.to_string(),
1140                        lib_id,
1141                        ts: now.clone(),
1142                        query: query.map(str::to_string).or_else(|| Some(String::new())),
1143                        output: output.map(str::to_string),
1144                        output_summary: output_summary.map(str::to_string),
1145                        outcome: outcome.map(str::to_string),
1146                        event_source: source.to_string(),
1147                        task_state: if matches!(outcome, Some("ok") | Some("fail")) {
1148                            "completed".to_string()
1149                        } else {
1150                            task_state.unwrap_or("running").to_string()
1151                        },
1152                        completed_at: matches!(outcome, Some("ok") | Some("fail"))
1153                            .then(|| now.clone()),
1154                        usage_state: usage_state(used).to_string(),
1155                        used_ids,
1156                        used_attribution: used.map(|_| used_attribution.to_string()),
1157                        used_complete,
1158                        context_key: query.map(|q| content_hash(&normalize_query(q))),
1159                        nomination: nomination.map(str::to_string),
1160                        priority: effective_priority,
1161                        distill_state: "open".to_string(),
1162                        ..Default::default()
1163                    };
1164                    self.storage.upsert_episodic_log(&row)?;
1165                    is_fresh_insert = true;
1166                    self.storage.get_episodic_log(trace_id)?.unwrap()
1167                }
1168            };
1169            self.validate_trace_attribution(trace_id, used, "used")?;
1170            self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
1171            self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
1172
1173            let existing_outcome = log
1174                .get("outcome")
1175                .and_then(Value::as_str)
1176                .map(str::to_string);
1177            // usage_trace: used
1178            let effective_used_attribution = if used.is_some() {
1179                used_attribution
1180            } else {
1181                log.get("used_attribution")
1182                    .and_then(Value::as_str)
1183                    .unwrap_or(used_attribution)
1184            };
1185            let used_strength = match effective_used_attribution {
1186                "explicit" => 0.3,
1187                "cited" => 0.25,
1188                "inferred" => 0.15,
1189                _ => unreachable!(),
1190            };
1191            let existing_used_ids: Vec<String> = log
1192                .get("used_ids")
1193                .and_then(Value::as_str)
1194                .and_then(|raw| serde_json::from_str(raw).ok())
1195                .unwrap_or_default();
1196            let existing_used_complete = log
1197                .get("used_complete")
1198                .and_then(Value::as_i64)
1199                .unwrap_or(0)
1200                != 0;
1201            let effective_used_complete = used_complete || existing_used_complete;
1202            let effective_used_ids = used.map(|reported| {
1203                if used_complete {
1204                    reported.to_vec()
1205                } else {
1206                    let mut merged = existing_used_ids.clone();
1207                    let mut seen: HashSet<String> = merged.iter().cloned().collect();
1208                    merged.extend(
1209                        reported
1210                            .iter()
1211                            .filter(|id| seen.insert((*id).clone()))
1212                            .cloned(),
1213                    );
1214                    merged
1215                }
1216            });
1217            if let Some(used_ids) = effective_used_ids.as_deref() {
1218                let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
1219                if used_complete {
1220                    self.storage.replace_used_trace(
1221                        trace_id,
1222                        used_ids,
1223                        used_strength,
1224                        used_attribution,
1225                        source,
1226                        &now,
1227                    )?;
1228                } else if let Some(reported) = used {
1229                    self.storage.merge_used_trace(
1230                        trace_id,
1231                        reported,
1232                        used_strength,
1233                        used_attribution,
1234                        source,
1235                        &now,
1236                    )?;
1237                }
1238                let affected: HashSet<String> = previously_used
1239                    .into_iter()
1240                    .chain(used_ids.iter().cloned())
1241                    .collect();
1242                for cid in affected {
1243                    self.storage.refresh_chunk_last_used(&cid, &now)?;
1244                }
1245            }
1246
1247            // usage_trace: task_ok / task_fail
1248            if let Some(o) = outcome {
1249                if matches!(o, "ok" | "fail") {
1250                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
1251                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
1252                    self.storage.conn_execute(
1253                        "DELETE FROM usage_trace
1254                         WHERE trace_id=? AND event IN ('task_ok','task_fail')
1255                           AND chunk_id IS NULL",
1256                        rusqlite::params![trace_id],
1257                    )?;
1258                    self.storage.insert_usage_trace(
1259                        trace_id, None, event, strength, None, None, None, None, None, source, &now,
1260                    )?;
1261                }
1262            }
1263
1264            // Rebuild trace-derived evidence whenever either side of the pair arrives.
1265            // This makes `outcome → used` and `used → outcome` equivalent and lets a
1266            // later complete usage declaration replace an earlier one safely.
1267            let effective_outcome = outcome
1268                .filter(|value| *value != "unknown")
1269                .or(existing_outcome.as_deref().filter(|value| *value != "unknown"));
1270            if let Some(o @ ("ok" | "fail")) = effective_outcome {
1271                if used.is_some()
1272                    || (outcome.is_some_and(|value| value != "unknown")
1273                        && existing_outcome.as_deref() != outcome)
1274                {
1275                    let fallback_ids: Vec<String>;
1276                    let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
1277                        effective_used_ids.as_deref()
1278                    } else {
1279                        fallback_ids = log
1280                            .get("used_ids")
1281                            .and_then(Value::as_str)
1282                            .and_then(|s| serde_json::from_str(s).ok())
1283                            .unwrap_or_default();
1284                        if fallback_ids.is_empty() {
1285                            None
1286                        } else {
1287                            Some(&fallback_ids)
1288                        }
1289                    };
1290                    let effective_complete = if used.is_some() {
1291                        effective_used_complete
1292                    } else {
1293                        log.get("usage_state").and_then(Value::as_str) != Some("unknown")
1294                            && log
1295                                .get("used_complete")
1296                            .and_then(Value::as_i64)
1297                            .unwrap_or(1)
1298                            != 0
1299                    };
1300                    self.replace_outcome_evidence(
1301                        trace_id,
1302                        o,
1303                        effective_used,
1304                        effective_complete,
1305                        &now,
1306                    )?;
1307                }
1308            } else if used.is_some() && effective_used_complete {
1309                self.replace_selected_unused_evidence(
1310                    trace_id,
1311                    effective_used_ids.as_deref().unwrap_or_default(),
1312                    &now,
1313                )?;
1314            }
1315
1316            let context_key = log
1317                .get("context_key")
1318                .and_then(Value::as_str)
1319                .map(str::to_string)
1320                .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
1321            let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
1322
1323            // Persist feedback facts before reducing them into confidence.
1324            // INSERT OR IGNORE: skip all derived updates for duplicate (trace_id, chunk_id, signal).
1325            // Track affected chunks so we only rebuild their context_stats (not the full table).
1326            let mut context_affected: HashSet<String> = HashSet::new();
1327            if let Some(used_ids) = effective_used_ids.as_deref() {
1328                for cid in used_ids {
1329                    context_affected.insert(cid.clone());
1330                }
1331            }
1332            if let Some(ups) = feedback_up {
1333                for cid in ups {
1334                    let corrected = self.storage
1335                        .delete_feedback_event(trace_id, cid, "down")?;
1336                    self.storage
1337                        .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_down")?;
1338                    let inserted = self.storage.insert_feedback_event(
1339                        &gen_uuid(),
1340                        trace_id,
1341                        cid,
1342                        "up",
1343                        feedback_strength,
1344                        source,
1345                        feedback_actor,
1346                        feedback_reason,
1347                        context_key.as_deref(),
1348                        &now,
1349                    )?;
1350                    if inserted > 0 {
1351                        self.upsert_trace_confidence_evidence(
1352                            trace_id,
1353                            cid,
1354                            "feedback_up",
1355                            1.0,
1356                            feedback_strength,
1357                            if feedback_kind == "judge" {
1358                                "judge_up"
1359                            } else {
1360                                "user_up"
1361                            },
1362                            context_key.as_deref(),
1363                            &now,
1364                            true,
1365                        )?;
1366                        self.storage.update_chunk_last_used(cid, &now)?;
1367                        self.refresh_governance_evidence(cid, &now)?;
1368                        context_affected.insert(cid.clone());
1369                    } else if corrected > 0 {
1370                        self.recompute_chunk_confidence(cid, &now)?;
1371                        self.refresh_governance_evidence(cid, &now)?;
1372                        context_affected.insert(cid.clone());
1373                    }
1374                }
1375            }
1376            if let Some(downs) = feedback_down {
1377                for cid in downs {
1378                    let corrected = self.storage
1379                        .delete_feedback_event(trace_id, cid, "up")?;
1380                    self.storage
1381                        .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_up")?;
1382                    let inserted = self.storage.insert_feedback_event(
1383                        &gen_uuid(),
1384                        trace_id,
1385                        cid,
1386                        "down",
1387                        feedback_strength,
1388                        source,
1389                        feedback_actor,
1390                        feedback_reason,
1391                        context_key.as_deref(),
1392                        &now,
1393                    )?;
1394                    if inserted > 0 {
1395                        self.upsert_trace_confidence_evidence(
1396                            trace_id,
1397                            cid,
1398                            "feedback_down",
1399                            0.0,
1400                            feedback_strength,
1401                            if feedback_kind == "judge" {
1402                                "judge_down"
1403                            } else {
1404                                "user_down"
1405                            },
1406                            context_key.as_deref(),
1407                            &now,
1408                            true,
1409                        )?;
1410                        self.refresh_governance_evidence(cid, &now)?;
1411                        context_affected.insert(cid.clone());
1412                    } else if corrected > 0 {
1413                        self.recompute_chunk_confidence(cid, &now)?;
1414                        self.refresh_governance_evidence(cid, &now)?;
1415                        context_affected.insert(cid.clone());
1416                    }
1417                }
1418            }
1419            // Targeted rebuild — only update context_stats for chunks touched in this call.
1420            self.rebuild_context_stats_for(&context_affected, &now)?;
1421
1422            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
1423            if !is_fresh_insert {
1424                self.storage.patch_episodic_log_content(
1425                    trace_id,
1426                    query,
1427                    output,
1428                    output_summary,
1429                    nomination,
1430                    effective_priority,
1431                )?;
1432            }
1433
1434            let lifecycle_state = if effective_outcome.is_some() {
1435                "completed"
1436            } else {
1437                task_state.unwrap_or_else(|| {
1438                    log.get("task_state")
1439                        .and_then(Value::as_str)
1440                        .unwrap_or("running")
1441                })
1442            };
1443            let used_ids_json = effective_used_ids
1444                .as_deref()
1445                .map(serde_json::to_string)
1446                .transpose()?;
1447            self.storage.update_trace_lifecycle(
1448                trace_id,
1449                lifecycle_state,
1450                (lifecycle_state == "completed").then_some(now.as_str()),
1451                effective_used_ids
1452                    .as_deref()
1453                    .map(|ids| usage_state(Some(ids))),
1454                used_ids_json.as_deref(),
1455                used.map(|_| used_attribution),
1456                used.map(|_| effective_used_complete),
1457            )?;
1458
1459            // Update episodic log
1460            let current_state = log
1461                .get("distill_state")
1462                .and_then(Value::as_str)
1463                .unwrap_or("open");
1464            let lifecycle_completed = lifecycle_state == "completed";
1465            let has_material = output_summary.is_some()
1466                || nomination.is_some()
1467                || output.is_some()
1468                || log.get("output_summary").and_then(Value::as_str).is_some()
1469                || log.get("nomination").and_then(Value::as_str).is_some()
1470                || log.get("output").and_then(Value::as_str).is_some();
1471            let retryable_discard = current_state == "discarded"
1472                && matches!(
1473                    log.get("distill_note").and_then(Value::as_str),
1474                    Some("insufficient_material" | "abandoned" | "timed_out")
1475                );
1476            let new_state = if current_state == "open"
1477                && matches!(lifecycle_state, "abandoned" | "timed_out")
1478            {
1479                Some("discarded")
1480            } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
1481                if has_material {
1482                    Some("new")
1483                } else {
1484                    Some("discarded")
1485                }
1486            } else {
1487                None
1488            };
1489            if let Some(state) = new_state {
1490                let note = if state == "discarded" {
1491                    Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
1492                        lifecycle_state
1493                    } else {
1494                        "insufficient_material"
1495                    })
1496                } else {
1497                    None
1498                };
1499                let outcome_str = outcome.map(str::to_string);
1500                self.storage.update_episodic_log_state(
1501                    trace_id,
1502                    state,
1503                    note,
1504                    outcome_str.as_deref(),
1505                )?;
1506                if state == "new" && retryable_discard {
1507                    self.storage.conn_execute(
1508                        "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
1509                        rusqlite::params![trace_id],
1510                    )?;
1511                }
1512            } else if outcome.is_some() {
1513                let outcome_str = outcome.map(str::to_string);
1514                self.storage.update_episodic_log_state(
1515                    trace_id,
1516                    current_state,
1517                    None,
1518                    outcome_str.as_deref(),
1519                )?;
1520            }
1521
1522            self.storage.commit()
1523        })();
1524        if result.is_err() {
1525            let _ = self.storage.rollback();
1526        }
1527        result?;
1528        self.enqueue_evolve_if_needed(&now)?;
1529        Ok(())
1530    }
1531
1532    fn validate_trace_attribution(
1533        &self,
1534        trace_id: &str,
1535        chunk_ids: Option<&[String]>,
1536        field: &str,
1537    ) -> Result<()> {
1538        let Some(chunk_ids) = chunk_ids else {
1539            return Ok(());
1540        };
1541        if chunk_ids.is_empty() {
1542            return Ok(());
1543        }
1544
1545        let log = self.storage.get_episodic_log(trace_id)?.ok_or_else(|| {
1546            InnateError::InvalidState(format!(
1547                "{field} requires a trace created by recall: {trace_id}"
1548            ))
1549        })?;
1550        let mut attributable = HashSet::new();
1551        if let Some(raw) = log.get("recall_snapshot").and_then(Value::as_str) {
1552            if let Ok(snapshot) = serde_json::from_str::<Value>(raw) {
1553                if let Some(ids) = snapshot.get("selected").and_then(Value::as_array) {
1554                    attributable.extend(ids.iter().filter_map(Value::as_str).map(str::to_string));
1555                }
1556            }
1557        }
1558        let rows = self.storage.query_chunks_params(
1559            "SELECT DISTINCT chunk_id FROM usage_trace
1560             WHERE trace_id=? AND chunk_id IS NOT NULL
1561               AND event='selected'",
1562            rusqlite::params![trace_id],
1563        )?;
1564        attributable.extend(rows.iter().filter_map(|row| {
1565            row.get("chunk_id")
1566                .and_then(Value::as_str)
1567                .map(str::to_string)
1568        }));
1569
1570        for chunk_id in chunk_ids {
1571            if self.storage.get_chunk(chunk_id)?.is_none() || !attributable.contains(chunk_id) {
1572                return Err(InnateError::InvalidState(format!(
1573                    "{field} chunk {chunk_id} was not attributable to trace {trace_id}"
1574                )));
1575            }
1576        }
1577        Ok(())
1578    }
1579
1580    fn replace_selected_unused_evidence(
1581        &self,
1582        trace_id: &str,
1583        used_ids: &[String],
1584        now: &str,
1585    ) -> Result<()> {
1586        let old_rows = self.storage.query_chunks_params(
1587            "SELECT DISTINCT chunk_id FROM confidence_evidence
1588             WHERE trace_id=? AND kind='selected_unused'",
1589            rusqlite::params![trace_id],
1590        )?;
1591        let mut affected: HashSet<String> = old_rows
1592            .iter()
1593            .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1594            .collect();
1595        self.storage
1596            .delete_trace_confidence_evidence(trace_id, &["selected_unused"])?;
1597
1598        let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1599        let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1600            log.get("context_key")
1601                .and_then(Value::as_str)
1602                .map(str::to_string)
1603        });
1604        let selected_rows = self.storage.query_chunks_params(
1605            "SELECT chunk_id FROM usage_trace
1606             WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1607            rusqlite::params![trace_id],
1608        )?;
1609        for row in selected_rows {
1610            if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1611                if !used_set.contains(chunk_id) {
1612                    self.upsert_trace_confidence_evidence(
1613                        trace_id,
1614                        chunk_id,
1615                        "selected_unused",
1616                        0.0,
1617                        0.08,
1618                        "selected_unused",
1619                        context_key.as_deref(),
1620                        now,
1621                        false,
1622                    )?;
1623                    affected.insert(chunk_id.to_string());
1624                }
1625            }
1626        }
1627        for chunk_id in affected {
1628            self.recompute_chunk_confidence(&chunk_id, now)?;
1629        }
1630        Ok(())
1631    }
1632
1633    #[allow(clippy::too_many_arguments)]
1634    fn upsert_trace_confidence_evidence(
1635        &self,
1636        trace_id: &str,
1637        chunk_id: &str,
1638        kind: &str,
1639        target: f64,
1640        strength: f64,
1641        reason: &str,
1642        context_key: Option<&str>,
1643        now: &str,
1644        explicit: bool,
1645    ) -> Result<()> {
1646        let chunk = match self.storage.get_chunk(chunk_id)? {
1647            Some(chunk) => chunk,
1648            None => return Ok(()),
1649        };
1650        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1651            return Ok(());
1652        }
1653        let recency_weight = if explicit {
1654            const KAPPA: f64 = 0.5;
1655            const WINDOW_DAYS: f64 = 14.0;
1656            let gap_days = chunk
1657                .get("last_used_at")
1658                .and_then(Value::as_str)
1659                .map(|ts| iso_days_diff(now, ts) as f64)
1660                .unwrap_or(0.0);
1661            (1.0
1662                + KAPPA
1663                    * (-(gap_days / WINDOW_DAYS) * std::f64::consts::LN_2).exp())
1664            .min(1.5)
1665        } else {
1666            1.0
1667        };
1668        let alpha = (0.2 * strength * recency_weight).clamp(0.0, 1.0);
1669        self.storage.upsert_confidence_evidence(
1670            &gen_uuid(),
1671            Some(trace_id),
1672            chunk_id,
1673            kind,
1674            target,
1675            alpha,
1676            reason,
1677            context_key,
1678            now,
1679        )?;
1680        self.recompute_chunk_confidence(chunk_id, now)
1681    }
1682
1683    fn recompute_chunk_confidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1684        let Some(chunk) = self.storage.get_chunk(chunk_id)? else {
1685            return Ok(());
1686        };
1687        let mut confidence = chunk
1688            .get("confidence_base")
1689            .and_then(Value::as_f64)
1690            .unwrap_or_else(|| {
1691                chunk
1692                    .get("confidence")
1693                    .and_then(Value::as_f64)
1694                    .unwrap_or(0.5)
1695            });
1696        let mut reason = chunk
1697            .get("confidence_reason")
1698            .and_then(Value::as_str)
1699            .unwrap_or("base")
1700            .to_string();
1701        for evidence in self.storage.confidence_evidence_for_chunk(chunk_id)? {
1702            let target = evidence
1703                .get("target")
1704                .and_then(Value::as_f64)
1705                .unwrap_or(0.5);
1706            let alpha = evidence
1707                .get("alpha")
1708                .and_then(Value::as_f64)
1709                .unwrap_or(0.0)
1710                .clamp(0.0, 1.0);
1711            confidence = (confidence + alpha * (target - confidence)).clamp(0.0, 1.0);
1712            reason = evidence
1713                .get("reason")
1714                .and_then(Value::as_str)
1715                .unwrap_or("evidence")
1716                .to_string();
1717        }
1718        self.storage.conn_execute(
1719            "UPDATE chunks SET confidence=?, confidence_reason=?, updated_at=? WHERE id=?",
1720            rusqlite::params![confidence, reason, now, chunk_id],
1721        )
1722    }
1723
1724    #[allow(clippy::too_many_arguments)]
1725    fn replace_outcome_evidence(
1726        &self,
1727        trace_id: &str,
1728        outcome: &str,
1729        used: Option<&[String]>,
1730        used_complete: bool,
1731        now: &str,
1732    ) -> Result<()> {
1733        let old_rows = self.storage.query_chunks_params(
1734            "SELECT DISTINCT chunk_id FROM confidence_evidence
1735             WHERE trace_id=? AND kind IN ('outcome_ok','outcome_fail','selected_unused')",
1736            rusqlite::params![trace_id],
1737        )?;
1738        let mut affected: HashSet<String> = old_rows
1739            .iter()
1740            .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1741            .collect();
1742        self.storage.delete_trace_confidence_evidence(
1743            trace_id,
1744            &["outcome_ok", "outcome_fail", "selected_unused"],
1745        )?;
1746
1747        let used_ids = used.unwrap_or_default();
1748        let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1749        let attribution_divisor = used_ids.len().max(1) as f64;
1750        let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1751            log.get("context_key")
1752                .and_then(Value::as_str)
1753                .map(str::to_string)
1754        });
1755        for chunk_id in used_ids {
1756            let attribution = self
1757                .storage
1758                .query_chunks_params(
1759                    "SELECT strength, attribution FROM usage_trace
1760                     WHERE trace_id=? AND chunk_id=? AND event='used'",
1761                    rusqlite::params![trace_id, chunk_id],
1762                )?
1763                .into_iter()
1764                .next();
1765            let base_strength = attribution
1766                .as_ref()
1767                .and_then(|row| row.get("strength"))
1768                .and_then(Value::as_f64)
1769                .unwrap_or(0.15)
1770                / attribution_divisor;
1771            let attribution_reason = attribution
1772                .as_ref()
1773                .and_then(|row| row.get("attribution"))
1774                .and_then(Value::as_str)
1775                .unwrap_or("inferred");
1776            let (kind, target, strength, reason) = if outcome == "ok" {
1777                ("outcome_ok", 1.0, base_strength, attribution_reason)
1778            } else {
1779                ("outcome_fail", 0.0, base_strength * 0.5, "task_fail")
1780            };
1781            self.upsert_trace_confidence_evidence(
1782                trace_id,
1783                chunk_id,
1784                kind,
1785                target,
1786                strength,
1787                reason,
1788                context_key.as_deref(),
1789                now,
1790                false,
1791            )?;
1792            affected.insert(chunk_id.clone());
1793        }
1794
1795        if used_complete {
1796            let selected_rows = self.storage.query_chunks_params(
1797                "SELECT chunk_id FROM usage_trace
1798                 WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1799                rusqlite::params![trace_id],
1800            )?;
1801            for row in selected_rows {
1802                if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1803                    if !used_set.contains(chunk_id) {
1804                        self.upsert_trace_confidence_evidence(
1805                            trace_id,
1806                            chunk_id,
1807                            "selected_unused",
1808                            0.0,
1809                            0.08,
1810                            "selected_unused",
1811                            context_key.as_deref(),
1812                            now,
1813                            false,
1814                        )?;
1815                        affected.insert(chunk_id.to_string());
1816                    }
1817                }
1818            }
1819        }
1820
1821        for chunk_id in affected {
1822            self.recompute_chunk_confidence(&chunk_id, now)?;
1823        }
1824        Ok(())
1825    }
1826
    /// Rebuild the entire `chunk_context_stats` table from its sources.
    ///
    /// The three sources below are merged additively per `(chunk_id, context_key)`
    /// via `ON CONFLICT ... DO UPDATE`, and every written row is stamped
    /// `last_updated_at = now`. The function runs four statements in sequence:
    /// 1. delete every row;
    /// 2. seed rows from `chunk_context_stats_base`;
    /// 3. add success/failure counts from `outcome_ok` / `outcome_fail` rows of
    ///    `confidence_evidence` that carry a `context_key`;
    /// 4. add up/down counts from `feedback_events` newer than the chunk's
    ///    `evidence_cutoff_at` (all events when the cutoff is NULL).
    ///
    /// NOTE(review): no transaction is opened here; atomicity presumably relies
    /// on the caller. Also, step 3 does not apply `evidence_cutoff_at` while step 4
    /// does. Confirm both are intended.
    fn rebuild_context_stats(&self, now: &str) -> Result<()> {
        // 1. Start from an empty table.
        self.storage.conn_execute(
            "DELETE FROM chunk_context_stats",
            rusqlite::params![],
        )?;
        // 2. Seed from the persisted baseline counts.
        self.storage.conn_execute(
            "INSERT INTO chunk_context_stats
             (chunk_id, context_key, success_count, failure_count,
              positive_feedback, negative_feedback, last_updated_at)
             SELECT chunk_id, context_key, success_count, failure_count,
                    positive_feedback, negative_feedback, ?
             FROM chunk_context_stats_base",
            rusqlite::params![now],
        )?;
        // 3. Add outcome counts on top of any baseline row for the same key.
        self.storage.conn_execute(
            "INSERT INTO chunk_context_stats
             (chunk_id, context_key, success_count, failure_count,
              positive_feedback, negative_feedback, last_updated_at)
             SELECT chunk_id, context_key,
                    SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
                    SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
                    0, 0, ?
             FROM confidence_evidence
             WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
             GROUP BY chunk_id, context_key
             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
               success_count=success_count+excluded.success_count,
               failure_count=failure_count+excluded.failure_count,
               last_updated_at=excluded.last_updated_at",
            rusqlite::params![now],
        )?;
        // 4. Add feedback counts, ignoring events at or before the chunk's cutoff
        //    (COALESCE to '' keeps every event when no cutoff is set).
        self.storage.conn_execute(
            "INSERT INTO chunk_context_stats
             (chunk_id, context_key, success_count, failure_count,
              positive_feedback, negative_feedback, last_updated_at)
             SELECT fe.chunk_id, fe.context_key, 0, 0,
                    SUM(CASE WHEN fe.signal='up' THEN 1 ELSE 0 END),
                    SUM(CASE WHEN fe.signal='down' THEN 1 ELSE 0 END), ?
             FROM feedback_events fe
             WHERE fe.context_key IS NOT NULL
               AND fe.ts > COALESCE((
                 SELECT c.evidence_cutoff_at FROM chunks c
                 WHERE c.id = fe.chunk_id
               ), '')
             GROUP BY fe.chunk_id, fe.context_key
             ON CONFLICT(chunk_id, context_key) DO UPDATE SET
               positive_feedback=positive_feedback+excluded.positive_feedback,
               negative_feedback=negative_feedback+excluded.negative_feedback,
               last_updated_at=excluded.last_updated_at",
            rusqlite::params![now],
        )
    }
1879
    /// Targeted context_stats rebuild for only the specified chunk_ids.
    /// Used by record() to avoid a full-table rebuild on every call.
    /// curate() still calls the full rebuild_context_stats() for periodic accuracy.
    ///
    /// For each chunk this mirrors `rebuild_context_stats`, restricted by
    /// `chunk_id`:
    /// - delete its rows;
    /// - re-seed from `chunk_context_stats_base`;
    /// - add `outcome_ok` / `outcome_fail` counts from `confidence_evidence`;
    /// - add up/down counts from `feedback_events` newer than its
    ///   `evidence_cutoff_at`.
    ///
    /// Rows are stamped `last_updated_at = now`. An empty set is a no-op.
    ///
    /// NOTE(review): the seed step uses `INSERT OR IGNORE` where the full rebuild
    /// uses plain `INSERT`. After the per-chunk DELETE these only differ if the
    /// base relation holds duplicate keys. Confirm which behaviour is wanted.
    fn rebuild_context_stats_for(&self, chunk_ids: &HashSet<String>, now: &str) -> Result<()> {
        if chunk_ids.is_empty() {
            return Ok(());
        }
        for chunk_id in chunk_ids {
            // Drop this chunk's derived rows before recomputing them.
            self.storage.conn_execute(
                "DELETE FROM chunk_context_stats WHERE chunk_id=?",
                rusqlite::params![chunk_id],
            )?;
            // Seed from the persisted baseline for this chunk.
            self.storage.conn_execute(
                "INSERT OR IGNORE INTO chunk_context_stats
                 (chunk_id, context_key, success_count, failure_count,
                  positive_feedback, negative_feedback, last_updated_at)
                 SELECT chunk_id, context_key, success_count, failure_count,
                        positive_feedback, negative_feedback, ?
                 FROM chunk_context_stats_base WHERE chunk_id=?",
                rusqlite::params![now, chunk_id],
            )?;
            // Add outcome counts on top of the seeded row for the same context key.
            self.storage.conn_execute(
                "INSERT INTO chunk_context_stats
                 (chunk_id, context_key, success_count, failure_count,
                  positive_feedback, negative_feedback, last_updated_at)
                 SELECT chunk_id, context_key,
                        SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
                        SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
                        0, 0, ?
                 FROM confidence_evidence
                 WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
                   AND chunk_id=?
                 GROUP BY chunk_id, context_key
                 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
                   success_count=success_count+excluded.success_count,
                   failure_count=failure_count+excluded.failure_count,
                   last_updated_at=excluded.last_updated_at",
                rusqlite::params![now, chunk_id],
            )?;
            // Add feedback counts newer than the chunk's evidence cutoff
            // (chunk_id is bound twice: once for the filter, once for the subquery).
            self.storage.conn_execute(
                "INSERT INTO chunk_context_stats
                 (chunk_id, context_key, success_count, failure_count,
                  positive_feedback, negative_feedback, last_updated_at)
                 SELECT chunk_id, context_key, 0, 0,
                        SUM(CASE WHEN signal='up' THEN 1 ELSE 0 END),
                        SUM(CASE WHEN signal='down' THEN 1 ELSE 0 END), ?
                 FROM feedback_events
                 WHERE context_key IS NOT NULL AND chunk_id=?
                   AND ts > COALESCE((
                     SELECT evidence_cutoff_at FROM chunks
                     WHERE id=?
                   ), '')
                 GROUP BY chunk_id, context_key
                 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
                   positive_feedback=positive_feedback+excluded.positive_feedback,
                   negative_feedback=negative_feedback+excluded.negative_feedback,
                   last_updated_at=excluded.last_updated_at",
                rusqlite::params![now, chunk_id, chunk_id],
            )?;
        }
        Ok(())
    }
1942
1943    fn refresh_governance_evidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1944        let rows = self.storage.query_chunks_params(
1945            "SELECT actor AS actor_key,
1946                    signal, strength, ts
1947             FROM feedback_events
1948             WHERE chunk_id=? AND actor IS NOT NULL AND actor!=''
1949               AND ts > COALESCE((
1950                 SELECT evidence_cutoff_at FROM chunks
1951                 WHERE id=?
1952               ), '')",
1953            rusqlite::params![chunk_id, chunk_id],
1954        )?;
1955        let mut actor_contributions: HashMap<String, f64> = HashMap::new();
1956        for row in rows {
1957            let actor = row
1958                .get("actor_key")
1959                .and_then(Value::as_str)
1960                .unwrap_or("anonymous")
1961                .to_string();
1962            let age_days = row
1963                .get("ts")
1964                .and_then(Value::as_str)
1965                .map(|ts| iso_days_diff(now, ts).max(0) as f64)
1966                .unwrap_or(0.0);
1967            let recency_weight = 0.5_f64.powf(age_days / 90.0);
1968            let strength = row.get("strength").and_then(Value::as_f64).unwrap_or(0.0);
1969            let signed = if row.get("signal").and_then(Value::as_str) == Some("down") {
1970                strength
1971            } else {
1972                -strength
1973            };
1974            *actor_contributions.entry(actor).or_default() += signed * recency_weight;
1975        }
1976        let mut score = 0.0_f64;
1977        let mut actor_count = 0_i64;
1978        for contribution in actor_contributions.values().copied() {
1979            let contribution = contribution.clamp(-1.0, 1.0);
1980            score += contribution;
1981            if contribution > 0.0 {
1982                actor_count += 1;
1983            }
1984        }
1985        let score = score.max(0.0);
1986        if score >= 2.0 && actor_count >= 2 {
1987            self.storage.upsert_governance_proposal(
1988                &gen_uuid(),
1989                chunk_id,
1990                "review_applicability",
1991                "Weighted negative feedback",
1992                score.ceil() as i64,
1993                score,
1994                actor_count,
1995                now,
1996            )?;
1997        } else {
1998            self.storage.conn_execute(
1999                "UPDATE governance_proposals
2000                 SET state='rejected', evidence_count=?, evidence_score=?, actor_count=?, updated_at=?
2001                 WHERE chunk_id=? AND state='pending'",
2002                rusqlite::params![score.ceil() as i64, score, actor_count, now, chunk_id],
2003            )?;
2004        }
2005        Ok(())
2006    }
2007
2008    fn enqueue_evolve_if_needed(&self, now: &str) -> Result<()> {
2009        let ready = count_query(
2010            &self.storage,
2011            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2012        )?;
2013        let oldest = self
2014            .storage
2015            .query_chunks("SELECT MIN(ts) AS oldest FROM episodic_log WHERE distill_state='new'")?
2016            .first()
2017            .and_then(|row| row.get("oldest"))
2018            .and_then(Value::as_str)
2019            .map(str::to_string);
2020        let age_due = oldest
2021            .as_deref()
2022            .is_some_and(|ts| ts <= hours_ago(now, self.evolve_schedule_interval_hours).as_str());
2023        let governance_pending = count_query(
2024            &self.storage,
2025            "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
2026        )?;
2027        // A chunk whose proposal already has enough evidence should trigger evolve
2028        // immediately — don't wait for 3 different pending proposals.
2029        let governance_ready = count_query_params(
2030            &self.storage,
2031            "SELECT COUNT(*) FROM governance_proposals
2032             WHERE state='pending'
2033               AND evidence_score >= ? AND actor_count >= 2",
2034            rusqlite::params![self.governance_archive_threshold as f64],
2035        )?;
2036        if ready >= self.evolve_threshold
2037            || (ready > 0 && age_due)
2038            || governance_pending >= self.governance_evolve_threshold
2039            || governance_ready > 0
2040        {
2041            let reason = if ready >= self.evolve_threshold {
2042                "threshold"
2043            } else if governance_ready > 0 {
2044                "governance_ready"
2045            } else if governance_pending >= self.governance_evolve_threshold {
2046                "governance"
2047            } else {
2048                "scheduled"
2049            };
2050            self.storage.request_evolve(&gen_uuid(), reason, now)?;
2051        }
2052        Ok(())
2053    }
2054
2055    // ------------------------------------------------------------------
2056    // Public API 3: add
2057    // ------------------------------------------------------------------
2058
2059    pub fn add(
2060        &self,
2061        content: &str,
2062        kind: &str,
2063        trigger_desc: Option<&str>,
2064        anti_trigger_desc: Option<&str>,
2065        source: &str,
2066        skill_name: Option<&str>,
2067    ) -> Result<String> {
2068        if !matches!(kind, "note" | "skill") {
2069            return Err(InnateError::InvalidState(format!("invalid kind: {kind}")));
2070        }
2071        if !matches!(source, "chat" | "manual" | "doc" | "agent") {
2072            return Err(InnateError::InvalidState(format!(
2073                "invalid source: {source}"
2074            )));
2075        }
2076
2077        let (content, action) = self.sanitize_content(content);
2078        if action == SanitizeAction::Discard {
2079            return Ok(String::new());
2080        }
2081
2082        let trigger_clean = trigger_desc.and_then(|t| {
2083            let (cleaned, act) = self.sanitizer.sanitize(t);
2084            if act == SanitizeAction::Discard {
2085                None
2086            } else {
2087                Some(cleaned)
2088            }
2089        });
2090        let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2091            let (cleaned, act) = self.sanitizer.sanitize(t);
2092            if act == SanitizeAction::Discard {
2093                None
2094            } else {
2095                Some(cleaned)
2096            }
2097        });
2098
2099        let h = content_hash(&content);
2100        if self.storage.is_hash_invalidated(&h)? {
2101            return Err(InnateError::InvalidState(
2102                "content hash is invalidated".into(),
2103            ));
2104        }
2105
2106        // Idempotency check
2107        let existing = self.storage.query_chunks_params(
2108            "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2109            rusqlite::params![h],
2110        )?;
2111        if let Some(e) = existing.first() {
2112            if let Some(id) = e.get("id").and_then(Value::as_str) {
2113                return Ok(id.to_string());
2114            }
2115        }
2116
2117        let now = utc_now_iso();
2118        let chunk_id = gen_uuid();
2119        let redacted = action == SanitizeAction::Redact;
2120
2121        let (origin, state, conf, prot, init_state_reason) = if source == "agent" {
2122            (
2123                "captured",
2124                "pending",
2125                if redacted { 0.4 } else { 0.60 },
2126                0,
2127                "init:captured_agent",
2128            )
2129        } else if kind == "skill" {
2130            (
2131                "installed",
2132                "active",
2133                if redacted { 0.4 } else { 0.85 },
2134                1,
2135                "init:installed",
2136            )
2137        } else {
2138            (
2139                "captured",
2140                "active",
2141                if redacted { 0.4 } else { 0.60 },
2142                0,
2143                "init:captured",
2144            )
2145        };
2146
2147        // Embedding — fall back to embedding_pending on failure.
2148        let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2149        let (cvec, tvec, embed_ver, final_state_reason) = match (
2150            self.embedding.embed_content(&content),
2151            self.embedding.embed_trigger(trigger_str),
2152        ) {
2153            (Ok(cv), Ok(tv)) => (cv, tv, 1i64, init_state_reason.to_string()),
2154            _ => (
2155                vec![],
2156                vec![],
2157                0i64,
2158                format!("embedding_pending:target={state}"),
2159            ),
2160        };
2161
2162        let tokens = estimate_tokens(&content) as i64;
2163        let row = ChunkRow {
2164            id: chunk_id.clone(),
2165            skill_name: skill_name.map(str::to_string),
2166            content: content.clone(),
2167            trigger_desc: trigger_clean.clone(),
2168            anti_trigger_desc: anti_trigger_clean.clone(),
2169            content_hash: h,
2170            token_count: Some(tokens),
2171            origin: origin.to_string(),
2172            source: Some(source.to_string()),
2173            protected: prot,
2174            state: state.to_string(),
2175            state_reason: Some(final_state_reason),
2176            confidence: conf,
2177            confidence_reason: Some(format!("init:{origin}")),
2178            version: 1,
2179            embed_version: embed_ver,
2180            created_at: now.clone(),
2181            updated_at: now.clone(),
2182            ..Default::default()
2183        };
2184
2185        self.storage.begin_immediate()?;
2186        let result = (|| -> Result<()> {
2187            self.storage.insert_chunk(&row)?;
2188            if embed_ver > 0 {
2189                self.storage
2190                    .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2191                self.storage
2192                    .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2193            }
2194            self.storage.commit()
2195        })();
2196        if result.is_err() {
2197            let _ = self.storage.rollback();
2198        }
2199        result?;
2200        Ok(chunk_id)
2201    }
2202
2203    // ------------------------------------------------------------------
2204    // Public API 4: spark
2205    // ------------------------------------------------------------------
2206
2207    pub fn spark(
2208        &self,
2209        content: &str,
2210        trigger_desc: Option<&str>,
2211        anti_trigger_desc: Option<&str>,
2212    ) -> Result<String> {
2213        let (content, action) = self.sanitize_content(content);
2214        if action == SanitizeAction::Discard {
2215            return Ok(String::new());
2216        }
2217
2218        let trigger_clean = trigger_desc.and_then(|t| {
2219            let (cleaned, act) = self.sanitizer.sanitize(t);
2220            if act == SanitizeAction::Discard {
2221                None
2222            } else {
2223                Some(cleaned)
2224            }
2225        });
2226        let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2227            let (cleaned, act) = self.sanitizer.sanitize(t);
2228            if act == SanitizeAction::Discard {
2229                None
2230            } else {
2231                Some(cleaned)
2232            }
2233        });
2234
2235        let h = content_hash(&content);
2236        if self.storage.is_hash_invalidated(&h)? {
2237            return Err(InnateError::InvalidState(
2238                "content hash is invalidated".into(),
2239            ));
2240        }
2241
2242        // Quick related recall (trace=false, no recursion risk)
2243        let related: Vec<String> = self
2244            .recall(
2245                &content,
2246                2000,
2247                false,
2248                false,
2249                Some(5),
2250                "sdk",
2251                "false",
2252                false,
2253                "off",
2254            )
2255            .map(|r| {
2256                r.knowledge
2257                    .iter()
2258                    .filter_map(|c| c["id"].as_str().map(str::to_string))
2259                    .collect()
2260            })
2261            .unwrap_or_default();
2262
2263        let now = utc_now_iso();
2264        let chunk_id = gen_uuid();
2265        let tokens = estimate_tokens(&content) as i64;
2266
2267        let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2268        let (cvec, tvec, embed_ver, state_reason) = match (
2269            self.embedding.embed_content(&content),
2270            self.embedding.embed_trigger(trigger_str),
2271        ) {
2272            (Ok(cv), Ok(tv)) => (cv, tv, 1i64, "init:spark".to_string()),
2273            _ => (
2274                vec![],
2275                vec![],
2276                0i64,
2277                "embedding_pending:target=active".to_string(),
2278            ),
2279        };
2280
2281        let row = ChunkRow {
2282            id: chunk_id.clone(),
2283            content: content.clone(),
2284            trigger_desc: trigger_clean.clone(),
2285            anti_trigger_desc: anti_trigger_clean.clone(),
2286            content_hash: h,
2287            token_count: Some(tokens),
2288            origin: "spark".to_string(),
2289            maturity: Some("seed".to_string()),
2290            related_ids: if related.is_empty() {
2291                None
2292            } else {
2293                Some(related.join(","))
2294            },
2295            state: "active".to_string(),
2296            state_reason: Some(state_reason),
2297            confidence: 0.5,
2298            version: 1,
2299            embed_version: embed_ver,
2300            created_at: now.clone(),
2301            updated_at: now.clone(),
2302            ..Default::default()
2303        };
2304
2305        self.storage.begin_immediate()?;
2306        let result = (|| -> Result<()> {
2307            self.storage.insert_chunk(&row)?;
2308            if embed_ver > 0 {
2309                self.storage
2310                    .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2311                self.storage
2312                    .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2313            }
2314            self.storage.commit()
2315        })();
2316        if result.is_err() {
2317            let _ = self.storage.rollback();
2318        }
2319        result?;
2320        Ok(chunk_id)
2321    }
2322
2323    // ------------------------------------------------------------------
2324    // Public API 5: mature_spark / promote_spark / drop_spark
2325    // ------------------------------------------------------------------
2326
2327    pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
2328        let chunk = self
2329            .storage
2330            .get_chunk(spark_id)?
2331            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2332        if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
2333            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2334        }
2335        let current = chunk
2336            .get("maturity")
2337            .and_then(Value::as_str)
2338            .unwrap_or("seed");
2339        let valid_next: &[&str] = match current {
2340            "seed" => &["sprouting"],
2341            "sprouting" => &["incubating"],
2342            _ => {
2343                return Err(InnateError::InvalidState(format!(
2344                    "spark {spark_id} already {current}"
2345                )))
2346            }
2347        };
2348        if current == to {
2349            return Ok(());
2350        }
2351        if !valid_next.contains(&to) {
2352            return Err(InnateError::InvalidState(format!(
2353                "invalid spark maturity transition: {current} -> {to}"
2354            )));
2355        }
2356        let now = utc_now_iso();
2357        self.storage.begin_immediate()?;
2358        let result = self
2359            .storage
2360            .query_chunks_params(
2361                "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
2362                rusqlite::params![to, now, spark_id],
2363            )
2364            .and_then(|_| self.storage.commit());
2365        if result.is_err() {
2366            let _ = self.storage.rollback();
2367        }
2368        result.map(|_| ())
2369    }
2370
2371    pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
2372        let spark = self
2373            .storage
2374            .get_chunk(spark_id)?
2375            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2376        if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2377            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2378        }
2379        let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2380        if maturity == "promoted" || maturity == "dropped" {
2381            return Err(InnateError::InvalidState(format!(
2382                "spark {spark_id} already {maturity}"
2383            )));
2384        }
2385        if !matches!(to, "note" | "skill") {
2386            return Err(InnateError::InvalidState(format!(
2387                "invalid spark promotion target: {to}"
2388            )));
2389        }
2390
2391        let content = spark.get("content").and_then(Value::as_str).unwrap_or("");
2392        let (content, action) = self.sanitize_content(content);
2393        if action == SanitizeAction::Discard {
2394            return Err(InnateError::InvalidState(
2395                "sanitize discard on promote".into(),
2396            ));
2397        }
2398
2399        let promoted_hash = content_hash(&content);
2400        if self.storage.is_hash_invalidated(&promoted_hash)? {
2401            return Err(InnateError::InvalidState(
2402                "spark content hash is invalidated".into(),
2403            ));
2404        }
2405
2406        let now = utc_now_iso();
2407
2408        // Idempotency: existing non-spark chunk with same hash
2409        let existing = self.storage.query_chunks_params(
2410            "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2411            rusqlite::params![promoted_hash],
2412        )?;
2413        if let Some(e) = existing.first() {
2414            if let Some(id) = e.get("id").and_then(Value::as_str) {
2415                let id = id.to_string();
2416                self.storage.begin_immediate()?;
2417                let result = self
2418                    .storage
2419                    .query_chunks_params(
2420                        "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2421                        rusqlite::params![now, spark_id],
2422                    )
2423                    .and_then(|_| self.storage.commit());
2424                if result.is_err() {
2425                    let _ = self.storage.rollback();
2426                    result?;
2427                }
2428                return Ok(id);
2429            }
2430        }
2431
2432        let (state, conf, prot, origin, state_reason) = if to == "skill" {
2433            ("active", 0.85, 1, "installed", "init:installed")
2434        } else {
2435            ("active", 0.60, 0, "captured", "init:captured")
2436        };
2437
2438        let conf = if action == SanitizeAction::Redact {
2439            0.4_f64
2440        } else {
2441            conf
2442        };
2443        let new_id = gen_uuid();
2444        let trigger = spark.get("trigger_desc").and_then(Value::as_str);
2445        let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
2446
2447        let row = ChunkRow {
2448            id: new_id.clone(),
2449            content: content.clone(),
2450            trigger_desc: trigger.map(str::to_string),
2451            anti_trigger_desc: anti.map(str::to_string),
2452            content_hash: promoted_hash,
2453            token_count: Some(estimate_tokens(&content) as i64),
2454            origin: origin.to_string(),
2455            source: Some("manual".to_string()),
2456            protected: prot,
2457            state: state.to_string(),
2458            state_reason: Some(state_reason.to_string()),
2459            confidence: conf,
2460            confidence_reason: Some("manual_set".to_string()),
2461            parent_id: Some(spark_id.to_string()),
2462            version: 1,
2463            embed_version: 1,
2464            created_at: now.clone(),
2465            updated_at: now.clone(),
2466            ..Default::default()
2467        };
2468
2469        let cvec = self.embedding.embed_content(&content)?;
2470        let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;
2471
2472        self.storage.begin_immediate()?;
2473        let result = (|| -> Result<()> {
2474            self.storage.insert_chunk(&row)?;
2475            self.storage
2476                .insert_vec_content(&new_id, &pack_embedding(&cvec))?;
2477            self.storage
2478                .insert_vec_trigger(&new_id, &pack_embedding(&tvec))?;
2479            self.storage.query_chunks_params(
2480                "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2481                rusqlite::params![now, spark_id],
2482            )?;
2483            self.storage.commit()
2484        })();
2485        if result.is_err() {
2486            let _ = self.storage.rollback();
2487        }
2488        result?;
2489        Ok(new_id)
2490    }
2491
2492    pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
2493        let spark = self
2494            .storage
2495            .get_chunk(spark_id)?
2496            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2497        if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2498            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2499        }
2500        let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2501        if maturity == "promoted" {
2502            return Err(InnateError::InvalidState(format!(
2503                "spark {spark_id} already promoted"
2504            )));
2505        }
2506        if maturity == "dropped" {
2507            return Ok(());
2508        }
2509        let now = utc_now_iso();
2510        let reason_str = if reason.is_empty() {
2511            "dropped".to_string()
2512        } else {
2513            format!("dropped:{reason}")
2514        };
2515        self.storage.begin_immediate()?;
2516        let result = self
2517            .storage
2518            .query_chunks_params(
2519                "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
2520                rusqlite::params![reason_str, now, spark_id],
2521            )
2522            .and_then(|_| self.storage.commit());
2523        if result.is_err() {
2524            let _ = self.storage.rollback();
2525        }
2526        result.map(|_| ())
2527    }
2528
2529    // ------------------------------------------------------------------
2530    // Public API 6: approve / archive / invalidate / restore
2531    // ------------------------------------------------------------------
2532
2533    pub fn approve(&self, chunk_id: &str) -> Result<()> {
2534        let chunk = self
2535            .storage
2536            .get_chunk(chunk_id)?
2537            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2538        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2539            return Err(InnateError::InvalidState(
2540                "spark lifecycle uses promote_spark() or invalidate()".into(),
2541            ));
2542        }
2543        if chunk.get("state").and_then(Value::as_str) == Some("active") {
2544            return Ok(());
2545        }
2546        if chunk.get("state").and_then(Value::as_str) != Some("pending") {
2547            return Err(InnateError::InvalidState(
2548                "approve requires pending chunk".into(),
2549            ));
2550        }
2551        let now = utc_now_iso();
2552        self.storage.begin_immediate()?;
2553        let result = (|| -> Result<()> {
2554            self.storage
2555                .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
2556            self.storage.query_chunks_params(
2557                "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
2558                rusqlite::params![now, chunk_id],
2559            )?;
2560            self.storage.commit()
2561        })();
2562        if result.is_err() {
2563            let _ = self.storage.rollback();
2564        }
2565        result
2566    }
2567
2568    pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
2569        let chunk = self
2570            .storage
2571            .get_chunk(chunk_id)?
2572            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2573        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2574            return Err(InnateError::InvalidState(
2575                "spark lifecycle uses drop_spark() or invalidate()".into(),
2576            ));
2577        }
2578        let now = utc_now_iso();
2579        self.storage.begin_immediate()?;
2580        let result = self
2581            .storage
2582            .update_chunk_state(chunk_id, "archived", Some(reason), &now)
2583            .and_then(|_| self.storage.commit());
2584        if result.is_err() {
2585            let _ = self.storage.rollback();
2586        }
2587        result
2588    }
2589
2590    pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
2591        let chunk = self
2592            .storage
2593            .get_chunk(chunk_id)?
2594            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2595        let h = chunk
2596            .get("content_hash")
2597            .and_then(Value::as_str)
2598            .unwrap_or("")
2599            .to_string();
2600        let now = utc_now_iso();
2601        let reason_str = if reason.is_empty() {
2602            "invalidated".to_string()
2603        } else {
2604            format!("invalidated:{reason}")
2605        };
2606
2607        self.storage.begin_immediate()?;
2608        let result = (|| -> Result<()> {
2609            self.storage.query_chunks_params(
2610                "UPDATE chunks
2611                 SET state='archived', confidence=0.0, confidence_base=0.0,
2612                     confidence_reason='invalidated', state_reason=?,
2613                     state_updated_at=?, updated_at=?
2614                 WHERE id=?",
2615                rusqlite::params![reason_str, now, now, chunk_id],
2616            )?;
2617            self.storage.query_chunks_params(
2618                "UPDATE chunks
2619                 SET state='archived', confidence=0.0, confidence_base=0.0,
2620                     confidence_reason='invalidated',
2621                     state_reason='invalidated:same_hash',
2622                     state_updated_at=?, updated_at=?
2623                 WHERE content_hash=? AND id!=?",
2624                rusqlite::params![now, now, h, chunk_id],
2625            )?;
2626            self.storage.conn_execute(
2627                "DELETE FROM confidence_evidence
2628                 WHERE chunk_id IN (SELECT id FROM chunks WHERE content_hash=?)",
2629                rusqlite::params![h],
2630            )?;
2631            self.storage
2632                .insert_invalidated_hash(&h, Some(reason), &now)?;
2633            self.storage.commit()
2634        })();
2635        if result.is_err() {
2636            let _ = self.storage.rollback();
2637        }
2638        result
2639    }
2640
2641    pub fn restore(&self, chunk_id: &str) -> Result<()> {
2642        let chunk = self
2643            .storage
2644            .get_chunk(chunk_id)?
2645            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2646        let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
2647        if state == "active" {
2648            return Ok(());
2649        }
2650        if state != "archived" {
2651            return Err(InnateError::InvalidState(
2652                "restore requires archived chunk".into(),
2653            ));
2654        }
2655        let was_invalidated = chunk
2656            .get("state_reason")
2657            .and_then(Value::as_str)
2658            .map(|r| r.starts_with("invalidated"))
2659            .unwrap_or(false);
2660        let h = chunk
2661            .get("content_hash")
2662            .and_then(Value::as_str)
2663            .unwrap_or("")
2664            .to_string();
2665        let now = utc_now_iso();
2666
2667        self.storage.begin_immediate()?;
2668        let result = (|| -> Result<()> {
2669            self.storage
2670                .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
2671            if was_invalidated {
2672                self.storage.query_chunks_params(
2673                    "DELETE FROM invalidated_hashes WHERE content_hash=?",
2674                    rusqlite::params![h],
2675                )?;
2676            }
2677            self.storage.query_chunks_params(
2678                "UPDATE chunks
2679                 SET confidence_base=0.5, confidence=0.5,
2680                     confidence_reason='restore',
2681                     selected_count=0, selected_count_base=0,
2682                     used_count=0, used_count_base=0,
2683                     used_success_count=0, used_success_count_base=0,
2684                     success_trace_ids_count=0,
2685                     last_used_at=NULL, last_used_base=NULL,
2686                     last_success_at=NULL, last_decayed_at=NULL,
2687                     evidence_cutoff_at=?, updated_at=?
2688                 WHERE id=?",
2689                rusqlite::params![now, now, chunk_id],
2690            )?;
2691            self.storage.conn_execute(
2692                "DELETE FROM confidence_evidence WHERE chunk_id=?",
2693                rusqlite::params![chunk_id],
2694            )?;
2695            self.storage.conn_execute(
2696                "DELETE FROM chunk_success_traces WHERE chunk_id=?",
2697                rusqlite::params![chunk_id],
2698            )?;
2699            self.storage.conn_execute(
2700                "DELETE FROM chunk_context_stats_base WHERE chunk_id=?",
2701                rusqlite::params![chunk_id],
2702            )?;
2703            self.storage.conn_execute(
2704                "DELETE FROM chunk_context_stats WHERE chunk_id=?",
2705                rusqlite::params![chunk_id],
2706            )?;
2707            self.storage.conn_execute(
2708                "UPDATE governance_proposals
2709                 SET state='rejected', reason=reason || '; restored by user', updated_at=?
2710                 WHERE chunk_id=? AND state IN ('pending','accepted')",
2711                rusqlite::params![now, chunk_id],
2712            )?;
2713            self.storage.commit()
2714        })();
2715        if result.is_err() {
2716            let _ = self.storage.rollback();
2717        }
2718        result
2719    }
2720
2721    // ------------------------------------------------------------------
2722    // Public API 7: evolve
2723    // ------------------------------------------------------------------
2724
2725    pub fn evolve(&self, trigger: &str) -> Result<Value> {
2726        if !matches!(trigger, "manual" | "scheduled" | "threshold") {
2727            return Err(InnateError::InvalidState(format!(
2728                "invalid evolve trigger: {trigger}"
2729            )));
2730        }
2731        let evolve_started_at = utc_now_iso();
2732        let retry_cutoff = minutes_ago(&evolve_started_at, 5);
2733        let recovered_failed = self.storage.conn_execute_count(
2734            "UPDATE episodic_log
2735             SET distill_state='new', distill_note='retry_failed',
2736                 distill_locked_at=NULL, distill_run_id=NULL
2737             WHERE distill_state='failed'
2738               AND distill_attempts < 3
2739               AND COALESCE(distill_last_failed_at, distill_accounted_at, ts) < ?",
2740            rusqlite::params![retry_cutoff],
2741        )?;
2742        if recovered_failed > 0 {
2743            self.storage.request_evolve(
2744                &gen_uuid(),
2745                "distill_retry",
2746                &evolve_started_at,
2747            )?;
2748        }
2749        if trigger == "scheduled" {
2750            let age_cutoff = hours_ago(
2751                &evolve_started_at,
2752                self.evolve_schedule_interval_hours,
2753            );
2754            let aged_new = count_query_params(
2755                &self.storage,
2756                "SELECT COUNT(*) FROM episodic_log
2757                 WHERE distill_state='new' AND ts <= ?",
2758                rusqlite::params![age_cutoff],
2759            )?;
2760            if aged_new > 0 {
2761                self.storage
2762                    .request_evolve(&gen_uuid(), "scheduled", &evolve_started_at)?;
2763            }
2764        }
2765        let request = self.storage.claim_evolve_request_with_reason(
2766            &evolve_started_at,
2767            &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
2768        )?;
2769        let request_id = request.as_ref().map(|claim| claim.id.as_str());
2770        let request_reason = request.as_ref().map(|claim| claim.reason.as_str());
2771        // Issue 7: scheduled without pending request → still run curate (time-based maintenance).
2772        if trigger == "scheduled" && request_id.is_none() {
2773            let curator = Arc::clone(&self.curator);
2774            let curate = curator.run(self, &CurateScope::default())?;
2775            return Ok(json!({
2776                "distilled": 0,
2777                "curate": self.format_curate_report(&curate),
2778                "skipped": "no_evolve_request"
2779            }));
2780        }
2781
2782        // Issue 8: threshold / token-limit gates should not suppress curate.
2783        if trigger == "threshold" {
2784            let rows = self.storage.query_chunks(
2785                "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
2786            )?;
2787            let cnt = rows
2788                .first()
2789                .and_then(|r| r.get("cnt"))
2790                .and_then(Value::as_i64)
2791                .unwrap_or(0);
2792            if cnt < self.evolve_threshold {
2793                let curator = Arc::clone(&self.curator);
2794                let curate = curator.run(self, &CurateScope::default())?;
2795                if let Some(id) = request_id {
2796                    if matches!(request_reason, Some("governance" | "governance_ready")) {
2797                        self.storage.finish_evolve_request(
2798                            id,
2799                            "completed",
2800                            Some("curate_only"),
2801                            &utc_now_iso(),
2802                        )?;
2803                    } else {
2804                        self.storage.defer_evolve_request(
2805                            id,
2806                            "below_threshold",
2807                            &hours_after(
2808                                &evolve_started_at,
2809                                self.evolve_schedule_interval_hours.max(1),
2810                            ),
2811                        )?;
2812                    }
2813                }
2814                return Ok(json!({
2815                    "distilled": 0,
2816                    "curate": self.format_curate_report(&curate),
2817                    "skipped": "below_threshold"
2818                }));
2819            }
2820
2821        }
2822
2823        if trigger != "manual" {
2824            if let Some(limit) = self
2825                .storage
2826                .get_meta("max_distill_tokens_per_period")?
2827                .and_then(|value| value.parse::<i64>().ok())
2828                .filter(|value| *value > 0)
2829            {
2830                let period_start = self.distill_token_period_start(&evolve_started_at)?;
2831                let rows = self.storage.query_chunks_params(
2832                    "SELECT COALESCE(SUM(prompt_tokens + completion_tokens),0) AS used
2833                     FROM distill_token_usage
2834                     WHERE accounted_at >= ?",
2835                    rusqlite::params![period_start],
2836                )?;
2837                let used_tokens = rows
2838                    .first()
2839                    .and_then(|row| row.get("used"))
2840                    .and_then(Value::as_i64)
2841                    .unwrap_or(0);
2842                if used_tokens >= limit {
2843                    let curator = Arc::clone(&self.curator);
2844                    let curate = curator.run(self, &CurateScope::default())?;
2845                    if let Some(id) = request_id {
2846                        if matches!(request_reason, Some("governance" | "governance_ready")) {
2847                            self.storage.finish_evolve_request(
2848                                id,
2849                                "completed",
2850                                Some("curate_only"),
2851                                &utc_now_iso(),
2852                            )?;
2853                        } else {
2854                            self.storage.defer_evolve_request(
2855                                id,
2856                                "distill_token_limit",
2857                                &hours_after(&evolve_started_at, 1),
2858                            )?;
2859                        }
2860                    }
2861                    return Ok(json!({
2862                        "distilled": 0,
2863                        "curate": self.format_curate_report(&curate),
2864                        "skipped": "distill_token_limit",
2865                        "distill_tokens_used": used_tokens,
2866                        "distill_token_limit": limit,
2867                        "period_start": period_start,
2868                    }));
2869                }
2870            }
2871        }
2872
2873        let result = (|| -> Result<Value> {
2874            let distill = self.distill_batch()?;
2875            let curator = Arc::clone(&self.curator);
2876            let curate = curator.run(self, &CurateScope::default())?;
2877            Ok(json!({
2878                "distilled": distill.distilled,
2879                "distill_failed": distill.failed,
2880                "curate": self.format_curate_report(&curate),
2881            }))
2882        })();
2883        if let Some(id) = request_id {
2884            let (state, note) = match &result {
2885                Ok(_) => ("completed", None),
2886                Err(error) => ("failed", Some(error.to_string())),
2887            };
2888            self.storage
2889                .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
2890        }
2891        if result.is_ok() {
2892            self.storage
2893                .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
2894        }
2895
2896        let failed_remaining = count_query(
2897            &self.storage,
2898            "SELECT COUNT(*) FROM episodic_log
2899             WHERE distill_state='failed' AND distill_attempts < 3",
2900        )?;
2901        if failed_remaining > 0 {
2902            self.storage.request_evolve_at(
2903                &gen_uuid(),
2904                "distill_retry",
2905                &utc_now_iso(),
2906                Some(&minutes_after(&utc_now_iso(), 5)),
2907            )?;
2908        }
2909
2910        // Issue 9: if more 'new' logs remain after this batch, self-queue so the next evolve drains them.
2911        if result.is_ok() {
2912            let remaining = count_query(
2913                &self.storage,
2914                "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2915            )?;
2916            if remaining > 0 {
2917                let _ = self.storage.request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
2918            }
2919        }
2920        result
2921    }
2922
2923    fn format_curate_report(&self, curate: &CurateReport) -> Value {
2924        json!({
2925            "archived": curate.archived.len(),
2926            "deduped": curate.deduped.len(),
2927            "decayed": curate.decayed.len(),
2928            "recovered": curate.recovered.len(),
2929            "orphans": curate.orphans.len(),
2930            "warnings": curate.warnings,
2931        })
2932    }
2933
2934    fn distill_batch(&self) -> Result<DistillBatchReport> {
2935        let run_id = gen_uuid();
2936        let now = utc_now_iso();
2937
2938        // Atomically claim a batch of 'new' logs → mark 'screening'.
2939        self.storage.begin_immediate()?;
2940        let logs = match self
2941            .storage
2942            .claim_distill_batch(&run_id, self.distill_batch_size, &now)
2943        {
2944            Ok(l) => {
2945                self.storage.commit()?;
2946                l
2947            }
2948            Err(e) => {
2949                let _ = self.storage.rollback();
2950                return Err(e);
2951            }
2952        };
2953
2954        let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
2955        let mut failed_logs = HashSet::new();
2956        let mut distill_errors = Vec::new();
2957        for log in &logs {
2958            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
2959            let context_key = log.get("context_key").and_then(Value::as_str);
2960            let related_logs: Vec<Value> = logs
2961                .iter()
2962                .filter(|other| {
2963                    other.get("id").and_then(Value::as_str) == Some(log_id)
2964                        || (context_key.is_some()
2965                            && other.get("context_key").and_then(Value::as_str) == context_key)
2966                })
2967                .cloned()
2968                .collect();
2969            match self.distiller.distill_with_context(log, &related_logs) {
2970                Ok(chunks) => {
2971                    if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
2972                        let error = "distiller returned a chunk for an unknown source log";
2973                        failed_logs.insert(log_id.to_string());
2974                        distill_errors.push(format!("{log_id}: {error}"));
2975                        self.finish_distill_log(
2976                            log_id,
2977                            "failed",
2978                            Some(&format!("distill_failed:{error}")),
2979                            estimate_distill_prompt_tokens(log, &related_logs),
2980                            0,
2981                        )?;
2982                        continue;
2983                    }
2984                    chunks_by_log.insert(log_id.to_string(), chunks);
2985                }
2986                Err(error) => {
2987                    let note = format!("distill_failed:{error}");
2988                    failed_logs.insert(log_id.to_string());
2989                    distill_errors.push(format!("{log_id}: {error}"));
2990                    self.finish_distill_log(
2991                        log_id,
2992                        "failed",
2993                        Some(&note),
2994                        estimate_distill_prompt_tokens(log, &related_logs),
2995                        0,
2996                    )?;
2997                }
2998            }
2999        }
3000
3001        let mut count = 0;
3002        let provenance = self.distiller.provenance();
3003        for log in &logs {
3004            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
3005            if failed_logs.contains(log_id) {
3006                continue;
3007            }
3008            let context_key = log.get("context_key").and_then(Value::as_str);
3009            let related_logs: Vec<Value> = logs
3010                .iter()
3011                .filter(|other| {
3012                    other.get("id").and_then(Value::as_str) == Some(log_id)
3013                        || (context_key.is_some()
3014                            && other.get("context_key").and_then(Value::as_str) == context_key)
3015                })
3016                .cloned()
3017                .collect();
3018            let prompt_tokens = estimate_distill_prompt_tokens(log, &related_logs);
3019            let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
3020            let completion_tokens = chunks
3021                .iter()
3022                .map(estimate_distilled_chunk_tokens)
3023                .sum::<i64>();
3024            if chunks.is_empty() {
3025                self.finish_distill_log(
3026                    log_id,
3027                    "discarded",
3028                    Some("insufficient_material"),
3029                    prompt_tokens,
3030                    completion_tokens,
3031                )?;
3032                continue;
3033            }
3034
3035            // Prepare all chunks + embeddings outside the write transaction so
3036            // slow embedding calls do not hold an exclusive SQLite lock.
3037            // Supports N >= 1 chunks per log (e.g. multi-concept LLM distillation).
3038            // Bad individual chunks are skipped; valid siblings still survive.
3039            struct PreparedChunk {
3040                row: ChunkRow,
3041                cvec_bytes: Vec<u8>,
3042                tvec_bytes: Vec<u8>,
3043            }
3044            let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
3045            let mut embedding_failures = 0_usize;
3046            for dc in chunks {
3047                let (content, action) = self.sanitize_content(&dc.content);
3048                if action == SanitizeAction::Discard {
3049                    continue; // skip this chunk, try others
3050                }
3051                let h = content_hash(&content);
3052                if self.storage.is_hash_invalidated(&h)? {
3053                    continue; // skip invalidated content, try others
3054                }
3055                let redacted = action == SanitizeAction::Redact;
3056                let conf = if redacted { 0.4 } else { 0.55 };
3057                let now2 = utc_now_iso();
3058                let chunk_id = gen_uuid();
3059                let tokens = estimate_tokens(&content) as i64;
3060                let row = ChunkRow {
3061                    id: chunk_id,
3062                    content: content.clone(),
3063                    trigger_desc: dc.trigger_desc.clone(),
3064                    anti_trigger_desc: dc.anti_trigger_desc,
3065                    content_hash: h,
3066                    token_count: Some(tokens),
3067                    origin: "distilled".to_string(),
3068                    distilled_from: Some(dc.source_log_id),
3069                    distill_provider: provenance.provider.clone(),
3070                    distill_model: provenance.model.clone(),
3071                    distill_prompt_version: provenance.prompt_version.clone(),
3072                    state: "pending".to_string(),
3073                    state_reason: Some("init:distilled".to_string()),
3074                    confidence: conf,
3075                    confidence_reason: Some("init:distilled".to_string()),
3076                    version: 1,
3077                    embed_version: 1,
3078                    created_at: now2.clone(),
3079                    updated_at: now2,
3080                    ..Default::default()
3081                };
3082                let cvec = match self.embedding.embed_content(&content) {
3083                    Ok(v) => v,
3084                    Err(_) => {
3085                        embedding_failures += 1;
3086                        continue;
3087                    }
3088                };
3089                let tvec = match self
3090                    .embedding
3091                    .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
3092                {
3093                    Ok(v) => v,
3094                    Err(_) => {
3095                        embedding_failures += 1;
3096                        continue;
3097                    }
3098                };
3099                prepared.push(PreparedChunk {
3100                    row,
3101                    cvec_bytes: pack_embedding(&cvec),
3102                    tvec_bytes: pack_embedding(&tvec),
3103                });
3104            }
3105
3106            if prepared.is_empty() {
3107                let note = if embedding_failures > 0 {
3108                    "embedding_failed"
3109                } else {
3110                    "all_chunks_filtered"
3111                };
3112                self.finish_distill_log(
3113                    log_id,
3114                    if embedding_failures > 0 {
3115                        "failed"
3116                    } else {
3117                        "discarded"
3118                    },
3119                    Some(note),
3120                    prompt_tokens,
3121                    completion_tokens,
3122                )?;
3123                if embedding_failures > 0 {
3124                    failed_logs.insert(log_id.to_string());
3125                }
3126                continue;
3127            }
3128
3129            // Write all chunks, vectors, token accounting, and terminal log state atomically.
3130            let accounted_at = utc_now_iso();
3131            self.storage.begin_immediate()?;
3132            let write_result = (|| -> Result<()> {
3133                for pc in &prepared {
3134                    self.storage.insert_chunk(&pc.row)?;
3135                    self.storage.insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
3136                    self.storage.insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
3137                }
3138                let note = (embedding_failures > 0)
3139                    .then(|| format!("partial_embedding_failures:{embedding_failures}"));
3140                self.storage.finish_distill_log(
3141                    log_id,
3142                    "distilled",
3143                    note.as_deref(),
3144                    prompt_tokens,
3145                    completion_tokens,
3146                    &accounted_at,
3147                )?;
3148                self.storage.commit()
3149            })();
3150            if let Err(error) = write_result {
3151                let _ = self.storage.rollback();
3152                let note = format!("distill_write_failed:{error}");
3153                self.finish_distill_log(
3154                    log_id,
3155                    "failed",
3156                    Some(&note),
3157                    prompt_tokens,
3158                    completion_tokens,
3159                )?;
3160                failed_logs.insert(log_id.to_string());
3161                continue;
3162            }
3163            count += 1;
3164        }
3165        if !distill_errors.is_empty() {
3166            // Log failures but do not abort: successful chunks are already committed and
3167            // failed logs are marked 'failed' for bounded retry. Returning Ok preserves
3168            // evolve request state and allows finish_covered_evolve_requests to run.
3169            eprintln!(
3170                "[innate] distillation partial failure ({} log(s)): {}",
3171                distill_errors.len(),
3172                distill_errors.join("; ")
3173            );
3174        }
3175        Ok(DistillBatchReport {
3176            distilled: count,
3177            failed: failed_logs.len(),
3178        })
3179    }
3180
3181    fn finish_distill_log(
3182        &self,
3183        log_id: &str,
3184        state: &str,
3185        note: Option<&str>,
3186        prompt_tokens: i64,
3187        completion_tokens: i64,
3188    ) -> Result<()> {
3189        let accounted_at = utc_now_iso();
3190        self.storage.begin_immediate()?;
3191        let result = (|| -> Result<()> {
3192            self.storage.finish_distill_log(
3193                log_id,
3194                state,
3195                note,
3196                prompt_tokens,
3197                completion_tokens,
3198                &accounted_at,
3199            )?;
3200            self.storage.commit()
3201        })();
3202        if result.is_err() {
3203            let _ = self.storage.rollback();
3204        }
3205        result
3206    }
3207
3208    fn distill_token_period_start(&self, now: &str) -> Result<String> {
3209        let window_hours = self
3210            .storage
3211            .get_meta("evolve.distill_token_window_hours")?
3212            .and_then(|value| value.parse::<i64>().ok())
3213            .unwrap_or(24)
3214            .max(1);
3215        Ok(hours_ago(now, window_hours))
3216    }
3217
3218    pub(crate) fn builtin_curate_impl(&self, scope: &CurateScope) -> Result<CurateReport> {
3219        let mut report = CurateReport::default();
3220        let now_iso = utc_now_iso();
3221        if scope.dry_run {
3222            // dry_run: compute report without writing
3223            let archived_count: i64 = count_query(&self.storage,
3224                "SELECT COUNT(*) FROM chunks WHERE origin!='spark' AND protected=0 AND state='active'")?;
3225            report.stats.insert("dry_run".to_string(), json!(true));
3226            report
3227                .stats
3228                .insert("eligible_for_governance".to_string(), json!(archived_count));
3229            return Ok(report);
3230        }
3231
3232        // ── Step 1-4: aggregate (single BEGIN IMMEDIATE, half-open cutoff window) ──
3233        self.storage.begin_immediate()?;
3234        let agg_result = (|| -> Result<()> {
3235            let cutoff_ts = now_iso.clone();
3236
3237            // 1. Rebuild post-4.12 success facts from retained attribution events.
3238            self.storage.conn_execute(
3239                "DELETE FROM chunk_success_traces",
3240                rusqlite::params![],
3241            )?;
3242            self.storage.conn_execute(
3243                "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts)
3244                 SELECT ut.chunk_id, ut.trace_id, MAX(ut.ts)
3245                 FROM usage_trace ut
3246                 WHERE ut.event = 'used'
3247                   AND ut.chunk_id IS NOT NULL
3248                   AND ut.ts > COALESCE((
3249                     SELECT evidence_cutoff_at FROM chunks c
3250                     WHERE c.id=ut.chunk_id
3251                   ), '')
3252                   AND (
3253                     EXISTS (SELECT 1 FROM usage_trace ok
3254                             WHERE ok.trace_id = ut.trace_id
3255                               AND ok.event = 'task_ok' AND ok.chunk_id IS NULL)
3256                     OR EXISTS (SELECT 1 FROM episodic_log el
3257                                WHERE el.trace_id = ut.trace_id AND el.outcome = 'ok')
3258                   )
3259                 GROUP BY ut.chunk_id, ut.trace_id",
3260                rusqlite::params![],
3261            )?;
3262
3263            // 2. Derive success counts from migration baseline + replayable facts.
3264            self.storage.conn_execute(
3265                "WITH cst AS (
3266                   SELECT chunk_id, COUNT(*) AS cnt, MAX(ts) AS max_ts
3267                   FROM chunk_success_traces
3268                   GROUP BY chunk_id
3269                 )
3270                 UPDATE chunks SET
3271                   used_success_count = used_success_count_base
3272                     + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3273                   success_trace_ids_count = used_success_count_base
3274                     + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3275                   last_success_at = COALESCE(
3276                     (SELECT max_ts FROM cst WHERE cst.chunk_id=chunks.id),
3277                     last_success_at
3278                   )
3279                 WHERE origin!='spark'",
3280                rusqlite::params![],
3281            )?;
3282
3283            // 3. Recompute selected/used counts and last-use from retained facts.
3284            self.storage.conn_execute(
3285                "UPDATE chunks SET
3286                   selected_count = selected_count_base + COALESCE(
3287                     (SELECT COUNT(*) FROM usage_trace
3288                      WHERE chunk_id = chunks.id AND event = 'selected'
3289                        AND ts > COALESCE(chunks.evidence_cutoff_at, '')), 0),
3290                   used_count = used_count_base + COALESCE(
3291                     (SELECT COUNT(*) FROM usage_trace
3292                      WHERE chunk_id = chunks.id AND event = 'used'
3293                        AND ts > COALESCE(chunks.evidence_cutoff_at, '')), 0),
3294                   last_used_at = COALESCE(
3295                     (SELECT MAX(ts) FROM usage_trace
3296                      WHERE chunk_id=chunks.id AND event='used'
3297                        AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
3298                     last_used_base
3299                   )
3300                 WHERE origin!='spark'",
3301                rusqlite::params![],
3302            )?;
3303
3304            // 4. Advance watermark and purge only verbose retrieval events.
3305            // Attributed facts remain replayable so repeated curate runs are idempotent
3306            // and a later correction can subtract the previous trace contribution.
3307            self.storage.set_meta("last_agg_ts", &cutoff_ts)?;
3308            self.storage.purge_usage_trace(&cutoff_ts)?;
3309            self.storage.commit()
3310        })();
3311        if agg_result.is_err() {
3312            let _ = self.storage.rollback();
3313            agg_result?;
3314        }
3315
3316        // ── Step 2: recover_logs ──
3317        self.storage.begin_immediate()?;
3318        let recover_result = (|| -> Result<()> {
3319            // Stale screening rows → 'failed' (not 'open'), note = 'screening_timeout:<run_id>'.
3320            let screening_cutoff = minutes_ago(&now_iso, self.screening_timeout_minutes);
3321            let stale = self.storage.query_chunks_params(
3322                "SELECT id, distill_run_id FROM episodic_log
3323                 WHERE distill_state='screening' AND distill_locked_at < ?",
3324                rusqlite::params![screening_cutoff],
3325            )?;
3326            for row in &stale {
3327                let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3328                let run_id = row
3329                    .get("distill_run_id")
3330                    .and_then(Value::as_str)
3331                    .unwrap_or("unknown");
3332                let note = format!("screening_timeout:{run_id}");
3333                self.storage.conn_execute(
3334                "UPDATE episodic_log
3335                 SET distill_state='failed', distill_note=?,
3336                     distill_attempts=distill_attempts+1,
3337                     distill_last_failed_at=?,
3338                     distill_run_id=NULL, distill_locked_at=NULL
3339                 WHERE id=?",
3340                    rusqlite::params![note, now_iso, id],
3341                )?;
3342                report.recovered.push(id.to_string());
3343                report
3344                    .warnings
3345                    .push(format!("stale screening recovered as failed: {id}"));
3346            }
3347
3348            // Open logs past TTL → discarded (insufficient material, never record'd).
3349            let open_ttl_cutoff = days_ago(&now_iso, self.open_ttl_days);
3350            self.storage.conn_execute(
3351                "UPDATE episodic_log
3352                 SET distill_state='discarded', distill_note='no_record_timeout',
3353                     task_state='timed_out', completed_at=?
3354                 WHERE distill_state='open' AND ts < ?",
3355                rusqlite::params![now_iso, open_ttl_cutoff],
3356            )?;
3357            self.storage.commit()
3358        })();
3359        if recover_result.is_err() {
3360            let _ = self.storage.rollback();
3361            recover_result?;
3362        }
3363
3364        // ── Steps 3-7: governance (archive, dedupe, decay, promote, cycle) ──
3365        let scope_origin = scope.origin.clone();
3366        let scope_skill = scope.skill_name.clone();
3367        self.storage.begin_immediate()?;
3368        let gov_result = (|| -> Result<()> {
3369            // New feedback refreshes its chunk synchronously in Record. Curate only needs
3370            // to age pending proposals; scanning every historical feedback row is unbounded.
3371            let governance_chunks = self
3372                .storage
3373                .query_chunks(
3374                    "SELECT DISTINCT chunk_id FROM governance_proposals WHERE state='pending'",
3375                )?;
3376            for row in governance_chunks {
3377                if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
3378                    self.refresh_governance_evidence(chunk_id, &now_iso)?;
3379                }
3380            }
3381
3382            // ── 3a. Archive: low_confidence — only blocks that HAVE been used ──
3383            let low_conf_cutoff = days_ago(&now_iso, self.low_conf_idle_days);
3384            let low_conf = self.storage.query_chunks_params(
3385                "SELECT id FROM chunks
3386                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3387                   AND last_used_at IS NOT NULL
3388                   AND confidence < ?
3389                   AND last_used_at < ?
3390                   AND (? IS NULL OR origin=?)
3391                   AND (? IS NULL OR skill_name=?)",
3392                rusqlite::params![
3393                    self.low_conf_threshold,
3394                    low_conf_cutoff,
3395                    scope_origin,
3396                    scope_origin,
3397                    scope_skill,
3398                    scope_skill
3399                ],
3400            )?;
3401            for c in &low_conf {
3402                if let Some(id) = c.get("id").and_then(Value::as_str) {
3403                    self.storage.update_chunk_state(
3404                        id,
3405                        "archived",
3406                        Some("low_confidence"),
3407                        &now_iso,
3408                    )?;
3409                    report.archived.push(id.to_string());
3410                }
3411            }
3412
3413            // ── 3b. Archive: repeated_selected_unused ──
3414            let rep_sel = self.storage.query_chunks_params(
3415                "SELECT id FROM chunks
3416                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3417                   AND selected_count >= ? AND used_count = 0 AND confidence < ?
3418                   AND (? IS NULL OR origin=?)
3419                   AND (? IS NULL OR skill_name=?)",
3420                rusqlite::params![
3421                    self.repeat_select_min,
3422                    self.repeat_select_conf_max,
3423                    scope_origin,
3424                    scope_origin,
3425                    scope_skill,
3426                    scope_skill
3427                ],
3428            )?;
3429            for c in &rep_sel {
3430                if let Some(id) = c.get("id").and_then(Value::as_str) {
3431                    if !report.archived.contains(&id.to_string()) {
3432                        self.storage.update_chunk_state(
3433                            id,
3434                            "archived",
3435                            Some("repeated_selected_unused"),
3436                            &now_iso,
3437                        )?;
3438                        report.archived.push(id.to_string());
3439                    }
3440                }
3441            }
3442
3443            // ── 3c. Archive: never_used — never entered context at all ──
3444            let never_used_cutoff = days_ago(&now_iso, self.never_used_age_days);
3445            let never_used = self.storage.query_chunks_params(
3446                "SELECT id FROM chunks
3447                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3448                   AND used_count = 0 AND selected_count = 0
3449                   AND COALESCE(evidence_cutoff_at, created_at) < ?
3450                   AND (? IS NULL OR origin=?)
3451                   AND (? IS NULL OR skill_name=?)",
3452                rusqlite::params![
3453                    never_used_cutoff,
3454                    scope_origin,
3455                    scope_origin,
3456                    scope_skill,
3457                    scope_skill
3458                ],
3459            )?;
3460            for c in &never_used {
3461                if let Some(id) = c.get("id").and_then(Value::as_str) {
3462                    if !report.archived.contains(&id.to_string()) {
3463                        self.storage.update_chunk_state(
3464                            id,
3465                            "archived",
3466                            Some("never_used"),
3467                            &now_iso,
3468                        )?;
3469                        report.archived.push(id.to_string());
3470                    }
3471                }
3472            }
3473
3474            // ── 3d. Archive: governance_proposal ──
3475            // Chunks whose pending governance proposals have accumulated enough evidence
3476            // are archived and the proposals accepted atomically.
3477            let gov_proposals = self.storage.query_chunks_params(
3478                "SELECT DISTINCT chunk_id FROM governance_proposals
3479                 WHERE state='pending'
3480                   AND evidence_score >= ? AND actor_count >= 2",
3481                rusqlite::params![self.governance_archive_threshold as f64],
3482            )?;
3483            for c in &gov_proposals {
3484                if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3485                    let already_archived = report.archived.contains(&cid.to_string());
3486                    let eligible = !already_archived && self.storage.get_chunk(cid)?.map(|ch| {
3487                        ch.get("origin").and_then(Value::as_str) != Some("spark")
3488                            && ch.get("protected").and_then(Value::as_i64).unwrap_or(0) == 0
3489                            && matches!(
3490                                ch.get("state").and_then(Value::as_str),
3491                                Some("active") | Some("pending")
3492                            )
3493                    }).unwrap_or(false);
3494                    if eligible {
3495                        self.storage.update_chunk_state(
3496                            cid,
3497                            "archived",
3498                            Some("governance_proposal"),
3499                            &now_iso,
3500                        )?;
3501                        report.archived.push(cid.to_string());
3502                        self.storage.conn_execute(
3503                            "UPDATE governance_proposals
3504                             SET state='accepted', updated_at=?
3505                             WHERE chunk_id=? AND state='pending'",
3506                            rusqlite::params![now_iso, cid],
3507                        )?;
3508                    } else {
3509                        self.storage.conn_execute(
3510                            "UPDATE governance_proposals
3511                             SET state='rejected', updated_at=?
3512                             WHERE chunk_id=? AND state='pending'",
3513                            rusqlite::params![now_iso, cid],
3514                        )?;
3515                    }
3516                }
3517            }
3518
3519            // ── 3d2. Expire stale governance proposals (insufficient evidence, too old) ──
3520            // Proposals that never accumulate enough evidence cause repeated evolve cycles.
3521            // Reject them after governance_proposal_max_age_days so they stop triggering.
3522            let proposal_expiry_cutoff = days_ago(&now_iso, self.governance_proposal_max_age_days);
3523            self.storage.conn_execute(
3524                "UPDATE governance_proposals
3525                 SET state='rejected', updated_at=?
3526                 WHERE state='pending'
3527                   AND evidence_score < ?
3528                   AND created_at < ?",
3529                rusqlite::params![
3530                    now_iso,
3531                    self.governance_archive_threshold as f64,
3532                    proposal_expiry_cutoff
3533                ],
3534            )?;
3535
3536            // ── 3e. Archive: sustained_negative_feedback ──
3537            // Chunks with too many negative feedback events are archived regardless of
3538            // how long they've been idle, giving feedback a direct archival path.
3539            let neg_feedback_chunks = self.storage.query_chunks_params(
3540                "SELECT p.chunk_id FROM governance_proposals p
3541                 JOIN chunks c ON c.id = p.chunk_id
3542                 WHERE c.origin!='spark' AND c.protected=0
3543                   AND c.state IN ('active','pending')
3544                   AND p.state='pending'
3545                   AND p.evidence_score >= ? AND p.actor_count >= 2
3546                   AND (? IS NULL OR c.origin=?)
3547                   AND (? IS NULL OR c.skill_name=?)
3548                 GROUP BY p.chunk_id",
3549                rusqlite::params![
3550                    self.negative_feedback_archive_threshold as f64,
3551                    scope_origin,
3552                    scope_origin,
3553                    scope_skill,
3554                    scope_skill
3555                ],
3556            )?;
3557            for c in &neg_feedback_chunks {
3558                if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3559                    if !report.archived.contains(&cid.to_string()) {
3560                        self.storage.update_chunk_state(
3561                            cid,
3562                            "archived",
3563                            Some("sustained_negative_feedback"),
3564                            &now_iso,
3565                        )?;
3566                        report.archived.push(cid.to_string());
3567                    }
3568                }
3569            }
3570
3571            // ── 3f. Archive: sustained_task_failure ──
3572            // Covers both active and pending chunks: a pending chunk recalled repeatedly
3573            // but never producing successful tasks also has no other archive path.
3574            let high_fail_chunks = self.storage.query_chunks_params(
3575                "SELECT id FROM chunks
3576                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3577                   AND used_count >= ?
3578                   AND CAST(used_success_count AS REAL) / CAST(used_count AS REAL) < ?
3579                   AND confidence < ?
3580                   AND (? IS NULL OR origin=?)
3581                   AND (? IS NULL OR skill_name=?)",
3582                rusqlite::params![
3583                    self.failure_min_uses,
3584                    self.failure_max_success_rate,
3585                    self.failure_confidence_max,
3586                    scope_origin, scope_origin, scope_skill, scope_skill
3587                ],
3588            )?;
3589            for c in &high_fail_chunks {
3590                if let Some(cid) = c.get("id").and_then(Value::as_str) {
3591                    if !report.archived.contains(&cid.to_string()) {
3592                        self.storage.update_chunk_state(
3593                            cid, "archived", Some("sustained_task_failure"), &now_iso,
3594                        )?;
3595                        report.archived.push(cid.to_string());
3596                    }
3597                }
3598            }
3599
3600            // ── 4. Dedupe: same content_hash — keep protected or highest confidence ──
3601            let dupes = self.storage.query_chunks_params(
3602                "SELECT content_hash FROM chunks
3603                 WHERE origin!='spark' AND state IN ('active','pending')
3604                   AND (? IS NULL OR origin=?)
3605                   AND (? IS NULL OR skill_name=?)
3606                 GROUP BY content_hash HAVING COUNT(*) > 1",
3607                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3608            )?;
3609            for d in &dupes {
3610                if let Some(h) = d.get("content_hash").and_then(Value::as_str) {
3611                    let group = self.storage.query_chunks_params(
3612                        "SELECT id, confidence, protected FROM chunks
3613                         WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending')
3614                           AND (? IS NULL OR origin=?)
3615                           AND (? IS NULL OR skill_name=?)
3616                         ORDER BY protected DESC, confidence DESC",
3617                        rusqlite::params![h, scope_origin, scope_origin, scope_skill, scope_skill],
3618                    )?;
3619                    let canonical_id = group
3620                        .first()
3621                        .and_then(|row| row.get("id"))
3622                        .and_then(Value::as_str)
3623                        .unwrap_or("");
3624                    for row in group.iter().skip(1) {
3625                        let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3626                        let reason = format!("duplicate:{canonical_id}");
3627                        self.storage
3628                            .update_chunk_state(id, "archived", Some(&reason), &now_iso)?;
3629                        self.storage.conn_execute(
3630                            "UPDATE chunks SET parent_id=?, updated_at=? WHERE id=?",
3631                            rusqlite::params![canonical_id, now_iso, id],
3632                        )?;
3633                        report.deduped.push(id.to_string());
3634                    }
3635                }
3636            }
3637
3638            // ── 5. Decay: confidence time-decay for idle non-spark non-protected active chunks ──
3639            // Issue 6: Use last_decayed_at as the delta reference to avoid compounding.
3640            // Each curate only applies the INCREMENTAL decay since the previous run, so the
3641            // 90-day half-life is preserved regardless of how often curate runs.
3642            let decay_candidates = self.storage.query_chunks_params(
3643                "SELECT id, confidence, last_used_at, last_decayed_at FROM chunks
3644                 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3645                   AND last_used_at IS NOT NULL
3646                   AND (? IS NULL OR origin=?)
3647                   AND (? IS NULL OR skill_name=?)",
3648                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3649            )?;
3650            for c in &decay_candidates {
3651                let id = match c.get("id").and_then(Value::as_str) {
3652                    Some(v) => v,
3653                    None => continue,
3654                };
3655                let conf = c.get("confidence").and_then(Value::as_f64).unwrap_or(0.5);
3656                let last_used = c.get("last_used_at").and_then(Value::as_str).unwrap_or(&now_iso);
3657                // Use last_decayed_at (or last_used_at) as the reference for incremental delta.
3658                let decay_ref = c
3659                    .get("last_decayed_at")
3660                    .and_then(Value::as_str)
3661                    .filter(|s| *s > last_used)
3662                    .unwrap_or(last_used);
3663                let delta_days = iso_days_diff(&now_iso, decay_ref);
3664                if delta_days <= 0 {
3665                    continue;
3666                }
3667                let floor = self.decay_floor;
3668                let decay_alpha = 1.0 - 0.5_f64.powf(delta_days as f64 / 90.0);
3669                let new_conf = conf + decay_alpha * (floor - conf);
3670                if (new_conf - conf).abs() > 0.001 {
3671                    let note = format!("decay:{delta_days}d");
3672                    self.storage.upsert_confidence_evidence(
3673                        &gen_uuid(),
3674                        None,
3675                        id,
3676                        "decay",
3677                        floor,
3678                        decay_alpha,
3679                        &note,
3680                        None,
3681                        &now_iso,
3682                    )?;
3683                    self.recompute_chunk_confidence(id, &now_iso)?;
3684                    self.storage.update_chunk_last_decayed_at(id, &now_iso)?;
3685                    report.decayed.push(id.to_string());
3686                }
3687            }
3688
3689            // ── 6. Promote: pending → active when three-guard criteria met ──
3690            let promotable = self.storage.query_chunks_params(
3691                "SELECT id FROM chunks
3692                 WHERE state='pending' AND origin!='spark'
3693                   AND used_success_count >= ?
3694                   AND success_trace_ids_count >= 2
3695                   AND confidence >= ?
3696                   AND (? IS NULL OR origin=?)
3697                   AND (? IS NULL OR skill_name=?)",
3698                rusqlite::params![
3699                    self.promote_used_success_min,
3700                    self.promote_confidence_min,
3701                    scope_origin,
3702                    scope_origin,
3703                    scope_skill,
3704                    scope_skill
3705                ],
3706            )?;
3707            for c in &promotable {
3708                if let Some(id) = c.get("id").and_then(Value::as_str) {
3709                    self.storage.update_chunk_state(
3710                        id,
3711                        "active",
3712                        Some("repeated_success"),
3713                        &now_iso,
3714                    )?;
3715                }
3716            }
3717
3718            // ── 7. Cycle/orphan detection (report only, no auto-fix) ──
3719            let all_deps = self
3720                .storage
3721                .query_chunks("SELECT src, dst FROM deps WHERE kind='hard'")?;
3722            let cycles = detect_cycles(&all_deps);
3723            report.cycles = cycles;
3724            let orphan_rows = self.storage.query_chunks_params(
3725                "SELECT d.src, d.dst, s.id AS src_exists, t.id AS dst_exists
3726                 FROM deps d
3727                 LEFT JOIN chunks s ON s.id=d.src
3728                 LEFT JOIN chunks t ON t.id=d.dst
3729                 WHERE d.kind='hard'
3730                   AND (? IS NULL OR s.origin=?)
3731                   AND (? IS NULL OR s.skill_name=?)",
3732                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3733            )?;
3734            let mut orphans = HashSet::new();
3735            for row in orphan_rows {
3736                if row.get("src_exists").is_none_or(Value::is_null) {
3737                    if let Some(id) = row.get("src").and_then(Value::as_str) {
3738                        orphans.insert(id.to_string());
3739                    }
3740                }
3741                if row.get("dst_exists").is_none_or(Value::is_null) {
3742                    if let Some(id) = row.get("dst").and_then(Value::as_str) {
3743                        orphans.insert(id.to_string());
3744                    }
3745                }
3746            }
3747            report.orphans = orphans.into_iter().collect();
3748            report.orphans.sort();
3749
3750            // ── 8. Full context_stats rebuild — periodic correction pass.
3751            // record() only does targeted per-chunk updates; curate keeps the whole
3752            // table accurate by rebuilding from all sources once per cycle.
3753            self.rebuild_context_stats(&now_iso)?;
3754
3755            self.storage.commit()
3756        })();
3757        if gov_result.is_err() {
3758            let _ = self.storage.rollback();
3759            gov_result?;
3760        }
3761
3762        // ── Step 8: compact old terminal logs while preserving trace identity. ──
3763        // The compact row keeps attribution corrections and audit joins possible without
3764        // retaining potentially large raw outputs indefinitely.
3765        self.storage.begin_immediate()?;
3766        let purge_cutoff = days_ago(&now_iso, 30);
3767        let purge_result = self
3768            .storage
3769            .conn_execute(
3770                "UPDATE episodic_log
3771                 SET query=NULL, recall_snapshot=NULL, output=NULL, output_summary=NULL,
3772                     nomination=NULL,
3773                     distill_note=COALESCE(distill_note, 'compacted')
3774                 WHERE distill_state IN ('distilled','discarded','failed')
3775                   AND ts < ?",
3776                rusqlite::params![purge_cutoff],
3777            )
3778            .and_then(|_| self.storage.commit());
3779        if purge_result.is_err() {
3780            let _ = self.storage.rollback();
3781            purge_result?;
3782        }
3783
3784        // ── Step 9: prune completed/failed evolve_requests older than 30 days ──
3785        // Prevents unbounded table growth that would slow COUNT(*) queries.
3786        self.storage.begin_immediate()?;
3787        let evolve_req_cutoff = days_ago(&now_iso, 30);
3788        let prune_req_result = self
3789            .storage
3790            .conn_execute(
3791                "DELETE FROM evolve_requests
3792                 WHERE state IN ('completed','failed') AND requested_at < ?",
3793                rusqlite::params![evolve_req_cutoff],
3794            )
3795            .and_then(|_| self.storage.commit());
3796        if prune_req_result.is_err() {
3797            let _ = self.storage.rollback();
3798            prune_req_result?;
3799        }
3800
3801        Ok(report)
3802    }
3803
3804    // ------------------------------------------------------------------
3805    // Public API 8: inspect
3806    // ------------------------------------------------------------------
3807
3808    pub fn inspect(&self) -> Result<Value> {
3809        let total: i64 = count_query(
3810            &self.storage,
3811            "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
3812        )?;
3813        let active: i64 = count_query(
3814            &self.storage,
3815            "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
3816        )?;
3817        let pending: i64 = count_query(
3818            &self.storage,
3819            "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
3820        )?;
3821        let archived: i64 = count_query(
3822            &self.storage,
3823            "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
3824        )?;
3825        let sparks: i64 = count_query(
3826            &self.storage,
3827            "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
3828        )?;
3829        let open_logs: i64 = count_query(
3830            &self.storage,
3831            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
3832        )?;
3833        let new_logs: i64 = count_query(
3834            &self.storage,
3835            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
3836        )?;
3837        let embed_rebuild: i64 = count_query(&self.storage,
3838            "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
3839        let schema_version = self.storage.get_meta_or("schema_version", "?");
3840        let lib_id = self.storage.get_meta_or("lib_id", "?");
3841        let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
3842
3843        let metric_window_start = days_ago(&utc_now_iso(), 30);
3844        let trace_metrics = self.storage.query_chunks_params(
3845            "SELECT COUNT(*) AS total,
3846                    SUM(CASE WHEN task_state='completed' THEN 1 ELSE 0 END) AS completed,
3847                    SUM(CASE WHEN task_state='timed_out' THEN 1 ELSE 0 END) AS timed_out,
3848                    SUM(CASE WHEN task_state='completed' AND usage_state!='unknown'
3849                             THEN 1 ELSE 0 END) AS usage_known,
3850                    SUM(CASE WHEN task_state='completed' AND usage_state='known_some'
3851                             THEN 1 ELSE 0 END) AS usage_some,
3852                    SUM(CASE WHEN task_state='completed'
3853                                  AND outcome IN ('ok','fail')
3854                             THEN 1 ELSE 0 END) AS outcome_known,
3855                    SUM(CASE WHEN outcome='ok' THEN 1 ELSE 0 END) AS succeeded
3856             FROM episodic_log WHERE ts >= ?",
3857            rusqlite::params![metric_window_start],
3858        )?;
3859        let trace_row = trace_metrics.first();
3860        let trace_total = trace_row
3861            .and_then(|row| row.get("total"))
3862            .and_then(Value::as_i64)
3863            .unwrap_or(0);
3864        let trace_completed = trace_row
3865            .and_then(|row| row.get("completed"))
3866            .and_then(Value::as_i64)
3867            .unwrap_or(0);
3868        let trace_timed_out = trace_row
3869            .and_then(|row| row.get("timed_out"))
3870            .and_then(Value::as_i64)
3871            .unwrap_or(0);
3872        let usage_known = trace_row
3873            .and_then(|row| row.get("usage_known"))
3874            .and_then(Value::as_i64)
3875            .unwrap_or(0);
3876        let usage_some = trace_row
3877            .and_then(|row| row.get("usage_some"))
3878            .and_then(Value::as_i64)
3879            .unwrap_or(0);
3880        let succeeded = trace_row
3881            .and_then(|row| row.get("succeeded"))
3882            .and_then(Value::as_i64)
3883            .unwrap_or(0);
3884        let outcome_known = trace_row
3885            .and_then(|row| row.get("outcome_known"))
3886            .and_then(Value::as_i64)
3887            .unwrap_or(0);
3888        let usage_rows = self.storage.query_chunks_params(
3889            "SELECT recall_snapshot, used_ids FROM episodic_log
3890             WHERE task_state='completed'
3891               AND usage_state!='unknown' AND used_complete=1
3892               AND recall_snapshot IS NOT NULL AND used_ids IS NOT NULL
3893               AND ts >= ?",
3894            rusqlite::params![metric_window_start],
3895        )?;
3896        let mut selected_total = 0_i64;
3897        let mut selected_used = 0_i64;
3898        for row in usage_rows {
3899            let selected: HashSet<String> = row
3900                .get("recall_snapshot")
3901                .and_then(Value::as_str)
3902                .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
3903                .and_then(|snapshot| snapshot.get("selected").cloned())
3904                .and_then(|value| serde_json::from_value::<Vec<String>>(value).ok())
3905                .unwrap_or_default()
3906                .into_iter()
3907                .collect();
3908            let used: HashSet<String> = row
3909                .get("used_ids")
3910                .and_then(Value::as_str)
3911                .and_then(|raw| serde_json::from_str::<Vec<String>>(raw).ok())
3912                .unwrap_or_default()
3913                .into_iter()
3914                .collect();
3915            selected_total += selected.len() as i64;
3916            selected_used += selected.intersection(&used).count() as i64;
3917        }
3918        let feedback_count = count_query_params(
3919            &self.storage,
3920            "SELECT COUNT(*) FROM feedback_events WHERE ts >= ?",
3921            rusqlite::params![metric_window_start],
3922        )?;
3923        let feedback_traces = count_query_params(
3924            &self.storage,
3925            "SELECT COUNT(DISTINCT f.trace_id)
3926             FROM feedback_events f
3927             JOIN episodic_log e ON e.trace_id=f.trace_id
3928             WHERE f.ts >= ? AND e.ts >= ? AND e.task_state='completed'",
3929            rusqlite::params![metric_window_start, metric_window_start],
3930        )?;
3931        let pending_evolve = count_query(
3932            &self.storage,
3933            "SELECT COUNT(*) FROM evolve_requests WHERE state IN ('pending','running')",
3934        )?;
3935        let governance_pending = count_query(
3936            &self.storage,
3937            "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
3938        )?;
3939        let failed_evolve = count_query_params(
3940            &self.storage,
3941            "SELECT COUNT(*) FROM evolve_requests
3942             WHERE last_failed_at >= ?",
3943            rusqlite::params![metric_window_start],
3944        )?;
3945        let failed_distill = count_query_params(
3946            &self.storage,
3947            "SELECT COUNT(*) FROM episodic_log
3948             WHERE distill_last_failed_at >= ?",
3949            rusqlite::params![metric_window_start],
3950        )?;
3951        let confidence_buckets = self.storage.query_chunks(
3952            &format!("SELECT
3953               SUM(CASE WHEN confidence < 0.25 THEN 1 ELSE 0 END) AS low,
3954               SUM(CASE WHEN confidence >= 0.25 AND confidence < {0} THEN 1 ELSE 0 END) AS medium,
3955               SUM(CASE WHEN confidence >= {0} THEN 1 ELSE 0 END) AS high
3956             FROM chunks WHERE origin!='spark' AND state!='archived'",
3957                self.promote_confidence_min),
3958        )?;
3959        let confidence_row = confidence_buckets.first();
3960
3961        // P3-A: oldest pending chunk timestamp — surfaces long-lived pending debt.
3962        let pending_oldest_ts = self.storage.query_chunks(
3963            "SELECT MIN(created_at) AS oldest FROM chunks WHERE state='pending' AND origin!='spark'",
3964        )?.into_iter().next()
3965            .and_then(|r| r.get("oldest").cloned())
3966            .and_then(|v| if v.is_null() { None } else { Some(v) });
3967
3968        // Health signal 1: knowledge debt ratio.
3969        // Zombie = active chunks with middling confidence (stuck, neither good nor bad)
3970        // that are at least 14d old and have been used at least once.
3971        // "never-recalled old" chunks are handled by curate 3c (never_used archive).
3972        let zombie_cutoff = days_ago(&utc_now_iso(), 14);
3973        let zombie: i64 = count_query_params(
3974            &self.storage,
3975            "SELECT COUNT(*) FROM chunks
3976             WHERE origin!='spark' AND state='active'
3977               AND confidence >= 0.4 AND confidence <= 0.6
3978               AND last_used_at IS NOT NULL
3979               AND created_at < ?",
3980            rusqlite::params![zombie_cutoff],
3981        )?;
3982        let debt_numerator = pending + zombie;
3983        let debt_denominator = active.max(1);
3984        let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
3985
3986        // Health signal 5: stale screening count
3987        let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
3988        let stale_screening: i64 = count_query_params(
3989            &self.storage,
3990            "SELECT COUNT(*) FROM episodic_log
3991             WHERE distill_state='screening' AND distill_locked_at < ?",
3992            rusqlite::params![screening_cutoff],
3993        )?;
3994
3995        // Health signal 4: actual Distill cost within the configured rolling window.
3996        let distill_period_start = self.distill_token_period_start(&utc_now_iso())?;
3997        let distill_cost = self.storage.query_chunks_params(
3998            "SELECT COALESCE(SUM(prompt_tokens),0) AS pt,
3999                    COALESCE(SUM(completion_tokens),0) AS ct
4000             FROM distill_token_usage
4001             WHERE accounted_at >= ?",
4002            rusqlite::params![distill_period_start],
4003        )?;
4004        let prompt_tokens = distill_cost
4005            .first()
4006            .and_then(|r| r.get("pt"))
4007            .and_then(Value::as_i64)
4008            .unwrap_or(0);
4009        let completion_tokens = distill_cost
4010            .first()
4011            .and_then(|r| r.get("ct"))
4012            .and_then(Value::as_i64)
4013            .unwrap_or(0);
4014
4015        // Health signal 2: sparks that have been recalled often (soft incubation threshold = 5)
4016        let spark_threshold: i64 = self
4017            .storage
4018            .get_meta("curate.soft_mature_threshold")
4019            .ok()
4020            .flatten()
4021            .and_then(|v| v.parse::<i64>().ok())
4022            .unwrap_or(5);
4023        let recurring_sparks = self.storage.query_chunks_params(
4024            "SELECT ut.chunk_id, COUNT(*) AS cnt,
4025                    c.content, c.trigger_desc, c.maturity
4026             FROM usage_trace ut
4027             JOIN chunks c ON c.id = ut.chunk_id
4028             WHERE ut.event='retrieved'
4029               AND c.origin='spark'
4030             GROUP BY ut.chunk_id HAVING cnt >= ?",
4031            rusqlite::params![spark_threshold],
4032        )?;
4033        let recurring_spark_ids: Vec<Value> = recurring_sparks
4034            .iter()
4035            .map(|r| {
4036                json!({
4037                    "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
4038                    "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
4039                    "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
4040                    "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
4041                        .chars().take(80).collect::<String>(),
4042                })
4043            })
4044            .collect();
4045
4046        let mut suggestions: Vec<Value> = Vec::new();
4047        if embed_rebuild > 0 {
4048            suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
4049        }
4050        if new_logs > 0 {
4051            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
4052        }
4053        if pending > 0 {
4054            suggestions.push(json!({"action": "innate approve <id>  # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
4055        }
4056        if !recurring_spark_ids.is_empty() {
4057            suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
4058        }
4059        if stale_screening > 0 {
4060            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
4061        }
4062        if governance_pending > 0 {
4063            suggestions.push(json!({
4064                "action": "review governance_proposals",
4065                "reason": format!("{governance_pending} chunk(s) have repeated negative feedback")
4066            }));
4067        }
4068
4069        Ok(json!({
4070            "schema_version": schema_version,
4071            "lib_id": lib_id,
4072            "last_agg_ts": last_agg,
4073            "chunks": {
4074                "total": total, "active": active, "pending": pending, "archived": archived,
4075                "pending_oldest_ts": pending_oldest_ts,
4076            },
4077            "sparks": sparks,
4078            "episodic_log": {"open": open_logs, "new": new_logs},
4079            "embed_rebuild_queue": embed_rebuild,
4080            "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
4081            "stale_screening_count": stale_screening,
4082            "feedback_loop": {
4083                "trace_completion_rate": ratio(trace_completed, trace_total),
4084                "usage_annotation_rate": ratio(usage_known, trace_completed),
4085                "trace_use_rate": ratio(usage_some, usage_known),
4086                "selected_to_used_rate": ratio(selected_used, selected_total),
4087                "task_success_rate": ratio(succeeded, outcome_known),
4088                "feedback_coverage": ratio(feedback_traces, trace_completed),
4089                "feedback_events": feedback_count,
4090                "timed_out_traces": trace_timed_out,
4091                "pending_evolve_requests": pending_evolve,
4092                "failed_evolve_requests_30d": failed_evolve,
4093                "failed_distill_logs_30d": failed_distill,
4094                "pending_governance_proposals": governance_pending,
4095                "window_days": 30,
4096                "confidence_distribution": {
4097                    "low": confidence_row.and_then(|row| row.get("low")).and_then(Value::as_i64).unwrap_or(0),
4098                    "medium": confidence_row.and_then(|row| row.get("medium")).and_then(Value::as_i64).unwrap_or(0),
4099                    "high": confidence_row.and_then(|row| row.get("high")).and_then(Value::as_i64).unwrap_or(0),
4100                }
4101            },
4102            "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
4103            "recurring_sparks": recurring_sparks.len(),
4104            "recurring_spark_ids": recurring_spark_ids,
4105            "params": {
4106                "recall.w_content": self.w_content,
4107                "recall.w_trigger": self.w_trigger,
4108                "recall.w_context": self.w_context,
4109                "recall.top_k_candidates": self.top_k_candidates,
4110                "curate.low_conf_threshold": self.low_conf_threshold,
4111                "curate.low_conf_idle_days": self.low_conf_idle_days,
4112                "curate.repeat_select_min": self.repeat_select_min,
4113                "curate.never_used_age_days": self.never_used_age_days,
4114                "curate.promote_used_success_min": self.promote_used_success_min,
4115                "curate.promote_confidence_min": self.promote_confidence_min,
4116                "curate.screening_timeout_minutes": self.screening_timeout_minutes,
4117                "curate.open_ttl_days": self.open_ttl_days,
4118                "evolve.schedule_interval_hours": self.evolve_schedule_interval_hours,
4119            },
4120            "suggestions": suggestions
4121        }))
4122    }
4123
4124    // ------------------------------------------------------------------
4125    // Public: rebuild_embeddings (evolve --rebuild-embeddings)
4126    // ------------------------------------------------------------------
4127
4128    pub fn rebuild_embeddings(&self) -> Result<usize> {
4129        let meta_version = self
4130            .storage
4131            .get_meta("embed_version")?
4132            .and_then(|v| v.parse::<i64>().ok())
4133            .unwrap_or(1);
4134        // Fetch chunks with embed_version=0 (failed writes) or below current meta version.
4135        let stale = self.storage.query_chunks_params(
4136            "SELECT id, content, trigger_desc, state_reason FROM chunks
4137             WHERE embed_version = 0 OR embed_version < ?",
4138            rusqlite::params![meta_version],
4139        )?;
4140        let mut count = 0;
4141        for row in &stale {
4142            let id = match row.get("id").and_then(Value::as_str) {
4143                Some(v) => v,
4144                None => continue,
4145            };
4146            let content = row.get("content").and_then(Value::as_str).unwrap_or("");
4147            let trigger = row
4148                .get("trigger_desc")
4149                .and_then(Value::as_str)
4150                .unwrap_or(content);
4151            let state_reason = row
4152                .get("state_reason")
4153                .and_then(Value::as_str)
4154                .unwrap_or("");
4155
4156            let cvec = match self.embedding.embed_content(content) {
4157                Ok(v) => v,
4158                Err(_) => continue,
4159            };
4160            let tvec = match self.embedding.embed_trigger(trigger) {
4161                Ok(v) => v,
4162                Err(_) => continue,
4163            };
4164
4165            self.storage.begin_immediate()?;
4166            let r = (|| -> Result<()> {
4167                self.storage
4168                    .insert_vec_content(id, &pack_embedding(&cvec))?;
4169                self.storage
4170                    .insert_vec_trigger(id, &pack_embedding(&tvec))?;
4171                // Restore intended state if encoded in state_reason.
4172                let new_reason = if state_reason.starts_with("embedding_pending:target=") {
4173                    let target_state = state_reason.trim_start_matches("embedding_pending:target=");
4174                    let now = utc_now_iso();
4175                    self.storage.update_chunk_state(
4176                        id,
4177                        target_state,
4178                        Some("embedding_rebuilt"),
4179                        &now,
4180                    )?;
4181                    "embedding_rebuilt".to_string()
4182                } else {
4183                    "embedding_rebuilt".to_string()
4184                };
4185                let now = utc_now_iso();
4186                self.storage.conn_execute(
4187                    "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
4188                    rusqlite::params![meta_version, new_reason, now, id],
4189                )?;
4190                self.storage.commit()
4191            })();
4192            if r.is_err() {
4193                let _ = self.storage.rollback();
4194            } else {
4195                count += 1;
4196            }
4197        }
4198        Ok(count)
4199    }
4200
4201    // ------------------------------------------------------------------
4202    // Public: inspect_id (inspect <chunk_id> or <trace_id>)
4203    // ------------------------------------------------------------------
4204
4205    pub fn inspect_id(&self, id: &str) -> Result<Value> {
4206        // Try as chunk_id first, then as trace_id.
4207        if let Some(chunk) = self.storage.get_chunk(id)? {
4208            let traces = self.storage.query_chunks_params(
4209                "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
4210                rusqlite::params![id],
4211            )?;
4212            let derived = self.storage.query_chunks_params(
4213                "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
4214                   SELECT id FROM episodic_log WHERE trace_id IN (
4215                     SELECT trace_id FROM usage_trace WHERE chunk_id=?
4216                   )
4217                 ) LIMIT 10",
4218                rusqlite::params![id],
4219            )?;
4220            return Ok(json!({
4221                "kind": "chunk",
4222                "chunk": chunk,
4223                "recent_traces": traces,
4224                "derived_chunks": derived,
4225            }));
4226        }
4227        // Try as trace_id.
4228        if let Some(log) = self.storage.get_episodic_log(id)? {
4229            let traces = self.storage.query_chunks_params(
4230                "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
4231                rusqlite::params![id],
4232            )?;
4233            return Ok(json!({
4234                "kind": "trace",
4235                "episodic_log": log,
4236                "usage_traces": traces,
4237            }));
4238        }
4239        Err(InnateError::ChunkNotFound(id.to_string()))
4240    }
4241
4242    // ------------------------------------------------------------------
4243    // Sanitize
4244    // ------------------------------------------------------------------
4245
4246    fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
4247        self.sanitizer.sanitize(content)
4248    }
4249}
4250
4251// ---------------------------------------------------------------------------
4252// Helpers
4253// ---------------------------------------------------------------------------
4254
/// A recall candidate: its chunk row plus two similarity scores.
///
/// NOTE(review): construction and use are outside this view. The field meanings
/// below are inferred from their names and the W_CONTENT / W_TRIGGER weights; confirm
/// them at the construction site.
struct CandidateInfo {
    /// The candidate chunk as a JSON row.
    chunk: Value,
    /// Presumably the query's similarity to the chunk's content embedding.
    sim_content: f32,
    /// Presumably the query's similarity to the chunk's trigger embedding.
    sim_trigger: f32,
}
4260
4261fn chunk_is_valid_for_recall(chunk: &Value, embed_version: i64) -> bool {
4262    chunk.get("state").and_then(Value::as_str) != Some("archived")
4263        && chunk.get("origin").and_then(Value::as_str) != Some("spark")
4264        && chunk
4265            .get("embed_version")
4266            .and_then(Value::as_i64)
4267            .unwrap_or(1)
4268            >= embed_version
4269}
4270
/// Normalize a query string before hashing into a context_key.
///
/// Goal: queries that differ only in surface form should accumulate statistics in
/// the same context_stat bucket. The steps are:
///
/// 1. lowercase the whole string;
/// 2. replace every character that is neither alphanumeric nor whitespace
///    (punctuation, symbols) with a space;
/// 3. split on whitespace and drop a small fixed English stop-word list
///    (`a`, `an`, `and`, `for`, `in`, `of`, `on`, `the`, `to`, `with`);
/// 4. sort the remaining tokens and remove duplicates, so word order and
///    repetition do not matter;
/// 5. join the tokens with single spaces.
///
/// For example, `"The Quick, quick  brown-fox!"` becomes `"brown fox quick"`.
/// Input made only of stop words or punctuation becomes `""`.
///
/// Beyond that it is deliberately conservative: no stemming and no synonym
/// folding. The canonical query guidance in SKILL.md handles vocabulary
/// consistency at the agent level.
///
/// Any change to this function re-buckets every context_key derived from it, so
/// the output for a given input must stay stable.
fn normalize_query(query: &str) -> String {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "and", "for", "in", "of", "on", "the", "to", "with",
    ];
    let cleaned: String = query
        .to_lowercase()
        .chars()
        .map(|ch| {
            if ch.is_alphanumeric() || ch.is_whitespace() {
                ch
            } else {
                ' '
            }
        })
        .collect();
    let mut tokens: Vec<&str> = cleaned
        .split_whitespace()
        .filter(|token| !STOP_WORDS.contains(token))
        .collect();
    // Order-insensitive bucketing: sort first so dedup removes every repeat.
    tokens.sort_unstable();
    tokens.dedup();
    tokens.join(" ")
}
4302
4303fn estimate_distill_prompt_tokens(log: &Value, related_logs: &[Value]) -> i64 {
4304    let primary: i64 = [
4305        "query",
4306        "recall_snapshot",
4307        "output",
4308        "output_summary",
4309        "nomination",
4310    ]
4311    .iter()
4312    .filter_map(|key| log.get(*key).and_then(Value::as_str))
4313    .map(|text| estimate_tokens(text) as i64)
4314    .sum();
4315    let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
4316    let context_key = log.get("context_key").and_then(Value::as_str);
4317    let related: i64 = related_logs
4318        .iter()
4319        .filter(|other| other.get("id").and_then(Value::as_str).unwrap_or("") != log_id)
4320        .filter(|other| {
4321            context_key.is_some()
4322                && other.get("context_key").and_then(Value::as_str) == context_key
4323        })
4324        .take(4)
4325        .flat_map(|other| {
4326            ["query", "output_summary", "outcome"]
4327                .into_iter()
4328                .filter_map(|key| other.get(key).and_then(Value::as_str))
4329        })
4330        .map(|text| estimate_tokens(text) as i64)
4331        .sum();
4332    primary + related
4333}
4334
4335fn estimate_distilled_chunk_tokens(chunk: &DistilledChunk) -> i64 {
4336    estimate_tokens(&chunk.content) as i64
4337        + chunk
4338            .trigger_desc
4339            .as_deref()
4340            .map(estimate_tokens)
4341            .unwrap_or(0) as i64
4342        + chunk
4343            .anti_trigger_desc
4344            .as_deref()
4345            .map(estimate_tokens)
4346            .unwrap_or(0) as i64
4347}
4348
/// Report whether `query` mentions any of the comma-separated phrases in `anti`.
///
/// The check is case-insensitive. Each phrase is trimmed and blank phrases are ignored. A hit is
/// a plain substring match, so `"cat"` also matches inside `"concatenate"`.
fn anti_trigger_hit(query: &str, anti: &str) -> bool {
    let haystack = query.to_lowercase();
    let needles = anti.to_lowercase();
    for phrase in needles.split(',') {
        let phrase = phrase.trim();
        if phrase.is_empty() {
            continue;
        }
        if haystack.contains(phrase) {
            return true;
        }
    }
    false
}
4356
4357fn block_cost(block: &[Value]) -> usize {
4358    block
4359        .iter()
4360        .map(|b| {
4361            b.get("token_count")
4362                .and_then(Value::as_u64)
4363                .map(|t| t as usize)
4364                .unwrap_or_else(|| {
4365                    estimate_tokens(b.get("content").and_then(Value::as_str).unwrap_or("")).max(100)
4366                })
4367        })
4368        .sum()
4369}
4370
4371fn limit_knowledge(knowledge: Vec<Value>, top: Option<usize>) -> Vec<Value> {
4372    match top {
4373        None => knowledge,
4374        Some(0) => vec![],
4375        Some(n) => knowledge.into_iter().take(n).collect(),
4376    }
4377}
4378
/// Classify an optional list of used ids into a fixed tag.
///
/// Returns `"unknown"` for `None`, `"known_none"` for an empty list and `"known_some"`
/// otherwise.
/// NOTE(review): presumably `used` holds the ids a caller reported using — confirm at call sites.
fn usage_state(used: Option<&[String]>) -> &'static str {
    used.map_or("unknown", |ids| {
        if ids.is_empty() {
            "known_none"
        } else {
            "known_some"
        }
    })
}
4386
/// Divide `numerator` by `denominator`, rounded to three decimal places.
///
/// Rounding uses `f64::round` (half away from zero). A zero or negative `denominator` yields
/// `0.0` instead of dividing.
fn ratio(numerator: i64, denominator: i64) -> f64 {
    match denominator {
        d if d > 0 => {
            let raw = numerator as f64 / d as f64;
            (raw * 1000.0).round() / 1000.0
        }
        _ => 0.0,
    }
}
4394
4395fn validate_source(source: &str) -> Result<()> {
4396    if !matches!(
4397        source,
4398        "mcp" | "sdk" | "cli" | "hook" | "daemon" | "augmented"
4399    ) {
4400        return Err(InnateError::InvalidState(format!(
4401            "invalid event source: {source}"
4402        )));
4403    }
4404    Ok(())
4405}
4406
4407fn count_query(storage: &Storage, sql: &str) -> Result<i64> {
4408    Ok(storage
4409        .query_chunks(sql)?
4410        .first()
4411        .and_then(|r| r.as_object())
4412        .and_then(|m| m.values().next())
4413        .and_then(Value::as_i64)
4414        .unwrap_or(0))
4415}
4416
4417fn count_query_params<P: rusqlite::Params>(storage: &Storage, sql: &str, p: P) -> Result<i64> {
4418    Ok(storage
4419        .query_chunks_params(sql, p)?
4420        .first()
4421        .and_then(|r| r.as_object())
4422        .and_then(|m| m.values().next())
4423        .and_then(Value::as_i64)
4424        .unwrap_or(0))
4425}
4426
4427fn days_ago(now_iso: &str, days: i64) -> String {
4428    use chrono::{DateTime, Duration, Utc};
4429    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4430        let cutoff = t - Duration::days(days);
4431        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4432    }
4433    now_iso.to_string()
4434}
4435
4436fn minutes_ago(now_iso: &str, minutes: i64) -> String {
4437    use chrono::{DateTime, Duration, Utc};
4438    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4439        let cutoff = t - Duration::minutes(minutes);
4440        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4441    }
4442    now_iso.to_string()
4443}
4444
4445fn hours_ago(now_iso: &str, hours: i64) -> String {
4446    use chrono::{DateTime, Duration, Utc};
4447    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4448        let cutoff = t - Duration::hours(hours);
4449        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4450    }
4451    now_iso.to_string()
4452}
4453
4454fn minutes_after(now_iso: &str, minutes: i64) -> String {
4455    use chrono::{DateTime, Duration, Utc};
4456    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4457        let cutoff = t + Duration::minutes(minutes);
4458        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4459    }
4460    now_iso.to_string()
4461}
4462
4463fn hours_after(now_iso: &str, hours: i64) -> String {
4464    use chrono::{DateTime, Duration, Utc};
4465    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4466        let cutoff = t + Duration::hours(hours);
4467        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4468    }
4469    now_iso.to_string()
4470}
4471
4472/// Return the number of whole days between two ISO timestamps (now - past; clamped ≥ 0).
4473fn iso_days_diff(now_iso: &str, past_iso: &str) -> i64 {
4474    use chrono::{DateTime, Utc};
4475    let parse = |s: &str| s.parse::<DateTime<Utc>>().ok();
4476    if let (Some(a), Some(b)) = (parse(now_iso), parse(past_iso)) {
4477        let diff = a - b;
4478        diff.num_days().max(0)
4479    } else {
4480        0
4481    }
4482}
4483
4484/// DFS-based cycle detection on the hard-dep graph. Returns list of cycles (each is a Vec of ids).
4485fn detect_cycles(deps: &[Value]) -> Vec<Vec<String>> {
4486    use std::collections::HashMap;
4487    let mut adj: HashMap<String, Vec<String>> = HashMap::new();
4488    for d in deps {
4489        let src = d
4490            .get("src")
4491            .and_then(Value::as_str)
4492            .unwrap_or("")
4493            .to_string();
4494        let dst = d
4495            .get("dst")
4496            .and_then(Value::as_str)
4497            .unwrap_or("")
4498            .to_string();
4499        if !src.is_empty() && !dst.is_empty() {
4500            adj.entry(src).or_default().push(dst);
4501        }
4502    }
4503    let nodes: Vec<String> = adj.keys().cloned().collect();
4504    let mut visited: HashSet<String> = HashSet::new();
4505    let mut on_stack: HashSet<String> = HashSet::new();
4506    let mut cycles: Vec<Vec<String>> = vec![];
4507
4508    fn dfs(
4509        node: &str,
4510        adj: &HashMap<String, Vec<String>>,
4511        visited: &mut HashSet<String>,
4512        on_stack: &mut HashSet<String>,
4513        path: &mut Vec<String>,
4514        cycles: &mut Vec<Vec<String>>,
4515    ) {
4516        if on_stack.contains(node) {
4517            // Found cycle — extract loop segment.
4518            let start = path.iter().position(|n| n == node).unwrap_or(0);
4519            cycles.push(path[start..].to_vec());
4520            return;
4521        }
4522        if visited.contains(node) {
4523            return;
4524        }
4525        visited.insert(node.to_string());
4526        on_stack.insert(node.to_string());
4527        path.push(node.to_string());
4528        if let Some(children) = adj.get(node) {
4529            for child in children {
4530                dfs(child, adj, visited, on_stack, path, cycles);
4531            }
4532        }
4533        path.pop();
4534        on_stack.remove(node);
4535    }
4536
4537    for node in nodes {
4538        let mut path = vec![];
4539        dfs(
4540            &node,
4541            &adj,
4542            &mut visited,
4543            &mut on_stack,
4544            &mut path,
4545            &mut cycles,
4546        );
4547    }
4548    cycles
4549}