// sqry_daemon/rebuild.rs
1//! Rebuild dispatcher for the sqryd daemon (Task 7 Phase 7a + 7b1).
2//!
3//! The [`RebuildDispatcher`] owns the per-workspace call path that
4//! maps a debounced [`ChangeSet`] from the [`sqry_core::watch`] layer
5//! through to either a full (`build_unified_graph`) or incremental
6//! (`incremental_rebuild`) rebuild, and publishes the resulting
7//! [`CodeGraph`] via
8//! [`WorkspaceManager::publish_and_retain`](crate::workspace::WorkspaceManager::publish_and_retain).
9//!
10//! Phase 7a shipped the synchronous skeleton (coalesce-then-execute
11//! single-caller driver). Phase 7b1 adds the A2 §J.2 runner-role gate
12//! so concurrent callers serialise cleanly:
13//!
14//! - [`PendingRebuild::coalesce_with`] — A2 §J.2 lane-merge algebra
15//!   (union of files, OR of `git_state_changed`, max of `enqueued_at`,
16//!   full-rebuild-dominance merge on `git_change_class`).
17//! - [`RebuildDispatcher::handle_changes`] — Phase A (acquire-or-park
18//!   runner role via [`LoadedWorkspace::rebuild_in_flight`] CAS under
19//!   the [`LoadedWorkspace::rebuild_lane`] mutex) + Phase B (drain
20//!   loop: eviction gate → pipeline → re-lock-and-drain →
21//!   loop-or-exit).
22//! - [`RebuildDispatcher::execute_one_rebuild`] — one iteration of
23//!   the pipeline: decide / estimate / reserve / execute
24//!   (`spawn_blocking`) / publish / record success|failure.
25//! - [`DrainLoopSentinel`] — panic-safety recovery for
26//!   `rebuild_in_flight`; the sole out-of-lane transition exception.
27//! - Hybrid decision: `git_change_class.requires_full_rebuild()` OR
28//!   `changed_files.len() > incremental_threshold` OR
29//!   `closure.len() > file_count * closure_limit_percent / 100` → Full;
30//!   else Incremental.
31//! - Working-set estimate via
32//!   [`crate::workspace::working_set_estimate`] populated with
33//!   [`crate::config::ESTIMATE_STAGING_PER_FILE_BYTES`] +
34//!   [`crate::config::ESTIMATE_FINAL_PER_FILE_BYTES`] heuristic consts.
35//!
36//! Phase 7b2 (out of scope here) adds the per-workspace tokio watcher
37//! event loop, the editor-pattern and git-scenario dispatcher-count
38//! matrix, and the §J.2 serialization stress. Phase 7c (also out of
39//! scope) wires the [`CancellationToken`] from
40//! [`LoadedWorkspace::rebuild_cancelled`] through
41//! [`incremental_rebuild`] at pass boundaries.
42//!
43//! # §J.2 runner-role invariant (Phase 7b1)
44//!
45//! At most one [`execute_one_rebuild`](RebuildDispatcher::execute_one_rebuild)
46//! executes at a time per workspace, and at most one additional
47//! [`PendingRebuild`] is parked in the lane awaiting the runner. A
48//! caller arriving while the runner is active coalesces its incoming
49//! `ChangeSet` into the lane (A2 §J.2 merge rules) and returns
50//! `Ok(())` without running the pipeline — the active runner will
51//! drain the lane at its next drain-loop iteration.
52//!
53//! All normal-path transitions of [`LoadedWorkspace::rebuild_in_flight`]
54//! happen while [`LoadedWorkspace::rebuild_lane`] is held.
55//! [`DrainLoopSentinel::drop`] is the sole recovery exception.
56//!
57//! # Eviction cooperation
58//!
59//! Every drain-loop iteration (including the first) checks
60//! `ws.rebuild_cancelled` at the top of the loop. If set, the runner
61//! abandons any parked pending, releases `rebuild_in_flight` under the
62//! lane, and returns [`DaemonError::WorkspaceEvicted`]. The same
63//! [`DaemonError::WorkspaceEvicted`] is surfaced by
64//! [`WorkspaceManager::reserve_rebuild`]'s Phase-1 membership +
65//! cancellation check — so a gate-check → `reserve_rebuild` race that
66//! eviction wins cannot publish into an orphaned workspace.
67//!
68//! # Lock order (§J.4)
69//!
70//! [`RebuildDispatcher`] is the sole acquirer of
71//! [`LoadedWorkspace::rebuild_lane`](crate::workspace::LoadedWorkspace::rebuild_lane).
72//! The canonical call path honours the A2 §J.4 total order:
73//!
74//! ```text
75//!   workspaces (manager.lookup)  →  rebuild_lane  →  admission (reserve_rebuild)
76//! ```
77//!
78//! Rules enforced by this module:
79//! - `manager.lookup` acquires `workspaces.read()` as a *precondition*
80//!   and drops the guard before touching `rebuild_lane`.
81//! - `rebuild_lane` is held **only** to coalesce/take `PendingRebuild`
82//!   and to mutate `rebuild_in_flight`. The guard is dropped before
83//!   [`WorkspaceManager::reserve_rebuild`] so §G.1's phase-1
84//!   `workspaces.read()` does not nest under `rebuild_lane`.
85//! - `admission` is strictly innermost and is held only inside
86//!   [`WorkspaceManager::reserve_rebuild`] / `publish_and_retain` /
87//!   retention-reaper paths — never reacquired by the dispatcher.
88
89use std::{
90    collections::HashMap,
91    path::PathBuf,
92    sync::{
93        Arc, OnceLock,
94        atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
95    },
96    time::{Duration, Instant, SystemTime},
97};
98
99use sqry_core::graph::{
100    CodeGraph, GraphBuilderError,
101    unified::{
102        build::{
103            BuildConfig, CancellationToken, build_unified_graph_cancellable,
104            compute_reverse_dep_closure, incremental_rebuild,
105        },
106        memory::GraphMemorySize,
107    },
108};
109use sqry_core::plugin::PluginManager;
110use sqry_core::watch::{ChangeSet, GitChangeClass, LastIndexedGitState, SourceTreeWatcher};
111use tokio::task::JoinHandle;
112
113use crate::{
114    config::{DaemonConfig, ESTIMATE_FINAL_PER_FILE_BYTES, ESTIMATE_STAGING_PER_FILE_BYTES},
115    error::DaemonError,
116    workspace::{
117        LoadedWorkspace, PendingRebuild, WorkingSetInputs, WorkspaceKey, WorkspaceManager,
118        WorkspaceState, clone_err, working_set_estimate,
119    },
120};
121
122// ---------------------------------------------------------------------------
123// RebuildMode
124// ---------------------------------------------------------------------------
125
/// Outcome of the hybrid decision function: is this rebuild run via
/// [`build_unified_graph`] (Full) or [`incremental_rebuild`]
/// (Incremental)?
///
/// Encoded as `u8` for the [`RebuildDispatcher::last_mode`] atomic
/// observability surface. The encoding is stable across the
/// dispatcher's lifetime but is not part of any on-wire contract —
/// Task 8's IPC layer surfaces the mode only through structured
/// tracing, not a raw byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RebuildMode {
    /// Run `build_unified_graph` from scratch.
    Full,
    /// Run `incremental_rebuild` over the reverse-dep closure.
    Incremental,
}

impl RebuildMode {
    /// Encode for the [`AtomicU8`] slot: 0=None, 1=Full, 2=Incremental.
    const fn as_u8(self) -> u8 {
        // 0 is deliberately never produced here — it is the slot's
        // "never set" sentinel.
        match self {
            RebuildMode::Full => 1,
            RebuildMode::Incremental => 2,
        }
    }

    /// Decode from the atomic slot. Returns `None` for `0` (never set)
    /// or any unexpected discriminant (not observable through the
    /// `store_last_mode` path but round-tripped defensively).
    const fn from_u8(raw: u8) -> Option<Self> {
        // Mirror of `as_u8`: anything outside {1, 2} decodes to None.
        if raw == 1 {
            Some(RebuildMode::Full)
        } else if raw == 2 {
            Some(RebuildMode::Incremental)
        } else {
            None
        }
    }
}
163
164// ---------------------------------------------------------------------------
165// PendingRebuild::coalesce_with (lane-merge algebra, A2 §J.2)
166// ---------------------------------------------------------------------------
167
168impl PendingRebuild {
169    /// Coalesce two queued rebuilds per A2 §J.2.
170    ///
171    /// Merge rules:
172    /// 1. **File union.** Deduplicated set of `changed_files` from
173    ///    both sides, returned in lexicographic order so the merged
174    ///    vector is deterministic across runs (important for
175    ///    downstream decision-fork determinism and test assertions).
176    /// 2. **OR of `git_state_changed`.** If either side observed a
177    ///    `.git/` event, the merged entry records a change.
178    /// 3. **Full-rebuild-dominance merge on `git_change_class`.**
179    ///    If either side has a class with `requires_full_rebuild() ==
180    ///    true` (currently `BranchSwitch` or `TreeDiverged`), the
181    ///    merged class is canonically `Some(TreeDiverged)` — any
182    ///    downstream dispatcher decision only checks
183    ///    `requires_full_rebuild()`, so the specific discriminant
184    ///    beyond the canonical "full trigger observed" marker is
185    ///    unused. If neither side is a full trigger, the later-side
186    ///    class wins (most-recent observation); `None` is absorbed
187    ///    from either side when the other is `Some`.
188    /// 4. **`enqueued_at = max(self, later)`.** Later wins so the
189    ///    lane reflects the most-recent activity for staleness /
190    ///    tracing purposes.
191    /// 5. **`git_state_at_enqueue` — absorb-None, later wins
192    ///    (Task 7 Phase 7b2).** When both sides carry a snapshot,
193    ///    the newer observation wins (the merged coalesced pending
194    ///    reflects the freshest valid baseline for the runner's
195    ///    publish commit). When only one side carries a snapshot,
196    ///    that one is preserved. Both `None` → `None`.
197    ///
198    /// `self` is treated as the earlier enqueue; `later` is the
199    /// newly-arrived enqueue the dispatcher is merging in.
200    ///
201    /// # Determinism
202    ///
203    /// The merged `changed_files` vector is sorted. `coalesce_with`
204    /// is commutative under the `requires_full_rebuild()` predicate
205    /// (full-rebuild dominance is symmetric). It is **not**
206    /// commutative under the raw `git_change_class` discriminant
207    /// when neither side is a full trigger — `Some(LocalCommit) ⊕
208    /// Some(Noise) = Some(Noise)` but `Some(Noise) ⊕ Some(LocalCommit)
209    /// = Some(LocalCommit)`. Nor is `git_state_at_enqueue` commutative
210    /// in general (later wins when both sides are `Some`); both
211    /// asymmetries are by design.
212    #[must_use]
213    pub fn coalesce_with(self, later: PendingRebuild) -> PendingRebuild {
214        // 1. File union, deterministic order.
215        let mut file_set: std::collections::BTreeSet<PathBuf> =
216            self.changes.changed_files.into_iter().collect();
217        file_set.extend(later.changes.changed_files);
218        let changed_files: Vec<PathBuf> = file_set.into_iter().collect();
219
220        // 2. OR of git_state_changed.
221        let git_state_changed = self.changes.git_state_changed || later.changes.git_state_changed;
222
223        // 3. Full-rebuild-dominance merge on git_change_class.
224        let git_change_class = merge_git_class(
225            self.changes.git_change_class,
226            later.changes.git_change_class,
227        );
228
229        // 4. enqueued_at = max.
230        let enqueued_at = self.enqueued_at.max(later.enqueued_at);
231
232        // 5. git_state_at_enqueue: absorb-None, later wins when both Some.
233        let git_state_at_enqueue = later.git_state_at_enqueue.or(self.git_state_at_enqueue);
234
235        PendingRebuild {
236            changes: ChangeSet {
237                changed_files,
238                git_state_changed,
239                git_change_class,
240            },
241            enqueued_at,
242            git_state_at_enqueue,
243        }
244    }
245}
246
247/// Merge two `git_change_class` observations per §J.2
248/// "full-rebuild-dominance" semantics.
249///
250/// See [`PendingRebuild::coalesce_with`] for the merge contract.
251fn merge_git_class(a: Option<GitChangeClass>, b: Option<GitChangeClass>) -> Option<GitChangeClass> {
252    let requires_full = a.is_some_and(GitChangeClass::requires_full_rebuild)
253        || b.is_some_and(GitChangeClass::requires_full_rebuild);
254    if requires_full {
255        return Some(GitChangeClass::TreeDiverged);
256    }
257    // later wins for non-full; fallback to earlier if later is None.
258    b.or(a)
259}
260
261// ---------------------------------------------------------------------------
262// Decision fork
263// ---------------------------------------------------------------------------
264
265/// Hybrid rebuild-mode decision per plan line 1422 and Amendment 2
266/// §J.
267///
268/// Full-rebuild triggers (in evaluation order):
269///
270/// 1. [`ChangeSet::requires_full_rebuild`] — a committed git state
271///    change that mandates a full rebuild (currently `BranchSwitch`
272///    or `TreeDiverged`; `LocalCommit` / `Noise` do not force full).
273/// 2. `changed_files.len() > config.incremental_threshold` — too many
274///    files for incremental economics to pay off.
275/// 3. `closure.len() > graph.file_count() * closure_limit_percent /
276///    100` — the reverse-dep closure would touch more files than the
277///    full-rebuild cost; take the full path instead.
278///
279/// Otherwise, Incremental. Empty `ChangeSet` ([`ChangeSet::is_empty`])
280/// returns Incremental as a legitimate no-op rebuild (Phase 3e
281/// supports this explicitly — see `sqry-core/src/graph/unified/build/
282/// incremental.rs:2842` for the empty-rebuild regression test).
283///
284/// Closure math resolves only paths already present in the graph's
285/// file registry. Paths not yet registered (new files) contribute
286/// zero to the closure but are still passed through to
287/// [`incremental_rebuild`]; its internal `phase3e_discover_new_file_paths`
288/// (Phase 3e, `incremental.rs:935`) handles them via a first-class
289/// new-file discovery leg. This is intentional: forcing Full on every
290/// new file would regress Phase 3e's shipped behavior.
291#[must_use]
292pub fn decide_mode(config: &DaemonConfig, changes: &ChangeSet, graph: &CodeGraph) -> RebuildMode {
293    if changes.is_empty() {
294        return RebuildMode::Incremental;
295    }
296    if changes.requires_full_rebuild() {
297        return RebuildMode::Full;
298    }
299    if changes.changed_files.len() > config.incremental_threshold {
300        return RebuildMode::Full;
301    }
302
303    // Resolve registered paths to FileId for closure math. Unresolved
304    // paths (new files) simply don't contribute to the closure.
305    let file_ids: Vec<_> = changes
306        .changed_files
307        .iter()
308        .filter_map(|p| graph.files().get(p))
309        .collect();
310
311    let closure = compute_reverse_dep_closure(&file_ids, graph);
312    let file_count = graph.files().len();
313    // Integer math: > file_count * pct / 100 → Full. `closure_limit_percent`
314    // is validated as 1..=100 in `DaemonConfig::validate`.
315    let limit = file_count.saturating_mul(config.closure_limit_percent as usize) / 100;
316
317    if closure.len() > limit {
318        RebuildMode::Full
319    } else {
320        RebuildMode::Incremental
321    }
322}
323
324// ---------------------------------------------------------------------------
325// Working-set estimate
326// ---------------------------------------------------------------------------
327
328/// Compute the A2 §G.6 working-set estimate for the given rebuild
329/// mode.
330///
331/// Formula (see
332/// [`crate::workspace::working_set_estimate`] for the multipliers):
333///
334/// | Mode        | `new_graph_final_estimate`                                                 | `staging_overhead`                                               | `interner_snapshot_bytes`                |
335/// |-------------|---------------------------------------------------------------------------|------------------------------------------------------------------|------------------------------------------|
336/// | Full        | `prior.heap_bytes()`                                                      | `file_count * ESTIMATE_STAGING_PER_FILE_BYTES`                   | `prior.strings().heap_bytes()`           |
337/// | Incremental | `prior.heap_bytes() + closure.len() * ESTIMATE_FINAL_PER_FILE_BYTES`      | `closure_file_count * ESTIMATE_STAGING_PER_FILE_BYTES`           | `prior.strings().heap_bytes()`           |
338///
339/// Where `closure_file_count` for Incremental uses
340/// `changes.changed_files.len()` (an upper bound — we compute the
341/// exact reverse-dep closure size only when we already decided to run
342/// incremental, so the caller may pass the file count directly rather
343/// than re-running closure math here).
344fn compute_working_set_estimate(prior: &CodeGraph, changes: &ChangeSet, mode: RebuildMode) -> u64 {
345    let prior_bytes = prior.heap_bytes() as u64;
346    let interner_bytes = prior.strings().heap_bytes() as u64;
347    let file_count = prior.files().len() as u64;
348
349    let (final_estimate, staging_file_count) = match mode {
350        RebuildMode::Full => (prior_bytes, file_count),
351        RebuildMode::Incremental => {
352            let n = changes.changed_files.len() as u64;
353            let final_est =
354                prior_bytes.saturating_add(n.saturating_mul(ESTIMATE_FINAL_PER_FILE_BYTES));
355            (final_est, n)
356        }
357    };
358
359    let staging = staging_file_count.saturating_mul(ESTIMATE_STAGING_PER_FILE_BYTES);
360
361    working_set_estimate(WorkingSetInputs {
362        new_graph_final_estimate: final_estimate,
363        staging_overhead: staging,
364        interner_snapshot_bytes: interner_bytes,
365    })
366}
367
368// ---------------------------------------------------------------------------
369// Test hooks — gate + capture (Task 7 Phase 7b2)
370// ---------------------------------------------------------------------------
371//
372// Both hooks are gated behind `std::sync::OnceLock`. In production the
373// `OnceLock::get()` fast path is a single relaxed atomic load per
374// `execute_one_rebuild` iteration that returns `None` and short-circuits
375// the hook entirely. Tests install the hooks once at harness setup;
376// subsequent dispatcher iterations see the hook and behave
377// deterministically.
378
/// **Test-only** gate for §J.2 serialization stress tests.
///
/// When installed, each `execute_one_rebuild` iteration awaits
/// [`Self::release`] while [`Self::hold`] is non-zero. The test fires
/// `release.notify_one()` once per iteration it wants to unblock, and
/// the gate atomically decrements `hold` on release. When `hold`
/// reaches zero, subsequent iterations pass through without waiting.
///
/// # Lost-wakeup safety
///
/// [`gate_check`](RebuildDispatcher::gate_check) obtains the
/// `notified()` future BEFORE re-checking `hold`, matching the 7b1
/// `Notify` handshake pattern (`tests/rebuild_runner_gate.rs` inline
/// comments). A test that first sets `hold = N` then fires N
/// `notify_one()` calls is guaranteed to release the first N
/// iterations without lost wakeups.
#[doc(hidden)]
#[derive(Debug)]
pub struct TestGate {
    /// Number of iterations remaining that must wait on `release`.
    /// Initialised by the test (e.g. `AtomicUsize::new(1)` to block
    /// only the first iteration); decremented on each gate release.
    pub hold: AtomicUsize,
    /// Notify fired by the test driver to release one waiting
    /// iteration. See the struct-level lost-wakeup note for the
    /// required `notified()`-before-recheck handshake on the consumer
    /// side.
    pub release: tokio::sync::Notify,
}
406
/// **Test-only** per-iteration capture for §J.2 file-union correctness
/// assertions.
///
/// When installed, each `execute_one_rebuild` iteration appends a
/// [`CapturedIteration`] to [`Self::iterations`] AFTER the mode
/// decision and BEFORE the gate check — so the test observes the
/// exact `ChangeSet` consumed by each iteration regardless of whether
/// the gate stalls that iteration.
///
/// Task 7 Phase 7c extension: three additional fields for the
/// eviction-during-rebuild abort test drive the
/// [`RebuildDispatcher::post_reservation_check`] hook. The hook fires
/// AFTER `reserve_rebuild` returns `Ok` and BEFORE the rebuild
/// pipeline starts, so tests can observe a live reservation and race
/// eviction against it.
#[doc(hidden)]
#[derive(Debug, Default)]
pub struct TestCapture {
    /// Records one entry per `execute_one_rebuild` invocation, in
    /// order. Never cleared by the dispatcher; the test inspects it
    /// after synchronising on the dispatcher's completion.
    pub iterations: parking_lot::Mutex<Vec<CapturedIteration>>,

    /// Counter of iterations that must stall at the post-reservation
    /// hook. `0` = no hold; `> 0` = one iteration will stall per unit.
    /// Armed via [`Self::arm_post_reservation_hold`], released via
    /// [`Self::release_post_reservation`].
    pub post_reservation_hold: AtomicUsize,

    /// Fired when [`RebuildDispatcher::post_reservation_check`] is
    /// entered. The test awaits [`Self::wait_until_post_reservation`]
    /// to synchronise on "rebuild has reserved bytes and is about to
    /// run".
    pub post_reservation_reached: tokio::sync::Notify,

    /// Fired by the test driver to release one waiting iteration. Uses
    /// the same handshake pattern as [`TestGate`] (`notified()` future
    /// armed before re-checking `hold`).
    pub post_reservation_release: tokio::sync::Notify,

    /// Task 7 Phase 7c feat iter-1 (Codex MAJOR 2): counter for every
    /// `execute_one_rebuild` iteration where the §5e
    /// `workspaces.read()` recheck (or map-missing recheck) observed
    /// cancellation AFTER a successful pipeline run and BEFORE
    /// publish. Fires when eviction raced during a pipeline that
    /// completed before the forwarder's first poll.
    pub publish_path_evictions: AtomicUsize,
    /// Counter for every `execute_one_rebuild` iteration where the
    /// sqry-core pipeline itself returned
    /// `GraphBuilderError::Cancelled` from a pass boundary (the
    /// forwarder had time to flip the token before the pipeline
    /// completed).
    pub pass_boundary_cancellations: AtomicUsize,

    /// Test-only switch (iter-1): when `true`, the next
    /// `execute_rebuild` call does NOT spawn a
    /// [`spawn_cancellation_forwarder`]. Tests use this to
    /// deterministically force the §5e publish-path recheck
    /// (without a forwarder to flip the token, the pipeline
    /// completes Ok even though `ws.rebuild_cancelled = true`, and
    /// the §5e recheck picks up the eviction).
    ///
    /// Production leaves this `false`; the forwarder always runs.
    pub suppress_forwarder: AtomicBool,

    /// Test-only switch (iter-2 Codex MAJOR 1): when `true`,
    /// `execute_rebuild` synchronously calls `token.cancel()`
    /// immediately after spawning (or electing to suppress) the
    /// forwarder and BEFORE dispatching `spawn_blocking`. This
    /// guarantees the pipeline's very first `cancellation.check()?`
    /// observes the cancelled token, forcing the pass-boundary
    /// cancellation path deterministically.
    ///
    /// Production leaves this `false`.
    pub precancel_token_for_pass_boundary: AtomicBool,

    /// Durable flag (iter-2 Codex MAJOR 2): set by
    /// [`RebuildDispatcher::post_reservation_check`] when the hook
    /// fires. Paired with `post_reservation_reached` notify to
    /// provide lost-wakeup-safe synchronisation: tests that arm
    /// `post_reservation_hold` AFTER a rebuild has already reached
    /// the hook (rare but possible under fast scheduling) still see
    /// the flag and do not block waiting for a signal that already
    /// fired.
    ///
    /// Cleared by [`Self::reset_post_reservation_reached`] for
    /// multi-iteration tests.
    pub post_reservation_reached_flag: AtomicBool,
}
496
impl TestCapture {
    /// Construct a zero-initialised capture. Same as
    /// [`Default::default`]; named constructor for clarity in tests.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Read the §5e publish-path-recheck eviction counter (iter-1).
    #[must_use]
    pub fn publish_path_evictions(&self) -> usize {
        self.publish_path_evictions.load(Ordering::Acquire)
    }

    /// Read the pass-boundary cancellation counter (iter-1).
    #[must_use]
    pub fn pass_boundary_cancellations(&self) -> usize {
        self.pass_boundary_cancellations.load(Ordering::Acquire)
    }

    /// Arm a single post-reservation stall. The next
    /// `execute_one_rebuild` iteration that reaches
    /// [`RebuildDispatcher::post_reservation_check`] will block until
    /// [`Self::release_post_reservation`] is called. Stacks: calling
    /// this N times blocks N iterations.
    pub fn arm_post_reservation_hold(&self) {
        self.post_reservation_hold.fetch_add(1, Ordering::AcqRel);
    }

    /// Release exactly one stalled iteration. Matches the `TestGate`
    /// release semantics — the held iteration wakes, decrements
    /// `post_reservation_hold` one more step via the loop, and
    /// continues to `execute_rebuild`. Safe to call before an
    /// iteration arms (lost-wakeup-safe via the handshake in
    /// [`RebuildDispatcher::post_reservation_check`]; `notify_one`
    /// stores a permit that a later `notified()` consumes).
    pub fn release_post_reservation(&self) {
        self.post_reservation_release.notify_one();
    }

    /// Await the next `execute_one_rebuild` iteration reaching the
    /// post-reservation hook. Returns as soon as the hook fires.
    ///
    /// Iter-2 Codex MAJOR 2: lost-wakeup-safe. If the hook has
    /// already fired (`post_reservation_reached_flag == true`),
    /// returns immediately without awaiting. Otherwise arms the
    /// `notified()` future BEFORE re-checking the flag (handshake
    /// pattern) so a signal that fires between arm and recheck is
    /// still observed.
    pub async fn wait_until_post_reservation(&self) {
        // Fast path: the hook already fired before we were called.
        if self.post_reservation_reached_flag.load(Ordering::Acquire) {
            return;
        }
        // Arm the future FIRST, then re-check the flag. This ordering
        // is load-bearing: a notify between the recheck and the await
        // is retained by the already-armed `Notified` future.
        let notified = self.post_reservation_reached.notified();
        if self.post_reservation_reached_flag.load(Ordering::Acquire) {
            return;
        }
        notified.await;
    }

    /// Reset the durable reached-flag so a second iteration of a
    /// multi-iteration test (e.g. the 100-iter stress) can await a
    /// fresh hook firing. Test-only.
    pub fn reset_post_reservation_reached(&self) {
        self.post_reservation_reached_flag
            .store(false, Ordering::Release);
    }
}
564
/// One captured `execute_one_rebuild` iteration (test-only).
///
/// Appended to [`TestCapture::iterations`] once per iteration, after
/// the mode decision and before the gate check.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct CapturedIteration {
    /// The ChangeSet as-consumed by this iteration (post-coalesce).
    pub changeset: ChangeSet,
    /// Mode decided for this iteration.
    pub mode: RebuildMode,
    /// Git-state snapshot attached to the consumed `PendingRebuild`.
    /// `None` for direct (non-bridge) callers.
    pub git_state_at_enqueue: Option<LastIndexedGitState>,
    /// Wall-clock when the iteration started.
    pub started_at: Instant,
}
579
580// ---------------------------------------------------------------------------
581// Watcher bridge registry (Task 7 Phase 7b2)
582// ---------------------------------------------------------------------------
583
/// One per-workspace watcher + dispatcher task pair.
///
/// Both `JoinHandle`s are **observability-only**: they do not own the
/// task lifetimes. Dropping a `WatcherEntry` detaches both tasks
/// (Tokio's `JoinHandle::drop` is detach, not cancel — see
/// `tokio::task::JoinHandle` docs).
///
/// Shutdown is cooperative through two independent signals:
/// 1. `ws.rebuild_cancelled = true` (set by eviction) → the
///    cancellable watcher returns `Ok(None)` → blocking thread exits
///    → tokio mpsc sender drops → async task's `rx.recv()` returns
///    `None` → async task exits.
/// 2. `dispatcher.handle_changes_with_git_state` returns
///    `Err(WorkspaceEvicted)` → async task exits → drops receiver →
///    next `blocking_send` on the blocking thread fails → blocking
///    thread exits.
///
/// Before exiting, the async task flips [`Self::live`] to `false`
/// (so a concurrent `ensure_watching` call treats the entry as
/// draining) and then calls
/// [`RebuildDispatcher::reap_watcher`](crate::RebuildDispatcher::reap_watcher)
/// with its [`Self::generation`] to remove the entry from the map. The
/// generation token ensures a late old task cannot erase a newer
/// replacement entry after a fast evict+reload.
struct WatcherEntry {
    /// Monotonic generation token. Assigned at construction time from
    /// `RebuildDispatcher::next_watcher_generation`; used by
    /// `reap_watcher` to distinguish "my entry" from "a newer entry
    /// for the same WorkspaceKey".
    generation: u64,
    /// `true` while the async task is processing dispatches. Flipped
    /// to `false` as the first action of the post-loop cleanup
    /// sequence, BEFORE `reap_watcher` is called. Fast-path callers
    /// of `ensure_watching` read this as the authoritative liveness
    /// signal (rather than `JoinHandle::is_finished`, which returns
    /// `false` while the task is still inside its final cleanup
    /// closure).
    live: Arc<AtomicBool>,
    /// Handle to the async dispatcher task. Stored only to keep the
    /// task attached for the entry's lifetime — never awaited or
    /// aborted. Shutdown is cooperative (see struct-level docs).
    #[allow(dead_code)]
    async_handle: JoinHandle<()>,
    /// Handle to the blocking watcher thread. Stored only for
    /// attachment — dropping it detaches the task, which continues to
    /// completion via cooperative cancellation.
    #[allow(dead_code)]
    blocking_handle: JoinHandle<()>,
}
633
634// ---------------------------------------------------------------------------
635// RebuildDispatcher
636// ---------------------------------------------------------------------------
637
/// Sole acquirer of [`LoadedWorkspace::rebuild_lane`] (A2 §J).
///
/// Constructed once at daemon startup with a shared
/// [`Arc<PluginManager>`]. Every [`Self::handle_changes`] call honours
/// the canonical 7-step reservation call path (plan line 1495) and
/// the §J.4 lock-order contract documented on the module.
pub struct RebuildDispatcher {
    // Shared daemon collaborators: workspace registry, validated
    // config, and the language-plugin registry used by rebuilds.
    manager: Arc<WorkspaceManager>,
    config: Arc<DaemonConfig>,
    plugins: Arc<PluginManager>,
    build_config: BuildConfig,
    // Observability: total rebuilds dispatched, and the last
    // `RebuildMode` encoded via `RebuildMode::as_u8` (0 = never set).
    dispatched_count: AtomicU64,
    last_mode: AtomicU8,

    /// Per-workspace watcher+dispatcher task pairs (Task 7 Phase 7b2).
    /// Populated by [`Self::ensure_watching`]; pruned by
    /// [`Self::reap_watcher`] as each async task exits.
    watchers: parking_lot::Mutex<HashMap<WorkspaceKey, WatcherEntry>>,
    /// Monotonic counter used to tag each `WatcherEntry` with a unique
    /// generation, enabling `reap_watcher`'s compare-and-remove.
    next_watcher_generation: AtomicU64,

    /// Test-only synchronisation gate. `None` in production; tests
    /// install once at harness setup. See [`TestGate`] docstring.
    #[doc(hidden)]
    test_gate: OnceLock<Arc<TestGate>>,
    /// Test-only per-iteration capture recorder. `None` in production.
    /// See [`TestCapture`] docstring.
    #[doc(hidden)]
    test_capture: OnceLock<Arc<TestCapture>>,
}
669
670impl std::fmt::Debug for RebuildDispatcher {
671    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
672        // `PluginManager` does not implement `Debug` because its
673        // internal registry is non-trivial; skip it here. `BuildConfig`
674        // also does not implement `Debug` on HEAD. Report the shape of
675        // the dispatcher instead of its full contents.
676        f.debug_struct("RebuildDispatcher")
677            .field(
678                "dispatched_count",
679                &self.dispatched_count.load(Ordering::Relaxed),
680            )
681            .field("last_mode", &self.last_mode())
682            .field("memory_limit_mb", &self.config.memory_limit_mb)
683            .finish_non_exhaustive()
684    }
685}
686
687impl RebuildDispatcher {
688    /// Construct a fresh dispatcher sharing the daemon's manager,
689    /// config, and plugin manager.
690    ///
691    /// `build_config` defaults to [`BuildConfig::default`]; Task 14
692    /// calibration may override this from [`DaemonConfig`] knobs.
693    #[must_use]
694    pub fn new(
695        manager: Arc<WorkspaceManager>,
696        config: Arc<DaemonConfig>,
697        plugins: Arc<PluginManager>,
698    ) -> Arc<Self> {
699        Arc::new(Self {
700            manager,
701            config,
702            plugins,
703            build_config: BuildConfig::default(),
704            dispatched_count: AtomicU64::new(0),
705            last_mode: AtomicU8::new(0),
706            watchers: parking_lot::Mutex::new(HashMap::new()),
707            next_watcher_generation: AtomicU64::new(0),
708            test_gate: OnceLock::new(),
709            test_capture: OnceLock::new(),
710        })
711    }
712
713    // -----------------------------------------------------------------
714    // Test-only hook installers (Task 7 Phase 7b2)
715    // -----------------------------------------------------------------
716
    /// **Test-only.** Install a [`TestGate`] that stalls the FIRST
    /// `hold` iterations of `execute_one_rebuild` until the test
    /// driver fires `gate.release.notify_one()`. One-shot per
    /// dispatcher lifetime.
    ///
    /// Zero production overhead — production callers never install a
    /// gate and `gate_check` short-circuits on the `OnceLock::get()
    /// == None` fast path.
    ///
    /// # Errors
    ///
    /// Returns `Err(gate)` — handing the argument back to the caller
    /// — if a gate was already installed.
    #[doc(hidden)]
    pub fn install_test_gate(&self, gate: Arc<TestGate>) -> Result<(), Arc<TestGate>> {
        self.test_gate.set(gate)
    }
730
    /// **Test-only.** Install a [`TestCapture`] recorder that pushes
    /// one [`CapturedIteration`] per `execute_one_rebuild` invocation.
    /// One-shot per dispatcher lifetime.
    ///
    /// # Errors
    ///
    /// Returns `Err(capture)` — handing the argument back to the
    /// caller — if a capture was already installed.
    #[doc(hidden)]
    pub fn install_test_capture(&self, capture: Arc<TestCapture>) -> Result<(), Arc<TestCapture>> {
        self.test_capture.set(capture)
    }
739
    /// Internal gate check, called by `execute_one_rebuild` after
    /// mode selection and before admission reservation (iter-2 §4
    /// placement rationale: the gate must NOT hold an admission
    /// reservation across a synthetic test stall).
    ///
    /// No-op on the production fast path (no gate installed, or
    /// `hold == 0`). Each stall that actually awaited a release
    /// decrements `hold` exactly once, so the gate releases the
    /// first `hold` iterations and then stays open.
    ///
    /// # Handshake pattern
    ///
    /// Obtain the `notified()` future BEFORE rechecking `hold`. If we
    /// load `hold > 0`, THEN enter `notified().await` only if `hold`
    /// is still `> 0` after the future is armed. This matches the 7b1
    /// pattern documented in `tests/rebuild_runner_gate.rs`.
    async fn gate_check(&self) {
        let Some(gate) = self.test_gate.get() else {
            return;
        };
        if gate.hold.load(Ordering::Acquire) == 0 {
            return;
        }
        let notified = gate.release.notified();
        tokio::pin!(notified);
        // Re-check AFTER arming the future: a concurrent
        // notify_one() between the first load and the await point
        // would otherwise be lost. Since `notified()` retroactively
        // matches any pending permit, re-checking `hold` after
        // arming is sufficient.
        if gate.hold.load(Ordering::Acquire) > 0 {
            notified.await;
            gate.hold.fetch_sub(1, Ordering::AcqRel);
        }
    }
770
    /// Cumulative count of successful dispatches across this
    /// dispatcher's lifetime.
    ///
    /// Incremented by `execute_one_rebuild` once per successful
    /// publish (after `record_success`).
    ///
    /// Observability surface for the Task 7 §I dispatcher-count
    /// matrix (arrives in 7b). Not reset on workspace eviction; the
    /// counter is daemon-process-scoped.
    #[must_use]
    pub fn dispatched_count(&self) -> u64 {
        self.dispatched_count.load(Ordering::Relaxed)
    }
781
    /// Most-recent mode selected by [`Self::handle_changes`], or
    /// `None` if no dispatch has happened yet (the `AtomicU8` still
    /// holds its initial `0` sentinel, which
    /// [`RebuildMode::from_u8`] decodes to `None`).
    ///
    /// Observability surface for tests and tracing spans.
    #[must_use]
    pub fn last_mode(&self) -> Option<RebuildMode> {
        RebuildMode::from_u8(self.last_mode.load(Ordering::Relaxed))
    }
790
791    /// §J.2 runner-role handoff + drain-loop orchestrator (Phase 7b1).
792    ///
793    /// Callers (the Phase 7b2 watcher task, direct test drivers,
794    /// future IPC `workspace/force_rebuild`) invoke this for each
795    /// debounced [`ChangeSet`]. Exactly one concurrent invocation
796    /// per workspace runs the full pipeline; others coalesce their
797    /// [`ChangeSet`] into the lane and return `Ok(())` promptly.
798    ///
799    /// Preconditions (not part of the §J.4 ordered sequence):
800    /// - The workspace must already be registered with the manager
801    ///   (i.e. [`WorkspaceManager::get_or_load`] succeeded earlier).
802    ///   A caller whose [`WorkspaceManager::lookup`] returns `None`
803    ///   sees [`DaemonError::WorkspaceEvicted`].
804    ///
805    /// # Phase A (acquire-or-park) — single lane lock scope
806    ///
807    /// 1. Lock [`LoadedWorkspace::rebuild_lane`].
808    /// 2. Coalesce incoming `changes` with any prior
809    ///    [`PendingRebuild`] parked in the lane (A2 §J.2 merge rules).
810    /// 3. CAS [`LoadedWorkspace::rebuild_in_flight`] `false → true`.
811    ///    - On CAS success: we own the runner role; keep coalesced
812    ///      pending as `current`, drop lane.
813    ///    - On CAS failure: another runner is active; park coalesced
814    ///      in lane, drop lane, return `Ok(())`.
815    ///
816    /// Holding the lane across the in-flight CAS makes the acquire
817    /// race-free: every in-flight transition happens under the lane,
818    /// so no two runners ever claim the role simultaneously.
819    ///
820    /// # Phase B (drain loop) — sentinel-protected
821    ///
822    /// Loop, armed with [`DrainLoopSentinel`] for panic-safety:
823    ///
824    /// 1. **Top-of-loop eviction gate.** Check
825    ///    [`LoadedWorkspace::rebuild_cancelled`]. If set, take and
826    ///    drop any parked pending (workspace is gone), release
827    ///    `rebuild_in_flight` under the lane, disarm sentinel, return
828    ///    [`DaemonError::WorkspaceEvicted`].
829    /// 2. Call [`Self::execute_one_rebuild`] on `current`. Records
830    ///    `record_success` / `record_failure` on the workspace; the
831    ///    result flows into `last_result`.
832    /// 3. Re-lock lane. If a new `PendingRebuild` is parked: take it
833    ///    as the next `current`, loop to step 1 (in-flight stays
834    ///    true). If the lane is empty: release `rebuild_in_flight`
835    ///    under the lane, disarm sentinel, return `last_result`.
836    ///
837    /// # §J.4 lock order
838    ///
839    /// `manager.lookup` takes `workspaces.read()` and drops before
840    /// `rebuild_lane`. `rebuild_lane` is dropped before
841    /// [`WorkspaceManager::reserve_rebuild`] (which re-takes
842    /// `workspaces.read() → admission.lock()` internally). No
843    /// `rebuild_lane` ↔ `admission` nesting ever occurs.
844    ///
845    /// # Errors
846    ///
847    /// - [`DaemonError::WorkspaceEvicted`] when the top-of-loop gate
848    ///   observes `rebuild_cancelled == true`, or when
849    ///   [`WorkspaceManager::reserve_rebuild`]'s Phase-1 check finds
850    ///   the workspace missing from the manager map or cancelled.
851    /// - [`DaemonError::MemoryBudgetExceeded`] when admission cannot
852    ///   satisfy a reservation after eviction.
853    /// - [`DaemonError::WorkspaceBuildFailed`] when either full or
854    ///   incremental rebuild fails (including `spawn_blocking` join
855    ///   failures).
856    ///
857    /// Only the FINAL drain-loop iteration's result is surfaced to
858    /// the caller. Per-iteration success/failure is recorded on the
859    /// workspace (`ws.last_error`, `ws.last_good_at`,
860    /// `ws.retry_count`) via [`Self::execute_one_rebuild`].
861    pub async fn handle_changes(
862        &self,
863        key: &WorkspaceKey,
864        changes: ChangeSet,
865    ) -> Result<(), DaemonError> {
866        // Plain handle_changes — no git-state snapshot attached. The
867        // runner will consume this PendingRebuild but will NOT advance
868        // `ws.last_indexed_git_state` because `git_state_at_enqueue`
869        // is `None`. Used by direct callers (tests, future IPC
870        // `workspace/force_rebuild`) that don't own a watcher.
871        self.handle_changes_inner(
872            key,
873            PendingRebuild {
874                changes,
875                enqueued_at: Instant::now(),
876                git_state_at_enqueue: None,
877            },
878        )
879        .await
880    }
881
882    /// Like [`Self::handle_changes`] but attaches a
883    /// [`LastIndexedGitState`] snapshot to the enqueued
884    /// [`PendingRebuild`]. When the runner (either us directly or a
885    /// concurrent runner we parked against) successfully publishes
886    /// the graph derived from this `PendingRebuild`, it commits the
887    /// snapshot into [`LoadedWorkspace::last_indexed_git_state`]
888    /// as the new classifier baseline.
889    ///
890    /// Task 7 Phase 7b2 — used exclusively by the per-workspace
891    /// watcher bridge spawned by [`Self::ensure_watching`]. Plain
892    /// [`Self::handle_changes`] remains the API for callers without a
893    /// watcher-owned git-state snapshot.
894    ///
895    /// # §J.4 lock order, error taxonomy, gate, sentinel
896    ///
897    /// Identical to [`Self::handle_changes`] — Phase A
898    /// (acquire-or-park under lane + CAS) and Phase B (drain loop)
899    /// behave identically. The only difference is the
900    /// `git_state_at_enqueue: Some(git_state)` construction.
901    ///
902    /// # Errors
903    ///
904    /// Same as [`Self::handle_changes`] —
905    /// [`DaemonError::WorkspaceEvicted`],
906    /// [`DaemonError::MemoryBudgetExceeded`],
907    /// [`DaemonError::WorkspaceBuildFailed`].
908    pub async fn handle_changes_with_git_state(
909        &self,
910        key: &WorkspaceKey,
911        changes: ChangeSet,
912        git_state: LastIndexedGitState,
913    ) -> Result<(), DaemonError> {
914        self.handle_changes_inner(
915            key,
916            PendingRebuild {
917                changes,
918                enqueued_at: Instant::now(),
919                git_state_at_enqueue: Some(git_state),
920            },
921        )
922        .await
923    }
924
    /// Shared Phase A (acquire-or-park) + Phase B (drain loop) body
    /// for [`Self::handle_changes`] and
    /// [`Self::handle_changes_with_git_state`]. The public methods
    /// construct the incoming [`PendingRebuild`] and thread it here.
    ///
    /// See [`Self::handle_changes`] for the full caller-facing
    /// contract (phase breakdown, §J.4 lock order, error taxonomy).
    /// The inline comments below mark each load-bearing ordering
    /// constraint; do not reorder statements here without consulting
    /// them.
    ///
    /// # Errors
    ///
    /// [`DaemonError::WorkspaceEvicted`] from the lookup miss or the
    /// top-of-loop cancellation gate; otherwise whatever the FINAL
    /// [`Self::execute_one_rebuild`] iteration returned.
    async fn handle_changes_inner(
        &self,
        key: &WorkspaceKey,
        incoming: PendingRebuild,
    ) -> Result<(), DaemonError> {
        // --- Precondition: lookup Arc<LoadedWorkspace>. ---
        let ws: Arc<LoadedWorkspace> =
            self.manager
                .lookup(key)
                .ok_or_else(|| DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                })?;

        // ================================================================
        // Phase A — acquire-runner-or-park (single lane lock scope).
        //
        // Holding `rebuild_lane` across the `rebuild_in_flight` CAS is
        // the load-bearing invariant: every in-flight transition
        // happens under the lane, so no two runners ever claim the
        // role simultaneously.
        // ================================================================
        let mut current: PendingRebuild = {
            let mut lane_guard = ws.rebuild_lane.lock().await;

            // Coalesce incoming with any prior parked pending.
            let coalesced = match lane_guard.take() {
                Some(prior) => prior.coalesce_with(incoming),
                None => incoming,
            };

            // Try to acquire the runner role.
            match ws.rebuild_in_flight.compare_exchange(
                false,
                true,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => coalesced, // We own the runner role.
                Err(_) => {
                    // Another runner is active — park coalesced in
                    // the lane; that runner will drain it at its next
                    // drain-loop iteration. Return Ok(()) without
                    // executing the pipeline.
                    *lane_guard = Some(coalesced);
                    return Ok(());
                }
            }
            // lane_guard dropped at end of scope.
        };

        // ================================================================
        // Phase B — drain loop.
        //
        // Sentinel guarantees `rebuild_in_flight` is released if the
        // loop unwinds abnormally (documented narrow race on the
        // unwind path — plugin panics inside `spawn_blocking` are
        // caught by `execute_rebuild` and mapped to `Err`, so the
        // only realistic unwind trigger is a runtime-level failure).
        // ================================================================
        let mut sentinel = DrainLoopSentinel {
            ws: Arc::clone(&ws),
            armed: true,
        };

        // `last_result` is assigned by `execute_one_rebuild` on every
        // iteration before any exit path that reads it. The eviction
        // gate exits via its own `return Err(...)` without reading
        // `last_result`, and the drain-exit `return last_result`
        // branch is only reachable after an iteration has assigned.
        let mut last_result: Result<(), DaemonError>;
        loop {
            // --- Top-of-loop cancellation/eviction gate ---
            //
            // `rebuild_cancelled` is set either by eviction (under
            // `workspaces.write()` in `execute_eviction` BEFORE
            // `workspaces.remove(key)`) or by `daemon/cancel_rebuild`
            // (user-initiated cancel of an in-flight rebuild).
            //
            // `swap(false)` atomically reads AND clears the flag so a
            // user-cancel does not poison subsequent rebuilds. For
            // eviction this is harmless — the workspace is removed
            // from the map and this `LoadedWorkspace` instance will
            // not be reused.
            if ws.rebuild_cancelled.swap(false, Ordering::AcqRel) {
                let mut lane_guard = ws.rebuild_lane.lock().await;
                // Abandon any parked pending — the workspace is gone.
                let _dropped: Option<PendingRebuild> = lane_guard.take();
                ws.rebuild_in_flight.store(false, Ordering::Release);
                sentinel.armed = false;
                // Cluster-G iter-2 BLOCKER 3: distinguish an eviction
                // (the eviction path already set state to `Evicted`
                // under `workspaces.write()` BEFORE flipping the
                // flag) from a `daemon/reset` cancellation (which
                // leaves the state as `Rebuilding`). For reset, the
                // runner is responsible for the post-cancel state
                // transition — `record_and_transition_on_err` no-ops
                // on `WorkspaceEvicted`, so without this branch the
                // workspace would stay stuck in `Rebuilding` forever
                // (codex iter-1 review).
                if ws.load_state() == WorkspaceState::Rebuilding {
                    ws.store_state(WorkspaceState::Unloaded);
                }
                return Err(DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                });
            }

            // --- Execute iteration ---
            //
            // `execute_one_rebuild` records success/failure on the
            // workspace before returning, so workspace-level
            // observability is threaded through every iteration even
            // when the drain loop continues past an error.
            last_result = self
                .execute_one_rebuild(key, &ws, current.changes, current.git_state_at_enqueue)
                .await;

            // --- Drain-or-exit decision (under lane lock) ---
            //
            // Releasing `rebuild_in_flight` under the lane is
            // load-bearing: a caller about to park in the lane
            // observes the release atomically with the empty-lane
            // snapshot, so parked pending cannot be stranded.
            let next: Option<PendingRebuild> = {
                let mut lane_guard = ws.rebuild_lane.lock().await;
                match lane_guard.take() {
                    Some(next) => Some(next),
                    None => {
                        ws.rebuild_in_flight.store(false, Ordering::Release);
                        sentinel.armed = false;
                        None
                    }
                }
                // lane_guard dropped at end of scope.
            };

            // Either keep draining with the newly parked pending, or
            // surface the final iteration's result to the caller.
            match next {
                Some(n) => current = n,
                None => return last_result,
            }
        }
    }
1071
    /// Run a single pipeline iteration: decide mode, compute
    /// working-set estimate, reserve admission headroom, execute the
    /// rebuild on a blocking thread, publish atomically. Records
    /// `record_success` on successful publish or `record_failure` on
    /// any error path.
    ///
    /// Called by [`Self::handle_changes`]'s Phase B drain loop for
    /// each coalesced [`PendingRebuild`]. The drain loop may invoke
    /// this multiple times if new pending arrives between iterations;
    /// each invocation is independent from the caller's perspective.
    ///
    /// # Error paths
    ///
    /// All three non-panic error paths call `ws.record_failure` with
    /// a cloned [`DaemonError`] before returning:
    /// - [`WorkspaceManager::reserve_rebuild`] Err (e.g.
    ///   [`DaemonError::MemoryBudgetExceeded`],
    ///   [`DaemonError::WorkspaceEvicted`] from the Phase-1 check).
    /// - [`Self::execute_rebuild`] Err (pipeline failure — plugin
    ///   error, `spawn_blocking` join panic mapped to
    ///   [`DaemonError::WorkspaceBuildFailed`]).
    ///
    /// (Both go through `record_and_transition_on_err`, which skips
    /// the failure record on the eviction path — see its docs.)
    ///
    /// A panic inside [`WorkspaceManager::publish_and_retain`] — which
    /// is documented as infallible — unwinds past these match arms.
    /// Admission state is restored by `RollbackGuard` + reservation
    /// RAII drop, but `record_failure` is NOT called. That is
    /// acceptable as defense-in-depth only: in practice a
    /// `publish_and_retain` panic means the daemon is in
    /// damage-control territory where missing workspace-level error
    /// bookkeeping is a minor concern.
    ///
    /// On successful publish, calls `ws.record_success` which:
    /// - Stamps `last_good_at = SystemTime::now()`.
    /// - Clears `last_error`.
    /// - Resets `retry_count` to 0.
    /// - Also increments `self.dispatched_count` for the §I
    ///   observability matrix.
    ///
    /// Additionally (Task 7 Phase 7b2): when `git_state_at_enqueue`
    /// is `Some`, writes the snapshot into
    /// [`LoadedWorkspace::last_indexed_git_state`] AFTER the
    /// `publish_and_retain` call so the classifier baseline advances
    /// only with actual publish consumption. `None` entries (direct
    /// non-watcher callers) leave the baseline untouched.
    async fn execute_one_rebuild(
        &self,
        key: &WorkspaceKey,
        ws: &Arc<LoadedWorkspace>,
        changes: ChangeSet,
        git_state_at_enqueue: Option<LastIndexedGitState>,
    ) -> Result<(), DaemonError> {
        // Snapshot the currently-published graph: the mode decision,
        // the working-set estimate, and `execute_rebuild` (as the
        // prior graph) all key off this one snapshot.
        let prior_graph: Arc<CodeGraph> = ws.graph.load_full();
        let mode = decide_mode(&self.config, &changes, &prior_graph);
        self.store_last_mode(mode);

        // Task 7 Phase 7c: transition to Rebuilding at iteration entry.
        // Queries keep serving the prior ArcSwap snapshot (A2 §G.5).
        // Placement BEFORE gate_check is intentional: the `Rebuilding`
        // lifecycle covers the synthetic test stall too, so
        // `classify_for_serve_returns_fresh_for_rebuilding_workspace`
        // observes a real state. The store is atomic; a concurrent
        // `execute_eviction` that wins the race overwrites this with
        // `Evicted` under `workspaces.write()` — `classify_for_serve`'s
        // map-missing arm wins, so this is harmless.
        ws.store_state(WorkspaceState::Rebuilding);

        // Task 7 Phase 7b2: record the iteration input for tests
        // BEFORE the optional gate stall, so the test sees the input
        // even when the gate blocks. No-op when `test_capture` is not
        // installed (production).
        if let Some(cap) = self.test_capture.get() {
            cap.iterations.lock().push(CapturedIteration {
                changeset: changes.clone(),
                mode,
                git_state_at_enqueue: git_state_at_enqueue.clone(),
                started_at: Instant::now(),
            });
        }

        // Task 7 Phase 7b2: optional test-only gate. Stalls the
        // iteration until the test driver releases it. Production
        // callers never install a gate — this is a single atomic
        // load + short-circuit.
        self.gate_check().await;

        let estimate = compute_working_set_estimate(&prior_graph, &changes, mode);

        // Admission reservation. `reserve_rebuild` itself performs the
        // Phase-1 membership + cancellation check (Task 7 Phase 7b1)
        // which can surface `WorkspaceEvicted`. Either way, record on
        // the workspace so `last_error` / `retry_count` reflect the
        // failure — EXCEPT on the eviction path (Task 7 Phase 7c: the
        // workspace is gone; polluting telemetry with a "failure"
        // record is misleading).
        let reservation = match self.manager.reserve_rebuild(key, estimate) {
            Ok(r) => r,
            Err(e) => {
                self.record_and_transition_on_err(ws, &e);
                return Err(e);
            }
        };

        // Task 7 Phase 7c: post-reservation hook fires HERE with the
        // reservation alive. Tests use this to snapshot admission
        // state mid-rebuild (e.g., assert `reserved_bytes > 0`) and
        // race eviction. Production is a single atomic load + return.
        self.post_reservation_check().await;

        // Pipeline execution (`spawn_blocking` catches plugin panics
        // internally and maps them to `WorkspaceBuildFailed`).
        //
        // Task 7 Phase 7c: `execute_rebuild` now wires a
        // `CancellationToken` to `ws.rebuild_cancelled` via
        // `spawn_cancellation_forwarder`. A mid-pipeline eviction sets
        // `rebuild_cancelled = true`, the forwarder flips the token,
        // the pipeline returns `GraphBuilderError::Cancelled` →
        // mapped to `DaemonError::WorkspaceEvicted`.
        let new_graph: CodeGraph = match self
            .execute_rebuild(key, ws, &prior_graph, mode, changes)
            .await
        {
            Ok(g) => g,
            Err(e) => {
                // Reservation refunds via RAII on drop at return.
                drop(reservation);
                // Task 7 Phase 7c feat iter-1 (Codex MAJOR 2):
                // increment the pass-boundary cancellation counter
                // when a `WorkspaceEvicted` surfaces from the
                // pipeline. Lets tests distinguish the two
                // cancellation surfaces (§5e publish-recheck vs
                // sqry-core pass-boundary).
                if matches!(e, DaemonError::WorkspaceEvicted { .. })
                    && let Some(cap) = self.test_capture.get()
                {
                    cap.pass_boundary_cancellations
                        .fetch_add(1, Ordering::AcqRel);
                }
                self.record_and_transition_on_err(ws, &e);
                return Err(e);
            }
        };

        // Task 7 Phase 7c §5e: hold `workspaces.read()` across the
        // final cancellation/membership re-check AND
        // `publish_and_retain`. `execute_eviction` holds
        // `workspaces.write()` for its entire critical section, so
        // the RwLock makes this publish atomic with respect to
        // eviction: either eviction has fully completed (our
        // re-checks observe cancellation or map-missing) or eviction
        // cannot start until we drop the read guard. Pattern mirrors
        // `WorkspaceManager::get_or_load` (manager.rs:687-761, Codex
        // Task 6 Phase 6b iter-2 MAJOR). Lock order §J.4:
        // `workspaces -> admission`; `publish_and_retain` takes
        // `admission` internally, which nests correctly.
        let publish_result = {
            let workspaces_guard = self.manager.workspaces_read();

            if ws.rebuild_cancelled.load(Ordering::Acquire) {
                drop(workspaces_guard);
                drop(reservation);
                // Task 7 Phase 7c feat iter-1 (Codex MAJOR 2):
                // counter — §5e recheck surface.
                if let Some(cap) = self.test_capture.get() {
                    cap.publish_path_evictions.fetch_add(1, Ordering::AcqRel);
                }
                // NO record_failure / state transition: eviction owns
                // those.
                return Err(DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                });
            }
            if !workspaces_guard.contains_key(key) {
                drop(workspaces_guard);
                drop(reservation);
                if let Some(cap) = self.test_capture.get() {
                    cap.publish_path_evictions.fetch_add(1, Ordering::AcqRel);
                }
                return Err(DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                });
            }

            // `G_daemon_control_plane.md` §3.5 caller-migration —
            // execute_one_rebuild (production caller 2). On
            // post-build oversize, propagate the typed error
            // upstream; the reservation's RAII Drop refunds bytes.
            //
            // Cluster-G iter-2 BLOCKER 2: also transition the
            // workspace to `Failed` so it doesn't stay stuck in
            // `Rebuilding`. The success branch below handles the
            // happy-path `Loaded` transition; the error branch
            // previously only refunded bytes and returned, leaving
            // the workspace observable as `Rebuilding` forever
            // (codex iter-1 review — only `daemon reset` could
            // recover, and the reset path itself was also broken).
            let (_token, published_arc) =
                match self.manager.publish_and_retain(reservation, ws, new_graph) {
                    Ok((token, arc)) => (token, arc),
                    Err(e) => {
                        self.record_and_transition_on_err(ws, &e);
                        return Err(e);
                    }
                };
            published_arc
            // workspaces_guard drops at end of this block; eviction
            // can proceed immediately afterward.
        };

        // Task 7 Phase 7b2: advance the classifier baseline when the
        // consumed PendingRebuild carried a watcher-captured snapshot.
        // This happens AFTER `publish_and_retain` + read-guard drop,
        // which is safe: the write is a per-workspace atomic on an
        // Arc the caller holds. A concurrent eviction stamps its own
        // state transition but does not invalidate this field — the
        // next `classify_for_serve` observes `WorkspaceEvicted` from
        // the map-missing arm.
        if let Some(git_state) = git_state_at_enqueue {
            *ws.last_indexed_git_state.write() = Some(git_state);
        }

        // Success bookkeeping.
        ws.record_success(SystemTime::now());
        // Task 7 Phase 7c: transition Rebuilding -> Loaded. A
        // concurrent eviction that wins the race after our read guard
        // drop overwrites this with Evicted — harmless, see header
        // comment above the read-guard block.
        ws.store_state(WorkspaceState::Loaded);
        self.dispatched_count.fetch_add(1, Ordering::Relaxed);
        let _ = publish_result;

        Ok(())
    }
1304
1305    /// Task 7 Phase 7c helper: update workspace state + bookkeeping on
1306    /// a rebuild-failure error path.
1307    ///
1308    /// - `WorkspaceEvicted`:
1309    ///   - **State already `Evicted`** → NO-OP. Eviction wrote
1310    ///     `Evicted` under `workspaces.write()` BEFORE flipping
1311    ///     `rebuild_cancelled`; clobbering it with `Failed` would
1312    ///     destroy that contract.
1313    ///   - **State still `Rebuilding`** → cluster-G iter-3 fix.
1314    ///     This is a `daemon reset` cancellation that fired AFTER
1315    ///     the runner's top-of-loop gate but BEFORE the publish
1316    ///     recheck (`rebuild.rs:1229`). The runner converts that
1317    ///     into `WorkspaceEvicted`, but no eviction actually
1318    ///     happened — `WorkspaceManager::reset` only set
1319    ///     `rebuild_cancelled = true` and returned
1320    ///     `ResetCancellationDispatched`. Without the transition
1321    ///     below, the workspace stays stuck in `Rebuilding` and a
1322    ///     subsequent `daemon reset` returns
1323    ///     `ResetCancellationDispatched` again — operator must
1324    ///     `daemon stop && daemon start` to recover.
1325    ///
1326    ///     We transition to `Unloaded` (the same destination
1327    ///     `WorkspaceManager::reset` would have written if reset
1328    ///     had won the race against the iteration). The map entry
1329    ///     and `pinned` bit are preserved by the in-place
1330    ///     `store_state`; the next `daemon reset` (or the caller's
1331    ///     retry after the documented `retry_after_ms = 250`) sees
1332    ///     `Unloaded` and is a no-op, and `daemon load` then
1333    ///     recovers the workspace cheaply.
1334    ///
1335    /// - Any other `DaemonError` → `record_failure` + transition to
1336    ///   `WorkspaceState::Failed`. This is the entry point to A2
1337    ///   §G.7's stale-serve flow for that workspace.
1338    fn record_and_transition_on_err(&self, ws: &LoadedWorkspace, err: &DaemonError) {
1339        if matches!(err, DaemonError::WorkspaceEvicted { .. }) {
1340            // Cluster-G iter-3 BLOCKER 3 fix: differentiate
1341            // eviction-path from reset-path cancellations by reading
1342            // the state the eviction path would have written. See
1343            // doc-comment above for the full rationale.
1344            if ws.load_state() == WorkspaceState::Rebuilding {
1345                ws.store_state(WorkspaceState::Unloaded);
1346            }
1347            return;
1348        }
1349        ws.record_failure(clone_err(err));
1350        ws.store_state(WorkspaceState::Failed);
1351    }
1352
1353    /// Drive the actual sqry-core rebuild pipeline on a blocking
1354    /// thread, with cooperative cancellation via a forwarder task
1355    /// that mirrors `ws.rebuild_cancelled` into a
1356    /// [`CancellationToken`].
1357    ///
1358    /// Sync-in-async bridge: `build_unified_graph_cancellable` and
1359    /// `incremental_rebuild` are CPU-bound + use rayon internally, so
1360    /// they must not block a tokio runtime worker thread. The blocking
1361    /// closure owns the cloned `Arc<PluginManager>` /
1362    /// [`BuildConfig`] / `Arc<CodeGraph>` / cancellation token so it
1363    /// outlives the awaited JoinHandle.
1364    ///
1365    /// Task 7 Phase 7c: a tokio task (`spawn_cancellation_forwarder`)
1366    /// polls `ws.rebuild_cancelled` at `CANCEL_FORWARDER_POLL_MS`
1367    /// cadence; the first `true` observation calls `token.cancel()`.
1368    /// The forwarder is `abort()`ed after the rebuild future returns
1369    /// regardless of outcome — polling stops immediately.
1370    ///
1371    /// # Error mapping
1372    ///
1373    /// - `GraphBuilderError::Cancelled` → `DaemonError::WorkspaceEvicted`
1374    ///   (cancellation only fires on eviction per the forwarder
1375    ///   contract).
1376    /// - Any other `GraphBuilderError` → `DaemonError::WorkspaceBuildFailed`
1377    ///   with a human-readable reason.
1378    /// - `spawn_blocking` join errors (panic inside the closure) →
1379    ///   `WorkspaceBuildFailed`.
1380    async fn execute_rebuild(
1381        &self,
1382        key: &WorkspaceKey,
1383        ws: &Arc<LoadedWorkspace>,
1384        prior: &Arc<CodeGraph>,
1385        mode: RebuildMode,
1386        changes: ChangeSet,
1387    ) -> Result<CodeGraph, DaemonError> {
1388        let root = key.source_root.clone();
1389        let plugins = Arc::clone(&self.plugins);
1390        let cfg = self.build_config.clone();
1391        let prior_for_blocking = Arc::clone(prior);
1392        let root_for_err = root.clone();
1393
1394        // Task 7 Phase 7c: fresh cancellation token per iteration so
1395        // a cancelled token from a prior run cannot permanently break
1396        // subsequent rebuilds on the same workspace.
1397        let token = CancellationToken::new();
1398        // Task 7 Phase 7c feat iter-1: optional forwarder suppression
1399        // for §5e publish-path recheck tests. Production builds and
1400        // tests without a `TestCapture` always spawn the forwarder;
1401        // only a test with `suppress_forwarder=true` skips it.
1402        let forwarder_handle = if self
1403            .test_capture
1404            .get()
1405            .is_some_and(|cap| cap.suppress_forwarder.load(Ordering::Acquire))
1406        {
1407            None
1408        } else {
1409            Some(spawn_cancellation_forwarder(Arc::clone(ws), token.clone()))
1410        };
1411        // Task 7 Phase 7c feat iter-2 (Codex MAJOR 1): optional
1412        // synchronous pre-cancel for pass-boundary-determinism
1413        // tests. When armed, the token is already cancelled by the
1414        // time spawn_blocking dispatches the pipeline, so the very
1415        // first `cancellation.check()?` inside
1416        // `build_unified_graph_cancellable` /
1417        // `incremental_rebuild` fires. Forces the pass-boundary
1418        // cancellation surface without racing the forwarder.
1419        if self.test_capture.get().is_some_and(|cap| {
1420            cap.precancel_token_for_pass_boundary
1421                .load(Ordering::Acquire)
1422        }) {
1423            token.cancel();
1424        }
1425
1426        let token_for_blocking = token.clone();
1427        let join_result = tokio::task::spawn_blocking(move || {
1428            execute_rebuild_blocking(
1429                &root,
1430                &prior_for_blocking,
1431                mode,
1432                changes,
1433                &plugins,
1434                &cfg,
1435                &token_for_blocking,
1436            )
1437        })
1438        .await;
1439
1440        // Task 7 Phase 7c: stop the forwarder unconditionally once the
1441        // rebuild future completes. (Iter-1 Option<JoinHandle>: if
1442        // forwarder suppression is armed in TestCapture, handle is
1443        // None — nothing to abort.)
1444        if let Some(handle) = forwarder_handle {
1445            handle.abort();
1446        }
1447
1448        match join_result {
1449            Ok(Ok(graph)) => Ok(graph),
1450            Ok(Err(e)) => Err(e),
1451            Err(join_err) => Err(DaemonError::WorkspaceBuildFailed {
1452                root: root_for_err,
1453                reason: format!("spawn_blocking join error: {join_err}"),
1454            }),
1455        }
1456    }
1457
1458    /// Task 7 Phase 7c: test-only observation + stall point inside
1459    /// `execute_one_rebuild`, fired AFTER `reserve_rebuild` returns
1460    /// Ok and BEFORE `execute_rebuild` runs the blocking pipeline.
1461    ///
1462    /// Production builds with no `TestCapture` installed see a single
1463    /// atomic load + return. With a capture installed, each
1464    /// invocation:
1465    ///
1466    /// 1. Fires `post_reservation_reached.notify_waiters()` so tests
1467    ///    awaiting `wait_until_post_reservation()` return.
1468    /// 2. If `post_reservation_hold.load > 0`, stalls on
1469    ///    `post_reservation_release.notified()` — matches the 7b1
1470    ///    `Notify` handshake pattern (arm `notified()` future BEFORE
1471    ///    re-checking `hold`) to close the lost-wakeup window.
1472    async fn post_reservation_check(&self) {
1473        let Some(cap) = self.test_capture.get() else {
1474            return;
1475        };
1476        // Iter-2 Codex MAJOR 2: set the durable reached-flag BEFORE
1477        // firing the notify. A test that awaits via
1478        // `wait_until_post_reservation` observes either the flag
1479        // (fast path) or the notify (slow path); the flag closes the
1480        // lost-wakeup hole where the hook fires before the test
1481        // arms its await.
1482        cap.post_reservation_reached_flag
1483            .store(true, Ordering::Release);
1484        cap.post_reservation_reached.notify_waiters();
1485        if cap.post_reservation_hold.load(Ordering::Acquire) == 0 {
1486            return;
1487        }
1488        let notified = cap.post_reservation_release.notified();
1489        if cap.post_reservation_hold.load(Ordering::Acquire) > 0 {
1490            notified.await;
1491            // Decrement once per release.
1492            cap.post_reservation_hold.fetch_sub(1, Ordering::AcqRel);
1493        }
1494    }
1495
1496    /// Encode the mode into the atomic observability slot.
1497    fn store_last_mode(&self, mode: RebuildMode) {
1498        self.last_mode.store(mode.as_u8(), Ordering::Relaxed);
1499    }
1500
1501    // -----------------------------------------------------------------
1502    // Per-workspace watcher bridge (Task 7 Phase 7b2)
1503    // -----------------------------------------------------------------
1504
1505    /// Idempotently spawn (if not already active) the per-workspace
1506    /// watcher + async dispatcher task pair for `(key, ws, root)`.
1507    ///
1508    /// # Idempotence
1509    ///
1510    /// Looks up `watchers[key]`:
1511    /// - If present AND the entry's `live` flag is `true`, returns
1512    ///   `Ok(())` without spawning — an active pair is already
1513    ///   producing dispatches.
1514    /// - If present AND `live == false` (the async task has started
1515    ///   its post-loop cleanup), prunes the stale entry and spawns a
1516    ///   new pair with a fresh generation.
1517    /// - If absent, spawns a new pair.
1518    ///
1519    /// # Shutdown lifecycle
1520    ///
1521    /// Each pair is cooperatively shut down by:
1522    /// - Eviction (`ws.rebuild_cancelled = true` propagates through
1523    ///   the cancellable watcher → blocking thread exits → mpsc
1524    ///   sender drops → async task exits), OR
1525    /// - The async task observing `DaemonError::WorkspaceEvicted`
1526    ///   from `handle_changes_with_git_state` → async task exits →
1527    ///   receiver drops → blocking thread's next send fails → exits.
1528    ///
1529    /// The async task's last action is `live.store(false)` followed
1530    /// by [`Self::reap_watcher`]`(&key, generation)` — removing the
1531    /// entry from the map via compare-and-remove on the generation.
1532    ///
1533    /// # Placement constraint
1534    ///
1535    /// This method lives on `RebuildDispatcher`, NOT `WorkspaceManager`
1536    /// (per the 7b2 RESUME_PROMPT constraint): coupling watcher
1537    /// lifecycle into `manager.get_or_load` would pollute the
1538    /// manager's responsibilities and create a dispatcher↔manager
1539    /// cycle. Test harnesses (and Task 9's future daemon bootstrap)
1540    /// call this method explicitly after `get_or_load` succeeds.
1541    ///
1542    /// # Errors
1543    ///
1544    /// - [`DaemonError::Io`] if
1545    ///   [`sqry_core::watch::SourceTreeWatcher::new`] fails (typical
1546    ///   cause: `.gitignore` read error or
1547    ///   `notify::RecommendedWatcher::new` failure).
1548    pub async fn ensure_watching(
1549        self: &Arc<Self>,
1550        key: &WorkspaceKey,
1551        ws: Arc<LoadedWorkspace>,
1552        root: PathBuf,
1553    ) -> Result<(), DaemonError> {
1554        // Hold `self.watchers` across the ENTIRE operation — check,
1555        // watcher construction, spawn, and insert. Releasing the
1556        // lock between the liveness check and the insert would
1557        // permit two concurrent callers for the same `WorkspaceKey`
1558        // to both pass the "no live entry" check and both spawn
1559        // watcher pairs; the later insert would replace the tracked
1560        // entry without stopping the earlier spawned pair (Tokio
1561        // `JoinHandle::drop` detaches, per the `WatcherEntry`
1562        // docstring), producing duplicate rebuild dispatches and
1563        // leaked watcher resources. This issue was flagged by the
1564        // 7b2 iter-0 feat review (MAJOR).
1565        //
1566        // Holding a `parking_lot::Mutex` across sync operations is
1567        // the intended usage. The critical section covers:
1568        //   1. Fast-path liveness check (atomic read).
1569        //   2. `next_watcher_generation.fetch_add` (atomic).
1570        //   3. `SourceTreeWatcher::new` — bounded sync I/O
1571        //      (.gitignore read + notify subscribe). Typically 1–5
1572        //      ms; does NOT block on any lock held by code that
1573        //      might reacquire `self.watchers`.
1574        //   4. Git-state priming — one `RwLock` write on the
1575        //      workspace's `last_indexed_git_state`. Distinct lock
1576        //      from `watchers`; no lock-order violation.
1577        //   5. `tokio::sync::mpsc::channel` — sync allocation.
1578        //   6. `tokio::spawn` + `tokio::task::spawn_blocking` —
1579        //      enqueue-only, do not yield. Task bodies may later
1580        //      call `reap_watcher`, which re-acquires
1581        //      `self.watchers`; parking_lot blocks the executor
1582        //      thread briefly until we release here. Acceptable
1583        //      because the window is sub-ms after spawn.
1584        //   7. `HashMap::insert` (instant).
1585        //
1586        // No `.await` points exist between steps 1–7, so the async
1587        // function's single poll progresses synchronously while
1588        // the lock is held; the lock is released at function
1589        // return (including the error-return path for
1590        // `SourceTreeWatcher::new` failures via `?`).
1591        let mut watchers = self.watchers.lock();
1592
1593        // Step 1 — Fast-path liveness check.
1594        if let Some(entry) = watchers.get(key)
1595            && entry.live.load(Ordering::Acquire)
1596        {
1597            return Ok(());
1598        }
1599
1600        // Step 2 — Allocate a monotonic generation token.
1601        let generation = self.next_watcher_generation.fetch_add(1, Ordering::Relaxed);
1602
1603        // Step 3 — Construct the watcher (sync I/O). Errors bubble
1604        // up via `?`; the locked guard is dropped on return.
1605        let watcher = SourceTreeWatcher::new(&root).map_err(|e| {
1606            DaemonError::Io(std::io::Error::other(format!(
1607                "failed to create watcher for {}: {e:#}",
1608                root.display()
1609            )))
1610        })?;
1611
1612        // Step 4 — Prime `ws.last_indexed_git_state` with the
1613        // CURRENT git snapshot. Without this, the classifier has no
1614        // baseline on the first debounce window and every benign
1615        // `.git/` event would produce `git_change_class = None`
1616        // (which the bridge cannot distinguish from a real
1617        // divergence without a baseline). Priming is a single
1618        // atomic write — if a real git operation is in flight
1619        // concurrently, the snapshot captures whichever side wins;
1620        // subsequent debounce windows compare against it and
1621        // classify correctly.
1622        //
1623        // Only PRIME if no baseline exists yet. A respawn (after
1624        // evict+reload) should NOT overwrite a baseline that the
1625        // prior watcher successfully committed, because that
1626        // baseline reflects the last PUBLISHED graph's git state —
1627        // overwriting it would lose the classifier's memory.
1628        {
1629            let mut baseline = ws.last_indexed_git_state.write();
1630            if baseline.is_none() {
1631                *baseline = Some(watcher.git_state().current_state());
1632            }
1633        }
1634
1635        // Step 5 — Bounded tokio mpsc: capacity 16 is generous —
1636        // the async consumer drains items at dispatch rate and the
1637        // blocking producer already consolidates many filesystem
1638        // events into a single ChangeSet before sending.
1639        let (tx, rx) = tokio::sync::mpsc::channel::<(ChangeSet, LastIndexedGitState)>(16);
1640
1641        let debounce = Duration::from_millis(self.config.debounce_ms);
1642        // 100 ms is the design's recommended cancellation-poll cadence:
1643        // tight enough that an evicted workspace's watcher thread
1644        // terminates promptly, loose enough not to burn CPU on a
1645        // quiet repo.
1646        let cancel_poll_period = Duration::from_millis(100);
1647
1648        // Liveness flag shared between the stored entry and the async
1649        // task's post-loop cleanup. Flipped to `false` BEFORE
1650        // reap_watcher is called so `ensure_watching` re-calls for
1651        // the same key observe "drained" rather than "live".
1652        let live = Arc::new(AtomicBool::new(true));
1653
1654        // Step 6a — Spawn the blocking watcher thread.
1655        let blocking_handle = {
1656            let ws = Arc::clone(&ws);
1657            tokio::task::spawn_blocking(move || {
1658                watch_loop_blocking(&watcher, &tx, &ws, debounce, cancel_poll_period);
1659            })
1660        };
1661
1662        // Step 6b — Spawn the async dispatcher task.
1663        let async_handle = {
1664            let dispatcher = Arc::clone(self);
1665            let key = key.clone();
1666            let ws = Arc::clone(&ws);
1667            let live_for_task = Arc::clone(&live);
1668            tokio::spawn(async move {
1669                dispatch_loop_async(&dispatcher, &key, &ws, rx).await;
1670                // Mark ourselves as draining BEFORE reap_watcher so a
1671                // concurrent ensure_watching observes the correct
1672                // liveness state.
1673                live_for_task.store(false, Ordering::Release);
1674                dispatcher.reap_watcher(&key, generation);
1675            })
1676        };
1677
1678        // Step 7 — Prune any stale entry + insert the new one.
1679        // Prune covers the case where a prior entry existed with
1680        // `live == false` (observed by step 1 falling through).
1681        // `remove` is idempotent when no entry exists.
1682        watchers.remove(key);
1683        watchers.insert(
1684            key.clone(),
1685            WatcherEntry {
1686                generation,
1687                live,
1688                async_handle,
1689                blocking_handle,
1690            },
1691        );
1692        // `watchers` drops here → lock released.
1693        Ok(())
1694    }
1695
1696    /// Remove the watcher entry for `key` if and only if the stored
1697    /// entry's generation equals `my_generation`. Called by the
1698    /// per-workspace async task as its LAST action before exit.
1699    ///
1700    /// # Why compare-and-remove
1701    ///
1702    /// A fast evict+reload sequence can result in:
1703    /// 1. Old watcher A (gen 0) exits cooperatively.
1704    /// 2. Before A's closure finishes, `ensure_watching` is called
1705    ///    again for the same key and observes A's `live == false`,
1706    ///    prunes A's entry, and inserts new watcher B (gen 1).
1707    /// 3. A's closure reaches its final statement — `reap_watcher`.
1708    ///
1709    /// Without a generation check, A's reap would delete B's entry.
1710    /// Compare-and-remove guarantees A's reap is a no-op because
1711    /// `entry.generation == 1 != 0 == my_generation`.
1712    ///
1713    /// # Test observability
1714    ///
1715    /// Exposed as `pub(crate)` because external callers should never
1716    /// need to force-reap a watcher — cooperative shutdown via
1717    /// `rebuild_cancelled` triggers reap automatically. Tests that
1718    /// assert on the map size use [`Self::watchers_len`].
1719    pub(crate) fn reap_watcher(&self, key: &WorkspaceKey, my_generation: u64) {
1720        let mut watchers = self.watchers.lock();
1721        if let Some(entry) = watchers.get(key)
1722            && entry.generation == my_generation
1723        {
1724            watchers.remove(key);
1725        }
1726    }
1727
1728    /// **Test-only** size observation on the watchers map.
1729    ///
1730    /// Used by `rebuild_watcher_shutdown.rs` to assert that the
1731    /// eviction cascade reaches quiescence (both tasks exit AND the
1732    /// map entry is reaped). Production callers should consult
1733    /// workspace-level `status()` rather than the dispatcher's
1734    /// bookkeeping.
1735    #[doc(hidden)]
1736    #[must_use]
1737    pub fn watchers_len(&self) -> usize {
1738        self.watchers.lock().len()
1739    }
1740}
1741
1742// ---------------------------------------------------------------------------
1743// Watcher bridge loops (Task 7 Phase 7b2)
1744// ---------------------------------------------------------------------------
1745//
1746// These are free functions (not `RebuildDispatcher` methods) so the
1747// closures passed to `tokio::task::spawn_blocking` and `tokio::spawn`
1748// in `ensure_watching` own only the state they need. The blocking
1749// loop is `Send + 'static` (captures the `SourceTreeWatcher`, mpsc
1750// sender, workspace Arc, and two Durations); the async loop is
1751// `Send + 'static` (captures the dispatcher Arc, workspace key+Arc,
1752// and the mpsc receiver).
1753
1754/// Blocking watcher loop — runs on `tokio::task::spawn_blocking`.
1755///
1756/// Repeatedly calls
1757/// [`SourceTreeWatcher::wait_for_changes_cancellable`](sqry_core::watch::SourceTreeWatcher::wait_for_changes_cancellable)
1758/// using the workspace's last-indexed baseline as the classifier
1759/// reference. On each non-empty `ChangeSet`, captures the current
1760/// git state via `watcher.git_state().current_state()` and forwards
1761/// the `(ChangeSet, LastIndexedGitState)` pair to the async
1762/// dispatcher task via tokio mpsc.
1763///
1764/// # Termination
1765///
1766/// Exits on:
1767/// - `wait_for_changes_cancellable` returns `Ok(None)` — eviction
1768///   observed cooperatively via `ws.rebuild_cancelled`.
1769/// - `wait_for_changes_cancellable` returns `Err` — notify channel
1770///   disconnect (unrecoverable); logged at error level.
1771/// - `tx.blocking_send` returns `Err` — async receiver dropped
1772///   (normal shutdown); logged at debug.
1773fn watch_loop_blocking(
1774    watcher: &SourceTreeWatcher,
1775    tx: &tokio::sync::mpsc::Sender<(ChangeSet, LastIndexedGitState)>,
1776    ws: &LoadedWorkspace,
1777    debounce: Duration,
1778    cancel_poll_period: Duration,
1779) {
1780    loop {
1781        let last_git = ws.last_indexed_git_state.read().clone();
1782        match watcher.wait_for_changes_cancellable(
1783            debounce,
1784            last_git.as_ref(),
1785            &ws.rebuild_cancelled,
1786            cancel_poll_period,
1787        ) {
1788            Ok(None) => {
1789                tracing::info!(
1790                    target: "sqry_daemon::watch",
1791                    workspace = %ws.key.source_root.display(),
1792                    "watcher cancelled; terminating blocking loop"
1793                );
1794                break;
1795            }
1796            Err(e) => {
1797                tracing::error!(
1798                    target: "sqry_daemon::watch",
1799                    workspace = %ws.key.source_root.display(),
1800                    error = %e,
1801                    "watcher channel disconnected; terminating blocking loop"
1802                );
1803                break;
1804            }
1805            Ok(Some(cs)) if cs.is_empty() => {
1806                // Empty ChangeSet — watcher debounced a burst of
1807                // events that all got filtered (editor temps,
1808                // gitignored paths, .git/ internals). Do not wake
1809                // the async side; loop and wait for the next batch.
1810                continue;
1811            }
1812            Ok(Some(cs)) if cs.changed_files.is_empty() && !cs.requires_full_rebuild() => {
1813                // Git-state-only change whose classifier output does
1814                // NOT require a full rebuild (Noise or LocalCommit
1815                // class). A2 §B mandates: these classes are reported
1816                // for telemetry but do not trigger a rebuild by
1817                // themselves — a real commit that changed the working
1818                // tree was already observed as a source-tree event.
1819                // Skip silently; loop for the next debounce window.
1820                continue;
1821            }
1822            Ok(Some(cs)) if cs.changed_files.is_empty() && cs.requires_full_rebuild() => {
1823                // Empty-files + full-rebuild classification
1824                // (BranchSwitch or TreeDiverged) is either a TOCTOU
1825                // artifact or a graph-neutral git operation. In both
1826                // cases, skipping is correct:
1827                //
1828                // * **TOCTOU artifact.** Immediately after a git
1829                //   operation, the classifier's
1830                //   `git rev-parse HEAD HEAD^{tree}` subprocess can
1831                //   transiently return partial output, causing
1832                //   `current_state` fields to drop to `None` and the
1833                //   classifier to fall back to BranchSwitch (see
1834                //   `GitStateWatcher::classify` at
1835                //   `sqry-core/src/watch/git_state.rs:240-258`). A
1836                //   subsequent debounce window will re-observe once
1837                //   git settles and fire a legitimate dispatch if
1838                //   state actually diverged.
1839                //
1840                // * **Graph-neutral branch/tree move.** Git can
1841                //   genuinely switch refs without swapping
1842                //   working-tree content when both refs point at the
1843                //   same tree (for example,
1844                //   `git checkout other-branch` where `other-branch`
1845                //   is already at HEAD's tree). The classifier
1846                //   reports BranchSwitch because `head_ref` changed,
1847                //   but no source file events fire because no source
1848                //   content changed. The published graph is already
1849                //   consistent with the new ref — a rebuild would be
1850                //   pure overhead.
1851                //
1852                // A "real" tree divergence that our graph does not
1853                // yet reflect (a pull, a reset, a branch switch that
1854                // actually rewrites files) emits concrete source
1855                // file events the source-tree watcher captures; that
1856                // case falls through to the dispatch arm below and
1857                // triggers the rebuild as intended.
1858                tracing::debug!(
1859                    target: "sqry_daemon::watch",
1860                    workspace = %ws.key.source_root.display(),
1861                    git_class = ?cs.git_change_class,
1862                    "skipping empty-files full-rebuild signal: TOCTOU or graph-neutral git move"
1863                );
1864                continue;
1865            }
1866            Ok(Some(cs)) => {
1867                // Capture the git state AS OF now (after debounce
1868                // completion). The async side will attach this to
1869                // the PendingRebuild via
1870                // `handle_changes_with_git_state`; the runner will
1871                // commit it to `ws.last_indexed_git_state` at
1872                // publish time.
1873                let new_git_state = watcher.git_state().current_state();
1874                if tx.blocking_send((cs, new_git_state)).is_err() {
1875                    tracing::debug!(
1876                        target: "sqry_daemon::watch",
1877                        workspace = %ws.key.source_root.display(),
1878                        "async dispatcher task dropped receiver; terminating blocking loop"
1879                    );
1880                    break;
1881                }
1882            }
1883        }
1884    }
1885}
1886
1887/// Async dispatcher loop — runs on a `tokio::spawn`ed task.
1888///
1889/// Consumes `(ChangeSet, LastIndexedGitState)` pairs from `rx` and
1890/// dispatches each via
1891/// [`RebuildDispatcher::handle_changes_with_git_state`]. The runner
1892/// commits `ws.last_indexed_git_state` as part of its publish
1893/// bookkeeping, keyed off the attached snapshot.
1894///
1895/// # Termination
1896///
1897/// Exits on:
1898/// - `rx.recv()` returns `None` — blocking side exited, channel
1899///   closed; logged at debug.
1900/// - `handle_changes_with_git_state` returns
1901///   `Err(WorkspaceEvicted)` — workspace is gone; logged at info.
1902///
1903/// Transient errors (`MemoryBudgetExceeded`, `WorkspaceBuildFailed`,
1904/// `Io`) continue the loop — the baseline is not advanced (the
1905/// publish did not happen), so the next wait_for_changes_cancellable
1906/// call re-observes the divergence and retries.
1907async fn dispatch_loop_async(
1908    dispatcher: &Arc<RebuildDispatcher>,
1909    key: &WorkspaceKey,
1910    ws: &LoadedWorkspace,
1911    mut rx: tokio::sync::mpsc::Receiver<(ChangeSet, LastIndexedGitState)>,
1912) {
1913    loop {
1914        let Some((cs, new_git_state)) = rx.recv().await else {
1915            tracing::debug!(
1916                target: "sqry_daemon::watch",
1917                workspace = %ws.key.source_root.display(),
1918                "watcher channel closed; terminating async dispatcher"
1919            );
1920            break;
1921        };
1922        match dispatcher
1923            .handle_changes_with_git_state(key, cs, new_git_state)
1924            .await
1925        {
1926            Ok(()) => {
1927                // Baseline advance (if any) was handled by the
1928                // runner inside execute_one_rebuild at publish
1929                // time — nothing for the bridge to do here.
1930            }
1931            Err(DaemonError::WorkspaceEvicted { .. }) => {
1932                tracing::info!(
1933                    target: "sqry_daemon::watch",
1934                    workspace = %ws.key.source_root.display(),
1935                    "workspace evicted; terminating async dispatcher"
1936                );
1937                break;
1938            }
1939            Err(e) => {
1940                tracing::warn!(
1941                    target: "sqry_daemon::watch",
1942                    workspace = %ws.key.source_root.display(),
1943                    error = %e,
1944                    "rebuild failed; baseline unchanged, retrying on next change"
1945                );
1946                // loop continues
1947            }
1948        }
1949    }
1950}
1951
1952/// Run the appropriate sqry-core entrypoint on the current (blocking)
1953/// thread. Factored out of `execute_rebuild` so the blocking closure
1954/// is a plain free function — easier to review and easier to mock in
1955/// unit tests if future phases need to.
1956///
1957/// Task 7 Phase 7c: takes a [`CancellationToken`] that's polled at
1958/// every pass boundary. A cancelled token produces
1959/// [`GraphBuilderError::Cancelled`] which this helper maps to
1960/// [`DaemonError::WorkspaceEvicted`] — cancellation only fires when
1961/// the workspace is evicted (the dispatcher's
1962/// [`spawn_cancellation_forwarder`] flips the token on observing
1963/// `ws.rebuild_cancelled = true`).
1964fn execute_rebuild_blocking(
1965    root: &std::path::Path,
1966    prior: &Arc<CodeGraph>,
1967    mode: RebuildMode,
1968    changes: ChangeSet,
1969    plugins: &PluginManager,
1970    cfg: &BuildConfig,
1971    cancellation: &CancellationToken,
1972) -> Result<CodeGraph, DaemonError> {
1973    match mode {
1974        RebuildMode::Full => {
1975            match build_unified_graph_cancellable(root, plugins, cfg, cancellation) {
1976                Ok(graph) => Ok(graph),
1977                Err(e) => Err(map_graph_builder_err(e, root.to_path_buf(), "full rebuild")),
1978            }
1979        }
1980        RebuildMode::Incremental => {
1981            let paths: &[PathBuf] = &changes.changed_files;
1982
1983            // Closure math — resolve paths registered in the graph.
1984            // Unresolved paths are handled by
1985            // `phase3e_discover_new_file_paths` inside
1986            // `incremental_rebuild`.
1987            let file_ids: Vec<_> = paths.iter().filter_map(|p| prior.files().get(p)).collect();
1988            let closure = compute_reverse_dep_closure(&file_ids, prior.as_ref());
1989
1990            // Task 7 Phase 7c: real cancellation token from
1991            // `LoadedWorkspace::rebuild_cancelled`, wired via the
1992            // forwarder spawned in `execute_rebuild`.
1993            incremental_rebuild(prior.as_ref(), paths, &closure, plugins, cfg, cancellation)
1994                .map_err(|e| map_graph_builder_err(e, root.to_path_buf(), "incremental rebuild"))
1995        }
1996    }
1997}
1998
1999/// Map a sqry-core [`GraphBuilderError`] to the daemon surface type.
2000///
2001/// - `Cancelled` → [`DaemonError::WorkspaceEvicted`] (JSON-RPC -32004).
2002///   Cancellation only fires on eviction in the current design, so the
2003///   evicted-workspace termination signal is the correct mapping.
2004/// - Any other variant → [`DaemonError::WorkspaceBuildFailed`] (-32001)
2005///   with a human-readable reason prefixed by `stage`.
2006fn map_graph_builder_err(err: GraphBuilderError, root: PathBuf, stage: &str) -> DaemonError {
2007    match err {
2008        GraphBuilderError::Cancelled => DaemonError::WorkspaceEvicted { root },
2009        other => DaemonError::WorkspaceBuildFailed {
2010            root,
2011            reason: format!("{stage}: {other}"),
2012        },
2013    }
2014}
2015
2016// ---------------------------------------------------------------------------
2017// Cancellation forwarder (Task 7 Phase 7c)
2018// ---------------------------------------------------------------------------
2019
2020/// Poll period for the cancellation forwarder. 50 ms is coarse enough
2021/// to keep the background task's CPU footprint negligible while still
2022/// bounding cancellation latency at `50ms + next pass boundary`. Tests
2023/// that need faster propagation can lower via a future hook; the
2024/// constant is sufficient for production.
2025const CANCEL_FORWARDER_POLL_MS: u64 = 50;
2026
2027/// Spawn a tokio task that mirrors `ws.rebuild_cancelled` into
2028/// `token`. The task polls the atomic on a [`CANCEL_FORWARDER_POLL_MS`]
2029/// cadence; the first observation of `true` calls `token.cancel()`
2030/// and exits.
2031///
2032/// The returned [`JoinHandle`] MUST be `abort()`ed by the caller after
2033/// the rebuild future completes — otherwise a quiet workspace (no
2034/// eviction) leaves the polling task running until the runtime is
2035/// dropped.
2036///
2037/// Task 7 Phase 7c rationale (Codex iter-2 Q2, Q9): a `Notify`-based
2038/// forwarder is not demonstrably better here — the atomic remains the
2039/// authoritative source of truth, lock-free, with
2040/// `Release`/`Acquire` ordering. Polling adds one atomic load every
2041/// 50 ms, which is negligible against rebuild timescales (seconds).
2042fn spawn_cancellation_forwarder(
2043    ws: Arc<LoadedWorkspace>,
2044    token: CancellationToken,
2045) -> JoinHandle<()> {
2046    tokio::spawn(async move {
2047        loop {
2048            if ws.rebuild_cancelled.load(Ordering::Acquire) {
2049                token.cancel();
2050                return;
2051            }
2052            tokio::time::sleep(std::time::Duration::from_millis(CANCEL_FORWARDER_POLL_MS)).await;
2053        }
2054    })
2055}
2056
2057// ---------------------------------------------------------------------------
2058// DrainLoopSentinel — panic-safety for rebuild_in_flight (Task 7 Phase 7b1)
2059// ---------------------------------------------------------------------------
2060
2061/// Panic-safety sentinel for the Phase B drain loop in
2062/// [`RebuildDispatcher::handle_changes`].
2063///
2064/// Guarantees that [`LoadedWorkspace::rebuild_in_flight`] is released
2065/// if `handle_changes` unwinds abnormally. The normal path disarms
2066/// the sentinel (`armed = false`) after releasing `rebuild_in_flight`
2067/// under the lane lock inside the drain-loop-exit block; the Drop
2068/// impl is a no-op on the happy path.
2069///
2070/// # Narrow race on the unwind path
2071///
2072/// If `handle_changes` unwinds abnormally, the Drop impl stores
2073/// `rebuild_in_flight = false` WITHOUT re-acquiring the lane. There
2074/// is a narrow window between the unwind and the Drop store during
2075/// which a concurrent caller can:
2076///
2077/// 1. Lock the lane (freed by the unwinding guard's tokio-Mutex drop).
2078/// 2. Coalesce incoming with whatever was in the lane.
2079/// 3. Observe `rebuild_in_flight = true` (we have not stored `false`
2080///    yet).
2081/// 4. Park coalesced in the lane and return `Ok(())`.
2082///
2083/// After the sentinel's Drop stores `false`, that parked
2084/// `PendingRebuild` sits without a runner until the NEXT dispatch
2085/// arrives — which will see `rebuild_in_flight = false`, take the
2086/// runner role, drain the stranded pending, and process it.
2087///
2088/// This is accepted as defense-in-depth: the only realistic trigger
2089/// for a `handle_changes` unwind is a runtime-level failure (OOM,
2090/// tokio internal panic) in which case the daemon is already in
2091/// damage-control territory. Plugin panics during the rebuild
2092/// pipeline are caught by `spawn_blocking` inside
2093/// [`RebuildDispatcher::execute_rebuild`] and mapped to
2094/// [`DaemonError::WorkspaceBuildFailed`], which flows through the
2095/// drain loop as `last_result` — NOT as an unwind.
struct DrainLoopSentinel {
    /// Shared workspace ref so the `Drop` impl outlives any borrow.
    ws: Arc<LoadedWorkspace>,
    /// Armed on construction. The drain loop's normal-path exit block
    /// sets this to `false` after releasing `rebuild_in_flight` under
    /// the lane lock; only an armed sentinel's Drop takes action.
    armed: bool,
}
2103
2104impl Drop for DrainLoopSentinel {
2105    fn drop(&mut self) {
2106        if !self.armed {
2107            return;
2108        }
2109        tracing::error!(
2110            target: "sqry_daemon::rebuild",
2111            workspace = %self.ws.key.source_root.display(),
2112            "handle_changes unwound with armed DrainLoopSentinel — \
2113             releasing rebuild_in_flight defensively; any parked \
2114             PendingRebuild will be processed on the next dispatch"
2115        );
2116        self.ws.rebuild_in_flight.store(false, Ordering::Release);
2117    }
2118}
2119
2120// ---------------------------------------------------------------------------
2121// Inline unit tests — narrow helpers only.
2122// ---------------------------------------------------------------------------
2123//
2124// The exhaustive decision-fork / coalesce-algebra / integration
2125// matrices live in the `tests/rebuild_*` binaries. These inline
2126// tests pin down the private helpers (`RebuildMode` encoding,
2127// `merge_git_class`, `DrainLoopSentinel` Drop semantics) that the
2128// external binaries exercise indirectly.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn rebuild_mode_u8_roundtrip() {
        // Every real mode must survive an as_u8 → from_u8 round trip.
        for mode in [RebuildMode::Full, RebuildMode::Incremental] {
            let encoded = mode.as_u8();
            assert_eq!(RebuildMode::from_u8(encoded), Some(mode));
        }
        // Unset (fresh AtomicU8) → None.
        assert_eq!(RebuildMode::from_u8(0), None);
        // Out-of-range → None.
        assert_eq!(RebuildMode::from_u8(3), None);
        assert_eq!(RebuildMode::from_u8(255), None);
    }

    #[test]
    fn merge_git_class_full_rebuild_dominance_canonicalises_to_tree_diverged() {
        // Any full-rebuild variant dominates a non-full (or absent)
        // class, in either argument order, and canonicalises to
        // TreeDiverged rather than preserving the original variant.
        for full_variant in [GitChangeClass::BranchSwitch, GitChangeClass::TreeDiverged] {
            for non_full in [
                None,
                Some(GitChangeClass::LocalCommit),
                Some(GitChangeClass::Noise),
            ] {
                assert_eq!(
                    merge_git_class(Some(full_variant), non_full),
                    Some(GitChangeClass::TreeDiverged),
                );
                assert_eq!(
                    merge_git_class(non_full, Some(full_variant)),
                    Some(GitChangeClass::TreeDiverged),
                );
            }
        }
    }

    #[test]
    fn merge_git_class_non_full_later_wins() {
        // Between two non-full classes, the second (later) argument
        // wins — the merge is NOT symmetric in this case.
        assert_eq!(
            merge_git_class(
                Some(GitChangeClass::LocalCommit),
                Some(GitChangeClass::Noise)
            ),
            Some(GitChangeClass::Noise),
        );
        assert_eq!(
            merge_git_class(
                Some(GitChangeClass::Noise),
                Some(GitChangeClass::LocalCommit)
            ),
            Some(GitChangeClass::LocalCommit),
        );
    }

    #[test]
    fn merge_git_class_absorbs_none_symmetrically() {
        // None acts as an identity element on either side.
        assert_eq!(merge_git_class(None, None), None);
        assert_eq!(
            merge_git_class(None, Some(GitChangeClass::Noise)),
            Some(GitChangeClass::Noise),
        );
        assert_eq!(
            merge_git_class(Some(GitChangeClass::LocalCommit), None),
            Some(GitChangeClass::LocalCommit),
        );
    }

    // ---------------------------------------------------------------
    // DrainLoopSentinel (Task 7 Phase 7b1)
    // ---------------------------------------------------------------

    /// Minimal workspace fixture for the sentinel tests — only the
    /// `rebuild_in_flight` atomic is exercised.
    fn make_sentinel_workspace() -> Arc<LoadedWorkspace> {
        use sqry_core::project::ProjectRootMode;
        Arc::new(LoadedWorkspace::new(
            WorkspaceKey::new(
                std::path::PathBuf::from("/repos/sentinel-test"),
                ProjectRootMode::GitRoot,
                0xBEEF,
            ),
            false,
        ))
    }

    #[test]
    fn drain_loop_sentinel_disarmed_is_noop() {
        // Normal path: the drain loop disarms the sentinel after the
        // under-lane release. Dropping a disarmed sentinel must NOT
        // touch `rebuild_in_flight` — the release already happened.
        let ws = make_sentinel_workspace();
        // Simulate the drain loop having already released the flag.
        ws.rebuild_in_flight.store(false, Ordering::Release);
        {
            let sentinel = DrainLoopSentinel {
                ws: Arc::clone(&ws),
                armed: false,
            };
            // sentinel dropped here — disarmed, Drop is a no-op.
            drop(sentinel);
        }
        assert!(
            !ws.rebuild_in_flight.load(Ordering::Acquire),
            "disarmed sentinel must not flip the flag"
        );
    }

    #[test]
    fn drain_loop_sentinel_armed_releases_in_flight_on_drop() {
        // Panic/unwind path: the sentinel is still armed when dropped.
        // Its Drop impl must release `rebuild_in_flight` so a future
        // caller can take the runner role.
        let ws = make_sentinel_workspace();
        ws.rebuild_in_flight.store(true, Ordering::Release);
        {
            let sentinel = DrainLoopSentinel {
                ws: Arc::clone(&ws),
                armed: true,
            };
            drop(sentinel);
        }
        assert!(
            !ws.rebuild_in_flight.load(Ordering::Acquire),
            "armed sentinel Drop must release rebuild_in_flight"
        );
    }

    // ---------------------------------------------------------------
    // gate_check — TestGate plumbing (Task 7 Phase 7b2)
    // ---------------------------------------------------------------
    //
    // These tests stand up a RebuildDispatcher with no workspace and
    // exercise the gate_check helper in isolation. The dispatcher's
    // WorkspaceManager / PluginManager fields are initialised but
    // unused — gate_check only reads `self.test_gate`.

    /// Dispatcher fixture with default config and no reaper — only
    /// `gate_check` / `record_and_transition_on_err` are exercised.
    fn make_dispatcher_for_gate_test() -> Arc<RebuildDispatcher> {
        let config = Arc::new(crate::config::DaemonConfig::default());
        let manager = crate::workspace::WorkspaceManager::new_without_reaper(Arc::clone(&config));
        let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
        RebuildDispatcher::new(manager, config, plugins)
    }

    #[tokio::test]
    async fn gate_check_is_noop_when_no_gate_installed() {
        // Production fast path: `test_gate.get()` returns `None`, the
        // helper short-circuits before allocating a `notified()`
        // future, and execute_one_rebuild proceeds immediately.
        let dispatcher = make_dispatcher_for_gate_test();
        // Call gate_check; it must return without awaiting anything.
        tokio::time::timeout(Duration::from_millis(50), dispatcher.gate_check())
            .await
            .expect("gate_check with no installed gate must return immediately");
    }

    #[tokio::test]
    async fn gate_check_blocks_then_decrements_hold_on_release() {
        // Install a gate with hold=1. The first gate_check blocks
        // until notify_one is fired; after decrementing, subsequent
        // gate_checks pass through immediately because hold==0.
        let dispatcher = make_dispatcher_for_gate_test();
        let gate = Arc::new(TestGate {
            hold: AtomicUsize::new(1),
            release: tokio::sync::Notify::new(),
        });
        dispatcher
            .install_test_gate(Arc::clone(&gate))
            .expect("first install must succeed");

        // Spawn a task that will block in gate_check.
        let dispatcher_for_task = Arc::clone(&dispatcher);
        let blocked = tokio::spawn(async move { dispatcher_for_task.gate_check().await });

        // Give the task a moment to enter gate_check's await.
        // NOTE(review): sleep-based synchronisation — a too-short
        // sleep could make this assertion flaky on a loaded machine.
        tokio::time::sleep(Duration::from_millis(25)).await;
        assert!(
            !blocked.is_finished(),
            "gate_check must block while hold > 0"
        );

        // Release. The blocked task wakes and decrements hold.
        gate.release.notify_one();
        tokio::time::timeout(Duration::from_millis(500), blocked)
            .await
            .expect("gate_check must complete promptly after release")
            .expect("task panicked");

        // hold must have been decremented to 0.
        assert_eq!(
            gate.hold.load(Ordering::Acquire),
            0,
            "gate release must decrement hold"
        );

        // Subsequent gate_check is a no-op (hold==0 short-circuit).
        tokio::time::timeout(Duration::from_millis(50), dispatcher.gate_check())
            .await
            .expect("gate_check with hold==0 must return immediately without awaiting");
    }

    // ─────────────────────────────────────────────────────────────────
    // Cluster-G iter-3 BLOCKER 3 — `record_and_transition_on_err`
    // differentiates eviction from in-iteration `daemon reset`
    // cancellation.
    // ─────────────────────────────────────────────────────────────────

    /// Eviction path: state was already written to `Evicted` by
    /// `execute_eviction` under `workspaces.write()` BEFORE the
    /// rebuild_cancelled flag was flipped. The runner observes
    /// `WorkspaceEvicted`, calls `record_and_transition_on_err`, and
    /// the helper must NOT clobber the Evicted state.
    #[test]
    fn record_and_transition_on_err_preserves_evicted_state() {
        let dispatcher = make_dispatcher_for_gate_test();
        let ws = Arc::new(LoadedWorkspace::new(
            crate::workspace::state::WorkspaceKey::new(
                std::path::PathBuf::from("/repo"),
                sqry_core::project::ProjectRootMode::GitRoot,
                0x1,
            ),
            false,
        ));
        // Pre-set the state the eviction path would have written.
        ws.store_state(crate::workspace::state::WorkspaceState::Evicted);
        let err = DaemonError::WorkspaceEvicted {
            root: std::path::PathBuf::from("/repo"),
        };
        dispatcher.record_and_transition_on_err(&ws, &err);
        assert_eq!(
            ws.load_state(),
            crate::workspace::state::WorkspaceState::Evicted,
            "eviction-path WorkspaceEvicted must NOT transition state"
        );
    }

    /// `daemon reset` path (cluster-G iter-3 BLOCKER 3 fix): reset
    /// fired AFTER the runner's top-of-loop gate but BEFORE the
    /// publish recheck. The runner converts the cancellation into
    /// `WorkspaceEvicted` while state is still `Rebuilding` (no
    /// eviction wrote `Evicted` because no eviction actually
    /// happened — `WorkspaceManager::reset`'s `Rebuilding` arm only
    /// flipped `rebuild_cancelled` and returned
    /// `ResetCancellationDispatched`). The helper MUST transition to
    /// `Unloaded` so the next `daemon load` recovers the workspace
    /// (without this fix, the workspace stays stuck in `Rebuilding`
    /// until `daemon stop && daemon start`).
    #[test]
    fn record_and_transition_on_err_unloads_reset_in_iteration_cancel() {
        let dispatcher = make_dispatcher_for_gate_test();
        let ws = Arc::new(LoadedWorkspace::new(
            crate::workspace::state::WorkspaceKey::new(
                std::path::PathBuf::from("/repo"),
                sqry_core::project::ProjectRootMode::GitRoot,
                0x1,
            ),
            false,
        ));
        // Runner is mid-iteration: state is `Rebuilding` (the
        // distinguisher between eviction and reset).
        ws.store_state(crate::workspace::state::WorkspaceState::Rebuilding);
        let err = DaemonError::WorkspaceEvicted {
            root: std::path::PathBuf::from("/repo"),
        };
        dispatcher.record_and_transition_on_err(&ws, &err);
        assert_eq!(
            ws.load_state(),
            crate::workspace::state::WorkspaceState::Unloaded,
            "in-iteration reset cancellation must transition Rebuilding → Unloaded; \
             without this, daemon reset → daemon load cannot recover"
        );
    }

    /// Sanity: any other `DaemonError` (e.g. `WorkspaceOversize`,
    /// `Internal`) → `Failed` regardless of starting state.
    #[test]
    fn record_and_transition_on_err_failed_for_non_eviction_errors() {
        let dispatcher = make_dispatcher_for_gate_test();
        let ws = Arc::new(LoadedWorkspace::new(
            crate::workspace::state::WorkspaceKey::new(
                std::path::PathBuf::from("/repo"),
                sqry_core::project::ProjectRootMode::GitRoot,
                0x1,
            ),
            false,
        ));
        ws.store_state(crate::workspace::state::WorkspaceState::Rebuilding);
        let err = DaemonError::Internal(anyhow::anyhow!("plugin panic"));
        dispatcher.record_and_transition_on_err(&ws, &err);
        assert_eq!(
            ws.load_state(),
            crate::workspace::state::WorkspaceState::Failed,
            "non-eviction errors must transition to Failed"
        );
    }
}