1type PackResult = (
5 Vec<Value>,
6 Vec<(Vec<Value>, f64, usize)>,
7 std::collections::HashMap<String, String>,
8);
9
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::embedding::{DummyEmbeddingProvider, EmbeddingProvider};
17use crate::errors::{InnateError, Result};
18use crate::refine::{
19 DefaultSanitizer, DistilledChunk, Distiller, HeuristicDistiller, NullRefiner, Refiner,
20 Sanitizer,
21};
22use crate::storage::{ChunkRow, EpisodicLogRow, Storage};
23use crate::utils::{
24 content_hash, estimate_tokens, gen_uuid, pack_embedding, utc_now_iso, SanitizeAction,
25};
26
// ---------------------------------------------------------------------------
// Recall scoring defaults (overridable through `recall.*` metadata keys).
// ---------------------------------------------------------------------------

/// Fused-score weight of the best content-embedding similarity.
const W_CONTENT: f64 = 0.55;
/// Fused-score weight of the best trigger-embedding similarity.
const W_TRIGGER: f64 = 0.25;
/// Fused-score weight of the chunk's stored confidence.
const W_CONFIDENCE: f64 = 0.10;
/// Fused-score weight of the per-query context score.
///
/// NOTE(review): the four weights sum to 1.05, not 1.0 — confirm intended.
const W_CONTEXT: f64 = 0.15;
/// Number of scored candidates kept; ANN searches fetch twice this many.
const TOP_K_CANDIDATES: usize = 20;
/// Multiplier applied to a candidate whose anti-trigger matches the query.
const ANTI_TRIGGER_PENALTY: f64 = 0.6;
/// Whether leftover budget is refilled with skipped blocks by density.
const DENSITY_REFILL: bool = true;

// ---------------------------------------------------------------------------
// Curation defaults (overridable through `curate.*` metadata keys).
// ---------------------------------------------------------------------------

/// Default for `curate.low_conf_threshold`.
const LOW_CONF_THRESHOLD: f64 = 0.25;
/// Default for `curate.low_conf_idle_days`.
const LOW_CONF_IDLE_DAYS: i64 = 60;
/// Default for `curate.repeat_select_min`.
const REPEAT_SELECT_MIN: i64 = 10;
/// Default for `curate.repeat_select_conf_max`.
const REPEAT_SELECT_CONF_MAX: f64 = 0.5;
/// Default for `curate.never_used_age_days`.
const NEVER_USED_AGE_DAYS: i64 = 30;
/// Default for `curate.open_ttl_days`.
const OPEN_TTL_DAYS: i64 = 14;
/// Default for `curate.screening_timeout_minutes`.
const SCREENING_TIMEOUT_MINUTES: i64 = 30;
/// Default for `curate.promote_used_success_min`.
const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
/// Default for `curate.promote_confidence_min`.
const PROMOTE_CONFIDENCE_MIN: f64 = 0.60;
/// Default for `curate.decay_floor` (clamped to `0.0..=0.4` on load).
const DECAY_FLOOR: f64 = 0.20;

// ---------------------------------------------------------------------------
// Evolution defaults (overridable through `evolve.*` metadata keys).
// ---------------------------------------------------------------------------

/// Default for `evolve.threshold_new_count`.
const EVOLVE_THRESHOLD: i64 = 5;
/// Default for `evolve.distill_batch_size`.
const DISTILL_BATCH_SIZE: usize = 20;

/// Multiplier applied to the fused score of chunks in the `pending` state.
/// Not configurable through metadata.
const PENDING_RECALL_PENALTY: f64 = 0.60;

// ---------------------------------------------------------------------------
// Governance and failure-detection defaults.
// ---------------------------------------------------------------------------

/// Default for `curate.governance_archive_threshold`.
const GOVERNANCE_ARCHIVE_THRESHOLD: i64 = 3;
/// Default for `curate.negative_feedback_archive_threshold`.
const NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD: i64 = 5;
/// Default for `evolve.governance_pending_threshold`.
const GOVERNANCE_EVOLVE_THRESHOLD: i64 = 3;
/// Default for `curate.failure_min_uses`.
const FAILURE_MIN_USES: i64 = 5;
/// Default for `curate.failure_max_success_rate`.
const FAILURE_MAX_SUCCESS_RATE: f64 = 0.20;
/// Default for `curate.failure_confidence_max`.
const FAILURE_CONFIDENCE_MAX: f64 = 0.35;
58
59#[derive(Debug, Default, Clone)]
64pub struct RecallResult {
65 pub knowledge: Vec<Value>,
66 pub sparks: Vec<Value>,
67 pub trace_id: String,
68 pub empty: bool,
69 pub depth_skipped: Vec<String>,
70 pub skipped_reasons: HashMap<String, String>,
71}
72
73#[derive(Debug, Default)]
74pub struct CurateReport {
75 pub archived: Vec<String>,
76 pub deduped: Vec<String>,
77 pub decayed: Vec<String>,
78 pub cycles: Vec<Vec<String>>,
79 pub orphans: Vec<String>,
80 pub recovered: Vec<String>,
81 pub warnings: Vec<String>,
82 pub stats: HashMap<String, Value>,
83}
84
/// Restricts what a curation run looks at and whether it may write.
///
/// NOTE(review): the filter semantics are inferred from the field names;
/// the consuming code is not visible here — confirm.
#[derive(Clone, Debug, Default)]
pub struct CurateScope {
    /// Limit curation to chunks with this origin, when set (presumably).
    pub origin: Option<String>,
    /// Limit curation to chunks of this skill, when set (presumably).
    pub skill_name: Option<String>,
    /// When `true`, report what would change without persisting (presumably).
    pub dry_run: bool,
}
95
96pub trait Curator: Send + Sync {
99 fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport>;
100}
101
/// Default [`Curator`], which delegates to the knowledge base's built-in
/// curation routine.
///
/// A stateless unit struct. It derives the common traits so it can be
/// debug-printed, defaulted and freely copied.
#[derive(Debug, Default, Clone, Copy)]
pub struct BuiltinCurator;
104
105impl Curator for BuiltinCurator {
106 fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport> {
107 kb.builtin_curate_impl(scope)
108 }
109}
110
111pub struct KnowledgeBase {
116 pub storage: Storage,
117 embedding: Arc<dyn EmbeddingProvider>,
118 refiner: Arc<dyn Refiner>,
119 distiller: Arc<dyn Distiller>,
120 curator: Arc<dyn Curator>,
121 sanitizer: Arc<dyn Sanitizer>,
122
123 w_content: f64,
125 w_trigger: f64,
126 w_confidence: f64,
127 w_context: f64,
128 top_k_candidates: usize,
129 anti_trigger_penalty: f64,
130 density_refill: bool,
131
132 low_conf_threshold: f64,
133 low_conf_idle_days: i64,
134 repeat_select_min: i64,
135 repeat_select_conf_max: f64,
136 never_used_age_days: i64,
137 open_ttl_days: i64,
138 screening_timeout_minutes: i64,
139 promote_used_success_min: i64,
140 promote_confidence_min: f64,
141 decay_floor: f64,
142 evolve_threshold: i64,
143 distill_batch_size: usize,
144 evolve_schedule_interval_hours: i64,
145 governance_archive_threshold: i64,
146 negative_feedback_archive_threshold: i64,
147 governance_evolve_threshold: i64,
148 governance_proposal_max_age_days: i64,
149 failure_min_uses: i64,
150 failure_max_success_rate: f64,
151 failure_confidence_max: f64,
152}
153
154impl KnowledgeBase {
155 pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
156 Self::open_with(db_path, None, None, None, None, None)
157 }
158
159 pub fn open_with(
160 db_path: impl AsRef<Path>,
161 embedding: Option<Arc<dyn EmbeddingProvider>>,
162 refiner: Option<Arc<dyn Refiner>>,
163 distiller: Option<Arc<dyn Distiller>>,
164 curator: Option<Arc<dyn Curator>>,
165 sanitizer: Option<Arc<dyn Sanitizer>>,
166 ) -> Result<Self> {
167 let embedding = embedding.unwrap_or_else(|| Arc::new(DummyEmbeddingProvider::default()));
168 let refiner = refiner.unwrap_or_else(|| Arc::new(NullRefiner));
169 let distiller = distiller.unwrap_or_else(|| Arc::new(HeuristicDistiller));
170 let curator = curator.unwrap_or_else(|| Arc::new(BuiltinCurator));
171 let sanitizer = sanitizer.unwrap_or_else(|| Arc::new(DefaultSanitizer));
172
173 let storage = Storage::open(db_path, embedding.content_dim(), embedding.trigger_dim())?;
174
175 let mut kb = Self {
176 storage,
177 embedding,
178 refiner,
179 distiller,
180 curator,
181 sanitizer,
182 w_content: W_CONTENT,
183 w_trigger: W_TRIGGER,
184 w_confidence: W_CONFIDENCE,
185 w_context: W_CONTEXT,
186 top_k_candidates: TOP_K_CANDIDATES,
187 anti_trigger_penalty: ANTI_TRIGGER_PENALTY,
188 density_refill: DENSITY_REFILL,
189 low_conf_threshold: LOW_CONF_THRESHOLD,
190 low_conf_idle_days: LOW_CONF_IDLE_DAYS,
191 repeat_select_min: REPEAT_SELECT_MIN,
192 repeat_select_conf_max: REPEAT_SELECT_CONF_MAX,
193 never_used_age_days: NEVER_USED_AGE_DAYS,
194 open_ttl_days: OPEN_TTL_DAYS,
195 screening_timeout_minutes: SCREENING_TIMEOUT_MINUTES,
196 promote_used_success_min: PROMOTE_USED_SUCCESS_MIN,
197 promote_confidence_min: PROMOTE_CONFIDENCE_MIN,
198 decay_floor: DECAY_FLOOR,
199 evolve_threshold: EVOLVE_THRESHOLD,
200 distill_batch_size: DISTILL_BATCH_SIZE,
201 evolve_schedule_interval_hours: 6,
202 governance_archive_threshold: GOVERNANCE_ARCHIVE_THRESHOLD,
203 negative_feedback_archive_threshold: NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
204 governance_evolve_threshold: GOVERNANCE_EVOLVE_THRESHOLD,
205 governance_proposal_max_age_days: 30,
206 failure_min_uses: FAILURE_MIN_USES,
207 failure_max_success_rate: FAILURE_MAX_SUCCESS_RATE,
208 failure_confidence_max: FAILURE_CONFIDENCE_MAX,
209 };
210 kb.init_meta()?;
211 kb.load_params()?;
212 Ok(kb)
213 }
214
215 fn init_meta(&self) -> Result<()> {
216 let lib_id = gen_uuid();
217 let content_dim = self.embedding.content_dim().to_string();
218 let trigger_dim = self.embedding.trigger_dim().to_string();
219 let embed_model = self.embedding.model_name();
220
221 for (key, expected) in [
222 ("content_dim", self.embedding.content_dim()),
223 ("trigger_dim", self.embedding.trigger_dim()),
224 ] {
225 if let Some(stored) = self.storage.get_meta(key)? {
226 let actual = stored.parse::<usize>().map_err(|_| {
227 InnateError::Other(format!("invalid {key} metadata value: {stored}"))
228 })?;
229 if actual != expected {
230 return Err(InnateError::Other(format!(
231 "{key} mismatch: database uses {actual}, embedding provider uses {expected}"
232 )));
233 }
234 }
235 }
236
237 let defaults: &[(&str, &str)] = &[
238 ("lib_id", &lib_id),
239 ("lib_role", "personal"),
240 ("schema_version", "4.13"),
241 ("content_dim", &content_dim),
242 ("trigger_dim", &trigger_dim),
243 ("embed_model", embed_model),
244 ("embed_version", "1"),
245 ("vector_revision", "0"),
246 ("last_agg_ts", "1970-01-01T00:00:00.000Z"),
247 ("recall.w_content", "0.55"),
248 ("recall.w_trigger", "0.25"),
249 ("recall.w_confidence", "0.10"),
250 ("recall.w_context", "0.15"),
251 ("recall.top_k_candidates", "20"),
252 ("recall.anti_trigger_penalty", "0.6"),
253 ("recall.density_refill", "true"),
254 ("curate.low_conf_threshold", "0.25"),
255 ("curate.low_conf_idle_days", "60"),
256 ("curate.repeat_select_min", "10"),
257 ("curate.repeat_select_conf_max", "0.5"),
258 ("curate.never_used_age_days", "30"),
259 ("curate.open_ttl_days", "14"),
260 ("curate.screening_timeout_minutes", "30"),
261 ("curate.promote_used_success_min", "3"),
262 ("curate.promote_confidence_min", "0.60"),
263 ("curate.decay_floor", "0.20"),
264 ("evolve.threshold_new_count", "5"),
265 ("evolve.distill_batch_size", "20"),
266 ("evolve.schedule_interval_hours", "6"),
267 ("curate.soft_mature_threshold", "5"),
268 ("evolve.distill_token_window_hours", "24"),
269 ("curate.governance_archive_threshold", "3"),
270 ("curate.negative_feedback_archive_threshold", "5"),
271 ("evolve.governance_pending_threshold", "3"),
272 ("curate.governance_proposal_max_age_days", "30"),
273 ("curate.failure_min_uses", "5"),
274 ("curate.failure_max_success_rate", "0.20"),
275 ("curate.failure_confidence_max", "0.35"),
276 ];
277 self.storage.begin_immediate()?;
278 let result = (|| -> Result<()> {
279 for (k, v) in defaults {
280 if self.storage.get_meta(k)?.is_none() {
281 self.storage.set_meta(k, v)?;
282 }
283 }
284 self.storage.commit()
285 })();
286 if result.is_err() {
287 let _ = self.storage.rollback();
288 }
289 result
290 }
291
292 fn load_params(&mut self) -> Result<()> {
293 let f = |k: &str, d: f64| -> f64 {
294 self.storage
295 .get_meta(k)
296 .ok()
297 .flatten()
298 .and_then(|v| v.parse().ok())
299 .unwrap_or(d)
300 };
301 let i = |k: &str, d: i64| -> i64 {
302 self.storage
303 .get_meta(k)
304 .ok()
305 .flatten()
306 .and_then(|v| v.parse().ok())
307 .unwrap_or(d)
308 };
309 let b = |k: &str, d: bool| -> bool {
310 self.storage
311 .get_meta(k)
312 .ok()
313 .flatten()
314 .map(|v| v.to_lowercase() == "true")
315 .unwrap_or(d)
316 };
317 self.w_content = f("recall.w_content", W_CONTENT);
318 self.w_trigger = f("recall.w_trigger", W_TRIGGER);
319 self.w_confidence = f("recall.w_confidence", W_CONFIDENCE);
320 self.w_context = f("recall.w_context", W_CONTEXT);
321 self.top_k_candidates =
322 i("recall.top_k_candidates", TOP_K_CANDIDATES as i64).max(1) as usize;
323 self.anti_trigger_penalty = f("recall.anti_trigger_penalty", ANTI_TRIGGER_PENALTY);
324 self.density_refill = b("recall.density_refill", DENSITY_REFILL);
325 self.low_conf_threshold = f("curate.low_conf_threshold", LOW_CONF_THRESHOLD);
326 self.low_conf_idle_days = i("curate.low_conf_idle_days", LOW_CONF_IDLE_DAYS);
327 self.repeat_select_min = i("curate.repeat_select_min", REPEAT_SELECT_MIN);
328 self.repeat_select_conf_max = f("curate.repeat_select_conf_max", REPEAT_SELECT_CONF_MAX);
329 self.never_used_age_days = i("curate.never_used_age_days", NEVER_USED_AGE_DAYS);
330 self.open_ttl_days = i("curate.open_ttl_days", OPEN_TTL_DAYS);
331 self.screening_timeout_minutes = i(
332 "curate.screening_timeout_minutes",
333 SCREENING_TIMEOUT_MINUTES,
334 );
335 self.promote_used_success_min =
336 i("curate.promote_used_success_min", PROMOTE_USED_SUCCESS_MIN);
337 self.promote_confidence_min = f("curate.promote_confidence_min", PROMOTE_CONFIDENCE_MIN);
338 self.decay_floor = f("curate.decay_floor", DECAY_FLOOR).clamp(0.0, 0.4);
339 self.evolve_threshold = i("evolve.threshold_new_count", EVOLVE_THRESHOLD);
340 self.distill_batch_size =
341 i("evolve.distill_batch_size", DISTILL_BATCH_SIZE as i64) as usize;
342 self.evolve_schedule_interval_hours = i("evolve.schedule_interval_hours", 6).max(1);
343 self.governance_archive_threshold =
344 i("curate.governance_archive_threshold", GOVERNANCE_ARCHIVE_THRESHOLD).max(1);
345 self.negative_feedback_archive_threshold = i(
346 "curate.negative_feedback_archive_threshold",
347 NEGATIVE_FEEDBACK_ARCHIVE_THRESHOLD,
348 )
349 .max(1);
350 self.governance_evolve_threshold =
351 i("evolve.governance_pending_threshold", GOVERNANCE_EVOLVE_THRESHOLD).max(1);
352 self.governance_proposal_max_age_days =
353 i("curate.governance_proposal_max_age_days", 30).max(1);
354 self.failure_min_uses = i("curate.failure_min_uses", FAILURE_MIN_USES).max(1);
355 self.failure_max_success_rate = f(
356 "curate.failure_max_success_rate",
357 FAILURE_MAX_SUCCESS_RATE,
358 )
359 .clamp(0.0, 1.0);
360 self.failure_confidence_max =
361 f("curate.failure_confidence_max", FAILURE_CONFIDENCE_MAX).clamp(0.0, 1.0);
362 Ok(())
363 }
364
365 #[allow(clippy::too_many_arguments)]
370 pub fn recall(
371 &self,
372 query: &str,
373 budget: usize,
374 trace: bool,
375 include_sparks: bool,
376 top: Option<usize>,
377 source: &str,
378 expand_deps: &str, allow_trim: bool, refine_mode: &str, ) -> Result<RecallResult> {
382 validate_source(source)?;
383 let trace_id = gen_uuid();
384 let now = utc_now_iso();
385
386 let q_content = self
387 .embedding
388 .embed_content(query)
389 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
390 let q_trigger = self
391 .embedding
392 .embed_trigger(query)
393 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
394
395 let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
397 self.apply_soft_dep_bonus(&mut candidates)?;
398
399 let scored = self.score_candidates(candidates, query)?;
401
402 let (selected, skipped, skipped_reasons) =
404 self.pack(&scored, budget, expand_deps, allow_trim, query)?;
405
406 let depth_skipped: Vec<String> = skipped_reasons
407 .iter()
408 .filter(|(_, r)| r.as_str() == "dep_depth_limit")
409 .map(|(id, _)| id.clone())
410 .collect();
411
412 let mut selected = selected;
414 if self.density_refill {
415 selected = self.density_refill(selected, &skipped, budget);
416 }
417
418 let limited = limit_knowledge(selected, top);
419 let visible = if refine_mode == "adapt" {
420 self.refiner
421 .refine(limited.clone(), Some(budget))
422 .unwrap_or(limited)
423 } else {
424 limited
425 };
426
427 let sparks = if include_sparks {
429 self.recall_sparks(&q_content, &q_trigger)?
430 } else {
431 vec![]
432 };
433
434 if trace {
435 self.write_recall_trace(
436 &trace_id,
437 query,
438 &scored,
439 &visible,
440 &sparks,
441 &depth_skipped,
442 &skipped_reasons,
443 refine_mode,
444 source,
445 &now,
446 )?;
447 }
448
449 let empty = visible.is_empty() && sparks.is_empty();
450 Ok(RecallResult {
451 knowledge: visible,
452 sparks,
453 trace_id,
454 empty,
455 depth_skipped,
456 skipped_reasons,
457 })
458 }
459
460 fn ann_candidates(
461 &self,
462 q_content: &[f32],
463 q_trigger: &[f32],
464 ) -> Result<HashMap<String, CandidateInfo>> {
465 let embed_version = self
466 .storage
467 .get_meta("embed_version")?
468 .and_then(|v| v.parse::<i64>().ok())
469 .unwrap_or(1);
470
471 let content_res = self
472 .storage
473 .search_vec_content(q_content, self.top_k_candidates * 2)?;
474 let trigger_res = self
475 .storage
476 .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
477
478 let all_ids: Vec<&str> = {
480 let mut seen = HashSet::new();
481 content_res
482 .iter()
483 .chain(trigger_res.iter())
484 .map(|(id, _)| id.as_str())
485 .filter(|id| seen.insert(*id))
486 .collect()
487 };
488 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
489
490 let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
491
492 for (cid, sim) in &content_res {
493 if let Some(chunk) = chunks.get(cid) {
494 if chunk_is_valid_for_recall(chunk, embed_version) {
495 let e = candidates
496 .entry(cid.clone())
497 .or_insert_with(|| CandidateInfo {
498 chunk: chunk.clone(),
499 sim_content: 0.0,
500 sim_trigger: 0.0,
501 });
502 e.sim_content = e.sim_content.max(*sim);
503 }
504 }
505 }
506 for (cid, sim) in &trigger_res {
507 if let Some(chunk) = chunks.get(cid) {
508 if chunk_is_valid_for_recall(chunk, embed_version) {
509 let e = candidates
510 .entry(cid.clone())
511 .or_insert_with(|| CandidateInfo {
512 chunk: chunk.clone(),
513 sim_content: 0.0,
514 sim_trigger: 0.0,
515 });
516 e.sim_trigger = e.sim_trigger.max(*sim);
517 }
518 }
519 }
520 Ok(candidates)
521 }
522
523 fn apply_soft_dep_bonus(&self, candidates: &mut HashMap<String, CandidateInfo>) -> Result<()> {
524 let ids: Vec<String> = candidates.keys().cloned().collect();
525 for cid in ids {
526 if candidates[&cid].chunk.get("origin").and_then(Value::as_str) == Some("spark") {
527 continue;
528 }
529 let deps = self.storage.get_deps(&cid)?;
530 for (dst, kind, _) in &deps {
531 if kind != "soft" {
532 continue;
533 }
534 if let Some(target) = self.storage.get_chunk(dst)? {
535 if target.get("state").and_then(Value::as_str) == Some("archived") {
536 continue;
537 }
538 if target.get("origin").and_then(Value::as_str) == Some("spark") {
539 continue;
540 }
541 let e = candidates
542 .entry(dst.clone())
543 .or_insert_with(|| CandidateInfo {
544 chunk: target,
545 sim_content: 0.0,
546 sim_trigger: 0.0,
547 });
548 e.sim_content = (e.sim_content + 0.05).min(1.0);
549 }
550 }
551 }
552 Ok(())
553 }
554
555 fn score_candidates(
556 &self,
557 candidates: HashMap<String, CandidateInfo>,
558 query: &str,
559 ) -> Result<Vec<(f64, Value)>> {
560 let context_key = content_hash(&normalize_query(query));
561 let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
562 for info in candidates.into_values() {
563 let conf = info
564 .chunk
565 .get("confidence")
566 .and_then(Value::as_f64)
567 .unwrap_or(0.5);
568 let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
569 let context_score = self.storage.context_score(chunk_id, &context_key)?;
570 let mut fused = self.w_content * info.sim_content as f64
571 + self.w_trigger * info.sim_trigger as f64
572 + self.w_confidence * conf
573 + self.w_context * context_score;
574 if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
575 fused *= PENDING_RECALL_PENALTY;
576 }
577 let anti = info
578 .chunk
579 .get("anti_trigger_desc")
580 .and_then(Value::as_str)
581 .unwrap_or("");
582 if !anti.is_empty() && anti_trigger_hit(query, anti) {
583 fused *= self.anti_trigger_penalty;
584 }
585 let mut chunk = info.chunk;
586 chunk["_context_score"] = json!(context_score);
587 chunk["_fused_score"] = json!(fused);
588 scored.push((fused, chunk));
589 }
590 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
591 scored.truncate(self.top_k_candidates);
592 Ok(scored)
593 }
594
595 fn pack(
596 &self,
597 scored: &[(f64, Value)],
598 budget: usize,
599 expand_deps: &str,
600 allow_trim: bool,
601 query: &str,
602 ) -> Result<PackResult> {
603 let mut selected: Vec<Value> = vec![];
604 let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
605 let mut skipped_reasons: HashMap<String, String> = HashMap::new();
606 let mut used_ids: HashSet<String> = HashSet::new();
607 let mut used_tokens: usize = 0;
608
609 for (fused, chunk) in scored {
610 let cid = chunk["id"].as_str().unwrap_or("").to_string();
611 if used_ids.contains(&cid) {
612 continue;
613 }
614
615 let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
617 if let Some(reason) = dep_skip_reason {
618 skipped_reasons.insert(cid, reason);
619 continue;
620 }
621
622 let new_block: Vec<Value> = block
623 .iter()
624 .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
625 .cloned()
626 .collect();
627 let cost = block_cost(&new_block);
628
629 if used_tokens + cost <= budget {
630 for b in &block {
631 let bid = b["id"].as_str().unwrap_or("").to_string();
632 if !used_ids.contains(&bid) {
633 let mut b = b.clone();
634 b["_fused_score"] = json!(fused);
635 selected.push(b);
636 used_ids.insert(bid);
637 }
638 }
639 used_tokens += cost;
640 } else if allow_trim {
641 if let Some(trimmed) =
643 self.refiner
644 .trim(&block, query, budget.saturating_sub(used_tokens))
645 {
646 let trim_cost = block_cost(&trimmed);
647 if used_tokens + trim_cost <= budget {
648 for b in &trimmed {
649 let bid = b["id"].as_str().unwrap_or("").to_string();
650 if !used_ids.contains(&bid) {
651 let mut b = b.clone();
652 b["_fused_score"] = json!(fused);
653 b["_trimmed"] = json!(true);
654 selected.push(b);
655 used_ids.insert(bid);
656 }
657 }
658 used_tokens += trim_cost;
659 continue;
660 }
661 }
662 skipped.push((block, *fused, cost));
663 } else {
664 skipped.push((block, *fused, cost));
665 }
666 }
667 Ok((selected, skipped, skipped_reasons))
668 }
669
670 fn build_dep_block(
673 &self,
674 seed: &Value,
675 expand_deps: &str,
676 ) -> Result<(Vec<Value>, Option<String>)> {
677 if expand_deps == "false" || expand_deps.is_empty() {
678 return Ok((vec![seed.clone()], None));
679 }
680 let seed_id = seed["id"].as_str().unwrap_or("");
681 match expand_deps {
682 "direct" => {
683 let deps = self.storage.get_deps(seed_id)?;
684 let mut block = vec![seed.clone()];
685 for (dep_id, kind, _) in &deps {
686 if kind != "hard" {
687 continue;
688 }
689 match self.validate_hard_dep(dep_id)? {
690 Some(chunk) => block.push(chunk),
691 None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
692 }
693 }
694 Ok((block, None))
695 }
696 "closure" => {
697 let mut block = vec![seed.clone()];
698 let mut visited: HashSet<String> = [seed_id.to_string()].into();
699 match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
700 Some(reason) => Ok((vec![], Some(reason))),
701 None => Ok((block, None)),
702 }
703 }
704 _ => Ok((vec![seed.clone()], None)),
705 }
706 }
707
708 fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
710 match self.storage.get_chunk(dep_id)? {
711 None => Ok(None),
712 Some(chunk) => {
713 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
714 let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
715 let embed_v = chunk
716 .get("embed_version")
717 .and_then(Value::as_i64)
718 .unwrap_or(0);
719 if state == "archived" || origin == "spark" || embed_v == 0 {
720 Ok(None)
721 } else {
722 Ok(Some(chunk))
723 }
724 }
725 }
726 }
727
728 fn expand_hard_closure(
730 &self,
731 id: &str,
732 visited: &mut HashSet<String>,
733 block: &mut Vec<Value>,
734 depth: usize,
735 max_depth: usize,
736 ) -> Result<Option<String>> {
737 if depth >= max_depth {
738 return Ok(Some("dep_depth_limit".to_string()));
739 }
740 let deps = self.storage.get_deps(id)?;
741 for (dep_id, kind, _) in &deps {
742 if kind != "hard" {
743 continue;
744 }
745 if visited.contains(dep_id) {
746 continue;
747 } visited.insert(dep_id.clone());
749 match self.validate_hard_dep(dep_id)? {
750 None => return Ok(Some("hard_dep_unavailable".to_string())),
751 Some(chunk) => {
752 block.push(chunk);
753 if let Some(reason) =
754 self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
755 {
756 return Ok(Some(reason));
757 }
758 }
759 }
760 }
761 Ok(None)
762 }
763
764 fn density_refill(
765 &self,
766 mut selected: Vec<Value>,
767 skipped: &[(Vec<Value>, f64, usize)],
768 budget: usize,
769 ) -> Vec<Value> {
770 let used_tokens = block_cost(&selected);
771 if used_tokens >= budget {
772 return selected;
773 }
774
775 let selected_ids: HashSet<String> = selected
776 .iter()
777 .filter_map(|c| c["id"].as_str().map(str::to_string))
778 .collect();
779
780 let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
781 .iter()
782 .filter_map(|(block, fscore, _)| {
783 let block: Vec<Value> = block
784 .iter()
785 .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
786 .cloned()
787 .collect();
788 if block.is_empty() {
789 return None;
790 }
791 let cost = block_cost(&block);
792 let density = fscore / cost.max(1) as f64;
793 Some((density, block, cost))
794 })
795 .collect();
796 density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
797
798 let mut used_tokens = block_cost(&selected);
799 let mut added_ids: HashSet<String> = selected_ids;
800 for (_, block, cost) in density_items {
801 if used_tokens + cost <= budget {
802 for b in block {
803 let bid = b["id"].as_str().unwrap_or("").to_string();
804 if !added_ids.contains(&bid) {
805 selected.push(b);
806 added_ids.insert(bid);
807 }
808 }
809 used_tokens += cost;
810 }
811 }
812 selected
813 }
814
815 fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
816 let embed_version = self
817 .storage
818 .get_meta("embed_version")?
819 .and_then(|v| v.parse::<i64>().ok())
820 .unwrap_or(1);
821
822 let content_res = self
823 .storage
824 .search_vec_content(q_content, self.top_k_candidates)?;
825 let trigger_res = self
826 .storage
827 .search_vec_trigger(q_trigger, self.top_k_candidates)?;
828
829 let all_ids: Vec<&str> = {
831 let mut seen = HashSet::new();
832 content_res
833 .iter()
834 .chain(trigger_res.iter())
835 .map(|(id, _)| id.as_str())
836 .filter(|id| seen.insert(*id))
837 .collect()
838 };
839 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
840
841 let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
842 for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
843 if let Some(chunk) = chunks.get(cid) {
844 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
845 continue;
846 }
847 if chunk.get("state").and_then(Value::as_str) == Some("archived") {
848 continue;
849 }
850 let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
851 if maturity == "promoted" || maturity == "dropped" {
852 continue;
853 }
854 let ev = chunk
855 .get("embed_version")
856 .and_then(Value::as_i64)
857 .unwrap_or(1);
858 if ev < embed_version {
859 continue;
860 }
861 let entry = spark_scores
862 .entry(cid.clone())
863 .or_insert_with(|| (*sim, chunk.clone()));
864 if *sim > entry.0 {
865 *entry = (*sim, chunk.clone());
866 }
867 }
868 }
869 let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
870 sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
871 Ok(sparks
872 .into_iter()
873 .take(self.top_k_candidates)
874 .map(|(_, c)| c)
875 .collect())
876 }
877
878 #[allow(clippy::too_many_arguments)]
879 fn write_recall_trace(
880 &self,
881 trace_id: &str,
882 query: &str,
883 scored: &[(f64, Value)],
884 visible: &[Value],
885 sparks: &[Value],
886 depth_skipped: &[String],
887 skipped_reasons: &HashMap<String, String>,
888 refine_mode: &str,
889 source: &str,
890 now: &str,
891 ) -> Result<()> {
892 let lib_id = self.storage.lib_id()?;
893 self.storage.begin_immediate()?;
894 let result = (|| -> Result<()> {
895 for (rank, (_, chunk)) in scored.iter().enumerate() {
896 let cid = chunk["id"].as_str().unwrap_or("");
897 let sim = chunk.get("_fused_score").and_then(Value::as_f64);
898 let rm = skipped_reasons
900 .get(cid)
901 .map(|r| format!("skipped:{r}"))
902 .or_else(|| {
903 if refine_mode != "off" && !refine_mode.is_empty() {
904 Some(refine_mode.to_string())
905 } else {
906 None
907 }
908 });
909 self.storage.insert_usage_trace(
910 trace_id,
911 Some(cid),
912 "retrieved",
913 1.0,
914 sim,
915 rm.as_deref(),
916 None,
917 Some((rank + 1) as i64),
918 None,
919 source,
920 now,
921 )?;
922 }
923 for (rank, chunk) in visible.iter().enumerate() {
924 let cid = chunk["id"].as_str().unwrap_or("");
925 self.storage.insert_usage_trace(
926 trace_id,
927 Some(cid),
928 "selected",
929 1.0,
930 None,
931 None,
932 None,
933 Some((rank + 1) as i64),
934 None,
935 source,
936 now,
937 )?;
938 if chunk
940 .get("_trimmed")
941 .and_then(Value::as_bool)
942 .unwrap_or(false)
943 {
944 self.storage.insert_usage_trace(
945 trace_id,
946 Some(cid),
947 "refined",
948 1.0,
949 None,
950 Some("trim"),
951 None,
952 Some((rank + 1) as i64),
953 None,
954 source,
955 now,
956 )?;
957 }
958 }
959 for (rank, chunk) in sparks.iter().enumerate() {
961 let cid = chunk["id"].as_str().unwrap_or("");
962 self.storage.insert_usage_trace(
963 trace_id,
964 Some(cid),
965 "retrieved",
966 1.0,
967 None,
968 Some("spark"),
969 None,
970 Some((rank + 1) as i64),
971 None,
972 source,
973 now,
974 )?;
975 }
976 let snapshot = json!({
977 "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
978 "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
979 "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
980 "depth_skipped": depth_skipped,
981 "skipped_reasons": skipped_reasons,
982 });
983 let log = EpisodicLogRow {
984 id: gen_uuid(),
985 trace_id: trace_id.to_string(),
986 lib_id,
987 ts: now.to_string(),
988 query: Some(query.to_string()),
989 recall_snapshot: Some(snapshot.to_string()),
990 event_source: source.to_string(),
991 task_state: "recalled".to_string(),
992 usage_state: "unknown".to_string(),
993 context_key: Some(content_hash(&normalize_query(query))),
994 distill_state: "open".to_string(),
995 ..Default::default()
996 };
997 self.storage.upsert_episodic_log(&log)?;
998 self.storage.commit()
999 })();
1000 if result.is_err() {
1001 let _ = self.storage.rollback();
1002 }
1003 result
1004 }
1005
1006 #[allow(clippy::too_many_arguments)]
1011 pub fn record(
1012 &self,
1013 trace_id: &str,
1014 query: Option<&str>,
1015 output: Option<&str>,
1016 output_summary: Option<&str>,
1017 outcome: Option<&str>,
1018 used: Option<&[String]>,
1019 feedback_up: Option<&[String]>,
1020 feedback_down: Option<&[String]>,
1021 nomination: Option<&str>,
1022 priority: i64,
1023 source: &str,
1024 ) -> Result<()> {
1025 self.record_detailed(
1026 trace_id,
1027 query,
1028 output,
1029 output_summary,
1030 outcome,
1031 used,
1032 "explicit",
1033 true,
1034 feedback_up,
1035 feedback_down,
1036 "user",
1037 None,
1038 None,
1039 nomination,
1040 priority,
1041 None,
1042 source,
1043 )
1044 }
1045
1046 #[allow(clippy::too_many_arguments)]
1047 pub fn record_detailed(
1048 &self,
1049 trace_id: &str,
1050 query: Option<&str>,
1051 output: Option<&str>,
1052 output_summary: Option<&str>,
1053 outcome: Option<&str>,
1054 used: Option<&[String]>,
1055 used_attribution: &str,
1056 used_complete: bool,
1057 feedback_up: Option<&[String]>,
1058 feedback_down: Option<&[String]>,
1059 feedback_kind: &str,
1060 feedback_actor: Option<&str>,
1061 feedback_reason: Option<&str>,
1062 nomination: Option<&str>,
1063 priority: i64,
1064 task_state: Option<&str>,
1065 source: &str,
1066 ) -> Result<()> {
1067 let dedupe_ids = |ids: &[String]| {
1068 let mut seen = HashSet::new();
1069 ids.iter()
1070 .filter(|id| seen.insert((*id).clone()))
1071 .cloned()
1072 .collect::<Vec<_>>()
1073 };
1074 let normalized_used = used.map(dedupe_ids);
1075 let normalized_feedback_up = feedback_up.map(dedupe_ids);
1076 let normalized_feedback_down = feedback_down.map(dedupe_ids);
1077 let used = normalized_used.as_deref();
1078 let feedback_up = normalized_feedback_up.as_deref();
1079 let feedback_down = normalized_feedback_down.as_deref();
1080
1081 if let Some(o) = outcome {
1082 if !matches!(o, "ok" | "fail" | "unknown") {
1083 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
1084 }
1085 }
1086 if !matches!(used_attribution, "explicit" | "cited" | "inferred") {
1087 return Err(InnateError::InvalidState(format!(
1088 "invalid used attribution: {used_attribution}"
1089 )));
1090 }
1091 if !matches!(feedback_kind, "user" | "judge") {
1092 return Err(InnateError::InvalidState(format!(
1093 "invalid feedback kind: {feedback_kind}"
1094 )));
1095 }
1096 if let Some(state) = task_state {
1097 if !matches!(
1098 state,
1099 "recalled" | "running" | "completed" | "abandoned" | "timed_out"
1100 ) {
1101 return Err(InnateError::InvalidState(format!(
1102 "invalid task state: {state}"
1103 )));
1104 }
1105 }
1106 validate_source(source)?;
1107 if let (Some(ups), Some(downs)) = (feedback_up, feedback_down) {
1108 let down_set: HashSet<&str> = downs.iter().map(String::as_str).collect();
1109 if let Some(chunk_id) = ups.iter().find(|id| down_set.contains(id.as_str())) {
1110 return Err(InnateError::InvalidState(format!(
1111 "conflicting feedback for chunk {chunk_id}"
1112 )));
1113 }
1114 }
1115 let effective_priority = if nomination.is_some() && priority == 0 {
1116 1
1117 } else {
1118 priority
1119 };
1120 let now = utc_now_iso();
1121 let lib_id = self.storage.lib_id()?;
1122
1123 self.storage.begin_immediate()?;
1124 let result = (|| -> Result<()> {
1125 let log = self.storage.get_episodic_log(trace_id)?;
1126 let mut is_fresh_insert = false;
1127 let log = match log {
1128 Some(l) => l,
1129 None => {
1130 let used_ids = used.map(serde_json::to_string).transpose()?;
1131 let row = EpisodicLogRow {
1132 id: gen_uuid(),
1133 trace_id: trace_id.to_string(),
1134 lib_id,
1135 ts: now.clone(),
1136 query: query.map(str::to_string).or_else(|| Some(String::new())),
1137 output: output.map(str::to_string),
1138 output_summary: output_summary.map(str::to_string),
1139 outcome: outcome.map(str::to_string),
1140 event_source: source.to_string(),
1141 task_state: if matches!(outcome, Some("ok") | Some("fail")) {
1142 "completed".to_string()
1143 } else {
1144 task_state.unwrap_or("running").to_string()
1145 },
1146 completed_at: matches!(outcome, Some("ok") | Some("fail"))
1147 .then(|| now.clone()),
1148 usage_state: usage_state(used).to_string(),
1149 used_ids,
1150 used_attribution: used.map(|_| used_attribution.to_string()),
1151 used_complete,
1152 context_key: query.map(|q| content_hash(&normalize_query(q))),
1153 nomination: nomination.map(str::to_string),
1154 priority: effective_priority,
1155 distill_state: "open".to_string(),
1156 ..Default::default()
1157 };
1158 self.storage.upsert_episodic_log(&row)?;
1159 is_fresh_insert = true;
1160 self.storage.get_episodic_log(trace_id)?.unwrap()
1161 }
1162 };
1163 self.validate_trace_attribution(trace_id, used, "used")?;
1164 self.validate_trace_attribution(trace_id, feedback_up, "feedback_up")?;
1165 self.validate_trace_attribution(trace_id, feedback_down, "feedback_down")?;
1166
1167 let existing_outcome = log
1168 .get("outcome")
1169 .and_then(Value::as_str)
1170 .map(str::to_string);
1171 if let Some(new_outcome) = outcome {
1172 if let Some(ref ex) = existing_outcome {
1173 if ex != "unknown" && ex != new_outcome {
1174 return Err(InnateError::OutcomeConflict {
1175 trace_id: trace_id.to_string(),
1176 existing: ex.clone(),
1177 requested: new_outcome.to_string(),
1178 });
1179 }
1180 }
1181 }
1182
1183 let effective_used_attribution = if used.is_some() {
1185 used_attribution
1186 } else {
1187 log.get("used_attribution")
1188 .and_then(Value::as_str)
1189 .unwrap_or(used_attribution)
1190 };
1191 let used_strength = match effective_used_attribution {
1192 "explicit" => 0.3,
1193 "cited" => 0.25,
1194 "inferred" => 0.15,
1195 _ => unreachable!(),
1196 };
1197 let existing_used_ids: Vec<String> = log
1198 .get("used_ids")
1199 .and_then(Value::as_str)
1200 .and_then(|raw| serde_json::from_str(raw).ok())
1201 .unwrap_or_default();
1202 let existing_used_complete = log
1203 .get("used_complete")
1204 .and_then(Value::as_i64)
1205 .unwrap_or(0)
1206 != 0;
1207 let effective_used_complete = used_complete || existing_used_complete;
1208 let effective_used_ids = used.map(|reported| {
1209 if used_complete {
1210 reported.to_vec()
1211 } else {
1212 let mut merged = existing_used_ids.clone();
1213 let mut seen: HashSet<String> = merged.iter().cloned().collect();
1214 merged.extend(
1215 reported
1216 .iter()
1217 .filter(|id| seen.insert((*id).clone()))
1218 .cloned(),
1219 );
1220 merged
1221 }
1222 });
1223 if let Some(used_ids) = effective_used_ids.as_deref() {
1224 let previously_used: HashSet<String> = existing_used_ids.iter().cloned().collect();
1225 if used_complete {
1226 self.storage.replace_used_trace(
1227 trace_id,
1228 used_ids,
1229 used_strength,
1230 used_attribution,
1231 source,
1232 &now,
1233 )?;
1234 } else if let Some(reported) = used {
1235 self.storage.merge_used_trace(
1236 trace_id,
1237 reported,
1238 used_strength,
1239 used_attribution,
1240 source,
1241 &now,
1242 )?;
1243 }
1244 let affected: HashSet<String> = previously_used
1245 .into_iter()
1246 .chain(used_ids.iter().cloned())
1247 .collect();
1248 for cid in affected {
1249 self.storage.refresh_chunk_last_used(&cid, &now)?;
1250 }
1251 }
1252
1253 if let Some(o) = outcome {
1255 if matches!(o, "ok" | "fail") {
1256 let event = if o == "ok" { "task_ok" } else { "task_fail" };
1257 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
1258 self.storage.insert_usage_trace(
1259 trace_id, None, event, strength, None, None, None, None, None, source, &now,
1260 )?;
1261 }
1262 }
1263
1264 let effective_outcome = outcome
1268 .filter(|value| *value != "unknown")
1269 .or(existing_outcome.as_deref().filter(|value| *value != "unknown"));
1270 if let Some(o @ ("ok" | "fail")) = effective_outcome {
1271 if used.is_some()
1272 || (outcome.is_some_and(|value| value != "unknown")
1273 && existing_outcome.as_deref() != outcome)
1274 {
1275 let fallback_ids: Vec<String>;
1276 let effective_used: Option<&[String]> = if effective_used_ids.is_some() {
1277 effective_used_ids.as_deref()
1278 } else {
1279 fallback_ids = log
1280 .get("used_ids")
1281 .and_then(Value::as_str)
1282 .and_then(|s| serde_json::from_str(s).ok())
1283 .unwrap_or_default();
1284 if fallback_ids.is_empty() {
1285 None
1286 } else {
1287 Some(&fallback_ids)
1288 }
1289 };
1290 let effective_complete = if used.is_some() {
1291 effective_used_complete
1292 } else {
1293 log.get("usage_state").and_then(Value::as_str) != Some("unknown")
1294 && log
1295 .get("used_complete")
1296 .and_then(Value::as_i64)
1297 .unwrap_or(1)
1298 != 0
1299 };
1300 self.replace_outcome_evidence(
1301 trace_id,
1302 o,
1303 effective_used,
1304 effective_complete,
1305 &now,
1306 )?;
1307 }
1308 } else if used.is_some() && effective_used_complete {
1309 self.replace_selected_unused_evidence(
1310 trace_id,
1311 effective_used_ids.as_deref().unwrap_or_default(),
1312 &now,
1313 )?;
1314 }
1315
1316 let context_key = log
1317 .get("context_key")
1318 .and_then(Value::as_str)
1319 .map(str::to_string)
1320 .or_else(|| query.map(|q| content_hash(&normalize_query(q))));
1321 let feedback_strength = if feedback_kind == "judge" { 0.6 } else { 1.0 };
1322
1323 let mut context_affected: HashSet<String> = HashSet::new();
1327 if let Some(used_ids) = effective_used_ids.as_deref() {
1328 for cid in used_ids {
1329 context_affected.insert(cid.clone());
1330 }
1331 }
1332 if let Some(ups) = feedback_up {
1333 for cid in ups {
1334 let corrected = self.storage
1335 .delete_feedback_event(trace_id, cid, "down")?;
1336 self.storage
1337 .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_down")?;
1338 let inserted = self.storage.insert_feedback_event(
1339 &gen_uuid(),
1340 trace_id,
1341 cid,
1342 "up",
1343 feedback_strength,
1344 source,
1345 feedback_actor,
1346 feedback_reason,
1347 context_key.as_deref(),
1348 &now,
1349 )?;
1350 if inserted > 0 {
1351 self.upsert_trace_confidence_evidence(
1352 trace_id,
1353 cid,
1354 "feedback_up",
1355 1.0,
1356 feedback_strength,
1357 if feedback_kind == "judge" {
1358 "judge_up"
1359 } else {
1360 "user_up"
1361 },
1362 context_key.as_deref(),
1363 &now,
1364 true,
1365 )?;
1366 self.storage.update_chunk_last_used(cid, &now)?;
1367 self.refresh_governance_evidence(cid, &now)?;
1368 context_affected.insert(cid.clone());
1369 } else if corrected > 0 {
1370 self.recompute_chunk_confidence(cid, &now)?;
1371 self.refresh_governance_evidence(cid, &now)?;
1372 context_affected.insert(cid.clone());
1373 }
1374 }
1375 }
1376 if let Some(downs) = feedback_down {
1377 for cid in downs {
1378 let corrected = self.storage
1379 .delete_feedback_event(trace_id, cid, "up")?;
1380 self.storage
1381 .delete_chunk_trace_confidence_evidence(trace_id, cid, "feedback_up")?;
1382 let inserted = self.storage.insert_feedback_event(
1383 &gen_uuid(),
1384 trace_id,
1385 cid,
1386 "down",
1387 feedback_strength,
1388 source,
1389 feedback_actor,
1390 feedback_reason,
1391 context_key.as_deref(),
1392 &now,
1393 )?;
1394 if inserted > 0 {
1395 self.upsert_trace_confidence_evidence(
1396 trace_id,
1397 cid,
1398 "feedback_down",
1399 0.0,
1400 feedback_strength,
1401 if feedback_kind == "judge" {
1402 "judge_down"
1403 } else {
1404 "user_down"
1405 },
1406 context_key.as_deref(),
1407 &now,
1408 true,
1409 )?;
1410 self.refresh_governance_evidence(cid, &now)?;
1411 context_affected.insert(cid.clone());
1412 } else if corrected > 0 {
1413 self.recompute_chunk_confidence(cid, &now)?;
1414 self.refresh_governance_evidence(cid, &now)?;
1415 context_affected.insert(cid.clone());
1416 }
1417 }
1418 }
1419 self.rebuild_context_stats_for(&context_affected, &now)?;
1421
1422 if !is_fresh_insert {
1424 self.storage.patch_episodic_log_content(
1425 trace_id,
1426 query,
1427 output,
1428 output_summary,
1429 nomination,
1430 effective_priority,
1431 )?;
1432 }
1433
1434 let lifecycle_state = if effective_outcome.is_some() {
1435 "completed"
1436 } else {
1437 task_state.unwrap_or_else(|| {
1438 log.get("task_state")
1439 .and_then(Value::as_str)
1440 .unwrap_or("running")
1441 })
1442 };
1443 let used_ids_json = effective_used_ids
1444 .as_deref()
1445 .map(serde_json::to_string)
1446 .transpose()?;
1447 self.storage.update_trace_lifecycle(
1448 trace_id,
1449 lifecycle_state,
1450 (lifecycle_state == "completed").then_some(now.as_str()),
1451 effective_used_ids
1452 .as_deref()
1453 .map(|ids| usage_state(Some(ids))),
1454 used_ids_json.as_deref(),
1455 used.map(|_| used_attribution),
1456 used.map(|_| effective_used_complete),
1457 )?;
1458
1459 let current_state = log
1461 .get("distill_state")
1462 .and_then(Value::as_str)
1463 .unwrap_or("open");
1464 let lifecycle_completed = lifecycle_state == "completed";
1465 let new_state = if current_state == "open"
1466 && matches!(lifecycle_state, "abandoned" | "timed_out")
1467 {
1468 Some("discarded")
1469 } else if current_state == "open" && lifecycle_completed {
1470 let has_material = output_summary.is_some()
1471 || nomination.is_some()
1472 || output.is_some()
1473 || log.get("output_summary").and_then(Value::as_str).is_some()
1474 || log.get("nomination").and_then(Value::as_str).is_some()
1475 || log.get("output").and_then(Value::as_str).is_some();
1476 if has_material {
1477 Some("new")
1478 } else {
1479 Some("discarded")
1480 }
1481 } else {
1482 None
1483 };
1484 if let Some(state) = new_state {
1485 let note = if state == "discarded" {
1486 Some(if matches!(lifecycle_state, "abandoned" | "timed_out") {
1487 lifecycle_state
1488 } else {
1489 "insufficient_material"
1490 })
1491 } else {
1492 None
1493 };
1494 let outcome_str = outcome.map(str::to_string);
1495 self.storage.update_episodic_log_state(
1496 trace_id,
1497 state,
1498 note,
1499 outcome_str.as_deref(),
1500 )?;
1501 } else if outcome.is_some() {
1502 let outcome_str = outcome.map(str::to_string);
1503 self.storage.update_episodic_log_state(
1504 trace_id,
1505 current_state,
1506 None,
1507 outcome_str.as_deref(),
1508 )?;
1509 }
1510
1511 self.storage.commit()
1512 })();
1513 if result.is_err() {
1514 let _ = self.storage.rollback();
1515 }
1516 result?;
1517 self.enqueue_evolve_if_needed(&now)?;
1518 Ok(())
1519 }
1520
1521 fn validate_trace_attribution(
1522 &self,
1523 trace_id: &str,
1524 chunk_ids: Option<&[String]>,
1525 field: &str,
1526 ) -> Result<()> {
1527 let Some(chunk_ids) = chunk_ids else {
1528 return Ok(());
1529 };
1530 if chunk_ids.is_empty() {
1531 return Ok(());
1532 }
1533
1534 let log = self.storage.get_episodic_log(trace_id)?.ok_or_else(|| {
1535 InnateError::InvalidState(format!(
1536 "{field} requires a trace created by recall: {trace_id}"
1537 ))
1538 })?;
1539 let mut attributable = HashSet::new();
1540 if let Some(raw) = log.get("recall_snapshot").and_then(Value::as_str) {
1541 if let Ok(snapshot) = serde_json::from_str::<Value>(raw) {
1542 if let Some(ids) = snapshot.get("selected").and_then(Value::as_array) {
1543 attributable.extend(ids.iter().filter_map(Value::as_str).map(str::to_string));
1544 }
1545 }
1546 }
1547 let rows = self.storage.query_chunks_params(
1548 "SELECT DISTINCT chunk_id FROM usage_trace
1549 WHERE trace_id=? AND chunk_id IS NOT NULL
1550 AND event='selected'",
1551 rusqlite::params![trace_id],
1552 )?;
1553 attributable.extend(rows.iter().filter_map(|row| {
1554 row.get("chunk_id")
1555 .and_then(Value::as_str)
1556 .map(str::to_string)
1557 }));
1558
1559 for chunk_id in chunk_ids {
1560 if self.storage.get_chunk(chunk_id)?.is_none() || !attributable.contains(chunk_id) {
1561 return Err(InnateError::InvalidState(format!(
1562 "{field} chunk {chunk_id} was not attributable to trace {trace_id}"
1563 )));
1564 }
1565 }
1566 Ok(())
1567 }
1568
1569 fn replace_selected_unused_evidence(
1570 &self,
1571 trace_id: &str,
1572 used_ids: &[String],
1573 now: &str,
1574 ) -> Result<()> {
1575 let old_rows = self.storage.query_chunks_params(
1576 "SELECT DISTINCT chunk_id FROM confidence_evidence
1577 WHERE trace_id=? AND kind='selected_unused'",
1578 rusqlite::params![trace_id],
1579 )?;
1580 let mut affected: HashSet<String> = old_rows
1581 .iter()
1582 .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1583 .collect();
1584 self.storage
1585 .delete_trace_confidence_evidence(trace_id, &["selected_unused"])?;
1586
1587 let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1588 let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1589 log.get("context_key")
1590 .and_then(Value::as_str)
1591 .map(str::to_string)
1592 });
1593 let selected_rows = self.storage.query_chunks_params(
1594 "SELECT chunk_id FROM usage_trace
1595 WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1596 rusqlite::params![trace_id],
1597 )?;
1598 for row in selected_rows {
1599 if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1600 if !used_set.contains(chunk_id) {
1601 self.upsert_trace_confidence_evidence(
1602 trace_id,
1603 chunk_id,
1604 "selected_unused",
1605 0.0,
1606 0.08,
1607 "selected_unused",
1608 context_key.as_deref(),
1609 now,
1610 false,
1611 )?;
1612 affected.insert(chunk_id.to_string());
1613 }
1614 }
1615 }
1616 for chunk_id in affected {
1617 self.recompute_chunk_confidence(&chunk_id, now)?;
1618 }
1619 Ok(())
1620 }
1621
1622 #[allow(clippy::too_many_arguments)]
1623 fn upsert_trace_confidence_evidence(
1624 &self,
1625 trace_id: &str,
1626 chunk_id: &str,
1627 kind: &str,
1628 target: f64,
1629 strength: f64,
1630 reason: &str,
1631 context_key: Option<&str>,
1632 now: &str,
1633 explicit: bool,
1634 ) -> Result<()> {
1635 let chunk = match self.storage.get_chunk(chunk_id)? {
1636 Some(chunk) => chunk,
1637 None => return Ok(()),
1638 };
1639 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1640 return Ok(());
1641 }
1642 let recency_weight = if explicit {
1643 const KAPPA: f64 = 0.5;
1644 const WINDOW_DAYS: f64 = 14.0;
1645 let gap_days = chunk
1646 .get("last_used_at")
1647 .and_then(Value::as_str)
1648 .map(|ts| iso_days_diff(now, ts) as f64)
1649 .unwrap_or(0.0);
1650 (1.0
1651 + KAPPA
1652 * (-(gap_days / WINDOW_DAYS) * std::f64::consts::LN_2).exp())
1653 .min(1.5)
1654 } else {
1655 1.0
1656 };
1657 let alpha = (0.2 * strength * recency_weight).clamp(0.0, 1.0);
1658 self.storage.upsert_confidence_evidence(
1659 &gen_uuid(),
1660 Some(trace_id),
1661 chunk_id,
1662 kind,
1663 target,
1664 alpha,
1665 reason,
1666 context_key,
1667 now,
1668 )?;
1669 self.recompute_chunk_confidence(chunk_id, now)
1670 }
1671
1672 fn recompute_chunk_confidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1673 let Some(chunk) = self.storage.get_chunk(chunk_id)? else {
1674 return Ok(());
1675 };
1676 let mut confidence = chunk
1677 .get("confidence_base")
1678 .and_then(Value::as_f64)
1679 .unwrap_or_else(|| {
1680 chunk
1681 .get("confidence")
1682 .and_then(Value::as_f64)
1683 .unwrap_or(0.5)
1684 });
1685 let mut reason = chunk
1686 .get("confidence_reason")
1687 .and_then(Value::as_str)
1688 .unwrap_or("base")
1689 .to_string();
1690 for evidence in self.storage.confidence_evidence_for_chunk(chunk_id)? {
1691 let target = evidence
1692 .get("target")
1693 .and_then(Value::as_f64)
1694 .unwrap_or(0.5);
1695 let alpha = evidence
1696 .get("alpha")
1697 .and_then(Value::as_f64)
1698 .unwrap_or(0.0)
1699 .clamp(0.0, 1.0);
1700 confidence = (confidence + alpha * (target - confidence)).clamp(0.0, 1.0);
1701 reason = evidence
1702 .get("reason")
1703 .and_then(Value::as_str)
1704 .unwrap_or("evidence")
1705 .to_string();
1706 }
1707 self.storage.conn_execute(
1708 "UPDATE chunks SET confidence=?, confidence_reason=?, updated_at=? WHERE id=?",
1709 rusqlite::params![confidence, reason, now, chunk_id],
1710 )
1711 }
1712
1713 #[allow(clippy::too_many_arguments)]
1714 fn replace_outcome_evidence(
1715 &self,
1716 trace_id: &str,
1717 outcome: &str,
1718 used: Option<&[String]>,
1719 used_complete: bool,
1720 now: &str,
1721 ) -> Result<()> {
1722 let old_rows = self.storage.query_chunks_params(
1723 "SELECT DISTINCT chunk_id FROM confidence_evidence
1724 WHERE trace_id=? AND kind IN ('outcome_ok','outcome_fail','selected_unused')",
1725 rusqlite::params![trace_id],
1726 )?;
1727 let mut affected: HashSet<String> = old_rows
1728 .iter()
1729 .filter_map(|row| row.get("chunk_id").and_then(Value::as_str).map(str::to_string))
1730 .collect();
1731 self.storage.delete_trace_confidence_evidence(
1732 trace_id,
1733 &["outcome_ok", "outcome_fail", "selected_unused"],
1734 )?;
1735
1736 let used_ids = used.unwrap_or_default();
1737 let used_set: HashSet<&str> = used_ids.iter().map(String::as_str).collect();
1738 let attribution_divisor = used_ids.len().max(1) as f64;
1739 let context_key = self.storage.get_episodic_log(trace_id)?.and_then(|log| {
1740 log.get("context_key")
1741 .and_then(Value::as_str)
1742 .map(str::to_string)
1743 });
1744 for chunk_id in used_ids {
1745 let attribution = self
1746 .storage
1747 .query_chunks_params(
1748 "SELECT strength, attribution FROM usage_trace
1749 WHERE trace_id=? AND chunk_id=? AND event='used'",
1750 rusqlite::params![trace_id, chunk_id],
1751 )?
1752 .into_iter()
1753 .next();
1754 let base_strength = attribution
1755 .as_ref()
1756 .and_then(|row| row.get("strength"))
1757 .and_then(Value::as_f64)
1758 .unwrap_or(0.15)
1759 / attribution_divisor;
1760 let attribution_reason = attribution
1761 .as_ref()
1762 .and_then(|row| row.get("attribution"))
1763 .and_then(Value::as_str)
1764 .unwrap_or("inferred");
1765 let (kind, target, strength, reason) = if outcome == "ok" {
1766 ("outcome_ok", 1.0, base_strength, attribution_reason)
1767 } else {
1768 ("outcome_fail", 0.0, base_strength * 0.5, "task_fail")
1769 };
1770 self.upsert_trace_confidence_evidence(
1771 trace_id,
1772 chunk_id,
1773 kind,
1774 target,
1775 strength,
1776 reason,
1777 context_key.as_deref(),
1778 now,
1779 false,
1780 )?;
1781 affected.insert(chunk_id.clone());
1782 }
1783
1784 if used_complete {
1785 let selected_rows = self.storage.query_chunks_params(
1786 "SELECT chunk_id FROM usage_trace
1787 WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1788 rusqlite::params![trace_id],
1789 )?;
1790 for row in selected_rows {
1791 if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
1792 if !used_set.contains(chunk_id) {
1793 self.upsert_trace_confidence_evidence(
1794 trace_id,
1795 chunk_id,
1796 "selected_unused",
1797 0.0,
1798 0.08,
1799 "selected_unused",
1800 context_key.as_deref(),
1801 now,
1802 false,
1803 )?;
1804 affected.insert(chunk_id.to_string());
1805 }
1806 }
1807 }
1808 }
1809
1810 for chunk_id in affected {
1811 self.recompute_chunk_confidence(&chunk_id, now)?;
1812 }
1813 Ok(())
1814 }
1815
1816 fn rebuild_context_stats(&self, now: &str) -> Result<()> {
1817 self.storage.conn_execute(
1818 "DELETE FROM chunk_context_stats",
1819 rusqlite::params![],
1820 )?;
1821 self.storage.conn_execute(
1822 "INSERT INTO chunk_context_stats
1823 (chunk_id, context_key, success_count, failure_count,
1824 positive_feedback, negative_feedback, last_updated_at)
1825 SELECT chunk_id, context_key, success_count, failure_count,
1826 positive_feedback, negative_feedback, ?
1827 FROM chunk_context_stats_base",
1828 rusqlite::params![now],
1829 )?;
1830 self.storage.conn_execute(
1831 "INSERT INTO chunk_context_stats
1832 (chunk_id, context_key, success_count, failure_count,
1833 positive_feedback, negative_feedback, last_updated_at)
1834 SELECT chunk_id, context_key,
1835 SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
1836 SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
1837 0, 0, ?
1838 FROM confidence_evidence
1839 WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
1840 GROUP BY chunk_id, context_key
1841 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1842 success_count=success_count+excluded.success_count,
1843 failure_count=failure_count+excluded.failure_count,
1844 last_updated_at=excluded.last_updated_at",
1845 rusqlite::params![now],
1846 )?;
1847 self.storage.conn_execute(
1848 "INSERT INTO chunk_context_stats
1849 (chunk_id, context_key, success_count, failure_count,
1850 positive_feedback, negative_feedback, last_updated_at)
1851 SELECT fe.chunk_id, fe.context_key, 0, 0,
1852 SUM(CASE WHEN fe.signal='up' THEN 1 ELSE 0 END),
1853 SUM(CASE WHEN fe.signal='down' THEN 1 ELSE 0 END), ?
1854 FROM feedback_events fe
1855 WHERE fe.context_key IS NOT NULL
1856 AND fe.ts > COALESCE((
1857 SELECT c.state_updated_at FROM chunks c
1858 WHERE c.id = fe.chunk_id AND c.state_reason = 'restore'
1859 ), '')
1860 GROUP BY fe.chunk_id, fe.context_key
1861 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1862 positive_feedback=positive_feedback+excluded.positive_feedback,
1863 negative_feedback=negative_feedback+excluded.negative_feedback,
1864 last_updated_at=excluded.last_updated_at",
1865 rusqlite::params![now],
1866 )
1867 }
1868
1869 fn rebuild_context_stats_for(&self, chunk_ids: &HashSet<String>, now: &str) -> Result<()> {
1873 if chunk_ids.is_empty() {
1874 return Ok(());
1875 }
1876 for chunk_id in chunk_ids {
1877 self.storage.conn_execute(
1878 "DELETE FROM chunk_context_stats WHERE chunk_id=?",
1879 rusqlite::params![chunk_id],
1880 )?;
1881 self.storage.conn_execute(
1882 "INSERT OR IGNORE INTO chunk_context_stats
1883 (chunk_id, context_key, success_count, failure_count,
1884 positive_feedback, negative_feedback, last_updated_at)
1885 SELECT chunk_id, context_key, success_count, failure_count,
1886 positive_feedback, negative_feedback, ?
1887 FROM chunk_context_stats_base WHERE chunk_id=?",
1888 rusqlite::params![now, chunk_id],
1889 )?;
1890 self.storage.conn_execute(
1891 "INSERT INTO chunk_context_stats
1892 (chunk_id, context_key, success_count, failure_count,
1893 positive_feedback, negative_feedback, last_updated_at)
1894 SELECT chunk_id, context_key,
1895 SUM(CASE WHEN kind='outcome_ok' THEN 1 ELSE 0 END),
1896 SUM(CASE WHEN kind='outcome_fail' THEN 1 ELSE 0 END),
1897 0, 0, ?
1898 FROM confidence_evidence
1899 WHERE context_key IS NOT NULL AND kind IN ('outcome_ok','outcome_fail')
1900 AND chunk_id=?
1901 GROUP BY chunk_id, context_key
1902 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1903 success_count=success_count+excluded.success_count,
1904 failure_count=failure_count+excluded.failure_count,
1905 last_updated_at=excluded.last_updated_at",
1906 rusqlite::params![now, chunk_id],
1907 )?;
1908 self.storage.conn_execute(
1909 "INSERT INTO chunk_context_stats
1910 (chunk_id, context_key, success_count, failure_count,
1911 positive_feedback, negative_feedback, last_updated_at)
1912 SELECT chunk_id, context_key, 0, 0,
1913 SUM(CASE WHEN signal='up' THEN 1 ELSE 0 END),
1914 SUM(CASE WHEN signal='down' THEN 1 ELSE 0 END), ?
1915 FROM feedback_events
1916 WHERE context_key IS NOT NULL AND chunk_id=?
1917 AND ts > COALESCE((
1918 SELECT state_updated_at FROM chunks
1919 WHERE id=? AND state_reason='restore'
1920 ), '')
1921 GROUP BY chunk_id, context_key
1922 ON CONFLICT(chunk_id, context_key) DO UPDATE SET
1923 positive_feedback=positive_feedback+excluded.positive_feedback,
1924 negative_feedback=negative_feedback+excluded.negative_feedback,
1925 last_updated_at=excluded.last_updated_at",
1926 rusqlite::params![now, chunk_id, chunk_id],
1927 )?;
1928 }
1929 Ok(())
1930 }
1931
1932 fn refresh_governance_evidence(&self, chunk_id: &str, now: &str) -> Result<()> {
1933 let rows = self.storage.query_chunks_params(
1934 "SELECT COALESCE(actor, 'anonymous:' || source) AS actor_key,
1935 signal, strength, ts
1936 FROM feedback_events
1937 WHERE chunk_id=?
1938 AND ts > COALESCE((
1939 SELECT state_updated_at FROM chunks
1940 WHERE id=? AND state_reason='restore'
1941 ), '')",
1942 rusqlite::params![chunk_id, chunk_id],
1943 )?;
1944 let mut actor_contributions: HashMap<String, f64> = HashMap::new();
1945 for row in rows {
1946 let actor = row
1947 .get("actor_key")
1948 .and_then(Value::as_str)
1949 .unwrap_or("anonymous")
1950 .to_string();
1951 let age_days = row
1952 .get("ts")
1953 .and_then(Value::as_str)
1954 .map(|ts| iso_days_diff(now, ts).max(0) as f64)
1955 .unwrap_or(0.0);
1956 let recency_weight = 0.5_f64.powf(age_days / 90.0);
1957 let strength = row.get("strength").and_then(Value::as_f64).unwrap_or(0.0);
1958 let signed = if row.get("signal").and_then(Value::as_str) == Some("down") {
1959 strength
1960 } else {
1961 -strength
1962 };
1963 *actor_contributions.entry(actor).or_default() += signed * recency_weight;
1964 }
1965 let mut score = 0.0_f64;
1966 let mut actor_count = 0_i64;
1967 for contribution in actor_contributions.values().copied() {
1968 let contribution = contribution.clamp(-1.0, 1.0);
1969 score += contribution;
1970 if contribution > 0.0 {
1971 actor_count += 1;
1972 }
1973 }
1974 let score = score.max(0.0);
1975 if score >= 2.0 && actor_count >= 2 {
1976 self.storage.upsert_governance_proposal(
1977 &gen_uuid(),
1978 chunk_id,
1979 "review_applicability",
1980 "Weighted negative feedback",
1981 score.ceil() as i64,
1982 score,
1983 actor_count,
1984 now,
1985 )?;
1986 } else {
1987 self.storage.conn_execute(
1988 "UPDATE governance_proposals
1989 SET state='rejected', evidence_count=?, evidence_score=?, actor_count=?, updated_at=?
1990 WHERE chunk_id=? AND state='pending'",
1991 rusqlite::params![score.ceil() as i64, score, actor_count, now, chunk_id],
1992 )?;
1993 }
1994 Ok(())
1995 }
1996
1997 fn enqueue_evolve_if_needed(&self, now: &str) -> Result<()> {
1998 let ready = count_query(
1999 &self.storage,
2000 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2001 )?;
2002 let oldest = self
2003 .storage
2004 .query_chunks("SELECT MIN(ts) AS oldest FROM episodic_log WHERE distill_state='new'")?
2005 .first()
2006 .and_then(|row| row.get("oldest"))
2007 .and_then(Value::as_str)
2008 .map(str::to_string);
2009 let age_due = oldest
2010 .as_deref()
2011 .is_some_and(|ts| ts <= hours_ago(now, self.evolve_schedule_interval_hours).as_str());
2012 let governance_pending = count_query(
2013 &self.storage,
2014 "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
2015 )?;
2016 let governance_ready = count_query_params(
2019 &self.storage,
2020 "SELECT COUNT(*) FROM governance_proposals
2021 WHERE state='pending'
2022 AND evidence_score >= ? AND actor_count >= 2",
2023 rusqlite::params![self.governance_archive_threshold as f64],
2024 )?;
2025 if ready >= self.evolve_threshold
2026 || (ready > 0 && age_due)
2027 || governance_pending >= self.governance_evolve_threshold
2028 || governance_ready > 0
2029 {
2030 let reason = if ready >= self.evolve_threshold {
2031 "threshold"
2032 } else if governance_ready > 0 {
2033 "governance_ready"
2034 } else if governance_pending >= self.governance_evolve_threshold {
2035 "governance"
2036 } else {
2037 "scheduled"
2038 };
2039 self.storage.request_evolve(&gen_uuid(), reason, now)?;
2040 }
2041 Ok(())
2042 }
2043
2044 pub fn add(
2049 &self,
2050 content: &str,
2051 kind: &str,
2052 trigger_desc: Option<&str>,
2053 anti_trigger_desc: Option<&str>,
2054 source: &str,
2055 skill_name: Option<&str>,
2056 ) -> Result<String> {
2057 if !matches!(kind, "note" | "skill") {
2058 return Err(InnateError::InvalidState(format!("invalid kind: {kind}")));
2059 }
2060 if !matches!(source, "chat" | "manual" | "doc" | "agent") {
2061 return Err(InnateError::InvalidState(format!(
2062 "invalid source: {source}"
2063 )));
2064 }
2065
2066 let (content, action) = self.sanitize_content(content);
2067 if action == SanitizeAction::Discard {
2068 return Ok(String::new());
2069 }
2070
2071 let trigger_clean = trigger_desc.and_then(|t| {
2072 let (cleaned, act) = self.sanitizer.sanitize(t);
2073 if act == SanitizeAction::Discard {
2074 None
2075 } else {
2076 Some(cleaned)
2077 }
2078 });
2079 let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2080 let (cleaned, act) = self.sanitizer.sanitize(t);
2081 if act == SanitizeAction::Discard {
2082 None
2083 } else {
2084 Some(cleaned)
2085 }
2086 });
2087
2088 let h = content_hash(&content);
2089 if self.storage.is_hash_invalidated(&h)? {
2090 return Err(InnateError::InvalidState(
2091 "content hash is invalidated".into(),
2092 ));
2093 }
2094
2095 let existing = self.storage.query_chunks_params(
2097 "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2098 rusqlite::params![h],
2099 )?;
2100 if let Some(e) = existing.first() {
2101 if let Some(id) = e.get("id").and_then(Value::as_str) {
2102 return Ok(id.to_string());
2103 }
2104 }
2105
2106 let now = utc_now_iso();
2107 let chunk_id = gen_uuid();
2108 let redacted = action == SanitizeAction::Redact;
2109
2110 let (origin, state, conf, prot, init_state_reason) = if source == "agent" {
2111 (
2112 "captured",
2113 "pending",
2114 if redacted { 0.4 } else { 0.60 },
2115 0,
2116 "init:captured_agent",
2117 )
2118 } else if kind == "skill" {
2119 (
2120 "installed",
2121 "active",
2122 if redacted { 0.4 } else { 0.85 },
2123 1,
2124 "init:installed",
2125 )
2126 } else {
2127 (
2128 "captured",
2129 "active",
2130 if redacted { 0.4 } else { 0.60 },
2131 0,
2132 "init:captured",
2133 )
2134 };
2135
2136 let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2138 let (cvec, tvec, embed_ver, final_state_reason) = match (
2139 self.embedding.embed_content(&content),
2140 self.embedding.embed_trigger(trigger_str),
2141 ) {
2142 (Ok(cv), Ok(tv)) => (cv, tv, 1i64, init_state_reason.to_string()),
2143 _ => (
2144 vec![],
2145 vec![],
2146 0i64,
2147 format!("embedding_pending:target={state}"),
2148 ),
2149 };
2150
2151 let tokens = estimate_tokens(&content) as i64;
2152 let row = ChunkRow {
2153 id: chunk_id.clone(),
2154 skill_name: skill_name.map(str::to_string),
2155 content: content.clone(),
2156 trigger_desc: trigger_clean.clone(),
2157 anti_trigger_desc: anti_trigger_clean.clone(),
2158 content_hash: h,
2159 token_count: Some(tokens),
2160 origin: origin.to_string(),
2161 source: Some(source.to_string()),
2162 protected: prot,
2163 state: state.to_string(),
2164 state_reason: Some(final_state_reason),
2165 confidence: conf,
2166 confidence_reason: Some(format!("init:{origin}")),
2167 version: 1,
2168 embed_version: embed_ver,
2169 created_at: now.clone(),
2170 updated_at: now.clone(),
2171 ..Default::default()
2172 };
2173
2174 self.storage.begin_immediate()?;
2175 let result = (|| -> Result<()> {
2176 self.storage.insert_chunk(&row)?;
2177 if embed_ver > 0 {
2178 self.storage
2179 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2180 self.storage
2181 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2182 }
2183 self.storage.commit()
2184 })();
2185 if result.is_err() {
2186 let _ = self.storage.rollback();
2187 }
2188 result?;
2189 Ok(chunk_id)
2190 }
2191
2192 pub fn spark(
2197 &self,
2198 content: &str,
2199 trigger_desc: Option<&str>,
2200 anti_trigger_desc: Option<&str>,
2201 ) -> Result<String> {
2202 let (content, action) = self.sanitize_content(content);
2203 if action == SanitizeAction::Discard {
2204 return Ok(String::new());
2205 }
2206
2207 let trigger_clean = trigger_desc.and_then(|t| {
2208 let (cleaned, act) = self.sanitizer.sanitize(t);
2209 if act == SanitizeAction::Discard {
2210 None
2211 } else {
2212 Some(cleaned)
2213 }
2214 });
2215 let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
2216 let (cleaned, act) = self.sanitizer.sanitize(t);
2217 if act == SanitizeAction::Discard {
2218 None
2219 } else {
2220 Some(cleaned)
2221 }
2222 });
2223
2224 let h = content_hash(&content);
2225 if self.storage.is_hash_invalidated(&h)? {
2226 return Err(InnateError::InvalidState(
2227 "content hash is invalidated".into(),
2228 ));
2229 }
2230
2231 let related: Vec<String> = self
2233 .recall(
2234 &content,
2235 2000,
2236 false,
2237 false,
2238 Some(5),
2239 "sdk",
2240 "false",
2241 false,
2242 "off",
2243 )
2244 .map(|r| {
2245 r.knowledge
2246 .iter()
2247 .filter_map(|c| c["id"].as_str().map(str::to_string))
2248 .collect()
2249 })
2250 .unwrap_or_default();
2251
2252 let now = utc_now_iso();
2253 let chunk_id = gen_uuid();
2254 let tokens = estimate_tokens(&content) as i64;
2255
2256 let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
2257 let (cvec, tvec, embed_ver, state_reason) = match (
2258 self.embedding.embed_content(&content),
2259 self.embedding.embed_trigger(trigger_str),
2260 ) {
2261 (Ok(cv), Ok(tv)) => (cv, tv, 1i64, "init:spark".to_string()),
2262 _ => (
2263 vec![],
2264 vec![],
2265 0i64,
2266 "embedding_pending:target=active".to_string(),
2267 ),
2268 };
2269
2270 let row = ChunkRow {
2271 id: chunk_id.clone(),
2272 content: content.clone(),
2273 trigger_desc: trigger_clean.clone(),
2274 anti_trigger_desc: anti_trigger_clean.clone(),
2275 content_hash: h,
2276 token_count: Some(tokens),
2277 origin: "spark".to_string(),
2278 maturity: Some("seed".to_string()),
2279 related_ids: if related.is_empty() {
2280 None
2281 } else {
2282 Some(related.join(","))
2283 },
2284 state: "active".to_string(),
2285 state_reason: Some(state_reason),
2286 confidence: 0.5,
2287 version: 1,
2288 embed_version: embed_ver,
2289 created_at: now.clone(),
2290 updated_at: now.clone(),
2291 ..Default::default()
2292 };
2293
2294 self.storage.begin_immediate()?;
2295 let result = (|| -> Result<()> {
2296 self.storage.insert_chunk(&row)?;
2297 if embed_ver > 0 {
2298 self.storage
2299 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
2300 self.storage
2301 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
2302 }
2303 self.storage.commit()
2304 })();
2305 if result.is_err() {
2306 let _ = self.storage.rollback();
2307 }
2308 result?;
2309 Ok(chunk_id)
2310 }
2311
2312 pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
2317 let chunk = self
2318 .storage
2319 .get_chunk(spark_id)?
2320 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2321 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
2322 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2323 }
2324 let current = chunk
2325 .get("maturity")
2326 .and_then(Value::as_str)
2327 .unwrap_or("seed");
2328 let valid_next: &[&str] = match current {
2329 "seed" => &["sprouting"],
2330 "sprouting" => &["incubating"],
2331 _ => {
2332 return Err(InnateError::InvalidState(format!(
2333 "spark {spark_id} already {current}"
2334 )))
2335 }
2336 };
2337 if current == to {
2338 return Ok(());
2339 }
2340 if !valid_next.contains(&to) {
2341 return Err(InnateError::InvalidState(format!(
2342 "invalid spark maturity transition: {current} -> {to}"
2343 )));
2344 }
2345 let now = utc_now_iso();
2346 self.storage.begin_immediate()?;
2347 let result = self
2348 .storage
2349 .query_chunks_params(
2350 "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
2351 rusqlite::params![to, now, spark_id],
2352 )
2353 .and_then(|_| self.storage.commit());
2354 if result.is_err() {
2355 let _ = self.storage.rollback();
2356 }
2357 result.map(|_| ())
2358 }
2359
2360 pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
2361 let spark = self
2362 .storage
2363 .get_chunk(spark_id)?
2364 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2365 if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2366 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2367 }
2368 let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2369 if maturity == "promoted" || maturity == "dropped" {
2370 return Err(InnateError::InvalidState(format!(
2371 "spark {spark_id} already {maturity}"
2372 )));
2373 }
2374 if !matches!(to, "note" | "skill") {
2375 return Err(InnateError::InvalidState(format!(
2376 "invalid spark promotion target: {to}"
2377 )));
2378 }
2379
2380 let content = spark.get("content").and_then(Value::as_str).unwrap_or("");
2381 let (content, action) = self.sanitize_content(content);
2382 if action == SanitizeAction::Discard {
2383 return Err(InnateError::InvalidState(
2384 "sanitize discard on promote".into(),
2385 ));
2386 }
2387
2388 let promoted_hash = content_hash(&content);
2389 if self.storage.is_hash_invalidated(&promoted_hash)? {
2390 return Err(InnateError::InvalidState(
2391 "spark content hash is invalidated".into(),
2392 ));
2393 }
2394
2395 let now = utc_now_iso();
2396
2397 let existing = self.storage.query_chunks_params(
2399 "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
2400 rusqlite::params![promoted_hash],
2401 )?;
2402 if let Some(e) = existing.first() {
2403 if let Some(id) = e.get("id").and_then(Value::as_str) {
2404 let id = id.to_string();
2405 self.storage.begin_immediate()?;
2406 let result = self
2407 .storage
2408 .query_chunks_params(
2409 "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2410 rusqlite::params![now, spark_id],
2411 )
2412 .and_then(|_| self.storage.commit());
2413 if result.is_err() {
2414 let _ = self.storage.rollback();
2415 result?;
2416 }
2417 return Ok(id);
2418 }
2419 }
2420
2421 let (state, conf, prot, origin, state_reason) = if to == "skill" {
2422 ("active", 0.85, 1, "installed", "init:installed")
2423 } else {
2424 ("active", 0.60, 0, "captured", "init:captured")
2425 };
2426
2427 let conf = if action == SanitizeAction::Redact {
2428 0.4_f64
2429 } else {
2430 conf
2431 };
2432 let new_id = gen_uuid();
2433 let trigger = spark.get("trigger_desc").and_then(Value::as_str);
2434 let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
2435
2436 let row = ChunkRow {
2437 id: new_id.clone(),
2438 content: content.clone(),
2439 trigger_desc: trigger.map(str::to_string),
2440 anti_trigger_desc: anti.map(str::to_string),
2441 content_hash: promoted_hash,
2442 token_count: Some(estimate_tokens(&content) as i64),
2443 origin: origin.to_string(),
2444 source: Some("manual".to_string()),
2445 protected: prot,
2446 state: state.to_string(),
2447 state_reason: Some(state_reason.to_string()),
2448 confidence: conf,
2449 confidence_reason: Some("manual_set".to_string()),
2450 parent_id: Some(spark_id.to_string()),
2451 version: 1,
2452 embed_version: 1,
2453 created_at: now.clone(),
2454 updated_at: now.clone(),
2455 ..Default::default()
2456 };
2457
2458 let cvec = self.embedding.embed_content(&content)?;
2459 let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;
2460
2461 self.storage.begin_immediate()?;
2462 let result = (|| -> Result<()> {
2463 self.storage.insert_chunk(&row)?;
2464 self.storage
2465 .insert_vec_content(&new_id, &pack_embedding(&cvec))?;
2466 self.storage
2467 .insert_vec_trigger(&new_id, &pack_embedding(&tvec))?;
2468 self.storage.query_chunks_params(
2469 "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
2470 rusqlite::params![now, spark_id],
2471 )?;
2472 self.storage.commit()
2473 })();
2474 if result.is_err() {
2475 let _ = self.storage.rollback();
2476 }
2477 result?;
2478 Ok(new_id)
2479 }
2480
2481 pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
2482 let spark = self
2483 .storage
2484 .get_chunk(spark_id)?
2485 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
2486 if spark.get("origin").and_then(Value::as_str) != Some("spark") {
2487 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
2488 }
2489 let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
2490 if maturity == "promoted" {
2491 return Err(InnateError::InvalidState(format!(
2492 "spark {spark_id} already promoted"
2493 )));
2494 }
2495 if maturity == "dropped" {
2496 return Ok(());
2497 }
2498 let now = utc_now_iso();
2499 let reason_str = if reason.is_empty() {
2500 "dropped".to_string()
2501 } else {
2502 format!("dropped:{reason}")
2503 };
2504 self.storage.begin_immediate()?;
2505 let result = self
2506 .storage
2507 .query_chunks_params(
2508 "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
2509 rusqlite::params![reason_str, now, spark_id],
2510 )
2511 .and_then(|_| self.storage.commit());
2512 if result.is_err() {
2513 let _ = self.storage.rollback();
2514 }
2515 result.map(|_| ())
2516 }
2517
2518 pub fn approve(&self, chunk_id: &str) -> Result<()> {
2523 let chunk = self
2524 .storage
2525 .get_chunk(chunk_id)?
2526 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2527 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2528 return Err(InnateError::InvalidState(
2529 "spark lifecycle uses promote_spark() or invalidate()".into(),
2530 ));
2531 }
2532 if chunk.get("state").and_then(Value::as_str) == Some("active") {
2533 return Ok(());
2534 }
2535 if chunk.get("state").and_then(Value::as_str) != Some("pending") {
2536 return Err(InnateError::InvalidState(
2537 "approve requires pending chunk".into(),
2538 ));
2539 }
2540 let now = utc_now_iso();
2541 self.storage.begin_immediate()?;
2542 let result = (|| -> Result<()> {
2543 self.storage
2544 .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
2545 self.storage.query_chunks_params(
2546 "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
2547 rusqlite::params![now, chunk_id],
2548 )?;
2549 self.storage.commit()
2550 })();
2551 if result.is_err() {
2552 let _ = self.storage.rollback();
2553 }
2554 result
2555 }
2556
2557 pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
2558 let chunk = self
2559 .storage
2560 .get_chunk(chunk_id)?
2561 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2562 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
2563 return Err(InnateError::InvalidState(
2564 "spark lifecycle uses drop_spark() or invalidate()".into(),
2565 ));
2566 }
2567 let now = utc_now_iso();
2568 self.storage.begin_immediate()?;
2569 let result = self
2570 .storage
2571 .update_chunk_state(chunk_id, "archived", Some(reason), &now)
2572 .and_then(|_| self.storage.commit());
2573 if result.is_err() {
2574 let _ = self.storage.rollback();
2575 }
2576 result
2577 }
2578
2579 pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
2580 let chunk = self
2581 .storage
2582 .get_chunk(chunk_id)?
2583 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2584 let h = chunk
2585 .get("content_hash")
2586 .and_then(Value::as_str)
2587 .unwrap_or("")
2588 .to_string();
2589 let now = utc_now_iso();
2590 let reason_str = if reason.is_empty() {
2591 "invalidated".to_string()
2592 } else {
2593 format!("invalidated:{reason}")
2594 };
2595
2596 self.storage.begin_immediate()?;
2597 let result = (|| -> Result<()> {
2598 self.storage.query_chunks_params(
2599 "UPDATE chunks
2600 SET state='archived', confidence=0.0, confidence_base=0.0,
2601 confidence_reason='invalidated', state_reason=?,
2602 state_updated_at=?, updated_at=?
2603 WHERE id=?",
2604 rusqlite::params![reason_str, now, now, chunk_id],
2605 )?;
2606 self.storage.query_chunks_params(
2607 "UPDATE chunks
2608 SET state='archived', confidence=0.0, confidence_base=0.0,
2609 confidence_reason='invalidated',
2610 state_reason='invalidated:same_hash',
2611 state_updated_at=?, updated_at=?
2612 WHERE content_hash=? AND id!=?",
2613 rusqlite::params![now, now, h, chunk_id],
2614 )?;
2615 self.storage.conn_execute(
2616 "DELETE FROM confidence_evidence
2617 WHERE chunk_id IN (SELECT id FROM chunks WHERE content_hash=?)",
2618 rusqlite::params![h],
2619 )?;
2620 self.storage
2621 .insert_invalidated_hash(&h, Some(reason), &now)?;
2622 self.storage.commit()
2623 })();
2624 if result.is_err() {
2625 let _ = self.storage.rollback();
2626 }
2627 result
2628 }
2629
2630 pub fn restore(&self, chunk_id: &str) -> Result<()> {
2631 let chunk = self
2632 .storage
2633 .get_chunk(chunk_id)?
2634 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
2635 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
2636 if state == "active" {
2637 return Ok(());
2638 }
2639 if state != "archived" {
2640 return Err(InnateError::InvalidState(
2641 "restore requires archived chunk".into(),
2642 ));
2643 }
2644 let was_invalidated = chunk
2645 .get("state_reason")
2646 .and_then(Value::as_str)
2647 .map(|r| r.starts_with("invalidated"))
2648 .unwrap_or(false);
2649 let h = chunk
2650 .get("content_hash")
2651 .and_then(Value::as_str)
2652 .unwrap_or("")
2653 .to_string();
2654 let now = utc_now_iso();
2655
2656 self.storage.begin_immediate()?;
2657 let result = (|| -> Result<()> {
2658 self.storage
2659 .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
2660 if was_invalidated {
2661 self.storage.query_chunks_params(
2662 "DELETE FROM invalidated_hashes WHERE content_hash=?",
2663 rusqlite::params![h],
2664 )?;
2665 }
2666 self.storage.query_chunks_params(
2667 "UPDATE chunks
2668 SET confidence_base=CASE WHEN ? THEN 0.5 ELSE confidence_base END,
2669 confidence=CASE WHEN ? THEN 0.5 ELSE confidence END,
2670 confidence_reason='restore', updated_at=?
2671 WHERE id=?",
2672 rusqlite::params![was_invalidated, was_invalidated, now, chunk_id],
2673 )?;
2674 self.storage.conn_execute(
2675 "DELETE FROM confidence_evidence
2676 WHERE chunk_id=? AND kind IN ('feedback_up','feedback_down')",
2677 rusqlite::params![chunk_id],
2678 )?;
2679 self.storage.conn_execute(
2680 "UPDATE chunk_context_stats_base
2681 SET positive_feedback=0, negative_feedback=0
2682 WHERE chunk_id=?",
2683 rusqlite::params![chunk_id],
2684 )?;
2685 self.storage.conn_execute(
2686 "UPDATE governance_proposals
2687 SET state='rejected', reason=reason || '; restored by user', updated_at=?
2688 WHERE chunk_id=? AND state IN ('pending','accepted')",
2689 rusqlite::params![now, chunk_id],
2690 )?;
2691 self.recompute_chunk_confidence(chunk_id, &now)?;
2692 self.rebuild_context_stats_for(&HashSet::from([chunk_id.to_string()]), &now)?;
2693 self.storage.commit()
2694 })();
2695 if result.is_err() {
2696 let _ = self.storage.rollback();
2697 }
2698 result
2699 }
2700
2701 pub fn evolve(&self, trigger: &str) -> Result<Value> {
2706 if !matches!(trigger, "manual" | "scheduled" | "threshold") {
2707 return Err(InnateError::InvalidState(format!(
2708 "invalid evolve trigger: {trigger}"
2709 )));
2710 }
2711 let evolve_started_at = utc_now_iso();
2712 let request_id = self.storage.claim_evolve_request(
2713 &evolve_started_at,
2714 &minutes_ago(&evolve_started_at, self.screening_timeout_minutes),
2715 )?;
2716 if trigger == "scheduled" && request_id.is_none() {
2718 let curator = Arc::clone(&self.curator);
2719 let curate = curator.run(self, &CurateScope::default())?;
2720 return Ok(json!({
2721 "distilled": 0,
2722 "curate": self.format_curate_report(&curate),
2723 "skipped": "no_evolve_request"
2724 }));
2725 }
2726
2727 if trigger == "threshold" {
2729 let rows = self.storage.query_chunks(
2730 "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
2731 )?;
2732 let cnt = rows
2733 .first()
2734 .and_then(|r| r.get("cnt"))
2735 .and_then(Value::as_i64)
2736 .unwrap_or(0);
2737 if cnt < self.evolve_threshold {
2738 let curator = Arc::clone(&self.curator);
2739 let curate = curator.run(self, &CurateScope::default())?;
2740 if let Some(ref id) = request_id {
2741 self.storage.finish_evolve_request(
2742 id, "completed", Some("below_threshold"), &utc_now_iso(),
2743 )?;
2744 }
2745 return Ok(json!({
2746 "distilled": 0,
2747 "curate": self.format_curate_report(&curate),
2748 "skipped": "below_threshold"
2749 }));
2750 }
2751
2752 if let Some(limit) = self
2753 .storage
2754 .get_meta("max_distill_tokens_per_period")?
2755 .and_then(|value| value.parse::<i64>().ok())
2756 .filter(|value| *value > 0)
2757 {
2758 let period_start = self.distill_token_period_start(&utc_now_iso())?;
2759 let rows = self.storage.query_chunks_params(
2760 "SELECT COALESCE(SUM(distill_prompt_tokens),0)
2761 + COALESCE(SUM(distill_completion_tokens),0) AS used
2762 FROM episodic_log
2763 WHERE distill_accounted_at >= ?",
2764 rusqlite::params![period_start],
2765 )?;
2766 let used_tokens = rows
2767 .first()
2768 .and_then(|row| row.get("used"))
2769 .and_then(Value::as_i64)
2770 .unwrap_or(0);
2771 if used_tokens >= limit {
2772 let curator = Arc::clone(&self.curator);
2773 let curate = curator.run(self, &CurateScope::default())?;
2774 if let Some(ref id) = request_id {
2775 self.storage.finish_evolve_request(
2776 id, "completed", Some("distill_token_limit"), &utc_now_iso(),
2777 )?;
2778 }
2779 return Ok(json!({
2780 "distilled": 0,
2781 "curate": self.format_curate_report(&curate),
2782 "skipped": "distill_token_limit",
2783 "distill_tokens_used": used_tokens,
2784 "distill_token_limit": limit,
2785 "period_start": period_start,
2786 }));
2787 }
2788 }
2789 }
2790
2791 let result = (|| -> Result<Value> {
2792 let retry_cutoff = minutes_ago(&utc_now_iso(), 5);
2793 self.storage.conn_execute(
2794 "UPDATE episodic_log
2795 SET distill_state='new', distill_note='retry_failed',
2796 distill_locked_at=NULL, distill_run_id=NULL
2797 WHERE distill_state='failed'
2798 AND distill_attempts < 3
2799 AND COALESCE(distill_accounted_at, ts) < ?",
2800 rusqlite::params![retry_cutoff],
2801 )?;
2802 let distilled = self.distill_batch()?;
2803 let curator = Arc::clone(&self.curator);
2804 let curate = curator.run(self, &CurateScope::default())?;
2805 Ok(json!({
2806 "distilled": distilled,
2807 "curate": self.format_curate_report(&curate),
2808 }))
2809 })();
2810 if let Some(ref id) = request_id {
2811 let (state, note) = match &result {
2812 Ok(_) => ("completed", None),
2813 Err(error) => ("failed", Some(error.to_string())),
2814 };
2815 self.storage
2816 .finish_evolve_request(id, state, note.as_deref(), &utc_now_iso())?;
2817 }
2818 if result.is_ok() {
2819 self.storage
2820 .finish_covered_evolve_requests(&evolve_started_at, &utc_now_iso())?;
2821 }
2822
2823 if result.is_ok() {
2825 let remaining = count_query(
2826 &self.storage,
2827 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2828 )?;
2829 if remaining > 0 {
2830 let _ = self.storage.request_evolve(&gen_uuid(), "batch_continue", &utc_now_iso());
2831 }
2832 }
2833 result
2834 }
2835
2836 fn format_curate_report(&self, curate: &CurateReport) -> Value {
2837 json!({
2838 "archived": curate.archived.len(),
2839 "deduped": curate.deduped.len(),
2840 "decayed": curate.decayed.len(),
2841 "recovered": curate.recovered.len(),
2842 "orphans": curate.orphans.len(),
2843 "warnings": curate.warnings,
2844 })
2845 }
2846
2847 fn distill_batch(&self) -> Result<usize> {
2848 let run_id = gen_uuid();
2849 let now = utc_now_iso();
2850
2851 self.storage.begin_immediate()?;
2853 let logs = match self
2854 .storage
2855 .claim_distill_batch(&run_id, self.distill_batch_size, &now)
2856 {
2857 Ok(l) => {
2858 self.storage.commit()?;
2859 l
2860 }
2861 Err(e) => {
2862 let _ = self.storage.rollback();
2863 return Err(e);
2864 }
2865 };
2866
2867 let mut chunks_by_log: HashMap<String, Vec<DistilledChunk>> = HashMap::new();
2868 let mut failed_logs = HashSet::new();
2869 let mut distill_errors = Vec::new();
2870 for log in &logs {
2871 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
2872 match self.distiller.distill_with_context(log, &logs) {
2873 Ok(chunks) => {
2874 if chunks.iter().any(|chunk| chunk.source_log_id != log_id) {
2875 let error = "distiller returned a chunk for an unknown source log";
2876 failed_logs.insert(log_id.to_string());
2877 distill_errors.push(format!("{log_id}: {error}"));
2878 self.finish_distill_log(
2879 log_id,
2880 "failed",
2881 Some(&format!("distill_failed:{error}")),
2882 estimate_distill_prompt_tokens(log, &logs),
2883 0,
2884 )?;
2885 continue;
2886 }
2887 chunks_by_log.insert(log_id.to_string(), chunks);
2888 }
2889 Err(error) => {
2890 let note = format!("distill_failed:{error}");
2891 failed_logs.insert(log_id.to_string());
2892 distill_errors.push(format!("{log_id}: {error}"));
2893 self.finish_distill_log(
2894 log_id,
2895 "failed",
2896 Some(¬e),
2897 estimate_distill_prompt_tokens(log, &logs),
2898 0,
2899 )?;
2900 }
2901 }
2902 }
2903
2904 let mut count = 0;
2905 let provenance = self.distiller.provenance();
2906 for log in &logs {
2907 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
2908 if failed_logs.contains(log_id) {
2909 continue;
2910 }
2911 let prompt_tokens = estimate_distill_prompt_tokens(log, &logs);
2912 let chunks = chunks_by_log.remove(log_id).unwrap_or_default();
2913 let completion_tokens = chunks
2914 .iter()
2915 .map(estimate_distilled_chunk_tokens)
2916 .sum::<i64>();
2917 if chunks.is_empty() {
2918 self.finish_distill_log(
2919 log_id,
2920 "discarded",
2921 Some("insufficient_material"),
2922 prompt_tokens,
2923 completion_tokens,
2924 )?;
2925 continue;
2926 }
2927
2928 struct PreparedChunk {
2933 row: ChunkRow,
2934 cvec_bytes: Vec<u8>,
2935 tvec_bytes: Vec<u8>,
2936 }
2937 let mut prepared: Vec<PreparedChunk> = Vec::with_capacity(chunks.len());
2938 let mut embedding_failures = 0_usize;
2939 for dc in chunks {
2940 let (content, action) = self.sanitize_content(&dc.content);
2941 if action == SanitizeAction::Discard {
2942 continue; }
2944 let h = content_hash(&content);
2945 if self.storage.is_hash_invalidated(&h)? {
2946 continue; }
2948 let redacted = action == SanitizeAction::Redact;
2949 let conf = if redacted { 0.4 } else { 0.55 };
2950 let now2 = utc_now_iso();
2951 let chunk_id = gen_uuid();
2952 let tokens = estimate_tokens(&content) as i64;
2953 let row = ChunkRow {
2954 id: chunk_id,
2955 content: content.clone(),
2956 trigger_desc: dc.trigger_desc.clone(),
2957 anti_trigger_desc: dc.anti_trigger_desc,
2958 content_hash: h,
2959 token_count: Some(tokens),
2960 origin: "distilled".to_string(),
2961 distilled_from: Some(dc.source_log_id),
2962 distill_provider: provenance.provider.clone(),
2963 distill_model: provenance.model.clone(),
2964 distill_prompt_version: provenance.prompt_version.clone(),
2965 state: "pending".to_string(),
2966 state_reason: Some("init:distilled".to_string()),
2967 confidence: conf,
2968 confidence_reason: Some("init:distilled".to_string()),
2969 version: 1,
2970 embed_version: 1,
2971 created_at: now2.clone(),
2972 updated_at: now2,
2973 ..Default::default()
2974 };
2975 let cvec = match self.embedding.embed_content(&content) {
2976 Ok(v) => v,
2977 Err(_) => {
2978 embedding_failures += 1;
2979 continue;
2980 }
2981 };
2982 let tvec = match self
2983 .embedding
2984 .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
2985 {
2986 Ok(v) => v,
2987 Err(_) => {
2988 embedding_failures += 1;
2989 continue;
2990 }
2991 };
2992 prepared.push(PreparedChunk {
2993 row,
2994 cvec_bytes: pack_embedding(&cvec),
2995 tvec_bytes: pack_embedding(&tvec),
2996 });
2997 }
2998
2999 if prepared.is_empty() {
3000 let note = if embedding_failures > 0 {
3001 "embedding_failed"
3002 } else {
3003 "all_chunks_filtered"
3004 };
3005 self.finish_distill_log(
3006 log_id,
3007 if embedding_failures > 0 {
3008 "failed"
3009 } else {
3010 "discarded"
3011 },
3012 Some(note),
3013 prompt_tokens,
3014 completion_tokens,
3015 )?;
3016 continue;
3017 }
3018
3019 let accounted_at = utc_now_iso();
3021 self.storage.begin_immediate()?;
3022 let write_result = (|| -> Result<()> {
3023 for pc in &prepared {
3024 self.storage.insert_chunk(&pc.row)?;
3025 self.storage.insert_vec_content(&pc.row.id, &pc.cvec_bytes)?;
3026 self.storage.insert_vec_trigger(&pc.row.id, &pc.tvec_bytes)?;
3027 }
3028 let note = (embedding_failures > 0)
3029 .then(|| format!("partial_embedding_failures:{embedding_failures}"));
3030 self.storage.finish_distill_log(
3031 log_id,
3032 "distilled",
3033 note.as_deref(),
3034 prompt_tokens,
3035 completion_tokens,
3036 &accounted_at,
3037 )?;
3038 self.storage.commit()
3039 })();
3040 if let Err(error) = write_result {
3041 let _ = self.storage.rollback();
3042 let note = format!("distill_write_failed:{error}");
3043 self.finish_distill_log(
3044 log_id,
3045 "failed",
3046 Some(¬e),
3047 prompt_tokens,
3048 completion_tokens,
3049 )?;
3050 continue;
3051 }
3052 count += 1;
3053 }
3054 if !distill_errors.is_empty() {
3055 eprintln!(
3059 "[innate] distillation partial failure ({} log(s)): {}",
3060 distill_errors.len(),
3061 distill_errors.join("; ")
3062 );
3063 }
3064 Ok(count)
3065 }
3066
3067 fn finish_distill_log(
3068 &self,
3069 log_id: &str,
3070 state: &str,
3071 note: Option<&str>,
3072 prompt_tokens: i64,
3073 completion_tokens: i64,
3074 ) -> Result<()> {
3075 let accounted_at = utc_now_iso();
3076 self.storage.begin_immediate()?;
3077 let result = (|| -> Result<()> {
3078 self.storage.finish_distill_log(
3079 log_id,
3080 state,
3081 note,
3082 prompt_tokens,
3083 completion_tokens,
3084 &accounted_at,
3085 )?;
3086 self.storage.commit()
3087 })();
3088 if result.is_err() {
3089 let _ = self.storage.rollback();
3090 }
3091 result
3092 }
3093
3094 fn distill_token_period_start(&self, now: &str) -> Result<String> {
3095 let window_hours = self
3096 .storage
3097 .get_meta("evolve.distill_token_window_hours")?
3098 .and_then(|value| value.parse::<i64>().ok())
3099 .unwrap_or(24)
3100 .max(1);
3101 Ok(hours_ago(now, window_hours))
3102 }
3103
3104 pub(crate) fn builtin_curate_impl(&self, scope: &CurateScope) -> Result<CurateReport> {
3105 let mut report = CurateReport::default();
3106 let now_iso = utc_now_iso();
3107 if scope.dry_run {
3108 let archived_count: i64 = count_query(&self.storage,
3110 "SELECT COUNT(*) FROM chunks WHERE origin!='spark' AND protected=0 AND state='active'")?;
3111 report.stats.insert("dry_run".to_string(), json!(true));
3112 report
3113 .stats
3114 .insert("eligible_for_governance".to_string(), json!(archived_count));
3115 return Ok(report);
3116 }
3117
3118 self.storage.begin_immediate()?;
3120 let agg_result = (|| -> Result<()> {
3121 let cutoff_ts = now_iso.clone();
3122
3123 self.storage.conn_execute(
3125 "DELETE FROM chunk_success_traces",
3126 rusqlite::params![],
3127 )?;
3128 self.storage.conn_execute(
3129 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts)
3130 SELECT ut.chunk_id, ut.trace_id, MAX(ut.ts)
3131 FROM usage_trace ut
3132 WHERE ut.event = 'used'
3133 AND ut.chunk_id IS NOT NULL
3134 AND (
3135 EXISTS (SELECT 1 FROM usage_trace ok
3136 WHERE ok.trace_id = ut.trace_id
3137 AND ok.event = 'task_ok' AND ok.chunk_id IS NULL)
3138 OR EXISTS (SELECT 1 FROM episodic_log el
3139 WHERE el.trace_id = ut.trace_id AND el.outcome = 'ok')
3140 )
3141 GROUP BY ut.chunk_id, ut.trace_id",
3142 rusqlite::params![],
3143 )?;
3144
3145 self.storage.conn_execute(
3147 "WITH cst AS (
3148 SELECT chunk_id, COUNT(*) AS cnt, MAX(ts) AS max_ts
3149 FROM chunk_success_traces
3150 GROUP BY chunk_id
3151 )
3152 UPDATE chunks SET
3153 used_success_count = used_success_count_base
3154 + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3155 success_trace_ids_count = used_success_count_base
3156 + COALESCE((SELECT cnt FROM cst WHERE cst.chunk_id=chunks.id), 0),
3157 last_success_at = COALESCE(
3158 (SELECT max_ts FROM cst WHERE cst.chunk_id=chunks.id),
3159 last_success_at
3160 )
3161 WHERE origin!='spark'",
3162 rusqlite::params![],
3163 )?;
3164
3165 self.storage.conn_execute(
3167 "UPDATE chunks SET
3168 selected_count = selected_count_base + COALESCE(
3169 (SELECT COUNT(*) FROM usage_trace
3170 WHERE chunk_id = chunks.id AND event = 'selected'), 0),
3171 used_count = used_count_base + COALESCE(
3172 (SELECT COUNT(*) FROM usage_trace
3173 WHERE chunk_id = chunks.id AND event = 'used'), 0),
3174 last_used_at = COALESCE(
3175 (SELECT MAX(ts) FROM usage_trace
3176 WHERE chunk_id=chunks.id AND event='used'),
3177 last_used_base
3178 )
3179 WHERE origin!='spark'",
3180 rusqlite::params![],
3181 )?;
3182
3183 self.storage.set_meta("last_agg_ts", &cutoff_ts)?;
3187 self.storage.purge_usage_trace(&cutoff_ts)?;
3188 self.storage.commit()
3189 })();
3190 if agg_result.is_err() {
3191 let _ = self.storage.rollback();
3192 agg_result?;
3193 }
3194
3195 self.storage.begin_immediate()?;
3197 let recover_result = (|| -> Result<()> {
3198 let screening_cutoff = minutes_ago(&now_iso, self.screening_timeout_minutes);
3200 let stale = self.storage.query_chunks_params(
3201 "SELECT id, distill_run_id FROM episodic_log
3202 WHERE distill_state='screening' AND distill_locked_at < ?",
3203 rusqlite::params![screening_cutoff],
3204 )?;
3205 for row in &stale {
3206 let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3207 let run_id = row
3208 .get("distill_run_id")
3209 .and_then(Value::as_str)
3210 .unwrap_or("unknown");
3211 let note = format!("screening_timeout:{run_id}");
3212 self.storage.conn_execute(
3213 "UPDATE episodic_log
3214 SET distill_state='failed', distill_note=?,
3215 distill_attempts=distill_attempts+1,
3216 distill_last_failed_at=?,
3217 distill_run_id=NULL, distill_locked_at=NULL
3218 WHERE id=?",
3219 rusqlite::params![note, now_iso, id],
3220 )?;
3221 report.recovered.push(id.to_string());
3222 report
3223 .warnings
3224 .push(format!("stale screening recovered as failed: {id}"));
3225 }
3226
3227 let open_ttl_cutoff = days_ago(&now_iso, self.open_ttl_days);
3229 self.storage.conn_execute(
3230 "UPDATE episodic_log
3231 SET distill_state='discarded', distill_note='no_record_timeout',
3232 task_state='timed_out', completed_at=?
3233 WHERE distill_state='open' AND ts < ?",
3234 rusqlite::params![now_iso, open_ttl_cutoff],
3235 )?;
3236 self.storage.commit()
3237 })();
3238 if recover_result.is_err() {
3239 let _ = self.storage.rollback();
3240 recover_result?;
3241 }
3242
3243 let scope_origin = scope.origin.clone();
3245 let scope_skill = scope.skill_name.clone();
3246 self.storage.begin_immediate()?;
3247 let gov_result = (|| -> Result<()> {
3248 let governance_chunks = self
3251 .storage
3252 .query_chunks(
3253 "SELECT DISTINCT chunk_id FROM governance_proposals WHERE state='pending'",
3254 )?;
3255 for row in governance_chunks {
3256 if let Some(chunk_id) = row.get("chunk_id").and_then(Value::as_str) {
3257 self.refresh_governance_evidence(chunk_id, &now_iso)?;
3258 }
3259 }
3260
3261 let low_conf_cutoff = days_ago(&now_iso, self.low_conf_idle_days);
3263 let low_conf = self.storage.query_chunks_params(
3264 "SELECT id FROM chunks
3265 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3266 AND last_used_at IS NOT NULL
3267 AND confidence < ?
3268 AND last_used_at < ?
3269 AND (? IS NULL OR origin=?)
3270 AND (? IS NULL OR skill_name=?)",
3271 rusqlite::params![
3272 self.low_conf_threshold,
3273 low_conf_cutoff,
3274 scope_origin,
3275 scope_origin,
3276 scope_skill,
3277 scope_skill
3278 ],
3279 )?;
3280 for c in &low_conf {
3281 if let Some(id) = c.get("id").and_then(Value::as_str) {
3282 self.storage.update_chunk_state(
3283 id,
3284 "archived",
3285 Some("low_confidence"),
3286 &now_iso,
3287 )?;
3288 report.archived.push(id.to_string());
3289 }
3290 }
3291
3292 let rep_sel = self.storage.query_chunks_params(
3294 "SELECT id FROM chunks
3295 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3296 AND selected_count >= ? AND used_count = 0 AND confidence < ?
3297 AND (? IS NULL OR origin=?)
3298 AND (? IS NULL OR skill_name=?)",
3299 rusqlite::params![
3300 self.repeat_select_min,
3301 self.repeat_select_conf_max,
3302 scope_origin,
3303 scope_origin,
3304 scope_skill,
3305 scope_skill
3306 ],
3307 )?;
3308 for c in &rep_sel {
3309 if let Some(id) = c.get("id").and_then(Value::as_str) {
3310 if !report.archived.contains(&id.to_string()) {
3311 self.storage.update_chunk_state(
3312 id,
3313 "archived",
3314 Some("repeated_selected_unused"),
3315 &now_iso,
3316 )?;
3317 report.archived.push(id.to_string());
3318 }
3319 }
3320 }
3321
3322 let never_used_cutoff = days_ago(&now_iso, self.never_used_age_days);
3324 let never_used = self.storage.query_chunks_params(
3325 "SELECT id FROM chunks
3326 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3327 AND used_count = 0 AND selected_count = 0
3328 AND created_at < ?
3329 AND (? IS NULL OR origin=?)
3330 AND (? IS NULL OR skill_name=?)",
3331 rusqlite::params![
3332 never_used_cutoff,
3333 scope_origin,
3334 scope_origin,
3335 scope_skill,
3336 scope_skill
3337 ],
3338 )?;
3339 for c in &never_used {
3340 if let Some(id) = c.get("id").and_then(Value::as_str) {
3341 if !report.archived.contains(&id.to_string()) {
3342 self.storage.update_chunk_state(
3343 id,
3344 "archived",
3345 Some("never_used"),
3346 &now_iso,
3347 )?;
3348 report.archived.push(id.to_string());
3349 }
3350 }
3351 }
3352
3353 let gov_proposals = self.storage.query_chunks_params(
3357 "SELECT DISTINCT chunk_id FROM governance_proposals
3358 WHERE state='pending'
3359 AND evidence_score >= ? AND actor_count >= 2",
3360 rusqlite::params![self.governance_archive_threshold as f64],
3361 )?;
3362 for c in &gov_proposals {
3363 if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3364 let already_archived = report.archived.contains(&cid.to_string());
3365 let eligible = !already_archived && self.storage.get_chunk(cid)?.map(|ch| {
3366 ch.get("origin").and_then(Value::as_str) != Some("spark")
3367 && ch.get("protected").and_then(Value::as_i64).unwrap_or(0) == 0
3368 && matches!(
3369 ch.get("state").and_then(Value::as_str),
3370 Some("active") | Some("pending")
3371 )
3372 }).unwrap_or(false);
3373 if eligible {
3374 self.storage.update_chunk_state(
3375 cid,
3376 "archived",
3377 Some("governance_proposal"),
3378 &now_iso,
3379 )?;
3380 report.archived.push(cid.to_string());
3381 self.storage.conn_execute(
3382 "UPDATE governance_proposals
3383 SET state='accepted', updated_at=?
3384 WHERE chunk_id=? AND state='pending'",
3385 rusqlite::params![now_iso, cid],
3386 )?;
3387 } else {
3388 self.storage.conn_execute(
3389 "UPDATE governance_proposals
3390 SET state='rejected', updated_at=?
3391 WHERE chunk_id=? AND state='pending'",
3392 rusqlite::params![now_iso, cid],
3393 )?;
3394 }
3395 }
3396 }
3397
3398 let proposal_expiry_cutoff = days_ago(&now_iso, self.governance_proposal_max_age_days);
3402 self.storage.conn_execute(
3403 "UPDATE governance_proposals
3404 SET state='rejected', updated_at=?
3405 WHERE state='pending'
3406 AND evidence_score < ?
3407 AND created_at < ?",
3408 rusqlite::params![
3409 now_iso,
3410 self.governance_archive_threshold as f64,
3411 proposal_expiry_cutoff
3412 ],
3413 )?;
3414
3415 let neg_feedback_chunks = self.storage.query_chunks_params(
3419 "SELECT p.chunk_id FROM governance_proposals p
3420 JOIN chunks c ON c.id = p.chunk_id
3421 WHERE c.origin!='spark' AND c.protected=0
3422 AND c.state IN ('active','pending')
3423 AND p.state='pending'
3424 AND p.evidence_score >= ? AND p.actor_count >= 2
3425 AND (? IS NULL OR c.origin=?)
3426 AND (? IS NULL OR c.skill_name=?)
3427 GROUP BY p.chunk_id",
3428 rusqlite::params![
3429 self.negative_feedback_archive_threshold as f64,
3430 scope_origin,
3431 scope_origin,
3432 scope_skill,
3433 scope_skill
3434 ],
3435 )?;
3436 for c in &neg_feedback_chunks {
3437 if let Some(cid) = c.get("chunk_id").and_then(Value::as_str) {
3438 if !report.archived.contains(&cid.to_string()) {
3439 self.storage.update_chunk_state(
3440 cid,
3441 "archived",
3442 Some("sustained_negative_feedback"),
3443 &now_iso,
3444 )?;
3445 report.archived.push(cid.to_string());
3446 }
3447 }
3448 }
3449
3450 let high_fail_chunks = self.storage.query_chunks_params(
3454 "SELECT id FROM chunks
3455 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3456 AND used_count >= ?
3457 AND CAST(used_success_count AS REAL) / CAST(used_count AS REAL) < ?
3458 AND confidence < ?
3459 AND (? IS NULL OR origin=?)
3460 AND (? IS NULL OR skill_name=?)",
3461 rusqlite::params![
3462 self.failure_min_uses,
3463 self.failure_max_success_rate,
3464 self.failure_confidence_max,
3465 scope_origin, scope_origin, scope_skill, scope_skill
3466 ],
3467 )?;
3468 for c in &high_fail_chunks {
3469 if let Some(cid) = c.get("id").and_then(Value::as_str) {
3470 if !report.archived.contains(&cid.to_string()) {
3471 self.storage.update_chunk_state(
3472 cid, "archived", Some("sustained_task_failure"), &now_iso,
3473 )?;
3474 report.archived.push(cid.to_string());
3475 }
3476 }
3477 }
3478
3479 let dupes = self.storage.query_chunks_params(
3481 "SELECT content_hash FROM chunks
3482 WHERE origin!='spark' AND state IN ('active','pending')
3483 AND (? IS NULL OR origin=?)
3484 AND (? IS NULL OR skill_name=?)
3485 GROUP BY content_hash HAVING COUNT(*) > 1",
3486 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3487 )?;
3488 for d in &dupes {
3489 if let Some(h) = d.get("content_hash").and_then(Value::as_str) {
3490 let group = self.storage.query_chunks_params(
3491 "SELECT id, confidence, protected FROM chunks
3492 WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending')
3493 AND (? IS NULL OR origin=?)
3494 AND (? IS NULL OR skill_name=?)
3495 ORDER BY protected DESC, confidence DESC",
3496 rusqlite::params![h, scope_origin, scope_origin, scope_skill, scope_skill],
3497 )?;
3498 let canonical_id = group
3499 .first()
3500 .and_then(|row| row.get("id"))
3501 .and_then(Value::as_str)
3502 .unwrap_or("");
3503 for row in group.iter().skip(1) {
3504 let id = row.get("id").and_then(Value::as_str).unwrap_or("");
3505 let reason = format!("duplicate:{canonical_id}");
3506 self.storage
3507 .update_chunk_state(id, "archived", Some(&reason), &now_iso)?;
3508 self.storage.conn_execute(
3509 "UPDATE chunks SET parent_id=?, updated_at=? WHERE id=?",
3510 rusqlite::params![canonical_id, now_iso, id],
3511 )?;
3512 report.deduped.push(id.to_string());
3513 }
3514 }
3515 }
3516
3517 let decay_candidates = self.storage.query_chunks_params(
3522 "SELECT id, confidence, last_used_at, last_decayed_at FROM chunks
3523 WHERE origin!='spark' AND protected=0 AND state IN ('active','pending')
3524 AND last_used_at IS NOT NULL
3525 AND (? IS NULL OR origin=?)
3526 AND (? IS NULL OR skill_name=?)",
3527 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3528 )?;
3529 for c in &decay_candidates {
3530 let id = match c.get("id").and_then(Value::as_str) {
3531 Some(v) => v,
3532 None => continue,
3533 };
3534 let conf = c.get("confidence").and_then(Value::as_f64).unwrap_or(0.5);
3535 let last_used = c.get("last_used_at").and_then(Value::as_str).unwrap_or(&now_iso);
3536 let decay_ref = c
3538 .get("last_decayed_at")
3539 .and_then(Value::as_str)
3540 .filter(|s| *s > last_used)
3541 .unwrap_or(last_used);
3542 let delta_days = iso_days_diff(&now_iso, decay_ref);
3543 if delta_days <= 0 {
3544 continue;
3545 }
3546 let floor = self.decay_floor;
3547 let decay_alpha = 1.0 - 0.5_f64.powf(delta_days as f64 / 90.0);
3548 let new_conf = conf + decay_alpha * (floor - conf);
3549 if (new_conf - conf).abs() > 0.001 {
3550 let note = format!("decay:{delta_days}d");
3551 self.storage.upsert_confidence_evidence(
3552 &gen_uuid(),
3553 None,
3554 id,
3555 "decay",
3556 floor,
3557 decay_alpha,
3558 ¬e,
3559 None,
3560 &now_iso,
3561 )?;
3562 self.recompute_chunk_confidence(id, &now_iso)?;
3563 self.storage.update_chunk_last_decayed_at(id, &now_iso)?;
3564 report.decayed.push(id.to_string());
3565 }
3566 }
3567
3568 let promotable = self.storage.query_chunks_params(
3570 "SELECT id FROM chunks
3571 WHERE state='pending' AND origin!='spark'
3572 AND used_success_count >= ?
3573 AND success_trace_ids_count >= 2
3574 AND confidence >= ?
3575 AND (? IS NULL OR origin=?)
3576 AND (? IS NULL OR skill_name=?)",
3577 rusqlite::params![
3578 self.promote_used_success_min,
3579 self.promote_confidence_min,
3580 scope_origin,
3581 scope_origin,
3582 scope_skill,
3583 scope_skill
3584 ],
3585 )?;
3586 for c in &promotable {
3587 if let Some(id) = c.get("id").and_then(Value::as_str) {
3588 self.storage.update_chunk_state(
3589 id,
3590 "active",
3591 Some("repeated_success"),
3592 &now_iso,
3593 )?;
3594 }
3595 }
3596
3597 let all_deps = self
3599 .storage
3600 .query_chunks("SELECT src, dst FROM deps WHERE kind='hard'")?;
3601 let cycles = detect_cycles(&all_deps);
3602 report.cycles = cycles;
3603 let orphan_rows = self.storage.query_chunks_params(
3604 "SELECT d.src, d.dst, s.id AS src_exists, t.id AS dst_exists
3605 FROM deps d
3606 LEFT JOIN chunks s ON s.id=d.src
3607 LEFT JOIN chunks t ON t.id=d.dst
3608 WHERE d.kind='hard'
3609 AND (? IS NULL OR s.origin=?)
3610 AND (? IS NULL OR s.skill_name=?)",
3611 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
3612 )?;
3613 let mut orphans = HashSet::new();
3614 for row in orphan_rows {
3615 if row.get("src_exists").is_none_or(Value::is_null) {
3616 if let Some(id) = row.get("src").and_then(Value::as_str) {
3617 orphans.insert(id.to_string());
3618 }
3619 }
3620 if row.get("dst_exists").is_none_or(Value::is_null) {
3621 if let Some(id) = row.get("dst").and_then(Value::as_str) {
3622 orphans.insert(id.to_string());
3623 }
3624 }
3625 }
3626 report.orphans = orphans.into_iter().collect();
3627 report.orphans.sort();
3628
3629 self.rebuild_context_stats(&now_iso)?;
3633
3634 self.storage.commit()
3635 })();
3636 if gov_result.is_err() {
3637 let _ = self.storage.rollback();
3638 gov_result?;
3639 }
3640
3641 self.storage.begin_immediate()?;
3645 let purge_cutoff = days_ago(&now_iso, 30);
3646 let purge_result = self
3647 .storage
3648 .conn_execute(
3649 "UPDATE episodic_log
3650 SET query=NULL, recall_snapshot=NULL, output=NULL, output_summary=NULL,
3651 nomination=NULL,
3652 distill_note=COALESCE(distill_note, 'compacted')
3653 WHERE distill_state IN ('distilled','discarded','failed')
3654 AND ts < ?",
3655 rusqlite::params![purge_cutoff],
3656 )
3657 .and_then(|_| self.storage.commit());
3658 if purge_result.is_err() {
3659 let _ = self.storage.rollback();
3660 purge_result?;
3661 }
3662
3663 self.storage.begin_immediate()?;
3666 let evolve_req_cutoff = days_ago(&now_iso, 30);
3667 let prune_req_result = self
3668 .storage
3669 .conn_execute(
3670 "DELETE FROM evolve_requests
3671 WHERE state IN ('completed','failed') AND requested_at < ?",
3672 rusqlite::params![evolve_req_cutoff],
3673 )
3674 .and_then(|_| self.storage.commit());
3675 if prune_req_result.is_err() {
3676 let _ = self.storage.rollback();
3677 prune_req_result?;
3678 }
3679
3680 Ok(report)
3681 }
3682
3683 pub fn inspect(&self) -> Result<Value> {
3688 let total: i64 = count_query(
3689 &self.storage,
3690 "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
3691 )?;
3692 let active: i64 = count_query(
3693 &self.storage,
3694 "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
3695 )?;
3696 let pending: i64 = count_query(
3697 &self.storage,
3698 "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
3699 )?;
3700 let archived: i64 = count_query(
3701 &self.storage,
3702 "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
3703 )?;
3704 let sparks: i64 = count_query(
3705 &self.storage,
3706 "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
3707 )?;
3708 let open_logs: i64 = count_query(
3709 &self.storage,
3710 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
3711 )?;
3712 let new_logs: i64 = count_query(
3713 &self.storage,
3714 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
3715 )?;
3716 let embed_rebuild: i64 = count_query(&self.storage,
3717 "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
3718 let schema_version = self.storage.get_meta_or("schema_version", "?");
3719 let lib_id = self.storage.get_meta_or("lib_id", "?");
3720 let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
3721
3722 let metric_window_start = days_ago(&utc_now_iso(), 30);
3723 let trace_metrics = self.storage.query_chunks_params(
3724 "SELECT COUNT(*) AS total,
3725 SUM(CASE WHEN task_state='completed' THEN 1 ELSE 0 END) AS completed,
3726 SUM(CASE WHEN task_state='timed_out' THEN 1 ELSE 0 END) AS timed_out,
3727 SUM(CASE WHEN task_state='completed' AND usage_state!='unknown'
3728 THEN 1 ELSE 0 END) AS usage_known,
3729 SUM(CASE WHEN task_state='completed' AND usage_state='known_some'
3730 THEN 1 ELSE 0 END) AS usage_some,
3731 SUM(CASE WHEN outcome='ok' THEN 1 ELSE 0 END) AS succeeded
3732 FROM episodic_log WHERE ts >= ?",
3733 rusqlite::params![metric_window_start],
3734 )?;
3735 let trace_row = trace_metrics.first();
3736 let trace_total = trace_row
3737 .and_then(|row| row.get("total"))
3738 .and_then(Value::as_i64)
3739 .unwrap_or(0);
3740 let trace_completed = trace_row
3741 .and_then(|row| row.get("completed"))
3742 .and_then(Value::as_i64)
3743 .unwrap_or(0);
3744 let trace_timed_out = trace_row
3745 .and_then(|row| row.get("timed_out"))
3746 .and_then(Value::as_i64)
3747 .unwrap_or(0);
3748 let usage_known = trace_row
3749 .and_then(|row| row.get("usage_known"))
3750 .and_then(Value::as_i64)
3751 .unwrap_or(0);
3752 let usage_some = trace_row
3753 .and_then(|row| row.get("usage_some"))
3754 .and_then(Value::as_i64)
3755 .unwrap_or(0);
3756 let succeeded = trace_row
3757 .and_then(|row| row.get("succeeded"))
3758 .and_then(Value::as_i64)
3759 .unwrap_or(0);
3760 let usage_rows = self.storage.query_chunks_params(
3761 "SELECT recall_snapshot, used_ids FROM episodic_log
3762 WHERE usage_state!='unknown' AND used_complete=1
3763 AND recall_snapshot IS NOT NULL AND used_ids IS NOT NULL
3764 AND ts >= ?",
3765 rusqlite::params![metric_window_start],
3766 )?;
3767 let mut selected_total = 0_i64;
3768 let mut selected_used = 0_i64;
3769 for row in usage_rows {
3770 let selected: HashSet<String> = row
3771 .get("recall_snapshot")
3772 .and_then(Value::as_str)
3773 .and_then(|raw| serde_json::from_str::<Value>(raw).ok())
3774 .and_then(|snapshot| snapshot.get("selected").cloned())
3775 .and_then(|value| serde_json::from_value::<Vec<String>>(value).ok())
3776 .unwrap_or_default()
3777 .into_iter()
3778 .collect();
3779 let used: HashSet<String> = row
3780 .get("used_ids")
3781 .and_then(Value::as_str)
3782 .and_then(|raw| serde_json::from_str::<Vec<String>>(raw).ok())
3783 .unwrap_or_default()
3784 .into_iter()
3785 .collect();
3786 selected_total += selected.len() as i64;
3787 selected_used += selected.intersection(&used).count() as i64;
3788 }
3789 let feedback_count = count_query_params(
3790 &self.storage,
3791 "SELECT COUNT(*) FROM feedback_events WHERE ts >= ?",
3792 rusqlite::params![metric_window_start],
3793 )?;
3794 let feedback_traces = count_query_params(
3795 &self.storage,
3796 "SELECT COUNT(DISTINCT f.trace_id)
3797 FROM feedback_events f
3798 JOIN episodic_log e ON e.trace_id=f.trace_id
3799 WHERE f.ts >= ? AND e.ts >= ? AND e.task_state='completed'",
3800 rusqlite::params![metric_window_start, metric_window_start],
3801 )?;
3802 let pending_evolve = count_query(
3803 &self.storage,
3804 "SELECT COUNT(*) FROM evolve_requests WHERE state IN ('pending','running')",
3805 )?;
3806 let governance_pending = count_query(
3807 &self.storage,
3808 "SELECT COUNT(*) FROM governance_proposals WHERE state='pending'",
3809 )?;
3810 let failed_evolve = count_query_params(
3811 &self.storage,
3812 "SELECT COUNT(*) FROM evolve_requests
3813 WHERE last_failed_at >= ?",
3814 rusqlite::params![metric_window_start],
3815 )?;
3816 let failed_distill = count_query_params(
3817 &self.storage,
3818 "SELECT COUNT(*) FROM episodic_log
3819 WHERE distill_last_failed_at >= ?",
3820 rusqlite::params![metric_window_start],
3821 )?;
3822 let confidence_buckets = self.storage.query_chunks(
3823 &format!("SELECT
3824 SUM(CASE WHEN confidence < 0.25 THEN 1 ELSE 0 END) AS low,
3825 SUM(CASE WHEN confidence >= 0.25 AND confidence < {0} THEN 1 ELSE 0 END) AS medium,
3826 SUM(CASE WHEN confidence >= {0} THEN 1 ELSE 0 END) AS high
3827 FROM chunks WHERE origin!='spark' AND state!='archived'",
3828 self.promote_confidence_min),
3829 )?;
3830 let confidence_row = confidence_buckets.first();
3831
3832 let pending_oldest_ts = self.storage.query_chunks(
3834 "SELECT MIN(created_at) AS oldest FROM chunks WHERE state='pending' AND origin!='spark'",
3835 )?.into_iter().next()
3836 .and_then(|r| r.get("oldest").cloned())
3837 .and_then(|v| if v.is_null() { None } else { Some(v) });
3838
3839 let zombie_cutoff = days_ago(&utc_now_iso(), 14);
3844 let zombie: i64 = count_query_params(
3845 &self.storage,
3846 "SELECT COUNT(*) FROM chunks
3847 WHERE origin!='spark' AND state='active'
3848 AND confidence >= 0.4 AND confidence <= 0.6
3849 AND last_used_at IS NOT NULL
3850 AND created_at < ?",
3851 rusqlite::params![zombie_cutoff],
3852 )?;
3853 let debt_numerator = pending + zombie;
3854 let debt_denominator = active.max(1);
3855 let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
3856
3857 let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
3859 let stale_screening: i64 = count_query_params(
3860 &self.storage,
3861 "SELECT COUNT(*) FROM episodic_log
3862 WHERE distill_state='screening' AND distill_locked_at < ?",
3863 rusqlite::params![screening_cutoff],
3864 )?;
3865
3866 let distill_period_start = self.distill_token_period_start(&utc_now_iso())?;
3868 let distill_cost = self.storage.query_chunks_params(
3869 "SELECT COALESCE(SUM(distill_prompt_tokens),0) AS pt,
3870 COALESCE(SUM(distill_completion_tokens),0) AS ct
3871 FROM episodic_log
3872 WHERE distill_accounted_at >= ?",
3873 rusqlite::params![distill_period_start],
3874 )?;
3875 let prompt_tokens = distill_cost
3876 .first()
3877 .and_then(|r| r.get("pt"))
3878 .and_then(Value::as_i64)
3879 .unwrap_or(0);
3880 let completion_tokens = distill_cost
3881 .first()
3882 .and_then(|r| r.get("ct"))
3883 .and_then(Value::as_i64)
3884 .unwrap_or(0);
3885
3886 let spark_threshold: i64 = self
3888 .storage
3889 .get_meta("curate.soft_mature_threshold")
3890 .ok()
3891 .flatten()
3892 .and_then(|v| v.parse::<i64>().ok())
3893 .unwrap_or(5);
3894 let recurring_sparks = self.storage.query_chunks_params(
3895 "SELECT ut.chunk_id, COUNT(*) AS cnt,
3896 c.content, c.trigger_desc, c.maturity
3897 FROM usage_trace ut
3898 JOIN chunks c ON c.id = ut.chunk_id
3899 WHERE ut.event='retrieved'
3900 AND c.origin='spark'
3901 GROUP BY ut.chunk_id HAVING cnt >= ?",
3902 rusqlite::params![spark_threshold],
3903 )?;
3904 let recurring_spark_ids: Vec<Value> = recurring_sparks
3905 .iter()
3906 .map(|r| {
3907 json!({
3908 "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
3909 "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
3910 "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
3911 "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
3912 .chars().take(80).collect::<String>(),
3913 })
3914 })
3915 .collect();
3916
3917 let mut suggestions: Vec<Value> = Vec::new();
3918 if embed_rebuild > 0 {
3919 suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
3920 }
3921 if new_logs > 0 {
3922 suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
3923 }
3924 if pending > 0 {
3925 suggestions.push(json!({"action": "innate approve <id> # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
3926 }
3927 if !recurring_spark_ids.is_empty() {
3928 suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
3929 }
3930 if stale_screening > 0 {
3931 suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
3932 }
3933 if governance_pending > 0 {
3934 suggestions.push(json!({
3935 "action": "review governance_proposals",
3936 "reason": format!("{governance_pending} chunk(s) have repeated negative feedback")
3937 }));
3938 }
3939
3940 Ok(json!({
3941 "schema_version": schema_version,
3942 "lib_id": lib_id,
3943 "last_agg_ts": last_agg,
3944 "chunks": {
3945 "total": total, "active": active, "pending": pending, "archived": archived,
3946 "pending_oldest_ts": pending_oldest_ts,
3947 },
3948 "sparks": sparks,
3949 "episodic_log": {"open": open_logs, "new": new_logs},
3950 "embed_rebuild_queue": embed_rebuild,
3951 "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
3952 "stale_screening_count": stale_screening,
3953 "feedback_loop": {
3954 "trace_completion_rate": ratio(trace_completed, trace_total),
3955 "usage_annotation_rate": ratio(usage_known, trace_completed),
3956 "trace_use_rate": ratio(usage_some, usage_known),
3957 "selected_to_used_rate": ratio(selected_used, selected_total),
3958 "task_success_rate": ratio(succeeded, trace_completed),
3959 "feedback_coverage": ratio(feedback_traces, trace_completed),
3960 "feedback_events": feedback_count,
3961 "timed_out_traces": trace_timed_out,
3962 "pending_evolve_requests": pending_evolve,
3963 "failed_evolve_requests_30d": failed_evolve,
3964 "failed_distill_logs_30d": failed_distill,
3965 "pending_governance_proposals": governance_pending,
3966 "window_days": 30,
3967 "confidence_distribution": {
3968 "low": confidence_row.and_then(|row| row.get("low")).and_then(Value::as_i64).unwrap_or(0),
3969 "medium": confidence_row.and_then(|row| row.get("medium")).and_then(Value::as_i64).unwrap_or(0),
3970 "high": confidence_row.and_then(|row| row.get("high")).and_then(Value::as_i64).unwrap_or(0),
3971 }
3972 },
3973 "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
3974 "recurring_sparks": recurring_sparks.len(),
3975 "recurring_spark_ids": recurring_spark_ids,
3976 "params": {
3977 "recall.w_content": self.w_content,
3978 "recall.w_trigger": self.w_trigger,
3979 "recall.w_context": self.w_context,
3980 "recall.top_k_candidates": self.top_k_candidates,
3981 "curate.low_conf_threshold": self.low_conf_threshold,
3982 "curate.low_conf_idle_days": self.low_conf_idle_days,
3983 "curate.repeat_select_min": self.repeat_select_min,
3984 "curate.never_used_age_days": self.never_used_age_days,
3985 "curate.promote_used_success_min": self.promote_used_success_min,
3986 "curate.promote_confidence_min": self.promote_confidence_min,
3987 "curate.screening_timeout_minutes": self.screening_timeout_minutes,
3988 "curate.open_ttl_days": self.open_ttl_days,
3989 "evolve.schedule_interval_hours": self.evolve_schedule_interval_hours,
3990 },
3991 "suggestions": suggestions
3992 }))
3993 }
3994
3995 pub fn rebuild_embeddings(&self) -> Result<usize> {
4000 let meta_version = self
4001 .storage
4002 .get_meta("embed_version")?
4003 .and_then(|v| v.parse::<i64>().ok())
4004 .unwrap_or(1);
4005 let stale = self.storage.query_chunks_params(
4007 "SELECT id, content, trigger_desc, state_reason FROM chunks
4008 WHERE embed_version = 0 OR embed_version < ?",
4009 rusqlite::params![meta_version],
4010 )?;
4011 let mut count = 0;
4012 for row in &stale {
4013 let id = match row.get("id").and_then(Value::as_str) {
4014 Some(v) => v,
4015 None => continue,
4016 };
4017 let content = row.get("content").and_then(Value::as_str).unwrap_or("");
4018 let trigger = row
4019 .get("trigger_desc")
4020 .and_then(Value::as_str)
4021 .unwrap_or(content);
4022 let state_reason = row
4023 .get("state_reason")
4024 .and_then(Value::as_str)
4025 .unwrap_or("");
4026
4027 let cvec = match self.embedding.embed_content(content) {
4028 Ok(v) => v,
4029 Err(_) => continue,
4030 };
4031 let tvec = match self.embedding.embed_trigger(trigger) {
4032 Ok(v) => v,
4033 Err(_) => continue,
4034 };
4035
4036 self.storage.begin_immediate()?;
4037 let r = (|| -> Result<()> {
4038 self.storage
4039 .insert_vec_content(id, &pack_embedding(&cvec))?;
4040 self.storage
4041 .insert_vec_trigger(id, &pack_embedding(&tvec))?;
4042 let new_reason = if state_reason.starts_with("embedding_pending:target=") {
4044 let target_state = state_reason.trim_start_matches("embedding_pending:target=");
4045 let now = utc_now_iso();
4046 self.storage.update_chunk_state(
4047 id,
4048 target_state,
4049 Some("embedding_rebuilt"),
4050 &now,
4051 )?;
4052 "embedding_rebuilt".to_string()
4053 } else {
4054 "embedding_rebuilt".to_string()
4055 };
4056 let now = utc_now_iso();
4057 self.storage.conn_execute(
4058 "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
4059 rusqlite::params![meta_version, new_reason, now, id],
4060 )?;
4061 self.storage.commit()
4062 })();
4063 if r.is_err() {
4064 let _ = self.storage.rollback();
4065 } else {
4066 count += 1;
4067 }
4068 }
4069 Ok(count)
4070 }
4071
4072 pub fn inspect_id(&self, id: &str) -> Result<Value> {
4077 if let Some(chunk) = self.storage.get_chunk(id)? {
4079 let traces = self.storage.query_chunks_params(
4080 "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
4081 rusqlite::params![id],
4082 )?;
4083 let derived = self.storage.query_chunks_params(
4084 "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
4085 SELECT id FROM episodic_log WHERE trace_id IN (
4086 SELECT trace_id FROM usage_trace WHERE chunk_id=?
4087 )
4088 ) LIMIT 10",
4089 rusqlite::params![id],
4090 )?;
4091 return Ok(json!({
4092 "kind": "chunk",
4093 "chunk": chunk,
4094 "recent_traces": traces,
4095 "derived_chunks": derived,
4096 }));
4097 }
4098 if let Some(log) = self.storage.get_episodic_log(id)? {
4100 let traces = self.storage.query_chunks_params(
4101 "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
4102 rusqlite::params![id],
4103 )?;
4104 return Ok(json!({
4105 "kind": "trace",
4106 "episodic_log": log,
4107 "usage_traces": traces,
4108 }));
4109 }
4110 Err(InnateError::ChunkNotFound(id.to_string()))
4111 }
4112
4113 fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
4118 self.sanitizer.sanitize(content)
4119 }
4120}
4121
/// A recall candidate paired with its two similarity scores.
///
/// NOTE(review): constructed and consumed outside this section; field meanings below are
/// inferred from names and should be confirmed at the construction site.
struct CandidateInfo {
    /// The candidate chunk as a JSON row (presumably a `chunks` table row — TODO confirm).
    chunk: Value,
    /// Query-to-content similarity (presumably an embedding similarity — TODO confirm metric).
    sim_content: f32,
    /// Query-to-trigger similarity (presumably an embedding similarity — TODO confirm metric).
    sim_trigger: f32,
}
4131
4132fn chunk_is_valid_for_recall(chunk: &Value, embed_version: i64) -> bool {
4133 chunk.get("state").and_then(Value::as_str) != Some("archived")
4134 && chunk.get("origin").and_then(Value::as_str) != Some("spark")
4135 && chunk
4136 .get("embed_version")
4137 .and_then(Value::as_i64)
4138 .unwrap_or(1)
4139 >= embed_version
4140}
4141
/// Canonicalize a free-text query into a stable form.
///
/// Steps:
/// 1. Lowercase the text.
/// 2. Turn every character that is neither alphanumeric nor whitespace into a space.
/// 3. Drop a small set of English stop words.
/// 4. Return the remaining distinct tokens in byte-wise sorted order, joined by single spaces.
fn normalize_query(query: &str) -> String {
    const STOP_WORDS: &[&str] = &[
        "a", "an", "and", "for", "in", "of", "on", "the", "to", "with",
    ];
    let lowered = query.to_lowercase();
    let mut scrubbed = String::with_capacity(lowered.len());
    for ch in lowered.chars() {
        let keep = ch.is_alphanumeric() || ch.is_whitespace();
        scrubbed.push(if keep { ch } else { ' ' });
    }
    // A BTreeSet both sorts (same ordering as `str`'s `Ord`) and removes duplicates.
    let distinct: std::collections::BTreeSet<&str> = scrubbed
        .split_whitespace()
        .filter(|word| !STOP_WORDS.contains(word))
        .collect();
    distinct.into_iter().collect::<Vec<&str>>().join(" ")
}
4173
4174fn estimate_distill_prompt_tokens(log: &Value, related_logs: &[Value]) -> i64 {
4175 let primary: i64 = [
4176 "query",
4177 "recall_snapshot",
4178 "output",
4179 "output_summary",
4180 "nomination",
4181 ]
4182 .iter()
4183 .filter_map(|key| log.get(*key).and_then(Value::as_str))
4184 .map(|text| estimate_tokens(text) as i64)
4185 .sum();
4186 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
4187 let related: i64 = related_logs
4188 .iter()
4189 .filter(|other| other.get("id").and_then(Value::as_str).unwrap_or("") != log_id)
4190 .take(4)
4191 .flat_map(|other| {
4192 ["query", "output_summary", "outcome"]
4193 .into_iter()
4194 .filter_map(|key| other.get(key).and_then(Value::as_str))
4195 })
4196 .map(|text| estimate_tokens(text) as i64)
4197 .sum();
4198 primary + related
4199}
4200
4201fn estimate_distilled_chunk_tokens(chunk: &DistilledChunk) -> i64 {
4202 estimate_tokens(&chunk.content) as i64
4203 + chunk
4204 .trigger_desc
4205 .as_deref()
4206 .map(estimate_tokens)
4207 .unwrap_or(0) as i64
4208 + chunk
4209 .anti_trigger_desc
4210 .as_deref()
4211 .map(estimate_tokens)
4212 .unwrap_or(0) as i64
4213}
4214
/// Whether `query` mentions any anti-trigger phrase.
///
/// `anti` is a comma-separated list of phrases. Each phrase is trimmed, empty phrases are
/// ignored, and matching is a case-insensitive substring test against the whole query.
fn anti_trigger_hit(query: &str, anti: &str) -> bool {
    let haystack = query.to_lowercase();
    let phrases = anti.to_lowercase();
    for phrase in phrases.split(',').map(str::trim) {
        if phrase.is_empty() {
            continue;
        }
        if haystack.contains(phrase) {
            return true;
        }
    }
    false
}
4222
4223fn block_cost(block: &[Value]) -> usize {
4224 block
4225 .iter()
4226 .map(|b| {
4227 b.get("token_count")
4228 .and_then(Value::as_u64)
4229 .map(|t| t as usize)
4230 .unwrap_or_else(|| {
4231 estimate_tokens(b.get("content").and_then(Value::as_str).unwrap_or("")).max(100)
4232 })
4233 })
4234 .sum()
4235}
4236
4237fn limit_knowledge(knowledge: Vec<Value>, top: Option<usize>) -> Vec<Value> {
4238 match top {
4239 None => knowledge,
4240 Some(0) => vec![],
4241 Some(n) => knowledge.into_iter().take(n).collect(),
4242 }
4243}
4244
/// Classify a usage report.
///
/// Returns:
/// - `"unknown"` when no report was given;
/// - `"known_none"` when the reported list is empty;
/// - `"known_some"` otherwise.
fn usage_state(used: Option<&[String]>) -> &'static str {
    let Some(ids) = used else {
        return "unknown";
    };
    if ids.is_empty() {
        "known_none"
    } else {
        "known_some"
    }
}
4252
/// `numerator / denominator` rounded to three decimal places.
///
/// Returns 0.0 when the denominator is zero or negative.
fn ratio(numerator: i64, denominator: i64) -> f64 {
    if denominator > 0 {
        let fraction = numerator as f64 / denominator as f64;
        (fraction * 1000.0).round() / 1000.0
    } else {
        0.0
    }
}
4260
4261fn validate_source(source: &str) -> Result<()> {
4262 if !matches!(
4263 source,
4264 "mcp" | "sdk" | "cli" | "hook" | "daemon" | "augmented"
4265 ) {
4266 return Err(InnateError::InvalidState(format!(
4267 "invalid event source: {source}"
4268 )));
4269 }
4270 Ok(())
4271}
4272
4273fn count_query(storage: &Storage, sql: &str) -> Result<i64> {
4274 Ok(storage
4275 .query_chunks(sql)?
4276 .first()
4277 .and_then(|r| r.as_object())
4278 .and_then(|m| m.values().next())
4279 .and_then(Value::as_i64)
4280 .unwrap_or(0))
4281}
4282
4283fn count_query_params<P: rusqlite::Params>(storage: &Storage, sql: &str, p: P) -> Result<i64> {
4284 Ok(storage
4285 .query_chunks_params(sql, p)?
4286 .first()
4287 .and_then(|r| r.as_object())
4288 .and_then(|m| m.values().next())
4289 .and_then(Value::as_i64)
4290 .unwrap_or(0))
4291}
4292
4293fn days_ago(now_iso: &str, days: i64) -> String {
4294 use chrono::{DateTime, Duration, Utc};
4295 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4296 let cutoff = t - Duration::days(days);
4297 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4298 }
4299 now_iso.to_string()
4300}
4301
4302fn minutes_ago(now_iso: &str, minutes: i64) -> String {
4303 use chrono::{DateTime, Duration, Utc};
4304 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4305 let cutoff = t - Duration::minutes(minutes);
4306 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4307 }
4308 now_iso.to_string()
4309}
4310
4311fn hours_ago(now_iso: &str, hours: i64) -> String {
4312 use chrono::{DateTime, Duration, Utc};
4313 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
4314 let cutoff = t - Duration::hours(hours);
4315 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
4316 }
4317 now_iso.to_string()
4318}
4319
4320fn iso_days_diff(now_iso: &str, past_iso: &str) -> i64 {
4322 use chrono::{DateTime, Utc};
4323 let parse = |s: &str| s.parse::<DateTime<Utc>>().ok();
4324 if let (Some(a), Some(b)) = (parse(now_iso), parse(past_iso)) {
4325 let diff = a - b;
4326 diff.num_days().max(0)
4327 } else {
4328 0
4329 }
4330}
4331
4332fn detect_cycles(deps: &[Value]) -> Vec<Vec<String>> {
4334 use std::collections::HashMap;
4335 let mut adj: HashMap<String, Vec<String>> = HashMap::new();
4336 for d in deps {
4337 let src = d
4338 .get("src")
4339 .and_then(Value::as_str)
4340 .unwrap_or("")
4341 .to_string();
4342 let dst = d
4343 .get("dst")
4344 .and_then(Value::as_str)
4345 .unwrap_or("")
4346 .to_string();
4347 if !src.is_empty() && !dst.is_empty() {
4348 adj.entry(src).or_default().push(dst);
4349 }
4350 }
4351 let nodes: Vec<String> = adj.keys().cloned().collect();
4352 let mut visited: HashSet<String> = HashSet::new();
4353 let mut on_stack: HashSet<String> = HashSet::new();
4354 let mut cycles: Vec<Vec<String>> = vec![];
4355
4356 fn dfs(
4357 node: &str,
4358 adj: &HashMap<String, Vec<String>>,
4359 visited: &mut HashSet<String>,
4360 on_stack: &mut HashSet<String>,
4361 path: &mut Vec<String>,
4362 cycles: &mut Vec<Vec<String>>,
4363 ) {
4364 if on_stack.contains(node) {
4365 let start = path.iter().position(|n| n == node).unwrap_or(0);
4367 cycles.push(path[start..].to_vec());
4368 return;
4369 }
4370 if visited.contains(node) {
4371 return;
4372 }
4373 visited.insert(node.to_string());
4374 on_stack.insert(node.to_string());
4375 path.push(node.to_string());
4376 if let Some(children) = adj.get(node) {
4377 for child in children {
4378 dfs(child, adj, visited, on_stack, path, cycles);
4379 }
4380 }
4381 path.pop();
4382 on_stack.remove(node);
4383 }
4384
4385 for node in nodes {
4386 let mut path = vec![];
4387 dfs(
4388 &node,
4389 &adj,
4390 &mut visited,
4391 &mut on_stack,
4392 &mut path,
4393 &mut cycles,
4394 );
4395 }
4396 cycles
4397}