use crate::ast::Edge;
use crate::error::{Error, Result};
use crate::graph::GraphStore;
use crate::linkers::SymbolResolver;
use crate::parser::{ParseContext, ParserEngine};
use crate::patch::{AstPatch, PatchBuilder};
use crate::scanner::{DiscoveredFile, ProgressReporter, ScanResult};
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;

/// Statistics collected during a bulk indexing run.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexingStats {
    /// Number of files successfully processed.
    pub files_processed: usize,
    /// Number of AST nodes created.
    pub nodes_created: usize,
    /// Number of edges created.
    pub edges_created: usize,
    /// Wall-clock duration of the run in milliseconds.
    pub duration_ms: u64,
    /// Throughput in files per second.
    pub throughput: f64,
    /// Number of files that failed to process.
    pub error_count: usize,
    /// Memory usage statistics for the run.
    pub memory_stats: MemoryStats,
}

/// Memory usage statistics for an indexing run.
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct MemoryStats {
    /// Peak memory usage in bytes.
    pub peak_memory_bytes: usize,
    /// Current memory usage in bytes.
    pub current_memory_bytes: usize,
    /// Estimated overhead of graph structures in bytes.
    pub graph_overhead_bytes: usize,
}

/// The outcome of a bulk indexing run: the patches produced plus
/// bookkeeping about successes and failures.
#[derive(Debug)]
pub struct IndexingResult {
    /// Repository this result belongs to.
    pub repo_id: String,
    /// AST patches produced by indexing.
    pub patches: Vec<AstPatch>,
    /// Aggregate statistics for the run.
    pub stats: IndexingStats,
    /// Files that failed to index, with the error for each.
    pub failed_files: Vec<(PathBuf, Error)>,
}

impl IndexingResult {
    /// Creates an empty result for the given repository.
    pub fn new(repo_id: String) -> Self {
        Self {
            repo_id,
            patches: Vec::new(),
            stats: IndexingStats {
                files_processed: 0,
                nodes_created: 0,
                edges_created: 0,
                duration_ms: 0,
                throughput: 0.0,
                error_count: 0,
                memory_stats: MemoryStats::default(),
            },
            failed_files: Vec::new(),
        }
    }

    /// Returns the number of patches in this result.
    pub fn patch_count(&self) -> usize {
        self.patches.len()
    }

    /// Returns the total number of operations across all patches.
    pub fn total_operations(&self) -> usize {
        self.patches.iter().map(|p| p.operation_count()).sum()
    }

    /// Merges another result into this one, accumulating patches, counters,
    /// and failed files. Duration and throughput are not merged; they are
    /// computed once at the end of a run.
    pub fn merge(&mut self, other: IndexingResult) {
        self.patches.extend(other.patches);
        self.stats.files_processed += other.stats.files_processed;
        self.stats.nodes_created += other.stats.nodes_created;
        self.stats.edges_created += other.stats.edges_created;
        self.stats.error_count += other.stats.error_count;
        self.failed_files.extend(other.failed_files);
    }
}
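/// Configuration for a bulk indexing run.
///
/// A minimal construction sketch (marked `ignore` since it is illustrative;
/// the override values shown are arbitrary):
///
/// ```ignore
/// let mut config = IndexingConfig::new("my_repo".to_string(), "deadbeef".to_string());
/// config.batch_size = 50; // larger batches, fewer progress updates
/// config.memory_limit = Some(1024 * 1024 * 1024); // cap accumulated results at ~1 GiB
/// config.enable_cross_file_linking = false; // skip the symbol-resolution pass
/// ```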
#[derive(Debug, Clone)]
pub struct IndexingConfig {
    /// Repository identifier.
    pub repo_id: String,
    /// Commit SHA being indexed.
    pub commit_sha: String,
    /// Maximum number of files to parse in parallel.
    pub max_parallel: usize,
    /// Number of files per processing batch.
    pub batch_size: usize,
    /// Whether to keep going after individual file failures.
    pub continue_on_error: bool,
    /// Optional memory budget in bytes for accumulated results.
    pub memory_limit: Option<usize>,
    /// Whether to run cross-file symbol resolution after parsing.
    pub enable_cross_file_linking: bool,
}

impl IndexingConfig {
    /// Creates a configuration with default tuning parameters.
    pub fn new(repo_id: String, commit_sha: String) -> Self {
        Self {
            repo_id,
            commit_sha,
            max_parallel: num_cpus::get(),
            batch_size: 30,
            continue_on_error: true,
            memory_limit: Some(4 * 1024 * 1024 * 1024), // 4 GiB
            enable_cross_file_linking: true,
        }
    }
}
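/// Drives parallel bulk indexing of discovered files into AST patches.
///
/// A minimal end-to-end sketch (marked `ignore` since it is illustrative;
/// `registry` and `scan_result` are assumed to come from this crate's
/// parser setup and scanner respectively):
///
/// ```ignore
/// let config = IndexingConfig::new("my_repo".to_string(), "deadbeef".to_string());
/// let indexer = BulkIndexer::new(config, Arc::new(ParserEngine::new(registry)));
/// let reporter = Arc::new(IndexingProgressReporter::new(true));
/// let result = indexer.index_scan_result(&scan_result, reporter).await?;
/// println!("{} files -> {} patches", result.stats.files_processed, result.patch_count());
/// ```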
pub struct BulkIndexer {
    config: IndexingConfig,
    parser_engine: Arc<ParserEngine>,
}

impl BulkIndexer {
    /// Creates an indexer from a configuration and a shared parser engine.
    pub fn new(config: IndexingConfig, parser_engine: Arc<ParserEngine>) -> Self {
        Self {
            config,
            parser_engine,
        }
    }

    /// Indexes every file in a scan result, processing files in parallel
    /// batches and enforcing the configured memory limit. Large repositories
    /// and tight memory budgets are delegated to streaming mode.
    pub async fn index_scan_result(
        &self,
        scan_result: &ScanResult,
        progress_reporter: Arc<dyn ProgressReporter>,
    ) -> Result<IndexingResult> {
        let start_time = Instant::now();
        let all_files = scan_result.all_files();

        progress_reporter.report_progress(0, Some(all_files.len()));

        let mut indexing_result = IndexingResult::new(self.config.repo_id.clone());
        let processed_counter = Arc::new(AtomicUsize::new(0));
        let error_counter = Arc::new(AtomicUsize::new(0));

        // Switch to streaming mode for very large repositories or memory
        // budgets below 2 GiB, trading patch retention for headroom.
        let use_streaming = all_files.len() > 10_000
            || self
                .config
                .memory_limit
                .is_some_and(|limit| limit < 2 * 1024 * 1024 * 1024);

        if use_streaming {
            tracing::info!(
                "Using streaming mode for large repository ({} files)",
                all_files.len()
            );
            return self
                .index_scan_result_streaming(scan_result, progress_reporter)
                .await;
        }

        for batch in all_files.chunks(self.config.batch_size) {
            let batch_result = self
                .process_batch(
                    batch,
                    &processed_counter,
                    &error_counter,
                    &progress_reporter,
                    all_files.len(),
                )
                .await?;

            indexing_result.merge(batch_result);

            // Abort if accumulated results exceed the memory budget.
            if let Some(limit) = self.config.memory_limit {
                let current_memory = self.estimate_memory_usage(&indexing_result);
                if current_memory > limit {
                    return Err(Error::indexing(
                        "Memory limit exceeded during bulk indexing",
                    ));
                }
            }
        }

        if self.config.enable_cross_file_linking {
            tracing::info!("Starting cross-file symbol resolution...");
            let linking_start = Instant::now();

            let cross_file_edges = self.resolve_cross_file_symbols(&indexing_result)?;

            if !cross_file_edges.is_empty() {
                let cross_file_patch =
                    PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone())
                        .add_edges(cross_file_edges.clone())
                        .build();

                indexing_result.patches.push(cross_file_patch);
                indexing_result.stats.edges_created += cross_file_edges.len();

                tracing::info!(
                    "Cross-file symbol resolution completed: {} edges created in {}ms",
                    cross_file_edges.len(),
                    linking_start.elapsed().as_millis()
                );
            }
        }

        // Finalize timing and throughput (files per second).
        indexing_result.stats.duration_ms = start_time.elapsed().as_millis() as u64;
        indexing_result.stats.throughput = if indexing_result.stats.duration_ms > 0 {
            (indexing_result.stats.files_processed as f64 * 1000.0)
                / indexing_result.stats.duration_ms as f64
        } else {
            0.0
        };

        progress_reporter.report_progress(all_files.len(), Some(all_files.len()));
        Ok(indexing_result)
    }

    /// Streaming variant of `index_scan_result` that bounds how many patches
    /// are held in memory at once.
    async fn index_scan_result_streaming(
        &self,
        scan_result: &ScanResult,
        progress_reporter: Arc<dyn ProgressReporter>,
    ) -> Result<IndexingResult> {
        let start_time = Instant::now();
        let all_files = scan_result.all_files();

        progress_reporter.report_progress(0, Some(all_files.len()));

        let mut final_result = IndexingResult::new(self.config.repo_id.clone());
        let processed_counter = Arc::new(AtomicUsize::new(0));
        let error_counter = Arc::new(AtomicUsize::new(0));

        // Use smaller batches in streaming mode to bound peak memory.
        let streaming_batch_size = std::cmp::min(self.config.batch_size, 20);
        let mut batch_count = 0;

        for batch in all_files.chunks(streaming_batch_size) {
            let batch_result = self
                .process_batch(
                    batch,
                    &processed_counter,
                    &error_counter,
                    &progress_reporter,
                    all_files.len(),
                )
                .await?;

            // Accumulate statistics directly; patches are managed separately
            // below so old ones can be evicted.
            final_result.stats.files_processed += batch_result.stats.files_processed;
            final_result.stats.nodes_created += batch_result.stats.nodes_created;
            final_result.stats.edges_created += batch_result.stats.edges_created;
            final_result.stats.error_count += batch_result.stats.error_count;
            final_result.failed_files.extend(batch_result.failed_files);

            // Once the patch count would exceed the cap, evict the oldest
            // patches and keep the most recent half.
            let max_patches_in_memory = 100;
            if final_result.patches.len() + batch_result.patches.len() > max_patches_in_memory {
                let keep_count = max_patches_in_memory / 2;
                if final_result.patches.len() > keep_count {
                    final_result
                        .patches
                        .drain(0..final_result.patches.len() - keep_count);
                }
                tracing::debug!(
                    "Cleared old patches to manage memory, keeping {} recent patches",
                    keep_count
                );
            }

            final_result.patches.extend(batch_result.patches);

            if let Some(limit) = self.config.memory_limit {
                let current_memory = self.estimate_memory_usage(&final_result);
                if current_memory > limit {
                    tracing::warn!(
                        "Memory limit reached in streaming mode, clearing intermediate results"
                    );
                    // Drop accumulated patches entirely; statistics are kept.
                    final_result.patches.clear();
                }
            }

            batch_count += 1;
            if batch_count % 10 == 0 {
                tracing::debug!("Processed {} batches in streaming mode", batch_count);
            }
        }

        // Finalize timing and throughput (files per second).
        final_result.stats.duration_ms = start_time.elapsed().as_millis() as u64;
        final_result.stats.throughput = if final_result.stats.duration_ms > 0 {
            (final_result.stats.files_processed as f64 * 1000.0)
                / final_result.stats.duration_ms as f64
        } else {
            0.0
        };

        progress_reporter.report_progress(all_files.len(), Some(all_files.len()));
        tracing::info!(
            "Streaming indexing completed: {} files, {} nodes, {} edges",
            final_result.stats.files_processed,
            final_result.stats.nodes_created,
            final_result.stats.edges_created
        );

        Ok(final_result)
    }
    /// Parses a batch of files in parallel and folds the per-file outcomes
    /// into a single batch result.
    async fn process_batch(
        &self,
        batch: &[&DiscoveredFile],
        processed_counter: &Arc<AtomicUsize>,
        error_counter: &Arc<AtomicUsize>,
        progress_reporter: &Arc<dyn ProgressReporter>,
        total_files: usize,
    ) -> Result<IndexingResult> {
        let mut batch_result = IndexingResult::new(self.config.repo_id.clone());

        // Keep each file's path alongside its outcome so failures can be
        // recorded in `failed_files` below.
        let results: Vec<_> = batch
            .par_iter()
            .map(|discovered_file| {
                let processed = processed_counter.fetch_add(1, Ordering::Relaxed) + 1;

                // Throttle progress updates to every tenth file.
                if processed % 10 == 0 {
                    progress_reporter.report_progress(processed, Some(total_files));
                }

                (
                    discovered_file.path.clone(),
                    self.process_single_file(discovered_file),
                )
            })
            .collect();

        for (path, result) in results {
            match result {
                Ok(Some(patch)) => {
                    batch_result.stats.files_processed += 1;
                    batch_result.stats.nodes_created += patch.nodes_add.len();
                    batch_result.stats.edges_created += patch.edges_add.len();
                    batch_result.patches.push(patch);
                }
                Ok(None) => {
                    // Empty files and empty patches still count as processed.
                    batch_result.stats.files_processed += 1;
                }
                Err(e) => {
                    error_counter.fetch_add(1, Ordering::Relaxed);
                    batch_result.stats.error_count += 1;

                    if !self.config.continue_on_error {
                        return Err(e);
                    }

                    progress_reporter.report_error(&e);
                    batch_result.failed_files.push((path, e));
                }
            }
        }

        Ok(batch_result)
    }
    /// Reads and parses a single file, returning `Ok(None)` for files that
    /// are empty or produce an empty patch.
    fn process_single_file(&self, discovered_file: &DiscoveredFile) -> Result<Option<AstPatch>> {
        let content = std::fs::read_to_string(&discovered_file.path).map_err(|e| {
            Error::io(format!(
                "Failed to read file {}: {}",
                discovered_file.path.display(),
                e
            ))
        })?;

        // Skip whitespace-only files without invoking the parser.
        if content.trim().is_empty() {
            return Ok(None);
        }

        let context = ParseContext::new(
            self.config.repo_id.clone(),
            discovered_file.path.clone(),
            content,
        );

        let parse_result = self.parser_engine.parse_file(context)?;

        let mut patch_builder =
            PatchBuilder::new(self.config.repo_id.clone(), self.config.commit_sha.clone());

        patch_builder = patch_builder.add_nodes(parse_result.nodes);
        patch_builder = patch_builder.add_edges(parse_result.edges);

        let patch = patch_builder.build();

        if patch.is_empty() {
            Ok(None)
        } else {
            Ok(Some(patch))
        }
    }

    /// Estimates the memory held by a result using rough per-item sizes:
    /// ~200 bytes per added node, ~50 bytes per edge or deletion, plus fixed
    /// overhead per patch and per failed file.
    fn estimate_memory_usage(&self, result: &IndexingResult) -> usize {
        let mut total = 0;

        for patch in &result.patches {
            total += patch.nodes_add.len() * 200;
            total += patch.edges_add.len() * 50;
            total += patch.nodes_delete.len() * 50;
            total += patch.edges_delete.len() * 50;
        }

        total += result.patches.len() * 100;
        total += result.failed_files.len() * 200;

        total
    }

    /// Builds a temporary in-memory graph from all accumulated patches and
    /// runs the symbol resolver over it to produce cross-file edges.
    fn resolve_cross_file_symbols(&self, indexing_result: &IndexingResult) -> Result<Vec<Edge>> {
        let temp_graph = Arc::new(GraphStore::new());

        for patch in &indexing_result.patches {
            for node in &patch.nodes_add {
                temp_graph.add_node(node.clone());
            }
            for edge in &patch.edges_add {
                temp_graph.add_edge(edge.clone());
            }
        }

        let mut resolver = SymbolResolver::new(temp_graph);
        resolver.resolve_all()
    }

    /// Returns the indexer's configuration.
    pub fn config(&self) -> &IndexingConfig {
        &self.config
    }
}
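/// A stdout progress reporter that throttles output to at most one update
/// per 500ms window.
///
/// A minimal usage sketch (marked `ignore` since it is illustrative):
///
/// ```ignore
/// let reporter = IndexingProgressReporter::new(true);
/// // Prints at most once per 500ms, e.g.
/// // "Indexing progress: 50/200 files (25.0%)"
/// reporter.report_progress(50, Some(200));
/// ```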
#[derive(Debug)]
pub struct IndexingProgressReporter {
    verbose: bool,
    last_report: std::sync::Mutex<Instant>,
}

impl IndexingProgressReporter {
    /// Creates a reporter; `verbose` controls whether errors are printed.
    pub fn new(verbose: bool) -> Self {
        Self {
            verbose,
            last_report: std::sync::Mutex::new(Instant::now()),
        }
    }
}

impl ProgressReporter for IndexingProgressReporter {
    fn report_progress(&self, current: usize, total: Option<usize>) {
        // try_lock: if another worker is already reporting, skip this update
        // rather than block.
        if let Ok(mut last_report) = self.last_report.try_lock() {
            let now = Instant::now();

            // Throttle output to at most one line every 500ms.
            if now.duration_since(*last_report).as_millis() > 500 {
                match total {
                    Some(total) => {
                        let percent = (current as f64 / total as f64) * 100.0;
                        println!("Indexing progress: {current}/{total} files ({percent:.1}%)");
                    }
                    None => {
                        println!("Indexing progress: {current} files processed");
                    }
                }
                *last_report = now;
            }
        }
    }

    fn report_complete(&self, _result: &crate::scanner::ScanResult) {
        println!("Indexing complete!");
    }

    fn report_error(&self, error: &Error) {
        if self.verbose {
            eprintln!("Indexing error: {error}");
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ast::Language;
    use crate::parser::LanguageRegistry;
    use std::path::PathBuf;
    use tempfile::TempDir;

    fn create_test_indexer() -> (BulkIndexer, TempDir) {
        let temp_dir = TempDir::new().unwrap();

        let config = IndexingConfig::new("test_repo".to_string(), "abc123".to_string());

        let registry = Arc::new(LanguageRegistry::new());
        let parser_engine = Arc::new(ParserEngine::new(registry));
        let indexer = BulkIndexer::new(config, parser_engine);

        (indexer, temp_dir)
    }

    fn create_test_discovered_file(path: PathBuf, language: Language) -> DiscoveredFile {
        DiscoveredFile {
            path,
            language,
            size: 100,
        }
    }

    #[test]
    fn test_indexing_config() {
        let config = IndexingConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.repo_id, "test");
        assert_eq!(config.commit_sha, "sha");
        assert!(config.max_parallel > 0);
        assert!(config.continue_on_error);
    }
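    // Covers the remaining defaults set by `IndexingConfig::new`.
    #[test]
    fn test_indexing_config_defaults() {
        let config = IndexingConfig::new("test".to_string(), "sha".to_string());
        assert_eq!(config.batch_size, 30);
        assert_eq!(config.memory_limit, Some(4 * 1024 * 1024 * 1024));
        assert!(config.enable_cross_file_linking);
    }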
    #[test]
    fn test_indexing_result() {
        let mut result = IndexingResult::new("test_repo".to_string());
        assert_eq!(result.repo_id, "test_repo");
        assert_eq!(result.patch_count(), 0);
        assert_eq!(result.total_operations(), 0);

        // Merging an empty result is a no-op.
        let other = IndexingResult::new("test_repo".to_string());
        result.merge(other);
        assert_eq!(result.stats.files_processed, 0);
    }
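    // Verifies that `merge` accumulates counters and failed files; the error
    // value is built with the same `Error::parse` constructor used in
    // `test_memory_estimation` below.
    #[test]
    fn test_indexing_result_merge_accumulates() {
        let mut result = IndexingResult::new("test_repo".to_string());
        result.stats.files_processed = 2;
        result.stats.error_count = 1;

        let mut other = IndexingResult::new("test_repo".to_string());
        other.stats.files_processed = 3;
        other.stats.nodes_created = 10;
        other.stats.edges_created = 4;
        other.stats.error_count = 1;
        other.failed_files.push((
            PathBuf::from("broken.rs"),
            crate::error::Error::parse("broken.rs", "test error"),
        ));

        result.merge(other);
        assert_eq!(result.stats.files_processed, 5);
        assert_eq!(result.stats.nodes_created, 10);
        assert_eq!(result.stats.edges_created, 4);
        assert_eq!(result.stats.error_count, 2);
        assert_eq!(result.failed_files.len(), 1);
    }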
    #[tokio::test]
    async fn test_process_single_file() {
        let (indexer, temp_dir) = create_test_indexer();

        let test_file = temp_dir.path().join("test.js");
        std::fs::write(&test_file, "console.log('hello');").unwrap();

        let discovered_file = create_test_discovered_file(test_file, Language::JavaScript);

        let result = indexer.process_single_file(&discovered_file);

        // The test registry has no languages registered, so parsing fails.
        assert!(result.is_err());
    }
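    // Whitespace-only files short-circuit before parsing, so even the empty
    // test registry yields `Ok(None)` rather than a parse error.
    #[test]
    fn test_process_single_file_empty() {
        let (indexer, temp_dir) = create_test_indexer();

        let test_file = temp_dir.path().join("empty.js");
        std::fs::write(&test_file, "   \n").unwrap();

        let discovered_file = create_test_discovered_file(test_file, Language::JavaScript);

        let result = indexer.process_single_file(&discovered_file);
        assert!(matches!(result, Ok(None)));
    }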
    #[test]
    fn test_memory_estimation() {
        let (indexer, _temp_dir) = create_test_indexer();
        let mut result = IndexingResult::new("test".to_string());

        let empty_memory = indexer.estimate_memory_usage(&result);
        assert_eq!(
            empty_memory, 0,
            "Empty result should have zero estimated memory"
        );

        use crate::patch::AstPatch;
        use std::path::PathBuf;

        let patch = AstPatch::new("test_repo".to_string(), "abc123".to_string());

        // One failed file (~200 bytes) plus one empty patch (~100 bytes of
        // fixed overhead) should register in the estimate.
        result.failed_files.push((
            PathBuf::from("test_file.rs"),
            crate::error::Error::parse("test_file.rs", "test error"),
        ));

        result.patches.push(patch);
        result.stats.files_processed = 10;
        result.stats.nodes_created = 100;
        result.stats.edges_created = 50;

        let populated_memory = indexer.estimate_memory_usage(&result);
        assert!(
            populated_memory > empty_memory,
            "Memory usage should increase with patches and failed files: {populated_memory} > {empty_memory}"
        );

        assert!(
            populated_memory >= 300,
            "Should account for patch and failed file overhead, got {populated_memory} bytes"
        );
    }
    #[test]
    fn test_indexing_stats() {
        let stats = IndexingStats {
            files_processed: 100,
            nodes_created: 500,
            edges_created: 300,
            duration_ms: 1000,
            throughput: 100.0,
            error_count: 2,
            memory_stats: MemoryStats::default(),
        };

        assert_eq!(stats.files_processed, 100);
        assert_eq!(stats.throughput, 100.0);
    }

    #[test]
    fn test_progress_reporter() {
        let reporter = IndexingProgressReporter::new(true);

        // Smoke test: these calls should neither panic nor block.
        reporter.report_progress(50, Some(100));
        reporter.report_progress(100, None);

        let error = Error::indexing("test error");
        reporter.report_error(&error);
    }
}