codeprism_core/indexer/mod.rs

//! Bulk indexing engine for parallel file processing and graph building
//!
//! This module provides functionality to process large numbers of discovered files
//! in parallel, parse them, and build the code graph efficiently.
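//!
//! A minimal usage sketch (illustrative only, inside an async context; it assumes
//! a `LanguageRegistry` populated with the parsers you need and a `ScanResult`
//! produced by the scanner):
//!
//! ```ignore
//! use std::sync::Arc;
//!
//! let registry = Arc::new(LanguageRegistry::new());
//! let parser_engine = Arc::new(ParserEngine::new(registry));
//!
//! let config = IndexingConfig::new("my_repo".to_string(), "commit_sha".to_string());
//! let indexer = BulkIndexer::new(config, parser_engine);
//!
//! let progress = Arc::new(IndexingProgressReporter::new(true));
//! let result = indexer.index_scan_result(&scan_result, progress).await?;
//! println!(
//!     "indexed {} files into {} nodes and {} edges",
//!     result.stats.files_processed,
//!     result.stats.nodes_created,
//!     result.stats.edges_created
//! );
//! ```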

use crate::ast::Edge;
use crate::error::{Error, Result};
use crate::graph::GraphStore;
use crate::linkers::SymbolResolver;
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use crate::scanner::{DiscoveredFile, ProgressReporter, ScanResult};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Indexing statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingStats {
    /// Total files processed
    pub files_processed: usize,
    /// Total nodes created
    pub nodes_created: usize,
    /// Total edges created
    pub edges_created: usize,
    /// Processing duration in milliseconds
    pub duration_ms: u64,
    /// Files processed per second
    pub throughput: f64,
    /// Errors encountered
    pub error_count: usize,
    /// Memory usage stats
    pub memory_stats: MemoryStats,
}

/// Memory usage statistics
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryStats {
    /// Peak memory usage in bytes
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes
    pub current_memory_bytes: usize,
    /// Graph storage overhead
    pub graph_overhead_bytes: usize,
}

/// Bulk indexing result
#[derive(Debug)]
pub struct IndexingResult {
    /// Repository ID
    pub repo_id: String,
    /// All patches created during indexing
    pub patches: Vec<AstPatch>,
    /// Indexing statistics
    pub stats: IndexingStats,
    /// Files that failed to process
    pub failed_files: Vec<(PathBuf, Error)>,
}

impl IndexingResult {
    /// Create a new indexing result
    pub fn new(repo_id: String) -> Self {
        Self {
            repo_id,
            patches: Vec::new(),
            stats: IndexingStats {
                files_processed: 0,
                nodes_created: 0,
                edges_created: 0,
                duration_ms: 0,
                throughput: 0.0,
                error_count: 0,
                memory_stats: MemoryStats::default(),
            },
            failed_files: Vec::new(),
        }
    }

    /// Get total number of patches
    pub fn patch_count(&self) -> usize {
        self.patches.len()
    }

    /// Get total operations across all patches
    pub fn total_operations(&self) -> usize {
        self.patches.iter().map(|p| p.operation_count()).sum()
    }

    /// Merge another indexing result into this one
    pub fn merge(&mut self, other: IndexingResult) {
        self.patches.extend(other.patches);
        self.stats.files_processed += other.stats.files_processed;
        self.stats.nodes_created += other.stats.nodes_created;
        self.stats.edges_created += other.stats.edges_created;
        self.stats.error_count += other.stats.error_count;
        self.failed_files.extend(other.failed_files);
    }
}

/// Configuration for bulk indexing
#[derive(Debug, Clone)]
pub struct IndexingConfig {
    /// Repository ID
    pub repo_id: String,
    /// Commit SHA for patches
    pub commit_sha: String,
    /// Maximum parallel workers
    pub max_parallel: usize,
    /// Batch size for processing
    pub batch_size: usize,
    /// Whether to continue on errors
    pub continue_on_error: bool,
    /// Memory limit in bytes (None = no limit)
    pub memory_limit: Option<usize>,
    /// Whether to enable cross-file linking
    pub enable_cross_file_linking: bool,
}

impl IndexingConfig {
    /// Create a new indexing config
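    ///
    /// The defaults can be tuned through the public fields; an illustrative
    /// sketch for a memory-constrained run (the values are arbitrary examples):
    ///
    /// ```ignore
    /// let mut config = IndexingConfig::new("repo".to_string(), "sha".to_string());
    /// config.max_parallel = 2;                       // fewer worker threads
    /// config.batch_size = 10;                        // smaller batches
    /// config.memory_limit = Some(512 * 1024 * 1024); // 512 MB cap
    /// config.enable_cross_file_linking = false;      // skip the linking pass
    /// ```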
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            max_parallel: num_cpus::get(),
            batch_size: 30, // Reduced from 50 to 30 for better memory management
            continue_on_error: true,
            memory_limit: Some(4 * 1024 * 1024 * 1024), // 4 GB default limit (raised from 1 GB)
            enable_cross_file_linking: true,
        }
    }
}

/// Bulk indexing engine for processing discovered files in parallel
pub struct BulkIndexer {
    config: IndexingConfig,
    parser_engine: Arc<ParserEngine>,
}

impl BulkIndexer {
    /// Create a new bulk indexer
    pub fn new(config: IndexingConfig, parser_engine: Arc<ParserEngine>) -> Self {
        Self {
            config,
            parser_engine,
        }
    }

    /// Index all files from a scan result
    pub async fn index_scan_result(
        &self,
        scan_result: &ScanResult,
        progress_reporter: Arc<dyn ProgressReporter>,
    ) -> Result<IndexingResult> {
        let start_time = Instant::now();
        let all_files = scan_result.all_files();

        progress_reporter.report_progress(0, Some(all_files.len()));

        let mut indexing_result = IndexingResult::new(self.config.repo_id.clone());
        let processed_counter = Arc::new(AtomicUsize::new(0));
        let error_counter = Arc::new(AtomicUsize::new(0));

        // For very large repositories, use streaming mode
        let use_streaming = all_files.len() > 10000
            || self
                .config
                .memory_limit
                .is_some_and(|limit| limit < 2 * 1024 * 1024 * 1024); // < 2GB

        if use_streaming {
            tracing::info!(
                "Using streaming mode for large repository ({} files)",
                all_files.len()
            );
            return self
                .index_scan_result_streaming(scan_result, progress_reporter)
                .await;
        }

        // Process files in batches
        for batch in all_files.chunks(self.config.batch_size) {
            let batch_result = self
                .process_batch(
                    batch,
                    &processed_counter,
                    &error_counter,
                    &progress_reporter,
                    all_files.len(),
                )
                .await?;

            indexing_result.merge(batch_result);

            // Check memory limit
            if let Some(limit) = self.config.memory_limit {
                let current_memory = self.estimate_memory_usage(&indexing_result);
                if current_memory > limit {
                    return Err(Error::indexing(
                        "Memory limit exceeded during bulk indexing",
                    ));
                }
            }
        }

        // After all files are processed, perform cross-file symbol resolution
        if self.config.enable_cross_file_linking {
            tracing::info!("Starting cross-file symbol resolution...");
            let linking_start = Instant::now();

            let cross_file_edges = self.resolve_cross_file_symbols(&indexing_result)?;

            if !cross_file_edges.is_empty() {
                // Create a patch with the new cross-file edges
                let cross_file_patch =
                    PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
                        .add_edges(cross_file_edges.clone())
                        .build();

                indexing_result.patches.push(cross_file_patch);
                indexing_result.stats.edges_created += cross_file_edges.len();

                tracing::info!(
                    "Cross-file symbol resolution completed: {} edges created in {}ms",
                    cross_file_edges.len(),
                    linking_start.elapsed().as_millis()
                );
            }
        }

        // Finalize statistics
        indexing_result.stats.duration_ms = start_time.elapsed().as_millis() as u64;
        indexing_result.stats.throughput = if indexing_result.stats.duration_ms > 0 {
            (indexing_result.stats.files_processed as f64 * 1000.0)
                / indexing_result.stats.duration_ms as f64
        } else {
            0.0
        };

        progress_reporter.report_progress(all_files.len(), Some(all_files.len()));
        Ok(indexing_result)
    }

    /// Index scan result using streaming mode for large repositories
    async fn index_scan_result_streaming(
        &self,
        scan_result: &ScanResult,
        progress_reporter: Arc<dyn ProgressReporter>,
    ) -> Result<IndexingResult> {
        let start_time = Instant::now();
        let all_files = scan_result.all_files();

        progress_reporter.report_progress(0, Some(all_files.len()));

        let mut final_result = IndexingResult::new(self.config.repo_id.clone());
        let processed_counter = Arc::new(AtomicUsize::new(0));
        let error_counter = Arc::new(AtomicUsize::new(0));

        // Use smaller batch size for streaming mode
        let streaming_batch_size = std::cmp::min(self.config.batch_size, 20);
        let mut batch_count = 0;

        // Process files in smaller batches and clear intermediate results
        for batch in all_files.chunks(streaming_batch_size) {
            let batch_result = self
                .process_batch(
                    batch,
                    &processed_counter,
                    &error_counter,
                    &progress_reporter,
                    all_files.len(),
                )
                .await?;

            // Update statistics but don't accumulate all patches
            final_result.stats.files_processed += batch_result.stats.files_processed;
            final_result.stats.nodes_created += batch_result.stats.nodes_created;
            final_result.stats.edges_created += batch_result.stats.edges_created;
            final_result.stats.error_count += batch_result.stats.error_count;
            final_result.failed_files.extend(batch_result.failed_files);

            // Only keep a limited number of recent patches to avoid memory exhaustion
            let max_patches_in_memory = 100;
            if final_result.patches.len() + batch_result.patches.len() > max_patches_in_memory {
                // Keep only the most recent patches
                let keep_count = max_patches_in_memory / 2;
                if final_result.patches.len() > keep_count {
                    final_result
                        .patches
                        .drain(0..final_result.patches.len() - keep_count);
                }
                tracing::debug!(
                    "Cleared old patches to manage memory, keeping {} recent patches",
                    keep_count
                );
            }

            final_result.patches.extend(batch_result.patches);

            // Check memory limit more frequently in streaming mode
            if let Some(limit) = self.config.memory_limit {
                let current_memory = self.estimate_memory_usage(&final_result);
                if current_memory > limit {
                    tracing::warn!(
                        "Memory limit reached in streaming mode, clearing intermediate results"
                    );
                    // Clear old patches but keep statistics
                    final_result.patches.clear();
                }
            }

            batch_count += 1;
            if batch_count % 10 == 0 {
                tracing::debug!("Processed {} batches in streaming mode", batch_count);
            }
        }

        // Finalize statistics
        final_result.stats.duration_ms = start_time.elapsed().as_millis() as u64;
        final_result.stats.throughput = if final_result.stats.duration_ms > 0 {
            (final_result.stats.files_processed as f64 * 1000.0)
                / final_result.stats.duration_ms as f64
        } else {
            0.0
        };

        progress_reporter.report_progress(all_files.len(), Some(all_files.len()));
        tracing::info!(
            "Streaming indexing completed: {} files, {} nodes, {} edges",
            final_result.stats.files_processed,
            final_result.stats.nodes_created,
            final_result.stats.edges_created
        );

        Ok(final_result)
    }

    /// Process a batch of files in parallel
    async fn process_batch(
        &self,
        batch: &[&DiscoveredFile],
        processed_counter: &Arc<AtomicUsize>,
        error_counter: &Arc<AtomicUsize>,
        progress_reporter: &Arc<dyn ProgressReporter>,
        total_files: usize,
    ) -> Result<IndexingResult> {
        let mut batch_result = IndexingResult::new(self.config.repo_id.clone());

        // Process files in parallel
        let results: Vec<_> = batch
            .par_iter()
            .map(|discovered_file| {
                let processed = processed_counter.fetch_add(1, Ordering::Relaxed) + 1;

                // Report progress periodically
                if processed % 10 == 0 {
                    progress_reporter.report_progress(processed, Some(total_files));
                }

                self.process_single_file(discovered_file)
            })
            .collect();

        // Collect results
        for result in results {
            match result {
                Ok(Some(patch)) => {
                    batch_result.stats.files_processed += 1;
                    batch_result.stats.nodes_created += patch.nodes_add.len();
                    batch_result.stats.edges_created += patch.edges_add.len();
                    batch_result.patches.push(patch);
                }
                Ok(None) => {
                    // File was skipped (e.g., empty, parse failed gracefully)
                    batch_result.stats.files_processed += 1;
                }
                Err(e) => {
                    error_counter.fetch_add(1, Ordering::Relaxed);
                    batch_result.stats.error_count += 1;

                    if !self.config.continue_on_error {
                        return Err(e);
                    }

                    progress_reporter.report_error(&e);
                }
            }
        }

        Ok(batch_result)
    }

    /// Process a single discovered file
    fn process_single_file(&self, discovered_file: &DiscoveredFile) -> Result<Option<AstPatch>> {
        // Read file content
        let content = std::fs::read_to_string(&discovered_file.path).map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                discovered_file.path.display(),
                e
            ))
        })?;

        // Skip empty files
        if content.trim().is_empty() {
            return Ok(None);
        }

        // Create parse context
        let context = ParseContext::new(
            self.config.repo_id.clone(),
            discovered_file.path.clone(),
            content,
        );

        // Parse the file
        let parse_result = self.parser_engine.parse_file(context)?;

        // Create patch from parse result
        let mut patch_builder =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone());

        // Add all nodes
        patch_builder = patch_builder.add_nodes(parse_result.nodes);

        // Add all edges
        patch_builder = patch_builder.add_edges(parse_result.edges);

        let patch = patch_builder.build();

        // Only return patch if it has content
        if patch.is_empty() {
            Ok(None)
        } else {
            Ok(Some(patch))
        }
    }

    /// Estimate memory usage of the indexing result
    fn estimate_memory_usage(&self, result: &IndexingResult) -> usize {
        let mut total = 0;

        // Estimate patch memory usage
        for patch in &result.patches {
            // Rough estimation: each node ~200 bytes, each edge ~50 bytes
            total += patch.nodes_add.len() * 200;
            total += patch.edges_add.len() * 50;
            total += patch.nodes_delete.len() * 50; // String IDs
            total += patch.edges_delete.len() * 50;
        }

        // Add overhead for data structures
        total += result.patches.len() * 100; // Patch overhead
        total += result.failed_files.len() * 200; // Error storage

        total
    }

    /// Perform cross-file symbol resolution
    fn resolve_cross_file_symbols(&self, indexing_result: &IndexingResult) -> Result<Vec<Edge>> {
        // Build a temporary graph store with all the nodes and edges from patches
        let temp_graph = Arc::new(GraphStore::new());

        // Add all nodes and edges from patches to the temporary graph
        for patch in &indexing_result.patches {
            for node in &patch.nodes_add {
                temp_graph.add_node(node.clone());
            }
            for edge in &patch.edges_add {
                temp_graph.add_edge(edge.clone());
            }
        }

        // Create symbol resolver and resolve cross-file relationships
        let mut resolver = SymbolResolver::new(temp_graph);
        resolver.resolve_all()
    }

    /// Get indexing configuration
    pub fn config(&self) -> &IndexingConfig {
        &self.config
    }
}

/// Indexing progress reporter that tracks detailed statistics
#[derive(Debug)]
pub struct IndexingProgressReporter {
    verbose: bool,
    last_report: std::sync::Mutex<Instant>,
}

impl IndexingProgressReporter {
    /// Create a new indexing progress reporter
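    ///
    /// Typically wrapped in an `Arc` so it can be shared with the indexer; an
    /// illustrative sketch (`indexer` and `scan_result` stand in for values
    /// built elsewhere):
    ///
    /// ```ignore
    /// let reporter: Arc<dyn ProgressReporter> = Arc::new(IndexingProgressReporter::new(true));
    /// let result = indexer.index_scan_result(&scan_result, reporter).await?;
    /// ```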
    pub fn new(verbose: bool) -> Self {
        Self {
            verbose,
            last_report: std::sync::Mutex::new(Instant::now()),
        }
    }
}

impl ProgressReporter for IndexingProgressReporter {
    fn report_progress(&self, current: usize, total: Option<usize>) {
        if let Ok(mut last_report) = self.last_report.try_lock() {
            let now = Instant::now();

            // Rate limit progress reports to avoid spam
            if now.duration_since(*last_report).as_millis() > 500 {
                match total {
                    Some(total) => {
                        let percent = (current as f64 / total as f64) * 100.0;
                        println!(
                            "Indexing progress: {}/{} files ({:.1}%)",
                            current, total, percent
                        );
                    }
                    None => {
                        println!("Indexing progress: {} files processed", current);
                    }
                }
                *last_report = now;
            }
        }
    }

    fn report_complete(&self, _result: &crate::scanner::ScanResult) {
        println!("Indexing complete!");
    }

    fn report_error(&self, error: &Error) {
        if self.verbose {
            eprintln!("Indexing error: {}", error);
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::Language;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use tempfile::TempDir;

    fn create_test_indexer() -> (BulkIndexer, TempDir) {
        let temp_dir = TempDir::new().unwrap();

        let config = IndexingConfig::new("test_repo".to_string(), "abc123".to_string());

        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let indexer = BulkIndexer::new(config, parser_engine);

        (indexer, temp_dir)
    }

    fn create_test_discovered_file(path: PathBuf, language: Language) -> DiscoveredFile {
        DiscoveredFile {
            path,
            language,
            size: 100,
        }
    }

    #[test]
    fn test_indexing_config() {
        let config = IndexingConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.max_parallel > 0);
        assert!(config.continue_on_error);
    }

    #[test]
    fn test_indexing_result() {
        let mut result = IndexingResult::new("test_repo".to_string());
        assert_eq!(result.repo_id, "test_repo");
        assert_eq!(result.patch_count(), 0);
        assert_eq!(result.total_operations(), 0);

        // Test merge
        let other = IndexingResult::new("test_repo".to_string());
        result.merge(other);
        assert_eq!(result.stats.files_processed, 0);
    }

    #[tokio::test]
    async fn test_process_single_file() {
        let (indexer, temp_dir) = create_test_indexer();

        // Create a test file
        let test_file = temp_dir.path().join("test.js");
        std::fs::write(&test_file, "console.log('hello');").unwrap();

        let discovered_file = create_test_discovered_file(test_file, Language::JavaScript);

        // This will fail because we don't have a JavaScript parser registered,
        // but it exercises the file reading logic
        let result = indexer.process_single_file(&discovered_file);

        // Should return an error because no JS parser is registered
        assert!(result.is_err());
    }

    #[test]
    fn test_memory_estimation() {
        let (indexer, _temp_dir) = create_test_indexer();
        let result = IndexingResult::new("test".to_string());

        let _memory = indexer.estimate_memory_usage(&result);
        // The estimate is a usize and therefore non-negative; this just
        // exercises the estimation path and checks it does not panic.
    }

    #[test]
    fn test_indexing_stats() {
        let stats = IndexingStats {
            files_processed: 100,
            nodes_created: 500,
            edges_created: 300,
            duration_ms: 1000,
            throughput: 100.0,
            error_count: 2,
            memory_stats: MemoryStats::default(),
        };

        assert_eq!(stats.files_processed, 100);
        assert_eq!(stats.throughput, 100.0);
    }

    #[test]
    fn test_progress_reporter() {
        let reporter = IndexingProgressReporter::new(true);

        // These should not panic
        reporter.report_progress(50, Some(100));
        reporter.report_progress(100, None);

        let error = Error::indexing("test error");
        reporter.report_error(&error);
    }
}