1use super::*;
2
/// Options for `KnowledgeBase::recall`.
///
/// `Default` yields empty strings, a zero budget, `false` flags and `None`
/// options. `recall` treats an empty `expand_deps` as `"false"` and an empty
/// `refine_mode` as `"off"`.
#[derive(Clone, Debug, Default)]
pub struct RecallParams<'a> {
    /// Query text: embedded, searched lexically and recorded in the trace.
    pub query: &'a str,
    /// Token budget used when packing results; also handed to the adaptive refiner.
    pub budget: usize,
    /// When set, a usage trace / episodic log entry is written for this recall.
    pub trace: bool,
    /// When set, spark chunks are recalled alongside regular knowledge.
    pub include_sparks: bool,
    /// Optional cap on the number of returned chunks (applied via `limit_knowledge`).
    pub top: Option<usize>,
    /// Event source; checked by `validate_source` and recorded in the trace.
    pub source: &'a str,
    /// Dependency expansion mode: `"false"`, `"direct"` or `"closure"`
    /// (any other value behaves like `"false"`).
    pub expand_deps: &'a str,
    /// Allow the refiner to trim a block that would overflow the budget.
    pub allow_trim: bool,
    /// Refinement mode; `"adapt"` runs the refiner over the packed result.
    pub refine_mode: &'a str,
    /// Candidates whose fused score is below this value are dropped.
    pub min_score: Option<f64>,
    /// When set, the trace writes only the episodic log row, without
    /// per-chunk usage traces or snapshot id lists.
    pub session_only: bool,
    /// Reorder scored candidates with the reranker before packing.
    pub rerank: bool,
}
37
38impl KnowledgeBase {
39 pub fn recall(&self, params: RecallParams<'_>) -> Result<RecallResult> {
40 let RecallParams {
41 query,
42 budget,
43 trace,
44 include_sparks,
45 top,
46 source,
47 expand_deps,
48 allow_trim,
49 refine_mode,
50 min_score,
51 session_only,
52 rerank,
53 } = params;
54 let expand_deps = if expand_deps.is_empty() {
55 "false"
56 } else {
57 expand_deps
58 };
59 let refine_mode = if refine_mode.is_empty() {
60 "off"
61 } else {
62 refine_mode
63 };
64 validate_source(source)?;
65 let trace_id = gen_uuid();
66 let now = utc_now_iso();
67
68 let situation = Situation::from_query(query);
72 let context_key = situation.context_key(&self.situation_coarse_keys);
73
74 let embed_query = if self.embed_situation_signature {
78 let sig = situation.coarse_signature(&self.situation_coarse_keys);
79 if signature_has_signal(&sig) {
80 format!("{sig}\n{query}")
81 } else {
82 query.to_string()
83 }
84 } else {
85 query.to_string()
86 };
87
88 let (q_content, q_trigger) = self
89 .embedding
90 .embed_both(&embed_query)
91 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
92
93 let mut candidates = self.ann_candidates(&q_content, &q_trigger, query)?;
95 self.apply_soft_dep_bonus(&mut candidates)?;
96
97 let mut scored = self.score_candidates(candidates, query, &context_key, &now)?;
99
100 if rerank {
103 self.apply_rerank(query, &mut scored);
104 }
105
106 if let Some(min) = min_score {
109 scored.retain(|(fused, _)| *fused >= min);
110 }
111
112 let (selected, skipped, skipped_reasons) =
114 self.pack(&scored, budget, expand_deps, allow_trim, query)?;
115
116 let depth_skipped: Vec<String> = skipped_reasons
117 .iter()
118 .filter(|(_, r)| r.as_str() == "dep_depth_limit")
119 .map(|(id, _)| id.clone())
120 .collect();
121
122 let mut selected = selected;
124 if self.density_refill {
125 selected = self.density_refill(selected, &skipped, budget);
126 }
127
128 let limited = limit_knowledge(selected, top);
129 let visible = if refine_mode == "adapt" {
130 self.refiner
131 .refine(limited.clone(), Some(budget))
132 .unwrap_or(limited)
133 } else {
134 limited
135 };
136
137 let sparks = if include_sparks {
139 self.recall_sparks(&q_content, &q_trigger)?
140 } else {
141 vec![]
142 };
143
144 if trace {
145 self.write_recall_trace(
146 &trace_id,
147 query,
148 &context_key,
149 &scored,
150 &visible,
151 &sparks,
152 &depth_skipped,
153 &skipped_reasons,
154 refine_mode,
155 source,
156 &now,
157 session_only,
158 )?;
159 }
160
161 let empty = visible.is_empty() && sparks.is_empty();
162 Ok(RecallResult {
163 knowledge: visible,
164 sparks,
165 trace_id,
166 empty,
167 depth_skipped,
168 skipped_reasons,
169 })
170 }
171
172 pub(super) fn ann_candidates(
173 &self,
174 q_content: &[f32],
175 q_trigger: &[f32],
176 query: &str,
177 ) -> Result<HashMap<String, CandidateInfo>> {
178 let embed_version = self
179 .storage
180 .get_meta("embed_version")?
181 .and_then(|v| v.parse::<i64>().ok())
182 .unwrap_or(1);
183
184 let content_res = self
185 .storage
186 .search_vec_content(q_content, self.top_k_candidates * 2)?;
187 let trigger_res = self
188 .storage
189 .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
190 let lexical_res = self
194 .storage
195 .search_lexical(query, self.top_k_candidates * 2)?;
196
197 let all_ids: Vec<&str> = {
199 let mut seen = HashSet::new();
200 content_res
201 .iter()
202 .chain(trigger_res.iter())
203 .chain(lexical_res.iter())
204 .map(|(id, _)| id.as_str())
205 .filter(|id| seen.insert(*id))
206 .collect()
207 };
208 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
209
210 let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
211 for (cid, sim) in &content_res {
212 if let Some(chunk) = chunks.get(cid) {
213 if chunk_is_valid_for_recall(chunk, embed_version) {
214 let e = candidates.entry(cid.clone()).or_insert_with(|| new_candidate(chunk));
215 e.sim_content = e.sim_content.max(*sim);
216 }
217 }
218 }
219 for (cid, sim) in &trigger_res {
220 if let Some(chunk) = chunks.get(cid) {
221 if chunk_is_valid_for_recall(chunk, embed_version) {
222 let e = candidates.entry(cid.clone()).or_insert_with(|| new_candidate(chunk));
223 e.sim_trigger = e.sim_trigger.max(*sim);
224 }
225 }
226 }
227 for (cid, sim) in &lexical_res {
228 if let Some(chunk) = chunks.get(cid) {
229 if chunk_is_valid_for_recall(chunk, embed_version) {
230 let e = candidates.entry(cid.clone()).or_insert_with(|| new_candidate(chunk));
231 e.sim_lexical = e.sim_lexical.max(*sim);
232 }
233 }
234 }
235 Ok(candidates)
236 }
237
238 pub(super) fn apply_soft_dep_bonus(
239 &self,
240 candidates: &mut HashMap<String, CandidateInfo>,
241 ) -> Result<()> {
242 let src_ids: Vec<String> = candidates
245 .iter()
246 .filter(|(_, info)| info.chunk.get("origin").and_then(Value::as_str) != Some("spark"))
247 .map(|(cid, _)| cid.clone())
248 .collect();
249 if src_ids.is_empty() {
250 return Ok(());
251 }
252 let src_refs: Vec<&str> = src_ids.iter().map(String::as_str).collect();
253 let deps_map = self.storage.get_deps_batch(&src_refs)?;
254
255 let mut target_ids: Vec<String> = Vec::new();
258 let mut seen: HashSet<String> = HashSet::new();
259 for deps in deps_map.values() {
260 for (dst, kind, _) in deps {
261 if kind == "soft" && seen.insert(dst.clone()) {
262 target_ids.push(dst.clone());
263 }
264 }
265 }
266 if target_ids.is_empty() {
267 return Ok(());
268 }
269 let target_refs: Vec<&str> = target_ids.iter().map(String::as_str).collect();
270 let targets = self.storage.get_chunks_by_ids(&target_refs)?;
271
272 for src in &src_ids {
273 let Some(deps) = deps_map.get(src) else {
274 continue;
275 };
276 for (dst, kind, _) in deps {
277 if kind != "soft" {
278 continue;
279 }
280 let Some(target) = targets.get(dst) else {
281 continue;
282 };
283 if target.get("state").and_then(Value::as_str) == Some("archived") {
284 continue;
285 }
286 if target.get("origin").and_then(Value::as_str) == Some("spark") {
287 continue;
288 }
289 let e = candidates
290 .entry(dst.clone())
291 .or_insert_with(|| new_candidate(target));
292 e.sim_content = (e.sim_content + 0.05).min(1.0);
293 }
294 }
295 Ok(())
296 }
297
298 fn score_candidates(
299 &self,
300 candidates: HashMap<String, CandidateInfo>,
301 query: &str,
302 context_key: &str,
303 now: &str,
304 ) -> Result<Vec<(f64, Value)>> {
305 let cand_ids: Vec<String> = candidates
308 .values()
309 .filter_map(|info| {
310 info.chunk
311 .get("id")
312 .and_then(Value::as_str)
313 .map(str::to_string)
314 })
315 .collect();
316 let cand_refs: Vec<&str> = cand_ids.iter().map(String::as_str).collect();
317 let ctx_scores = self.storage.context_scores_batch(
319 &cand_refs,
320 context_key,
321 RECALL_PRIOR_M,
322 RECALL_BASE_RATE,
323 )?;
324
325 let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
326 for info in candidates.into_values() {
327 let conf = info
328 .chunk
329 .get("confidence")
330 .and_then(Value::as_f64)
331 .unwrap_or(0.5);
332 let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
333 let context_score = ctx_scores.get(chunk_id).copied().unwrap_or(0.0);
334 let used_count = info
337 .chunk
338 .get("used_count")
339 .and_then(Value::as_i64)
340 .unwrap_or(0);
341 let last_used_at = info.chunk.get("last_used_at").and_then(Value::as_str);
342 let activation = actr_activation(used_count, last_used_at, now);
343 let mut fused = self.w_content * info.sim_content as f64
344 + self.w_trigger * info.sim_trigger as f64
345 + self.w_lexical * info.sim_lexical as f64
346 + self.w_confidence * conf
347 + self.w_context * context_score
348 + self.w_activation * activation;
349 if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
350 fused *= PENDING_RECALL_PENALTY;
351 }
352 let anti = info
353 .chunk
354 .get("anti_trigger_desc")
355 .and_then(Value::as_str)
356 .unwrap_or("");
357 if !anti.is_empty() && anti_trigger_hit(query, anti) {
358 fused *= self.anti_trigger_penalty;
359 }
360 let mut chunk = info.chunk;
361 chunk["_context_score"] = json!(context_score);
362 chunk["_activation"] = json!(activation);
363 chunk["_sim_lexical"] = json!(info.sim_lexical);
364 chunk["_fused_score"] = json!(fused);
365 scored.push((fused, chunk));
366 }
367 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
368 scored.truncate(self.top_k_candidates);
369 Ok(scored)
370 }
371
372 fn apply_rerank(&self, query: &str, scored: &mut [(f64, Value)]) {
377 let chunks: Vec<Value> = scored.iter().map(|(_, c)| c.clone()).collect();
378 let order = match self.reranker.rerank(query, &chunks) {
379 Ok(order) if !order.is_empty() => order,
380 _ => return,
381 };
382 let rank: HashMap<&str, usize> = order
383 .iter()
384 .enumerate()
385 .map(|(i, id)| (id.as_str(), i))
386 .collect();
387 scored.sort_by_key(|(_, c)| {
388 rank.get(c["id"].as_str().unwrap_or(""))
389 .copied()
390 .unwrap_or(usize::MAX)
391 });
392 }
393
394 fn pack(
395 &self,
396 scored: &[(f64, Value)],
397 budget: usize,
398 expand_deps: &str,
399 allow_trim: bool,
400 query: &str,
401 ) -> Result<PackResult> {
402 let mut selected: Vec<Value> = vec![];
403 let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
404 let mut skipped_reasons: HashMap<String, String> = HashMap::new();
405 let mut used_ids: HashSet<String> = HashSet::new();
406 let mut used_tokens: usize = 0;
407
408 for (fused, chunk) in scored {
409 let cid = chunk["id"].as_str().unwrap_or("").to_string();
410 if used_ids.contains(&cid) {
411 continue;
412 }
413
414 let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
416 if let Some(reason) = dep_skip_reason {
417 skipped_reasons.insert(cid, reason);
418 continue;
419 }
420
421 let new_block: Vec<Value> = block
422 .iter()
423 .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
424 .cloned()
425 .collect();
426 let cost = block_cost(&new_block);
427
428 if used_tokens + cost <= budget {
429 for b in &block {
430 let bid = b["id"].as_str().unwrap_or("").to_string();
431 if !used_ids.contains(&bid) {
432 let mut b = b.clone();
433 b["_fused_score"] = json!(fused);
434 selected.push(b);
435 used_ids.insert(bid);
436 }
437 }
438 used_tokens += cost;
439 } else if allow_trim {
440 if let Some(trimmed) =
442 self.refiner
443 .trim(&block, query, budget.saturating_sub(used_tokens))
444 {
445 let trim_cost = block_cost(&trimmed);
446 if used_tokens + trim_cost <= budget {
447 for b in &trimmed {
448 let bid = b["id"].as_str().unwrap_or("").to_string();
449 if !used_ids.contains(&bid) {
450 let mut b = b.clone();
451 b["_fused_score"] = json!(fused);
452 b["_trimmed"] = json!(true);
453 selected.push(b);
454 used_ids.insert(bid);
455 }
456 }
457 used_tokens += trim_cost;
458 continue;
459 }
460 }
461 skipped.push((block, *fused, cost));
462 } else {
463 skipped.push((block, *fused, cost));
464 }
465 }
466 Ok((selected, skipped, skipped_reasons))
467 }
468
469 fn build_dep_block(
472 &self,
473 seed: &Value,
474 expand_deps: &str,
475 ) -> Result<(Vec<Value>, Option<String>)> {
476 if expand_deps == "false" || expand_deps.is_empty() {
477 return Ok((vec![seed.clone()], None));
478 }
479 let seed_id = seed["id"].as_str().unwrap_or("");
480 match expand_deps {
481 "direct" => {
482 let deps = self.storage.get_deps(seed_id)?;
483 let mut block = vec![seed.clone()];
484 for (dep_id, kind, _) in &deps {
485 if kind != "hard" {
486 continue;
487 }
488 match self.validate_hard_dep(dep_id)? {
489 Some(chunk) => block.push(chunk),
490 None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
491 }
492 }
493 Ok((block, None))
494 }
495 "closure" => {
496 let mut block = vec![seed.clone()];
497 let mut visited: HashSet<String> = [seed_id.to_string()].into();
498 match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
499 Some(reason) => Ok((vec![], Some(reason))),
500 None => Ok((block, None)),
501 }
502 }
503 _ => Ok((vec![seed.clone()], None)),
504 }
505 }
506
507 fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
509 match self.storage.get_chunk(dep_id)? {
510 None => Ok(None),
511 Some(chunk) => {
512 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
513 let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
514 let embed_v = chunk
515 .get("embed_version")
516 .and_then(Value::as_i64)
517 .unwrap_or(0);
518 if state == "archived" || origin == "spark" || embed_v == 0 {
519 Ok(None)
520 } else {
521 Ok(Some(chunk))
522 }
523 }
524 }
525 }
526
527 fn expand_hard_closure(
529 &self,
530 id: &str,
531 visited: &mut HashSet<String>,
532 block: &mut Vec<Value>,
533 depth: usize,
534 max_depth: usize,
535 ) -> Result<Option<String>> {
536 if depth >= max_depth {
537 return Ok(Some("dep_depth_limit".to_string()));
538 }
539 let deps = self.storage.get_deps(id)?;
540 for (dep_id, kind, _) in &deps {
541 if kind != "hard" {
542 continue;
543 }
544 if visited.contains(dep_id) {
545 continue;
546 } visited.insert(dep_id.clone());
548 match self.validate_hard_dep(dep_id)? {
549 None => return Ok(Some("hard_dep_unavailable".to_string())),
550 Some(chunk) => {
551 block.push(chunk);
552 if let Some(reason) =
553 self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
554 {
555 return Ok(Some(reason));
556 }
557 }
558 }
559 }
560 Ok(None)
561 }
562
563 fn density_refill(
564 &self,
565 mut selected: Vec<Value>,
566 skipped: &[(Vec<Value>, f64, usize)],
567 budget: usize,
568 ) -> Vec<Value> {
569 let used_tokens = block_cost(&selected);
570 if used_tokens >= budget {
571 return selected;
572 }
573
574 let selected_ids: HashSet<String> = selected
575 .iter()
576 .filter_map(|c| c["id"].as_str().map(str::to_string))
577 .collect();
578
579 let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
580 .iter()
581 .filter_map(|(block, fscore, _)| {
582 let block: Vec<Value> = block
583 .iter()
584 .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
585 .cloned()
586 .collect();
587 if block.is_empty() {
588 return None;
589 }
590 let cost = block_cost(&block);
591 let density = fscore / cost.max(1) as f64;
592 Some((density, block, cost))
593 })
594 .collect();
595 density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
596
597 let mut used_tokens = block_cost(&selected);
598 let mut added_ids: HashSet<String> = selected_ids;
599 for (_, block, cost) in density_items {
600 if used_tokens + cost <= budget {
601 for b in block {
602 let bid = b["id"].as_str().unwrap_or("").to_string();
603 if !added_ids.contains(&bid) {
604 selected.push(b);
605 added_ids.insert(bid);
606 }
607 }
608 used_tokens += cost;
609 }
610 }
611 selected
612 }
613
614 fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
615 let embed_version = self
616 .storage
617 .get_meta("embed_version")?
618 .and_then(|v| v.parse::<i64>().ok())
619 .unwrap_or(1);
620
621 let content_res = self
622 .storage
623 .search_vec_content(q_content, self.top_k_candidates)?;
624 let trigger_res = self
625 .storage
626 .search_vec_trigger(q_trigger, self.top_k_candidates)?;
627
628 let all_ids: Vec<&str> = {
630 let mut seen = HashSet::new();
631 content_res
632 .iter()
633 .chain(trigger_res.iter())
634 .map(|(id, _)| id.as_str())
635 .filter(|id| seen.insert(*id))
636 .collect()
637 };
638 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
639
640 let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
641 for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
642 if let Some(chunk) = chunks.get(cid) {
643 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
644 continue;
645 }
646 if chunk.get("state").and_then(Value::as_str) == Some("archived") {
647 continue;
648 }
649 let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
650 if maturity == "promoted" || maturity == "dropped" {
651 continue;
652 }
653 let ev = chunk
654 .get("embed_version")
655 .and_then(Value::as_i64)
656 .unwrap_or(1);
657 if ev < embed_version {
658 continue;
659 }
660 let entry = spark_scores
661 .entry(cid.clone())
662 .or_insert_with(|| (*sim, chunk.clone()));
663 if *sim > entry.0 {
664 *entry = (*sim, chunk.clone());
665 }
666 }
667 }
668 let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
669 sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
670 Ok(sparks
671 .into_iter()
672 .take(self.top_k_candidates)
673 .map(|(_, c)| c)
674 .collect())
675 }
676
677 #[allow(clippy::too_many_arguments)]
678 fn write_recall_trace(
679 &self,
680 trace_id: &str,
681 query: &str,
682 context_key: &str,
683 scored: &[(f64, Value)],
684 visible: &[Value],
685 sparks: &[Value],
686 depth_skipped: &[String],
687 skipped_reasons: &HashMap<String, String>,
688 refine_mode: &str,
689 source: &str,
690 now: &str,
691 session_only: bool,
692 ) -> Result<()> {
693 let lib_id = self.storage.lib_id()?;
694 let is_empty = visible.is_empty() && sparks.is_empty();
702 let record_selection = !is_empty && !session_only;
703 self.storage.begin_immediate()?;
704 let result = (|| -> Result<()> {
705 if record_selection {
706 for (rank, (_, chunk)) in scored.iter().enumerate() {
707 let cid = chunk["id"].as_str().unwrap_or("");
708 let sim = chunk.get("_fused_score").and_then(Value::as_f64);
709 let rm = skipped_reasons
711 .get(cid)
712 .map(|r| format!("skipped:{r}"))
713 .or_else(|| {
714 if refine_mode != "off" && !refine_mode.is_empty() {
715 Some(refine_mode.to_string())
716 } else {
717 None
718 }
719 });
720 self.storage.insert_usage_trace(
721 trace_id,
722 Some(cid),
723 "retrieved",
724 1.0,
725 sim,
726 rm.as_deref(),
727 None,
728 Some((rank + 1) as i64),
729 None,
730 source,
731 now,
732 )?;
733 }
734 for (rank, chunk) in visible.iter().enumerate() {
735 let cid = chunk["id"].as_str().unwrap_or("");
736 self.storage.insert_usage_trace(
737 trace_id,
738 Some(cid),
739 "selected",
740 1.0,
741 None,
742 None,
743 None,
744 Some((rank + 1) as i64),
745 None,
746 source,
747 now,
748 )?;
749 if chunk
751 .get("_trimmed")
752 .and_then(Value::as_bool)
753 .unwrap_or(false)
754 {
755 self.storage.insert_usage_trace(
756 trace_id,
757 Some(cid),
758 "refined",
759 1.0,
760 None,
761 Some("trim"),
762 None,
763 Some((rank + 1) as i64),
764 None,
765 source,
766 now,
767 )?;
768 }
769 }
770 for (rank, chunk) in sparks.iter().enumerate() {
772 let cid = chunk["id"].as_str().unwrap_or("");
773 self.storage.insert_usage_trace(
774 trace_id,
775 Some(cid),
776 "retrieved",
777 1.0,
778 None,
779 Some("spark"),
780 None,
781 Some((rank + 1) as i64),
782 None,
783 source,
784 now,
785 )?;
786 }
787 }
788 let snapshot = json!({
791 "retrieved": if record_selection { scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
792 "selected": if record_selection { visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
793 "sparks": if record_selection { sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
794 "depth_skipped": depth_skipped,
795 "skipped_reasons": skipped_reasons,
796 "session_only": session_only,
797 });
798 let (usage_state, distill_state) = if is_empty {
804 ("known_none", "discarded")
805 } else {
806 ("unknown", "open")
807 };
808 let log = EpisodicLogRow {
809 id: gen_uuid(),
810 trace_id: trace_id.to_string(),
811 lib_id,
812 ts: now.to_string(),
813 query: Some(query.to_string()),
814 recall_snapshot: Some(snapshot.to_string()),
815 event_source: source.to_string(),
816 agent: agent_source(),
817 task_state: "recalled".to_string(),
818 usage_state: usage_state.to_string(),
819 context_key: Some(context_key.to_string()),
820 distill_state: distill_state.to_string(),
821 ..Default::default()
822 };
823 self.storage.upsert_episodic_log(&log)?;
824 self.storage.commit()
825 })();
826 if result.is_err() {
827 let _ = self.storage.rollback();
828 }
829 result
830 }
831}