use crate::config::Config;
use crate::embedding::{
calculate_unique_content_hash, create_embedding_provider_from_parts,
types::parse_provider_model, EmbeddingProvider,
};
use crate::indexer::graphrag::ai::AIEnhancements;
use crate::indexer::graphrag::database::DatabaseOperations;
use crate::indexer::graphrag::relationships::RelationshipDiscovery;
use crate::indexer::graphrag::types::{CodeGraph, CodeNode, CodeRelationship};
use crate::indexer::graphrag::utils::{cosine_similarity, detect_project_root, to_relative_path};
use crate::state::SharedState;
use crate::store::{CodeBlock, Store};
use anyhow::{Context, Result};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::sync::RwLock;
/// Builds and maintains the GraphRAG code graph for a project.
///
/// Owns the embedding provider, the backing store, and an in-memory graph
/// protected by an async `RwLock`; optionally carries LLM-backed helpers.
pub struct GraphBuilder {
    config: Config,
    // In-memory graph shared across async tasks via Arc<RwLock<..>>.
    graph: Arc<RwLock<CodeGraph>>,
    embedding_provider: Arc<Box<dyn EmbeddingProvider>>,
    store: Store,
    // Absolute project root; node ids/paths are stored relative to it.
    project_root: PathBuf,
    // Present only when `config.graphrag.use_llm` is enabled (see constructor).
    ai_enhancements: Option<AIEnhancements>,
    // Suppresses progress output on stderr/stdout when true.
    quiet: bool,
}
impl GraphBuilder {
/// Creates a `GraphBuilder` with progress output enabled.
///
/// Convenience wrapper around [`Self::new_with_quiet`] with `quiet = false`.
pub async fn new(config: Config) -> Result<Self> {
    Self::new_with_quiet(config, false).await
}
/// Creates a `GraphBuilder`, optionally suppressing progress output.
///
/// Detects the project root, initializes the embedding provider from
/// `config.embedding.text_model`, and loads any previously persisted graph
/// for that root into memory.
///
/// # Errors
/// Fails when the project root cannot be detected, the provider/model
/// string cannot be parsed, the embedding provider cannot be initialized,
/// or the store/graph cannot be opened.
pub async fn new_with_quiet(config: Config, quiet: bool) -> Result<Self> {
    let project_root = detect_project_root()?;
    let model_string = &config.embedding.text_model;
    // Fix: the previous `let Ok(..) else` discarded the parse error, so the
    // user never saw *why* the provider/model string was rejected. Keep the
    // cause in the message.
    let (provider_type, model) = parse_provider_model(model_string).map_err(|e| {
        anyhow::anyhow!("Failed to parse provider model {}: {}", model_string, e)
    })?;
    let embedding_provider = Arc::new(
        create_embedding_provider_from_parts(&provider_type, &model)
            .await
            .context("Failed to initialize embedding provider from config")?,
    );
    let store = Store::new().await?;
    let db_ops = DatabaseOperations::new(&store);
    // Warm the in-memory graph from whatever was persisted for this root.
    let graph = Arc::new(RwLock::new(db_ops.load_graph(&project_root, quiet).await?));
    // AI helpers exist only when LLM support is enabled in config.
    let ai_enhancements = if config.graphrag.use_llm {
        Some(AIEnhancements::new(config.clone(), quiet))
    } else {
        None
    };
    Ok(Self {
        config,
        graph,
        embedding_provider,
        store,
        project_root,
        ai_enhancements,
        quiet,
    })
}
/// Backward-compatible constructor retained for older call sites.
///
/// NOTE(review): `_use_ai_enhancements` is ignored — AI usage is driven
/// solely by `config.graphrag.use_llm` (see `new_with_quiet`). Confirm
/// callers do not expect the flag to override the config.
pub async fn new_with_ai_enhancements(
    config: Config,
    _use_ai_enhancements: bool,
) -> Result<Self> {
    Self::new(config).await
}
/// Whether LLM-backed enhancements were requested in the configuration.
fn llm_enabled(&self) -> bool {
    self.config.graphrag.use_llm
}
/// Converts an absolute path to a path relative to this builder's
/// project root; errors for paths outside the root.
fn to_relative_path(&self, absolute_path: &str) -> Result<String> {
    to_relative_path(absolute_path, &self.project_root)
}
/// Generates an embedding vector for `text` via the configured provider.
async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>> {
    self.embedding_provider.generate_embedding(text).await
}
/// Indexes `code_blocks` into the GraphRAG graph, producing one node per
/// source file, batching embedding generation and (optionally) LLM
/// description generation, then discovering relationships between nodes.
///
/// Files whose content hash matches the existing node are skipped.
/// Progress is reported through `state` when provided, otherwise via
/// stderr/stdout unless `self.quiet` is set.
///
/// # Errors
/// Propagates failures from the store, embedding generation, and
/// relationship discovery. AI-description failures are logged and degrade
/// to simple descriptions instead of aborting.
pub async fn process_files_from_codeblocks(
    &self,
    code_blocks: &[CodeBlock],
    state: Option<SharedState>,
) -> Result<()> {
    // Nodes built this run that have not yet been persisted.
    let mut new_nodes: Vec<CodeNode> = Vec::new();
    // Summary texts awaiting batch embedding (parallel to `new_nodes`).
    let mut pending_embeddings = Vec::new();
    // Files queued for a batched LLM description request.
    let mut ai_batch_queue: Vec<crate::indexer::graphrag::ai::FileForAI> = Vec::new();
    // relative path -> AI-generated description, applied to nodes later.
    let mut ai_descriptions: HashMap<String, String> = HashMap::new();
    let mut processed_count = 0;
    let mut skipped_count = 0;
    let mut batches_processed = 0;
    // Group incoming blocks by their file path: one graph node per file.
    let mut files_to_blocks: HashMap<String, Vec<&CodeBlock>> = HashMap::new();
    for block in code_blocks {
        files_to_blocks
            .entry(block.path.clone())
            .or_default()
            .push(block);
    }
    for (file_path, file_blocks) in files_to_blocks {
        // Node ids are project-relative; files outside the root are skipped.
        let relative_path = match self.to_relative_path(&file_path) {
            Ok(path) => path,
            Err(_) => {
                if !self.quiet {
                    eprintln!("Warning: Skipping file outside project root: {}", file_path);
                }
                continue;
            }
        };
        // Hash the concatenated block contents to detect unchanged files.
        let combined_content: String = file_blocks
            .iter()
            .map(|b| b.content.as_str())
            .collect::<Vec<_>>()
            .join("\n");
        let content_hash = calculate_unique_content_hash(&combined_content, &file_path);
        // Short read-lock scope: only check whether reprocessing is needed.
        let graph = self.graph.read().await;
        let needs_processing = match graph.nodes.get(&relative_path) {
            Some(existing_node) if existing_node.hash == content_hash => {
                skipped_count += 1;
                false
            }
            _ => true,
        };
        drop(graph);
        if needs_processing {
            // Best-effort cleanup of stale persisted data for this file.
            if let Err(e) = self.store.remove_graph_nodes_by_path(&relative_path).await {
                if !self.quiet {
                    eprintln!(
                        "Warning: Failed to clean up old GraphRAG data for {}: {}",
                        relative_path, e
                    );
                }
            }
            // Also drop any stale in-memory node before rebuilding it.
            {
                let mut graph = self.graph.write().await;
                if graph.nodes.remove(&relative_path).is_some() && !self.quiet {
                    eprintln!("🗑️ Removed stale in-memory node: {}", relative_path);
                }
            }
            let file_name = Path::new(&file_path)
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("unknown")
                .to_string();
            let kind = RelationshipDiscovery::determine_file_kind(&relative_path);
            // Language is taken from the first block; assumed uniform per file.
            let language = file_blocks
                .first()
                .map(|b| b.language.clone())
                .unwrap_or_else(|| "unknown".to_string());
            // Aggregate symbols, functions, and an approximate line count
            // (max end_line across blocks) for the whole file.
            let mut all_symbols = HashSet::new();
            let mut all_functions = Vec::new();
            let mut total_lines = 0;
            for block in &file_blocks {
                all_symbols.extend(block.symbols.iter().cloned());
                total_lines = total_lines.max(block.end_line);
                if let Ok(functions) =
                    RelationshipDiscovery::extract_functions_from_block(block)
                {
                    all_functions.extend(functions);
                }
            }
            let symbols: Vec<String> = all_symbols.into_iter().collect();
            // Prefer precise tree-sitter extraction; fall back to the
            // symbol-based heuristic if parsing the file fails.
            let (imports, exports) = self
                .extract_imports_exports_from_file(&file_path, &language)
                .await
                .unwrap_or_else(|e| {
                    if !self.quiet {
                        eprintln!(
                            "⚠️ Import/export extraction failed for {}: {}",
                            relative_path, e
                        );
                    }
                    RelationshipDiscovery::extract_imports_exports_efficient(
                        &symbols,
                        &language,
                        &relative_path,
                    )
                });
            if !self.quiet && (!imports.is_empty() || !exports.is_empty()) {
                eprintln!(
                    "📦 Found {} imports, {} exports in {}",
                    imports.len(),
                    exports.len(),
                    relative_path
                );
                if !imports.is_empty() {
                    eprintln!("   Imports: {:?}", imports);
                }
                if !exports.is_empty() {
                    eprintln!("   Exports: {:?}", exports);
                }
            }
            // Description: when AI is enabled and the file qualifies, queue
            // it for batched LLM description. Either way the node initially
            // gets a simple description; AI results overwrite it later.
            let description = if self.llm_enabled()
                && self.should_use_ai_for_description(&symbols, total_lines as u32, &language)
            {
                if !self.quiet {
                    eprintln!(
                        "🤖 Collecting for AI batch: {} ({} lines, {} symbols)",
                        relative_path,
                        total_lines,
                        symbols.len()
                    );
                }
                let content_sample = self.build_content_sample_for_ai(&file_blocks);
                let file_for_ai = crate::indexer::graphrag::ai::FileForAI {
                    file_id: relative_path.clone(),
                    file_path: file_path.clone(),
                    language: language.clone(),
                    symbols: symbols.clone(),
                    content_sample,
                    // Rough counts from symbol text; heuristic, not parsed.
                    function_count: symbols
                        .iter()
                        .filter(|s| {
                            s.contains("fn ") || s.contains("function ") || s.contains("def ")
                        })
                        .count(),
                    class_count: symbols
                        .iter()
                        .filter(|s| {
                            s.contains("class ")
                                || s.contains("struct ") || s.contains("interface ")
                        })
                        .count(),
                };
                ai_batch_queue.push(file_for_ai);
                // Flush the AI queue once it reaches the configured batch size.
                if ai_batch_queue.len() >= self.config.graphrag.llm.ai_batch_size {
                    if !self.quiet {
                        eprintln!("🚀 Processing AI batch: {} files", ai_batch_queue.len());
                    }
                    if let Some(ref ai_enhancements) = self.ai_enhancements {
                        match ai_enhancements
                            .extract_ai_descriptions_batch(&ai_batch_queue)
                            .await
                        {
                            Ok(batch_descriptions) => {
                                for (file_path, description) in batch_descriptions {
                                    ai_descriptions.insert(file_path, description);
                                }
                                if !self.quiet {
                                    eprintln!(
                                        "✅ AI batch processing completed: {} descriptions",
                                        ai_descriptions.len()
                                    );
                                }
                            }
                            Err(e) => {
                                // Non-fatal: nodes keep their simple descriptions.
                                if !self.quiet {
                                    eprintln!("⚠️ AI batch processing failed: {}", e);
                                }
                            }
                        }
                    }
                    ai_batch_queue.clear();
                    // Apply any descriptions received so far to already-stored
                    // nodes and to nodes still pending persistence.
                    if !ai_descriptions.is_empty() {
                        {
                            let mut graph = self.graph.write().await;
                            for (file_path, ai_description) in &ai_descriptions {
                                if let Some(node) = graph.nodes.get_mut(file_path) {
                                    node.description = ai_description.clone();
                                }
                            }
                        }
                        for node in &mut new_nodes {
                            if let Some(ai_description) = ai_descriptions.get(&node.id) {
                                node.description = ai_description.clone();
                            }
                        }
                    }
                }
                // Placeholder description until the AI result (if any) lands.
                RelationshipDiscovery::generate_simple_description(
                    &file_name,
                    &language,
                    &symbols,
                    total_lines as u32,
                )
            } else {
                if !self.quiet && self.llm_enabled() {
                    eprintln!(
                        "📝 Using simple description for: {} (AI criteria not met)",
                        relative_path
                    );
                }
                RelationshipDiscovery::generate_simple_description(
                    &file_name,
                    &language,
                    &symbols,
                    total_lines as u32,
                )
            };
            // Text embedded to represent this file in similarity search.
            let summary_text =
                format!("{} {} symbols: {}", file_name, language, symbols.join(" "));
            pending_embeddings.push(summary_text);
            let node = CodeNode {
                id: relative_path.clone(),
                name: file_name,
                kind,
                path: relative_path.clone(),
                description,
                symbols,
                imports,
                exports,
                functions: all_functions,
                hash: content_hash,
                // Filled in by `process_nodes_batch` after batch embedding.
                embedding: Vec::new(),
                size_lines: total_lines as u32,
                language,
            };
            new_nodes.push(node);
            processed_count += 1;
            if let Some(ref state) = state {
                let mut state_guard = state.write();
                state_guard.status_message = format!("Processing file: {}", file_path);
            }
            // Persist in batches to bound memory and embedding-request size.
            if self.should_process_batch(&pending_embeddings) {
                self.process_nodes_batch(
                    &mut new_nodes,
                    &mut pending_embeddings,
                    &mut batches_processed,
                    &ai_descriptions,
                )
                .await?;
            }
        }
    }
    // Drain any AI requests left in the queue after the file loop.
    if !ai_batch_queue.is_empty() {
        if !self.quiet {
            eprintln!(
                "🚀 Processing final AI batch: {} files",
                ai_batch_queue.len()
            );
        }
        if let Some(ref ai_enhancements) = self.ai_enhancements {
            match ai_enhancements
                .extract_ai_descriptions_batch(&ai_batch_queue)
                .await
            {
                Ok(batch_descriptions) => {
                    for (file_path, description) in batch_descriptions {
                        ai_descriptions.insert(file_path, description);
                    }
                    if !self.quiet {
                        eprintln!(
                            "✅ Final batch AI processing completed: {} descriptions",
                            ai_descriptions.len()
                        );
                    }
                }
                Err(e) => {
                    if !self.quiet {
                        eprintln!("⚠️ Final batch AI processing failed: {}", e);
                    }
                }
            }
        }
    }
    // Propagate AI descriptions to in-memory nodes and pending nodes.
    if !ai_descriptions.is_empty() {
        if !self.quiet {
            eprintln!(
                "🔄 Updating {} nodes with AI-generated descriptions",
                ai_descriptions.len()
            );
        }
        {
            let mut graph = self.graph.write().await;
            for (file_path, ai_description) in &ai_descriptions {
                if let Some(node) = graph.nodes.get_mut(file_path) {
                    node.description = ai_description.clone();
                    if !self.quiet {
                        eprintln!("✅ Updated description for: {}", file_path);
                    }
                }
            }
        }
        for node in &mut new_nodes {
            if let Some(ai_description) = ai_descriptions.get(&node.id) {
                node.description = ai_description.clone();
                if !self.quiet {
                    eprintln!("✅ Updated pending node description for: {}", node.id);
                }
            }
        }
        if !self.quiet {
            eprintln!("🎉 AI description updates complete!");
        }
    }
    // Final pass over still-pending nodes: apply AI descriptions where
    // available, otherwise regenerate a simple description as a fallback.
    if !ai_descriptions.is_empty() && !new_nodes.is_empty() {
        if !self.quiet {
            eprintln!(
                "🔄 Applying AI descriptions to {} pending nodes before persistence",
                new_nodes.len()
            );
        }
        for node in &mut new_nodes {
            if let Some(ai_description) = ai_descriptions.get(&node.id) {
                node.description = ai_description.clone();
                if !self.quiet {
                    eprintln!("✅ Applied AI description to: {}", node.id);
                }
            } else {
                let file_name = std::path::Path::new(&node.path)
                    .file_stem()
                    .and_then(|s| s.to_str())
                    .unwrap_or("unknown");
                node.description = crate::indexer::graphrag::relationships::RelationshipDiscovery::generate_simple_description(
                    file_name,
                    &node.language,
                    &node.symbols,
                    node.size_lines,
                );
                if !self.quiet {
                    eprintln!("⚠️ Fallback to simple description for: {}", node.id);
                }
            }
        }
    }
    // Persist whatever nodes remain below the batch threshold.
    if !new_nodes.is_empty() {
        self.process_nodes_batch(
            &mut new_nodes,
            &mut pending_embeddings,
            &mut batches_processed,
            &ai_descriptions,
        )
        .await?;
    }
    // Relationship discovery runs once over the full node set.
    if processed_count > 0 {
        {
            let graph = self.graph.read().await;
            if graph.nodes.is_empty() && !self.quiet {
                eprintln!("📊 Loading nodes from database for relationship discovery...");
            }
        }
        // If the in-memory graph is empty (e.g. everything was flushed),
        // reload it from the database before discovering relationships.
        if self.graph.read().await.nodes.is_empty() {
            let db_ops = DatabaseOperations::new(&self.store);
            let loaded_graph = db_ops.load_graph(&self.project_root, true).await?;
            let mut graph = self.graph.write().await;
            *graph = loaded_graph;
        }
        let all_processed_nodes = {
            let graph = self.graph.read().await;
            graph.nodes.values().cloned().collect::<Vec<CodeNode>>()
        };
        if !all_processed_nodes.is_empty() {
            // Relationships are cheaper than embeddings; use a larger batch.
            let relationship_batch_size = self.config.index.embeddings_batch_size * 4;
            let all_relationships = if self.llm_enabled() {
                self.discover_relationships_with_ai_enhancement(&all_processed_nodes)
                    .await?
            } else {
                self.discover_relationships_efficiently(&all_processed_nodes)
                    .await?
            };
            if !all_relationships.is_empty() {
                let mut relationship_batches_processed = 0;
                for relationship_batch in all_relationships.chunks(relationship_batch_size) {
                    {
                        let mut graph = self.graph.write().await;
                        graph
                            .relationships
                            .extend(relationship_batch.iter().cloned());
                    }
                    let db_ops = DatabaseOperations::new(&self.store);
                    db_ops
                        .save_graph_incremental(&[], relationship_batch)
                        .await?;
                    relationship_batches_processed += 1;
                    if relationship_batches_processed >= self.config.index.flush_frequency {
                        self.store.flush().await?;
                        relationship_batches_processed = 0;
                    }
                    // NOTE(review): the counter resets to 0 after each flush,
                    // so this progress message can understate completed
                    // batches — confirm whether that is acceptable for UI.
                    if let Some(ref state) = state {
                        let mut state_guard = state.write();
                        state_guard.status_message = format!(
                            "Processing relationships: {} of {} batches completed",
                            (relationship_batches_processed + 1),
                            all_relationships.len().div_ceil(relationship_batch_size)
                        );
                    }
                }
            }
        }
    }
    // Final flush so all persisted data is durable before reporting.
    self.store.flush().await?;
    if let Some(state) = state {
        let mut state_guard = state.write();
        state_guard.status_message = format!(
            "GraphRAG processing complete: {} files processed ({} skipped)",
            processed_count, skipped_count
        );
        state_guard.graphrag_blocks += processed_count;
    } else if !self.quiet {
        println!(
            "GraphRAG: Processed {} files ({} skipped)",
            processed_count, skipped_count
        );
    }
    Ok(())
}
/// Discovers relationships for `new_files`, preferring the AI-enhanced
/// strategy when LLM support was enabled; otherwise falls back to the
/// heuristic discovery.
async fn discover_relationships_with_ai_enhancement(
    &self,
    new_files: &[CodeNode],
) -> Result<Vec<CodeRelationship>> {
    match self.ai_enhancements {
        Some(ref ai) => {
            // Snapshot every known node so the AI pass can relate the new
            // files to the existing graph without holding the lock.
            let all_nodes: Vec<CodeNode> = {
                let graph = self.graph.read().await;
                graph.nodes.values().cloned().collect()
            };
            ai.discover_relationships_with_ai_enhancement(new_files, &all_nodes)
                .await
        }
        None => self.discover_relationships_efficiently(new_files).await,
    }
}
/// Heuristic (non-LLM) relationship discovery for `new_files` against a
/// snapshot of the full in-memory node set.
async fn discover_relationships_efficiently(
    &self,
    new_files: &[CodeNode],
) -> Result<Vec<CodeRelationship>> {
    // Clone the nodes out so the read lock is released before the
    // (potentially long) discovery pass runs.
    let snapshot: Vec<CodeNode> = {
        let guard = self.graph.read().await;
        guard.nodes.values().cloned().collect()
    };
    RelationshipDiscovery::discover_relationships_efficiently(new_files, &snapshot).await
}
fn should_use_ai_for_description(
&self,
symbols: &[String],
lines: u32,
language: &str,
) -> bool {
if let Some(ref ai) = self.ai_enhancements {
ai.should_use_ai_for_description(symbols, lines, language)
} else {
false
}
}
fn build_content_sample_for_ai(&self, file_blocks: &[&CodeBlock]) -> String {
if let Some(ref ai) = self.ai_enhancements {
ai.build_content_sample_for_ai(file_blocks)
} else {
String::new()
}
}
/// Public entry point; delegates to [`Self::process_files_from_codeblocks`].
pub async fn process_code_blocks(
    &self,
    code_blocks: &[CodeBlock],
    state: Option<SharedState>,
) -> Result<()> {
    self.process_files_from_codeblocks(code_blocks, state).await
}
/// Rebuilds the entire GraphRAG graph from the code blocks already stored
/// in the database, discarding any existing graph data first.
///
/// Clearing failures are logged as warnings and do not abort the rebuild.
///
/// # Errors
/// Fails when code blocks cannot be read from the store or when the
/// processing/flush steps fail.
pub async fn build_from_existing_database(&self, state: Option<SharedState>) -> Result<()> {
    if let Some(ref state) = state {
        let mut state_guard = state.write();
        state_guard.status_message = "Building GraphRAG from existing database...".to_string();
    }
    // Best-effort wipe of persisted nodes; stale rows would otherwise
    // coexist with the rebuilt graph.
    if let Err(e) = self.store.clear_graph_nodes().await {
        if !self.quiet {
            eprintln!("Warning: Failed to clear existing graph nodes: {}", e);
        }
        tracing::warn!(
            error = %e,
            "Failed to clear existing GraphRAG nodes"
        );
    }
    // Same best-effort wipe for persisted relationships.
    if let Err(e) = self.store.clear_graph_relationships().await {
        if !self.quiet {
            eprintln!(
                "Warning: Failed to clear existing graph relationships: {}",
                e
            );
        }
        tracing::warn!(
            error = %e,
            "Failed to clear existing GraphRAG relationships"
        );
    }
    // Reset the in-memory graph so the rebuild starts from scratch.
    {
        let mut graph = self.graph.write().await;
        graph.nodes.clear();
        graph.relationships.clear();
    }
    let all_code_blocks = self.store.get_all_code_blocks_for_graphrag().await?;
    if all_code_blocks.is_empty() {
        if let Some(ref state) = state {
            let mut state_guard = state.write();
            state_guard.status_message =
                "No code blocks found in database for GraphRAG".to_string();
        }
        return Ok(());
    }
    if let Some(ref state) = state {
        let mut state_guard = state.write();
        state_guard.status_message = format!(
            "Processing {} code blocks for GraphRAG...",
            all_code_blocks.len()
        );
    }
    // Reuse the incremental pipeline on the full set of stored blocks.
    self.process_files_from_codeblocks(&all_code_blocks, state.clone())
        .await?;
    self.store.flush().await?;
    if let Some(ref state) = state {
        let mut state_guard = state.write();
        state_guard.status_message = format!(
            "GraphRAG built from existing database: {} blocks processed",
            all_code_blocks.len()
        );
        state_guard.graphrag_blocks += all_code_blocks.len();
    } else if !self.quiet {
        println!(
            "GraphRAG: Built from existing database with {} code blocks",
            all_code_blocks.len()
        );
    }
    Ok(())
}
/// Returns a clone of the current code graph, loading it from the
/// database (and caching it in memory) when nothing is loaded yet.
///
/// # Errors
/// Fails when the database load fails.
pub async fn get_graph(&self) -> Result<CodeGraph> {
    let graph = self.graph.read().await;
    if graph.nodes.is_empty() && graph.relationships.is_empty() {
        drop(graph);
        let db_ops = DatabaseOperations::new(&self.store);
        // Consistency fix: load for this builder's project root instead of
        // `Path::new(".")` — the hard-coded cwd silently diverged from the
        // root used by the constructor and the indexing path, returning an
        // empty/wrong graph whenever the process cwd != project root.
        let loaded_graph = db_ops.load_graph(&self.project_root, self.quiet).await?;
        {
            let mut graph_write = self.graph.write().await;
            *graph_write = loaded_graph.clone();
        }
        Ok(loaded_graph)
    } else {
        Ok(graph.clone())
    }
}
/// Searches graph nodes for `query`, preferring the in-memory graph and
/// falling back to the database when no nodes are loaded.
///
/// # Errors
/// Propagates embedding-generation and database errors.
pub async fn search_nodes(&self, query: &str) -> Result<Vec<CodeNode>> {
    // Hold the read lock only long enough to check for loaded nodes.
    let have_in_memory = {
        let graph = self.graph.read().await;
        !graph.nodes.is_empty()
    };
    // Idiom fix: the tail `if` expression replaces the previous
    // `return ...; else return ...;` (clippy::needless_return).
    if have_in_memory {
        self.search_nodes_in_memory(query).await
    } else {
        self.search_nodes_in_database(query).await
    }
}
/// Hybrid search over the in-memory graph: embedding cosine similarity
/// combined with case-insensitive substring matches on name, kind,
/// description, and symbols. Results are sorted by score, descending.
async fn search_nodes_in_memory(&self, query: &str) -> Result<Vec<CodeNode>> {
    let query_embedding = self.generate_embedding(query).await?;
    // Clone candidates out so the lock is not held during scoring.
    let candidates: Vec<CodeNode> = {
        let graph = self.graph.read().await;
        graph.nodes.values().cloned().collect()
    };
    let needle = query.to_lowercase();
    let mut scored: Vec<(f32, CodeNode)> = candidates
        .into_iter()
        .filter_map(|node| {
            let score = cosine_similarity(&query_embedding, &node.embedding);
            let name_hit = node.name.to_lowercase().contains(&needle);
            let kind_hit = node.kind.to_lowercase().contains(&needle);
            let desc_hit = node.description.to_lowercase().contains(&needle);
            let symbol_hit = node
                .symbols
                .iter()
                .any(|s| s.to_lowercase().contains(&needle));
            if score > 0.5 || name_hit || kind_hit || desc_hit || symbol_hit {
                // Exact-text hits on name/kind/symbols get a 0.9 score floor
                // so they rank above pure-embedding matches.
                let final_score = if name_hit || kind_hit || symbol_hit {
                    0.9_f32.max(score)
                } else {
                    score
                };
                Some((final_score, node))
            } else {
                None
            }
        })
        .collect();
    scored.sort_by(|a, b| b.0.partial_cmp(&a.0).unwrap_or(std::cmp::Ordering::Equal));
    Ok(scored.into_iter().map(|(_, node)| node).collect())
}
/// Embeds `query` and delegates the search to the database layer.
async fn search_nodes_in_database(&self, query: &str) -> Result<Vec<CodeNode>> {
    let query_embedding = self.generate_embedding(query).await?;
    let db_ops = DatabaseOperations::new(&self.store);
    db_ops
        .search_nodes_in_database(&query_embedding, query)
        .await
}
/// Finds all simple (cycle-free) paths from `source_id` to `target_id`
/// following relationship edges, bounded by `max_depth` nodes per path.
/// Returns an empty list when either endpoint is unknown.
pub async fn find_paths(
    &self,
    source_id: &str,
    target_id: &str,
    max_depth: usize,
) -> Result<Vec<Vec<String>>> {
    use std::collections::VecDeque;
    let graph = self.graph.read().await;
    if !graph.nodes.contains_key(source_id) || !graph.nodes.contains_key(target_id) {
        return Ok(Vec::new());
    }
    // Forward adjacency list built from the relationship edges.
    let mut adjacency_list: HashMap<String, Vec<String>> = HashMap::new();
    for rel in &graph.relationships {
        adjacency_list
            .entry(rel.source.clone())
            .or_default()
            .push(rel.target.clone());
    }
    // Fix: the previous code used `Vec::push`/`Vec::pop` on a variable
    // named `queue`, which is LIFO — a stack — so the traversal was DFS
    // despite the BFS naming. A `VecDeque` with `pop_front` gives genuine
    // FIFO order, so shorter paths are emitted first. The set of returned
    // paths is unchanged.
    let mut queue = VecDeque::new();
    queue.push_back(vec![source_id.to_string()]);
    let mut paths = Vec::new();
    while let Some(path) = queue.pop_front() {
        let current = path.last().expect("queued paths are never empty");
        if current == target_id {
            paths.push(path);
            continue;
        }
        // Depth bound counts nodes on the path, not edges.
        if path.len() > max_depth {
            continue;
        }
        if let Some(neighbors) = adjacency_list.get(current) {
            for neighbor in neighbors {
                // Skip nodes already on the path to keep paths simple.
                if !path.contains(neighbor) {
                    let mut new_path = path.clone();
                    new_path.push(neighbor.clone());
                    queue.push_back(new_path);
                }
            }
        }
    }
    Ok(paths)
}
/// Decides whether the pending embedding texts should be flushed now:
/// either the configured batch size is reached or the rough token
/// estimate (~4 characters per token) exceeds the per-batch cap.
fn should_process_batch(&self, pending_embeddings: &[String]) -> bool {
    if pending_embeddings.len() >= self.config.index.embeddings_batch_size {
        return true;
    }
    let estimated_tokens: usize = pending_embeddings
        .iter()
        .map(|text| text.len() / 4)
        .sum();
    estimated_tokens >= self.config.index.embeddings_max_tokens_per_batch
}
/// Embeds and persists one batch of nodes: finalizes descriptions, runs
/// batch embedding over `pending_embeddings`, merges the nodes into the
/// in-memory graph, saves them incrementally, then clears both buffers.
///
/// `nodes` and `pending_embeddings` are expected to be parallel (one
/// summary text per node).
///
/// # Errors
/// Fails when embedding generation or the incremental save fails.
async fn process_nodes_batch(
    &self,
    nodes: &mut Vec<CodeNode>,
    pending_embeddings: &mut Vec<String>,
    batches_processed: &mut usize,
    ai_descriptions: &HashMap<String, String>,
) -> Result<()> {
    if nodes.is_empty() || pending_embeddings.is_empty() {
        return Ok(());
    }
    // Resolve descriptions before persisting: prefer AI output, and
    // replace any leftover "AI_PENDING:" placeholder with a simple one.
    for node in nodes.iter_mut() {
        if let Some(ai_description) = ai_descriptions.get(&node.id) {
            node.description = ai_description.clone();
        } else if node.description.starts_with("AI_PENDING:") {
            let file_name = std::path::Path::new(&node.path)
                .file_stem()
                .and_then(|s| s.to_str())
                .unwrap_or("unknown");
            node.description = crate::indexer::graphrag::relationships::RelationshipDiscovery::generate_simple_description(
                file_name,
                &node.language,
                &node.symbols,
                node.size_lines,
            );
        }
    }
    let embeddings = crate::embedding::generate_embeddings_batch(
        pending_embeddings.clone(),
        false,
        &self.config,
        crate::embedding::types::InputType::Document,
    )
    .await?;
    // NOTE(review): `zip` silently stops at the shorter side — if the
    // provider returns fewer embeddings than nodes, the tail keeps empty
    // embeddings. Confirm the provider guarantees a 1:1 response.
    for (node, embedding) in nodes.iter_mut().zip(embeddings.iter()) {
        node.embedding = embedding.clone();
    }
    // Merge into the in-memory graph; only overwrite an existing node when
    // its content hash actually changed.
    {
        let mut graph = self.graph.write().await;
        for node in nodes.iter() {
            if let Some(existing_node) = graph.nodes.get(&node.id) {
                // NOTE(review): this message prints even when the node is
                // subsequently updated below — it reads as if insertion was
                // always prevented; consider whether that wording is intended.
                if !self.quiet {
                    eprintln!("⚠️  Preventing duplicate node insertion: {} (existing hash: {}, new hash: {})",
                        node.id, existing_node.hash, node.hash);
                }
                if existing_node.hash != node.hash {
                    graph.nodes.insert(node.id.clone(), node.clone());
                    if !self.quiet {
                        eprintln!("🔄 Updated node with new content: {}", node.id);
                    }
                }
            } else {
                graph.nodes.insert(node.id.clone(), node.clone());
                if !self.quiet {
                    eprintln!("➕ Added new node: {}", node.id);
                }
            }
        }
    }
    let db_ops = DatabaseOperations::new(&self.store);
    db_ops.save_graph_incremental(nodes, &[]).await?;
    // Buffers are consumed by this batch; reset them for the caller.
    nodes.clear();
    pending_embeddings.clear();
    *batches_processed += 1;
    self.flush_if_needed(batches_processed).await?;
    Ok(())
}
/// Flushes the store once `flush_frequency` batches have accumulated,
/// resetting the counter afterwards; otherwise does nothing.
async fn flush_if_needed(&self, batches_processed: &mut usize) -> Result<()> {
    let flush_due = *batches_processed >= self.config.index.flush_frequency;
    if flush_due {
        self.store.flush().await?;
        *batches_processed = 0;
    }
    Ok(())
}
pub async fn extract_imports_exports_from_file(
&self,
file_path: &str,
language: &str,
) -> Result<(Vec<String>, Vec<String>)> {
use crate::indexer::languages;
use std::fs;
use tree_sitter::Parser;
let lang_impl = languages::get_language(language).ok_or_else(|| {
anyhow::anyhow!("Failed to get language implementation for: {}", language)
})?;
let contents = fs::read_to_string(file_path)?;
let mut parser = Parser::new();
parser.set_language(&lang_impl.get_ts_language())?;
let tree = parser
.parse(&contents, None)
.ok_or_else(|| anyhow::anyhow!("Failed to parse file"))?;
let mut all_imports = Vec::new();
let mut all_exports = Vec::new();
let cursor = tree.walk();
extract_imports_exports_recursive(
cursor.node(),
&contents,
lang_impl.as_ref(),
&mut all_imports,
&mut all_exports,
);
Ok((all_imports, all_exports))
}
}
/// Depth-first walk that accumulates the imports/exports reported by
/// `lang_impl` for `node` and every descendant into the output vectors.
fn extract_imports_exports_recursive(
    node: tree_sitter::Node,
    contents: &str,
    lang_impl: &dyn crate::indexer::languages::Language,
    all_imports: &mut Vec<String>,
    all_exports: &mut Vec<String>,
) {
    // Collect this node's own contribution first.
    let (imports, exports) = lang_impl.extract_imports_exports(node, contents);
    all_imports.extend(imports);
    all_exports.extend(exports);
    // Then recurse into each child in document order.
    let mut walker = node.walk();
    for child in node.children(&mut walker) {
        extract_imports_exports_recursive(child, contents, lang_impl, all_imports, all_exports);
    }
}