1use std::collections::HashSet;
29use std::sync::Arc;
30
31use llmy::agent::tool::ToolBox;
32use llmy::agent::{LLMYError, tool};
33use llmy::client::client::LLM;
34use llmy::client::model::OpenAIModel;
35use schemars::JsonSchema;
36use serde::Deserialize;
37use tokio::task::JoinSet;
38
39use knowdit_kg_model::category::DeFiCategory;
40use knowdit_kg_model::db::{audit_finding, finding_category, semantic_node};
41use knowdit_kg_model::{ExtractedFinding, ExtractedSemantic};
42
43use crate::agent_runner::{AgentChunkBuffer, AgentChunkRunner, AgentRunOptions, AgentRunOutcome};
44use crate::error::{KgError, Result};
45use crate::vulnerability::validate_taxonomy_pair;
46
47#[derive(Debug, Clone, Deserialize, JsonSchema)]
55pub struct FinalizeArgs {
56 #[serde(default)]
60 pub summary: Option<String>,
61}
62
63#[derive(Debug, Clone, Deserialize, JsonSchema)]
72pub struct CategorizationRecord {
73 pub reasoning: String,
75 pub project_name: String,
77 pub categories: Vec<DeFiCategory>,
80}
81
82#[derive(Debug, Clone)]
83#[tool(
84 arguments = CategorizationRecord,
85 invoke = invoke,
86 description = "Record the project's DeFi categories. Call exactly once per project, before invoking finalize_categorization.",
87 name = "set_project_categories",
88)]
89struct SetProjectCategoriesTool {
90 buffer: Arc<AgentChunkBuffer<CategorizationRecord>>,
91}
92
93impl SetProjectCategoriesTool {
94 async fn invoke(&self, args: CategorizationRecord) -> std::result::Result<String, LLMYError> {
95 if args.categories.is_empty() {
96 return Ok(
97 "error: categories must be a non-empty list; pick at least one category"
98 .to_string(),
99 );
100 }
101 Ok(self.buffer.push_with_message(args, "categorization").await)
102 }
103}
104
105#[derive(Debug, Clone)]
106#[tool(
107 arguments = FinalizeArgs,
108 invoke = invoke,
109 description = "Signal that categorization is complete. Call exactly once, after set_project_categories. Stop emitting tool calls afterwards.",
110 name = "finalize_categorization",
111)]
112struct FinalizeCategorizationTool {
113 buffer: Arc<AgentChunkBuffer<CategorizationRecord>>,
114}
115
116impl FinalizeCategorizationTool {
117 async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
118 if self.buffer.len().await == 0 {
119 return Ok(
120 "error: cannot finalize before calling set_project_categories at least once"
121 .to_string(),
122 );
123 }
124 Ok(self
125 .buffer
126 .finalize_with_message(args.summary, "categorization")
127 .await)
128 }
129}
130
131pub struct CategorizeRunner {
135 pub llm: LLM,
136 pub options: AgentRunOptions,
137 pub system_prompt: String,
138 pub user_prompt: String,
139 pub cache_key: String,
140 pub label: String,
141}
142
143impl CategorizeRunner {
144 pub async fn run(self) -> Result<CategorizationRecord> {
145 let buffer = AgentChunkBuffer::<CategorizationRecord>::new();
146 let label = self.label.clone();
147
148 let mut tools = ToolBox::new();
149 tools.add_tool(SetProjectCategoriesTool {
150 buffer: buffer.clone(),
151 });
152 tools.add_tool(FinalizeCategorizationTool {
153 buffer: buffer.clone(),
154 });
155
156 let runner = AgentChunkRunner {
157 llm: self.llm,
158 options: self.options,
159 buffer: buffer.clone(),
160 tools,
161 system_prompt: self.system_prompt,
162 user_prompt: self.user_prompt,
163 cache_key: self.cache_key,
164 label: self.label,
165 };
166
167 let _outcome: AgentRunOutcome = runner.run().await?;
168
169 let mut records = buffer.drain().await;
170 if records.is_empty() {
171 return Err(KgError::other(format!(
172 "{label} agent finished without recording any categorization",
173 )));
174 }
175 Ok(records.remove(records.len() - 1))
176 }
177}
178
179#[derive(Debug, Clone)]
184#[tool(
185 arguments = ExtractedSemantic,
186 invoke = invoke,
187 description = "Emit one DeFi Semantic extracted from the chunk. Call once per distinct semantic; cluster all callable functions that share the same business meaning under one call. `functions` must contain at least one entry.",
188 name = "emit_semantic",
189)]
190struct EmitSemanticTool {
191 buffer: Arc<AgentChunkBuffer<ExtractedSemantic>>,
192}
193
194impl EmitSemanticTool {
195 async fn invoke(&self, args: ExtractedSemantic) -> std::result::Result<String, LLMYError> {
196 if args.name.trim().is_empty() {
197 return Ok("error: `name` must not be empty".to_string());
198 }
199 if args.functions.is_empty() {
200 return Ok(
201 "error: `functions` must contain at least one entry pointing at the source"
202 .to_string(),
203 );
204 }
205 Ok(self.buffer.push_with_message(args, "semantic").await)
206 }
207}
208
209#[derive(Debug, Clone)]
210#[tool(
211 arguments = FinalizeArgs,
212 invoke = invoke,
213 description = "Signal that every DeFi semantic visible in this chunk has been emitted via emit_semantic. Call exactly once, then stop.",
214 name = "finalize_semantic_extraction",
215)]
216struct FinalizeSemanticExtractTool {
217 buffer: Arc<AgentChunkBuffer<ExtractedSemantic>>,
218}
219
220impl FinalizeSemanticExtractTool {
221 async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
222 Ok(self
223 .buffer
224 .finalize_with_message(args.summary, "semantic extraction")
225 .await)
226 }
227}
228
229pub struct SemanticChunkExtractor {
230 pub llm: LLM,
231 pub options: AgentRunOptions,
232 pub system_prompt: String,
233 pub user_prompt: String,
234 pub cache_key: String,
235 pub label: String,
236}
237
238impl SemanticChunkExtractor {
239 pub async fn run(self) -> Result<Vec<ExtractedSemantic>> {
240 let buffer = AgentChunkBuffer::<ExtractedSemantic>::new();
241
242 let mut tools = ToolBox::new();
243 tools.add_tool(EmitSemanticTool {
244 buffer: buffer.clone(),
245 });
246 tools.add_tool(FinalizeSemanticExtractTool {
247 buffer: buffer.clone(),
248 });
249
250 let runner = AgentChunkRunner {
251 llm: self.llm,
252 options: self.options,
253 buffer: buffer.clone(),
254 tools,
255 system_prompt: self.system_prompt,
256 user_prompt: self.user_prompt,
257 cache_key: self.cache_key,
258 label: self.label,
259 };
260 let _outcome = runner.run().await?;
261 Ok(buffer.drain().await)
262 }
263}
264
265#[derive(Debug, Clone)]
270#[tool(
271 arguments = ExtractedFinding,
272 invoke = invoke,
273 description = "Emit one vulnerability finding extracted from the report chunk. Decompose a multi-mechanism finding into multiple calls (one per independent mechanism); deduplicate repeated mentions of the same finding.",
274 name = "emit_finding",
275)]
276struct EmitFindingTool {
277 buffer: Arc<AgentChunkBuffer<ExtractedFinding>>,
278}
279
280impl EmitFindingTool {
281 async fn invoke(&self, args: ExtractedFinding) -> std::result::Result<String, LLMYError> {
282 if args.title.trim().is_empty() {
283 return Ok("error: `title` must not be empty".to_string());
284 }
285 if let Some(hint) = validate_taxonomy_pair(args.category, &args.subcategory) {
286 return Ok(format!("error: {hint}"));
287 }
288 Ok(self.buffer.push_with_message(args, "finding").await)
289 }
290}
291
292#[derive(Debug, Clone)]
293#[tool(
294 arguments = FinalizeArgs,
295 invoke = invoke,
296 description = "Signal that every distinct vulnerability finding visible in this chunk has been emitted. Call exactly once, then stop.",
297 name = "finalize_finding_extraction",
298)]
299struct FinalizeFindingExtractTool {
300 buffer: Arc<AgentChunkBuffer<ExtractedFinding>>,
301}
302
303impl FinalizeFindingExtractTool {
304 async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
305 Ok(self
306 .buffer
307 .finalize_with_message(args.summary, "finding extraction")
308 .await)
309 }
310}
311
312pub struct FindingChunkExtractor {
313 pub llm: LLM,
314 pub options: AgentRunOptions,
315 pub system_prompt: String,
316 pub user_prompt: String,
317 pub cache_key: String,
318 pub label: String,
319}
320
321impl FindingChunkExtractor {
322 pub async fn run(self) -> Result<Vec<ExtractedFinding>> {
323 let buffer = AgentChunkBuffer::<ExtractedFinding>::new();
324
325 let mut tools = ToolBox::new();
326 tools.add_tool(EmitFindingTool {
327 buffer: buffer.clone(),
328 });
329 tools.add_tool(FinalizeFindingExtractTool {
330 buffer: buffer.clone(),
331 });
332
333 let runner = AgentChunkRunner {
334 llm: self.llm,
335 options: self.options,
336 buffer: buffer.clone(),
337 tools,
338 system_prompt: self.system_prompt,
339 user_prompt: self.user_prompt,
340 cache_key: self.cache_key,
341 label: self.label,
342 };
343 let _outcome = runner.run().await?;
344 Ok(buffer.drain().await)
345 }
346}
347
/// Tuning knobs for splitting merge candidates into chunks and running them.
#[derive(Debug, Clone, Copy)]
pub struct MergeChunkingOptions {
    /// Token budget for the rendered candidates of a single chunk.
    pub chunk_candidate_token_budget: usize,
    /// Maximum number of chunk agents running at the same time.
    pub concurrency: usize,
}

impl Default for MergeChunkingOptions {
    /// A 50 000-token candidate budget, processed one chunk at a time.
    fn default() -> Self {
        let chunk_candidate_token_budget = 50_000;
        let concurrency = 1;
        Self {
            chunk_candidate_token_budget,
            concurrency,
        }
    }
}

impl MergeChunkingOptions {
    /// Builds options from caller-supplied values, raising the budget to at
    /// least 1024 tokens and the concurrency to at least 1.
    pub fn new(chunk_candidate_token_budget: usize, concurrency: usize) -> Self {
        let budget = std::cmp::max(chunk_candidate_token_budget, 1024);
        let workers = std::cmp::max(concurrency, 1);
        Self {
            chunk_candidate_token_budget: budget,
            concurrency: workers,
        }
    }
}
384
/// A canonical entry together with the raw entries attached to it.
///
/// When rendered for the merge prompt, `raw_children` are listed under the
/// canonical as "Merged from" context.
#[derive(Debug, Clone)]
pub struct CanonicalWithChildren<T> {
    /// The canonical entry; its `id` is what merge decisions may target.
    pub canonical: T,
    /// Raw entries shown as context only.
    pub raw_children: Vec<T>,
}
396
397#[derive(Debug, Clone, Deserialize, JsonSchema)]
407pub struct SemanticMergeDecision {
408 pub reason: String,
410 pub new_semantic_name: String,
414 pub merge_target_ids: Vec<i32>,
418 pub appended_description: Option<String>,
426}
427
428#[derive(Debug, Clone)]
429#[tool(
430 arguments = SemanticMergeDecision,
431 invoke = invoke,
432 description = "Emit one merge decision for a new raw semantic. Call once per `new_semantic_name` shown in the prompt. `merge_target_ids` may list multiple canonicals from THIS chunk; an empty list means NEW (no match in this chunk). The canonical's `name` and `definition` are stable identity and are NEVER modified by this decision — only `appended_description` (optional, when merging) is appended to the canonical's existing description. The tool validates `new_semantic_name` against the prompt's new-semantics list and `merge_target_ids` against this chunk's candidate IDs; invalid decisions are rejected with an error message so you can re-emit a corrected one.",
433 name = "emit_semantic_merge_decision",
434)]
435struct EmitSemanticMergeDecisionTool {
436 buffer: Arc<AgentChunkBuffer<SemanticMergeDecision>>,
437 valid_names: Arc<HashSet<String>>,
443 valid_target_ids: Arc<HashSet<i32>>,
446}
447
448impl EmitSemanticMergeDecisionTool {
449 async fn invoke(&self, args: SemanticMergeDecision) -> std::result::Result<String, LLMYError> {
450 if args.new_semantic_name.trim().is_empty() {
451 return Ok("error: `new_semantic_name` must not be empty".to_string());
452 }
453 if !self
454 .valid_names
455 .contains(&args.new_semantic_name.to_lowercase())
456 {
457 return Ok(format!(
458 "error: `new_semantic_name` '{}' does not match any name in this prompt's new-semantics list. Re-emit using one of the names shown there (case-insensitive).",
459 args.new_semantic_name,
460 ));
461 }
462 let invalid: Vec<i32> = args
463 .merge_target_ids
464 .iter()
465 .filter(|id| !self.valid_target_ids.contains(id))
466 .copied()
467 .collect();
468 if !invalid.is_empty() {
469 return Ok(format!(
470 "error: `merge_target_ids` contains IDs not in this chunk's candidate list: {invalid:?}. Re-emit this decision using only canonical IDs that appear in the candidate list above. If '{}' has no match in this chunk, emit `merge_target_ids: []`.",
471 args.new_semantic_name,
472 ));
473 }
474 Ok(self
475 .buffer
476 .push_with_message(args, "semantic merge decision")
477 .await)
478 }
479}
480
481#[derive(Debug, Clone)]
482#[tool(
483 arguments = FinalizeArgs,
484 invoke = invoke,
485 description = "Signal that every newly-extracted semantic listed in the prompt has received a merge decision (with possibly empty target_ids when no match in this chunk). Call exactly once, then stop.",
486 name = "finalize_semantic_merge",
487)]
488struct FinalizeSemanticMergeTool {
489 buffer: Arc<AgentChunkBuffer<SemanticMergeDecision>>,
490}
491
492impl FinalizeSemanticMergeTool {
493 async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
494 Ok(self
495 .buffer
496 .finalize_with_message(args.summary, "semantic merge")
497 .await)
498 }
499}
500
501pub struct SemanticMergeChunkRunner {
508 pub llm: LLM,
509 pub options: AgentRunOptions,
510 pub system_prompt: String,
511 pub user_prompt: String,
512 pub cache_key: String,
513 pub label: String,
514 pub valid_names: Arc<HashSet<String>>,
515 pub valid_target_ids: Arc<HashSet<i32>>,
516}
517
518impl SemanticMergeChunkRunner {
519 pub async fn run(self) -> Result<Vec<SemanticMergeDecision>> {
520 let buffer = AgentChunkBuffer::<SemanticMergeDecision>::new();
521
522 let mut tools = ToolBox::new();
523 tools.add_tool(EmitSemanticMergeDecisionTool {
524 buffer: buffer.clone(),
525 valid_names: self.valid_names,
526 valid_target_ids: self.valid_target_ids,
527 });
528 tools.add_tool(FinalizeSemanticMergeTool {
529 buffer: buffer.clone(),
530 });
531
532 let runner = AgentChunkRunner {
533 llm: self.llm,
534 options: self.options,
535 buffer: buffer.clone(),
536 tools,
537 system_prompt: self.system_prompt,
538 user_prompt: self.user_prompt,
539 cache_key: self.cache_key,
540 label: self.label,
541 };
542 let _outcome = runner.run().await?;
543 Ok(buffer.drain().await)
544 }
545}
546
547pub struct SemanticMerger {
552 pub new_semantics: Vec<ExtractedSemantic>,
555 pub candidates: Vec<CanonicalWithChildren<semantic_node::Model>>,
558 pub llm: LLM,
560 pub agent_options: AgentRunOptions,
562 pub chunking: MergeChunkingOptions,
563 pub cache_key_root: String,
565 pub debug_key_root: String,
567 pub label_root: String,
569}
570
/// Merge outcome for one new semantic, combined across all candidate chunks.
#[derive(Debug, Clone)]
pub struct AggregatedSemanticMergeDecision {
    /// Name of the new semantic, copied from the extraction.
    pub new_semantic_name: String,
    /// De-duplicated canonical IDs chosen in any chunk; empty means no match.
    pub merge_target_ids: Vec<i32>,
    /// Longest non-blank description addition proposed in any chunk.
    pub appended_description: Option<String>,
}
582
583impl SemanticMerger {
584 pub async fn run(mut self) -> Result<Vec<AggregatedSemanticMergeDecision>> {
585 if self.new_semantics.is_empty() {
586 return Ok(Vec::new());
587 }
588 let mut aggregated: Vec<AggregatedSemanticMergeDecision> = self
589 .new_semantics
590 .iter()
591 .map(|s| AggregatedSemanticMergeDecision {
592 new_semantic_name: s.name.clone(),
593 merge_target_ids: Vec::new(),
594 appended_description: None,
595 })
596 .collect();
597 let candidates = std::mem::take(&mut self.candidates);
598 if candidates.is_empty() {
599 return Ok(aggregated);
600 }
601 let chunks = self.pack_into_chunks(candidates);
602 tracing::info!(
603 "{}: {} candidate chunks (concurrency={}, candidate_token_budget={})",
604 self.label_root,
605 chunks.len(),
606 self.chunking.concurrency,
607 self.chunking.chunk_candidate_token_budget,
608 );
609 let valid_names: Arc<HashSet<String>> = Arc::new(
614 self.new_semantics
615 .iter()
616 .map(|s| s.name.to_lowercase())
617 .collect(),
618 );
619 let by_name = SemanticMergeAggregator::name_index(&self.new_semantics);
620 let known_targets = SemanticMergeAggregator::known_canonical_ids(&chunks);
621 let new_semantics_block = Self::render_new_block(&self.new_semantics);
622 let chunk_results = self
623 .dispatch_chunks(chunks, new_semantics_block, valid_names)
624 .await?;
625 SemanticMergeAggregator::merge(&mut aggregated, chunk_results, &by_name, &known_targets);
626 Ok(aggregated)
627 }
628
629 fn pack_into_chunks(
630 &self,
631 candidates: Vec<CanonicalWithChildren<semantic_node::Model>>,
632 ) -> Vec<Vec<CanonicalWithChildren<semantic_node::Model>>> {
633 let model = self.llm.model.clone();
634 Self::pack_chunks(
635 &model,
636 candidates,
637 self.chunking.chunk_candidate_token_budget,
638 )
639 }
640
641 fn build_chunk_runner(
642 &self,
643 idx: usize,
644 total: usize,
645 chunk: &[CanonicalWithChildren<semantic_node::Model>],
646 new_semantics_block: &str,
647 valid_names: Arc<HashSet<String>>,
648 ) -> SemanticMergeChunkRunner {
649 let user_prompt = crate::prompts::merge_semantics_user_message(
650 &Self::render_candidate_chunk(chunk),
651 new_semantics_block,
652 );
653 let cache_key = format!("{}-chunk{idx:04}", self.cache_key_root);
654 let debug_scope = format!("{}-chunk{idx:04}", self.debug_key_root);
655 let label = format!("{}-chunk{idx:04}of{total:04}", self.label_root);
656 let valid_target_ids: Arc<HashSet<i32>> =
657 Arc::new(chunk.iter().map(|c| c.canonical.id).collect());
658 SemanticMergeChunkRunner {
659 llm: self.llm.clone(),
660 options: self.agent_options.scoped(&debug_scope),
661 system_prompt: crate::prompts::GENERAL_ROLE_SYSTEM.to_string(),
662 user_prompt,
663 cache_key,
664 label,
665 valid_names,
666 valid_target_ids,
667 }
668 }
669
670 async fn dispatch_chunks(
671 &self,
672 chunks: Vec<Vec<CanonicalWithChildren<semantic_node::Model>>>,
673 new_semantics_block: String,
674 valid_names: Arc<HashSet<String>>,
675 ) -> Result<Vec<Vec<SemanticMergeDecision>>> {
676 let total = chunks.len();
677 let concurrency = self.chunking.concurrency.max(1);
678 if concurrency <= 1 || total <= 1 {
679 self.dispatch_sequential(chunks, new_semantics_block, valid_names)
680 .await
681 } else {
682 self.dispatch_concurrent(chunks, new_semantics_block, valid_names, concurrency)
683 .await
684 }
685 }
686
687 async fn dispatch_sequential(
688 &self,
689 chunks: Vec<Vec<CanonicalWithChildren<semantic_node::Model>>>,
690 new_semantics_block: String,
691 valid_names: Arc<HashSet<String>>,
692 ) -> Result<Vec<Vec<SemanticMergeDecision>>> {
693 let total = chunks.len();
694 let mut out = Vec::with_capacity(total);
695 for (idx, chunk) in chunks.into_iter().enumerate() {
696 let runner = self.build_chunk_runner(
697 idx,
698 total,
699 &chunk,
700 &new_semantics_block,
701 valid_names.clone(),
702 );
703 out.push(runner.run().await?);
704 }
705 Ok(out)
706 }
707
708 async fn dispatch_concurrent(
709 &self,
710 chunks: Vec<Vec<CanonicalWithChildren<semantic_node::Model>>>,
711 new_semantics_block: String,
712 valid_names: Arc<HashSet<String>>,
713 concurrency: usize,
714 ) -> Result<Vec<Vec<SemanticMergeDecision>>> {
715 let total = chunks.len();
716 let mut joins: JoinSet<(usize, Result<Vec<SemanticMergeDecision>>)> = JoinSet::new();
717 let mut pending = chunks.into_iter().enumerate().collect::<Vec<_>>();
718 let mut alive = 0usize;
719 let mut results: Vec<Option<Vec<SemanticMergeDecision>>> =
720 (0..total).map(|_| None).collect();
721 while !pending.is_empty() || alive > 0 {
722 while alive < concurrency && !pending.is_empty() {
723 let (idx, chunk) = pending.remove(0);
724 let runner = self.build_chunk_runner(
725 idx,
726 total,
727 &chunk,
728 &new_semantics_block,
729 valid_names.clone(),
730 );
731 joins.spawn(async move { (idx, runner.run().await) });
732 alive += 1;
733 }
734 if let Some(joined) = joins.join_next().await {
735 alive -= 1;
736 let (idx, res) = joined.map_err(|e| KgError::other(format!("merge join: {e}")))?;
737 results[idx] = Some(res?);
738 }
739 }
740 Ok(results.into_iter().map(|o| o.unwrap_or_default()).collect())
741 }
742}
743
/// Stateless namespace for the helpers that index new semantics and fold
/// per-chunk merge decisions into aggregated ones.
struct SemanticMergeAggregator;
746
747impl SemanticMergeAggregator {
748 fn name_index(new_semantics: &[ExtractedSemantic]) -> std::collections::HashMap<String, usize> {
749 new_semantics
750 .iter()
751 .enumerate()
752 .map(|(idx, s)| (s.name.to_lowercase(), idx))
753 .collect()
754 }
755
756 fn known_canonical_ids(
757 chunks: &[Vec<CanonicalWithChildren<semantic_node::Model>>],
758 ) -> std::collections::HashSet<i32> {
759 chunks
760 .iter()
761 .flat_map(|chunk| chunk.iter().map(|c| c.canonical.id))
762 .collect()
763 }
764
765 fn merge(
766 aggregated: &mut [AggregatedSemanticMergeDecision],
767 chunk_results: Vec<Vec<SemanticMergeDecision>>,
768 by_name: &std::collections::HashMap<String, usize>,
769 known_targets: &std::collections::HashSet<i32>,
770 ) {
771 for decisions in chunk_results {
772 for decision in decisions {
773 let Some(&idx) = by_name.get(&decision.new_semantic_name.to_lowercase()) else {
774 tracing::warn!(
775 "Semantic merge decision referenced unknown new_semantic_name '{}'; ignoring",
776 decision.new_semantic_name,
777 );
778 continue;
779 };
780 let agg = &mut aggregated[idx];
781 for target in decision.merge_target_ids {
782 if !known_targets.contains(&target) {
783 tracing::warn!(
784 "Semantic merge decision for '{}' targeted unknown sem-{}; dropping",
785 agg.new_semantic_name,
786 target
787 );
788 continue;
789 }
790 if !agg.merge_target_ids.contains(&target) {
791 agg.merge_target_ids.push(target);
792 }
793 }
794 Self::adopt_longer(&mut agg.appended_description, decision.appended_description);
795 }
796 }
797 }
798
799 fn adopt_longer(slot: &mut Option<String>, candidate: Option<String>) {
800 if let Some(candidate) = candidate {
801 if candidate.trim().is_empty() {
802 return;
803 }
804 match slot {
805 Some(existing) if existing.len() >= candidate.len() => {}
806 _ => *slot = Some(candidate),
807 }
808 }
809 }
810}
811
812impl SemanticMerger {
815 fn render_new_block(new_semantics: &[ExtractedSemantic]) -> String {
816 let mut buf = String::new();
817 for sem in new_semantics {
818 buf.push_str(&format!(
819 "Category: {}\nName: {}\nDefinition: {}\nDescription: {}\n\n",
820 sem.category, sem.name, sem.definition, sem.description,
821 ));
822 }
823 buf
824 }
825
826 fn render_candidate_chunk(chunk: &[CanonicalWithChildren<semantic_node::Model>]) -> String {
827 let mut buf = String::new();
828 for entry in chunk {
829 let canonical = &entry.canonical;
830 buf.push_str(&format!(
831 "ID: {}\nCategory: {}\nName: {}\nDefinition: {}\nDescription: {}\n",
832 canonical.id,
833 canonical.category,
834 canonical.name,
835 canonical.definition,
836 canonical.description,
837 ));
838 if !entry.raw_children.is_empty() {
839 buf.push_str(
840 "Merged from (historical raws — for context, not selectable as merge_target_ids):\n",
841 );
842 for raw in &entry.raw_children {
843 buf.push_str(&format!(
844 " - Name: {}\n Definition: {}\n Description: {}\n",
845 raw.name, raw.definition, raw.description,
846 ));
847 }
848 }
849 buf.push('\n');
850 }
851 buf
852 }
853
854 fn pack_chunks(
855 model: &OpenAIModel,
856 candidates: Vec<CanonicalWithChildren<semantic_node::Model>>,
857 chunk_token_budget: usize,
858 ) -> Vec<Vec<CanonicalWithChildren<semantic_node::Model>>> {
859 let mut chunks: Vec<Vec<CanonicalWithChildren<semantic_node::Model>>> = Vec::new();
860 let mut current: Vec<CanonicalWithChildren<semantic_node::Model>> = Vec::new();
861 let mut current_tokens: usize = 0;
862 for entry in candidates {
863 let single = Self::render_candidate_chunk(std::slice::from_ref(&entry));
864 let cost = crate::learn::count_tokens(model, &single);
865 if !current.is_empty() && current_tokens + cost > chunk_token_budget {
866 chunks.push(std::mem::take(&mut current));
867 current_tokens = 0;
868 }
869 current.push(entry);
870 current_tokens += cost;
871 }
872 if !current.is_empty() {
873 chunks.push(current);
874 }
875 chunks
876 }
877}
878
879#[derive(Debug, Clone, Deserialize, JsonSchema)]
884pub struct FindingMergeDecision {
885 pub reason: String,
887 pub new_finding_title: String,
891 pub merge_target_ids: Vec<i32>,
894 pub appended_description: Option<String>,
899 pub appended_patterns: Option<String>,
902 pub appended_exploits: Option<String>,
905}
906
907#[derive(Debug, Clone)]
908#[tool(
909 arguments = FindingMergeDecision,
910 invoke = invoke,
911 description = "Emit one merge decision for a new raw finding. Call once per `new_finding_title` shown in the prompt. `merge_target_ids` may list multiple canonicals from THIS chunk; an empty list means NEW (no match in this chunk). The canonical's `title`, `severity`, and `root_cause` are stable identity and are NEVER modified by this decision — only `appended_description` / `appended_patterns` / `appended_exploits` (optional, when merging) are appended to the canonical's existing fields. The tool validates `new_finding_title` against the prompt's new-findings list and `merge_target_ids` against this chunk's candidate IDs; invalid decisions are rejected with an error message so you can re-emit a corrected one.",
912 name = "emit_finding_merge_decision",
913)]
914struct EmitFindingMergeDecisionTool {
915 buffer: Arc<AgentChunkBuffer<FindingMergeDecision>>,
916 valid_titles: Arc<HashSet<String>>,
918 valid_target_ids: Arc<HashSet<i32>>,
920}
921
922impl EmitFindingMergeDecisionTool {
923 async fn invoke(&self, args: FindingMergeDecision) -> std::result::Result<String, LLMYError> {
924 if args.new_finding_title.trim().is_empty() {
925 return Ok("error: `new_finding_title` must not be empty".to_string());
926 }
927 if !self
928 .valid_titles
929 .contains(&args.new_finding_title.to_lowercase())
930 {
931 return Ok(format!(
932 "error: `new_finding_title` '{}' does not match any title in this prompt's new-findings list. Re-emit using one of the titles shown there (case-insensitive).",
933 args.new_finding_title,
934 ));
935 }
936 let invalid: Vec<i32> = args
937 .merge_target_ids
938 .iter()
939 .filter(|id| !self.valid_target_ids.contains(id))
940 .copied()
941 .collect();
942 if !invalid.is_empty() {
943 return Ok(format!(
944 "error: `merge_target_ids` contains IDs not in this chunk's candidate list: {invalid:?}. Re-emit this decision using only canonical IDs that appear in the candidate list above. If '{}' has no match in this chunk, emit `merge_target_ids: []`.",
945 args.new_finding_title,
946 ));
947 }
948 Ok(self
949 .buffer
950 .push_with_message(args, "finding merge decision")
951 .await)
952 }
953}
954
955#[derive(Debug, Clone)]
956#[tool(
957 arguments = FinalizeArgs,
958 invoke = invoke,
959 description = "Signal that every newly-extracted finding listed in the prompt has received a merge decision (with possibly empty target_ids). Call exactly once, then stop.",
960 name = "finalize_finding_merge",
961)]
962struct FinalizeFindingMergeTool {
963 buffer: Arc<AgentChunkBuffer<FindingMergeDecision>>,
964}
965
966impl FinalizeFindingMergeTool {
967 async fn invoke(&self, args: FinalizeArgs) -> std::result::Result<String, LLMYError> {
968 Ok(self
969 .buffer
970 .finalize_with_message(args.summary, "finding merge")
971 .await)
972 }
973}
974
975pub struct FindingMergeChunkRunner {
976 pub llm: LLM,
977 pub options: AgentRunOptions,
978 pub system_prompt: String,
979 pub user_prompt: String,
980 pub cache_key: String,
981 pub label: String,
982 pub valid_titles: Arc<HashSet<String>>,
983 pub valid_target_ids: Arc<HashSet<i32>>,
984}
985
986impl FindingMergeChunkRunner {
987 pub async fn run(self) -> Result<Vec<FindingMergeDecision>> {
988 let buffer = AgentChunkBuffer::<FindingMergeDecision>::new();
989
990 let mut tools = ToolBox::new();
991 tools.add_tool(EmitFindingMergeDecisionTool {
992 buffer: buffer.clone(),
993 valid_titles: self.valid_titles,
994 valid_target_ids: self.valid_target_ids,
995 });
996 tools.add_tool(FinalizeFindingMergeTool {
997 buffer: buffer.clone(),
998 });
999
1000 let runner = AgentChunkRunner {
1001 llm: self.llm,
1002 options: self.options,
1003 buffer: buffer.clone(),
1004 tools,
1005 system_prompt: self.system_prompt,
1006 user_prompt: self.user_prompt,
1007 cache_key: self.cache_key,
1008 label: self.label,
1009 };
1010 let _outcome = runner.run().await?;
1011 Ok(buffer.drain().await)
1012 }
1013}
1014
1015#[derive(Debug, Clone)]
1019pub struct FindingCanonicalWithTaxonomy {
1020 pub canonical: audit_finding::Model,
1021 pub category: finding_category::Model,
1022 pub raw_children: Vec<audit_finding::Model>,
1023}
1024
1025pub struct FindingMerger {
1026 pub new_findings: Vec<ExtractedFinding>,
1027 pub candidates: Vec<FindingCanonicalWithTaxonomy>,
1028 pub llm: LLM,
1029 pub agent_options: AgentRunOptions,
1030 pub chunking: MergeChunkingOptions,
1031 pub cache_key_root: String,
1032 pub debug_key_root: String,
1033 pub label_root: String,
1034}
1035
/// Merge outcome for one new finding, combined across all candidate chunks.
#[derive(Debug, Clone)]
pub struct AggregatedFindingMergeDecision {
    /// Title of the new finding, copied from the extraction.
    pub new_finding_title: String,
    /// De-duplicated canonical IDs chosen in any chunk; empty means no match.
    pub merge_target_ids: Vec<i32>,
    /// Longest non-blank description addition proposed in any chunk.
    pub appended_description: Option<String>,
    /// Longest non-blank patterns addition proposed in any chunk.
    pub appended_patterns: Option<String>,
    /// Longest non-blank exploits addition proposed in any chunk.
    pub appended_exploits: Option<String>,
}
1047
1048impl FindingMerger {
1049 pub async fn run(mut self) -> Result<Vec<AggregatedFindingMergeDecision>> {
1050 if self.new_findings.is_empty() {
1051 return Ok(Vec::new());
1052 }
1053 let mut aggregated: Vec<AggregatedFindingMergeDecision> = self
1054 .new_findings
1055 .iter()
1056 .map(|f| AggregatedFindingMergeDecision {
1057 new_finding_title: f.title.clone(),
1058 merge_target_ids: Vec::new(),
1059 appended_description: None,
1060 appended_patterns: None,
1061 appended_exploits: None,
1062 })
1063 .collect();
1064 let candidates = std::mem::take(&mut self.candidates);
1065 if candidates.is_empty() {
1066 return Ok(aggregated);
1067 }
1068 let chunks = self.pack_into_chunks(candidates);
1069 tracing::info!(
1070 "{}: {} candidate chunks (concurrency={}, candidate_token_budget={})",
1071 self.label_root,
1072 chunks.len(),
1073 self.chunking.concurrency,
1074 self.chunking.chunk_candidate_token_budget,
1075 );
1076 let by_title: std::collections::HashMap<String, usize> = self
1077 .new_findings
1078 .iter()
1079 .enumerate()
1080 .map(|(idx, f)| (f.title.to_lowercase(), idx))
1081 .collect();
1082 let known_targets: std::collections::HashSet<i32> = chunks
1083 .iter()
1084 .flat_map(|chunk| chunk.iter().map(|c| c.canonical.id))
1085 .collect();
1086 let valid_titles: Arc<HashSet<String>> = Arc::new(by_title.keys().cloned().collect());
1090 let new_findings_block = Self::render_new_block(&self.new_findings);
1091 let chunk_results = self
1092 .dispatch_chunks(chunks, new_findings_block, valid_titles)
1093 .await?;
1094 for decisions in chunk_results {
1095 for decision in decisions {
1096 let Some(&idx) = by_title.get(&decision.new_finding_title.to_lowercase()) else {
1097 tracing::warn!(
1098 "Finding merge decision referenced unknown new_finding_title '{}'; ignoring",
1099 decision.new_finding_title,
1100 );
1101 continue;
1102 };
1103 let agg = &mut aggregated[idx];
1104 for target in decision.merge_target_ids {
1105 if !known_targets.contains(&target) {
1106 tracing::warn!(
1107 "Finding merge decision for '{}' targeted unknown finding-{}; dropping",
1108 agg.new_finding_title,
1109 target
1110 );
1111 continue;
1112 }
1113 if !agg.merge_target_ids.contains(&target) {
1114 agg.merge_target_ids.push(target);
1115 }
1116 }
1117 SemanticMergeAggregator::adopt_longer(
1118 &mut agg.appended_description,
1119 decision.appended_description,
1120 );
1121 SemanticMergeAggregator::adopt_longer(
1122 &mut agg.appended_patterns,
1123 decision.appended_patterns,
1124 );
1125 SemanticMergeAggregator::adopt_longer(
1126 &mut agg.appended_exploits,
1127 decision.appended_exploits,
1128 );
1129 }
1130 }
1131 Ok(aggregated)
1132 }
1133
1134 fn pack_into_chunks(
1135 &self,
1136 candidates: Vec<FindingCanonicalWithTaxonomy>,
1137 ) -> Vec<Vec<FindingCanonicalWithTaxonomy>> {
1138 let model = self.llm.model.clone();
1139 Self::pack_chunks(
1140 &model,
1141 candidates,
1142 self.chunking.chunk_candidate_token_budget,
1143 )
1144 }
1145
1146 fn build_chunk_runner(
1147 &self,
1148 idx: usize,
1149 total: usize,
1150 chunk: &[FindingCanonicalWithTaxonomy],
1151 new_findings_block: &str,
1152 valid_titles: Arc<HashSet<String>>,
1153 ) -> FindingMergeChunkRunner {
1154 let user_prompt = crate::prompts::merge_findings_user_message(
1155 &Self::render_candidate_chunk(chunk),
1156 new_findings_block,
1157 );
1158 let cache_key = format!("{}-chunk{idx:04}", self.cache_key_root);
1159 let debug_scope = format!("{}-chunk{idx:04}", self.debug_key_root);
1160 let label = format!("{}-chunk{idx:04}of{total:04}", self.label_root);
1161 let valid_target_ids: Arc<HashSet<i32>> =
1162 Arc::new(chunk.iter().map(|c| c.canonical.id).collect());
1163 FindingMergeChunkRunner {
1164 llm: self.llm.clone(),
1165 options: self.agent_options.scoped(&debug_scope),
1166 system_prompt: crate::prompts::GENERAL_ROLE_SYSTEM.to_string(),
1167 user_prompt,
1168 cache_key,
1169 label,
1170 valid_titles,
1171 valid_target_ids,
1172 }
1173 }
1174
1175 async fn dispatch_chunks(
1176 &self,
1177 chunks: Vec<Vec<FindingCanonicalWithTaxonomy>>,
1178 new_findings_block: String,
1179 valid_titles: Arc<HashSet<String>>,
1180 ) -> Result<Vec<Vec<FindingMergeDecision>>> {
1181 let total = chunks.len();
1182 let concurrency = self.chunking.concurrency.max(1);
1183 if concurrency <= 1 || total <= 1 {
1184 self.dispatch_sequential(chunks, new_findings_block, valid_titles)
1185 .await
1186 } else {
1187 self.dispatch_concurrent(chunks, new_findings_block, valid_titles, concurrency)
1188 .await
1189 }
1190 }
1191
1192 async fn dispatch_sequential(
1193 &self,
1194 chunks: Vec<Vec<FindingCanonicalWithTaxonomy>>,
1195 new_findings_block: String,
1196 valid_titles: Arc<HashSet<String>>,
1197 ) -> Result<Vec<Vec<FindingMergeDecision>>> {
1198 let total = chunks.len();
1199 let mut out = Vec::with_capacity(total);
1200 for (idx, chunk) in chunks.into_iter().enumerate() {
1201 let runner = self.build_chunk_runner(
1202 idx,
1203 total,
1204 &chunk,
1205 &new_findings_block,
1206 valid_titles.clone(),
1207 );
1208 out.push(runner.run().await?);
1209 }
1210 Ok(out)
1211 }
1212
1213 async fn dispatch_concurrent(
1214 &self,
1215 chunks: Vec<Vec<FindingCanonicalWithTaxonomy>>,
1216 new_findings_block: String,
1217 valid_titles: Arc<HashSet<String>>,
1218 concurrency: usize,
1219 ) -> Result<Vec<Vec<FindingMergeDecision>>> {
1220 let total = chunks.len();
1221 let mut joins: JoinSet<(usize, Result<Vec<FindingMergeDecision>>)> = JoinSet::new();
1222 let mut pending = chunks.into_iter().enumerate().collect::<Vec<_>>();
1223 let mut alive = 0usize;
1224 let mut results: Vec<Option<Vec<FindingMergeDecision>>> =
1225 (0..total).map(|_| None).collect();
1226 while !pending.is_empty() || alive > 0 {
1227 while alive < concurrency && !pending.is_empty() {
1228 let (idx, chunk) = pending.remove(0);
1229 let runner = self.build_chunk_runner(
1230 idx,
1231 total,
1232 &chunk,
1233 &new_findings_block,
1234 valid_titles.clone(),
1235 );
1236 joins.spawn(async move { (idx, runner.run().await) });
1237 alive += 1;
1238 }
1239 if let Some(joined) = joins.join_next().await {
1240 alive -= 1;
1241 let (idx, res) = joined.map_err(|e| KgError::other(format!("merge join: {e}")))?;
1242 results[idx] = Some(res?);
1243 }
1244 }
1245 Ok(results.into_iter().map(|o| o.unwrap_or_default()).collect())
1246 }
1247}
1248
1249impl FindingMerger {
1252 fn render_new_block(new_findings: &[ExtractedFinding]) -> String {
1253 let mut buf = String::new();
1254 for finding in new_findings {
1255 buf.push_str(&format!(
1256 "Severity: {}\nCategory: {}\nSubcategory: {}\nTitle: {}\nRoot Cause: {}\nDescription: {}\nPatterns: {}\nExploits: {}\n\n",
1257 finding.severity,
1258 finding.category,
1259 finding.subcategory,
1260 finding.title,
1261 finding.root_cause,
1262 finding.description,
1263 finding.patterns,
1264 finding.exploits,
1265 ));
1266 }
1267 buf
1268 }
1269
1270 fn render_candidate_chunk(chunk: &[FindingCanonicalWithTaxonomy]) -> String {
1271 let mut buf = String::new();
1272 for entry in chunk {
1273 let f = &entry.canonical;
1274 buf.push_str(&format!(
1275 "ID: {}\nSeverity: {}\nCategory: {}\nSubcategory: {}\nTitle: {}\nRoot Cause: {}\nDescription: {}\nPatterns: {}\nExploits: {}\n",
1276 f.id,
1277 f.severity,
1278 entry.category.category,
1279 entry.category.name,
1280 f.title,
1281 f.root_cause,
1282 f.description,
1283 f.patterns,
1284 f.exploits,
1285 ));
1286 if !entry.raw_children.is_empty() {
1287 buf.push_str(
1288 "Merged from (historical raws — for context, not selectable as merge_target_ids):\n",
1289 );
1290 for raw in &entry.raw_children {
1291 buf.push_str(&format!(
1292 " - Title: {}\n Severity: {}\n Root Cause: {}\n Description: {}\n",
1293 raw.title, raw.severity, raw.root_cause, raw.description,
1294 ));
1295 }
1296 }
1297 buf.push('\n');
1298 }
1299 buf
1300 }
1301
1302 fn pack_chunks(
1303 model: &OpenAIModel,
1304 candidates: Vec<FindingCanonicalWithTaxonomy>,
1305 chunk_token_budget: usize,
1306 ) -> Vec<Vec<FindingCanonicalWithTaxonomy>> {
1307 let mut chunks: Vec<Vec<FindingCanonicalWithTaxonomy>> = Vec::new();
1308 let mut current: Vec<FindingCanonicalWithTaxonomy> = Vec::new();
1309 let mut current_tokens: usize = 0;
1310 for entry in candidates {
1311 let single = Self::render_candidate_chunk(std::slice::from_ref(&entry));
1312 let cost = crate::learn::count_tokens(model, &single);
1313 if !current.is_empty() && current_tokens + cost > chunk_token_budget {
1314 chunks.push(std::mem::take(&mut current));
1315 current_tokens = 0;
1316 }
1317 current.push(entry);
1318 current_tokens += cost;
1319 }
1320 if !current.is_empty() {
1321 chunks.push(current);
1322 }
1323 chunks
1324 }
1325}