trusty-search 0.27.2

Machine-wide hybrid code search service: BM25 + vector + KG, zero cold-start, MCP server
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
//! Core async reindex runner extracted from `orchestrator.rs`.
//!
//! Why: `orchestrator.rs` exceeded the 500-SLOC production cap (issue #1175 follow-up).
//! This module holds Phase 1 (walk) and Phase 2 (pipelined parse/embed/commit).
//! Post-loop completion (prune, KG rebuild, corpus swap, terminal event) lives
//! in `finish.rs`; the RSS pollers live in `pollers.rs`.
//!
//! What: exports `run_reindex`, a `pub(super)` async function called by
//! `orchestrator::spawn_reindex_with_cleanup`.
//!
//! Test: covered by `reindex_walks_directory_and_emits_events` and the
//! integration tests in `tests.rs`.

use crate::core::memguard::{current_rss_mb, current_rss_mb_for_pid, index_memory_limit_mb};
use crate::core::registry::{IndexHandle, IndexId};
use dashmap::DashMap;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering as AtomicOrdering};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;

use super::batch::{
    commit_parsed_and_finalize, prepare_and_parse_batch, BatchCtx, REINDEX_BATCH_SIZE,
};
use super::corpus_swap::begin_staged_corpus_swap;
use super::finish::{BatchTotals, FileHashes, FinishCtx};
use super::guard::ReindexTerminationGuard;
use super::hash::hashes_for;
use super::hash_cache;
use super::progress::{ReindexProgress, ReindexStatus};
use super::quarantine::ReindexQuarantine;
use super::semaphore::{reindex_semaphore_for, BACKGROUND_QUEUE_DEPTH};
use super::stages::{mark_reindex_failed, now_rfc3339, schedule_progress_cleanup};
use super::staging;
use super::validate;

/// Full three-phase async reindex body, spawned by `spawn_reindex_with_cleanup`.
///
/// Why: extracted from `orchestrator.rs` so each file stays under the 500-SLOC
/// production cap (issue #1175). Holds the semaphore acquisition, Phase 1 (walk
/// + filter), and Phase 2 (pipelined parse/embed/commit).
///
/// What: acquires the correct semaphore, runs Phases 1 and 2, then delegates
/// to `finish::finish_reindex` for KG rebuild, corpus swap, event emission, and GC.
///
/// Test: `reindex_walks_directory_and_emits_events` (primary integration test);
/// `interactive_reindex_not_starved_by_background` covers semaphore prioritisation.
#[allow(clippy::too_many_arguments)]
pub(super) async fn run_reindex(
    handle: Arc<IndexHandle>,
    progress: Arc<ReindexProgress>,
    force: bool,
    cleanup_map: Option<Arc<DashMap<IndexId, Arc<ReindexProgress>>>>,
    aborted_map: Option<Arc<DashMap<IndexId, Instant>>>,
    embedderd_pid_slot: Option<Arc<AtomicU32>>,
    priority: bool,
    quarantine: Option<ReindexQuarantine>,
) {
    use std::sync::atomic::Ordering;

    let cleanup_id = handle.id.clone();

    // Issue #458: route to the correct semaphore based on priority.
    let _permit = reindex_semaphore_for(priority)
        .acquire()
        .await
        .expect("reindex semaphore is never closed");
    // Decrement the background queue counter once the permit is held.
    if !priority {
        BACKGROUND_QUEUE_DEPTH.fetch_sub(1, AtomicOrdering::Relaxed);
    }

    // Arm the termination guard. Any early exit — panic, early return, or
    // `.await` cancellation — fires `ReindexTerminationGuard::drop`, which logs
    // the cause at `error!` (issue #1428: never silent), broadcasts an error
    // event, and marks the status `Failed`. The guard is disarmed just after
    // `emit_complete_event` confirms the normal terminal event has been sent.
    // Stamping the index id makes the stderr line greppable per-index; the
    // failure-reason slot lets us hand a specific cause (e.g. a captured
    // producer-task panic) to `Drop`.
    let term_guard =
        ReindexTerminationGuard::new(Arc::clone(&progress)).with_index_id(handle.id.0.clone());
    let failure_slot = term_guard.failure_reason_slot();

    let started = Instant::now();
    // Issue #602 — portable paths.
    let root = handle.root_path.clone();
    let canonical_root = validate::canonical_walk_root(&root);
    let index_id: IndexId = handle.id.clone();

    // Issue #109, Phase 1: reset the staged-pipeline status surface.
    super::stages::reset_stages_for_reindex(&handle).await;

    // Phase 1: walk + filter the source tree.
    {
        let mut diag = handle.walk_diagnostics.write().await;
        diag.last_walk_started_at = Some(now_rfc3339());
        diag.last_walk_files_seen = 0;
        diag.last_walk_files_skipped = 0;
        diag.last_walk_error = None;
    }
    // Issue #744: stamp the walk end time.
    let walk = super::orchestrator::collect_files_to_index(&handle);
    let walk_ms = started.elapsed().as_millis() as u64;
    let total = walk.files.len();
    {
        let mut diag = handle.walk_diagnostics.write().await;
        diag.last_walk_files_seen = total as u64;
        diag.last_walk_files_skipped = walk.skipped_dirs as u64;
        if total == 0 {
            let reason = if !handle.root_path.exists() {
                format!("root path does not exist: {}", handle.root_path.display())
            } else {
                format!(
                    "walk produced zero files under {}; check gitignore rules, \
                     path_filter, and extension allow-list",
                    handle.root_path.display()
                )
            };
            diag.last_walk_error = Some(reason);
        }
    }
    progress.total_files.store(total, Ordering::Release);

    // Issues #840 / #662: load the persisted hash cache BEFORE emitting
    // the `start` event so `hashes_loaded` is available for the event payload.
    let hashes: FileHashes = hashes_for(&index_id);
    // Issue #602: detect a root move against the corpus's persisted
    // `indexed_root` and, for legacy (non-colocated) indexes, clear the
    // hash cache so every file is re-written relative to the new canonical root.
    //
    // Issue #1073: for colocated indexes the chunk and hash keys are
    // ROOT-RELATIVE (#402), so they are valid at any root location — a pure
    // move changes the root prefix only. Do NOT clear the hash cache on a
    // root move for colocated indexes.
    let prior_indexed_root = handle.read_indexed_root().await.unwrap_or(None);
    let root_moved =
        validate::needs_path_relativization(prior_indexed_root.as_deref(), &canonical_root);
    let is_colocated = crate::service::colocated_storage::has_colocated_storage(&canonical_root);
    let hashes_loaded: usize = if force {
        hashes.clear();
        hash_cache::clear_persisted(&handle).await;
        0
    } else if root_moved && !is_colocated {
        tracing::warn!(
            "reindex[{}]: legacy index root moved from {:?} to {} — clearing hash \
             cache to re-relativize all chunk paths against the new root",
            index_id.0,
            prior_indexed_root,
            canonical_root.display(),
        );
        hashes.clear();
        hash_cache::clear_persisted(&handle).await;
        0
    } else if root_moved {
        // Issue #1073: colocated index moved — keys are root-relative and
        // survive the move.
        tracing::info!(
            "reindex[{}]: colocated index root moved from {:?} to {} — \
             keys are root-relative (#402); preserving hash cache (no re-embed)",
            index_id.0,
            prior_indexed_root,
            canonical_root.display(),
        );
        if let Err(e) = handle.write_indexed_root(&canonical_root).await {
            tracing::warn!(
                "reindex[{}]: failed to update indexed_root after colocated \
                 root move ({e}) — next reindex will re-detect the move",
                index_id.0,
            );
        }
        hash_cache::load_into_cache(&handle, &hashes).await
    } else {
        hash_cache::load_into_cache(&handle, &hashes).await
    };

    // Issue #317: emit `walk_complete` BEFORE `start`.
    progress
        .push(serde_json::json!({
            "event": "walk_complete",
            "total_files": total,
            "index_id": index_id.0,
        }))
        .await;
    // Issue #840 Part 2: `hashes_loaded` shows whether warm-skip is primed.
    // Issue #929: `defer_embed` tells the CLI whether embedding will run
    // in the background.
    let effective_defer_embed = handle.defer_embed && !handle.lexical_only;
    progress
        .push(serde_json::json!({
            "event": "start",
            "total_files": total,
            "index_id": index_id.0,
            "root_path": root,
            "force": force,
            "lexical_only": handle.lexical_only,
            "hashes_loaded": hashes_loaded,
            "defer_embed": effective_defer_embed,
        }))
        .await;

    // Issue #744 — concurrent embedder warm-up.
    if !handle.lexical_only {
        let warm_indexer = Arc::clone(&handle.indexer);
        let warm_index_id = index_id.0.clone();
        let warm_ms = started;
        tokio::spawn(async move {
            tracing::debug!("reindex[{warm_index_id}]: starting concurrent embedder warm-up");
            let t0 = std::time::Instant::now();
            warm_indexer.read().await.warm_embedder().await;
            tracing::info!(
                "reindex[{warm_index_id}]: embedder warm-up complete in {}ms \
                 (started {}ms after reindex began)",
                t0.elapsed().as_millis(),
                warm_ms.elapsed().as_millis(),
            );
        });
    }

    // Issue #28, Phase 4 + #603: stage the rebuilt corpus.
    let corpus_swap_tmp: Option<PathBuf> =
        if staging::should_stage(handle.indexer.read().await.has_corpus_store()) {
            match begin_staged_corpus_swap(&handle, &index_id, force).await {
                Ok(path) => path,
                Err(e) => {
                    tracing::error!(
                        "reindex[{}]: ABORTING incremental reindex — carryover copy \
                         from live corpus failed ({e}); live corpus is intact",
                        index_id.0
                    );
                    mark_reindex_failed(&handle, "carryover copy failed — live corpus intact")
                        .await;
                    progress.status.store(ReindexStatus::Failed);
                    progress
                        .push(serde_json::json!({
                            "event": "error",
                            "index_id": index_id.0,
                            "message": format!(
                                "incremental reindex aborted: failed to copy live corpus \
                                 into staging store ({e}) — live corpus is intact"
                            ),
                            "fatal": true,
                        }))
                        .await;
                    // term_guard drops here → broadcasts error event via Drop.
                    drop(term_guard);
                    schedule_progress_cleanup(cleanup_map, cleanup_id);
                    if let Some(ref q) = quarantine {
                        q.record_failure(&index_id);
                    }
                    return;
                }
            }
        } else {
            None
        };

    // Per-subsystem timing accumulators.
    let mut total_parse_ms: u64 = 0;
    let mut total_embed_ms: u64 = 0;
    let mut total_bm25_ms: u64 = 0;
    let mut total_vector_upsert_ms: u64 = 0;
    let mut total_vector_count: usize = 0;
    let mut total_chunks_dropped_by_cap: usize = 0;

    // Memory-protection state (issues #76, #82).
    let mem_limit = index_memory_limit_mb();
    let mem_abort = Arc::new(AtomicBool::new(false));
    let peak_rss_atomic = Arc::new(AtomicU64::new(current_rss_mb().unwrap_or(0)));
    let mut mem_limit_hit: bool = false;

    // Spawn the background poller.
    let (poller_handle, poller_stop) = super::pollers::spawn_memory_poller(
        mem_limit,
        mem_abort.clone(),
        peak_rss_atomic.clone(),
        index_id.0.clone(),
    );

    // Issue #282: spawn the embedderd RSS poller if available.
    let peak_embedderd_rss_atomic = Arc::new(AtomicU64::new(0));
    let (embedderd_poller_handle, embedderd_poller_stop) =
        if let Some(pid_slot) = embedderd_pid_slot.as_ref() {
            let initial_pid = pid_slot.load(AtomicOrdering::Acquire);
            if let Some(rss) = current_rss_mb_for_pid(initial_pid) {
                peak_embedderd_rss_atomic.store(rss, AtomicOrdering::Release);
            }
            let (h, s) = super::pollers::spawn_embedderd_rss_poller(
                Arc::clone(pid_slot),
                Arc::clone(&peak_embedderd_rss_atomic),
            );
            (Some(h), Some(s))
        } else {
            (None, None)
        };

    // Phase 2: pipelined parse/embed/commit (issue #20).
    let ctx = BatchCtx {
        handle: handle.clone(),
        progress: progress.clone(),
        root: canonical_root.clone(),
        index_id: index_id.clone(),
        hashes: hashes.clone(),
        mem_limit,
        mem_abort: mem_abort.clone(),
        peak_rss_atomic: peak_rss_atomic.clone(),
        started,
        total,
        lexical_only: handle.lexical_only,
        defer_embed: handle.defer_embed && !handle.lexical_only,
        embedder_pid_slot: embedderd_pid_slot.clone(),
    };

    let batches: Vec<Vec<PathBuf>> = walk
        .files
        .chunks(REINDEX_BATCH_SIZE)
        .map(|b| b.to_vec())
        .collect();

    // Bounded channel — capacity 1 keeps memory in the same envelope as
    // the prior sequential loop (one batch in transit, one being committed).
    let (tx, mut rx) = mpsc::channel::<super::batch::ParsedReadyBatch>(1);
    let producer_ctx = ctx.clone();
    let producer_mem_abort = mem_abort.clone();
    let producer_index_id = index_id.0.clone();
    let total_batches = batches.len();
    let producer = tokio::spawn(async move {
        for (batch_idx, batch) in batches.into_iter().enumerate() {
            if producer_mem_abort.load(AtomicOrdering::Acquire) {
                let rss = current_rss_mb().unwrap_or(0);
                tracing::warn!(
                    "reindex: memory limit hit before batch (rss={}MB, \
                     limit={:?}MB) — producer halting for index {}",
                    rss,
                    producer_ctx.mem_limit,
                    producer_index_id
                );
                break;
            }
            // RUST_LOG=debug visibility into batch flush cadence — pinpoints the
            // exact batch index where a deterministic mid-run halt occurs
            // (issue #1428). Cheap: a single debug-gated log per batch.
            tracing::debug!(
                index_id = %producer_index_id,
                batch_idx,
                total_batches,
                batch_files = batch.len(),
                "reindex: producer preparing batch"
            );
            let Some(ready) = prepare_and_parse_batch(&producer_ctx, &batch).await else {
                continue;
            };
            if tx.send(ready).await.is_err() {
                break;
            }
        }
    });

    // Consumer loop: commits batches sequentially.
    while let Some(ready) = rx.recv().await {
        tracing::debug!(
            index_id = %index_id.0,
            indexed = progress.indexed_count(),
            "reindex: consumer committing batch"
        );
        let outcome = commit_parsed_and_finalize(&ctx, ready).await;
        total_parse_ms = total_parse_ms.saturating_add(outcome.parse_ms);
        total_embed_ms = total_embed_ms.saturating_add(outcome.embed_ms);
        total_bm25_ms = total_bm25_ms.saturating_add(outcome.bm25_ms);
        total_vector_upsert_ms = total_vector_upsert_ms.saturating_add(outcome.vector_upsert_ms);
        total_vector_count = total_vector_count.saturating_add(outcome.vector_count);
        total_chunks_dropped_by_cap =
            total_chunks_dropped_by_cap.saturating_add(outcome.chunks_dropped_by_cap);
        if outcome.chunks_dropped_by_cap > 0 {
            progress
                .chunks_dropped_by_cap
                .fetch_add(outcome.chunks_dropped_by_cap, Ordering::Release);
        }
        if outcome.mem_limit_hit {
            mem_limit_hit = true;
            rx.close();
            while rx.recv().await.is_some() {}
            break;
        }
    }
    // Issue #1428: the producer's `JoinHandle` result was previously discarded
    // (`let _ = producer.await`), so a PANIC inside the parse/embed producer
    // task (e.g. an `unwrap` deep in the embed path, or an allocation failure
    // under GPU/memory pressure) unwound silently — the consumer loop just saw
    // the channel close and fell through to a "successful" finish. Capture the
    // `JoinError` here: log it at `error!` (stderr) and record it in the guard's
    // failure-reason slot so that if the run ends up failing, the operator sees
    // the real cause rather than the generic "exited unexpectedly" message.
    //
    // Limitation: this handles a panic/cancellation *inside* the spawned producer
    // task (surfaced as a `JoinError`). It does NOT cover `producer.await` itself
    // panicking (e.g. the tokio runtime shutting down out from under the await):
    // in that case the failure_slot is never written and the termination guard's
    // `Drop` falls back to its generic "exited unexpectedly" message. That path
    // is still non-silent (the guard logs at `error!`), just less specific.
    if let Err(join_err) = producer.await {
        if join_err.is_panic() {
            // Index id lives in the `reindex[{}]:` message prefix only (no
            // duplicate structured `index_id` field) to avoid double emission
            // in JSON log backends; `reindex[...]: ... PANICKED` greps still
            // match (issue #1428 review follow-up).
            tracing::error!(
                "reindex[{}]: parse/embed producer task PANICKED — the reindex \
                 is incomplete; this usually indicates an embedder fault (e.g. \
                 GPU OOM / sidecar stall). JoinError: {join_err}",
                index_id.0,
            );
            ReindexTerminationGuard::set_failure_reason(
                &failure_slot,
                format!(
                    "parse/embed producer task panicked ({join_err}) — reindex \
                     incomplete; check the daemon log for the panic backtrace \
                     and the embedder (GPU OOM / sidecar stall is the common cause)"
                ),
            );
        } else {
            // Cancellation (e.g. runtime shutdown) — still non-silent. Index id
            // is in the `reindex[{}]:` message prefix only (no duplicate
            // structured field) per the #1428 review follow-up.
            tracing::error!(
                "reindex[{}]: parse/embed producer task was cancelled — reindex \
                 incomplete. JoinError: {join_err}",
                index_id.0,
            );
            ReindexTerminationGuard::set_failure_reason(
                &failure_slot,
                format!("parse/embed producer task cancelled ({join_err}) — reindex incomplete"),
            );
        }
    }

    // Delegate post-loop work: prune, KG rebuild, corpus swap, terminal event, GC.
    let finish_ctx = FinishCtx {
        handle,
        progress,
        index_id,
        canonical_root,
        walked_files: walk.files,
        hashes,
        total,
        started,
        defer_embed: ctx.defer_embed,
        corpus_swap_tmp,
        mem_abort,
        peak_rss_atomic,
        peak_embedderd_rss_atomic,
        embedderd_pid_slot,
        poller_handle,
        poller_stop,
        embedderd_poller_handle,
        embedderd_poller_stop,
        term_guard,
        cleanup_map,
        cleanup_id,
        aborted_map,
        quarantine,
        mem_limit,
        force,
    };
    let batch_totals = BatchTotals {
        walk_ms,
        parse_ms: total_parse_ms,
        embed_ms: total_embed_ms,
        bm25_ms: total_bm25_ms,
        vector_upsert_ms: total_vector_upsert_ms,
        vector_count: total_vector_count,
        chunks_dropped_by_cap: total_chunks_dropped_by_cap,
        mem_limit_hit,
    };
    super::finish::finish_reindex(finish_ctx, batch_totals).await;
}