/// Output of one packing pass (see `KnowledgeBase::pack`), in order:
///
/// 0. the chunks selected within the token budget,
/// 1. the blocks that did not fit, each as `(block, fused score of its seed, token cost)`,
/// 2. skip reasons keyed by the id of the seed chunk whose dependency block was rejected.
type PackResult = (
    Vec<Value>,
    Vec<(Vec<Value>, f64, usize)>,
    std::collections::HashMap<String, String>,
);
9
10use std::collections::{HashMap, HashSet};
11use std::path::Path;
12use std::sync::Arc;
13
14use serde_json::{json, Value};
15
16use crate::embedding::{DummyEmbeddingProvider, EmbeddingProvider};
17use crate::errors::{InnateError, Result};
18use crate::refine::{
19 DefaultSanitizer, DistilledChunk, Distiller, HeuristicDistiller, NullRefiner, Refiner,
20 Sanitizer,
21};
22use crate::storage::{ChunkRow, EpisodicLogRow, Storage};
23use crate::utils::{
24 content_hash, estimate_tokens, gen_uuid, pack_embedding, utc_now_iso, SanitizeAction,
25};
26
// Built-in defaults for the recall tunables. `KnowledgeBase::open_with` starts
// from these values and `load_params` overrides them from stored metadata.

/// Default weight of the content-embedding similarity in the fused recall score.
const W_CONTENT: f64 = 0.65;
/// Default weight of the trigger-embedding similarity in the fused recall score.
const W_TRIGGER: f64 = 0.25;
/// Default weight of a chunk's stored confidence in the fused recall score.
const W_CONFIDENCE: f64 = 0.10;
/// Default number of scored candidates kept per recall; the vector searches
/// for knowledge candidates request twice this many hits.
const TOP_K_CANDIDATES: usize = 20;
/// Default multiplier applied to the fused score when a chunk's anti-trigger
/// description matches the query.
const ANTI_TRIGGER_PENALTY: f64 = 0.6;
/// Whether recall back-fills the remaining budget from skipped blocks by default.
const DENSITY_REFILL: bool = true;

// Built-in defaults for the `curate.*` and `evolve.*` tunables. In this part of
// the file they are only copied into `KnowledgeBase` fields; the logic that
// consumes them is not visible here, so their exact meaning is inferred from
// the metadata key names — TODO confirm against the curation/evolution code.

/// Default for `curate.low_conf_threshold`.
const LOW_CONF_THRESHOLD: f64 = 0.25;
/// Default for `curate.low_conf_idle_days`.
const LOW_CONF_IDLE_DAYS: i64 = 60;
/// Default for `curate.repeat_select_min`.
const REPEAT_SELECT_MIN: i64 = 10;
/// Default for `curate.repeat_select_conf_max`.
const REPEAT_SELECT_CONF_MAX: f64 = 0.5;
/// Default for `curate.never_used_age_days`.
const NEVER_USED_AGE_DAYS: i64 = 30;
/// Default for `curate.open_ttl_days`.
const OPEN_TTL_DAYS: i64 = 7;
/// Default for `curate.screening_timeout_minutes`.
const SCREENING_TIMEOUT_MINUTES: i64 = 30;
/// Default for `curate.promote_used_success_min`.
const PROMOTE_USED_SUCCESS_MIN: i64 = 3;
/// Default for `curate.promote_confidence_min`.
const PROMOTE_CONFIDENCE_MIN: f64 = 0.65;
/// Default for `evolve.threshold_new_count`.
const EVOLVE_THRESHOLD: i64 = 5;
/// Default for `evolve.distill_batch_size`.
const DISTILL_BATCH_SIZE: usize = 20;
49
/// Everything `KnowledgeBase::recall` hands back to the caller.
#[derive(Debug, Default, Clone)]
pub struct RecallResult {
    /// Knowledge chunks that survived packing, the optional `top` limit and
    /// (in `"adapt"` mode) the refiner.
    pub knowledge: Vec<Value>,
    /// Spark chunks matched for the query; empty unless sparks were requested.
    pub sparks: Vec<Value>,
    /// Identifier generated for this recall; it is used for the usage-trace
    /// rows when tracing is enabled.
    pub trace_id: String,
    /// `true` when both `knowledge` and `sparks` are empty.
    pub empty: bool,
    /// Ids of seed chunks skipped with the reason `"dep_depth_limit"`.
    pub depth_skipped: Vec<String>,
    /// Skip reason per seed chunk id, as reported by the packing step.
    pub skipped_reasons: HashMap<String, String>,
}
63
/// Summary produced by a [`Curator`] run.
///
/// NOTE(review): the code that fills these fields is outside this excerpt; the
/// per-field notes are inferred from the field names and need confirming.
#[derive(Debug, Default)]
pub struct CurateReport {
    /// Presumably ids of chunks that were archived.
    pub archived: Vec<String>,
    /// Presumably ids of chunks removed as duplicates.
    pub deduped: Vec<String>,
    /// Presumably ids of chunks whose confidence was decayed.
    pub decayed: Vec<String>,
    /// Presumably dependency cycles, each listed as a sequence of chunk ids.
    pub cycles: Vec<Vec<String>>,
    /// Presumably ids of orphaned chunks.
    pub orphans: Vec<String>,
    /// Presumably ids of recovered items.
    pub recovered: Vec<String>,
    /// Free-form warnings collected during the run.
    pub warnings: Vec<String>,
    /// Additional named statistics as JSON values.
    pub stats: HashMap<String, Value>,
}
75
/// Restricts what a [`Curator`] run operates on.
///
/// NOTE(review): how each field is interpreted is decided by the curator
/// implementation, which is not visible here; the notes are inferred from names.
#[derive(Debug, Default, Clone)]
pub struct CurateScope {
    /// Optional chunk-origin filter; `None` presumably means "any origin".
    pub origin: Option<String>,
    /// Optional skill-name filter; `None` presumably means "any skill".
    pub skill_name: Option<String>,
    /// Presumably requests a report without applying changes — TODO confirm.
    pub dry_run: bool,
}
86
/// Pluggable curation strategy for a [`KnowledgeBase`].
///
/// Implementations must be `Send + Sync`; the knowledge base stores its curator
/// as an `Arc<dyn Curator>`.
pub trait Curator: Send + Sync {
    /// Runs one curation pass over `kb`, limited by `scope`, and returns a
    /// report of what happened.
    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport>;
}
92
/// Default [`Curator`], used when `KnowledgeBase::open_with` is given none.
pub struct BuiltinCurator;

impl Curator for BuiltinCurator {
    /// Delegates to the knowledge base's own `builtin_curate_impl`.
    fn run(&self, kb: &KnowledgeBase, scope: &CurateScope) -> Result<CurateReport> {
        kb.builtin_curate_impl(scope)
    }
}
101
/// A knowledge store together with its pluggable components and tunables.
///
/// The tunable fields start at the built-in defaults and are then overridden
/// from stored metadata by `load_params` when the knowledge base is opened.
pub struct KnowledgeBase {
    /// Backing storage handle.
    pub storage: Storage,
    /// Produces the content and trigger embeddings.
    embedding: Arc<dyn EmbeddingProvider>,
    /// Refines/trims recalled chunks.
    refiner: Arc<dyn Refiner>,
    /// Distillation component (its use is outside this excerpt).
    distiller: Arc<dyn Distiller>,
    /// Curation strategy.
    curator: Arc<dyn Curator>,
    /// Cleans incoming text before it is stored.
    sanitizer: Arc<dyn Sanitizer>,

    // Recall tunables (metadata keys `recall.*`).
    /// Weight of content similarity in the fused score.
    w_content: f64,
    /// Weight of trigger similarity in the fused score.
    w_trigger: f64,
    /// Weight of stored confidence in the fused score.
    w_confidence: f64,
    /// Number of scored candidates kept per recall.
    top_k_candidates: usize,
    /// Multiplier applied to the fused score on an anti-trigger hit.
    anti_trigger_penalty: f64,
    /// Whether leftover budget is back-filled from skipped blocks.
    density_refill: bool,

    // Curation and evolution tunables (metadata keys `curate.*` / `evolve.*`);
    // they are consumed outside this excerpt.
    low_conf_threshold: f64,
    low_conf_idle_days: i64,
    repeat_select_min: i64,
    repeat_select_conf_max: f64,
    never_used_age_days: i64,
    open_ttl_days: i64,
    screening_timeout_minutes: i64,
    promote_used_success_min: i64,
    promote_confidence_min: f64,
    evolve_threshold: i64,
    distill_batch_size: usize,
}
134
135impl KnowledgeBase {
/// Opens the knowledge base at `db_path` with every pluggable component left
/// at its default (see `open_with`).
pub fn open(db_path: impl AsRef<Path>) -> Result<Self> {
    Self::open_with(db_path, None, None, None, None, None)
}
139
140 pub fn open_with(
141 db_path: impl AsRef<Path>,
142 embedding: Option<Arc<dyn EmbeddingProvider>>,
143 refiner: Option<Arc<dyn Refiner>>,
144 distiller: Option<Arc<dyn Distiller>>,
145 curator: Option<Arc<dyn Curator>>,
146 sanitizer: Option<Arc<dyn Sanitizer>>,
147 ) -> Result<Self> {
148 let embedding = embedding.unwrap_or_else(|| Arc::new(DummyEmbeddingProvider::default()));
149 let refiner = refiner.unwrap_or_else(|| Arc::new(NullRefiner));
150 let distiller = distiller.unwrap_or_else(|| Arc::new(HeuristicDistiller));
151 let curator = curator.unwrap_or_else(|| Arc::new(BuiltinCurator));
152 let sanitizer = sanitizer.unwrap_or_else(|| Arc::new(DefaultSanitizer));
153
154 let storage = Storage::open(db_path, embedding.content_dim(), embedding.trigger_dim())?;
155
156 let mut kb = Self {
157 storage,
158 embedding,
159 refiner,
160 distiller,
161 curator,
162 sanitizer,
163 w_content: W_CONTENT,
164 w_trigger: W_TRIGGER,
165 w_confidence: W_CONFIDENCE,
166 top_k_candidates: TOP_K_CANDIDATES,
167 anti_trigger_penalty: ANTI_TRIGGER_PENALTY,
168 density_refill: DENSITY_REFILL,
169 low_conf_threshold: LOW_CONF_THRESHOLD,
170 low_conf_idle_days: LOW_CONF_IDLE_DAYS,
171 repeat_select_min: REPEAT_SELECT_MIN,
172 repeat_select_conf_max: REPEAT_SELECT_CONF_MAX,
173 never_used_age_days: NEVER_USED_AGE_DAYS,
174 open_ttl_days: OPEN_TTL_DAYS,
175 screening_timeout_minutes: SCREENING_TIMEOUT_MINUTES,
176 promote_used_success_min: PROMOTE_USED_SUCCESS_MIN,
177 promote_confidence_min: PROMOTE_CONFIDENCE_MIN,
178 evolve_threshold: EVOLVE_THRESHOLD,
179 distill_batch_size: DISTILL_BATCH_SIZE,
180 };
181 kb.init_meta()?;
182 kb.load_params()?;
183 Ok(kb)
184 }
185
186 fn init_meta(&self) -> Result<()> {
187 let lib_id = gen_uuid();
188 let content_dim = self.embedding.content_dim().to_string();
189 let trigger_dim = self.embedding.trigger_dim().to_string();
190 let embed_model = self.embedding.model_name();
191
192 for (key, expected) in [
193 ("content_dim", self.embedding.content_dim()),
194 ("trigger_dim", self.embedding.trigger_dim()),
195 ] {
196 if let Some(stored) = self.storage.get_meta(key)? {
197 let actual = stored.parse::<usize>().map_err(|_| {
198 InnateError::Other(format!("invalid {key} metadata value: {stored}"))
199 })?;
200 if actual != expected {
201 return Err(InnateError::Other(format!(
202 "{key} mismatch: database uses {actual}, embedding provider uses {expected}"
203 )));
204 }
205 }
206 }
207
208 let defaults: &[(&str, &str)] = &[
209 ("lib_id", &lib_id),
210 ("lib_role", "personal"),
211 ("schema_version", "4.5.1"),
212 ("content_dim", &content_dim),
213 ("trigger_dim", &trigger_dim),
214 ("embed_model", embed_model),
215 ("embed_version", "1"),
216 ("last_agg_ts", "1970-01-01T00:00:00.000Z"),
217 ("recall.w_content", "0.65"),
218 ("recall.w_trigger", "0.25"),
219 ("recall.w_confidence", "0.10"),
220 ("recall.top_k_candidates", "20"),
221 ("recall.anti_trigger_penalty", "0.6"),
222 ("recall.density_refill", "true"),
223 ("curate.low_conf_threshold", "0.25"),
224 ("curate.low_conf_idle_days", "60"),
225 ("curate.repeat_select_min", "10"),
226 ("curate.repeat_select_conf_max", "0.5"),
227 ("curate.never_used_age_days", "30"),
228 ("curate.open_ttl_days", "7"),
229 ("curate.screening_timeout_minutes", "30"),
230 ("curate.promote_used_success_min", "3"),
231 ("curate.promote_confidence_min", "0.65"),
232 ("evolve.threshold_new_count", "5"),
233 ("evolve.distill_batch_size", "20"),
234 ("curate.soft_mature_threshold", "5"),
235 ];
236 self.storage.begin_immediate()?;
237 let result = (|| -> Result<()> {
238 for (k, v) in defaults {
239 if self.storage.get_meta(k)?.is_none() {
240 self.storage.set_meta(k, v)?;
241 }
242 }
243 self.storage.commit()
244 })();
245 if result.is_err() {
246 let _ = self.storage.rollback();
247 }
248 result
249 }
250
251 fn load_params(&mut self) -> Result<()> {
252 let f = |k: &str, d: f64| -> f64 {
253 self.storage
254 .get_meta(k)
255 .ok()
256 .flatten()
257 .and_then(|v| v.parse().ok())
258 .unwrap_or(d)
259 };
260 let i = |k: &str, d: i64| -> i64 {
261 self.storage
262 .get_meta(k)
263 .ok()
264 .flatten()
265 .and_then(|v| v.parse().ok())
266 .unwrap_or(d)
267 };
268 let b = |k: &str, d: bool| -> bool {
269 self.storage
270 .get_meta(k)
271 .ok()
272 .flatten()
273 .map(|v| v.to_lowercase() == "true")
274 .unwrap_or(d)
275 };
276 self.w_content = f("recall.w_content", W_CONTENT);
277 self.w_trigger = f("recall.w_trigger", W_TRIGGER);
278 self.w_confidence = f("recall.w_confidence", W_CONFIDENCE);
279 self.top_k_candidates = i("recall.top_k_candidates", TOP_K_CANDIDATES as i64) as usize;
280 self.anti_trigger_penalty = f("recall.anti_trigger_penalty", ANTI_TRIGGER_PENALTY);
281 self.density_refill = b("recall.density_refill", DENSITY_REFILL);
282 self.low_conf_threshold = f("curate.low_conf_threshold", LOW_CONF_THRESHOLD);
283 self.low_conf_idle_days = i("curate.low_conf_idle_days", LOW_CONF_IDLE_DAYS);
284 self.repeat_select_min = i("curate.repeat_select_min", REPEAT_SELECT_MIN);
285 self.repeat_select_conf_max = f("curate.repeat_select_conf_max", REPEAT_SELECT_CONF_MAX);
286 self.never_used_age_days = i("curate.never_used_age_days", NEVER_USED_AGE_DAYS);
287 self.open_ttl_days = i("curate.open_ttl_days", OPEN_TTL_DAYS);
288 self.screening_timeout_minutes = i(
289 "curate.screening_timeout_minutes",
290 SCREENING_TIMEOUT_MINUTES,
291 );
292 self.promote_used_success_min =
293 i("curate.promote_used_success_min", PROMOTE_USED_SUCCESS_MIN);
294 self.promote_confidence_min = f("curate.promote_confidence_min", PROMOTE_CONFIDENCE_MIN);
295 self.evolve_threshold = i("evolve.threshold_new_count", EVOLVE_THRESHOLD);
296 self.distill_batch_size =
297 i("evolve.distill_batch_size", DISTILL_BATCH_SIZE as i64) as usize;
298 Ok(())
299 }
300
301 #[allow(clippy::too_many_arguments)]
306 pub fn recall(
307 &self,
308 query: &str,
309 budget: usize,
310 trace: bool,
311 include_sparks: bool,
312 top: Option<usize>,
313 source: &str,
314 expand_deps: &str, allow_trim: bool, refine_mode: &str, ) -> Result<RecallResult> {
318 validate_source(source)?;
319 let trace_id = gen_uuid();
320 let now = utc_now_iso();
321
322 let q_content = self
323 .embedding
324 .embed_content(query)
325 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
326 let q_trigger = self
327 .embedding
328 .embed_trigger(query)
329 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
330
331 let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
333 self.apply_soft_dep_bonus(&mut candidates)?;
334
335 let scored = self.score_candidates(candidates, query);
337
338 let (selected, skipped, skipped_reasons) =
340 self.pack(&scored, budget, expand_deps, allow_trim, query)?;
341
342 let depth_skipped: Vec<String> = skipped_reasons
343 .iter()
344 .filter(|(_, r)| r.as_str() == "dep_depth_limit")
345 .map(|(id, _)| id.clone())
346 .collect();
347
348 let mut selected = selected;
350 if self.density_refill {
351 selected = self.density_refill(selected, &skipped, budget);
352 }
353
354 let limited = limit_knowledge(selected, top);
355 let visible = if refine_mode == "adapt" {
356 self.refiner
357 .refine(limited.clone(), Some(budget))
358 .unwrap_or(limited)
359 } else {
360 limited
361 };
362
363 let sparks = if include_sparks {
365 self.recall_sparks(&q_content, &q_trigger)?
366 } else {
367 vec![]
368 };
369
370 if trace {
371 self.write_recall_trace(
372 &trace_id,
373 query,
374 &scored,
375 &visible,
376 &sparks,
377 &depth_skipped,
378 &skipped_reasons,
379 refine_mode,
380 source,
381 &now,
382 )?;
383 }
384
385 let empty = visible.is_empty() && sparks.is_empty();
386 Ok(RecallResult {
387 knowledge: visible,
388 sparks,
389 trace_id,
390 empty,
391 depth_skipped,
392 skipped_reasons,
393 })
394 }
395
396 fn ann_candidates(
397 &self,
398 q_content: &[f32],
399 q_trigger: &[f32],
400 ) -> Result<HashMap<String, CandidateInfo>> {
401 let embed_version = self
402 .storage
403 .get_meta("embed_version")?
404 .and_then(|v| v.parse::<i64>().ok())
405 .unwrap_or(1);
406
407 let content_res = self
408 .storage
409 .search_vec_content(q_content, self.top_k_candidates * 2)?;
410 let trigger_res = self
411 .storage
412 .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
413
414 let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
415
416 for (cid, sim) in &content_res {
417 if let Some(chunk) = self.storage.get_chunk(cid)? {
418 if chunk_is_valid_for_recall(&chunk, embed_version) {
419 let e = candidates
420 .entry(cid.clone())
421 .or_insert_with(|| CandidateInfo {
422 chunk: chunk.clone(),
423 sim_content: 0.0,
424 sim_trigger: 0.0,
425 });
426 e.sim_content = e.sim_content.max(*sim);
427 }
428 }
429 }
430 for (cid, sim) in &trigger_res {
431 if let Some(chunk) = self.storage.get_chunk(cid)? {
432 if chunk_is_valid_for_recall(&chunk, embed_version) {
433 let e = candidates
434 .entry(cid.clone())
435 .or_insert_with(|| CandidateInfo {
436 chunk: chunk.clone(),
437 sim_content: 0.0,
438 sim_trigger: 0.0,
439 });
440 e.sim_trigger = e.sim_trigger.max(*sim);
441 }
442 }
443 }
444 Ok(candidates)
445 }
446
447 fn apply_soft_dep_bonus(&self, candidates: &mut HashMap<String, CandidateInfo>) -> Result<()> {
448 let ids: Vec<String> = candidates.keys().cloned().collect();
449 for cid in ids {
450 if candidates[&cid].chunk.get("origin").and_then(Value::as_str) == Some("spark") {
451 continue;
452 }
453 let deps = self.storage.get_deps(&cid)?;
454 for (dst, kind, _) in &deps {
455 if kind != "soft" {
456 continue;
457 }
458 if let Some(target) = self.storage.get_chunk(dst)? {
459 if target.get("state").and_then(Value::as_str) == Some("archived") {
460 continue;
461 }
462 if target.get("origin").and_then(Value::as_str) == Some("spark") {
463 continue;
464 }
465 let e = candidates
466 .entry(dst.clone())
467 .or_insert_with(|| CandidateInfo {
468 chunk: target,
469 sim_content: 0.0,
470 sim_trigger: 0.0,
471 });
472 e.sim_content = (e.sim_content + 0.05).min(1.0);
473 }
474 }
475 }
476 Ok(())
477 }
478
479 fn score_candidates(
480 &self,
481 candidates: HashMap<String, CandidateInfo>,
482 query: &str,
483 ) -> Vec<(f64, Value)> {
484 let mut scored: Vec<(f64, Value)> = candidates
485 .into_values()
486 .map(|info| {
487 let conf = info
488 .chunk
489 .get("confidence")
490 .and_then(Value::as_f64)
491 .unwrap_or(0.5);
492 let mut fused = self.w_content * info.sim_content as f64
493 + self.w_trigger * info.sim_trigger as f64
494 + self.w_confidence * conf;
495 let anti = info
496 .chunk
497 .get("anti_trigger_desc")
498 .and_then(Value::as_str)
499 .unwrap_or("");
500 if !anti.is_empty() && anti_trigger_hit(query, anti) {
501 fused *= self.anti_trigger_penalty;
502 }
503 let mut chunk = info.chunk;
504 chunk["_fused_score"] = json!(fused);
505 (fused, chunk)
506 })
507 .collect();
508 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
509 scored.truncate(self.top_k_candidates);
510 scored
511 }
512
513 fn pack(
514 &self,
515 scored: &[(f64, Value)],
516 budget: usize,
517 expand_deps: &str,
518 allow_trim: bool,
519 query: &str,
520 ) -> Result<PackResult> {
521 let mut selected: Vec<Value> = vec![];
522 let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
523 let mut skipped_reasons: HashMap<String, String> = HashMap::new();
524 let mut used_ids: HashSet<String> = HashSet::new();
525 let mut used_tokens: usize = 0;
526
527 for (fused, chunk) in scored {
528 let cid = chunk["id"].as_str().unwrap_or("").to_string();
529 if used_ids.contains(&cid) {
530 continue;
531 }
532
533 let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
535 if let Some(reason) = dep_skip_reason {
536 skipped_reasons.insert(cid, reason);
537 continue;
538 }
539
540 let new_block: Vec<Value> = block
541 .iter()
542 .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
543 .cloned()
544 .collect();
545 let cost = block_cost(&new_block);
546
547 if used_tokens + cost <= budget {
548 for b in &block {
549 let bid = b["id"].as_str().unwrap_or("").to_string();
550 if !used_ids.contains(&bid) {
551 let mut b = b.clone();
552 b["_fused_score"] = json!(fused);
553 selected.push(b);
554 used_ids.insert(bid);
555 }
556 }
557 used_tokens += cost;
558 } else if allow_trim {
559 if let Some(trimmed) =
561 self.refiner
562 .trim(&block, query, budget.saturating_sub(used_tokens))
563 {
564 let trim_cost = block_cost(&trimmed);
565 if used_tokens + trim_cost <= budget {
566 for b in &trimmed {
567 let bid = b["id"].as_str().unwrap_or("").to_string();
568 if !used_ids.contains(&bid) {
569 let mut b = b.clone();
570 b["_fused_score"] = json!(fused);
571 b["_trimmed"] = json!(true);
572 selected.push(b);
573 used_ids.insert(bid);
574 }
575 }
576 used_tokens += trim_cost;
577 continue;
578 }
579 }
580 skipped.push((block, *fused, cost));
581 } else {
582 skipped.push((block, *fused, cost));
583 }
584 }
585 Ok((selected, skipped, skipped_reasons))
586 }
587
588 fn build_dep_block(
591 &self,
592 seed: &Value,
593 expand_deps: &str,
594 ) -> Result<(Vec<Value>, Option<String>)> {
595 if expand_deps == "false" || expand_deps.is_empty() {
596 return Ok((vec![seed.clone()], None));
597 }
598 let seed_id = seed["id"].as_str().unwrap_or("");
599 match expand_deps {
600 "direct" => {
601 let deps = self.storage.get_deps(seed_id)?;
602 let mut block = vec![seed.clone()];
603 for (dep_id, kind, _) in &deps {
604 if kind != "hard" {
605 continue;
606 }
607 match self.validate_hard_dep(dep_id)? {
608 Some(chunk) => block.push(chunk),
609 None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
610 }
611 }
612 Ok((block, None))
613 }
614 "closure" => {
615 let mut block = vec![seed.clone()];
616 let mut visited: HashSet<String> = [seed_id.to_string()].into();
617 match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
618 Some(reason) => Ok((vec![], Some(reason))),
619 None => Ok((block, None)),
620 }
621 }
622 _ => Ok((vec![seed.clone()], None)),
623 }
624 }
625
626 fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
628 match self.storage.get_chunk(dep_id)? {
629 None => Ok(None),
630 Some(chunk) => {
631 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
632 let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
633 let embed_v = chunk
634 .get("embed_version")
635 .and_then(Value::as_i64)
636 .unwrap_or(0);
637 if state == "archived" || origin == "spark" || embed_v == 0 {
638 Ok(None)
639 } else {
640 Ok(Some(chunk))
641 }
642 }
643 }
644 }
645
646 fn expand_hard_closure(
648 &self,
649 id: &str,
650 visited: &mut HashSet<String>,
651 block: &mut Vec<Value>,
652 depth: usize,
653 max_depth: usize,
654 ) -> Result<Option<String>> {
655 if depth >= max_depth {
656 return Ok(Some("dep_depth_limit".to_string()));
657 }
658 let deps = self.storage.get_deps(id)?;
659 for (dep_id, kind, _) in &deps {
660 if kind != "hard" {
661 continue;
662 }
663 if visited.contains(dep_id) {
664 continue;
665 } visited.insert(dep_id.clone());
667 match self.validate_hard_dep(dep_id)? {
668 None => return Ok(Some("hard_dep_unavailable".to_string())),
669 Some(chunk) => {
670 block.push(chunk);
671 if let Some(reason) =
672 self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
673 {
674 return Ok(Some(reason));
675 }
676 }
677 }
678 }
679 Ok(None)
680 }
681
682 fn density_refill(
683 &self,
684 mut selected: Vec<Value>,
685 skipped: &[(Vec<Value>, f64, usize)],
686 budget: usize,
687 ) -> Vec<Value> {
688 let used_tokens = block_cost(&selected);
689 if used_tokens >= budget {
690 return selected;
691 }
692
693 let selected_ids: HashSet<String> = selected
694 .iter()
695 .filter_map(|c| c["id"].as_str().map(str::to_string))
696 .collect();
697
698 let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
699 .iter()
700 .filter_map(|(block, fscore, _)| {
701 let block: Vec<Value> = block
702 .iter()
703 .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
704 .cloned()
705 .collect();
706 if block.is_empty() {
707 return None;
708 }
709 let cost = block_cost(&block);
710 let density = fscore / cost.max(1) as f64;
711 Some((density, block, cost))
712 })
713 .collect();
714 density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
715
716 let mut used_tokens = block_cost(&selected);
717 let mut added_ids: HashSet<String> = selected_ids;
718 for (_, block, cost) in density_items {
719 if used_tokens + cost <= budget {
720 for b in block {
721 let bid = b["id"].as_str().unwrap_or("").to_string();
722 if !added_ids.contains(&bid) {
723 selected.push(b);
724 added_ids.insert(bid);
725 }
726 }
727 used_tokens += cost;
728 }
729 }
730 selected
731 }
732
733 fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
734 let embed_version = self
735 .storage
736 .get_meta("embed_version")?
737 .and_then(|v| v.parse::<i64>().ok())
738 .unwrap_or(1);
739
740 let content_res = self
741 .storage
742 .search_vec_content(q_content, self.top_k_candidates)?;
743 let trigger_res = self
744 .storage
745 .search_vec_trigger(q_trigger, self.top_k_candidates)?;
746
747 let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
748 for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
749 if let Some(chunk) = self.storage.get_chunk(cid)? {
750 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
751 continue;
752 }
753 if chunk.get("state").and_then(Value::as_str) == Some("archived") {
754 continue;
755 }
756 let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
757 if maturity == "promoted" || maturity == "dropped" {
758 continue;
759 }
760 let ev = chunk
761 .get("embed_version")
762 .and_then(Value::as_i64)
763 .unwrap_or(1);
764 if ev < embed_version {
765 continue;
766 }
767 let entry = spark_scores
768 .entry(cid.clone())
769 .or_insert_with(|| (*sim, chunk.clone()));
770 if *sim > entry.0 {
771 *entry = (*sim, chunk);
772 }
773 }
774 }
775 let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
776 sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
777 Ok(sparks
778 .into_iter()
779 .take(self.top_k_candidates)
780 .map(|(_, c)| c)
781 .collect())
782 }
783
784 #[allow(clippy::too_many_arguments)]
785 fn write_recall_trace(
786 &self,
787 trace_id: &str,
788 query: &str,
789 scored: &[(f64, Value)],
790 visible: &[Value],
791 sparks: &[Value],
792 depth_skipped: &[String],
793 skipped_reasons: &HashMap<String, String>,
794 refine_mode: &str,
795 source: &str,
796 now: &str,
797 ) -> Result<()> {
798 let lib_id = self.storage.lib_id()?;
799 self.storage.begin_immediate()?;
800 let result = (|| -> Result<()> {
801 for (rank, (_, chunk)) in scored.iter().enumerate() {
802 let cid = chunk["id"].as_str().unwrap_or("");
803 let sim = chunk.get("_fused_score").and_then(Value::as_f64);
804 let rm = skipped_reasons
806 .get(cid)
807 .map(|r| format!("skipped:{r}"))
808 .or_else(|| {
809 if refine_mode != "off" && !refine_mode.is_empty() {
810 Some(refine_mode.to_string())
811 } else {
812 None
813 }
814 });
815 self.storage.insert_usage_trace(
816 trace_id,
817 Some(cid),
818 "retrieved",
819 1.0,
820 sim,
821 rm.as_deref(),
822 None,
823 Some((rank + 1) as i64),
824 source,
825 now,
826 )?;
827 }
828 for (rank, chunk) in visible.iter().enumerate() {
829 let cid = chunk["id"].as_str().unwrap_or("");
830 self.storage.insert_usage_trace(
831 trace_id,
832 Some(cid),
833 "selected",
834 1.0,
835 None,
836 None,
837 None,
838 Some((rank + 1) as i64),
839 source,
840 now,
841 )?;
842 if chunk
844 .get("_trimmed")
845 .and_then(Value::as_bool)
846 .unwrap_or(false)
847 {
848 self.storage.insert_usage_trace(
849 trace_id,
850 Some(cid),
851 "refined",
852 1.0,
853 None,
854 Some("trim"),
855 None,
856 Some((rank + 1) as i64),
857 source,
858 now,
859 )?;
860 }
861 }
862 for (rank, chunk) in sparks.iter().enumerate() {
864 let cid = chunk["id"].as_str().unwrap_or("");
865 self.storage.insert_usage_trace(
866 trace_id,
867 Some(cid),
868 "retrieved",
869 1.0,
870 None,
871 Some("spark"),
872 None,
873 Some((rank + 1) as i64),
874 source,
875 now,
876 )?;
877 }
878 let snapshot = json!({
879 "retrieved": scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
880 "selected": visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
881 "sparks": sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>(),
882 "depth_skipped": depth_skipped,
883 "skipped_reasons": skipped_reasons,
884 });
885 let log = EpisodicLogRow {
886 id: gen_uuid(),
887 trace_id: trace_id.to_string(),
888 lib_id,
889 ts: now.to_string(),
890 query: Some(query.to_string()),
891 recall_snapshot: Some(snapshot.to_string()),
892 event_source: source.to_string(),
893 distill_state: "open".to_string(),
894 ..Default::default()
895 };
896 self.storage.upsert_episodic_log(&log)?;
897 self.storage.commit()
898 })();
899 if result.is_err() {
900 let _ = self.storage.rollback();
901 }
902 result
903 }
904
905 #[allow(clippy::too_many_arguments)]
910 pub fn record(
911 &self,
912 trace_id: &str,
913 query: Option<&str>,
914 output: Option<&str>,
915 output_summary: Option<&str>,
916 outcome: Option<&str>,
917 used: Option<&[String]>,
918 feedback_up: Option<&[String]>,
919 feedback_down: Option<&[String]>,
920 nomination: Option<&str>,
921 priority: i64,
922 source: &str,
923 ) -> Result<()> {
924 if let Some(o) = outcome {
925 if !matches!(o, "ok" | "fail" | "unknown") {
926 return Err(InnateError::InvalidState(format!("invalid outcome: {o}")));
927 }
928 }
929 validate_source(source)?;
930 let effective_priority = if nomination.is_some() && priority == 0 {
931 1
932 } else {
933 priority
934 };
935 let now = utc_now_iso();
936 let lib_id = self.storage.lib_id()?;
937
938 self.storage.begin_immediate()?;
939 let result = (|| -> Result<()> {
940 let log = self.storage.get_episodic_log(trace_id)?;
941 let mut is_fresh_insert = false;
942 let log = match log {
943 Some(l) => l,
944 None => {
945 let row = EpisodicLogRow {
946 id: gen_uuid(),
947 trace_id: trace_id.to_string(),
948 lib_id,
949 ts: now.clone(),
950 query: query.map(str::to_string).or_else(|| Some(String::new())),
951 output: output.map(str::to_string),
952 output_summary: output_summary.map(str::to_string),
953 outcome: outcome.map(str::to_string),
954 event_source: source.to_string(),
955 nomination: nomination.map(str::to_string),
956 priority: effective_priority,
957 distill_state: "open".to_string(),
958 ..Default::default()
959 };
960 self.storage.upsert_episodic_log(&row)?;
961 is_fresh_insert = true;
962 self.storage.get_episodic_log(trace_id)?.unwrap()
963 }
964 };
965
966 let existing_outcome = log
967 .get("outcome")
968 .and_then(Value::as_str)
969 .map(str::to_string);
970 if let Some(new_outcome) = outcome {
971 if let Some(ref ex) = existing_outcome {
972 if ex != new_outcome {
973 return Err(InnateError::OutcomeConflict {
974 trace_id: trace_id.to_string(),
975 existing: ex.clone(),
976 requested: new_outcome.to_string(),
977 });
978 }
979 }
980 }
981
982 if let Some(used_ids) = used {
984 for cid in used_ids {
985 self.storage.insert_usage_trace(
986 trace_id,
987 Some(cid),
988 "used",
989 0.3,
990 None,
991 None,
992 None,
993 None,
994 source,
995 &now,
996 )?;
997 self.storage.update_chunk_last_used(cid, &now)?;
998 }
999 }
1000
1001 if let Some(o) = outcome {
1003 if matches!(o, "ok" | "fail") {
1004 let event = if o == "ok" { "task_ok" } else { "task_fail" };
1005 let strength = if event == "task_fail" { 0.15 } else { 1.0 };
1006 self.storage.insert_usage_trace(
1007 trace_id, None, event, strength, None, None, None, None, source, &now,
1008 )?;
1009 }
1010 }
1011
1012 if let Some(o) = outcome {
1014 if is_fresh_insert || existing_outcome.is_none() {
1015 self.apply_outcome_implicit(trace_id, o, used, &now)?;
1016 }
1017 }
1018
1019 if let Some(ups) = feedback_up {
1021 for cid in ups {
1022 self.update_confidence(cid, 1.0, 1.0, "user_up", &now, true)?;
1023 self.storage.update_chunk_last_used(cid, &now)?;
1024 }
1025 }
1026 if let Some(downs) = feedback_down {
1027 for cid in downs {
1028 self.update_confidence(cid, 0.0, 1.0, "user_down", &now, true)?;
1029 }
1030 }
1031
1032 if !is_fresh_insert {
1034 self.storage.patch_episodic_log_content(
1035 trace_id,
1036 query,
1037 output,
1038 output_summary,
1039 nomination,
1040 effective_priority,
1041 )?;
1042 }
1043
1044 let current_state = log
1046 .get("distill_state")
1047 .and_then(Value::as_str)
1048 .unwrap_or("open");
1049 let outcome_completed = outcome.is_some() || existing_outcome.is_some();
1050 let new_state = if current_state == "open" && outcome_completed {
1051 let has_material = output_summary.is_some()
1052 || nomination.is_some()
1053 || output.is_some()
1054 || log.get("output_summary").and_then(Value::as_str).is_some()
1055 || log.get("nomination").and_then(Value::as_str).is_some()
1056 || log.get("output").and_then(Value::as_str).is_some()
1057 || (used.map(|u| !u.is_empty()).unwrap_or(false)
1058 && outcome.map(|o| o != "unknown").unwrap_or(false));
1059 if has_material {
1060 Some("new")
1061 } else {
1062 Some("discarded")
1063 }
1064 } else {
1065 None
1066 };
1067 if let Some(state) = new_state {
1068 let note = if state == "discarded" {
1069 Some("insufficient_material")
1070 } else {
1071 None
1072 };
1073 let outcome_str = outcome.map(str::to_string);
1074 self.storage.update_episodic_log_state(
1075 trace_id,
1076 state,
1077 note,
1078 outcome_str.as_deref(),
1079 )?;
1080 } else if outcome.is_some() {
1081 let outcome_str = outcome.map(str::to_string);
1082 self.storage.update_episodic_log_state(
1083 trace_id,
1084 current_state,
1085 None,
1086 outcome_str.as_deref(),
1087 )?;
1088 }
1089
1090 self.storage.commit()
1091 })();
1092 if result.is_err() {
1093 let _ = self.storage.rollback();
1094 }
1095 result
1096 }
1097
1098 fn apply_outcome_implicit(
1099 &self,
1100 trace_id: &str,
1101 outcome: &str,
1102 used: Option<&[String]>,
1103 now: &str,
1104 ) -> Result<()> {
1105 let used_set: HashSet<&str> = used
1106 .map(|u| u.iter().map(String::as_str).collect())
1107 .unwrap_or_default();
1108 let (target, strength, reason) = if outcome == "ok" {
1109 (1.0, 0.3, "agent_used")
1110 } else {
1111 (0.0, 0.15, "task_fail")
1112 };
1113 for cid in &used_set {
1114 self.update_confidence(cid, target, strength, reason, now, false)?;
1115 }
1116 let selected_rows = self.storage.query_chunks_params(
1118 "SELECT chunk_id FROM usage_trace WHERE trace_id=? AND event='selected' AND chunk_id IS NOT NULL",
1119 rusqlite::params![trace_id],
1120 )?;
1121 for row in selected_rows {
1122 if let Some(cid) = row.get("chunk_id").and_then(Value::as_str) {
1123 if !used_set.contains(cid) {
1124 self.update_confidence(cid, 0.3, 0.1, "selected_unused", now, false)?;
1125 }
1126 }
1127 }
1128 Ok(())
1129 }
1130
1131 fn update_confidence(
1135 &self,
1136 chunk_id: &str,
1137 target: f64,
1138 strength: f64,
1139 reason: &str,
1140 now: &str,
1141 explicit: bool,
1142 ) -> Result<()> {
1143 let chunk = match self.storage.get_chunk(chunk_id)? {
1144 Some(c) => c,
1145 None => return Ok(()),
1146 };
1147 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1148 return Ok(());
1149 }
1150 let conf = chunk
1151 .get("confidence")
1152 .and_then(Value::as_f64)
1153 .unwrap_or(0.5);
1154
1155 let recency_w = if explicit {
1157 const KAPPA: f64 = 0.5;
1158 const W_DAYS: f64 = 14.0;
1159 let gap_days = chunk
1160 .get("last_used_at")
1161 .and_then(Value::as_str)
1162 .map(|t| iso_days_diff(now, t) as f64)
1163 .unwrap_or(0.0);
1164 (1.0 + KAPPA * (-(gap_days / W_DAYS) * std::f64::consts::LN_2).exp()).min(1.5)
1165 } else {
1166 1.0
1167 };
1168
1169 let alpha = 0.2_f64;
1170 let effective_alpha = (alpha * strength * recency_w).min(1.0);
1171 let new_conf = (conf + effective_alpha * (target - conf)).clamp(0.0, 1.0);
1172 self.storage
1173 .update_chunk_confidence(chunk_id, new_conf, Some(reason), now)?;
1174 Ok(())
1175 }
1176
1177 pub fn add(
1182 &self,
1183 content: &str,
1184 kind: &str,
1185 trigger_desc: Option<&str>,
1186 anti_trigger_desc: Option<&str>,
1187 source: &str,
1188 skill_name: Option<&str>,
1189 ) -> Result<String> {
1190 if !matches!(kind, "note" | "skill") {
1191 return Err(InnateError::InvalidState(format!("invalid kind: {kind}")));
1192 }
1193 if !matches!(source, "chat" | "manual" | "doc" | "agent") {
1194 return Err(InnateError::InvalidState(format!(
1195 "invalid source: {source}"
1196 )));
1197 }
1198
1199 let (content, action) = self.sanitize_content(content);
1200 if action == SanitizeAction::Discard {
1201 return Ok(String::new());
1202 }
1203
1204 let trigger_clean = trigger_desc.and_then(|t| {
1205 let (cleaned, act) = self.sanitizer.sanitize(t);
1206 if act == SanitizeAction::Discard {
1207 None
1208 } else {
1209 Some(cleaned)
1210 }
1211 });
1212 let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
1213 let (cleaned, act) = self.sanitizer.sanitize(t);
1214 if act == SanitizeAction::Discard {
1215 None
1216 } else {
1217 Some(cleaned)
1218 }
1219 });
1220
1221 let h = content_hash(&content);
1222 if self.storage.is_hash_invalidated(&h)? {
1223 return Err(InnateError::InvalidState(
1224 "content hash is invalidated".into(),
1225 ));
1226 }
1227
1228 let existing = self.storage.query_chunks_params(
1230 "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
1231 rusqlite::params![h],
1232 )?;
1233 if let Some(e) = existing.first() {
1234 if let Some(id) = e.get("id").and_then(Value::as_str) {
1235 return Ok(id.to_string());
1236 }
1237 }
1238
1239 let now = utc_now_iso();
1240 let chunk_id = gen_uuid();
1241 let redacted = action == SanitizeAction::Redact;
1242
1243 let (origin, state, conf, prot, init_state_reason) = if source == "agent" {
1244 (
1245 "captured",
1246 "pending",
1247 if redacted { 0.4 } else { 0.60 },
1248 0,
1249 "init:captured_agent",
1250 )
1251 } else if kind == "skill" {
1252 (
1253 "installed",
1254 "active",
1255 if redacted { 0.4 } else { 0.85 },
1256 1,
1257 "init:installed",
1258 )
1259 } else {
1260 (
1261 "captured",
1262 "active",
1263 if redacted { 0.4 } else { 0.60 },
1264 0,
1265 "init:captured",
1266 )
1267 };
1268
1269 let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
1271 let (cvec, tvec, embed_ver, final_state_reason) = match (
1272 self.embedding.embed_content(&content),
1273 self.embedding.embed_trigger(trigger_str),
1274 ) {
1275 (Ok(cv), Ok(tv)) => (cv, tv, 1i64, init_state_reason.to_string()),
1276 _ => (
1277 vec![],
1278 vec![],
1279 0i64,
1280 format!("embedding_pending:target={state}"),
1281 ),
1282 };
1283
1284 let tokens = estimate_tokens(&content) as i64;
1285 let row = ChunkRow {
1286 id: chunk_id.clone(),
1287 skill_name: skill_name.map(str::to_string),
1288 content: content.clone(),
1289 trigger_desc: trigger_clean.clone(),
1290 anti_trigger_desc: anti_trigger_clean.clone(),
1291 content_hash: h,
1292 token_count: Some(tokens),
1293 origin: origin.to_string(),
1294 source: Some(source.to_string()),
1295 protected: prot,
1296 state: state.to_string(),
1297 state_reason: Some(final_state_reason),
1298 confidence: conf,
1299 confidence_reason: Some(format!("init:{origin}")),
1300 version: 1,
1301 embed_version: embed_ver,
1302 created_at: now.clone(),
1303 updated_at: now.clone(),
1304 ..Default::default()
1305 };
1306
1307 self.storage.begin_immediate()?;
1308 let result = (|| -> Result<()> {
1309 self.storage.insert_chunk(&row)?;
1310 if embed_ver > 0 {
1311 self.storage
1312 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
1313 self.storage
1314 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
1315 }
1316 self.storage.commit()
1317 })();
1318 if result.is_err() {
1319 let _ = self.storage.rollback();
1320 }
1321 result?;
1322 Ok(chunk_id)
1323 }
1324
1325 pub fn spark(
1330 &self,
1331 content: &str,
1332 trigger_desc: Option<&str>,
1333 anti_trigger_desc: Option<&str>,
1334 ) -> Result<String> {
1335 let (content, action) = self.sanitize_content(content);
1336 if action == SanitizeAction::Discard {
1337 return Ok(String::new());
1338 }
1339
1340 let trigger_clean = trigger_desc.and_then(|t| {
1341 let (cleaned, act) = self.sanitizer.sanitize(t);
1342 if act == SanitizeAction::Discard {
1343 None
1344 } else {
1345 Some(cleaned)
1346 }
1347 });
1348 let anti_trigger_clean = anti_trigger_desc.and_then(|t| {
1349 let (cleaned, act) = self.sanitizer.sanitize(t);
1350 if act == SanitizeAction::Discard {
1351 None
1352 } else {
1353 Some(cleaned)
1354 }
1355 });
1356
1357 let h = content_hash(&content);
1358 if self.storage.is_hash_invalidated(&h)? {
1359 return Err(InnateError::InvalidState(
1360 "content hash is invalidated".into(),
1361 ));
1362 }
1363
1364 let related: Vec<String> = self
1366 .recall(
1367 &content,
1368 2000,
1369 false,
1370 false,
1371 Some(5),
1372 "sdk",
1373 "false",
1374 false,
1375 "off",
1376 )
1377 .map(|r| {
1378 r.knowledge
1379 .iter()
1380 .filter_map(|c| c["id"].as_str().map(str::to_string))
1381 .collect()
1382 })
1383 .unwrap_or_default();
1384
1385 let now = utc_now_iso();
1386 let chunk_id = gen_uuid();
1387 let tokens = estimate_tokens(&content) as i64;
1388
1389 let trigger_str = trigger_clean.as_deref().unwrap_or(&content);
1390 let (cvec, tvec, embed_ver, state_reason) = match (
1391 self.embedding.embed_content(&content),
1392 self.embedding.embed_trigger(trigger_str),
1393 ) {
1394 (Ok(cv), Ok(tv)) => (cv, tv, 1i64, "init:spark".to_string()),
1395 _ => (
1396 vec![],
1397 vec![],
1398 0i64,
1399 "embedding_pending:target=active".to_string(),
1400 ),
1401 };
1402
1403 let row = ChunkRow {
1404 id: chunk_id.clone(),
1405 content: content.clone(),
1406 trigger_desc: trigger_clean.clone(),
1407 anti_trigger_desc: anti_trigger_clean.clone(),
1408 content_hash: h,
1409 token_count: Some(tokens),
1410 origin: "spark".to_string(),
1411 maturity: Some("seed".to_string()),
1412 related_ids: if related.is_empty() {
1413 None
1414 } else {
1415 Some(related.join(","))
1416 },
1417 state: "active".to_string(),
1418 state_reason: Some(state_reason),
1419 confidence: 0.5,
1420 version: 1,
1421 embed_version: embed_ver,
1422 created_at: now.clone(),
1423 updated_at: now.clone(),
1424 ..Default::default()
1425 };
1426
1427 self.storage.begin_immediate()?;
1428 let result = (|| -> Result<()> {
1429 self.storage.insert_chunk(&row)?;
1430 if embed_ver > 0 {
1431 self.storage
1432 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
1433 self.storage
1434 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
1435 }
1436 self.storage.commit()
1437 })();
1438 if result.is_err() {
1439 let _ = self.storage.rollback();
1440 }
1441 result?;
1442 Ok(chunk_id)
1443 }
1444
1445 pub fn mature_spark(&self, spark_id: &str, to: &str) -> Result<()> {
1450 let chunk = self
1451 .storage
1452 .get_chunk(spark_id)?
1453 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
1454 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
1455 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
1456 }
1457 let current = chunk
1458 .get("maturity")
1459 .and_then(Value::as_str)
1460 .unwrap_or("seed");
1461 let valid_next: &[&str] = match current {
1462 "seed" => &["sprouting"],
1463 "sprouting" => &["incubating"],
1464 _ => {
1465 return Err(InnateError::InvalidState(format!(
1466 "spark {spark_id} already {current}"
1467 )))
1468 }
1469 };
1470 if current == to {
1471 return Ok(());
1472 }
1473 if !valid_next.contains(&to) {
1474 return Err(InnateError::InvalidState(format!(
1475 "invalid spark maturity transition: {current} -> {to}"
1476 )));
1477 }
1478 let now = utc_now_iso();
1479 self.storage.begin_immediate()?;
1480 let result = self
1481 .storage
1482 .query_chunks_params(
1483 "UPDATE chunks SET maturity=?, updated_at=? WHERE id=?",
1484 rusqlite::params![to, now, spark_id],
1485 )
1486 .and_then(|_| self.storage.commit());
1487 if result.is_err() {
1488 let _ = self.storage.rollback();
1489 }
1490 result.map(|_| ())
1491 }
1492
1493 pub fn promote_spark(&self, spark_id: &str, to: &str) -> Result<String> {
1494 let spark = self
1495 .storage
1496 .get_chunk(spark_id)?
1497 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
1498 if spark.get("origin").and_then(Value::as_str) != Some("spark") {
1499 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
1500 }
1501 let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
1502 if maturity == "promoted" || maturity == "dropped" {
1503 return Err(InnateError::InvalidState(format!(
1504 "spark {spark_id} already {maturity}"
1505 )));
1506 }
1507 if !matches!(to, "note" | "skill") {
1508 return Err(InnateError::InvalidState(format!(
1509 "invalid spark promotion target: {to}"
1510 )));
1511 }
1512
1513 let content = spark.get("content").and_then(Value::as_str).unwrap_or("");
1514 let (content, action) = self.sanitize_content(content);
1515 if action == SanitizeAction::Discard {
1516 return Err(InnateError::InvalidState(
1517 "sanitize discard on promote".into(),
1518 ));
1519 }
1520
1521 let promoted_hash = content_hash(&content);
1522 if self.storage.is_hash_invalidated(&promoted_hash)? {
1523 return Err(InnateError::InvalidState(
1524 "spark content hash is invalidated".into(),
1525 ));
1526 }
1527
1528 let now = utc_now_iso();
1529
1530 let existing = self.storage.query_chunks_params(
1532 "SELECT id FROM chunks WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending') ORDER BY created_at ASC LIMIT 1",
1533 rusqlite::params![promoted_hash],
1534 )?;
1535 if let Some(e) = existing.first() {
1536 if let Some(id) = e.get("id").and_then(Value::as_str) {
1537 let id = id.to_string();
1538 self.storage.query_chunks_params(
1539 "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
1540 rusqlite::params![now, spark_id],
1541 )?;
1542 self.storage.commit()?;
1543 return Ok(id);
1544 }
1545 }
1546
1547 let (state, conf, prot, origin, state_reason) = if to == "skill" {
1548 ("active", 0.85, 1, "installed", "init:installed")
1549 } else {
1550 ("active", 0.60, 0, "captured", "init:captured")
1551 };
1552
1553 let conf = if action == SanitizeAction::Redact {
1554 0.4_f64
1555 } else {
1556 conf
1557 };
1558 let new_id = gen_uuid();
1559 let trigger = spark.get("trigger_desc").and_then(Value::as_str);
1560 let anti = spark.get("anti_trigger_desc").and_then(Value::as_str);
1561
1562 let row = ChunkRow {
1563 id: new_id.clone(),
1564 content: content.clone(),
1565 trigger_desc: trigger.map(str::to_string),
1566 anti_trigger_desc: anti.map(str::to_string),
1567 content_hash: promoted_hash,
1568 token_count: Some(estimate_tokens(&content) as i64),
1569 origin: origin.to_string(),
1570 source: Some("manual".to_string()),
1571 protected: prot,
1572 state: state.to_string(),
1573 state_reason: Some(state_reason.to_string()),
1574 confidence: conf,
1575 confidence_reason: Some("manual_set".to_string()),
1576 parent_id: Some(spark_id.to_string()),
1577 version: 1,
1578 embed_version: 1,
1579 created_at: now.clone(),
1580 updated_at: now.clone(),
1581 ..Default::default()
1582 };
1583
1584 let cvec = self.embedding.embed_content(&content)?;
1585 let tvec = self.embedding.embed_trigger(trigger.unwrap_or(&content))?;
1586
1587 self.storage.begin_immediate()?;
1588 let result = (|| -> Result<()> {
1589 self.storage.insert_chunk(&row)?;
1590 self.storage
1591 .insert_vec_content(&new_id, &pack_embedding(&cvec))?;
1592 self.storage
1593 .insert_vec_trigger(&new_id, &pack_embedding(&tvec))?;
1594 self.storage.query_chunks_params(
1595 "UPDATE chunks SET maturity='promoted', updated_at=? WHERE id=?",
1596 rusqlite::params![now, spark_id],
1597 )?;
1598 self.storage.commit()
1599 })();
1600 if result.is_err() {
1601 let _ = self.storage.rollback();
1602 }
1603 result?;
1604 Ok(new_id)
1605 }
1606
1607 pub fn drop_spark(&self, spark_id: &str, reason: &str) -> Result<()> {
1608 let spark = self
1609 .storage
1610 .get_chunk(spark_id)?
1611 .ok_or_else(|| InnateError::ChunkNotFound(spark_id.to_string()))?;
1612 if spark.get("origin").and_then(Value::as_str) != Some("spark") {
1613 return Err(InnateError::ChunkNotFound(spark_id.to_string()));
1614 }
1615 let maturity = spark.get("maturity").and_then(Value::as_str).unwrap_or("");
1616 if maturity == "promoted" {
1617 return Err(InnateError::InvalidState(format!(
1618 "spark {spark_id} already promoted"
1619 )));
1620 }
1621 if maturity == "dropped" {
1622 return Ok(());
1623 }
1624 let now = utc_now_iso();
1625 let reason_str = if reason.is_empty() {
1626 "dropped".to_string()
1627 } else {
1628 format!("dropped:{reason}")
1629 };
1630 self.storage.begin_immediate()?;
1631 let result = self
1632 .storage
1633 .query_chunks_params(
1634 "UPDATE chunks SET maturity='dropped', state_reason=?, updated_at=? WHERE id=?",
1635 rusqlite::params![reason_str, now, spark_id],
1636 )
1637 .and_then(|_| self.storage.commit());
1638 if result.is_err() {
1639 let _ = self.storage.rollback();
1640 }
1641 result.map(|_| ())
1642 }
1643
1644 pub fn approve(&self, chunk_id: &str) -> Result<()> {
1649 let chunk = self
1650 .storage
1651 .get_chunk(chunk_id)?
1652 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1653 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1654 return Err(InnateError::InvalidState(
1655 "spark lifecycle uses promote_spark() or invalidate()".into(),
1656 ));
1657 }
1658 if chunk.get("state").and_then(Value::as_str) == Some("active") {
1659 return Ok(());
1660 }
1661 if chunk.get("state").and_then(Value::as_str) != Some("pending") {
1662 return Err(InnateError::InvalidState(
1663 "approve requires pending chunk".into(),
1664 ));
1665 }
1666 let now = utc_now_iso();
1667 self.storage.begin_immediate()?;
1668 let result = (|| -> Result<()> {
1669 self.storage
1670 .update_chunk_state(chunk_id, "active", Some("approved"), &now)?;
1671 self.storage.query_chunks_params(
1672 "UPDATE chunks SET confidence_reason='manual_set', updated_at=? WHERE id=?",
1673 rusqlite::params![now, chunk_id],
1674 )?;
1675 self.storage.commit()
1676 })();
1677 if result.is_err() {
1678 let _ = self.storage.rollback();
1679 }
1680 result
1681 }
1682
1683 pub fn archive(&self, chunk_id: &str, reason: &str) -> Result<()> {
1684 let chunk = self
1685 .storage
1686 .get_chunk(chunk_id)?
1687 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1688 if chunk.get("origin").and_then(Value::as_str) == Some("spark") {
1689 return Err(InnateError::InvalidState(
1690 "spark lifecycle uses drop_spark() or invalidate()".into(),
1691 ));
1692 }
1693 let now = utc_now_iso();
1694 self.storage.begin_immediate()?;
1695 let result = self
1696 .storage
1697 .update_chunk_state(chunk_id, "archived", Some(reason), &now)
1698 .and_then(|_| self.storage.commit());
1699 if result.is_err() {
1700 let _ = self.storage.rollback();
1701 }
1702 result
1703 }
1704
1705 pub fn invalidate(&self, chunk_id: &str, reason: &str) -> Result<()> {
1706 let chunk = self
1707 .storage
1708 .get_chunk(chunk_id)?
1709 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1710 let h = chunk
1711 .get("content_hash")
1712 .and_then(Value::as_str)
1713 .unwrap_or("")
1714 .to_string();
1715 let now = utc_now_iso();
1716 let reason_str = if reason.is_empty() {
1717 "invalidated".to_string()
1718 } else {
1719 format!("invalidated:{reason}")
1720 };
1721
1722 self.storage.begin_immediate()?;
1723 let result = (|| -> Result<()> {
1724 self.storage.query_chunks_params(
1725 "UPDATE chunks SET state='archived', confidence=0.0, state_reason=?, state_updated_at=?, updated_at=? WHERE id=?",
1726 rusqlite::params![reason_str, now, now, chunk_id],
1727 )?;
1728 self.storage.query_chunks_params(
1729 "UPDATE chunks SET state='archived', confidence=0.0, state_reason='invalidated:same_hash', state_updated_at=?, updated_at=? WHERE content_hash=? AND id!=?",
1730 rusqlite::params![now, now, h, chunk_id],
1731 )?;
1732 self.storage
1733 .insert_invalidated_hash(&h, Some(reason), &now)?;
1734 self.storage.commit()
1735 })();
1736 if result.is_err() {
1737 let _ = self.storage.rollback();
1738 }
1739 result
1740 }
1741
1742 pub fn restore(&self, chunk_id: &str) -> Result<()> {
1743 let chunk = self
1744 .storage
1745 .get_chunk(chunk_id)?
1746 .ok_or_else(|| InnateError::ChunkNotFound(chunk_id.to_string()))?;
1747 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
1748 if state == "active" {
1749 return Ok(());
1750 }
1751 if state != "archived" {
1752 return Err(InnateError::InvalidState(
1753 "restore requires archived chunk".into(),
1754 ));
1755 }
1756 let was_invalidated = chunk
1757 .get("state_reason")
1758 .and_then(Value::as_str)
1759 .map(|r| r.starts_with("invalidated"))
1760 .unwrap_or(false);
1761 let h = chunk
1762 .get("content_hash")
1763 .and_then(Value::as_str)
1764 .unwrap_or("")
1765 .to_string();
1766 let now = utc_now_iso();
1767
1768 self.storage.begin_immediate()?;
1769 let result = (|| -> Result<()> {
1770 self.storage
1771 .update_chunk_state(chunk_id, "active", Some("restore"), &now)?;
1772 if was_invalidated {
1773 self.storage.query_chunks_params(
1774 "DELETE FROM invalidated_hashes WHERE content_hash=?",
1775 rusqlite::params![h],
1776 )?;
1777 }
1778 self.storage.query_chunks_params(
1779 "UPDATE chunks SET confidence_reason='restore', updated_at=? WHERE id=?",
1780 rusqlite::params![now, chunk_id],
1781 )?;
1782 self.storage.commit()
1783 })();
1784 if result.is_err() {
1785 let _ = self.storage.rollback();
1786 }
1787 result
1788 }
1789
1790 pub fn evolve(&self, trigger: &str) -> Result<Value> {
1795 if !matches!(trigger, "manual" | "scheduled" | "threshold") {
1796 return Err(InnateError::InvalidState(format!(
1797 "invalid evolve trigger: {trigger}"
1798 )));
1799 }
1800
1801 if trigger == "threshold" {
1803 let rows = self.storage.query_chunks(
1804 "SELECT COUNT(*) AS cnt FROM episodic_log WHERE distill_state='new'",
1805 )?;
1806 let cnt = rows
1807 .first()
1808 .and_then(|r| r.get("cnt"))
1809 .and_then(Value::as_i64)
1810 .unwrap_or(0);
1811 if cnt < self.evolve_threshold {
1812 return Ok(json!({"distilled": 0, "curate": null}));
1813 }
1814
1815 if let Some(limit) = self
1816 .storage
1817 .get_meta("max_distill_tokens_per_period")?
1818 .and_then(|value| value.parse::<i64>().ok())
1819 .filter(|value| *value > 0)
1820 {
1821 let rows = self.storage.query_chunks(
1822 "SELECT COALESCE(SUM(distill_prompt_tokens),0)
1823 + COALESCE(SUM(distill_completion_tokens),0) AS used
1824 FROM episodic_log",
1825 )?;
1826 let used = rows
1827 .first()
1828 .and_then(|row| row.get("used"))
1829 .and_then(Value::as_i64)
1830 .unwrap_or(0);
1831 if used >= limit {
1832 return Ok(json!({
1833 "distilled": 0,
1834 "curate": null,
1835 "skipped": "distill_token_limit",
1836 "distill_tokens_used": used,
1837 "distill_token_limit": limit,
1838 }));
1839 }
1840 }
1841 }
1842
1843 let distilled = self.distill_batch()?;
1844 let curator = Arc::clone(&self.curator);
1845 let curate = curator.run(self, &CurateScope::default())?;
1846
1847 Ok(json!({
1848 "distilled": distilled,
1849 "curate": {
1850 "archived": curate.archived.len(),
1851 "deduped": curate.deduped.len(),
1852 "decayed": curate.decayed.len(),
1853 "recovered": curate.recovered.len(),
1854 "orphans": curate.orphans.len(),
1855 "warnings": curate.warnings,
1856 }
1857 }))
1858 }
1859
1860 fn distill_batch(&self) -> Result<usize> {
1861 let run_id = gen_uuid();
1862 let now = utc_now_iso();
1863
1864 self.storage.begin_immediate()?;
1866 let logs = match self
1867 .storage
1868 .claim_distill_batch(&run_id, self.distill_batch_size, &now)
1869 {
1870 Ok(l) => {
1871 self.storage.commit()?;
1872 l
1873 }
1874 Err(e) => {
1875 let _ = self.storage.rollback();
1876 return Err(e);
1877 }
1878 };
1879
1880 let mut count = 0;
1881 for log in &logs {
1882 let log_id = log.get("id").and_then(Value::as_str).unwrap_or("");
1883 let prompt_tokens = estimate_distill_prompt_tokens(log);
1884 self.storage
1885 .update_episodic_log_tokens(log_id, prompt_tokens, 0)?;
1886 let chunks = match self.distiller.distill(std::slice::from_ref(log)) {
1887 Ok(chunks) => chunks,
1888 Err(error) => {
1889 let note = format!("distill_failed:{error}");
1890 self.storage.update_episodic_log_state_by_id(
1891 log_id,
1892 "failed",
1893 Some(¬e),
1894 None,
1895 )?;
1896 continue;
1897 }
1898 };
1899 let completion_tokens = chunks
1900 .iter()
1901 .map(estimate_distilled_chunk_tokens)
1902 .sum::<i64>();
1903 self.storage
1904 .update_episodic_log_tokens(log_id, prompt_tokens, completion_tokens)?;
1905 if chunks.is_empty() {
1906 let _ = self.storage.update_episodic_log_state_by_id(
1908 log_id,
1909 "discarded",
1910 Some("insufficient_material"),
1911 None,
1912 );
1913 continue;
1914 }
1915 let mut log_written = false;
1916 for dc in chunks {
1917 let (content, action) = self.sanitize_content(&dc.content);
1918 if action == SanitizeAction::Discard {
1919 let _ = self.storage.update_episodic_log_state_by_id(
1920 log_id,
1921 "discarded",
1922 Some("sanitize_discard"),
1923 None,
1924 );
1925 continue;
1926 }
1927 let h = content_hash(&content);
1928 if self.storage.is_hash_invalidated(&h)? {
1929 let _ = self.storage.update_episodic_log_state_by_id(
1930 log_id,
1931 "discarded",
1932 Some("invalidated_hash"),
1933 None,
1934 );
1935 continue;
1936 }
1937 let redacted = action == SanitizeAction::Redact;
1938 let conf = if redacted { 0.4 } else { 0.45 };
1939 let now2 = utc_now_iso();
1940 let chunk_id = gen_uuid();
1941 let tokens = estimate_tokens(&content) as i64;
1942 let row = ChunkRow {
1943 id: chunk_id.clone(),
1944 content: content.clone(),
1945 trigger_desc: dc.trigger_desc,
1946 anti_trigger_desc: dc.anti_trigger_desc,
1947 content_hash: h,
1948 token_count: Some(tokens),
1949 origin: "distilled".to_string(),
1950 distilled_from: Some(dc.source_log_id),
1951 state: "pending".to_string(),
1952 state_reason: Some("init:distilled".to_string()),
1953 confidence: conf,
1954 confidence_reason: Some("init:distilled".to_string()),
1955 version: 1,
1956 embed_version: 1,
1957 created_at: now2.clone(),
1958 updated_at: now2.clone(),
1959 ..Default::default()
1960 };
1961 let cvec = match self.embedding.embed_content(&content) {
1962 Ok(v) => v,
1963 Err(_) => {
1964 let _ = self.storage.update_episodic_log_state_by_id(
1966 log_id,
1967 "failed",
1968 Some("embedding_failed"),
1969 None,
1970 );
1971 continue; }
1973 };
1974 let tvec = match self
1975 .embedding
1976 .embed_trigger(row.trigger_desc.as_deref().unwrap_or(&content))
1977 {
1978 Ok(v) => v,
1979 Err(_) => {
1980 let _ = self.storage.update_episodic_log_state_by_id(
1981 log_id,
1982 "failed",
1983 Some("embedding_failed"),
1984 None,
1985 );
1986 continue;
1987 }
1988 };
1989 self.storage.begin_immediate()?;
1990 let r = (|| -> Result<()> {
1991 self.storage.insert_chunk(&row)?;
1992 self.storage
1993 .insert_vec_content(&chunk_id, &pack_embedding(&cvec))?;
1994 self.storage
1995 .insert_vec_trigger(&chunk_id, &pack_embedding(&tvec))?;
1996 self.storage.commit()
1997 })();
1998 if r.is_err() {
1999 let _ = self.storage.rollback();
2000 r?;
2001 }
2002 count += 1;
2003 log_written = true;
2004 }
2005 if log_written {
2006 let _ =
2007 self.storage
2008 .update_episodic_log_state_by_id(log_id, "distilled", None, None);
2009 }
2010 }
2011 Ok(count)
2012 }
2013
2014 pub(crate) fn builtin_curate_impl(&self, scope: &CurateScope) -> Result<CurateReport> {
2015 let mut report = CurateReport::default();
2016 let now_iso = utc_now_iso();
2017 if scope.dry_run {
2018 let archived_count: i64 = count_query(&self.storage,
2020 "SELECT COUNT(*) FROM chunks WHERE origin!='spark' AND protected=0 AND state='active'")?;
2021 report.stats.insert("dry_run".to_string(), json!(true));
2022 report
2023 .stats
2024 .insert("eligible_for_governance".to_string(), json!(archived_count));
2025 return Ok(report);
2026 }
2027
2028 self.storage.begin_immediate()?;
2030 let agg_result = (|| -> Result<()> {
2031 let last_ts = self
2032 .storage
2033 .get_meta("last_agg_ts")?
2034 .unwrap_or_else(|| "1970-01-01T00:00:00.000Z".to_string());
2035 let cutoff_ts = now_iso.clone();
2036
2037 self.storage.conn_execute(
2039 "INSERT OR IGNORE INTO chunk_success_traces(chunk_id, trace_id, ts)
2040 SELECT ut.chunk_id, ut.trace_id, MAX(ut.ts)
2041 FROM usage_trace ut
2042 WHERE ut.event = 'used'
2043 AND ut.chunk_id IS NOT NULL
2044 AND ut.ts >= ? AND ut.ts < ?
2045 AND (
2046 EXISTS (SELECT 1 FROM usage_trace ok
2047 WHERE ok.trace_id = ut.trace_id
2048 AND ok.event = 'task_ok' AND ok.chunk_id IS NULL)
2049 OR EXISTS (SELECT 1 FROM episodic_log el
2050 WHERE el.trace_id = ut.trace_id AND el.outcome = 'ok')
2051 )
2052 GROUP BY ut.chunk_id, ut.trace_id",
2053 rusqlite::params![last_ts, cutoff_ts],
2054 )?;
2055
2056 self.storage.conn_execute(
2058 "UPDATE chunks SET
2059 used_success_count = (SELECT COUNT(*) FROM chunk_success_traces WHERE chunk_id = chunks.id),
2060 success_trace_ids_count = (SELECT COUNT(*) FROM chunk_success_traces WHERE chunk_id = chunks.id),
2061 last_success_at = (SELECT MAX(ts) FROM chunk_success_traces WHERE chunk_id = chunks.id)
2062 WHERE id IN (SELECT DISTINCT chunk_id FROM chunk_success_traces)",
2063 rusqlite::params![],
2064 )?;
2065
2066 self.storage.conn_execute(
2068 "UPDATE chunks SET
2069 selected_count = selected_count + COALESCE(
2070 (SELECT COUNT(*) FROM usage_trace
2071 WHERE chunk_id = chunks.id AND event = 'selected'
2072 AND ts >= ? AND ts < ?), 0),
2073 used_count = used_count + COALESCE(
2074 (SELECT COUNT(*) FROM usage_trace
2075 WHERE chunk_id = chunks.id AND event = 'used'
2076 AND ts >= ? AND ts < ?), 0)
2077 WHERE id IN (SELECT DISTINCT chunk_id FROM usage_trace
2078 WHERE ts >= ? AND ts < ?)",
2079 rusqlite::params![last_ts, cutoff_ts, last_ts, cutoff_ts, last_ts, cutoff_ts],
2080 )?;
2081
2082 self.storage.set_meta("last_agg_ts", &cutoff_ts)?;
2084 self.storage.purge_usage_trace(&cutoff_ts)?;
2085 self.storage.commit()
2086 })();
2087 if agg_result.is_err() {
2088 let _ = self.storage.rollback();
2089 agg_result?;
2090 }
2091
2092 self.storage.begin_immediate()?;
2094 let recover_result = (|| -> Result<()> {
2095 let screening_cutoff = minutes_ago(&now_iso, self.screening_timeout_minutes);
2097 let stale = self.storage.query_chunks_params(
2098 "SELECT id, distill_run_id FROM episodic_log
2099 WHERE distill_state='screening' AND distill_locked_at < ?",
2100 rusqlite::params![screening_cutoff],
2101 )?;
2102 for row in &stale {
2103 let id = row.get("id").and_then(Value::as_str).unwrap_or("");
2104 let run_id = row
2105 .get("distill_run_id")
2106 .and_then(Value::as_str)
2107 .unwrap_or("unknown");
2108 let note = format!("screening_timeout:{run_id}");
2109 self.storage.conn_execute(
2110 "UPDATE episodic_log
2111 SET distill_state='failed', distill_note=?,
2112 distill_run_id=NULL, distill_locked_at=NULL
2113 WHERE id=?",
2114 rusqlite::params![note, id],
2115 )?;
2116 report.recovered.push(id.to_string());
2117 report
2118 .warnings
2119 .push(format!("stale screening recovered as failed: {id}"));
2120 }
2121
2122 let open_ttl_cutoff = days_ago(&now_iso, self.open_ttl_days);
2124 self.storage.conn_execute(
2125 "UPDATE episodic_log
2126 SET distill_state='discarded', distill_note='no_record_timeout'
2127 WHERE distill_state='open' AND ts < ?",
2128 rusqlite::params![open_ttl_cutoff],
2129 )?;
2130 self.storage.commit()
2131 })();
2132 if recover_result.is_err() {
2133 let _ = self.storage.rollback();
2134 recover_result?;
2135 }
2136
2137 let scope_origin = scope.origin.clone();
2139 let scope_skill = scope.skill_name.clone();
2140 self.storage.begin_immediate()?;
2141 let gov_result = (|| -> Result<()> {
2142 let low_conf_cutoff = days_ago(&now_iso, self.low_conf_idle_days);
2144 let low_conf = self.storage.query_chunks_params(
2145 "SELECT id FROM chunks
2146 WHERE origin!='spark' AND protected=0 AND state='active'
2147 AND last_used_at IS NOT NULL
2148 AND confidence < ?
2149 AND last_used_at < ?
2150 AND (? IS NULL OR origin=?)
2151 AND (? IS NULL OR skill_name=?)",
2152 rusqlite::params![
2153 self.low_conf_threshold,
2154 low_conf_cutoff,
2155 scope_origin,
2156 scope_origin,
2157 scope_skill,
2158 scope_skill
2159 ],
2160 )?;
2161 for c in &low_conf {
2162 if let Some(id) = c.get("id").and_then(Value::as_str) {
2163 self.storage.update_chunk_state(
2164 id,
2165 "archived",
2166 Some("low_confidence"),
2167 &now_iso,
2168 )?;
2169 report.archived.push(id.to_string());
2170 }
2171 }
2172
2173 let rep_sel = self.storage.query_chunks_params(
2175 "SELECT id FROM chunks
2176 WHERE origin!='spark' AND protected=0 AND state='active'
2177 AND selected_count >= ? AND used_count = 0 AND confidence < ?
2178 AND (? IS NULL OR origin=?)
2179 AND (? IS NULL OR skill_name=?)",
2180 rusqlite::params![
2181 self.repeat_select_min,
2182 self.repeat_select_conf_max,
2183 scope_origin,
2184 scope_origin,
2185 scope_skill,
2186 scope_skill
2187 ],
2188 )?;
2189 for c in &rep_sel {
2190 if let Some(id) = c.get("id").and_then(Value::as_str) {
2191 if !report.archived.contains(&id.to_string()) {
2192 self.storage.update_chunk_state(
2193 id,
2194 "archived",
2195 Some("repeated_selected_unused"),
2196 &now_iso,
2197 )?;
2198 report.archived.push(id.to_string());
2199 }
2200 }
2201 }
2202
2203 let never_used_cutoff = days_ago(&now_iso, self.never_used_age_days);
2205 let never_used = self.storage.query_chunks_params(
2206 "SELECT id FROM chunks
2207 WHERE origin!='spark' AND protected=0 AND state='active'
2208 AND used_count = 0 AND selected_count = 0
2209 AND created_at < ?
2210 AND (? IS NULL OR origin=?)
2211 AND (? IS NULL OR skill_name=?)",
2212 rusqlite::params![
2213 never_used_cutoff,
2214 scope_origin,
2215 scope_origin,
2216 scope_skill,
2217 scope_skill
2218 ],
2219 )?;
2220 for c in &never_used {
2221 if let Some(id) = c.get("id").and_then(Value::as_str) {
2222 if !report.archived.contains(&id.to_string()) {
2223 self.storage.update_chunk_state(
2224 id,
2225 "archived",
2226 Some("never_used"),
2227 &now_iso,
2228 )?;
2229 report.archived.push(id.to_string());
2230 }
2231 }
2232 }
2233
2234 let dupes = self.storage.query_chunks_params(
2236 "SELECT content_hash FROM chunks
2237 WHERE origin!='spark' AND state IN ('active','pending')
2238 AND (? IS NULL OR origin=?)
2239 AND (? IS NULL OR skill_name=?)
2240 GROUP BY content_hash HAVING COUNT(*) > 1",
2241 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
2242 )?;
2243 for d in &dupes {
2244 if let Some(h) = d.get("content_hash").and_then(Value::as_str) {
2245 let group = self.storage.query_chunks_params(
2246 "SELECT id, confidence, protected FROM chunks
2247 WHERE content_hash=? AND origin!='spark' AND state IN ('active','pending')
2248 AND (? IS NULL OR origin=?)
2249 AND (? IS NULL OR skill_name=?)
2250 ORDER BY protected DESC, confidence DESC",
2251 rusqlite::params![h, scope_origin, scope_origin, scope_skill, scope_skill],
2252 )?;
2253 let canonical_id = group
2254 .first()
2255 .and_then(|row| row.get("id"))
2256 .and_then(Value::as_str)
2257 .unwrap_or("");
2258 for row in group.iter().skip(1) {
2259 let id = row.get("id").and_then(Value::as_str).unwrap_or("");
2260 let reason = format!("duplicate:{canonical_id}");
2261 self.storage
2262 .update_chunk_state(id, "archived", Some(&reason), &now_iso)?;
2263 self.storage.conn_execute(
2264 "UPDATE chunks SET parent_id=?, updated_at=? WHERE id=?",
2265 rusqlite::params![canonical_id, now_iso, id],
2266 )?;
2267 report.deduped.push(id.to_string());
2268 }
2269 }
2270 }
2271
2272 let decay_candidates = self.storage.query_chunks_params(
2274 "SELECT id, confidence, last_used_at FROM chunks
2275 WHERE origin!='spark' AND protected=0 AND state='active'
2276 AND last_used_at IS NOT NULL
2277 AND (? IS NULL OR origin=?)
2278 AND (? IS NULL OR skill_name=?)",
2279 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
2280 )?;
2281 for c in &decay_candidates {
2282 let id = match c.get("id").and_then(Value::as_str) {
2283 Some(v) => v,
2284 None => continue,
2285 };
2286 let conf = c.get("confidence").and_then(Value::as_f64).unwrap_or(0.5);
2287 let last_used = c
2288 .get("last_used_at")
2289 .and_then(Value::as_str)
2290 .unwrap_or(&now_iso);
2291 let days_idle = iso_days_diff(&now_iso, last_used);
2292 if days_idle <= 0 {
2293 continue;
2294 }
2295 let floor = 0.3_f64;
2296 let new_conf = floor + (conf - floor) * 0.5_f64.powf(days_idle as f64 / 90.0);
2297 let new_conf = new_conf.clamp(floor, 1.0);
2298 if (new_conf - conf).abs() > 0.001 {
2299 let note = format!("decay:{days_idle}d");
2300 self.storage
2301 .update_chunk_confidence(id, new_conf, Some(¬e), &now_iso)?;
2302 report.decayed.push(id.to_string());
2303 }
2304 }
2305
2306 let promotable = self.storage.query_chunks_params(
2308 "SELECT id FROM chunks
2309 WHERE state='pending' AND origin!='spark'
2310 AND used_success_count >= ?
2311 AND success_trace_ids_count >= 2
2312 AND confidence >= ?
2313 AND (? IS NULL OR origin=?)
2314 AND (? IS NULL OR skill_name=?)",
2315 rusqlite::params![
2316 self.promote_used_success_min,
2317 self.promote_confidence_min,
2318 scope_origin,
2319 scope_origin,
2320 scope_skill,
2321 scope_skill
2322 ],
2323 )?;
2324 for c in &promotable {
2325 if let Some(id) = c.get("id").and_then(Value::as_str) {
2326 self.storage.update_chunk_state(
2327 id,
2328 "active",
2329 Some("repeated_success"),
2330 &now_iso,
2331 )?;
2332 }
2333 }
2334
2335 let all_deps = self
2337 .storage
2338 .query_chunks("SELECT src, dst FROM deps WHERE kind='hard'")?;
2339 let cycles = detect_cycles(&all_deps);
2340 report.cycles = cycles;
2341 let orphan_rows = self.storage.query_chunks_params(
2342 "SELECT d.src, d.dst, s.id AS src_exists, t.id AS dst_exists
2343 FROM deps d
2344 LEFT JOIN chunks s ON s.id=d.src
2345 LEFT JOIN chunks t ON t.id=d.dst
2346 WHERE d.kind='hard'
2347 AND (? IS NULL OR s.origin=?)
2348 AND (? IS NULL OR s.skill_name=?)",
2349 rusqlite::params![scope_origin, scope_origin, scope_skill, scope_skill],
2350 )?;
2351 let mut orphans = HashSet::new();
2352 for row in orphan_rows {
2353 if row.get("src_exists").is_none_or(Value::is_null) {
2354 if let Some(id) = row.get("src").and_then(Value::as_str) {
2355 orphans.insert(id.to_string());
2356 }
2357 }
2358 if row.get("dst_exists").is_none_or(Value::is_null) {
2359 if let Some(id) = row.get("dst").and_then(Value::as_str) {
2360 orphans.insert(id.to_string());
2361 }
2362 }
2363 }
2364 report.orphans = orphans.into_iter().collect();
2365 report.orphans.sort();
2366
2367 self.storage.commit()
2368 })();
2369 if gov_result.is_err() {
2370 let _ = self.storage.rollback();
2371 gov_result?;
2372 }
2373
2374 self.storage.begin_immediate()?;
2376 let purge_cutoff = days_ago(&now_iso, 30);
2377 let purge_result = self
2378 .storage
2379 .conn_execute(
2380 "DELETE FROM episodic_log
2381 WHERE distill_state IN ('distilled','discarded','failed')
2382 AND ts < ?",
2383 rusqlite::params![purge_cutoff],
2384 )
2385 .and_then(|_| self.storage.commit());
2386 if purge_result.is_err() {
2387 let _ = self.storage.rollback();
2388 purge_result?;
2389 }
2390
2391 Ok(report)
2392 }
2393
2394 pub fn inspect(&self) -> Result<Value> {
2399 let total: i64 = count_query(
2400 &self.storage,
2401 "SELECT COUNT(*) FROM chunks WHERE origin!='spark'",
2402 )?;
2403 let active: i64 = count_query(
2404 &self.storage,
2405 "SELECT COUNT(*) FROM chunks WHERE state='active' AND origin!='spark'",
2406 )?;
2407 let pending: i64 = count_query(
2408 &self.storage,
2409 "SELECT COUNT(*) FROM chunks WHERE state='pending' AND origin!='spark'",
2410 )?;
2411 let archived: i64 = count_query(
2412 &self.storage,
2413 "SELECT COUNT(*) FROM chunks WHERE state='archived' AND origin!='spark'",
2414 )?;
2415 let sparks: i64 = count_query(
2416 &self.storage,
2417 "SELECT COUNT(*) FROM chunks WHERE origin='spark' AND state!='archived'",
2418 )?;
2419 let open_logs: i64 = count_query(
2420 &self.storage,
2421 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='open'",
2422 )?;
2423 let new_logs: i64 = count_query(
2424 &self.storage,
2425 "SELECT COUNT(*) FROM episodic_log WHERE distill_state='new'",
2426 )?;
2427 let embed_rebuild: i64 = count_query(&self.storage,
2428 "SELECT COUNT(*) FROM chunks WHERE embed_version=0 OR embed_version < (SELECT COALESCE(CAST(value AS INTEGER),1) FROM meta WHERE key='embed_version')")?;
2429 let schema_version = self.storage.get_meta_or("schema_version", "?");
2430 let lib_id = self.storage.get_meta_or("lib_id", "?");
2431 let last_agg = self.storage.get_meta_or("last_agg_ts", "never");
2432
2433 let zombie: i64 = count_query(
2436 &self.storage,
2437 "SELECT COUNT(*) FROM chunks
2438 WHERE origin!='spark' AND state='active'
2439 AND confidence >= 0.4 AND confidence <= 0.6
2440 AND created_at < datetime('now','-7 days')",
2441 )?;
2442 let debt_numerator = pending + zombie;
2443 let debt_denominator = active.max(1);
2444 let debt_ratio = debt_numerator as f64 / debt_denominator as f64;
2445
2446 let screening_cutoff = minutes_ago(&utc_now_iso(), self.screening_timeout_minutes);
2448 let stale_screening: i64 = count_query_params(
2449 &self.storage,
2450 "SELECT COUNT(*) FROM episodic_log
2451 WHERE distill_state='screening' AND distill_locked_at < ?",
2452 rusqlite::params![screening_cutoff],
2453 )?;
2454
2455 let distill_cost = self.storage.query_chunks(
2457 "SELECT COALESCE(SUM(distill_prompt_tokens),0) AS pt,
2458 COALESCE(SUM(distill_completion_tokens),0) AS ct
2459 FROM episodic_log",
2460 )?;
2461 let prompt_tokens = distill_cost
2462 .first()
2463 .and_then(|r| r.get("pt"))
2464 .and_then(Value::as_i64)
2465 .unwrap_or(0);
2466 let completion_tokens = distill_cost
2467 .first()
2468 .and_then(|r| r.get("ct"))
2469 .and_then(Value::as_i64)
2470 .unwrap_or(0);
2471
2472 let spark_threshold: i64 = self
2474 .storage
2475 .get_meta("curate.soft_mature_threshold")
2476 .ok()
2477 .flatten()
2478 .and_then(|v| v.parse::<i64>().ok())
2479 .unwrap_or(5);
2480 let recurring_sparks = self.storage.query_chunks_params(
2481 "SELECT ut.chunk_id, COUNT(*) AS cnt,
2482 c.content, c.trigger_desc, c.maturity
2483 FROM usage_trace ut
2484 JOIN chunks c ON c.id = ut.chunk_id
2485 WHERE ut.event='retrieved'
2486 AND c.origin='spark'
2487 GROUP BY ut.chunk_id HAVING cnt >= ?",
2488 rusqlite::params![spark_threshold],
2489 )?;
2490 let recurring_spark_ids: Vec<Value> = recurring_sparks
2491 .iter()
2492 .map(|r| {
2493 json!({
2494 "id": r.get("chunk_id").and_then(Value::as_str).unwrap_or(""),
2495 "retrieved_count": r.get("cnt").and_then(Value::as_i64).unwrap_or(0),
2496 "maturity": r.get("maturity").and_then(Value::as_str).unwrap_or(""),
2497 "content_preview": r.get("content").and_then(Value::as_str).unwrap_or("")
2498 .chars().take(80).collect::<String>(),
2499 })
2500 })
2501 .collect();
2502
2503 let mut suggestions: Vec<Value> = Vec::new();
2504 if embed_rebuild > 0 {
2505 suggestions.push(json!({"action": "innate evolve --rebuild-embeddings", "reason": format!("{embed_rebuild} chunk(s) missing embeddings")}));
2506 }
2507 if new_logs > 0 {
2508 suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{new_logs} episodic log(s) ready to distill")}));
2509 }
2510 if pending > 0 {
2511 suggestions.push(json!({"action": "innate approve <id> # or innate archive <id>", "reason": format!("{pending} pending chunk(s) awaiting review")}));
2512 }
2513 if !recurring_spark_ids.is_empty() {
2514 suggestions.push(json!({"action": "innate promote-spark <id> --to note", "reason": format!("{} spark(s) recalled ≥{spark_threshold}× — consider promoting", recurring_spark_ids.len())}));
2515 }
2516 if stale_screening > 0 {
2517 suggestions.push(json!({"action": "innate evolve --trigger manual", "reason": format!("{stale_screening} episodic log(s) stuck in screening")}));
2518 }
2519
2520 Ok(json!({
2521 "schema_version": schema_version,
2522 "lib_id": lib_id,
2523 "last_agg_ts": last_agg,
2524 "chunks": {"total": total, "active": active, "pending": pending, "archived": archived},
2525 "sparks": sparks,
2526 "episodic_log": {"open": open_logs, "new": new_logs},
2527 "embed_rebuild_queue": embed_rebuild,
2528 "knowledge_debt_ratio": (debt_ratio * 100.0).round() / 100.0,
2529 "stale_screening_count": stale_screening,
2530 "distill_cost_estimate": {"prompt_tokens": prompt_tokens, "completion_tokens": completion_tokens},
2531 "recurring_sparks": recurring_sparks.len(),
2532 "recurring_spark_ids": recurring_spark_ids,
2533 "params": {
2534 "recall.w_content": self.w_content,
2535 "recall.w_trigger": self.w_trigger,
2536 "recall.top_k_candidates": self.top_k_candidates,
2537 "curate.low_conf_threshold": self.low_conf_threshold,
2538 "curate.low_conf_idle_days": self.low_conf_idle_days,
2539 "curate.repeat_select_min": self.repeat_select_min,
2540 "curate.never_used_age_days": self.never_used_age_days,
2541 "curate.promote_used_success_min": self.promote_used_success_min,
2542 "curate.promote_confidence_min": self.promote_confidence_min,
2543 "curate.screening_timeout_minutes": self.screening_timeout_minutes,
2544 "curate.open_ttl_days": self.open_ttl_days,
2545 },
2546 "suggestions": suggestions
2547 }))
2548 }
2549
2550 pub fn rebuild_embeddings(&self) -> Result<usize> {
2555 let meta_version = self
2556 .storage
2557 .get_meta("embed_version")?
2558 .and_then(|v| v.parse::<i64>().ok())
2559 .unwrap_or(1);
2560 let stale = self.storage.query_chunks_params(
2562 "SELECT id, content, trigger_desc, state_reason FROM chunks
2563 WHERE embed_version = 0 OR embed_version < ?",
2564 rusqlite::params![meta_version],
2565 )?;
2566 let mut count = 0;
2567 for row in &stale {
2568 let id = match row.get("id").and_then(Value::as_str) {
2569 Some(v) => v,
2570 None => continue,
2571 };
2572 let content = row.get("content").and_then(Value::as_str).unwrap_or("");
2573 let trigger = row
2574 .get("trigger_desc")
2575 .and_then(Value::as_str)
2576 .unwrap_or(content);
2577 let state_reason = row
2578 .get("state_reason")
2579 .and_then(Value::as_str)
2580 .unwrap_or("");
2581
2582 let cvec = match self.embedding.embed_content(content) {
2583 Ok(v) => v,
2584 Err(_) => continue,
2585 };
2586 let tvec = match self.embedding.embed_trigger(trigger) {
2587 Ok(v) => v,
2588 Err(_) => continue,
2589 };
2590
2591 self.storage.begin_immediate()?;
2592 let r = (|| -> Result<()> {
2593 self.storage
2594 .insert_vec_content(id, &pack_embedding(&cvec))?;
2595 self.storage
2596 .insert_vec_trigger(id, &pack_embedding(&tvec))?;
2597 let new_reason = if state_reason.starts_with("embedding_pending:target=") {
2599 let target_state = state_reason.trim_start_matches("embedding_pending:target=");
2600 let now = utc_now_iso();
2601 self.storage.update_chunk_state(
2602 id,
2603 target_state,
2604 Some("embedding_rebuilt"),
2605 &now,
2606 )?;
2607 "embedding_rebuilt".to_string()
2608 } else {
2609 "embedding_rebuilt".to_string()
2610 };
2611 let now = utc_now_iso();
2612 self.storage.conn_execute(
2613 "UPDATE chunks SET embed_version=?, state_reason=?, updated_at=? WHERE id=?",
2614 rusqlite::params![meta_version, new_reason, now, id],
2615 )?;
2616 self.storage.commit()
2617 })();
2618 if r.is_err() {
2619 let _ = self.storage.rollback();
2620 } else {
2621 count += 1;
2622 }
2623 }
2624 Ok(count)
2625 }
2626
2627 pub fn inspect_id(&self, id: &str) -> Result<Value> {
2632 if let Some(chunk) = self.storage.get_chunk(id)? {
2634 let traces = self.storage.query_chunks_params(
2635 "SELECT * FROM usage_trace WHERE chunk_id=? ORDER BY ts DESC LIMIT 20",
2636 rusqlite::params![id],
2637 )?;
2638 let derived = self.storage.query_chunks_params(
2639 "SELECT id, state, confidence FROM chunks WHERE distilled_from IN (
2640 SELECT id FROM episodic_log WHERE trace_id IN (
2641 SELECT trace_id FROM usage_trace WHERE chunk_id=?
2642 )
2643 ) LIMIT 10",
2644 rusqlite::params![id],
2645 )?;
2646 return Ok(json!({
2647 "kind": "chunk",
2648 "chunk": chunk,
2649 "recent_traces": traces,
2650 "derived_chunks": derived,
2651 }));
2652 }
2653 if let Some(log) = self.storage.get_episodic_log(id)? {
2655 let traces = self.storage.query_chunks_params(
2656 "SELECT * FROM usage_trace WHERE trace_id=? ORDER BY ts ASC",
2657 rusqlite::params![id],
2658 )?;
2659 return Ok(json!({
2660 "kind": "trace",
2661 "episodic_log": log,
2662 "usage_traces": traces,
2663 }));
2664 }
2665 Err(InnateError::ChunkNotFound(id.to_string()))
2666 }
2667
2668 fn sanitize_content(&self, content: &str) -> (String, SanitizeAction) {
2673 self.sanitizer.sanitize(content)
2674 }
2675}
2676
/// A chunk bundled with two `f32` similarity scores.
///
/// NOTE(review): the code that builds and consumes this struct is outside the
/// reviewed section; the name suggests it describes one recall candidate —
/// confirm against the recall path.
struct CandidateInfo {
    /// The chunk itself, as a JSON value.
    chunk: Value,
    /// Content similarity score — presumably query vs. the chunk's content
    /// embedding; TODO confirm.
    sim_content: f32,
    /// Trigger similarity score — presumably query vs. the chunk's trigger
    /// embedding; TODO confirm.
    sim_trigger: f32,
}
2686
2687fn chunk_is_valid_for_recall(chunk: &Value, embed_version: i64) -> bool {
2688 chunk.get("state").and_then(Value::as_str) != Some("archived")
2689 && chunk.get("origin").and_then(Value::as_str) != Some("spark")
2690 && chunk
2691 .get("embed_version")
2692 .and_then(Value::as_i64)
2693 .unwrap_or(1)
2694 >= embed_version
2695}
2696
2697fn estimate_distill_prompt_tokens(log: &Value) -> i64 {
2698 [
2699 "query",
2700 "recall_snapshot",
2701 "output",
2702 "output_summary",
2703 "nomination",
2704 ]
2705 .iter()
2706 .filter_map(|key| log.get(*key).and_then(Value::as_str))
2707 .map(|text| estimate_tokens(text) as i64)
2708 .sum()
2709}
2710
2711fn estimate_distilled_chunk_tokens(chunk: &DistilledChunk) -> i64 {
2712 estimate_tokens(&chunk.content) as i64
2713 + chunk
2714 .trigger_desc
2715 .as_deref()
2716 .map(estimate_tokens)
2717 .unwrap_or(0) as i64
2718 + chunk
2719 .anti_trigger_desc
2720 .as_deref()
2721 .map(estimate_tokens)
2722 .unwrap_or(0) as i64
2723}
2724
/// Reports whether `query` contains any of the comma-separated phrases in
/// `anti`.
///
/// Matching is a case-insensitive substring test. Each phrase is trimmed
/// first, and phrases that are empty after trimming never match.
fn anti_trigger_hit(query: &str, anti: &str) -> bool {
    let haystack = query.to_lowercase();
    let phrases = anti.to_lowercase();

    for raw in phrases.split(',') {
        let needle = raw.trim();
        if needle.is_empty() {
            continue;
        }
        if haystack.contains(needle) {
            return true;
        }
    }
    false
}
2732
2733fn block_cost(block: &[Value]) -> usize {
2734 block
2735 .iter()
2736 .map(|b| {
2737 b.get("token_count")
2738 .and_then(Value::as_u64)
2739 .map(|t| t as usize)
2740 .unwrap_or_else(|| {
2741 estimate_tokens(b.get("content").and_then(Value::as_str).unwrap_or("")).max(100)
2742 })
2743 })
2744 .sum()
2745}
2746
2747fn limit_knowledge(knowledge: Vec<Value>, top: Option<usize>) -> Vec<Value> {
2748 match top {
2749 None => knowledge,
2750 Some(0) => vec![],
2751 Some(n) => knowledge.into_iter().take(n).collect(),
2752 }
2753}
2754
2755fn validate_source(source: &str) -> Result<()> {
2756 if !matches!(source, "sdk" | "cli" | "hook" | "daemon" | "augmented") {
2757 return Err(InnateError::InvalidState(format!(
2758 "invalid event source: {source}"
2759 )));
2760 }
2761 Ok(())
2762}
2763
2764fn count_query(storage: &Storage, sql: &str) -> Result<i64> {
2765 Ok(storage
2766 .query_chunks(sql)?
2767 .first()
2768 .and_then(|r| r.as_object())
2769 .and_then(|m| m.values().next())
2770 .and_then(Value::as_i64)
2771 .unwrap_or(0))
2772}
2773
2774fn count_query_params<P: rusqlite::Params>(storage: &Storage, sql: &str, p: P) -> Result<i64> {
2775 Ok(storage
2776 .query_chunks_params(sql, p)?
2777 .first()
2778 .and_then(|r| r.as_object())
2779 .and_then(|m| m.values().next())
2780 .and_then(Value::as_i64)
2781 .unwrap_or(0))
2782}
2783
2784fn days_ago(now_iso: &str, days: i64) -> String {
2785 use chrono::{DateTime, Duration, Utc};
2786 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
2787 let cutoff = t - Duration::days(days);
2788 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
2789 }
2790 now_iso.to_string()
2791}
2792
2793fn minutes_ago(now_iso: &str, minutes: i64) -> String {
2794 use chrono::{DateTime, Duration, Utc};
2795 if let Ok(t) = now_iso.parse::<DateTime<Utc>>() {
2796 let cutoff = t - Duration::minutes(minutes);
2797 return cutoff.format("%Y-%m-%dT%H:%M:%S%.3fZ").to_string();
2798 }
2799 now_iso.to_string()
2800}
2801
2802fn iso_days_diff(now_iso: &str, past_iso: &str) -> i64 {
2804 use chrono::{DateTime, Utc};
2805 let parse = |s: &str| s.parse::<DateTime<Utc>>().ok();
2806 if let (Some(a), Some(b)) = (parse(now_iso), parse(past_iso)) {
2807 let diff = a - b;
2808 diff.num_days().max(0)
2809 } else {
2810 0
2811 }
2812}
2813
2814fn detect_cycles(deps: &[Value]) -> Vec<Vec<String>> {
2816 use std::collections::HashMap;
2817 let mut adj: HashMap<String, Vec<String>> = HashMap::new();
2818 for d in deps {
2819 let src = d
2820 .get("src")
2821 .and_then(Value::as_str)
2822 .unwrap_or("")
2823 .to_string();
2824 let dst = d
2825 .get("dst")
2826 .and_then(Value::as_str)
2827 .unwrap_or("")
2828 .to_string();
2829 if !src.is_empty() && !dst.is_empty() {
2830 adj.entry(src).or_default().push(dst);
2831 }
2832 }
2833 let nodes: Vec<String> = adj.keys().cloned().collect();
2834 let mut visited: HashSet<String> = HashSet::new();
2835 let mut on_stack: HashSet<String> = HashSet::new();
2836 let mut cycles: Vec<Vec<String>> = vec![];
2837
2838 fn dfs(
2839 node: &str,
2840 adj: &HashMap<String, Vec<String>>,
2841 visited: &mut HashSet<String>,
2842 on_stack: &mut HashSet<String>,
2843 path: &mut Vec<String>,
2844 cycles: &mut Vec<Vec<String>>,
2845 ) {
2846 if on_stack.contains(node) {
2847 let start = path.iter().position(|n| n == node).unwrap_or(0);
2849 cycles.push(path[start..].to_vec());
2850 return;
2851 }
2852 if visited.contains(node) {
2853 return;
2854 }
2855 visited.insert(node.to_string());
2856 on_stack.insert(node.to_string());
2857 path.push(node.to_string());
2858 if let Some(children) = adj.get(node) {
2859 for child in children {
2860 dfs(child, adj, visited, on_stack, path, cycles);
2861 }
2862 }
2863 path.pop();
2864 on_stack.remove(node);
2865 }
2866
2867 for node in nodes {
2868 let mut path = vec![];
2869 dfs(
2870 &node,
2871 &adj,
2872 &mut visited,
2873 &mut on_stack,
2874 &mut path,
2875 &mut cycles,
2876 );
2877 }
2878 cycles
2879}