1use crate::agent_runner::{AgentChunkBuffer, AgentChunkRunner, AgentRunOptions};
2use crate::agents::FinalizeArgs;
3use crate::category::DeFiCategory;
4use crate::db::HistoricalDatabase;
5use crate::error::{KgError, Result};
6use crate::learn::{ExtractedFinding, count_tokens, get_context_budget, sanitize_prompt_prefix};
7use crate::prompts;
8use itertools::Itertools;
9use llmy::agent::tool::ToolBox;
10use llmy::agent::{LLMYError, tool};
11use llmy::client::client::LLM;
12use llmy::client::model::OpenAIModel;
13use schemars::JsonSchema;
14use serde::Deserialize;
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
// Arguments of the `emit_finding_link_decision` tool: one link decision for a
// single finding shown in the current prompt.
//
// NOTE: plain `//` comments are used deliberately here — `///` doc comments on
// a `JsonSchema` type become schema descriptions and would change what the
// model is shown.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
pub struct FindingLinkDecision {
    // Model-supplied reasoning for this decision.
    pub reasoning: String,
    // Prompt-level finding id; the tool rejects ids that are not listed in the
    // prompt's "Findings To Link" section.
    pub finding_id: String,
    // Candidate semantics this finding relates to. Candidates with no relation
    // may simply be omitted.
    pub semantic_evidence: Vec<LinkEvidence>,
}
38
// One piece of evidence linking a finding to a candidate semantic.
//
// NOTE: `//` comments on purpose; `///` would alter the generated JSON schema.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
pub struct LinkEvidence {
    // Candidate ID exactly as shown in the prompt (built as `sem-<id>`); ids not
    // in the prompt's candidate list are rejected by the tool.
    pub semantic_id: String,
    // Strength as free text, parsed with `LinkStrength::parse`; the tool's error
    // message asks for `High`, `Medium` or `Low` (case-insensitive).
    pub strength: String,
    // Justification tying the semantic's behaviour to the finding. Its trimmed
    // length must meet a strength-dependent minimum.
    pub why_finding_can_fire: String,
}
61
/// A finding loaded from the database that still needs semantic links.
#[derive(Debug, Clone)]
pub struct PendingFindingForLinking {
    /// Database id of the finding being linked.
    pub finding_id: i32,
    /// Link target id carried into every result for this finding; merging
    /// rejects results whose value disagrees with this one.
    pub link_target_finding_id: i32,
    /// Categories attached to the finding; rendered into its prompt entry.
    pub categories: Vec<DeFiCategory>,
    /// Extracted finding content (title, severity, root cause, ...) rendered
    /// into the prompt.
    pub finding: ExtractedFinding,
}
69
/// A single semantic link attached to a finding.
#[derive(Debug, Clone)]
pub struct PersistedSemanticLink {
    /// Semantic node id (presumably the canonical id resolved from the prompt's
    /// candidate id via the context's candidate map — confirm in the runner).
    pub semantic_id: i32,
    /// Link strength; when several results link the same semantic to one
    /// finding, the link with the higher `rank()` is kept.
    pub strength: knowdit_kg_model::link_strength::LinkStrength,
    /// Justification text supplied by the model.
    pub evidence: String,
}
79
/// Link result for one finding.
///
/// Produced per batch by the batch runner and, after aggregation across all
/// semantic-chunk contexts, as the finalized result written to the database.
#[derive(Debug, Clone)]
pub struct PersistedFindingLinkResult {
    /// Finding the links belong to.
    pub finding_id: i32,
    /// Link target id; must match the pending finding's value.
    pub link_target_finding_id: i32,
    /// Links to semantic nodes for this finding.
    pub semantic_links: Vec<PersistedSemanticLink>,
}
86
/// Tuning knobs for [`link_pending_findings`].
#[derive(Debug, Clone, Copy)]
pub struct FindingLinkOptions {
    /// Number of batches processed in parallel. Values below 1 are treated as
    /// 1, which runs batches sequentially.
    pub concurrency: usize,
    /// Total input-token budget per request. `None` uses the model's context
    /// budget; larger values are capped to it.
    pub input_token_budget: Option<usize>,
    /// Token budget for the findings section of one prompt. `None` defaults to
    /// a third of the input budget; larger values are capped to that third.
    pub finding_token_budget: Option<usize>,
    /// Maximum findings per batch; `None` uses `DEFAULT_MAX_FINDINGS_PER_BATCH`.
    /// Values below 1 are treated as 1.
    pub max_findings_per_batch: Option<usize>,
    /// Response-attempt limit forwarded to the batch runner (minimum 1).
    pub max_response_attempts: usize,
    /// Agent step limit, passed to `AgentRunOptions::new`.
    pub max_agent_steps: usize,
    /// Forwarded to `list_findings_for_linking`. Per the run's retry hint, this
    /// also re-includes processed findings that still have no semantic links.
    pub include_unlinked: bool,
}
112
/// A semantic node offered to the model as a link candidate.
#[derive(Debug, Clone)]
struct SemanticLinkCandidate {
    /// Prompt-visible candidate id (`sem-<id>`).
    candidate_id: String,
    /// Canonical semantic node this candidate resolves to.
    canonical_semantic_id: i32,
    /// Whether this candidate is the canonical node itself. Always `true` for
    /// candidates built by `build_finding_link_contexts`.
    is_canonical: bool,
    /// Category of the semantic, rendered into the candidate text.
    category: DeFiCategory,
    /// Semantic name, rendered into the candidate text.
    name: String,
    /// Semantic definition, rendered into the candidate text.
    definition: String,
    /// Semantic description, rendered into the candidate text.
    description: String,
}
123
/// Shared prompt context for one semantic-candidate chunk.
///
/// Every batch whose `context_key` maps to this context reuses the same
/// prompt prefix.
#[derive(Debug, Clone)]
struct FindingLinkContext {
    /// User-prompt prefix containing all rendered candidates of the chunk.
    prompt_prefix: String,
    /// Token count of `prompt_prefix` for the model.
    prompt_prefix_tokens: usize,
    /// Sum of the individual candidates' token counts.
    candidate_token_count: usize,
    /// Prompt candidate id -> canonical semantic id.
    candidate_map: HashMap<String, i32>,
    /// Cache key derived from the context key (`finding-link-<slug>`).
    cache_key: String,
}
132
/// A candidate semantic rendered to prompt text, together with its token count.
#[derive(Debug, Clone)]
struct FindingLinkCandidateEntry {
    /// Prompt-visible candidate id.
    candidate_id: String,
    /// Canonical semantic id this candidate resolves to; used for grouping.
    canonical_semantic_id: i32,
    /// Whether this entry is the canonical node of its group.
    is_canonical: bool,
    /// Category used when ordering candidate groups.
    category: DeFiCategory,
    /// Name used when ordering candidates and groups.
    name: String,
    /// Rendered candidate text (see `render_finding_link_candidate`).
    prompt_body: String,
    /// Token count of `prompt_body`.
    token_count: usize,
}
143
/// A canonical semantic together with all of its candidate entries.
///
/// The canonical entry is sorted first. When candidates are chunked, a group
/// is kept whole so that it is never split across contexts.
#[derive(Debug, Clone)]
struct FindingLinkCandidateGroup {
    /// Canonical semantic id shared by every entry in the group.
    canonical_semantic_id: i32,
    /// Category of the canonical entry.
    category: DeFiCategory,
    /// Name of the canonical entry.
    name: String,
    /// Entries, canonical first, then by name and candidate id.
    entries: Vec<FindingLinkCandidateEntry>,
    /// Sum of the entries' token counts.
    token_count: usize,
}
159
/// One finding prepared for a batch: the finding plus its rendered prompt entry.
#[derive(Debug, Clone)]
struct FindingLinkBatchEntry {
    /// The pending finding being linked.
    pending: PendingFindingForLinking,
    /// Finding id as it appears in the prompt (from `prompt_finding_id`).
    prompt_finding_id: String,
    /// Rendered finding text (from `prompts::finding_link_finding_entry`).
    prompt_body: String,
    /// Token count of `prompt_body`.
    token_count: usize,
}
167
/// A group of findings sent to the model in a single agent run against one
/// context.
#[derive(Debug, Clone)]
struct FindingLinkBatch {
    /// Context (semantic chunk) the batch is linked against.
    context_key: FindingLinkContextKey,
    /// Findings in the batch, in prompt order.
    entries: Vec<FindingLinkBatchEntry>,
    /// Sum of the entries' token counts. This may exceed
    /// `finding_token_budget` for an oversized single finding.
    finding_token_count: usize,
    /// Findings-section budget computed for this batch's context.
    finding_token_budget: usize,
    /// Overall input-token budget in effect for the run.
    input_token_budget: usize,
}
176
177impl std::fmt::Display for FindingLinkBatch {
178 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179 write!(
180 f,
181 "{}:{}",
182 self.context_key.label(),
183 finding_link_batch_entry_span(&self.entries)
184 )
185 }
186}
187
/// Identifies one prompt context.
///
/// A key combines a category set (currently always empty outside tests) with
/// the index of a semantic-candidate chunk. It derives `Ord` so it can key
/// `BTreeMap`s deterministically.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct FindingLinkContextKey {
    /// Categories scoping the context; their first entry feeds the prompt.
    categories: BTreeSet<DeFiCategory>,
    /// Index of the semantic-candidate chunk within the base context.
    semantic_chunk_index: usize,
}
193
194impl FindingLinkContextKey {
195 fn empty() -> Self {
196 Self {
197 categories: BTreeSet::new(),
198 semantic_chunk_index: 0,
199 }
200 }
201
202 #[cfg(test)]
203 fn for_category(category: DeFiCategory) -> Self {
204 let mut categories = BTreeSet::new();
205 categories.insert(category);
206 Self {
207 categories,
208 semantic_chunk_index: 0,
209 }
210 }
211
212 fn from_project_categories(_categories: &[DeFiCategory]) -> Vec<Self> {
218 vec![Self::empty()]
219 }
220
221 fn prompt_category(&self) -> Option<DeFiCategory> {
222 self.categories.iter().next().copied()
223 }
224
225 fn with_semantic_chunk(&self, semantic_chunk_index: usize) -> Self {
226 Self {
227 categories: self.categories.clone(),
228 semantic_chunk_index,
229 }
230 }
231
232 fn cache_key(&self) -> String {
233 format!("finding-link-{}", self.slug())
234 }
235
236 fn label(&self) -> String {
237 self.slug()
238 }
239
240 fn slug(&self) -> String {
241 let base = if self.categories.is_empty() {
247 "none".to_string()
248 } else {
249 sanitize_prompt_prefix(
250 &self
251 .categories
252 .iter()
253 .map(DeFiCategory::as_str)
254 .collect::<Vec<_>>()
255 .join("-"),
256 )
257 };
258
259 format!("{}-chunk-{}", base, self.semantic_chunk_index)
260 }
261}
262
263impl FindingLinkContext {
264 fn build(
265 model: &OpenAIModel,
266 context_key: &FindingLinkContextKey,
267 candidate_entries: Vec<FindingLinkCandidateEntry>,
268 ) -> Self {
269 let candidate_token_count = candidate_entries
270 .iter()
271 .map(|entry| entry.token_count)
272 .sum();
273 let candidate_text = candidate_entries
274 .iter()
275 .map(|entry| entry.prompt_body.as_str())
276 .collect::<String>();
277 let candidate_map = candidate_entries
278 .into_iter()
279 .map(|entry| (entry.candidate_id, entry.canonical_semantic_id))
280 .collect();
281 let prompt_prefix =
282 prompts::finding_link_user_prefix(context_key.prompt_category(), &candidate_text);
283
284 Self {
285 prompt_prefix_tokens: count_tokens(model, &prompt_prefix),
286 prompt_prefix,
287 candidate_token_count,
288 candidate_map,
289 cache_key: context_key.cache_key(),
290 }
291 }
292}
293
/// Pending findings grouped by base context key, before semantic chunking.
#[derive(Debug, Default)]
struct FindingLinkContextPlan {
    /// Base context key -> findings that must be linked under that key.
    pending_by_context: BTreeMap<FindingLinkContextKey, Vec<PendingFindingForLinking>>,
}
298
299impl FindingLinkContextPlan {
300 fn from_pending_findings(pending_findings: Vec<PendingFindingForLinking>) -> Self {
301 let mut plan = Self::default();
302
303 for pending in pending_findings {
304 for context_key in FindingLinkContextKey::from_project_categories(&pending.categories) {
305 plan.pending_by_context
306 .entry(context_key)
307 .or_default()
308 .push(pending.clone());
309 }
310 }
311
312 plan
313 }
314
315 async fn materialize(
316 self,
317 db: &HistoricalDatabase,
318 model: &OpenAIModel,
319 budgets: &FindingLinkBudgets,
320 ) -> Result<FindingLinkExecutionPlan> {
321 use std::sync::Arc;
322
323 let mut contexts = BTreeMap::new();
324 let mut expanded_pending = BTreeMap::new();
325 let mut expected_context_counts = HashMap::new();
326
327 for (base_context_key, pending_findings) in self.pending_by_context {
328 let chunked_contexts =
329 build_finding_link_contexts(db, model, &base_context_key, budgets).await?;
330 let chunk_count = chunked_contexts.len();
331
332 for pending in &pending_findings {
333 expected_context_counts
334 .entry(pending.finding_id)
335 .and_modify(|count| *count += chunk_count)
336 .or_insert(chunk_count);
337 }
338
339 for (context_key, context) in chunked_contexts {
340 contexts.insert(context_key.clone(), Arc::new(context));
341 expanded_pending.insert(context_key, pending_findings.clone());
342 }
343 }
344
345 let batches = build_finding_link_batches(model, expanded_pending, &contexts, budgets)?;
346
347 Ok(FindingLinkExecutionPlan {
348 contexts,
349 batches,
350 expected_context_counts,
351 })
352 }
353}
354
/// Fully materialized linking plan.
///
/// Holds one context per (base key, semantic chunk), the batches to run, and
/// the number of context results each finding must receive before it is
/// complete.
#[derive(Debug)]
struct FindingLinkExecutionPlan {
    /// Chunked context key -> shared prompt context.
    contexts: BTreeMap<FindingLinkContextKey, std::sync::Arc<FindingLinkContext>>,
    /// Batches across all contexts.
    batches: Vec<FindingLinkBatch>,
    /// Finding id -> number of context results expected for it.
    expected_context_counts: HashMap<i32, usize>,
}
361
362impl FindingLinkExecutionPlan {
363 fn expected_context_count(&self, finding_id: i32) -> usize {
364 self.expected_context_counts
365 .get(&finding_id)
366 .copied()
367 .unwrap_or(0)
368 }
369
370 fn task_count(&self) -> usize {
371 self.batches.iter().map(|batch| batch.entries.len()).sum()
372 }
373}
374
/// Batch-size cap used when `FindingLinkOptions::max_findings_per_batch` is
/// `None`.
const DEFAULT_MAX_FINDINGS_PER_BATCH: usize = 50;
380
/// Concrete token and count budgets resolved from [`FindingLinkOptions`].
#[derive(Debug, Clone, Copy)]
struct FindingLinkBudgets {
    /// Total input tokens allowed per request (system prompt plus user prompt).
    input_token_budget: usize,
    /// Target tokens for the candidate-semantics section of one context.
    semantic_token_target: usize,
    /// Target tokens for the findings section of one batch.
    finding_token_target: usize,
    /// Maximum number of findings in one batch (always at least 1).
    max_findings_per_batch: usize,
}
390
391impl FindingLinkBudgets {
392 fn from_options(model: &OpenAIModel, options: FindingLinkOptions) -> Self {
393 let max_input_budget = get_context_budget(model).max(1);
394 let input_token_budget = options
395 .input_token_budget
396 .unwrap_or(max_input_budget)
397 .min(max_input_budget)
398 .max(1);
399 let shared_target = (input_token_budget / 3).max(1);
400 let finding_token_target = options
401 .finding_token_budget
402 .unwrap_or(shared_target)
403 .min(shared_target)
404 .max(1);
405 let max_findings_per_batch = options
406 .max_findings_per_batch
407 .unwrap_or(DEFAULT_MAX_FINDINGS_PER_BATCH)
408 .max(1);
409
410 Self {
411 input_token_budget,
412 semantic_token_target: shared_target,
413 finding_token_target,
414 max_findings_per_batch,
415 }
416 }
417
418 fn semantic_token_budget(
419 &self,
420 model: &OpenAIModel,
421 prompt_category: Option<DeFiCategory>,
422 ) -> Result<usize> {
423 let system_tokens = count_tokens(model, prompts::GENERAL_ROLE_SYSTEM);
424 let stable_prefix_tokens = count_tokens(
425 model,
426 &prompts::finding_link_user_prefix(prompt_category, ""),
427 );
428 let available_for_candidates = self
429 .input_token_budget
430 .saturating_sub(system_tokens + stable_prefix_tokens + self.finding_token_target);
431 let effective = self.semantic_token_target.min(available_for_candidates);
432
433 if effective == 0 {
434 return Err(KgError::other(format!(
435 "Finding link prompt for category '{}' leaves no room for semantic candidates under input budget {}",
436 prompt_category
437 .map(|category| category.as_str())
438 .unwrap_or("None"),
439 self.input_token_budget
440 )));
441 }
442
443 Ok(effective)
444 }
445
446 fn finding_token_budget(
447 &self,
448 model: &OpenAIModel,
449 context: &FindingLinkContext,
450 ) -> Result<usize> {
451 let system_tokens = count_tokens(model, prompts::GENERAL_ROLE_SYSTEM);
452 let available = self
453 .input_token_budget
454 .saturating_sub(system_tokens + context.prompt_prefix_tokens);
455 let effective = self.finding_token_target.min(available);
456
457 if effective == 0 {
458 return Err(KgError::other(format!(
459 "Finding link context '{}' leaves no room for batched findings under input budget {}",
460 context.cache_key, self.input_token_budget
461 )));
462 }
463
464 Ok(effective)
465 }
466}
467
/// Running per-finding aggregate across all of the finding's semantic-chunk
/// contexts.
#[derive(Debug)]
struct AggregatedFindingLinkResult {
    /// Finding being linked (also its key in the aggregation map).
    finding_id: i32,
    /// Link target that every merged result must agree with.
    link_target_finding_id: i32,
    /// Number of context results planned for this finding.
    expected_contexts: usize,
    /// Number of batch results merged so far.
    completed_contexts: usize,
    /// Strongest link per semantic id, ordered by semantic id.
    semantic_links: BTreeMap<i32, PersistedSemanticLink>,
}
480
481pub async fn link_pending_findings(
482 db: &HistoricalDatabase,
483 llm: &LLM,
484 options: FindingLinkOptions,
485) -> Result<()> {
486 use std::sync::Arc;
487 use tokio::sync::mpsc;
488 use tokio::task::JoinSet;
489
490 let pending_findings = db
491 .list_findings_for_linking(options.include_unlinked)
492 .await?;
493 if pending_findings.is_empty() {
494 tracing::info!("No findings pending linking.");
495 return Ok(());
496 }
497
498 let total_pending_findings = pending_findings.len();
499 let concurrency = options.concurrency.max(1);
500 let max_response_attempts = options.max_response_attempts.max(1);
501 let agent_options = AgentRunOptions::new(options.max_agent_steps);
502 let budgets = FindingLinkBudgets::from_options(&llm.model, options);
503 tracing::info!(
504 "Will link {} pending findings (concurrency={}, input token budget={}, target semantic tokens={}, target finding tokens={}, max response attempts={}, max agent steps={})",
505 total_pending_findings,
506 concurrency,
507 budgets.input_token_budget,
508 budgets.semantic_token_target,
509 budgets.finding_token_target,
510 max_response_attempts,
511 options.max_agent_steps,
512 );
513
514 let plan = FindingLinkContextPlan::from_pending_findings(pending_findings.clone());
515 let execution_plan = plan.materialize(db, &llm.model, &budgets).await?;
516 let mut aggregated_results: HashMap<i32, AggregatedFindingLinkResult> = pending_findings
517 .iter()
518 .map(|pending| {
519 (
520 pending.finding_id,
521 AggregatedFindingLinkResult {
522 finding_id: pending.finding_id,
523 link_target_finding_id: pending.link_target_finding_id,
524 expected_contexts: execution_plan.expected_context_count(pending.finding_id),
525 completed_contexts: 0,
526 semantic_links: BTreeMap::new(),
527 },
528 )
529 })
530 .collect();
531 let task_count = execution_plan.task_count();
532 let FindingLinkExecutionPlan {
533 contexts, batches, ..
534 } = execution_plan;
535
536 tracing::info!(
537 "Prepared {} finding-link batch(es) covering {} finding-context task(s) for {} pending findings",
538 batches.len(),
539 task_count,
540 total_pending_findings
541 );
542
543 let mut failed_findings = HashSet::new();
544 let mut committed_findings: HashSet<i32> = HashSet::new();
545 if concurrency <= 1 {
546 for batch in batches {
547 let key = batch.context_key.clone();
548 let context = contexts.get(&key).ok_or_else(|| {
549 KgError::other(format!("Missing link context for key '{}'", key.label()))
550 })?;
551
552 let runner = FindingLinkBatchAgentRunner {
553 llm: llm.clone(),
554 batch: &batch,
555 context: context.as_ref(),
556 agent_options: agent_options.clone(),
557 max_response_attempts,
558 };
559 match runner.run().await {
560 Ok(results) => {
561 persist_batch_partials(db, &failed_findings, &committed_findings, &results)
568 .await;
569 if let Err(e) =
570 merge_finding_link_batch_results(&mut aggregated_results, results)
571 {
572 mark_batch_findings_failed(&mut failed_findings, &batch);
573 tracing::error!(
574 "Failed to aggregate finding-link batch {} ({} finding-category task(s)): {}",
575 &batch,
576 batch.entries.len(),
577 e
578 );
579 } else {
580 commit_completed_findings(
581 db,
582 &mut aggregated_results,
583 &mut failed_findings,
584 &mut committed_findings,
585 batch.entries.iter().map(|e| e.pending.finding_id),
586 )
587 .await;
588 }
589 }
590 Err(e) => {
591 mark_batch_findings_failed(&mut failed_findings, &batch);
592 tracing::error!(
593 "Failed to link finding batch {} ({} finding-category task(s)): {}",
594 &batch,
595 batch.entries.len(),
596 e
597 );
598 }
599 }
600 }
601 } else {
602 let contexts = Arc::new(contexts);
603 let (tx, rx) = async_channel::bounded::<FindingLinkBatch>(batches.len() + 1);
604 let (out_tx, mut out_rx) = mpsc::channel::<(
605 FindingLinkBatch,
606 Result<Vec<PersistedFindingLinkResult>>,
607 )>(concurrency + 1);
608 let mut handles = JoinSet::new();
609
610 for _ in 0..concurrency {
611 let rx = rx.clone();
612 let out = out_tx.clone();
613 let llm_clone = llm.clone();
614 let contexts = contexts.clone();
615 let worker_agent_options = agent_options.clone();
616
617 handles.spawn(async move {
618 while let Ok(batch) = rx.recv().await {
619 let result = match contexts.get(&batch.context_key) {
620 Some(context) => {
621 let runner = FindingLinkBatchAgentRunner {
622 llm: llm_clone.clone(),
623 batch: &batch,
624 context: context.as_ref(),
625 agent_options: worker_agent_options.clone(),
626 max_response_attempts,
627 };
628 runner.run().await
629 }
630 None => Err(KgError::other(format!(
631 "Missing link context for key '{}'",
632 batch.context_key.label()
633 ))),
634 };
635
636 out.send((batch, result))
637 .await
638 .expect("can not send finding link batch result");
639 }
640
641 Ok::<_, KgError>(())
642 });
643 }
644 drop(out_tx);
645 drop(rx);
646
647 for batch in batches {
648 tx.send(batch)
649 .await
650 .expect("fail to send out finding link batch");
651 }
652 drop(tx);
653
654 while let Some(handle) = out_rx.recv().await {
655 let (batch, results) = handle;
656 match results {
657 Ok(results) => {
658 persist_batch_partials(db, &failed_findings, &committed_findings, &results)
668 .await;
669 if let Err(e) =
670 merge_finding_link_batch_results(&mut aggregated_results, results)
671 {
672 mark_batch_findings_failed(&mut failed_findings, &batch);
673 tracing::error!(
674 "Failed to aggregate finding-link batch {} ({} finding-category task(s)): {}",
675 &batch,
676 batch.entries.len(),
677 e
678 );
679 } else {
680 commit_completed_findings(
681 db,
682 &mut aggregated_results,
683 &mut failed_findings,
684 &mut committed_findings,
685 batch.entries.iter().map(|e| e.pending.finding_id),
686 )
687 .await;
688 }
689 }
690 Err(e) => {
691 mark_batch_findings_failed(&mut failed_findings, &batch);
692 tracing::error!(
693 "Finding-link batch {} ({} finding-category task(s)) failed: {}",
694 &batch,
695 batch.entries.len(),
696 e
697 );
698 }
699 }
700 }
701
702 while let Some(handle) = handles.join_next().await {
703 match handle {
704 Ok(Ok(())) => {}
705 Ok(Err(e)) => {
706 tracing::error!("Finding-link worker task failed: {}", e);
707 }
708 Err(e) => {
709 tracing::error!("Finding-link worker task panicked: {}", e);
710 }
711 }
712 }
713 }
714
715 let final_results = finalize_finding_link_results(aggregated_results, &mut failed_findings);
716 for result in final_results {
717 if let Err(e) = db.write_finding_link_result(&result).await {
718 failed_findings.insert(result.finding_id);
719 tracing::error!(
720 "Failed to save finding links for #{}: {}",
721 result.finding_id,
722 e
723 );
724 }
725 }
726
727 if !failed_findings.is_empty() {
728 tracing::warn!(
729 "Finding linking failed for {} finding(s)",
730 failed_findings.len()
731 );
732 }
733
734 let findings_without_links = db.list_processed_findings_without_semantic_links().await?;
735 if !findings_without_links.is_empty() {
736 let preview = findings_without_links
737 .iter()
738 .take(20)
739 .map(|finding_id| format!("finding-{finding_id}"))
740 .collect::<Vec<_>>()
741 .join(", ");
742 let remainder = findings_without_links.len().saturating_sub(20);
743 let more = if remainder > 0 {
744 format!(" and {remainder} more")
745 } else {
746 String::new()
747 };
748 let retry_hint = if options.include_unlinked {
749 "These findings were included in this run because `--include-unlinked` was enabled, but they still ended up without semantic links."
750 } else {
751 "To retry just these findings, run `knowdit link --include-unlinked`."
752 };
753 tracing::warn!(
754 "{} processed finding(s) still have no semantic links after this run: {}{}. {}",
755 findings_without_links.len(),
756 preview,
757 more,
758 retry_hint,
759 );
760 }
761
762 tracing::info!("Finding linking complete.");
763 Ok(())
764}
765
766fn partition_finding_link_entries(
767 entries: Vec<FindingLinkBatchEntry>,
768 token_budget: usize,
769 max_count: usize,
770) -> Vec<Vec<FindingLinkBatchEntry>> {
771 let mut batches = Vec::new();
772 let mut current = Vec::new();
773 let mut current_tokens = 0usize;
774
775 for entry in entries {
776 let count_exceeded = current.len() >= max_count;
777 let tokens_exceeded = current_tokens + entry.token_count > token_budget;
778 if !current.is_empty() && (count_exceeded || tokens_exceeded) {
779 batches.push(current);
780 current = Vec::new();
781 current_tokens = 0;
782 }
783
784 current_tokens += entry.token_count;
785 current.push(entry);
786 }
787
788 if !current.is_empty() {
789 batches.push(current);
790 }
791
792 batches
793}
794
795fn partition_finding_link_candidate_groups(
796 groups: Vec<FindingLinkCandidateGroup>,
797 token_budget: usize,
798) -> Vec<Vec<FindingLinkCandidateEntry>> {
799 let mut batches = Vec::new();
800 let mut current = Vec::new();
801 let mut current_tokens = 0usize;
802
803 for group in groups {
804 if !current.is_empty() && current_tokens + group.token_count > token_budget {
805 batches.push(current);
806 current = Vec::new();
807 current_tokens = 0;
808 }
809
810 current_tokens += group.token_count;
811 current.extend(group.entries);
812 }
813
814 if !current.is_empty() {
815 batches.push(current);
816 }
817
818 batches
819}
820
821fn group_finding_link_candidate_entries(
822 entries: Vec<FindingLinkCandidateEntry>,
823) -> Result<Vec<FindingLinkCandidateGroup>> {
824 let mut grouped_entries = BTreeMap::<i32, Vec<FindingLinkCandidateEntry>>::new();
825 for entry in entries {
826 grouped_entries
827 .entry(entry.canonical_semantic_id)
828 .or_default()
829 .push(entry);
830 }
831
832 let mut groups = Vec::new();
833 for (canonical_semantic_id, mut group_entries) in grouped_entries {
834 group_entries.sort_by(|lhs, rhs| {
835 rhs.is_canonical
836 .cmp(&lhs.is_canonical)
837 .then_with(|| lhs.name.cmp(&rhs.name))
838 .then_with(|| lhs.candidate_id.cmp(&rhs.candidate_id))
839 });
840
841 let canonical_entry = group_entries
842 .first()
843 .filter(|entry| entry.is_canonical)
844 .ok_or_else(|| {
845 KgError::other(format!(
846 "Missing canonical semantic sem-{} while building finding-link candidate group",
847 canonical_semantic_id
848 ))
849 })?;
850
851 let token_count = group_entries.iter().map(|entry| entry.token_count).sum();
852 groups.push(FindingLinkCandidateGroup {
853 canonical_semantic_id,
854 category: canonical_entry.category,
855 name: canonical_entry.name.clone(),
856 entries: group_entries,
857 token_count,
858 });
859 }
860
861 groups.sort_by(|lhs, rhs| {
862 lhs.category
863 .as_str()
864 .cmp(rhs.category.as_str())
865 .then_with(|| lhs.name.cmp(&rhs.name))
866 .then_with(|| lhs.canonical_semantic_id.cmp(&rhs.canonical_semantic_id))
867 });
868
869 Ok(groups)
870}
871
872fn build_finding_link_batches(
873 model: &OpenAIModel,
874 pending_by_context: BTreeMap<FindingLinkContextKey, Vec<PendingFindingForLinking>>,
875 contexts: &BTreeMap<FindingLinkContextKey, std::sync::Arc<FindingLinkContext>>,
876 budgets: &FindingLinkBudgets,
877) -> Result<Vec<FindingLinkBatch>> {
878 let mut batches = Vec::new();
879
880 for (context_key, pending_findings) in pending_by_context {
881 let context = contexts.get(&context_key).ok_or_else(|| {
882 KgError::other(format!(
883 "Missing link context for key '{}'",
884 context_key.label()
885 ))
886 })?;
887 let finding_token_budget = budgets.finding_token_budget(model, context.as_ref())?;
888
889 let entries = pending_findings
890 .into_iter()
891 .map(|pending| {
892 let prompt_finding_id = prompt_finding_id(pending.finding_id);
893 let prompt_body = prompts::finding_link_finding_entry(
894 &prompt_finding_id,
895 &pending.categories,
896 &pending.finding.title,
897 pending.finding.severity,
898 pending.finding.category,
899 &pending.finding.subcategory,
900 &pending.finding.root_cause,
901 &pending.finding.description,
902 &pending.finding.patterns,
903 &pending.finding.exploits,
904 );
905 let token_count = count_tokens(model, &prompt_body);
906 FindingLinkBatchEntry {
907 pending,
908 prompt_finding_id,
909 prompt_body,
910 token_count,
911 }
912 })
913 .collect();
914
915 for batch_entries in partition_finding_link_entries(
916 entries,
917 finding_token_budget,
918 budgets.max_findings_per_batch,
919 ) {
920 let finding_token_count = batch_entries.iter().map(|entry| entry.token_count).sum();
921 if finding_token_count > finding_token_budget {
922 tracing::warn!(
923 "Finding-link batch {} exceeds finding token budget {} with {} finding tokens; sending as singleton batch",
924 finding_link_batch_entry_span(&batch_entries),
925 finding_token_budget,
926 finding_token_count
927 );
928 }
929
930 batches.push(FindingLinkBatch {
931 context_key: context_key.clone(),
932 entries: batch_entries,
933 finding_token_count,
934 finding_token_budget,
935 input_token_budget: budgets.input_token_budget,
936 });
937 }
938 }
939
940 Ok(batches)
941}
942
943async fn build_finding_link_contexts(
944 db: &HistoricalDatabase,
945 model: &OpenAIModel,
946 base_context_key: &FindingLinkContextKey,
947 budgets: &FindingLinkBudgets,
948) -> Result<Vec<(FindingLinkContextKey, FindingLinkContext)>> {
949 let candidates: Vec<SemanticLinkCandidate> = db
955 .all_canonical_semantic_link_candidates()
956 .await?
957 .into_iter()
958 .map(|node| SemanticLinkCandidate {
959 candidate_id: format!("sem-{}", node.id),
960 canonical_semantic_id: node.id,
961 is_canonical: true,
962 category: node.category,
963 name: node.name,
964 definition: node.definition,
965 description: node.description,
966 })
967 .collect();
968
969 let candidate_entries: Vec<FindingLinkCandidateEntry> = candidates
970 .into_iter()
971 .map(|candidate| {
972 let prompt_body = render_finding_link_candidate(&candidate);
973 FindingLinkCandidateEntry {
974 candidate_id: candidate.candidate_id,
975 canonical_semantic_id: candidate.canonical_semantic_id,
976 is_canonical: candidate.is_canonical,
977 category: candidate.category,
978 name: candidate.name,
979 token_count: count_tokens(model, &prompt_body),
980 prompt_body,
981 }
982 })
983 .collect();
984
985 if candidate_entries.is_empty() {
986 return Ok(vec![(
987 base_context_key.clone(),
988 FindingLinkContext::build(model, base_context_key, Vec::new()),
989 )]);
990 }
991
992 let semantic_token_budget =
993 budgets.semantic_token_budget(model, base_context_key.prompt_category())?;
994 let candidate_groups = group_finding_link_candidate_entries(candidate_entries)?;
995 let candidate_chunks =
996 partition_finding_link_candidate_groups(candidate_groups, semantic_token_budget);
997
998 Ok(candidate_chunks
999 .into_iter()
1000 .enumerate()
1001 .map(|(chunk_index, candidate_entries)| {
1002 let context_key = base_context_key.with_semantic_chunk(chunk_index);
1003 let context = FindingLinkContext::build(model, &context_key, candidate_entries);
1004 (context_key, context)
1005 })
1006 .collect())
1007}
1008
1009fn render_finding_link_candidate(candidate: &SemanticLinkCandidate) -> String {
1010 format!(
1013 "Candidate ID: {}\nCategory: {}\nName: {}\nDefinition: {}\nDescription: {}\n\n",
1014 candidate.candidate_id,
1015 candidate.category,
1016 candidate.name,
1017 candidate.definition,
1018 candidate.description
1019 )
1020}
1021
1022fn merge_finding_link_batch_results(
1023 aggregated_results: &mut HashMap<i32, AggregatedFindingLinkResult>,
1024 results: Vec<PersistedFindingLinkResult>,
1025) -> Result<()> {
1026 for result in results {
1027 let aggregate = aggregated_results
1028 .get_mut(&result.finding_id)
1029 .ok_or_else(|| {
1030 KgError::other(format!(
1031 "Finding link aggregation is missing finding {}",
1032 result.finding_id
1033 ))
1034 })?;
1035
1036 if aggregate.link_target_finding_id != result.link_target_finding_id {
1037 return Err(KgError::other(format!(
1038 "Finding {} resolved inconsistent link targets: {} vs {}",
1039 result.finding_id, aggregate.link_target_finding_id, result.link_target_finding_id
1040 )));
1041 }
1042
1043 aggregate.completed_contexts += 1;
1044 for link in result.semantic_links {
1045 aggregate
1048 .semantic_links
1049 .entry(link.semantic_id)
1050 .and_modify(|existing| {
1051 if link.strength.rank() > existing.strength.rank() {
1052 *existing = link.clone();
1053 }
1054 })
1055 .or_insert(link);
1056 }
1057 }
1058
1059 Ok(())
1060}
1061
1062fn finalize_finding_link_results(
1063 aggregated_results: HashMap<i32, AggregatedFindingLinkResult>,
1064 failed_findings: &mut HashSet<i32>,
1065) -> Vec<PersistedFindingLinkResult> {
1066 let mut final_results = Vec::new();
1067
1068 for aggregate in aggregated_results
1069 .into_values()
1070 .sorted_by_key(|aggregate| aggregate.finding_id)
1071 {
1072 if failed_findings.contains(&aggregate.finding_id) {
1073 continue;
1074 }
1075
1076 if aggregate.completed_contexts != aggregate.expected_contexts {
1077 failed_findings.insert(aggregate.finding_id);
1078 tracing::error!(
1079 "Finding #{} only completed {}/{} category-specific link tasks",
1080 aggregate.finding_id,
1081 aggregate.completed_contexts,
1082 aggregate.expected_contexts
1083 );
1084 continue;
1085 }
1086
1087 final_results.push(PersistedFindingLinkResult {
1088 finding_id: aggregate.finding_id,
1089 link_target_finding_id: aggregate.link_target_finding_id,
1090 semantic_links: aggregate.semantic_links.into_values().collect(),
1091 });
1092 }
1093
1094 final_results
1095}
1096
1097fn mark_batch_findings_failed(failed_findings: &mut HashSet<i32>, batch: &FindingLinkBatch) {
1098 failed_findings.extend(batch.entries.iter().map(|entry| entry.pending.finding_id));
1099}
1100
1101async fn persist_batch_partials(
1110 db: &HistoricalDatabase,
1111 failed_findings: &HashSet<i32>,
1112 committed_findings: &HashSet<i32>,
1113 results: &[PersistedFindingLinkResult],
1114) {
1115 for result in results {
1116 if failed_findings.contains(&result.finding_id)
1117 || committed_findings.contains(&result.finding_id)
1118 {
1119 continue;
1120 }
1121 if let Err(e) = db.upsert_finding_link_partial(result).await {
1122 tracing::error!(
1123 "Failed to persist partial batch links for finding #{}: {}",
1124 result.finding_id,
1125 e
1126 );
1127 }
1133 }
1134}
1135
1136async fn commit_completed_findings<I: IntoIterator<Item = i32>>(
1142 db: &HistoricalDatabase,
1143 aggregated_results: &mut HashMap<i32, AggregatedFindingLinkResult>,
1144 failed_findings: &mut HashSet<i32>,
1145 committed_findings: &mut HashSet<i32>,
1146 finding_ids: I,
1147) {
1148 let mut seen: HashSet<i32> = HashSet::new();
1149 for fid in finding_ids {
1150 if !seen.insert(fid) {
1151 continue;
1152 }
1153 if failed_findings.contains(&fid) || committed_findings.contains(&fid) {
1154 continue;
1155 }
1156 let ready = aggregated_results
1157 .get(&fid)
1158 .map(|agg| agg.expected_contexts > 0 && agg.completed_contexts >= agg.expected_contexts)
1159 .unwrap_or(false);
1160 if !ready {
1161 continue;
1162 }
1163 let agg = aggregated_results
1164 .remove(&fid)
1165 .expect("readiness check just confirmed presence");
1166 let link_count = agg.semantic_links.len();
1167 let finding_id = agg.finding_id;
1168 match db.mark_finding_link_complete(finding_id).await {
1169 Ok(()) => {
1170 committed_findings.insert(finding_id);
1171 tracing::debug!(
1172 "Committed finding #{} ({} semantic link(s))",
1173 finding_id,
1174 link_count,
1175 );
1176 }
1177 Err(e) => {
1178 failed_findings.insert(finding_id);
1179 tracing::error!("Failed to mark finding #{} complete: {}", finding_id, e);
1180 }
1181 }
1182 }
1183}
1184
// Agent tool through which the model emits one `FindingLinkDecision` per
// finding. `invoke` validates the finding id, the semantic ids, strength
// parsing and evidence length, and returns an "error: ..." string (rather than
// failing) so that the model can re-emit.
//
// NOTE: plain `//` comments are used because the `#[tool]` attribute macro
// consumes this item; `///` doc attributes would be passed into the macro.
#[derive(Debug, Clone)]
#[tool(
    arguments = FindingLinkDecision,
    invoke = invoke,
    description = "Emit one link decision for a single finding in this prompt. Call once per `finding_id` shown under \"Findings To Link\". `semantic_evidence` is a list of {semantic_id, strength, why_finding_can_fire} entries — include candidates the finding is at least tangentially related to (Low or higher). Candidates with NO meaningful relation can be omitted entirely; silent omission is treated as 'no link'. Each entry's `semantic_id` MUST be a Candidate ID from the prompt; `strength` is one of `High` / `Medium` / `Low`; `why_finding_can_fire` cites the specific behaviour from the semantic's description that ties to this finding. High/Medium evidence must be at least 40 characters; Low can be as short as 15. The tool validates ids, strength parsing, and evidence length; rejected decisions can be re-emitted.",
    name = "emit_finding_link_decision",
)]
struct EmitFindingLinkDecisionTool {
    // Shared sink for accepted decisions (presumably drained by the batch
    // runner — TODO confirm).
    buffer: Arc<AgentChunkBuffer<FindingLinkDecision>>,
    // Prompt finding ids listed under "Findings To Link"; decisions for any
    // other id are rejected.
    valid_finding_ids: Arc<HashSet<String>>,
    // Candidate IDs listed in the prompt; evidence naming any other id is
    // rejected.
    valid_semantic_ids: Arc<HashSet<String>>,
    // Label identifying this tool instance (presumably the batch label for
    // logging — confirm).
    label: String,
    // Number of findings in the prompt (presumably used for progress
    // reporting — confirm).
    target_finding_count: usize,
    // Shared counter of emit calls (usage not visible in this chunk).
    emit_count: Arc<std::sync::atomic::AtomicUsize>,
}
1216
/// Minimum trimmed length of `why_finding_can_fire` for `High`/`Medium`
/// evidence.
///
/// It is compared against `str::len`, i.e. UTF-8 bytes rather than characters.
const LINK_EVIDENCE_MIN_HIGH_MEDIUM: usize = 40;
/// Minimum trimmed length of `why_finding_can_fire` for `Low` evidence. It is
/// also quoted in the empty-evidence error and is measured in UTF-8 bytes.
const LINK_EVIDENCE_MIN_LOW: usize = 15;
1223
1224impl EmitFindingLinkDecisionTool {
1225 async fn invoke(&self, args: FindingLinkDecision) -> std::result::Result<String, LLMYError> {
1226 if args.finding_id.trim().is_empty() {
1227 return Ok("error: `finding_id` must not be empty".to_string());
1228 }
1229 if !self.valid_finding_ids.contains(&args.finding_id) {
1230 return Ok(format!(
1231 "error: `finding_id` '{}' is not in this prompt's \"Findings To Link\" section. Re-emit using one of the IDs shown there exactly.",
1232 args.finding_id,
1233 ));
1234 }
1235
1236 let invalid_ids: Vec<&String> = args
1243 .semantic_evidence
1244 .iter()
1245 .map(|e| &e.semantic_id)
1246 .filter(|id| !self.valid_semantic_ids.contains(*id))
1247 .collect();
1248 if !invalid_ids.is_empty() {
1249 return Ok(format!(
1250 "error: `semantic_evidence` contains semantic_id values not in this prompt's \"Candidate Semantics\" list: {invalid_ids:?}. Re-emit this decision using only `Candidate ID` values shown above.",
1251 ));
1252 }
1253
1254 for evidence in &args.semantic_evidence {
1256 let Some(strength) =
1257 knowdit_kg_model::link_strength::LinkStrength::parse(&evidence.strength)
1258 else {
1259 return Ok(format!(
1260 "error: evidence for semantic_id '{}' under finding '{}' has unrecognised `strength` value {:?}. Use one of `High`, `Medium`, or `Low` (case-insensitive).",
1261 evidence.semantic_id, args.finding_id, evidence.strength,
1262 ));
1263 };
1264 let trimmed = evidence.why_finding_can_fire.trim();
1265 if trimmed.is_empty() {
1266 return Ok(format!(
1267 "error: evidence for semantic_id '{}' under finding '{}' has empty `why_finding_can_fire`. Even Low strength needs a brief reason (>= {} chars) explaining why the link is tangential.",
1268 evidence.semantic_id, args.finding_id, LINK_EVIDENCE_MIN_LOW,
1269 ));
1270 }
1271 let floor = match strength {
1272 knowdit_kg_model::link_strength::LinkStrength::High
1273 | knowdit_kg_model::link_strength::LinkStrength::Medium => {
1274 LINK_EVIDENCE_MIN_HIGH_MEDIUM
1275 }
1276 knowdit_kg_model::link_strength::LinkStrength::Low => LINK_EVIDENCE_MIN_LOW,
1277 };
1278 if trimmed.len() < floor {
1279 return Ok(format!(
1280 "error: evidence for semantic_id '{}' (strength={}) under finding '{}' is only {} chars; the {} tier requires at least {} chars. Cite a specific behaviour from the semantic's description that matches the finding's bug prerequisites.",
1281 evidence.semantic_id,
1282 strength,
1283 args.finding_id,
1284 trimmed.len(),
1285 strength,
1286 floor,
1287 ));
1288 }
1289 }
1290 let response = self
1291 .buffer
1292 .push_with_message(args, "finding link decision")
1293 .await;
1294 let n = self
1298 .emit_count
1299 .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1300 + 1;
1301 if n.is_multiple_of(LINK_PROGRESS_HEARTBEAT_EVERY) || n == self.target_finding_count {
1302 tracing::info!(
1303 "{}: progress {}/{} decisions emitted",
1304 self.label,
1305 n,
1306 self.target_finding_count,
1307 );
1308 }
1309 Ok(response)
1310 }
1311}
1312
1313const LINK_PROGRESS_HEARTBEAT_EVERY: usize = 25;
1317
1318#[derive(Debug, Clone)]
1319#[tool(
1320 arguments = FinalizeArgs,
1321 invoke = invoke,
1322 description = "Signal that every finding in this prompt has been processed via emit_finding_link_decision (with possibly empty semantic_ids when no candidate is related). Call exactly once, then stop.",
1323 name = "finalize_finding_link",
1324)]
1325struct FinalizeFindingLinkTool {
1326 buffer: Arc<AgentChunkBuffer<FindingLinkDecision>>,
1327}
1328
1329impl FinalizeFindingLinkTool {
1330 async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
1331 Ok(self
1332 .buffer
1333 .finalize_with_message(args.summary, "finding linking")
1334 .await)
1335 }
1336}
1337
1338struct FindingLinkBatchAgentRunner<'a> {
1344 llm: LLM,
1345 batch: &'a FindingLinkBatch,
1346 context: &'a FindingLinkContext,
1347 agent_options: AgentRunOptions,
1348 max_response_attempts: usize,
1349}
1350
1351impl<'a> FindingLinkBatchAgentRunner<'a> {
1352 async fn run(self) -> Result<Vec<PersistedFindingLinkResult>> {
1353 if self.batch.entries.is_empty() {
1354 return Ok(Vec::new());
1355 }
1356 if self.context.candidate_map.is_empty() {
1357 tracing::info!(
1358 "No semantic candidates for finding batch {}, marking {} finding(s) as processed without links",
1359 self.batch,
1360 self.batch.entries.len(),
1361 );
1362 return Ok(self
1363 .batch
1364 .entries
1365 .iter()
1366 .map(|entry| PersistedFindingLinkResult {
1367 finding_id: entry.pending.finding_id,
1368 link_target_finding_id: entry.pending.link_target_finding_id,
1369 semantic_links: Vec::new(),
1370 })
1371 .collect());
1372 }
1373
1374 tracing::info!(
1375 "Linking finding batch {} ({} finding(s), ~{} semantic tokens, ~{} finding tokens, finding_budget={}, input_budget={}, max_response_attempts={})",
1376 self.batch,
1377 self.batch.entries.len(),
1378 self.context.candidate_token_count,
1379 self.batch.finding_token_count,
1380 self.batch.finding_token_budget,
1381 self.batch.input_token_budget,
1382 self.max_response_attempts,
1383 );
1384
1385 let max_agent_steps = self.agent_options.max_agent_steps;
1390 let step_headroom = max_agent_steps.saturating_sub(self.batch.entries.len() + 1);
1391 if step_headroom < 4 {
1392 tracing::warn!(
1393 "Finding batch {} may not fit the step budget: {} finding(s) need ≥{} steps (emit+finalize), but --link-max-agent-steps is {}. Expect premature finalize / no-link defaults. Lower --max-findings-per-batch or raise --link-max-agent-steps.",
1394 self.batch,
1395 self.batch.entries.len(),
1396 self.batch.entries.len() + 1,
1397 max_agent_steps,
1398 );
1399 }
1400
1401 let valid_semantic_ids: Arc<HashSet<String>> =
1402 Arc::new(self.context.candidate_map.keys().cloned().collect());
1403 let mut collected: HashMap<i32, FindingLinkDecision> = HashMap::new();
1407
1408 for attempt in 1..=self.max_response_attempts {
1409 let still_missing: Vec<&FindingLinkBatchEntry> = self
1410 .batch
1411 .entries
1412 .iter()
1413 .filter(|entry| !collected.contains_key(&entry.pending.finding_id))
1414 .collect();
1415 if still_missing.is_empty() {
1416 break;
1417 }
1418
1419 let valid_finding_ids: Arc<HashSet<String>> = Arc::new(
1420 still_missing
1421 .iter()
1422 .map(|entry| entry.prompt_finding_id.clone())
1423 .collect(),
1424 );
1425 let mut user_prompt = self.context.prompt_prefix.clone();
1426 for entry in &still_missing {
1427 user_prompt.push_str(&entry.prompt_body);
1428 }
1429 let cache_key = format!("{}-attempt{:02}", self.context.cache_key, attempt);
1430 let label = format!("finding-link-{}-attempt{:02}", self.batch, attempt);
1431 let target_finding_count = still_missing.len();
1432 let agent_decisions = self
1433 .run_one_attempt(
1434 user_prompt,
1435 cache_key,
1436 label.clone(),
1437 valid_finding_ids,
1438 valid_semantic_ids.clone(),
1439 attempt,
1440 target_finding_count,
1441 )
1442 .await?;
1443 tracing::info!(
1444 "{}: agent emitted {} decision(s) for {} pending finding(s)",
1445 label,
1446 agent_decisions.len(),
1447 still_missing.len(),
1448 );
1449
1450 let entries_by_prompt_id: HashMap<&str, &FindingLinkBatchEntry> = self
1453 .batch
1454 .entries
1455 .iter()
1456 .map(|entry| (entry.prompt_finding_id.as_str(), entry))
1457 .collect();
1458 for decision in agent_decisions {
1459 if let Some(entry) = entries_by_prompt_id.get(decision.finding_id.as_str()) {
1460 collected.insert(entry.pending.finding_id, decision);
1461 }
1462 }
1463
1464 let leftover: Vec<&FindingLinkBatchEntry> = self
1467 .batch
1468 .entries
1469 .iter()
1470 .filter(|entry| !collected.contains_key(&entry.pending.finding_id))
1471 .collect();
1472 if leftover.is_empty() {
1473 break;
1474 }
1475 if attempt < self.max_response_attempts {
1476 tracing::warn!(
1477 "Finding-link batch {} attempt {}/{}: {} finding(s) still missing decisions ({}); retrying with restricted prompt",
1478 self.batch,
1479 attempt,
1480 self.max_response_attempts,
1481 leftover.len(),
1482 leftover
1483 .iter()
1484 .map(|entry| entry.prompt_finding_id.as_str())
1485 .collect::<Vec<_>>()
1486 .join(", "),
1487 );
1488 } else {
1489 tracing::warn!(
1490 "Finding-link batch {} exhausted {} attempt(s) with {} finding(s) still missing ({}); treating them as no-link results",
1491 self.batch,
1492 self.max_response_attempts,
1493 leftover.len(),
1494 leftover
1495 .iter()
1496 .map(|entry| entry.prompt_finding_id.as_str())
1497 .collect::<Vec<_>>()
1498 .join(", "),
1499 );
1500 }
1501 }
1502
1503 Ok(self
1507 .batch
1508 .entries
1509 .iter()
1510 .map(|entry| {
1511 let semantic_links = collected
1512 .remove(&entry.pending.finding_id)
1513 .map(|d| {
1514 Self::resolve_semantic_links(
1515 d.semantic_evidence,
1516 &self.context.candidate_map,
1517 )
1518 })
1519 .unwrap_or_default();
1520 PersistedFindingLinkResult {
1521 finding_id: entry.pending.finding_id,
1522 link_target_finding_id: entry.pending.link_target_finding_id,
1523 semantic_links,
1524 }
1525 })
1526 .collect())
1527 }
1528
1529 async fn run_one_attempt(
1530 &self,
1531 user_prompt: String,
1532 cache_key: String,
1533 label: String,
1534 valid_finding_ids: Arc<HashSet<String>>,
1535 valid_semantic_ids: Arc<HashSet<String>>,
1536 attempt: usize,
1537 target_finding_count: usize,
1538 ) -> Result<Vec<FindingLinkDecision>> {
1539 let buffer = AgentChunkBuffer::<FindingLinkDecision>::new();
1540 let mut tools = ToolBox::new();
1541 tools.add_tool(EmitFindingLinkDecisionTool {
1542 buffer: buffer.clone(),
1543 valid_finding_ids,
1544 valid_semantic_ids,
1545 label: label.clone(),
1546 target_finding_count,
1547 emit_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
1548 });
1549 tools.add_tool(FinalizeFindingLinkTool {
1550 buffer: buffer.clone(),
1551 });
1552 let runner = AgentChunkRunner {
1553 llm: self.llm.clone(),
1554 options: self
1555 .agent_options
1556 .scoped(&format!("link-{}-attempt{:02}", self.batch, attempt)),
1557 buffer: buffer.clone(),
1558 tools,
1559 system_prompt: prompts::GENERAL_ROLE_SYSTEM.to_string(),
1560 user_prompt,
1561 cache_key,
1562 label,
1563 };
1564 let _outcome = runner.run().await?;
1565 Ok(buffer.drain().await)
1566 }
1567
1568 fn resolve_semantic_links(
1576 evidence: Vec<LinkEvidence>,
1577 candidate_map: &HashMap<String, i32>,
1578 ) -> Vec<PersistedSemanticLink> {
1579 let mut by_id: BTreeMap<i32, PersistedSemanticLink> = BTreeMap::new();
1580 for ev in evidence {
1581 let Some(target) = candidate_map.get(&ev.semantic_id).copied() else {
1582 continue;
1583 };
1584 let Some(strength) = knowdit_kg_model::link_strength::LinkStrength::parse(&ev.strength)
1585 else {
1586 continue;
1587 };
1588 let new_link = PersistedSemanticLink {
1589 semantic_id: target,
1590 strength,
1591 evidence: ev.why_finding_can_fire,
1592 };
1593 by_id
1594 .entry(target)
1595 .and_modify(|existing| {
1596 if new_link.strength.rank() > existing.strength.rank() {
1597 *existing = new_link.clone();
1598 }
1599 })
1600 .or_insert(new_link);
1601 }
1602 by_id.into_values().collect()
1603 }
1604}
1605
/// Builds the prompt-facing identifier (`finding-<id>`) for a database finding
/// id.
fn prompt_finding_id(finding_id: i32) -> String {
    format!("finding-{finding_id}")
}
1609
1610pub async fn retro_link_pending_semantics(
1628 db: &HistoricalDatabase,
1629 llm: &LLM,
1630 options: FindingLinkOptions,
1631) -> Result<()> {
1632 let pending_semantics = db.list_pending_canonical_semantics().await?;
1633 if pending_semantics.is_empty() {
1634 tracing::info!("Retro-link: pending_semantic queue is empty, nothing to do");
1635 return Ok(());
1636 }
1637 let already_linked_findings = db.list_findings_with_link_status().await?;
1638 if already_linked_findings.is_empty() {
1639 tracing::info!(
1640 "Retro-link: no findings in finding_link_status; clearing the {} queued pending semantic(s) without LLM work (bootstrap path)",
1641 pending_semantics.len()
1642 );
1643 db.clear_pending_semantic_queue().await?;
1644 return Ok(());
1645 }
1646
1647 let model = &llm.model;
1648 let max_response_attempts = options.max_response_attempts.max(1);
1649 let agent_options = AgentRunOptions::new(options.max_agent_steps);
1650 let budgets = FindingLinkBudgets::from_options(model, options);
1651
1652 let candidate_entries: Vec<FindingLinkCandidateEntry> = pending_semantics
1655 .iter()
1656 .map(|node| {
1657 let candidate = SemanticLinkCandidate {
1658 candidate_id: format!("sem-{}", node.id),
1659 canonical_semantic_id: node.id,
1660 is_canonical: true,
1661 category: node.category,
1662 name: node.name.clone(),
1663 definition: node.definition.clone(),
1664 description: node.description.clone(),
1665 };
1666 let prompt_body = render_finding_link_candidate(&candidate);
1667 FindingLinkCandidateEntry {
1668 candidate_id: candidate.candidate_id,
1669 canonical_semantic_id: candidate.canonical_semantic_id,
1670 is_canonical: candidate.is_canonical,
1671 category: candidate.category,
1672 name: candidate.name,
1673 token_count: count_tokens(model, &prompt_body),
1674 prompt_body,
1675 }
1676 })
1677 .collect();
1678
1679 let context_key = FindingLinkContextKey::empty();
1680 let context = std::sync::Arc::new(FindingLinkContext::build(
1681 model,
1682 &context_key,
1683 candidate_entries,
1684 ));
1685 let finding_token_budget = budgets.finding_token_budget(model, context.as_ref())?;
1686
1687 let entries: Vec<FindingLinkBatchEntry> = already_linked_findings
1689 .into_iter()
1690 .map(|pending| {
1691 let prompt_finding_id = prompt_finding_id(pending.finding_id);
1692 let prompt_body = prompts::finding_link_finding_entry(
1693 &prompt_finding_id,
1694 &pending.categories,
1695 &pending.finding.title,
1696 pending.finding.severity,
1697 pending.finding.category,
1698 &pending.finding.subcategory,
1699 &pending.finding.root_cause,
1700 &pending.finding.description,
1701 &pending.finding.patterns,
1702 &pending.finding.exploits,
1703 );
1704 let token_count = count_tokens(model, &prompt_body);
1705 FindingLinkBatchEntry {
1706 pending,
1707 prompt_finding_id,
1708 prompt_body,
1709 token_count,
1710 }
1711 })
1712 .collect();
1713
1714 let total = entries.len();
1715 let batches = partition_finding_link_entries(
1716 entries,
1717 finding_token_budget,
1718 budgets.max_findings_per_batch,
1719 );
1720 tracing::info!(
1721 "Retro-link: {} pending canonical semantic(s) × {} pre-linked finding(s) → {} batch(es)",
1722 context.candidate_map.len(),
1723 total,
1724 batches.len()
1725 );
1726
1727 let mut total_inserted = 0usize;
1728 for (idx, batch_entries) in batches.into_iter().enumerate() {
1729 let finding_token_count = batch_entries.iter().map(|e| e.token_count).sum();
1730 let batch = FindingLinkBatch {
1731 context_key: context_key.clone(),
1732 entries: batch_entries,
1733 finding_token_count,
1734 finding_token_budget,
1735 input_token_budget: budgets.input_token_budget,
1736 };
1737 tracing::info!(
1738 "Retro-link batch {}: {} finding(s)",
1739 idx + 1,
1740 batch.entries.len()
1741 );
1742 let runner = FindingLinkBatchAgentRunner {
1743 llm: llm.clone(),
1744 batch: &batch,
1745 context: context.as_ref(),
1746 agent_options: agent_options.clone(),
1747 max_response_attempts,
1748 };
1749 let results = runner.run().await?;
1750
1751 let mut edges = Vec::new();
1755 for result in results {
1756 for link in result.semantic_links {
1757 edges.push(crate::db::LinkEdge {
1758 semantic_node_id: link.semantic_id,
1759 audit_finding_id: result.link_target_finding_id,
1760 strength: link.strength,
1761 evidence: link.evidence,
1762 });
1763 }
1764 }
1765 let inserted = db.append_semantic_finding_links(&edges).await?;
1766 total_inserted += inserted;
1767 tracing::info!(
1768 "Retro-link batch {} inserted {} new semantic_finding_link row(s)",
1769 idx + 1,
1770 inserted
1771 );
1772 }
1773
1774 let cleared = db.clear_pending_semantic_queue().await?;
1775 tracing::info!(
1776 "Retro-link complete: {} new link(s) total, cleared {} row(s) from pending_semantic queue",
1777 total_inserted,
1778 cleared
1779 );
1780 Ok(())
1781}
1782
1783fn finding_link_batch_entry_span(entries: &[FindingLinkBatchEntry]) -> String {
1784 let first = entries.first().map(|entry| entry.pending.finding_id);
1785 let last = entries.last().map(|entry| entry.pending.finding_id);
1786 match (first, last) {
1787 (Some(first), Some(last)) if first == last => format!("{}", first),
1788 (Some(first), Some(last)) => format!("{}-{}", first, last),
1789 _ => "empty".to_string(),
1790 }
1791}
1792
// Unit tests for batching, candidate grouping, link resolution and
// result aggregation in the finding-link pipeline.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::vulnerability::{FindingSeverity, VulnerabilityCategory};

    fn sample_finding(id: i32) -> PendingFindingForLinking {
        PendingFindingForLinking {
            finding_id: id,
            link_target_finding_id: id,
            categories: vec![DeFiCategory::Dexes],
            finding: ExtractedFinding {
                title: format!("Finding {}", id),
                severity: FindingSeverity::Medium,
                category: VulnerabilityCategory::AccessControl,
                subcategory: "Missing Input Validation".to_string(),
                root_cause: "Unchecked parameters".to_string(),
                description: "A critical flow trusts malformed input.".to_string(),
                patterns: "Input not validated".to_string(),
                exploits: "Attacker passes malformed values".to_string(),
            },
        }
    }

    fn sample_finding_with_categories(
        id: i32,
        categories: Vec<DeFiCategory>,
    ) -> PendingFindingForLinking {
        let mut finding = sample_finding(id);
        finding.categories = categories;
        finding
    }

    fn sample_batch_entry(id: i32, token_count: usize) -> FindingLinkBatchEntry {
        FindingLinkBatchEntry {
            pending: sample_finding(id),
            prompt_finding_id: prompt_finding_id(id),
            prompt_body: format!("### finding-{}\n", id),
            token_count,
        }
    }

    fn sample_context_key(category: DeFiCategory) -> FindingLinkContextKey {
        FindingLinkContextKey::for_category(category)
    }

    #[test]
    fn partition_finding_link_entries_respects_budget_and_order() {
        let batches = partition_finding_link_entries(
            vec![
                sample_batch_entry(101, 40),
                sample_batch_entry(102, 20),
                sample_batch_entry(103, 50),
            ],
            60,
            usize::MAX,
        );

        assert_eq!(batches.len(), 2);
        assert_eq!(
            batches[0]
                .iter()
                .map(|entry| entry.pending.finding_id)
                .collect::<Vec<_>>(),
            vec![101, 102]
        );
        assert_eq!(
            batches[1]
                .iter()
                .map(|entry| entry.pending.finding_id)
                .collect::<Vec<_>>(),
            vec![103]
        );
    }

    #[test]
    fn partition_finding_link_entries_keeps_oversized_singleton_batch() {
        let batches = partition_finding_link_entries(
            vec![sample_batch_entry(201, 120), sample_batch_entry(202, 30)],
            100,
            usize::MAX,
        );

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 1);
        assert_eq!(batches[0][0].pending.finding_id, 201);
        assert_eq!(batches[1].len(), 1);
        assert_eq!(batches[1][0].pending.finding_id, 202);
    }

    #[test]
    fn partition_finding_link_entries_honours_count_cap_under_token_budget() {
        let batches = partition_finding_link_entries(
            vec![
                sample_batch_entry(301, 10),
                sample_batch_entry(302, 10),
                sample_batch_entry(303, 10),
                sample_batch_entry(304, 10),
                sample_batch_entry(305, 10),
            ],
            10_000,
            2,
        );

        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 2);
        assert_eq!(batches[1].len(), 2);
        assert_eq!(batches[2].len(), 1);
    }

    #[test]
    fn resolve_semantic_links_dedupes_and_maps_through_candidate_map() {
        let candidate_map: HashMap<String, i32> = HashMap::from([
            ("sem-1".to_string(), 1),
            ("sem-merged".to_string(), 1),
            ("sem-2".to_string(), 2),
        ]);
        let resolved = FindingLinkBatchAgentRunner::resolve_semantic_links(
            vec![
                LinkEvidence {
                    semantic_id: "sem-1".to_string(),
                    strength: "High".to_string(),
                    why_finding_can_fire: "shared behaviour 1".to_string(),
                },
                LinkEvidence {
                    semantic_id: "sem-merged".to_string(),
                    strength: "High".to_string(),
                    why_finding_can_fire: "shared behaviour merged".to_string(),
                },
                LinkEvidence {
                    semantic_id: "sem-2".to_string(),
                    strength: "High".to_string(),
                    why_finding_can_fire: "shared behaviour 2".to_string(),
                },
            ],
            &candidate_map,
        );
        let resolved_ids: Vec<i32> = resolved.iter().map(|link| link.semantic_id).collect();
        assert_eq!(resolved_ids, vec![1, 2]);
    }

    #[test]
    fn resolve_semantic_links_keeps_strongest_and_skips_invalid_entries() {
        use knowdit_kg_model::link_strength::LinkStrength;

        let candidate_map: HashMap<String, i32> = HashMap::from([
            ("sem-1".to_string(), 1),
            ("sem-alias".to_string(), 1),
            ("sem-2".to_string(), 2),
        ]);
        let evidence = |semantic_id: &str, strength: &str, why: &str| LinkEvidence {
            semantic_id: semantic_id.to_string(),
            strength: strength.to_string(),
            why_finding_can_fire: why.to_string(),
        };
        let resolved = FindingLinkBatchAgentRunner::resolve_semantic_links(
            vec![
                // A weaker link followed by a stronger one for the same
                // canonical id: the stronger one must replace it.
                evidence("sem-1", "Low", "weak evidence"),
                evidence("sem-alias", "High", "strong evidence"),
                // Equal strength: the first one is kept.
                evidence("sem-2", "Medium", "first medium"),
                evidence("sem-2", "Medium", "second medium"),
                // Unknown candidate id and unparseable strength are dropped.
                evidence("sem-unknown", "High", "not a candidate"),
                evidence("sem-2", "not-a-strength", "bad strength"),
            ],
            &candidate_map,
        );

        assert_eq!(resolved.len(), 2);
        assert_eq!(resolved[0].semantic_id, 1);
        assert!(matches!(resolved[0].strength, LinkStrength::High));
        assert_eq!(resolved[0].evidence, "strong evidence");
        assert_eq!(resolved[1].semantic_id, 2);
        assert!(matches!(resolved[1].strength, LinkStrength::Medium));
        assert_eq!(resolved[1].evidence, "first medium");
    }

    #[test]
    fn merge_finding_link_batch_results_aggregates_multiple_categories() {
        use knowdit_kg_model::link_strength::LinkStrength;

        fn link(sid: i32, strength: LinkStrength) -> PersistedSemanticLink {
            PersistedSemanticLink {
                semantic_id: sid,
                strength,
                evidence: format!("evidence for sem-{sid}"),
            }
        }

        let mut aggregated_results = HashMap::from([(
            501,
            AggregatedFindingLinkResult {
                finding_id: 501,
                link_target_finding_id: 777,
                expected_contexts: 2,
                completed_contexts: 0,
                semantic_links: BTreeMap::new(),
            },
        )]);

        merge_finding_link_batch_results(
            &mut aggregated_results,
            vec![PersistedFindingLinkResult {
                finding_id: 501,
                link_target_finding_id: 777,
                semantic_links: vec![link(3, LinkStrength::Medium), link(1, LinkStrength::Medium)],
            }],
        )
        .expect("first category result should merge");

        merge_finding_link_batch_results(
            &mut aggregated_results,
            vec![PersistedFindingLinkResult {
                finding_id: 501,
                link_target_finding_id: 777,
                semantic_links: vec![link(2, LinkStrength::Medium), link(3, LinkStrength::Medium)],
            }],
        )
        .expect("second category result should merge");

        let mut failed_findings = HashSet::new();
        let results = finalize_finding_link_results(aggregated_results, &mut failed_findings);

        assert!(failed_findings.is_empty());
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].finding_id, 501);
        assert_eq!(results[0].link_target_finding_id, 777);
        let resolved_ids: Vec<i32> = results[0]
            .semantic_links
            .iter()
            .map(|l| l.semantic_id)
            .collect();
        assert_eq!(resolved_ids, vec![1, 2, 3]);
    }

    #[test]
    fn finalize_finding_link_results_rejects_missing_category_runs() {
        let aggregated_results = HashMap::from([(
            601,
            AggregatedFindingLinkResult {
                finding_id: 601,
                link_target_finding_id: 601,
                expected_contexts: 3,
                completed_contexts: 2,
                semantic_links: BTreeMap::new(),
            },
        )]);

        let mut failed_findings = HashSet::new();
        let results = finalize_finding_link_results(aggregated_results, &mut failed_findings);

        assert!(results.is_empty());
        assert!(failed_findings.contains(&601));
    }

    #[test]
    fn finding_link_context_plan_collapses_to_single_all_canonical_key() {
        let finding = sample_finding_with_categories(
            701,
            vec![
                DeFiCategory::Yield,
                DeFiCategory::Dexes,
                DeFiCategory::Yield,
            ],
        );

        let context_keys = FindingLinkContextKey::from_project_categories(&finding.categories);
        assert_eq!(context_keys.len(), 1);
        assert!(context_keys[0].categories.is_empty());
    }

    #[test]
    fn partition_finding_link_candidate_groups_keeps_aliases_with_canonical() {
        let batches = partition_finding_link_candidate_groups(
            vec![
                FindingLinkCandidateGroup {
                    canonical_semantic_id: 1,
                    category: DeFiCategory::Dexes,
                    name: "Canonical 1".to_string(),
                    token_count: 70,
                    entries: vec![
                        FindingLinkCandidateEntry {
                            candidate_id: "sem-1".to_string(),
                            canonical_semantic_id: 1,
                            is_canonical: true,
                            category: DeFiCategory::Dexes,
                            name: "Canonical 1".to_string(),
                            prompt_body: "candidate-1".to_string(),
                            token_count: 40,
                        },
                        FindingLinkCandidateEntry {
                            candidate_id: "sem-101".to_string(),
                            canonical_semantic_id: 1,
                            is_canonical: false,
                            category: DeFiCategory::Dexes,
                            name: "Alias 101".to_string(),
                            prompt_body: "candidate-101".to_string(),
                            token_count: 30,
                        },
                    ],
                },
                FindingLinkCandidateGroup {
                    canonical_semantic_id: 2,
                    category: DeFiCategory::Dexes,
                    name: "Canonical 2".to_string(),
                    token_count: 20,
                    entries: vec![FindingLinkCandidateEntry {
                        candidate_id: "sem-2".to_string(),
                        canonical_semantic_id: 2,
                        is_canonical: true,
                        category: DeFiCategory::Dexes,
                        name: "Canonical 2".to_string(),
                        prompt_body: "candidate-2".to_string(),
                        token_count: 20,
                    }],
                },
            ],
            60,
        );

        assert_eq!(batches.len(), 2);
        assert_eq!(
            batches[0]
                .iter()
                .map(|entry| entry.candidate_id.as_str())
                .collect::<Vec<_>>(),
            vec!["sem-1", "sem-101"]
        );
        assert_eq!(
            batches[1]
                .iter()
                .map(|entry| entry.candidate_id.as_str())
                .collect::<Vec<_>>(),
            vec!["sem-2"]
        );
    }

    #[test]
    fn group_finding_link_candidate_entries_requires_canonical_entry() {
        let error = group_finding_link_candidate_entries(vec![FindingLinkCandidateEntry {
            candidate_id: "sem-101".to_string(),
            canonical_semantic_id: 1,
            is_canonical: false,
            category: DeFiCategory::Dexes,
            name: "Alias 101".to_string(),
            prompt_body: "candidate-101".to_string(),
            token_count: 30,
        }])
        .expect_err("candidate groups should require the canonical semantic entry");

        assert!(
            error.to_string().contains(
                "Missing canonical semantic sem-1 while building finding-link candidate group"
            ),
            "unexpected error: {error}"
        );
    }
}