1type PackResult = (
5 Vec<Value>,
6 Vec<(Vec<Value>, f64, usize)>,
7 std::collections::HashMap<String, String>,
8);
9
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::embedding::{DummyEmbeddingProvider, EmbeddingProvider};
17use crate::errors::{InnateError, Result};
18use crate::refine::{
19 DefaultSanitizer, DistilledChunk, Distiller, HeuristicDistiller, NullRefiner, Refiner,
20 Sanitizer,
21};
22use crate::storage::{ChunkRow, EpisodicLogRow, Storage};
23use crate::utils::{
24 content_hash, estimate_tokens, gen_uuid, pack_embedding, utc_now_iso, SanitizeAction,
25};
26
// ---- Recall defaults ---------------------------------------------------------
// These seed the matching `KnowledgeBase` fields; all except
// PENDING_RECALL_PENALTY can be overridden by a `recall.*` meta entry.

/// Weight of the content-embedding similarity in the fused recall score.
const W_CONTENT: f64 = 0.55;
/// Weight of the trigger-embedding similarity in the fused recall score.
const W_TRIGGER: f64 = 0.25;
/// Weight of the chunk's stored confidence in the fused recall score.
const W_CONFIDENCE: f64 = 0.10;
/// Weight of the per-query context score in the fused recall score.
const W_CONTEXT: f64 = 0.15;
/// Maximum number of scored candidates kept after fusion.
const TOP_K_CANDIDATES: usize = 20;
/// Multiplier applied when a chunk's anti-trigger description matches the query.
const ANTI_TRIGGER_PENALTY: f64 = 0.6;
/// Whether leftover budget is refilled with skipped blocks ranked by density.
const DENSITY_REFILL: bool = true;
/// Multiplier applied to the fused score of chunks in the `pending` state.
const PENDING_RECALL_PENALTY: f64 = 0.60;

// ---- Curation / evolution defaults -------------------------------------------
// Overridable through the matching `curate.*` / `evolve.*` meta entries
// (see `load_params`).
const LOW_CONF_THRESHOLD: f64 = 0.25;
const LOW_CONF_IDLE_DAYS: i64 = 60;
const REPEAT_SELECT_MIN: i64 = 10;
const REPEAT_SELECT_CONF_MAX: f64 = 0.5;
const NEVER_USED_AGE_DAYS: i64 = 30;
const OPEN_TTL_DAYS: i64 = 14;
const SCREENING_TIMEOUT_MINUTES: i64 = 30;
const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
const PROMOTE_CONFIDENCE_MIN: f64 = 0.60;
/// Clamped to [0.0, 0.4] when loaded from meta.
const DECAY_FLOOR: f64 = 0.20;
const EVOLVE_THRESHOLD: i64 = 5;
const DISTILL_BATCH_SIZE: usize = 20;
const GOVERNANCE_ARCHIVE_THRESHOLD: i64 = 3;
const NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD: i64 = 5;
const GOVERNANCE_EVOLVE_THRESHOLD: i64 = 3;
const FAILURE_MIN_USES: i64 = 5;
const FAILURE_MAX_SUCCESS_RATE: f64 = 0.20;
const FAILURE_CONFIDENCE_MAX: f64 = 0.35;
58
59#[derive(Debug, Default, Clone)]
64pub struct RecallResult {
65 pub knowledge: Vec<Value>,
66 pub sparks: Vec<Value>,
67 pub trace_id: String,
68 pub empty: bool,
69 pub depth_skipped: Vec<String>,
70 pub skipped_reasons: HashMap<String, String>,
71}
72
73#[derive(Debug, Default)]
74pub struct CurateReport {
75 pub archived: Vec<String>,
76 pub deduped: Vec<String>,
77 pub decayed: Vec<String>,
78 pub cycles: Vec<Vec<String>>,
79 pub orphans: Vec<String>,
80 pub recovered: Vec<String>,
81 pub warnings: Vec<String>,
82 pub stats: HashMap<String, Value>,
83}
84
/// Counters for one distillation batch.
#[derive(Debug, Default)]
struct DistillBatchReport {
    /// Number of items distilled successfully (per the field name).
    distilled: usize,
    /// Number of items whose distillation failed (per the field name).
    failed: usize,
}
90
/// Optional restrictions passed to a `Curator` run.
///
/// The default scope has no filters and `dry_run == false`.
#[derive(Debug, Default, Clone)]
pub struct CurateScope {
    /// Restrict curation to chunks with this origin (presumably; see curator).
    pub origin: Option<String>,
    /// Restrict curation to this skill (presumably; see curator).
    pub skill_name: Option<String>,
    /// When true, the run is presumably report-only — confirm in the curator.
    pub dry_run: bool,
}
101
102pub trait Curator: Send + Sync {
105 fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport>;
106}
107
108pub struct BuiltinCurator;
110
111impl Curator for BuiltinCurator {
112 fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport> {
113 kb.builtin_curate_impl(scope)
114 }
115}
116
117pub struct KnowledgeBase {
122 pub storage: Storage,
123 embedding: Arc<dyn EmbeddingProvider>,
124 refiner: Arc<dyn Refiner>,
125 distiller: Arc<dyn Distiller>,
126 curator: Arc<dyn Curator>,
127 sanitizer: Arc<dyn Sanitizer>,
128
129 w_content: f64,
131 w_trigger: f64,
132 w_confidence: f64,
133 w_context: f64,
134 top_k_candidates: usize,
135 anti_trigger_penalty: f64,
136 density_refill: bool,
137
138 low_conf_threshold: f64,
139 low_conf_idle_days: i64,
140 repeat_select_min: i64,
141 repeat_select_conf_max: f64,
142 never_used_age_days: i64,
143 open_ttl_days: i64,
144 screening_timeout_minutes: i64,
145 promote_used_success_min: i64,
146 promote_confidence_min: f64,
147 decay_floor: f64,
148 evolve_threshold: i64,
149 distill_batch_size: usize,
150 evolve_schedule_interval_hours: i64,
151 governance_archive_threshold: i64,
152 negative_feedback_archive_threshold: i64,
153 governance_evolve_threshold: i64,
154 governance_proposal_max_age_days: i64,
155 failure_min_uses: i64,
156 failure_max_success_rate: f64,
157 failure_confidence_max: f64,
158}
159
160impl KnowledgeBase {
161 pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
162 Self::open_with(db_path, None, None, None, None, None)
163 }
164
165 pub fn open_with(
166 db_path: impl AsRef<Path>,
167 embedding: Option<Arc<dyn EmbeddingProvider>>,
168 refiner: Option<Arc<dyn Refiner>>,
169 distiller: Option<Arc<dyn Distiller>>,
170 curator: Option<Arc<dyn Curator>>,
171 sanitizer: Option<Arc<dyn Sanitizer>>,
172 ) -> Result<Self> {
173 let embedding = embedding.unwrap_or_else(|| Arc::new(DummyEmbeddingProvider::default()));
174 let refiner = refiner.unwrap_or_else(|| Arc::new(NullRefiner));
175 let distiller = distiller.unwrap_or_else(|| Arc::new(HeuristicDistiller));
176 let curator = curator.unwrap_or_else(|| Arc::new(BuiltinCurator));
177 let sanitizer = sanitizer.unwrap_or_else(|| Arc::new(DefaultSanitizer));
178
179 let storage = Storage::open(db_path, embedding.content_dim(), embedding.trigger_dim())?;
180
181 let mut kb = Self {
182 storage,
183 embedding,
184 refiner,
185 distiller,
186 curator,
187 sanitizer,
188 w_content: W_CONTENT,
189 w_trigger: W_TRIGGER,
190 w_confidence: W_CONFIDENCE,
191 w_context: W_CONTEXT,
192 top_k_candidates: TOP_K_CANDIDATES,
193 anti_trigger_penalty: ANTI_TRIGGER_PENALTY,
194 density_refill: DENSITY_REFILL,
195 low_conf_threshold: LOW_CONF_THRESHOLD,
196 low_conf_idle_days: LOW_CONF_IDLE_DAYS,
197 repeat_select_min: REPEAT_SELECT_MIN,
198 repeat_select_conf_max: REPEAT_SELECT_CONF_MAX,
199 never_used_age_days: NEVER_USED_AGE_DAYS,
200 open_ttl_days: OPEN_TTL_DAYS,
201 screening_timeout_minutes: SCREENING_TIMEOUT_MINUTES,
202 promote_used_success_min: PROMOTE_USED_SUCCESS_MIN,
203 promote_confidence_min: PROMOTE_CONFIDENCE_MIN,
204 decay_floor: DECAY_FLOOR,
205 evolve_threshold: EVOLVE_THRESHOLD,
206 distill_batch_size: DISTILL_BATCH_SIZE,
207 evolve_schedule_interval_hours: 6,
208 governance_archive_threshold: GOVERNANCE_ARCHIVE_THRESHOLD,
209 negative_feedback_archive_threshold: NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
210 governance_evolve_threshold: GOVERNANCE_EVOLVE_THRESHOLD,
211 governance_proposal_max_age_days: 30,
212 failure_min_uses: FAILURE_MIN_USES,
213 failure_max_success_rate: FAILURE_MAX_SUCCESS_RATE,
214 failure_confidence_max: FAILURE_CONFIDENCE_MAX,
215 };
216 kb.init_meta()?;
217 kb.load_params()?;
218 Ok(kb)
219 }
220
221 fn init_meta(&self) -> Result<()> {
222 let lib_id = gen_uuid();
223 let content_dim = self.embedding.content_dim().to_string();
224 let trigger_dim = self.embedding.trigger_dim().to_string();
225 let embed_model = self.embedding.model_name();
226
227 for (key, expected) in [
228 ("content_dim", self.embedding.content_dim()),
229 ("trigger_dim", self.embedding.trigger_dim()),
230 ] {
231 if let Some(stored) = self.storage.get_meta(key)? {
232 let actual = stored.parse::<usize>().map_err(|_| {
233 InnateError::Other(format!("invalid {key} metadata value: {stored}"))
234 })?;
235 if actual != expected {
236 return Err(InnateError::Other(format!(
237 "{key} mismatch: database uses {actual}, embedding provider uses {expected}"
238 )));
239 }
240 }
241 }
242
243 let defaults: &[(&str, &str)] = &[
244 ("lib_id", &lib_id),
245 ("lib_role", "personal"),
246 ("schema_version", "4.14"),
247 ("content_dim", &content_dim),
248 ("trigger_dim", &trigger_dim),
249 ("embed_model", embed_model),
250 ("embed_version", "1"),
251 ("vector_revision", "0"),
252 ("last_agg_ts", "1970-01-01T00:00:00.000Z"),
253 ("recall.w_content", "0.55"),
254 ("recall.w_trigger", "0.25"),
255 ("recall.w_confidence", "0.10"),
256 ("recall.w_context", "0.15"),
257 ("recall.top_k_candidates", "20"),
258 ("recall.anti_trigger_penalty", "0.6"),
259 ("recall.density_refill", "true"),
260 ("curate.low_conf_threshold", "0.25"),
261 ("curate.low_conf_idle_days", "60"),
262 ("curate.repeat_select_min", "10"),
263 ("curate.repeat_select_conf_max", "0.5"),
264 ("curate.never_used_age_days", "30"),
265 ("curate.open_ttl_days", "14"),
266 ("curate.screening_timeout_minutes", "30"),
267 ("curate.promote_used_success_min", "3"),
268 ("curate.promote_confidence_min", "0.60"),
269 ("curate.decay_floor", "0.20"),
270 ("evolve.threshold_new_count", "5"),
271 ("evolve.distill_batch_size", "20"),
272 ("evolve.schedule_interval_hours", "6"),
273 ("curate.soft_mature_threshold", "5"),
274 ("evolve.distill_token_window_hours", "24"),
275 ("curate.governance_archive_threshold", "3"),
276 ("curate.negative_feedback_archive_threshold", "5"),
277 ("evolve.governance_pending_threshold", "3"),
278 ("curate.governance_proposal_max_age_days", "30"),
279 ("curate.failure_min_uses", "5"),
280 ("curate.failure_max_success_rate", "0.20"),
281 ("curate.failure_confidence_max", "0.35"),
282 ];
283 self.storage.begin_immediate()?;
284 let result = (|| -> Result<()> {
285 for (k, v) in defaults {
286 if self.storage.get_meta(k)?.is_none() {
287 self.storage.set_meta(k, v)?;
288 }
289 }
290 self.storage.commit()
291 })();
292 if result.is_err() {
293 let _ = self.storage.rollback();
294 }
295 result
296 }
297
298 fn load_params(&mut self) -> Result<()> {
299 let f = |k: &str, d: f64| -> f64 {
300 self.storage
301 .get_meta(k)
302 .ok()
303 .flatten()
304 .and_then(|v| v.parse().ok())
305 .unwrap_or(d)
306 };
307 let i = |k: &str, d: i64| -> i64 {
308 self.storage
309 .get_meta(k)
310 .ok()
311 .flatten()
312 .and_then(|v| v.parse().ok())
313 .unwrap_or(d)
314 };
315 let b = |k: &str, d: bool| -> bool {
316 self.storage
317 .get_meta(k)
318 .ok()
319 .flatten()
320 .map(|v| v.to_lowercase() == "true")
321 .unwrap_or(d)
322 };
323 self.w_content = f("recall.w_content", W_CONTENT);
324 self.w_trigger = f("recall.w_trigger", W_TRIGGER);
325 self.w_confidence = f("recall.w_confidence", W_CONFIDENCE);
326 self.w_context = f("recall.w_context", W_CONTEXT);
327 self.top_k_candidates =
328 i("recall.top_k_candidates", TOP_K_CANDIDATES as i64).max(1) as usize;
329 self.anti_trigger_penalty = f("recall.anti_trigger_penalty", ANTI_TRIGGER_PENALTY);
330 self.density_refill = b("recall.density_refill", DENSITY_REFILL);
331 self.low_conf_threshold = f("curate.low_conf_threshold", LOW_CONF_THRESHOLD);
332 self.low_conf_idle_days = i("curate.low_conf_idle_days", LOW_CONF_IDLE_DAYS);
333 self.repeat_select_min = i("curate.repeat_select_min", REPEAT_SELECT_MIN);
334 self.repeat_select_conf_max = f("curate.repeat_select_conf_max", REPEAT_SELECT_CONF_MAX);
335 self.never_used_age_days = i("curate.never_used_age_days", NEVER_USED_AGE_DAYS);
336 self.open_ttl_days = i("curate.open_ttl_days", OPEN_TTL_DAYS);
337 self.screening_timeout_minutes = i(
338 "curate.screening_timeout_minutes",
339 SCREENING_TIMEOUT_MINUTES,
340 );
341 self.promote_used_success_min =
342 i("curate.promote_used_success_min", PROMOTE_USED_SUCCESS_MIN);
343 self.promote_confidence_min = f("curate.promote_confidence_min", PROMOTE_CONFIDENCE_MIN);
344 self.decay_floor = f("curate.decay_floor", DECAY_FLOOR).clamp(0.0, 0.4);
345 self.evolve_threshold = i("evolve.threshold_new_count", EVOLVE_THRESHOLD);
346 self.distill_batch_size =
347 i("evolve.distill_batch_size", DISTILL_BATCH_SIZE as i64) as usize;
348 self.evolve_schedule_interval_hours = i("evolve.schedule_interval_hours", 6).max(1);
349 self.governance_archive_threshold =
350 i("curate.governance_archive_threshold", GOVERNANCE_ARCHIVE_THRESHOLD).max(1);
351 self.negative_feedback_archive_threshold = i(
352 "curate.negative_feedback_archive_threshold",
353 NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
354 )
355 .max(1);
356 self.governance_evolve_threshold =
357 i("evolve.governance_pending_threshold", GOVERNANCE_EVOLVE_THRESHOLD).max(1);
358 self.governance_proposal_max_age_days =
359 i("curate.governance_proposal_max_age_days", 30).max(1);
360 self.failure_min_uses = i("curate.failure_min_uses", FAILURE_MIN_USES).max(1);
361 self.failure_max_success_rate = f(
362 "curate.failure_max_success_rate",
363 FAILURE_MAX_SUCCESS_RATE,
364 )
365 .clamp(0.0, 1.0);
366 self.failure_confidence_max =
367 f("curate.failure_confidence_max", FAILURE_CONFIDENCE_MAX).clamp(0.0, 1.0);
368 Ok(())
369 }
370
371 #[allow(clippy::too_many_arguments)]
376 pub fn recall(
377 &self,
378 query: &str,
379 budget: usize,
380 trace: bool,
381 include_sparks: bool,
382 top: Option<usize>,
383 source: &str,
384 expand_deps: &str, allow_trim: bool, refine_mode: &str, ) -> Result<RecallResult> {
388 validate_source(source)?;
389 let trace_id = gen_uuid();
390 let now = utc_now_iso();
391
392 let q_content = self
393 .embedding
394 .embed_content(query)
395 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
396 let q_trigger = self
397 .embedding
398 .embed_trigger(query)
399 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
400
401 let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
403 self.apply_soft_dep_bonus(&mut candidates)?;
404
405 let scored = self.score_candidates(candidates, query)?;
407
408 let (selected, skipped, skipped_reasons) =
410 self.pack(&scored, budget, expand_deps, allow_trim, query)?;
411
412 let depth_skipped: Vec<String> = skipped_reasons
413 .iter()
414 .filter(|(_, r)| r.as_str() == "dep_depth_limit")
415 .map(|(id, _)| id.clone())
416 .collect();
417
418 let mut selected = selected;
420 if self.density_refill {
421 selected = self.density_refill(selected, &skipped, budget);
422 }
423
424 let limited = limit_knowledge(selected, top);
425 let visible = if refine_mode == "adapt" {
426 self.refiner
427 .refine(limited.clone(), Some(budget))
428 .unwrap_or(limited)
429 } else {
430 limited
431 };
432
433 let sparks = if include_sparks {
435 self.recall_sparks(&q_content, &q_trigger)?
436 } else {
437 vec![]
438 };
439
440 if trace {
441 self.write_recall_trace(
442 &trace_id,
443 query,
444 &scored,
445 &visible,
446 &sparks,
447 &depth_skipped,
448 &skipped_reasons,
449 refine_mode,
450 source,
451 &now,
452 )?;
453 }
454
455 let empty = visible.is_empty() && sparks.is_empty();
456 Ok(RecallResult {
457 knowledge: visible,
458 sparks,
459 trace_id,
460 empty,
461 depth_skipped,
462 skipped_reasons,
463 })
464 }
465
466 fn ann_candidates(
467 &self,
468 q_content: &[f32],
469 q_trigger: &[f32],
470 ) -> Result<HashMap<String, CandidateInfo>> {
471 let embed_version = self
472 .storage
473 .get_meta("embed_version")?
474 .and_then(|v| v.parse::<i64>().ok())
475 .unwrap_or(1);
476
477 let content_res = self
478 .storage
479 .search_vec_content(q_content, self.top_k_candidates * 2)?;
480 let trigger_res = self
481 .storage
482 .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
483
484 let all_ids: Vec<&str> = {
486 let mut seen = HashSet::new();
487 content_res
488 .iter()
489 .chain(trigger_res.iter())
490 .map(|(id, _)| id.as_str())
491 .filter(|id| seen.insert(*id))
492 .collect()
493 };
494 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
495
496 let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
497
498 for (cid, sim) in &content_res {
499 if let Some(chunk) = chunks.get(cid) {
500 if chunk_is_valid_for_recall(chunk, embed_version) {
501 let e = candidates
502 .entry(cid.clone())
503 .or_insert_with(|| CandidateInfo {
504 chunk: chunk.clone(),
505 sim_content: 0.0,
506 sim_trigger: 0.0,
507 });
508 e.sim_content = e.sim_content.max(*sim);
509 }
510 }
511 }
512 for (cid, sim) in &trigger_res {
513 if let Some(chunk) = chunks.get(cid) {
514 if chunk_is_valid_for_recall(chunk, embed_version) {
515 let e = candidates
516 .entry(cid.clone())
517 .or_insert_with(|| CandidateInfo {
518 chunk: chunk.clone(),
519 sim_content: 0.0,
520 sim_trigger: 0.0,
521 });
522 e.sim_trigger = e.sim_trigger.max(*sim);
523 }
524 }
525 }
526 Ok(candidates)
527 }
528
529 fn apply_soft_dep_bonus(&self, candidates: &mut HashMap<String, CandidateInfo>) -> Result<()> {
530 let ids: Vec<String> = candidates.keys().cloned().collect();
531 for cid in ids {
532 if candidates[&cid].chunk.get("origin").and_then(Value::as_str) == Some("spark") {
533 continue;
534 }
535 let deps = self.storage.get_deps(&cid)?;
536 for (dst, kind, _) in &deps {
537 if kind != "soft" {
538 continue;
539 }
540 if let Some(target) = self.storage.get_chunk(dst)? {
541 if target.get("state").and_then(Value::as_str) == Some("archived") {
542 continue;
543 }
544 if target.get("origin").and_then(Value::as_str) == Some("spark") {
545 continue;
546 }
547 let e = candidates
548 .entry(dst.clone())
549 .or_insert_with(|| CandidateInfo {
550 chunk: target,
551 sim_content: 0.0,
552 sim_trigger: 0.0,
553 });
554 e.sim_content = (e.sim_content + 0.05).min(1.0);
555 }
556 }
557 }
558 Ok(())
559 }
560
561 fn score_candidates(
562 &self,
563 candidates: HashMap<String, CandidateInfo>,
564 query: &str,
565 ) -> Result<Vec<(f64, Value)>> {
566 let context_key = content_hash(&normalize_query(query));
567 let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
568 for info in candidates.into_values() {
569 let conf = info
570 .chunk
571 .get("confidence")
572 .and_then(Value::as_f64)
573 .unwrap_or(0.5);
574 let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
575 let context_score = self.storage.context_score(chunk_id, &context_key)?;
576 let mut fused = self.w_content * info.sim_content as f64
577 + self.w_trigger * info.sim_trigger as f64
578 + self.w_confidence * conf
579 + self.w_context * context_score;
580 if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
581 fused *= PENDING_RECALL_PENALTY;
582 }
583 let anti = info
584 .chunk
585 .get("anti_trigger_desc")
586 .and_then(Value::as_str)
587 .unwrap_or("");
588 if !anti.is_empty() && anti_trigger_hit(query, anti) {
589 fused *= self.anti_trigger_penalty;
590 }
591 let mut chunk = info.chunk;
592 chunk["_context_score"] = json!(context_score);
593 chunk["_fused_score"] = json!(fused);
594 scored.push((fused, chunk));
595 }
596 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
597 scored.truncate(self.top_k_candidates);
598 Ok(scored)
599 }
600
601 fn pack(
602 &self,
603 scored: &[(f64, Value)],
604 budget: usize,
605 expand_deps: &str,
606 allow_trim: bool,
607 query: &str,
608 ) -> Result<PackResult> {
609 let mut selected: Vec<Value> = vec![];
610 let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
611 let mut skipped_reasons: HashMap<String, String> = HashMap::new();
612 let mut used_ids: HashSet<String> = HashSet::new();
613 let mut used_tokens: usize = 0;
614
615 for (fused, chunk) in scored {
616 let cid = chunk["id"].as_str().unwrap_or("").to_string();
617 if used_ids.contains(&cid) {
618 continue;
619 }
620
621 let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
623 if let Some(reason) = dep_skip_reason {
624 skipped_reasons.insert(cid, reason);
625 continue;
626 }
627
628 let new_block: Vec<Value> = block
629 .iter()
630 .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
631 .cloned()
632 .collect();
633 let cost = block_cost(&new_block);
634
635 if used_tokens + cost <= budget {
636 for b in &block {
637 let bid = b["id"].as_str().unwrap_or("").to_string();
638 if !used_ids.contains(&bid) {
639 let mut b = b.clone();
640 b["_fused_score"] = json!(fused);
641 selected.push(b);
642 used_ids.insert(bid);
643 }
644 }
645 used_tokens += cost;
646 } else if allow_trim {
647 if let Some(trimmed) =
649 self.refiner
650 .trim(&block, query, budget.saturating_sub(used_tokens))
651 {
652 let trim_cost = block_cost(&trimmed);
653 if used_tokens + trim_cost <= budget {
654 for b in &trimmed {
655 let bid = b["id"].as_str().unwrap_or("").to_string();
656 if !used_ids.contains(&bid) {
657 let mut b = b.clone();
658 b["_fused_score"] = json!(fused);
659 b["_trimmed"] = json!(true);
660 selected.push(b);
661 used_ids.insert(bid);
662 }
663 }
664 used_tokens += trim_cost;
665 continue;
666 }
667 }
668 skipped.push((block, *fused, cost));
669 } else {
670 skipped.push((block, *fused, cost));
671 }
672 }
673 Ok((selected, skipped, skipped_reasons))
674 }
675
676 fn build_dep_block(
679 &self,
680 seed: &Value,
681 expand_deps: &str,
682 ) -> Result<(Vec<Value>, Option<String>)> {
683 if expand_deps == "false" || expand_deps.is_empty() {
684 return Ok((vec![seed.clone()], None));
685 }
686 let seed_id = seed["id"].as_str().unwrap_or("");
687 match expand_deps {
688 "direct" => {
689 let deps = self.storage.get_deps(seed_id)?;
690 let mut block = vec![seed.clone()];
691 for (dep_id, kind, _) in &deps {
692 if kind != "hard" {
693 continue;
694 }
695 match self.validate_hard_dep(dep_id)? {
696 Some(chunk) => block.push(chunk),
697 None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
698 }
699 }
700 Ok((block, None))
701 }
702 "closure" => {
703 let mut block = vec![seed.clone()];
704 let mut visited: HashSet<String> = [seed_id.to_string()].into();
705 match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
706 Some(reason) => Ok((vec![], Some(reason))),
707 None => Ok((block, None)),
708 }
709 }
710 _ => Ok((vec![seed.clone()], None)),
711 }
712 }
713
714 fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
716 match self.storage.get_chunk(dep_id)? {
717 None => Ok(None),
718 Some(chunk) => {
719 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
720 let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
721 let embed_v = chunk
722 .get("embed_version")
723 .and_then(Value::as_i64)
724 .unwrap_or(0);
725 if state == "archived" || origin == "spark" || embed_v == 0 {
726 Ok(None)
727 } else {
728 Ok(Some(chunk))
729 }
730 }
731 }
732 }
733
734 fn expand_hard_closure(
736 &self,
737 id: &str,
738 visited: &mut HashSet<String>,
739 block: &mut Vec<Value>,
740 depth: usize,
741 max_depth: usize,
742 ) -> Result<Option<String>> {
743 if depth >= max_depth {
744 return Ok(Some("dep_depth_limit".to_string()));
745 }
746 let deps = self.storage.get_deps(id)?;
747 for (dep_id, kind, _) in &deps {
748 if kind != "hard" {
749 continue;
750 }
751 if visited.contains(dep_id) {
752 continue;
753 } visited.insert(dep_id.clone());
755 match self.validate_hard_dep(dep_id)? {
756 None => return Ok(Some("hard_dep_unavailable".to_string())),
757 Some(chunk) => {
758 block.push(chunk);
759 if let Some(reason) =
760 self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
761 {
762 return Ok(Some(reason));
763 }
764 }
765 }
766 }
767 Ok(None)
768 }
769
770 fn density_refill(
771 &self,
772 mut selected: Vec<Value>,
773 skipped: &[(Vec<Value>, f64, usize)],
774 budget: usize,
775 ) -> Vec<Value> {
776 let used_tokens = block_cost(&selected);
777 if used_tokens >= budget {
778 return selected;
779 }
780
781 let selected_ids: HashSet<String> = selected
782 .iter()
783 .filter_map(|c| c["id"].as_str().map(str::to_string))
784 .collect();
785
786 let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
787 .iter()
788 .filter_map(|(block, fscore, _)| {
789 let block: Vec<Value> = block
790 .iter()
791 .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
792 .cloned()
793 .collect();
794 if block.is_empty() {
795 return None;
796 }
797 let cost = block_cost(&block);
798 let density = fscore / cost.max(1) as f64;
799 Some((density, block, cost))
800 })
801 .collect();
802 density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
803
804 let mut used_tokens = block_cost(&selected);
805 let mut added_ids: HashSet<String> = selected_ids;
806 for (_, block, cost) in density_items {
807 if used_tokens + cost <= budget {
808 for b in block {
809 let bid = b["id"].as_str().unwrap_or("").to_string();
810 if !added_ids.contains(&bid) {
811 selected.push(b);
812 added_ids.insert(bid);
813 }
814 }
815 used_tokens += cost;
816 }
817 }
818 selected
819 }
820
821 fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
822 let embed_version = self
823 .storage
824 .get_meta("embed_version")?
825 .and_then(|v| v.parse::<i64>().ok())
826 .unwrap_or(1);
827
828 let content_res = self
829 .storage
830 .search_vec_content(q_content, self.top_k_candidates)?;
831 let trigger_res = self
832 .storage
833 .search_vec_trigger(q_trigger, self.top_k_candidates)?;
834
835 let all_ids: Vec<&str> = {
837 let mut seen = HashSet::new();
838 content_res
839 .iter()
840 .chain(trigger_res.iter())
841 .map(|(id, _)| id.as_str())
842 .filter(|id| seen.insert(*id))
843 .collect()
844 };
845 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
846
847 let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
848 for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
849 if let Some(chunk) = chunks.get(cid) {
850 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
851 continue;
852 }
853 if chunk.get("state").and_then(Value::as_str) == Some("archived") {
854 continue;
855 }
856 let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
857 if maturity == "promoted" || maturity == "dropped" {
858 continue;
859 }
860 let ev = chunk
861 .get("embed_version")
862 .and_then(Value::as_i64)
863 .unwrap_or(1);
864 if ev < embed_version {
865 continue;
866 }
867 let entry = spark_scores
868 .entry(cid.clone())
869 .or_insert_with(|| (*sim, chunk.clone()));
870 if *sim > entry.0 {
871 *entry = (*sim, chunk.clone());
872 }
873 }
874 }
875 let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
876 sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
877 Ok(sparks
878 .into_iter()
879 .take(self.top_k_candidates)
880 .map(|(_, c)| c)
881 .collect())
882 }
883
884 #[allow(clippy::too_many_arguments)]
885 fn write_recall_trace(
886 &self,
887 trace_id: &str,
888 query: &str,
889 scored: &[(f64, Value)],
890 visible: &[Value],
891 sparks: &[Value],
892 depth_skipped: &[String],
893 skipped_reasons: &HashMap<String, String>,
894 refine_mode: &str,
895 source: &str,
896 now: &str,
897 ) -> Result<()> {
898 let lib_id = self.storage.lib_id()?;
899 self.storage.begin_immediate()?;
900 let result = (|| -> Result<()> {
901 for (rank, (_, chunk)) in scored.iter().enumerate() {
902 let cid = chunk["id"].as_str().unwrap_or("");
903 let sim = chunk.get("_fused_score").and_then(Value::as_f64);
904 let rm = skipped_reasons
906 .get(cid)
907 .map(|r| format!("skipped:{r}"))
908 .or_else(|| {
909 if refine_mode != "off" && !refine_mode.is_empty() {
910 Some(refine_mode.to_string())
911 } else {
912 None
913 }
914 });
915 self.storage.insert_usage_trace(
916 trace_id,
917 Some(cid),
918 "retrieved",
919 1.0,
920 sim,
921 rm.as_deref(),
922 None,
923 Some((rank + 1) as i64),
924 None,
925 source,
926 now,
927 )?;
928 }
929 for (rank, chunk) in visible.iter().enumerate() {
930 let cid = chunk["id"].as_str().unwrap_or("");
931 self.storage.insert_usage_trace(
932 trace_id,
933 Some(cid),
934 "selected",
935 1.0,
936 None,
937 None,
938 None,
939 Some((rank + 1) as i64),
940 None,
941 source,
942 now,
943 )?;
944 if chunk
946 .get("_trimmed")
947 .and_then(Value::as_bool)
948 .unwrap_or(false)
949 {
950 self.storage.insert_usage_trace(
951 trace_id,
952 Some(cid),
953 "refined",
954 1.0,
955 None,
956 Some("trim"),
957 None,
958 Some((rank + 1) as i64),
959 None,
960 source,
961 now,
962 )?;
963 }
964 }
965 for (rank, chunk) in sparks.iter().enumerate() {
967 let cid = chunk["id"].as_str().unwrap_or("");
968 self.storage.insert_usage_trace(
969 trace_id,
970 Some(cid),
971 "retrieved",
972 1.0,
973 None,
974 Some("spark"),
975 None,
976 Some((rank + 1) as i64),
977 None,
978 source,
979 now,
980 )?;
981 }
982 let snapshot = json!({
983 "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
984 "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
985 "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
986 "depth_skipped": depth_skipped,
987 "skipped_reasons": skipped_reasons,
988 });
989 let log = EpisodicLogRow {
990 id: gen_uuid(),
991 trace_id: trace_id.to_string(),
992 lib_id,
993 ts: now.to_string(),
994 query: Some(query.to_string()),
995 recall_snapshot: Some(snapshot.to_string()),
996 event_source: source.to_string(),
997 task_state: "recalled".to_string(),
998 usage_state: "unknown".to_string(),
999 context_key: Some(content_hash(&normalize_query(query))),
1000 distill_state: "open".to_string(),
1001 ..Default::default()
1002 };
1003 self.storage.upsert_episodic_log(&log)?;
1004 self.storage.commit()
1005 })();
1006 if result.is_err() {
1007 let _ = self.storage.rollback();
1008 }
1009 result
1010 }
1011
1012 #[allow(clippy::too_many_arguments)]
1017 pub fn record(
1018 &self,
1019 trace_id: &str,
1020 query: Option<&str>,
1021 output: Option<&str>,
1022 output_summary: Option<&str>,
1023 outcome: Option<&str>,
1024 used: Option<&[String]>,
1025 feedback_up: Option<&[String]>,
1026 feedback_down: Option<&[String]>,
1027 nomination: Option<&str>,
1028 priority: i64,
1029 source: &str,
1030 ) -> Result<()> {
1031 self.record_detailed(
1032 trace_id,
1033 query,
1034 output,
1035 output_summary,
1036 outcome,
1037 used,
1038 "explicit",
1039 true,
1040 feedback_up,
1041 feedback_down,
1042 "user",
1043 None,
1044 None,
1045 nomination,
1046 priority,
1047 None,
1048 source,
1049 )
1050 }
1051
1052 #[allow(clippy::too_many_arguments)]
1053 pub fn record_detailed(
1054 &self,
1055 trace_id: &str,
1056 query: Option<&str>,
1057 output: Option<&str>,
1058 output_summary: Option<&str>,
1059 outcome: Option<&str>,
1060 used: Option<&[String]>,
1061 used_attribution: &str,
1062 used_complete: bool,
1063 feedback_up: Option<&[String]>,
1064 feedback_down: Option<&[String]>,
1065 feedback_kind: &str,
1066 feedback_actor: Option<&str>,
1067 feedback_reason: Option<&str>,
1068 nomination: Option<&str>,
1069 priority: i64,
1070 task_state: Option<&str>,
1071 source: &str,
1072 ) -> Result<()> {
1073 let dedupe_ids = |ids: &[String]| {
1074 let mut seen = HashSet::new();
1075 ids.iter()
1076 .filter(|id| seen.insert((*id).clone()))
1077 .cloned()
1078 .collect::<Vec<_>>()
1079 };
1080 let normalized_used = used.map(dedupe_ids);
1081 let normalized_feedback_up = feedback_up.map(dedupe_ids);
1082 let normalized_feedback_down = feedback_down.map(dedupe_ids);
1083 let used = normalized_used.as_deref();
1084 let feedback_up = normalized_feedback_up.as_deref();
1085 let feedback_down = normalized_feedback_down.as_deref();
1086
1087 if let Some(o) = outcome {
1088 if !matches!(o, "ok" | "fail" | "unknown") {
1089 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
1090 }
1091 }
1092 if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
1093 return Err(InnateError::InvalidState(format!(
1094 "invalid used attribution: {used_attribution}"
1095 )));
1096 }
1097 if !matches!(feedback_kind, "user" | "judge") {
1098 return Err(InnateError::InvalidState(format!(
1099 "invalid feedback kind: {feedback_kind}"
1100 )));
1101 }
1102 if let Some(state) = task_state {
1103 if !matches!(
1104 state,
1105 "recalled" | "running" | "completed" | "abandoned" | "timed_out"
1106 ) {
1107 return Err(InnateError::InvalidState(format!(
1108 "invalid task state: {state}"
1109 )));
1110 }
1111 }
1112 validate_source(source)?;
1113 if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
1114 let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
1115 if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
1116 return Err(InnateError::InvalidState(format!(
1117 "conflicting feedback for chunk {chunk_id}"
1118 )));
1119 }
1120 }
1121 let effective_priority = if nomination.is_some() && priority == 0 {
1122 1
1123 } else {
1124 priority
1125 };
1126 let now = utc_now_iso();
1127 let lib_id = self.storage.lib_id()?;
1128
1129 self.storage.begin_immediate()?;
1130 let result = (|| -> Result<()> {
1131 let log = self.storage.get_episodic_log(trace_id)?;
1132 let mut is_fresh_insert = false;
1133 let log = match log {
1134 Some(l) => l,
1135 None => {
1136 let used_ids = used.map(serde_json::to_string).transpose()?;
1137 let row = EpisodicLogRow {
1138 id: gen_uuid(),
1139 trace_id: trace_id.to_string(),
1140 lib_id,
1141 ts: now.clone(),
1142 query: query.map(str::to_string).or_else(|| Some(String::new())),
1143 output: output.map(str::to_string),
1144 output_summary: output_summary.map(str::to_string),
1145 outcome: outcome.map(str::to_string),
1146 event_source: source.to_string(),
1147 task_state: if matches!(outcome, Some("ok") | Some("fail")) {
1148 "completed".to_string()
1149 } else {
1150 task_state.unwrap_or("running").to_string()
1151 },
1152 completed_at: matches!(outcome, Some("ok") | Some("fail"))
1153 .then(|| now.clone()),
1154 usage_state: usage_state(used).to_string(),
1155 used_ids,
1156 used_attribution: used.map(|_| used_attribution.to_string()),
1157 used_complete,
1158 context_key: query.map(|q| content_hash(&normalize_query(q))),
1159 nomination: nomination.map(str::to_string),
1160 priority: effective_priority,
1161 distill_state: "open".to_string(),
1162 ..Default::default()
1163 };
1164 self.storage.upsert_episodic_log(&row)?;
1165 is_fresh_insert = true;
1166 self.storage.get_episodic_log(trace_id)?.unwrap()
1167 }
1168 };
1169 self.validate_trace_attribution(trace_id, used, "used")?;
1170 self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
1171 self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
1172
1173 let existing_outcome = log
1174 .get("outcome")
1175 .and_then(Value::as_str)
1176 .map(str::to_string);
1177 let effective_used_attribution = if used.is_some() {
1179 used_attribution
1180 } else {
1181 log.get("used_attribution")
1182 .and_then(Value::as_str)
1183 .unwrap_or(used_attribution)
1184 };
1185 let used_strength = match effective_used_attribution {
1186 "explicit" => 0.3,
1187 "cited" => 0.25,
1188 "inferred" => 0.15,
1189 _ => unreachable!(),
1190 };
1191 let existing_used_ids: Vec<String> = log
1192 .get("used_ids")
1193 .and_then(Value::as_str)
1194 .and_then(|raw| serde_json::from_str(raw).ok())
1195 .unwrap_or_default();
1196 let existing_used_complete = log
1197 .get("used_complete")
1198 .and_then(Value::as_i64)
1199 .unwrap_or(0)
1200 != 0;
1201 let effective_used_complete = used_complete || existing_used_complete;
1202 let effective_used_ids = used.map(|reported| {
1203 if used_complete {
1204 reported.to_vec()
1205 } else {
1206 let mut merged = existing_used_ids.clone();
1207 let mut seen: HashSet<String> = merged.iter().cloned().collect();
1208 merged.extend(
1209 reported
1210 .iter()
1211 .filter(|id| seen.insert((*id).clone()))
1212 .cloned(),
1213 );
1214 merged
1215 }
1216 });
1217 if let Some(used_ids) = effective_used_ids.as_deref() {
1218 let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
1219 if used_complete {
1220 self.storage.replace_used_trace(
1221 trace_id,
1222 used_ids,
1223 used_strength,
1224 used_attribution,
1225 source,
1226 &now,
1227 )?;
1228 } else if let Some(reported) = used {
1229 self.storage.merge_used_trace(
1230 trace_id,
1231 reported,
1232 used_strength,
1233 used_attribution,
1234 source,
1235 &now,
1236 )?;
1237 }
1238 let affected: HashSet<String> = previously_used
1239 .into_iter()
1240 .chain(used_ids.iter().cloned())
1241 .collect();
1242 for cid in affected {
1243 self.storage.refresh_chunk_last_used(&cid, &now)?;
1244 }
1245 }
1246
1247 if let Some(o) = outcome {
1249 if matches!(o, "ok" | "fail") {
1250 let event = if o == "ok" { "task_ok" } else { "task_fail" };
1251 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
1252 self.storage.conn_execute(
1253 "DELETE FROM usage_trace
1254 WHERE trace_id=? AND event IN ('task_ok','task_fail')
1255 AND chunk_id IS NULL",
1256 rusqlite::params![trace_id],
1257 )?;
1258 self.storage.insert_usage_trace(
1259 trace_id, None, event, strength, None, None, None, None, None, source, &now,
1260 )?;
1261 }
1262 }
1263
1264 let effective_outcome = outcome
1268 .filter(|value| *value != "unknown")
1269 .or(existing_outcome.as_deref().filter(|value| *value != "unknown"));
1270 if let Some(o @ ("ok" | "fail")) = effective_outcome {
1271 if used.is_some()
1272 || (outcome.is_some_and(|value| value != "unknown")
1273 && existing_outcome.as_deref() != outcome)
1274 {
1275 let fallback_ids: Vec<String>;
1276 let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
1277 effective_used_ids.as_deref()
1278 } else {
1279 fallback_ids = log
1280 .get("used_ids")
1281 .and_then(Value::as_str)
1282 .and_then(|s| serde_json::from_str(s).ok())
1283 .unwrap_or_default();
1284 if fallback_ids.is_empty() {
1285 None
1286 } else {
1287 Some(&fallback_ids)
1288 }
1289 };
1290 let effective_complete = if used.is_some() {
1291 effective_used_complete
1292 } else {
1293 log.get("usage_state").and_then(Value::as_str) != Some("unknown")
1294 && log
1295 .get("used_complete")
1296 .and_then(Value::as_i64)
1297 .unwrap_or(1)
1298 != 0
1299 };
1300 self.replace_outcome_evidence(
1301 trace_id,
1302 o,
1303 effective_used,
1304 effective_complete,
1305 &now,
1306 )?;
1307 }
1308 } else if used.is_some() && effective_used_complete {
1309 self.replace_selected_unused_evidence(
1310 trace_id,
1311 effective_used_ids.as_deref().unwrap_or_default(),
1312 &now,
1313 )?;
1314 }
1315
1316 let context_key = log
1317 .get("context_key")
1318 .and_then(Value::as_str)
1319 .map(str::to_string)
1320 .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
1321 let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
1322
1323 let mut context_affected: HashSet<String> = HashSet::new();
1327 if let Some(used_ids) = effective_used_ids.as_deref() {
1328 for cid in used_ids {
1329 context_affected.insert(cid.clone());
1330 }
1331 }
1332 if let Some(ups) = feedback_up {
1333 for cid in ups {
1334 let corrected = self.storage
1335 .delete_feedback_event(trace_id, cid, "down")?;
1336 self.storage
1337 .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_down")?;
1338 let inserted = self.storage.insert_feedback_event(
1339 &gen_uuid(),
1340 trace_id,
1341 cid,
1342 "up",
1343 feedback_strength,
1344 source,
1345 feedback_actor,
1346 feedback_reason,
1347 context_key.as_deref(),
1348 &now,
1349 )?;
1350 if inserted > 0 {
1351 self.upsert_trace_confidence_evidence(
1352 trace_id,
1353 cid,
1354 "feedback_up",
1355 1.0,
1356 feedback_strength,
1357 if feedback_kind == "judge" {
1358 "judge_up"
1359 } else {
1360 "user_up"
1361 },
1362 context_key.as_deref(),
1363 &now,
1364 true,
1365 )?;
1366 self.storage.update_chunk_last_used(cid, &now)?;
1367 self.refresh_governance_evidence(cid, &now)?;
1368 context_affected.insert(cid.clone());
1369 } else if corrected > 0 {
1370 self.recompute_chunk_confidence(cid, &now)?;
1371 self.refresh_governance_evidence(cid, &now)?;
1372 context_affected.insert(cid.clone());
1373 }
1374 }
1375 }
1376 if let Some(downs) = feedback_down {
1377 for cid in downs {
1378 let corrected = self.storage
1379 .delete_feedback_event(trace_id, cid, "up")?;
1380 self.storage
1381 .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_up")?;
1382 let inserted = self.storage.insert_feedback_event(
1383 &gen_uuid(),
1384 trace_id,
1385 cid,
1386 "down",
1387 feedback_strength,
1388 source,
1389 feedback_actor,
1390 feedback_reason,
1391 context_key.as_deref(),
1392 &now,
1393 )?;
1394 if inserted > 0 {
1395 self.upsert_trace_confidence_evidence(
1396 trace_id,
1397 cid,
1398 "feedback_down",
1399 0.0,
1400 feedback_strength,
1401 if feedback_kind == "judge" {
1402 "judge_down"
1403 } else {
1404 "user_down"
1405 },
1406 context_key.as_deref(),
1407 &now,
1408 true,
1409 )?;
1410 self.refresh_governance_evidence(cid, &now)?;
1411 context_affected.insert(cid.clone());
1412 } else if corrected > 0 {
1413 self.recompute_chunk_confidence(cid, &now)?;
1414 self.refresh_governance_evidence(cid, &now)?;
1415 context_affected.insert(cid.clone());
1416 }
1417 }
1418 }
1419 self.rebuild_context_stats_for(&context_affected, &now)?;
1421
1422 if !is_fresh_insert {
1424 self.storage.patch_episodic_log_content(
1425 trace_id,
1426 query,
1427 output,
1428 output_summary,
1429 nomination,
1430 effective_priority,
1431 )?;
1432 }
1433
1434 let lifecycle_state = if effective_outcome.is_some() {
1435 "completed"
1436 } else {
1437 task_state.unwrap_or_else(|| {
1438 log.get("task_state")
1439 .and_then(Value::as_str)
1440 .unwrap_or("running")
1441 })
1442 };
1443 let used_ids_json = effective_used_ids
1444 .as_deref()
1445 .map(serde_json::to_string)
1446 .transpose()?;
1447 self.storage.update_trace_lifecycle(
1448 trace_id,
1449 lifecycle_state,
1450 (lifecycle_state == "completed").then_some(now.as_str()),
1451 effective_used_ids
1452 .as_deref()
1453 .map(|ids| usage_state(Some(ids))),
1454 used_ids_json.as_deref(),
1455 used.map(|_| used_attribution),
1456 used.map(|_| effective_used_complete),
1457 )?;
1458
1459 let current_state = log
1461 .get("distill_state")
1462 .and_then(Value::as_str)
1463 .unwrap_or("open");
1464 let lifecycle_completed = lifecycle_state == "completed";
1465 let has_material = output_summary.is_some()
1466 || nomination.is_some()
1467 || output.is_some()
1468 || log.get("output_summary").and_then(Value::as_str).is_some()
1469 || log.get("nomination").and_then(Value::as_str).is_some()
1470 || log.get("output").and_then(Value::as_str).is_some();
1471 let retryable_discard = current_state == "discarded"
1472 && matches!(
1473 log.get("distill_note").and_then(Value::as_str),
1474 Some("insufficient_material" | "abandoned" | "timed_out")
1475 );
1476 let new_state = if current_state == "open"
1477 && matches!(lifecycle_state, "abandoned" | "timed_out")
1478 {
1479 Some("discarded")
1480 } else if lifecycle_completed && (current_state == "open" || retryable_discard) {
1481 if has_material {
1482 Some("new")
1483 } else {
1484 Some("discarded")
1485 }
1486 } else {
1487 None
1488 };
1489 if let Some(state) = new_state {
1490 let note = if state == "discarded" {
1491 Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
1492 lifecycle_state
1493 } else {
1494 "insufficient_material"
1495 })
1496 } else {
1497 None
1498 };
1499 let outcome_str = outcome.map(str::to_string);
1500 self.storage.update_episodic_log_state(
1501 trace_id,
1502 state,
1503 note,
1504 outcome_str.as_deref(),
1505 )?;
1506 if state == "new" && retryable_discard {
1507 self.storage.conn_execute(
1508 "UPDATE episodic_log SET distill_note=NULL WHERE trace_id=?",
1509 rusqlite::params![trace_id],
1510 )?;
1511 }
1512 } else if outcome.is_some() {
1513 let outcome_str = outcome.map(str::to_string);
1514 self.storage.update_episodic_log_state(
1515 trace_id,
1516 current_state,
1517 None,
1518 outcome_str.as_deref(),
1519 )?;
1520 }
1521
1522 self.storage.commit()
1523 })();
1524 if result.is_err() {
1525 let _ = self.storage.rollback();
1526 }
1527 result?;
1528 self.enqueue_evolve_if_needed(&now)?;
1529 Ok(())
1530 }
1531
1532 fn validate_trace_attribution(
1533 &self,
1534 trace_id: &str,
1535 chunk_ids: Option<&[String]>,
1536 field: &str,
1537 ) -> Result<()> {
1538 let Some(chunk_ids) = chunk_ids else {
1539 return Ok(());
1540 };
1541 if chunk_ids.is_empty() {
1542 return Ok(());
1543 }
1544
1545 let log = self.storage.get_episodic_log(trace_id)?.ok_or_else(|| {
1546 InnateError::InvalidState(format!(
1547 "{field} requires a trace created by recall: {trace_id}"
1548 ))
1549 })?;
1550 let mut attributable = HashSet::new();
1551 if let Some(raw) = log.get("recall_snapshot").and_then(Value::as_str) {
1552 if let Ok(snapshot) = serde_json::from_str::<Value>(raw) {
1553 if let Some(ids) = snapshot.get("selected").and_then(Value::as_array) {
1554 attributable.extend(ids.iter().filter_map(Value::as_str).map(str::to_string));
1555 }
1556 }
1557 }
1558 let rows = self.storage.query_chunks_params(
1559 "SELECT DISTINCT chunk_id FROM usage_trace
1560 WHERE trace_id=? AND chunk_id IS NOT NULL
1561 AND event='selected'",
1562 rusqlite::params![trace_id],
1563 )?;
1564 attributable.extend(rows.iter().filter_map(|row| {
1565 row.get("chunk_id")
1566 .and_then(Value::as_str)
1567 .map(str::to_string)
1568 }));
1569
1570 for chunk_id in chunk_ids {
1571 if self.storage.get_chunk(chunk_id)?.is_none() || !attributable.contains(chunk_id) {
1572 return Err(InnateError::InvalidState(format!(
1573 "{field} chunk {chunk_id} was not attributable to trace {trace_id}"
1574 )));
1575 }
1576 }
1577 Ok(())
1578 }
1579
1580 fn replace_selected_unused_evidence(
1581 &self,
1582 trace_id: &str,
1583 used_ids: &[String],
1584 now: &str,
1585 ) -> Result<()> {
1586 let old_rows = self.storage.query_chunks_params(
1587 "SELECT DISTINCT chunk_id FROM confidence_evidence
1588 WHERE trace_id=? AND kind='selected_unused'",
1589 rusqlite::params![trace_id],
1590 )?;
1591 let mut affected: HashSet<String> = old_rows
1592 .iter()
1593 .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1594 .collect();
1595 self.storage
1596 .delete_trace_confidence_evidence(trace_id, &["selected_unused"])?;
1597
1598 let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1599 let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1600 log.get("context_key")
1601 .and_then(Value::as_str)
1602 .map(str::to_string)
1603 });
1604 let selected_rows = self.storage.query_chunks_params(
1605 "SELECT chunk_id FROM usage_trace
1606 WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1607 rusqlite::params![trace_id],
1608 )?;
1609 for row in selected_rows {
1610 if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1611 if !used_set.contains(chunk_id) {
1612 self.upsert_trace_confidence_evidence(
1613 trace_id,
1614 chunk_id,
1615 "selected_unused",
1616 0.0,
1617 0.08,
1618 "selected_unused",
1619 context_key.as_deref(),
1620 now,
1621 false,
1622 )?;
1623 affected.insert(chunk_id.to_string());
1624 }
1625 }
1626 }
1627 for chunk_id in affected {
1628 self.recompute_chunk_confidence(&chunk_id, now)?;
1629 }
1630 Ok(())
1631 }
1632
1633 #[allow(clippy::too_many_arguments)]
1634 fn upsert_trace_confidence_evidence(
1635 &self,
1636 trace_id: &str,
1637 chunk_id: &str,
1638 kind: &str,
1639 target: f64,
1640 strength: f64,
1641 reason: &str,
1642 context_key: Option<&str>,
1643 now: &str,
1644 explicit: bool,
1645 ) -> Result<()> {
1646 let chunk = match self.storage.get_chunk(chunk_id)? {
1647 Some(chunk) => chunk,
1648 None => return Ok(()),
1649 };
1650 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1651 return Ok(());
1652 }
1653 let recency_weight = if explicit {
1654 const KAPPA: f64 = 0.5;
1655 const WINDOW_DAYS: f64 = 14.0;
1656 let gap_days = chunk
1657 .get("last_used_at")
1658 .and_then(Value::as_str)
1659 .map(|ts| iso_days_diff(now, ts) as f64)
1660 .unwrap_or(0.0);
1661 (1.0
1662 + KAPPA
1663 * (-(gap_days / WINDOW_DAYS) * std::f64::consts::LN_2).exp())
1664 .min(1.5)
1665 } else {
1666 1.0
1667 };
1668 let alpha = (0.2 * strength * recency_weight).clamp(0.0, 1.0);
1669 self.storage.upsert_confidence_evidence(
1670 &gen_uuid(),
1671 Some(trace_id),
1672 chunk_id,
1673 kind,
1674 target,
1675 alpha,
1676 reason,
1677 context_key,
1678 now,
1679 )?;
1680 self.recompute_chunk_confidence(chunk_id, now)
1681 }
1682
1683 fn recompute_chunk_confidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1684 let Some(chunk) = self.storage.get_chunk(chunk_id)? else {
1685 return Ok(());
1686 };
1687 let mut confidence = chunk
1688 .get("confidence_base")
1689 .and_then(Value::as_f64)
1690 .unwrap_or_else(|| {
1691 chunk
1692 .get("confidence")
1693 .and_then(Value::as_f64)
1694 .unwrap_or(0.5)
1695 });
1696 let mut reason = chunk
1697 .get("confidence_reason")
1698 .and_then(Value::as_str)
1699 .unwrap_or("base")
1700 .to_string();
1701 for evidence in self.storage.confidence_evidence_for_chunk(chunk_id)? {
1702 let target = evidence
1703 .get("target")
1704 .and_then(Value::as_f64)
1705 .unwrap_or(0.5);
1706 let alpha = evidence
1707 .get("alpha")
1708 .and_then(Value::as_f64)
1709 .unwrap_or(0.0)
1710 .clamp(0.0, 1.0);
1711 confidence = (confidence + alpha * (target - confidence)).clamp(0.0, 1.0);
1712 reason = evidence
1713 .get("reason")
1714 .and_then(Value::as_str)
1715 .unwrap_or("evidence")
1716 .to_string();
1717 }
1718 self.storage.conn_execute(
1719 "UPDATE chunks SET confidence=?, confidence_reason=?, updated_at=? WHERE id=?",
1720 rusqlite::params![confidence, reason, now, chunk_id],
1721 )
1722 }
1723
1724 #[allow(clippy::too_many_arguments)]
1725 fn replace_outcome_evidence(
1726 &self,
1727 trace_id: &str,
1728 outcome: &str,
1729 used: Option<&[String]>,
1730 used_complete: bool,
1731 now: &str,
1732 ) -> Result<()> {
1733 let old_rows = self.storage.query_chunks_params(
1734 "SELECT DISTINCT chunk_id FROM confidence_evidence
1735 WHERE trace_id=? AND kind IN ('outcome_ok','outcome_fail','selected_unused')",
1736 rusqlite::params![trace_id],
1737 )?;
1738 let mut affected: HashSet<String> = old_rows
1739 .iter()
1740 .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1741 .collect();
1742 self.storage.delete_trace_confidence_evidence(
1743 trace_id,
1744 &["outcome_ok", "outcome_fail", "selected_unused"],
1745 )?;
1746
1747 let used_ids = used.unwrap_or_default();
1748 let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1749 let attribution_divisor = used_ids.len().max(1) as f64;
1750 let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1751 log.get("context_key")
1752 .and_then(Value::as_str)
1753 .map(str::to_string)
1754 });
1755 for chunk_id in used_ids {
1756 let attribution = self
1757 .storage
1758 .query_chunks_params(
1759 "SELECT strength, attribution FROM usage_trace
1760 WHERE trace_id=? AND chunk_id=? AND event='used'",
1761 rusqlite::params![trace_id, chunk_id],
1762 )?
1763 .into_iter()
1764 .next();
1765 let base_strength = attribution
1766 .as_ref()
1767 .and_then(|row| row.get("strength"))
1768 .and_then(Value::as_f64)
1769 .unwrap_or(0.15)
1770 / attribution_divisor;
1771 let attribution_reason = attribution
1772 .as_ref()
1773 .and_then(|row| row.get("attribution"))
1774 .and_then(Value::as_str)
1775 .unwrap_or("inferred");
1776 let (kind, target, strength, reason) = if outcome == "ok" {
1777 ("outcome_ok", 1.0, base_strength, attribution_reason)
1778 } else {
1779 ("outcome_fail", 0.0, base_strength * 0.5, "task_fail")
1780 };
1781 self.upsert_trace_confidence_evidence(
1782 trace_id,
1783 chunk_id,
1784 kind,
1785 target,
1786 strength,
1787 reason,
1788 context_key.as_deref(),
1789 now,
1790 false,
1791 )?;
1792 affected.insert(chunk_id.clone());
1793 }
1794
1795 if used_complete {
1796 let selected_rows = self.storage.query_chunks_params(
1797 "SELECT chunk_id FROM usage_trace
1798 WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1799 rusqlite::params![trace_id],
1800 )?;
1801 for row in selected_rows {
1802 if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1803 if !used_set.contains(chunk_id) {
1804 self.upsert_trace_confidence_evidence(
1805 trace_id,
1806 chunk_id,
1807 "selected_unused",
1808 0.0,
1809 0.08,
1810 "selected_unused",
1811 context_key.as_deref(),
1812 now,
1813 false,
1814 )?;
1815 affected.insert(chunk_id.to_string());
1816 }
1817 }
1818 }
1819 }
1820
1821 for chunk_id in affected {
1822 self.recompute_chunk_confidence(&chunk_id, now)?;
1823 }
1824 Ok(())
1825 }
1826
1827 fn rebuild_context_stats(&self, now: &str) -> Result<()> {
1828 self.storage.conn_execute(
1829 "DELETE FROM chunk_context_stats",
1830 rusqlite::params![],
1831 )?;
1832 self.storage.conn_execute(
1833 "INSERT INTO chunk_context_stats
1834 (chunk_id, context_key, success_count, failure_count,
1835 positive_feedback, negative_feedback, last_updated_at)
1836 SELECT chunk_id, context_key, success_count, failure_count,
1837 positive_feedback, negative_feedback, ?
1838 FROM chunk_context_stats_base",
1839 rusqlite::params![now],
1840 )?;
1841 self.storage.conn_execute(
1842 "INSERT INTO chunk_context_stats
1843 (chunk_id, context_key, success_count, failure_count,
1844 positive_feedback, negative_feedback, last_updated_at)
1845 SELECT chunk_id, context_key,
1846 SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
1847 SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
1848 0, 0, ?
1849 FROM confidence_evidence
1850 WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
1851 GROUP BY chunk_id, context_key
1852 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1853 success_count=success_count+excluded.success_count,
1854 failure_count=failure_count+excluded.failure_count,
1855 last_updated_at=excluded.last_updated_at",
1856 rusqlite::params![now],
1857 )?;
1858 self.storage.conn_execute(
1859 "INSERT INTO chunk_context_stats
1860 (chunk_id, context_key, success_count, failure_count,
1861 positive_feedback, negative_feedback, last_updated_at)
1862 SELECT fe.chunk_id, fe.context_key, 0, 0,
1863 SUM(CASE WHEN fe.signal='up' THEN 1 ELSE 0 END),
1864 SUM(CASE WHEN fe.signal='down' THEN 1 ELSE 0 END), ?
1865 FROM feedback_events fe
1866 WHERE fe.context_key IS NOT NULL
1867 AND fe.ts > COALESCE((
1868 SELECT c.evidence_cutoff_at FROM chunks c
1869 WHERE c.id = fe.chunk_id
1870 ), '')
1871 GROUP BY fe.chunk_id, fe.context_key
1872 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1873 positive_feedback=positive_feedback+excluded.positive_feedback,
1874 negative_feedback=negative_feedback+excluded.negative_feedback,
1875 last_updated_at=excluded.last_updated_at",
1876 rusqlite::params![now],
1877 )
1878 }
1879
1880 fn rebuild_context_stats_for(&self, chunk_ids: &HashSet<String>, now: &str) -> Result<()> {
1884 if chunk_ids.is_empty() {
1885 return Ok(());
1886 }
1887 for chunk_id in chunk_ids {
1888 self.storage.conn_execute(
1889 "DELETE FROM chunk_context_stats WHERE chunk_id=?",
1890 rusqlite::params![chunk_id],
1891 )?;
1892 self.storage.conn_execute(
1893 "INSERT OR IGNORE INTO chunk_context_stats
1894 (chunk_id, context_key, success_count, failure_count,
1895 positive_feedback, negative_feedback, last_updated_at)
1896 SELECT chunk_id, context_key, success_count, failure_count,
1897 positive_feedback, negative_feedback, ?
1898 FROM chunk_context_stats_base WHERE chunk_id=?",
1899 rusqlite::params![now, chunk_id],
1900 )?;
1901 self.storage.conn_execute(
1902 "INSERT INTO chunk_context_stats
1903 (chunk_id, context_key, success_count, failure_count,
1904 positive_feedback, negative_feedback, last_updated_at)
1905 SELECT chunk_id, context_key,
1906 SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
1907 SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
1908 0, 0, ?
1909 FROM confidence_evidence
1910 WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
1911 AND chunk_id=?
1912 GROUP BY chunk_id, context_key
1913 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1914 success_count=success_count+excluded.success_count,
1915 failure_count=failure_count+excluded.failure_count,
1916 last_updated_at=excluded.last_updated_at",
1917 rusqlite::params![now, chunk_id],
1918 )?;
1919 self.storage.conn_execute(
1920 "INSERT INTO chunk_context_stats
1921 (chunk_id, context_key, success_count, failure_count,
1922 positive_feedback, negative_feedback, last_updated_at)
1923 SELECT chunk_id, context_key, 0, 0,
1924 SUM(CASE WHEN signal='up' THEN 1 ELSE 0 END),
1925 SUM(CASE WHEN signal='down' THEN 1 ELSE 0 END), ?
1926 FROM feedback_events
1927 WHERE context_key IS NOT NULL AND chunk_id=?
1928 AND ts > COALESCE((
1929 SELECT evidence_cutoff_at FROM chunks
1930 WHERE id=?
1931 ), '')
1932 GROUP BY chunk_id, context_key
1933 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1934 positive_feedback=positive_feedback+excluded.positive_feedback,
1935 negative_feedback=negative_feedback+excluded.negative_feedback,
1936 last_updated_at=excluded.last_updated_at",
1937 rusqlite::params![now, chunk_id, chunk_id],
1938 )?;
1939 }
1940 Ok(())
1941 }
1942
1943 fn refresh_governance_evidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1944 let rows = self.storage.query_chunks_params(
1945 "SELECT actor AS actor_key,
1946 signal, strength, ts
1947 FROM feedback_events
1948 WHERE chunk_id=? AND actor IS NOT NULL AND actor!=''
1949 AND ts > COALESCE((
1950 SELECT evidence_cutoff_at FROM chunks
1951 WHERE id=?
1952 ), '')",
1953 rusqlite::params![chunk_id, chunk_id],
1954 )?;
1955 let mut actor_contributions: HashMap<String, f64> = HashMap::new();
1956 for row in rows {
1957 let actor = row
1958 .get("actor_key")
1959 .and_then(Value::as_str)
1960 .unwrap_or("anonymous")
1961 .to_string();
1962 let age_days = row
1963 .get("ts")
1964 .and_then(Value::as_str)
1965 .map(|ts| iso_days_diff(now, ts).max(0) as f64)
1966 .unwrap_or(0.0);
1967 let recency_weight = 0.5_f64.powf(age_days / 90.0);
1968 let strength = row.get("strength").and_then(Value::as_f64).unwrap_or(0.0);
1969 let signed = if row.get("signal").and_then(Value::as_str) == Some("down") {
1970 strength
1971 } else {
1972 -strength
1973 };
1974 *actor_contributions.entry(actor).or_default() += signed * recency_weight;
1975 }
1976 let mut score = 0.0_f64;
1977 let mut actor_count = 0_i64;
1978 for contribution in actor_contributions.values().copied() {
1979 let contribution = contribution.clamp(-1.0, 1.0);
1980 score += contribution;
1981 if contribution > 0.0 {
1982 actor_count += 1;
1983 }
1984 }
1985 let score = score.max(0.0);
1986 if score >= 2.0 && actor_count >= 2 {
1987 self.storage.upsert_governance_proposal(
1988 &gen_uuid(),
1989 chunk_id,
1990 "review_applicability",
1991 "Weighted negative feedback",
1992 score.ceil() as i64,
1993 score,
1994 actor_count,
1995 now,
1996 )?;
1997 } else {
1998 self.storage.conn_execute(
1999 "UPDATE governance_proposals
2000 SET state='rejected', evidence_count=?, evidence_score=?, actor_count=?, updated_at=?
2001 WHERE chunk_id=? AND state='pending'",
2002 rusqlite::params![score.ceil() as i64, score, actor_count, now, chunk_id],
2003 )?;
2004 }
2005 Ok(())
2006 }
2007
2008 fn enqueue_evolve_if_needed(&self, now: &str) -> Result<()> {
2009 let ready = count_query(
2010 &self.storage,
2011 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2012 )?;
2013 let oldest = self
2014 .storage
2015 .query_chunks("SELECT MIN(ts) AS oldest FROM episodic_log WHERE distill_state='new'")?
2016 .first()
2017 .and_then(|row| row.get("oldest"))
2018 .and_then(Value::as_str)
2019 .map(str::to_string);
2020 let age_due = oldest
2021 .as_deref()
2022 .is_some_and(|ts| ts <= hours_ago(now, self.evolve_schedule_interval_hours).as_str());
2023 let governance_pending = count_query(
2024 &self.storage,
2025 "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
2026 )?;
2027 let governance_ready = count_query_params(
2030 &self.storage,
2031 "SELECT COUNT(*) FROM governance_proposals
2032 WHERE state='pending'
2033 AND evidence_score >= ? AND actor_count >= 2",
2034 rusqlite::params![self.governance_archive_threshold as f64],
2035 )?;
2036 if ready >= self.evolve_threshold
2037 || (ready > 0 && age_due)
2038 || governance_pending >= self.governance_evolve_threshold
2039 || governance_ready > 0
2040 {
2041 let reason = if ready >= self.evolve_threshold {
2042 "threshold"
2043 } else if governance_ready > 0 {
2044 "governance_ready"
2045 } else if governance_pending >= self.governance_evolve_threshold {
2046 "governance"
2047 } else {
2048 "scheduled"
2049 };
2050 self.storage.request_evolve(&gen_uuid(), reason, now)?;
2051 }
2052 Ok(())
2053 }
2054
2055 pub fn add(
2060 &self,
2061 content: &str,
2062 kind: &str,
2063 trigger_desc: Option<&str>,
2064 anti_trigger_desc: Option<&str>,
2065 source: &str,
2066 skill_name: Option<&str>,
2067 ) -> Result<String> {
2068 if !matches!(kind, "note" | "skill") {
2069 return Err(InnateError::InvalidState(format!("invalid kind: {kind}")));
2070 }
2071 if !matches!(source, "chat" | "manual" | "doc" | "agent") {
2072 return Err(InnateError::InvalidState(format!(
2073 "invalid source: {source}"
2074 )));
2075 }
2076
2077 let (content, action) = self.sanitize_content(content);
2078 if action == SanitizeAction::Discard {
2079 return Ok(String::new());
2080 }
2081
2082 let trigger_clean = trigger_desc.and_then(|t| {
2083 let (cleaned, act) = self.sanitizer.sanitize(t);
2084 if act == SanitizeAction::Discard {
2085 None
2086 } else {
2087 Some(cleaned)
2088 }
2089 });
2090 let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2091 let (cleaned, act) = self.sanitizer.sanitize(t);
2092 if act == SanitizeAction::Discard {
2093 None
2094 } else {
2095 Some(cleaned)
2096 }
2097 });
2098
2099 let h = content_hash(&content);
2100 if self.storage.is_hash_invalidated(&h)? {
2101 return Err(InnateError::InvalidState(
2102 "content hash is invalidated".into(),
2103 ));
2104 }
2105
2106 let existing = self.storage.query_chunks_params(
2108 "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2109 rusqlite::params![h],
2110 )?;
2111 if let Some(e) = existing.first() {
2112 if let Some(id) = e.get("id").and_then(Value::as_str) {
2113 return Ok(id.to_string());
2114 }
2115 }
2116
2117 let now = utc_now_iso();
2118 let chunk_id = gen_uuid();
2119 let redacted = action == SanitizeAction::Redact;
2120
2121 let (origin, state, conf, prot, init_state_reason) = if source == "agent" {
2122 (
2123 "captured",
2124 "pending",
2125 if redacted { 0.4 } else { 0.60 },
2126 0,
2127 "init:captured_agent",
2128 )
2129 } else if kind == "skill" {
2130 (
2131 "installed",
2132 "active",
2133 if redacted { 0.4 } else { 0.85 },
2134 1,
2135 "init:installed",
2136 )
2137 } else {
2138 (
2139 "captured",
2140 "active",
2141 if redacted { 0.4 } else { 0.60 },
2142 0,
2143 "init:captured",
2144 )
2145 };
2146
2147 let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2149 let (cvec, tvec, embed_ver, final_state_reason) = match (
2150 self.embedding.embed_content(&content),
2151 self.embedding.embed_trigger(trigger_str),
2152 ) {
2153 (Ok(cv), Ok(tv)) => (cv, tv, 1i64, init_state_reason.to_string()),
2154 _ => (
2155 vec![],
2156 vec![],
2157 0i64,
2158 format!("embedding_pending:target={state}"),
2159 ),
2160 };
2161
2162 let tokens = estimate_tokens(&content) as i64;
2163 let row = ChunkRow {
2164 id: chunk_id.clone(),
2165 skill_name: skill_name.map(str::to_string),
2166 content: content.clone(),
2167 trigger_desc: trigger_clean.clone(),
2168 anti_trigger_desc: anti_trigger_clean.clone(),
2169 content_hash: h,
2170 token_count: Some(tokens),
2171 origin: origin.to_string(),
2172 source: Some(source.to_string()),
2173 protected: prot,
2174 state: state.to_string(),
2175 state_reason: Some(final_state_reason),
2176 confidence: conf,
2177 confidence_reason: Some(format!("init:{origin}")),
2178 version: 1,
2179 embed_version: embed_ver,
2180 created_at: now.clone(),
2181 updated_at: now.clone(),
2182 ..Default::default()
2183 };
2184
2185 self.storage.begin_immediate()?;
2186 let result = (|| -> Result<()> {
2187 self.storage.insert_chunk(&row)?;
2188 if embed_ver > 0 {
2189 self.storage
2190 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2191 self.storage
2192 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2193 }
2194 self.storage.commit()
2195 })();
2196 if result.is_err() {
2197 let _ = self.storage.rollback();
2198 }
2199 result?;
2200 Ok(chunk_id)
2201 }
2202
2203 pub fn spark(
2208 &self,
2209 content: &str,
2210 trigger_desc: Option<&str>,
2211 anti_trigger_desc: Option<&str>,
2212 ) -> Result<String> {
2213 let (content, action) = self.sanitize_content(content);
2214 if action == SanitizeAction::Discard {
2215 return Ok(String::new());
2216 }
2217
2218 let trigger_clean = trigger_desc.and_then(|t| {
2219 let (cleaned, act) = self.sanitizer.sanitize(t);
2220 if act == SanitizeAction::Discard {
2221 None
2222 } else {
2223 Some(cleaned)
2224 }
2225 });
2226 let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2227 let (cleaned, act) = self.sanitizer.sanitize(t);
2228 if act == SanitizeAction::Discard {
2229 None
2230 } else {
2231 Some(cleaned)
2232 }
2233 });
2234
2235 let h = content_hash(&content);
2236 if self.storage.is_hash_invalidated(&h)? {
2237 return Err(InnateError::InvalidState(
2238 "content hash is invalidated".into(),
2239 ));
2240 }
2241
2242 let related: Vec<String> = self
2244 .recall(
2245 &content,
2246 2000,
2247 false,
2248 false,
2249 Some(5),
2250 "sdk",
2251 "false",
2252 false,
2253 "off",
2254 )
2255 .map(|r| {
2256 r.knowledge
2257 .iter()
2258 .filter_map(|c| c["id"].as_str().map(str::to_string))
2259 .collect()
2260 })
2261 .unwrap_or_default();
2262
2263 let now = utc_now_iso();
2264 let chunk_id = gen_uuid();
2265 let tokens = estimate_tokens(&content) as i64;
2266
2267 let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2268 let (cvec, tvec, embed_ver, state_reason) = match (
2269 self.embedding.embed_content(&content),
2270 self.embedding.embed_trigger(trigger_str),
2271 ) {
2272 (Ok(cv), Ok(tv)) => (cv, tv, 1i64, "init:spark".to_string()),
2273 _ => (
2274 vec![],
2275 vec![],
2276 0i64,
2277 "embedding_pending:target=active".to_string(),
2278 ),
2279 };
2280
2281 let row = ChunkRow {
2282 id: chunk_id.clone(),
2283 content: content.clone(),
2284 trigger_desc: trigger_clean.clone(),
2285 anti_trigger_desc: anti_trigger_clean.clone(),
2286 content_hash: h,
2287 token_count: Some(tokens),
2288 origin: "spark".to_string(),
2289 maturity: Some("seed".to_string()),
2290 related_ids: if related.is_empty() {
2291 None
2292 } else {
2293 Some(related.join(","))
2294 },
2295 state: "active".to_string(),
2296 state_reason: Some(state_reason),
2297 confidence: 0.5,
2298 version: 1,
2299 embed_version: embed_ver,
2300 created_at: now.clone(),
2301 updated_at: now.clone(),
2302 ..Default::default()
2303 };
2304
2305 self.storage.begin_immediate()?;
2306 let result = (|| -> Result<()> {
2307 self.storage.insert_chunk(&row)?;
2308 if embed_ver > 0 {
2309 self.storage
2310 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2311 self.storage
2312 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2313 }
2314 self.storage.commit()
2315 })();
2316 if result.is_err() {
2317 let _ = self.storage.rollback();
2318 }
2319 result?;
2320 Ok(chunk_id)
2321 }
2322
2323 pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
2328 let chunk = self
2329 .storage
2330 .get_chunk(spark_id)?
2331 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2332 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
2333 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2334 }
2335 let current = chunk
2336 .get("maturity")
2337 .and_then(Value::as_str)
2338 .unwrap_or("seed");
2339 let valid_next: &[&str] = match current {
2340 "seed" => &["sprouting"],
2341 "sprouting" => &["incubating"],
2342 _ => {
2343 return Err(InnateError::InvalidState(format!(
2344 "spark {spark_id} already {current}"
2345 )))
2346 }
2347 };
2348 if current == to {
2349 return Ok(());
2350 }
2351 if !valid_next.contains(&to) {
2352 return Err(InnateError::InvalidState(format!(
2353 "invalid spark maturity transition: {current} -> {to}"
2354 )));
2355 }
2356 let now = utc_now_iso();
2357 self.storage.begin_immediate()?;
2358 let result = self
2359 .storage
2360 .query_chunks_params(
2361 "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
2362 rusqlite::params![to, now, spark_id],
2363 )
2364 .and_then(|_| self.storage.commit());
2365 if result.is_err() {
2366 let _ = self.storage.rollback();
2367 }
2368 result.map(|_| ())
2369 }
2370
2371 pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
2372 let spark = self
2373 .storage
2374 .get_chunk(spark_id)?
2375 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2376 if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2377 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2378 }
2379 let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2380 if maturity == "promoted" || maturity == "dropped" {
2381 return Err(InnateError::InvalidState(format!(
2382 "spark {spark_id} already {maturity}"
2383 )));
2384 }
2385 if !matches!(to, "note" | "skill") {
2386 return Err(InnateError::InvalidState(format!(
2387 "invalid spark promotion target: {to}"
2388 )));
2389 }
2390
2391 let content = spark.get("content").and_then(Value::as_str).unwrap_or("");
2392 let (content, action) = self.sanitize_content(content);
2393 if action == SanitizeAction::Discard {
2394 return Err(InnateError::InvalidState(
2395 "sanitize discard on promote".into(),
2396 ));
2397 }
2398
2399 let promoted_hash = content_hash(&content);
2400 if self.storage.is_hash_invalidated(&promoted_hash)? {
2401 return Err(InnateError::InvalidState(
2402 "spark content hash is invalidated".into(),
2403 ));
2404 }
2405
2406 let now = utc_now_iso();
2407
2408 let existing = self.storage.query_chunks_params(
2410 "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2411 rusqlite::params![promoted_hash],
2412 )?;
2413 if let Some(e) = existing.first() {
2414 if let Some(id) = e.get("id").and_then(Value::as_str) {
2415 let id = id.to_string();
2416 self.storage.begin_immediate()?;
2417 let result = self
2418 .storage
2419 .query_chunks_params(
2420 "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2421 rusqlite::params![now, spark_id],
2422 )
2423 .and_then(|_| self.storage.commit());
2424 if result.is_err() {
2425 let _ = self.storage.rollback();
2426 result?;
2427 }
2428 return Ok(id);
2429 }
2430 }
2431
2432 let (state, conf, prot, origin, state_reason) = if to == "skill" {
2433 ("active", 0.85, 1, "installed", "init:installed")
2434 } else {
2435 ("active", 0.60, 0, "captured", "init:captured")
2436 };
2437
2438 let conf = if action == SanitizeAction::Redact {
2439 0.4_f64
2440 } else {
2441 conf
2442 };
2443 let new_id = gen_uuid();
2444 let trigger = spark.get("trigger_desc").and_then(Value::as_str);
2445 let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
2446
2447 let row = ChunkRow {
2448 id: new_id.clone(),
2449 content: content.clone(),
2450 trigger_desc: trigger.map(str::to_string),
2451 anti_trigger_desc: anti.map(str::to_string),
2452 content_hash: promoted_hash,
2453 token_count: Some(estimate_tokens(&content) as i64),
2454 origin: origin.to_string(),
2455 source: Some("manual".to_string()),
2456 protected: prot,
2457 state: state.to_string(),
2458 state_reason: Some(state_reason.to_string()),
2459 confidence: conf,
2460 confidence_reason: Some("manual_set".to_string()),
2461 parent_id: Some(spark_id.to_string()),
2462 version: 1,
2463 embed_version: 1,
2464 created_at: now.clone(),
2465 updated_at: now.clone(),
2466 ..Default::default()
2467 };
2468
2469 let cvec = self.embedding.embed_content(&content)?;
2470 let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;
2471
2472 self.storage.begin_immediate()?;
2473 let result = (|| -> Result<()> {
2474 self.storage.insert_chunk(&row)?;
2475 self.storage
2476 .insert_vec_content(&new_id, &pack_embedding(&cvec))?;
2477 self.storage
2478 .insert_vec_trigger(&new_id, &pack_embedding(&tvec))?;
2479 self.storage.query_chunks_params(
2480 "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2481 rusqlite::params![now, spark_id],
2482 )?;
2483 self.storage.commit()
2484 })();
2485 if result.is_err() {
2486 let _ = self.storage.rollback();
2487 }
2488 result?;
2489 Ok(new_id)
2490 }
2491
2492 pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
2493 let spark = self
2494 .storage
2495 .get_chunk(spark_id)?
2496 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2497 if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2498 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2499 }
2500 let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2501 if maturity == "promoted" {
2502 return Err(InnateError::InvalidState(format!(
2503 "spark {spark_id} already promoted"
2504 )));
2505 }
2506 if maturity == "dropped" {
2507 return Ok(());
2508 }
2509 let now = utc_now_iso();
2510 let reason_str = if reason.is_empty() {
2511 "dropped".to_string()
2512 } else {
2513 format!("dropped:{reason}")
2514 };
2515 self.storage.begin_immediate()?;
2516 let result = self
2517 .storage
2518 .query_chunks_params(
2519 "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
2520 rusqlite::params![reason_str, now, spark_id],
2521 )
2522 .and_then(|_| self.storage.commit());
2523 if result.is_err() {
2524 let _ = self.storage.rollback();
2525 }
2526 result.map(|_| ())
2527 }
2528
2529 pub fn approve(&self, chunk_id: &str) -> Result<()> {
2534 let chunk = self
2535 .storage
2536 .get_chunk(chunk_id)?
2537 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2538 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2539 return Err(InnateError::InvalidState(
2540 "spark lifecycle uses promote_spark() or invalidate()".into(),
2541 ));
2542 }
2543 if chunk.get("state").and_then(Value::as_str) == Some("active") {
2544 return Ok(());
2545 }
2546 if chunk.get("state").and_then(Value::as_str) != Some("pending") {
2547 return Err(InnateError::InvalidState(
2548 "approve requires pending chunk".into(),
2549 ));
2550 }
2551 let now = utc_now_iso();
2552 self.storage.begin_immediate()?;
2553 let result = (|| -> Result<()> {
2554 self.storage
2555 .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
2556 self.storage.query_chunks_params(
2557 "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
2558 rusqlite::params![now, chunk_id],
2559 )?;
2560 self.storage.commit()
2561 })();
2562 if result.is_err() {
2563 let _ = self.storage.rollback();
2564 }
2565 result
2566 }
2567
2568 pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
2569 let chunk = self
2570 .storage
2571 .get_chunk(chunk_id)?
2572 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2573 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2574 return Err(InnateError::InvalidState(
2575 "spark lifecycle uses drop_spark() or invalidate()".into(),
2576 ));
2577 }
2578 let now = utc_now_iso();
2579 self.storage.begin_immediate()?;
2580 let result = self
2581 .storage
2582 .update_chunk_state(chunk_id, "archived", Some(reason), &now)
2583 .and_then(|_| self.storage.commit());
2584 if result.is_err() {
2585 let _ = self.storage.rollback();
2586 }
2587 result
2588 }
2589
2590 pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
2591 let chunk = self
2592 .storage
2593 .get_chunk(chunk_id)?
2594 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2595 let h = chunk
2596 .get("content_hash")
2597 .and_then(Value::as_str)
2598 .unwrap_or("")
2599 .to_string();
2600 let now = utc_now_iso();
2601 let reason_str = if reason.is_empty() {
2602 "invalidated".to_string()
2603 } else {
2604 format!("invalidated:{reason}")
2605 };
2606
2607 self.storage.begin_immediate()?;
2608 let result = (|| -> Result<()> {
2609 self.storage.query_chunks_params(
2610 "UPDATE chunks
2611 SET state='archived', confidence=0.0, confidence_base=0.0,
2612 confidence_reason='invalidated', state_reason=?,
2613 state_updated_at=?, updated_at=?
2614 WHERE id=?",
2615 rusqlite::params![reason_str, now, now, chunk_id],
2616 )?;
2617 self.storage.query_chunks_params(
2618 "UPDATE chunks
2619 SET state='archived', confidence=0.0, confidence_base=0.0,
2620 confidence_reason='invalidated',
2621 state_reason='invalidated:same_hash',
2622 state_updated_at=?, updated_at=?
2623 WHERE content_hash=? AND id!=?",
2624 rusqlite::params![now, now, h, chunk_id],
2625 )?;
2626 self.storage.conn_execute(
2627 "DELETE FROM confidence_evidence
2628 WHERE chunk_id IN (SELECT id FROM chunks WHERE content_hash=?)",
2629 rusqlite::params![h],
2630 )?;
2631 self.storage
2632 .insert_invalidated_hash(&h, Some(reason), &now)?;
2633 self.storage.commit()
2634 })();
2635 if result.is_err() {
2636 let _ = self.storage.rollback();
2637 }
2638 result
2639 }
2640
2641 pub fn restore(&self, chunk_id: &str) -> Result<()> {
2642 let chunk = self
2643 .storage
2644 .get_chunk(chunk_id)?
2645 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2646 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
2647 if state == "active" {
2648 return Ok(());
2649 }
2650 if state != "archived" {
2651 return Err(InnateError::InvalidState(
2652 "restore requires archived chunk".into(),
2653 ));
2654 }
2655 let was_invalidated = chunk
2656 .get("state_reason")
2657 .and_then(Value::as_str)
2658 .map(|r| r.starts_with("invalidated"))
2659 .unwrap_or(false);
2660 let h = chunk
2661 .get("content_hash")
2662 .and_then(Value::as_str)
2663 .unwrap_or("")
2664 .to_string();
2665 let now = utc_now_iso();
2666
2667 self.storage.begin_immediate()?;
2668 let result = (|| -> Result<()> {
2669 self.storage
2670 .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
2671 if was_invalidated {
2672 self.storage.query_chunks_params(
2673 "DELETE FROM invalidated_hashes WHERE content_hash=?",
2674 rusqlite::params![h],
2675 )?;
2676 }
2677 self.storage.query_chunks_params(
2678 "UPDATE chunks
2679 SET confidence_base=0.5, confidence=0.5,
2680 confidence_reason='restore',
2681 selected_count=0, selected_count_base=0,
2682 used_count=0, used_count_base=0,
2683 used_success_count=0, used_success_count_base=0,
2684 success_trace_ids_count=0,
2685 last_used_at=NULL, last_used_base=NULL,
2686 last_success_at=NULL, last_decayed_at=NULL,
2687 evidence_cutoff_at=?, updated_at=?
2688 WHERE id=?",
2689 rusqlite::params![now, now, chunk_id],
2690 )?;
2691 self.storage.conn_execute(
2692 "DELETE FROM confidence_evidence WHERE chunk_id=?",
2693 rusqlite::params![chunk_id],
2694 )?;
2695 self.storage.conn_execute(
2696 "DELETE FROM chunk_success_traces WHERE chunk_id=?",
2697 rusqlite::params![chunk_id],
2698 )?;
2699 self.storage.conn_execute(
2700 "DELETE FROM chunk_context_stats_base WHERE chunk_id=?",
2701 rusqlite::params![chunk_id],
2702 )?;
2703 self.storage.conn_execute(
2704 "DELETE FROM chunk_context_stats WHERE chunk_id=?",
2705 rusqlite::params![chunk_id],
2706 )?;
2707 self.storage.conn_execute(
2708 "UPDATE governance_proposals
2709 SET state='rejected', reason=reason || '; restored by user', updated_at=?
2710 WHERE chunk_id=? AND state IN ('pending','accepted')",
2711 rusqlite::params![now, chunk_id],
2712 )?;
2713 self.storage.commit()
2714 })();
2715 if result.is_err() {
2716 let _ = self.storage.rollback();
2717 }
2718 result
2719 }
2720
2721 pub fn evolve(&self, trigger: &str) -> Result<Value> {
2726 if !matches!(trigger, "manual" | "scheduled" | "threshold") {
2727 return Err(InnateError::InvalidState(format!(
2728 "invalid evolve trigger: {trigger}"
2729 )));
2730 }
2731 let evolve_started_at = utc_now_iso();
2732 let retry_cutoff = minutes_ago(&evolve_started_at, 5);
2733 let recovered_failed = self.storage.conn_execute_count(
2734 "UPDATE episodic_log
2735 SET distill_state='new', distill_note='retry_failed',
2736 distill_locked_at=NULL, distill_run_id=NULL
2737 WHERE distill_state='failed'
2738 AND distill_attempts < 3
2739 AND COALESCE(distill_last_failed_at, distill_accounted_at, ts) < ?",
2740 rusqlite::params![retry_cutoff],
2741 )?;
2742 if recovered_failed > 0 {
2743 self.storage.request_evolve(
2744 &gen_uuid(),
2745 "distill_retry",
2746 &evolve_started_at,
2747 )?;
2748 }
2749 if trigger == "scheduled" {
2750 let age_cutoff = hours_ago(
2751 &evolve_started_at,
2752 self.evolve_schedule_interval_hours,
2753 );
2754 let aged_new = count_query_params(
2755 &self.storage,
2756 "SELECT COUNT(*) FROM episodic_log
2757 WHERE distill_state='new' AND ts <= ?",
2758 rusqlite::params![age_cutoff],
2759 )?;
2760 if aged_new > 0 {
2761 self.storage
2762 .request_evolve(&gen_uuid(), "scheduled", &evolve_started_at)?;
2763 }
2764 }
2765 let request = self.storage.claim_evolve_request_with_reason(
2766 &evolve_started_at,
2767 &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
2768 )?;
2769 let request_id = request.as_ref().map(|claim| claim.id.as_str());
2770 let request_reason = request.as_ref().map(|claim| claim.reason.as_str());
2771 if trigger == "scheduled" && request_id.is_none() {
2773 let curator = Arc::clone(&self.curator);
2774 let curate = curator.run(self, &CurateScope::default())?;
2775 return Ok(json!({
2776 "distilled": 0,
2777 "curate": self.format_curate_report(&curate),
2778 "skipped": "no_evolve_request"
2779 }));
2780 }
2781
2782 if trigger == "threshold" {
2784 let rows = self.storage.query_chunks(
2785 "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
2786 )?;
2787 let cnt = rows
2788 .first()
2789 .and_then(|r| r.get("cnt"))
2790 .and_then(Value::as_i64)
2791 .unwrap_or(0);
2792 if cnt < self.evolve_threshold {
2793 let curator = Arc::clone(&self.curator);
2794 let curate = curator.run(self, &CurateScope::default())?;
2795 if let Some(id) = request_id {
2796 if matches!(request_reason, Some("governance" | "governance_ready")) {
2797 self.storage.finish_evolve_request(
2798 id,
2799 "completed",
2800 Some("curate_only"),
2801 &utc_now_iso(),
2802 )?;
2803 } else {
2804 self.storage.defer_evolve_request(
2805 id,
2806 "below_threshold",
2807 &hours_after(
2808 &evolve_started_at,
2809 self.evolve_schedule_interval_hours.max(1),
2810 ),
2811 )?;
2812 }
2813 }
2814 return Ok(json!({
2815 "distilled": 0,
2816 "curate": self.format_curate_report(&curate),
2817 "skipped": "below_threshold"
2818 }));
2819 }
2820
2821 }
2822
2823 if trigger != "manual" {
2824 if let Some(limit) = self
2825 .storage
2826 .get_meta("max_distill_tokens_per_period")?
2827 .and_then(|value| value.parse::<i64>().ok())
2828 .filter(|value| *value > 0)
2829 {
2830 let period_start = self.distill_token_period_start(&evolve_started_at)?;
2831 let rows = self.storage.query_chunks_params(
2832 "SELECT COALESCE(SUM(prompt_tokens + completion_tokens),0) AS used
2833 FROM distill_token_usage
2834 WHERE accounted_at >= ?",
2835 rusqlite::params![period_start],
2836 )?;
2837 let used_tokens = rows
2838 .first()
2839 .and_then(|row| row.get("used"))
2840 .and_then(Value::as_i64)
2841 .unwrap_or(0);
2842 if used_tokens >= limit {
2843 let curator = Arc::clone(&self.curator);
2844 let curate = curator.run(self, &CurateScope::default())?;
2845 if let Some(id) = request_id {
2846 if matches!(request_reason, Some("governance" | "governance_ready")) {
2847 self.storage.finish_evolve_request(
2848 id,
2849 "completed",
2850 Some("curate_only"),
2851 &utc_now_iso(),
2852 )?;
2853 } else {
2854 self.storage.defer_evolve_request(
2855 id,
2856 "distill_token_limit",
2857 &hours_after(&evolve_started_at, 1),
2858 )?;
2859 }
2860 }
2861 return Ok(json!({
2862 "distilled": 0,
2863 "curate": self.format_curate_report(&curate),
2864 "skipped": "distill_token_limit",
2865 "distill_tokens_used": used_tokens,
2866 "distill_token_limit": limit,
2867 "period_start": period_start,
2868 }));
2869 }
2870 }
2871 }
2872
2873 let result = (|| -> Result<Value> {
2874 let distill = self.distill_batch()?;
2875 let curator = Arc::clone(&self.curator);
2876 let curate = curator.run(self, &CurateScope::default())?;
2877 Ok(json!({
2878 "distilled": distill.distilled,
2879 "distill_failed": distill.failed,
2880 "curate": self.format_curate_report(&curate),
2881 }))
2882 })();
2883 if let Some(id) = request_id {
2884 let (state, note) = match &result {
2885 Ok(_) => ("completed", None),
2886 Err(error) => ("failed", Some(error.to_string())),
2887 };
2888 self.storage
2889 .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
2890 }
2891 if result.is_ok() {
2892 self.storage
2893 .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
2894 }
2895
2896 let failed_remaining = count_query(
2897 &self.storage,
2898 "SELECT COUNT(*) FROM episodic_log
2899 WHERE distill_state='failed' AND distill_attempts < 3",
2900 )?;
2901 if failed_remaining > 0 {
2902 self.storage.request_evolve_at(
2903 &gen_uuid(),
2904 "distill_retry",
2905 &utc_now_iso(),
2906 Some(&minutes_after(&utc_now_iso(), 5)),
2907 )?;
2908 }
2909
2910 if result.is_ok() {
2912 let remaining = count_query(
2913 &self.storage,
2914 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2915 )?;
2916 if remaining > 0 {
2917 let _ = self.storage.request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
2918 }
2919 }
2920 result
2921 }
2922
2923 fn format_curate_report(&self, curate: &CurateReport) -> Value {
2924 json!({
2925 "archived": curate.archived.len(),
2926 "deduped": curate.deduped.len(),
2927 "decayed": curate.decayed.len(),
2928 "recovered": curate.recovered.len(),
2929 "orphans": curate.orphans.len(),
2930 "warnings": curate.warnings,
2931 })
2932 }
2933
2934 fn distill_batch(&self) -> Result<DistillBatchReport> {
2935 let run_id = gen_uuid();
2936 let now = utc_now_iso();
2937
2938 self.storage.begin_immediate()?;
2940 let logs = match self
2941 .storage
2942 .claim_distill_batch(&run_id, self.distill_batch_size, &now)
2943 {
2944 Ok(l) => {
2945 self.storage.commit()?;
2946 l
2947 }
2948 Err(e) => {
2949 let _ = self.storage.rollback();
2950 return Err(e);
2951 }
2952 };
2953
2954 let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
2955 let mut failed_logs = HashSet::new();
2956 let mut distill_errors = Vec::new();
2957 for log in &logs {
2958 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
2959 let context_key = log.get("context_key").and_then(Value::as_str);
2960 let related_logs: Vec<Value> = logs
2961 .iter()
2962 .filter(|other| {
2963 other.get("id").and_then(Value::as_str) == Some(log_id)
2964 || (context_key.is_some()
2965 && other.get("context_key").and_then(Value::as_str) == context_key)
2966 })
2967 .cloned()
2968 .collect();
2969 match self.distiller.distill_with_context(log, &related_logs) {
2970 Ok(chunks) => {
2971 if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
2972 let error = "distiller returned a chunk for an unknown source log";
2973 failed_logs.insert(log_id.to_string());
2974 distill_errors.push(format!("{log_id}: {error}"));
2975 self.finish_distill_log(
2976 log_id,
2977 "failed",
2978 Some(&format!("distill_failed:{error}")),
2979 estimate_distill_prompt_tokens(log, &related_logs),
2980 0,
2981 )?;
2982 continue;
2983 }
2984 chunks_by_log.insert(log_id.to_string(), chunks);
2985 }
2986 Err(error) => {
2987 let note = format!("distill_failed:{error}");
2988 failed_logs.insert(log_id.to_string());
2989 distill_errors.push(format!("{log_id}: {error}"));
2990 self.finish_distill_log(
2991 log_id,
2992 "failed",
2993 Some(¬e),
2994 estimate_distill_prompt_tokens(log, &related_logs),
2995 0,
2996 )?;
2997 }
2998 }
2999 }
3000
3001 let mut count = 0;
3002 let provenance = self.distiller.provenance();
3003 for log in &logs {
3004 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
3005 if failed_logs.contains(log_id) {
3006 continue;
3007 }
3008 let context_key = log.get("context_key").and_then(Value::as_str);
3009 let related_logs: Vec<Value> = logs
3010 .iter()
3011 .filter(|other| {
3012 other.get("id").and_then(Value::as_str) == Some(log_id)
3013 || (context_key.is_some()
3014 && other.get("context_key").and_then(Value::as_str) == context_key)
3015 })
3016 .cloned()
3017 .collect();
3018 let prompt_tokens = estimate_distill_prompt_tokens(log, &related_logs);
3019 let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
3020 let completion_tokens = chunks
3021 .iter()
3022 .map(estimate_distilled_chunk_tokens)
3023 .sum::<i64>();
3024 if chunks.is_empty() {
3025 self.finish_distill_log(
3026 log_id,
3027 "discarded",
3028 Some("insufficient_material"),
3029 prompt_tokens,
3030 completion_tokens,
3031 )?;
3032 continue;
3033 }
3034
3035 struct PreparedChunk {
3040 row: ChunkRow,
3041 cvec_bytes: Vec<u8>,
3042 tvec_bytes: Vec<u8>,
3043 }
3044 let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
3045 let mut embedding_failures = 0_usize;
3046 for dc in chunks {
3047 let (content, action) = self.sanitize_content(&dc.content);
3048 if action == SanitizeAction::Discard {
3049 continue; }
3051 let h = content_hash(&content);
3052 if self.storage.is_hash_invalidated(&h)? {
3053 continue; }
3055 let redacted = action == SanitizeAction::Redact;
3056 let conf = if redacted { 0.4 } else { 0.55 };
3057 let now2 = utc_now_iso();
3058 let chunk_id = gen_uuid();
3059 let tokens = estimate_tokens(&content) as i64;
3060 let row = ChunkRow {
3061 id: chunk_id,
3062 content: content.clone(),
3063 trigger_desc: dc.trigger_desc.clone(),
3064 anti_trigger_desc: dc.anti_trigger_desc,
3065 content_hash: h,
3066 token_count: Some(tokens),
3067 origin: "distilled".to_string(),
3068 distilled_from: Some(dc.source_log_id),
3069 distill_provider: provenance.provider.clone(),
3070 distill_model: provenance.model.clone(),
3071 distill_prompt_version: provenance.prompt_version.clone(),
3072 state: "pending".to_string(),
3073 state_reason: Some("init:distilled".to_string()),
3074 confidence: conf,
3075 confidence_reason: Some("init:distilled".to_string()),
3076 version: 1,
3077 embed_version: 1,
3078 created_at: now2.clone(),
3079 updated_at: now2,
3080 ..Default::default()
3081 };
3082 let cvec = match self.embedding.embed_content(&content) {
3083 Ok(v) => v,
3084 Err(_) => {
3085 embedding_failures += 1;
3086 continue;
3087 }
3088 };
3089 let tvec = match self
3090 .embedding
3091 .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
3092 {
3093 Ok(v) => v,
3094 Err(_) => {
3095 embedding_failures += 1;
3096 continue;
3097 }
3098 };
3099 prepared.push(PreparedChunk {
3100 row,
3101 cvec_bytes: pack_embedding(&cvec),
3102 tvec_bytes: pack_embedding(&tvec),
3103 });
3104 }
3105
3106 if prepared.is_empty() {
3107 let note = if embedding_failures > 0 {
3108 "embedding_failed"
3109 } else {
3110 "all_chunks_filtered"
3111 };
3112 self.finish_distill_log(
3113 log_id,
3114 if embedding_failures > 0 {
3115 "failed"
3116 } else {
3117 "discarded"
3118 },
3119 Some(note),
3120 prompt_tokens,
3121 completion_tokens,
3122 )?;
3123 if embedding_failures > 0 {
3124 failed_logs.insert(log_id.to_string());
3125 }
3126 continue;
3127 }
3128
3129 let accounted_at = utc_now_iso();
3131 self.storage.begin_immediate()?;
3132 let write_result = (|| -> Result<()> {
3133 for pc in &prepared {
3134 self.storage.insert_chunk(&pc.row)?;
3135 self.storage.insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
3136 self.storage.insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
3137 }
3138 let note = (embedding_failures > 0)
3139 .then(|| format!("partial_embedding_failures:{embedding_failures}"));
3140 self.storage.finish_distill_log(
3141 log_id,
3142 "distilled",
3143 note.as_deref(),
3144 prompt_tokens,
3145 completion_tokens,
3146 &accounted_at,
3147 )?;
3148 self.storage.commit()
3149 })();
3150 if let Err(error) = write_result {
3151 let _ = self.storage.rollback();
3152 let note = format!("distill_write_failed:{error}");
3153 self.finish_distill_log(
3154 log_id,
3155 "failed",
3156 Some(¬e),
3157 prompt_tokens,
3158 completion_tokens,
3159 )?;
3160 failed_logs.insert(log_id.to_string());
3161 continue;
3162 }
3163 count += 1;
3164 }
3165 if !distill_errors.is_empty() {
3166 eprintln!(
3170 "[innate] distillation partial failure ({} log(s)): {}",
3171 distill_errors.len(),
3172 distill_errors.join("; ")
3173 );
3174 }
3175 Ok(DistillBatchReport {
3176 distilled: count,
3177 failed: failed_logs.len(),
3178 })
3179 }
3180
3181 fn finish_distill_log(
3182 &self,
3183 log_id: &str,
3184 state: &str,
3185 note: Option<&str>,
3186 prompt_tokens: i64,
3187 completion_tokens: i64,
3188 ) -> Result<()> {
3189 let accounted_at = utc_now_iso();
3190 self.storage.begin_immediate()?;
3191 let result = (|| -> Result<()> {
3192 self.storage.finish_distill_log(
3193 log_id,
3194 state,
3195 note,
3196 prompt_tokens,
3197 completion_tokens,
3198 &accounted_at,
3199 )?;
3200 self.storage.commit()
3201 })();
3202 if result.is_err() {
3203 let _ = self.storage.rollback();
3204 }
3205 result
3206 }
3207
3208 fn distill_token_period_start(&self, now: &str) -> Result<String> {
3209 let window_hours = self
3210 .storage
3211 .get_meta("evolve.distill_token_window_hours")?
3212 .and_then(|value| value.parse::<i64>().ok())
3213 .unwrap_or(24)
3214 .max(1);
3215 Ok(hours_ago(now, window_hours))
3216 }
3217
3218 pub(crate) fn builtin_curate_impl(&self, scope: &CurateScope) -> Result<CurateReport> {
3219 let mut report = CurateReport::default();
3220 let now_iso = utc_now_iso();
3221 if scope.dry_run {
3222 let archived_count: i64 = count_query(&self.storage,
3224 "SELECT COUNT(*) FROM chunks WHERE origin!='spark' AND protected=0 AND state='active'")?;
3225 report.stats.insert("dry_run".to_string(), json!(true));
3226 report
3227 .stats
3228 .insert("eligible_for_governance".to_string(), json!(archived_count));
3229 return Ok(report);
3230 }
3231
3232 self.storage.begin_immediate()?;
3234 let agg_result = (|| -> Result<()> {
3235 let cutoff_ts = now_iso.clone();
3236
3237 self.storage.conn_execute(
3239 "DELETE FROM chunk_success_traces",
3240 rusqlite::params![],
3241 )?;
3242 self.storage.conn_execute(
3243 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts)
3244 SELECT ut.chunk_id, ut.trace_id, MAX(ut.ts)
3245 FROM usage_trace ut
3246 WHERE ut.event = 'used'
3247 AND ut.chunk_id IS NOT NULL
3248 AND ut.ts > COALESCE((
3249 SELECT evidence_cutoff_at FROM chunks c
3250 WHERE c.id=ut.chunk_id
3251 ), '')
3252 AND (
3253 EXISTS (SELECT 1 FROM usage_trace ok
3254 WHERE ok.trace_id = ut.trace_id
3255 AND ok.event = 'task_ok' AND ok.chunk_id IS NULL)
3256 OR EXISTS (SELECT 1 FROM episodic_log el
3257 WHERE el.trace_id = ut.trace_id AND el.outcome = 'ok')
3258 )
3259 GROUP BY ut.chunk_id, ut.trace_id",
3260 rusqlite::params![],
3261 )?;
3262
3263 self.storage.conn_execute(
3265 "WITH cst AS (
3266 SELECT chunk_id, COUNT(*) AS cnt, MAX(ts) AS max_ts
3267 FROM chunk_success_traces
3268 GROUP BY chunk_id
3269 )
3270 UPDATE chunks SET
3271 used_success_count = used_success_count_base
3272 + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3273 success_trace_ids_count = used_success_count_base
3274 + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3275 last_success_at = COALESCE(
3276 (SELECT max_ts FROM cst WHERE cst.chunk_id=chunks.id),
3277 last_success_at
3278 )
3279 WHERE origin!='spark'",
3280 rusqlite::params![],
3281 )?;
3282
3283 self.storage.conn_execute(
3285 "UPDATE chunks SET
3286 selected_count = selected_count_base + COALESCE(
3287 (SELECT COUNT(*) FROM usage_trace
3288 WHERE chunk_id = chunks.id AND event = 'selected'
3289 AND ts > COALESCE(chunks.evidence_cutoff_at, '')), 0),
3290 used_count = used_count_base + COALESCE(
3291 (SELECT COUNT(*) FROM usage_trace
3292 WHERE chunk_id = chunks.id AND event = 'used'
3293 AND ts > COALESCE(chunks.evidence_cutoff_at, '')), 0),
3294 last_used_at = COALESCE(
3295 (SELECT MAX(ts) FROM usage_trace
3296 WHERE chunk_id=chunks.id AND event='used'
3297 AND ts > COALESCE(chunks.evidence_cutoff_at, '')),
3298 last_used_base
3299 )
3300 WHERE origin!='spark'",
3301 rusqlite::params![],
3302 )?;
3303
3304 self.storage.set_meta("last_agg_ts", &cutoff_ts)?;
3308 self.storage.purge_usage_trace(&cutoff_ts)?;
3309 self.storage.commit()
3310 })();
3311 if agg_result.is_err() {
3312 let _ = self.storage.rollback();
3313 agg_result?;
3314 }
3315
3316 self.storage.begin_immediate()?;
3318 let recover_result = (|| -> Result<()> {
3319 let screening_cutoff = minutes_ago(&now_iso, self.screening_timeout_minutes);
3321 let stale = self.storage.query_chunks_params(
3322 "SELECT id, distill_run_id FROM episodic_log
3323 WHERE distill_state='screening' AND distill_locked_at < ?",
3324 rusqlite::params![screening_cutoff],
3325 )?;
3326 for row in &stale {
3327 let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3328 let run_id = row
3329 .get("distill_run_id")
3330 .and_then(Value::as_str)
3331 .unwrap_or("unknown");
3332 let note = format!("screening_timeout:{run_id}");
3333 self.storage.conn_execute(
3334 "UPDATE episodic_log
3335 SET distill_state='failed', distill_note=?,
3336 distill_attempts=distill_attempts+1,
3337 distill_last_failed_at=?,
3338 distill_run_id=NULL, distill_locked_at=NULL
3339 WHERE id=?",
3340 rusqlite::params![note, now_iso, id],
3341 )?;
3342 report.recovered.push(id.to_string());
3343 report
3344 .warnings
3345 .push(format!("stale screening recovered as failed: {id}"));
3346 }
3347
3348 let open_ttl_cutoff = days_ago(&now_iso, self.open_ttl_days);
3350 self.storage.conn_execute(
3351 "UPDATE episodic_log
3352 SET distill_state='discarded', distill_note='no_record_timeout',
3353 task_state='timed_out', completed_at=?
3354 WHERE distill_state='open' AND ts < ?",
3355 rusqlite::params![now_iso, open_ttl_cutoff],
3356 )?;
3357 self.storage.commit()
3358 })();
3359 if recover_result.is_err() {
3360 let _ = self.storage.rollback();
3361 recover_result?;
3362 }
3363
3364 let scope_origin = scope.origin.clone();
3366 let scope_skill = scope.skill_name.clone();
3367 self.storage.begin_immediate()?;
3368 let gov_result = (|| -> Result<()> {
3369 let governance_chunks = self
3372 .storage
3373 .query_chunks(
3374 "SELECT DISTINCT chunk_id FROM governance_proposals WHERE state='pending'",
3375 )?;
3376 for row in governance_chunks {
3377 if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
3378 self.refresh_governance_evidence(chunk_id, &now_iso)?;
3379 }
3380 }
3381
3382 let low_conf_cutoff = days_ago(&now_iso, self.low_conf_idle_days);
3384 let low_conf = self.storage.query_chunks_params(
3385 "SELECT id FROM chunks
3386 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3387 AND last_used_at IS NOT NULL
3388 AND confidence < ?
3389 AND last_used_at < ?
3390 AND (? IS NULL OR origin=?)
3391 AND (? IS NULL OR skill_name=?)",
3392 rusqlite::params![
3393 self.low_conf_threshold,
3394 low_conf_cutoff,
3395 scope_origin,
3396 scope_origin,
3397 scope_skill,
3398 scope_skill
3399 ],
3400 )?;
3401 for c in &low_conf {
3402 if let Some(id) = c.get("id").and_then(Value::as_str) {
3403 self.storage.update_chunk_state(
3404 id,
3405 "archived",
3406 Some("low_confidence"),
3407 &now_iso,
3408 )?;
3409 report.archived.push(id.to_string());
3410 }
3411 }
3412
3413 let rep_sel = self.storage.query_chunks_params(
3415 "SELECT id FROM chunks
3416 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3417 AND selected_count >= ? AND used_count = 0 AND confidence < ?
3418 AND (? IS NULL OR origin=?)
3419 AND (? IS NULL OR skill_name=?)",
3420 rusqlite::params![
3421 self.repeat_select_min,
3422 self.repeat_select_conf_max,
3423 scope_origin,
3424 scope_origin,
3425 scope_skill,
3426 scope_skill
3427 ],
3428 )?;
3429 for c in &rep_sel {
3430 if let Some(id) = c.get("id").and_then(Value::as_str) {
3431 if !report.archived.contains(&id.to_string()) {
3432 self.storage.update_chunk_state(
3433 id,
3434 "archived",
3435 Some("repeated_selected_unused"),
3436 &now_iso,
3437 )?;
3438 report.archived.push(id.to_string());
3439 }
3440 }
3441 }
3442
3443 let never_used_cutoff = days_ago(&now_iso, self.never_used_age_days);
3445 let never_used = self.storage.query_chunks_params(
3446 "SELECT id FROM chunks
3447 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3448 AND used_count = 0 AND selected_count = 0
3449 AND COALESCE(evidence_cutoff_at, created_at) < ?
3450 AND (? IS NULL OR origin=?)
3451 AND (? IS NULL OR skill_name=?)",
3452 rusqlite::params![
3453 never_used_cutoff,
3454 scope_origin,
3455 scope_origin,
3456 scope_skill,
3457 scope_skill
3458 ],
3459 )?;
3460 for c in &never_used {
3461 if let Some(id) = c.get("id").and_then(Value::as_str) {
3462 if !report.archived.contains(&id.to_string()) {
3463 self.storage.update_chunk_state(
3464 id,
3465 "archived",
3466 Some("never_used"),
3467 &now_iso,
3468 )?;
3469 report.archived.push(id.to_string());
3470 }
3471 }
3472 }
3473
3474 let gov_proposals = self.storage.query_chunks_params(
3478 "SELECT DISTINCT chunk_id FROM governance_proposals
3479 WHERE state='pending'
3480 AND evidence_score >= ? AND actor_count >= 2",
3481 rusqlite::params![self.governance_archive_threshold as f64],
3482 )?;
3483 for c in &gov_proposals {
3484 if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3485 let already_archived = report.archived.contains(&cid.to_string());
3486 let eligible = !already_archived && self.storage.get_chunk(cid)?.map(|ch| {
3487 ch.get("origin").and_then(Value::as_str) != Some("spark")
3488 && ch.get("protected").and_then(Value::as_i64).unwrap_or(0) == 0
3489 && matches!(
3490 ch.get("state").and_then(Value::as_str),
3491 Some("active") | Some("pending")
3492 )
3493 }).unwrap_or(false);
3494 if eligible {
3495 self.storage.update_chunk_state(
3496 cid,
3497 "archived",
3498 Some("governance_proposal"),
3499 &now_iso,
3500 )?;
3501 report.archived.push(cid.to_string());
3502 self.storage.conn_execute(
3503 "UPDATE governance_proposals
3504 SET state='accepted', updated_at=?
3505 WHERE chunk_id=? AND state='pending'",
3506 rusqlite::params![now_iso, cid],
3507 )?;
3508 } else {
3509 self.storage.conn_execute(
3510 "UPDATE governance_proposals
3511 SET state='rejected', updated_at=?
3512 WHERE chunk_id=? AND state='pending'",
3513 rusqlite::params![now_iso, cid],
3514 )?;
3515 }
3516 }
3517 }
3518
3519 let proposal_expiry_cutoff = days_ago(&now_iso, self.governance_proposal_max_age_days);
3523 self.storage.conn_execute(
3524 "UPDATE governance_proposals
3525 SET state='rejected', updated_at=?
3526 WHERE state='pending'
3527 AND evidence_score < ?
3528 AND created_at < ?",
3529 rusqlite::params![
3530 now_iso,
3531 self.governance_archive_threshold as f64,
3532 proposal_expiry_cutoff
3533 ],
3534 )?;
3535
3536 let neg_feedback_chunks = self.storage.query_chunks_params(
3540 "SELECT p.chunk_id FROM governance_proposals p
3541 JOIN chunks c ON c.id = p.chunk_id
3542 WHERE c.origin!='spark' AND c.protected=0
3543 AND c.state IN ('active','pending')
3544 AND p.state='pending'
3545 AND p.evidence_score >= ? AND p.actor_count >= 2
3546 AND (? IS NULL OR c.origin=?)
3547 AND (? IS NULL OR c.skill_name=?)
3548 GROUP BY p.chunk_id",
3549 rusqlite::params![
3550 self.negative_feedback_archive_threshold as f64,
3551 scope_origin,
3552 scope_origin,
3553 scope_skill,
3554 scope_skill
3555 ],
3556 )?;
3557 for c in &neg_feedback_chunks {
3558 if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3559 if !report.archived.contains(&cid.to_string()) {
3560 self.storage.update_chunk_state(
3561 cid,
3562 "archived",
3563 Some("sustained_negative_feedback"),
3564 &now_iso,
3565 )?;
3566 report.archived.push(cid.to_string());
3567 }
3568 }
3569 }
3570
3571 let high_fail_chunks = self.storage.query_chunks_params(
3575 "SELECT id FROM chunks
3576 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3577 AND used_count >= ?
3578 AND CAST(used_success_count AS REAL) / CAST(used_count AS REAL) < ?
3579 AND confidence < ?
3580 AND (? IS NULL OR origin=?)
3581 AND (? IS NULL OR skill_name=?)",
3582 rusqlite::params![
3583 self.failure_min_uses,
3584 self.failure_max_success_rate,
3585 self.failure_confidence_max,
3586 scope_origin, scope_origin, scope_skill, scope_skill
3587 ],
3588 )?;
3589 for c in &high_fail_chunks {
3590 if let Some(cid) = c.get("id").and_then(Value::as_str) {
3591 if !report.archived.contains(&cid.to_string()) {
3592 self.storage.update_chunk_state(
3593 cid, "archived", Some("sustained_task_failure"), &now_iso,
3594 )?;
3595 report.archived.push(cid.to_string());
3596 }
3597 }
3598 }
3599
3600 let dupes = self.storage.query_chunks_params(
3602 "SELECT content_hash FROM chunks
3603 WHERE origin!='spark' AND state IN ('active','pending')
3604 AND (? IS NULL OR origin=?)
3605 AND (? IS NULL OR skill_name=?)
3606 GROUP BY content_hash HAVING COUNT(*) > 1",
3607 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3608 )?;
3609 for d in &dupes {
3610 if let Some(h) = d.get("content_hash").and_then(Value::as_str) {
3611 let group = self.storage.query_chunks_params(
3612 "SELECT id, confidence, protected FROM chunks
3613 WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending')
3614 AND (? IS NULL OR origin=?)
3615 AND (? IS NULL OR skill_name=?)
3616 ORDER BY protected DESC, confidence DESC",
3617 rusqlite::params![h, scope_origin, scope_origin, scope_skill, scope_skill],
3618 )?;
3619 let canonical_id = group
3620 .first()
3621 .and_then(|row| row.get("id"))
3622 .and_then(Value::as_str)
3623 .unwrap_or("");
3624 for row in group.iter().skip(1) {
3625 let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3626 let reason = format!("duplicate:{canonical_id}");
3627 self.storage
3628 .update_chunk_state(id, "archived", Some(&reason), &now_iso)?;
3629 self.storage.conn_execute(
3630 "UPDATE chunks SET parent_id=?, updated_at=? WHERE id=?",
3631 rusqlite::params![canonical_id, now_iso, id],
3632 )?;
3633 report.deduped.push(id.to_string());
3634 }
3635 }
3636 }
3637
3638 let decay_candidates = self.storage.query_chunks_params(
3643 "SELECT id, confidence, last_used_at, last_decayed_at FROM chunks
3644 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3645 AND last_used_at IS NOT NULL
3646 AND (? IS NULL OR origin=?)
3647 AND (? IS NULL OR skill_name=?)",
3648 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3649 )?;
3650 for c in &decay_candidates {
3651 let id = match c.get("id").and_then(Value::as_str) {
3652 Some(v) => v,
3653 None => continue,
3654 };
3655 let conf = c.get("confidence").and_then(Value::as_f64).unwrap_or(0.5);
3656 let last_used = c.get("last_used_at").and_then(Value::as_str).unwrap_or(&now_iso);
3657 let decay_ref = c
3659 .get("last_decayed_at")
3660 .and_then(Value::as_str)
3661 .filter(|s| *s > last_used)
3662 .unwrap_or(last_used);
3663 let delta_days = iso_days_diff(&now_iso, decay_ref);
3664 if delta_days <= 0 {
3665 continue;
3666 }
3667 let floor = self.decay_floor;
3668 let decay_alpha = 1.0 - 0.5_f64.powf(delta_days as f64 / 90.0);
3669 let new_conf = conf + decay_alpha * (floor - conf);
3670 if (new_conf - conf).abs() > 0.001 {
3671 let note = format!("decay:{delta_days}d");
3672 self.storage.upsert_confidence_evidence(
3673 &gen_uuid(),
3674 None,
3675 id,
3676 "decay",
3677 floor,
3678 decay_alpha,
3679 ¬e,
3680 None,
3681 &now_iso,
3682 )?;
3683 self.recompute_chunk_confidence(id, &now_iso)?;
3684 self.storage.update_chunk_last_decayed_at(id, &now_iso)?;
3685 report.decayed.push(id.to_string());
3686 }
3687 }
3688
3689 let promotable = self.storage.query_chunks_params(
3691 "SELECT id FROM chunks
3692 WHERE state='pending' AND origin!='spark'
3693 AND used_success_count >= ?
3694 AND success_trace_ids_count >= 2
3695 AND confidence >= ?
3696 AND (? IS NULL OR origin=?)
3697 AND (? IS NULL OR skill_name=?)",
3698 rusqlite::params![
3699 self.promote_used_success_min,
3700 self.promote_confidence_min,
3701 scope_origin,
3702 scope_origin,
3703 scope_skill,
3704 scope_skill
3705 ],
3706 )?;
3707 for c in &promotable {
3708 if let Some(id) = c.get("id").and_then(Value::as_str) {
3709 self.storage.update_chunk_state(
3710 id,
3711 "active",
3712 Some("repeated_success"),
3713 &now_iso,
3714 )?;
3715 }
3716 }
3717
3718 let all_deps = self
3720 .storage
3721 .query_chunks("SELECT src, dst FROM deps WHERE kind='hard'")?;
3722 let cycles = detect_cycles(&all_deps);
3723 report.cycles = cycles;
3724 let orphan_rows = self.storage.query_chunks_params(
3725 "SELECT d.src, d.dst, s.id AS src_exists, t.id AS dst_exists
3726 FROM deps d
3727 LEFT JOIN chunks s ON s.id=d.src
3728 LEFT JOIN chunks t ON t.id=d.dst
3729 WHERE d.kind='hard'
3730 AND (? IS NULL OR s.origin=?)
3731 AND (? IS NULL OR s.skill_name=?)",
3732 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3733 )?;
3734 let mut orphans = HashSet::new();
3735 for row in orphan_rows {
3736 if row.get("src_exists").is_none_or(Value::is_null) {
3737 if let Some(id) = row.get("src").and_then(Value::as_str) {
3738 orphans.insert(id.to_string());
3739 }
3740 }
3741 if row.get("dst_exists").is_none_or(Value::is_null) {
3742 if let Some(id) = row.get("dst").and_then(Value::as_str) {
3743 orphans.insert(id.to_string());
3744 }
3745 }
3746 }
3747 report.orphans = orphans.into_iter().collect();
3748 report.orphans.sort();
3749
3750 self.rebuild_context_stats(&now_iso)?;
3754
3755 self.storage.commit()
3756 })();
3757 if gov_result.is_err() {
3758 let _ = self.storage.rollback();
3759 gov_result?;
3760 }
3761
3762 self.storage.begin_immediate()?;
3766 let purge_cutoff = days_ago(&now_iso, 30);
3767 let purge_result = self
3768 .storage
3769 .conn_execute(
3770 "UPDATE episodic_log
3771 SET query=NULL, recall_snapshot=NULL, output=NULL, output_summary=NULL,
3772 nomination=NULL,
3773 distill_note=COALESCE(distill_note, 'compacted')
3774 WHERE distill_state IN ('distilled','discarded','failed')
3775 AND ts < ?",
3776 rusqlite::params![purge_cutoff],
3777 )
3778 .and_then(|_| self.storage.commit());
3779 if purge_result.is_err() {
3780 let _ = self.storage.rollback();
3781 purge_result?;
3782 }
3783
3784 self.storage.begin_immediate()?;
3787 let evolve_req_cutoff = days_ago(&now_iso, 30);
3788 let prune_req_result = self
3789 .storage
3790 .conn_execute(
3791 "DELETE FROM evolve_requests
3792 WHERE state IN ('completed','failed') AND requested_at < ?",
3793 rusqlite::params![evolve_req_cutoff],
3794 )
3795 .and_then(|_| self.storage.commit());
3796 if prune_req_result.is_err() {
3797 let _ = self.storage.rollback();
3798 prune_req_result?;
3799 }
3800
3801 Ok(report)
3802 }
3803
3804 pub fn inspect(&self) -> Result<Value> {
3809 let total: i64 = count_query(
3810 &self.storage,
3811 "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
3812 )?;
3813 let active: i64 = count_query(
3814 &self.storage,
3815 "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
3816 )?;
3817 let pending: i64 = count_query(
3818 &self.storage,
3819 "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
3820 )?;
3821 let archived: i64 = count_query(
3822 &self.storage,
3823 "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
3824 )?;
3825 let sparks: i64 = count_query(
3826 &self.storage,
3827 "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
3828 )?;
3829 let open_logs: i64 = count_query(
3830 &self.storage,
3831 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
3832 )?;
3833 let new_logs: i64 = count_query(
3834 &self.storage,
3835 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
3836 )?;
3837 let embed_rebuild: i64 = count_query(&self.storage,
3838 "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
3839 let schema_version = self.storage.get_meta_or("schema_version", "?");
3840 let lib_id = self.storage.get_meta_or("lib_id", "?");
3841 let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
3842
3843 let metric_window_start = days_ago(&utc_now_iso(), 30);
3844 let trace_metrics = self.storage.query_chunks_params(
3845 "SELECT COUNT(*) AS total,
3846 SUM(CASE WHEN task_state='completed' THEN 1 ELSE 0 END) AS completed,
3847 SUM(CASE WHEN task_state='timed_out' THEN 1 ELSE 0 END) AS timed_out,
3848 SUM(CASE WHEN task_state='completed' AND usage_state!='unknown'
3849 THEN 1 ELSE 0 END) AS usage_known,
3850 SUM(CASE WHEN task_state='completed' AND usage_state='known_some'
3851 THEN 1 ELSE 0 END) AS usage_some,
3852 SUM(CASE WHEN task_state='completed'
3853 AND outcome IN ('ok','fail')
3854 THEN 1 ELSE 0 END) AS outcome_known,
3855 SUM(CASE WHEN outcome='ok' THEN 1 ELSE 0 END) AS succeeded
3856 FROM episodic_log WHERE ts >= ?",
3857 rusqlite::params![metric_window_start],
3858 )?;
3859 let trace_row = trace_metrics.first();
3860 let trace_total = trace_row
3861 .and_then(|row| row.get("total"))
3862 .and_then(Value::as_i64)
3863 .unwrap_or(0);
3864 let trace_completed = trace_row
3865 .and_then(|row| row.get("completed"))
3866 .and_then(Value::as_i64)
3867 .unwrap_or(0);
3868 let trace_timed_out = trace_row
3869 .and_then(|row| row.get("timed_out"))
3870 .and_then(Value::as_i64)
3871 .unwrap_or(0);
3872 let usage_known = trace_row
3873 .and_then(|row| row.get("usage_known"))
3874 .and_then(Value::as_i64)
3875 .unwrap_or(0);
3876 let usage_some = trace_row
3877 .and_then(|row| row.get("usage_some"))
3878 .and_then(Value::as_i64)
3879 .unwrap_or(0);
3880 let succeeded = trace_row
3881 .and_then(|row| row.get("succeeded"))
3882 .and_then(Value::as_i64)
3883 .unwrap_or(0);
3884 let outcome_known = trace_row
3885 .and_then(|row| row.get("outcome_known"))
3886 .and_then(Value::as_i64)
3887 .unwrap_or(0);
3888 let usage_rows = self.storage.query_chunks_params(
3889 "SELECT recall_snapshot, used_ids FROM episodic_log
3890 WHERE task_state='completed'
3891 AND usage_state!='unknown' AND used_complete=1
3892 AND recall_snapshot IS NOT NULL AND used_ids IS NOT NULL
3893 AND ts >= ?",
3894 rusqlite::params![metric_window_start],
3895 )?;
3896 let mut selected_total = 0_i64;
3897 let mut selected_used = 0_i64;
3898 for row in usage_rows {
3899 let selected: HashSet<String> = row
3900 .get("recall_snapshot")
3901 .and_then(Value::as_str)
3902 .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
3903 .and_then(|snapshot| snapshot.get("selected").cloned())
3904 .and_then(|value| serde_json::from_value::<Vec<String>>(value).ok())
3905 .unwrap_or_default()
3906 .into_iter()
3907 .collect();
3908 let used: HashSet<String> = row
3909 .get("used_ids")
3910 .and_then(Value::as_str)
3911 .and_then(|raw| serde_json::from_str::<Vec<String>>(raw).ok())
3912 .unwrap_or_default()
3913 .into_iter()
3914 .collect();
3915 selected_total += selected.len() as i64;
3916 selected_used += selected.intersection(&used).count() as i64;
3917 }
3918 let feedback_count = count_query_params(
3919 &self.storage,
3920 "SELECT COUNT(*) FROM feedback_events WHERE ts >= ?",
3921 rusqlite::params![metric_window_start],
3922 )?;
3923 let feedback_traces = count_query_params(
3924 &self.storage,
3925 "SELECT COUNT(DISTINCT f.trace_id)
3926 FROM feedback_events f
3927 JOIN episodic_log e ON e.trace_id=f.trace_id
3928 WHERE f.ts >= ? AND e.ts >= ? AND e.task_state='completed'",
3929 rusqlite::params![metric_window_start, metric_window_start],
3930 )?;
3931 let pending_evolve = count_query(
3932 &self.storage,
3933 "SELECT COUNT(*) FROM evolve_requests WHERE state IN ('pending','running')",
3934 )?;
3935 let governance_pending = count_query(
3936 &self.storage,
3937 "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
3938 )?;
3939 let failed_evolve = count_query_params(
3940 &self.storage,
3941 "SELECT COUNT(*) FROM evolve_requests
3942 WHERE last_failed_at >= ?",
3943 rusqlite::params![metric_window_start],
3944 )?;
3945 let failed_distill = count_query_params(
3946 &self.storage,
3947 "SELECT COUNT(*) FROM episodic_log
3948 WHERE distill_last_failed_at >= ?",
3949 rusqlite::params![metric_window_start],
3950 )?;
3951 let confidence_buckets = self.storage.query_chunks(
3952 &format!("SELECT
3953 SUM(CASE WHEN confidence < 0.25 THEN 1 ELSE 0 END) AS low,
3954 SUM(CASE WHEN confidence >= 0.25 AND confidence < {0} THEN 1 ELSE 0 END) AS medium,
3955 SUM(CASE WHEN confidence >= {0} THEN 1 ELSE 0 END) AS high
3956 FROM chunks WHERE origin!='spark' AND state!='archived'",
3957 self.promote_confidence_min),
3958 )?;
3959 let confidence_row = confidence_buckets.first();
3960
3961 let pending_oldest_ts = self.storage.query_chunks(
3963 "SELECT MIN(created_at) AS oldest FROM chunks WHERE state='pending' AND origin!='spark'",
3964 )?.into_iter().next()
3965 .and_then(|r| r.get("oldest").cloned())
3966 .and_then(|v| if v.is_null() { None } else { Some(v) });
3967
3968 let zombie_cutoff = days_ago(&utc_now_iso(), 14);
3973 let zombie: i64 = count_query_params(
3974 &self.storage,
3975 "SELECT COUNT(*) FROM chunks
3976 WHERE origin!='spark' AND state='active'
3977 AND confidence >= 0.4 AND confidence <= 0.6
3978 AND last_used_at IS NOT NULL
3979 AND created_at < ?",
3980 rusqlite::params![zombie_cutoff],
3981 )?;
3982 let debt_numerator = pending + zombie;
3983 let debt_denominator = active.max(1);
3984 let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
3985
3986 let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
3988 let stale_screening: i64 = count_query_params(
3989 &self.storage,
3990 "SELECT COUNT(*) FROM episodic_log
3991 WHERE distill_state='screening' AND distill_locked_at < ?",
3992 rusqlite::params![screening_cutoff],
3993 )?;
3994
3995 let distill_period_start = self.distill_token_period_start(&utc_now_iso())?;
3997 let distill_cost = self.storage.query_chunks_params(
3998 "SELECT COALESCE(SUM(prompt_tokens),0) AS pt,
3999 COALESCE(SUM(completion_tokens),0) AS ct
4000 FROM distill_token_usage
4001 WHERE accounted_at >= ?",
4002 rusqlite::params![distill_period_start],
4003 )?;
4004 let prompt_tokens = distill_cost
4005 .first()
4006 .and_then(|r| r.get("pt"))
4007 .and_then(Value::as_i64)
4008 .unwrap_or(0);
4009 let completion_tokens = distill_cost
4010 .first()
4011 .and_then(|r| r.get("ct"))
4012 .and_then(Value::as_i64)
4013 .unwrap_or(0);
4014
4015 let spark_threshold: i64 = self
4017 .storage
4018 .get_meta("curate.soft_mature_threshold")
4019 .ok()
4020 .flatten()
4021 .and_then(|v| v.parse::<i64>().ok())
4022 .unwrap_or(5);
4023 let recurring_sparks = self.storage.query_chunks_params(
4024 "SELECT ut.chunk_id, COUNT(*) AS cnt,
4025 c.content, c.trigger_desc, c.maturity
4026 FROM usage_trace ut
4027 JOIN chunks c ON c.id = ut.chunk_id
4028 WHERE ut.event='retrieved'
4029 AND c.origin='spark'
4030 GROUP BY ut.chunk_id HAVING cnt >= ?",
4031 rusqlite::params![spark_threshold],
4032 )?;
4033 let recurring_spark_ids: Vec<Value> = recurring_sparks
4034 .iter()
4035 .map(|r| {
4036 json!({
4037 "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
4038 "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
4039 "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
4040 "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
4041 .chars().take(80).collect::<String>(),
4042 })
4043 })
4044 .collect();
4045
4046 let mut suggestions: Vec<Value> = Vec::new();
4047 if embed_rebuild > 0 {
4048 suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
4049 }
4050 if new_logs > 0 {
4051 suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
4052 }
4053 if pending > 0 {
4054 suggestions.push(json!({"action": "innate approve <id> # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
4055 }
4056 if !recurring_spark_ids.is_empty() {
4057 suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
4058 }
4059 if stale_screening > 0 {
4060 suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
4061 }
4062 if governance_pending > 0 {
4063 suggestions.push(json!({
4064 "action": "review governance_proposals",
4065 "reason": format!("{governance_pending} chunk(s) have repeated negative feedback")
4066 }));
4067 }
4068
4069 Ok(json!({
4070 "schema_version": schema_version,
4071 "lib_id": lib_id,
4072 "last_agg_ts": last_agg,
4073 "chunks": {
4074 "total": total, "active": active, "pending": pending, "archived": archived,
4075 "pending_oldest_ts": pending_oldest_ts,
4076 },
4077 "sparks": sparks,
4078 "episodic_log": {"open": open_logs, "new": new_logs},
4079 "embed_rebuild_queue": embed_rebuild,
4080 "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
4081 "stale_screening_count": stale_screening,
4082 "feedback_loop": {
4083 "trace_completion_rate": ratio(trace_completed, trace_total),
4084 "usage_annotation_rate": ratio(usage_known, trace_completed),
4085 "trace_use_rate": ratio(usage_some, usage_known),
4086 "selected_to_used_rate": ratio(selected_used, selected_total),
4087 "task_success_rate": ratio(succeeded, outcome_known),
4088 "feedback_coverage": ratio(feedback_traces, trace_completed),
4089 "feedback_events": feedback_count,
4090 "timed_out_traces": trace_timed_out,
4091 "pending_evolve_requests": pending_evolve,
4092 "failed_evolve_requests_30d": failed_evolve,
4093 "failed_distill_logs_30d": failed_distill,
4094 "pending_governance_proposals": governance_pending,
4095 "window_days": 30,
4096 "confidence_distribution": {
4097 "low": confidence_row.and_then(|row| row.get("low")).and_then(Value::as_i64).unwrap_or(0),
4098 "medium": confidence_row.and_then(|row| row.get("medium")).and_then(Value::as_i64).unwrap_or(0),
4099 "high": confidence_row.and_then(|row| row.get("high")).and_then(Value::as_i64).unwrap_or(0),
4100 }
4101 },
4102 "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
4103 "recurring_sparks": recurring_sparks.len(),
4104 "recurring_spark_ids": recurring_spark_ids,
4105 "params": {
4106 "recall.w_content": self.w_content,
4107 "recall.w_trigger": self.w_trigger,
4108 "recall.w_context": self.w_context,
4109 "recall.top_k_candidates": self.top_k_candidates,
4110 "curate.low_conf_threshold": self.low_conf_threshold,
4111 "curate.low_conf_idle_days": self.low_conf_idle_days,
4112 "curate.repeat_select_min": self.repeat_select_min,
4113 "curate.never_used_age_days": self.never_used_age_days,
4114 "curate.promote_used_success_min": self.promote_used_success_min,
4115 "curate.promote_confidence_min": self.promote_confidence_min,
4116 "curate.screening_timeout_minutes": self.screening_timeout_minutes,
4117 "curate.open_ttl_days": self.open_ttl_days,
4118 "evolve.schedule_interval_hours": self.evolve_schedule_interval_hours,
4119 },
4120 "suggestions": suggestions
4121 }))
4122 }
4123
4124 pub fn rebuild_embeddings(&self) -> Result<usize> {
4129 let meta_version = self
4130 .storage
4131 .get_meta("embed_version")?
4132 .and_then(|v| v.parse::<i64>().ok())
4133 .unwrap_or(1);
4134 let stale = self.storage.query_chunks_params(
4136 "SELECT id, content, trigger_desc, state_reason FROM chunks
4137 WHERE embed_version = 0 OR embed_version < ?",
4138 rusqlite::params![meta_version],
4139 )?;
4140 let mut count = 0;
4141 for row in &stale {
4142 let id = match row.get("id").and_then(Value::as_str) {
4143 Some(v) => v,
4144 None => continue,
4145 };
4146 let content = row.get("content").and_then(Value::as_str).unwrap_or("");
4147 let trigger = row
4148 .get("trigger_desc")
4149 .and_then(Value::as_str)
4150 .unwrap_or(content);
4151 let state_reason = row
4152 .get("state_reason")
4153 .and_then(Value::as_str)
4154 .unwrap_or("");
4155
4156 let cvec = match self.embedding.embed_content(content) {
4157 Ok(v) => v,
4158 Err(_) => continue,
4159 };
4160 let tvec = match self.embedding.embed_trigger(trigger) {
4161 Ok(v) => v,
4162 Err(_) => continue,
4163 };
4164
4165 self.storage.begin_immediate()?;
4166 let r = (|| -> Result<()> {
4167 self.storage
4168 .insert_vec_content(id, &pack_embedding(&cvec))?;
4169 self.storage
4170 .insert_vec_trigger(id, &pack_embedding(&tvec))?;
4171 let new_reason = if state_reason.starts_with("embedding_pending:target=") {
4173 let target_state = state_reason.trim_start_matches("embedding_pending:target=");
4174 let now = utc_now_iso();
4175 self.storage.update_chunk_state(
4176 id,
4177 target_state,
4178 Some("embedding_rebuilt"),
4179 &now,
4180 )?;
4181 "embedding_rebuilt".to_string()
4182 } else {
4183 "embedding_rebuilt".to_string()
4184 };
4185 let now = utc_now_iso();
4186 self.storage.conn_execute(
4187 "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
4188 rusqlite::params![meta_version, new_reason, now, id],
4189 )?;
4190 self.storage.commit()
4191 })();
4192 if r.is_err() {
4193 let _ = self.storage.rollback();
4194 } else {
4195 count += 1;
4196 }
4197 }
4198 Ok(count)
4199 }
4200
4201 pub fn inspect_id(&self, id: &str) -> Result<Value> {
4206 if let Some(chunk) = self.storage.get_chunk(id)? {
4208 let traces = self.storage.query_chunks_params(
4209 "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
4210 rusqlite::params![id],
4211 )?;
4212 let derived = self.storage.query_chunks_params(
4213 "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
4214 SELECT id FROM episodic_log WHERE trace_id IN (
4215 SELECT trace_id FROM usage_trace WHERE chunk_id=?
4216 )
4217 ) LIMIT 10",
4218 rusqlite::params![id],
4219 )?;
4220 return Ok(json!({
4221 "kind": "chunk",
4222 "chunk": chunk,
4223 "recent_traces": traces,
4224 "derived_chunks": derived,
4225 }));
4226 }
4227 if let Some(log) = self.storage.get_episodic_log(id)? {
4229 let traces = self.storage.query_chunks_params(
4230 "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
4231 rusqlite::params![id],
4232 )?;
4233 return Ok(json!({
4234 "kind": "trace",
4235 "episodic_log": log,
4236 "usage_traces": traces,
4237 }));
4238 }
4239 Err(InnateError::ChunkNotFound(id.to_string()))
4240 }
4241
4242 fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
4247 self.sanitizer.sanitize(content)
4248 }
4249}
4250
4251struct CandidateInfo {
4256 chunk: Value,
4257 sim_content: f32,
4258 sim_trigger: f32,
4259}
4260
4261fn chunk_is_valid_for_recall(chunk: &Value, embed_version: i64) -> bool {
4262 chunk.get("state").and_then(Value::as_str) != Some("archived")
4263 && chunk.get("origin").and_then(Value::as_str) != Some("spark")
4264 && chunk
4265 .get("embed_version")
4266 .and_then(Value::as_i64)
4267 .unwrap_or(1)
4268 >= embed_version
4269}
4270
/// Reduce a free-text query to a canonical, order-insensitive form.
///
/// The query is lowercased, and every character that is neither alphanumeric
/// nor whitespace becomes a space. A small set of English stop words is then
/// dropped. The remaining words are de-duplicated, sorted, and joined with
/// single spaces.
fn normalize_query(query: &str) -> String {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "and", "for", "in", "of", "on", "the", "to", "with",
    ];
    let lowered = query.to_lowercase();
    let mut cleaned = String::with_capacity(lowered.len());
    for c in lowered.chars() {
        let keep = c.is_alphanumeric() || c.is_whitespace();
        cleaned.push(if keep { c } else { ' ' });
    }
    // An ordered set sorts and de-duplicates the surviving words in one step.
    let words: std::collections::BTreeSet<&str> = cleaned
        .split_whitespace()
        .filter(|word| !STOP_WORDS.contains(word))
        .collect();
    words.into_iter().collect::<Vec<&str>>().join(" ")
}
4302
4303fn estimate_distill_prompt_tokens(log: &Value, related_logs: &[Value]) -> i64 {
4304 let primary: i64 = [
4305 "query",
4306 "recall_snapshot",
4307 "output",
4308 "output_summary",
4309 "nomination",
4310 ]
4311 .iter()
4312 .filter_map(|key| log.get(*key).and_then(Value::as_str))
4313 .map(|text| estimate_tokens(text) as i64)
4314 .sum();
4315 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
4316 let context_key = log.get("context_key").and_then(Value::as_str);
4317 let related: i64 = related_logs
4318 .iter()
4319 .filter(|other| other.get("id").and_then(Value::as_str).unwrap_or("") != log_id)
4320 .filter(|other| {
4321 context_key.is_some()
4322 && other.get("context_key").and_then(Value::as_str) == context_key
4323 })
4324 .take(4)
4325 .flat_map(|other| {
4326 ["query", "output_summary", "outcome"]
4327 .into_iter()
4328 .filter_map(|key| other.get(key).and_then(Value::as_str))
4329 })
4330 .map(|text| estimate_tokens(text) as i64)
4331 .sum();
4332 primary + related
4333}
4334
4335fn estimate_distilled_chunk_tokens(chunk: &DistilledChunk) -> i64 {
4336 estimate_tokens(&chunk.content) as i64
4337 + chunk
4338 .trigger_desc
4339 .as_deref()
4340 .map(estimate_tokens)
4341 .unwrap_or(0) as i64
4342 + chunk
4343 .anti_trigger_desc
4344 .as_deref()
4345 .map(estimate_tokens)
4346 .unwrap_or(0) as i64
4347}
4348
/// Returns `true` when `query` contains any of the comma-separated phrases in `anti`.
///
/// Matching is a case-insensitive substring test: both sides are lower-cased first.
/// Phrases are trimmed, and empty phrases never match.
fn anti_trigger_hit(query: &str, anti: &str) -> bool {
    let haystack = query.to_lowercase();
    let phrases = anti.to_lowercase();
    for phrase in phrases.split(',').map(str::trim) {
        if !phrase.is_empty() && haystack.contains(phrase) {
            return true;
        }
    }
    false
}
4356
4357fn block_cost(block: &[Value]) -> usize {
4358 block
4359 .iter()
4360 .map(|b| {
4361 b.get("token_count")
4362 .and_then(Value::as_u64)
4363 .map(|t| t as usize)
4364 .unwrap_or_else(|| {
4365 estimate_tokens(b.get("content").and_then(Value::as_str).unwrap_or("")).max(100)
4366 })
4367 })
4368 .sum()
4369}
4370
4371fn limit_knowledge(knowledge: Vec<Value>, top: Option<usize>) -> Vec<Value> {
4372 match top {
4373 None => knowledge,
4374 Some(0) => vec![],
4375 Some(n) => knowledge.into_iter().take(n).collect(),
4376 }
4377}
4378
/// Classifies what is known about which items were used.
///
/// Returns:
/// - `"unknown"` when no usage list was supplied.
/// - `"known_none"` when the list is empty.
/// - `"known_some"` otherwise.
fn usage_state(used: Option<&[String]>) -> &'static str {
    used.map_or("unknown", |items| {
        if items.is_empty() {
            "known_none"
        } else {
            "known_some"
        }
    })
}
4386
/// Returns `numerator / denominator` rounded to three decimal places.
///
/// Returns `0.0` when the denominator is zero or negative.
fn ratio(numerator: i64, denominator: i64) -> f64 {
    if denominator > 0 {
        let scaled = numerator as f64 / denominator as f64 * 1000.0;
        scaled.round() / 1000.0
    } else {
        0.0
    }
}
4394
4395fn validate_source(source: &str) -> Result<()> {
4396 if !matches!(
4397 source,
4398 "mcp" | "sdk" | "cli" | "hook" | "daemon" | "augmented"
4399 ) {
4400 return Err(InnateError::InvalidState(format!(
4401 "invalid event source: {source}"
4402 )));
4403 }
4404 Ok(())
4405}
4406
4407fn count_query(storage: &Storage, sql: &str) -> Result<i64> {
4408 Ok(storage
4409 .query_chunks(sql)?
4410 .first()
4411 .and_then(|r| r.as_object())
4412 .and_then(|m| m.values().next())
4413 .and_then(Value::as_i64)
4414 .unwrap_or(0))
4415}
4416
4417fn count_query_params<P: rusqlite::Params>(storage: &Storage, sql: &str, p: P) -> Result<i64> {
4418 Ok(storage
4419 .query_chunks_params(sql, p)?
4420 .first()
4421 .and_then(|r| r.as_object())
4422 .and_then(|m| m.values().next())
4423 .and_then(Value::as_i64)
4424 .unwrap_or(0))
4425}
4426
4427fn days_ago(now_iso: &str, days: i64) -> String {
4428 use chrono::{DateTime, Duration, Utc};
4429 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4430 let cutoff = t - Duration::days(days);
4431 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4432 }
4433 now_iso.to_string()
4434}
4435
4436fn minutes_ago(now_iso: &str, minutes: i64) -> String {
4437 use chrono::{DateTime, Duration, Utc};
4438 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4439 let cutoff = t - Duration::minutes(minutes);
4440 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4441 }
4442 now_iso.to_string()
4443}
4444
4445fn hours_ago(now_iso: &str, hours: i64) -> String {
4446 use chrono::{DateTime, Duration, Utc};
4447 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4448 let cutoff = t - Duration::hours(hours);
4449 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4450 }
4451 now_iso.to_string()
4452}
4453
4454fn minutes_after(now_iso: &str, minutes: i64) -> String {
4455 use chrono::{DateTime, Duration, Utc};
4456 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4457 let cutoff = t + Duration::minutes(minutes);
4458 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4459 }
4460 now_iso.to_string()
4461}
4462
4463fn hours_after(now_iso: &str, hours: i64) -> String {
4464 use chrono::{DateTime, Duration, Utc};
4465 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4466 let cutoff = t + Duration::hours(hours);
4467 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4468 }
4469 now_iso.to_string()
4470}
4471
4472fn iso_days_diff(now_iso: &str, past_iso: &str) -> i64 {
4474 use chrono::{DateTime, Utc};
4475 let parse = |s: &str| s.parse::<DateTime<Utc>>().ok();
4476 if let (Some(a), Some(b)) = (parse(now_iso), parse(past_iso)) {
4477 let diff = a - b;
4478 diff.num_days().max(0)
4479 } else {
4480 0
4481 }
4482}
4483
4484fn detect_cycles(deps: &[Value]) -> Vec<Vec<String>> {
4486 use std::collections::HashMap;
4487 let mut adj: HashMap<String, Vec<String>> = HashMap::new();
4488 for d in deps {
4489 let src = d
4490 .get("src")
4491 .and_then(Value::as_str)
4492 .unwrap_or("")
4493 .to_string();
4494 let dst = d
4495 .get("dst")
4496 .and_then(Value::as_str)
4497 .unwrap_or("")
4498 .to_string();
4499 if !src.is_empty() && !dst.is_empty() {
4500 adj.entry(src).or_default().push(dst);
4501 }
4502 }
4503 let nodes: Vec<String> = adj.keys().cloned().collect();
4504 let mut visited: HashSet<String> = HashSet::new();
4505 let mut on_stack: HashSet<String> = HashSet::new();
4506 let mut cycles: Vec<Vec<String>> = vec![];
4507
4508 fn dfs(
4509 node: &str,
4510 adj: &HashMap<String, Vec<String>>,
4511 visited: &mut HashSet<String>,
4512 on_stack: &mut HashSet<String>,
4513 path: &mut Vec<String>,
4514 cycles: &mut Vec<Vec<String>>,
4515 ) {
4516 if on_stack.contains(node) {
4517 let start = path.iter().position(|n| n == node).unwrap_or(0);
4519 cycles.push(path[start..].to_vec());
4520 return;
4521 }
4522 if visited.contains(node) {
4523 return;
4524 }
4525 visited.insert(node.to_string());
4526 on_stack.insert(node.to_string());
4527 path.push(node.to_string());
4528 if let Some(children) = adj.get(node) {
4529 for child in children {
4530 dfs(child, adj, visited, on_stack, path, cycles);
4531 }
4532 }
4533 path.pop();
4534 on_stack.remove(node);
4535 }
4536
4537 for node in nodes {
4538 let mut path = vec![];
4539 dfs(
4540 &node,
4541 &adj,
4542 &mut visited,
4543 &mut on_stack,
4544 &mut path,
4545 &mut cycles,
4546 );
4547 }
4548 cycles
4549}