1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::thread;
4
5use anyhow::Context;
6use serde_json::{json, Value};
7use tracing::{debug, info, warn};
8
9use super::hasher::hash_files_parallel;
10use super::store::{CachedSymbol, IndexStore};
11use crate::commands::find::symbol_kind_name;
12use crate::detect::Language;
13use crate::lsp::client::{path_to_uri, LspClient};
14use crate::lsp::files::FileTracker;
15use crate::lsp::install;
16use crate::lsp::pool::probe_until_ready;
17
/// Summary counters for one indexing run.
#[derive(Debug, Default)]
pub struct IndexStats {
    /// Total number of source files discovered under the project root.
    pub files_total: usize,
    /// Files that were (re)indexed this run because their hash changed.
    pub files_indexed: usize,
    /// Files skipped because the stored hash already matched.
    pub files_cached: usize,
    /// Total symbols collected across all indexed files.
    pub symbols_total: usize,
}
26
/// A single source file scheduled for indexing.
pub struct FileEntry {
    /// Absolute filesystem path, used for opening the file with the LSP server.
    pub abs_path: PathBuf,
    /// Path relative to the project root; used as the key in the index store.
    pub rel_path: String,
    /// Content hash used to detect whether the cached symbols are stale.
    pub hash: String,
}
33
34pub fn plan_index(
41 store: &IndexStore,
42 project_root: &Path,
43 extensions: &[&str],
44) -> anyhow::Result<(Vec<FileEntry>, usize)> {
45 let source_files = walk_source_files(project_root, extensions)?;
46 info!("index: found {} source files", source_files.len());
47
48 let hashes = hash_files_parallel(&source_files);
50
51 let path_hashes: Vec<(String, PathBuf, String)> = hashes
53 .into_iter()
54 .map(|(abs_path, hash)| {
55 let rel_path = abs_path
56 .strip_prefix(project_root)
57 .unwrap_or(&abs_path)
58 .to_string_lossy()
59 .to_string();
60 (rel_path, abs_path, hash)
61 })
62 .collect();
63
64 let rel_paths: Vec<&str> = path_hashes.iter().map(|(r, _, _)| r.as_str()).collect();
66 let stored = store.get_file_hashes_batch(&rel_paths).unwrap_or_default();
67
68 let mut to_index = Vec::new();
69 let mut cached = 0usize;
70
71 for (rel_path, abs_path, hash) in path_hashes {
72 if stored.get(&rel_path).is_some_and(|h| *h == hash) {
73 cached += 1;
74 } else {
75 to_index.push(FileEntry {
76 abs_path,
77 rel_path,
78 hash,
79 });
80 }
81 }
82
83 Ok((to_index, cached))
84}
85
/// Number of files to send to the LSP server per request batch.
///
/// Scales with available parallelism (4x the logical CPU count), clamped
/// to `[8, 64]`. Falls back to assuming 4 CPUs when parallelism cannot be
/// determined.
#[must_use]
pub fn detect_batch_size() -> usize {
    let cpus = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    (cpus * 4).clamp(8, 64)
}
95
/// Number of parallel LSP workers to spawn.
///
/// Uses two-thirds of the logical CPU count (leaving headroom for the LSP
/// server processes themselves), clamped to `[1, 10]`. Falls back to
/// assuming 4 CPUs when parallelism cannot be determined.
#[must_use]
pub fn detect_worker_count() -> usize {
    let cpus = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    (cpus * 2 / 3).clamp(1, 10)
}
107
/// Collect document symbols for `files` using up to `num_workers` parallel
/// LSP server instances.
///
/// Files are distributed to workers by striding over the file list, so each
/// worker gets an interleaved (roughly even) share. Workers that fail to
/// boot are logged and skipped; the run proceeds with however many started.
/// Returns `(rel_path, hash, symbols)` triples for every file that produced
/// a response.
///
/// # Errors
///
/// Fails if the server binary cannot be ensured, or if no worker at all
/// could be started.
pub async fn collect_symbols_parallel(
    files: Vec<FileEntry>,
    lang: Language,
    workspace_root: &Path,
    num_workers: usize,
) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
    if files.is_empty() {
        return Ok(Vec::new());
    }

    // Never spawn more workers than files; never fewer than one.
    let num_workers = num_workers.min(files.len()).max(1);
    let batch_size = detect_batch_size();

    // A single worker needs none of the spawn/stride machinery below.
    if num_workers <= 1 {
        return collect_with_single_worker(files, lang, workspace_root, batch_size).await;
    }

    info!(
        "init: spawning {num_workers} parallel workers for {lang} ({} files)",
        files.len()
    );

    let boot_start = std::time::Instant::now();
    let (binary_path, entry) = install::ensure_server(lang).await?;
    // Warm every worker up on the same (first) file so they are ready
    // before real work is distributed.
    let warmup_file = files[0].abs_path.clone();
    let mut boot_handles = Vec::new();
    for i in 0..num_workers {
        // Clone owned copies of everything the spawned task needs ('static).
        let bp = binary_path.clone();
        let args: Vec<String> = entry.args.iter().map(|s| (*s).to_string()).collect();
        let wr = workspace_root.to_path_buf();
        let wf = warmup_file.clone();
        boot_handles.push(tokio::spawn(async move {
            let args_refs: Vec<&str> = args.iter().map(String::as_str).collect();
            // Tag the result with the worker index so failures can be attributed.
            (i, boot_temp_worker(&bp, &args_refs, lang, &wr, &wf).await)
        }));
    }

    // Boot all workers concurrently, keeping only the ones that came up.
    let mut workers = Vec::new();
    for handle in boot_handles {
        if let Ok((i, result)) = handle.await {
            match result {
                Ok((client, tracker)) => workers.push((client, tracker)),
                Err(e) => warn!("init: worker {i} failed to start: {e}"),
            }
        }
    }

    if workers.is_empty() {
        anyhow::bail!("no init workers could be started for {lang}");
    }

    let actual_workers = workers.len();
    info!(
        "init: {actual_workers} workers booted in {:?}",
        boot_start.elapsed()
    );

    // Share the file list across worker tasks without cloning it per task.
    let files = Arc::new(files);
    let mut handles = Vec::new();

    for (worker_idx, (mut client, mut tracker)) in workers.into_iter().enumerate() {
        let files_ref = Arc::clone(&files);
        // Stride assignment: worker k takes files k, k+n, k+2n, ... which
        // spreads large/small files roughly evenly across workers.
        let worker_indices: Vec<usize> = (worker_idx..files_ref.len())
            .step_by(actual_workers)
            .collect();

        handles.push(tokio::spawn(async move {
            let worker_files: Vec<&FileEntry> =
                worker_indices.iter().map(|&i| &files_ref[i]).collect();
            info!(
                "init: worker {worker_idx} processing {} files",
                worker_files.len()
            );
            let results =
                collect_symbols(&worker_files, &mut client, &mut tracker, batch_size).await;

            // Best-effort teardown; results are already collected.
            let _ = tracker.close_all(client.transport_mut()).await;
            if let Err(e) = client.shutdown().await {
                debug!("init: worker {worker_idx} shutdown error: {e}");
            }

            results
        }));
    }

    // Merge per-worker results; a panicked worker loses only its own share.
    let mut all_results = Vec::new();
    for handle in handles {
        match handle.await {
            Ok(results) => all_results.extend(results),
            Err(e) => debug!("init: worker task panicked: {e}"),
        }
    }

    Ok(all_results)
}
215
216async fn collect_with_single_worker(
218 files: Vec<FileEntry>,
219 lang: Language,
220 workspace_root: &Path,
221 batch_size: usize,
222) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
223 let (binary_path, entry) = install::ensure_server(lang).await?;
224 let warmup_file = files[0].abs_path.clone();
225 let (mut client, mut tracker) =
226 boot_temp_worker(&binary_path, entry.args, lang, workspace_root, &warmup_file).await?;
227
228 let file_refs: Vec<&FileEntry> = files.iter().collect();
229 let results = collect_symbols(&file_refs, &mut client, &mut tracker, batch_size).await;
230
231 let _ = tracker.close_all(client.transport_mut()).await;
232 let _ = client.shutdown().await;
233
234 Ok(results)
235}
236
/// Start one LSP server process and run the initialize handshake.
///
/// After initialization, opens `warmup_file` and probes the server until it
/// answers for that file, so the returned client is ready for real requests.
/// If the warmup file cannot be opened, the probe is skipped and the client
/// is returned un-warmed rather than failing the boot.
///
/// # Errors
///
/// Fails if the server process cannot be started or the LSP `initialize`
/// request fails.
async fn boot_temp_worker(
    binary_path: &Path,
    args: &[&str],
    lang: Language,
    workspace_root: &Path,
    warmup_file: &Path,
) -> anyhow::Result<(LspClient, FileTracker)> {
    let mut client = LspClient::start_with_binary(binary_path, args, lang, workspace_root)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    client
        .initialize(workspace_root)
        .await
        .context("LSP initialize failed")?;
    let mut tracker = FileTracker::new(lang);
    // Warmup is best-effort: only probe if the file opened successfully.
    if tracker
        .ensure_open(warmup_file, client.transport_mut())
        .await
        .is_ok()
    {
        probe_until_ready(&mut client, warmup_file).await;
    }
    Ok((client, tracker))
}
267
/// A file whose contents and LSP URI were read ahead of time, so the LSP
/// open/request loop never blocks on disk I/O.
struct PreReadFile<'a> {
    /// The planned file this content belongs to.
    entry: &'a FileEntry,
    /// Full file contents, read from the canonicalized path.
    content: String,
    /// `file://` URI for the canonicalized path, as sent to the server.
    uri: String,
}
274
275async fn prefetch_files<'a>(files: &[&'a FileEntry]) -> Vec<PreReadFile<'a>> {
279 let paths: Vec<(usize, PathBuf)> = files
280 .iter()
281 .enumerate()
282 .map(|(i, e)| (i, e.abs_path.clone()))
283 .collect();
284
285 let read_results = tokio::task::spawn_blocking(move || {
286 paths
287 .into_iter()
288 .filter_map(|(i, path)| {
289 let canonical = std::fs::canonicalize(&path).ok()?;
290 let content = std::fs::read_to_string(&canonical).ok()?;
291 let uri = path_to_uri(&canonical).ok()?.to_string();
292 Some((i, content, uri))
293 })
294 .collect::<Vec<_>>()
295 })
296 .await
297 .unwrap_or_default();
298
299 read_results
300 .into_iter()
301 .map(|(i, content, uri)| PreReadFile {
302 entry: files[i],
303 content,
304 uri,
305 })
306 .collect()
307}
308
309pub async fn collect_symbols(
318 files: &[&FileEntry],
319 client: &mut LspClient,
320 file_tracker: &mut FileTracker,
321 batch_size: usize,
322) -> Vec<(String, String, Vec<CachedSymbol>)> {
323 let mut results = Vec::new();
324 let total = files.len();
325 let chunks: Vec<&[&FileEntry]> = files.chunks(batch_size).collect();
326
327 let mut prefetched = if chunks.is_empty() {
329 Vec::new()
330 } else {
331 prefetch_files(chunks[0]).await
332 };
333
334 for (batch_idx, batch) in chunks.iter().enumerate() {
335 let batch_start = batch_idx * batch_size;
336 debug!(
337 "index: batch {}-{}/{total} ({} files)",
338 batch_start + 1,
339 (batch_start + batch.len()).min(total),
340 batch.len()
341 );
342
343 let next_prefetch = if batch_idx + 1 < chunks.len() {
345 let next_batch = chunks[batch_idx + 1];
346 let paths: Vec<(usize, PathBuf)> = next_batch
347 .iter()
348 .enumerate()
349 .map(|(i, e)| (i, e.abs_path.clone()))
350 .collect();
351 Some(tokio::task::spawn_blocking(move || {
352 paths
353 .into_iter()
354 .filter_map(|(i, path)| {
355 let canonical = std::fs::canonicalize(&path).ok()?;
356 let content = std::fs::read_to_string(&canonical).ok()?;
357 let uri = path_to_uri(&canonical).ok()?.to_string();
358 Some((i, content, uri))
359 })
360 .collect::<Vec<_>>()
361 }))
362 } else {
363 None
364 };
365
366 process_batch(&prefetched, batch, client, file_tracker, &mut results).await;
368
369 if let Some(handle) = next_prefetch {
371 let next_batch = chunks[batch_idx + 1];
372 prefetched = handle
373 .await
374 .unwrap_or_default()
375 .into_iter()
376 .map(|(i, content, uri)| PreReadFile {
377 entry: next_batch[i],
378 content,
379 uri,
380 })
381 .collect();
382 }
383 }
384
385 info!(
386 "index: collected symbols from {}/{total} files (batch_size={batch_size})",
387 results.len()
388 );
389 results
390}
391
/// Open one prefetched batch on the server, request document symbols for
/// each file, then collect the responses and close the batch's files.
///
/// Requests are pipelined: all `documentSymbol` requests are sent before
/// any response is awaited, so the server can work on them concurrently.
/// Per-file failures (open, send, or response) are logged at debug level
/// and skipped; successful results are appended to `results` as
/// `(rel_path, hash, symbols)` triples.
async fn process_batch(
    prefetched: &[PreReadFile<'_>],
    batch: &[&FileEntry],
    client: &mut LspClient,
    file_tracker: &mut FileTracker,
    results: &mut Vec<(String, String, Vec<CachedSymbol>)>,
) {
    // Phase 1: open every file and fire off its request, recording the
    // request id so responses can be matched up in phase 2.
    let mut pending: Vec<(&FileEntry, i64)> = Vec::new();
    for file in prefetched {
        if let Err(e) = file_tracker
            .open_with_content(
                &file.entry.abs_path,
                &file.uri,
                &file.content,
                client.transport_mut(),
            )
            .await
        {
            debug!("index: failed to open {}: {e}", file.entry.rel_path);
            continue;
        }

        let params = json!({ "textDocument": { "uri": file.uri } });
        match client
            .transport_mut()
            .send_request("textDocument/documentSymbol", params)
            .await
        {
            Ok(id) => pending.push((file.entry, id)),
            Err(e) => debug!(
                "index: failed to send request for {}: {e}",
                file.entry.rel_path
            ),
        }
    }

    // Phase 2: await each outstanding response and flatten its symbols.
    for (entry, request_id) in &pending {
        match client.wait_for_response_public(*request_id).await {
            Ok(response) => {
                let symbols = flatten_document_symbols(&response, None);
                results.push((entry.rel_path.clone(), entry.hash.clone(), symbols));
            }
            Err(e) => {
                debug!("index: failed to index {}: {e}", entry.rel_path);
            }
        }
    }

    // Phase 3: close every file in the planned batch — including ones that
    // never made it into `prefetched`; the tracker reports those as errors,
    // which we only log.
    for entry in batch {
        if let Err(e) = file_tracker
            .close(&entry.abs_path, client.transport_mut())
            .await
        {
            debug!("index: failed to close {}: {e}", entry.rel_path);
        }
    }
}
453
454pub fn commit_index(
459 store: &IndexStore,
460 results: &[(String, String, Vec<CachedSymbol>)],
461) -> anyhow::Result<usize> {
462 Ok(store.batch_commit(results)?)
463}
464
465#[allow(clippy::cast_possible_truncation)]
467fn flatten_document_symbols(value: &Value, parent: Option<&str>) -> Vec<CachedSymbol> {
468 let Some(items) = value.as_array() else {
469 return Vec::new();
470 };
471
472 let mut result = Vec::new();
473 for item in items {
474 let name = item
475 .get("name")
476 .and_then(Value::as_str)
477 .unwrap_or_default()
478 .to_string();
479
480 let kind =
481 symbol_kind_name(item.get("kind").and_then(Value::as_u64).unwrap_or(0)).to_string();
482
483 let start_line = item
484 .pointer("/range/start/line")
485 .and_then(Value::as_u64)
486 .unwrap_or(0) as u32;
487 let start_col = item
488 .pointer("/range/start/character")
489 .and_then(Value::as_u64)
490 .unwrap_or(0) as u32;
491 let end_line = item
492 .pointer("/range/end/line")
493 .and_then(Value::as_u64)
494 .unwrap_or(0) as u32;
495 let end_col = item
496 .pointer("/range/end/character")
497 .and_then(Value::as_u64)
498 .unwrap_or(0) as u32;
499
500 result.push(CachedSymbol {
501 name: name.clone(),
502 kind,
503 path: String::new(),
504 range_start_line: start_line,
505 range_start_col: start_col,
506 range_end_line: end_line,
507 range_end_col: end_col,
508 parent_name: parent.map(String::from),
509 });
510
511 if let Some(children) = item.get("children") {
512 result.extend(flatten_document_symbols(children, Some(&name)));
513 }
514 }
515 result
516}
517
518fn walk_source_files(project_root: &Path, extensions: &[&str]) -> anyhow::Result<Vec<PathBuf>> {
520 let mut builder = ignore::WalkBuilder::new(project_root);
521 builder
522 .hidden(true)
523 .git_ignore(true)
524 .git_global(false)
525 .git_exclude(true);
526
527 let mut files = Vec::new();
528 for entry in builder.build() {
529 let entry = entry?;
530 if !entry.file_type().is_some_and(|ft| ft.is_file()) {
531 continue;
532 }
533 let path = entry.path();
534 if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
535 if extensions.contains(&ext) {
536 files.push(path.to_path_buf());
537 }
538 }
539 }
540
541 files.sort();
542 Ok(files)
543}
544
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    // A non-array response (e.g. JSON null) must flatten to no symbols.
    #[test]
    fn flatten_empty_response() {
        let result = flatten_document_symbols(&json!(null), None);
        assert!(result.is_empty());
    }

    // Nested children are flattened depth-first with parent_name set to the
    // enclosing symbol's name; top-level symbols have no parent.
    #[test]
    fn flatten_nested_symbols() {
        let response = json!([
            {
                "name": "Config",
                "kind": 5,
                "range": {
                    "start": { "line": 0, "character": 0 },
                    "end": { "line": 20, "character": 1 }
                },
                "children": [
                    {
                        "name": "new",
                        "kind": 6,
                        "range": {
                            "start": { "line": 5, "character": 2 },
                            "end": { "line": 10, "character": 3 }
                        }
                    }
                ]
            },
            {
                "name": "greet",
                "kind": 12,
                "range": {
                    "start": { "line": 22, "character": 0 },
                    "end": { "line": 25, "character": 1 }
                }
            }
        ]);

        let symbols = flatten_document_symbols(&response, None);
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0].name, "Config");
        assert!(symbols[0].parent_name.is_none());
        assert_eq!(symbols[1].name, "new");
        assert_eq!(symbols[1].parent_name, Some("Config".to_string()));
        assert_eq!(symbols[2].name, "greet");
        assert!(symbols[2].parent_name.is_none());
    }

    // Only files with a listed extension are returned.
    #[test]
    fn walk_source_files_filters_by_extension() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();
        std::fs::write(dir.path().join("lib.rs"), "pub fn lib() {}").unwrap();
        std::fs::write(dir.path().join("notes.txt"), "notes").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 2);
        assert!(files.iter().all(|f| f.extension().unwrap() == "rs"));
    }

    // NOTE(review): this test shells out to the `git` binary and will panic
    // if git is not installed on the test machine.
    #[test]
    fn walk_source_files_respects_gitignore() {
        let dir = tempfile::tempdir().unwrap();

        // A real repo is needed: the `ignore` walker only applies
        // .gitignore rules inside a git work tree.
        std::process::Command::new("git")
            .args(["init"])
            .current_dir(dir.path())
            .output()
            .unwrap();

        std::fs::write(dir.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir_all(dir.path().join("target")).unwrap();
        std::fs::write(dir.path().join("target/output.rs"), "// generated").unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 1);
        assert!(files[0].ends_with("main.rs"));
    }
}
630}