//! krait/index/builder.rs — builds the symbol index: plans which files need
//! (re)indexing by BLAKE3 hash, collects document symbols via parallel LSP
//! workers, and commits the results to the index store.
1use std::path::{Path, PathBuf};
2use std::sync::Arc;
3use std::thread;
4
5use anyhow::Context;
6use serde_json::{json, Value};
7use tracing::{debug, info, warn};
8
9use super::hasher::hash_files_parallel;
10use super::store::{CachedSymbol, IndexStore};
11use crate::commands::find::symbol_kind_name;
12use crate::detect::Language;
13use crate::lsp::client::{path_to_uri, LspClient};
14use crate::lsp::files::FileTracker;
15use crate::lsp::install;
16
/// Statistics from an index build.
#[derive(Debug, Default)]
pub struct IndexStats {
    // Total number of source files discovered in the project walk.
    pub files_total: usize,
    // Files that were (re)indexed this run because their hash changed.
    pub files_indexed: usize,
    // Files skipped because their stored BLAKE3 hash matched the on-disk hash.
    pub files_cached: usize,
    // Total symbols collected across all indexed files.
    pub symbols_total: usize,
}
25
/// A file entry to be indexed: absolute path, relative path, and BLAKE3 hash.
pub struct FileEntry {
    // Absolute path on disk, used for reading content and LSP didOpen/didClose.
    pub abs_path: PathBuf,
    // Path relative to the project root (lossy UTF-8); used as the store key.
    pub rel_path: String,
    // BLAKE3 hash of the file contents at plan time.
    pub hash: String,
}
32
33/// Determine which files need (re)indexing by comparing BLAKE3 hashes.
34///
35/// Returns `(files_to_index, cached_count)`.
36///
37/// # Errors
38/// Returns an error if walking the source tree fails.
39pub fn plan_index(
40    store: &IndexStore,
41    project_root: &Path,
42    extensions: &[&str],
43) -> anyhow::Result<(Vec<FileEntry>, usize)> {
44    let source_files = walk_source_files(project_root, extensions)?;
45    info!("index: found {} source files", source_files.len());
46
47    // Hash all files in parallel (rayon)
48    let hashes = hash_files_parallel(&source_files);
49
50    // Build rel_path → abs_path + hash map
51    let path_hashes: Vec<(String, PathBuf, String)> = hashes
52        .into_iter()
53        .map(|(abs_path, hash)| {
54            let rel_path = abs_path
55                .strip_prefix(project_root)
56                .unwrap_or(&abs_path)
57                .to_string_lossy()
58                .to_string();
59            (rel_path, abs_path, hash)
60        })
61        .collect();
62
63    // Batch SELECT all stored hashes in one query
64    let rel_paths: Vec<&str> = path_hashes.iter().map(|(r, _, _)| r.as_str()).collect();
65    let stored = store.get_file_hashes_batch(&rel_paths).unwrap_or_default();
66
67    let mut to_index = Vec::new();
68    let mut cached = 0usize;
69
70    for (rel_path, abs_path, hash) in path_hashes {
71        if stored.get(&rel_path).is_some_and(|h| *h == hash) {
72            cached += 1;
73        } else {
74            to_index.push(FileEntry {
75                abs_path,
76                rel_path,
77                hash,
78            });
79        }
80    }
81
82    Ok((to_index, cached))
83}
84
/// Detect optimal batch size for pipelined LSP requests based on system resources.
#[must_use]
pub fn detect_batch_size() -> usize {
    // Fall back to 4 logical CPUs when the count cannot be determined.
    let cores = match thread::available_parallelism() {
        Ok(n) => n.get(),
        Err(_) => 4,
    };
    // Requests are I/O-bound, so oversubscribe: 4x cores, kept within [8, 64].
    (cores * 4).clamp(8, 64)
}
94
/// Detect the number of parallel init workers based on system resources.
#[must_use]
pub fn detect_worker_count() -> usize {
    // Default to 4 cores when detection fails.
    let cores = thread::available_parallelism().map_or(4, std::num::NonZero::get);
    // Each LSP server is a single-threaded process, so parallelism scales
    // with worker count. Use two thirds of the cores, bounded to [1, 10].
    ((cores * 2) / 3).clamp(1, 10)
}
106
/// Index files using N parallel LSP workers for a single language.
///
/// Spawns temporary LSP server processes, splits files across them,
/// indexes in parallel, then shuts them all down. Works for all languages.
///
/// Returns `(rel_path, hash, symbols)` triples, one per successfully
/// indexed file (see [`collect_symbols`]).
///
/// # Errors
/// Returns an error if no workers can be started.
pub async fn collect_symbols_parallel(
    files: Vec<FileEntry>,
    lang: Language,
    workspace_root: &Path,
    num_workers: usize,
) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
    if files.is_empty() {
        return Ok(Vec::new());
    }

    // Never spawn more workers than files; always keep at least one.
    let num_workers = num_workers.min(files.len()).max(1);
    let batch_size = detect_batch_size();

    // A single worker needs none of the fan-out machinery below.
    if num_workers <= 1 {
        return collect_with_single_worker(files, lang, workspace_root, batch_size).await;
    }

    info!(
        "init: spawning {num_workers} parallel workers for {lang} ({} files)",
        files.len()
    );

    // Boot N temporary LSP servers in parallel
    let boot_start = std::time::Instant::now();
    let (binary_path, entry) = install::ensure_server(lang).await?;
    let mut boot_handles = Vec::new();
    for i in 0..num_workers {
        // Clone what each boot task needs; tokio::spawn requires owned ('static) data.
        let bp = binary_path.clone();
        let args: Vec<String> = entry.args.iter().map(|s| (*s).to_string()).collect();
        let wr = workspace_root.to_path_buf();
        boot_handles.push(tokio::spawn(async move {
            let args_refs: Vec<&str> = args.iter().map(String::as_str).collect();
            (i, boot_temp_worker(&bp, &args_refs, lang, &wr).await)
        }));
    }

    // Keep every worker that booted; log and drop the ones that failed.
    let mut workers = Vec::new();
    for handle in boot_handles {
        if let Ok((i, result)) = handle.await {
            match result {
                Ok((client, tracker)) => workers.push((client, tracker)),
                Err(e) => warn!("init: worker {i} failed to start: {e}"),
            }
        }
    }

    if workers.is_empty() {
        anyhow::bail!("no init workers could be started for {lang}");
    }

    let actual_workers = workers.len();
    info!(
        "init: {actual_workers} workers booted in {:?}",
        boot_start.elapsed()
    );

    // Split files round-robin across workers: worker k takes indices
    // k, k + actual_workers, k + 2*actual_workers, ...
    let files = Arc::new(files);
    let mut handles = Vec::new();

    for (worker_idx, (mut client, mut tracker)) in workers.into_iter().enumerate() {
        let files_ref = Arc::clone(&files);
        let worker_indices: Vec<usize> = (worker_idx..files_ref.len())
            .step_by(actual_workers)
            .collect();

        handles.push(tokio::spawn(async move {
            let worker_files: Vec<&FileEntry> =
                worker_indices.iter().map(|&i| &files_ref[i]).collect();
            info!(
                "init: worker {worker_idx} processing {} files",
                worker_files.len()
            );
            let results =
                collect_symbols(&worker_files, &mut client, &mut tracker, batch_size).await;

            // Shut down temp worker: close tracked files first, then the server.
            // Errors here are logged only — the collected results are still returned.
            let _ = tracker.close_all(client.transport_mut()).await;
            if let Err(e) = client.shutdown().await {
                debug!("init: worker {worker_idx} shutdown error: {e}");
            }

            results
        }));
    }

    // Collect all results; a panicked worker task loses only its own share.
    let mut all_results = Vec::new();
    for handle in handles {
        match handle.await {
            Ok(results) => all_results.extend(results),
            Err(e) => debug!("init: worker task panicked: {e}"),
        }
    }

    Ok(all_results)
}
211
212/// Fallback: single worker (same as before).
213async fn collect_with_single_worker(
214    files: Vec<FileEntry>,
215    lang: Language,
216    workspace_root: &Path,
217    batch_size: usize,
218) -> anyhow::Result<Vec<(String, String, Vec<CachedSymbol>)>> {
219    let (binary_path, entry) = install::ensure_server(lang).await?;
220    let (mut client, mut tracker) =
221        boot_temp_worker(&binary_path, entry.args, lang, workspace_root).await?;
222
223    let file_refs: Vec<&FileEntry> = files.iter().collect();
224    let results = collect_symbols(&file_refs, &mut client, &mut tracker, batch_size).await;
225
226    let _ = tracker.close_all(client.transport_mut()).await;
227    let _ = client.shutdown().await;
228
229    Ok(results)
230}
231
232/// Boot a temporary LSP server for indexing (not part of the pool).
233async fn boot_temp_worker(
234    binary_path: &Path,
235    args: &[&str],
236    lang: Language,
237    workspace_root: &Path,
238) -> anyhow::Result<(LspClient, FileTracker)> {
239    let mut client = LspClient::start_with_binary(binary_path, args, lang, workspace_root)
240        .map_err(|e| anyhow::anyhow!("{e}"))?;
241    client
242        .initialize(workspace_root)
243        .await
244        .context("LSP initialize failed")?;
245    let tracker = FileTracker::new(lang);
246    Ok((client, tracker))
247}
248
/// Pre-read file content from disk (for parallel pre-fetching).
struct PreReadFile<'a> {
    // The planned file this content belongs to.
    entry: &'a FileEntry,
    // Full file contents, read from the canonicalized path.
    content: String,
    // LSP URI derived from the canonicalized path (used for didOpen/requests).
    uri: String,
}
255
256/// Pre-read a batch of files from disk using a blocking thread pool.
257///
258/// Returns files with their content already loaded, ready for `didOpen`.
259async fn prefetch_files<'a>(files: &[&'a FileEntry]) -> Vec<PreReadFile<'a>> {
260    let paths: Vec<(usize, PathBuf)> = files
261        .iter()
262        .enumerate()
263        .map(|(i, e)| (i, e.abs_path.clone()))
264        .collect();
265
266    let read_results = tokio::task::spawn_blocking(move || {
267        paths
268            .into_iter()
269            .filter_map(|(i, path)| {
270                let canonical = std::fs::canonicalize(&path).ok()?;
271                let content = std::fs::read_to_string(&canonical).ok()?;
272                let uri = path_to_uri(&canonical).ok()?.to_string();
273                Some((i, content, uri))
274            })
275            .collect::<Vec<_>>()
276    })
277    .await
278    .unwrap_or_default();
279
280    read_results
281        .into_iter()
282        .map(|(i, content, uri)| PreReadFile {
283            entry: files[i],
284            content,
285            uri,
286        })
287        .collect()
288}
289
290/// Query LSP for document symbols using pipelined batches with parallel disk I/O.
291///
292/// For each batch: pre-reads files from disk in parallel, opens them in the LSP,
293/// fires all `documentSymbol` requests, collects all responses, then closes files.
294/// The next batch's disk I/O overlaps with the current batch's LSP processing.
295///
296/// Returns `(rel_path, hash, symbols)` for each successfully indexed file.
297/// This function is `Send` — it does NOT touch `IndexStore`.
298pub async fn collect_symbols(
299    files: &[&FileEntry],
300    client: &mut LspClient,
301    file_tracker: &mut FileTracker,
302    batch_size: usize,
303) -> Vec<(String, String, Vec<CachedSymbol>)> {
304    let mut results = Vec::new();
305    let total = files.len();
306    let chunks: Vec<&[&FileEntry]> = files.chunks(batch_size).collect();
307
308    // Pre-read first batch from disk
309    let mut prefetched = if chunks.is_empty() {
310        Vec::new()
311    } else {
312        prefetch_files(chunks[0]).await
313    };
314
315    for (batch_idx, batch) in chunks.iter().enumerate() {
316        let batch_start = batch_idx * batch_size;
317        debug!(
318            "index: batch {}-{}/{total} ({} files)",
319            batch_start + 1,
320            (batch_start + batch.len()).min(total),
321            batch.len()
322        );
323
324        // Kick off prefetch for NEXT batch while we process this one
325        let next_prefetch = if batch_idx + 1 < chunks.len() {
326            let next_batch = chunks[batch_idx + 1];
327            let paths: Vec<(usize, PathBuf)> = next_batch
328                .iter()
329                .enumerate()
330                .map(|(i, e)| (i, e.abs_path.clone()))
331                .collect();
332            Some(tokio::task::spawn_blocking(move || {
333                paths
334                    .into_iter()
335                    .filter_map(|(i, path)| {
336                        let canonical = std::fs::canonicalize(&path).ok()?;
337                        let content = std::fs::read_to_string(&canonical).ok()?;
338                        let uri = path_to_uri(&canonical).ok()?.to_string();
339                        Some((i, content, uri))
340                    })
341                    .collect::<Vec<_>>()
342            }))
343        } else {
344            None
345        };
346
347        // Process batch: open → query → collect → close
348        process_batch(&prefetched, batch, client, file_tracker, &mut results).await;
349
350        // Collect next batch's prefetch results
351        if let Some(handle) = next_prefetch {
352            let next_batch = chunks[batch_idx + 1];
353            prefetched = handle
354                .await
355                .unwrap_or_default()
356                .into_iter()
357                .map(|(i, content, uri)| PreReadFile {
358                    entry: next_batch[i],
359                    content,
360                    uri,
361                })
362                .collect();
363        }
364    }
365
366    info!(
367        "index: collected symbols from {}/{total} files (batch_size={batch_size})",
368        results.len()
369    );
370    results
371}
372
373/// Process one batch: open pre-read files, send requests, collect responses, close files.
374async fn process_batch(
375    prefetched: &[PreReadFile<'_>],
376    batch: &[&FileEntry],
377    client: &mut LspClient,
378    file_tracker: &mut FileTracker,
379    results: &mut Vec<(String, String, Vec<CachedSymbol>)>,
380) {
381    // Phase 1: Open pre-read files and send all documentSymbol requests
382    let mut pending: Vec<(&FileEntry, i64)> = Vec::new();
383    for file in prefetched {
384        if let Err(e) = file_tracker
385            .open_with_content(
386                &file.entry.abs_path,
387                &file.uri,
388                &file.content,
389                client.transport_mut(),
390            )
391            .await
392        {
393            debug!("index: failed to open {}: {e}", file.entry.rel_path);
394            continue;
395        }
396
397        let params = json!({ "textDocument": { "uri": file.uri } });
398        match client
399            .transport_mut()
400            .send_request("textDocument/documentSymbol", params)
401            .await
402        {
403            Ok(id) => pending.push((file.entry, id)),
404            Err(e) => debug!(
405                "index: failed to send request for {}: {e}",
406                file.entry.rel_path
407            ),
408        }
409    }
410
411    // Phase 2: Collect all responses
412    for (entry, request_id) in &pending {
413        match client.wait_for_response_public(*request_id).await {
414            Ok(response) => {
415                let symbols = flatten_document_symbols(&response, None);
416                results.push((entry.rel_path.clone(), entry.hash.clone(), symbols));
417            }
418            Err(e) => {
419                debug!("index: failed to index {}: {e}", entry.rel_path);
420            }
421        }
422    }
423
424    // Phase 3: Close batch files to keep LSP memory bounded
425    for entry in batch {
426        if let Err(e) = file_tracker
427            .close(&entry.abs_path, client.transport_mut())
428            .await
429        {
430            debug!("index: failed to close {}: {e}", entry.rel_path);
431        }
432    }
433}
434
435/// Write collected symbols to the index store in a single transaction.
436///
437/// # Errors
438/// Returns an error if upserting files or inserting symbols into the store fails.
439pub fn commit_index(
440    store: &IndexStore,
441    results: &[(String, String, Vec<CachedSymbol>)],
442) -> anyhow::Result<usize> {
443    Ok(store.batch_commit(results)?)
444}
445
446/// Flatten a hierarchical `documentSymbol` response into a flat list.
447#[allow(clippy::cast_possible_truncation)]
448fn flatten_document_symbols(value: &Value, parent: Option<&str>) -> Vec<CachedSymbol> {
449    let Some(items) = value.as_array() else {
450        return Vec::new();
451    };
452
453    let mut result = Vec::new();
454    for item in items {
455        let name = item
456            .get("name")
457            .and_then(Value::as_str)
458            .unwrap_or_default()
459            .to_string();
460
461        let kind =
462            symbol_kind_name(item.get("kind").and_then(Value::as_u64).unwrap_or(0)).to_string();
463
464        let start_line = item
465            .pointer("/range/start/line")
466            .and_then(Value::as_u64)
467            .unwrap_or(0) as u32;
468        let start_col = item
469            .pointer("/range/start/character")
470            .and_then(Value::as_u64)
471            .unwrap_or(0) as u32;
472        let end_line = item
473            .pointer("/range/end/line")
474            .and_then(Value::as_u64)
475            .unwrap_or(0) as u32;
476        let end_col = item
477            .pointer("/range/end/character")
478            .and_then(Value::as_u64)
479            .unwrap_or(0) as u32;
480
481        result.push(CachedSymbol {
482            name: name.clone(),
483            kind,
484            path: String::new(),
485            range_start_line: start_line,
486            range_start_col: start_col,
487            range_end_line: end_line,
488            range_end_col: end_col,
489            parent_name: parent.map(String::from),
490        });
491
492        if let Some(children) = item.get("children") {
493            result.extend(flatten_document_symbols(children, Some(&name)));
494        }
495    }
496    result
497}
498
499/// Walk project source files, respecting .gitignore.
500fn walk_source_files(project_root: &Path, extensions: &[&str]) -> anyhow::Result<Vec<PathBuf>> {
501    let mut builder = ignore::WalkBuilder::new(project_root);
502    builder
503        .hidden(true)
504        .git_ignore(true)
505        .git_global(false)
506        .git_exclude(true);
507
508    let mut files = Vec::new();
509    for entry in builder.build() {
510        let entry = entry?;
511        if !entry.file_type().is_some_and(|ft| ft.is_file()) {
512            continue;
513        }
514        let path = entry.path();
515        if let Some(ext) = path.extension().and_then(|e| e.to_str()) {
516            if extensions.contains(&ext) {
517                files.push(path.to_path_buf());
518            }
519        }
520    }
521
522    files.sort();
523    Ok(files)
524}
525
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    // A non-array documentSymbol payload (e.g. null) flattens to nothing.
    #[test]
    fn flatten_empty_response() {
        let result = flatten_document_symbols(&json!(null), None);
        assert!(result.is_empty());
    }

    // Depth-first flattening: a parent appears before its children, children
    // carry the parent's name, and top-level siblings have no parent.
    #[test]
    fn flatten_nested_symbols() {
        let response = json!([
            {
                "name": "Config",
                "kind": 5,
                "range": {
                    "start": { "line": 0, "character": 0 },
                    "end": { "line": 20, "character": 1 }
                },
                "children": [
                    {
                        "name": "new",
                        "kind": 6,
                        "range": {
                            "start": { "line": 5, "character": 2 },
                            "end": { "line": 10, "character": 3 }
                        }
                    }
                ]
            },
            {
                "name": "greet",
                "kind": 12,
                "range": {
                    "start": { "line": 22, "character": 0 },
                    "end": { "line": 25, "character": 1 }
                }
            }
        ]);

        let symbols = flatten_document_symbols(&response, None);
        assert_eq!(symbols.len(), 3);
        assert_eq!(symbols[0].name, "Config");
        assert!(symbols[0].parent_name.is_none());
        assert_eq!(symbols[1].name, "new");
        assert_eq!(symbols[1].parent_name, Some("Config".to_string()));
        assert_eq!(symbols[2].name, "greet");
        assert!(symbols[2].parent_name.is_none());
    }

    // Only files whose extension is in the allow-list are returned.
    #[test]
    fn walk_source_files_filters_by_extension() {
        let dir = tempfile::tempdir().unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();
        std::fs::write(dir.path().join("lib.rs"), "pub fn lib() {}").unwrap();
        std::fs::write(dir.path().join("notes.txt"), "notes").unwrap();
        std::fs::write(dir.path().join("data.json"), "{}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 2);
        assert!(files.iter().all(|f| f.extension().unwrap() == "rs"));
    }

    // Files under a .gitignore'd directory must be excluded from the walk.
    // NOTE(review): this test shells out to a `git` binary on PATH — it will
    // fail in environments without git installed.
    #[test]
    fn walk_source_files_respects_gitignore() {
        let dir = tempfile::tempdir().unwrap();

        // .gitignore is only honored inside a real git repository.
        std::process::Command::new("git")
            .args(["init"])
            .current_dir(dir.path())
            .output()
            .unwrap();

        std::fs::write(dir.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir_all(dir.path().join("target")).unwrap();
        std::fs::write(dir.path().join("target/output.rs"), "// generated").unwrap();
        std::fs::write(dir.path().join("main.rs"), "fn main() {}").unwrap();

        let files = walk_source_files(dir.path(), &["rs"]).unwrap();
        assert_eq!(files.len(), 1);
        assert!(files[0].ends_with("main.rs"));
    }
}