Skip to main content

krait/index/
builder.rs

1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::thread;
4
5use anyhow::Context;
6use serde_json::{json, Value};
7use tracing::{debug, info, warn};
8
9use super::hasher::hash_files_parallel;
10use super::store::{CachedSymbol, IndexStore};
11use crate::commands::find::symbol_kind_name;
12use crate::detect::Language;
13use crate::lsp::client::{path_to_uri, LspClient};
14use crate::lsp::files::FileTracker;
15use crate::lsp::install;
16use crate::lsp::pool::probe_until_ready;
17
/// Statistics from an index build.
#[derive(Debug, Default)]
pub struct IndexStats {
    // Total number of source files discovered in the project.
    pub files_total: usize,
    // Files that were (re)indexed in this run.
    pub files_indexed: usize,
    // Files skipped because their stored hash already matched (see `plan_index`).
    pub files_cached: usize,
    // Total number of symbols collected across all indexed files.
    pub symbols_total: usize,
}
26
/// A file entry to be indexed: absolute path, relative path, and BLAKE3 hash.
pub struct FileEntry {
    // Absolute on-disk path, used for reading the file and LSP open/close.
    pub abs_path: PathBuf,
    // Path relative to the project root (lossy UTF-8); used as the store key.
    pub rel_path: String,
    // BLAKE3 content hash, compared against the store for change detection.
    pub hash: String,
}
33
34/// Determine which files need (re)indexing by comparing BLAKE3 hashes.
35///
36/// Returns `(files_to_index, cached_count)`.
37///
38/// # Errors
39/// Returns an error if walking the source tree fails.
40pub fn plan_index(
41    store: &IndexStore,
42    project_root: &Path,
43    extensions: &[&str],
44) -> anyhow::Result<(Vec<FileEntry>, usize)> {
45    let source_files = walk_source_files(project_root, extensions)?;
46    info!("index: found {} source files", source_files.len());
47
48    // Hash all files in parallel (rayon)
49    let hashes = hash_files_parallel(&source_files);
50
51    // Build rel_path → abs_path + hash map
52    let path_hashes: Vec<(String, PathBuf, String)> = hashes
53        .into_iter()
54        .map(|(abs_path, hash)| {
55            let rel_path = abs_path
56                .strip_prefix(project_root)
57                .unwrap_or(&abs_path)
58                .to_string_lossy()
59                .to_string();
60            (rel_path, abs_path, hash)
61        })
62        .collect();
63
64    // Batch SELECT all stored hashes in one query
65    let rel_paths: Vec<&str> = path_hashes.iter().map(|(r, _, _)| r.as_str()).collect();
66    let stored = store.get_file_hashes_batch(&rel_paths).unwrap_or_default();
67
68    let mut to_index = Vec::new();
69    let mut cached = 0usize;
70
71    for (rel_path, abs_path, hash) in path_hashes {
72        if stored.get(&rel_path).is_some_and(|h| *h == hash) {
73            cached += 1;
74        } else {
75            to_index.push(FileEntry {
76                abs_path,
77                rel_path,
78                hash,
79            });
80        }
81    }
82
83    Ok((to_index, cached))
84}
85
/// Detect optimal batch size for pipelined LSP requests based on system resources.
///
/// Uses 4 pipelined requests per CPU core (the work is I/O-bound), clamped
/// to the range [8, 64]. Falls back to 4 cores if the CPU count is unknown.
#[must_use]
pub fn detect_batch_size() -> usize {
    let cores = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    (cores * 4).clamp(8, 64)
}
95
/// Detect the number of parallel init workers based on system resources.
///
/// Each LSP process is single-threaded, so more workers means more
/// parallelism. Uses 2/3 of the available cores (falling back to 4 if
/// unknown), always at least 1 and at most 10.
#[must_use]
pub fn detect_worker_count() -> usize {
    let cores = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    ((cores * 2) / 3).clamp(1, 10)
}
107
/// Index files using N parallel LSP workers for a single language.
///
/// Spawns temporary LSP server processes, splits files across them,
/// indexes in parallel, then shuts them all down. Works for all languages.
///
/// # Errors
/// Returns an error if no workers can be started.
pub async fn collect_symbols_parallel(
    files: Vec<FileEntry>,
    lang: Language,
    workspace_root: &Path,
    num_workers: usize,
) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
    if files.is_empty() {
        return Ok(Vec::new());
    }

    // Never spawn more workers than files; always at least one.
    let num_workers = num_workers.min(files.len()).max(1);
    let batch_size = detect_batch_size();

    // With a single worker, skip the spawn/join machinery entirely.
    if num_workers <= 1 {
        return collect_with_single_worker(files, lang, workspace_root, batch_size).await;
    }

    info!(
        "init: spawning {num_workers} parallel workers for {lang} ({} files)",
        files.len()
    );

    // Boot N temporary LSP servers in parallel
    let boot_start = std::time::Instant::now();
    let (binary_path, entry) = install::ensure_server(lang).await?;
    // Use the first file in the list as a warmup probe target (guaranteed to exist).
    let warmup_file = files[0].abs_path.clone();
    let mut boot_handles = Vec::new();
    for i in 0..num_workers {
        // Clone owned copies so each spawned boot task is 'static.
        let bp = binary_path.clone();
        let args: Vec<String> = entry.args.iter().map(|s| (*s).to_string()).collect();
        let wr = workspace_root.to_path_buf();
        let wf = warmup_file.clone();
        boot_handles.push(tokio::spawn(async move {
            let args_refs: Vec<&str> = args.iter().map(String::as_str).collect();
            (i, boot_temp_worker(&bp, &args_refs, lang, &wr, &wf).await)
        }));
    }

    // A failed boot is logged and skipped; we proceed with whatever started.
    let mut workers = Vec::new();
    for handle in boot_handles {
        if let Ok((i, result)) = handle.await {
            match result {
                Ok((client, tracker)) => workers.push((client, tracker)),
                Err(e) => warn!("init: worker {i} failed to start: {e}"),
            }
        }
    }

    if workers.is_empty() {
        anyhow::bail!("no init workers could be started for {lang}");
    }

    let actual_workers = workers.len();
    info!(
        "init: {actual_workers} workers booted in {:?}",
        boot_start.elapsed()
    );

    // Split files round-robin across workers
    let files = Arc::new(files);
    let mut handles = Vec::new();

    for (worker_idx, (mut client, mut tracker)) in workers.into_iter().enumerate() {
        let files_ref = Arc::clone(&files);
        // Worker k takes file indices k, k+W, k+2W, ... (W = actual_workers).
        let worker_indices: Vec<usize> = (worker_idx..files_ref.len())
            .step_by(actual_workers)
            .collect();

        handles.push(tokio::spawn(async move {
            let worker_files: Vec<&FileEntry> =
                worker_indices.iter().map(|&i| &files_ref[i]).collect();
            info!(
                "init: worker {worker_idx} processing {} files",
                worker_files.len()
            );
            let results =
                collect_symbols(&worker_files, &mut client, &mut tracker, batch_size).await;

            // Shut down temp worker
            let _ = tracker.close_all(client.transport_mut()).await;
            if let Err(e) = client.shutdown().await {
                debug!("init: worker {worker_idx} shutdown error: {e}");
            }

            results
        }));
    }

    // Collect all results. NOTE(review): a panicked worker task loses its
    // share of files — those files simply stay unindexed for this run.
    let mut all_results = Vec::new();
    for handle in handles {
        match handle.await {
            Ok(results) => all_results.extend(results),
            Err(e) => debug!("init: worker task panicked: {e}"),
        }
    }

    Ok(all_results)
}
215
216/// Fallback: single worker (same as before).
217async fn collect_with_single_worker(
218    files: Vec<FileEntry>,
219    lang: Language,
220    workspace_root: &Path,
221    batch_size: usize,
222) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
223    let (binary_path, entry) = install::ensure_server(lang).await?;
224    let warmup_file = files[0].abs_path.clone();
225    let (mut client, mut tracker) =
226        boot_temp_worker(&binary_path, entry.args, lang, workspace_root, &warmup_file).await?;
227
228    let file_refs: Vec<&FileEntry> = files.iter().collect();
229    let results = collect_symbols(&file_refs, &mut client, &mut tracker, batch_size).await;
230
231    let _ = tracker.close_all(client.transport_mut()).await;
232    let _ = client.shutdown().await;
233
234    Ok(results)
235}
236
/// Boot a temporary LSP server for indexing (not part of the pool).
///
/// `warmup_file` is the first file the caller intends to index — it's used to probe
/// the server until its workspace view is ready (needed by gopls, which returns
/// `"no views"` if queried before it finishes loading the go.mod).
///
/// # Errors
/// Returns an error if the server process cannot be spawned or the LSP
/// `initialize` handshake fails.
async fn boot_temp_worker(
    binary_path: &Path,
    args: &[&str],
    lang: Language,
    workspace_root: &Path,
    warmup_file: &Path,
) -> anyhow::Result<(LspClient, FileTracker)> {
    let mut client = LspClient::start_with_binary(binary_path, args, lang, workspace_root)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    client
        .initialize(workspace_root)
        .await
        .context("LSP initialize failed")?;
    let mut tracker = FileTracker::new(lang);
    // Open the warmup file and probe until the server is ready.
    // gopls returns "no views" if queried before it finishes loading the module graph.
    // If the warmup open fails, the worker is still returned — per-file
    // errors surface later during indexing instead of failing the boot.
    if tracker
        .ensure_open(warmup_file, client.transport_mut())
        .await
        .is_ok()
    {
        probe_until_ready(&mut client, warmup_file).await;
    }
    Ok((client, tracker))
}
267
/// Pre-read file content from disk (for parallel pre-fetching).
struct PreReadFile<'a> {
    // The file being indexed; borrowed from the caller's batch slice.
    entry: &'a FileEntry,
    // Full file contents, sent as the `didOpen` text.
    content: String,
    // LSP URI derived from the canonicalized absolute path.
    uri: String,
}
274
275/// Pre-read a batch of files from disk using a blocking thread pool.
276///
277/// Returns files with their content already loaded, ready for `didOpen`.
278async fn prefetch_files<'a>(files: &[&'a FileEntry]) -> Vec<PreReadFile<'a>> {
279    let paths: Vec<(usize, PathBuf)> = files
280        .iter()
281        .enumerate()
282        .map(|(i, e)| (i, e.abs_path.clone()))
283        .collect();
284
285    let read_results = tokio::task::spawn_blocking(move || {
286        paths
287            .into_iter()
288            .filter_map(|(i, path)| {
289                let canonical = std::fs::canonicalize(&path).ok()?;
290                let content = std::fs::read_to_string(&canonical).ok()?;
291                let uri = path_to_uri(&canonical).ok()?.to_string();
292                Some((i, content, uri))
293            })
294            .collect::<Vec<_>>()
295    })
296    .await
297    .unwrap_or_default();
298
299    read_results
300        .into_iter()
301        .map(|(i, content, uri)| PreReadFile {
302            entry: files[i],
303            content,
304            uri,
305        })
306        .collect()
307}
308
/// Query LSP for document symbols using pipelined batches with parallel disk I/O.
///
/// For each batch: pre-reads files from disk in parallel, opens them in the LSP,
/// fires all `documentSymbol` requests, collects all responses, then closes files.
/// The next batch's disk I/O overlaps with the current batch's LSP processing.
///
/// Returns `(rel_path, hash, symbols)` for each successfully indexed file.
/// This function is `Send` — it does NOT touch `IndexStore`.
pub async fn collect_symbols(
    files: &[&FileEntry],
    client: &mut LspClient,
    file_tracker: &mut FileTracker,
    batch_size: usize,
) -> Vec<(String, String, Vec<CachedSymbol>)> {
    let mut results = Vec::new();
    let total = files.len();
    // NOTE(review): `chunks` panics if `batch_size == 0`; callers pass
    // `detect_batch_size()`, which clamps to >= 8.
    let chunks: Vec<&[&FileEntry]> = files.chunks(batch_size).collect();

    // Pre-read first batch from disk
    let mut prefetched = if chunks.is_empty() {
        Vec::new()
    } else {
        prefetch_files(chunks[0]).await
    };

    for (batch_idx, batch) in chunks.iter().enumerate() {
        let batch_start = batch_idx * batch_size;
        debug!(
            "index: batch {}-{}/{total} ({} files)",
            batch_start + 1,
            (batch_start + batch.len()).min(total),
            batch.len()
        );

        // Kick off prefetch for NEXT batch while we process this one.
        // This duplicates the body of `prefetch_files` rather than calling it:
        // the spawned blocking task must own its data ('static), while
        // `prefetch_files` returns values that borrow from `files`.
        let next_prefetch = if batch_idx + 1 < chunks.len() {
            let next_batch = chunks[batch_idx + 1];
            let paths: Vec<(usize, PathBuf)> = next_batch
                .iter()
                .enumerate()
                .map(|(i, e)| (i, e.abs_path.clone()))
                .collect();
            Some(tokio::task::spawn_blocking(move || {
                paths
                    .into_iter()
                    .filter_map(|(i, path)| {
                        // Unreadable or non-UTF-8 files are silently skipped.
                        let canonical = std::fs::canonicalize(&path).ok()?;
                        let content = std::fs::read_to_string(&canonical).ok()?;
                        let uri = path_to_uri(&canonical).ok()?.to_string();
                        Some((i, content, uri))
                    })
                    .collect::<Vec<_>>()
            }))
        } else {
            None
        };

        // Process batch: open → query → collect → close
        process_batch(&prefetched, batch, client, file_tracker, &mut results).await;

        // Collect next batch's prefetch results; the stored indices are
        // positions within `next_batch`, used to rejoin borrowed entries.
        if let Some(handle) = next_prefetch {
            let next_batch = chunks[batch_idx + 1];
            prefetched = handle
                .await
                .unwrap_or_default()
                .into_iter()
                .map(|(i, content, uri)| PreReadFile {
                    entry: next_batch[i],
                    content,
                    uri,
                })
                .collect();
        }
    }

    info!(
        "index: collected symbols from {}/{total} files (batch_size={batch_size})",
        results.len()
    );
    results
}
391
/// Process one batch: open pre-read files, send requests, collect responses, close files.
///
/// Successful results are appended to `results` as `(rel_path, hash, symbols)`.
/// A failure at any phase is logged at debug level and that file is skipped —
/// one bad file never aborts the batch.
async fn process_batch(
    prefetched: &[PreReadFile<'_>],
    batch: &[&FileEntry],
    client: &mut LspClient,
    file_tracker: &mut FileTracker,
    results: &mut Vec<(String, String, Vec<CachedSymbol>)>,
) {
    // Phase 1: Open pre-read files and send all documentSymbol requests
    // up front without awaiting responses (pipelining); responses are
    // collected in phase 2.
    let mut pending: Vec<(&FileEntry, i64)> = Vec::new();
    for file in prefetched {
        if let Err(e) = file_tracker
            .open_with_content(
                &file.entry.abs_path,
                &file.uri,
                &file.content,
                client.transport_mut(),
            )
            .await
        {
            debug!("index: failed to open {}: {e}", file.entry.rel_path);
            continue;
        }

        let params = json!({ "textDocument": { "uri": file.uri } });
        match client
            .transport_mut()
            .send_request("textDocument/documentSymbol", params)
            .await
        {
            Ok(id) => pending.push((file.entry, id)),
            Err(e) => debug!(
                "index: failed to send request for {}: {e}",
                file.entry.rel_path
            ),
        }
    }

    // Phase 2: Collect all responses, in the order the requests were sent.
    for (entry, request_id) in &pending {
        match client.wait_for_response_public(*request_id).await {
            Ok(response) => {
                let symbols = flatten_document_symbols(&response, None);
                results.push((entry.rel_path.clone(), entry.hash.clone(), symbols));
            }
            Err(e) => {
                debug!("index: failed to index {}: {e}", entry.rel_path);
            }
        }
    }

    // Phase 3: Close batch files to keep LSP memory bounded.
    // Iterates `batch` (not `prefetched`), so a close is also attempted for
    // files that never opened; those failures are only debug-logged.
    for entry in batch {
        if let Err(e) = file_tracker
            .close(&entry.abs_path, client.transport_mut())
            .await
        {
            debug!("index: failed to close {}: {e}", entry.rel_path);
        }
    }
}
453
454/// Write collected symbols to the index store in a single transaction.
455///
456/// # Errors
457/// Returns an error if upserting files or inserting symbols into the store fails.
458pub fn commit_index(
459    store: &IndexStore,
460    results: &[(String, String, Vec<CachedSymbol>)],
461) -> anyhow::Result<usize> {
462    Ok(store.batch_commit(results)?)
463}
464
465/// Flatten a hierarchical `documentSymbol` response into a flat list.
466#[allow(clippy::cast_possible_truncation)]
467fn flatten_document_symbols(value: &Value, parent: Option<&str>) -> Vec<CachedSymbol> {
468    let Some(items) = value.as_array() else {
469        return Vec::new();
470    };
471
472    let mut result = Vec::new();
473    for item in items {
474        let name = item
475            .get("name")
476            .and_then(Value::as_str)
477            .unwrap_or_default()
478            .to_string();
479
480        let kind =
481            symbol_kind_name(item.get("kind").and_then(Value::as_u64).unwrap_or(0)).to_string();
482
483        let start_line = item
484            .pointer("/range/start/line")
485            .and_then(Value::as_u64)
486            .unwrap_or(0) as u32;
487        let start_col = item
488            .pointer("/range/start/character")
489            .and_then(Value::as_u64)
490            .unwrap_or(0) as u32;
491        let end_line = item
492            .pointer("/range/end/line")
493            .and_then(Value::as_u64)
494            .unwrap_or(0) as u32;
495        let end_col = item
496            .pointer("/range/end/character")
497            .and_then(Value::as_u64)
498            .unwrap_or(0) as u32;
499
500        result.push(CachedSymbol {
501            name: name.clone(),
502            kind,
503            path: String::new(),
504            range_start_line: start_line,
505            range_start_col: start_col,
506            range_end_line: end_line,
507            range_end_col: end_col,
508            parent_name: parent.map(String::from),
509        });
510
511        if let Some(children) = item.get("children") {
512            result.extend(flatten_document_symbols(children, Some(&name)));
513        }
514    }
515    result
516}
517
518/// Walk project source files, respecting .gitignore.
519fn walk_source_files(project_root: &Path, extensions: &[&str]) -> anyhow::Result<Vec<PathBuf>> {
520    let mut builder = ignore::WalkBuilder::new(project_root);
521    builder
522        .hidden(true)
523        .git_ignore(true)
524        .git_global(false)
525        .git_exclude(true);
526
527    let mut files = Vec::new();
528    for entry in builder.build() {
529        let entry = entry?;
530        if !entry.file_type().is_some_and(|ft| ft.is_file()) {
531            continue;
532        }
533        let path = entry.path();
534        if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
535            if extensions.contains(&ext) {
536                files.push(path.to_path_buf());
537            }
538        }
539    }
540
541    files.sort();
542    Ok(files)
543}
544
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    // A non-array response (here: JSON null) flattens to no symbols.
    #[test]
    fn flatten_empty_response() {
        let result = flatten_document_symbols(&json!(null), None);
        assert!(result.is_empty());
    }

    // Children are emitted depth-first, directly after their parent, with
    // `parent_name` set to the immediate parent only.
    #[test]
    fn flatten_nested_symbols() {
        let response = json!([
            {
                "name": "Config",
                "kind": 5,
                "range": {
                    "start": { "line": 0, "character": 0 },
                    "end": { "line": 20, "character": 1 }
                },
                "children": [
                    {
                        "name": "new",
                        "kind": 6,
                        "range": {
                            "start": { "line": 5, "character": 2 },
                            "end": { "line": 10, "character": 3 }
                        }
                    }
                ]
            },
            {
                "name": "greet",
                "kind": 12,
                "range": {
                    "start": { "line": 22, "character": 0 },
                    "end": { "line": 25, "character": 1 }
                }
            }
        ]);

        let symbols = flatten_document_symbols(&response, None);
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0].name, "Config");
        assert!(symbols[0].parent_name.is_none());
        assert_eq!(symbols[1].name, "new");
        assert_eq!(symbols[1].parent_name, Some("Config".to_string()));
        assert_eq!(symbols[2].name, "greet");
        assert!(symbols[2].parent_name.is_none());
    }

    // Only files whose extension is in the allow-list are returned.
    #[test]
    fn walk_source_files_filters_by_extension() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();
        std::fs::write(dir.path().join("lib.rs"), "pub fn lib() {}").unwrap();
        std::fs::write(dir.path().join("notes.txt"), "notes").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 2);
        assert!(files.iter().all(|f| f.extension().unwrap() == "rs"));
    }

    // Files under a .gitignore'd directory are excluded from the walk.
    // Requires a real `git init` so the ignore crate treats the dir as a repo.
    #[test]
    fn walk_source_files_respects_gitignore() {
        let dir = tempfile::tempdir().unwrap();

        std::process::Command::new("git")
            .args(["init"])
            .current_dir(dir.path())
            .output()
            .unwrap();

        std::fs::write(dir.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir_all(dir.path().join("target")).unwrap();
        std::fs::write(dir.path().join("target/output.rs"), "// generated").unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 1);
        assert!(files[0].ends_with("main.rs"));
    }
}