// sqry_daemon/rebuild.rs
1//! Rebuild dispatcher for the sqryd daemon (Task 7 Phase 7a + 7b1).
2//!
3//! The [`RebuildDispatcher`] owns the per-workspace call path that
4//! maps a debounced [`ChangeSet`] from the [`sqry_core::watch`] layer
5//! through to either a full (`build_unified_graph`) or incremental
6//! (`incremental_rebuild`) rebuild, and publishes the resulting
7//! [`CodeGraph`] via
8//! [`WorkspaceManager::publish_and_retain`](crate::workspace::WorkspaceManager::publish_and_retain).
9//!
10//! Phase 7a shipped the synchronous skeleton (coalesce-then-execute
11//! single-caller driver). Phase 7b1 adds the A2 §J.2 runner-role gate
12//! so concurrent callers serialise cleanly:
13//!
14//! - [`PendingRebuild::coalesce_with`] — A2 §J.2 lane-merge algebra
15//! (union of files, OR of `git_state_changed`, max of `enqueued_at`,
16//! full-rebuild-dominance merge on `git_change_class`).
17//! - [`RebuildDispatcher::handle_changes`] — Phase A (acquire-or-park
18//! runner role via [`LoadedWorkspace::rebuild_in_flight`] CAS under
19//! the [`LoadedWorkspace::rebuild_lane`] mutex) + Phase B (drain
20//! loop: eviction gate → pipeline → re-lock-and-drain →
21//! loop-or-exit).
22//! - [`RebuildDispatcher::execute_one_rebuild`] — one iteration of
23//! the pipeline: decide / estimate / reserve / execute
24//! (`spawn_blocking`) / publish / record success|failure.
25//! - [`DrainLoopSentinel`] — panic-safety recovery for
26//! `rebuild_in_flight`; the sole out-of-lane transition exception.
27//! - Hybrid decision: `git_change_class.requires_full_rebuild()` OR
28//! `changed_files.len() > incremental_threshold` OR
29//! `closure.len() > file_count * closure_limit_percent / 100` → Full;
30//! else Incremental.
31//! - Working-set estimate via
32//! [`crate::workspace::working_set_estimate`] populated with
33//! [`crate::config::ESTIMATE_STAGING_PER_FILE_BYTES`] +
34//! [`crate::config::ESTIMATE_FINAL_PER_FILE_BYTES`] heuristic consts.
35//!
36//! Phase 7b2 (out of scope here) adds the per-workspace tokio watcher
37//! event loop, the editor-pattern and git-scenario dispatcher-count
38//! matrix, and the §J.2 serialization stress. Phase 7c (also out of
39//! scope) wires the [`CancellationToken`] from
40//! [`LoadedWorkspace::rebuild_cancelled`] through
41//! [`incremental_rebuild`] at pass boundaries.
42//!
43//! # §J.2 runner-role invariant (Phase 7b1)
44//!
45//! At most one [`execute_one_rebuild`](RebuildDispatcher::execute_one_rebuild)
46//! executes at a time per workspace, and at most one additional
47//! [`PendingRebuild`] is parked in the lane awaiting the runner. A
48//! caller arriving while the runner is active coalesces its incoming
49//! `ChangeSet` into the lane (A2 §J.2 merge rules) and returns
50//! `Ok(())` without running the pipeline — the active runner will
51//! drain the lane at its next drain-loop iteration.
52//!
53//! All normal-path transitions of [`LoadedWorkspace::rebuild_in_flight`]
54//! happen while [`LoadedWorkspace::rebuild_lane`] is held.
55//! [`DrainLoopSentinel::drop`] is the sole recovery exception.
56//!
57//! # Eviction cooperation
58//!
59//! Every drain-loop iteration (including the first) checks
60//! `ws.rebuild_cancelled` at the top of the loop. If set, the runner
61//! abandons any parked pending, releases `rebuild_in_flight` under the
62//! lane, and returns [`DaemonError::WorkspaceEvicted`]. The same
63//! [`DaemonError::WorkspaceEvicted`] is surfaced by
64//! [`WorkspaceManager::reserve_rebuild`]'s Phase-1 membership +
65//! cancellation check — so a gate-check → `reserve_rebuild` race that
66//! eviction wins cannot publish into an orphaned workspace.
67//!
68//! # Lock order (§J.4)
69//!
70//! [`RebuildDispatcher`] is the sole acquirer of
71//! [`LoadedWorkspace::rebuild_lane`](crate::workspace::LoadedWorkspace::rebuild_lane).
72//! The canonical call path honours the A2 §J.4 total order:
73//!
74//! ```text
75//! workspaces (manager.lookup) → rebuild_lane → admission (reserve_rebuild)
76//! ```
77//!
78//! Rules enforced by this module:
79//! - `manager.lookup` acquires `workspaces.read()` as a *precondition*
80//! and drops the guard before touching `rebuild_lane`.
81//! - `rebuild_lane` is held **only** to coalesce/take `PendingRebuild`
82//! and to mutate `rebuild_in_flight`. The guard is dropped before
83//! [`WorkspaceManager::reserve_rebuild`] so §G.1's phase-1
84//! `workspaces.read()` does not nest under `rebuild_lane`.
85//! - `admission` is strictly innermost and is held only inside
86//! [`WorkspaceManager::reserve_rebuild`] / `publish_and_retain` /
87//! retention-reaper paths — never reacquired by the dispatcher.
88
89use std::{
90 collections::HashMap,
91 path::PathBuf,
92 sync::{
93 Arc, OnceLock,
94 atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
95 },
96 time::{Duration, Instant, SystemTime},
97};
98
99use sqry_core::graph::{
100 CodeGraph, GraphBuilderError,
101 unified::{
102 build::{
103 BuildConfig, CancellationToken, build_unified_graph_cancellable,
104 compute_reverse_dep_closure, incremental_rebuild,
105 },
106 memory::GraphMemorySize,
107 },
108};
109use sqry_core::plugin::PluginManager;
110use sqry_core::watch::{ChangeSet, GitChangeClass, LastIndexedGitState, SourceTreeWatcher};
111use tokio::task::JoinHandle;
112
113use crate::{
114 config::{DaemonConfig, ESTIMATE_FINAL_PER_FILE_BYTES, ESTIMATE_STAGING_PER_FILE_BYTES},
115 error::DaemonError,
116 workspace::{
117 LoadedWorkspace, PendingRebuild, WorkingSetInputs, WorkspaceKey, WorkspaceManager,
118 WorkspaceState, clone_err, working_set_estimate,
119 },
120};
121
122// ---------------------------------------------------------------------------
123// RebuildMode
124// ---------------------------------------------------------------------------
125
/// Result of the hybrid decision fork: should this rebuild run the
/// from-scratch path (`build_unified_graph`, Full) or the
/// closure-driven path (`incremental_rebuild`, Incremental)?
///
/// The variant is mirrored into the [`RebuildDispatcher::last_mode`]
/// `u8` atomic for observability. That encoding is stable across one
/// dispatcher's lifetime but is deliberately not an on-wire contract
/// — Task 8's IPC layer reports the mode only through structured
/// tracing, never as a raw byte.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RebuildMode {
    /// From-scratch rebuild via `build_unified_graph`.
    Full,
    /// Reverse-dep-closure rebuild via `incremental_rebuild`.
    Incremental,
}

impl RebuildMode {
    /// Encoding for the [`AtomicU8`] observability slot:
    /// 0 = never set, 1 = Full, 2 = Incremental.
    const fn as_u8(self) -> u8 {
        match self {
            RebuildMode::Full => 1,
            RebuildMode::Incremental => 2,
        }
    }

    /// Inverse of [`Self::as_u8`]. `0` (never set) and any other
    /// unknown discriminant decode to `None` — unknown values cannot
    /// be produced through the `store_last_mode` path, but the
    /// round-trip is defensive anyway.
    const fn from_u8(raw: u8) -> Option<Self> {
        if raw == 1 {
            Some(Self::Full)
        } else if raw == 2 {
            Some(Self::Incremental)
        } else {
            None
        }
    }
}
163
164// ---------------------------------------------------------------------------
165// PendingRebuild::coalesce_with (lane-merge algebra, A2 §J.2)
166// ---------------------------------------------------------------------------
167
168impl PendingRebuild {
169 /// Coalesce two queued rebuilds per A2 §J.2.
170 ///
171 /// Merge rules:
172 /// 1. **File union.** Deduplicated set of `changed_files` from
173 /// both sides, returned in lexicographic order so the merged
174 /// vector is deterministic across runs (important for
175 /// downstream decision-fork determinism and test assertions).
176 /// 2. **OR of `git_state_changed`.** If either side observed a
177 /// `.git/` event, the merged entry records a change.
178 /// 3. **Full-rebuild-dominance merge on `git_change_class`.**
179 /// If either side has a class with `requires_full_rebuild() ==
180 /// true` (currently `BranchSwitch` or `TreeDiverged`), the
181 /// merged class is canonically `Some(TreeDiverged)` — any
182 /// downstream dispatcher decision only checks
183 /// `requires_full_rebuild()`, so the specific discriminant
184 /// beyond the canonical "full trigger observed" marker is
185 /// unused. If neither side is a full trigger, the later-side
186 /// class wins (most-recent observation); `None` is absorbed
187 /// from either side when the other is `Some`.
188 /// 4. **`enqueued_at = max(self, later)`.** Later wins so the
189 /// lane reflects the most-recent activity for staleness /
190 /// tracing purposes.
191 /// 5. **`git_state_at_enqueue` — absorb-None, later wins
192 /// (Task 7 Phase 7b2).** When both sides carry a snapshot,
193 /// the newer observation wins (the merged coalesced pending
194 /// reflects the freshest valid baseline for the runner's
195 /// publish commit). When only one side carries a snapshot,
196 /// that one is preserved. Both `None` → `None`.
197 ///
198 /// `self` is treated as the earlier enqueue; `later` is the
199 /// newly-arrived enqueue the dispatcher is merging in.
200 ///
201 /// # Determinism
202 ///
203 /// The merged `changed_files` vector is sorted. `coalesce_with`
204 /// is commutative under the `requires_full_rebuild()` predicate
205 /// (full-rebuild dominance is symmetric). It is **not**
206 /// commutative under the raw `git_change_class` discriminant
207 /// when neither side is a full trigger — `Some(LocalCommit) ⊕
208 /// Some(Noise) = Some(Noise)` but `Some(Noise) ⊕ Some(LocalCommit)
209 /// = Some(LocalCommit)`. Nor is `git_state_at_enqueue` commutative
210 /// in general (later wins when both sides are `Some`); both
211 /// asymmetries are by design.
212 #[must_use]
213 pub fn coalesce_with(self, later: PendingRebuild) -> PendingRebuild {
214 // 1. File union, deterministic order.
215 let mut file_set: std::collections::BTreeSet<PathBuf> =
216 self.changes.changed_files.into_iter().collect();
217 file_set.extend(later.changes.changed_files);
218 let changed_files: Vec<PathBuf> = file_set.into_iter().collect();
219
220 // 2. OR of git_state_changed.
221 let git_state_changed = self.changes.git_state_changed || later.changes.git_state_changed;
222
223 // 3. Full-rebuild-dominance merge on git_change_class.
224 let git_change_class = merge_git_class(
225 self.changes.git_change_class,
226 later.changes.git_change_class,
227 );
228
229 // 4. enqueued_at = max.
230 let enqueued_at = self.enqueued_at.max(later.enqueued_at);
231
232 // 5. git_state_at_enqueue: absorb-None, later wins when both Some.
233 let git_state_at_enqueue = later.git_state_at_enqueue.or(self.git_state_at_enqueue);
234
235 PendingRebuild {
236 changes: ChangeSet {
237 changed_files,
238 git_state_changed,
239 git_change_class,
240 },
241 enqueued_at,
242 git_state_at_enqueue,
243 }
244 }
245}
246
247/// Merge two `git_change_class` observations per §J.2
248/// "full-rebuild-dominance" semantics.
249///
250/// See [`PendingRebuild::coalesce_with`] for the merge contract.
251fn merge_git_class(a: Option<GitChangeClass>, b: Option<GitChangeClass>) -> Option<GitChangeClass> {
252 let requires_full = a.is_some_and(GitChangeClass::requires_full_rebuild)
253 || b.is_some_and(GitChangeClass::requires_full_rebuild);
254 if requires_full {
255 return Some(GitChangeClass::TreeDiverged);
256 }
257 // later wins for non-full; fallback to earlier if later is None.
258 b.or(a)
259}
260
261// ---------------------------------------------------------------------------
262// Decision fork
263// ---------------------------------------------------------------------------
264
265/// Hybrid rebuild-mode decision per plan line 1422 and Amendment 2
266/// §J.
267///
268/// Full-rebuild triggers (in evaluation order):
269///
270/// 1. [`ChangeSet::requires_full_rebuild`] — a committed git state
271/// change that mandates a full rebuild (currently `BranchSwitch`
272/// or `TreeDiverged`; `LocalCommit` / `Noise` do not force full).
273/// 2. `changed_files.len() > config.incremental_threshold` — too many
274/// files for incremental economics to pay off.
275/// 3. `closure.len() > graph.file_count() * closure_limit_percent /
276/// 100` — the reverse-dep closure would touch more files than the
277/// full-rebuild cost; take the full path instead.
278///
279/// Otherwise, Incremental. Empty `ChangeSet` ([`ChangeSet::is_empty`])
280/// returns Incremental as a legitimate no-op rebuild (Phase 3e
281/// supports this explicitly — see `sqry-core/src/graph/unified/build/
282/// incremental.rs:2842` for the empty-rebuild regression test).
283///
284/// Closure math resolves only paths already present in the graph's
285/// file registry. Paths not yet registered (new files) contribute
286/// zero to the closure but are still passed through to
287/// [`incremental_rebuild`]; its internal `phase3e_discover_new_file_paths`
288/// (Phase 3e, `incremental.rs:935`) handles them via a first-class
289/// new-file discovery leg. This is intentional: forcing Full on every
290/// new file would regress Phase 3e's shipped behavior.
291#[must_use]
292pub fn decide_mode(config: &DaemonConfig, changes: &ChangeSet, graph: &CodeGraph) -> RebuildMode {
293 if changes.is_empty() {
294 return RebuildMode::Incremental;
295 }
296 if changes.requires_full_rebuild() {
297 return RebuildMode::Full;
298 }
299 if changes.changed_files.len() > config.incremental_threshold {
300 return RebuildMode::Full;
301 }
302
303 // Resolve registered paths to FileId for closure math. Unresolved
304 // paths (new files) simply don't contribute to the closure.
305 let file_ids: Vec<_> = changes
306 .changed_files
307 .iter()
308 .filter_map(|p| graph.files().get(p))
309 .collect();
310
311 let closure = compute_reverse_dep_closure(&file_ids, graph);
312 let file_count = graph.files().len();
313 // Integer math: > file_count * pct / 100 → Full. `closure_limit_percent`
314 // is validated as 1..=100 in `DaemonConfig::validate`.
315 let limit = file_count.saturating_mul(config.closure_limit_percent as usize) / 100;
316
317 if closure.len() > limit {
318 RebuildMode::Full
319 } else {
320 RebuildMode::Incremental
321 }
322}
323
324// ---------------------------------------------------------------------------
325// Working-set estimate
326// ---------------------------------------------------------------------------
327
328/// Compute the A2 §G.6 working-set estimate for the given rebuild
329/// mode.
330///
331/// Formula (see
332/// [`crate::workspace::working_set_estimate`] for the multipliers):
333///
334/// | Mode | `new_graph_final_estimate` | `staging_overhead` | `interner_snapshot_bytes` |
335/// |-------------|---------------------------------------------------------------------------|------------------------------------------------------------------|------------------------------------------|
336/// | Full | `prior.heap_bytes()` | `file_count * ESTIMATE_STAGING_PER_FILE_BYTES` | `prior.strings().heap_bytes()` |
337/// | Incremental | `prior.heap_bytes() + closure.len() * ESTIMATE_FINAL_PER_FILE_BYTES` | `closure_file_count * ESTIMATE_STAGING_PER_FILE_BYTES` | `prior.strings().heap_bytes()` |
338///
339/// Where `closure_file_count` for Incremental uses
340/// `changes.changed_files.len()` (an upper bound — we compute the
341/// exact reverse-dep closure size only when we already decided to run
342/// incremental, so the caller may pass the file count directly rather
343/// than re-running closure math here).
344fn compute_working_set_estimate(prior: &CodeGraph, changes: &ChangeSet, mode: RebuildMode) -> u64 {
345 let prior_bytes = prior.heap_bytes() as u64;
346 let interner_bytes = prior.strings().heap_bytes() as u64;
347 let file_count = prior.files().len() as u64;
348
349 let (final_estimate, staging_file_count) = match mode {
350 RebuildMode::Full => (prior_bytes, file_count),
351 RebuildMode::Incremental => {
352 let n = changes.changed_files.len() as u64;
353 let final_est =
354 prior_bytes.saturating_add(n.saturating_mul(ESTIMATE_FINAL_PER_FILE_BYTES));
355 (final_est, n)
356 }
357 };
358
359 let staging = staging_file_count.saturating_mul(ESTIMATE_STAGING_PER_FILE_BYTES);
360
361 working_set_estimate(WorkingSetInputs {
362 new_graph_final_estimate: final_estimate,
363 staging_overhead: staging,
364 interner_snapshot_bytes: interner_bytes,
365 })
366}
367
368// ---------------------------------------------------------------------------
369// Test hooks — gate + capture (Task 7 Phase 7b2)
370// ---------------------------------------------------------------------------
371//
372// Both hooks are gated behind `std::sync::OnceLock`. In production the
373// `OnceLock::get()` fast path is a single relaxed atomic load per
374// `execute_one_rebuild` iteration that returns `None` and short-circuits
375// the hook entirely. Tests install the hooks once at harness setup;
376// subsequent dispatcher iterations see the hook and behave
377// deterministically.
378
379/// **Test-only** gate for §J.2 serialization stress tests.
380///
381/// When installed, each `execute_one_rebuild` iteration awaits
382/// [`Self::release`] while [`Self::hold`] is non-zero. The test fires
383/// `release.notify_one()` once per iteration it wants to unblock, and
384/// the gate atomically decrements `hold` on release. When `hold`
385/// reaches zero, subsequent iterations pass through without waiting.
386///
387/// # Lost-wakeup safety
388///
389/// [`gate_check`](RebuildDispatcher::gate_check) obtains the
390/// `notified()` future BEFORE re-checking `hold`, matching the 7b1
391/// `Notify` handshake pattern (`tests/rebuild_runner_gate.rs` inline
392/// comments). A test that first sets `hold = N` then fires N
393/// `notify_one()` calls is guaranteed to release the first N
394/// iterations without lost wakeups.
395#[doc(hidden)]
396#[derive(Debug)]
397pub struct TestGate {
398 /// Number of iterations remaining that must wait on `release`.
399 /// Initialised by the test (e.g. `AtomicUsize::new(1)` to block
400 /// only the first iteration); decremented on each gate release.
401 pub hold: AtomicUsize,
402 /// Notify fired by the test driver to release one waiting
403 /// iteration.
404 pub release: tokio::sync::Notify,
405}
406
407/// **Test-only** per-iteration capture for §J.2 file-union correctness
408/// assertions.
409///
410/// When installed, each `execute_one_rebuild` iteration appends a
411/// [`CapturedIteration`] to [`Self::iterations`] AFTER the mode
412/// decision and BEFORE the gate check — so the test observes the
413/// exact `ChangeSet` consumed by each iteration regardless of whether
414/// the gate stalls that iteration.
415///
416/// Task 7 Phase 7c extension: three additional fields for the
417/// eviction-during-rebuild abort test drive the
418/// [`RebuildDispatcher::post_reservation_check`] hook. The hook fires
419/// AFTER `reserve_rebuild` returns `Ok` and BEFORE the rebuild
420/// pipeline starts, so tests can observe a live reservation and race
421/// eviction against it.
422#[doc(hidden)]
423#[derive(Debug, Default)]
424pub struct TestCapture {
425 /// Records one entry per `execute_one_rebuild` invocation, in
426 /// order. Never cleared by the dispatcher; the test inspects it
427 /// after synchronising on the dispatchers completion.
428 pub iterations: parking_lot::Mutex<Vec<CapturedIteration>>,
429
430 /// Counter of iterations that must stall at the post-reservation
431 /// hook. `0` = no hold; `> 0` = one iteration will stall per unit.
432 /// Armed via [`Self::arm_post_reservation_hold`], released via
433 /// [`Self::release_post_reservation`].
434 pub post_reservation_hold: AtomicUsize,
435
436 /// Fired when [`RebuildDispatcher::post_reservation_check`] is
437 /// entered. The test awaits [`Self::wait_until_post_reservation`]
438 /// to synchronise on "rebuild has reserved bytes and is about to
439 /// run".
440 pub post_reservation_reached: tokio::sync::Notify,
441
442 /// Fired by the test driver to release one waiting iteration. Uses
443 /// the same handshake pattern as [`TestGate`] (`notified()` future
444 /// armed before re-checking `hold`).
445 pub post_reservation_release: tokio::sync::Notify,
446
447 /// Task 7 Phase 7c feat iter-1 (Codex MAJOR 2): counter for every
448 /// `execute_one_rebuild` iteration where the §5e
449 /// `workspaces.read()` recheck (or map-missing recheck) observed
450 /// cancellation AFTER a successful pipeline run and BEFORE
451 /// publish. Fires when eviction raced during a pipeline that
452 /// completed before the forwarder's first poll.
453 pub publish_path_evictions: AtomicUsize,
454 /// Counter for every `execute_one_rebuild` iteration where the
455 /// sqry-core pipeline itself returned
456 /// `GraphBuilderError::Cancelled` from a pass boundary (the
457 /// forwarder had time to flip the token before the pipeline
458 /// completed).
459 pub pass_boundary_cancellations: AtomicUsize,
460
461 /// Test-only switch (iter-1): when `true`, the next
462 /// `execute_rebuild` call does NOT spawn a
463 /// [`spawn_cancellation_forwarder`]. Tests use this to
464 /// deterministically force the §5e publish-path recheck
465 /// (without a forwarder to flip the token, the pipeline
466 /// completes Ok even though `ws.rebuild_cancelled = true`, and
467 /// the §5e recheck picks up the eviction).
468 ///
469 /// Production leaves this `false`; the forwarder always runs.
470 pub suppress_forwarder: AtomicBool,
471
472 /// Test-only switch (iter-2 Codex MAJOR 1): when `true`,
473 /// `execute_rebuild` synchronously calls `token.cancel()`
474 /// immediately after spawning (or electing to suppress) the
475 /// forwarder and BEFORE dispatching `spawn_blocking`. This
476 /// guarantees the pipeline's very first `cancellation.check()?`
477 /// observes the cancelled token, forcing the pass-boundary
478 /// cancellation path deterministically.
479 ///
480 /// Production leaves this `false`.
481 pub precancel_token_for_pass_boundary: AtomicBool,
482
483 /// Durable flag (iter-2 Codex MAJOR 2): set by
484 /// [`RebuildDispatcher::post_reservation_check`] when the hook
485 /// fires. Paired with `post_reservation_reached` notify to
486 /// provide lost-wakeup-safe synchronisation: tests that arm
487 /// `post_reservation_hold` AFTER a rebuild has already reached
488 /// the hook (rare but possible under fast scheduling) still see
489 /// the flag and do not block waiting for a signal that already
490 /// fired.
491 ///
492 /// Cleared by [`Self::reset_post_reservation_reached`] for
493 /// multi-iteration tests.
494 pub post_reservation_reached_flag: AtomicBool,
495}
496
497impl TestCapture {
498 /// Construct a zero-initialised capture. Same as
499 /// [`Default::default`]; named constructor for clarity in tests.
500 #[must_use]
501 pub fn new() -> Self {
502 Self::default()
503 }
504
505 /// Read the §5e publish-path-recheck eviction counter (iter-1).
506 #[must_use]
507 pub fn publish_path_evictions(&self) -> usize {
508 self.publish_path_evictions.load(Ordering::Acquire)
509 }
510
511 /// Read the pass-boundary cancellation counter (iter-1).
512 #[must_use]
513 pub fn pass_boundary_cancellations(&self) -> usize {
514 self.pass_boundary_cancellations.load(Ordering::Acquire)
515 }
516
517 /// Arm a single post-reservation stall. The next
518 /// `execute_one_rebuild` iteration that reaches
519 /// [`RebuildDispatcher::post_reservation_check`] will block until
520 /// [`Self::release_post_reservation`] is called. Stacks: calling
521 /// this N times blocks N iterations.
522 pub fn arm_post_reservation_hold(&self) {
523 self.post_reservation_hold.fetch_add(1, Ordering::AcqRel);
524 }
525
526 /// Release exactly one stalled iteration. Matches the `TestGate`
527 /// release semantics — the held iteration wakes, decrements
528 /// `post_reservation_hold` one more step via the loop, and
529 /// continues to `execute_rebuild`. Safe to call before an
530 /// iteration arms (lost-wakeup-safe via the handshake in
531 /// [`RebuildDispatcher::post_reservation_check`]).
532 pub fn release_post_reservation(&self) {
533 self.post_reservation_release.notify_one();
534 }
535
536 /// Await the next `execute_one_rebuild` iteration reaching the
537 /// post-reservation hook. Returns as soon as the hook fires.
538 ///
539 /// Iter-2 Codex MAJOR 2: lost-wakeup-safe. If the hook has
540 /// already fired (`post_reservation_reached_flag == true`),
541 /// returns immediately without awaiting. Otherwise arms the
542 /// `notified()` future BEFORE re-checking the flag (handshake
543 /// pattern) so a signal that fires between arm and recheck is
544 /// still observed.
545 pub async fn wait_until_post_reservation(&self) {
546 if self.post_reservation_reached_flag.load(Ordering::Acquire) {
547 return;
548 }
549 let notified = self.post_reservation_reached.notified();
550 if self.post_reservation_reached_flag.load(Ordering::Acquire) {
551 return;
552 }
553 notified.await;
554 }
555
556 /// Reset the durable reached-flag so a second iteration of a
557 /// multi-iteration test (e.g. the 100-iter stress) can await a
558 /// fresh hook firing. Test-only.
559 pub fn reset_post_reservation_reached(&self) {
560 self.post_reservation_reached_flag
561 .store(false, Ordering::Release);
562 }
563}
564
565/// One captured `execute_one_rebuild` iteration (test-only).
566#[doc(hidden)]
567#[derive(Debug, Clone)]
568pub struct CapturedIteration {
569 /// The ChangeSet as-consumed by this iteration (post-coalesce).
570 pub changeset: ChangeSet,
571 /// Mode decided for this iteration.
572 pub mode: RebuildMode,
573 /// Git-state snapshot attached to the consumed `PendingRebuild`.
574 /// `None` for direct (non-bridge) callers.
575 pub git_state_at_enqueue: Option<LastIndexedGitState>,
576 /// Wall-clock when the iteration started.
577 pub started_at: Instant,
578}
579
580// ---------------------------------------------------------------------------
581// Watcher bridge registry (Task 7 Phase 7b2)
582// ---------------------------------------------------------------------------
583
584/// One per-workspace watcher + dispatcher task pair.
585///
586/// Both `JoinHandle`s are **observability-only**: they do not own the
587/// task lifetimes. Dropping a `WatcherEntry` detaches both tasks
588/// (Tokio's `JoinHandle::drop` is detach, not cancel — see
589/// `tokio::task::JoinHandle` docs).
590///
591/// Shutdown is cooperative through two independent signals:
592/// 1. `ws.rebuild_cancelled = true` (set by eviction) → the
593/// cancellable watcher returns `Ok(None)` → blocking thread exits
594/// → tokio mpsc sender drops → async task's `rx.recv()` returns
595/// `None` → async task exits.
596/// 2. `dispatcher.handle_changes_with_git_state` returns
597/// `Err(WorkspaceEvicted)` → async task exits → drops receiver →
598/// next `blocking_send` on the blocking thread fails → blocking
599/// thread exits.
600///
601/// Before exiting, the async task flips [`Self::live`] to `false`
602/// (so a concurrent `ensure_watching` call treats the entry as
603/// draining) and then calls
604/// [`RebuildDispatcher::reap_watcher`](crate::RebuildDispatcher::reap_watcher)
605/// with its [`Self::generation`] to remove the entry from the map. The
606/// generation token ensures a late old task cannot erase a newer
607/// replacement entry after a fast evict+reload.
608struct WatcherEntry {
609 /// Monotonic generation token. Assigned at construction time from
610 /// `RebuildDispatcher::next_watcher_generation`; used by
611 /// `reap_watcher` to distinguish "my entry" from "a newer entry
612 /// for the same WorkspaceKey".
613 generation: u64,
614 /// `true` while the async task is processing dispatches. Flipped
615 /// to `false` as the first action of the post-loop cleanup
616 /// sequence, BEFORE `reap_watcher` is called. Fast-path callers
617 /// of `ensure_watching` read this as the authoritative liveness
618 /// signal (rather than `JoinHandle::is_finished`, which returns
619 /// `false` while the task is still inside its final cleanup
620 /// closure).
621 live: Arc<AtomicBool>,
622 /// Handle to the async dispatcher task. Stored only to keep the
623 /// task attached for the entry's lifetime — never awaited or
624 /// aborted. Shutdown is cooperative (see struct-level docs).
625 #[allow(dead_code)]
626 async_handle: JoinHandle<()>,
627 /// Handle to the blocking watcher thread. Stored only for
628 /// attachment — dropping it detaches the task, which continues to
629 /// completion via cooperative cancellation.
630 #[allow(dead_code)]
631 blocking_handle: JoinHandle<()>,
632}
633
634// ---------------------------------------------------------------------------
635// RebuildDispatcher
636// ---------------------------------------------------------------------------
637
638/// Sole acquirer of [`LoadedWorkspace::rebuild_lane`] (A2 §J).
639///
640/// Constructed once at daemon startup with a shared
641/// [`Arc<PluginManager>`]. Every [`Self::handle_changes`] call honours
642/// the canonical 7-step reservation call path (plan line 1495) and
643/// the §J.4 lock-order contract documented on the module.
644pub struct RebuildDispatcher {
645 manager: Arc<WorkspaceManager>,
646 config: Arc<DaemonConfig>,
647 plugins: Arc<PluginManager>,
648 build_config: BuildConfig,
649 dispatched_count: AtomicU64,
650 last_mode: AtomicU8,
651
652 /// Per-workspace watcher+dispatcher task pairs (Task 7 Phase 7b2).
653 /// Populated by [`Self::ensure_watching`]; pruned by
654 /// [`Self::reap_watcher`] as each async task exits.
655 watchers: parking_lot::Mutex<HashMap<WorkspaceKey, WatcherEntry>>,
656 /// Monotonic counter used to tag each `WatcherEntry` with a unique
657 /// generation, enabling `reap_watcher`'s compare-and-remove.
658 next_watcher_generation: AtomicU64,
659
660 /// Test-only synchronisation gate. `None` in production; tests
661 /// install once at harness setup. See [`TestGate`] docstring.
662 #[doc(hidden)]
663 test_gate: OnceLock<Arc<TestGate>>,
664 /// Test-only per-iteration capture recorder. `None` in production.
665 /// See [`TestCapture`] docstring.
666 #[doc(hidden)]
667 test_capture: OnceLock<Arc<TestCapture>>,
668}
669
670impl std::fmt::Debug for RebuildDispatcher {
671 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
672 // `PluginManager` does not implement `Debug` because its
673 // internal registry is non-trivial; skip it here. `BuildConfig`
674 // also does not implement `Debug` on HEAD. Report the shape of
675 // the dispatcher instead of its full contents.
676 f.debug_struct("RebuildDispatcher")
677 .field(
678 "dispatched_count",
679 &self.dispatched_count.load(Ordering::Relaxed),
680 )
681 .field("last_mode", &self.last_mode())
682 .field("memory_limit_mb", &self.config.memory_limit_mb)
683 .finish_non_exhaustive()
684 }
685}
686
687impl RebuildDispatcher {
688 /// Construct a fresh dispatcher sharing the daemon's manager,
689 /// config, and plugin manager.
690 ///
691 /// `build_config` defaults to [`BuildConfig::default`]; Task 14
692 /// calibration may override this from [`DaemonConfig`] knobs.
693 #[must_use]
694 pub fn new(
695 manager: Arc<WorkspaceManager>,
696 config: Arc<DaemonConfig>,
697 plugins: Arc<PluginManager>,
698 ) -> Arc<Self> {
699 Arc::new(Self {
700 manager,
701 config,
702 plugins,
703 build_config: BuildConfig::default(),
704 dispatched_count: AtomicU64::new(0),
705 last_mode: AtomicU8::new(0),
706 watchers: parking_lot::Mutex::new(HashMap::new()),
707 next_watcher_generation: AtomicU64::new(0),
708 test_gate: OnceLock::new(),
709 test_capture: OnceLock::new(),
710 })
711 }
712
713 // -----------------------------------------------------------------
714 // Test-only hook installers (Task 7 Phase 7b2)
715 // -----------------------------------------------------------------
716
717 /// **Test-only.** Install a [`TestGate`] that stalls the FIRST
718 /// `hold` iterations of `execute_one_rebuild` until the test
719 /// driver fires `gate.release.notify_one()`. Returns `Err(gate)` on
720 /// the original argument if a gate was already installed (one-shot
721 /// per dispatcher lifetime).
722 ///
723 /// Zero production overhead — production callers never install a
724 /// gate and `gate_check` short-circuits on the `OnceLock::get()
725 /// == None` fast path.
726 #[doc(hidden)]
727 pub fn install_test_gate(&self, gate: Arc<TestGate>) -> Result<(), Arc<TestGate>> {
728 self.test_gate.set(gate)
729 }
730
731 /// **Test-only.** Install a [`TestCapture`] recorder that pushes
732 /// one [`CapturedIteration`] per `execute_one_rebuild` invocation.
733 /// Returns `Err(capture)` on the original argument if a capture was
734 /// already installed.
735 #[doc(hidden)]
736 pub fn install_test_capture(&self, capture: Arc<TestCapture>) -> Result<(), Arc<TestCapture>> {
737 self.test_capture.set(capture)
738 }
739
    /// Internal gate check, called by `execute_one_rebuild` after
    /// mode selection and before admission reservation (iter-2 §4
    /// placement rationale: the gate must NOT hold an admission
    /// reservation across a synthetic test stall).
    ///
    /// # Handshake pattern
    ///
    /// Obtain the `notified()` future BEFORE rechecking `hold`. If we
    /// load `hold > 0`, THEN enter `notified().await` only if `hold`
    /// is still `> 0` after the future is armed. This matches the 7b1
    /// pattern documented in `tests/rebuild_runner_gate.rs`.
    async fn gate_check(&self) {
        // Production fast path: no gate installed → single
        // `OnceLock::get()` and return.
        let Some(gate) = self.test_gate.get() else {
            return;
        };
        // Second fast path: gate installed but not currently armed.
        if gate.hold.load(Ordering::Acquire) == 0 {
            return;
        }
        let notified = gate.release.notified();
        tokio::pin!(notified);
        // Re-check AFTER arming the future: a concurrent
        // notify_one() between the first load and the await point
        // would otherwise be lost. Since `notified()` retroactively
        // matches any pending permit, re-checking `hold` after
        // arming is sufficient.
        if gate.hold.load(Ordering::Acquire) > 0 {
            notified.await;
            // Consume exactly one unit of `hold` per release so the
            // test driver controls how many iterations stall.
            gate.hold.fetch_sub(1, Ordering::AcqRel);
        }
    }
770
771 /// Cumulative count of successful dispatches across this
772 /// dispatcher's lifetime.
773 ///
774 /// Observability surface for the Task 7 §I dispatcher-count
775 /// matrix (arrives in 7b). Not reset on workspace eviction; the
776 /// counter is daemon-process-scoped.
777 #[must_use]
778 pub fn dispatched_count(&self) -> u64 {
779 self.dispatched_count.load(Ordering::Relaxed)
780 }
781
782 /// Most-recent mode selected by [`Self::handle_changes`], or
783 /// `None` if no dispatch has happened yet.
784 ///
785 /// Observability surface for tests and tracing spans.
786 #[must_use]
787 pub fn last_mode(&self) -> Option<RebuildMode> {
788 RebuildMode::from_u8(self.last_mode.load(Ordering::Relaxed))
789 }
790
791 /// §J.2 runner-role handoff + drain-loop orchestrator (Phase 7b1).
792 ///
793 /// Callers (the Phase 7b2 watcher task, direct test drivers,
794 /// future IPC `workspace/force_rebuild`) invoke this for each
795 /// debounced [`ChangeSet`]. Exactly one concurrent invocation
796 /// per workspace runs the full pipeline; others coalesce their
797 /// [`ChangeSet`] into the lane and return `Ok(())` promptly.
798 ///
799 /// Preconditions (not part of the §J.4 ordered sequence):
800 /// - The workspace must already be registered with the manager
801 /// (i.e. [`WorkspaceManager::get_or_load`] succeeded earlier).
802 /// A caller whose [`WorkspaceManager::lookup`] returns `None`
803 /// sees [`DaemonError::WorkspaceEvicted`].
804 ///
805 /// # Phase A (acquire-or-park) — single lane lock scope
806 ///
807 /// 1. Lock [`LoadedWorkspace::rebuild_lane`].
808 /// 2. Coalesce incoming `changes` with any prior
809 /// [`PendingRebuild`] parked in the lane (A2 §J.2 merge rules).
810 /// 3. CAS [`LoadedWorkspace::rebuild_in_flight`] `false → true`.
811 /// - On CAS success: we own the runner role; keep coalesced
812 /// pending as `current`, drop lane.
813 /// - On CAS failure: another runner is active; park coalesced
814 /// in lane, drop lane, return `Ok(())`.
815 ///
816 /// Holding the lane across the in-flight CAS makes the acquire
817 /// race-free: every in-flight transition happens under the lane,
818 /// so no two runners ever claim the role simultaneously.
819 ///
820 /// # Phase B (drain loop) — sentinel-protected
821 ///
822 /// Loop, armed with [`DrainLoopSentinel`] for panic-safety:
823 ///
824 /// 1. **Top-of-loop eviction gate.** Check
825 /// [`LoadedWorkspace::rebuild_cancelled`]. If set, take and
826 /// drop any parked pending (workspace is gone), release
827 /// `rebuild_in_flight` under the lane, disarm sentinel, return
828 /// [`DaemonError::WorkspaceEvicted`].
829 /// 2. Call [`Self::execute_one_rebuild`] on `current`. Records
830 /// `record_success` / `record_failure` on the workspace; the
831 /// result flows into `last_result`.
832 /// 3. Re-lock lane. If a new `PendingRebuild` is parked: take it
833 /// as the next `current`, loop to step 1 (in-flight stays
834 /// true). If the lane is empty: release `rebuild_in_flight`
835 /// under the lane, disarm sentinel, return `last_result`.
836 ///
837 /// # §J.4 lock order
838 ///
839 /// `manager.lookup` takes `workspaces.read()` and drops before
840 /// `rebuild_lane`. `rebuild_lane` is dropped before
841 /// [`WorkspaceManager::reserve_rebuild`] (which re-takes
842 /// `workspaces.read() → admission.lock()` internally). No
843 /// `rebuild_lane` ↔ `admission` nesting ever occurs.
844 ///
845 /// # Errors
846 ///
847 /// - [`DaemonError::WorkspaceEvicted`] when the top-of-loop gate
848 /// observes `rebuild_cancelled == true`, or when
849 /// [`WorkspaceManager::reserve_rebuild`]'s Phase-1 check finds
850 /// the workspace missing from the manager map or cancelled.
851 /// - [`DaemonError::MemoryBudgetExceeded`] when admission cannot
852 /// satisfy a reservation after eviction.
853 /// - [`DaemonError::WorkspaceBuildFailed`] when either full or
854 /// incremental rebuild fails (including `spawn_blocking` join
855 /// failures).
856 ///
857 /// Only the FINAL drain-loop iteration's result is surfaced to
858 /// the caller. Per-iteration success/failure is recorded on the
859 /// workspace (`ws.last_error`, `ws.last_good_at`,
860 /// `ws.retry_count`) via [`Self::execute_one_rebuild`].
861 pub async fn handle_changes(
862 &self,
863 key: &WorkspaceKey,
864 changes: ChangeSet,
865 ) -> Result<(), DaemonError> {
866 // Plain handle_changes — no git-state snapshot attached. The
867 // runner will consume this PendingRebuild but will NOT advance
868 // `ws.last_indexed_git_state` because `git_state_at_enqueue`
869 // is `None`. Used by direct callers (tests, future IPC
870 // `workspace/force_rebuild`) that don't own a watcher.
871 self.handle_changes_inner(
872 key,
873 PendingRebuild {
874 changes,
875 enqueued_at: Instant::now(),
876 git_state_at_enqueue: None,
877 },
878 )
879 .await
880 }
881
882 /// Like [`Self::handle_changes`] but attaches a
883 /// [`LastIndexedGitState`] snapshot to the enqueued
884 /// [`PendingRebuild`]. When the runner (either us directly or a
885 /// concurrent runner we parked against) successfully publishes
886 /// the graph derived from this `PendingRebuild`, it commits the
887 /// snapshot into [`LoadedWorkspace::last_indexed_git_state`]
888 /// as the new classifier baseline.
889 ///
890 /// Task 7 Phase 7b2 — used exclusively by the per-workspace
891 /// watcher bridge spawned by [`Self::ensure_watching`]. Plain
892 /// [`Self::handle_changes`] remains the API for callers without a
893 /// watcher-owned git-state snapshot.
894 ///
895 /// # §J.4 lock order, error taxonomy, gate, sentinel
896 ///
897 /// Identical to [`Self::handle_changes`] — Phase A
898 /// (acquire-or-park under lane + CAS) and Phase B (drain loop)
899 /// behave identically. The only difference is the
900 /// `git_state_at_enqueue: Some(git_state)` construction.
901 ///
902 /// # Errors
903 ///
904 /// Same as [`Self::handle_changes`] —
905 /// [`DaemonError::WorkspaceEvicted`],
906 /// [`DaemonError::MemoryBudgetExceeded`],
907 /// [`DaemonError::WorkspaceBuildFailed`].
908 pub async fn handle_changes_with_git_state(
909 &self,
910 key: &WorkspaceKey,
911 changes: ChangeSet,
912 git_state: LastIndexedGitState,
913 ) -> Result<(), DaemonError> {
914 self.handle_changes_inner(
915 key,
916 PendingRebuild {
917 changes,
918 enqueued_at: Instant::now(),
919 git_state_at_enqueue: Some(git_state),
920 },
921 )
922 .await
923 }
924
    /// Shared Phase A (acquire-or-park) + Phase B (drain loop) body
    /// for [`Self::handle_changes`] and
    /// [`Self::handle_changes_with_git_state`]. The public methods
    /// construct the incoming [`PendingRebuild`] and thread it here.
    /// See [`Self::handle_changes`] for the full contract, lock
    /// order, and error taxonomy.
    async fn handle_changes_inner(
        &self,
        key: &WorkspaceKey,
        incoming: PendingRebuild,
    ) -> Result<(), DaemonError> {
        // --- Precondition: lookup Arc<LoadedWorkspace>. ---
        // Map-missing means the workspace was never loaded or has
        // already been evicted; surface that as WorkspaceEvicted.
        let ws: Arc<LoadedWorkspace> =
            self.manager
                .lookup(key)
                .ok_or_else(|| DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                })?;

        // ================================================================
        // Phase A — acquire-runner-or-park (single lane lock scope).
        //
        // Holding `rebuild_lane` across the `rebuild_in_flight` CAS is
        // the load-bearing invariant: every in-flight transition
        // happens under the lane, so no two runners ever claim the
        // role simultaneously.
        // ================================================================
        let mut current: PendingRebuild = {
            let mut lane_guard = ws.rebuild_lane.lock().await;

            // Coalesce incoming with any prior parked pending.
            let coalesced = match lane_guard.take() {
                Some(prior) => prior.coalesce_with(incoming),
                None => incoming,
            };

            // Try to acquire the runner role.
            match ws.rebuild_in_flight.compare_exchange(
                false,
                true,
                Ordering::AcqRel,
                Ordering::Acquire,
            ) {
                Ok(_) => coalesced, // We own the runner role.
                Err(_) => {
                    // Another runner is active — park coalesced in
                    // the lane; that runner will drain it at its next
                    // drain-loop iteration. Return Ok(()) without
                    // executing the pipeline.
                    *lane_guard = Some(coalesced);
                    return Ok(());
                }
            }
            // lane_guard dropped at end of scope.
        };

        // ================================================================
        // Phase B — drain loop.
        //
        // Sentinel guarantees `rebuild_in_flight` is released if the
        // loop unwinds abnormally (documented narrow race on the
        // unwind path — plugin panics inside `spawn_blocking` are
        // caught by `execute_rebuild` and mapped to `Err`, so the
        // only realistic unwind trigger is a runtime-level failure).
        // ================================================================
        let mut sentinel = DrainLoopSentinel {
            ws: Arc::clone(&ws),
            armed: true,
        };

        // `last_result` is assigned by `execute_one_rebuild` on every
        // iteration before any exit path that reads it. The eviction
        // gate exits via its own `return Err(...)` without reading
        // `last_result`, and the drain-exit `return last_result`
        // branch is only reachable after an iteration has assigned.
        let mut last_result: Result<(), DaemonError>;
        loop {
            // --- Top-of-loop cancellation/eviction gate ---
            //
            // `rebuild_cancelled` is set either by eviction (under
            // `workspaces.write()` in `execute_eviction` BEFORE
            // `workspaces.remove(key)`) or by `daemon/cancel_rebuild`
            // (user-initiated cancel of an in-flight rebuild).
            //
            // `swap(false)` atomically reads AND clears the flag so a
            // user-cancel does not poison subsequent rebuilds. For
            // eviction this is harmless — the workspace is removed
            // from the map and this `LoadedWorkspace` instance will
            // not be reused.
            if ws.rebuild_cancelled.swap(false, Ordering::AcqRel) {
                let mut lane_guard = ws.rebuild_lane.lock().await;
                // Abandon any parked pending — the workspace is gone.
                let _dropped: Option<PendingRebuild> = lane_guard.take();
                ws.rebuild_in_flight.store(false, Ordering::Release);
                sentinel.armed = false;
                // Cluster-G iter-2 BLOCKER 3: distinguish an eviction
                // (the eviction path already set state to `Evicted`
                // under `workspaces.write()` BEFORE flipping the
                // flag) from a `daemon/reset` cancellation (which
                // leaves the state as `Rebuilding`). For reset, the
                // runner is responsible for the post-cancel state
                // transition — `record_and_transition_on_err` no-ops
                // on `WorkspaceEvicted`, so without this branch the
                // workspace would stay stuck in `Rebuilding` forever
                // (codex iter-1 review).
                if ws.load_state() == WorkspaceState::Rebuilding {
                    ws.store_state(WorkspaceState::Unloaded);
                }
                return Err(DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                });
            }

            // --- Execute iteration ---
            //
            // `execute_one_rebuild` records success/failure on the
            // workspace before returning, so workspace-level
            // observability is threaded through every iteration even
            // when the drain loop continues past an error.
            last_result = self
                .execute_one_rebuild(key, &ws, current.changes, current.git_state_at_enqueue)
                .await;

            // --- Drain-or-exit decision (under lane lock) ---
            //
            // Releasing `rebuild_in_flight` under the lane is
            // load-bearing: a caller about to park in the lane
            // observes the release atomically with the empty-lane
            // snapshot, so parked pending cannot be stranded.
            let next: Option<PendingRebuild> = {
                let mut lane_guard = ws.rebuild_lane.lock().await;
                match lane_guard.take() {
                    Some(next) => Some(next),
                    None => {
                        ws.rebuild_in_flight.store(false, Ordering::Release);
                        sentinel.armed = false;
                        None
                    }
                }
                // lane_guard dropped at end of scope.
            };

            match next {
                Some(n) => current = n,
                None => return last_result,
            }
        }
    }
1071
    /// Run a single pipeline iteration: decide mode, compute
    /// working-set estimate, reserve admission headroom, execute the
    /// rebuild on a blocking thread, publish atomically. Records
    /// `record_success` on successful publish or `record_failure` on
    /// any error path.
    ///
    /// Called by [`Self::handle_changes`]'s Phase B drain loop for
    /// each coalesced [`PendingRebuild`]. The drain loop may invoke
    /// this multiple times if new pending arrives between iterations;
    /// each invocation is independent from the caller's perspective.
    ///
    /// # Error paths
    ///
    /// All three non-panic error paths call
    /// `record_and_transition_on_err` (which delegates to
    /// `ws.record_failure` with a cloned [`DaemonError`] for
    /// non-eviction errors) before returning:
    /// - [`WorkspaceManager::reserve_rebuild`] Err (e.g.
    ///   [`DaemonError::MemoryBudgetExceeded`],
    ///   [`DaemonError::WorkspaceEvicted`] from the Phase-1 check).
    /// - [`Self::execute_rebuild`] Err (pipeline failure — plugin
    ///   error, `spawn_blocking` join panic mapped to
    ///   [`DaemonError::WorkspaceBuildFailed`]).
    /// - [`WorkspaceManager::publish_and_retain`] Err (post-build
    ///   oversize; see the §3.5 caller-migration comment below).
    ///
    /// A panic inside [`WorkspaceManager::publish_and_retain`] — which
    /// is documented as infallible — unwinds past these match arms.
    /// Admission state is restored by `RollbackGuard` + reservation
    /// RAII drop, but `record_failure` is NOT called. That is
    /// acceptable as defense-in-depth only: in practice a
    /// `publish_and_retain` panic means the daemon is in
    /// damage-control territory where missing workspace-level error
    /// bookkeeping is a minor concern.
    ///
    /// On successful publish, calls `ws.record_success` which:
    /// - Stamps `last_good_at = SystemTime::now()`.
    /// - Clears `last_error`.
    /// - Resets `retry_count` to 0.
    /// - Also increments `self.dispatched_count` for the §I
    ///   observability matrix.
    ///
    /// Additionally (Task 7 Phase 7b2): when `git_state_at_enqueue`
    /// is `Some`, writes the snapshot into
    /// [`LoadedWorkspace::last_indexed_git_state`] AFTER the
    /// `publish_and_retain` call so the classifier baseline advances
    /// only with actual publish consumption. `None` entries (direct
    /// non-watcher callers) leave the baseline untouched.
    async fn execute_one_rebuild(
        &self,
        key: &WorkspaceKey,
        ws: &Arc<LoadedWorkspace>,
        changes: ChangeSet,
        git_state_at_enqueue: Option<LastIndexedGitState>,
    ) -> Result<(), DaemonError> {
        // Prior graph snapshot doubles as the incremental-rebuild base
        // and the input to the mode decision / estimate heuristics.
        let prior_graph: Arc<CodeGraph> = ws.graph.load_full();
        let mode = decide_mode(&self.config, &changes, &prior_graph);
        self.store_last_mode(mode);

        // Task 7 Phase 7c: transition to Rebuilding at iteration entry.
        // Queries keep serving the prior ArcSwap snapshot (A2 §G.5).
        // Placement BEFORE gate_check is intentional: the `Rebuilding`
        // lifecycle covers the synthetic test stall too, so
        // `classify_for_serve_returns_fresh_for_rebuilding_workspace`
        // observes a real state. The store is atomic; a concurrent
        // `execute_eviction` that wins the race overwrites this with
        // `Evicted` under `workspaces.write()` — `classify_for_serve`'s
        // map-missing arm wins, so this is harmless.
        ws.store_state(WorkspaceState::Rebuilding);

        // Task 7 Phase 7b2: record the iteration input for tests
        // BEFORE the optional gate stall, so the test sees the input
        // even when the gate blocks. No-op when `test_capture` is not
        // installed (production).
        if let Some(cap) = self.test_capture.get() {
            cap.iterations.lock().push(CapturedIteration {
                changeset: changes.clone(),
                mode,
                git_state_at_enqueue: git_state_at_enqueue.clone(),
                started_at: Instant::now(),
            });
        }

        // Task 7 Phase 7b2: optional test-only gate. Stalls the
        // iteration until the test driver releases it. Production
        // callers never install a gate — this is a single atomic
        // load + short-circuit.
        self.gate_check().await;

        let estimate = compute_working_set_estimate(&prior_graph, &changes, mode);

        // Admission reservation. `reserve_rebuild` itself performs the
        // Phase-1 membership + cancellation check (Task 7 Phase 7b1)
        // which can surface `WorkspaceEvicted`. Either way, record on
        // the workspace so `last_error` / `retry_count` reflect the
        // failure — EXCEPT on the eviction path (Task 7 Phase 7c: the
        // workspace is gone; polluting telemetry with a "failure"
        // record is misleading).
        let reservation = match self.manager.reserve_rebuild(key, estimate) {
            Ok(r) => r,
            Err(e) => {
                self.record_and_transition_on_err(ws, &e);
                return Err(e);
            }
        };

        // Task 7 Phase 7c: post-reservation hook fires HERE with the
        // reservation alive. Tests use this to snapshot admission
        // state mid-rebuild (e.g., assert `reserved_bytes > 0`) and
        // race eviction. Production is a single atomic load + return.
        self.post_reservation_check().await;

        // Pipeline execution (`spawn_blocking` catches plugin panics
        // internally and maps them to `WorkspaceBuildFailed`).
        //
        // Task 7 Phase 7c: `execute_rebuild` now wires a
        // `CancellationToken` to `ws.rebuild_cancelled` via
        // `spawn_cancellation_forwarder`. A mid-pipeline eviction sets
        // `rebuild_cancelled = true`, the forwarder flips the token,
        // the pipeline returns `GraphBuilderError::Cancelled` →
        // mapped to `DaemonError::WorkspaceEvicted`.
        let new_graph: CodeGraph = match self
            .execute_rebuild(key, ws, &prior_graph, mode, changes)
            .await
        {
            Ok(g) => g,
            Err(e) => {
                // Reservation refunds via RAII on drop at return.
                drop(reservation);
                // Task 7 Phase 7c feat iter-1 (Codex MAJOR 2):
                // increment the pass-boundary cancellation counter
                // when a `WorkspaceEvicted` surfaces from the
                // pipeline. Lets tests distinguish the two
                // cancellation surfaces (§5e publish-recheck vs
                // sqry-core pass-boundary).
                if matches!(e, DaemonError::WorkspaceEvicted { .. })
                    && let Some(cap) = self.test_capture.get()
                {
                    cap.pass_boundary_cancellations
                        .fetch_add(1, Ordering::AcqRel);
                }
                self.record_and_transition_on_err(ws, &e);
                return Err(e);
            }
        };

        // Task 7 Phase 7c §5e: hold `workspaces.read()` across the
        // final cancellation/membership re-check AND
        // `publish_and_retain`. `execute_eviction` holds
        // `workspaces.write()` for its entire critical section, so
        // the RwLock makes this publish atomic with respect to
        // eviction: either eviction has fully completed (our
        // re-checks observe cancellation or map-missing) or eviction
        // cannot start until we drop the read guard. Pattern mirrors
        // `WorkspaceManager::get_or_load` (manager.rs:687-761, Codex
        // Task 6 Phase 6b iter-2 MAJOR). Lock order §J.4:
        // `workspaces -> admission`; `publish_and_retain` takes
        // `admission` internally, which nests correctly.
        let publish_result = {
            let workspaces_guard = self.manager.workspaces_read();

            // Plain load (not swap): the flag must stay set so the
            // drain loop's top-of-loop gate also observes it.
            if ws.rebuild_cancelled.load(Ordering::Acquire) {
                drop(workspaces_guard);
                drop(reservation);
                // Task 7 Phase 7c feat iter-1 (Codex MAJOR 2):
                // counter — §5e recheck surface.
                if let Some(cap) = self.test_capture.get() {
                    cap.publish_path_evictions.fetch_add(1, Ordering::AcqRel);
                }
                // NO record_failure / state transition: eviction owns
                // those.
                return Err(DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                });
            }
            if !workspaces_guard.contains_key(key) {
                drop(workspaces_guard);
                drop(reservation);
                if let Some(cap) = self.test_capture.get() {
                    cap.publish_path_evictions.fetch_add(1, Ordering::AcqRel);
                }
                return Err(DaemonError::WorkspaceEvicted {
                    root: key.source_root.clone(),
                });
            }

            // `G_daemon_control_plane.md` §3.5 caller-migration —
            // execute_one_rebuild (production caller 2). On
            // post-build oversize, propagate the typed error
            // upstream; the reservation's RAII Drop refunds bytes.
            //
            // Cluster-G iter-2 BLOCKER 2: also transition the
            // workspace to `Failed` so it doesn't stay stuck in
            // `Rebuilding`. The success branch below handles the
            // happy-path `Loaded` transition; the error branch
            // previously only refunded bytes and returned, leaving
            // the workspace observable as `Rebuilding` forever
            // (codex iter-1 review — only `daemon reset` could
            // recover, and the reset path itself was also broken).
            let (_token, published_arc) =
                match self.manager.publish_and_retain(reservation, ws, new_graph) {
                    Ok((token, arc)) => (token, arc),
                    Err(e) => {
                        self.record_and_transition_on_err(ws, &e);
                        return Err(e);
                    }
                };
            published_arc
            // workspaces_guard drops at end of this block; eviction
            // can proceed immediately afterward.
        };

        // Task 7 Phase 7b2: advance the classifier baseline when the
        // consumed PendingRebuild carried a watcher-captured snapshot.
        // This happens AFTER `publish_and_retain` + read-guard drop,
        // which is safe: the write is a per-workspace atomic on an
        // Arc the caller holds. A concurrent eviction stamps its own
        // state transition but does not invalidate this field — the
        // next `classify_for_serve` observes `WorkspaceEvicted` from
        // the map-missing arm.
        if let Some(git_state) = git_state_at_enqueue {
            *ws.last_indexed_git_state.write() = Some(git_state);
        }

        // Success bookkeeping.
        ws.record_success(SystemTime::now());
        // Task 7 Phase 7c: transition Rebuilding -> Loaded. A
        // concurrent eviction that wins the race after our read guard
        // drop overwrites this with Evicted — harmless, see header
        // comment above the read-guard block.
        ws.store_state(WorkspaceState::Loaded);
        self.dispatched_count.fetch_add(1, Ordering::Relaxed);
        // Published Arc retained only for its lifetime up to here;
        // the ArcSwap inside the workspace owns the graph now.
        let _ = publish_result;

        Ok(())
    }
1304
1305 /// Task 7 Phase 7c helper: update workspace state + bookkeeping on
1306 /// a rebuild-failure error path.
1307 ///
1308 /// - `WorkspaceEvicted`:
1309 /// - **State already `Evicted`** → NO-OP. Eviction wrote
1310 /// `Evicted` under `workspaces.write()` BEFORE flipping
1311 /// `rebuild_cancelled`; clobbering it with `Failed` would
1312 /// destroy that contract.
1313 /// - **State still `Rebuilding`** → cluster-G iter-3 fix.
1314 /// This is a `daemon reset` cancellation that fired AFTER
1315 /// the runner's top-of-loop gate but BEFORE the publish
1316 /// recheck (`rebuild.rs:1229`). The runner converts that
1317 /// into `WorkspaceEvicted`, but no eviction actually
1318 /// happened — `WorkspaceManager::reset` only set
1319 /// `rebuild_cancelled = true` and returned
1320 /// `ResetCancellationDispatched`. Without the transition
1321 /// below, the workspace stays stuck in `Rebuilding` and a
1322 /// subsequent `daemon reset` returns
1323 /// `ResetCancellationDispatched` again — operator must
1324 /// `daemon stop && daemon start` to recover.
1325 ///
1326 /// We transition to `Unloaded` (the same destination
1327 /// `WorkspaceManager::reset` would have written if reset
1328 /// had won the race against the iteration). The map entry
1329 /// and `pinned` bit are preserved by the in-place
1330 /// `store_state`; the next `daemon reset` (or the caller's
1331 /// retry after the documented `retry_after_ms = 250`) sees
1332 /// `Unloaded` and is a no-op, and `daemon load` then
1333 /// recovers the workspace cheaply.
1334 ///
1335 /// - Any other `DaemonError` → `record_failure` + transition to
1336 /// `WorkspaceState::Failed`. This is the entry point to A2
1337 /// §G.7's stale-serve flow for that workspace.
1338 fn record_and_transition_on_err(&self, ws: &LoadedWorkspace, err: &DaemonError) {
1339 if matches!(err, DaemonError::WorkspaceEvicted { .. }) {
1340 // Cluster-G iter-3 BLOCKER 3 fix: differentiate
1341 // eviction-path from reset-path cancellations by reading
1342 // the state the eviction path would have written. See
1343 // doc-comment above for the full rationale.
1344 if ws.load_state() == WorkspaceState::Rebuilding {
1345 ws.store_state(WorkspaceState::Unloaded);
1346 }
1347 return;
1348 }
1349 ws.record_failure(clone_err(err));
1350 ws.store_state(WorkspaceState::Failed);
1351 }
1352
1353 /// Drive the actual sqry-core rebuild pipeline on a blocking
1354 /// thread, with cooperative cancellation via a forwarder task
1355 /// that mirrors `ws.rebuild_cancelled` into a
1356 /// [`CancellationToken`].
1357 ///
1358 /// Sync-in-async bridge: `build_unified_graph_cancellable` and
1359 /// `incremental_rebuild` are CPU-bound + use rayon internally, so
1360 /// they must not block a tokio runtime worker thread. The blocking
1361 /// closure owns the cloned `Arc<PluginManager>` /
1362 /// [`BuildConfig`] / `Arc<CodeGraph>` / cancellation token so it
1363 /// outlives the awaited JoinHandle.
1364 ///
1365 /// Task 7 Phase 7c: a tokio task (`spawn_cancellation_forwarder`)
1366 /// polls `ws.rebuild_cancelled` at `CANCEL_FORWARDER_POLL_MS`
1367 /// cadence; the first `true` observation calls `token.cancel()`.
1368 /// The forwarder is `abort()`ed after the rebuild future returns
1369 /// regardless of outcome — polling stops immediately.
1370 ///
1371 /// # Error mapping
1372 ///
1373 /// - `GraphBuilderError::Cancelled` → `DaemonError::WorkspaceEvicted`
1374 /// (cancellation only fires on eviction per the forwarder
1375 /// contract).
1376 /// - Any other `GraphBuilderError` → `DaemonError::WorkspaceBuildFailed`
1377 /// with a human-readable reason.
1378 /// - `spawn_blocking` join errors (panic inside the closure) →
1379 /// `WorkspaceBuildFailed`.
1380 async fn execute_rebuild(
1381 &self,
1382 key: &WorkspaceKey,
1383 ws: &Arc<LoadedWorkspace>,
1384 prior: &Arc<CodeGraph>,
1385 mode: RebuildMode,
1386 changes: ChangeSet,
1387 ) -> Result<CodeGraph, DaemonError> {
1388 let root = key.source_root.clone();
1389 let plugins = Arc::clone(&self.plugins);
1390 let cfg = self.build_config.clone();
1391 let prior_for_blocking = Arc::clone(prior);
1392 let root_for_err = root.clone();
1393
1394 // Task 7 Phase 7c: fresh cancellation token per iteration so
1395 // a cancelled token from a prior run cannot permanently break
1396 // subsequent rebuilds on the same workspace.
1397 let token = CancellationToken::new();
1398 // Task 7 Phase 7c feat iter-1: optional forwarder suppression
1399 // for §5e publish-path recheck tests. Production builds and
1400 // tests without a `TestCapture` always spawn the forwarder;
1401 // only a test with `suppress_forwarder=true` skips it.
1402 let forwarder_handle = if self
1403 .test_capture
1404 .get()
1405 .is_some_and(|cap| cap.suppress_forwarder.load(Ordering::Acquire))
1406 {
1407 None
1408 } else {
1409 Some(spawn_cancellation_forwarder(Arc::clone(ws), token.clone()))
1410 };
1411 // Task 7 Phase 7c feat iter-2 (Codex MAJOR 1): optional
1412 // synchronous pre-cancel for pass-boundary-determinism
1413 // tests. When armed, the token is already cancelled by the
1414 // time spawn_blocking dispatches the pipeline, so the very
1415 // first `cancellation.check()?` inside
1416 // `build_unified_graph_cancellable` /
1417 // `incremental_rebuild` fires. Forces the pass-boundary
1418 // cancellation surface without racing the forwarder.
1419 if self.test_capture.get().is_some_and(|cap| {
1420 cap.precancel_token_for_pass_boundary
1421 .load(Ordering::Acquire)
1422 }) {
1423 token.cancel();
1424 }
1425
1426 let token_for_blocking = token.clone();
1427 let join_result = tokio::task::spawn_blocking(move || {
1428 execute_rebuild_blocking(
1429 &root,
1430 &prior_for_blocking,
1431 mode,
1432 changes,
1433 &plugins,
1434 &cfg,
1435 &token_for_blocking,
1436 )
1437 })
1438 .await;
1439
1440 // Task 7 Phase 7c: stop the forwarder unconditionally once the
1441 // rebuild future completes. (Iter-1 Option<JoinHandle>: if
1442 // forwarder suppression is armed in TestCapture, handle is
1443 // None — nothing to abort.)
1444 if let Some(handle) = forwarder_handle {
1445 handle.abort();
1446 }
1447
1448 match join_result {
1449 Ok(Ok(graph)) => Ok(graph),
1450 Ok(Err(e)) => Err(e),
1451 Err(join_err) => Err(DaemonError::WorkspaceBuildFailed {
1452 root: root_for_err,
1453 reason: format!("spawn_blocking join error: {join_err}"),
1454 }),
1455 }
1456 }
1457
    /// Task 7 Phase 7c: test-only observation + stall point inside
    /// `execute_one_rebuild`, fired AFTER `reserve_rebuild` returns
    /// Ok and BEFORE `execute_rebuild` runs the blocking pipeline.
    ///
    /// Production builds with no `TestCapture` installed see a single
    /// atomic load + return. With a capture installed, each
    /// invocation:
    ///
    /// 1. Fires `post_reservation_reached.notify_waiters()` so tests
    ///    awaiting `wait_until_post_reservation()` return.
    /// 2. If `post_reservation_hold.load > 0`, stalls on
    ///    `post_reservation_release.notified()` — matches the 7b1
    ///    `Notify` handshake pattern (arm `notified()` future BEFORE
    ///    re-checking `hold`) to close the lost-wakeup window.
    async fn post_reservation_check(&self) {
        // Production fast path: no capture installed.
        let Some(cap) = self.test_capture.get() else {
            return;
        };
        // Iter-2 Codex MAJOR 2: set the durable reached-flag BEFORE
        // firing the notify. A test that awaits via
        // `wait_until_post_reservation` observes either the flag
        // (fast path) or the notify (slow path); the flag closes the
        // lost-wakeup hole where the hook fires before the test
        // arms its await.
        cap.post_reservation_reached_flag
            .store(true, Ordering::Release);
        cap.post_reservation_reached.notify_waiters();
        if cap.post_reservation_hold.load(Ordering::Acquire) == 0 {
            return;
        }
        // Unlike `gate_check` the future is not pinned via
        // `tokio::pin!`; awaiting it by value below is equivalent and
        // still consumes a permit stored by an earlier notify.
        let notified = cap.post_reservation_release.notified();
        if cap.post_reservation_hold.load(Ordering::Acquire) > 0 {
            notified.await;
            // Decrement once per release.
            cap.post_reservation_hold.fetch_sub(1, Ordering::AcqRel);
        }
    }
1495
1496 /// Encode the mode into the atomic observability slot.
1497 fn store_last_mode(&self, mode: RebuildMode) {
1498 self.last_mode.store(mode.as_u8(), Ordering::Relaxed);
1499 }
1500
1501 // -----------------------------------------------------------------
1502 // Per-workspace watcher bridge (Task 7 Phase 7b2)
1503 // -----------------------------------------------------------------
1504
    /// Idempotently spawn (if not already active) the per-workspace
    /// watcher + async dispatcher task pair for `(key, ws, root)`.
    ///
    /// # Idempotence
    ///
    /// Looks up `watchers[key]`:
    /// - If present AND the entry's `live` flag is `true`, returns
    ///   `Ok(())` without spawning — an active pair is already
    ///   producing dispatches.
    /// - If present AND `live == false` (the async task has started
    ///   its post-loop cleanup), prunes the stale entry and spawns a
    ///   new pair with a fresh generation.
    /// - If absent, spawns a new pair.
    ///
    /// # Shutdown lifecycle
    ///
    /// Each pair is cooperatively shut down by:
    /// - Eviction (`ws.rebuild_cancelled = true` propagates through
    ///   the cancellable watcher → blocking thread exits → mpsc
    ///   sender drops → async task exits), OR
    /// - The async task observing `DaemonError::WorkspaceEvicted`
    ///   from `handle_changes_with_git_state` → async task exits →
    ///   receiver drops → blocking thread's next send fails → exits.
    ///
    /// The async task's last action is `live.store(false)` followed
    /// by [`Self::reap_watcher`]`(&key, generation)` — removing the
    /// entry from the map via compare-and-remove on the generation.
    ///
    /// # Placement constraint
    ///
    /// This method lives on `RebuildDispatcher`, NOT `WorkspaceManager`
    /// (per the 7b2 RESUME_PROMPT constraint): coupling watcher
    /// lifecycle into `manager.get_or_load` would pollute the
    /// manager's responsibilities and create a dispatcher↔manager
    /// cycle. Test harnesses (and Task 9's future daemon bootstrap)
    /// call this method explicitly after `get_or_load` succeeds.
    ///
    /// # Errors
    ///
    /// - [`DaemonError::Io`] if
    ///   [`sqry_core::watch::SourceTreeWatcher::new`] fails (typical
    ///   cause: `.gitignore` read error or
    ///   `notify::RecommendedWatcher::new` failure).
    pub async fn ensure_watching(
        self: &Arc<Self>,
        key: &WorkspaceKey,
        ws: Arc<LoadedWorkspace>,
        root: PathBuf,
    ) -> Result<(), DaemonError> {
        // Hold `self.watchers` across the ENTIRE operation — check,
        // watcher construction, spawn, and insert. Releasing the
        // lock between the liveness check and the insert would
        // permit two concurrent callers for the same `WorkspaceKey`
        // to both pass the "no live entry" check and both spawn
        // watcher pairs; the later insert would replace the tracked
        // entry without stopping the earlier spawned pair (Tokio
        // `JoinHandle::drop` detaches, per the `WatcherEntry`
        // docstring), producing duplicate rebuild dispatches and
        // leaked watcher resources. This issue was flagged by the
        // 7b2 iter-0 feat review (MAJOR).
        //
        // Holding a `parking_lot::Mutex` across sync operations is
        // the intended usage. The critical section covers:
        // 1. Fast-path liveness check (atomic read).
        // 2. `next_watcher_generation.fetch_add` (atomic).
        // 3. `SourceTreeWatcher::new` — bounded sync I/O
        //    (.gitignore read + notify subscribe). Typically 1–5
        //    ms; does NOT block on any lock held by code that
        //    might reacquire `self.watchers`.
        // 4. Git-state priming — one `RwLock` write on the
        //    workspace's `last_indexed_git_state`. Distinct lock
        //    from `watchers`; no lock-order violation.
        // 5. `tokio::sync::mpsc::channel` — sync allocation.
        // 6. `tokio::spawn` + `tokio::task::spawn_blocking` —
        //    enqueue-only, do not yield. Task bodies may later
        //    call `reap_watcher`, which re-acquires
        //    `self.watchers`; parking_lot blocks the executor
        //    thread briefly until we release here. Acceptable
        //    because the window is sub-ms after spawn.
        // 7. `HashMap::insert` (instant).
        //
        // No `.await` points exist between steps 1–7, so the async
        // function's single poll progresses synchronously while
        // the lock is held; the lock is released at function
        // return (including the error-return path for
        // `SourceTreeWatcher::new` failures via `?`).
        let mut watchers = self.watchers.lock();

        // Step 1 — Fast-path liveness check.
        if let Some(entry) = watchers.get(key)
            && entry.live.load(Ordering::Acquire)
        {
            return Ok(());
        }

        // Step 2 — Allocate a monotonic generation token.
        let generation = self.next_watcher_generation.fetch_add(1, Ordering::Relaxed);

        // Step 3 — Construct the watcher (sync I/O). Errors bubble
        // up via `?`; the locked guard is dropped on return.
        let watcher = SourceTreeWatcher::new(&root).map_err(|e| {
            DaemonError::Io(std::io::Error::other(format!(
                "failed to create watcher for {}: {e:#}",
                root.display()
            )))
        })?;

        // Step 4 — Prime `ws.last_indexed_git_state` with the
        // CURRENT git snapshot. Without this, the classifier has no
        // baseline on the first debounce window and every benign
        // `.git/` event would produce `git_change_class = None`
        // (which the bridge cannot distinguish from a real
        // divergence without a baseline). Priming is a single
        // atomic write — if a real git operation is in flight
        // concurrently, the snapshot captures whichever side wins;
        // subsequent debounce windows compare against it and
        // classify correctly.
        //
        // Only PRIME if no baseline exists yet. A respawn (after
        // evict+reload) should NOT overwrite a baseline that the
        // prior watcher successfully committed, because that
        // baseline reflects the last PUBLISHED graph's git state —
        // overwriting it would lose the classifier's memory.
        {
            let mut baseline = ws.last_indexed_git_state.write();
            if baseline.is_none() {
                *baseline = Some(watcher.git_state().current_state());
            }
        }

        // Step 5 — Bounded tokio mpsc: capacity 16 is generous —
        // the async consumer drains items at dispatch rate and the
        // blocking producer already consolidates many filesystem
        // events into a single ChangeSet before sending.
        let (tx, rx) = tokio::sync::mpsc::channel::<(ChangeSet, LastIndexedGitState)>(16);

        let debounce = Duration::from_millis(self.config.debounce_ms);
        // 100 ms is the design's recommended cancellation-poll cadence:
        // tight enough that an evicted workspace's watcher thread
        // terminates promptly, loose enough not to burn CPU on a
        // quiet repo.
        let cancel_poll_period = Duration::from_millis(100);

        // Liveness flag shared between the stored entry and the async
        // task's post-loop cleanup. Flipped to `false` BEFORE
        // reap_watcher is called so `ensure_watching` re-calls for
        // the same key observe "drained" rather than "live".
        let live = Arc::new(AtomicBool::new(true));

        // Step 6a — Spawn the blocking watcher thread. The closure
        // takes ownership of the watcher and the sender half.
        let blocking_handle = {
            let ws = Arc::clone(&ws);
            tokio::task::spawn_blocking(move || {
                watch_loop_blocking(&watcher, &tx, &ws, debounce, cancel_poll_period);
            })
        };

        // Step 6b — Spawn the async dispatcher task (owns the
        // receiver half; its drop is what unblocks a stuck sender).
        let async_handle = {
            let dispatcher = Arc::clone(self);
            let key = key.clone();
            let ws = Arc::clone(&ws);
            let live_for_task = Arc::clone(&live);
            tokio::spawn(async move {
                dispatch_loop_async(&dispatcher, &key, &ws, rx).await;
                // Mark ourselves as draining BEFORE reap_watcher so a
                // concurrent ensure_watching observes the correct
                // liveness state.
                live_for_task.store(false, Ordering::Release);
                dispatcher.reap_watcher(&key, generation);
            })
        };

        // Step 7 — Prune any stale entry + insert the new one.
        // Prune covers the case where a prior entry existed with
        // `live == false` (observed by step 1 falling through).
        // `remove` is idempotent when no entry exists. The stored
        // handles are retained for bookkeeping/tests; dropping a
        // pruned entry detaches (not aborts) its tasks.
        watchers.remove(key);
        watchers.insert(
            key.clone(),
            WatcherEntry {
                generation,
                live,
                async_handle,
                blocking_handle,
            },
        );
        // `watchers` drops here → lock released.
        Ok(())
    }
1695
1696 /// Remove the watcher entry for `key` if and only if the stored
1697 /// entry's generation equals `my_generation`. Called by the
1698 /// per-workspace async task as its LAST action before exit.
1699 ///
1700 /// # Why compare-and-remove
1701 ///
1702 /// A fast evict+reload sequence can result in:
1703 /// 1. Old watcher A (gen 0) exits cooperatively.
1704 /// 2. Before A's closure finishes, `ensure_watching` is called
1705 /// again for the same key and observes A's `live == false`,
1706 /// prunes A's entry, and inserts new watcher B (gen 1).
1707 /// 3. A's closure reaches its final statement — `reap_watcher`.
1708 ///
1709 /// Without a generation check, A's reap would delete B's entry.
1710 /// Compare-and-remove guarantees A's reap is a no-op because
1711 /// `entry.generation == 1 != 0 == my_generation`.
1712 ///
1713 /// # Test observability
1714 ///
1715 /// Exposed as `pub(crate)` because external callers should never
1716 /// need to force-reap a watcher — cooperative shutdown via
1717 /// `rebuild_cancelled` triggers reap automatically. Tests that
1718 /// assert on the map size use [`Self::watchers_len`].
1719 pub(crate) fn reap_watcher(&self, key: &WorkspaceKey, my_generation: u64) {
1720 let mut watchers = self.watchers.lock();
1721 if let Some(entry) = watchers.get(key)
1722 && entry.generation == my_generation
1723 {
1724 watchers.remove(key);
1725 }
1726 }
1727
1728 /// **Test-only** size observation on the watchers map.
1729 ///
1730 /// Used by `rebuild_watcher_shutdown.rs` to assert that the
1731 /// eviction cascade reaches quiescence (both tasks exit AND the
1732 /// map entry is reaped). Production callers should consult
1733 /// workspace-level `status()` rather than the dispatcher's
1734 /// bookkeeping.
1735 #[doc(hidden)]
1736 #[must_use]
1737 pub fn watchers_len(&self) -> usize {
1738 self.watchers.lock().len()
1739 }
1740}
1741
1742// ---------------------------------------------------------------------------
1743// Watcher bridge loops (Task 7 Phase 7b2)
1744// ---------------------------------------------------------------------------
1745//
1746// These are free functions (not `RebuildDispatcher` methods) so the
1747// closures passed to `tokio::task::spawn_blocking` and `tokio::spawn`
1748// in `ensure_watching` own only the state they need. The blocking
1749// loop is `Send + 'static` (captures the `SourceTreeWatcher`, mpsc
1750// sender, workspace Arc, and two Durations); the async loop is
1751// `Send + 'static` (captures the dispatcher Arc, workspace key+Arc,
1752// and the mpsc receiver).
1753
1754/// Blocking watcher loop — runs on `tokio::task::spawn_blocking`.
1755///
1756/// Repeatedly calls
1757/// [`SourceTreeWatcher::wait_for_changes_cancellable`](sqry_core::watch::SourceTreeWatcher::wait_for_changes_cancellable)
1758/// using the workspace's last-indexed baseline as the classifier
1759/// reference. On each non-empty `ChangeSet`, captures the current
1760/// git state via `watcher.git_state().current_state()` and forwards
1761/// the `(ChangeSet, LastIndexedGitState)` pair to the async
1762/// dispatcher task via tokio mpsc.
1763///
1764/// # Termination
1765///
1766/// Exits on:
1767/// - `wait_for_changes_cancellable` returns `Ok(None)` — eviction
1768/// observed cooperatively via `ws.rebuild_cancelled`.
1769/// - `wait_for_changes_cancellable` returns `Err` — notify channel
1770/// disconnect (unrecoverable); logged at error level.
1771/// - `tx.blocking_send` returns `Err` — async receiver dropped
1772/// (normal shutdown); logged at debug.
1773fn watch_loop_blocking(
1774 watcher: &SourceTreeWatcher,
1775 tx: &tokio::sync::mpsc::Sender<(ChangeSet, LastIndexedGitState)>,
1776 ws: &LoadedWorkspace,
1777 debounce: Duration,
1778 cancel_poll_period: Duration,
1779) {
1780 loop {
1781 let last_git = ws.last_indexed_git_state.read().clone();
1782 match watcher.wait_for_changes_cancellable(
1783 debounce,
1784 last_git.as_ref(),
1785 &ws.rebuild_cancelled,
1786 cancel_poll_period,
1787 ) {
1788 Ok(None) => {
1789 tracing::info!(
1790 target: "sqry_daemon::watch",
1791 workspace = %ws.key.source_root.display(),
1792 "watcher cancelled; terminating blocking loop"
1793 );
1794 break;
1795 }
1796 Err(e) => {
1797 tracing::error!(
1798 target: "sqry_daemon::watch",
1799 workspace = %ws.key.source_root.display(),
1800 error = %e,
1801 "watcher channel disconnected; terminating blocking loop"
1802 );
1803 break;
1804 }
1805 Ok(Some(cs)) if cs.is_empty() => {
1806 // Empty ChangeSet — watcher debounced a burst of
1807 // events that all got filtered (editor temps,
1808 // gitignored paths, .git/ internals). Do not wake
1809 // the async side; loop and wait for the next batch.
1810 continue;
1811 }
1812 Ok(Some(cs)) if cs.changed_files.is_empty() && !cs.requires_full_rebuild() => {
1813 // Git-state-only change whose classifier output does
1814 // NOT require a full rebuild (Noise or LocalCommit
1815 // class). A2 §B mandates: these classes are reported
1816 // for telemetry but do not trigger a rebuild by
1817 // themselves — a real commit that changed the working
1818 // tree was already observed as a source-tree event.
1819 // Skip silently; loop for the next debounce window.
1820 continue;
1821 }
1822 Ok(Some(cs)) if cs.changed_files.is_empty() && cs.requires_full_rebuild() => {
1823 // Empty-files + full-rebuild classification
1824 // (BranchSwitch or TreeDiverged) is either a TOCTOU
1825 // artifact or a graph-neutral git operation. In both
1826 // cases, skipping is correct:
1827 //
1828 // * **TOCTOU artifact.** Immediately after a git
1829 // operation, the classifier's
1830 // `git rev-parse HEAD HEAD^{tree}` subprocess can
1831 // transiently return partial output, causing
1832 // `current_state` fields to drop to `None` and the
1833 // classifier to fall back to BranchSwitch (see
1834 // `GitStateWatcher::classify` at
1835 // `sqry-core/src/watch/git_state.rs:240-258`). A
1836 // subsequent debounce window will re-observe once
1837 // git settles and fire a legitimate dispatch if
1838 // state actually diverged.
1839 //
1840 // * **Graph-neutral branch/tree move.** Git can
1841 // genuinely switch refs without swapping
1842 // working-tree content when both refs point at the
1843 // same tree (for example,
1844 // `git checkout other-branch` where `other-branch`
1845 // is already at HEAD's tree). The classifier
1846 // reports BranchSwitch because `head_ref` changed,
1847 // but no source file events fire because no source
1848 // content changed. The published graph is already
1849 // consistent with the new ref — a rebuild would be
1850 // pure overhead.
1851 //
1852 // A "real" tree divergence that our graph does not
1853 // yet reflect (a pull, a reset, a branch switch that
1854 // actually rewrites files) emits concrete source
1855 // file events the source-tree watcher captures; that
1856 // case falls through to the dispatch arm below and
1857 // triggers the rebuild as intended.
1858 tracing::debug!(
1859 target: "sqry_daemon::watch",
1860 workspace = %ws.key.source_root.display(),
1861 git_class = ?cs.git_change_class,
1862 "skipping empty-files full-rebuild signal: TOCTOU or graph-neutral git move"
1863 );
1864 continue;
1865 }
1866 Ok(Some(cs)) => {
1867 // Capture the git state AS OF now (after debounce
1868 // completion). The async side will attach this to
1869 // the PendingRebuild via
1870 // `handle_changes_with_git_state`; the runner will
1871 // commit it to `ws.last_indexed_git_state` at
1872 // publish time.
1873 let new_git_state = watcher.git_state().current_state();
1874 if tx.blocking_send((cs, new_git_state)).is_err() {
1875 tracing::debug!(
1876 target: "sqry_daemon::watch",
1877 workspace = %ws.key.source_root.display(),
1878 "async dispatcher task dropped receiver; terminating blocking loop"
1879 );
1880 break;
1881 }
1882 }
1883 }
1884 }
1885}
1886
1887/// Async dispatcher loop — runs on a `tokio::spawn`ed task.
1888///
1889/// Consumes `(ChangeSet, LastIndexedGitState)` pairs from `rx` and
1890/// dispatches each via
1891/// [`RebuildDispatcher::handle_changes_with_git_state`]. The runner
1892/// commits `ws.last_indexed_git_state` as part of its publish
1893/// bookkeeping, keyed off the attached snapshot.
1894///
1895/// # Termination
1896///
1897/// Exits on:
1898/// - `rx.recv()` returns `None` — blocking side exited, channel
1899/// closed; logged at debug.
1900/// - `handle_changes_with_git_state` returns
1901/// `Err(WorkspaceEvicted)` — workspace is gone; logged at info.
1902///
1903/// Transient errors (`MemoryBudgetExceeded`, `WorkspaceBuildFailed`,
1904/// `Io`) continue the loop — the baseline is not advanced (the
1905/// publish did not happen), so the next wait_for_changes_cancellable
1906/// call re-observes the divergence and retries.
1907async fn dispatch_loop_async(
1908 dispatcher: &Arc<RebuildDispatcher>,
1909 key: &WorkspaceKey,
1910 ws: &LoadedWorkspace,
1911 mut rx: tokio::sync::mpsc::Receiver<(ChangeSet, LastIndexedGitState)>,
1912) {
1913 loop {
1914 let Some((cs, new_git_state)) = rx.recv().await else {
1915 tracing::debug!(
1916 target: "sqry_daemon::watch",
1917 workspace = %ws.key.source_root.display(),
1918 "watcher channel closed; terminating async dispatcher"
1919 );
1920 break;
1921 };
1922 match dispatcher
1923 .handle_changes_with_git_state(key, cs, new_git_state)
1924 .await
1925 {
1926 Ok(()) => {
1927 // Baseline advance (if any) was handled by the
1928 // runner inside execute_one_rebuild at publish
1929 // time — nothing for the bridge to do here.
1930 }
1931 Err(DaemonError::WorkspaceEvicted { .. }) => {
1932 tracing::info!(
1933 target: "sqry_daemon::watch",
1934 workspace = %ws.key.source_root.display(),
1935 "workspace evicted; terminating async dispatcher"
1936 );
1937 break;
1938 }
1939 Err(e) => {
1940 tracing::warn!(
1941 target: "sqry_daemon::watch",
1942 workspace = %ws.key.source_root.display(),
1943 error = %e,
1944 "rebuild failed; baseline unchanged, retrying on next change"
1945 );
1946 // loop continues
1947 }
1948 }
1949 }
1950}
1951
1952/// Run the appropriate sqry-core entrypoint on the current (blocking)
1953/// thread. Factored out of `execute_rebuild` so the blocking closure
1954/// is a plain free function — easier to review and easier to mock in
1955/// unit tests if future phases need to.
1956///
1957/// Task 7 Phase 7c: takes a [`CancellationToken`] that's polled at
1958/// every pass boundary. A cancelled token produces
1959/// [`GraphBuilderError::Cancelled`] which this helper maps to
1960/// [`DaemonError::WorkspaceEvicted`] — cancellation only fires when
1961/// the workspace is evicted (the dispatcher's
1962/// [`spawn_cancellation_forwarder`] flips the token on observing
1963/// `ws.rebuild_cancelled = true`).
1964fn execute_rebuild_blocking(
1965 root: &std::path::Path,
1966 prior: &Arc<CodeGraph>,
1967 mode: RebuildMode,
1968 changes: ChangeSet,
1969 plugins: &PluginManager,
1970 cfg: &BuildConfig,
1971 cancellation: &CancellationToken,
1972) -> Result<CodeGraph, DaemonError> {
1973 match mode {
1974 RebuildMode::Full => {
1975 match build_unified_graph_cancellable(root, plugins, cfg, cancellation) {
1976 Ok(graph) => Ok(graph),
1977 Err(e) => Err(map_graph_builder_err(e, root.to_path_buf(), "full rebuild")),
1978 }
1979 }
1980 RebuildMode::Incremental => {
1981 let paths: &[PathBuf] = &changes.changed_files;
1982
1983 // Closure math — resolve paths registered in the graph.
1984 // Unresolved paths are handled by
1985 // `phase3e_discover_new_file_paths` inside
1986 // `incremental_rebuild`.
1987 let file_ids: Vec<_> = paths.iter().filter_map(|p| prior.files().get(p)).collect();
1988 let closure = compute_reverse_dep_closure(&file_ids, prior.as_ref());
1989
1990 // Task 7 Phase 7c: real cancellation token from
1991 // `LoadedWorkspace::rebuild_cancelled`, wired via the
1992 // forwarder spawned in `execute_rebuild`.
1993 incremental_rebuild(prior.as_ref(), paths, &closure, plugins, cfg, cancellation)
1994 .map_err(|e| map_graph_builder_err(e, root.to_path_buf(), "incremental rebuild"))
1995 }
1996 }
1997}
1998
1999/// Map a sqry-core [`GraphBuilderError`] to the daemon surface type.
2000///
2001/// - `Cancelled` → [`DaemonError::WorkspaceEvicted`] (JSON-RPC -32004).
2002/// Cancellation only fires on eviction in the current design, so the
2003/// evicted-workspace termination signal is the correct mapping.
2004/// - Any other variant → [`DaemonError::WorkspaceBuildFailed`] (-32001)
2005/// with a human-readable reason prefixed by `stage`.
2006fn map_graph_builder_err(err: GraphBuilderError, root: PathBuf, stage: &str) -> DaemonError {
2007 match err {
2008 GraphBuilderError::Cancelled => DaemonError::WorkspaceEvicted { root },
2009 other => DaemonError::WorkspaceBuildFailed {
2010 root,
2011 reason: format!("{stage}: {other}"),
2012 },
2013 }
2014}
2015
2016// ---------------------------------------------------------------------------
2017// Cancellation forwarder (Task 7 Phase 7c)
2018// ---------------------------------------------------------------------------
2019
/// Poll period for the cancellation forwarder (consumed by
/// [`spawn_cancellation_forwarder`]). 50 ms is coarse enough to keep
/// the background task's CPU footprint negligible while still
/// bounding cancellation latency at `50ms + next pass boundary`. Tests
/// that need faster propagation can lower via a future hook; the
/// constant is sufficient for production.
const CANCEL_FORWARDER_POLL_MS: u64 = 50;
2026
2027/// Spawn a tokio task that mirrors `ws.rebuild_cancelled` into
2028/// `token`. The task polls the atomic on a [`CANCEL_FORWARDER_POLL_MS`]
2029/// cadence; the first observation of `true` calls `token.cancel()`
2030/// and exits.
2031///
2032/// The returned [`JoinHandle`] MUST be `abort()`ed by the caller after
2033/// the rebuild future completes — otherwise a quiet workspace (no
2034/// eviction) leaves the polling task running until the runtime is
2035/// dropped.
2036///
2037/// Task 7 Phase 7c rationale (Codex iter-2 Q2, Q9): a `Notify`-based
2038/// forwarder is not demonstrably better here — the atomic remains the
2039/// authoritative source of truth, lock-free, with
2040/// `Release`/`Acquire` ordering. Polling adds one atomic load every
2041/// 50 ms, which is negligible against rebuild timescales (seconds).
2042fn spawn_cancellation_forwarder(
2043 ws: Arc<LoadedWorkspace>,
2044 token: CancellationToken,
2045) -> JoinHandle<()> {
2046 tokio::spawn(async move {
2047 loop {
2048 if ws.rebuild_cancelled.load(Ordering::Acquire) {
2049 token.cancel();
2050 return;
2051 }
2052 tokio::time::sleep(std::time::Duration::from_millis(CANCEL_FORWARDER_POLL_MS)).await;
2053 }
2054 })
2055}
2056
2057// ---------------------------------------------------------------------------
2058// DrainLoopSentinel — panic-safety for rebuild_in_flight (Task 7 Phase 7b1)
2059// ---------------------------------------------------------------------------
2060
/// Panic-safety sentinel for the Phase B drain loop in
/// [`RebuildDispatcher::handle_changes`].
///
/// Guarantees that [`LoadedWorkspace::rebuild_in_flight`] is released
/// if `handle_changes` unwinds abnormally. The normal path disarms
/// the sentinel (`armed = false`) after releasing `rebuild_in_flight`
/// under the lane lock inside the drain-loop-exit block; the Drop
/// impl is a no-op on the happy path.
///
/// # Narrow race on the unwind path
///
/// If `handle_changes` unwinds abnormally, the Drop impl stores
/// `rebuild_in_flight = false` WITHOUT re-acquiring the lane. There
/// is a narrow window between the unwind and the Drop store during
/// which a concurrent caller can:
///
/// 1. Lock the lane (freed by the unwinding guard's tokio-Mutex drop).
/// 2. Coalesce incoming with whatever was in the lane.
/// 3. Observe `rebuild_in_flight = true` (we have not stored `false`
///    yet).
/// 4. Park coalesced in the lane and return `Ok(())`.
///
/// After the sentinel's Drop stores `false`, that parked
/// `PendingRebuild` sits without a runner until the NEXT dispatch
/// arrives — which will see `rebuild_in_flight = false`, take the
/// runner role, drain the stranded pending, and process it.
///
/// This is accepted as defense-in-depth: the only realistic trigger
/// for a `handle_changes` unwind is a runtime-level failure (OOM,
/// tokio internal panic) in which case the daemon is already in
/// damage-control territory. Plugin panics during the rebuild
/// pipeline are caught by `spawn_blocking` inside
/// [`RebuildDispatcher::execute_rebuild`] and mapped to
/// [`DaemonError::WorkspaceBuildFailed`], which flows through the
/// drain loop as `last_result` — NOT as an unwind.
struct DrainLoopSentinel {
    /// Shared workspace ref so the `Drop` impl outlives any borrow.
    ws: Arc<LoadedWorkspace>,
    /// `true` while the runner role is held. Disarmed (`false`) after
    /// the normal-path under-lane release in the drain loop's exit
    /// blocks, making the Drop impl a no-op.
    armed: bool,
}
2103
2104impl Drop for DrainLoopSentinel {
2105 fn drop(&mut self) {
2106 if !self.armed {
2107 return;
2108 }
2109 tracing::error!(
2110 target: "sqry_daemon::rebuild",
2111 workspace = %self.ws.key.source_root.display(),
2112 "handle_changes unwound with armed DrainLoopSentinel — \
2113 releasing rebuild_in_flight defensively; any parked \
2114 PendingRebuild will be processed on the next dispatch"
2115 );
2116 self.ws.rebuild_in_flight.store(false, Ordering::Release);
2117 }
2118}
2119
2120// ---------------------------------------------------------------------------
2121// Inline unit tests — narrow helpers only.
2122// ---------------------------------------------------------------------------
2123//
2124// The exhaustive decision-fork / coalesce-algebra / integration
2125// matrices live in the `tests/rebuild_*` binaries. These inline
2126// tests pin down the private helpers (`RebuildMode` encoding,
2127// `merge_git_class`, `DrainLoopSentinel` Drop semantics) that the
2128// external binaries exercise indirectly.
2129#[cfg(test)]
2130mod tests {
2131 use super::*;
2132
2133 #[test]
2134 fn rebuild_mode_u8_roundtrip() {
2135 for mode in [RebuildMode::Full, RebuildMode::Incremental] {
2136 let encoded = mode.as_u8();
2137 assert_eq!(RebuildMode::from_u8(encoded), Some(mode));
2138 }
2139 // Unset (fresh AtomicU8) → None.
2140 assert_eq!(RebuildMode::from_u8(0), None);
2141 // Out-of-range → None.
2142 assert_eq!(RebuildMode::from_u8(3), None);
2143 assert_eq!(RebuildMode::from_u8(255), None);
2144 }
2145
2146 #[test]
2147 fn merge_git_class_full_rebuild_dominance_canonicalises_to_tree_diverged() {
2148 for full_variant in [GitChangeClass::BranchSwitch, GitChangeClass::TreeDiverged] {
2149 for non_full in [
2150 None,
2151 Some(GitChangeClass::LocalCommit),
2152 Some(GitChangeClass::Noise),
2153 ] {
2154 assert_eq!(
2155 merge_git_class(Some(full_variant), non_full),
2156 Some(GitChangeClass::TreeDiverged),
2157 );
2158 assert_eq!(
2159 merge_git_class(non_full, Some(full_variant)),
2160 Some(GitChangeClass::TreeDiverged),
2161 );
2162 }
2163 }
2164 }
2165
2166 #[test]
2167 fn merge_git_class_non_full_later_wins() {
2168 assert_eq!(
2169 merge_git_class(
2170 Some(GitChangeClass::LocalCommit),
2171 Some(GitChangeClass::Noise)
2172 ),
2173 Some(GitChangeClass::Noise),
2174 );
2175 assert_eq!(
2176 merge_git_class(
2177 Some(GitChangeClass::Noise),
2178 Some(GitChangeClass::LocalCommit)
2179 ),
2180 Some(GitChangeClass::LocalCommit),
2181 );
2182 }
2183
2184 #[test]
2185 fn merge_git_class_absorbs_none_symmetrically() {
2186 assert_eq!(merge_git_class(None, None), None);
2187 assert_eq!(
2188 merge_git_class(None, Some(GitChangeClass::Noise)),
2189 Some(GitChangeClass::Noise),
2190 );
2191 assert_eq!(
2192 merge_git_class(Some(GitChangeClass::LocalCommit), None),
2193 Some(GitChangeClass::LocalCommit),
2194 );
2195 }
2196
2197 // ---------------------------------------------------------------
2198 // DrainLoopSentinel (Task 7 Phase 7b1)
2199 // ---------------------------------------------------------------
2200
2201 fn make_sentinel_workspace() -> Arc<LoadedWorkspace> {
2202 use sqry_core::project::ProjectRootMode;
2203 Arc::new(LoadedWorkspace::new(
2204 WorkspaceKey::new(
2205 std::path::PathBuf::from("/repos/sentinel-test"),
2206 ProjectRootMode::GitRoot,
2207 0xBEEF,
2208 ),
2209 false,
2210 ))
2211 }
2212
2213 #[test]
2214 fn drain_loop_sentinel_disarmed_is_noop() {
2215 // Normal path: the drain loop disarms the sentinel after the
2216 // under-lane release. Dropping a disarmed sentinel must NOT
2217 // touch `rebuild_in_flight` — the release already happened.
2218 let ws = make_sentinel_workspace();
2219 // Simulate the drain loop having already released the flag.
2220 ws.rebuild_in_flight.store(false, Ordering::Release);
2221 {
2222 let sentinel = DrainLoopSentinel {
2223 ws: Arc::clone(&ws),
2224 armed: false,
2225 };
2226 // sentinel dropped here — disarmed, Drop is a no-op.
2227 drop(sentinel);
2228 }
2229 assert!(
2230 !ws.rebuild_in_flight.load(Ordering::Acquire),
2231 "disarmed sentinel must not flip the flag"
2232 );
2233 }
2234
2235 #[test]
2236 fn drain_loop_sentinel_armed_releases_in_flight_on_drop() {
2237 // Panic/unwind path: the sentinel is still armed when dropped.
2238 // Its Drop impl must release `rebuild_in_flight` so a future
2239 // caller can take the runner role.
2240 let ws = make_sentinel_workspace();
2241 ws.rebuild_in_flight.store(true, Ordering::Release);
2242 {
2243 let sentinel = DrainLoopSentinel {
2244 ws: Arc::clone(&ws),
2245 armed: true,
2246 };
2247 drop(sentinel);
2248 }
2249 assert!(
2250 !ws.rebuild_in_flight.load(Ordering::Acquire),
2251 "armed sentinel Drop must release rebuild_in_flight"
2252 );
2253 }
2254
2255 // ---------------------------------------------------------------
2256 // gate_check — TestGate plumbing (Task 7 Phase 7b2)
2257 // ---------------------------------------------------------------
2258 //
2259 // These tests stand up a RebuildDispatcher with no workspace and
2260 // exercise the gate_check helper in isolation. The dispatcher's
2261 // WorkspaceManager / PluginManager fields are initialised but
2262 // unused — gate_check only reads `self.test_gate`.
2263
2264 fn make_dispatcher_for_gate_test() -> Arc<RebuildDispatcher> {
2265 let config = Arc::new(crate::config::DaemonConfig::default());
2266 let manager = crate::workspace::WorkspaceManager::new_without_reaper(Arc::clone(&config));
2267 let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
2268 RebuildDispatcher::new(manager, config, plugins)
2269 }
2270
2271 #[tokio::test]
2272 async fn gate_check_is_noop_when_no_gate_installed() {
2273 // Production fast path: `test_gate.get()` returns `None`, the
2274 // helper short-circuits before allocating a `notified()`
2275 // future, and execute_one_rebuild proceeds immediately.
2276 let dispatcher = make_dispatcher_for_gate_test();
2277 // Call gate_check; it must return without awaiting anything.
2278 tokio::time::timeout(Duration::from_millis(50), dispatcher.gate_check())
2279 .await
2280 .expect("gate_check with no installed gate must return immediately");
2281 }
2282
2283 #[tokio::test]
2284 async fn gate_check_blocks_then_decrements_hold_on_release() {
2285 // Install a gate with hold=1. The first gate_check blocks
2286 // until notify_one is fired; after decrementing, subsequent
2287 // gate_checks pass through immediately because hold==0.
2288 let dispatcher = make_dispatcher_for_gate_test();
2289 let gate = Arc::new(TestGate {
2290 hold: AtomicUsize::new(1),
2291 release: tokio::sync::Notify::new(),
2292 });
2293 dispatcher
2294 .install_test_gate(Arc::clone(&gate))
2295 .expect("first install must succeed");
2296
2297 // Spawn a task that will block in gate_check.
2298 let dispatcher_for_task = Arc::clone(&dispatcher);
2299 let blocked = tokio::spawn(async move { dispatcher_for_task.gate_check().await });
2300
2301 // Give the task a moment to enter gate_check's await.
2302 tokio::time::sleep(Duration::from_millis(25)).await;
2303 assert!(
2304 !blocked.is_finished(),
2305 "gate_check must block while hold > 0"
2306 );
2307
2308 // Release. The blocked task wakes and decrements hold.
2309 gate.release.notify_one();
2310 tokio::time::timeout(Duration::from_millis(500), blocked)
2311 .await
2312 .expect("gate_check must complete promptly after release")
2313 .expect("task panicked");
2314
2315 // hold must have been decremented to 0.
2316 assert_eq!(
2317 gate.hold.load(Ordering::Acquire),
2318 0,
2319 "gate release must decrement hold"
2320 );
2321
2322 // Subsequent gate_check is a no-op (hold==0 short-circuit).
2323 tokio::time::timeout(Duration::from_millis(50), dispatcher.gate_check())
2324 .await
2325 .expect("gate_check with hold==0 must return immediately without awaiting");
2326 }
2327
2328 // ─────────────────────────────────────────────────────────────────
2329 // Cluster-G iter-3 BLOCKER 3 — `record_and_transition_on_err`
2330 // differentiates eviction from in-iteration `daemon reset`
2331 // cancellation.
2332 // ─────────────────────────────────────────────────────────────────
2333
2334 /// Eviction path: state was already written to `Evicted` by
2335 /// `execute_eviction` under `workspaces.write()` BEFORE the
2336 /// rebuild_cancelled flag was flipped. The runner observes
2337 /// `WorkspaceEvicted`, calls `record_and_transition_on_err`, and
2338 /// the helper must NOT clobber the Evicted state.
2339 #[test]
2340 fn record_and_transition_on_err_preserves_evicted_state() {
2341 let dispatcher = make_dispatcher_for_gate_test();
2342 let ws = Arc::new(LoadedWorkspace::new(
2343 crate::workspace::state::WorkspaceKey::new(
2344 std::path::PathBuf::from("/repo"),
2345 sqry_core::project::ProjectRootMode::GitRoot,
2346 0x1,
2347 ),
2348 false,
2349 ));
2350 ws.store_state(crate::workspace::state::WorkspaceState::Evicted);
2351 let err = DaemonError::WorkspaceEvicted {
2352 root: std::path::PathBuf::from("/repo"),
2353 };
2354 dispatcher.record_and_transition_on_err(&ws, &err);
2355 assert_eq!(
2356 ws.load_state(),
2357 crate::workspace::state::WorkspaceState::Evicted,
2358 "eviction-path WorkspaceEvicted must NOT transition state"
2359 );
2360 }
2361
2362 /// `daemon reset` path (cluster-G iter-3 BLOCKER 3 fix): reset
2363 /// fired AFTER the runner's top-of-loop gate but BEFORE the
2364 /// publish recheck. The runner converts the cancellation into
2365 /// `WorkspaceEvicted` while state is still `Rebuilding` (no
2366 /// eviction wrote `Evicted` because no eviction actually
2367 /// happened — `WorkspaceManager::reset`'s `Rebuilding` arm only
2368 /// flipped `rebuild_cancelled` and returned
2369 /// `ResetCancellationDispatched`). The helper MUST transition to
2370 /// `Unloaded` so the next `daemon load` recovers the workspace
2371 /// (without this fix, the workspace stays stuck in `Rebuilding`
2372 /// until `daemon stop && daemon start`).
2373 #[test]
2374 fn record_and_transition_on_err_unloads_reset_in_iteration_cancel() {
2375 let dispatcher = make_dispatcher_for_gate_test();
2376 let ws = Arc::new(LoadedWorkspace::new(
2377 crate::workspace::state::WorkspaceKey::new(
2378 std::path::PathBuf::from("/repo"),
2379 sqry_core::project::ProjectRootMode::GitRoot,
2380 0x1,
2381 ),
2382 false,
2383 ));
2384 // Runner is mid-iteration: state is `Rebuilding` (the
2385 // distinguisher between eviction and reset).
2386 ws.store_state(crate::workspace::state::WorkspaceState::Rebuilding);
2387 let err = DaemonError::WorkspaceEvicted {
2388 root: std::path::PathBuf::from("/repo"),
2389 };
2390 dispatcher.record_and_transition_on_err(&ws, &err);
2391 assert_eq!(
2392 ws.load_state(),
2393 crate::workspace::state::WorkspaceState::Unloaded,
2394 "in-iteration reset cancellation must transition Rebuilding → Unloaded; \
2395 without this, daemon reset → daemon load cannot recover"
2396 );
2397 }
2398
2399 /// Sanity: any other `DaemonError` (e.g. `WorkspaceOversize`,
2400 /// `Internal`) → `Failed` regardless of starting state.
2401 #[test]
2402 fn record_and_transition_on_err_failed_for_non_eviction_errors() {
2403 let dispatcher = make_dispatcher_for_gate_test();
2404 let ws = Arc::new(LoadedWorkspace::new(
2405 crate::workspace::state::WorkspaceKey::new(
2406 std::path::PathBuf::from("/repo"),
2407 sqry_core::project::ProjectRootMode::GitRoot,
2408 0x1,
2409 ),
2410 false,
2411 ));
2412 ws.store_state(crate::workspace::state::WorkspaceState::Rebuilding);
2413 let err = DaemonError::Internal(anyhow::anyhow!("plugin panic"));
2414 dispatcher.record_and_transition_on_err(&ws, &err);
2415 assert_eq!(
2416 ws.load_state(),
2417 crate::workspace::state::WorkspaceState::Failed,
2418 "non-eviction errors must transition to Failed"
2419 );
2420 }
2421}