codeprism_core/indexer/mod.rs

//! Bulk indexing engine for parallel file processing and graph building
//!
//! This module provides functionality to process large numbers of discovered files
//! in parallel, parse them, and build the code graph efficiently.

use crate::ast::Edge;
use crate::error::{Error, Result};
use crate::graph::GraphStore;
use crate::linkers::SymbolResolver;
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use crate::scanner::{DiscoveredFile, ProgressReporter, ScanResult};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Indexing statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingStats {
    /// Total files processed
    pub files_processed: usize,
    /// Total nodes created
    pub nodes_created: usize,
    /// Total edges created
    pub edges_created: usize,
    /// Processing duration in milliseconds
    pub duration_ms: u64,
    /// Files processed per second
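    /// (derived at the end of indexing as `files_processed * 1000 / duration_ms`)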
    pub throughput: f64,
    /// Errors encountered
    pub error_count: usize,
    /// Memory usage stats
    pub memory_stats: MemoryStats,
}

/// Memory usage statistics
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryStats {
    /// Peak memory usage in bytes
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes
    pub current_memory_bytes: usize,
    /// Graph storage overhead
    pub graph_overhead_bytes: usize,
}

/// Bulk indexing result
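///
/// A minimal usage sketch (not compiled as a doc test; the `codeprism_core`
/// crate path is an assumption based on this module's location):
///
/// ```ignore
/// use codeprism_core::indexer::IndexingResult;
///
/// let mut result = IndexingResult::new("my_repo".to_string());
/// // Results produced by other workers can be folded into one summary.
/// let other = IndexingResult::new("my_repo".to_string());
/// result.merge(other);
/// assert_eq!(result.patch_count(), 0);
/// ```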
#[derive(Debug)]
pub struct IndexingResult {
    /// Repository ID
    pub repo_id: String,
    /// All patches created during indexing
    pub patches: Vec<AstPatch>,
    /// Indexing statistics
    pub stats: IndexingStats,
    /// Files that failed to process
    pub failed_files: Vec<(PathBuf, Error)>,
}

impl IndexingResult {
    /// Create a new indexing result
    pub fn new(repo_id: String) -> Self {
        Self {
            repo_id,
            patches: Vec::new(),
            stats: IndexingStats {
                files_processed: 0,
                nodes_created: 0,
                edges_created: 0,
                duration_ms: 0,
                throughput: 0.0,
                error_count: 0,
                memory_stats: MemoryStats::default(),
            },
            failed_files: Vec::new(),
        }
    }

    /// Get total number of patches
    pub fn patch_count(&self) -> usize {
        self.patches.len()
    }

    /// Get total operations across all patches
    pub fn total_operations(&self) -> usize {
        self.patches.iter().map(|p| p.operation_count()).sum()
    }

    /// Merge another indexing result into this one
    pub fn merge(&mut self, other: IndexingResult) {
        self.patches.extend(other.patches);
        self.stats.files_processed += other.stats.files_processed;
        self.stats.nodes_created += other.stats.nodes_created;
        self.stats.edges_created += other.stats.edges_created;
        self.stats.error_count += other.stats.error_count;
        self.failed_files.extend(other.failed_files);
    }
}

/// Configuration for bulk indexing
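///
/// Defaults come from [`IndexingConfig::new`]; individual fields can be
/// overridden afterwards. A minimal sketch (not compiled as a doc test;
/// the crate path is an assumption):
///
/// ```ignore
/// use codeprism_core::indexer::IndexingConfig;
///
/// let mut config = IndexingConfig::new("my_repo".to_string(), "deadbeef".to_string());
/// config.max_parallel = 4;                       // cap worker threads
/// config.memory_limit = Some(512 * 1024 * 1024); // 512 MiB (< 2 GiB, so streaming mode is used)
/// config.enable_cross_file_linking = false;      // skip the cross-file linking pass
/// ```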
#[derive(Debug, Clone)]
pub struct IndexingConfig {
    /// Repository ID
    pub repo_id: String,
    /// Commit SHA for patches
    pub commit_sha: String,
    /// Maximum parallel workers
    pub max_parallel: usize,
    /// Batch size for processing
    pub batch_size: usize,
    /// Whether to continue on errors
    pub continue_on_error: bool,
    /// Memory limit in bytes (None = no limit)
    pub memory_limit: Option<usize>,
    /// Whether to enable cross-file linking
    pub enable_cross_file_linking: bool,
}
impl IndexingConfig {
    /// Create a new indexing config
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            max_parallel: num_cpus::get(),
            batch_size: 30, // Reduced from 50 to 30 for better memory management
            continue_on_error: true,
            memory_limit: Some(4 * 1024 * 1024 * 1024), // 4 GiB default memory limit
            enable_cross_file_linking: true,
        }
    }
}

/// Bulk indexing engine for processing discovered files in parallel
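///
/// Typical usage: discover files with the scanner, then hand the result to the
/// indexer. A minimal sketch (not compiled as a doc test; the crate path and the
/// availability of a `scan_result` from the scanner module are assumptions):
///
/// ```ignore
/// use std::sync::Arc;
/// use codeprism_core::indexer::{BulkIndexer, IndexingConfig, IndexingProgressReporter};
/// use codeprism_core::parser::{LanguageRegistry, ParserEngine};
///
/// let config = IndexingConfig::new("my_repo".to_string(), "deadbeef".to_string());
/// let registry = Arc::new(LanguageRegistry::new());
/// let parser_engine = Arc::new(ParserEngine::new(registry));
/// let indexer = BulkIndexer::new(config, parser_engine);
///
/// // `scan_result` comes from the scanner module (not shown here).
/// let reporter = Arc::new(IndexingProgressReporter::new(true));
/// let result = indexer.index_scan_result(&scan_result, reporter).await?;
/// println!("created {} patches", result.patch_count());
/// ```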
pub struct BulkIndexer {
    config: IndexingConfig,
    parser_engine: Arc<ParserEngine>,
}

impl BulkIndexer {
    /// Create a new bulk indexer
    pub fn new(config: IndexingConfig, parser_engine: Arc<ParserEngine>) -> Self {
        Self {
            config,
            parser_engine,
        }
    }

    /// Index all files from a scan result
    pub async fn index_scan_result(
        &self,
        scan_result: &ScanResult,
        progress_reporter: Arc<dyn ProgressReporter>,
    ) -> Result<IndexingResult> {
        let start_time = Instant::now();
        let all_files = scan_result.all_files();

        progress_reporter.report_progress(0, Some(all_files.len()));

        let mut indexing_result = IndexingResult::new(self.config.repo_id.clone());
        let processed_counter = Arc::new(AtomicUsize::new(0));
        let error_counter = Arc::new(AtomicUsize::new(0));

        // For very large repositories, use streaming mode
        let use_streaming = all_files.len() > 10000
            || self
                .config
                .memory_limit
                .is_some_and(|limit| limit < 2 * 1024 * 1024 * 1024); // < 2GB

        if use_streaming {
            tracing::info!(
                "Using streaming mode for large repository ({} files)",
                all_files.len()
            );
            return self
                .index_scan_result_streaming(scan_result, progress_reporter)
                .await;
        }

        // Process files in batches
        for batch in all_files.chunks(self.config.batch_size) {
            let batch_result = self
                .process_batch(
                    batch,
                    &processed_counter,
                    &error_counter,
                    &progress_reporter,
                    all_files.len(),
                )
                .await?;

            indexing_result.merge(batch_result);

            // Check memory limit
            if let Some(limit) = self.config.memory_limit {
                let current_memory = self.estimate_memory_usage(&indexing_result);
                if current_memory > limit {
                    return Err(Error::indexing(
                        "Memory limit exceeded during bulk indexing",
                    ));
                }
            }
        }

        // After all files are processed, perform cross-file symbol resolution
        if self.config.enable_cross_file_linking {
            tracing::info!("Starting cross-file symbol resolution...");
            let linking_start = Instant::now();

            let cross_file_edges = self.resolve_cross_file_symbols(&indexing_result)?;

            if !cross_file_edges.is_empty() {
                // Create a patch with the new cross-file edges
                let cross_file_patch =
                    PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
                        .add_edges(cross_file_edges.clone())
                        .build();

                indexing_result.patches.push(cross_file_patch);
                indexing_result.stats.edges_created += cross_file_edges.len();

                tracing::info!(
                    "Cross-file symbol resolution completed: {} edges created in {}ms",
                    cross_file_edges.len(),
                    linking_start.elapsed().as_millis()
                );
            }
        }

        // Finalize statistics
        indexing_result.stats.duration_ms = start_time.elapsed().as_millis() as u64;
        indexing_result.stats.throughput = if indexing_result.stats.duration_ms > 0 {
            (indexing_result.stats.files_processed as f64 * 1000.0)
                / indexing_result.stats.duration_ms as f64
        } else {
            0.0
        };

        progress_reporter.report_progress(all_files.len(), Some(all_files.len()));
        Ok(indexing_result)
    }

    /// Index scan result using streaming mode for large repositories
    async fn index_scan_result_streaming(
        &self,
        scan_result: &ScanResult,
        progress_reporter: Arc<dyn ProgressReporter>,
    ) -> Result<IndexingResult> {
        let start_time = Instant::now();
        let all_files = scan_result.all_files();

        progress_reporter.report_progress(0, Some(all_files.len()));

        let mut final_result = IndexingResult::new(self.config.repo_id.clone());
        let processed_counter = Arc::new(AtomicUsize::new(0));
        let error_counter = Arc::new(AtomicUsize::new(0));

        // Use smaller batch size for streaming mode
        let streaming_batch_size = std::cmp::min(self.config.batch_size, 20);
        let mut batch_count = 0;

        // Process files in smaller batches and clear intermediate results
        for batch in all_files.chunks(streaming_batch_size) {
            let batch_result = self
                .process_batch(
                    batch,
                    &processed_counter,
                    &error_counter,
                    &progress_reporter,
                    all_files.len(),
                )
                .await?;

            // Update statistics but don't accumulate all patches
            final_result.stats.files_processed += batch_result.stats.files_processed;
            final_result.stats.nodes_created += batch_result.stats.nodes_created;
            final_result.stats.edges_created += batch_result.stats.edges_created;
            final_result.stats.error_count += batch_result.stats.error_count;
            final_result.failed_files.extend(batch_result.failed_files);

            // Only keep a limited number of recent patches to avoid memory exhaustion
            let max_patches_in_memory = 100;
            if final_result.patches.len() + batch_result.patches.len() > max_patches_in_memory {
                // Keep only the most recent patches
                let keep_count = max_patches_in_memory / 2;
                if final_result.patches.len() > keep_count {
                    final_result
                        .patches
                        .drain(0..final_result.patches.len() - keep_count);
                }
                tracing::debug!(
                    "Cleared old patches to manage memory, keeping {} recent patches",
                    keep_count
                );
            }

            final_result.patches.extend(batch_result.patches);

            // Check memory limit more frequently in streaming mode
            if let Some(limit) = self.config.memory_limit {
                let current_memory = self.estimate_memory_usage(&final_result);
                if current_memory > limit {
                    tracing::warn!(
                        "Memory limit reached in streaming mode, clearing intermediate results"
                    );
                    // Clear old patches but keep statistics
                    final_result.patches.clear();
                }
            }

            batch_count += 1;
            if batch_count % 10 == 0 {
                tracing::debug!("Processed {} batches in streaming mode", batch_count);
            }
        }

        // Finalize statistics
        final_result.stats.duration_ms = start_time.elapsed().as_millis() as u64;
        final_result.stats.throughput = if final_result.stats.duration_ms > 0 {
            (final_result.stats.files_processed as f64 * 1000.0)
                / final_result.stats.duration_ms as f64
        } else {
            0.0
        };

        progress_reporter.report_progress(all_files.len(), Some(all_files.len()));
        tracing::info!(
            "Streaming indexing completed: {} files, {} nodes, {} edges",
            final_result.stats.files_processed,
            final_result.stats.nodes_created,
            final_result.stats.edges_created
        );

        Ok(final_result)
    }

    /// Process a batch of files in parallel
    async fn process_batch(
        &self,
        batch: &[&DiscoveredFile],
        processed_counter: &Arc<AtomicUsize>,
        error_counter: &Arc<AtomicUsize>,
        progress_reporter: &Arc<dyn ProgressReporter>,
        total_files: usize,
    ) -> Result<IndexingResult> {
        let mut batch_result = IndexingResult::new(self.config.repo_id.clone());

        // Process files in parallel
        let results: Vec<_> = batch
            .par_iter()
            .map(|discovered_file| {
                let processed = processed_counter.fetch_add(1, Ordering::Relaxed) + 1;

                // Report progress periodically
                if processed % 10 == 0 {
                    progress_reporter.report_progress(processed, Some(total_files));
                }

                self.process_single_file(discovered_file)
            })
            .collect();

        // Collect results
        for result in results {
            match result {
                Ok(Some(patch)) => {
                    batch_result.stats.files_processed += 1;
                    batch_result.stats.nodes_created += patch.nodes_add.len();
                    batch_result.stats.edges_created += patch.edges_add.len();
                    batch_result.patches.push(patch);
                }
                Ok(None) => {
                    // File was skipped (e.g., empty, parse failed gracefully)
                    batch_result.stats.files_processed += 1;
                }
                Err(e) => {
                    error_counter.fetch_add(1, Ordering::Relaxed);
                    batch_result.stats.error_count += 1;

                    if !self.config.continue_on_error {
                        return Err(e);
                    }

                    progress_reporter.report_error(&e);
                }
            }
        }

        Ok(batch_result)
    }

    /// Process a single discovered file
    fn process_single_file(&self, discovered_file: &DiscoveredFile) -> Result<Option<AstPatch>> {
        // Read file content
        let content = std::fs::read_to_string(&discovered_file.path).map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                discovered_file.path.display(),
                e
            ))
        })?;

        // Skip empty files
        if content.trim().is_empty() {
            return Ok(None);
        }

        // Create parse context
        let context = ParseContext::new(
            self.config.repo_id.clone(),
            discovered_file.path.clone(),
            content,
        );

        // Parse the file
        let parse_result = self.parser_engine.parse_file(context)?;

        // Create patch from parse result
        let mut patch_builder =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone());

        // Add all nodes
        patch_builder = patch_builder.add_nodes(parse_result.nodes);

        // Add all edges
        patch_builder = patch_builder.add_edges(parse_result.edges);

        let patch = patch_builder.build();

        // Only return patch if it has content
        if patch.is_empty() {
            Ok(None)
        } else {
            Ok(Some(patch))
        }
    }

    /// Estimate memory usage of the indexing result
    fn estimate_memory_usage(&self, result: &IndexingResult) -> usize {
        let mut total = 0;

        // Estimate patch memory usage
        for patch in &result.patches {
            // Rough estimation: each node ~200 bytes, each edge ~50 bytes
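            // Illustrative example: a patch adding 100 nodes and 40 edges contributes
            // roughly 100 * 200 + 40 * 50 = 22_000 bytes under this heuristic.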
            total += patch.nodes_add.len() * 200;
            total += patch.edges_add.len() * 50;
            total += patch.nodes_delete.len() * 50; // String IDs
            total += patch.edges_delete.len() * 50;
        }

        // Add overhead for data structures
        total += result.patches.len() * 100; // Patch overhead
        total += result.failed_files.len() * 200; // Error storage

        total
    }

    /// Perform cross-file symbol resolution
    fn resolve_cross_file_symbols(&self, indexing_result: &IndexingResult) -> Result<Vec<Edge>> {
        // Build a temporary graph store with all the nodes and edges from patches
        let temp_graph = Arc::new(GraphStore::new());

        // Add all nodes and edges from patches to the temporary graph
        for patch in &indexing_result.patches {
            for node in &patch.nodes_add {
                temp_graph.add_node(node.clone());
            }
            for edge in &patch.edges_add {
                temp_graph.add_edge(edge.clone());
            }
        }

        // Create symbol resolver and resolve cross-file relationships
        let mut resolver = SymbolResolver::new(temp_graph);
        resolver.resolve_all()
    }

    /// Get indexing configuration
    pub fn config(&self) -> &IndexingConfig {
        &self.config
    }
}

/// Indexing progress reporter that tracks detailed statistics
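///
/// Console output is rate limited to roughly one line every 500ms. A minimal
/// sketch (not compiled as a doc test; crate paths are an assumption):
///
/// ```ignore
/// use codeprism_core::indexer::IndexingProgressReporter;
/// use codeprism_core::scanner::ProgressReporter;
///
/// let reporter = IndexingProgressReporter::new(true); // verbose: also print errors
/// // May print "Indexing progress: 250/1000 files (25.0%)" depending on rate limiting.
/// reporter.report_progress(250, Some(1000));
/// ```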
#[derive(Debug)]
pub struct IndexingProgressReporter {
    verbose: bool,
    last_report: std::sync::Mutex<Instant>,
}

impl IndexingProgressReporter {
    /// Create a new indexing progress reporter
    pub fn new(verbose: bool) -> Self {
        Self {
            verbose,
            last_report: std::sync::Mutex::new(Instant::now()),
        }
    }
}

impl ProgressReporter for IndexingProgressReporter {
    fn report_progress(&self, current: usize, total: Option<usize>) {
        if let Ok(mut last_report) = self.last_report.try_lock() {
            let now = Instant::now();

            // Rate limit progress reports to avoid spam
            if now.duration_since(*last_report).as_millis() > 500 {
                match total {
                    Some(total) => {
                        let percent = (current as f64 / total as f64) * 100.0;
                        println!("Indexing progress: {current}/{total} files ({percent:.1}%)");
                    }
                    None => {
                        println!("Indexing progress: {current} files processed");
                    }
                }
                *last_report = now;
            }
        }
    }

    fn report_complete(&self, _result: &crate::scanner::ScanResult) {
        println!("Indexing complete!");
    }

    fn report_error(&self, error: &Error) {
        if self.verbose {
            eprintln!("Indexing error: {error}");
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::Language;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use tempfile::TempDir;

    fn create_test_indexer() -> (BulkIndexer, TempDir) {
        let temp_dir = TempDir::new().unwrap();

        let config = IndexingConfig::new("test_repo".to_string(), "abc123".to_string());

        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let indexer = BulkIndexer::new(config, parser_engine);

        (indexer, temp_dir)
    }

    fn create_test_discovered_file(path: PathBuf, language: Language) -> DiscoveredFile {
        DiscoveredFile {
            path,
            language,
            size: 100,
        }
    }

    #[test]
    fn test_indexing_config() {
        let config = IndexingConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.max_parallel > 0);
        assert!(config.continue_on_error);
    }

    #[test]
    fn test_indexing_result() {
        let mut result = IndexingResult::new("test_repo".to_string());
        assert_eq!(result.repo_id, "test_repo");
        assert_eq!(result.patch_count(), 0);
        assert_eq!(result.total_operations(), 0);

        // Test merge
        let other = IndexingResult::new("test_repo".to_string());
        result.merge(other);
        assert_eq!(result.stats.files_processed, 0);
    }
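
    // A small additional sketch of merge semantics: statistics are summed and
    // patches are concatenated. Uses only APIs defined in this module plus
    // `AstPatch::new` from the patch module.
    #[test]
    fn test_indexing_result_merge_accumulates() {
        let mut result = IndexingResult::new("test_repo".to_string());
        result.stats.files_processed = 2;
        result.stats.nodes_created = 10;

        let mut other = IndexingResult::new("test_repo".to_string());
        other.stats.files_processed = 3;
        other.stats.nodes_created = 5;
        other
            .patches
            .push(AstPatch::new("test_repo".to_string(), "abc123".to_string()));

        result.merge(other);
        assert_eq!(result.stats.files_processed, 5);
        assert_eq!(result.stats.nodes_created, 15);
        assert_eq!(result.patch_count(), 1);
    }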

    #[tokio::test]
    async fn test_process_single_file() {
        let (indexer, temp_dir) = create_test_indexer();

        // Create a test file
        let test_file = temp_dir.path().join("test.js");
        std::fs::write(&test_file, "console.log('hello');").unwrap();

        let discovered_file = create_test_discovered_file(test_file, Language::JavaScript);

        // This will fail because we don't have a JavaScript parser registered
        // but it tests the file reading logic
        let result = indexer.process_single_file(&discovered_file);

        // Should return error because no JS parser is registered
        assert!(result.is_err());
    }

    #[test]
    fn test_memory_estimation() {
        let (indexer, _temp_dir) = create_test_indexer();
        let mut result = IndexingResult::new("test".to_string());

        // Test with empty result - should return 0 for truly empty result
        let empty_memory = indexer.estimate_memory_usage(&result);
        assert_eq!(
            empty_memory, 0,
            "Empty result should have zero estimated memory"
        );

        // Add some test data to result to verify memory calculation works
        use crate::patch::AstPatch;
        use std::path::PathBuf;

        // Add an empty test patch; we don't need actual Node/Edge values because
        // the estimation only looks at Vec lengths and fixed per-item overheads.
        let patch = AstPatch::new("test_repo".to_string(), "abc123".to_string());

        // With an empty patch, only the fixed per-patch overhead (100 bytes) and
        // the failed-file entry below (200 bytes) contribute to the estimate,
        // which is enough to exercise the calculation.

        // Add a test failed file to also test that path
        result.failed_files.push((
            PathBuf::from("test_file.rs"),
            crate::error::Error::parse("test_file.rs", "test error"),
        ));

        result.patches.push(patch);
        result.stats.files_processed = 10;
        result.stats.nodes_created = 100;
        result.stats.edges_created = 50;

        let populated_memory = indexer.estimate_memory_usage(&result);
        assert!(
            populated_memory > empty_memory,
            "Memory usage should increase with patches and failed files: {populated_memory} > {empty_memory}"
        );

        // Should have: 1 patch overhead (100) + 1 failed file (200) = 300 bytes minimum
        assert!(
            populated_memory >= 300,
            "Should account for patch and failed file overhead, got {populated_memory} bytes"
        );
    }

    #[test]
    fn test_indexing_stats() {
        let stats = IndexingStats {
            files_processed: 100,
            nodes_created: 500,
            edges_created: 300,
            duration_ms: 1000,
            throughput: 100.0,
            error_count: 2,
            memory_stats: MemoryStats::default(),
        };

        assert_eq!(stats.files_processed, 100);
        assert_eq!(stats.throughput, 100.0);
    }

    #[test]
    fn test_progress_reporter() {
        let reporter = IndexingProgressReporter::new(true);

        // These should not panic
        reporter.report_progress(50, Some(100));
        reporter.report_progress(100, None);

        let error = Error::indexing("test error");
        reporter.report_error(&error);
    }
}