1use super::*;
2
/// Options for a single `KnowledgeBase::recall` call.
///
/// `Default` gives empty strings, `false` flags, a zero budget and no limits.
#[derive(Debug, Clone, Default)]
pub struct RecallParams<'a> {
    /// Query text; it is embedded and also used to derive the context key.
    pub query: &'a str,
    /// Upper bound on the total cost of the packed blocks.
    pub budget: usize,
    /// When `true`, a recall trace is written to storage.
    pub trace: bool,
    /// When `true`, sparks are recalled alongside regular knowledge.
    pub include_sparks: bool,
    /// Optional limit handed to `limit_knowledge` after packing.
    pub top: Option<usize>,
    /// Event source; validated up front and recorded in the trace.
    pub source: &'a str,
    /// Dependency expansion mode: `"direct"` or `"closure"`; an empty string
    /// is treated as `"false"` and any other value disables expansion.
    pub expand_deps: &'a str,
    /// Allow the refiner to trim a block that does not fit the remaining budget.
    pub allow_trim: bool,
    /// Refinement mode; `"adapt"` runs the refiner, an empty string means `"off"`.
    pub refine_mode: &'a str,
    /// When set, candidates whose fused score is below this value are dropped.
    pub min_score: Option<f64>,
    /// When `true`, the trace omits per-chunk usage rows and chunk id lists.
    pub session_only: bool,
}
32
33impl KnowledgeBase {
34 pub fn recall(&self, params: RecallParams<'_>) -> Result<RecallResult> {
35 let RecallParams {
36 query,
37 budget,
38 trace,
39 include_sparks,
40 top,
41 source,
42 expand_deps,
43 allow_trim,
44 refine_mode,
45 min_score,
46 session_only,
47 } = params;
48 let expand_deps = if expand_deps.is_empty() {
49 "false"
50 } else {
51 expand_deps
52 };
53 let refine_mode = if refine_mode.is_empty() {
54 "off"
55 } else {
56 refine_mode
57 };
58 validate_source(source)?;
59 let trace_id = gen_uuid();
60 let now = utc_now_iso();
61
62 let situation = Situation::from_query(query);
66 let context_key = situation.context_key(&self.situation_coarse_keys);
67
68 let (q_content, q_trigger) = self
69 .embedding
70 .embed_both(query)
71 .map_err(|e| InnateError::EmbeddingUnavailable(e.to_string()))?;
72
73 let mut candidates = self.ann_candidates(&q_content, &q_trigger)?;
75 self.apply_soft_dep_bonus(&mut candidates)?;
76
77 let mut scored = self.score_candidates(candidates, query, &context_key, &now)?;
79
80 if let Some(min) = min_score {
83 scored.retain(|(fused, _)| *fused >= min);
84 }
85
86 let (selected, skipped, skipped_reasons) =
88 self.pack(&scored, budget, expand_deps, allow_trim, query)?;
89
90 let depth_skipped: Vec<String> = skipped_reasons
91 .iter()
92 .filter(|(_, r)| r.as_str() == "dep_depth_limit")
93 .map(|(id, _)| id.clone())
94 .collect();
95
96 let mut selected = selected;
98 if self.density_refill {
99 selected = self.density_refill(selected, &skipped, budget);
100 }
101
102 let limited = limit_knowledge(selected, top);
103 let visible = if refine_mode == "adapt" {
104 self.refiner
105 .refine(limited.clone(), Some(budget))
106 .unwrap_or(limited)
107 } else {
108 limited
109 };
110
111 let sparks = if include_sparks {
113 self.recall_sparks(&q_content, &q_trigger)?
114 } else {
115 vec![]
116 };
117
118 if trace {
119 self.write_recall_trace(
120 &trace_id,
121 query,
122 &context_key,
123 &scored,
124 &visible,
125 &sparks,
126 &depth_skipped,
127 &skipped_reasons,
128 refine_mode,
129 source,
130 &now,
131 session_only,
132 )?;
133 }
134
135 let empty = visible.is_empty() && sparks.is_empty();
136 Ok(RecallResult {
137 knowledge: visible,
138 sparks,
139 trace_id,
140 empty,
141 depth_skipped,
142 skipped_reasons,
143 })
144 }
145
146 pub(super) fn ann_candidates(
147 &self,
148 q_content: &[f32],
149 q_trigger: &[f32],
150 ) -> Result<HashMap<String, CandidateInfo>> {
151 let embed_version = self
152 .storage
153 .get_meta("embed_version")?
154 .and_then(|v| v.parse::<i64>().ok())
155 .unwrap_or(1);
156
157 let content_res = self
158 .storage
159 .search_vec_content(q_content, self.top_k_candidates * 2)?;
160 let trigger_res = self
161 .storage
162 .search_vec_trigger(q_trigger, self.top_k_candidates * 2)?;
163
164 let all_ids: Vec<&str> = {
166 let mut seen = HashSet::new();
167 content_res
168 .iter()
169 .chain(trigger_res.iter())
170 .map(|(id, _)| id.as_str())
171 .filter(|id| seen.insert(*id))
172 .collect()
173 };
174 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
175
176 let mut candidates: HashMap<String, CandidateInfo> = HashMap::new();
177
178 for (cid, sim) in &content_res {
179 if let Some(chunk) = chunks.get(cid) {
180 if chunk_is_valid_for_recall(chunk, embed_version) {
181 let e = candidates
182 .entry(cid.clone())
183 .or_insert_with(|| CandidateInfo {
184 chunk: chunk.clone(),
185 sim_content: 0.0,
186 sim_trigger: 0.0,
187 });
188 e.sim_content = e.sim_content.max(*sim);
189 }
190 }
191 }
192 for (cid, sim) in &trigger_res {
193 if let Some(chunk) = chunks.get(cid) {
194 if chunk_is_valid_for_recall(chunk, embed_version) {
195 let e = candidates
196 .entry(cid.clone())
197 .or_insert_with(|| CandidateInfo {
198 chunk: chunk.clone(),
199 sim_content: 0.0,
200 sim_trigger: 0.0,
201 });
202 e.sim_trigger = e.sim_trigger.max(*sim);
203 }
204 }
205 }
206 Ok(candidates)
207 }
208
209 pub(super) fn apply_soft_dep_bonus(
210 &self,
211 candidates: &mut HashMap<String, CandidateInfo>,
212 ) -> Result<()> {
213 let src_ids: Vec<String> = candidates
216 .iter()
217 .filter(|(_, info)| info.chunk.get("origin").and_then(Value::as_str) != Some("spark"))
218 .map(|(cid, _)| cid.clone())
219 .collect();
220 if src_ids.is_empty() {
221 return Ok(());
222 }
223 let src_refs: Vec<&str> = src_ids.iter().map(String::as_str).collect();
224 let deps_map = self.storage.get_deps_batch(&src_refs)?;
225
226 let mut target_ids: Vec<String> = Vec::new();
229 let mut seen: HashSet<String> = HashSet::new();
230 for deps in deps_map.values() {
231 for (dst, kind, _) in deps {
232 if kind == "soft" && seen.insert(dst.clone()) {
233 target_ids.push(dst.clone());
234 }
235 }
236 }
237 if target_ids.is_empty() {
238 return Ok(());
239 }
240 let target_refs: Vec<&str> = target_ids.iter().map(String::as_str).collect();
241 let targets = self.storage.get_chunks_by_ids(&target_refs)?;
242
243 for src in &src_ids {
244 let Some(deps) = deps_map.get(src) else {
245 continue;
246 };
247 for (dst, kind, _) in deps {
248 if kind != "soft" {
249 continue;
250 }
251 let Some(target) = targets.get(dst) else {
252 continue;
253 };
254 if target.get("state").and_then(Value::as_str) == Some("archived") {
255 continue;
256 }
257 if target.get("origin").and_then(Value::as_str) == Some("spark") {
258 continue;
259 }
260 let e = candidates
261 .entry(dst.clone())
262 .or_insert_with(|| CandidateInfo {
263 chunk: target.clone(),
264 sim_content: 0.0,
265 sim_trigger: 0.0,
266 });
267 e.sim_content = (e.sim_content + 0.05).min(1.0);
268 }
269 }
270 Ok(())
271 }
272
273 fn score_candidates(
274 &self,
275 candidates: HashMap<String, CandidateInfo>,
276 query: &str,
277 context_key: &str,
278 now: &str,
279 ) -> Result<Vec<(f64, Value)>> {
280 let cand_ids: Vec<String> = candidates
283 .values()
284 .filter_map(|info| {
285 info.chunk
286 .get("id")
287 .and_then(Value::as_str)
288 .map(str::to_string)
289 })
290 .collect();
291 let cand_refs: Vec<&str> = cand_ids.iter().map(String::as_str).collect();
292 let ctx_scores = self.storage.context_scores_batch(
294 &cand_refs,
295 context_key,
296 RECALL_PRIOR_M,
297 RECALL_BASE_RATE,
298 )?;
299
300 let mut scored: Vec<(f64, Value)> = Vec::with_capacity(candidates.len());
301 for info in candidates.into_values() {
302 let conf = info
303 .chunk
304 .get("confidence")
305 .and_then(Value::as_f64)
306 .unwrap_or(0.5);
307 let chunk_id = info.chunk.get("id").and_then(Value::as_str).unwrap_or("");
308 let context_score = ctx_scores.get(chunk_id).copied().unwrap_or(0.0);
309 let used_count = info
312 .chunk
313 .get("used_count")
314 .and_then(Value::as_i64)
315 .unwrap_or(0);
316 let last_used_at = info.chunk.get("last_used_at").and_then(Value::as_str);
317 let activation = actr_activation(used_count, last_used_at, now);
318 let mut fused = self.w_content * info.sim_content as f64
319 + self.w_trigger * info.sim_trigger as f64
320 + self.w_confidence * conf
321 + self.w_context * context_score
322 + self.w_activation * activation;
323 if info.chunk.get("state").and_then(Value::as_str) == Some("pending") {
324 fused *= PENDING_RECALL_PENALTY;
325 }
326 let anti = info
327 .chunk
328 .get("anti_trigger_desc")
329 .and_then(Value::as_str)
330 .unwrap_or("");
331 if !anti.is_empty() && anti_trigger_hit(query, anti) {
332 fused *= self.anti_trigger_penalty;
333 }
334 let mut chunk = info.chunk;
335 chunk["_context_score"] = json!(context_score);
336 chunk["_activation"] = json!(activation);
337 chunk["_fused_score"] = json!(fused);
338 scored.push((fused, chunk));
339 }
340 scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
341 scored.truncate(self.top_k_candidates);
342 Ok(scored)
343 }
344
345 fn pack(
346 &self,
347 scored: &[(f64, Value)],
348 budget: usize,
349 expand_deps: &str,
350 allow_trim: bool,
351 query: &str,
352 ) -> Result<PackResult> {
353 let mut selected: Vec<Value> = vec![];
354 let mut skipped: Vec<(Vec<Value>, f64, usize)> = vec![];
355 let mut skipped_reasons: HashMap<String, String> = HashMap::new();
356 let mut used_ids: HashSet<String> = HashSet::new();
357 let mut used_tokens: usize = 0;
358
359 for (fused, chunk) in scored {
360 let cid = chunk["id"].as_str().unwrap_or("").to_string();
361 if used_ids.contains(&cid) {
362 continue;
363 }
364
365 let (block, dep_skip_reason) = self.build_dep_block(chunk, expand_deps)?;
367 if let Some(reason) = dep_skip_reason {
368 skipped_reasons.insert(cid, reason);
369 continue;
370 }
371
372 let new_block: Vec<Value> = block
373 .iter()
374 .filter(|b| !used_ids.contains(b["id"].as_str().unwrap_or("")))
375 .cloned()
376 .collect();
377 let cost = block_cost(&new_block);
378
379 if used_tokens + cost <= budget {
380 for b in &block {
381 let bid = b["id"].as_str().unwrap_or("").to_string();
382 if !used_ids.contains(&bid) {
383 let mut b = b.clone();
384 b["_fused_score"] = json!(fused);
385 selected.push(b);
386 used_ids.insert(bid);
387 }
388 }
389 used_tokens += cost;
390 } else if allow_trim {
391 if let Some(trimmed) =
393 self.refiner
394 .trim(&block, query, budget.saturating_sub(used_tokens))
395 {
396 let trim_cost = block_cost(&trimmed);
397 if used_tokens + trim_cost <= budget {
398 for b in &trimmed {
399 let bid = b["id"].as_str().unwrap_or("").to_string();
400 if !used_ids.contains(&bid) {
401 let mut b = b.clone();
402 b["_fused_score"] = json!(fused);
403 b["_trimmed"] = json!(true);
404 selected.push(b);
405 used_ids.insert(bid);
406 }
407 }
408 used_tokens += trim_cost;
409 continue;
410 }
411 }
412 skipped.push((block, *fused, cost));
413 } else {
414 skipped.push((block, *fused, cost));
415 }
416 }
417 Ok((selected, skipped, skipped_reasons))
418 }
419
420 fn build_dep_block(
423 &self,
424 seed: &Value,
425 expand_deps: &str,
426 ) -> Result<(Vec<Value>, Option<String>)> {
427 if expand_deps == "false" || expand_deps.is_empty() {
428 return Ok((vec![seed.clone()], None));
429 }
430 let seed_id = seed["id"].as_str().unwrap_or("");
431 match expand_deps {
432 "direct" => {
433 let deps = self.storage.get_deps(seed_id)?;
434 let mut block = vec![seed.clone()];
435 for (dep_id, kind, _) in &deps {
436 if kind != "hard" {
437 continue;
438 }
439 match self.validate_hard_dep(dep_id)? {
440 Some(chunk) => block.push(chunk),
441 None => return Ok((vec![], Some("hard_dep_unavailable".to_string()))),
442 }
443 }
444 Ok((block, None))
445 }
446 "closure" => {
447 let mut block = vec![seed.clone()];
448 let mut visited: HashSet<String> = [seed_id.to_string()].into();
449 match self.expand_hard_closure(seed_id, &mut visited, &mut block, 0, 3)? {
450 Some(reason) => Ok((vec![], Some(reason))),
451 None => Ok((block, None)),
452 }
453 }
454 _ => Ok((vec![seed.clone()], None)),
455 }
456 }
457
458 fn validate_hard_dep(&self, dep_id: &str) -> Result<Option<Value>> {
460 match self.storage.get_chunk(dep_id)? {
461 None => Ok(None),
462 Some(chunk) => {
463 let state = chunk.get("state").and_then(Value::as_str).unwrap_or("");
464 let origin = chunk.get("origin").and_then(Value::as_str).unwrap_or("");
465 let embed_v = chunk
466 .get("embed_version")
467 .and_then(Value::as_i64)
468 .unwrap_or(0);
469 if state == "archived" || origin == "spark" || embed_v == 0 {
470 Ok(None)
471 } else {
472 Ok(Some(chunk))
473 }
474 }
475 }
476 }
477
478 fn expand_hard_closure(
480 &self,
481 id: &str,
482 visited: &mut HashSet<String>,
483 block: &mut Vec<Value>,
484 depth: usize,
485 max_depth: usize,
486 ) -> Result<Option<String>> {
487 if depth >= max_depth {
488 return Ok(Some("dep_depth_limit".to_string()));
489 }
490 let deps = self.storage.get_deps(id)?;
491 for (dep_id, kind, _) in &deps {
492 if kind != "hard" {
493 continue;
494 }
495 if visited.contains(dep_id) {
496 continue;
497 } visited.insert(dep_id.clone());
499 match self.validate_hard_dep(dep_id)? {
500 None => return Ok(Some("hard_dep_unavailable".to_string())),
501 Some(chunk) => {
502 block.push(chunk);
503 if let Some(reason) =
504 self.expand_hard_closure(dep_id, visited, block, depth + 1, max_depth)?
505 {
506 return Ok(Some(reason));
507 }
508 }
509 }
510 }
511 Ok(None)
512 }
513
514 fn density_refill(
515 &self,
516 mut selected: Vec<Value>,
517 skipped: &[(Vec<Value>, f64, usize)],
518 budget: usize,
519 ) -> Vec<Value> {
520 let used_tokens = block_cost(&selected);
521 if used_tokens >= budget {
522 return selected;
523 }
524
525 let selected_ids: HashSet<String> = selected
526 .iter()
527 .filter_map(|c| c["id"].as_str().map(str::to_string))
528 .collect();
529
530 let mut density_items: Vec<(f64, Vec<Value>, usize)> = skipped
531 .iter()
532 .filter_map(|(block, fscore, _)| {
533 let block: Vec<Value> = block
534 .iter()
535 .filter(|b| !selected_ids.contains(b["id"].as_str().unwrap_or("")))
536 .cloned()
537 .collect();
538 if block.is_empty() {
539 return None;
540 }
541 let cost = block_cost(&block);
542 let density = fscore / cost.max(1) as f64;
543 Some((density, block, cost))
544 })
545 .collect();
546 density_items.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
547
548 let mut used_tokens = block_cost(&selected);
549 let mut added_ids: HashSet<String> = selected_ids;
550 for (_, block, cost) in density_items {
551 if used_tokens + cost <= budget {
552 for b in block {
553 let bid = b["id"].as_str().unwrap_or("").to_string();
554 if !added_ids.contains(&bid) {
555 selected.push(b);
556 added_ids.insert(bid);
557 }
558 }
559 used_tokens += cost;
560 }
561 }
562 selected
563 }
564
565 fn recall_sparks(&self, q_content: &[f32], q_trigger: &[f32]) -> Result<Vec<Value>> {
566 let embed_version = self
567 .storage
568 .get_meta("embed_version")?
569 .and_then(|v| v.parse::<i64>().ok())
570 .unwrap_or(1);
571
572 let content_res = self
573 .storage
574 .search_vec_content(q_content, self.top_k_candidates)?;
575 let trigger_res = self
576 .storage
577 .search_vec_trigger(q_trigger, self.top_k_candidates)?;
578
579 let all_ids: Vec<&str> = {
581 let mut seen = HashSet::new();
582 content_res
583 .iter()
584 .chain(trigger_res.iter())
585 .map(|(id, _)| id.as_str())
586 .filter(|id| seen.insert(*id))
587 .collect()
588 };
589 let chunks = self.storage.get_chunks_by_ids(&all_ids)?;
590
591 let mut spark_scores: HashMap<String, (f32, Value)> = HashMap::new();
592 for (cid, sim) in content_res.iter().chain(trigger_res.iter()) {
593 if let Some(chunk) = chunks.get(cid) {
594 if chunk.get("origin").and_then(Value::as_str) != Some("spark") {
595 continue;
596 }
597 if chunk.get("state").and_then(Value::as_str) == Some("archived") {
598 continue;
599 }
600 let maturity = chunk.get("maturity").and_then(Value::as_str).unwrap_or("");
601 if maturity == "promoted" || maturity == "dropped" {
602 continue;
603 }
604 let ev = chunk
605 .get("embed_version")
606 .and_then(Value::as_i64)
607 .unwrap_or(1);
608 if ev < embed_version {
609 continue;
610 }
611 let entry = spark_scores
612 .entry(cid.clone())
613 .or_insert_with(|| (*sim, chunk.clone()));
614 if *sim > entry.0 {
615 *entry = (*sim, chunk.clone());
616 }
617 }
618 }
619 let mut sparks: Vec<(f32, Value)> = spark_scores.into_values().collect();
620 sparks.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
621 Ok(sparks
622 .into_iter()
623 .take(self.top_k_candidates)
624 .map(|(_, c)| c)
625 .collect())
626 }
627
628 #[allow(clippy::too_many_arguments)]
629 fn write_recall_trace(
630 &self,
631 trace_id: &str,
632 query: &str,
633 context_key: &str,
634 scored: &[(f64, Value)],
635 visible: &[Value],
636 sparks: &[Value],
637 depth_skipped: &[String],
638 skipped_reasons: &HashMap<String, String>,
639 refine_mode: &str,
640 source: &str,
641 now: &str,
642 session_only: bool,
643 ) -> Result<()> {
644 let lib_id = self.storage.lib_id()?;
645 let is_empty = visible.is_empty() && sparks.is_empty();
653 let record_selection = !is_empty && !session_only;
654 self.storage.begin_immediate()?;
655 let result = (|| -> Result<()> {
656 if record_selection {
657 for (rank, (_, chunk)) in scored.iter().enumerate() {
658 let cid = chunk["id"].as_str().unwrap_or("");
659 let sim = chunk.get("_fused_score").and_then(Value::as_f64);
660 let rm = skipped_reasons
662 .get(cid)
663 .map(|r| format!("skipped:{r}"))
664 .or_else(|| {
665 if refine_mode != "off" && !refine_mode.is_empty() {
666 Some(refine_mode.to_string())
667 } else {
668 None
669 }
670 });
671 self.storage.insert_usage_trace(
672 trace_id,
673 Some(cid),
674 "retrieved",
675 1.0,
676 sim,
677 rm.as_deref(),
678 None,
679 Some((rank + 1) as i64),
680 None,
681 source,
682 now,
683 )?;
684 }
685 for (rank, chunk) in visible.iter().enumerate() {
686 let cid = chunk["id"].as_str().unwrap_or("");
687 self.storage.insert_usage_trace(
688 trace_id,
689 Some(cid),
690 "selected",
691 1.0,
692 None,
693 None,
694 None,
695 Some((rank + 1) as i64),
696 None,
697 source,
698 now,
699 )?;
700 if chunk
702 .get("_trimmed")
703 .and_then(Value::as_bool)
704 .unwrap_or(false)
705 {
706 self.storage.insert_usage_trace(
707 trace_id,
708 Some(cid),
709 "refined",
710 1.0,
711 None,
712 Some("trim"),
713 None,
714 Some((rank + 1) as i64),
715 None,
716 source,
717 now,
718 )?;
719 }
720 }
721 for (rank, chunk) in sparks.iter().enumerate() {
723 let cid = chunk["id"].as_str().unwrap_or("");
724 self.storage.insert_usage_trace(
725 trace_id,
726 Some(cid),
727 "retrieved",
728 1.0,
729 None,
730 Some("spark"),
731 None,
732 Some((rank + 1) as i64),
733 None,
734 source,
735 now,
736 )?;
737 }
738 }
739 let snapshot = json!({
742 "retrieved": if record_selection { scored.iter().map(|(_, c)| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
743 "selected": if record_selection { visible.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
744 "sparks": if record_selection { sparks.iter().map(|c| c["id"].as_str().unwrap_or("")).collect::<Vec<_>>() } else { vec![] },
745 "depth_skipped": depth_skipped,
746 "skipped_reasons": skipped_reasons,
747 "session_only": session_only,
748 });
749 let (usage_state, distill_state) = if is_empty {
755 ("known_none", "discarded")
756 } else {
757 ("unknown", "open")
758 };
759 let log = EpisodicLogRow {
760 id: gen_uuid(),
761 trace_id: trace_id.to_string(),
762 lib_id,
763 ts: now.to_string(),
764 query: Some(query.to_string()),
765 recall_snapshot: Some(snapshot.to_string()),
766 event_source: source.to_string(),
767 task_state: "recalled".to_string(),
768 usage_state: usage_state.to_string(),
769 context_key: Some(context_key.to_string()),
770 distill_state: distill_state.to_string(),
771 ..Default::default()
772 };
773 self.storage.upsert_episodic_log(&log)?;
774 self.storage.commit()
775 })();
776 if result.is_err() {
777 let _ = self.storage.rollback();
778 }
779 result
780 }
781}