1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::thread;
4
5use anyhow::Context;
6use serde_json::{json, Value};
7use tracing::{debug, info, warn};
8
9use super::hasher::hash_files_parallel;
10use super::store::{CachedSymbol, IndexStore};
11use crate::commands::find::symbol_kind_name;
12use crate::detect::Language;
13use crate::lsp::client::{path_to_uri, LspClient};
14use crate::lsp::files::FileTracker;
15use crate::lsp::install;
16
/// Summary counters for one indexing run.
#[derive(Debug, Default)]
pub struct IndexStats {
    // Total source files discovered in the project.
    pub files_total: usize,
    // Files whose symbols were (re)collected this run.
    pub files_indexed: usize,
    // Files skipped because their stored content hash still matched.
    pub files_cached: usize,
    // Symbols collected across all indexed files.
    pub symbols_total: usize,
}
25
/// A source file scheduled for indexing.
pub struct FileEntry {
    // Absolute on-disk path, used to read/open the file.
    pub abs_path: PathBuf,
    // Path relative to the project root; used as the store lookup key.
    pub rel_path: String,
    // Content hash used to detect changes between runs.
    pub hash: String,
}
32
33pub fn plan_index(
40 store: &IndexStore,
41 project_root: &Path,
42 extensions: &[&str],
43) -> anyhow::Result<(Vec<FileEntry>, usize)> {
44 let source_files = walk_source_files(project_root, extensions)?;
45 info!("index: found {} source files", source_files.len());
46
47 let hashes = hash_files_parallel(&source_files);
49
50 let path_hashes: Vec<(String, PathBuf, String)> = hashes
52 .into_iter()
53 .map(|(abs_path, hash)| {
54 let rel_path = abs_path
55 .strip_prefix(project_root)
56 .unwrap_or(&abs_path)
57 .to_string_lossy()
58 .to_string();
59 (rel_path, abs_path, hash)
60 })
61 .collect();
62
63 let rel_paths: Vec<&str> = path_hashes.iter().map(|(r, _, _)| r.as_str()).collect();
65 let stored = store.get_file_hashes_batch(&rel_paths).unwrap_or_default();
66
67 let mut to_index = Vec::new();
68 let mut cached = 0usize;
69
70 for (rel_path, abs_path, hash) in path_hashes {
71 if stored.get(&rel_path).is_some_and(|h| *h == hash) {
72 cached += 1;
73 } else {
74 to_index.push(FileEntry {
75 abs_path,
76 rel_path,
77 hash,
78 });
79 }
80 }
81
82 Ok((to_index, cached))
83}
84
/// Number of files to submit to the LSP server per batch.
///
/// Scales with available CPUs (4x each) and is clamped to `8..=64` so small
/// machines still batch and large machines do not flood the server. Falls
/// back to 4 CPUs when parallelism cannot be queried.
#[must_use]
pub fn detect_batch_size() -> usize {
    let cpus = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    (cpus * 4).clamp(8, 64)
}
94
/// Number of parallel LSP worker processes to spawn.
///
/// Uses roughly two-thirds of the available CPUs (leaving headroom for the
/// server processes themselves), clamped to `1..=10`. Falls back to 4 CPUs
/// when parallelism cannot be queried.
#[must_use]
pub fn detect_worker_count() -> usize {
    let cpus = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    ((cpus * 2) / 3).clamp(1, 10)
}
106
/// Collects document symbols for `files` using up to `num_workers`
/// concurrently running LSP server instances.
///
/// Files are striped across workers round-robin (worker `i` takes indices
/// `i, i + n, i + 2n, …`) so every worker receives a similar load. Workers
/// that fail to boot are logged and skipped; the call only fails when no
/// worker at all could be started. Results from a worker task that panicked
/// are dropped with a debug log.
///
/// # Errors
/// Returns an error if the language server cannot be installed/located, or
/// if every worker fails to boot.
pub async fn collect_symbols_parallel(
    files: Vec<FileEntry>,
    lang: Language,
    workspace_root: &Path,
    num_workers: usize,
) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
    if files.is_empty() {
        return Ok(Vec::new());
    }

    // Never spawn more workers than files; always keep at least one.
    let num_workers = num_workers.min(files.len()).max(1);
    let batch_size = detect_batch_size();

    if num_workers <= 1 {
        return collect_with_single_worker(files, lang, workspace_root, batch_size).await;
    }

    info!(
        "init: spawning {num_workers} parallel workers for {lang} ({} files)",
        files.len()
    );

    // Boot all server processes concurrently — each boot involves process
    // spawn plus an initialize round-trip, so serializing them would be slow.
    let boot_start = std::time::Instant::now();
    let (binary_path, entry) = install::ensure_server(lang).await?;
    let mut boot_handles = Vec::new();
    for i in 0..num_workers {
        let bp = binary_path.clone();
        // Owned copies so the spawned 'static task can borrow them locally.
        let args: Vec<String> = entry.args.iter().map(|s| (*s).to_string()).collect();
        let wr = workspace_root.to_path_buf();
        boot_handles.push(tokio::spawn(async move {
            let args_refs: Vec<&str> = args.iter().map(String::as_str).collect();
            (i, boot_temp_worker(&bp, &args_refs, lang, &wr).await)
        }));
    }

    // Keep only the workers that actually came up; boot failures are
    // tolerated as long as at least one worker survives.
    let mut workers = Vec::new();
    for handle in boot_handles {
        if let Ok((i, result)) = handle.await {
            match result {
                Ok((client, tracker)) => workers.push((client, tracker)),
                Err(e) => warn!("init: worker {i} failed to start: {e}"),
            }
        }
    }

    if workers.is_empty() {
        anyhow::bail!("no init workers could be started for {lang}");
    }

    let actual_workers = workers.len();
    info!(
        "init: {actual_workers} workers booted in {:?}",
        boot_start.elapsed()
    );

    // Share the file list across tasks; each worker borrows its stripe by
    // index instead of cloning the entries.
    let files = Arc::new(files);
    let mut handles = Vec::new();

    for (worker_idx, (mut client, mut tracker)) in workers.into_iter().enumerate() {
        let files_ref = Arc::clone(&files);
        // Round-robin stripe over the surviving worker count (which may be
        // smaller than the requested num_workers).
        let worker_indices: Vec<usize> = (worker_idx..files_ref.len())
            .step_by(actual_workers)
            .collect();

        handles.push(tokio::spawn(async move {
            let worker_files: Vec<&FileEntry> =
                worker_indices.iter().map(|&i| &files_ref[i]).collect();
            info!(
                "init: worker {worker_idx} processing {} files",
                worker_files.len()
            );
            let results =
                collect_symbols(&worker_files, &mut client, &mut tracker, batch_size).await;

            // Best-effort teardown — the symbols are already collected, so
            // close/shutdown failures are only logged.
            let _ = tracker.close_all(client.transport_mut()).await;
            if let Err(e) = client.shutdown().await {
                debug!("init: worker {worker_idx} shutdown error: {e}");
            }

            results
        }));
    }

    // Merge the stripes; result order across workers is not significant.
    let mut all_results = Vec::new();
    for handle in handles {
        match handle.await {
            Ok(results) => all_results.extend(results),
            Err(e) => debug!("init: worker task panicked: {e}"),
        }
    }

    Ok(all_results)
}
211
212async fn collect_with_single_worker(
214 files: Vec<FileEntry>,
215 lang: Language,
216 workspace_root: &Path,
217 batch_size: usize,
218) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
219 let (binary_path, entry) = install::ensure_server(lang).await?;
220 let (mut client, mut tracker) =
221 boot_temp_worker(&binary_path, entry.args, lang, workspace_root).await?;
222
223 let file_refs: Vec<&FileEntry> = files.iter().collect();
224 let results = collect_symbols(&file_refs, &mut client, &mut tracker, batch_size).await;
225
226 let _ = tracker.close_all(client.transport_mut()).await;
227 let _ = client.shutdown().await;
228
229 Ok(results)
230}
231
232async fn boot_temp_worker(
234 binary_path: &Path,
235 args: &[&str],
236 lang: Language,
237 workspace_root: &Path,
238) -> anyhow::Result<(LspClient, FileTracker)> {
239 let mut client = LspClient::start_with_binary(binary_path, args, lang, workspace_root)
240 .map_err(|e| anyhow::anyhow!("{e}"))?;
241 client
242 .initialize(workspace_root)
243 .await
244 .context("LSP initialize failed")?;
245 let tracker = FileTracker::new(lang);
246 Ok((client, tracker))
247}
248
/// A file whose content has already been read off the async runtime,
/// ready to be opened on the LSP server.
struct PreReadFile<'a> {
    // The planned index entry this content belongs to.
    entry: &'a FileEntry,
    // Full file content read from the canonicalized path.
    content: String,
    // URI for the canonicalized path (produced by `path_to_uri`).
    uri: String,
}
255
256async fn prefetch_files<'a>(files: &[&'a FileEntry]) -> Vec<PreReadFile<'a>> {
260 let paths: Vec<(usize, PathBuf)> = files
261 .iter()
262 .enumerate()
263 .map(|(i, e)| (i, e.abs_path.clone()))
264 .collect();
265
266 let read_results = tokio::task::spawn_blocking(move || {
267 paths
268 .into_iter()
269 .filter_map(|(i, path)| {
270 let canonical = std::fs::canonicalize(&path).ok()?;
271 let content = std::fs::read_to_string(&canonical).ok()?;
272 let uri = path_to_uri(&canonical).ok()?.to_string();
273 Some((i, content, uri))
274 })
275 .collect::<Vec<_>>()
276 })
277 .await
278 .unwrap_or_default();
279
280 read_results
281 .into_iter()
282 .map(|(i, content, uri)| PreReadFile {
283 entry: files[i],
284 content,
285 uri,
286 })
287 .collect()
288}
289
/// Collects symbols for `files` through a single LSP client, in batches of
/// `batch_size`, double-buffering the disk reads: while batch N is being
/// sent to the server, batch N+1 is already being read on the blocking pool.
///
/// Returns `(rel_path, hash, symbols)` for every file that was successfully
/// opened and answered; per-file failures are logged at debug level and
/// skipped.
pub async fn collect_symbols(
    files: &[&FileEntry],
    client: &mut LspClient,
    file_tracker: &mut FileTracker,
    batch_size: usize,
) -> Vec<(String, String, Vec<CachedSymbol>)> {
    let mut results = Vec::new();
    let total = files.len();
    let chunks: Vec<&[&FileEntry]> = files.chunks(batch_size).collect();

    // Prime the pipeline: the first batch is read before the loop starts.
    let mut prefetched = if chunks.is_empty() {
        Vec::new()
    } else {
        prefetch_files(chunks[0]).await
    };

    for (batch_idx, batch) in chunks.iter().enumerate() {
        let batch_start = batch_idx * batch_size;
        debug!(
            "index: batch {}-{}/{total} ({} files)",
            batch_start + 1,
            (batch_start + batch.len()).min(total),
            batch.len()
        );

        // Kick off disk reads for the NEXT batch so they overlap with the
        // LSP round-trips for the current one. Indices are captured so the
        // results can be re-attached to their entries after the task joins;
        // unreadable files are dropped here.
        let next_prefetch = if batch_idx + 1 < chunks.len() {
            let next_batch = chunks[batch_idx + 1];
            let paths: Vec<(usize, PathBuf)> = next_batch
                .iter()
                .enumerate()
                .map(|(i, e)| (i, e.abs_path.clone()))
                .collect();
            Some(tokio::task::spawn_blocking(move || {
                paths
                    .into_iter()
                    .filter_map(|(i, path)| {
                        let canonical = std::fs::canonicalize(&path).ok()?;
                        let content = std::fs::read_to_string(&canonical).ok()?;
                        let uri = path_to_uri(&canonical).ok()?.to_string();
                        Some((i, content, uri))
                    })
                    .collect::<Vec<_>>()
            }))
        } else {
            None
        };

        process_batch(&prefetched, batch, client, file_tracker, &mut results).await;

        // Join the prefetch task and swap its output in for the next
        // iteration; a panicked task degrades to an empty batch.
        if let Some(handle) = next_prefetch {
            let next_batch = chunks[batch_idx + 1];
            prefetched = handle
                .await
                .unwrap_or_default()
                .into_iter()
                .map(|(i, content, uri)| PreReadFile {
                    entry: next_batch[i],
                    content,
                    uri,
                })
                .collect();
        }
    }

    info!(
        "index: collected symbols from {}/{total} files (batch_size={batch_size})",
        results.len()
    );
    results
}
372
/// Runs one batch against the LSP server: opens every pre-read file, sends
/// all `textDocument/documentSymbol` requests up front (pipelined), then
/// collects the responses in send order, and finally closes the batch.
///
/// Per-file failures (open, send, response) are logged at debug level and
/// the file is skipped; successes append `(rel_path, hash, symbols)` to
/// `results`.
async fn process_batch(
    prefetched: &[PreReadFile<'_>],
    batch: &[&FileEntry],
    client: &mut LspClient,
    file_tracker: &mut FileTracker,
    results: &mut Vec<(String, String, Vec<CachedSymbol>)>,
) {
    // Phase 1: open each file and fire its request without waiting, so the
    // server can process the whole batch concurrently.
    let mut pending: Vec<(&FileEntry, i64)> = Vec::new();
    for file in prefetched {
        if let Err(e) = file_tracker
            .open_with_content(
                &file.entry.abs_path,
                &file.uri,
                &file.content,
                client.transport_mut(),
            )
            .await
        {
            debug!("index: failed to open {}: {e}", file.entry.rel_path);
            continue;
        }

        let params = json!({ "textDocument": { "uri": file.uri } });
        match client
            .transport_mut()
            .send_request("textDocument/documentSymbol", params)
            .await
        {
            Ok(id) => pending.push((file.entry, id)),
            Err(e) => debug!(
                "index: failed to send request for {}: {e}",
                file.entry.rel_path
            ),
        }
    }

    // Phase 2: drain the responses in the order the requests were sent.
    for (entry, request_id) in &pending {
        match client.wait_for_response_public(*request_id).await {
            Ok(response) => {
                let symbols = flatten_document_symbols(&response, None);
                results.push((entry.rel_path.clone(), entry.hash.clone(), symbols));
            }
            Err(e) => {
                debug!("index: failed to index {}: {e}", entry.rel_path);
            }
        }
    }

    // Phase 3: close the full batch, not just the files opened above —
    // entries that never opened may error here, which is only debug-logged.
    for entry in batch {
        if let Err(e) = file_tracker
            .close(&entry.abs_path, client.transport_mut())
            .await
        {
            debug!("index: failed to close {}: {e}", entry.rel_path);
        }
    }
}
434
435pub fn commit_index(
440 store: &IndexStore,
441 results: &[(String, String, Vec<CachedSymbol>)],
442) -> anyhow::Result<usize> {
443 Ok(store.batch_commit(results)?)
444}
445
446#[allow(clippy::cast_possible_truncation)]
448fn flatten_document_symbols(value: &Value, parent: Option<&str>) -> Vec<CachedSymbol> {
449 let Some(items) = value.as_array() else {
450 return Vec::new();
451 };
452
453 let mut result = Vec::new();
454 for item in items {
455 let name = item
456 .get("name")
457 .and_then(Value::as_str)
458 .unwrap_or_default()
459 .to_string();
460
461 let kind =
462 symbol_kind_name(item.get("kind").and_then(Value::as_u64).unwrap_or(0)).to_string();
463
464 let start_line = item
465 .pointer("/range/start/line")
466 .and_then(Value::as_u64)
467 .unwrap_or(0) as u32;
468 let start_col = item
469 .pointer("/range/start/character")
470 .and_then(Value::as_u64)
471 .unwrap_or(0) as u32;
472 let end_line = item
473 .pointer("/range/end/line")
474 .and_then(Value::as_u64)
475 .unwrap_or(0) as u32;
476 let end_col = item
477 .pointer("/range/end/character")
478 .and_then(Value::as_u64)
479 .unwrap_or(0) as u32;
480
481 result.push(CachedSymbol {
482 name: name.clone(),
483 kind,
484 path: String::new(),
485 range_start_line: start_line,
486 range_start_col: start_col,
487 range_end_line: end_line,
488 range_end_col: end_col,
489 parent_name: parent.map(String::from),
490 });
491
492 if let Some(children) = item.get("children") {
493 result.extend(flatten_document_symbols(children, Some(&name)));
494 }
495 }
496 result
497}
498
499fn walk_source_files(project_root: &Path, extensions: &[&str]) -> anyhow::Result<Vec<PathBuf>> {
501 let mut builder = ignore::WalkBuilder::new(project_root);
502 builder
503 .hidden(true)
504 .git_ignore(true)
505 .git_global(false)
506 .git_exclude(true);
507
508 let mut files = Vec::new();
509 for entry in builder.build() {
510 let entry = entry?;
511 if !entry.file_type().is_some_and(|ft| ft.is_file()) {
512 continue;
513 }
514 let path = entry.path();
515 if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
516 if extensions.contains(&ext) {
517 files.push(path.to_path_buf());
518 }
519 }
520 }
521
522 files.sort();
523 Ok(files)
524}
525
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    // Non-array input (here: JSON null) must flatten to no symbols.
    #[test]
    fn flatten_empty_response() {
        let result = flatten_document_symbols(&json!(null), None);
        assert!(result.is_empty());
    }

    // Nested DocumentSymbols flatten depth-first; children carry their
    // parent's name, top-level symbols carry none.
    #[test]
    fn flatten_nested_symbols() {
        let response = json!([
            {
                "name": "Config",
                "kind": 5,
                "range": {
                    "start": { "line": 0, "character": 0 },
                    "end": { "line": 20, "character": 1 }
                },
                "children": [
                    {
                        "name": "new",
                        "kind": 6,
                        "range": {
                            "start": { "line": 5, "character": 2 },
                            "end": { "line": 10, "character": 3 }
                        }
                    }
                ]
            },
            {
                "name": "greet",
                "kind": 12,
                "range": {
                    "start": { "line": 22, "character": 0 },
                    "end": { "line": 25, "character": 1 }
                }
            }
        ]);

        let symbols = flatten_document_symbols(&response, None);
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0].name, "Config");
        assert!(symbols[0].parent_name.is_none());
        assert_eq!(symbols[1].name, "new");
        assert_eq!(symbols[1].parent_name, Some("Config".to_string()));
        assert_eq!(symbols[2].name, "greet");
        assert!(symbols[2].parent_name.is_none());
    }

    // Only files with a matching extension are returned by the walker.
    #[test]
    fn walk_source_files_filters_by_extension() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();
        std::fs::write(dir.path().join("lib.rs"), "pub fn lib() {}").unwrap();
        std::fs::write(dir.path().join("notes.txt"), "notes").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 2);
        assert!(files.iter().all(|f| f.extension().unwrap() == "rs"));
    }

    // A real git repo is needed: the walker only applies .gitignore rules
    // inside one, so `target/` must be excluded from the results.
    #[test]
    fn walk_source_files_respects_gitignore() {
        let dir = tempfile::tempdir().unwrap();

        std::process::Command::new("git")
            .args(["init"])
            .current_dir(dir.path())
            .output()
            .unwrap();

        std::fs::write(dir.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir_all(dir.path().join("target")).unwrap();
        std::fs::write(dir.path().join("target/output.rs"), "// generated").unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 1);
        assert!(files[0].ends_with("main.rs"));
    }
}