Skip to main content

knowdit_kg/
learn.rs

1use crate::agent_runner::AgentRunOptions;
2use crate::agents::{
3    AggregatedFindingMergeDecision, AggregatedSemanticMergeDecision, CategorizeRunner,
4    FindingChunkExtractor, FindingMerger, MergeChunkingOptions, SemanticChunkExtractor,
5    SemanticMerger,
6};
7use crate::category::DeFiCategory;
8use crate::db::HistoricalDatabase;
9use crate::error::{KgError, Result};
10pub use crate::link::{
11    FindingLinkOptions, PendingFindingForLinking, PersistedFindingLinkResult, link_pending_findings,
12};
13use crate::project_loader::ProjectData;
14use crate::prompts;
15use crate::vulnerability::{VulnerabilityCategory, resolve_taxonomy_entry};
16use itertools::Itertools;
17pub use knowdit_kg_model::{ExtractedFinding, ExtractedFunction, ExtractedSemantic};
18use llmy::client::client::LLM;
19use llmy::client::context::TokenCursor;
20use llmy::client::model::OpenAIModel;
21use llmy::tokenizer;
22use serde::Deserialize;
23use std::collections::{HashMap, HashSet};
24use std::time::Instant;
25
26// ── LLM response types ──────────────────────────────────────────────
27//
28// The categorize / extract semantic / extract finding / semantic merge /
29// finding merge phases all use llmy `Agent` + tool-calls now; their tool
30// arguments + tool implementations live in [`crate::agents`]. Only the
31// in-project linking phase still uses the JSON-mode prompt+parse pattern,
32// because it's a single LLM call producing a small, simply-shaped index map.
33
34#[derive(Debug, Deserialize)]
35#[allow(dead_code)]
36struct InProjectLinkEntry {
37    finding_index: usize,
38    semantic_indices: Vec<usize>,
39    #[serde(default)]
40    reasoning: String,
41}
42
43#[derive(Debug, Deserialize)]
44struct InProjectLinkResponse {
45    links: Vec<InProjectLinkEntry>,
46}
47
48fn render_semantics_for_in_project_link(semantics: &[ExtractedSemantic]) -> String {
49    semantics
50        .iter()
51        .enumerate()
52        .map(|(idx, s)| {
53            format!(
54                "S{idx}\nname: {name}\ncategory: {category}\ndefinition: {definition}\ndescription: {description}\n\n",
55                idx = idx,
56                name = s.name,
57                category = s.category.as_str(),
58                definition = s.definition.trim(),
59                description = s.description.trim()
60            )
61        })
62        .collect::<String>()
63}
64
65fn render_findings_for_in_project_link(findings: &[ExtractedFinding]) -> String {
66    findings
67        .iter()
68        .enumerate()
69        .map(|(idx, f)| {
70            format!(
71                "F{idx}\ntitle: {title}\nseverity: {severity}\ncategory: {category}\nsubcategory: {subcategory}\nroot_cause: {root_cause}\ndescription: {description}\npatterns: {patterns}\n\n",
72                idx = idx,
73                title = f.title,
74                severity = f.severity,
75                category = f.category,
76                subcategory = f.subcategory,
77                root_cause = f.root_cause.trim(),
78                description = f.description.trim(),
79                patterns = f.patterns.trim(),
80            )
81        })
82        .collect::<String>()
83}
84
85// ── Public merge types (used by HistoricalDatabase) ────────────────────────
86
/// Outcome of merging one newly extracted semantic against the existing
/// canonical semantics.
#[derive(Clone, Debug)]
pub enum MergeAction {
    /// No existing canonical matches: admit the new raw as a fresh canonical.
    New,
    /// Fold the new raw into one *or more* existing canonicals; every entry
    /// of `target_ids` becomes one row in `semantic_merge`. A merge never
    /// touches the canonical's `name` or `definition` (stable identity).
    Merge {
        /// Ids of the canonicals the raw is merged into.
        target_ids: Vec<i32>,
        /// Text appended to each target canonical's existing description,
        /// when present.
        appended_description: Option<String>,
    },
}
101
102#[derive(Debug, Clone)]
103pub struct MergeResult {
104    pub semantic: ExtractedSemantic,
105    pub action: MergeAction,
106}
107
/// Outcome of merging one newly extracted finding against the existing
/// canonical findings.
#[derive(Clone, Debug)]
pub enum FindingMergeAction {
    /// Admit the new raw finding as a fresh canonical.
    New,
    /// Fold the new raw into one *or more* existing canonical findings. The
    /// canonical's `title`, `severity` and `root_cause` are stable identity
    /// and are never touched; the `appended_*` values are concatenated to
    /// the canonical's existing description / patterns / exploits at write
    /// time.
    Merge {
        /// Ids of the canonical findings the raw is merged into.
        target_ids: Vec<i32>,
        /// Extra description text to append, if any.
        appended_description: Option<String>,
        /// Extra patterns text to append, if any.
        appended_patterns: Option<String>,
        /// Extra exploits text to append, if any.
        appended_exploits: Option<String>,
    },
}
124
125#[derive(Debug, Clone)]
126pub struct FindingMergeResult {
127    pub finding: ExtractedFinding,
128    pub action: FindingMergeAction,
129}
130
131// ── ProjectData learning pipeline ───────────────────────────────────
132
/// Positional links produced by the in-project linking step.
///
/// Every `(finding_index, semantic_index)` pair addresses entries of the
/// parent `ExtractResult.findings` and `ExtractResult.semantics` arrays.
/// The atomic admission step later turns these positions into concrete row
/// ids for `semantic_finding_link`, once all raw rows have been inserted.
#[derive(Clone, Debug, Default)]
pub struct InProjectLinks {
    /// `(finding_index, semantic_index)` pairs.
    pub edges: Vec<(usize, usize)>,
}

impl InProjectLinks {
    /// Returns `true` when no link edge has been recorded.
    pub fn is_empty(&self) -> bool {
        self.edges.is_empty()
    }
}
148
149/// Intermediate result from the categorize + extract + in-project link phase.
150/// Can be computed concurrently across projects (no DB writes happen here).
151pub struct ExtractResult {
152    pub categories: Vec<DeFiCategory>,
153    pub semantics: Vec<ExtractedSemantic>,
154    pub findings: Vec<ExtractedFinding>,
155    /// In-project semantic↔finding links: every finding has ≥1 entry
156    /// (LLM-enforced). Indices are positional in `semantics` / `findings`.
157    pub in_project_links: InProjectLinks,
158}
159
160impl ProjectData {
161    /// Phase 1: Categorize the project and extract semantics.
162    /// Safe to run concurrently across multiple projects.
163    pub async fn categorize_and_extract(
164        &self,
165        llm: &LLM,
166        agent_options: &AgentRunOptions,
167        chunk_input_budget: Option<usize>,
168    ) -> Result<ExtractResult> {
169        let pid = self.display_id();
170
171        tracing::info!(
172            "Processing project {}: {} ({} source files)",
173            pid,
174            self.name(),
175            self.source_files().len()
176        );
177
178        if self.source_files().is_empty() {
179            tracing::warn!("No source files found for project {}", pid);
180            return Ok(ExtractResult {
181                categories: vec![],
182                semantics: vec![],
183                findings: vec![],
184                in_project_links: InProjectLinks::default(),
185            });
186        }
187
188        let categories = self.categorize(llm, agent_options).await?;
189        tracing::info!("Project {} categorized as: {:?}", pid, categories);
190
191        let (all_semantics, all_findings) = tokio::try_join!(
192            self.extract_semantics(llm, &categories, agent_options, chunk_input_budget),
193            self.extract_findings(llm, &categories, agent_options, chunk_input_budget)
194        )?;
195
196        tracing::info!(
197            "Extracted {} raw semantics from project {}",
198            all_semantics.len(),
199            pid
200        );
201
202        tracing::info!(
203            "Extracted {} raw findings from project {}",
204            all_findings.len(),
205            pid
206        );
207
208        let deduped = Self::dedup_semantics(all_semantics);
209        let deduped_findings = Self::dedup_findings(all_findings);
210        tracing::info!(
211            "After intra-project dedup: {} semantics for project {}",
212            deduped.len(),
213            pid
214        );
215
216        tracing::info!(
217            "After intra-project dedup: {} findings for project {}",
218            deduped_findings.len(),
219            pid
220        );
221
222        // Run the in-project linking step before any cross-project merge.
223        // Every finding must claim ≥1 same-project semantic; this is what
224        // grounds raw findings to raw semantics inside the atomic admission
225        // transaction even when both later get merged into canonicals.
226        let in_project_links = if deduped_findings.is_empty() || deduped.is_empty() {
227            InProjectLinks::default()
228        } else {
229            self.link_findings_in_project(llm, &categories, &deduped, &deduped_findings)
230                .await?
231        };
232        tracing::info!(
233            "In-project linking for project {}: {} edge(s) over {} finding(s)",
234            pid,
235            in_project_links.edges.len(),
236            deduped_findings.len()
237        );
238
239        Ok(ExtractResult {
240            categories,
241            semantics: deduped,
242            findings: deduped_findings,
243            in_project_links,
244        })
245    }
246
247    /// Categorize the project and extract only project semantics.
248    /// This is used by consumers that need semantic context but do not need audit findings.
249    pub async fn categorize_and_extract_semantics(
250        &self,
251        llm: &LLM,
252        agent_options: &AgentRunOptions,
253        chunk_input_budget: Option<usize>,
254    ) -> Result<ExtractResult> {
255        let pid = self.display_id();
256
257        tracing::info!(
258            "Processing project {} for semantics only: {} ({} source files)",
259            pid,
260            self.name(),
261            self.source_files().len()
262        );
263
264        if self.source_files().is_empty() {
265            tracing::warn!("No source files found for project {}", pid);
266            return Ok(ExtractResult {
267                categories: vec![],
268                semantics: vec![],
269                findings: vec![],
270                in_project_links: InProjectLinks::default(),
271            });
272        }
273
274        let categories = self.categorize(llm, agent_options).await?;
275        tracing::info!("Project {} categorized as: {:?}", pid, categories);
276
277        let all_semantics = self
278            .extract_semantics(llm, &categories, agent_options, chunk_input_budget)
279            .await?;
280        tracing::info!(
281            "Extracted {} raw semantics from project {}",
282            all_semantics.len(),
283            pid
284        );
285
286        let deduped = Self::dedup_semantics(all_semantics);
287        tracing::info!(
288            "After intra-project dedup: {} semantics for project {}",
289            deduped.len(),
290            pid
291        );
292
293        Ok(ExtractResult {
294            categories,
295            semantics: deduped,
296            findings: Vec::new(),
297            in_project_links: InProjectLinks::default(),
298        })
299    }
300
301    /// Phase 2: Merge extracted semantics with existing KB and write to DB.
302    /// MUST be run serially (one project at a time) to avoid merge conflicts.
303    /// Commit one project's full merge output to the historical KG.
304    ///
305    /// Begins its own transaction via
306    /// [`HistoricalDatabase::write_project_completed`] under the
307    /// hood and discards the new-canonical id list. Use this when
308    /// the caller doesn't need to compose additional writes
309    /// (`pending_semantic` enqueue etc.) inside the same
310    /// transaction. Bulk learn paths (`learn moves` / `learn c4`
311    /// / `learn projects`) go through here because they never run
312    /// retro-link.
313    pub async fn merge_and_write(
314        &self,
315        db: &HistoricalDatabase,
316        llm: &LLM,
317        extract: &ExtractResult,
318        agent_options: &AgentRunOptions,
319        merge_chunking: MergeChunkingOptions,
320    ) -> Result<()> {
321        let txn = db.begin().await?;
322        self.merge_and_write_txn(&txn, db, llm, extract, agent_options, merge_chunking)
323            .await?;
324        txn.commit().await?;
325        Ok(())
326    }
327
328    /// Transaction-scoped variant of [`Self::merge_and_write`].
329    /// Performs the merge-LLM passes (against the LIVE DB — these
330    /// are reads, not writes, so don't depend on the txn) and then
331    /// writes the resulting rows through
332    /// [`HistoricalDatabase::write_project_completed_txn`] using
333    /// the supplied transaction. Returns the canonical semantic
334    /// ids this project newly introduced, so the caller can chain
335    /// [`HistoricalDatabase::enqueue_pending_canonical_semantics_txn`]
336    /// in the same transaction when needed (incremental
337    /// `workflow learn` flow).
338    pub async fn merge_and_write_txn(
339        &self,
340        conn: &sea_orm::DatabaseTransaction,
341        db: &HistoricalDatabase,
342        llm: &LLM,
343        extract: &ExtractResult,
344        agent_options: &AgentRunOptions,
345        merge_chunking: MergeChunkingOptions,
346    ) -> Result<Vec<i32>> {
347        let pid = self.display_id();
348
349        if extract.semantics.is_empty() && extract.findings.is_empty() {
350            let new_canonicals = db
351                .write_project_completed_txn(
352                    conn,
353                    self.name(),
354                    self.platform_id(),
355                    &extract.categories,
356                    &[],
357                    &[],
358                    &InProjectLinks::default(),
359                )
360                .await?;
361            tracing::info!("Project {} written (no semantics or findings)", pid);
362            return Ok(new_canonicals);
363        }
364
365        let semantic_merge_results = self
366            .merge_with_existing(db, llm, extract, agent_options, merge_chunking)
367            .await?;
368        let finding_merge_results = self
369            .merge_findings_with_existing(db, llm, extract, agent_options, merge_chunking)
370            .await?;
371
372        let new_canonicals = db
373            .write_project_completed_txn(
374                conn,
375                self.name(),
376                self.platform_id(),
377                &extract.categories,
378                &semantic_merge_results,
379                &finding_merge_results,
380                &extract.in_project_links,
381            )
382            .await?;
383
384        tracing::info!("Project {} fully processed and saved", pid);
385        Ok(new_canonicals)
386    }
387
388    /// Check if this project is already completed in the DB.
389    pub async fn is_completed(&self, db: &HistoricalDatabase) -> Result<bool> {
390        if let Some(pid) = self.platform_id() {
391            db.is_project_completed(pid).await
392        } else {
393            Ok(db
394                .get_project_by_name(self.name())
395                .await?
396                .map(|p| p.status == "completed")
397                .unwrap_or(false))
398        }
399    }
400
401    fn build_project_prompt_body(&self) -> String {
402        let mut content = prompts::project_user_prefix();
403        content.push_str("## Source Files\n\n");
404
405        for file in self.source_files() {
406            content.push_str(&format!(
407                "### {}\n```{}\n{}\n```\n\n",
408                file.relative_path.display(),
409                self.source_language().code_fence(),
410                file.content
411            ));
412        }
413
414        if let Some(readme) = self.load_readme() {
415            content.push_str("## README\n\n");
416            content.push_str(&readme);
417            content.push_str("\n\n");
418        }
419
420        content
421    }
422
423    fn build_report_prompt_body(&self) -> Option<String> {
424        let report = self.audit_report()?.render();
425        let mut content = prompts::report_user_prefix();
426        content.push_str(&report);
427        content.push_str("\n\n");
428        Some(content)
429    }
430
431    fn load_readme(&self) -> Option<String> {
432        for name in &["README.md", "readme.md", "Readme.md"] {
433            let readme_path = self.root_dir().join(name);
434            if readme_path.exists()
435                && let Ok(readme) = std::fs::read_to_string(&readme_path)
436            {
437                return Some(readme);
438            }
439        }
440
441        None
442    }
443
444    fn prompt_cache_key(&self) -> String {
445        sanitize_prompt_prefix(&self.display_id())
446    }
447
448    fn debug_key(&self, stage: &str) -> String {
449        format!(
450            "{}-{}",
451            sanitize_prompt_prefix(stage),
452            self.prompt_cache_key()
453        )
454    }
455
456    fn merge_cache_key(&self) -> String {
457        format!("{}-merge", self.prompt_cache_key())
458    }
459
460    fn finding_cache_key(&self) -> String {
461        format!("{}-finding", self.prompt_cache_key())
462    }
463
464    fn finding_merge_cache_key(&self) -> String {
465        format!("{}-finding-merge", self.prompt_cache_key())
466    }
467
468    // ── Private pipeline steps ──────────────────────────────────────
469
470    /// Categorize the project via an `Agent` with two tools
471    /// (`set_project_categories` + `finalize_categorization`). Fills the
472    /// context window with the README and as many source files as fit.
473    async fn categorize(
474        &self,
475        llm: &LLM,
476        agent_options: &AgentRunOptions,
477    ) -> Result<Vec<DeFiCategory>> {
478        let started_at = Instant::now();
479        let model = &llm.model;
480        let user_suffix = prompts::CATEGORIZE_USER_SUFFIX;
481        let cache_key = sanitize_prompt_prefix(&self.display_id());
482        let sys_tokens = count_tokens(model, prompts::GENERAL_ROLE_SYSTEM);
483        let suffix_tokens = count_tokens(model, user_suffix);
484        let budget = get_context_budget(model).saturating_sub(sys_tokens + suffix_tokens);
485        let content = self.build_project_prompt_body();
486        tracing::info!(
487            "categorize preparing {}: source_files={}, body_chars={}, budget={}",
488            self.display_id(),
489            self.source_files().len(),
490            content.len(),
491            budget,
492        );
493
494        let Some(mut cursor) = TokenCursor::new(content, model.clone()) else {
495            return Err(KgError::other(
496                "Failed to initialize TokenCursor for categorization",
497            ));
498        };
499        let user_prompt = format!("{}{}", cursor.next_chunk(budget).unwrap_or(""), user_suffix,);
500        tracing::info!(
501            "Categorization prompt: ~{} tokens (budget: {})",
502            sys_tokens + count_tokens(model, &user_prompt),
503            sys_tokens + budget,
504        );
505
506        let label = format!("categorize-{}", self.display_id());
507        let local_options = agent_options.scoped(&self.debug_key("categorize"));
508        let runner = CategorizeRunner {
509            llm: llm.clone(),
510            options: local_options,
511            system_prompt: prompts::GENERAL_ROLE_SYSTEM.to_string(),
512            user_prompt,
513            cache_key,
514            label,
515        };
516        let record = runner.run().await?;
517        tracing::info!(
518            "categorize finished for {} in {:?}: categories={:?} ({})",
519            self.display_id(),
520            started_at.elapsed(),
521            record.categories,
522            record.reasoning,
523        );
524        Ok(record.categories)
525    }
526
527    /// Extract semantics from the project's source files. Splits the source
528    /// text into chunks that fit the context window and runs one Agent per
529    /// chunk; each chunk's semantics are emitted via `emit_semantic` tool
530    /// calls and the chunk is closed with `finalize_semantic_extraction`.
531    pub async fn extract_semantics(
532        &self,
533        llm: &LLM,
534        categories: &[DeFiCategory],
535        agent_options: &AgentRunOptions,
536        chunk_input_budget: Option<usize>,
537    ) -> Result<Vec<ExtractedSemantic>> {
538        let system_prompt = prompts::GENERAL_ROLE_SYSTEM;
539        let model = &llm.model;
540        let debug_key = self.debug_key("extract");
541        let cache_key_root = self.prompt_cache_key();
542        let sys_tokens = count_tokens(model, system_prompt);
543        let total_budget = get_context_budget(model);
544        let user_suffix = prompts::extract_semantics_user_suffix(categories);
545        let suffix_tokens = count_tokens(model, &user_suffix);
546
547        // Caller-overridable per-chunk input budget; default = ~80% of
548        // model max input minus the fixed system + suffix overhead.
549        let chunk_budget = match chunk_input_budget {
550            Some(cap) => cap.min(total_budget.saturating_sub(sys_tokens + suffix_tokens)),
551            None => total_budget.saturating_sub(sys_tokens + suffix_tokens),
552        };
553
554        let all_files = self.build_project_prompt_body();
555        let Some(mut cursor) = TokenCursor::new(all_files, model.clone()) else {
556            return Err(KgError::other(
557                "Failed to initialize TokenCursor for extraction",
558            ));
559        };
560
561        let mut all_semantics = Vec::new();
562        let mut chunk_idx = 0usize;
563        while let Some(chunk) = cursor.next_chunk(chunk_budget) {
564            let user_prompt = format!("{}{}", chunk, user_suffix);
565            tracing::info!(
566                "Extracting semantics from chunk {} (~{} tokens, done={})",
567                chunk_idx,
568                sys_tokens + count_tokens(model, &user_prompt),
569                cursor.is_done(),
570            );
571            let chunk_label = format!("semantic-extract-{}-chunk{}", self.display_id(), chunk_idx);
572            let chunk_debug = format!("{}-chunk{}", debug_key, chunk_idx);
573            let local_options = agent_options.scoped(&chunk_debug);
574            let extractor = SemanticChunkExtractor {
575                llm: llm.clone(),
576                options: local_options,
577                system_prompt: system_prompt.to_string(),
578                user_prompt,
579                cache_key: format!("{}-chunk{}", cache_key_root, chunk_idx),
580                label: chunk_label,
581            };
582            let chunk_semantics = extractor.run().await?;
583            tracing::info!(
584                "Chunk {} produced {} semantic(s)",
585                chunk_idx,
586                chunk_semantics.len(),
587            );
588            all_semantics.extend(chunk_semantics);
589            chunk_idx += 1;
590        }
591        Ok(all_semantics)
592    }
593
594    /// LLM-driven in-project linking. Returns positional `(finding_idx,
595    /// semantic_idx)` edges. Validates that every finding got at least one
596    /// link (the LLM is instructed to enforce this; we re-check and bail
597    /// on violations rather than silently produce an unlinked finding).
598    async fn link_findings_in_project(
599        &self,
600        llm: &LLM,
601        categories: &[DeFiCategory],
602        semantics: &[ExtractedSemantic],
603        findings: &[ExtractedFinding],
604    ) -> Result<InProjectLinks> {
605        debug_assert!(!findings.is_empty() && !semantics.is_empty());
606
607        let semantics_block = render_semantics_for_in_project_link(semantics);
608        let findings_block = render_findings_for_in_project_link(findings);
609        let user_msg =
610            prompts::in_project_link_user_message(categories, &semantics_block, &findings_block);
611
612        let debug_key = self.debug_key("in-project-link");
613        let cache_key = format!("{}-in-project-link", self.prompt_cache_key());
614
615        let parsed: InProjectLinkResponse = llm
616            .prompt_json_with_retry(
617                prompts::GENERAL_ROLE_SYSTEM,
618                &user_msg,
619                Some(&debug_key),
620                Some(&cache_key),
621                None,
622            )
623            .await
624            .map_err(|err| KgError::other(format!("in-project linking request failed: {err}")))?
625            .ok_or_else(|| KgError::other("in-project linking returned no JSON payload"))?;
626
627        let mut edges: Vec<(usize, usize)> = Vec::new();
628        let mut seen: HashSet<(usize, usize)> = HashSet::new();
629        let mut covered: HashSet<usize> = HashSet::new();
630        for entry in &parsed.links {
631            let f_idx = entry.finding_index;
632            if f_idx >= findings.len() {
633                return Err(KgError::other(format!(
634                    "in-project link response references unknown finding_index {}",
635                    f_idx
636                )));
637            }
638            if entry.semantic_indices.is_empty() {
639                return Err(KgError::other(format!(
640                    "in-project link response left finding_index {} unlinked (rule violation)",
641                    f_idx
642                )));
643            }
644            for s_idx in &entry.semantic_indices {
645                if *s_idx >= semantics.len() {
646                    return Err(KgError::other(format!(
647                        "in-project link response references unknown semantic_index {} for finding {}",
648                        s_idx, f_idx
649                    )));
650                }
651                if seen.insert((f_idx, *s_idx)) {
652                    edges.push((f_idx, *s_idx));
653                }
654            }
655            covered.insert(f_idx);
656        }
657        if covered.len() != findings.len() {
658            let missing: Vec<usize> = (0..findings.len())
659                .filter(|i| !covered.contains(i))
660                .collect();
661            return Err(KgError::other(format!(
662                "in-project link response missing {} finding(s): {:?}",
663                missing.len(),
664                missing
665            )));
666        }
667
668        Ok(InProjectLinks { edges })
669    }
670
671    /// Extract audit findings from the project's report. Same chunked
672    /// agent pattern as [`Self::extract_semantics`]: one Agent per chunk,
673    /// `emit_finding` tool per finding, terminated by
674    /// `finalize_finding_extraction`.
675    async fn extract_findings(
676        &self,
677        llm: &LLM,
678        categories: &[DeFiCategory],
679        agent_options: &AgentRunOptions,
680        chunk_input_budget: Option<usize>,
681    ) -> Result<Vec<ExtractedFinding>> {
682        let Some(report_body) = self.build_report_prompt_body() else {
683            tracing::warn!("No audit report found for project {}", self.display_id());
684            return Ok(Vec::new());
685        };
686
687        let system_prompt = prompts::GENERAL_ROLE_SYSTEM;
688        let model = &llm.model;
689        let debug_key = self.debug_key("finding-extract");
690        let cache_key_root = self.finding_cache_key();
691        let sys_tokens = count_tokens(model, system_prompt);
692        let total_budget = get_context_budget(model);
693        let user_suffix = prompts::extract_findings_user_suffix(categories);
694        let suffix_tokens = count_tokens(model, &user_suffix);
695        let chunk_budget = match chunk_input_budget {
696            Some(cap) => cap.min(total_budget.saturating_sub(sys_tokens + suffix_tokens)),
697            None => total_budget.saturating_sub(sys_tokens + suffix_tokens),
698        };
699
700        let Some(mut cursor) = TokenCursor::new(report_body, model.clone()) else {
701            return Err(KgError::other(
702                "Failed to initialize TokenCursor for finding extraction",
703            ));
704        };
705
706        let mut all_findings = Vec::new();
707        let mut chunk_idx = 0usize;
708        while let Some(chunk) = cursor.next_chunk(chunk_budget) {
709            let user_prompt = format!("{}{}", chunk, user_suffix);
710            tracing::info!(
711                "Extracting findings from chunk {} (~{} tokens, done={})",
712                chunk_idx,
713                sys_tokens + count_tokens(model, &user_prompt),
714                cursor.is_done(),
715            );
716            let chunk_label = format!("finding-extract-{}-chunk{}", self.display_id(), chunk_idx);
717            let chunk_debug = format!("{}-chunk{}", debug_key, chunk_idx);
718            let local_options = agent_options.scoped(&chunk_debug);
719            let extractor = FindingChunkExtractor {
720                llm: llm.clone(),
721                options: local_options,
722                system_prompt: system_prompt.to_string(),
723                user_prompt,
724                cache_key: format!("{}-chunk{}", cache_key_root, chunk_idx),
725                label: chunk_label,
726            };
727            let raw_findings = extractor.run().await?;
728            tracing::info!(
729                "Chunk {} produced {} finding(s)",
730                chunk_idx,
731                raw_findings.len(),
732            );
733            for finding in raw_findings {
734                all_findings.push(Self::canonicalize_finding(finding)?);
735            }
736            chunk_idx += 1;
737        }
738
739        Ok(all_findings)
740    }
741
742    /// Deduplicate semantics by name (case-insensitive). Keeps the longer
743    /// description and merges function lists.
744    fn dedup_semantics(semantics: Vec<ExtractedSemantic>) -> Vec<ExtractedSemantic> {
745        let mut by_name: HashMap<String, ExtractedSemantic> = HashMap::new();
746
747        for sem in semantics {
748            let key = sem.name.to_lowercase().trim().to_string();
749            if let Some(existing) = by_name.get_mut(&key) {
750                for func in sem.functions {
751                    let already_has = existing
752                        .functions
753                        .iter()
754                        .any(|f| f.name == func.name && f.contract == func.contract);
755                    if !already_has {
756                        existing.functions.push(func);
757                    }
758                }
759                if sem.description.len() > existing.description.len() {
760                    existing.description = sem.description;
761                    existing.definition = sem.definition;
762                }
763            } else {
764                by_name.insert(key, sem);
765            }
766        }
767
768        by_name.into_values().collect()
769    }
770
771    fn dedup_findings(findings: Vec<ExtractedFinding>) -> Vec<ExtractedFinding> {
772        let mut by_title: HashMap<String, ExtractedFinding> = HashMap::new();
773
774        for finding in findings {
775            let key = finding.title.to_lowercase().trim().to_string();
776            if let Some(existing) = by_title.get_mut(&key) {
777                existing.severity = existing.severity.max(finding.severity);
778
779                if finding.description.len() > existing.description.len() {
780                    existing.category = finding.category;
781                    existing.subcategory = finding.subcategory.clone();
782                    existing.description = finding.description.clone();
783                }
784
785                if finding.root_cause.len() > existing.root_cause.len() {
786                    existing.root_cause = finding.root_cause.clone();
787                }
788
789                if finding.patterns.len() > existing.patterns.len() {
790                    existing.patterns = finding.patterns.clone();
791                }
792
793                if finding.exploits.len() > existing.exploits.len() {
794                    existing.exploits = finding.exploits.clone();
795                }
796            } else {
797                by_title.insert(key, finding);
798            }
799        }
800
801        by_title.into_values().collect()
802    }
803
804    fn canonicalize_finding(mut finding: ExtractedFinding) -> Result<ExtractedFinding> {
805        finding.title = finding.title.trim().to_string();
806        finding.root_cause = finding.root_cause.trim().to_string();
807        finding.description = finding.description.trim().to_string();
808        finding.patterns = finding.patterns.trim().to_string();
809        finding.exploits = finding.exploits.trim().to_string();
810
811        let Some(entry) = resolve_taxonomy_entry(finding.category, &finding.subcategory) else {
812            return Err(KgError::other(format!(
813                "Unknown vulnerability subcategory '{}' for category '{}'",
814                finding.subcategory, finding.category
815            )));
816        };
817
818        finding.subcategory = entry.subcategory.to_string();
819        Ok(finding)
820    }
821}
822
823impl ProjectData {
824    /// Merge newly-extracted semantics against the historical KB. Chunks
825    /// the existing canonicals (with their merged-away raw children
826    /// rendered alongside, so the LLM's `updated_*` generalizations can
827    /// take prior merges into account) by token budget, runs one merge
828    /// agent per chunk in parallel via [`SemanticMerger`], and unions the
829    /// per-chunk decisions into the final per-raw [`MergeAction`].
830    ///
831    /// A new raw can merge into multiple canonicals; that's encoded in
832    /// `MergeAction::Merge { target_ids, .. }`.
833    async fn merge_with_existing(
834        &self,
835        db: &HistoricalDatabase,
836        llm: &LLM,
837        extract: &ExtractResult,
838        agent_options: &AgentRunOptions,
839        merge_chunking: MergeChunkingOptions,
840    ) -> Result<Vec<MergeResult>> {
841        if extract.semantics.is_empty() {
842            return Ok(Vec::new());
843        }
844        let semantic_categories: Vec<DeFiCategory> = extract
845            .semantics
846            .iter()
847            .map(|sem| sem.category)
848            .unique()
849            .collect();
850        let candidates = db
851            .canonical_semantics_with_children_for_categories(&semantic_categories)
852            .await?;
853        let merger = SemanticMerger {
854            new_semantics: extract.semantics.clone(),
855            candidates,
856            llm: llm.clone(),
857            agent_options: agent_options.clone(),
858            chunking: merge_chunking,
859            cache_key_root: self.merge_cache_key(),
860            debug_key_root: self.debug_key("merge"),
861            label_root: format!("semantic-merge-{}", self.display_id()),
862        };
863        let aggregated = merger.run().await?;
864        Ok(Self::semantic_merge_results_from_aggregated(
865            &extract.semantics,
866            aggregated,
867        ))
868    }
869
870    /// Convert the orchestrator's `(name → AggregatedSemanticMergeDecision)`
871    /// list into the persistence layer's `MergeResult` (one per raw, in the
872    /// same order as `extracted`).
873    fn semantic_merge_results_from_aggregated(
874        extracted: &[ExtractedSemantic],
875        aggregated: Vec<AggregatedSemanticMergeDecision>,
876    ) -> Vec<MergeResult> {
877        let by_name: HashMap<String, AggregatedSemanticMergeDecision> = aggregated
878            .into_iter()
879            .map(|d| (d.new_semantic_name.to_lowercase(), d))
880            .collect();
881        extracted
882            .iter()
883            .map(|sem| {
884                let action = match by_name.get(&sem.name.to_lowercase()) {
885                    Some(d) if !d.merge_target_ids.is_empty() => MergeAction::Merge {
886                        target_ids: d.merge_target_ids.clone(),
887                        appended_description: d.appended_description.clone(),
888                    },
889                    _ => MergeAction::New,
890                };
891                MergeResult {
892                    semantic: sem.clone(),
893                    action,
894                }
895            })
896            .collect()
897    }
898
899    /// Merge newly-extracted findings against the historical KB. Same
900    /// chunked + parallel design as [`Self::merge_with_existing`]; uses
901    /// [`FindingMerger`] under the hood. Multi-target merges supported.
902    async fn merge_findings_with_existing(
903        &self,
904        db: &HistoricalDatabase,
905        llm: &LLM,
906        extract: &ExtractResult,
907        agent_options: &AgentRunOptions,
908        merge_chunking: MergeChunkingOptions,
909    ) -> Result<Vec<FindingMergeResult>> {
910        if extract.findings.is_empty() {
911            return Ok(Vec::new());
912        }
913        let finding_categories: Vec<VulnerabilityCategory> = extract
914            .findings
915            .iter()
916            .map(|finding| finding.category)
917            .unique()
918            .collect();
919        let candidates = db
920            .canonical_findings_with_children_for_categories(&finding_categories)
921            .await?;
922        let merger = FindingMerger {
923            new_findings: extract.findings.clone(),
924            candidates,
925            llm: llm.clone(),
926            agent_options: agent_options.clone(),
927            chunking: merge_chunking,
928            cache_key_root: self.finding_merge_cache_key(),
929            debug_key_root: self.debug_key("finding-merge"),
930            label_root: format!("finding-merge-{}", self.display_id()),
931        };
932        let aggregated = merger.run().await?;
933        Ok(Self::finding_merge_results_from_aggregated(
934            &extract.findings,
935            aggregated,
936        ))
937    }
938
939    fn finding_merge_results_from_aggregated(
940        extracted: &[ExtractedFinding],
941        aggregated: Vec<AggregatedFindingMergeDecision>,
942    ) -> Vec<FindingMergeResult> {
943        let by_title: HashMap<String, AggregatedFindingMergeDecision> = aggregated
944            .into_iter()
945            .map(|d| (d.new_finding_title.to_lowercase(), d))
946            .collect();
947        extracted
948            .iter()
949            .map(|finding| {
950                let action = match by_title.get(&finding.title.to_lowercase()) {
951                    Some(d) if !d.merge_target_ids.is_empty() => FindingMergeAction::Merge {
952                        target_ids: d.merge_target_ids.clone(),
953                        appended_description: d.appended_description.clone(),
954                        appended_patterns: d.appended_patterns.clone(),
955                        appended_exploits: d.appended_exploits.clone(),
956                    },
957                    _ => FindingMergeAction::New,
958                };
959                FindingMergeResult {
960                    finding: finding.clone(),
961                    action,
962                }
963            })
964            .collect()
965    }
966}
967
968// ── Token counting utilities ────────────────────────────────────────
969
970pub(crate) fn count_tokens(model: &OpenAIModel, text: &str) -> usize {
971    tokenizer::count_tokens_for_model(model.model_id_str(), text).unwrap_or(text.len() / 4)
972}
973
974pub(crate) fn get_context_budget(model: &OpenAIModel) -> usize {
975    (model.config.max_input() as f64 * 0.8) as _
976}
977
/// Turn an arbitrary string into a short lowercase slug.
///
/// ASCII letters and digits are kept (lowercased); every run of other
/// characters collapses into a single `-`. Scanning stops once 48
/// characters have been produced, after which leading and trailing dashes
/// are removed. If nothing remains, the slug is `"project"`.
pub(crate) fn sanitize_prompt_prefix(value: &str) -> String {
    const MAX_LEN: usize = 48;

    let mut slug = String::new();
    for ch in value.chars() {
        match ch {
            c if c.is_ascii_alphanumeric() => slug.push(c.to_ascii_lowercase()),
            // Only emit a separator if the slug does not already end in one.
            _ if !slug.ends_with('-') => slug.push('-'),
            _ => {}
        }

        // Everything pushed is ASCII, so byte length equals character count.
        if slug.len() >= MAX_LEN {
            break;
        }
    }

    match slug.trim_matches('-') {
        "" => "project".to_string(),
        core => core.to_string(),
    }
}
1003
1004// Merge-response validation lived here in the JSON-mode era: the prompt
1005// returned a flat array of decisions which we cross-checked against the
1006// existing canonical id set. With the agent-tool form, validation is
1007// inlined into `project_*_merge_results`: decisions referencing an
1008// unknown id are downgraded to `New` (and logged) rather than rejected.