Skip to main content

innate_core/
kb.rs

1//! KnowledgeBase — all 8 Public APIs.
2
3/// Return type for pack(): (selected_chunks, skipped_groups, skip_reasons)
4type PackResult = (
5    Vec<Value>,
6    Vec<(Vec<Value>, f64, usize)>,
7    std::collections::HashMap<String, String>,
8);
9
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::embedding::{DummyEmbeddingProvider, EmbeddingProvider};
17use crate::errors::{InnateError, Result};
18use crate::refine::{
19    DefaultSanitizer, DistilledChunk, Distiller, HeuristicDistiller, NullRefiner, Refiner,
20    Sanitizer,
21};
22use crate::storage::{ChunkRow, EpisodicLogRow, Storage};
23use crate::utils::{
24    content_hash, estimate_tokens, gen_uuid, pack_embedding, utc_now_iso, SanitizeAction,
25};
26
// ---------------------------------------------------------------------------
// Tuning defaults
// ---------------------------------------------------------------------------
//
// Fallback values for the tunables. `init_meta` seeds the meta table with the
// same numbers and `load_params` reads the table back, so a constant only takes
// effect when its meta key is missing or unparseable. Each doc comment names
// the meta key that overrides it.

// Recall scoring.
/// `recall.w_content`: weight of content-vector similarity in the fused score.
const W_CONTENT: f64 = 0.65;
/// `recall.w_trigger`: weight of trigger-vector similarity in the fused score.
const W_TRIGGER: f64 = 0.25;
/// `recall.w_confidence`: weight of the chunk's stored confidence in the fused score.
const W_CONFIDENCE: f64 = 0.10;
/// `recall.top_k_candidates`: ranked candidates kept after scoring.
const TOP_K_CANDIDATES: usize = 20;
/// `recall.anti_trigger_penalty`: score multiplier when a chunk's anti-trigger matches.
const ANTI_TRIGGER_PENALTY: f64 = 0.6;
/// `recall.density_refill`: retry skipped blocks by score density after packing.
const DENSITY_REFILL: bool = true;

// Curation (semantics are defined by the curator implementation).
/// `curate.low_conf_threshold`.
const LOW_CONF_THRESHOLD: f64 = 0.25;
/// `curate.low_conf_idle_days`.
const LOW_CONF_IDLE_DAYS: i64 = 60;
/// `curate.repeat_select_min`.
const REPEAT_SELECT_MIN: i64 = 10;
/// `curate.repeat_select_conf_max`.
const REPEAT_SELECT_CONF_MAX: f64 = 0.5;
/// `curate.never_used_age_days`.
const NEVER_USED_AGE_DAYS: i64 = 30;
/// `curate.open_ttl_days`.
const OPEN_TTL_DAYS: i64 = 7;
/// `curate.screening_timeout_minutes`.
const SCREENING_TIMEOUT_MINUTES: i64 = 30;
/// `curate.promote_used_success_min`.
const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
/// `curate.promote_confidence_min`.
const PROMOTE_CONFIDENCE_MIN: f64 = 0.65;

// Evolution.
/// `evolve.threshold_new_count`.
const EVOLVE_THRESHOLD: i64 = 5;
/// `evolve.distill_batch_size`.
const DISTILL_BATCH_SIZE: usize = 20;
44const SCREENING_TIMEOUT_MINUTES: i64 = 30;
45const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
46const PROMOTE_CONFIDENCE_MIN: f64 = 0.65;
47const EVOLVE_THRESHOLD: i64 = 5;
48const DISTILL_BATCH_SIZE: usize = 20;
49
50// ---------------------------------------------------------------------------
51// Public result types
52// ---------------------------------------------------------------------------
53
54#[derive(Debug, Default, Clone)]
55pub struct RecallResult {
56    pub knowledge: Vec<Value>,
57    pub sparks: Vec<Value>,
58    pub trace_id: String,
59    pub empty: bool,
60    pub depth_skipped: Vec<String>,
61    pub skipped_reasons: HashMap<String, String>,
62}
63
64#[derive(Debug, Default)]
65pub struct CurateReport {
66    pub archived: Vec<String>,
67    pub deduped: Vec<String>,
68    pub decayed: Vec<String>,
69    pub cycles: Vec<Vec<String>>,
70    pub orphans: Vec<String>,
71    pub recovered: Vec<String>,
72    pub warnings: Vec<String>,
73    pub stats: HashMap<String, Value>,
74}
75
76/// Scope for a single Curate run — allows limiting governance to a subset of chunks.
77#[derive(Debug, Default, Clone)]
78pub struct CurateScope {
79    /// If set, only process chunks with this origin (e.g. "distilled").
80    pub origin: Option<String>,
81    /// If set, only process chunks belonging to this skill.
82    pub skill_name: Option<String>,
83    /// When true, compute the report but do not write any changes.
84    pub dry_run: bool,
85}
86
87/// Replaceable governance interface (§二·六). Inject via `KnowledgeBase::open_with`.
88/// Default implementation: `BuiltinCurator`.
89pub trait Curator: Send + Sync {
90    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport>;
91}
92
93/// Built-in curator — implements the full §四 governance pipeline.
94pub struct BuiltinCurator;
95
96impl Curator for BuiltinCurator {
97    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport> {
98        kb.builtin_curate_impl(scope)
99    }
100}
101
102// ---------------------------------------------------------------------------
103// KnowledgeBase
104// ---------------------------------------------------------------------------
105
106pub struct KnowledgeBase {
107    pub storage: Storage,
108    embedding: Arc<dyn EmbeddingProvider>,
109    refiner: Arc<dyn Refiner>,
110    distiller: Arc<dyn Distiller>,
111    curator: Arc<dyn Curator>,
112    sanitizer: Arc<dyn Sanitizer>,
113
114    // Tuning params (loaded from meta at init)
115    w_content: f64,
116    w_trigger: f64,
117    w_confidence: f64,
118    top_k_candidates: usize,
119    anti_trigger_penalty: f64,
120    density_refill: bool,
121
122    low_conf_threshold: f64,
123    low_conf_idle_days: i64,
124    repeat_select_min: i64,
125    repeat_select_conf_max: f64,
126    never_used_age_days: i64,
127    open_ttl_days: i64,
128    screening_timeout_minutes: i64,
129    promote_used_success_min: i64,
130    promote_confidence_min: f64,
131    evolve_threshold: i64,
132    distill_batch_size: usize,
133}
134
135impl KnowledgeBase {
136    pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
137        Self::open_with(db_path, None, None, None, None, None)
138    }
139
140    pub fn open_with(
141        db_path: impl AsRef<Path>,
142        embedding: Option<Arc<dyn EmbeddingProvider>>,
143        refiner: Option<Arc<dyn Refiner>>,
144        distiller: Option<Arc<dyn Distiller>>,
145        curator: Option<Arc<dyn Curator>>,
146        sanitizer: Option<Arc<dyn Sanitizer>>,
147    ) -> Result<Self> {
148        let embedding = embedding.unwrap_or_else(|| Arc::new(DummyEmbeddingProvider::default()));
149        let refiner = refiner.unwrap_or_else(|| Arc::new(NullRefiner));
150        let distiller = distiller.unwrap_or_else(|| Arc::new(HeuristicDistiller));
151        let curator = curator.unwrap_or_else(|| Arc::new(BuiltinCurator));
152        let sanitizer = sanitizer.unwrap_or_else(|| Arc::new(DefaultSanitizer));
153
154        let storage = Storage::open(db_path, embedding.content_dim(), embedding.trigger_dim())?;
155
156        let mut kb = Self {
157            storage,
158            embedding,
159            refiner,
160            distiller,
161            curator,
162            sanitizer,
163            w_content: W_CONTENT,
164            w_trigger: W_TRIGGER,
165            w_confidence: W_CONFIDENCE,
166            top_k_candidates: TOP_K_CANDIDATES,
167            anti_trigger_penalty: ANTI_TRIGGER_PENALTY,
168            density_refill: DENSITY_REFILL,
169            low_conf_threshold: LOW_CONF_THRESHOLD,
170            low_conf_idle_days: LOW_CONF_IDLE_DAYS,
171            repeat_select_min: REPEAT_SELECT_MIN,
172            repeat_select_conf_max: REPEAT_SELECT_CONF_MAX,
173            never_used_age_days: NEVER_USED_AGE_DAYS,
174            open_ttl_days: OPEN_TTL_DAYS,
175            screening_timeout_minutes: SCREENING_TIMEOUT_MINUTES,
176            promote_used_success_min: PROMOTE_USED_SUCCESS_MIN,
177            promote_confidence_min: PROMOTE_CONFIDENCE_MIN,
178            evolve_threshold: EVOLVE_THRESHOLD,
179            distill_batch_size: DISTILL_BATCH_SIZE,
180        };
181        kb.init_meta()?;
182        kb.load_params()?;
183        Ok(kb)
184    }
185
186    fn init_meta(&self) -> Result<()> {
187        let lib_id = gen_uuid();
188        let content_dim = self.embedding.content_dim().to_string();
189        let trigger_dim = self.embedding.trigger_dim().to_string();
190        let embed_model = self.embedding.model_name();
191
192        for (key, expected) in [
193            ("content_dim", self.embedding.content_dim()),
194            ("trigger_dim", self.embedding.trigger_dim()),
195        ] {
196            if let Some(stored) = self.storage.get_meta(key)? {
197                let actual = stored.parse::<usize>().map_err(|_| {
198                    InnateError::Other(format!("invalid {key} metadata value: {stored}"))
199                })?;
200                if actual != expected {
201                    return Err(InnateError::Other(format!(
202                        "{key} mismatch: database uses {actual}, embedding provider uses {expected}"
203                    )));
204                }
205            }
206        }
207
208        let defaults: &[(&str, &str)] = &[
209            ("lib_id", &lib_id),
210            ("lib_role", "personal"),
211            ("schema_version", "4.5.1"),
212            ("content_dim", &content_dim),
213            ("trigger_dim", &trigger_dim),
214            ("embed_model", embed_model),
215            ("embed_version", "1"),
216            ("last_agg_ts", "1970-01-01T00:00:00.000Z"),
217            ("recall.w_content", "0.65"),
218            ("recall.w_trigger", "0.25"),
219            ("recall.w_confidence", "0.10"),
220            ("recall.top_k_candidates", "20"),
221            ("recall.anti_trigger_penalty", "0.6"),
222            ("recall.density_refill", "true"),
223            ("curate.low_conf_threshold", "0.25"),
224            ("curate.low_conf_idle_days", "60"),
225            ("curate.repeat_select_min", "10"),
226            ("curate.repeat_select_conf_max", "0.5"),
227            ("curate.never_used_age_days", "30"),
228            ("curate.open_ttl_days", "7"),
229            ("curate.screening_timeout_minutes", "30"),
230            ("curate.promote_used_success_min", "3"),
231            ("curate.promote_confidence_min", "0.65"),
232            ("evolve.threshold_new_count", "5"),
233            ("evolve.distill_batch_size", "20"),
234            ("curate.soft_mature_threshold", "5"),
235        ];
236        self.storage.begin_immediate()?;
237        let result = (|| -> Result<()> {
238            for (k, v) in defaults {
239                if self.storage.get_meta(k)?.is_none() {
240                    self.storage.set_meta(k, v)?;
241                }
242            }
243            self.storage.commit()
244        })();
245        if result.is_err() {
246            let _ = self.storage.rollback();
247        }
248        result
249    }
250
251    fn load_params(&mut self) -> Result<()> {
252        let f = |k: &str, d: f64| -> f64 {
253            self.storage
254                .get_meta(k)
255                .ok()
256                .flatten()
257                .and_then(|v| v.parse().ok())
258                .unwrap_or(d)
259        };
260        let i = |k: &str, d: i64| -> i64 {
261            self.storage
262                .get_meta(k)
263                .ok()
264                .flatten()
265                .and_then(|v| v.parse().ok())
266                .unwrap_or(d)
267        };
268        let b = |k: &str, d: bool| -> bool {
269            self.storage
270                .get_meta(k)
271                .ok()
272                .flatten()
273                .map(|v| v.to_lowercase() == "true")
274                .unwrap_or(d)
275        };
276        self.w_content = f("recall.w_content", W_CONTENT);
277        self.w_trigger = f("recall.w_trigger", W_TRIGGER);
278        self.w_confidence = f("recall.w_confidence", W_CONFIDENCE);
279        self.top_k_candidates = i("recall.top_k_candidates", TOP_K_CANDIDATES as i64) as usize;
280        self.anti_trigger_penalty = f("recall.anti_trigger_penalty", ANTI_TRIGGER_PENALTY);
281        self.density_refill = b("recall.density_refill", DENSITY_REFILL);
282        self.low_conf_threshold = f("curate.low_conf_threshold", LOW_CONF_THRESHOLD);
283        self.low_conf_idle_days = i("curate.low_conf_idle_days", LOW_CONF_IDLE_DAYS);
284        self.repeat_select_min = i("curate.repeat_select_min", REPEAT_SELECT_MIN);
285        self.repeat_select_conf_max = f("curate.repeat_select_conf_max", REPEAT_SELECT_CONF_MAX);
286        self.never_used_age_days = i("curate.never_used_age_days", NEVER_USED_AGE_DAYS);
287        self.open_ttl_days = i("curate.open_ttl_days", OPEN_TTL_DAYS);
288        self.screening_timeout_minutes = i(
289            "curate.screening_timeout_minutes",
290            SCREENING_TIMEOUT_MINUTES,
291        );
292        self.promote_used_success_min =
293            i("curate.promote_used_success_min", PROMOTE_USED_SUCCESS_MIN);
294        self.promote_confidence_min = f("curate.promote_confidence_min", PROMOTE_CONFIDENCE_MIN);
295        self.evolve_threshold = i("evolve.threshold_new_count", EVOLVE_THRESHOLD);
296        self.distill_batch_size =
297            i("evolve.distill_batch_size", DISTILL_BATCH_SIZE as i64) as usize;
298        Ok(())
299    }
300
301    // ------------------------------------------------------------------
302    // Public API 1: recall
303    // ------------------------------------------------------------------
304
305    #[allow(clippy::too_many_arguments)]
306    pub fn recall(
307        &self,
308        query: &str,
309        budget: usize,
310        trace: bool,
311        include_sparks: bool,
312        top: Option<usize>,
313        source: &str,
314        expand_deps: &str, // "false" | "direct" | "closure"
315        allow_trim: bool,  // if true, invoke Refiner::trim when block doesn't fit
316        refine_mode: &str, // "off" | "trim" | "adapt" — recorded in trace
317    ) -> Result<RecallResult> {
318        validate_source(source)?;
319        let trace_id = gen_uuid();
320        let now = utc_now_iso();
321
322        let q_content = self
323            .embedding
324            .embed_content(query)
325            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
326        let q_trigger = self
327            .embedding
328            .embed_trigger(query)
329            .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
330
331        // ANN candidates (non-spark)
332        let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
333        self.apply_soft_dep_bonus(&mut candidates)?;
334
335        // Score + anti-trigger penalty
336        let scored = self.score_candidates(candidates, query);
337
338        // First-fit pack with dep expansion
339        let (selected, skipped, skipped_reasons) =
340            self.pack(&scored, budget, expand_deps, allow_trim, query)?;
341
342        let depth_skipped: Vec<String> = skipped_reasons
343            .iter()
344            .filter(|(_, r)| r.as_str() == "dep_depth_limit")
345            .map(|(id, _)| id.clone())
346            .collect();
347
348        // Density refill
349        let mut selected = selected;
350        if self.density_refill {
351            selected = self.density_refill(selected, &skipped, budget);
352        }
353
354        let limited = limit_knowledge(selected, top);
355        let visible = if refine_mode == "adapt" {
356            self.refiner
357                .refine(limited.clone(), Some(budget))
358                .unwrap_or(limited)
359        } else {
360            limited
361        };
362
363        // Sparks
364        let sparks = if include_sparks {
365            self.recall_sparks(&q_content, &q_trigger)?
366        } else {
367            vec![]
368        };
369
370        if trace {
371            self.write_recall_trace(
372                &trace_id,
373                query,
374                &scored,
375                &visible,
376                &sparks,
377                &depth_skipped,
378                &skipped_reasons,
379                refine_mode,
380                source,
381                &now,
382            )?;
383        }
384
385        let empty = visible.is_empty() && sparks.is_empty();
386        Ok(RecallResult {
387            knowledge: visible,
388            sparks,
389            trace_id,
390            empty,
391            depth_skipped,
392            skipped_reasons,
393        })
394    }
395
396    fn ann_candidates(
397        &self,
398        q_content: &[f32],
399        q_trigger: &[f32],
400    ) -> Result<HashMap<String, CandidateInfo>> {
401        let embed_version = self
402            .storage
403            .get_meta("embed_version")?
404            .and_then(|v| v.parse::<i64>().ok())
405            .unwrap_or(1);
406
407        let content_res = self
408            .storage
409            .search_vec_content(q_content, self.top_k_candidates * 2)?;
410        let trigger_res = self
411            .storage
412            .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
413
414        let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
415
416        for (cid, sim) in &content_res {
417            if let Some(chunk) = self.storage.get_chunk(cid)? {
418                if chunk_is_valid_for_recall(&chunk, embed_version) {
419                    let e = candidates
420                        .entry(cid.clone())
421                        .or_insert_with(|| CandidateInfo {
422                            chunk: chunk.clone(),
423                            sim_content: 0.0,
424                            sim_trigger: 0.0,
425                        });
426                    e.sim_content = e.sim_content.max(*sim);
427                }
428            }
429        }
430        for (cid, sim) in &trigger_res {
431            if let Some(chunk) = self.storage.get_chunk(cid)? {
432                if chunk_is_valid_for_recall(&chunk, embed_version) {
433                    let e = candidates
434                        .entry(cid.clone())
435                        .or_insert_with(|| CandidateInfo {
436                            chunk: chunk.clone(),
437                            sim_content: 0.0,
438                            sim_trigger: 0.0,
439                        });
440                    e.sim_trigger = e.sim_trigger.max(*sim);
441                }
442            }
443        }
444        Ok(candidates)
445    }
446
447    fn apply_soft_dep_bonus(&self, candidates: &mut HashMap<String, CandidateInfo>) -> Result<()> {
448        let ids: Vec<String> = candidates.keys().cloned().collect();
449        for cid in ids {
450            if candidates[&cid].chunk.get("origin").and_then(Value::as_str) == Some("spark") {
451                continue;
452            }
453            let deps = self.storage.get_deps(&cid)?;
454            for (dst, kind, _) in &deps {
455                if kind != "soft" {
456                    continue;
457                }
458                if let Some(target) = self.storage.get_chunk(dst)? {
459                    if target.get("state").and_then(Value::as_str) == Some("archived") {
460                        continue;
461                    }
462                    if target.get("origin").and_then(Value::as_str) == Some("spark") {
463                        continue;
464                    }
465                    let e = candidates
466                        .entry(dst.clone())
467                        .or_insert_with(|| CandidateInfo {
468                            chunk: target,
469                            sim_content: 0.0,
470                            sim_trigger: 0.0,
471                        });
472                    e.sim_content = (e.sim_content + 0.05).min(1.0);
473                }
474            }
475        }
476        Ok(())
477    }
478
479    fn score_candidates(
480        &self,
481        candidates: HashMap<String, CandidateInfo>,
482        query: &str,
483    ) -> Vec<(f64, Value)> {
484        let mut scored: Vec<(f64, Value)> = candidates
485            .into_values()
486            .map(|info| {
487                let conf = info
488                    .chunk
489                    .get("confidence")
490                    .and_then(Value::as_f64)
491                    .unwrap_or(0.5);
492                let mut fused = self.w_content * info.sim_content as f64
493                    + self.w_trigger * info.sim_trigger as f64
494                    + self.w_confidence * conf;
495                let anti = info
496                    .chunk
497                    .get("anti_trigger_desc")
498                    .and_then(Value::as_str)
499                    .unwrap_or("");
500                if !anti.is_empty() && anti_trigger_hit(query, anti) {
501                    fused *= self.anti_trigger_penalty;
502                }
503                let mut chunk = info.chunk;
504                chunk["_fused_score"] = json!(fused);
505                (fused, chunk)
506            })
507            .collect();
508        scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
509        scored.truncate(self.top_k_candidates);
510        scored
511    }
512
513    fn pack(
514        &self,
515        scored: &[(f64, Value)],
516        budget: usize,
517        expand_deps: &str,
518        allow_trim: bool,
519        query: &str,
520    ) -> Result<PackResult> {
521        let mut selected: Vec<Value> = vec![];
522        let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
523        let mut skipped_reasons: HashMap<String, String> = HashMap::new();
524        let mut used_ids: HashSet<String> = HashSet::new();
525        let mut used_tokens: usize = 0;
526
527        for (fused, chunk) in scored {
528            let cid = chunk["id"].as_str().unwrap_or("").to_string();
529            if used_ids.contains(&cid) {
530                continue;
531            }
532
533            // Build block with dep expansion; fail-closed on dep issues.
534            let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
535            if let Some(reason) = dep_skip_reason {
536                skipped_reasons.insert(cid, reason);
537                continue;
538            }
539
540            let new_block: Vec<Value> = block
541                .iter()
542                .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
543                .cloned()
544                .collect();
545            let cost = block_cost(&new_block);
546
547            if used_tokens + cost <= budget {
548                for b in &block {
549                    let bid = b["id"].as_str().unwrap_or("").to_string();
550                    if !used_ids.contains(&bid) {
551                        let mut b = b.clone();
552                        b["_fused_score"] = json!(fused);
553                        selected.push(b);
554                        used_ids.insert(bid);
555                    }
556                }
557                used_tokens += cost;
558            } else if allow_trim {
559                // Attempt refiner trim — NullRefiner returns None (no-op).
560                if let Some(trimmed) =
561                    self.refiner
562                        .trim(&block, query, budget.saturating_sub(used_tokens))
563                {
564                    let trim_cost = block_cost(&trimmed);
565                    if used_tokens + trim_cost <= budget {
566                        for b in &trimmed {
567                            let bid = b["id"].as_str().unwrap_or("").to_string();
568                            if !used_ids.contains(&bid) {
569                                let mut b = b.clone();
570                                b["_fused_score"] = json!(fused);
571                                b["_trimmed"] = json!(true);
572                                selected.push(b);
573                                used_ids.insert(bid);
574                            }
575                        }
576                        used_tokens += trim_cost;
577                        continue;
578                    }
579                }
580                skipped.push((block, *fused, cost));
581            } else {
582                skipped.push((block, *fused, cost));
583            }
584        }
585        Ok((selected, skipped, skipped_reasons))
586    }
587
588    /// Expand a seed chunk into a block according to `expand_deps`.
589    /// Returns `(block, Some(skip_reason))` if the block should be discarded (fail-closed).
590    fn build_dep_block(
591        &self,
592        seed: &Value,
593        expand_deps: &str,
594    ) -> Result<(Vec<Value>, Option<String>)> {
595        if expand_deps == "false" || expand_deps.is_empty() {
596            return Ok((vec![seed.clone()], None));
597        }
598        let seed_id = seed["id"].as_str().unwrap_or("");
599        match expand_deps {
600            "direct" => {
601                let deps = self.storage.get_deps(seed_id)?;
602                let mut block = vec![seed.clone()];
603                for (dep_id, kind, _) in &deps {
604                    if kind != "hard" {
605                        continue;
606                    }
607                    match self.validate_hard_dep(dep_id)? {
608                        Some(chunk) => block.push(chunk),
609                        None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
610                    }
611                }
612                Ok((block, None))
613            }
614            "closure" => {
615                let mut block = vec![seed.clone()];
616                let mut visited: HashSet<String> = [seed_id.to_string()].into();
617                match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
618                    Some(reason) => Ok((vec![], Some(reason))),
619                    None => Ok((block, None)),
620                }
621            }
622            _ => Ok((vec![seed.clone()], None)),
623        }
624    }
625
626    /// Returns the chunk if the hard dep is usable, None if it should cause fail-closed.
627    fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
628        match self.storage.get_chunk(dep_id)? {
629            None => Ok(None),
630            Some(chunk) => {
631                let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
632                let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
633                let embed_v = chunk
634                    .get("embed_version")
635                    .and_then(Value::as_i64)
636                    .unwrap_or(0);
637                if state == "archived" || origin == "spark" || embed_v == 0 {
638                    Ok(None)
639                } else {
640                    Ok(Some(chunk))
641                }
642            }
643        }
644    }
645
646    /// BFS hard-dep expansion up to `max_depth`. Returns Some(reason) on fail-closed.
647    fn expand_hard_closure(
648        &self,
649        id: &str,
650        visited: &mut HashSet<String>,
651        block: &mut Vec<Value>,
652        depth: usize,
653        max_depth: usize,
654    ) -> Result<Option<String>> {
655        if depth >= max_depth {
656            return Ok(Some("dep_depth_limit".to_string()));
657        }
658        let deps = self.storage.get_deps(id)?;
659        for (dep_id, kind, _) in &deps {
660            if kind != "hard" {
661                continue;
662            }
663            if visited.contains(dep_id) {
664                continue;
665            } // cycle guard
666            visited.insert(dep_id.clone());
667            match self.validate_hard_dep(dep_id)? {
668                None => return Ok(Some("hard_dep_unavailable".to_string())),
669                Some(chunk) => {
670                    block.push(chunk);
671                    if let Some(reason) =
672                        self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
673                    {
674                        return Ok(Some(reason));
675                    }
676                }
677            }
678        }
679        Ok(None)
680    }
681
682    fn density_refill(
683        &self,
684        mut selected: Vec<Value>,
685        skipped: &[(Vec<Value>, f64, usize)],
686        budget: usize,
687    ) -> Vec<Value> {
688        let used_tokens = block_cost(&selected);
689        if used_tokens >= budget {
690            return selected;
691        }
692
693        let selected_ids: HashSet<String> = selected
694            .iter()
695            .filter_map(|c| c["id"].as_str().map(str::to_string))
696            .collect();
697
698        let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
699            .iter()
700            .filter_map(|(block, fscore, _)| {
701                let block: Vec<Value> = block
702                    .iter()
703                    .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
704                    .cloned()
705                    .collect();
706                if block.is_empty() {
707                    return None;
708                }
709                let cost = block_cost(&block);
710                let density = fscore / cost.max(1) as f64;
711                Some((density, block, cost))
712            })
713            .collect();
714        density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
715
716        let mut used_tokens = block_cost(&selected);
717        let mut added_ids: HashSet<String> = selected_ids;
718        for (_, block, cost) in density_items {
719            if used_tokens + cost <= budget {
720                for b in block {
721                    let bid = b["id"].as_str().unwrap_or("").to_string();
722                    if !added_ids.contains(&bid) {
723                        selected.push(b);
724                        added_ids.insert(bid);
725                    }
726                }
727                used_tokens += cost;
728            }
729        }
730        selected
731    }
732
733    fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
734        let embed_version = self
735            .storage
736            .get_meta("embed_version")?
737            .and_then(|v| v.parse::<i64>().ok())
738            .unwrap_or(1);
739
740        let content_res = self
741            .storage
742            .search_vec_content(q_content, self.top_k_candidates)?;
743        let trigger_res = self
744            .storage
745            .search_vec_trigger(q_trigger, self.top_k_candidates)?;
746
747        let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
748        for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
749            if let Some(chunk) = self.storage.get_chunk(cid)? {
750                if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
751                    continue;
752                }
753                if chunk.get("state").and_then(Value::as_str) == Some("archived") {
754                    continue;
755                }
756                let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
757                if maturity == "promoted" || maturity == "dropped" {
758                    continue;
759                }
760                let ev = chunk
761                    .get("embed_version")
762                    .and_then(Value::as_i64)
763                    .unwrap_or(1);
764                if ev < embed_version {
765                    continue;
766                }
767                let entry = spark_scores
768                    .entry(cid.clone())
769                    .or_insert_with(|| (*sim, chunk.clone()));
770                if *sim > entry.0 {
771                    *entry = (*sim, chunk);
772                }
773            }
774        }
775        let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
776        sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
777        Ok(sparks
778            .into_iter()
779            .take(self.top_k_candidates)
780            .map(|(_, c)| c)
781            .collect())
782    }
783
784    #[allow(clippy::too_many_arguments)]
785    fn write_recall_trace(
786        &self,
787        trace_id: &str,
788        query: &str,
789        scored: &[(f64, Value)],
790        visible: &[Value],
791        sparks: &[Value],
792        depth_skipped: &[String],
793        skipped_reasons: &HashMap<String, String>,
794        refine_mode: &str,
795        source: &str,
796        now: &str,
797    ) -> Result<()> {
798        let lib_id = self.storage.lib_id()?;
799        self.storage.begin_immediate()?;
800        let result = (|| -> Result<()> {
801            for (rank, (_, chunk)) in scored.iter().enumerate() {
802                let cid = chunk["id"].as_str().unwrap_or("");
803                let sim = chunk.get("_fused_score").and_then(Value::as_f64);
804                // For dep-skipped seeds, record their skip reason as refine_mode.
805                let rm = skipped_reasons
806                    .get(cid)
807                    .map(|r| format!("skipped:{r}"))
808                    .or_else(|| {
809                        if refine_mode != "off" && !refine_mode.is_empty() {
810                            Some(refine_mode.to_string())
811                        } else {
812                            None
813                        }
814                    });
815                self.storage.insert_usage_trace(
816                    trace_id,
817                    Some(cid),
818                    "retrieved",
819                    1.0,
820                    sim,
821                    rm.as_deref(),
822                    None,
823                    Some((rank + 1) as i64),
824                    source,
825                    now,
826                )?;
827            }
828            for (rank, chunk) in visible.iter().enumerate() {
829                let cid = chunk["id"].as_str().unwrap_or("");
830                self.storage.insert_usage_trace(
831                    trace_id,
832                    Some(cid),
833                    "selected",
834                    1.0,
835                    None,
836                    None,
837                    None,
838                    Some((rank + 1) as i64),
839                    source,
840                    now,
841                )?;
842                // Write 'refined' event for chunks that came through the trim path.
843                if chunk
844                    .get("_trimmed")
845                    .and_then(Value::as_bool)
846                    .unwrap_or(false)
847                {
848                    self.storage.insert_usage_trace(
849                        trace_id,
850                        Some(cid),
851                        "refined",
852                        1.0,
853                        None,
854                        Some("trim"),
855                        None,
856                        Some((rank + 1) as i64),
857                        source,
858                        now,
859                    )?;
860                }
861            }
862            // Write 'retrieved' events for sparks (for recurring-spark count tracking).
863            for (rank, chunk) in sparks.iter().enumerate() {
864                let cid = chunk["id"].as_str().unwrap_or("");
865                self.storage.insert_usage_trace(
866                    trace_id,
867                    Some(cid),
868                    "retrieved",
869                    1.0,
870                    None,
871                    Some("spark"),
872                    None,
873                    Some((rank + 1) as i64),
874                    source,
875                    now,
876                )?;
877            }
878            let snapshot = json!({
879                "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
880                "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
881                "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
882                "depth_skipped": depth_skipped,
883                "skipped_reasons": skipped_reasons,
884            });
885            let log = EpisodicLogRow {
886                id: gen_uuid(),
887                trace_id: trace_id.to_string(),
888                lib_id,
889                ts: now.to_string(),
890                query: Some(query.to_string()),
891                recall_snapshot: Some(snapshot.to_string()),
892                event_source: source.to_string(),
893                distill_state: "open".to_string(),
894                ..Default::default()
895            };
896            self.storage.upsert_episodic_log(&log)?;
897            self.storage.commit()
898        })();
899        if result.is_err() {
900            let _ = self.storage.rollback();
901        }
902        result
903    }
904
905    // ------------------------------------------------------------------
906    // Public API 2: record
907    // ------------------------------------------------------------------
908
909    #[allow(clippy::too_many_arguments)]
910    pub fn record(
911        &self,
912        trace_id: &str,
913        query: Option<&str>,
914        output: Option<&str>,
915        output_summary: Option<&str>,
916        outcome: Option<&str>,
917        used: Option<&[String]>,
918        feedback_up: Option<&[String]>,
919        feedback_down: Option<&[String]>,
920        nomination: Option<&str>,
921        priority: i64,
922        source: &str,
923    ) -> Result<()> {
924        if let Some(o) = outcome {
925            if !matches!(o, "ok" | "fail" | "unknown") {
926                return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
927            }
928        }
929        validate_source(source)?;
930        let effective_priority = if nomination.is_some() && priority == 0 {
931            1
932        } else {
933            priority
934        };
935        let now = utc_now_iso();
936        let lib_id = self.storage.lib_id()?;
937
938        self.storage.begin_immediate()?;
939        let result = (|| -> Result<()> {
940            let log = self.storage.get_episodic_log(trace_id)?;
941            let mut is_fresh_insert = false;
942            let log = match log {
943                Some(l) => l,
944                None => {
945                    let row = EpisodicLogRow {
946                        id: gen_uuid(),
947                        trace_id: trace_id.to_string(),
948                        lib_id,
949                        ts: now.clone(),
950                        query: query.map(str::to_string).or_else(|| Some(String::new())),
951                        output: output.map(str::to_string),
952                        output_summary: output_summary.map(str::to_string),
953                        outcome: outcome.map(str::to_string),
954                        event_source: source.to_string(),
955                        nomination: nomination.map(str::to_string),
956                        priority: effective_priority,
957                        distill_state: "open".to_string(),
958                        ..Default::default()
959                    };
960                    self.storage.upsert_episodic_log(&row)?;
961                    is_fresh_insert = true;
962                    self.storage.get_episodic_log(trace_id)?.unwrap()
963                }
964            };
965
966            let existing_outcome = log
967                .get("outcome")
968                .and_then(Value::as_str)
969                .map(str::to_string);
970            if let Some(new_outcome) = outcome {
971                if let Some(ref ex) = existing_outcome {
972                    if ex != new_outcome {
973                        return Err(InnateError::OutcomeConflict {
974                            trace_id: trace_id.to_string(),
975                            existing: ex.clone(),
976                            requested: new_outcome.to_string(),
977                        });
978                    }
979                }
980            }
981
982            // usage_trace: used
983            if let Some(used_ids) = used {
984                for cid in used_ids {
985                    self.storage.insert_usage_trace(
986                        trace_id,
987                        Some(cid),
988                        "used",
989                        0.3,
990                        None,
991                        None,
992                        None,
993                        None,
994                        source,
995                        &now,
996                    )?;
997                    self.storage.update_chunk_last_used(cid, &now)?;
998                }
999            }
1000
1001            // usage_trace: task_ok / task_fail
1002            if let Some(o) = outcome {
1003                if matches!(o, "ok" | "fail") {
1004                    let event = if o == "ok" { "task_ok" } else { "task_fail" };
1005                    let strength = if event == "task_fail" { 0.15 } else { 1.0 };
1006                    self.storage.insert_usage_trace(
1007                        trace_id, None, event, strength, None, None, None, None, source, &now,
1008                    )?;
1009                }
1010            }
1011
1012            // confidence implicit update
1013            if let Some(o) = outcome {
1014                if is_fresh_insert || existing_outcome.is_none() {
1015                    self.apply_outcome_implicit(trace_id, o, used, &now)?;
1016                }
1017            }
1018
1019            // feedback
1020            if let Some(ups) = feedback_up {
1021                for cid in ups {
1022                    self.update_confidence(cid, 1.0, 1.0, "user_up", &now, true)?;
1023                    self.storage.update_chunk_last_used(cid, &now)?;
1024                }
1025            }
1026            if let Some(downs) = feedback_down {
1027                for cid in downs {
1028                    self.update_confidence(cid, 0.0, 1.0, "user_down", &now, true)?;
1029                }
1030            }
1031
1032            // Fill in content fields (補写: output_summary, nomination, output, query) on existing log.
1033            if !is_fresh_insert {
1034                self.storage.patch_episodic_log_content(
1035                    trace_id,
1036                    query,
1037                    output,
1038                    output_summary,
1039                    nomination,
1040                    effective_priority,
1041                )?;
1042            }
1043
1044            // Update episodic log
1045            let current_state = log
1046                .get("distill_state")
1047                .and_then(Value::as_str)
1048                .unwrap_or("open");
1049            let outcome_completed = outcome.is_some() || existing_outcome.is_some();
1050            let new_state = if current_state == "open" && outcome_completed {
1051                let has_material = output_summary.is_some()
1052                    || nomination.is_some()
1053                    || output.is_some()
1054                    || log.get("output_summary").and_then(Value::as_str).is_some()
1055                    || log.get("nomination").and_then(Value::as_str).is_some()
1056                    || log.get("output").and_then(Value::as_str).is_some()
1057                    || (used.map(|u| !u.is_empty()).unwrap_or(false)
1058                        && outcome.map(|o| o != "unknown").unwrap_or(false));
1059                if has_material {
1060                    Some("new")
1061                } else {
1062                    Some("discarded")
1063                }
1064            } else {
1065                None
1066            };
1067            if let Some(state) = new_state {
1068                let note = if state == "discarded" {
1069                    Some("insufficient_material")
1070                } else {
1071                    None
1072                };
1073                let outcome_str = outcome.map(str::to_string);
1074                self.storage.update_episodic_log_state(
1075                    trace_id,
1076                    state,
1077                    note,
1078                    outcome_str.as_deref(),
1079                )?;
1080            } else if outcome.is_some() {
1081                let outcome_str = outcome.map(str::to_string);
1082                self.storage.update_episodic_log_state(
1083                    trace_id,
1084                    current_state,
1085                    None,
1086                    outcome_str.as_deref(),
1087                )?;
1088            }
1089
1090            self.storage.commit()
1091        })();
1092        if result.is_err() {
1093            let _ = self.storage.rollback();
1094        }
1095        result
1096    }
1097
1098    fn apply_outcome_implicit(
1099        &self,
1100        trace_id: &str,
1101        outcome: &str,
1102        used: Option<&[String]>,
1103        now: &str,
1104    ) -> Result<()> {
1105        let used_set: HashSet<&str> = used
1106            .map(|u| u.iter().map(String::as_str).collect())
1107            .unwrap_or_default();
1108        let (target, strength, reason) = if outcome == "ok" {
1109            (1.0, 0.3, "agent_used")
1110        } else {
1111            (0.0, 0.15, "task_fail")
1112        };
1113        for cid in &used_set {
1114            self.update_confidence(cid, target, strength, reason, now, false)?;
1115        }
1116        // selected but not used: very weak downgrade (implicit)
1117        let selected_rows = self.storage.query_chunks_params(
1118            "SELECT chunk_id FROM usage_trace WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1119            rusqlite::params![trace_id],
1120        )?;
1121        for row in selected_rows {
1122            if let Some(cid) = row.get("chunk_id").and_then(Value::as_str) {
1123                if !used_set.contains(cid) {
1124                    self.update_confidence(cid, 0.3, 0.1, "selected_unused", now, false)?;
1125                }
1126            }
1127        }
1128        Ok(())
1129    }
1130
1131    /// Update confidence via EMA.
1132    /// `explicit=true` → user_up/user_down/judge signals; applies recency_w (κ=0.5, W=14d).
1133    /// `explicit=false` → implicit/agent signals; recency_w ≡ 1.
1134    fn update_confidence(
1135        &self,
1136        chunk_id: &str,
1137        target: f64,
1138        strength: f64,
1139        reason: &str,
1140        now: &str,
1141        explicit: bool,
1142    ) -> Result<()> {
1143        let chunk = match self.storage.get_chunk(chunk_id)? {
1144            Some(c) => c,
1145            None => return Ok(()),
1146        };
1147        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1148            return Ok(());
1149        }
1150        let conf = chunk
1151            .get("confidence")
1152            .and_then(Value::as_f64)
1153            .unwrap_or(0.5);
1154
1155        // §二·五B v3.8: recency_w only for explicit signals.
1156        let recency_w = if explicit {
1157            const KAPPA: f64 = 0.5;
1158            const W_DAYS: f64 = 14.0;
1159            let gap_days = chunk
1160                .get("last_used_at")
1161                .and_then(Value::as_str)
1162                .map(|t| iso_days_diff(now, t) as f64)
1163                .unwrap_or(0.0);
1164            (1.0 + KAPPA * (-(gap_days / W_DAYS) * std::f64::consts::LN_2).exp()).min(1.5)
1165        } else {
1166            1.0
1167        };
1168
1169        let alpha = 0.2_f64;
1170        let effective_alpha = (alpha * strength * recency_w).min(1.0);
1171        let new_conf = (conf + effective_alpha * (target - conf)).clamp(0.0, 1.0);
1172        self.storage
1173            .update_chunk_confidence(chunk_id, new_conf, Some(reason), now)?;
1174        Ok(())
1175    }
1176
1177    // ------------------------------------------------------------------
1178    // Public API 3: add
1179    // ------------------------------------------------------------------
1180
1181    pub fn add(
1182        &self,
1183        content: &str,
1184        kind: &str,
1185        trigger_desc: Option<&str>,
1186        anti_trigger_desc: Option<&str>,
1187        source: &str,
1188        skill_name: Option<&str>,
1189    ) -> Result<String> {
1190        if !matches!(kind, "note" | "skill") {
1191            return Err(InnateError::InvalidState(format!("invalid kind: {kind}")));
1192        }
1193        if !matches!(source, "chat" | "manual" | "doc" | "agent") {
1194            return Err(InnateError::InvalidState(format!(
1195                "invalid source: {source}"
1196            )));
1197        }
1198
1199        let (content, action) = self.sanitize_content(content);
1200        if action == SanitizeAction::Discard {
1201            return Ok(String::new());
1202        }
1203
1204        let trigger_clean = trigger_desc.and_then(|t| {
1205            let (cleaned, act) = self.sanitizer.sanitize(t);
1206            if act == SanitizeAction::Discard {
1207                None
1208            } else {
1209                Some(cleaned)
1210            }
1211        });
1212        let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
1213            let (cleaned, act) = self.sanitizer.sanitize(t);
1214            if act == SanitizeAction::Discard {
1215                None
1216            } else {
1217                Some(cleaned)
1218            }
1219        });
1220
1221        let h = content_hash(&content);
1222        if self.storage.is_hash_invalidated(&h)? {
1223            return Err(InnateError::InvalidState(
1224                "content hash is invalidated".into(),
1225            ));
1226        }
1227
1228        // Idempotency check
1229        let existing = self.storage.query_chunks_params(
1230            "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
1231            rusqlite::params![h],
1232        )?;
1233        if let Some(e) = existing.first() {
1234            if let Some(id) = e.get("id").and_then(Value::as_str) {
1235                return Ok(id.to_string());
1236            }
1237        }
1238
1239        let now = utc_now_iso();
1240        let chunk_id = gen_uuid();
1241        let redacted = action == SanitizeAction::Redact;
1242
1243        let (origin, state, conf, prot, init_state_reason) = if source == "agent" {
1244            (
1245                "captured",
1246                "pending",
1247                if redacted { 0.4 } else { 0.60 },
1248                0,
1249                "init:captured_agent",
1250            )
1251        } else if kind == "skill" {
1252            (
1253                "installed",
1254                "active",
1255                if redacted { 0.4 } else { 0.85 },
1256                1,
1257                "init:installed",
1258            )
1259        } else {
1260            (
1261                "captured",
1262                "active",
1263                if redacted { 0.4 } else { 0.60 },
1264                0,
1265                "init:captured",
1266            )
1267        };
1268
1269        // Embedding — fall back to embedding_pending on failure.
1270        let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
1271        let (cvec, tvec, embed_ver, final_state_reason) = match (
1272            self.embedding.embed_content(&content),
1273            self.embedding.embed_trigger(trigger_str),
1274        ) {
1275            (Ok(cv), Ok(tv)) => (cv, tv, 1i64, init_state_reason.to_string()),
1276            _ => (
1277                vec![],
1278                vec![],
1279                0i64,
1280                format!("embedding_pending:target={state}"),
1281            ),
1282        };
1283
1284        let tokens = estimate_tokens(&content) as i64;
1285        let row = ChunkRow {
1286            id: chunk_id.clone(),
1287            skill_name: skill_name.map(str::to_string),
1288            content: content.clone(),
1289            trigger_desc: trigger_clean.clone(),
1290            anti_trigger_desc: anti_trigger_clean.clone(),
1291            content_hash: h,
1292            token_count: Some(tokens),
1293            origin: origin.to_string(),
1294            source: Some(source.to_string()),
1295            protected: prot,
1296            state: state.to_string(),
1297            state_reason: Some(final_state_reason),
1298            confidence: conf,
1299            confidence_reason: Some(format!("init:{origin}")),
1300            version: 1,
1301            embed_version: embed_ver,
1302            created_at: now.clone(),
1303            updated_at: now.clone(),
1304            ..Default::default()
1305        };
1306
1307        self.storage.begin_immediate()?;
1308        let result = (|| -> Result<()> {
1309            self.storage.insert_chunk(&row)?;
1310            if embed_ver > 0 {
1311                self.storage
1312                    .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
1313                self.storage
1314                    .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
1315            }
1316            self.storage.commit()
1317        })();
1318        if result.is_err() {
1319            let _ = self.storage.rollback();
1320        }
1321        result?;
1322        Ok(chunk_id)
1323    }
1324
1325    // ------------------------------------------------------------------
1326    // Public API 4: spark
1327    // ------------------------------------------------------------------
1328
1329    pub fn spark(
1330        &self,
1331        content: &str,
1332        trigger_desc: Option<&str>,
1333        anti_trigger_desc: Option<&str>,
1334    ) -> Result<String> {
1335        let (content, action) = self.sanitize_content(content);
1336        if action == SanitizeAction::Discard {
1337            return Ok(String::new());
1338        }
1339
1340        let trigger_clean = trigger_desc.and_then(|t| {
1341            let (cleaned, act) = self.sanitizer.sanitize(t);
1342            if act == SanitizeAction::Discard {
1343                None
1344            } else {
1345                Some(cleaned)
1346            }
1347        });
1348        let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
1349            let (cleaned, act) = self.sanitizer.sanitize(t);
1350            if act == SanitizeAction::Discard {
1351                None
1352            } else {
1353                Some(cleaned)
1354            }
1355        });
1356
1357        let h = content_hash(&content);
1358        if self.storage.is_hash_invalidated(&h)? {
1359            return Err(InnateError::InvalidState(
1360                "content hash is invalidated".into(),
1361            ));
1362        }
1363
1364        // Quick related recall (trace=false, no recursion risk)
1365        let related: Vec<String> = self
1366            .recall(
1367                &content,
1368                2000,
1369                false,
1370                false,
1371                Some(5),
1372                "sdk",
1373                "false",
1374                false,
1375                "off",
1376            )
1377            .map(|r| {
1378                r.knowledge
1379                    .iter()
1380                    .filter_map(|c| c["id"].as_str().map(str::to_string))
1381                    .collect()
1382            })
1383            .unwrap_or_default();
1384
1385        let now = utc_now_iso();
1386        let chunk_id = gen_uuid();
1387        let tokens = estimate_tokens(&content) as i64;
1388
1389        let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
1390        let (cvec, tvec, embed_ver, state_reason) = match (
1391            self.embedding.embed_content(&content),
1392            self.embedding.embed_trigger(trigger_str),
1393        ) {
1394            (Ok(cv), Ok(tv)) => (cv, tv, 1i64, "init:spark".to_string()),
1395            _ => (
1396                vec![],
1397                vec![],
1398                0i64,
1399                "embedding_pending:target=active".to_string(),
1400            ),
1401        };
1402
1403        let row = ChunkRow {
1404            id: chunk_id.clone(),
1405            content: content.clone(),
1406            trigger_desc: trigger_clean.clone(),
1407            anti_trigger_desc: anti_trigger_clean.clone(),
1408            content_hash: h,
1409            token_count: Some(tokens),
1410            origin: "spark".to_string(),
1411            maturity: Some("seed".to_string()),
1412            related_ids: if related.is_empty() {
1413                None
1414            } else {
1415                Some(related.join(","))
1416            },
1417            state: "active".to_string(),
1418            state_reason: Some(state_reason),
1419            confidence: 0.5,
1420            version: 1,
1421            embed_version: embed_ver,
1422            created_at: now.clone(),
1423            updated_at: now.clone(),
1424            ..Default::default()
1425        };
1426
1427        self.storage.begin_immediate()?;
1428        let result = (|| -> Result<()> {
1429            self.storage.insert_chunk(&row)?;
1430            if embed_ver > 0 {
1431                self.storage
1432                    .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
1433                self.storage
1434                    .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
1435            }
1436            self.storage.commit()
1437        })();
1438        if result.is_err() {
1439            let _ = self.storage.rollback();
1440        }
1441        result?;
1442        Ok(chunk_id)
1443    }
1444
1445    // ------------------------------------------------------------------
1446    // Public API 5: mature_spark / promote_spark / drop_spark
1447    // ------------------------------------------------------------------
1448
1449    pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
1450        let chunk = self
1451            .storage
1452            .get_chunk(spark_id)?
1453            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
1454        if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
1455            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
1456        }
1457        let current = chunk
1458            .get("maturity")
1459            .and_then(Value::as_str)
1460            .unwrap_or("seed");
1461        let valid_next: &[&str] = match current {
1462            "seed" => &["sprouting"],
1463            "sprouting" => &["incubating"],
1464            _ => {
1465                return Err(InnateError::InvalidState(format!(
1466                    "spark {spark_id} already {current}"
1467                )))
1468            }
1469        };
1470        if current == to {
1471            return Ok(());
1472        }
1473        if !valid_next.contains(&to) {
1474            return Err(InnateError::InvalidState(format!(
1475                "invalid spark maturity transition: {current} -> {to}"
1476            )));
1477        }
1478        let now = utc_now_iso();
1479        self.storage.begin_immediate()?;
1480        let result = self
1481            .storage
1482            .query_chunks_params(
1483                "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
1484                rusqlite::params![to, now, spark_id],
1485            )
1486            .and_then(|_| self.storage.commit());
1487        if result.is_err() {
1488            let _ = self.storage.rollback();
1489        }
1490        result.map(|_| ())
1491    }
1492
1493    pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
1494        let spark = self
1495            .storage
1496            .get_chunk(spark_id)?
1497            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
1498        if spark.get("origin").and_then(Value::as_str) != Some("spark") {
1499            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
1500        }
1501        let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
1502        if maturity == "promoted" || maturity == "dropped" {
1503            return Err(InnateError::InvalidState(format!(
1504                "spark {spark_id} already {maturity}"
1505            )));
1506        }
1507        if !matches!(to, "note" | "skill") {
1508            return Err(InnateError::InvalidState(format!(
1509                "invalid spark promotion target: {to}"
1510            )));
1511        }
1512
1513        let content = spark.get("content").and_then(Value::as_str).unwrap_or("");
1514        let (content, action) = self.sanitize_content(content);
1515        if action == SanitizeAction::Discard {
1516            return Err(InnateError::InvalidState(
1517                "sanitize discard on promote".into(),
1518            ));
1519        }
1520
1521        let promoted_hash = content_hash(&content);
1522        if self.storage.is_hash_invalidated(&promoted_hash)? {
1523            return Err(InnateError::InvalidState(
1524                "spark content hash is invalidated".into(),
1525            ));
1526        }
1527
1528        let now = utc_now_iso();
1529
1530        // Idempotency: existing non-spark chunk with same hash
1531        let existing = self.storage.query_chunks_params(
1532            "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
1533            rusqlite::params![promoted_hash],
1534        )?;
1535        if let Some(e) = existing.first() {
1536            if let Some(id) = e.get("id").and_then(Value::as_str) {
1537                let id = id.to_string();
1538                self.storage.query_chunks_params(
1539                    "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
1540                    rusqlite::params![now, spark_id],
1541                )?;
1542                self.storage.commit()?;
1543                return Ok(id);
1544            }
1545        }
1546
1547        let (state, conf, prot, origin, state_reason) = if to == "skill" {
1548            ("active", 0.85, 1, "installed", "init:installed")
1549        } else {
1550            ("active", 0.60, 0, "captured", "init:captured")
1551        };
1552
1553        let conf = if action == SanitizeAction::Redact {
1554            0.4_f64
1555        } else {
1556            conf
1557        };
1558        let new_id = gen_uuid();
1559        let trigger = spark.get("trigger_desc").and_then(Value::as_str);
1560        let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
1561
1562        let row = ChunkRow {
1563            id: new_id.clone(),
1564            content: content.clone(),
1565            trigger_desc: trigger.map(str::to_string),
1566            anti_trigger_desc: anti.map(str::to_string),
1567            content_hash: promoted_hash,
1568            token_count: Some(estimate_tokens(&content) as i64),
1569            origin: origin.to_string(),
1570            source: Some("manual".to_string()),
1571            protected: prot,
1572            state: state.to_string(),
1573            state_reason: Some(state_reason.to_string()),
1574            confidence: conf,
1575            confidence_reason: Some("manual_set".to_string()),
1576            parent_id: Some(spark_id.to_string()),
1577            version: 1,
1578            embed_version: 1,
1579            created_at: now.clone(),
1580            updated_at: now.clone(),
1581            ..Default::default()
1582        };
1583
1584        let cvec = self.embedding.embed_content(&content)?;
1585        let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;
1586
1587        self.storage.begin_immediate()?;
1588        let result = (|| -> Result<()> {
1589            self.storage.insert_chunk(&row)?;
1590            self.storage
1591                .insert_vec_content(&new_id, &pack_embedding(&cvec))?;
1592            self.storage
1593                .insert_vec_trigger(&new_id, &pack_embedding(&tvec))?;
1594            self.storage.query_chunks_params(
1595                "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
1596                rusqlite::params![now, spark_id],
1597            )?;
1598            self.storage.commit()
1599        })();
1600        if result.is_err() {
1601            let _ = self.storage.rollback();
1602        }
1603        result?;
1604        Ok(new_id)
1605    }
1606
1607    pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
1608        let spark = self
1609            .storage
1610            .get_chunk(spark_id)?
1611            .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
1612        if spark.get("origin").and_then(Value::as_str) != Some("spark") {
1613            return Err(InnateError::ChunkNotFound(spark_id.to_string()));
1614        }
1615        let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
1616        if maturity == "promoted" {
1617            return Err(InnateError::InvalidState(format!(
1618                "spark {spark_id} already promoted"
1619            )));
1620        }
1621        if maturity == "dropped" {
1622            return Ok(());
1623        }
1624        let now = utc_now_iso();
1625        let reason_str = if reason.is_empty() {
1626            "dropped".to_string()
1627        } else {
1628            format!("dropped:{reason}")
1629        };
1630        self.storage.begin_immediate()?;
1631        let result = self
1632            .storage
1633            .query_chunks_params(
1634                "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
1635                rusqlite::params![reason_str, now, spark_id],
1636            )
1637            .and_then(|_| self.storage.commit());
1638        if result.is_err() {
1639            let _ = self.storage.rollback();
1640        }
1641        result.map(|_| ())
1642    }
1643
1644    // ------------------------------------------------------------------
1645    // Public API 6: approve / archive / invalidate / restore
1646    // ------------------------------------------------------------------
1647
1648    pub fn approve(&self, chunk_id: &str) -> Result<()> {
1649        let chunk = self
1650            .storage
1651            .get_chunk(chunk_id)?
1652            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1653        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1654            return Err(InnateError::InvalidState(
1655                "spark lifecycle uses promote_spark() or invalidate()".into(),
1656            ));
1657        }
1658        if chunk.get("state").and_then(Value::as_str) == Some("active") {
1659            return Ok(());
1660        }
1661        if chunk.get("state").and_then(Value::as_str) != Some("pending") {
1662            return Err(InnateError::InvalidState(
1663                "approve requires pending chunk".into(),
1664            ));
1665        }
1666        let now = utc_now_iso();
1667        self.storage.begin_immediate()?;
1668        let result = (|| -> Result<()> {
1669            self.storage
1670                .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
1671            self.storage.query_chunks_params(
1672                "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
1673                rusqlite::params![now, chunk_id],
1674            )?;
1675            self.storage.commit()
1676        })();
1677        if result.is_err() {
1678            let _ = self.storage.rollback();
1679        }
1680        result
1681    }
1682
1683    pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
1684        let chunk = self
1685            .storage
1686            .get_chunk(chunk_id)?
1687            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1688        if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1689            return Err(InnateError::InvalidState(
1690                "spark lifecycle uses drop_spark() or invalidate()".into(),
1691            ));
1692        }
1693        let now = utc_now_iso();
1694        self.storage.begin_immediate()?;
1695        let result = self
1696            .storage
1697            .update_chunk_state(chunk_id, "archived", Some(reason), &now)
1698            .and_then(|_| self.storage.commit());
1699        if result.is_err() {
1700            let _ = self.storage.rollback();
1701        }
1702        result
1703    }
1704
1705    pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
1706        let chunk = self
1707            .storage
1708            .get_chunk(chunk_id)?
1709            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1710        let h = chunk
1711            .get("content_hash")
1712            .and_then(Value::as_str)
1713            .unwrap_or("")
1714            .to_string();
1715        let now = utc_now_iso();
1716        let reason_str = if reason.is_empty() {
1717            "invalidated".to_string()
1718        } else {
1719            format!("invalidated:{reason}")
1720        };
1721
1722        self.storage.begin_immediate()?;
1723        let result = (|| -> Result<()> {
1724            self.storage.query_chunks_params(
1725                "UPDATE chunks SET state='archived', confidence=0.0, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
1726                rusqlite::params![reason_str, now, now, chunk_id],
1727            )?;
1728            self.storage.query_chunks_params(
1729                "UPDATE chunks SET state='archived', confidence=0.0, state_reason='invalidated:same_hash', state_updated_at=?, updated_at=? WHERE content_hash=? AND id!=?",
1730                rusqlite::params![now, now, h, chunk_id],
1731            )?;
1732            self.storage
1733                .insert_invalidated_hash(&h, Some(reason), &now)?;
1734            self.storage.commit()
1735        })();
1736        if result.is_err() {
1737            let _ = self.storage.rollback();
1738        }
1739        result
1740    }
1741
1742    pub fn restore(&self, chunk_id: &str) -> Result<()> {
1743        let chunk = self
1744            .storage
1745            .get_chunk(chunk_id)?
1746            .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1747        let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
1748        if state == "active" {
1749            return Ok(());
1750        }
1751        if state != "archived" {
1752            return Err(InnateError::InvalidState(
1753                "restore requires archived chunk".into(),
1754            ));
1755        }
1756        let was_invalidated = chunk
1757            .get("state_reason")
1758            .and_then(Value::as_str)
1759            .map(|r| r.starts_with("invalidated"))
1760            .unwrap_or(false);
1761        let h = chunk
1762            .get("content_hash")
1763            .and_then(Value::as_str)
1764            .unwrap_or("")
1765            .to_string();
1766        let now = utc_now_iso();
1767
1768        self.storage.begin_immediate()?;
1769        let result = (|| -> Result<()> {
1770            self.storage
1771                .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
1772            if was_invalidated {
1773                self.storage.query_chunks_params(
1774                    "DELETE FROM invalidated_hashes WHERE content_hash=?",
1775                    rusqlite::params![h],
1776                )?;
1777            }
1778            self.storage.query_chunks_params(
1779                "UPDATE chunks SET confidence_reason='restore', updated_at=? WHERE id=?",
1780                rusqlite::params![now, chunk_id],
1781            )?;
1782            self.storage.commit()
1783        })();
1784        if result.is_err() {
1785            let _ = self.storage.rollback();
1786        }
1787        result
1788    }
1789
1790    // ------------------------------------------------------------------
1791    // Public API 7: evolve
1792    // ------------------------------------------------------------------
1793
1794    pub fn evolve(&self, trigger: &str) -> Result<Value> {
1795        if !matches!(trigger, "manual" | "scheduled" | "threshold") {
1796            return Err(InnateError::InvalidState(format!(
1797                "invalid evolve trigger: {trigger}"
1798            )));
1799        }
1800
1801        // Threshold check
1802        if trigger == "threshold" {
1803            let rows = self.storage.query_chunks(
1804                "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
1805            )?;
1806            let cnt = rows
1807                .first()
1808                .and_then(|r| r.get("cnt"))
1809                .and_then(Value::as_i64)
1810                .unwrap_or(0);
1811            if cnt < self.evolve_threshold {
1812                return Ok(json!({"distilled": 0, "curate": null}));
1813            }
1814
1815            if let Some(limit) = self
1816                .storage
1817                .get_meta("max_distill_tokens_per_period")?
1818                .and_then(|value| value.parse::<i64>().ok())
1819                .filter(|value| *value > 0)
1820            {
1821                let rows = self.storage.query_chunks(
1822                    "SELECT COALESCE(SUM(distill_prompt_tokens),0)
1823                            + COALESCE(SUM(distill_completion_tokens),0) AS used
1824                     FROM episodic_log",
1825                )?;
1826                let used = rows
1827                    .first()
1828                    .and_then(|row| row.get("used"))
1829                    .and_then(Value::as_i64)
1830                    .unwrap_or(0);
1831                if used >= limit {
1832                    return Ok(json!({
1833                        "distilled": 0,
1834                        "curate": null,
1835                        "skipped": "distill_token_limit",
1836                        "distill_tokens_used": used,
1837                        "distill_token_limit": limit,
1838                    }));
1839                }
1840            }
1841        }
1842
1843        let distilled = self.distill_batch()?;
1844        let curator = Arc::clone(&self.curator);
1845        let curate = curator.run(self, &CurateScope::default())?;
1846
1847        Ok(json!({
1848            "distilled": distilled,
1849            "curate": {
1850                "archived": curate.archived.len(),
1851                "deduped": curate.deduped.len(),
1852                "decayed": curate.decayed.len(),
1853                "recovered": curate.recovered.len(),
1854                "orphans": curate.orphans.len(),
1855                "warnings": curate.warnings,
1856            }
1857        }))
1858    }
1859
1860    fn distill_batch(&self) -> Result<usize> {
1861        let run_id = gen_uuid();
1862        let now = utc_now_iso();
1863
1864        // Atomically claim a batch of 'new' logs → mark 'screening'.
1865        self.storage.begin_immediate()?;
1866        let logs = match self
1867            .storage
1868            .claim_distill_batch(&run_id, self.distill_batch_size, &now)
1869        {
1870            Ok(l) => {
1871                self.storage.commit()?;
1872                l
1873            }
1874            Err(e) => {
1875                let _ = self.storage.rollback();
1876                return Err(e);
1877            }
1878        };
1879
1880        let mut count = 0;
1881        for log in &logs {
1882            let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
1883            let prompt_tokens = estimate_distill_prompt_tokens(log);
1884            self.storage
1885                .update_episodic_log_tokens(log_id, prompt_tokens, 0)?;
1886            let chunks = match self.distiller.distill(std::slice::from_ref(log)) {
1887                Ok(chunks) => chunks,
1888                Err(error) => {
1889                    let note = format!("distill_failed:{error}");
1890                    self.storage.update_episodic_log_state_by_id(
1891                        log_id,
1892                        "failed",
1893                        Some(&note),
1894                        None,
1895                    )?;
1896                    continue;
1897                }
1898            };
1899            let completion_tokens = chunks
1900                .iter()
1901                .map(estimate_distilled_chunk_tokens)
1902                .sum::<i64>();
1903            self.storage
1904                .update_episodic_log_tokens(log_id, prompt_tokens, completion_tokens)?;
1905            if chunks.is_empty() {
1906                // Mark log discarded — use id not trace_id for the update query
1907                let _ = self.storage.update_episodic_log_state_by_id(
1908                    log_id,
1909                    "discarded",
1910                    Some("insufficient_material"),
1911                    None,
1912                );
1913                continue;
1914            }
1915            let mut log_written = false;
1916            for dc in chunks {
1917                let (content, action) = self.sanitize_content(&dc.content);
1918                if action == SanitizeAction::Discard {
1919                    let _ = self.storage.update_episodic_log_state_by_id(
1920                        log_id,
1921                        "discarded",
1922                        Some("sanitize_discard"),
1923                        None,
1924                    );
1925                    continue;
1926                }
1927                let h = content_hash(&content);
1928                if self.storage.is_hash_invalidated(&h)? {
1929                    let _ = self.storage.update_episodic_log_state_by_id(
1930                        log_id,
1931                        "discarded",
1932                        Some("invalidated_hash"),
1933                        None,
1934                    );
1935                    continue;
1936                }
1937                let redacted = action == SanitizeAction::Redact;
1938                let conf = if redacted { 0.4 } else { 0.45 };
1939                let now2 = utc_now_iso();
1940                let chunk_id = gen_uuid();
1941                let tokens = estimate_tokens(&content) as i64;
1942                let row = ChunkRow {
1943                    id: chunk_id.clone(),
1944                    content: content.clone(),
1945                    trigger_desc: dc.trigger_desc,
1946                    anti_trigger_desc: dc.anti_trigger_desc,
1947                    content_hash: h,
1948                    token_count: Some(tokens),
1949                    origin: "distilled".to_string(),
1950                    distilled_from: Some(dc.source_log_id),
1951                    state: "pending".to_string(),
1952                    state_reason: Some("init:distilled".to_string()),
1953                    confidence: conf,
1954                    confidence_reason: Some("init:distilled".to_string()),
1955                    version: 1,
1956                    embed_version: 1,
1957                    created_at: now2.clone(),
1958                    updated_at: now2.clone(),
1959                    ..Default::default()
1960                };
1961                let cvec = match self.embedding.embed_content(&content) {
1962                    Ok(v) => v,
1963                    Err(_) => {
1964                        // embedding failed — do NOT write chunk; mark log failed
1965                        let _ = self.storage.update_episodic_log_state_by_id(
1966                            log_id,
1967                            "failed",
1968                            Some("embedding_failed"),
1969                            None,
1970                        );
1971                        continue; // skip to next log entry
1972                    }
1973                };
1974                let tvec = match self
1975                    .embedding
1976                    .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
1977                {
1978                    Ok(v) => v,
1979                    Err(_) => {
1980                        let _ = self.storage.update_episodic_log_state_by_id(
1981                            log_id,
1982                            "failed",
1983                            Some("embedding_failed"),
1984                            None,
1985                        );
1986                        continue;
1987                    }
1988                };
1989                self.storage.begin_immediate()?;
1990                let r = (|| -> Result<()> {
1991                    self.storage.insert_chunk(&row)?;
1992                    self.storage
1993                        .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
1994                    self.storage
1995                        .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
1996                    self.storage.commit()
1997                })();
1998                if r.is_err() {
1999                    let _ = self.storage.rollback();
2000                    r?;
2001                }
2002                count += 1;
2003                log_written = true;
2004            }
2005            if log_written {
2006                let _ =
2007                    self.storage
2008                        .update_episodic_log_state_by_id(log_id, "distilled", None, None);
2009            }
2010        }
2011        Ok(count)
2012    }
2013
2014    pub(crate) fn builtin_curate_impl(&self, scope: &CurateScope) -> Result<CurateReport> {
2015        let mut report = CurateReport::default();
2016        let now_iso = utc_now_iso();
2017        if scope.dry_run {
2018            // dry_run: compute report without writing
2019            let archived_count: i64 = count_query(&self.storage,
2020                "SELECT COUNT(*) FROM chunks WHERE origin!='spark' AND protected=0 AND state='active'")?;
2021            report.stats.insert("dry_run".to_string(), json!(true));
2022            report
2023                .stats
2024                .insert("eligible_for_governance".to_string(), json!(archived_count));
2025            return Ok(report);
2026        }
2027
2028        // ── Step 1-4: aggregate (single BEGIN IMMEDIATE, half-open cutoff window) ──
2029        self.storage.begin_immediate()?;
2030        let agg_result = (|| -> Result<()> {
2031            let last_ts = self
2032                .storage
2033                .get_meta("last_agg_ts")?
2034                .unwrap_or_else(|| "1970-01-01T00:00:00.000Z".to_string());
2035            let cutoff_ts = now_iso.clone();
2036
2037            // 1. Aggregate success traces from 'used' events (not 'selected') in the window.
2038            self.storage.conn_execute(
2039                "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts)
2040                 SELECT ut.chunk_id, ut.trace_id, MAX(ut.ts)
2041                 FROM usage_trace ut
2042                 WHERE ut.event = 'used'
2043                   AND ut.chunk_id IS NOT NULL
2044                   AND ut.ts >= ? AND ut.ts < ?
2045                   AND (
2046                     EXISTS (SELECT 1 FROM usage_trace ok
2047                             WHERE ok.trace_id = ut.trace_id
2048                               AND ok.event = 'task_ok' AND ok.chunk_id IS NULL)
2049                     OR EXISTS (SELECT 1 FROM episodic_log el
2050                                WHERE el.trace_id = ut.trace_id AND el.outcome = 'ok')
2051                   )
2052                 GROUP BY ut.chunk_id, ut.trace_id",
2053                rusqlite::params![last_ts, cutoff_ts],
2054            )?;
2055
2056            // 2. Derive success counts from the persistent fact table.
2057            self.storage.conn_execute(
2058                "UPDATE chunks SET
2059                   used_success_count      = (SELECT COUNT(*)   FROM chunk_success_traces WHERE chunk_id = chunks.id),
2060                   success_trace_ids_count = (SELECT COUNT(*)   FROM chunk_success_traces WHERE chunk_id = chunks.id),
2061                   last_success_at         = (SELECT MAX(ts)    FROM chunk_success_traces WHERE chunk_id = chunks.id)
2062                 WHERE id IN (SELECT DISTINCT chunk_id FROM chunk_success_traces)",
2063                rusqlite::params![],
2064            )?;
2065
2066            // 3. Increment selected_count / used_count from window.
2067            self.storage.conn_execute(
2068                "UPDATE chunks SET
2069                   selected_count = selected_count + COALESCE(
2070                     (SELECT COUNT(*) FROM usage_trace
2071                      WHERE chunk_id = chunks.id AND event = 'selected'
2072                        AND ts >= ? AND ts < ?), 0),
2073                   used_count = used_count + COALESCE(
2074                     (SELECT COUNT(*) FROM usage_trace
2075                      WHERE chunk_id = chunks.id AND event = 'used'
2076                        AND ts >= ? AND ts < ?), 0)
2077                 WHERE id IN (SELECT DISTINCT chunk_id FROM usage_trace
2078                              WHERE ts >= ? AND ts < ?)",
2079                rusqlite::params![last_ts, cutoff_ts, last_ts, cutoff_ts, last_ts, cutoff_ts],
2080            )?;
2081
2082            // 4. Advance watermark and purge raw traces before cutoff.
2083            self.storage.set_meta("last_agg_ts", &cutoff_ts)?;
2084            self.storage.purge_usage_trace(&cutoff_ts)?;
2085            self.storage.commit()
2086        })();
2087        if agg_result.is_err() {
2088            let _ = self.storage.rollback();
2089            agg_result?;
2090        }
2091
2092        // ── Step 2: recover_logs ──
2093        self.storage.begin_immediate()?;
2094        let recover_result = (|| -> Result<()> {
2095            // Stale screening rows → 'failed' (not 'open'), note = 'screening_timeout:<run_id>'.
2096            let screening_cutoff = minutes_ago(&now_iso, self.screening_timeout_minutes);
2097            let stale = self.storage.query_chunks_params(
2098                "SELECT id, distill_run_id FROM episodic_log
2099                 WHERE distill_state='screening' AND distill_locked_at < ?",
2100                rusqlite::params![screening_cutoff],
2101            )?;
2102            for row in &stale {
2103                let id = row.get("id").and_then(Value::as_str).unwrap_or("");
2104                let run_id = row
2105                    .get("distill_run_id")
2106                    .and_then(Value::as_str)
2107                    .unwrap_or("unknown");
2108                let note = format!("screening_timeout:{run_id}");
2109                self.storage.conn_execute(
2110                    "UPDATE episodic_log
2111                     SET distill_state='failed', distill_note=?,
2112                         distill_run_id=NULL, distill_locked_at=NULL
2113                     WHERE id=?",
2114                    rusqlite::params![note, id],
2115                )?;
2116                report.recovered.push(id.to_string());
2117                report
2118                    .warnings
2119                    .push(format!("stale screening recovered as failed: {id}"));
2120            }
2121
2122            // Open logs past TTL → discarded (insufficient material, never record'd).
2123            let open_ttl_cutoff = days_ago(&now_iso, self.open_ttl_days);
2124            self.storage.conn_execute(
2125                "UPDATE episodic_log
2126                 SET distill_state='discarded', distill_note='no_record_timeout'
2127                 WHERE distill_state='open' AND ts < ?",
2128                rusqlite::params![open_ttl_cutoff],
2129            )?;
2130            self.storage.commit()
2131        })();
2132        if recover_result.is_err() {
2133            let _ = self.storage.rollback();
2134            recover_result?;
2135        }
2136
2137        // ── Steps 3-7: governance (archive, dedupe, decay, promote, cycle) ──
2138        let scope_origin = scope.origin.clone();
2139        let scope_skill = scope.skill_name.clone();
2140        self.storage.begin_immediate()?;
2141        let gov_result = (|| -> Result<()> {
2142            // ── 3a. Archive: low_confidence — only blocks that HAVE been used ──
2143            let low_conf_cutoff = days_ago(&now_iso, self.low_conf_idle_days);
2144            let low_conf = self.storage.query_chunks_params(
2145                "SELECT id FROM chunks
2146                 WHERE origin!='spark' AND protected=0 AND state='active'
2147                   AND last_used_at IS NOT NULL
2148                   AND confidence < ?
2149                   AND last_used_at < ?
2150                   AND (? IS NULL OR origin=?)
2151                   AND (? IS NULL OR skill_name=?)",
2152                rusqlite::params![
2153                    self.low_conf_threshold,
2154                    low_conf_cutoff,
2155                    scope_origin,
2156                    scope_origin,
2157                    scope_skill,
2158                    scope_skill
2159                ],
2160            )?;
2161            for c in &low_conf {
2162                if let Some(id) = c.get("id").and_then(Value::as_str) {
2163                    self.storage.update_chunk_state(
2164                        id,
2165                        "archived",
2166                        Some("low_confidence"),
2167                        &now_iso,
2168                    )?;
2169                    report.archived.push(id.to_string());
2170                }
2171            }
2172
2173            // ── 3b. Archive: repeated_selected_unused ──
2174            let rep_sel = self.storage.query_chunks_params(
2175                "SELECT id FROM chunks
2176                 WHERE origin!='spark' AND protected=0 AND state='active'
2177                   AND selected_count >= ? AND used_count = 0 AND confidence < ?
2178                   AND (? IS NULL OR origin=?)
2179                   AND (? IS NULL OR skill_name=?)",
2180                rusqlite::params![
2181                    self.repeat_select_min,
2182                    self.repeat_select_conf_max,
2183                    scope_origin,
2184                    scope_origin,
2185                    scope_skill,
2186                    scope_skill
2187                ],
2188            )?;
2189            for c in &rep_sel {
2190                if let Some(id) = c.get("id").and_then(Value::as_str) {
2191                    if !report.archived.contains(&id.to_string()) {
2192                        self.storage.update_chunk_state(
2193                            id,
2194                            "archived",
2195                            Some("repeated_selected_unused"),
2196                            &now_iso,
2197                        )?;
2198                        report.archived.push(id.to_string());
2199                    }
2200                }
2201            }
2202
2203            // ── 3c. Archive: never_used — never entered context at all ──
2204            let never_used_cutoff = days_ago(&now_iso, self.never_used_age_days);
2205            let never_used = self.storage.query_chunks_params(
2206                "SELECT id FROM chunks
2207                 WHERE origin!='spark' AND protected=0 AND state='active'
2208                   AND used_count = 0 AND selected_count = 0
2209                   AND created_at < ?
2210                   AND (? IS NULL OR origin=?)
2211                   AND (? IS NULL OR skill_name=?)",
2212                rusqlite::params![
2213                    never_used_cutoff,
2214                    scope_origin,
2215                    scope_origin,
2216                    scope_skill,
2217                    scope_skill
2218                ],
2219            )?;
2220            for c in &never_used {
2221                if let Some(id) = c.get("id").and_then(Value::as_str) {
2222                    if !report.archived.contains(&id.to_string()) {
2223                        self.storage.update_chunk_state(
2224                            id,
2225                            "archived",
2226                            Some("never_used"),
2227                            &now_iso,
2228                        )?;
2229                        report.archived.push(id.to_string());
2230                    }
2231                }
2232            }
2233
2234            // ── 4. Dedupe: same content_hash — keep protected or highest confidence ──
2235            let dupes = self.storage.query_chunks_params(
2236                "SELECT content_hash FROM chunks
2237                 WHERE origin!='spark' AND state IN ('active','pending')
2238                   AND (? IS NULL OR origin=?)
2239                   AND (? IS NULL OR skill_name=?)
2240                 GROUP BY content_hash HAVING COUNT(*) > 1",
2241                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
2242            )?;
2243            for d in &dupes {
2244                if let Some(h) = d.get("content_hash").and_then(Value::as_str) {
2245                    let group = self.storage.query_chunks_params(
2246                        "SELECT id, confidence, protected FROM chunks
2247                         WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending')
2248                           AND (? IS NULL OR origin=?)
2249                           AND (? IS NULL OR skill_name=?)
2250                         ORDER BY protected DESC, confidence DESC",
2251                        rusqlite::params![h, scope_origin, scope_origin, scope_skill, scope_skill],
2252                    )?;
2253                    let canonical_id = group
2254                        .first()
2255                        .and_then(|row| row.get("id"))
2256                        .and_then(Value::as_str)
2257                        .unwrap_or("");
2258                    for row in group.iter().skip(1) {
2259                        let id = row.get("id").and_then(Value::as_str).unwrap_or("");
2260                        let reason = format!("duplicate:{canonical_id}");
2261                        self.storage
2262                            .update_chunk_state(id, "archived", Some(&reason), &now_iso)?;
2263                        self.storage.conn_execute(
2264                            "UPDATE chunks SET parent_id=?, updated_at=? WHERE id=?",
2265                            rusqlite::params![canonical_id, now_iso, id],
2266                        )?;
2267                        report.deduped.push(id.to_string());
2268                    }
2269                }
2270            }
2271
2272            // ── 5. Decay: confidence time-decay for idle non-spark non-protected active chunks ──
2273            let decay_candidates = self.storage.query_chunks_params(
2274                "SELECT id, confidence, last_used_at FROM chunks
2275                 WHERE origin!='spark' AND protected=0 AND state='active'
2276                   AND last_used_at IS NOT NULL
2277                   AND (? IS NULL OR origin=?)
2278                   AND (? IS NULL OR skill_name=?)",
2279                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
2280            )?;
2281            for c in &decay_candidates {
2282                let id = match c.get("id").and_then(Value::as_str) {
2283                    Some(v) => v,
2284                    None => continue,
2285                };
2286                let conf = c.get("confidence").and_then(Value::as_f64).unwrap_or(0.5);
2287                let last_used = c
2288                    .get("last_used_at")
2289                    .and_then(Value::as_str)
2290                    .unwrap_or(&now_iso);
2291                let days_idle = iso_days_diff(&now_iso, last_used);
2292                if days_idle <= 0 {
2293                    continue;
2294                }
2295                let floor = 0.3_f64;
2296                let new_conf = floor + (conf - floor) * 0.5_f64.powf(days_idle as f64 / 90.0);
2297                let new_conf = new_conf.clamp(floor, 1.0);
2298                if (new_conf - conf).abs() > 0.001 {
2299                    let note = format!("decay:{days_idle}d");
2300                    self.storage
2301                        .update_chunk_confidence(id, new_conf, Some(&note), &now_iso)?;
2302                    report.decayed.push(id.to_string());
2303                }
2304            }
2305
2306            // ── 6. Promote: pending → active when three-guard criteria met ──
2307            let promotable = self.storage.query_chunks_params(
2308                "SELECT id FROM chunks
2309                 WHERE state='pending' AND origin!='spark'
2310                   AND used_success_count >= ?
2311                   AND success_trace_ids_count >= 2
2312                   AND confidence >= ?
2313                   AND (? IS NULL OR origin=?)
2314                   AND (? IS NULL OR skill_name=?)",
2315                rusqlite::params![
2316                    self.promote_used_success_min,
2317                    self.promote_confidence_min,
2318                    scope_origin,
2319                    scope_origin,
2320                    scope_skill,
2321                    scope_skill
2322                ],
2323            )?;
2324            for c in &promotable {
2325                if let Some(id) = c.get("id").and_then(Value::as_str) {
2326                    self.storage.update_chunk_state(
2327                        id,
2328                        "active",
2329                        Some("repeated_success"),
2330                        &now_iso,
2331                    )?;
2332                }
2333            }
2334
2335            // ── 7. Cycle/orphan detection (report only, no auto-fix) ──
2336            let all_deps = self
2337                .storage
2338                .query_chunks("SELECT src, dst FROM deps WHERE kind='hard'")?;
2339            let cycles = detect_cycles(&all_deps);
2340            report.cycles = cycles;
2341            let orphan_rows = self.storage.query_chunks_params(
2342                "SELECT d.src, d.dst, s.id AS src_exists, t.id AS dst_exists
2343                 FROM deps d
2344                 LEFT JOIN chunks s ON s.id=d.src
2345                 LEFT JOIN chunks t ON t.id=d.dst
2346                 WHERE d.kind='hard'
2347                   AND (? IS NULL OR s.origin=?)
2348                   AND (? IS NULL OR s.skill_name=?)",
2349                rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
2350            )?;
2351            let mut orphans = HashSet::new();
2352            for row in orphan_rows {
2353                if row.get("src_exists").is_none_or(Value::is_null) {
2354                    if let Some(id) = row.get("src").and_then(Value::as_str) {
2355                        orphans.insert(id.to_string());
2356                    }
2357                }
2358                if row.get("dst_exists").is_none_or(Value::is_null) {
2359                    if let Some(id) = row.get("dst").and_then(Value::as_str) {
2360                        orphans.insert(id.to_string());
2361                    }
2362                }
2363            }
2364            report.orphans = orphans.into_iter().collect();
2365            report.orphans.sort();
2366
2367            self.storage.commit()
2368        })();
2369        if gov_result.is_err() {
2370            let _ = self.storage.rollback();
2371            gov_result?;
2372        }
2373
2374        // ── Step 8: purge_old_logs (physical delete of terminal episodic_log rows >30d) ──
2375        self.storage.begin_immediate()?;
2376        let purge_cutoff = days_ago(&now_iso, 30);
2377        let purge_result = self
2378            .storage
2379            .conn_execute(
2380                "DELETE FROM episodic_log
2381             WHERE distill_state IN ('distilled','discarded','failed')
2382               AND ts < ?",
2383                rusqlite::params![purge_cutoff],
2384            )
2385            .and_then(|_| self.storage.commit());
2386        if purge_result.is_err() {
2387            let _ = self.storage.rollback();
2388            purge_result?;
2389        }
2390
2391        Ok(report)
2392    }
2393
2394    // ------------------------------------------------------------------
2395    // Public API 8: inspect
2396    // ------------------------------------------------------------------
2397
2398    pub fn inspect(&self) -> Result<Value> {
2399        let total: i64 = count_query(
2400            &self.storage,
2401            "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
2402        )?;
2403        let active: i64 = count_query(
2404            &self.storage,
2405            "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
2406        )?;
2407        let pending: i64 = count_query(
2408            &self.storage,
2409            "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
2410        )?;
2411        let archived: i64 = count_query(
2412            &self.storage,
2413            "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
2414        )?;
2415        let sparks: i64 = count_query(
2416            &self.storage,
2417            "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
2418        )?;
2419        let open_logs: i64 = count_query(
2420            &self.storage,
2421            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
2422        )?;
2423        let new_logs: i64 = count_query(
2424            &self.storage,
2425            "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2426        )?;
2427        let embed_rebuild: i64 = count_query(&self.storage,
2428            "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
2429        let schema_version = self.storage.get_meta_or("schema_version", "?");
2430        let lib_id = self.storage.get_meta_or("lib_id", "?");
2431        let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
2432
2433        // Health signal 1: knowledge debt ratio
2434        // Zombie = active, confidence in [0.4, 0.6], created > 7d ago, non-spark
2435        let zombie: i64 = count_query(
2436            &self.storage,
2437            "SELECT COUNT(*) FROM chunks
2438             WHERE origin!='spark' AND state='active'
2439               AND confidence >= 0.4 AND confidence <= 0.6
2440               AND created_at < datetime('now','-7 days')",
2441        )?;
2442        let debt_numerator = pending + zombie;
2443        let debt_denominator = active.max(1);
2444        let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
2445
2446        // Health signal 5: stale screening count
2447        let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
2448        let stale_screening: i64 = count_query_params(
2449            &self.storage,
2450            "SELECT COUNT(*) FROM episodic_log
2451             WHERE distill_state='screening' AND distill_locked_at < ?",
2452            rusqlite::params![screening_cutoff],
2453        )?;
2454
2455        // Health signal 4: distill cost estimate from retained logs.
2456        let distill_cost = self.storage.query_chunks(
2457            "SELECT COALESCE(SUM(distill_prompt_tokens),0) AS pt,
2458                    COALESCE(SUM(distill_completion_tokens),0) AS ct
2459             FROM episodic_log",
2460        )?;
2461        let prompt_tokens = distill_cost
2462            .first()
2463            .and_then(|r| r.get("pt"))
2464            .and_then(Value::as_i64)
2465            .unwrap_or(0);
2466        let completion_tokens = distill_cost
2467            .first()
2468            .and_then(|r| r.get("ct"))
2469            .and_then(Value::as_i64)
2470            .unwrap_or(0);
2471
2472        // Health signal 2: sparks that have been recalled often (soft incubation threshold = 5)
2473        let spark_threshold: i64 = self
2474            .storage
2475            .get_meta("curate.soft_mature_threshold")
2476            .ok()
2477            .flatten()
2478            .and_then(|v| v.parse::<i64>().ok())
2479            .unwrap_or(5);
2480        let recurring_sparks = self.storage.query_chunks_params(
2481            "SELECT ut.chunk_id, COUNT(*) AS cnt,
2482                    c.content, c.trigger_desc, c.maturity
2483             FROM usage_trace ut
2484             JOIN chunks c ON c.id = ut.chunk_id
2485             WHERE ut.event='retrieved'
2486               AND c.origin='spark'
2487             GROUP BY ut.chunk_id HAVING cnt >= ?",
2488            rusqlite::params![spark_threshold],
2489        )?;
2490        let recurring_spark_ids: Vec<Value> = recurring_sparks
2491            .iter()
2492            .map(|r| {
2493                json!({
2494                    "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
2495                    "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
2496                    "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
2497                    "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
2498                        .chars().take(80).collect::<String>(),
2499                })
2500            })
2501            .collect();
2502
2503        let mut suggestions: Vec<Value> = Vec::new();
2504        if embed_rebuild > 0 {
2505            suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
2506        }
2507        if new_logs > 0 {
2508            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
2509        }
2510        if pending > 0 {
2511            suggestions.push(json!({"action": "innate approve <id>  # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
2512        }
2513        if !recurring_spark_ids.is_empty() {
2514            suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
2515        }
2516        if stale_screening > 0 {
2517            suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
2518        }
2519
2520        Ok(json!({
2521            "schema_version": schema_version,
2522            "lib_id": lib_id,
2523            "last_agg_ts": last_agg,
2524            "chunks": {"total": total, "active": active, "pending": pending, "archived": archived},
2525            "sparks": sparks,
2526            "episodic_log": {"open": open_logs, "new": new_logs},
2527            "embed_rebuild_queue": embed_rebuild,
2528            "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
2529            "stale_screening_count": stale_screening,
2530            "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
2531            "recurring_sparks": recurring_sparks.len(),
2532            "recurring_spark_ids": recurring_spark_ids,
2533            "params": {
2534                "recall.w_content": self.w_content,
2535                "recall.w_trigger": self.w_trigger,
2536                "recall.top_k_candidates": self.top_k_candidates,
2537                "curate.low_conf_threshold": self.low_conf_threshold,
2538                "curate.low_conf_idle_days": self.low_conf_idle_days,
2539                "curate.repeat_select_min": self.repeat_select_min,
2540                "curate.never_used_age_days": self.never_used_age_days,
2541                "curate.promote_used_success_min": self.promote_used_success_min,
2542                "curate.promote_confidence_min": self.promote_confidence_min,
2543                "curate.screening_timeout_minutes": self.screening_timeout_minutes,
2544                "curate.open_ttl_days": self.open_ttl_days,
2545            },
2546            "suggestions": suggestions
2547        }))
2548    }
2549
2550    // ------------------------------------------------------------------
2551    // Public: rebuild_embeddings (evolve --rebuild-embeddings)
2552    // ------------------------------------------------------------------
2553
2554    pub fn rebuild_embeddings(&self) -> Result<usize> {
2555        let meta_version = self
2556            .storage
2557            .get_meta("embed_version")?
2558            .and_then(|v| v.parse::<i64>().ok())
2559            .unwrap_or(1);
2560        // Fetch chunks with embed_version=0 (failed writes) or below current meta version.
2561        let stale = self.storage.query_chunks_params(
2562            "SELECT id, content, trigger_desc, state_reason FROM chunks
2563             WHERE embed_version = 0 OR embed_version < ?",
2564            rusqlite::params![meta_version],
2565        )?;
2566        let mut count = 0;
2567        for row in &stale {
2568            let id = match row.get("id").and_then(Value::as_str) {
2569                Some(v) => v,
2570                None => continue,
2571            };
2572            let content = row.get("content").and_then(Value::as_str).unwrap_or("");
2573            let trigger = row
2574                .get("trigger_desc")
2575                .and_then(Value::as_str)
2576                .unwrap_or(content);
2577            let state_reason = row
2578                .get("state_reason")
2579                .and_then(Value::as_str)
2580                .unwrap_or("");
2581
2582            let cvec = match self.embedding.embed_content(content) {
2583                Ok(v) => v,
2584                Err(_) => continue,
2585            };
2586            let tvec = match self.embedding.embed_trigger(trigger) {
2587                Ok(v) => v,
2588                Err(_) => continue,
2589            };
2590
2591            self.storage.begin_immediate()?;
2592            let r = (|| -> Result<()> {
2593                self.storage
2594                    .insert_vec_content(id, &pack_embedding(&cvec))?;
2595                self.storage
2596                    .insert_vec_trigger(id, &pack_embedding(&tvec))?;
2597                // Restore intended state if encoded in state_reason.
2598                let new_reason = if state_reason.starts_with("embedding_pending:target=") {
2599                    let target_state = state_reason.trim_start_matches("embedding_pending:target=");
2600                    let now = utc_now_iso();
2601                    self.storage.update_chunk_state(
2602                        id,
2603                        target_state,
2604                        Some("embedding_rebuilt"),
2605                        &now,
2606                    )?;
2607                    "embedding_rebuilt".to_string()
2608                } else {
2609                    "embedding_rebuilt".to_string()
2610                };
2611                let now = utc_now_iso();
2612                self.storage.conn_execute(
2613                    "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
2614                    rusqlite::params![meta_version, new_reason, now, id],
2615                )?;
2616                self.storage.commit()
2617            })();
2618            if r.is_err() {
2619                let _ = self.storage.rollback();
2620            } else {
2621                count += 1;
2622            }
2623        }
2624        Ok(count)
2625    }
2626
2627    // ------------------------------------------------------------------
2628    // Public: inspect_id (inspect <chunk_id> or <trace_id>)
2629    // ------------------------------------------------------------------
2630
2631    pub fn inspect_id(&self, id: &str) -> Result<Value> {
2632        // Try as chunk_id first, then as trace_id.
2633        if let Some(chunk) = self.storage.get_chunk(id)? {
2634            let traces = self.storage.query_chunks_params(
2635                "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
2636                rusqlite::params![id],
2637            )?;
2638            let derived = self.storage.query_chunks_params(
2639                "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
2640                   SELECT id FROM episodic_log WHERE trace_id IN (
2641                     SELECT trace_id FROM usage_trace WHERE chunk_id=?
2642                   )
2643                 ) LIMIT 10",
2644                rusqlite::params![id],
2645            )?;
2646            return Ok(json!({
2647                "kind": "chunk",
2648                "chunk": chunk,
2649                "recent_traces": traces,
2650                "derived_chunks": derived,
2651            }));
2652        }
2653        // Try as trace_id.
2654        if let Some(log) = self.storage.get_episodic_log(id)? {
2655            let traces = self.storage.query_chunks_params(
2656                "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
2657                rusqlite::params![id],
2658            )?;
2659            return Ok(json!({
2660                "kind": "trace",
2661                "episodic_log": log,
2662                "usage_traces": traces,
2663            }));
2664        }
2665        Err(InnateError::ChunkNotFound(id.to_string()))
2666    }
2667
2668    // ------------------------------------------------------------------
2669    // Sanitize
2670    // ------------------------------------------------------------------
2671
2672    fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
2673        self.sanitizer.sanitize(content)
2674    }
2675}
2676
2677// ---------------------------------------------------------------------------
2678// Helpers
2679// ---------------------------------------------------------------------------
2680
2681struct CandidateInfo {
2682    chunk: Value,
2683    sim_content: f32,
2684    sim_trigger: f32,
2685}
2686
2687fn chunk_is_valid_for_recall(chunk: &Value, embed_version: i64) -> bool {
2688    chunk.get("state").and_then(Value::as_str) != Some("archived")
2689        && chunk.get("origin").and_then(Value::as_str) != Some("spark")
2690        && chunk
2691            .get("embed_version")
2692            .and_then(Value::as_i64)
2693            .unwrap_or(1)
2694            >= embed_version
2695}
2696
2697fn estimate_distill_prompt_tokens(log: &Value) -> i64 {
2698    [
2699        "query",
2700        "recall_snapshot",
2701        "output",
2702        "output_summary",
2703        "nomination",
2704    ]
2705    .iter()
2706    .filter_map(|key| log.get(*key).and_then(Value::as_str))
2707    .map(|text| estimate_tokens(text) as i64)
2708    .sum()
2709}
2710
2711fn estimate_distilled_chunk_tokens(chunk: &DistilledChunk) -> i64 {
2712    estimate_tokens(&chunk.content) as i64
2713        + chunk
2714            .trigger_desc
2715            .as_deref()
2716            .map(estimate_tokens)
2717            .unwrap_or(0) as i64
2718        + chunk
2719            .anti_trigger_desc
2720            .as_deref()
2721            .map(estimate_tokens)
2722            .unwrap_or(0) as i64
2723}
2724
/// Case-insensitive check whether `query` contains any entry of the
/// comma-separated `anti` list.
///
/// Entries are trimmed, and empty entries are ignored.
fn anti_trigger_hit(query: &str, anti: &str) -> bool {
    let haystack = query.to_lowercase();
    let needles = anti.to_lowercase();
    needles
        .split(',')
        .map(str::trim)
        .filter(|needle| !needle.is_empty())
        .any(|needle| haystack.contains(needle))
}
2732
2733fn block_cost(block: &[Value]) -> usize {
2734    block
2735        .iter()
2736        .map(|b| {
2737            b.get("token_count")
2738                .and_then(Value::as_u64)
2739                .map(|t| t as usize)
2740                .unwrap_or_else(|| {
2741                    estimate_tokens(b.get("content").and_then(Value::as_str).unwrap_or("")).max(100)
2742                })
2743        })
2744        .sum()
2745}
2746
2747fn limit_knowledge(knowledge: Vec<Value>, top: Option<usize>) -> Vec<Value> {
2748    match top {
2749        None => knowledge,
2750        Some(0) => vec![],
2751        Some(n) => knowledge.into_iter().take(n).collect(),
2752    }
2753}
2754
2755fn validate_source(source: &str) -> Result<()> {
2756    if !matches!(source, "sdk" | "cli" | "hook" | "daemon" | "augmented") {
2757        return Err(InnateError::InvalidState(format!(
2758            "invalid event source: {source}"
2759        )));
2760    }
2761    Ok(())
2762}
2763
2764fn count_query(storage: &Storage, sql: &str) -> Result<i64> {
2765    Ok(storage
2766        .query_chunks(sql)?
2767        .first()
2768        .and_then(|r| r.as_object())
2769        .and_then(|m| m.values().next())
2770        .and_then(Value::as_i64)
2771        .unwrap_or(0))
2772}
2773
2774fn count_query_params<P: rusqlite::Params>(storage: &Storage, sql: &str, p: P) -> Result<i64> {
2775    Ok(storage
2776        .query_chunks_params(sql, p)?
2777        .first()
2778        .and_then(|r| r.as_object())
2779        .and_then(|m| m.values().next())
2780        .and_then(Value::as_i64)
2781        .unwrap_or(0))
2782}
2783
2784fn days_ago(now_iso: &str, days: i64) -> String {
2785    use chrono::{DateTime, Duration, Utc};
2786    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
2787        let cutoff = t - Duration::days(days);
2788        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
2789    }
2790    now_iso.to_string()
2791}
2792
2793fn minutes_ago(now_iso: &str, minutes: i64) -> String {
2794    use chrono::{DateTime, Duration, Utc};
2795    if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
2796        let cutoff = t - Duration::minutes(minutes);
2797        return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
2798    }
2799    now_iso.to_string()
2800}
2801
2802/// Return the number of whole days between two ISO timestamps (now - past; clamped ≥ 0).
2803fn iso_days_diff(now_iso: &str, past_iso: &str) -> i64 {
2804    use chrono::{DateTime, Utc};
2805    let parse = |s: &str| s.parse::<DateTime<Utc>>().ok();
2806    if let (Some(a), Some(b)) = (parse(now_iso), parse(past_iso)) {
2807        let diff = a - b;
2808        diff.num_days().max(0)
2809    } else {
2810        0
2811    }
2812}
2813
2814/// DFS-based cycle detection on the hard-dep graph. Returns list of cycles (each is a Vec of ids).
2815fn detect_cycles(deps: &[Value]) -> Vec<Vec<String>> {
2816    use std::collections::HashMap;
2817    let mut adj: HashMap<String, Vec<String>> = HashMap::new();
2818    for d in deps {
2819        let src = d
2820            .get("src")
2821            .and_then(Value::as_str)
2822            .unwrap_or("")
2823            .to_string();
2824        let dst = d
2825            .get("dst")
2826            .and_then(Value::as_str)
2827            .unwrap_or("")
2828            .to_string();
2829        if !src.is_empty() && !dst.is_empty() {
2830            adj.entry(src).or_default().push(dst);
2831        }
2832    }
2833    let nodes: Vec<String> = adj.keys().cloned().collect();
2834    let mut visited: HashSet<String> = HashSet::new();
2835    let mut on_stack: HashSet<String> = HashSet::new();
2836    let mut cycles: Vec<Vec<String>> = vec![];
2837
2838    fn dfs(
2839        node: &str,
2840        adj: &HashMap<String, Vec<String>>,
2841        visited: &mut HashSet<String>,
2842        on_stack: &mut HashSet<String>,
2843        path: &mut Vec<String>,
2844        cycles: &mut Vec<Vec<String>>,
2845    ) {
2846        if on_stack.contains(node) {
2847            // Found cycle — extract loop segment.
2848            let start = path.iter().position(|n| n == node).unwrap_or(0);
2849            cycles.push(path[start..].to_vec());
2850            return;
2851        }
2852        if visited.contains(node) {
2853            return;
2854        }
2855        visited.insert(node.to_string());
2856        on_stack.insert(node.to_string());
2857        path.push(node.to_string());
2858        if let Some(children) = adj.get(node) {
2859            for child in children {
2860                dfs(child, adj, visited, on_stack, path, cycles);
2861            }
2862        }
2863        path.pop();
2864        on_stack.remove(node);
2865    }
2866
2867    for node in nodes {
2868        let mut path = vec![];
2869        dfs(
2870            &node,
2871            &adj,
2872            &mut visited,
2873            &mut on_stack,
2874            &mut path,
2875            &mut cycles,
2876        );
2877    }
2878    cycles
2879}