Skip to main content

knowdit_kg/
link.rs

1use crate::agent_runner::{AgentChunkBuffer, AgentChunkRunner, AgentRunOptions};
2use crate::agents::FinalizeArgs;
3use crate::category::DeFiCategory;
4use crate::db::HistoricalDatabase;
5use crate::error::{KgError, Result};
6use crate::learn::{ExtractedFinding, count_tokens, get_context_budget, sanitize_prompt_prefix};
7use crate::prompts;
8use itertools::Itertools;
9use llmy::agent::tool::ToolBox;
10use llmy::agent::{LLMYError, tool};
11use llmy::client::client::LLM;
12use llmy::client::model::OpenAIModel;
13use schemars::JsonSchema;
14use serde::Deserialize;
15use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
16use std::sync::Arc;
17
// NOTE(review): this type derives `JsonSchema`. schemars normally copies
// `///` doc comments into the generated schema as `description` text, so the
// doc comments below are presumably part of the tool contract the agent
// sees. Treat any rewording of them as a prompt change — confirm first.
/// One per-finding decision emitted by the link agent. Doubles as the
/// `emit_finding_link_decision` tool's `ARGUMENTS` shape — no duplicate
/// "Args" mirror. Field order keeps `reasoning` first to nudge the agent
/// into chain-of-thought before the evidence list.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
pub struct FindingLinkDecision {
    /// Brief explanation of why these semantics are related (or why none
    /// were chosen).
    pub reasoning: String,
    /// `finding-N` id taken from the prompt's "Findings To Link" section.
    /// Must be one of the IDs the prompt listed; otherwise the tool
    /// rejects the call so the agent can retry.
    pub finding_id: String,
    /// Per-link evidence. Empty list = "no link" — that's a valid decision;
    /// the tool still records it so the orchestrator counts the finding as
    /// covered. Each entry must reference a `Candidate ID` from the
    /// prompt's candidate list AND articulate concretely why this finding's
    /// bug pattern can fire on that semantic.
    pub semantic_evidence: Vec<LinkEvidence>,
}
38
// NOTE(review): derives `JsonSchema`, so the `///` text below is presumably
// surfaced to the agent through the generated tool schema — treat edits to
// it as prompt changes. `strength` is a free-form `String` here and is only
// validated later (per the field doc, via `LinkStrength::parse`).
/// Concrete evidence justifying one finding↔semantic link.
///
/// Under the v2 "forced per-candidate emit" contract (plan_link.md §6.1),
/// the agent must emit one LinkEvidence per candidate semantic shown,
/// each tagged with a `strength` label from `{High, Medium, Low}`.
#[derive(Debug, Clone, Deserialize, JsonSchema)]
pub struct LinkEvidence {
    /// `Candidate ID` (e.g. `sem-12`) from the prompt's "Candidate
    /// Semantics" section.
    pub semantic_id: String,
    /// Strength tier for this link, drawn from {`High`, `Medium`, `Low`}.
    /// Parsed via `LinkStrength::parse`; unknown values are rejected by
    /// the tool handler so the agent can retry. Per-tier evidence length
    /// floors: High/Medium >= 40 chars, Low >= 15 chars.
    pub strength: String,
    /// One- or two-sentence concrete justification: cite a specific
    /// behaviour from the semantic's description (state mutation, call
    /// ordering, external boundary, invariant) that matches the finding's
    /// pattern/exploit prerequisites. Generic theme overlap ("both involve
    /// deposit") is not sufficient — name the specific shared behaviour.
    pub why_finding_can_fire: String,
}
61
/// A finding that still needs semantic links, as returned by
/// `HistoricalDatabase::list_findings_for_linking` and fed into the link
/// planner.
#[derive(Debug, Clone)]
pub struct PendingFindingForLinking {
    /// Database id of the finding; used as the key when aggregating
    /// per-context results and when counting expected contexts.
    pub finding_id: i32,
    /// Copied verbatim into the aggregated/persisted result.
    /// NOTE(review): presumably the finding row the links are ultimately
    /// written against (may differ from `finding_id`) — confirm.
    pub link_target_finding_id: i32,
    /// Categories of the originating project. Passed to
    /// `FindingLinkContextKey::from_project_categories`, which currently
    /// ignores them and always plans one "all canonical semantics" context.
    pub categories: Vec<DeFiCategory>,
    /// The extracted finding payload.
    /// NOTE(review): presumably what gets rendered into the prompt — confirm.
    pub finding: ExtractedFinding,
}
69
/// One link decision the link agent produced for one (finding, semantic)
/// edge. Carries strength + evidence so it can be persisted into
/// `semantic_finding_link.{strength,evidence}` directly.
#[derive(Debug, Clone)]
pub struct PersistedSemanticLink {
    /// Numeric semantic id the finding is linked to.
    /// NOTE(review): presumably already resolved to the canonical semantic
    /// (the aggregate keys links by post-canonical-resolution id) — confirm.
    pub semantic_id: i32,
    /// Strength tier assigned to this edge.
    pub strength: knowdit_kg_model::link_strength::LinkStrength,
    /// Free-text justification stored alongside the edge.
    pub evidence: String,
}
79
/// Link outcome for a single finding: the finding's ids plus every semantic
/// edge recorded for it. This is the unit passed to
/// `HistoricalDatabase::write_finding_link_result`.
#[derive(Debug, Clone)]
pub struct PersistedFindingLinkResult {
    /// Id of the finding the decisions were made for.
    pub finding_id: i32,
    /// Carried over from the pending finding's `link_target_finding_id`.
    pub link_target_finding_id: i32,
    /// Semantic edges for this finding; may be empty.
    /// NOTE(review): presumably an empty list means "no link" — confirm.
    pub semantic_links: Vec<PersistedSemanticLink>,
}
86
/// Caller-supplied knobs for [`link_pending_findings`].
#[derive(Debug, Clone, Copy)]
pub struct FindingLinkOptions {
    /// Number of worker tasks. Clamped to at least 1; a value of 1 (or 0)
    /// selects the sequential code path instead of the worker pool.
    pub concurrency: usize,
    /// Requested total input-token budget per prompt. `None` ⇒ the model's
    /// context budget; a larger request is capped to that budget.
    pub input_token_budget: Option<usize>,
    /// Requested token budget for the findings section of a prompt.
    /// `None` ⇒ one third of the input budget; a larger request is capped
    /// to that third.
    pub finding_token_budget: Option<usize>,
    /// Hard ceiling on findings per batch — independent of token
    /// budget. The token-based partitioner would happily pack 400+
    /// findings into a single batch on small-finding KGs (e.g. the
    /// Move KG: ~150 tok/finding × 459 findings ≈ 70K tok, fits
    /// in one 72K budget), but agents in practice can only stably
    /// emit decisions for a few dozen findings per attempt before
    /// finalizing prematurely. Cap both ways: token budget AND
    /// finding count. `None` ⇒ uses [`DEFAULT_MAX_FINDINGS_PER_BATCH`].
    pub max_findings_per_batch: Option<usize>,
    /// Max attempts per (finding-batch × semantic-chunk) — if the agent
    /// finalizes without covering every finding in the batch, the runner
    /// re-runs a fresh agent restricted to the still-missing findings.
    /// After this many attempts the missing findings are treated as
    /// `no-link` and a warning is logged.
    pub max_response_attempts: usize,
    /// Per-attempt cap on agent steps (inner step loop, not the
    /// missing-coverage retry counter).
    pub max_agent_steps: usize,
    /// Forwarded to `HistoricalDatabase::list_findings_for_linking`; also
    /// selects which retry hint is logged when findings end up without
    /// semantic links.
    pub include_unlinked: bool,
}
112
/// A semantic offered to the link agent as a possible link target.
///
/// NOTE(review): not referenced in the visible part of this file; the field
/// notes below are inferred from the sibling `FindingLinkCandidateEntry` and
/// should be confirmed against the loader that builds these.
#[derive(Debug, Clone)]
struct SemanticLinkCandidate {
    /// Prompt-facing id (presumably of the `sem-N` form).
    candidate_id: String,
    /// Id of the canonical semantic this candidate resolves to.
    canonical_semantic_id: i32,
    /// Whether this candidate is the canonical semantic itself rather than
    /// a merged alias.
    is_canonical: bool,
    category: DeFiCategory,
    name: String,
    definition: String,
    description: String,
}
123
/// Pre-rendered, shareable prompt context for one semantic chunk: the stable
/// user-prompt prefix containing the candidate list, plus the lookup table
/// needed to map the agent's candidate ids back to semantic ids.
#[derive(Debug, Clone)]
struct FindingLinkContext {
    /// Output of `prompts::finding_link_user_prefix` for this context's
    /// category and the concatenated candidate prompt bodies.
    prompt_prefix: String,
    /// Token count of `prompt_prefix` for the configured model.
    prompt_prefix_tokens: usize,
    /// Sum of the candidate entries' individual token counts.
    candidate_token_count: usize,
    /// Prompt candidate id → canonical semantic id.
    candidate_map: HashMap<String, i32>,
    /// Copy of the context key's `cache_key()` (`finding-link-<slug>`).
    cache_key: String,
}
132
/// One candidate semantic as it is rendered into the prompt's candidate
/// list, together with its pre-computed token cost.
#[derive(Debug, Clone)]
struct FindingLinkCandidateEntry {
    /// Id shown to the agent; becomes a key of
    /// `FindingLinkContext::candidate_map`.
    candidate_id: String,
    /// Canonical semantic this entry resolves to; entries sharing this id
    /// are grouped so chunking never separates them.
    canonical_semantic_id: i32,
    /// `true` for the canonical entry of its group; grouping fails if a
    /// group has no such entry.
    is_canonical: bool,
    category: DeFiCategory,
    /// Used as a sort key within and across groups.
    name: String,
    /// Rendered prompt text for this candidate; bodies are concatenated
    /// as-is, with no separator added.
    prompt_body: String,
    /// Token cost attributed to this entry when chunking.
    token_count: usize,
}
143
/// A semantic chunking unit built around one canonical semantic and every
/// merged alias that resolves to it.
///
/// `FindingLinkCandidateEntry` is the prompt-level unit that gets rendered into
/// the candidate list. `FindingLinkCandidateGroup` is one level above that: it
/// keeps the canonical entry and all of its aliases together so chunking never
/// splits an alias away from the canonical target the model must return.
#[derive(Debug, Clone)]
struct FindingLinkCandidateGroup {
    /// Id shared by every entry in the group.
    canonical_semantic_id: i32,
    /// Category of the canonical entry; primary sort key across groups.
    category: DeFiCategory,
    /// Name of the canonical entry; secondary sort key across groups.
    name: String,
    /// Canonical entry first, then aliases ordered by name and candidate id.
    entries: Vec<FindingLinkCandidateEntry>,
    /// Sum of the entries' token counts.
    token_count: usize,
}
159
/// One finding prepared for inclusion in a batch prompt.
#[derive(Debug, Clone)]
struct FindingLinkBatchEntry {
    /// The underlying pending finding (source of `finding_id`).
    pending: PendingFindingForLinking,
    /// Id under which the finding appears in the prompt.
    /// NOTE(review): presumably of the `finding-N` form — confirm.
    prompt_finding_id: String,
    /// Rendered prompt text for this finding.
    prompt_body: String,
    /// Token cost used by `partition_finding_link_entries`.
    token_count: usize,
}
167
/// One unit of agent work: a set of findings to be linked against the
/// candidate semantics of a single context (semantic chunk).
#[derive(Debug, Clone)]
struct FindingLinkBatch {
    /// Selects the `FindingLinkContext` this batch runs against.
    context_key: FindingLinkContextKey,
    /// Findings covered by this batch.
    entries: Vec<FindingLinkBatchEntry>,
    // NOTE(review): the three fields below are populated outside the visible
    // part of this file; presumably the summed finding tokens and the
    // budgets that applied when the batch was cut — confirm.
    finding_token_count: usize,
    finding_token_budget: usize,
    input_token_budget: usize,
}
176
177impl std::fmt::Display for FindingLinkBatch {
178    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
179        write!(
180            f,
181            "{}:{}",
182            self.context_key.label(),
183            finding_link_batch_entry_span(&self.entries)
184        )
185    }
186}
187
/// Identifies one link context: a category set plus the index of the
/// semantic chunk within that set's candidate library.
///
/// Used as a `BTreeMap` key, so the derived ordering (categories first, then
/// chunk index) determines iteration order — do not reorder the fields.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
struct FindingLinkContextKey {
    /// Categories this context covers. Empty is the marker for the
    /// "all canonical semantics" pass.
    categories: BTreeSet<DeFiCategory>,
    /// Zero-based index of the semantic chunk; always part of the slug.
    semantic_chunk_index: usize,
}
193
194impl FindingLinkContextKey {
195    fn empty() -> Self {
196        Self {
197            categories: BTreeSet::new(),
198            semantic_chunk_index: 0,
199        }
200    }
201
202    #[cfg(test)]
203    fn for_category(category: DeFiCategory) -> Self {
204        let mut categories = BTreeSet::new();
205        categories.insert(category);
206        Self {
207            categories,
208            semantic_chunk_index: 0,
209        }
210    }
211
212    /// Linking is one-time. Cost is not the concern; we want the LLM to see
213    /// every canonical semantic for every pending finding in one consistent
214    /// pass, irrespective of the originating project's categories. The empty
215    /// `categories` set is the marker that downstream code uses to switch
216    /// into "all-canonical-semantics" candidate loading.
217    fn from_project_categories(_categories: &[DeFiCategory]) -> Vec<Self> {
218        vec![Self::empty()]
219    }
220
221    fn prompt_category(&self) -> Option<DeFiCategory> {
222        self.categories.iter().next().copied()
223    }
224
225    fn with_semantic_chunk(&self, semantic_chunk_index: usize) -> Self {
226        Self {
227            categories: self.categories.clone(),
228            semantic_chunk_index,
229        }
230    }
231
232    fn cache_key(&self) -> String {
233        format!("finding-link-{}", self.slug())
234    }
235
236    fn label(&self) -> String {
237        self.slug()
238    }
239
240    fn slug(&self) -> String {
241        // Always include `chunk-<n>` — even for the empty-categories
242        // "all-canonical" pass, the semantic library is split into N
243        // chunks and each becomes its own batch. Without the chunk
244        // suffix all N parallel batches log identically and the user
245        // can't tell which one is still running.
246        let base = if self.categories.is_empty() {
247            "none".to_string()
248        } else {
249            sanitize_prompt_prefix(
250                &self
251                    .categories
252                    .iter()
253                    .map(DeFiCategory::as_str)
254                    .collect::<Vec<_>>()
255                    .join("-"),
256            )
257        };
258
259        format!("{}-chunk-{}", base, self.semantic_chunk_index)
260    }
261}
262
263impl FindingLinkContext {
264    fn build(
265        model: &OpenAIModel,
266        context_key: &FindingLinkContextKey,
267        candidate_entries: Vec<FindingLinkCandidateEntry>,
268    ) -> Self {
269        let candidate_token_count = candidate_entries
270            .iter()
271            .map(|entry| entry.token_count)
272            .sum();
273        let candidate_text = candidate_entries
274            .iter()
275            .map(|entry| entry.prompt_body.as_str())
276            .collect::<String>();
277        let candidate_map = candidate_entries
278            .into_iter()
279            .map(|entry| (entry.candidate_id, entry.canonical_semantic_id))
280            .collect();
281        let prompt_prefix =
282            prompts::finding_link_user_prefix(context_key.prompt_category(), &candidate_text);
283
284        Self {
285            prompt_prefix_tokens: count_tokens(model, &prompt_prefix),
286            prompt_prefix,
287            candidate_token_count,
288            candidate_map,
289            cache_key: context_key.cache_key(),
290        }
291    }
292}
293
/// First planning stage: pending findings bucketed by the (un-chunked)
/// context key they must be linked under. `materialize` expands each bucket
/// into per-chunk contexts and batches.
#[derive(Debug, Default)]
struct FindingLinkContextPlan {
    /// Base context key → findings to link in that context.
    pending_by_context: BTreeMap<FindingLinkContextKey, Vec<PendingFindingForLinking>>,
}
298
299impl FindingLinkContextPlan {
300    fn from_pending_findings(pending_findings: Vec<PendingFindingForLinking>) -> Self {
301        let mut plan = Self::default();
302
303        for pending in pending_findings {
304            for context_key in FindingLinkContextKey::from_project_categories(&pending.categories) {
305                plan.pending_by_context
306                    .entry(context_key)
307                    .or_default()
308                    .push(pending.clone());
309            }
310        }
311
312        plan
313    }
314
315    async fn materialize(
316        self,
317        db: &HistoricalDatabase,
318        model: &OpenAIModel,
319        budgets: &FindingLinkBudgets,
320    ) -> Result<FindingLinkExecutionPlan> {
321        use std::sync::Arc;
322
323        let mut contexts = BTreeMap::new();
324        let mut expanded_pending = BTreeMap::new();
325        let mut expected_context_counts = HashMap::new();
326
327        for (base_context_key, pending_findings) in self.pending_by_context {
328            let chunked_contexts =
329                build_finding_link_contexts(db, model, &base_context_key, budgets).await?;
330            let chunk_count = chunked_contexts.len();
331
332            for pending in &pending_findings {
333                expected_context_counts
334                    .entry(pending.finding_id)
335                    .and_modify(|count| *count += chunk_count)
336                    .or_insert(chunk_count);
337            }
338
339            for (context_key, context) in chunked_contexts {
340                contexts.insert(context_key.clone(), Arc::new(context));
341                expanded_pending.insert(context_key, pending_findings.clone());
342            }
343        }
344
345        let batches = build_finding_link_batches(model, expanded_pending, &contexts, budgets)?;
346
347        Ok(FindingLinkExecutionPlan {
348            contexts,
349            batches,
350            expected_context_counts,
351        })
352    }
353}
354
/// Fully expanded link plan produced by `FindingLinkContextPlan::materialize`.
#[derive(Debug)]
struct FindingLinkExecutionPlan {
    /// Per-chunk prompt contexts, shared by reference between batches.
    contexts: BTreeMap<FindingLinkContextKey, std::sync::Arc<FindingLinkContext>>,
    /// Agent work units; each references a key of `contexts`.
    batches: Vec<FindingLinkBatch>,
    /// Finding id → number of contexts (chunks) that finding is planned for.
    expected_context_counts: HashMap<i32, usize>,
}
361
362impl FindingLinkExecutionPlan {
363    fn expected_context_count(&self, finding_id: i32) -> usize {
364        self.expected_context_counts
365            .get(&finding_id)
366            .copied()
367            .unwrap_or(0)
368    }
369
370    fn task_count(&self) -> usize {
371        self.batches.iter().map(|batch| batch.entries.len()).sum()
372    }
373}
374
/// Hard cap on findings per batch when [`FindingLinkOptions::max_findings_per_batch`]
/// isn't set. Empirically ~50 is what the post-step-cap-200 agent
/// handles reliably across both Move and Solidity KGs without
/// finalizing prematurely. Override with `--max-findings-per-batch`.
///
/// Applied in `FindingLinkBudgets::from_options`, which also clamps the
/// effective value to at least 1.
const DEFAULT_MAX_FINDINGS_PER_BATCH: usize = 50;
380
/// Resolved (defaulted and clamped) budgets derived from
/// [`FindingLinkOptions`] and the model's context budget. Every field is at
/// least 1.
#[derive(Debug, Clone, Copy)]
struct FindingLinkBudgets {
    /// Total input tokens allowed per prompt; never above the model's
    /// context budget.
    input_token_budget: usize,
    /// Target share for candidate semantics: one third of the input budget.
    semantic_token_target: usize,
    /// Target share for findings: at most one third of the input budget.
    finding_token_target: usize,
    /// Independent of token budget: hard cap on findings per batch.
    /// `partition_finding_link_entries` honours `min(token, count)`.
    max_findings_per_batch: usize,
}
390
391impl FindingLinkBudgets {
392    fn from_options(model: &OpenAIModel, options: FindingLinkOptions) -> Self {
393        let max_input_budget = get_context_budget(model).max(1);
394        let input_token_budget = options
395            .input_token_budget
396            .unwrap_or(max_input_budget)
397            .min(max_input_budget)
398            .max(1);
399        let shared_target = (input_token_budget / 3).max(1);
400        let finding_token_target = options
401            .finding_token_budget
402            .unwrap_or(shared_target)
403            .min(shared_target)
404            .max(1);
405        let max_findings_per_batch = options
406            .max_findings_per_batch
407            .unwrap_or(DEFAULT_MAX_FINDINGS_PER_BATCH)
408            .max(1);
409
410        Self {
411            input_token_budget,
412            semantic_token_target: shared_target,
413            finding_token_target,
414            max_findings_per_batch,
415        }
416    }
417
418    fn semantic_token_budget(
419        &self,
420        model: &OpenAIModel,
421        prompt_category: Option<DeFiCategory>,
422    ) -> Result<usize> {
423        let system_tokens = count_tokens(model, prompts::GENERAL_ROLE_SYSTEM);
424        let stable_prefix_tokens = count_tokens(
425            model,
426            &prompts::finding_link_user_prefix(prompt_category, ""),
427        );
428        let available_for_candidates = self
429            .input_token_budget
430            .saturating_sub(system_tokens + stable_prefix_tokens + self.finding_token_target);
431        let effective = self.semantic_token_target.min(available_for_candidates);
432
433        if effective == 0 {
434            return Err(KgError::other(format!(
435                "Finding link prompt for category '{}' leaves no room for semantic candidates under input budget {}",
436                prompt_category
437                    .map(|category| category.as_str())
438                    .unwrap_or("None"),
439                self.input_token_budget
440            )));
441        }
442
443        Ok(effective)
444    }
445
446    fn finding_token_budget(
447        &self,
448        model: &OpenAIModel,
449        context: &FindingLinkContext,
450    ) -> Result<usize> {
451        let system_tokens = count_tokens(model, prompts::GENERAL_ROLE_SYSTEM);
452        let available = self
453            .input_token_budget
454            .saturating_sub(system_tokens + context.prompt_prefix_tokens);
455        let effective = self.finding_token_target.min(available);
456
457        if effective == 0 {
458            return Err(KgError::other(format!(
459                "Finding link context '{}' leaves no room for batched findings under input budget {}",
460                context.cache_key, self.input_token_budget
461            )));
462        }
463
464        Ok(effective)
465    }
466}
467
/// In-memory accumulator for one finding while its batches complete across
/// contexts.
#[derive(Debug)]
struct AggregatedFindingLinkResult {
    finding_id: i32,
    link_target_finding_id: i32,
    /// Number of contexts planned for this finding (from the execution plan).
    expected_contexts: usize,
    /// Starts at 0.
    /// NOTE(review): presumably incremented as each context's batch result
    /// is merged, with the finding complete once it reaches
    /// `expected_contexts` — the merge code is outside this view; confirm.
    completed_contexts: usize,
    /// Keyed by semantic_id (post-canonical-resolution). When multiple
    /// contexts emit the same (finding, semantic) pair (e.g. raw vs
    /// canonical both linked), we keep the strongest strength; the
    /// evidence text is taken from the same row.
    semantic_links: BTreeMap<i32, PersistedSemanticLink>,
}
480
481pub async fn link_pending_findings(
482    db: &HistoricalDatabase,
483    llm: &LLM,
484    options: FindingLinkOptions,
485) -> Result<()> {
486    use std::sync::Arc;
487    use tokio::sync::mpsc;
488    use tokio::task::JoinSet;
489
490    let pending_findings = db
491        .list_findings_for_linking(options.include_unlinked)
492        .await?;
493    if pending_findings.is_empty() {
494        tracing::info!("No findings pending linking.");
495        return Ok(());
496    }
497
498    let total_pending_findings = pending_findings.len();
499    let concurrency = options.concurrency.max(1);
500    let max_response_attempts = options.max_response_attempts.max(1);
501    let agent_options = AgentRunOptions::new(options.max_agent_steps);
502    let budgets = FindingLinkBudgets::from_options(&llm.model, options);
503    tracing::info!(
504        "Will link {} pending findings (concurrency={}, input token budget={}, target semantic tokens={}, target finding tokens={}, max response attempts={}, max agent steps={})",
505        total_pending_findings,
506        concurrency,
507        budgets.input_token_budget,
508        budgets.semantic_token_target,
509        budgets.finding_token_target,
510        max_response_attempts,
511        options.max_agent_steps,
512    );
513
514    let plan = FindingLinkContextPlan::from_pending_findings(pending_findings.clone());
515    let execution_plan = plan.materialize(db, &llm.model, &budgets).await?;
516    let mut aggregated_results: HashMap<i32, AggregatedFindingLinkResult> = pending_findings
517        .iter()
518        .map(|pending| {
519            (
520                pending.finding_id,
521                AggregatedFindingLinkResult {
522                    finding_id: pending.finding_id,
523                    link_target_finding_id: pending.link_target_finding_id,
524                    expected_contexts: execution_plan.expected_context_count(pending.finding_id),
525                    completed_contexts: 0,
526                    semantic_links: BTreeMap::new(),
527                },
528            )
529        })
530        .collect();
531    let task_count = execution_plan.task_count();
532    let FindingLinkExecutionPlan {
533        contexts, batches, ..
534    } = execution_plan;
535
536    tracing::info!(
537        "Prepared {} finding-link batch(es) covering {} finding-context task(s) for {} pending findings",
538        batches.len(),
539        task_count,
540        total_pending_findings
541    );
542
543    let mut failed_findings = HashSet::new();
544    let mut committed_findings: HashSet<i32> = HashSet::new();
545    if concurrency <= 1 {
546        for batch in batches {
547            let key = batch.context_key.clone();
548            let context = contexts.get(&key).ok_or_else(|| {
549                KgError::other(format!("Missing link context for key '{}'", key.label()))
550            })?;
551
552            let runner = FindingLinkBatchAgentRunner {
553                llm: llm.clone(),
554                batch: &batch,
555                context: context.as_ref(),
556                agent_options: agent_options.clone(),
557                max_response_attempts,
558            };
559            match runner.run().await {
560                Ok(results) => {
561                    // Mirror the concurrent branch: persist this batch's
562                    // per-finding sfl rows BEFORE the in-memory merge.
563                    // `commit_completed_findings` only writes the
564                    // `finding_link_status` row — it relies on this
565                    // call to have already landed the link edges, so
566                    // the sfl payload must hit DB here or it's lost.
567                    persist_batch_partials(db, &failed_findings, &committed_findings, &results)
568                        .await;
569                    if let Err(e) =
570                        merge_finding_link_batch_results(&mut aggregated_results, results)
571                    {
572                        mark_batch_findings_failed(&mut failed_findings, &batch);
573                        tracing::error!(
574                            "Failed to aggregate finding-link batch {} ({} finding-category task(s)): {}",
575                            &batch,
576                            batch.entries.len(),
577                            e
578                        );
579                    } else {
580                        commit_completed_findings(
581                            db,
582                            &mut aggregated_results,
583                            &mut failed_findings,
584                            &mut committed_findings,
585                            batch.entries.iter().map(|e| e.pending.finding_id),
586                        )
587                        .await;
588                    }
589                }
590                Err(e) => {
591                    mark_batch_findings_failed(&mut failed_findings, &batch);
592                    tracing::error!(
593                        "Failed to link finding batch {} ({} finding-category task(s)): {}",
594                        &batch,
595                        batch.entries.len(),
596                        e
597                    );
598                }
599            }
600        }
601    } else {
602        let contexts = Arc::new(contexts);
603        let (tx, rx) = async_channel::bounded::<FindingLinkBatch>(batches.len() + 1);
604        let (out_tx, mut out_rx) = mpsc::channel::<(
605            FindingLinkBatch,
606            Result<Vec<PersistedFindingLinkResult>>,
607        )>(concurrency + 1);
608        let mut handles = JoinSet::new();
609
610        for _ in 0..concurrency {
611            let rx = rx.clone();
612            let out = out_tx.clone();
613            let llm_clone = llm.clone();
614            let contexts = contexts.clone();
615            let worker_agent_options = agent_options.clone();
616
617            handles.spawn(async move {
618                while let Ok(batch) = rx.recv().await {
619                    let result = match contexts.get(&batch.context_key) {
620                        Some(context) => {
621                            let runner = FindingLinkBatchAgentRunner {
622                                llm: llm_clone.clone(),
623                                batch: &batch,
624                                context: context.as_ref(),
625                                agent_options: worker_agent_options.clone(),
626                                max_response_attempts,
627                            };
628                            runner.run().await
629                        }
630                        None => Err(KgError::other(format!(
631                            "Missing link context for key '{}'",
632                            batch.context_key.label()
633                        ))),
634                    };
635
636                    out.send((batch, result))
637                        .await
638                        .expect("can not send finding link batch result");
639                }
640
641                Ok::<_, KgError>(())
642            });
643        }
644        drop(out_tx);
645        drop(rx);
646
647        for batch in batches {
648            tx.send(batch)
649                .await
650                .expect("fail to send out finding link batch");
651        }
652        drop(tx);
653
654        while let Some(handle) = out_rx.recv().await {
655            let (batch, results) = handle;
656            match results {
657                Ok(results) => {
658                    // Durably persist this batch's per-finding decisions
659                    // before merging into the in-memory aggregate. A
660                    // kill / billing-cap abort here leaves the sfl rows
661                    // in place; the finding stays out of
662                    // `finding_link_status` until the last batch
663                    // completes, so downstream consumers don't see the
664                    // partial state. Without this step the entire run's
665                    // work would live in `aggregated_results` only and
666                    // evaporate on exit.
667                    persist_batch_partials(db, &failed_findings, &committed_findings, &results)
668                        .await;
669                    if let Err(e) =
670                        merge_finding_link_batch_results(&mut aggregated_results, results)
671                    {
672                        mark_batch_findings_failed(&mut failed_findings, &batch);
673                        tracing::error!(
674                            "Failed to aggregate finding-link batch {} ({} finding-category task(s)): {}",
675                            &batch,
676                            batch.entries.len(),
677                            e
678                        );
679                    } else {
680                        commit_completed_findings(
681                            db,
682                            &mut aggregated_results,
683                            &mut failed_findings,
684                            &mut committed_findings,
685                            batch.entries.iter().map(|e| e.pending.finding_id),
686                        )
687                        .await;
688                    }
689                }
690                Err(e) => {
691                    mark_batch_findings_failed(&mut failed_findings, &batch);
692                    tracing::error!(
693                        "Finding-link batch {} ({} finding-category task(s)) failed: {}",
694                        &batch,
695                        batch.entries.len(),
696                        e
697                    );
698                }
699            }
700        }
701
702        while let Some(handle) = handles.join_next().await {
703            match handle {
704                Ok(Ok(())) => {}
705                Ok(Err(e)) => {
706                    tracing::error!("Finding-link worker task failed: {}", e);
707                }
708                Err(e) => {
709                    tracing::error!("Finding-link worker task panicked: {}", e);
710                }
711            }
712        }
713    }
714
715    let final_results = finalize_finding_link_results(aggregated_results, &mut failed_findings);
716    for result in final_results {
717        if let Err(e) = db.write_finding_link_result(&result).await {
718            failed_findings.insert(result.finding_id);
719            tracing::error!(
720                "Failed to save finding links for #{}: {}",
721                result.finding_id,
722                e
723            );
724        }
725    }
726
727    if !failed_findings.is_empty() {
728        tracing::warn!(
729            "Finding linking failed for {} finding(s)",
730            failed_findings.len()
731        );
732    }
733
734    let findings_without_links = db.list_processed_findings_without_semantic_links().await?;
735    if !findings_without_links.is_empty() {
736        let preview = findings_without_links
737            .iter()
738            .take(20)
739            .map(|finding_id| format!("finding-{finding_id}"))
740            .collect::<Vec<_>>()
741            .join(", ");
742        let remainder = findings_without_links.len().saturating_sub(20);
743        let more = if remainder > 0 {
744            format!(" and {remainder} more")
745        } else {
746            String::new()
747        };
748        let retry_hint = if options.include_unlinked {
749            "These findings were included in this run because `--include-unlinked` was enabled, but they still ended up without semantic links."
750        } else {
751            "To retry just these findings, run `knowdit link --include-unlinked`."
752        };
753        tracing::warn!(
754            "{} processed finding(s) still have no semantic links after this run: {}{}. {}",
755            findings_without_links.len(),
756            preview,
757            more,
758            retry_hint,
759        );
760    }
761
762    tracing::info!("Finding linking complete.");
763    Ok(())
764}
765
766fn partition_finding_link_entries(
767    entries: Vec<FindingLinkBatchEntry>,
768    token_budget: usize,
769    max_count: usize,
770) -> Vec<Vec<FindingLinkBatchEntry>> {
771    let mut batches = Vec::new();
772    let mut current = Vec::new();
773    let mut current_tokens = 0usize;
774
775    for entry in entries {
776        let count_exceeded = current.len() >= max_count;
777        let tokens_exceeded = current_tokens + entry.token_count > token_budget;
778        if !current.is_empty() && (count_exceeded || tokens_exceeded) {
779            batches.push(current);
780            current = Vec::new();
781            current_tokens = 0;
782        }
783
784        current_tokens += entry.token_count;
785        current.push(entry);
786    }
787
788    if !current.is_empty() {
789        batches.push(current);
790    }
791
792    batches
793}
794
795fn partition_finding_link_candidate_groups(
796    groups: Vec<FindingLinkCandidateGroup>,
797    token_budget: usize,
798) -> Vec<Vec<FindingLinkCandidateEntry>> {
799    let mut batches = Vec::new();
800    let mut current = Vec::new();
801    let mut current_tokens = 0usize;
802
803    for group in groups {
804        if !current.is_empty() && current_tokens + group.token_count > token_budget {
805            batches.push(current);
806            current = Vec::new();
807            current_tokens = 0;
808        }
809
810        current_tokens += group.token_count;
811        current.extend(group.entries);
812    }
813
814    if !current.is_empty() {
815        batches.push(current);
816    }
817
818    batches
819}
820
821fn group_finding_link_candidate_entries(
822    entries: Vec<FindingLinkCandidateEntry>,
823) -> Result<Vec<FindingLinkCandidateGroup>> {
824    let mut grouped_entries = BTreeMap::<i32, Vec<FindingLinkCandidateEntry>>::new();
825    for entry in entries {
826        grouped_entries
827            .entry(entry.canonical_semantic_id)
828            .or_default()
829            .push(entry);
830    }
831
832    let mut groups = Vec::new();
833    for (canonical_semantic_id, mut group_entries) in grouped_entries {
834        group_entries.sort_by(|lhs, rhs| {
835            rhs.is_canonical
836                .cmp(&lhs.is_canonical)
837                .then_with(|| lhs.name.cmp(&rhs.name))
838                .then_with(|| lhs.candidate_id.cmp(&rhs.candidate_id))
839        });
840
841        let canonical_entry = group_entries
842            .first()
843            .filter(|entry| entry.is_canonical)
844            .ok_or_else(|| {
845                KgError::other(format!(
846                    "Missing canonical semantic sem-{} while building finding-link candidate group",
847                    canonical_semantic_id
848                ))
849            })?;
850
851        let token_count = group_entries.iter().map(|entry| entry.token_count).sum();
852        groups.push(FindingLinkCandidateGroup {
853            canonical_semantic_id,
854            category: canonical_entry.category,
855            name: canonical_entry.name.clone(),
856            entries: group_entries,
857            token_count,
858        });
859    }
860
861    groups.sort_by(|lhs, rhs| {
862        lhs.category
863            .as_str()
864            .cmp(rhs.category.as_str())
865            .then_with(|| lhs.name.cmp(&rhs.name))
866            .then_with(|| lhs.canonical_semantic_id.cmp(&rhs.canonical_semantic_id))
867    });
868
869    Ok(groups)
870}
871
872fn build_finding_link_batches(
873    model: &OpenAIModel,
874    pending_by_context: BTreeMap<FindingLinkContextKey, Vec<PendingFindingForLinking>>,
875    contexts: &BTreeMap<FindingLinkContextKey, std::sync::Arc<FindingLinkContext>>,
876    budgets: &FindingLinkBudgets,
877) -> Result<Vec<FindingLinkBatch>> {
878    let mut batches = Vec::new();
879
880    for (context_key, pending_findings) in pending_by_context {
881        let context = contexts.get(&context_key).ok_or_else(|| {
882            KgError::other(format!(
883                "Missing link context for key '{}'",
884                context_key.label()
885            ))
886        })?;
887        let finding_token_budget = budgets.finding_token_budget(model, context.as_ref())?;
888
889        let entries = pending_findings
890            .into_iter()
891            .map(|pending| {
892                let prompt_finding_id = prompt_finding_id(pending.finding_id);
893                let prompt_body = prompts::finding_link_finding_entry(
894                    &prompt_finding_id,
895                    &pending.categories,
896                    &pending.finding.title,
897                    pending.finding.severity,
898                    pending.finding.category,
899                    &pending.finding.subcategory,
900                    &pending.finding.root_cause,
901                    &pending.finding.description,
902                    &pending.finding.patterns,
903                    &pending.finding.exploits,
904                );
905                let token_count = count_tokens(model, &prompt_body);
906                FindingLinkBatchEntry {
907                    pending,
908                    prompt_finding_id,
909                    prompt_body,
910                    token_count,
911                }
912            })
913            .collect();
914
915        for batch_entries in partition_finding_link_entries(
916            entries,
917            finding_token_budget,
918            budgets.max_findings_per_batch,
919        ) {
920            let finding_token_count = batch_entries.iter().map(|entry| entry.token_count).sum();
921            if finding_token_count > finding_token_budget {
922                tracing::warn!(
923                    "Finding-link batch {} exceeds finding token budget {} with {} finding tokens; sending as singleton batch",
924                    finding_link_batch_entry_span(&batch_entries),
925                    finding_token_budget,
926                    finding_token_count
927                );
928            }
929
930            batches.push(FindingLinkBatch {
931                context_key: context_key.clone(),
932                entries: batch_entries,
933                finding_token_count,
934                finding_token_budget,
935                input_token_budget: budgets.input_token_budget,
936            });
937        }
938    }
939
940    Ok(batches)
941}
942
943async fn build_finding_link_contexts(
944    db: &HistoricalDatabase,
945    model: &OpenAIModel,
946    base_context_key: &FindingLinkContextKey,
947    budgets: &FindingLinkBudgets,
948) -> Result<Vec<(FindingLinkContextKey, FindingLinkContext)>> {
949    // The current linking strategy is "one-time, exhaustive": for every
950    // pending finding we present every canonical semantic across all
951    // categories. The empty key marks this all-canonical pass — there are no
952    // merged aliases in the candidate set, so the canonical-target indirection
953    // collapses to identity.
954    let candidates: Vec<SemanticLinkCandidate> = db
955        .all_canonical_semantic_link_candidates()
956        .await?
957        .into_iter()
958        .map(|node| SemanticLinkCandidate {
959            candidate_id: format!("sem-{}", node.id),
960            canonical_semantic_id: node.id,
961            is_canonical: true,
962            category: node.category,
963            name: node.name,
964            definition: node.definition,
965            description: node.description,
966        })
967        .collect();
968
969    let candidate_entries: Vec<FindingLinkCandidateEntry> = candidates
970        .into_iter()
971        .map(|candidate| {
972            let prompt_body = render_finding_link_candidate(&candidate);
973            FindingLinkCandidateEntry {
974                candidate_id: candidate.candidate_id,
975                canonical_semantic_id: candidate.canonical_semantic_id,
976                is_canonical: candidate.is_canonical,
977                category: candidate.category,
978                name: candidate.name,
979                token_count: count_tokens(model, &prompt_body),
980                prompt_body,
981            }
982        })
983        .collect();
984
985    if candidate_entries.is_empty() {
986        return Ok(vec![(
987            base_context_key.clone(),
988            FindingLinkContext::build(model, base_context_key, Vec::new()),
989        )]);
990    }
991
992    let semantic_token_budget =
993        budgets.semantic_token_budget(model, base_context_key.prompt_category())?;
994    let candidate_groups = group_finding_link_candidate_entries(candidate_entries)?;
995    let candidate_chunks =
996        partition_finding_link_candidate_groups(candidate_groups, semantic_token_budget);
997
998    Ok(candidate_chunks
999        .into_iter()
1000        .enumerate()
1001        .map(|(chunk_index, candidate_entries)| {
1002            let context_key = base_context_key.with_semantic_chunk(chunk_index);
1003            let context = FindingLinkContext::build(model, &context_key, candidate_entries);
1004            (context_key, context)
1005        })
1006        .collect())
1007}
1008
1009fn render_finding_link_candidate(candidate: &SemanticLinkCandidate) -> String {
1010    // After the all-canonical pass refactor, every candidate is canonical and
1011    // `is_canonical` is always true. Kept as one render path for simplicity.
1012    format!(
1013        "Candidate ID: {}\nCategory: {}\nName: {}\nDefinition: {}\nDescription: {}\n\n",
1014        candidate.candidate_id,
1015        candidate.category,
1016        candidate.name,
1017        candidate.definition,
1018        candidate.description
1019    )
1020}
1021
1022fn merge_finding_link_batch_results(
1023    aggregated_results: &mut HashMap<i32, AggregatedFindingLinkResult>,
1024    results: Vec<PersistedFindingLinkResult>,
1025) -> Result<()> {
1026    for result in results {
1027        let aggregate = aggregated_results
1028            .get_mut(&result.finding_id)
1029            .ok_or_else(|| {
1030                KgError::other(format!(
1031                    "Finding link aggregation is missing finding {}",
1032                    result.finding_id
1033                ))
1034            })?;
1035
1036        if aggregate.link_target_finding_id != result.link_target_finding_id {
1037            return Err(KgError::other(format!(
1038                "Finding {} resolved inconsistent link targets: {} vs {}",
1039                result.finding_id, aggregate.link_target_finding_id, result.link_target_finding_id
1040            )));
1041        }
1042
1043        aggregate.completed_contexts += 1;
1044        for link in result.semantic_links {
1045            // On collision keep the strongest strength; evidence sticks
1046            // with the strength that wins.
1047            aggregate
1048                .semantic_links
1049                .entry(link.semantic_id)
1050                .and_modify(|existing| {
1051                    if link.strength.rank() > existing.strength.rank() {
1052                        *existing = link.clone();
1053                    }
1054                })
1055                .or_insert(link);
1056        }
1057    }
1058
1059    Ok(())
1060}
1061
1062fn finalize_finding_link_results(
1063    aggregated_results: HashMap<i32, AggregatedFindingLinkResult>,
1064    failed_findings: &mut HashSet<i32>,
1065) -> Vec<PersistedFindingLinkResult> {
1066    let mut final_results = Vec::new();
1067
1068    for aggregate in aggregated_results
1069        .into_values()
1070        .sorted_by_key(|aggregate| aggregate.finding_id)
1071    {
1072        if failed_findings.contains(&aggregate.finding_id) {
1073            continue;
1074        }
1075
1076        if aggregate.completed_contexts != aggregate.expected_contexts {
1077            failed_findings.insert(aggregate.finding_id);
1078            tracing::error!(
1079                "Finding #{} only completed {}/{} category-specific link tasks",
1080                aggregate.finding_id,
1081                aggregate.completed_contexts,
1082                aggregate.expected_contexts
1083            );
1084            continue;
1085        }
1086
1087        final_results.push(PersistedFindingLinkResult {
1088            finding_id: aggregate.finding_id,
1089            link_target_finding_id: aggregate.link_target_finding_id,
1090            semantic_links: aggregate.semantic_links.into_values().collect(),
1091        });
1092    }
1093
1094    final_results
1095}
1096
1097fn mark_batch_findings_failed(failed_findings: &mut HashSet<i32>, batch: &FindingLinkBatch) {
1098    failed_findings.extend(batch.entries.iter().map(|entry| entry.pending.finding_id));
1099}
1100
1101/// Per-batch durable write: every finding's batch-level decision lands
1102/// in `semantic_finding_link` immediately. The finding stays out of
1103/// `finding_link_status` until the last batch completes, so downstream
1104/// consumers (mapper, spec-gen) don't see the partial state via
1105/// `load_knowledge_graph`. The upsert is strongest-strength-wins so
1106/// repeated batches over the same (semantic, finding) edge keep the
1107/// strongest tier seen — making resume idempotent: a re-run merges on
1108/// top of any rows the prior abort left behind.
1109async fn persist_batch_partials(
1110    db: &HistoricalDatabase,
1111    failed_findings: &HashSet<i32>,
1112    committed_findings: &HashSet<i32>,
1113    results: &[PersistedFindingLinkResult],
1114) {
1115    for result in results {
1116        if failed_findings.contains(&result.finding_id)
1117            || committed_findings.contains(&result.finding_id)
1118        {
1119            continue;
1120        }
1121        if let Err(e) = db.upsert_finding_link_partial(result).await {
1122            tracing::error!(
1123                "Failed to persist partial batch links for finding #{}: {}",
1124                result.finding_id,
1125                e
1126            );
1127            // Don't add to failed_findings here — the in-memory
1128            // aggregate still wants the merge so the finding can
1129            // either succeed via a later batch's write or fall
1130            // through to the final-flush diagnostic. The retry
1131            // dance lives at the batch-runner layer.
1132        }
1133    }
1134}
1135
1136/// Drains any aggregated findings whose every expected context has now been
1137/// merged and inserts their `finding_link_status` row. The actual link
1138/// rows were already durably written by `persist_batch_partials` after each
1139/// batch; this is the one transactional step that promotes them from
1140/// "partial" (sfl-only) to "queryable by downstream consumers" (sfl + fls).
1141async fn commit_completed_findings<I: IntoIterator<Item = i32>>(
1142    db: &HistoricalDatabase,
1143    aggregated_results: &mut HashMap<i32, AggregatedFindingLinkResult>,
1144    failed_findings: &mut HashSet<i32>,
1145    committed_findings: &mut HashSet<i32>,
1146    finding_ids: I,
1147) {
1148    let mut seen: HashSet<i32> = HashSet::new();
1149    for fid in finding_ids {
1150        if !seen.insert(fid) {
1151            continue;
1152        }
1153        if failed_findings.contains(&fid) || committed_findings.contains(&fid) {
1154            continue;
1155        }
1156        let ready = aggregated_results
1157            .get(&fid)
1158            .map(|agg| agg.expected_contexts > 0 && agg.completed_contexts >= agg.expected_contexts)
1159            .unwrap_or(false);
1160        if !ready {
1161            continue;
1162        }
1163        let agg = aggregated_results
1164            .remove(&fid)
1165            .expect("readiness check just confirmed presence");
1166        let link_count = agg.semantic_links.len();
1167        let finding_id = agg.finding_id;
1168        match db.mark_finding_link_complete(finding_id).await {
1169            Ok(()) => {
1170                committed_findings.insert(finding_id);
1171                tracing::debug!(
1172                    "Committed finding #{} ({} semantic link(s))",
1173                    finding_id,
1174                    link_count,
1175                );
1176            }
1177            Err(e) => {
1178                failed_findings.insert(finding_id);
1179                tracing::error!("Failed to mark finding #{} complete: {}", finding_id, e);
1180            }
1181        }
1182    }
1183}
1184
1185// ---------------------------------------------------------------------------
1186// Per-batch agent runner — replaces the old JSON+parse `link_pending_findings_batch`
1187// ---------------------------------------------------------------------------
1188
// Agent tool that receives one `FindingLinkDecision` per call, validates it
// (finding id, candidate ids, strength, evidence length) and, when accepted,
// pushes it into the shared chunk buffer. A fresh instance is built for each
// attempt of a batch.
#[derive(Debug, Clone)]
#[tool(
    arguments = FindingLinkDecision,
    invoke = invoke,
    description = "Emit one link decision for a single finding in this prompt. Call once per `finding_id` shown under \"Findings To Link\". `semantic_evidence` is a list of {semantic_id, strength, why_finding_can_fire} entries — include candidates the finding is at least tangentially related to (Low or higher). Candidates with NO meaningful relation can be omitted entirely; silent omission is treated as 'no link'. Each entry's `semantic_id` MUST be a Candidate ID from the prompt; `strength` is one of `High` / `Medium` / `Low`; `why_finding_can_fire` cites the specific behaviour from the semantic's description that ties to this finding. High/Medium evidence must be at least 40 characters; Low can be as short as 15. The tool validates ids, strength parsing, and evidence length; rejected decisions can be re-emitted.",
    name = "emit_finding_link_decision",
)]
struct EmitFindingLinkDecisionTool {
    /// Destination for accepted decisions; the same buffer is handed to the
    /// finalize tool and drained by the batch runner after the attempt.
    buffer: Arc<AgentChunkBuffer<FindingLinkDecision>>,
    /// Set of `finding-N` ids the prompt actually listed. Rejecting any
    /// other id at the tool boundary lets the agent see the error message
    /// and retry within its step loop instead of having the orchestrator
    /// silently fold the decision into "no-link" later.
    valid_finding_ids: Arc<HashSet<String>>,
    /// Set of `Candidate ID` values the prompt actually listed.
    valid_semantic_ids: Arc<HashSet<String>>,
    /// Per-attempt heartbeat label so progress logs can identify which
    /// batch+attempt is making forward progress. Set by the batch
    /// runner before installing the tool.
    label: String,
    /// Total findings this attempt was asked to emit decisions for.
    /// Used as the denominator in the progress heartbeat.
    target_finding_count: usize,
    /// Running count of accepted emit calls during this attempt.
    /// Atomic so the tool's `&self` invoke can mutate without locks.
    emit_count: Arc<std::sync::atomic::AtomicUsize>,
}
1216
1217/// Minimum byte length of `why_finding_can_fire` per strength tier. Low
1218/// rationales are short by design (they exist for audit, not downstream
1219/// consumption); High and Medium must be substantive enough to justify
1220/// the claim. See plan_link.md §6.1.
1221const LINK_EVIDENCE_MIN_HIGH_MEDIUM: usize = 40;
1222const LINK_EVIDENCE_MIN_LOW: usize = 15;
1223
1224impl EmitFindingLinkDecisionTool {
1225    async fn invoke(&self, args: FindingLinkDecision) -> std::result::Result<String, LLMYError> {
1226        if args.finding_id.trim().is_empty() {
1227            return Ok("error: `finding_id` must not be empty".to_string());
1228        }
1229        if !self.valid_finding_ids.contains(&args.finding_id) {
1230            return Ok(format!(
1231                "error: `finding_id` '{}' is not in this prompt's \"Findings To Link\" section. Re-emit using one of the IDs shown there exactly.",
1232                args.finding_id,
1233            ));
1234        }
1235
1236        // Only validate that referenced ids exist. Silent omission is
1237        // allowed and means "no link" for unreferenced candidates —
1238        // the full per-candidate coverage contract was unworkable at
1239        // the link agent's scale (1493 canonical sems per context vs
1240        // mapper's ~16); see plan_link.md §4 撤回 entry for the post-
1241        // mortem.
1242        let invalid_ids: Vec<&String> = args
1243            .semantic_evidence
1244            .iter()
1245            .map(|e| &e.semantic_id)
1246            .filter(|id| !self.valid_semantic_ids.contains(*id))
1247            .collect();
1248        if !invalid_ids.is_empty() {
1249            return Ok(format!(
1250                "error: `semantic_evidence` contains semantic_id values not in this prompt's \"Candidate Semantics\" list: {invalid_ids:?}. Re-emit this decision using only `Candidate ID` values shown above.",
1251            ));
1252        }
1253
1254        // Per-entry: strength parses + evidence length floor depending on tier.
1255        for evidence in &args.semantic_evidence {
1256            let Some(strength) =
1257                knowdit_kg_model::link_strength::LinkStrength::parse(&evidence.strength)
1258            else {
1259                return Ok(format!(
1260                    "error: evidence for semantic_id '{}' under finding '{}' has unrecognised `strength` value {:?}. Use one of `High`, `Medium`, or `Low` (case-insensitive).",
1261                    evidence.semantic_id, args.finding_id, evidence.strength,
1262                ));
1263            };
1264            let trimmed = evidence.why_finding_can_fire.trim();
1265            if trimmed.is_empty() {
1266                return Ok(format!(
1267                    "error: evidence for semantic_id '{}' under finding '{}' has empty `why_finding_can_fire`. Even Low strength needs a brief reason (>= {} chars) explaining why the link is tangential.",
1268                    evidence.semantic_id, args.finding_id, LINK_EVIDENCE_MIN_LOW,
1269                ));
1270            }
1271            let floor = match strength {
1272                knowdit_kg_model::link_strength::LinkStrength::High
1273                | knowdit_kg_model::link_strength::LinkStrength::Medium => {
1274                    LINK_EVIDENCE_MIN_HIGH_MEDIUM
1275                }
1276                knowdit_kg_model::link_strength::LinkStrength::Low => LINK_EVIDENCE_MIN_LOW,
1277            };
1278            if trimmed.len() < floor {
1279                return Ok(format!(
1280                    "error: evidence for semantic_id '{}' (strength={}) under finding '{}' is only {} chars; the {} tier requires at least {} chars. Cite a specific behaviour from the semantic's description that matches the finding's bug prerequisites.",
1281                    evidence.semantic_id,
1282                    strength,
1283                    args.finding_id,
1284                    trimmed.len(),
1285                    strength,
1286                    floor,
1287                ));
1288            }
1289        }
1290        let response = self
1291            .buffer
1292            .push_with_message(args, "finding link decision")
1293            .await;
1294        // Heartbeat: print one line per `LINK_PROGRESS_HEARTBEAT_EVERY`
1295        // accepted emits so a stuck attempt is visible as a flatline,
1296        // not as ambiguous silence between attempts.
1297        let n = self
1298            .emit_count
1299            .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
1300            + 1;
1301        if n.is_multiple_of(LINK_PROGRESS_HEARTBEAT_EVERY) || n == self.target_finding_count {
1302            tracing::info!(
1303                "{}: progress {}/{} decisions emitted",
1304                self.label,
1305                n,
1306                self.target_finding_count,
1307            );
1308        }
1309        Ok(response)
1310    }
1311}
1312
/// Emit one heartbeat log line for every N accepted emits. Tuned so a
/// 100-finding batch logs ~4 times per attempt; large batches still
/// stay readable without flooding. The emit tool also logs when the
/// accepted-emit count equals the attempt's target finding count, so
/// batches smaller than N still produce a progress line.
const LINK_PROGRESS_HEARTBEAT_EVERY: usize = 25;
1317
// Agent tool the link agent calls once it has emitted a decision for every
// finding in the prompt; it forwards the agent's summary to the shared
// chunk buffer's finalize step.
#[derive(Debug, Clone)]
#[tool(
    arguments = FinalizeArgs,
    invoke = invoke,
    description = "Signal that every finding in this prompt has been processed via emit_finding_link_decision (with possibly empty semantic_ids when no candidate is related). Call exactly once, then stop.",
    name = "finalize_finding_link",
)]
struct FinalizeFindingLinkTool {
    /// Same buffer the emit tool pushes accepted decisions into.
    buffer: Arc<AgentChunkBuffer<FindingLinkDecision>>,
}
1328
1329impl FinalizeFindingLinkTool {
1330    async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
1331        Ok(self
1332            .buffer
1333            .finalize_with_message(args.summary, "finding linking")
1334            .await)
1335    }
1336}
1337
/// Single-batch link runner. Drives one Agent per attempt (re-running with
/// the prompt restricted to still-missing findings between attempts), up to
/// `max_response_attempts` times. The final pass folds any remaining
/// uncovered findings into empty `PersistedFindingLinkResult`s and logs a
/// warning, matching the prior best-effort behaviour.
struct FindingLinkBatchAgentRunner<'a> {
    /// LLM handle; cloned into the `AgentChunkRunner` built for each attempt.
    llm: LLM,
    /// Findings to link. Each entry supplies the prompt id, the rendered
    /// prompt body and the numeric finding ids used in the results.
    batch: &'a FindingLinkBatch,
    /// Candidate-side context: the `candidate_map` used to validate and
    /// resolve semantic ids, the shared `prompt_prefix`, and the `cache_key`
    /// from which per-attempt cache keys are derived.
    context: &'a FindingLinkContext,
    /// Agent run options; scoped per attempt, and `max_agent_steps` is read
    /// for the step-budget warning.
    agent_options: AgentRunOptions,
    /// Upper bound on the number of agent attempts for this batch.
    max_response_attempts: usize,
}
1350
1351impl<'a> FindingLinkBatchAgentRunner<'a> {
1352    async fn run(self) -> Result<Vec<PersistedFindingLinkResult>> {
1353        if self.batch.entries.is_empty() {
1354            return Ok(Vec::new());
1355        }
1356        if self.context.candidate_map.is_empty() {
1357            tracing::info!(
1358                "No semantic candidates for finding batch {}, marking {} finding(s) as processed without links",
1359                self.batch,
1360                self.batch.entries.len(),
1361            );
1362            return Ok(self
1363                .batch
1364                .entries
1365                .iter()
1366                .map(|entry| PersistedFindingLinkResult {
1367                    finding_id: entry.pending.finding_id,
1368                    link_target_finding_id: entry.pending.link_target_finding_id,
1369                    semantic_links: Vec::new(),
1370                })
1371                .collect());
1372        }
1373
1374        tracing::info!(
1375            "Linking finding batch {} ({} finding(s), ~{} semantic tokens, ~{} finding tokens, finding_budget={}, input_budget={}, max_response_attempts={})",
1376            self.batch,
1377            self.batch.entries.len(),
1378            self.context.candidate_token_count,
1379            self.batch.finding_token_count,
1380            self.batch.finding_token_budget,
1381            self.batch.input_token_budget,
1382            self.max_response_attempts,
1383        );
1384
1385        // Step budget sanity: agent needs ~1 step per emit + 1 finalize
1386        // + reasoning headroom. If the step cap looks tight, warn so
1387        // the operator can raise --link-max-agent-steps before the run
1388        // burns through 3 attempts hitting the step ceiling.
1389        let max_agent_steps = self.agent_options.max_agent_steps;
1390        let step_headroom = max_agent_steps.saturating_sub(self.batch.entries.len() + 1);
1391        if step_headroom < 4 {
1392            tracing::warn!(
1393                "Finding batch {} may not fit the step budget: {} finding(s) need ≥{} steps (emit+finalize), but --link-max-agent-steps is {}. Expect premature finalize / no-link defaults. Lower --max-findings-per-batch or raise --link-max-agent-steps.",
1394                self.batch,
1395                self.batch.entries.len(),
1396                self.batch.entries.len() + 1,
1397                max_agent_steps,
1398            );
1399        }
1400
1401        let valid_semantic_ids: Arc<HashSet<String>> =
1402            Arc::new(self.context.candidate_map.keys().cloned().collect());
1403        // `collected[finding_id]` holds the most recent decision the agent
1404        // emitted for that finding across all attempts. Re-attempts only
1405        // ask the agent about findings still missing from this map.
1406        let mut collected: HashMap<i32, FindingLinkDecision> = HashMap::new();
1407
1408        for attempt in 1..=self.max_response_attempts {
1409            let still_missing: Vec<&FindingLinkBatchEntry> = self
1410                .batch
1411                .entries
1412                .iter()
1413                .filter(|entry| !collected.contains_key(&entry.pending.finding_id))
1414                .collect();
1415            if still_missing.is_empty() {
1416                break;
1417            }
1418
1419            let valid_finding_ids: Arc<HashSet<String>> = Arc::new(
1420                still_missing
1421                    .iter()
1422                    .map(|entry| entry.prompt_finding_id.clone())
1423                    .collect(),
1424            );
1425            let mut user_prompt = self.context.prompt_prefix.clone();
1426            for entry in &still_missing {
1427                user_prompt.push_str(&entry.prompt_body);
1428            }
1429            let cache_key = format!("{}-attempt{:02}", self.context.cache_key, attempt);
1430            let label = format!("finding-link-{}-attempt{:02}", self.batch, attempt);
1431            let target_finding_count = still_missing.len();
1432            let agent_decisions = self
1433                .run_one_attempt(
1434                    user_prompt,
1435                    cache_key,
1436                    label.clone(),
1437                    valid_finding_ids,
1438                    valid_semantic_ids.clone(),
1439                    attempt,
1440                    target_finding_count,
1441                )
1442                .await?;
1443            tracing::info!(
1444                "{}: agent emitted {} decision(s) for {} pending finding(s)",
1445                label,
1446                agent_decisions.len(),
1447                still_missing.len(),
1448            );
1449
1450            // Map prompt_finding_id ("finding-N") back to the numeric
1451            // finding_id and store the latest decision per finding.
1452            let entries_by_prompt_id: HashMap<&str, &FindingLinkBatchEntry> = self
1453                .batch
1454                .entries
1455                .iter()
1456                .map(|entry| (entry.prompt_finding_id.as_str(), entry))
1457                .collect();
1458            for decision in agent_decisions {
1459                if let Some(entry) = entries_by_prompt_id.get(decision.finding_id.as_str()) {
1460                    collected.insert(entry.pending.finding_id, decision);
1461                }
1462            }
1463
1464            // Coverage check after this attempt — log + retry if anything
1465            // is still missing and we haven't blown the cap yet.
1466            let leftover: Vec<&FindingLinkBatchEntry> = self
1467                .batch
1468                .entries
1469                .iter()
1470                .filter(|entry| !collected.contains_key(&entry.pending.finding_id))
1471                .collect();
1472            if leftover.is_empty() {
1473                break;
1474            }
1475            if attempt < self.max_response_attempts {
1476                tracing::warn!(
1477                    "Finding-link batch {} attempt {}/{}: {} finding(s) still missing decisions ({}); retrying with restricted prompt",
1478                    self.batch,
1479                    attempt,
1480                    self.max_response_attempts,
1481                    leftover.len(),
1482                    leftover
1483                        .iter()
1484                        .map(|entry| entry.prompt_finding_id.as_str())
1485                        .collect::<Vec<_>>()
1486                        .join(", "),
1487                );
1488            } else {
1489                tracing::warn!(
1490                    "Finding-link batch {} exhausted {} attempt(s) with {} finding(s) still missing ({}); treating them as no-link results",
1491                    self.batch,
1492                    self.max_response_attempts,
1493                    leftover.len(),
1494                    leftover
1495                        .iter()
1496                        .map(|entry| entry.prompt_finding_id.as_str())
1497                        .collect::<Vec<_>>()
1498                        .join(", "),
1499                );
1500            }
1501        }
1502
1503        // Materialize one PersistedFindingLinkResult per batch entry, in
1504        // input order. Findings without a recorded decision get an empty
1505        // semantic_ids list (already warned about above).
1506        Ok(self
1507            .batch
1508            .entries
1509            .iter()
1510            .map(|entry| {
1511                let semantic_links = collected
1512                    .remove(&entry.pending.finding_id)
1513                    .map(|d| {
1514                        Self::resolve_semantic_links(
1515                            d.semantic_evidence,
1516                            &self.context.candidate_map,
1517                        )
1518                    })
1519                    .unwrap_or_default();
1520                PersistedFindingLinkResult {
1521                    finding_id: entry.pending.finding_id,
1522                    link_target_finding_id: entry.pending.link_target_finding_id,
1523                    semantic_links,
1524                }
1525            })
1526            .collect())
1527    }
1528
1529    async fn run_one_attempt(
1530        &self,
1531        user_prompt: String,
1532        cache_key: String,
1533        label: String,
1534        valid_finding_ids: Arc<HashSet<String>>,
1535        valid_semantic_ids: Arc<HashSet<String>>,
1536        attempt: usize,
1537        target_finding_count: usize,
1538    ) -> Result<Vec<FindingLinkDecision>> {
1539        let buffer = AgentChunkBuffer::<FindingLinkDecision>::new();
1540        let mut tools = ToolBox::new();
1541        tools.add_tool(EmitFindingLinkDecisionTool {
1542            buffer: buffer.clone(),
1543            valid_finding_ids,
1544            valid_semantic_ids,
1545            label: label.clone(),
1546            target_finding_count,
1547            emit_count: Arc::new(std::sync::atomic::AtomicUsize::new(0)),
1548        });
1549        tools.add_tool(FinalizeFindingLinkTool {
1550            buffer: buffer.clone(),
1551        });
1552        let runner = AgentChunkRunner {
1553            llm: self.llm.clone(),
1554            options: self
1555                .agent_options
1556                .scoped(&format!("link-{}-attempt{:02}", self.batch, attempt)),
1557            buffer: buffer.clone(),
1558            tools,
1559            system_prompt: prompts::GENERAL_ROLE_SYSTEM.to_string(),
1560            user_prompt,
1561            cache_key,
1562            label,
1563        };
1564        let _outcome = runner.run().await?;
1565        Ok(buffer.drain().await)
1566    }
1567
1568    /// Resolve the agent-emitted evidence list into
1569    /// [`PersistedSemanticLink`] entries carrying canonical semantic ids
1570    /// (via the context's candidate map) plus the per-link strength +
1571    /// evidence text. Unknown or unparseable strengths are silently
1572    /// dropped here (the tool handler already rejected them before this
1573    /// point; this is the defensive last line). On duplicate semantic
1574    /// ids the strongest emit wins.
1575    fn resolve_semantic_links(
1576        evidence: Vec<LinkEvidence>,
1577        candidate_map: &HashMap<String, i32>,
1578    ) -> Vec<PersistedSemanticLink> {
1579        let mut by_id: BTreeMap<i32, PersistedSemanticLink> = BTreeMap::new();
1580        for ev in evidence {
1581            let Some(target) = candidate_map.get(&ev.semantic_id).copied() else {
1582                continue;
1583            };
1584            let Some(strength) = knowdit_kg_model::link_strength::LinkStrength::parse(&ev.strength)
1585            else {
1586                continue;
1587            };
1588            let new_link = PersistedSemanticLink {
1589                semantic_id: target,
1590                strength,
1591                evidence: ev.why_finding_can_fire,
1592            };
1593            by_id
1594                .entry(target)
1595                .and_modify(|existing| {
1596                    if new_link.strength.rank() > existing.strength.rank() {
1597                        *existing = new_link.clone();
1598                    }
1599                })
1600                .or_insert(new_link);
1601        }
1602        by_id.into_values().collect()
1603    }
1604}
1605
/// Formats a numeric finding id as the `finding-N` label built for
/// prompt entries.
fn prompt_finding_id(finding_id: i32) -> String {
    format!("finding-{finding_id}")
}
1609
1610/// Retro-link pass for the incremental learn-new-project flow.
1611///
1612/// Inputs:
1613///   - `pending_semantic` queue (canonical semantics newly introduced by
1614///     the most recent project admission(s));
1615///   - `finding_link_status` set (findings that have already been globally
1616///     linked in a previous pass and won't go through `link_pending_findings`
1617///     again).
1618///
1619/// For each pre-linked finding the LLM is asked which of the pending
1620/// canonical semantics it should additionally link to. New rows are appended
1621/// to `semantic_finding_link`. After every finding has been processed the
1622/// `pending_semantic` queue is cleared. `finding_link_status` is NOT
1623/// touched (those findings were already there).
1624///
1625/// Bootstrap callers (no prior `finding_link_status` rows) can still call
1626/// this — the function will just clear the pending queue and return.
1627pub async fn retro_link_pending_semantics(
1628    db: &HistoricalDatabase,
1629    llm: &LLM,
1630    options: FindingLinkOptions,
1631) -> Result<()> {
1632    let pending_semantics = db.list_pending_canonical_semantics().await?;
1633    if pending_semantics.is_empty() {
1634        tracing::info!("Retro-link: pending_semantic queue is empty, nothing to do");
1635        return Ok(());
1636    }
1637    let already_linked_findings = db.list_findings_with_link_status().await?;
1638    if already_linked_findings.is_empty() {
1639        tracing::info!(
1640            "Retro-link: no findings in finding_link_status; clearing the {} queued pending semantic(s) without LLM work (bootstrap path)",
1641            pending_semantics.len()
1642        );
1643        db.clear_pending_semantic_queue().await?;
1644        return Ok(());
1645    }
1646
1647    let model = &llm.model;
1648    let max_response_attempts = options.max_response_attempts.max(1);
1649    let agent_options = AgentRunOptions::new(options.max_agent_steps);
1650    let budgets = FindingLinkBudgets::from_options(model, options);
1651
1652    // Render the (small, bounded) pending semantics as a single candidate
1653    // block — they all fit in one prompt.
1654    let candidate_entries: Vec<FindingLinkCandidateEntry> = pending_semantics
1655        .iter()
1656        .map(|node| {
1657            let candidate = SemanticLinkCandidate {
1658                candidate_id: format!("sem-{}", node.id),
1659                canonical_semantic_id: node.id,
1660                is_canonical: true,
1661                category: node.category,
1662                name: node.name.clone(),
1663                definition: node.definition.clone(),
1664                description: node.description.clone(),
1665            };
1666            let prompt_body = render_finding_link_candidate(&candidate);
1667            FindingLinkCandidateEntry {
1668                candidate_id: candidate.candidate_id,
1669                canonical_semantic_id: candidate.canonical_semantic_id,
1670                is_canonical: candidate.is_canonical,
1671                category: candidate.category,
1672                name: candidate.name,
1673                token_count: count_tokens(model, &prompt_body),
1674                prompt_body,
1675            }
1676        })
1677        .collect();
1678
1679    let context_key = FindingLinkContextKey::empty();
1680    let context = std::sync::Arc::new(FindingLinkContext::build(
1681        model,
1682        &context_key,
1683        candidate_entries,
1684    ));
1685    let finding_token_budget = budgets.finding_token_budget(model, context.as_ref())?;
1686
1687    // Build per-finding entries.
1688    let entries: Vec<FindingLinkBatchEntry> = already_linked_findings
1689        .into_iter()
1690        .map(|pending| {
1691            let prompt_finding_id = prompt_finding_id(pending.finding_id);
1692            let prompt_body = prompts::finding_link_finding_entry(
1693                &prompt_finding_id,
1694                &pending.categories,
1695                &pending.finding.title,
1696                pending.finding.severity,
1697                pending.finding.category,
1698                &pending.finding.subcategory,
1699                &pending.finding.root_cause,
1700                &pending.finding.description,
1701                &pending.finding.patterns,
1702                &pending.finding.exploits,
1703            );
1704            let token_count = count_tokens(model, &prompt_body);
1705            FindingLinkBatchEntry {
1706                pending,
1707                prompt_finding_id,
1708                prompt_body,
1709                token_count,
1710            }
1711        })
1712        .collect();
1713
1714    let total = entries.len();
1715    let batches = partition_finding_link_entries(
1716        entries,
1717        finding_token_budget,
1718        budgets.max_findings_per_batch,
1719    );
1720    tracing::info!(
1721        "Retro-link: {} pending canonical semantic(s) × {} pre-linked finding(s) → {} batch(es)",
1722        context.candidate_map.len(),
1723        total,
1724        batches.len()
1725    );
1726
1727    let mut total_inserted = 0usize;
1728    for (idx, batch_entries) in batches.into_iter().enumerate() {
1729        let finding_token_count = batch_entries.iter().map(|e| e.token_count).sum();
1730        let batch = FindingLinkBatch {
1731            context_key: context_key.clone(),
1732            entries: batch_entries,
1733            finding_token_count,
1734            finding_token_budget,
1735            input_token_budget: budgets.input_token_budget,
1736        };
1737        tracing::info!(
1738            "Retro-link batch {}: {} finding(s)",
1739            idx + 1,
1740            batch.entries.len()
1741        );
1742        let runner = FindingLinkBatchAgentRunner {
1743            llm: llm.clone(),
1744            batch: &batch,
1745            context: context.as_ref(),
1746            agent_options: agent_options.clone(),
1747            max_response_attempts,
1748        };
1749        let results = runner.run().await?;
1750
1751        // Translate to LinkEdge rows (carrying strength + evidence) and
1752        // write directly. Skip writing finding_link_status — these
1753        // findings are already there.
1754        let mut edges = Vec::new();
1755        for result in results {
1756            for link in result.semantic_links {
1757                edges.push(crate::db::LinkEdge {
1758                    semantic_node_id: link.semantic_id,
1759                    audit_finding_id: result.link_target_finding_id,
1760                    strength: link.strength,
1761                    evidence: link.evidence,
1762                });
1763            }
1764        }
1765        let inserted = db.append_semantic_finding_links(&edges).await?;
1766        total_inserted += inserted;
1767        tracing::info!(
1768            "Retro-link batch {} inserted {} new semantic_finding_link row(s)",
1769            idx + 1,
1770            inserted
1771        );
1772    }
1773
1774    let cleared = db.clear_pending_semantic_queue().await?;
1775    tracing::info!(
1776        "Retro-link complete: {} new link(s) total, cleared {} row(s) from pending_semantic queue",
1777        total_inserted,
1778        cleared
1779    );
1780    Ok(())
1781}
1782
1783fn finding_link_batch_entry_span(entries: &[FindingLinkBatchEntry]) -> String {
1784    let first = entries.first().map(|entry| entry.pending.finding_id);
1785    let last = entries.last().map(|entry| entry.pending.finding_id);
1786    match (first, last) {
1787        (Some(first), Some(last)) if first == last => format!("{}", first),
1788        (Some(first), Some(last)) => format!("{}-{}", first, last),
1789        _ => "empty".to_string(),
1790    }
1791}
1792
#[cfg(test)]
mod tests {
    //! Unit tests for the pure helpers of the finding-link pipeline:
    //! batch partitioning, evidence resolution, result aggregation and
    //! candidate grouping. Nothing here talks to the database or an LLM.

    use super::*;
    use crate::vulnerability::{FindingSeverity, VulnerabilityCategory};

    /// Builds a pending finding with fixed placeholder content whose
    /// finding id and link target id both equal `id`.
    fn sample_finding(id: i32) -> PendingFindingForLinking {
        PendingFindingForLinking {
            finding_id: id,
            link_target_finding_id: id,
            categories: vec![DeFiCategory::Dexes],
            finding: ExtractedFinding {
                title: format!("Finding {}", id),
                severity: FindingSeverity::Medium,
                category: VulnerabilityCategory::AccessControl,
                subcategory: "Missing Input Validation".to_string(),
                root_cause: "Unchecked parameters".to_string(),
                description: "A critical flow trusts malformed input.".to_string(),
                patterns: "Input not validated".to_string(),
                exploits: "Attacker passes malformed values".to_string(),
            },
        }
    }

    /// Same as [`sample_finding`] but with the given project categories.
    fn sample_finding_with_categories(
        id: i32,
        categories: Vec<DeFiCategory>,
    ) -> PendingFindingForLinking {
        let mut finding = sample_finding(id);
        finding.categories = categories;
        finding
    }

    /// Builds a batch entry for finding `id` that claims `token_count`
    /// tokens, so partitioning tests control the budget arithmetic.
    fn sample_batch_entry(id: i32, token_count: usize) -> FindingLinkBatchEntry {
        FindingLinkBatchEntry {
            pending: sample_finding(id),
            prompt_finding_id: prompt_finding_id(id),
            prompt_body: format!("### finding-{}\n", id),
            token_count,
        }
    }

    /// Builds a `Dexes` candidate entry with id `sem-<semantic_id>` and
    /// prompt body `candidate-<semantic_id>`. The entry is flagged
    /// canonical exactly when `semantic_id == canonical_semantic_id`.
    fn sample_candidate_entry(
        semantic_id: i32,
        canonical_semantic_id: i32,
        name: &str,
        token_count: usize,
    ) -> FindingLinkCandidateEntry {
        FindingLinkCandidateEntry {
            candidate_id: format!("sem-{semantic_id}"),
            canonical_semantic_id,
            is_canonical: semantic_id == canonical_semantic_id,
            category: DeFiCategory::Dexes,
            name: name.to_string(),
            prompt_body: format!("candidate-{semantic_id}"),
            token_count,
        }
    }

    /// Finding ids of one partitioned batch, in batch order.
    fn batch_finding_ids(batch: &[FindingLinkBatchEntry]) -> Vec<i32> {
        batch.iter().map(|entry| entry.pending.finding_id).collect()
    }

    /// Candidate ids of one partitioned candidate batch, in batch order.
    fn batch_candidate_ids(batch: &[FindingLinkCandidateEntry]) -> Vec<&str> {
        batch
            .iter()
            .map(|entry| entry.candidate_id.as_str())
            .collect()
    }

    #[test]
    fn partition_finding_link_entries_respects_budget_and_order() {
        let batches = partition_finding_link_entries(
            vec![
                sample_batch_entry(101, 40),
                sample_batch_entry(102, 20),
                sample_batch_entry(103, 50),
            ],
            60,
            usize::MAX,
        );

        assert_eq!(batches.len(), 2);
        assert_eq!(batch_finding_ids(&batches[0]), vec![101, 102]);
        assert_eq!(batch_finding_ids(&batches[1]), vec![103]);
    }

    #[test]
    fn partition_finding_link_entries_keeps_oversized_singleton_batch() {
        // Entry 201 alone exceeds the budget; it must still get a batch
        // of its own instead of being dropped.
        let batches = partition_finding_link_entries(
            vec![sample_batch_entry(201, 120), sample_batch_entry(202, 30)],
            100,
            usize::MAX,
        );

        assert_eq!(batches.len(), 2);
        assert_eq!(batches[0].len(), 1);
        assert_eq!(batches[0][0].pending.finding_id, 201);
        assert_eq!(batches[1].len(), 1);
        assert_eq!(batches[1][0].pending.finding_id, 202);
    }

    #[test]
    fn partition_finding_link_entries_honours_count_cap_under_token_budget() {
        // Tokens-wise everything fits; count cap should still split.
        let batches = partition_finding_link_entries(
            vec![
                sample_batch_entry(301, 10),
                sample_batch_entry(302, 10),
                sample_batch_entry(303, 10),
                sample_batch_entry(304, 10),
                sample_batch_entry(305, 10),
            ],
            10_000,
            2,
        );

        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 2);
        assert_eq!(batches[1].len(), 2);
        assert_eq!(batches[2].len(), 1);
    }

    #[test]
    fn resolve_semantic_links_dedupes_and_maps_through_candidate_map() {
        // The agent-side tool already validates ids; this helper maps the
        // surviving `Candidate ID` strings back to canonical i32 ids and
        // parses the per-link strength tier. `sem-1` and `sem-merged`
        // both resolve to canonical id 1 with equal strength, so they
        // must collapse into a single link that keeps the first emit.
        let candidate_map: HashMap<String, i32> = HashMap::from([
            ("sem-1".to_string(), 1),
            ("sem-merged".to_string(), 1),
            ("sem-2".to_string(), 2),
        ]);
        let resolved = FindingLinkBatchAgentRunner::resolve_semantic_links(
            vec![
                LinkEvidence {
                    semantic_id: "sem-1".to_string(),
                    strength: "High".to_string(),
                    why_finding_can_fire: "shared behaviour 1".to_string(),
                },
                LinkEvidence {
                    semantic_id: "sem-merged".to_string(),
                    strength: "High".to_string(),
                    why_finding_can_fire: "shared behaviour merged".to_string(),
                },
                LinkEvidence {
                    semantic_id: "sem-2".to_string(),
                    strength: "High".to_string(),
                    why_finding_can_fire: "shared behaviour 2".to_string(),
                },
            ],
            &candidate_map,
        );

        let resolved_ids: Vec<i32> = resolved.iter().map(|link| link.semantic_id).collect();
        assert_eq!(resolved_ids, vec![1, 2]);

        // An equally strong duplicate must not overwrite the evidence of
        // the link that was recorded first.
        let resolved_evidence: Vec<&str> =
            resolved.iter().map(|link| link.evidence.as_str()).collect();
        assert_eq!(
            resolved_evidence,
            vec!["shared behaviour 1", "shared behaviour 2"]
        );
    }

    #[test]
    fn merge_finding_link_batch_results_aggregates_multiple_categories() {
        use knowdit_kg_model::link_strength::LinkStrength;

        fn link(sid: i32, strength: LinkStrength) -> PersistedSemanticLink {
            PersistedSemanticLink {
                semantic_id: sid,
                strength,
                evidence: format!("evidence for sem-{sid}"),
            }
        }

        let mut aggregated_results = HashMap::from([(
            501,
            AggregatedFindingLinkResult {
                finding_id: 501,
                link_target_finding_id: 777,
                expected_contexts: 2,
                completed_contexts: 0,
                semantic_links: BTreeMap::new(),
            },
        )]);

        merge_finding_link_batch_results(
            &mut aggregated_results,
            vec![PersistedFindingLinkResult {
                finding_id: 501,
                link_target_finding_id: 777,
                semantic_links: vec![link(3, LinkStrength::Medium), link(1, LinkStrength::Medium)],
            }],
        )
        .expect("first category result should merge");

        merge_finding_link_batch_results(
            &mut aggregated_results,
            vec![PersistedFindingLinkResult {
                finding_id: 501,
                link_target_finding_id: 777,
                semantic_links: vec![link(2, LinkStrength::Medium), link(3, LinkStrength::Medium)],
            }],
        )
        .expect("second category result should merge");

        let mut failed_findings = HashSet::new();
        let results = finalize_finding_link_results(aggregated_results, &mut failed_findings);

        assert!(failed_findings.is_empty());
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].finding_id, 501);
        assert_eq!(results[0].link_target_finding_id, 777);
        let resolved_ids: Vec<i32> = results[0]
            .semantic_links
            .iter()
            .map(|l| l.semantic_id)
            .collect();
        assert_eq!(resolved_ids, vec![1, 2, 3]);
    }

    #[test]
    fn finalize_finding_link_results_rejects_missing_category_runs() {
        // Only 2 of the 3 expected contexts completed, so the finding
        // must be reported as failed rather than finalized.
        let aggregated_results = HashMap::from([(
            601,
            AggregatedFindingLinkResult {
                finding_id: 601,
                link_target_finding_id: 601,
                expected_contexts: 3,
                completed_contexts: 2,
                semantic_links: BTreeMap::new(),
            },
        )]);

        let mut failed_findings = HashSet::new();
        let results = finalize_finding_link_results(aggregated_results, &mut failed_findings);

        assert!(results.is_empty());
        assert!(failed_findings.contains(&601));
    }

    #[test]
    fn finding_link_context_plan_collapses_to_single_all_canonical_key() {
        // After the all-canonical refactor every pending finding routes through
        // the same single empty-categories context key, regardless of the
        // originating project's categories.
        let finding = sample_finding_with_categories(
            701,
            vec![
                DeFiCategory::Yield,
                DeFiCategory::Dexes,
                DeFiCategory::Yield,
            ],
        );

        let context_keys = FindingLinkContextKey::from_project_categories(&finding.categories);
        assert_eq!(context_keys.len(), 1);
        assert!(context_keys[0].categories.is_empty());
    }

    #[test]
    fn partition_finding_link_candidate_groups_keeps_aliases_with_canonical() {
        // Group 1 (70 tokens) exceeds the 60-token budget on its own, yet
        // its canonical entry and alias must stay in the same batch.
        let batches = partition_finding_link_candidate_groups(
            vec![
                FindingLinkCandidateGroup {
                    canonical_semantic_id: 1,
                    category: DeFiCategory::Dexes,
                    name: "Canonical 1".to_string(),
                    token_count: 70,
                    entries: vec![
                        sample_candidate_entry(1, 1, "Canonical 1", 40),
                        sample_candidate_entry(101, 1, "Alias 101", 30),
                    ],
                },
                FindingLinkCandidateGroup {
                    canonical_semantic_id: 2,
                    category: DeFiCategory::Dexes,
                    name: "Canonical 2".to_string(),
                    token_count: 20,
                    entries: vec![sample_candidate_entry(2, 2, "Canonical 2", 20)],
                },
            ],
            60,
        );

        assert_eq!(batches.len(), 2);
        assert_eq!(batch_candidate_ids(&batches[0]), vec!["sem-1", "sem-101"]);
        assert_eq!(batch_candidate_ids(&batches[1]), vec!["sem-2"]);
    }

    #[test]
    fn group_finding_link_candidate_entries_requires_canonical_entry() {
        // Only the alias `sem-101` is supplied; its canonical `sem-1` is
        // missing, which grouping must report as an error.
        let error =
            group_finding_link_candidate_entries(vec![sample_candidate_entry(101, 1, "Alias 101", 30)])
                .expect_err("candidate groups should require the canonical semantic entry");

        assert!(
            error.to_string().contains(
                "Missing canonical semantic sem-1 while building finding-link candidate group"
            ),
            "unexpected error: {error}"
        );
    }
}