
sqry_daemon/workspace/manager.rs

1//! [`WorkspaceManager`] — admission accounting entry points.
2//!
3//! Covers Task 6 Steps 3 / 4 / 4a / 4b / 4c / 4d of the sqryd plan
4//! (Amendment 2 §G.1–§G.7). This file lands the admission-accounting
5//! half of the manager — `reserve_rebuild`, `publish_and_retain`,
6//! `RollbackGuard`, and the retention reaper. Workspace lifecycle
7//! (`get_or_load`, `evict_lru`, `unload`, `status`, Failed-state
8//! handling) lands in Phase 6b.
9//!
10//! ## Lock order (authoritative — referenced by §J.4)
11//!
12//! All code paths that acquire more than one lock MUST follow this
13//! total order; acquiring out of order is a bug enforced by code
14//! review.
15//!
16//! 1. `WorkspaceManager.workspaces: RwLock<HashMap<...>>`
17//! 2. `LoadedWorkspace.rebuild_lane: tokio::sync::Mutex<_>` *(Task 7)*
18//! 3. `WorkspaceManager.admission: parking_lot::Mutex<AdmissionState>`
19//!
20//! `WorkspaceManager.hook: RwLock<SharedHook>` is a disjoint
21//! sibling — it is NEVER acquired while any of the three locks
22//! above are held. In particular, the post-publish hook dispatch
23//! (`hook_snapshot` + `SqrydHook::on_publish`) is fired from
24//! `get_or_load` AFTER dropping `workspaces_guard` so the hook
25//! dispatch, and any re-entrant manager method a hook impl might
26//! call, cannot deadlock against the loader that fired it
27//! (Codex Task 6 Phase 6c iter-2 MAJOR).
28//!
29//! Rules:
30//! - A holder of `admission` may NOT acquire `rebuild_lane` or
31//!   `workspaces` — it is the innermost lock.
32//! - A holder of `rebuild_lane` may NOT acquire `workspaces`.
33//!   `rebuild_lane` is used only for scheduling/coalescing pending
34//!   rebuilds; it is never held across a call that would nest
35//!   `workspaces` or `admission` acquisition beneath it.
36//! - A holder of `workspaces` (reader or writer) may NOT acquire
37//!   `hook`. Hook dispatch happens only after every outer
38//!   workspaces-lock holder has released.
39//! - Eviction holds `workspaces.write()` across the whole eviction,
40//!   acquiring `admission` nested inside it (order 1 → 3) to update
41//!   accounting. Eviction never takes `rebuild_lane`.
42//! - The retention reaper acquires only `admission`.
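//!
//! Illustrative compliant multi-lock path (a sketch, not a real code
//! path — field access is abbreviated for brevity):
//!
//! ```rust,ignore
//! let ws_guard = manager.workspaces.read(); // 1. outermost
//! // (Task 7's `rebuild_lane` would sit between 1 and 3.)
//! let adm = manager.admission.lock();       // 3. innermost
//! drop(adm);     // release the inner lock first
//! drop(ws_guard);
//! ```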
43
44use std::{
45    collections::HashMap,
46    path::Path,
47    sync::{
48        Arc, Weak,
49        atomic::{AtomicU64, Ordering},
50    },
51    time::{Duration, Instant, SystemTime},
52};
53
54use parking_lot::{Mutex, RwLock};
55use sqry_core::graph::{CodeGraph, unified::GraphMemorySize};
56use tokio::task::JoinHandle;
57use tracing::warn;
58
59use crate::{config::DaemonConfig, error::DaemonError};
60
61use super::{
62    admission::{AdmissionState, RetainedEntry},
63    builder::WorkspaceBuilder,
64    hook::{NoOpHook, SharedHook, SqrydHook},
65    loaded::LoadedWorkspace,
66    staleness::{StalenessVerdict, classify_staleness},
67    state::{OldGraphToken, WorkspaceKey, WorkspaceState},
68    status::{DaemonStatus, MemoryStatus, WorkspaceStatus},
69};
70
71// ---------------------------------------------------------------------------
72// ServeVerdict
73// ---------------------------------------------------------------------------
74
75/// Outcome of [`WorkspaceManager::classify_for_serve`].
76///
77/// Task 7 Phase 7c. Rich-enum return so the IPC router (Task 8) can
78/// decide how to shape its response without re-classifying.
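///
/// Hypothetical consumer-side match (a sketch — the `respond_*`
/// helpers are assumed, not part of this crate):
///
/// ```rust,ignore
/// match verdict {
///     ServeVerdict::Fresh { graph, state } => respond_ok(graph, state),
///     ServeVerdict::Stale { graph, age_hours, .. } => respond_stale(graph, age_hours),
///     ServeVerdict::NotReady { state } => respond_retry_later(state),
/// }
/// ```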
79#[derive(Debug, Clone)]
80pub enum ServeVerdict {
81    /// Workspace is healthy; serve from `graph`. Wraps an `Arc` — the
82    /// caller holds a strong reference until it is dropped, independent
83    /// of any subsequent publish or eviction.
84    Fresh {
85        graph: Arc<CodeGraph>,
86        /// Observed workspace state at classification time — either
87        /// [`WorkspaceState::Loaded`] or [`WorkspaceState::Rebuilding`].
88        /// Task 7's envelope populates `meta.workspace_state` from this
89        /// field so clients can tell which flavour of Fresh they
90        /// received (a freshly-loaded snapshot vs. one whose successor
91        /// rebuild is already in flight).
92        state: WorkspaceState,
93    },
94    /// Workspace is in `Failed` state but within the
95    /// `stale_serve_max_age_hours` cap. Serve from `graph` with
96    /// `meta.stale = true` and `age_hours` in the response envelope.
97    Stale {
98        graph: Arc<CodeGraph>,
99        age_hours: u64,
100        /// Timestamp of the last successful build. Task 7 renders this
101        /// into the `_stale_warning` string as RFC3339 / UTC-Zulu.
102        last_good_at: SystemTime,
103        /// Textual diagnostic from the most recent failed build, if any.
104        /// `None` when the workspace has been Failed since the last good
105        /// build but no error text was captured.
106        last_error: Option<String>,
107    },
108    /// Workspace exists in the manager map but is not yet ready to
109    /// serve (`Unloaded` or `Loading`). The IPC router decides what to
110    /// do next (retry-after-delay, enqueue, surface a client-appropriate
111    /// code) — the manager does not prescribe a retry policy.
112    NotReady { state: WorkspaceState },
113}
114
115// ---------------------------------------------------------------------------
116// WorkspaceManager
117// ---------------------------------------------------------------------------
118
119/// Owns every loaded workspace plus the admission-accounting state.
120///
121/// Construction spawns the retention reaper task (§G.3). The handle is
122/// stored so `Drop` can abort it cleanly — on daemon shutdown the
123/// reaper is aborted, then the admission state drops, dropping every
124/// retained `Arc<CodeGraph>` in one pass. No accounting leak, no
125/// dangling `Arc`.
126#[derive(Debug)]
127pub struct WorkspaceManager {
128    /// Immutable daemon configuration — used for the memory budget,
129    /// the reaper interval, and the drain-timeout warning threshold.
130    config: Arc<DaemonConfig>,
131
132    /// Per-workspace state, keyed by [`WorkspaceKey`]. `RwLock` so
133    /// the read-only status path contends only with infrequent
134    /// insert / remove writers.
135    workspaces: RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>>,
136
137    /// Single-mutex admission accounting — see [`AdmissionState`]
138    /// module docs for the §G.5 invariant.
139    admission: Mutex<AdmissionState>,
140
141    /// Join handle of the spawned retention reaper. `Option` so
142    /// `Drop` can `.take().abort()` without requiring `&mut self`.
143    reaper: Mutex<Option<JoinHandle<()>>>,
144
145    /// Instant captured at construction. `daemon/status` reports
146    /// `uptime_seconds` = `Instant::now() - started_at`.
147    started_at: Instant,
148
149    /// Monotonic peak of `AdmissionState::total_committed_bytes`
150    /// observed across the daemon's uptime. Updated via `fetch_max`
151    /// on every admission-mutating operation. Amendment 2 §D.
152    total_memory_high_water: AtomicU64,
153
154    /// Post-publish persistence hook. Defaults to a no-op; Task 9's
155    /// daemon binary installs the production `QueryDbHook` that
156    /// wraps `sqry_db::persistence::save_derived`. Swapped via
157    /// [`Self::set_hook`] at daemon boot after the `QueryDb` is
158    /// constructed.
159    ///
160    /// `RwLock` rather than `ArcSwap` because `SharedHook = Arc<dyn
161    /// Trait + Send + Sync>` is cheap to clone inside the read
162    /// critical section, and the hook is only consulted on publish
163    /// (not on every query) — the RwLock is never a hot path.
164    hook: RwLock<SharedHook>,
165}
166
167impl WorkspaceManager {
168    /// Construct a fresh manager and spawn the retention reaper.
169    ///
170    /// The reaper is spawned on the current Tokio runtime. Callers
171    /// must therefore construct the manager from a Tokio context
172    /// (`#[tokio::main]`, an `async` block driven by `Runtime::block_on`,
173    /// etc.). Tests that don't need the reaper can use
174    /// [`Self::new_without_reaper`].
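    ///
    /// Construction sketch (assumes a surrounding Tokio context and an
    /// already-built `config: Arc<DaemonConfig>`):
    ///
    /// ```rust,ignore
    /// let manager = WorkspaceManager::new(Arc::clone(&config));
    /// // Dropping the last Arc lets Drop abort the reaper cleanly.
    /// ```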
175    pub fn new(config: Arc<DaemonConfig>) -> Arc<Self> {
176        let mgr = Arc::new(Self {
177            config: Arc::clone(&config),
178            workspaces: RwLock::new(HashMap::new()),
179            admission: Mutex::new(AdmissionState::default()),
180            reaper: Mutex::new(None),
181            started_at: Instant::now(),
182            total_memory_high_water: AtomicU64::new(0),
183            hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
184        });
185        let handle = tokio::spawn(retention_reaper(Arc::downgrade(&mgr)));
186        *mgr.reaper.lock() = Some(handle);
187        mgr
188    }
189
190    /// Like [`Self::new`] but does not spawn the reaper — useful in
191    /// unit tests that drive the retention map synchronously via
192    /// [`Self::reap_once`].
193    #[doc(hidden)]
194    pub fn new_without_reaper(config: Arc<DaemonConfig>) -> Arc<Self> {
195        Arc::new(Self {
196            config,
197            workspaces: RwLock::new(HashMap::new()),
198            admission: Mutex::new(AdmissionState::default()),
199            reaper: Mutex::new(None),
200            started_at: Instant::now(),
201            total_memory_high_water: AtomicU64::new(0),
202            hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
203        })
204    }
205
206    /// Install a post-publish hook. Task 9's daemon binary calls
207    /// this once at startup after constructing the shared
208    /// `QueryDb`; unit tests call it to install a recording hook.
209    /// The old hook is dropped immediately; no retention semantics
210    /// apply.
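    ///
    /// Installation sketch (`RecordingHook` is an assumed test type
    /// implementing [`SqrydHook`]):
    ///
    /// ```rust,ignore
    /// let hook: SharedHook = Arc::new(RecordingHook::default());
    /// manager.set_hook(hook);
    /// ```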
211    pub fn set_hook(&self, hook: SharedHook) {
212        *self.hook.write() = hook;
213    }
214
215    /// Snapshot the currently installed hook. Internal — used by
216    /// `get_or_load` (Phase 6c iter-2) after dropping the
217    /// `workspaces.read()` guard so the `on_publish` dispatch
218    /// never nests under `workspaces`. Taking the hook under its
219    /// own short read-lock avoids holding the lock across the
220    /// dispatch so a misbehaving hook cannot block a concurrent
221    /// `set_hook` swap.
222    fn hook_snapshot(&self) -> SharedHook {
223        Arc::clone(&*self.hook.read())
224    }
225
226    /// Memory budget in bytes (derived from `config.memory_limit_mb`).
227    #[must_use]
228    pub fn memory_limit_bytes(&self) -> u64 {
229        self.config.memory_limit_bytes()
230    }
231
232    /// Access to the workspace registry (read-only view).
233    ///
234    /// Intentionally `pub(crate)` and `#[allow(dead_code)]` in Phase 6a:
235    /// Phase 6b consumers (`get_or_load`, `evict_lru`, `status`) are the
236    /// first real callers. Keeping the accessor here documents the
237    /// intended visibility boundary rather than forcing later code to
238    /// reach into the field directly.
239    #[allow(dead_code)]
240    pub(crate) fn workspaces(&self) -> &RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
241        &self.workspaces
242    }
243
244    /// Access to the admission mutex (internal). See
245    /// [`Self::workspaces`] for the `#[allow(dead_code)]` rationale.
246    #[allow(dead_code)]
247    pub(crate) fn admission(&self) -> &Mutex<AdmissionState> {
248        &self.admission
249    }
250
251    /// Look up a loaded workspace by key without acquiring `rebuild_lane`
252    /// or `admission`.
253    ///
254    /// Returns `Some(Arc<LoadedWorkspace>)` if a workspace is currently
255    /// registered under `key`, or `None` otherwise. The `workspaces`
256    /// read guard is acquired and released inside the call — callers
257    /// never observe it nested with any other lock.
258    ///
259    /// Added for the Task 7 [`crate::rebuild::RebuildDispatcher`], which
260    /// needs a cheap handle on `Arc<LoadedWorkspace>` as a precondition
261    /// before entering the canonical §J.4 ordered sequence
262    /// (`rebuild_lane` → `admission`). This is *not* part of the
263    /// ordered sequence itself — the §J.4 contract only constrains
264    /// paths that hold more than one lock simultaneously. Here, the
265    /// `workspaces` guard is dropped before the caller takes
266    /// `rebuild_lane`, so there is no nesting.
267    ///
268    /// Callers: `RebuildDispatcher::handle_changes` (inside the crate)
269    /// and external test harnesses (Task 7 Phase 7b1
270    /// `rebuild_runner_gate.rs`) that need to inspect workspace-level
271    /// atomics (`rebuild_in_flight`, `rebuild_cancelled`) or the
272    /// `rebuild_lane` mutex directly.
273    ///
274    /// This is NOT a JSON-RPC surface — the IPC layer should use
275    /// `status()` for point-in-time workspace state. Direct `lookup`
276    /// access bypasses the LRU touch that `status()` performs.
277    #[allow(dead_code)] // Consumed by rebuild.rs once Task 7 `rebuild` module lands.
278    pub fn lookup(&self, key: &WorkspaceKey) -> Option<Arc<LoadedWorkspace>> {
279        self.workspaces.read().get(key).cloned()
280    }
281
282    /// Retention reaper: a single pass over `retained_old`.
283    ///
284    /// Removes entries whose `Arc::strong_count` has dropped to 1 —
285    /// meaning the admission map is the last holder. Emits a
286    /// one-shot WARN log line when an entry exceeds
287    /// `rebuild_drain_timeout_ms` without dropping.
288    ///
289    /// **This is the only code path that removes tokens from
290    /// `retained_old`.** Any other code that mutates the retention
291    /// map is a violation of §G.3.
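    ///
    /// Synchronous test-side sketch (assumes a manager built with
    /// [`Self::new_without_reaper`] and a drained retained entry):
    ///
    /// ```rust,ignore
    /// let mgr = WorkspaceManager::new_without_reaper(config);
    /// // … publish a rebuild, then drop the last slow-query Arc …
    /// mgr.reap_once(); // entries with strong_count == 1 are freed now
    /// ```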
292    pub fn reap_once(&self) {
293        let timeout = Duration::from_millis(self.config.rebuild_drain_timeout_ms);
294        let now = Instant::now();
295        let mut to_log: Vec<OldGraphToken> = Vec::new();
296        {
297            let mut state = self.admission.lock();
298            state.retained_old.retain(|token, entry| {
299                if Arc::strong_count(&entry.graph) == 1 {
300                    false // Last holder: drop entry + Arc together.
301                } else {
302                    if !entry.warned_past_timeout
303                        && now.saturating_duration_since(entry.published_at) > timeout
304                    {
305                        entry.warned_past_timeout = true;
306                        to_log.push(*token);
307                    }
308                    true
309                }
310            });
311        }
312        for token in to_log {
313            warn!(
314                token = %token,
315                drain_timeout_ms = self.config.rebuild_drain_timeout_ms,
316                "sqryd retention reaper: retained old graph still held past drain timeout \
317                 (not an accounting deadline — bytes stay accounted until strong_count == 1)",
318            );
319        }
320    }
321
322    /// Amendment 2 §G.1 three-phase reservation protocol.
323    ///
324    /// ```text
325    /// Phase 1 (workspaces read + admission lock):
326    ///     total_committed + estimate ≤ limit?  → no eviction needed
327    ///     otherwise                            → pick LRU non-pinned
328    ///                                            victims (`for_key` is
329    ///                                            exempt — a workspace
330    ///                                            cannot evict itself)
331    /// Phase 2 (no locks held):
332    ///     for each victim: execute_eviction()
333    /// Phase 3 (admission alone):
334    ///     re-check projected vs limit          → authoritative commit
335    ///     reserved_bytes += estimate           → return RebuildReservation
336    /// ```
337    ///
338    /// Lock order is `workspaces → admission` in Phase 1, nothing in
339    /// Phase 2, `admission` alone in Phase 3. No nesting of
340    /// `rebuild_lane` — Task 7 adds that layer outside this function.
341    ///
342    /// Returns a [`RebuildReservation`] RAII guard on success. On
343    /// `Err`, the admission state is exactly pre-call — either no
344    /// eviction happened (headroom already available) or the
345    /// eviction cleared retained entries but could not fit.
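    ///
    /// RAII sketch (the estimate figure is an arbitrary placeholder):
    ///
    /// ```rust,ignore
    /// let reservation = manager.reserve_rebuild(&key, 32 * 1024 * 1024)?;
    /// // … build the graph …
    /// // Dropping without publishing refunds `reserved_bytes`:
    /// drop(reservation);
    /// ```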
346    pub fn reserve_rebuild(
347        self: &Arc<Self>,
348        for_key: &WorkspaceKey,
349        working_set_estimate: u64,
350    ) -> Result<RebuildReservation, DaemonError> {
351        let limit = self.memory_limit_bytes();
352
353        // --- Phase 1: peek + plan (holds workspaces → admission) ---
354        //
355        // Task 7 Phase 7b1 tightening: reject if the requester has been
356        // evicted or removed between dispatch and reservation. Both the
357        // membership check and the `rebuild_cancelled` read happen under
358        // the Phase-1 `workspaces.read()` so they serialise against
359        // `execute_eviction`'s `workspaces.write()` (held across both
360        // `rebuild_cancelled.store(true)` and the `Evicted` state store).
361        //
362        // Post-serialisation snapshot: the reader sees EITHER pre-eviction
363        // state (`Some(ws)` with `cancelled == false`) OR post-eviction
364        // state (`None` OR `cancelled == true`). Keeping both checks is
365        // belt-and-suspenders against any future eviction-protocol change
366        // that could reorder the two mutations.
367        let victims = {
368            let workspaces = self.workspaces.read();
369
370            let Some(requester_ws) = workspaces.get(for_key) else {
371                return Err(DaemonError::WorkspaceEvicted {
372                    root: for_key.source_root.clone(),
373                });
374            };
375            if requester_ws.rebuild_cancelled.load(Ordering::Acquire) {
376                return Err(DaemonError::WorkspaceEvicted {
377                    root: for_key.source_root.clone(),
378                });
379            }
380
381            let state = self.admission.lock();
382            let projected = state
383                .total_committed_bytes()
384                .saturating_add(working_set_estimate);
385            if projected <= limit {
386                Vec::new() // no victim selection needed
387            } else {
388                let need = projected - limit;
389                Self::plan_eviction(&workspaces, &state, need, for_key)
390            }
391            // Both guards drop here — Phase 2 runs with no locks.
392        };
393
394        // --- Phase 2: execute each eviction with no locks held ---
395        for key in &victims {
396            self.execute_eviction(key);
397        }
398
399        // --- Phase 2.5: opportunistic reap ----------------------
400        //
401        // `execute_eviction` moves the evicted workspace's bytes
402        // from `loaded_bytes` into `retained_old`. If no slow query
403        // still holds the evicted `Arc<CodeGraph>`, the retention
404        // reaper's next tick (25 ms) would free those bytes — but
405        // Phase 3's authoritative re-check runs *now*, before the
406        // reaper gets the chance. Run a synchronous reap pass so
407        // admission sees the free bytes immediately on the common
408        // case of "no outstanding slow queries". Slow-query-held
409        // entries stay retained and still count against the budget,
410        // which is correct per §G.5.
411        if !victims.is_empty() {
412            self.reap_once();
413        }
414
415        // --- Phase 3: authoritative commit (admission alone) ------
416        let mut state = self.admission.lock();
417        let projected = state
418            .total_committed_bytes()
419            .saturating_add(working_set_estimate);
420        if projected > limit {
421            return Err(DaemonError::MemoryBudgetExceeded {
422                limit_bytes: limit,
423                current_bytes: state.loaded_bytes,
424                reserved_bytes: state.reserved_bytes,
425                retained_bytes: state.retained_total_bytes(),
426                requested_bytes: working_set_estimate,
427            });
428        }
429        state.reserved_bytes = state.reserved_bytes.saturating_add(working_set_estimate);
430        self.bump_high_water(&state);
431        drop(state);
432
433        Ok(RebuildReservation {
434            manager: Arc::downgrade(self),
435            bytes: working_set_estimate,
436            released: false,
437        })
438    }
439
440    /// Phase-1 helper: pick the LRU-ordered set of non-pinned
441    /// workspace keys (excluding `for_key`) whose cumulative
442    /// `memory_bytes` meets or exceeds `need`.
443    ///
444    /// Returns keys in eviction order (oldest-first). Callers execute
445    /// evictions in Phase 2 without holding any lock.
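    ///
    /// Worked sketch (figures invented): with `need = 100` bytes and
    /// non-pinned candidates A (40 B, oldest), B (80 B), C (10 B,
    /// newest), the plan is `[A, B]` — cumulative 40, then 120 ≥ 100 —
    /// and C is never touched.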
446    fn plan_eviction(
447        workspaces: &HashMap<WorkspaceKey, Arc<LoadedWorkspace>>,
448        _state: &AdmissionState,
449        need: u64,
450        for_key: &WorkspaceKey,
451    ) -> Vec<WorkspaceKey> {
452        let mut candidates: Vec<(Instant, u64, WorkspaceKey)> = workspaces
453            .iter()
454            .filter(|(k, ws)| {
455                // Skip the requester (§G.7: a pinned workspace that
456                // exceeds the budget must fail, not evict itself) and
457                // every pinned workspace. Also skip workspaces in
458                // Evicted or Unloaded state — they have no bytes to
459                // reclaim and would be no-ops.
460                **k != *for_key
461                    && !ws.pinned
462                    && ws.load_state() != WorkspaceState::Evicted
463                    && ws.load_state() != WorkspaceState::Unloaded
464            })
465            .map(|(k, ws)| {
466                let last = *ws.last_accessed.read();
467                let bytes = ws.memory_bytes.load(Ordering::Acquire) as u64;
468                (last, bytes, k.clone())
469            })
470            .collect();
471        // Oldest last_accessed first.
472        candidates.sort_by_key(|(ts, _, _)| *ts);
473
474        let mut plan = Vec::new();
475        let mut reclaimed: u64 = 0;
476        for (_, bytes, key) in candidates {
477            if reclaimed >= need {
478                break;
479            }
480            plan.push(key);
481            reclaimed = reclaimed.saturating_add(bytes);
482        }
483        plan
484    }
485
486    /// Execute Phase-2 of an eviction.
487    ///
488    /// Steps, in order:
489    ///
490    /// 1. Swap the workspace's `ArcSwap<CodeGraph>` to an empty
491    ///    placeholder. This releases the old `Arc` from the
492    ///    `ArcSwap` itself — any outstanding slow-query `Arc`s
493    ///    still exist at the same strong count.
494    /// 2. Move those bytes from `loaded_bytes` into `retained_old`
495    ///    (under the admission mutex) — keying on a fresh
496    ///    [`OldGraphToken`]. This preserves the §G.5 invariant:
497    ///    bytes shift from the loaded tier to the retained tier
498    ///    rather than disappearing. The retention reaper frees the
499    ///    entry (and therefore the bytes) when `strong_count` drops
500    ///    to 1, i.e. when every slow query has released its `Arc`.
501    /// 3. Set `rebuild_cancelled = true` so any concurrent
502    ///    `get_or_load` / rebuild running against this workspace
503    ///    observes the signal at its next pass boundary and aborts
504    ///    without publishing.
505    /// 4. Mark the state `Evicted` — and **leave the entry in the
506    ///    manager map** as a tombstone. STEP_6 (workspace-aware-
507    ///    cross-repo, 2026-04-26): keeping the tombstone is what
508    ///    makes per-source-root partial eviction observable through
509    ///    `daemon/workspaceStatus`. The aggregate must report
510    ///    `state == Evicted` for individually-evicted source roots
511    ///    while siblings remain `Loaded`. Removing the entry would
512    ///    silently hide the eviction from the aggregate — exactly
513    ///    the codex iter-1 BLOCK item.
514    ///
515    /// The order is load-bearing: the cancellation flag is set
516    /// *before* the state transition so a concurrent loader that
517    /// re-checks `rebuild_cancelled` after its build (per
518    /// [`Self::get_or_load`]) sees the cancel.
519    ///
520    /// To **fully unload** a workspace (drop the tombstone too),
521    /// callers route through [`Self::unload`] / `daemon/unload`,
522    /// which calls this function and then explicitly removes the
523    /// map entry. LRU eviction (`evict_lru`, `reserve_rebuild`'s
524    /// Phase 2) keeps the tombstone; only an explicit user-driven
525    /// unload removes it.
526    ///
527    /// Codex Task 6 Phase 6b iter-1 MAJOR: the pre-fix version
528    /// dropped the evicted `Arc` at function end and subtracted
529    /// bytes from `loaded_bytes` without inserting a retained
530    /// entry — leaking accounting for any graph still held by a
531    /// slow query.
532    ///
533    /// Codex STEP_6 iter-1 BLOCK: the pre-fix version unconditionally
534    /// removed the entry from `self.workspaces` after marking it
535    /// `Evicted`, defeating partial-eviction reporting. The
536    /// remove-entry step now lives in [`Self::unload`] alone.
537    fn execute_eviction(&self, key: &WorkspaceKey) {
538        // Hold `workspaces.write()` across the ENTIRE eviction —
539        // from the initial lookup through the final state store —
540        // so no concurrent `get_or_load` post-build re-check can
541        // interleave with us. Loaders serialise against eviction
542        // by holding `workspaces.read()` across their own publish
543        // critical section (see `get_or_load` step 7+).
544        //
545        // Lock order is `workspaces → admission` per plan §J.4.
546        // We take `admission` INSIDE this write-lock in Step 2,
547        // which is the outermost-first order the contract
548        // requires.
549        //
550        // Codex Task 6 Phase 6b iter-2 MAJOR: the iter-1 version
551        // took `workspaces.read()` only briefly for the initial
552        // lookup, then dropped it — leaving a window where a
553        // concurrent load's post-build re-check could observe
554        // workspace-still-in-map / cancelled-still-false and then
555        // publish into an already-evicted workspace. Holding
556        // `workspaces.write()` across the full eviction closes
557        // that window.
558        let mut workspaces = self.workspaces.write();
559        // Steps 1–3 (ArcSwap, admission tier transfer, cancellation
560        // + state store) are factored into the shared helper so
561        // [`Self::unload`] can reuse them under a single
562        // workspaces.write() guard.
563        //
564        // Step 4 (DO NOT remove from `self.workspaces`) is implicit
565        // here — the entry stays in the map as a tombstone. The
566        // tombstone is what STEP_6 partial-eviction reporting
567        // depends on. `unload` (the explicit user-driven path)
568        // removes the entry separately after this function returns.
569        self.evict_to_tombstone_locked(&mut workspaces, key);
570        drop(workspaces);
571    }
572
573    /// Load the workspace's graph, building it via `builder` if not
574    /// already present.
575    ///
576    /// Lifecycle gate:
577    ///
578    /// 1. Cache-hit fast path — if the workspace is present AND in
579    ///    [`WorkspaceState::Loaded`], touch + return.
580    /// 2. CAS `Unloaded`/`Evicted`/`Failed` → `Loading`. Exactly one
581    ///    caller wins. If another caller already holds the gate
582    ///    (`Loading`/`Rebuilding`), return an error — Phase 6c /
583    ///    Task 7 will introduce a wait-for-done notify channel.
584    /// 3. The winner arms a [`LoadingGuard`] RAII wrapper that
585    ///    transitions the workspace into [`WorkspaceState::Failed`]
586    ///    on *any* non-success exit (`Err`, early `return`, or
587    ///    panic). This covers the Codex iter-1 MAJOR that a panic
588    ///    from `builder.build()` would leave the workspace stuck
589    ///    in Loading.
590    /// 4. Reserve admission headroom (§G.1 three-phase).
591    /// 5. Build the graph via the injected `builder`.
592    /// 6. Re-check `rebuild_cancelled` + workspace map membership
593    ///    before publishing. If eviction ran during the build, the
594    ///    reservation refunds via RAII and no graph is published.
595    /// 7. Publish via `publish_and_retain`. Disarm the LoadingGuard
596    ///    + record success + touch.
597    /// 8. Release `workspaces_guard`, THEN dispatch the
598    ///    post-publish `SqrydHook`. The hook fires outside every
599    ///    outer manager lock so a hook impl is free to call back
600    ///    into `unload` / `get_or_load` / `set_hook` / `status`
601    ///    without deadlocking against the loader that fired it.
602    ///
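    /// Typical call shape (a sketch — `builder` is any
    /// [`WorkspaceBuilder`] impl and the estimate figure is an
    /// arbitrary placeholder):
    ///
    /// ```rust,ignore
    /// let graph = manager.get_or_load(&key, &builder, 64 * 1024 * 1024)?;
    /// ```
    ///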
603    /// Codex Task 6 Phase 6b iter-1 MAJOR (×2): the pre-fix version
604    /// clobbered a concurrent eviction's `rebuild_cancelled` signal
605    /// and could publish into a workspace already removed from the
606    /// map. The CAS + post-build re-check + LoadingGuard together
607    /// close both holes.
608    ///
609    /// Codex Task 6 Phase 6c iter-2 MAJOR: the pre-fix version
610    /// dispatched the hook from inside `publish_and_retain` while
611    /// the caller still held `workspaces.read()`, giving a hook
612    /// impl that needed `workspaces.write()` (e.g. via `unload`)
613    /// a guaranteed re-entrancy deadlock. Splitting publish and
614    /// hook dispatch into Steps 7 and 8 closes that hole.
615    ///
616    /// # Errors
617    ///
618    /// - [`DaemonError::MemoryBudgetExceeded`] if Phase 3 cannot
619    ///   admit the reservation even after LRU eviction.
620    /// - [`DaemonError::WorkspaceBuildFailed`] surfaced from the
621    ///   builder OR synthesised when a concurrent eviction races
622    ///   the load (`reason = "workspace evicted mid-load"`).
623    pub fn get_or_load(
624        self: &Arc<Self>,
625        key: &WorkspaceKey,
626        builder: &dyn WorkspaceBuilder,
627        working_set_estimate: u64,
628    ) -> Result<Arc<CodeGraph>, DaemonError> {
629        // --- Step 1: cache-hit fast path ------------------------
630        {
631            let workspaces = self.workspaces.read();
632            if let Some(ws) = workspaces.get(key)
633                && ws.load_state() == WorkspaceState::Loaded
634            {
635                ws.touch();
636                return Ok(ws.graph.load_full());
637            }
638        }
639
640        // --- Step 2: take the lifecycle gate via state CAS ------
641        let ws = self.get_or_insert_workspace(key);
642        let allowed = [
643            WorkspaceState::Unloaded.as_u8(),
644            WorkspaceState::Failed.as_u8(),
645            WorkspaceState::Evicted.as_u8(),
646        ];
647        let mut acquired_from: Option<WorkspaceState> = None;
648        for prior in allowed {
649            if ws
650                .state
651                .compare_exchange(
652                    prior,
653                    WorkspaceState::Loading.as_u8(),
654                    Ordering::AcqRel,
655                    Ordering::Acquire,
656                )
657                .is_ok()
658            {
659                acquired_from = WorkspaceState::from_u8(prior);
660                break;
661            }
662        }
663        let Some(prior_state) = acquired_from else {
664            // Someone else already holds the gate (Loading /
665            // Rebuilding) OR raced us into Loaded. Cache-read and
666            // return if Loaded, else surface a transient error.
667            let current = ws.load_state();
668            if current == WorkspaceState::Loaded {
669                ws.touch();
670                return Ok(ws.graph.load_full());
671            }
672            return Err(DaemonError::WorkspaceBuildFailed {
673                root: key.source_root.clone(),
674                reason: format!("workspace load already in progress ({current})"),
675            });
676        };
677        // We own the gate. Clear the cancellation flag AFTER the
678        // CAS, but interpret a pre-cleared `cancelled = true`
679        // differently depending on the prior state we won from:
680        //
681        // - Prior = `Evicted`: STEP_6 iter-2. LRU eviction
682        //   completed on this entry (workspaces.write() was held
683        //   across both the `cancelled.store(true)` and the
684        //   `state.store(Evicted)` in `execute_eviction`). The
685        //   cancelled flag is a stale residue of that completed
686        //   eviction; this `get_or_load` is a fresh reload and
687        //   must clear cancelled unconditionally.
688        // - Prior = `Unloaded` / `Failed`: a concurrent eviction
689        //   is racing us. The flag is a live cancel signal — the
690        //   eviction reached `cancelled.store(true)` before our
691        //   CAS but the state had not yet been moved to
692        //   `Evicted`. Honour the cancel and fail this load.
693        let pre_cancelled = ws.rebuild_cancelled.swap(false, Ordering::AcqRel);
694        if pre_cancelled && prior_state != WorkspaceState::Evicted {
695            // Evict raced us out of the allowed-state list. Put
696            // the cancelled flag back, transition to Failed (so
697            // this caller's LoadingGuard doesn't fire), and fail.
698            ws.rebuild_cancelled.store(true, Ordering::Release);
699            ws.store_state(WorkspaceState::Failed);
700            return Err(DaemonError::WorkspaceBuildFailed {
701                root: key.source_root.clone(),
702                reason: "workspace evicted mid-load".to_string(),
703            });
704        }
705
706        // --- Step 3: arm LoadingGuard for panic / early-return --
707        let mut loading = LoadingGuard {
708            ws: &ws,
709            key,
710            armed: true,
711        };
712
713        // --- Step 4: reserve admission headroom ------------------
714        let reservation = self.reserve_rebuild(key, working_set_estimate)?;
715
716        // --- Step 5: build the graph ----------------------------
717        let graph = match builder.build(&key.source_root) {
718            Ok(g) => g,
719            Err(err) => {
720                drop(reservation);
721                // The LoadingGuard will flip us to Failed + record
722                // a synthetic error; overwrite with the builder's
723                // real error for diagnostic fidelity.
724                ws.record_failure(clone_err(&err));
725                loading.armed = false;
726                ws.store_state(WorkspaceState::Failed);
727                return Err(err);
728            }
729        };
730
731        // --- Step 6+7: atomic re-check + publish -------------
732        //
733        // Hold `workspaces.read()` across the final cancellation
734        // / map-membership re-check AND the `publish_and_retain`
735        // call. `execute_eviction` holds `workspaces.write()` for
736        // the duration of every eviction, so the RwLock makes the
737        // publish critical section atomic with respect to
738        // eviction: either eviction has fully completed (the map
739        // lookup fails), or eviction has not started (and cannot
740        // start while we hold the read lock).
741        //
742        // Lock order per plan §J.4: `workspaces → admission`.
743        // `publish_and_retain` takes `admission` internally;
744        // that nests under our `workspaces.read()` correctly.
745        //
746        // Codex Task 6 Phase 6b iter-2 MAJOR: the iter-1 version
747        // released `workspaces.read()` after the map-membership
748        // check and then called `publish_and_retain` unlocked.
749        // Eviction could slip in between the two, satisfying
750        // both re-checks yet still reaching `remove(key)` after
751        // our publish. Holding the read lock across the publish
752        // closes the window.
753        let workspaces_guard = self.workspaces.read();
754
755        // Cancellation check INSIDE the read lock. If cancellation
756        // was set before we grabbed the lock, we still observe it;
757        // if it's set after we release, a future load will see it.
758        if ws.rebuild_cancelled.load(Ordering::Acquire) {
759            drop(workspaces_guard);
760            drop(reservation);
761            ws.record_failure(DaemonError::WorkspaceBuildFailed {
762                root: key.source_root.clone(),
763                reason: "workspace evicted mid-load".to_string(),
764            });
765            loading.armed = false;
766            ws.store_state(WorkspaceState::Failed);
767            return Err(DaemonError::WorkspaceBuildFailed {
768                root: key.source_root.clone(),
769                reason: "workspace evicted mid-load".to_string(),
770            });
771        }
772        if !workspaces_guard.contains_key(key) {
773            drop(workspaces_guard);
774            drop(reservation);
775            ws.record_failure(DaemonError::WorkspaceBuildFailed {
776                root: key.source_root.clone(),
777                reason: "workspace removed mid-load".to_string(),
778            });
779            loading.armed = false;
780            ws.store_state(WorkspaceState::Failed);
781            return Err(DaemonError::WorkspaceBuildFailed {
782                root: key.source_root.clone(),
783                reason: "workspace removed mid-load".to_string(),
784            });
785        }
786
787        // Publish while still holding `workspaces.read()`. An
788        // eviction started in parallel is blocked on
789        // `workspaces.write()` and cannot observe / mutate this
790        // workspace until we release.
791        //
792        // Per Codex Task 6 Phase 6c iter-2 MAJOR: the hook dispatch
793        // is deliberately NOT performed inside `publish_and_retain`
794        // — firing it here would nest `self.hook.read()` under
795        // `workspaces.read()`, creating a re-entrancy deadlock for
796        // any hook impl that calls back into manager methods
797        // needing `workspaces.write()` (e.g. `unload`). The fix
798        // returns the published `Arc<CodeGraph>` from
799        // `publish_and_retain`, releases `workspaces_guard`, and
800        // THEN invokes `on_publish` under a disjoint short-lived
801        // `self.hook.read()` acquisition.
802        //
803        // `G_daemon_control_plane.md` §3.5 caller-migration table —
804        // get_or_load (production caller 1). On post-build oversize,
805        // surface `DaemonError::WorkspaceOversize`; admission bytes
806        // are refunded by the reservation's RAII Drop on early
807        // return.
808        let (_token, published_arc) = match self.publish_and_retain(reservation, &ws, graph) {
809            Ok((token, arc)) => (token, arc),
810            Err(e) => {
811                drop(workspaces_guard);
812                ws.record_failure(clone_err(&e));
813                loading.armed = false;
814                ws.store_state(WorkspaceState::Failed);
815                return Err(e);
816            }
817        };
818        ws.record_success(SystemTime::now());
819        ws.store_state(WorkspaceState::Loaded);
820        ws.touch();
821        loading.armed = false;
822        drop(workspaces_guard);
823
824        // Hook fires OUTSIDE every outer lock. The only lock taken
825        // here is `self.hook.read()` (for the brief clone inside
826        // `hook_snapshot`). A hook impl is now free to call any
827        // manager method — including `unload`, which needs
828        // `workspaces.write()` — without deadlocking against the
829        // loader that fired it. The dispatch itself is synchronous
830        // but spawn-only: hook impls are expected to return
831        // immediately after scheduling background work.
832        let hook = self.hook_snapshot();
833        hook.on_publish(&key.source_root, Arc::clone(&published_arc));
834
835        Ok(published_arc)
836    }
837
838    /// Look up or insert a [`LoadedWorkspace`] for `key`. Returns
839    /// the shared `Arc` so both the caller and the manager map
840    /// reference the same state.
841    fn get_or_insert_workspace(&self, key: &WorkspaceKey) -> Arc<LoadedWorkspace> {
842        // Upgrade path — try a read first to avoid the write-lock
843        // cost when the entry already exists.
844        if let Some(ws) = self.workspaces.read().get(key) {
845            return Arc::clone(ws);
846        }
847        let mut workspaces = self.workspaces.write();
848        Arc::clone(
849            workspaces
850                .entry(key.clone())
851                .or_insert_with(|| Arc::new(LoadedWorkspace::new(key.clone(), false))),
852        )
853    }
854
855    /// Evict the least-recently-accessed non-pinned workspace, if
856    /// any. Returns the evicted key on success, `None` if there are
857    /// no eligible candidates.
858    pub fn evict_lru(&self) -> Option<WorkspaceKey> {
859        let candidate = {
860            let workspaces = self.workspaces.read();
861            workspaces
862                .iter()
863                .filter(|(_, ws)| {
864                    !ws.pinned
865                        && ws.load_state() != WorkspaceState::Evicted
866                        && ws.load_state() != WorkspaceState::Unloaded
867                })
868                .min_by_key(|(_, ws)| *ws.last_accessed.read())
869                .map(|(k, _)| k.clone())
870        };
871        if let Some(key) = &candidate {
872            self.execute_eviction(key);
873        }
874        candidate
875    }
876
877    /// Explicitly unload a workspace. Drives a full eviction
878    /// (releases graph data + admission accounting via
879    /// [`Self::evict_to_tombstone_locked`]) **and** removes the
880    /// tombstone entry from the manager map atomically under a
881    /// single `workspaces.write()` critical section.
882    ///
883    /// This is the only path that removes the map entry. LRU
884    /// eviction (`evict_lru`, `reserve_rebuild`'s Phase 2) leaves
885    /// the tombstone in place so per-source-root partial-eviction
886    /// state stays observable through `daemon/workspaceStatus` —
887    /// see [`Self::execute_eviction`] doc and STEP_6 iter-1 BLOCK.
888    ///
889    /// Returns `true` if the workspace was present, `false` if it
890    /// was already absent.
891    pub fn unload(&self, key: &WorkspaceKey) -> bool {
892        let mut workspaces = self.workspaces.write();
893        if !workspaces.contains_key(key) {
894            return false;
895        }
896        // Drop graph + admission bytes under the same write lock
897        // we will use for `remove`. Holding the lock across both
898        // operations means external observers see EITHER "entry
899        // present + Loaded" OR "entry absent" — never the "entry
900        // present + Evicted but about to be removed" intermediate
901        // state. (LRU eviction is a separate flow that DOES expose
902        // the Evicted tombstone — that is the STEP_6 contract.)
903        self.evict_to_tombstone_locked(&mut workspaces, key);
904        workspaces.remove(key);
905        true
906    }
907
908    /// Helper: run the eviction body (steps 1–4 of
909    /// [`Self::execute_eviction`]) with the caller's
910    /// `workspaces.write()` guard already held. Used by
911    /// [`Self::unload`] so unloading remains atomic — no observer
912    /// sees the `Evicted`-but-still-in-map intermediate window.
913    ///
914    /// Re-eviction safety mirrors `execute_eviction` — an entry
915    /// already in `Evicted` is left alone.
916    fn evict_to_tombstone_locked(
917        &self,
918        workspaces: &mut HashMap<WorkspaceKey, Arc<LoadedWorkspace>>,
919        key: &WorkspaceKey,
920    ) {
921        let Some(ws) = workspaces.get(key).cloned() else {
922            return;
923        };
924        if ws.load_state() == WorkspaceState::Evicted {
925            return;
926        }
927
928        let old_arc = ws.graph.swap(Arc::new(CodeGraph::new()));
929        let prior_bytes_usize = ws.memory_bytes.swap(0, Ordering::AcqRel);
930        let prior_bytes = prior_bytes_usize as u64;
931
932        let token = OldGraphToken::new();
933        {
934            let mut state = self.admission.lock();
935            state.loaded_bytes = state.loaded_bytes.saturating_sub(prior_bytes);
936            state.retained_old.insert(
937                token,
938                RetainedEntry {
939                    bytes: prior_bytes,
940                    graph: old_arc,
941                    published_at: Instant::now(),
942                    warned_past_timeout: false,
943                },
944            );
945            self.bump_high_water(&state);
946        }
947
948        ws.rebuild_cancelled.store(true, Ordering::Release);
949        ws.store_state(WorkspaceState::Evicted);
950    }
951
952    /// Cluster-G §3.2 — reset a workspace to `Unloaded` *without*
953    /// removing its manager-map entry.
954    ///
955    /// Drops the in-memory graph + admission bytes + retained
956    /// old-graph entries owned by this workspace, but preserves the
957    /// `WorkspaceKey`, `pinned` bit, and `last_error`. Files under
958    /// `<root>/.sqry/` are left untouched — destructive cleanup is
959    /// owned by `sqry workspace clean` (cluster-E IMP-E.4).
960    ///
961    /// Returns `Ok(true)` if the workspace was present and reset,
962    /// `Ok(false)` if not present.
963    ///
964    /// State transitions:
965    ///   `Loaded` / `Failed` / `Evicted` / `Unloaded` → `Unloaded`
966    ///   `Rebuilding` → cancellation dispatched, [`Err(ResetCancellationDispatched)`]
967    ///   `Loading`    → [`Err(ResetWhileLoading)`]
968    ///
969    /// `pinned` workspaces require `force = true` to reset; without
970    /// it, [`Err(WorkspacePinned)`] is returned.
971    ///
972    /// # Errors
973    ///
974    /// - [`DaemonError::WorkspacePinned`] when the workspace is pinned
975    ///   and `force = false`.
976    /// - [`DaemonError::ResetWhileLoading`] when the workspace is
977    ///   currently loading (caller must wait or cancel via the
978    ///   existing `daemon/cancel_rebuild` path).
979    /// - [`DaemonError::ResetCancellationDispatched`] when a rebuild
980    ///   was in flight; the caller should retry after `retry_after_ms`.
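    ///
    /// Caller-side retry sketch (`sleep_ms` is an assumed helper):
    ///
    /// ```rust,ignore
    /// match manager.reset(&key, false) {
    ///     Ok(was_present) => println!("reset: {was_present}"),
    ///     Err(DaemonError::ResetCancellationDispatched { retry_after_ms, .. }) => {
    ///         sleep_ms(retry_after_ms); // then issue the reset again
    ///     }
    ///     Err(other) => return Err(other),
    /// }
    /// ```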
981    pub fn reset(self: &Arc<Self>, key: &WorkspaceKey, force: bool) -> Result<bool, DaemonError> {
983        let mut workspaces = self.workspaces.write();
984        let Some(ws) = workspaces.get(key).cloned() else {
985            return Ok(false);
986        };
987        if ws.pinned && !force {
988            return Err(DaemonError::WorkspacePinned {
989                root: key.source_root.clone(),
990            });
991        }
992        let current = ws.load_state();
993        match current {
994            WorkspaceState::Loading => {
995                return Err(DaemonError::ResetWhileLoading {
996                    root: key.source_root.clone(),
997                });
998            }
999            WorkspaceState::Rebuilding => {
1000                ws.rebuild_cancelled.store(true, Ordering::Release);
1001                drop(workspaces);
1002                return Err(DaemonError::ResetCancellationDispatched {
1003                    root: key.source_root.clone(),
1004                    retry_after_ms: 250,
1005                });
1006            }
1007            _ => {}
1008        }
1009        // Drop the graph + refund admission bytes via the existing
1010        // tombstone helper, then transition to `Unloaded` (preserving
1011        // the map entry, `pinned`, and `last_error`).
1012        self.evict_to_tombstone_locked(&mut workspaces, key);
1013        // Cluster-G iter-2 BLOCKER 1: `evict_to_tombstone_locked`
1014        // sets `rebuild_cancelled = true` (`manager.rs:948`). Without
1015        // clearing it here, the next `get_or_load` from `Unloaded`
1016        // hits the `pre_cancelled && prior_state != Evicted` branch
1017        // at `manager.rs:693-704` and fails with `WorkspaceBuildFailed`
1018        // ("workspace evicted mid-load") — `daemon reset` would be
1019        // unable to recover the workspace it just reset (codex iter-1
1020        // review). Clear the flag now so the next reload starts from
1021        // a clean cancellation state.
1022        ws.rebuild_cancelled.store(false, Ordering::Release);
1023        ws.store_state(WorkspaceState::Unloaded);
1024        Ok(true)
1025    }
1026
1027    /// Find a loaded workspace by its directory path.
1028    ///
1029    /// Linear scan over all registered workspaces comparing each workspace's
1030    /// `source_root` against `path`. Callers (e.g. `daemon/rebuild`) supply a
1031    /// canonicalised path but not the full [`WorkspaceKey`].
1032    /// O(n) in the number of loaded workspaces; in practice n is small.
1033    ///
1034    /// Returns `None` if no workspace with a matching root is found.
1035    #[must_use]
1036    pub fn find_key_and_workspace_by_path(
1037        &self,
1038        path: &std::path::Path,
1039    ) -> Option<(WorkspaceKey, Arc<LoadedWorkspace>)> {
1040        let workspaces = self.workspaces.read();
1041        workspaces
1042            .iter()
1043            .find(|(k, _)| k.source_root == path)
1044            .map(|(k, ws)| (k.clone(), Arc::clone(ws)))
1045    }
1046
1047    /// Snapshot of daemon-wide status. Point-in-time, non-transactional.
1048    pub fn status(&self) -> DaemonStatus {
1049        let workspaces_snapshot: Vec<WorkspaceStatus> = {
1050            let workspaces = self.workspaces.read();
1051            let mut entries: Vec<_> = workspaces
1052                .iter()
1053                .map(|(k, ws)| WorkspaceStatus {
1054                    index_root: k.source_root.clone(),
1055                    state: ws.load_state(),
1056                    pinned: ws.pinned,
1057                    current_bytes: ws.memory_bytes.load(Ordering::Acquire) as u64,
1058                    high_water_bytes: ws.memory_high_water_bytes.load(Ordering::Acquire) as u64,
1059                    last_good_at: *ws.last_good_at.read(),
1060                    last_error: ws.last_error.read().as_ref().map(|e| e.to_string()),
1061                    retry_count: ws.retry_count.load(Ordering::Acquire),
1062                    // STEP_12 telemetry: surface both display and machine
1063                    // identity hex forms when the key carries a logical
1064                    // workspace_id; anonymous keys leave both as None so
1065                    // the wire shape is uniform.
1066                    workspace_id_short: k.workspace_id.as_ref().map(|id| id.as_short_hex()),
1067                    workspace_id_full: k.workspace_id.as_ref().map(|id| id.as_full_hex()),
1068                })
1069                .collect();
1070            entries.sort_by(|a, b| a.index_root.cmp(&b.index_root));
1071            entries
1072        };
1073
1074        let (current_bytes, reserved_bytes, high_water_bytes) = {
1075            let state = self.admission.lock();
1076            let current = state.total_committed_bytes();
1077            let reserved = state.reserved_bytes;
1078            // Bump high-water here in case the status read sees a
1079            // higher value than the last mutation captured. The
1080            // admission lock stays held until the explicit
1081            // `drop(state)` below, so the `fetch_max` is serialised
1082            // against any concurrent publish.
1083            let peak = self
1084                .total_memory_high_water
1085                .fetch_max(current, Ordering::AcqRel);
1086            let peak = peak.max(current);
1087            drop(state);
1088            (current, reserved, peak)
1089        };
1090
1091        DaemonStatus {
1092            uptime_seconds: self.started_at.elapsed().as_secs(),
1093            daemon_version: env!("CARGO_PKG_VERSION").to_string(),
1094            memory: MemoryStatus {
1095                limit_bytes: self.memory_limit_bytes(),
1096                current_bytes,
1097                reserved_bytes,
1098                high_water_bytes,
1099            },
1100            workspaces: workspaces_snapshot,
1101        }
1102    }
1103
1104    /// Enumerate the `.sqry/graph` directories belonging to every
1105    /// workspace currently in `state ∈ {Loading, Loaded, Rebuilding}`.
1106    ///
1107    /// This is the data source for the `daemon/active-artifacts`
1108    /// IPC method (per `00_contracts.md` §3.CC-4 + `E_p1_cluster.md`
1109    /// §E.4 DPG hand-off). The returned paths are absolute, in stable
1110    /// `WorkspaceKey::source_root` order, and include only the
1111    /// concrete `.sqry/graph` subdirectory — `<source_root>/.sqry/graph`
1112    /// — because that is the path `sqry workspace clean` discovers
1113    /// when it walks for stale artifacts.
1114    ///
1115    /// Read-only, concurrent-safe: takes `self.workspaces.read()`
1116    /// for the duration of the iteration; the caller is expected to
1117    /// honour the 250 ms response budget so the read lock does not
1118    /// stall a concurrent `workspaces` writer.
1119    ///
1120    /// `Unloaded`, `Evicted`, and `Failed` states are deliberately
1121    /// excluded — those workspaces are not "live" artifacts and may
1122    /// be safely cleaned by the operator.
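    ///
    /// Result shape (roots invented):
    ///
    /// ```text
    /// /work/repo-a/.sqry/graph
    /// /work/repo-b/.sqry/graph
    /// ```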
1123    #[must_use]
1124    pub fn active_artifact_dirs(&self) -> Vec<std::path::PathBuf> {
1125        use sqry_daemon_protocol::WorkspaceState;
1126
1127        let workspaces = self.workspaces.read();
1128        let mut out: Vec<std::path::PathBuf> = workspaces
1129            .iter()
1130            .filter_map(|(key, ws)| {
1131                let state = ws.load_state();
1132                let live = matches!(
1133                    state,
1134                    WorkspaceState::Loading | WorkspaceState::Loaded | WorkspaceState::Rebuilding
1135                );
1136                if live {
1137                    Some(key.source_root.join(".sqry").join("graph"))
1138                } else {
1139                    None
1140                }
1141            })
1142            .collect();
1143        out.sort();
1144        out
1145    }
1146
1147    /// Aggregate `daemon/workspaceStatus` snapshot for a single
1148    /// `workspace_id` (STEP_6 of the workspace-aware-cross-repo plan).
1149    ///
1150    /// Walks the manager's workspace map, collects every
1151    /// [`WorkspaceKey`] whose `workspace_id == Some(target_id)`, and
1152    /// renders a deterministic per-source-root rollup. Per-source-root
1153    /// LRU eviction means individual entries can carry
1154    /// [`WorkspaceState::Evicted`] while siblings remain
1155    /// [`WorkspaceState::Loaded`] — the aggregate exposes that
1156    /// "partially evicted" shape unchanged via
1157    /// [`sqry_daemon_protocol::WorkspaceIndexStatus::partially_evicted`].
1158    ///
1159    /// Returns `None` when no entry in the map carries the requested
1160    /// `workspace_id`. The IPC layer surfaces that as
1161    /// `DaemonError::WorkspaceNotLoaded`; the manager itself does not
1162    /// classify "no entries" as an error so callers can distinguish a
1163    /// genuinely absent grouping from an empty workspace.
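    ///
    /// Illustrative partially-evicted rollup (values invented):
    ///
    /// ```text
    /// workspace_id_short: "ab12cd34"
    /// source_roots:
    ///   /work/repo-a  Loaded   current_bytes > 0
    ///   /work/repo-b  Evicted  current_bytes = 0
    /// ```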
1164    #[must_use]
1165    pub fn workspace_index_status(
1166        &self,
1167        target_id: &sqry_daemon_protocol::WorkspaceId,
1168    ) -> Option<sqry_daemon_protocol::WorkspaceIndexStatus> {
1169        let workspaces = self.workspaces.read();
1170        let mut rows: Vec<sqry_daemon_protocol::WorkspaceSourceRootStatus> = workspaces
1171            .iter()
1172            .filter_map(|(k, ws)| {
1173                k.workspace_id
1174                    .as_ref()
1175                    .filter(|id| *id == target_id)
1176                    .map(|_| sqry_daemon_protocol::WorkspaceSourceRootStatus {
1177                        source_root: k.source_root.clone(),
1178                        state: ws.load_state(),
1179                        current_bytes: ws.memory_bytes.load(Ordering::Acquire) as u64,
1180                        // STEP_11_4 — probe `<source_root>/.sqry/classpath/`
1181                        // for presence. Status path; never blocks on
1182                        // anything heavier than `fs::metadata`. Probe
1183                        // failures (permission denied, racy unlink, …)
1184                        // collapse to `false`; the LSP-side
1185                        // `WorkspaceIndexStatus.warnings` channel surfaces
1186                        // the underlying error detail when the daemon's
1187                        // workspace builder hits the same probe.
1188                        classpath_present: probe_classpath_present(&k.source_root),
1189                    })
1190            })
1191            .collect();
1192        if rows.is_empty() {
1193            return None;
1194        }
1195        rows.sort_by(|a, b| a.source_root.cmp(&b.source_root));
1196        Some(sqry_daemon_protocol::WorkspaceIndexStatus {
1197            workspace_id: *target_id,
1198            // STEP_12 — derive the hex display strings here so JSON
1199            // consumers (`sqry daemon status --json`, MCP redaction,
1200            // CI scripts) never have to re-encode the 32-byte digest
1201            // themselves. Both strings are derived byte-for-byte
1202            // from `workspace_id`; they do not introduce a new
1203            // identity axis.
1204            workspace_id_short: target_id.as_short_hex(),
1205            workspace_id_full: target_id.as_full_hex(),
1206            source_roots: rows,
1207        })
1208    }
1209
1210    /// Bump the daemon-wide high-water mark using the current
1211    /// `AdmissionState`. Must be called with `admission` held.
1212    fn bump_high_water(&self, state: &AdmissionState) {
1213        let current = state.total_committed_bytes();
1214        self.total_memory_high_water
1215            .fetch_max(current, Ordering::AcqRel);
1216    }
1217
1218    /// Test-only helper: insert a `LoadedWorkspace` into the manager
1219    /// map in a specific state, bypassing `get_or_load`. Used by
1220    /// `classify_for_serve` integration tests that need to observe
1221    /// the `Unloaded` / `Loading` arms (both states are transient
1222    /// during the normal load path).
1223    ///
1224    /// `#[doc(hidden)]` to signal "test affordance only" — same
1225    /// pattern as [`crate::TestGate`] / [`crate::TestCapture`].
1226    /// Production code should not call this.
1227    #[doc(hidden)]
1228    pub fn insert_workspace_in_state_for_test(&self, key: WorkspaceKey, state: WorkspaceState) {
1229        let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
1230        ws.store_state(state);
1231        self.workspaces.write().insert(key, ws);
1232    }
1233
1234    /// Test-only helper: insert a `LoadedWorkspace` into the manager
1235    /// map with explicit state, pinning, and pre-set `memory_bytes`.
1236    /// STEP_6 LRU + workspace-aggregate tests use this to exercise
1237    /// per-source-root eviction without spinning up a full
1238    /// `RealWorkspaceBuilder` pipeline. Returns the inserted Arc so
1239    /// the caller can keep observing it (e.g. to assert `load_state`
1240    /// after a follow-up mutation).
1241    ///
1242    /// `#[doc(hidden)]` to signal "test affordance only".
1243    #[doc(hidden)]
1244    pub fn insert_workspace_for_test_with_bytes(
1245        &self,
1246        key: WorkspaceKey,
1247        state: WorkspaceState,
1248        pinned: bool,
1249        bytes: usize,
1250    ) -> Arc<LoadedWorkspace> {
1251        let ws = Arc::new(LoadedWorkspace::new(key.clone(), pinned));
1252        ws.store_state(state);
1253        ws.update_memory(bytes);
1254        self.workspaces.write().insert(key, Arc::clone(&ws));
1255        ws
1256    }
1257
1258    /// Acquire the internal `workspaces` RwLock in read mode.
1259    ///
1260    /// Task 7 Phase 7c: exposed so
1261    /// [`crate::RebuildDispatcher::execute_one_rebuild`] can hold the
1262    /// read lock across its cancel/membership re-check and
1263    /// [`Self::publish_and_retain`], matching the pattern in
1264    /// [`Self::get_or_load`] (Codex Task 6 Phase 6b iter-2 MAJOR — the
1265    /// publish critical section MUST exclude concurrent
1266    /// [`Self::execute_eviction`] on the same key to avoid
1267    /// orphaned-publish / admission-drift).
1268    ///
1269    /// Callers MUST respect lock order §J.4: acquire `workspaces`
1270    /// BEFORE `admission`. The returned guard is released when the
1271    /// caller drops it.
1272    ///
1273    /// `pub(crate)` (iter-2 design Codex MAJOR): the accessor is only
1274    /// used within the daemon crate; exposing it publicly would leak
1275    /// lock mechanics and broaden the blast radius for future callers
1276    /// that might violate the §J.4 discipline.
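    ///
    /// # Order sketch
    ///
    /// Illustrative only; shows the one legal nesting, not a real
    /// call site:
    ///
    /// ```ignore
    /// let guard = manager.workspaces_read(); // 1. workspaces (outer)
    /// let state = manager.admission.lock();  // 2. admission (innermost)
    /// // Never the reverse: publish_and_retain nests `admission` under
    /// // `workspaces`, so inverting the order risks deadlock.
    /// drop(state);
    /// drop(guard);
    /// ```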
1277    pub(crate) fn workspaces_read(
1278        &self,
1279    ) -> parking_lot::RwLockReadGuard<'_, HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
1280        self.workspaces.read()
1281    }
1282
1283    /// Classify a workspace's readiness to serve a query.
1284    ///
1285    /// Task 7 Phase 7c. Used by the Task 8 IPC router on every query
1286    /// dispatch. Pure-read: no mutations, no `.await` (sync).
1287    ///
1288    /// # Returns
1289    ///
1290    /// | Workspace state | Map present | Result |
1291    /// |-----------------|-------------|--------|
1292    /// | `Loaded` or `Rebuilding` | yes | `Ok(ServeVerdict::Fresh { graph, state })` |
1293    /// | `Failed`, age < cap (or cap == 0) | yes | `Ok(ServeVerdict::Stale { graph, age_hours, last_good_at, last_error })` |
1294    /// | `Failed`, age >= cap | yes | `Err(WorkspaceStaleExpired { age_hours, cap_hours, last_good_at, last_error })` (→ JSON-RPC -32002) |
1295    /// | `Failed`, no prior good | yes | `Err(WorkspaceBuildFailed { reason })` (→ -32001) |
1296    /// | `Unloaded` or `Loading` | yes | `Ok(ServeVerdict::NotReady { state })` |
1297    /// | `Evicted` | yes (transient window) | `Err(WorkspaceEvicted)` (→ -32004) |
1298    /// | any | no | `Err(WorkspaceEvicted)` (→ -32004) |
1299    ///
1300    /// # Lock order
1301    ///
1302    /// Task 7 Phase 7c feat iter-1 Codex BLOCKER fix: takes
1303    /// `workspaces.read()` across the FULL snapshot — state, graph,
1304    /// last_good, and last_error_text are all captured inside the
1305    /// read critical section. Dropping the read lock before reading
1306    /// the graph would allow `execute_eviction` (which needs
1307    /// `workspaces.write()` for the full graph-swap + state-store +
1308    /// map-remove sequence) to interleave, surfacing the empty
1309    /// post-eviction placeholder graph as a `Fresh` verdict.
1310    ///
1311    /// Does not acquire `admission` or `rebuild_lane`; only
1312    /// `workspaces` + per-workspace field locks. §J.4 order preserved.
1313    ///
1314    /// # Errors
1315    ///
1316    /// Returns the variants listed in the table above.
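    ///
    /// # Example
    ///
    /// How a query router might consume the verdict (a sketch; the
    /// `respond_*` helpers are hypothetical, the variant shapes match
    /// the table above):
    ///
    /// ```ignore
    /// match manager.classify_for_serve(&key, SystemTime::now()) {
    ///     Ok(ServeVerdict::Fresh { graph, .. }) => respond_with(graph),
    ///     Ok(ServeVerdict::Stale { graph, age_hours, .. }) => {
    ///         respond_with_stale_warning(graph, age_hours)
    ///     }
    ///     Ok(ServeVerdict::NotReady { state }) => respond_retry_later(state),
    ///     Err(e) => respond_error(e), // carries the JSON-RPC code mapping
    /// }
    /// ```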
1317    pub fn classify_for_serve(
1318        &self,
1319        key: &WorkspaceKey,
1320        now: std::time::SystemTime,
1321    ) -> Result<ServeVerdict, DaemonError> {
1322        // Task 7 Phase 7c — feat iter-0 Codex BLOCKER fix: the
1323        // previous iter-0 implementation cloned the workspace Arc and
1324        // dropped `workspaces.read()` BEFORE reading state and graph.
1325        // `execute_eviction` (see Self::execute_eviction at line 494)
1326        // holds `workspaces.write()` across:
1327        //   - ws.graph.swap(CodeGraph::new())
1328        //   - admission accounting transfer
1329        //   - ws.rebuild_cancelled.store(true)
1330        //   - ws.store_state(WorkspaceState::Evicted)
1331        //   - workspaces.remove(key)
1332        //
1333        // Without the read-lock hold extending across graph capture,
1334        // a classifier could observe `state == Loaded` but fetch the
1335        // post-eviction empty placeholder graph, returning
1336        // `Fresh { graph: empty }` — a correctness bug.
1337        //
1338        // Iter-1: snapshot every field under the read lock. The
1339        // returned `Arc<CodeGraph>` is a strong reference independent
1340        // of the lock lifetime; dropping the lock after capture is
1341        // safe for the caller.
1342        //
1343        // `last_error` is captured as a display-string (the error
1344        // type is not Clone; see `clone_err` rationale) because
1345        // `NoPriorGood` returns a `WorkspaceBuildFailed { reason }`
1346        // that embeds the stringified prior error.
1347        let snapshot = {
1348            let workspaces = self.workspaces.read();
1349            let Some(ws) = workspaces.get(key).cloned() else {
1350                return Err(DaemonError::WorkspaceEvicted {
1351                    root: key.source_root.clone(),
1352                });
1353            };
1354            let state = ws.load_state();
1355            let graph = ws.graph.load_full();
1356            let last_good = *ws.last_good_at.read();
1357            let last_error_text = ws.last_error.read().as_ref().map(|e| e.to_string());
1358            (state, graph, last_good, last_error_text)
1359            // workspaces.read() dropped here — the (state, graph)
1360            // pair is now a coherent snapshot taken atomically w.r.t.
1361            // execute_eviction's workspaces.write().
1362        };
1363        let (state, graph, last_good, last_error_text) = snapshot;
1364
1365        match state {
1366            WorkspaceState::Loaded | WorkspaceState::Rebuilding => {
1367                Ok(ServeVerdict::Fresh { graph, state })
1368            }
1369            WorkspaceState::Failed => {
1370                let cap = self.config.stale_serve_max_age_hours;
1371                match classify_staleness(last_good, cap, now) {
1372                    StalenessVerdict::NoPriorGood => Err(DaemonError::WorkspaceBuildFailed {
1373                        root: key.source_root.clone(),
1374                        reason: last_error_text
1375                            .unwrap_or_else(|| "no prior successful build".into()),
1376                    }),
1377                    StalenessVerdict::Stale { age_hours } => Ok(ServeVerdict::Stale {
1378                        graph,
1379                        age_hours,
1380                        // Invariant: `classify_staleness` only returns
1381                        // `Stale` when `last_good.is_some()` (see
1382                        // `workspace/staleness.rs:54-73`).
1383                        last_good_at: last_good
1384                            .expect("Stale verdict only emitted when last_good.is_some()"),
1385                        last_error: last_error_text,
1386                    }),
1387                    StalenessVerdict::Expired { age_hours } => {
1388                        Err(DaemonError::WorkspaceStaleExpired {
1389                            root: key.source_root.clone(),
1390                            age_hours,
1391                            cap_hours: cap,
1392                            last_good_at: last_good,
1393                            last_error: last_error_text,
1394                        })
1395                    }
1396                }
1397            }
1398            WorkspaceState::Unloaded | WorkspaceState::Loading => {
1399                Ok(ServeVerdict::NotReady { state })
1400            }
1401            // Transient window between store_state(Evicted) and
1402            // workspaces.remove; same semantics as map-absent.
1403            WorkspaceState::Evicted => Err(DaemonError::WorkspaceEvicted {
1404                root: key.source_root.clone(),
1405            }),
1406        }
1407    }
1408
1409    /// Consume a [`RebuildReservation`] plus a freshly-built
1410    /// [`CodeGraph`] and atomically publish it to the workspace.
1411    ///
1412    /// Implements Amendment 2 §G.2:
1413    ///
1414    /// - Captures the prior `Arc<CodeGraph>` and `memory_bytes` into
1415    ///   a [`RollbackGuard`] **before** any swap — so a panic at any
1416    ///   point before the admission update reverts cleanly.
1417    /// - Swaps the `ArcSwap<CodeGraph>` to the new graph.
1418    /// - Swaps the per-workspace `memory_bytes` to the new size.
1419    /// - Under the admission mutex: moves `bytes_delta` from
1420    ///   `reserved_bytes` into `loaded_bytes`, inserts a
1421    ///   [`RetainedEntry`] holding the old `Arc` until the retention
1422    ///   reaper frees it.
1423    /// - Disarms the [`RollbackGuard`] on success.
1424    ///
1425    /// Sync `fn`. There is no `.await` between the first swap and the
1426    /// admission insert — tokio task cancellation can only interrupt
1427    /// at `.await` points, so this sequence is atomic with respect
1428    /// to cancellation per §G.2.
1429    ///
1430    /// Returns the minted [`OldGraphToken`] for tracing / integration
1431    /// tests, together with an `Arc<CodeGraph>` handle to the freshly
1432    /// published graph. Per Codex Task 6 Phase 6c iter-2 MAJOR the
1433    /// post-publish `SqrydHook` dispatch is NOT performed here —
1434    /// firing `on_publish` under the `workspaces.read()` guard
1435    /// `get_or_load` holds across this call would nest
1436    /// `self.hook.read()` inside `workspaces`, giving hook impls a
1437    /// re-entrancy deadlock hole if they call back into manager
1438    /// methods needing `workspaces.write()`. The caller is
1439    /// responsible for dispatching the hook after dropping every
1440    /// outer workspaces-lock holder.
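    ///
    /// # Example
    ///
    /// Caller-side shape (a sketch of the sequence described above,
    /// not a runnable doctest; `estimate` and the build call are
    /// placeholders):
    ///
    /// ```ignore
    /// let reservation = manager.reserve_rebuild(&key, estimate)?;
    /// let graph = builder.build(&key.source_root)?;   // slow, outside locks
    /// let guard = manager.workspaces_read();          // excludes eviction
    /// let (_token, published) =
    ///     manager.publish_and_retain(reservation, &ws, graph)?;
    /// drop(guard);
    /// // Only now dispatch `SqrydHook::on_publish`; see the NOTE in
    /// // the function body and `get_or_load` post-publish.
    /// ```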
1441    pub fn publish_and_retain(
1442        self: &Arc<Self>,
1443        reservation: RebuildReservation,
1444        workspace: &LoadedWorkspace,
1445        new_graph: CodeGraph,
1446    ) -> Result<(OldGraphToken, Arc<CodeGraph>), DaemonError> {
1447        // Compute the new graph's heap bytes before handing it to the
1448        // ArcSwap — once published, a concurrent reader holds it
1449        // alive, and measuring after publish races with the
1450        // admission update.
1451        let new_bytes_usize = new_graph.heap_bytes();
1452        // `usize as u64` is a no-op on 64-bit and a widen on 32-bit.
1453        let new_bytes = new_bytes_usize as u64;
1454
1455        // Post-build oversize gate (`G_daemon_control_plane.md` §1.4
1456        // + `00_contracts.md` §3.CC-3 admission boundary). Reject
1457        // BEFORE any visibility mutation so a ground-truth-too-big
1458        // workspace can never enter the serve path. The reservation
1459        // drops on early return — bytes are refunded via RAII and no
1460        // `OldGraphToken` is allocated.
1461        //
1462        // Subtract the prior workspace bytes from the projected
1463        // total because we will REPLACE this workspace's contribution
1464        // (the swap below subtracts `prev_memory_bytes` from
1465        // `loaded_bytes` and adds `new_bytes`); only the delta from
1466        // the prior contribution counts against the cap, while
1467        // every other workspace's loaded contribution and any
1468        // retained-old bytes still count.
1469        let limit = self.memory_limit_bytes();
1470        let prior_workspace_bytes = workspace
1471            .memory_bytes
1472            .load(std::sync::atomic::Ordering::Acquire) as u64;
1473        let projected = {
1474            let state = self.admission.lock();
1475            state
1476                .loaded_bytes
1477                .saturating_sub(prior_workspace_bytes)
1478                .saturating_add(state.retained_total_bytes())
1479                .saturating_add(new_bytes)
1480        };
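        // Worked example (illustrative numbers, not a real workspace):
        // limit = 1 GiB, loaded_bytes = 600 MiB of which this workspace
        // contributes prior_workspace_bytes = 200 MiB, retained total =
        // 100 MiB, new_bytes = 350 MiB. Then
        //   projected = 600 - 200 + 100 + 350 = 850 MiB <= 1 GiB,
        // so the publish is admitted, whereas naive addition without the
        // prior-contribution subtraction (1050 MiB) would wrongly reject.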
1481        if projected > limit {
1482            return Err(DaemonError::WorkspaceOversize {
1483                root: workspace.key.source_root.clone(),
1484                measured_bytes: new_bytes,
1485                limit_bytes: limit,
1486                current_loaded_bytes: projected.saturating_sub(new_bytes),
1487            });
1488        }
1489
1490        // Take the reservation by value so this function owns it and
1491        // the Drop impl fires on any unwind path. `released` stays
1492        // `false` until *after* the admission commit succeeds, so a
1493        // panic before or during the admission mutex section refunds
1494        // `reserved_bytes` back to the pool (Codex Task 6 Phase 6a
1495        // iter-1 MAJOR: the previous ordering disarmed before the
1496        // commit and could leak reserved bytes on unwind).
1497        let mut reservation = reservation;
1498        let reservation_bytes = reservation.bytes;
1499
1500        let new_arc = Arc::new(new_graph);
1501        // Clone the Arc BEFORE the swap so the caller can still
1502        // obtain a handle to the published graph after the swap
1503        // moves `new_arc` into the ArcSwap. Re-reading via
1504        // `workspace.graph.load_full()` after the swap would work
1505        // today but is racy against any future swap path that
1506        // could run between the swap and the load — cheaper and
1507        // safer to clone the Arc once.
1508        let published_arc = Arc::clone(&new_arc);
1509        let token = OldGraphToken::new();
1510
1511        // --- RollbackGuard setup --------------------------------
1512        let prior_arc_for_rollback = workspace.graph.load_full();
1513        let prior_bytes = workspace
1514            .memory_bytes
1515            .load(std::sync::atomic::Ordering::Acquire);
1516
1517        let mut rollback = RollbackGuard {
1518            ws: workspace,
1519            prior_arc: Some(prior_arc_for_rollback),
1520            prior_bytes,
1521            armed: true,
1522        };
1523
1524        // --- Non-recoverable zone (no .await; no fallible ops) ---
1525        //
1526        // If any code between this point and `reservation.released = true`
1527        // panics, the following Drop order runs on unwind:
1528        //   1. `rollback` Drop reverts `workspace.graph` and
1529        //      `workspace.memory_bytes` to the pre-swap values
1530        //      (because `armed == true`).
1531        //   2. `reservation` Drop reacquires the admission mutex and
1532        //      refunds `reservation_bytes` back to `reserved_bytes`
1533        //      (because `released == false`).
1534        // This is the §G.5 invariant-preserving rollback described in
1535        // the plan; the reservation refund was missing before the
1536        // iter-1 fix.
1537        let old_arc = workspace.graph.swap(new_arc);
1538        let prev_memory_bytes = workspace.update_memory(new_bytes_usize);
1539        debug_assert_eq!(
1540            prev_memory_bytes, prior_bytes,
1541            "RollbackGuard prior_bytes must match update_memory's returned prior",
1542        );
1543
1544        // --- Admission commit (mutex-only; no other locks) -------
1545        //
1546        // The critical section is ordered so the only *fallible* op —
1547        // `HashMap::insert`, which can allocate on grow and therefore
1548        // panic — runs FIRST, before any admission counter is mutated
1549        // and before the reservation is disarmed. Everything that
1550        // follows (`saturating_*` arithmetic + `reservation.released
1551        // = true`) is guaranteed infallible, so once we reach those
1552        // lines the critical section cannot unwind mid-way and leave
1553        // admission state inconsistent.
1554        //
1555        // Codex Task 6 Phase 6a iter-2 MAJOR: the iter-1 ordering
1556        // disarmed the reservation before `retained_old.insert`
1557        // completed. A panic from the insert would leave
1558        // `reserved_bytes` drained and `loaded_bytes` updated while
1559        // no retained entry existed — rollback reverts ws.graph +
1560        // ws.memory_bytes but cannot refund the reservation
1561        // (released=true). The fix moves insert to the front of the
1562        // section so any unwind preserves the §G.5 invariant.
1563        //
1564        // Pre-build the `RetainedEntry` outside the lock so only the
1565        // `HashMap::insert` itself can allocate; the struct
1566        // construction is a field-by-field move.
1567        let retained_entry = RetainedEntry {
1568            bytes: prev_memory_bytes as u64,
1569            graph: old_arc,
1570            published_at: Instant::now(),
1571            warned_past_timeout: false,
1572        };
1573
1574        let mut state = self.admission.lock();
1575
1576        // Step 1 — fallible. `HashMap::insert` may reallocate; if it
1577        // panics the state is left unchanged (hashbrown's insert is
1578        // exception-safe: a failed grow leaves the map in its prior
1579        // capacity and does not insert the new entry). Unwind drops
1580        // `state` (releasing the mutex), then `rollback` reverts
1581        // ws.graph + ws.memory_bytes, then the `reservation`
1582        // (released=false) refunds `reservation_bytes` from
1583        // `reserved_bytes`. `loaded_bytes` is not mutated because
1584        // the lines below never run.
1585        state.retained_old.insert(token, retained_entry);
1586
1587        // Step 2 — infallible arithmetic (saturating ops on u64).
1588        // Move reservation → loaded. The prior workspace bytes are
1589        // already counted in `loaded_bytes` (they were added the
1590        // last time this workspace published). Swap by subtracting
1591        // the old and adding the new — keeps the §G.5 invariant
1592        // monotonic w.r.t. the commit.
1593        state.reserved_bytes = state.reserved_bytes.saturating_sub(reservation_bytes);
1594        state.loaded_bytes = state
1595            .loaded_bytes
1596            .saturating_sub(prev_memory_bytes as u64)
1597            .saturating_add(new_bytes);
1598
1599        // Step 3 — infallible disarm. The admission commit is
1600        // complete; the reservation's Drop is now a no-op so it
1601        // does not double-refund.
1602        reservation.released = true;
1603        self.bump_high_water(&state);
1604        drop(state);
1605
1606        rollback.armed = false; // disarm on success
1607
1608        // NOTE: `SqrydHook::on_publish` is NOT dispatched here.
1609        // `get_or_load` holds `workspaces.read()` across this call
1610        // (to make the re-check + publish critical section atomic
1611        // with respect to eviction, see that function's Step 6+7
1612        // comment block). Firing the hook here would acquire
1613        // `self.hook.read()` nested under `workspaces`, giving a
1614        // hook impl that calls back into manager methods needing
1615        // `workspaces.write()` (e.g. `unload`) a guaranteed
1616        // deadlock. The caller dispatches the hook after dropping
1617        // `workspaces_guard` — see `get_or_load` post-publish.
1618        //
1619        // `NoOpHook` remains the default; Task 9's daemon binary
1620        // installs the production `QueryDbHook` that wraps
1621        // `sqry_db::persistence::save_derived` with a timeout.
1622        Ok((token, published_arc))
1623    }
1624
1625    /// Abort the retention reaper task (called from `Drop`). Safe to
1626    /// call from any context — abort is a best-effort signal.
1627    fn shutdown_reaper(&self) {
1628        if let Some(handle) = self.reaper.lock().take() {
1629            handle.abort();
1630        }
1631    }
1632
1633    // ---------------------------------------------------------------------
1634    // SGA04 — Bounded read-only rehydrate after eviction
1635    // ---------------------------------------------------------------------
1636
1637    /// Read-only rehydrate of an existing persisted graph for `key`.
1638    ///
1639    /// Implements the daemon side of the bounded one-shot reload rule
1640    /// described in `docs/development/shared-graph-acquisition/02_DESIGN.md`.
1641    /// Used by [`crate::workspace::acquirer::DaemonGraphProvider`] when
1642    /// it observes [`DaemonError::WorkspaceEvicted`] (or an `Unloaded`
1643    /// state) for an [`AcquisitionOperation::ReadOnlyQuery`].
1644    ///
1645    /// Behaviour contract:
1646    ///
1647    /// 1. Drives the same lifecycle CAS gate as
1648    ///    [`Self::get_or_load`] — only one caller can rehydrate per
1649    ///    workspace at a time.
1650    /// 2. Reserves admission headroom via [`Self::reserve_rebuild`].
1651    /// 3. Calls [`WorkspaceBuilder::load_persisted`] to read
1652    ///    `<source_root>/.sqry/graph/snapshot.sqry`. Never calls
1653    ///    `WorkspaceBuilder::build`, never mutates `.sqry/graph/*`,
1654    ///    `.sqry/analysis/*`, or `derived.sqry`, and never invokes the
1655    ///    post-publish hook (the snapshot is bit-identical to what
1656    ///    the hook would produce — no fresh derived cache to warm).
1657    /// 4. Publishes through [`Self::publish_and_retain`] under the
1658    ///    standard `workspaces.read()` re-check + cancellation gate
1659    ///    so eviction races are caught the same way as `get_or_load`.
1660    ///
1661    /// `pub(crate)` because the entrypoint is internal to the daemon
1662    /// crate; SGA04's public surface is the
1663    /// [`crate::workspace::acquirer::DaemonGraphProvider`] adapter.
1664    ///
1665    /// # Errors
1666    ///
1667    /// Returns the same set of [`DaemonError`] variants as
1668    /// [`Self::get_or_load`]. The caller maps these into the shared
1669    /// [`sqry_core::graph::acquisition::GraphAcquisitionError`]
1670    /// taxonomy (typically [`GraphAcquisitionError::Evicted`] when the
1671    /// reload is the daemon-provider's bounded retry).
1672    ///
1673    /// [`AcquisitionOperation::ReadOnlyQuery`]: sqry_core::graph::acquisition::AcquisitionOperation::ReadOnlyQuery
1674    /// [`GraphAcquisitionError::Evicted`]: sqry_core::graph::acquisition::GraphAcquisitionError::Evicted
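    ///
    /// # Example
    ///
    /// Provider-side shape (a sketch; the `builder` and `estimate`
    /// bindings are assumed to exist at the call site):
    ///
    /// ```ignore
    /// if let Err(DaemonError::WorkspaceEvicted { .. }) =
    ///     manager.classify_for_serve(&key, now)
    /// {
    ///     // Bounded one-shot reload: read snapshot.sqry and republish;
    ///     // never runs the build pipeline or the post-publish hook.
    ///     let graph = manager.reload_from_disk_read_only(&key, builder, estimate)?;
    /// }
    /// ```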
1675    pub(crate) fn reload_from_disk_read_only(
1676        self: &Arc<Self>,
1677        key: &WorkspaceKey,
1678        builder: &dyn WorkspaceBuilder,
1679        working_set_estimate: u64,
1680    ) -> Result<Arc<CodeGraph>, DaemonError> {
1681        // --- Step 1: cache-hit fast path ---------------------------
1682        {
1683            let workspaces = self.workspaces.read();
1684            if let Some(ws) = workspaces.get(key)
1685                && ws.load_state() == WorkspaceState::Loaded
1686            {
1687                ws.touch();
1688                return Ok(ws.graph.load_full());
1689            }
1690        }
1691
1692        // --- Step 2: lifecycle CAS gate (mirrors get_or_load) -------
1693        let ws = self.get_or_insert_workspace(key);
1694        let allowed = [
1695            WorkspaceState::Unloaded.as_u8(),
1696            WorkspaceState::Failed.as_u8(),
1697            WorkspaceState::Evicted.as_u8(),
1698        ];
1699        let mut acquired_from: Option<WorkspaceState> = None;
1700        for prior in allowed {
1701            if ws
1702                .state
1703                .compare_exchange(
1704                    prior,
1705                    WorkspaceState::Loading.as_u8(),
1706                    Ordering::AcqRel,
1707                    Ordering::Acquire,
1708                )
1709                .is_ok()
1710            {
1711                acquired_from = WorkspaceState::from_u8(prior);
1712                break;
1713            }
1714        }
1715        let Some(prior_state) = acquired_from else {
1716            let current = ws.load_state();
1717            if current == WorkspaceState::Loaded {
1718                ws.touch();
1719                return Ok(ws.graph.load_full());
1720            }
1721            return Err(DaemonError::WorkspaceBuildFailed {
1722                root: key.source_root.clone(),
1723                reason: format!("workspace load already in progress ({current})"),
1724            });
1725        };
1726
1727        let pre_cancelled = ws.rebuild_cancelled.swap(false, Ordering::AcqRel);
1728        if pre_cancelled && prior_state != WorkspaceState::Evicted {
1729            ws.rebuild_cancelled.store(true, Ordering::Release);
1730            ws.store_state(WorkspaceState::Failed);
1731            return Err(DaemonError::WorkspaceBuildFailed {
1732                root: key.source_root.clone(),
1733                reason: "workspace evicted mid-load".to_string(),
1734            });
1735        }
1736
1737        // --- Step 3: arm LoadingGuard for panic / early-return ----
1738        let mut loading = LoadingGuard {
1739            ws: &ws,
1740            key,
1741            armed: true,
1742        };
1743
1744        // --- Step 4: reserve admission headroom -------------------
1745        let reservation = self.reserve_rebuild(key, working_set_estimate)?;
1746
1747        // --- Step 5: load_persisted (read-only, no build pipeline)
1748        let graph = match builder.load_persisted(&key.source_root) {
1749            Ok(g) => g,
1750            Err(err) => {
1751                drop(reservation);
1752                ws.record_failure(clone_err(&err));
1753                loading.armed = false;
1754                ws.store_state(WorkspaceState::Failed);
1755                return Err(err);
1756            }
1757        };
1758
1759        // --- Step 6+7: atomic re-check + publish ------------------
1760        let workspaces_guard = self.workspaces.read();
1761        if ws.rebuild_cancelled.load(Ordering::Acquire) {
1762            drop(workspaces_guard);
1763            drop(reservation);
1764            ws.record_failure(DaemonError::WorkspaceBuildFailed {
1765                root: key.source_root.clone(),
1766                reason: "workspace evicted mid-reload".to_string(),
1767            });
1768            loading.armed = false;
1769            ws.store_state(WorkspaceState::Failed);
1770            return Err(DaemonError::WorkspaceBuildFailed {
1771                root: key.source_root.clone(),
1772                reason: "workspace evicted mid-reload".to_string(),
1773            });
1774        }
1775        if !workspaces_guard.contains_key(key) {
1776            drop(workspaces_guard);
1777            drop(reservation);
1778            ws.record_failure(DaemonError::WorkspaceBuildFailed {
1779                root: key.source_root.clone(),
1780                reason: "workspace removed mid-reload".to_string(),
1781            });
1782            loading.armed = false;
1783            ws.store_state(WorkspaceState::Failed);
1784            return Err(DaemonError::WorkspaceBuildFailed {
1785                root: key.source_root.clone(),
1786                reason: "workspace removed mid-reload".to_string(),
1787            });
1788        }
1789
1790        // `G_daemon_control_plane.md` §3.5 + §3.6 — read-only
1791        // reload exemption proof: in steady-state operation this
1792        // path cannot observe `WorkspaceOversize` because the
1793        // snapshot-on-disk was bounded by a prior successful
1794        // publish + the deserialization size cap. Defensive match
1795        // arm preserved so a contract violation surfaces as the
1796        // typed error rather than silently masquerading as a
1797        // success.
1798        let (_token, published_arc) = match self.publish_and_retain(reservation, &ws, graph) {
1799            Ok((token, arc)) => (token, arc),
1800            Err(e) => {
1801                drop(workspaces_guard);
1802                ws.record_failure(clone_err(&e));
1803                loading.armed = false;
1804                ws.store_state(WorkspaceState::Failed);
1805                return Err(e);
1806            }
1807        };
1808        ws.record_success(std::time::SystemTime::now());
1809        ws.store_state(WorkspaceState::Loaded);
1810        ws.touch();
1811        loading.armed = false;
1812        drop(workspaces_guard);
1813
1814        // No post-publish `SqrydHook::on_publish` dispatch on the
1815        // read-only reload path — the snapshot we just loaded is the
1816        // SAME bytes the hook would have re-serialised, so the derived
1817        // cache must already match it. Firing the hook here would be
1818    // redundant work (and, per the spec contract, this path "must
1819    // not write any artifact").
1820
1821        Ok(published_arc)
1822    }
1823
1824    /// Test-only: synchronously evict `key` regardless of memory
1825    /// pressure.
1826    ///
1827    /// Used by SGA04 / SGA07 parity tests to drive a workspace from
1828    /// `Loaded` into `Evicted` deterministically (the production
1829    /// eviction paths are budget-driven and time-sensitive). Behaves
1830    /// exactly like the LRU eviction path: graph is swapped out, bytes
1831    /// move from `loaded_bytes` into `retained_old`, the entry stays
1832    /// in the manager map as a tombstone (matching STEP_6 partial
1833    /// eviction reporting).
1834    ///
1835    /// Returns `true` if the key was present and evicted, `false`
1836    /// otherwise.
1837    ///
1838    /// # Visibility
1839    ///
1840    /// Marked `#[doc(hidden)]` and named with the `_for_test` suffix
1841    /// to advertise "test affordance only" (matching
1842    /// [`Self::insert_workspace_in_state_for_test`] /
1843    /// [`crate::TestGate`] / [`crate::TestCapture`]). It is **not**
1844    /// re-exported through `sqry-daemon`'s public prelude
1845    /// (`pub use workspace::{...}` in `lib.rs` does not list it), so
1846    /// release / IPC / MCP / HTTP surfaces cannot reach it. Production
1847    /// code MUST NOT call this; the canonical eviction entrypoints
1848    /// remain [`Self::evict_lru`] and [`Self::unload`].
1849    ///
1850    /// # Visibility (SGA04 Gate-A blocker fix)
1851    ///
1852    /// Even though `lib.rs` does not re-export this method, it was
1853    /// previously declared `pub fn` on a `pub struct WorkspaceManager`,
1854    /// which means callers could reach it through any path that already
1855    /// holds a `&WorkspaceManager` — including any public re-export of
1856    /// the type. The Codex Gate-A review flagged this as a leak of a
1857    /// test-only hook into the release surface.
1858    ///
1859    /// The fix is a compile-time gate: the entire item is now
1860    /// `#[cfg(any(test, feature = "test-hooks"))]`, so default release
1861    /// builds (`cargo build -p sqry-daemon`) cannot see the symbol at
1862    /// all. SGA07 parity tests that live in the integration-test crate
1863    /// (`sqry-daemon/tests/`) opt in via
1864    /// `cargo test -p sqry-daemon --features test-hooks --tests`, while
1865    /// in-crate `#[cfg(test)] mod tests` blocks reach it through
1866    /// `cfg(test)`.
1867    #[cfg(any(test, feature = "test-hooks"))]
1868    #[doc(hidden)]
1869    pub fn evict_for_test(&self, key: &WorkspaceKey) -> bool {
1870        let present = self.workspaces.read().contains_key(key);
1871        if !present {
1872            return false;
1873        }
1874        self.execute_eviction(key);
1875        true
1876    }
1877}
1878
1879impl Drop for WorkspaceManager {
1880    fn drop(&mut self) {
1881        self.shutdown_reaper();
1882    }
1883}
1884
1885/// STEP_11_4 — probe `<source_root>/.sqry/classpath/` for presence at
1886/// `daemon/workspaceStatus` time.
1887///
1888/// Status path: cheap (`fs::metadata`), never blocks on anything
1889/// heavier, and degrades silently to `false` on any error so a racy
1890/// classpath unlink or a permission denial cannot fail the status
1891/// response. The LSP-side `WorkspaceIndexStatus.warnings` channel
1892/// surfaces the underlying error detail when the daemon's workspace
1893/// builder hits the same probe and wants to record the failure.
1894fn probe_classpath_present(source_root: &std::path::Path) -> bool {
1895    let probe = source_root.join(".sqry").join("classpath");
1896    std::fs::metadata(&probe)
1897        .map(|m| m.is_dir())
1898        .unwrap_or(false)
1899}
1900
1901// ---------------------------------------------------------------------------
1902// LoadingGuard (panic-safety for get_or_load)
1903// ---------------------------------------------------------------------------
1904
1905/// RAII guard that transitions the workspace into
1906/// [`WorkspaceState::Failed`] on any non-success exit from
1907/// [`WorkspaceManager::get_or_load`] — including panics.
1908///
1909/// Codex Task 6 Phase 6b iter-1 MAJOR: without this guard, a panic
1910/// in `builder.build()` would leave the workspace stuck in
1911/// `Loading` with `last_error = None`, permanently blocking
1912/// re-load attempts and corrupting status output.
1913///
1914/// The guard is armed until the final `loaded.armed = false` on
1915/// the success path (after publish succeeds). Every other exit
1916/// path — `Err` from admission, `Err` from builder, panic from
1917/// builder, early returns on the cancellation/map-membership
1918/// re-check — fires `Drop` with `armed == true` and performs the
1919/// Failed-state transition.
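///
/// # Usage sketch
///
/// Illustrative shape of a load path that arms the guard (compare
/// `reload_from_disk_read_only` Steps 3–7 in this file):
///
/// ```ignore
/// let mut loading = LoadingGuard { ws: &ws, key, armed: true };
/// let graph = builder.build(&key.source_root)?; // Err/panic => Failed
/// // ... cancellation re-check + publish ...
/// loading.armed = false; // success: Drop becomes a no-op
/// ```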
1920pub(crate) struct LoadingGuard<'a> {
1921    pub(crate) ws: &'a LoadedWorkspace,
1922    pub(crate) key: &'a WorkspaceKey,
1923    pub(crate) armed: bool,
1924}
1925
1926impl<'a> Drop for LoadingGuard<'a> {
1927    fn drop(&mut self) {
1928        if !self.armed {
1929            return;
1930        }
1931        // Only overwrite `last_error` if it hasn't been populated
1932        // with a more specific diagnostic by the explicit `Err`
1933        // branches in the load paths — those set last_error before
1934        // `armed = false`, so seeing `None` here means we are in the
1935        // panic window or an early-return path that did not record one.
1936        {
1937            let mut slot = self.ws.last_error.write();
1938            if slot.is_none() {
1939                *slot = Some(DaemonError::WorkspaceBuildFailed {
1940                    root: self.key.source_root.clone(),
1941                    reason: "workspace load aborted unexpectedly".to_string(),
1942                });
1943            }
1944        }
1945        self.ws.retry_count.fetch_add(1, Ordering::AcqRel);
1946        self.ws.store_state(WorkspaceState::Failed);
1947    }
1948}
1949
1950/// Clone a [`DaemonError`] for storage on [`LoadedWorkspace::last_error`]
1951/// or for propagation to `handle_changes` error returns in
1952/// [`crate::RebuildDispatcher::execute_one_rebuild`] (Task 7 Phase 7b1).
1953///
1954/// [`DaemonError`] is not `Clone` because some variants wrap
1955/// non-`Clone` types (notably [`std::io::Error`] and
1956/// [`anyhow::Error`]). `last_error` is a diagnostic surface only —
1957/// it is serialised as `e.to_string()` by the status endpoint — so
1958/// reducing the error to a textual form is the right trade-off here.
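///
/// A sketch of the `Internal` flattening (values illustrative):
///
/// ```ignore
/// let original = DaemonError::Internal(anyhow::anyhow!("io").context("load"));
/// let cloned = clone_err(&original);
/// // The typed cause chain is gone, but the flattened `{:#}` text
/// // ("load: io") survives inside the cloned `Internal` variant.
/// ```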
1959pub(crate) fn clone_err(err: &DaemonError) -> DaemonError {
1960    match err {
1961        DaemonError::WorkspaceBuildFailed { root, reason } => DaemonError::WorkspaceBuildFailed {
1962            root: root.clone(),
1963            reason: reason.clone(),
1964        },
1965        DaemonError::WorkspaceStaleExpired {
1966            root,
1967            age_hours,
1968            cap_hours,
1969            last_good_at,
1970            last_error,
1971        } => DaemonError::WorkspaceStaleExpired {
1972            root: root.clone(),
1973            age_hours: *age_hours,
1974            cap_hours: *cap_hours,
1975            // `SystemTime` is `Copy`; `Option<String>` needs `.clone()`.
1976            last_good_at: *last_good_at,
1977            last_error: last_error.clone(),
1978        },
1979        DaemonError::MemoryBudgetExceeded {
1980            limit_bytes,
1981            current_bytes,
1982            reserved_bytes,
1983            retained_bytes,
1984            requested_bytes,
1985        } => DaemonError::MemoryBudgetExceeded {
1986            limit_bytes: *limit_bytes,
1987            current_bytes: *current_bytes,
1988            reserved_bytes: *reserved_bytes,
1989            retained_bytes: *retained_bytes,
1990            requested_bytes: *requested_bytes,
1991        },
1992        DaemonError::WorkspaceEvicted { root } => {
1993            DaemonError::WorkspaceEvicted { root: root.clone() }
1994        }
1995        DaemonError::WorkspaceNotLoaded { root } => {
1996            DaemonError::WorkspaceNotLoaded { root: root.clone() }
1997        }
1998        // SGA04 Gate-A major #5 — round-trip the path-policy variant
1999        // distinctly. Collapsing it into `WorkspaceBuildFailed` would
2000        // re-introduce the exact bug Codex flagged.
2001        DaemonError::WorkspaceIncompatibleGraph { root, reason } => {
2002            DaemonError::WorkspaceIncompatibleGraph {
2003                root: root.clone(),
2004                reason: reason.clone(),
2005            }
2006        }
2007        // Task 8 Phase 8c U5 — tool-dispatch variants surfaced by
2008        // `tool_core::classify_and_execute` (Phase 8c U6). Each
2009        // variant must round-trip cleanly so `classify_for_serve`
2010        // reproduces the original typed error on every read path —
2011        // collapsing any of these into `WorkspaceBuildFailed` would
2012        // break the wire-contract codes registered in
2013        // [`crate::lib`] / the design doc §O.
2014        DaemonError::ToolTimeout {
2015            root,
2016            secs,
2017            deadline_ms,
2018        } => DaemonError::ToolTimeout {
2019            root: root.clone(),
2020            secs: *secs,
2021            deadline_ms: *deadline_ms,
2022        },
2023        DaemonError::InvalidArgument { reason } => DaemonError::InvalidArgument {
2024            reason: reason.clone(),
2025        },
2026        // Cluster-C iter-3: RpcError implements Clone, so this is a
2027        // direct deep copy.
2028        DaemonError::RpcErrorPreserved(rpc) => DaemonError::RpcErrorPreserved(rpc.clone()),
2029        DaemonError::Internal(err) => {
2030            // `anyhow::Error` is not `Clone`; re-create it from its
2031            // full-chain `Display` form (`{:#}`) so every layer of
2032            // the causal chain survives the round-trip. Callers only
2033            // read this via `to_string()` on the status endpoint, so
2034            // losing the typed causes (if any) is acceptable.
2035            DaemonError::Internal(anyhow::anyhow!("{err:#}"))
2036        }
2037        // Task 9 U1 — lifecycle variants (AlreadyRunning, AutoStartTimeout,
2038        // SignalSetup). These errors all fire before IpcServer::bind and
2039        // therefore before any workspace is registered; they should never
2040        // reach `clone_err`. If they somehow do (e.g. a future code path
2041        // stores them in `last_error`), collapse to WorkspaceBuildFailed so
2042        // the clone contract is preserved without losing observability.
2043        DaemonError::AlreadyRunning { socket, lock, .. } => DaemonError::WorkspaceBuildFailed {
2044            root: Path::new("<unknown>").to_path_buf(),
2045            reason: format!(
2046                "daemon already running on socket {} (lock: {})",
2047                socket.display(),
2048                lock.display()
2049            ),
2050        },
2051        DaemonError::AutoStartTimeout {
2052            timeout_secs,
2053            socket,
2054        } => DaemonError::WorkspaceBuildFailed {
2055            root: Path::new("<unknown>").to_path_buf(),
2056            reason: format!(
2057                "daemon did not become ready within {timeout_secs}s on socket {}",
2058                socket.display()
2059            ),
2060        },
2061        DaemonError::SignalSetup { source } => DaemonError::WorkspaceBuildFailed {
2062            root: Path::new("<unknown>").to_path_buf(),
2063            reason: format!("failed to install signal handlers: {source}"),
2064        },
2065        // sqry-mcp flakiness P0/P1 admission + recovery variants
2066        // (G_daemon_control_plane.md §1.4 / §3.2 / §5.2 +
2067        // B_cost_gate.md §3 + 00_contracts.md §3.CC-2 / §3.CC-4).
2068        // Each carries cheap Clone-able payload; round-trip in place.
2069        DaemonError::WorkspaceOversize {
2070            root,
2071            measured_bytes,
2072            limit_bytes,
2073            current_loaded_bytes,
2074        } => DaemonError::WorkspaceOversize {
2075            root: root.clone(),
2076            measured_bytes: *measured_bytes,
2077            limit_bytes: *limit_bytes,
2078            current_loaded_bytes: *current_loaded_bytes,
2079        },
2080        DaemonError::WorkspacePinned { root } => {
2081            DaemonError::WorkspacePinned { root: root.clone() }
2082        }
2083        DaemonError::ResetWhileLoading { root } => {
2084            DaemonError::ResetWhileLoading { root: root.clone() }
2085        }
2086        DaemonError::ResetCancellationDispatched {
2087            root,
2088            retry_after_ms,
2089        } => DaemonError::ResetCancellationDispatched {
2090            root: root.clone(),
2091            retry_after_ms: *retry_after_ms,
2092        },
2093        DaemonError::SocketSetup { path, reason } => DaemonError::SocketSetup {
2094            path: path.clone(),
2095            reason: reason.clone(),
2096        },
2097        DaemonError::QueryTooBroad { reason, details } => DaemonError::QueryTooBroad {
2098            reason: reason.clone(),
2099            details: details.clone(),
2100        },
2101        other @ (DaemonError::Config { .. } | DaemonError::Io(_)) => {
2102            DaemonError::WorkspaceBuildFailed {
2103                root: Path::new("<unknown>").to_path_buf(),
2104                reason: other.to_string(),
2105            }
2106        }
2107    }
2108}
2109
2110// ---------------------------------------------------------------------------
2111// RebuildReservation (RAII)
2112// ---------------------------------------------------------------------------
2113
2114/// RAII guard representing an in-flight rebuild's admission headroom.
2115///
2116/// - On the success path, the guard is consumed by
2117///   [`WorkspaceManager::publish_and_retain`], which sets
2118///   `released = true` before draining `bytes` from `reserved_bytes`.
2119/// - On any other drop path (rebuild panic, cancellation, early
2120///   return on plugin error) the guard's `Drop` releases the reserved
2121///   bytes back to the admission pool. This keeps the §G.5 invariant
2122///   intact across every exit path.
2123///
2124/// The manager pointer is a [`Weak`] so a guard that outlives its
2125/// manager (e.g. the daemon is dropped mid-rebuild) does not try to
2126/// touch freed memory. A `None` upgrade on drop is silently ignored —
2127/// the manager took the reserved-bytes counter with it when it dropped.
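///
/// Drop-path sketch (bytes illustrative):
///
/// ```ignore
/// let reservation = manager.reserve_rebuild(&key, 500_000)?;
/// // Success path: publish_and_retain consumes it and sets
/// // `released = true` before draining the bytes, so Drop refunds nothing.
/// // Any other exit drops it here, refunding 500_000 bytes to
/// // `reserved_bytes` under the admission mutex.
/// drop(reservation);
/// ```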
2128#[must_use = "RebuildReservation must either be consumed by publish_and_retain() \
2129              or intentionally dropped to return its bytes to the admission pool"]
2130pub struct RebuildReservation {
2131    manager: Weak<WorkspaceManager>,
2132    bytes: u64,
2133    released: bool,
2134}
2135
2136impl RebuildReservation {
2137    /// How many bytes this reservation currently holds.
2138    #[must_use]
2139    pub fn bytes(&self) -> u64 {
2140        self.bytes
2141    }
2142}
2143
2144impl std::fmt::Debug for RebuildReservation {
2145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
2146        f.debug_struct("RebuildReservation")
2147            .field("bytes", &self.bytes)
2148            .field("released", &self.released)
2149            .finish()
2150    }
2151}
2152
2153impl Drop for RebuildReservation {
2154    fn drop(&mut self) {
2155        if self.released {
2156            return;
2157        }
2158        if let Some(mgr) = self.manager.upgrade() {
2159            let mut state = mgr.admission.lock();
2160            state.reserved_bytes = state.reserved_bytes.saturating_sub(self.bytes);
2161        }
2162    }
2163}
2164
2165// ---------------------------------------------------------------------------
2166// RollbackGuard (panic-safety for publish_and_retain)
2167// ---------------------------------------------------------------------------
2168
2169/// Panic-safe rollback wrapper used by [`WorkspaceManager::publish_and_retain`].
2170///
2171/// Captures the prior `Arc<CodeGraph>` and the prior `memory_bytes`
2172/// *before* any swap. If the thread unwinds between the swap and the
2173/// admission-mutex acquisition, the guard's `Drop` restores both
2174/// fields — leaving the workspace serving its pre-rebuild graph as if
2175/// the publish never happened.
2176///
2177/// Correctness depends on three contracts:
2178///
2179/// 1. The guard is constructed *before* the `ArcSwap::swap` call.
2180/// 2. `armed` is set to `false` only on the success path, after the
2181///    admission mutex has released.
2182/// 3. No fallible operation (heap allocation failure, etc.) runs
2183///    between the two swaps — otherwise the guard would be asked to
2184///    reverse a partial swap.
2185pub(crate) struct RollbackGuard<'a> {
2186    pub(crate) ws: &'a LoadedWorkspace,
2187    pub(crate) prior_arc: Option<Arc<CodeGraph>>,
2188    pub(crate) prior_bytes: usize,
2189    pub(crate) armed: bool,
2190}
2191
2192impl<'a> Drop for RollbackGuard<'a> {
2193    fn drop(&mut self) {
2194        if !self.armed {
2195            return;
2196        }
2197        if let Some(arc) = self.prior_arc.take() {
2198            self.ws.graph.store(arc);
2199        }
2200        self.ws
2201            .memory_bytes
2202            .store(self.prior_bytes, std::sync::atomic::Ordering::Release);
2203    }
2204}
2205
2206// ---------------------------------------------------------------------------
2207// Retention reaper task
2208// ---------------------------------------------------------------------------
2209
2210/// Long-lived tokio task: polls [`WorkspaceManager::reap_once`] on a
2211/// fixed 25 ms cadence (A2 §G.3).
2212///
2213/// Takes a `Weak<WorkspaceManager>` so the task never extends the
2214/// manager's lifetime if `WorkspaceManager::drop` runs before the
2215/// task notices the abort signal. The first failed `Weak::upgrade`
2216/// exits the loop cleanly.
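///
/// Spawn sketch (the wiring shown is assumed — only
/// `new_without_reaper` appears in this file's tests):
///
/// ```ignore
/// let handle = tokio::spawn(retention_reaper(Arc::downgrade(&manager)));
/// *manager.reaper.lock() = Some(handle); // aborted by shutdown_reaper()
/// ```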
2217async fn retention_reaper(mgr: Weak<WorkspaceManager>) {
2218    let interval = Duration::from_millis(25);
2219    loop {
2220        tokio::time::sleep(interval).await;
2221        let Some(mgr) = mgr.upgrade() else {
2222            return;
2223        };
2224        mgr.reap_once();
2225    }
2226}
2227
2228// ---------------------------------------------------------------------------
2229// Tests
2230// ---------------------------------------------------------------------------
2231
2232#[cfg(test)]
2233mod tests {
2234    use std::{path::PathBuf, sync::atomic::Ordering};
2235
2236    use sqry_core::project::ProjectRootMode;
2237
2238    use crate::config::DaemonConfig;
2239
2240    use super::{
2241        super::{loaded::LoadedWorkspace, state::WorkspaceKey},
2242        *,
2243    };
2244
2245    fn make_config() -> Arc<DaemonConfig> {
2246        // 1 MiB budget keeps the arithmetic tractable in assertions.
2247        Arc::new(DaemonConfig {
2248            memory_limit_mb: 1,
2249            ..DaemonConfig::default()
2250        })
2251    }
2252
2253    fn make_workspace() -> Arc<LoadedWorkspace> {
2254        Arc::new(LoadedWorkspace::new(
2255            WorkspaceKey::new(
2256                PathBuf::from("/repos/example"),
2257                ProjectRootMode::GitRoot,
2258                0x1,
2259            ),
2260            false,
2261        ))
2262    }
2263
2264    /// Register a workspace under `key` on `mgr` so that
2265    /// `reserve_rebuild` sees it present in its Phase-1
2266    /// `workspaces.read()` scope. Phase 7b1 tightens `reserve_rebuild`
2267    /// to reject unregistered keys with `DaemonError::WorkspaceEvicted`,
2268    /// so every admission-level test that expects a reservation (or a
2269    /// memory-budget rejection) must insert a workspace first.
2270    fn register_workspace(mgr: &WorkspaceManager, key: &WorkspaceKey) {
2271        mgr.workspaces.write().insert(
2272            key.clone(),
2273            Arc::new(LoadedWorkspace::new(key.clone(), false)),
2274        );
2275    }
2276
2277    #[test]
2278    fn reserve_rebuild_succeeds_when_headroom_available() {
2279        let mgr = WorkspaceManager::new_without_reaper(make_config());
2280        let key = WorkspaceKey::new(
2281            PathBuf::from("/repos/example"),
2282            ProjectRootMode::GitRoot,
2283            0x1,
2284        );
2285        register_workspace(&mgr, &key);
2286        let reservation = mgr
2287            .reserve_rebuild(&key, 500_000) // 500 kB into 1 MiB budget
2288            .expect("reservation fits");
2289        assert_eq!(reservation.bytes(), 500_000);
2290        assert_eq!(mgr.admission.lock().reserved_bytes, 500_000);
2291        drop(reservation);
2292        assert_eq!(
2293            mgr.admission.lock().reserved_bytes,
2294            0,
2295            "dropping an unconsumed reservation must return its bytes",
2296        );
2297    }
2298
2299    #[test]
2300    fn reserve_rebuild_rejects_oversized_request() {
2301        let mgr = WorkspaceManager::new_without_reaper(make_config());
2302        let key = WorkspaceKey::new(
2303            PathBuf::from("/repos/example"),
2304            ProjectRootMode::GitRoot,
2305            0x1,
2306        );
2307        register_workspace(&mgr, &key);
2308        let err = mgr.reserve_rebuild(&key, 10 * 1024 * 1024).expect_err(
2309            "a reservation bigger than the budget must be rejected with MemoryBudgetExceeded",
2310        );
2311        match err {
2312            DaemonError::MemoryBudgetExceeded {
2313                limit_bytes,
2314                requested_bytes,
2315                ..
2316            } => {
2317                assert_eq!(limit_bytes, 1024 * 1024);
2318                assert_eq!(requested_bytes, 10 * 1024 * 1024);
2319            }
2320            other => panic!("wrong error variant: {other:?}"),
2321        }
2322        assert_eq!(
2323            mgr.admission.lock().reserved_bytes,
2324            0,
2325            "a rejected reservation must not mutate admission state",
2326        );
2327    }
2328
2329    #[test]
2330    fn reserve_rebuild_rejects_when_running_total_would_exceed_budget() {
2331        let mgr = WorkspaceManager::new_without_reaper(make_config());
2332        let key = WorkspaceKey::new(
2333            PathBuf::from("/repos/example"),
2334            ProjectRootMode::GitRoot,
2335            0x1,
2336        );
2337        register_workspace(&mgr, &key);
2338        let a = mgr.reserve_rebuild(&key, 600_000).expect("first fits");
2339        let err = mgr
2340            .reserve_rebuild(&key, 600_000)
2341            .expect_err("second pushes over 1 MiB budget");
2342        match err {
2343            DaemonError::MemoryBudgetExceeded { reserved_bytes, .. } => {
2344                assert_eq!(reserved_bytes, 600_000, "first reservation still held");
2345            }
2346            other => panic!("wrong error variant: {other:?}"),
2347        }
2348        drop(a);
2349    }
2350
2351    #[test]
2352    fn reserve_rebuild_rejects_unknown_key() {
2353        // Task 7 Phase 7b1: unregistered keys must be rejected with
2354        // WorkspaceEvicted instead of succeeding. Prevents publishing
2355        // into an orphaned LoadedWorkspace after a race with eviction.
2356        let mgr = WorkspaceManager::new_without_reaper(make_config());
2357        let key = WorkspaceKey::new(
2358            PathBuf::from("/repos/never-registered"),
2359            ProjectRootMode::GitRoot,
2360            0xDEAD,
2361        );
2362        let err = mgr
2363            .reserve_rebuild(&key, 100_000)
2364            .expect_err("unknown key must surface WorkspaceEvicted");
2365        match err {
2366            DaemonError::WorkspaceEvicted { root } => {
2367                assert_eq!(root, PathBuf::from("/repos/never-registered"));
2368            }
2369            other => panic!("wrong error variant: {other:?}"),
2370        }
2371        assert_eq!(
2372            mgr.admission.lock().reserved_bytes,
2373            0,
2374            "a rejected reservation must not mutate admission state",
2375        );
2376    }
2377
2378    #[test]
2379    fn reserve_rebuild_rejects_cancelled_workspace() {
2380        // Task 7 Phase 7b1: a workspace whose `rebuild_cancelled` flag
2381        // is set (by `execute_eviction`) must be rejected even if still
2382        // present in the map (the two mutations run under the same
2383        // `workspaces.write()` scope, but defensive reads should catch
2384        // either signal).
2385        let mgr = WorkspaceManager::new_without_reaper(make_config());
2386        let key = WorkspaceKey::new(
2387            PathBuf::from("/repos/cancelled"),
2388            ProjectRootMode::GitRoot,
2389            0xCAFE,
2390        );
2391        let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
2392        ws.rebuild_cancelled.store(true, Ordering::Release);
2393        mgr.workspaces.write().insert(key.clone(), ws);
2394
2395        let err = mgr
2396            .reserve_rebuild(&key, 100_000)
2397            .expect_err("cancelled workspace must surface WorkspaceEvicted");
2398        match err {
2399            DaemonError::WorkspaceEvicted { root } => {
2400                assert_eq!(root, PathBuf::from("/repos/cancelled"));
2401            }
2402            other => panic!("wrong error variant: {other:?}"),
2403        }
2404    }
2405
2406    #[test]
2407    fn publish_and_retain_moves_bytes_and_retains_old_arc() {
2408        let mgr = WorkspaceManager::new_without_reaper(make_config());
2409        let ws = make_workspace();
2410        mgr.workspaces
2411            .write()
2412            .insert(ws.key.clone(), Arc::clone(&ws));
2413        let reservation = mgr.reserve_rebuild(&ws.key, 100_000).expect("reserve fits");
2414
2415        // Pre-seed workspace memory_bytes so publish exercises the
2416        // loaded-bytes swap (subtract prior, add new).
2417        ws.memory_bytes.store(50_000, Ordering::Release);
2418        mgr.admission.lock().loaded_bytes = 50_000;
2419
2420        let new_graph = CodeGraph::new();
2421        let new_bytes = new_graph.heap_bytes() as u64;
2422        let (token, _published_arc) = mgr
2423            .publish_and_retain(reservation, &ws, new_graph)
2424            .expect("publish_and_retain succeeds within memory budget");
2425
2426        let state = mgr.admission.lock();
2427        assert_eq!(
2428            state.reserved_bytes, 0,
2429            "reservation bytes must drain on publish"
2430        );
2431        assert_eq!(
2432            state.loaded_bytes, new_bytes,
2433            "loaded_bytes = prior(50k) - prior(50k) + new(heap_bytes())",
2434        );
2435        assert_eq!(state.retained_old.len(), 1, "exactly one retained entry");
2436        let retained = state.retained_old.get(&token).expect("token present");
2437        assert_eq!(
2438            retained.bytes, 50_000,
2439            "retained bytes is the prior workspace memory_bytes",
2440        );
2441        assert_eq!(
2442            Arc::strong_count(&retained.graph),
2443            1,
2444            "admission map is the sole holder of the old Arc after publish",
2445        );
2446    }
2447
2448    #[test]
2449    fn rollback_guard_restores_workspace_on_panic_path() {
2450        // Synthesise the exact field layout publish_and_retain sets up
2451        // so the guard's Drop behaviour can be exercised directly,
2452        // without the heavy publish path.
2453        let ws = make_workspace();
2454        let old_graph = Arc::new(CodeGraph::new());
2455        ws.graph.store(Arc::clone(&old_graph));
2456        ws.memory_bytes.store(10_000, Ordering::Release);
2457
2458        {
            let _guard = RollbackGuard {
2460                ws: &ws,
2461                prior_arc: Some(Arc::clone(&old_graph)),
2462                prior_bytes: 10_000,
2463                armed: true,
2464            };
2465
2466            // Simulate a partial publish: swap the ArcSwap + memory_bytes.
2467            let stomped = Arc::new(CodeGraph::new());
2468            ws.graph.store(Arc::clone(&stomped));
2469            ws.memory_bytes.store(99_999, Ordering::Release);
2470
            // `armed` stays true, so the guard reverses both fields on
            // drop. This mirrors the panic path, where the disarming
            // step never runs.
2474        }
2475
2476        // After the guard drops, both fields must match the prior.
2477        let restored = ws.graph.load_full();
2478        assert!(Arc::ptr_eq(&restored, &old_graph));
2479        assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 10_000);
2480    }
2481
2482    #[test]
2483    fn rollback_guard_disarmed_is_noop() {
2484        let ws = make_workspace();
2485        let old_graph = Arc::new(CodeGraph::new());
2486        ws.graph.store(Arc::clone(&old_graph));
2487        ws.memory_bytes.store(10_000, Ordering::Release);
2488
2489        {
2490            let mut guard = RollbackGuard {
2491                ws: &ws,
2492                prior_arc: Some(Arc::clone(&old_graph)),
2493                prior_bytes: 10_000,
2494                armed: true,
2495            };
2496            let stomped = Arc::new(CodeGraph::new());
2497            ws.graph.store(Arc::clone(&stomped));
2498            ws.memory_bytes.store(99_999, Ordering::Release);
2499
2500            // Success path disarms the guard.
2501            guard.armed = false;
2502        }
2503
2504        // State must stay "stomped" — the guard was disarmed.
2505        assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 99_999);
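        // The graph must likewise stay stomped; with `stomped` out of
        // scope, pointer-inequality against the prior Arc is the
        // observable signal.
        assert!(
            !Arc::ptr_eq(&ws.graph.load_full(), &old_graph),
            "disarmed guard must leave ws.graph stomped, not restore the prior Arc",
        );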
2506    }
2507
2508    #[test]
2509    fn reap_once_drops_last_holder_entries() {
2510        let mgr = WorkspaceManager::new_without_reaper(make_config());
2511        let ws = make_workspace();
2512        mgr.workspaces
2513            .write()
2514            .insert(ws.key.clone(), Arc::clone(&ws));
2515        let reservation = mgr
2516            .reserve_rebuild(&ws.key, 0)
2517            .expect("zero-size reservation always fits");
2518        // Publish-and-retain with a fresh empty graph; the old graph
2519        // becomes retained.
2520        mgr.publish_and_retain(reservation, &ws, CodeGraph::new())
2521            .expect("publish_and_retain succeeds within memory budget");
2522        assert_eq!(mgr.admission.lock().retained_old.len(), 1);
2523
2524        // No query holds the old Arc, so the next reap tick frees it.
2525        mgr.reap_once();
2526        assert_eq!(
2527            mgr.admission.lock().retained_old.len(),
2528            0,
2529            "reaper must free entries whose strong_count == 1",
2530        );
2531    }
2532
2533    #[test]
2534    fn reap_once_retains_entries_with_outstanding_holders() {
2535        let mgr = WorkspaceManager::new_without_reaper(make_config());
2536        let ws = make_workspace();
2537        mgr.workspaces
2538            .write()
2539            .insert(ws.key.clone(), Arc::clone(&ws));
2540        let reservation = mgr
2541            .reserve_rebuild(&ws.key, 0)
2542            .expect("zero-size reservation always fits");
2543        mgr.publish_and_retain(reservation, &ws, CodeGraph::new())
2544            .expect("publish_and_retain succeeds within memory budget");
2545
2546        // Simulate a slow query holding the retained Arc.
2547        let held = {
2548            let state = mgr.admission.lock();
2549            let token = *state.retained_old.keys().next().expect("one entry");
2550            Arc::clone(&state.retained_old.get(&token).unwrap().graph)
2551        };
2552        assert_eq!(Arc::strong_count(&held), 2);
2553
2554        mgr.reap_once();
2555        assert_eq!(
2556            mgr.admission.lock().retained_old.len(),
2557            1,
2558            "reaper must not drop entries that slow queries still hold",
2559        );
2560        drop(held);
2561
2562        mgr.reap_once();
2563        assert_eq!(
2564            mgr.admission.lock().retained_old.len(),
2565            0,
2566            "reaper frees the entry once the last slow query releases",
2567        );
2568    }
2569
2570    #[test]
2571    fn unconsumed_reservation_refunds_reserved_bytes_on_drop() {
2572        // Regression for Codex Task 6 Phase 6a iter-1 MAJOR:
2573        // if a rebuild panics *between* `reserve_rebuild` and the
2574        // admission-mutex section of `publish_and_retain`, the
2575        // reservation's Drop must refund `reserved_bytes` back to
2576        // the admission pool. A pre-fix bug disarmed the reservation
2577        // too early and leaked bytes on any unwind path.
2578        let mgr = WorkspaceManager::new_without_reaper(make_config());
2579        let ws = make_workspace();
2580        mgr.workspaces
2581            .write()
2582            .insert(ws.key.clone(), Arc::clone(&ws));
2583        let reservation = mgr
2584            .reserve_rebuild(&ws.key, 250_000)
2585            .expect("reservation fits");
2586        assert_eq!(mgr.admission.lock().reserved_bytes, 250_000);
2587
2588        // Simulate a rebuild that panics after reservation but
2589        // before publish by letting the reservation drop on the
2590        // unwind-equivalent code path (explicit drop here; the
2591        // RAII guard fires the same way under `catch_unwind`).
2592        drop(reservation);
2593
2594        assert_eq!(
2595            mgr.admission.lock().reserved_bytes,
2596            0,
2597            "unconsumed reservation must refund reserved_bytes on drop \
2598             (Codex Task 6 Phase 6a iter-1 MAJOR regression)",
2599        );
2600    }
2601
2602    #[test]
2603    fn publish_and_retain_leaves_reservation_fully_disarmed_on_success() {
2604        // Companion to the refund regression: once publish_and_retain
2605        // completes successfully, the reservation must be disarmed —
2606        // otherwise its Drop at scope-exit would double-refund and
2607        // corrupt admission state.
2608        let mgr = WorkspaceManager::new_without_reaper(make_config());
2609        let ws = make_workspace();
2610        mgr.workspaces
2611            .write()
2612            .insert(ws.key.clone(), Arc::clone(&ws));
2613        let reservation = mgr
2614            .reserve_rebuild(&ws.key, 100_000)
2615            .expect("reservation fits");
2616        let admission_before = mgr.admission.lock().reserved_bytes;
2617        assert_eq!(admission_before, 100_000);
2618
2619        // Drive the full commit path. After this returns the
2620        // reservation is already moved into the function, so we can
2621        // only observe the *absence* of any stray refund.
2622        let (_token, _published_arc) = mgr
2623            .publish_and_retain(reservation, &ws, CodeGraph::new())
2624            .expect("publish_and_retain succeeds within memory budget");
2625        let admission_after = mgr.admission.lock().reserved_bytes;
2626        assert_eq!(
2627            admission_after, 0,
2628            "publish must drain reserved_bytes exactly once, not double-drain or leak",
2629        );
2630
2631        // A fresh reservation should see headroom = budget - loaded - retained;
2632        // if the previous publish leaked reserved_bytes this would fail.
2633        let again = mgr
2634            .reserve_rebuild(&ws.key, 100_000)
2635            .expect("post-publish admission must still admit a same-size reservation");
2636        drop(again);
2637        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
2638    }
2639
2640    #[test]
2641    fn unwind_after_swap_before_admission_commit_restores_full_state() {
2642        // Regression for Codex Task 6 Phase 6a iter-2 MAJOR:
2643        // simulate a panic *between* the ArcSwap swap and the
2644        // admission mutex acquisition. After unwind, the admission
2645        // state must be exactly pre-call: reserved_bytes refunded,
2646        // loaded_bytes untouched, retained_old empty, workspace.graph
2647        // and workspace.memory_bytes restored to their prior values.
2648        //
2649        // We can't inject a panic into the real `publish_and_retain`
2650        // without mocking the allocator, so we reproduce the exact
2651        // Drop-order interaction using the public types: build a
2652        // RollbackGuard + RebuildReservation in the same geometry as
2653        // the real function, run `catch_unwind` over the non-
2654        // recoverable zone, and panic inside it.
2655        use std::panic::{AssertUnwindSafe, catch_unwind};
2656
2657        let mgr = WorkspaceManager::new_without_reaper(make_config());
2658        let ws = Arc::new(LoadedWorkspace::new(
2659            WorkspaceKey::new(
2660                PathBuf::from("/repos/example"),
2661                ProjectRootMode::GitRoot,
2662                0x1,
2663            ),
2664            false,
2665        ));
2666        mgr.workspaces
2667            .write()
2668            .insert(ws.key.clone(), Arc::clone(&ws));
2669
2670        // Pre-seed workspace bytes so we can observe rollback.
2671        let prior_bytes_usize = 50_000usize;
2672        ws.memory_bytes.store(prior_bytes_usize, Ordering::Release);
2673        mgr.admission.lock().loaded_bytes = 50_000;
2674        let prior_arc = ws.graph.load_full();
2675
2676        // Reserve headroom as the real function does.
2677        let reservation = mgr
2678            .reserve_rebuild(&ws.key, 100_000)
2679            .expect("reservation fits");
2680        assert_eq!(mgr.admission.lock().reserved_bytes, 100_000);
2681
2682        let outcome = catch_unwind(AssertUnwindSafe(|| {
2683            // Mirror `publish_and_retain` up to and INCLUDING the
2684            // ArcSwap swap + update_memory, then panic *before* we
2685            // would have acquired the admission mutex. This is the
2686            // exact unwind window the iter-2 finding describes.
2687            let new_arc = Arc::new(CodeGraph::new());
2688            let prior_arc_clone = ws.graph.load_full();
2689            // The guard is armed and has no visible use after this
2690            // point; its Drop is the entire reason the scope exists,
2691            // so the binding is deliberately underscore-prefixed and
2692            // held until the panic unwinds the stack.
2693            let _rollback = RollbackGuard {
2694                ws: &ws,
2695                prior_arc: Some(prior_arc_clone),
2696                prior_bytes: prior_bytes_usize,
2697                armed: true,
2698            };
2699            let _old_arc = ws.graph.swap(new_arc);
2700            let _prev = ws.update_memory(99_999);
2701
2702            // Hand the reservation into the scope so its Drop fires
2703            // on unwind if we never disarm it — which we won't.
2704            let _hold = reservation;
2705
2706            // Simulate the panic site (e.g. retained_old.insert OOM).
2707            panic!("simulated panic inside publish_and_retain");
2708        }));
2709        assert!(outcome.is_err(), "catch_unwind must observe the panic");
2710
2711        // Post-unwind assertions — every piece of admission state and
2712        // every observable piece of workspace state must match the
2713        // pre-call snapshot exactly.
2714        let restored = ws.graph.load_full();
2715        assert!(
2716            Arc::ptr_eq(&restored, &prior_arc),
2717            "RollbackGuard must restore ws.graph to the prior Arc after unwind",
2718        );
2719        assert_eq!(
2720            ws.memory_bytes.load(Ordering::Acquire),
2721            prior_bytes_usize,
2722            "RollbackGuard must restore ws.memory_bytes after unwind",
2723        );
2724        let state = mgr.admission.lock();
2725        assert_eq!(
2726            state.reserved_bytes, 0,
2727            "reservation refund must return reserved_bytes to pre-call value (0)",
2728        );
2729        assert_eq!(
2730            state.loaded_bytes, 50_000,
2731            "loaded_bytes must not be mutated when admission commit is never entered",
2732        );
2733        assert_eq!(
2734            state.retained_old.len(),
2735            0,
2736            "retained_old must be empty when admission commit is never entered",
2737        );
2738    }
2739
2740    // --- Phase 6b: lifecycle primitives --------------------------
2741
2742    fn make_key_at(path: &str, fingerprint: u64) -> WorkspaceKey {
2743        WorkspaceKey::new(PathBuf::from(path), ProjectRootMode::GitRoot, fingerprint)
2744    }
2745
2746    #[test]
2747    fn get_or_load_builds_on_miss_and_caches() {
2748        let mgr = WorkspaceManager::new_without_reaper(make_config());
2749        let key = make_key_at("/repos/example", 0x1);
2750        let builder = super::super::builder::EmptyGraphBuilder;
2751
2752        let g1 = mgr
2753            .get_or_load(&key, &builder, 1_000)
2754            .expect("first load succeeds");
2755        let g2 = mgr
2756            .get_or_load(&key, &builder, 1_000)
2757            .expect("second load hits cache");
2758        assert!(
2759            Arc::ptr_eq(&g1, &g2),
2760            "cache hit must return the same Arc as the initial build",
2761        );
2762    }
2763
2764    #[test]
2765    fn get_or_load_surfaces_builder_failures_and_sets_failed_state() {
2766        let mgr = WorkspaceManager::new_without_reaper(make_config());
2767        let key = make_key_at("/repos/example", 0x1);
2768        let failing = super::super::builder::FailingGraphBuilder::new("simulated plugin panic");
2769
2770        let err = mgr
2771            .get_or_load(&key, &failing, 1_000)
2772            .expect_err("builder failure must bubble up");
2773        match err {
2774            DaemonError::WorkspaceBuildFailed { reason, .. } => {
2775                assert_eq!(reason, "simulated plugin panic");
2776            }
2777            other => panic!("wrong variant: {other:?}"),
2778        }
2779
2780        // Workspace should be in Failed state with retry_count==1.
2781        let workspaces = mgr.workspaces.read();
2782        let ws = workspaces.get(&key).expect("workspace registered");
2783        assert_eq!(ws.load_state(), WorkspaceState::Failed);
2784        assert_eq!(ws.retry_count.load(Ordering::Acquire), 1);
2785        assert!(ws.last_error.read().is_some());
2786        drop(workspaces);
2787
2788        // Admission state must NOT have leaked the reservation —
2789        // RebuildReservation's Drop fires on the error path.
2790        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
2791    }
2792
2793    #[test]
2794    fn evict_lru_picks_oldest_non_pinned_workspace() {
2795        let mgr = WorkspaceManager::new_without_reaper(make_config());
2796        let builder = super::super::builder::EmptyGraphBuilder;
2797
2798        let a = make_key_at("/repos/a", 0x1);
2799        let b = make_key_at("/repos/b", 0x1);
2800        mgr.get_or_load(&a, &builder, 100_000).unwrap();
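        // Give `a` and `b` distinct last-touched instants so the LRU
        // ordering asserted below is deterministic.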
2801        std::thread::sleep(Duration::from_millis(5));
2802        mgr.get_or_load(&b, &builder, 100_000).unwrap();
2803
2804        // `a` was touched first, so it should be the LRU victim.
2805        let victim = mgr.evict_lru().expect("one candidate");
2806        assert_eq!(victim, a, "oldest workspace must be evicted first");
2807        // STEP_6 iter-2 contract change: LRU eviction keeps the
2808        // tombstone in the map (state == Evicted) so partial-
2809        // eviction reporting via `daemon/workspaceStatus` can
2810        // still surface the source root. Only `unload` removes
2811        // the entry.
2812        let workspaces = mgr.workspaces.read();
2813        let evicted_ws = workspaces
2814            .get(&a)
2815            .expect("LRU victim stays as tombstone in the manager map");
2816        assert_eq!(
2817            evicted_ws.load_state(),
2818            WorkspaceState::Evicted,
2819            "LRU victim must transition to Evicted, not be removed",
2820        );
2821        assert!(
2822            workspaces.contains_key(&b),
2823            "non-victim workspace must remain",
2824        );
2825    }
2826
2827    #[test]
2828    fn evict_lru_returns_none_when_no_candidates() {
2829        let mgr = WorkspaceManager::new_without_reaper(make_config());
2830        assert!(
2831            mgr.evict_lru().is_none(),
2832            "empty manager has no eviction candidate",
2833        );
2834    }
2835
2836    #[test]
2837    fn evict_lru_skips_pinned_workspaces() {
2838        let mgr = WorkspaceManager::new_without_reaper(make_config());
2839        let builder = super::super::builder::EmptyGraphBuilder;
2840        let pinned_key = make_key_at("/repos/pinned", 0x1);
2841
2842        // Insert a pinned workspace by manually constructing + registering.
2843        {
2844            let mut ws_map = mgr.workspaces.write();
2845            ws_map.insert(
2846                pinned_key.clone(),
2847                Arc::new(LoadedWorkspace::new(
2848                    pinned_key.clone(),
2849                    /*pinned*/ true,
2850                )),
2851            );
2852        }
        // And force it into Loaded state directly; no publish is needed.
2854        {
2855            let ws = mgr.workspaces.read().get(&pinned_key).unwrap().clone();
2856            ws.store_state(WorkspaceState::Loaded);
2857            ws.touch();
2858        }
2859
2860        // Plus a regular unpinned workspace.
2861        let other = make_key_at("/repos/other", 0x1);
2862        mgr.get_or_load(&other, &builder, 100_000).unwrap();
2863
2864        // Evict should pick `other`, not the pinned one.
2865        let victim = mgr.evict_lru().expect("one candidate");
2866        assert_eq!(victim, other);
2867        assert!(mgr.workspaces.read().contains_key(&pinned_key));
2868    }
2869
2870    #[test]
2871    fn unload_removes_workspace_and_reclaims_bytes() {
2872        let mgr = WorkspaceManager::new_without_reaper(make_config());
2873        let builder = super::super::builder::EmptyGraphBuilder;
2874        let key = make_key_at("/repos/example", 0x1);
2875        mgr.get_or_load(&key, &builder, 100_000).unwrap();
2876        assert!(mgr.workspaces.read().contains_key(&key));
2877
2878        assert!(mgr.unload(&key), "unload must report present");
2879        assert!(!mgr.workspaces.read().contains_key(&key));
2880
2881        assert!(!mgr.unload(&key), "unload on missing key returns false");
2882    }
2883
2884    #[test]
2885    fn status_reflects_loaded_workspaces_and_memory() {
2886        let mgr = WorkspaceManager::new_without_reaper(make_config());
2887        let builder = super::super::builder::EmptyGraphBuilder;
2888        let key = make_key_at("/repos/example", 0x1);
2889        mgr.get_or_load(&key, &builder, 100_000).unwrap();
2890
2891        let status = mgr.status();
2892        assert_eq!(status.daemon_version, env!("CARGO_PKG_VERSION"));
2893        assert_eq!(status.workspaces.len(), 1);
2894        assert_eq!(
2895            status.workspaces[0].index_root,
2896            PathBuf::from("/repos/example")
2897        );
2898        assert_eq!(status.workspaces[0].state, WorkspaceState::Loaded);
2899        assert!(!status.workspaces[0].pinned);
2900        assert_eq!(status.memory.limit_bytes, 1024 * 1024);
        // The graph is empty here, so current_bytes may be zero; the
        // invariant worth asserting is that high_water_bytes never
        // trails current_bytes.
2903        assert!(
2904            status.memory.high_water_bytes >= status.memory.current_bytes,
2905            "high_water_bytes must be monotonic wrt current_bytes",
2906        );
2907    }
2908
2909    #[test]
2910    fn reserve_rebuild_triggers_eviction_when_budget_tight() {
2911        // Budget is 1 MiB (from make_config). Fill it with a 700 kB
2912        // workspace, then reserve 600 kB — Phase 1 must pick the
2913        // 700 kB workspace as a victim, Phase 2 evicts it, Phase 3
2914        // commits the reservation.
2915        let mgr = WorkspaceManager::new_without_reaper(make_config());
2916        let victim_key = make_key_at("/repos/victim", 0x1);
2917        let victim = Arc::new(LoadedWorkspace::new(victim_key.clone(), false));
2918        victim.memory_bytes.store(700_000, Ordering::Release);
2919        victim.store_state(WorkspaceState::Loaded);
2920        victim.touch();
2921        mgr.workspaces
2922            .write()
2923            .insert(victim_key.clone(), Arc::clone(&victim));
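        // Mirror the victim's memory_bytes in the admission ledger, as
        // a successful publish would have left it.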
2924        mgr.admission.lock().loaded_bytes = 700_000;
2925
2926        let new_key = make_key_at("/repos/new", 0x1);
2927        mgr.workspaces.write().insert(
2928            new_key.clone(),
2929            Arc::new(LoadedWorkspace::new(new_key.clone(), false)),
2930        );
2931        let reservation = mgr
2932            .reserve_rebuild(&new_key, 600_000)
2933            .expect("Phase 2 eviction must free headroom");
2934        // STEP_6 iter-2 contract: LRU eviction (Phase 2 of
2935        // `reserve_rebuild`) leaves the tombstone in the map.
2936        // The entry is now `Evicted` with `memory_bytes == 0` —
2937        // accounting moved to `retained_old`, but the key stays
2938        // visible to `daemon/workspaceStatus`.
2939        let workspaces = mgr.workspaces.read();
2940        let victim_tombstone = workspaces
2941            .get(&victim_key)
2942            .expect("victim stays as tombstone");
2943        assert_eq!(victim_tombstone.load_state(), WorkspaceState::Evicted);
2944        assert_eq!(
2945            victim_tombstone.memory_bytes.load(Ordering::Acquire),
2946            0,
2947            "evicted tombstone must hold no resident bytes",
2948        );
2949        drop(workspaces);
2950        // Admission reserved the new bytes.
2951        assert_eq!(mgr.admission.lock().reserved_bytes, 600_000);
2952        drop(reservation);
2953    }
2954
2955    #[test]
2956    fn reserve_rebuild_rejects_when_only_pinned_workspaces_remain() {
2957        // Budget 1 MiB. Pin a 900 kB workspace. Requesting 600 kB
2958        // cannot evict the pin, so Phase 3 must reject.
2959        let mgr = WorkspaceManager::new_without_reaper(make_config());
2960        let pinned_key = make_key_at("/repos/pinned", 0x1);
2961        let pinned = Arc::new(LoadedWorkspace::new(
2962            pinned_key.clone(),
2963            /*pinned*/ true,
2964        ));
2965        pinned.memory_bytes.store(900_000, Ordering::Release);
2966        pinned.store_state(WorkspaceState::Loaded);
2967        mgr.workspaces
2968            .write()
2969            .insert(pinned_key.clone(), Arc::clone(&pinned));
2970        mgr.admission.lock().loaded_bytes = 900_000;
2971
2972        let new_key = make_key_at("/repos/new", 0x1);
2973        mgr.workspaces.write().insert(
2974            new_key.clone(),
2975            Arc::new(LoadedWorkspace::new(new_key.clone(), false)),
2976        );
2977        let err = mgr
2978            .reserve_rebuild(&new_key, 600_000)
2979            .expect_err("pinned workspace makes budget unfittable");
2980        match err {
2981            DaemonError::MemoryBudgetExceeded {
2982                requested_bytes,
2983                current_bytes,
2984                ..
2985            } => {
2986                assert_eq!(requested_bytes, 600_000);
2987                assert_eq!(
2988                    current_bytes, 900_000,
2989                    "pinned workspace bytes still count after Phase 2",
2990                );
2991            }
2992            other => panic!("wrong variant: {other:?}"),
2993        }
2994        // Pinned workspace must still be present.
2995        assert!(mgr.workspaces.read().contains_key(&pinned_key));
2996    }
2997
2998    #[test]
2999    fn execute_eviction_routes_bytes_through_retained_old() {
3000        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #1:
3001        // eviction previously dropped the evicted Arc without
3002        // inserting a retained entry, leaking bytes if a slow
3003        // query still held the graph.
3004        let mgr = WorkspaceManager::new_without_reaper(make_config());
3005        let ws_key = make_key_at("/repos/example", 0x1);
3006        let ws = Arc::new(LoadedWorkspace::new(ws_key.clone(), false));
3007        ws.memory_bytes.store(300_000, Ordering::Release);
3008        ws.store_state(WorkspaceState::Loaded);
3009        mgr.workspaces
3010            .write()
3011            .insert(ws_key.clone(), Arc::clone(&ws));
3012        mgr.admission.lock().loaded_bytes = 300_000;
3013
3014        // Pin the current graph Arc via a simulated slow query
3015        // holder so the retained entry stays past the first reap.
3016        let slow_query_arc = ws.graph.load_full();
3017
3018        mgr.execute_eviction(&ws_key);
3019
3020        let state = mgr.admission.lock();
3021        assert_eq!(
3022            state.loaded_bytes, 0,
3023            "evicted workspace bytes must leave the loaded tier",
3024        );
3025        assert_eq!(
3026            state.retained_total_bytes(),
3027            300_000,
3028            "evicted workspace bytes must enter the retained tier",
3029        );
3030        assert_eq!(state.retained_old.len(), 1);
3031        drop(state);
3032
3033        // The slow query still holds the Arc. A reap does NOT free
3034        // yet — §G.5 is preserved until strong_count == 1.
3035        mgr.reap_once();
3036        assert_eq!(mgr.admission.lock().retained_total_bytes(), 300_000);
3037
3038        // Once the slow query releases, the next reap frees bytes.
3039        drop(slow_query_arc);
3040        mgr.reap_once();
3041        assert_eq!(
3042            mgr.admission.lock().retained_total_bytes(),
3043            0,
3044            "reaper must free retained entry once slow query releases",
3045        );
3046    }
3047
3048    #[test]
3049    fn get_or_load_state_cas_rejects_concurrent_load() {
3050        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #2:
3051        // two loaders must not both run the slow path. The state
3052        // CAS gates exactly one winner.
3053        let mgr = WorkspaceManager::new_without_reaper(make_config());
3054        let key = make_key_at("/repos/example", 0x1);
3055        let ws = mgr.get_or_insert_workspace(&key);
3056        // Simulate another loader holding the gate.
3057        ws.store_state(WorkspaceState::Loading);
3058
3059        let builder = super::super::builder::EmptyGraphBuilder;
3060        let err = mgr
3061            .get_or_load(&key, &builder, 1_000)
3062            .expect_err("concurrent load must be rejected");
3063        match err {
3064            DaemonError::WorkspaceBuildFailed { reason, .. } => {
3065                assert!(
3066                    reason.contains("already in progress"),
3067                    "unexpected reason: {reason}",
3068                );
3069            }
3070            other => panic!("wrong variant: {other:?}"),
3071        }
3072
        // Sanity-check that the rejected call did not mutate
        // admission state.
3075        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
3076    }
3077
3078    #[test]
3079    fn get_or_load_detects_cancellation_between_cas_and_publish() {
3080        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #2
3081        // (cancellation-detection subcase): if rebuild_cancelled was
3082        // set before our CAS — i.e. evict raced in front of us on
3083        // the prior state — get_or_load must honour the signal
3084        // instead of clobbering it and publishing into an evicted
3085        // workspace.
3086        let mgr = WorkspaceManager::new_without_reaper(make_config());
3087        let key = make_key_at("/repos/example", 0x1);
3088        let ws = mgr.get_or_insert_workspace(&key);
3089        // Simulate "evict ran on an earlier state but left the
3090        // workspace in the map": cancellation flag set, state
3091        // Unloaded (so CAS succeeds).
3092        ws.rebuild_cancelled.store(true, Ordering::Release);
3093        ws.store_state(WorkspaceState::Unloaded);
3094
3095        let builder = super::super::builder::EmptyGraphBuilder;
3096        let err = mgr
3097            .get_or_load(&key, &builder, 1_000)
3098            .expect_err("pre-CAS cancellation must be honoured");
3099        match err {
3100            DaemonError::WorkspaceBuildFailed { reason, .. } => {
3101                assert!(
3102                    reason.contains("evicted mid-load"),
3103                    "unexpected reason: {reason}",
3104                );
3105            }
3106            other => panic!("wrong variant: {other:?}"),
3107        }
3108        // rebuild_cancelled must still be true (we didn't clobber).
3109        assert!(ws.rebuild_cancelled.load(Ordering::Acquire));
3110        assert_eq!(ws.load_state(), WorkspaceState::Failed);
3111    }
3112
3113    #[test]
3114    fn get_or_load_loading_guard_recovers_from_builder_panic() {
3115        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #3:
3116        // a panic from builder.build must not leave the workspace
3117        // stuck in Loading with last_error unset.
3118        use std::panic::{AssertUnwindSafe, catch_unwind};
3119
3120        #[derive(Debug)]
3121        struct PanickingBuilder;
3122        impl WorkspaceBuilder for PanickingBuilder {
3123            fn build(&self, _root: &Path) -> Result<CodeGraph, DaemonError> {
3124                panic!("simulated builder panic");
3125            }
3126        }
3127
3128        let mgr = WorkspaceManager::new_without_reaper(make_config());
3129        let key = make_key_at("/repos/example", 0x1);
3130        let builder = PanickingBuilder;
3131
3132        let outcome = catch_unwind(AssertUnwindSafe(|| {
3133            let _ = mgr.get_or_load(&key, &builder, 1_000);
3134        }));
3135        assert!(outcome.is_err(), "panic must propagate through get_or_load");
3136
3137        let workspaces = mgr.workspaces.read();
3138        let ws = workspaces.get(&key).expect("workspace still registered");
3139        assert_eq!(
3140            ws.load_state(),
3141            WorkspaceState::Failed,
3142            "LoadingGuard must transition Loading → Failed on unwind",
3143        );
3144        assert!(
3145            ws.last_error.read().is_some(),
3146            "LoadingGuard must populate last_error on unwind",
3147        );
3148        assert!(
3149            ws.retry_count.load(Ordering::Acquire) >= 1,
3150            "LoadingGuard must increment retry_count",
3151        );
3152        drop(workspaces);
3153
3154        // Admission: the RebuildReservation Drop on unwind refunds
3155        // reserved_bytes, so the state is clean.
3156        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
3157    }
3158
3159    #[test]
3160    fn concurrent_load_and_evict_never_publishes_into_evicted_workspace() {
3161        // Regression for Codex Task 6 Phase 6b iter-2 MAJOR:
3162        // the post-build re-check was not atomic with
3163        // `publish_and_retain`. A concurrent eviction could slip
3164        // in between the re-check and the publish, so we'd end
3165        // up accounting bytes for an evicted workspace.
3166        //
3167        // Stress test: run many iterations of `get_or_load` and
3168        // `execute_eviction` concurrently; every iteration
3169        // should leave the admission state consistent (§G.5),
3170        // the workspace either fully loaded or fully evicted,
3171        // and never in a half-committed "loaded_bytes points at
3172        // a graph that isn't in the map" state.
3173        use std::sync::Barrier;
3174        use std::thread;
3175
3176        const ITERATIONS: usize = 64;
3177        for iter in 0..ITERATIONS {
3178            let mgr = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig {
3179                memory_limit_mb: 64,
3180                ..DaemonConfig::default()
3181            }));
3182            let key = make_key_at("/repos/example", iter as u64);
3183            let builder = Arc::new(super::super::builder::EmptyGraphBuilder);
3184
3185            let start = Arc::new(Barrier::new(2));
3186            let mgr_clone = Arc::clone(&mgr);
3187            let key_clone = key.clone();
3188            let builder_clone = Arc::clone(&builder);
3189            let start_load = Arc::clone(&start);
3190            let loader = thread::spawn(move || {
3191                start_load.wait();
3192                // Intentionally ignore the result — either success
3193                // or failure is valid; we assert post-hoc invariants.
3194                let _ = mgr_clone.get_or_load(&key_clone, &*builder_clone, 100_000);
3195            });
3196
3197            let mgr_clone = Arc::clone(&mgr);
3198            let key_clone = key.clone();
3199            let start_evict = Arc::clone(&start);
3200            let evictor = thread::spawn(move || {
3201                start_evict.wait();
3202                // Run unload against the same key; either it races
3203                // ahead of the loader (no-op), or evicts after the
3204                // loader publishes.
3205                mgr_clone.unload(&key_clone);
3206            });
3207
3208            loader.join().expect("loader panicked");
3209            evictor.join().expect("evictor panicked");
3210
3211            // Post-hoc invariants:
3212            // 1. The workspace is either Loaded AND in the map, or
3213            //    not in the map at all. No "evicted-but-in-map"
3214            //    intermediate state.
            // 2. Admission state is consistent: reserved_bytes must
            //    be zero (no in-flight reservations) and the
            //    committed total (loaded + retained) must fit the
            //    budget.
3220            let workspaces = mgr.workspaces.read();
3221            if let Some(ws) = workspaces.get(&key) {
                assert_eq!(
                    ws.load_state(),
                    WorkspaceState::Loaded,
                    "iter {iter}: a workspace left in the map must be Loaded",
                );
3228            }
3229            drop(workspaces);
3230
3231            let state = mgr.admission.lock();
3232            assert_eq!(
3233                state.reserved_bytes, 0,
3234                "iter {iter}: no reservations should leak after the race"
3235            );
            // §G.5 is maintained by the arithmetic itself; the
            // post-hoc observable is that the committed total fits
            // the budget.
3239            assert!(
3240                state.total_committed_bytes() <= mgr.memory_limit_bytes(),
3241                "iter {iter}: total_committed {} over budget {}",
3242                state.total_committed_bytes(),
3243                mgr.memory_limit_bytes(),
3244            );
3245        }
3246    }
3247
3248    #[test]
3249    fn publish_fires_installed_hook() {
3250        // Phase 6c iter-2: `get_or_load` must invoke the installed
3251        // SqrydHook once the admission commit succeeds AND after
3252        // releasing `workspaces_guard`. This test drives the full
3253        // load path end-to-end so the fix (moving the hook out of
3254        // `publish_and_retain` and into the caller, outside every
3255        // workspaces-lock holder) is exercised — not just the raw
3256        // `publish_and_retain` critical section.
3257        let mgr = WorkspaceManager::new_without_reaper(make_config());
3258        let hook = super::super::hook::RecordingHook::new();
3259        mgr.set_hook(Arc::clone(&hook) as super::super::hook::SharedHook);
3260
3261        let key = make_key_at("/repos/example", 0x1);
3262        let builder = super::super::builder::EmptyGraphBuilder;
3263        mgr.get_or_load(&key, &builder, 0)
3264            .expect("load on empty builder succeeds");
3265
3266        assert_eq!(
3267            hook.invocation_count(),
3268            1,
3269            "hook must fire exactly once per publish",
3270        );
3271        assert_eq!(
3272            hook.invocation_roots(),
3273            vec![key.source_root.clone()],
3274            "hook must receive the workspace's index_root",
3275        );
3276    }
3277
3278    #[test]
3279    fn set_hook_replaces_prior_hook_for_subsequent_publishes() {
3280        // Phase 6c iter-2: install hook A, load, evict, install
3281        // hook B, load again. Hook A sees one invocation; hook B
3282        // sees one. Driving through `get_or_load` exercises the
3283        // post-`workspaces_guard`-drop dispatch path the iter-2
3284        // fix added.
3285        let mgr = WorkspaceManager::new_without_reaper(make_config());
3286        let hook_a = super::super::hook::RecordingHook::new();
3287        let hook_b = super::super::hook::RecordingHook::new();
3288        let builder = super::super::builder::EmptyGraphBuilder;
3289        let key = make_key_at("/repos/example", 0x1);
3290
3291        mgr.set_hook(Arc::clone(&hook_a) as super::super::hook::SharedHook);
3292        mgr.get_or_load(&key, &builder, 0)
3293            .expect("first load with hook A");
3294
3295        // Evict so the next `get_or_load` rebuilds and re-publishes
3296        // rather than hitting the Loaded-state cache fast path.
3297        mgr.unload(&key);
3298
3299        mgr.set_hook(Arc::clone(&hook_b) as super::super::hook::SharedHook);
3300        mgr.get_or_load(&key, &builder, 0)
3301            .expect("second load with hook B");
3302
3303        assert_eq!(hook_a.invocation_count(), 1);
3304        assert_eq!(hook_b.invocation_count(), 1);
3305    }
3306
3307    #[test]
3308    fn hook_can_call_manager_unload_without_deadlock() {
3309        // Regression for Codex Task 6 Phase 6c iter-1 MAJOR: the
3310        // hook must fire OUTSIDE the `workspaces.read()` guard
3311        // that `get_or_load` holds across `publish_and_retain`,
3312        // so a hook impl that calls back into `manager.unload(key)`
3313        // — which acquires `workspaces.write()` inside
3314        // `execute_eviction` — must NOT deadlock against the
3315        // loader that fired it.
3316        //
3317        // Pre-fix: the hook dispatched from inside
3318        // `publish_and_retain` under the caller's
3319        // `workspaces.read()` guard, so the re-entrant
3320        // `workspaces.write()` in `unload` would block forever.
3321        //
3322        // We run the load on a background thread and fail the
3323        // test if the thread is still alive after a generous
3324        // timeout — that turns any deadlock regression into a
3325        // deterministic failure rather than a stuck runner.
3326        use std::{sync::Weak, thread, time::Duration};
3327
3328        #[derive(Debug)]
3329        struct UnloadingHook {
3330            manager: Weak<WorkspaceManager>,
3331            key: WorkspaceKey,
3332        }
3333
3334        impl super::super::hook::SqrydHook for UnloadingHook {
3335            fn on_publish(&self, _workspace_root: &Path, _graph: Arc<CodeGraph>) {
3336                if let Some(mgr) = self.manager.upgrade() {
3337                    // If the iter-2 fix regressed and this fires
3338                    // under `workspaces.read()`, the `.write()`
3339                    // inside `execute_eviction` deadlocks here
3340                    // and the test's join timeout triggers below.
3341                    let _present = mgr.unload(&self.key);
3342                }
3343            }
3344        }
3345
3346        let mgr = WorkspaceManager::new_without_reaper(make_config());
3347        let key = make_key_at("/repos/example", 0x1);
3348        let builder = super::super::builder::EmptyGraphBuilder;
3349        let hook = Arc::new(UnloadingHook {
3350            manager: Arc::downgrade(&mgr),
3351            key: key.clone(),
3352        });
3353        mgr.set_hook(Arc::clone(&hook) as super::super::hook::SharedHook);
3354
3355        let mgr_for_thread = Arc::clone(&mgr);
3356        let key_for_thread = key.clone();
3357        let builder_for_thread = builder;
3358        let handle = thread::spawn(move || {
3359            mgr_for_thread
3360                .get_or_load(&key_for_thread, &builder_for_thread, 0)
3361                .expect("load succeeds even with re-entrant hook");
3362        });
3363
3364        let deadline = std::time::Instant::now() + Duration::from_secs(10);
3365        while !handle.is_finished() {
3366            if std::time::Instant::now() > deadline {
3367                panic!(
3368                    "get_or_load deadlocked while firing hook \
3369                     (Codex Task 6 Phase 6c iter-2 regression: \
3370                     hook must dispatch outside workspaces.read())",
3371                );
3372            }
3373            thread::sleep(Duration::from_millis(20));
3374        }
3375        handle
3376            .join()
3377            .expect("loader thread completed without panic");
3378
3379        // Hook's `unload` ran, so the workspace must no longer be
3380        // in the manager map.
3381        assert!(
3382            !mgr.workspaces.read().contains_key(&key),
3383            "hook's re-entrant unload must have removed the workspace",
3384        );
        // The hook does not record invocations; the absence-of-
        // workspace assertion above is the positive signal that
        // `on_publish` ran to completion.
3389    }
3390
3391    #[tokio::test]
3392    async fn retention_reaper_task_eventually_drops_free_entries() {
3393        let mgr = WorkspaceManager::new(make_config());
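        // `new` (unlike `new_without_reaper`) spawns the background
        // retention-reaper task that this test waits on.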
3394        let ws = make_workspace();
3395        mgr.workspaces
3396            .write()
3397            .insert(ws.key.clone(), Arc::clone(&ws));
3398        let reservation = mgr
3399            .reserve_rebuild(&ws.key, 0)
3400            .expect("zero-size reservation always fits");
3401        mgr.publish_and_retain(reservation, &ws, CodeGraph::new())
3402            .expect("publish_and_retain succeeds within memory budget");
3403        assert_eq!(mgr.admission.lock().retained_old.len(), 1);
3404
3405        // Reaper ticks every 25 ms; 200 ms is generous.
3406        for _ in 0..20 {
3407            tokio::time::sleep(Duration::from_millis(10)).await;
3408            if mgr.admission.lock().retained_old.is_empty() {
3409                return;
3410            }
3411        }
3412        panic!("reaper task never freed the entry within 200 ms");
3413    }
3414
3415    // -----------------------------------------------------------------
3416    // Cluster-G §3.2 — `WorkspaceManager::reset` tests
3417    // -----------------------------------------------------------------
3418
3419    /// Resetting an unregistered workspace returns `Ok(false)` and is
3420    /// a no-op.
3421    #[test]
3422    fn reset_returns_false_when_workspace_absent() {
3423        let mgr = WorkspaceManager::new_without_reaper(make_config());
3424        let key = WorkspaceKey::new(
3425            PathBuf::from("/repos/example"),
3426            ProjectRootMode::GitRoot,
3427            0x1,
3428        );
3429        let reset = mgr.reset(&key, false).expect("reset must succeed");
3430        assert!(!reset, "absent workspace should report `false`");
3431    }
3432
3433    /// Resetting a `Loaded` workspace transitions it to `Unloaded` and
3434    /// preserves the manager-map entry.
3435    #[test]
3436    fn reset_loaded_workspace_preserves_entry() {
3437        let mgr = WorkspaceManager::new_without_reaper(make_config());
3438        let key = WorkspaceKey::new(
3439            PathBuf::from("/repos/example"),
3440            ProjectRootMode::GitRoot,
3441            0x1,
3442        );
3443        register_workspace(&mgr, &key);
3444        // Force the workspace into Loaded for the test.
3445        if let Some(ws) = mgr.workspaces.read().get(&key).cloned() {
3446            ws.store_state(crate::workspace::state::WorkspaceState::Loaded);
3447        }
3448
3449        let reset = mgr
3450            .reset(&key, false)
3451            .expect("reset must succeed for Loaded workspace");
3452        assert!(reset, "present workspace should report `true`");
3453        assert!(
3454            mgr.workspaces.read().contains_key(&key),
3455            "reset must preserve the manager-map entry"
3456        );
3457    }
3458
3459    /// Resetting a `pinned` workspace without `force` returns
3460    /// `WorkspacePinned` and leaves the entry alone.
3461    #[test]
3462    fn reset_pinned_without_force_returns_pinned_error() {
3463        let mgr = WorkspaceManager::new_without_reaper(make_config());
3464        let key = WorkspaceKey::new(
3465            PathBuf::from("/repos/example"),
3466            ProjectRootMode::GitRoot,
3467            0x1,
3468        );
3469        // Insert a pinned workspace directly.
3470        mgr.workspaces.write().insert(
3471            key.clone(),
3472            Arc::new(LoadedWorkspace::new(key.clone(), true)),
3473        );
3474        let err = mgr
3475            .reset(&key, false)
3476            .expect_err("pinned workspace must reject reset without force");
3477        assert!(
3478            matches!(err, crate::error::DaemonError::WorkspacePinned { .. }),
3479            "expected WorkspacePinned, got {err:?}"
3480        );
3481    }
3482
3483    /// `force = true` allows resetting a `pinned` workspace.
3484    #[test]
3485    fn reset_pinned_with_force_succeeds() {
3486        let mgr = WorkspaceManager::new_without_reaper(make_config());
3487        let key = WorkspaceKey::new(
3488            PathBuf::from("/repos/example"),
3489            ProjectRootMode::GitRoot,
3490            0x1,
3491        );
3492        mgr.workspaces.write().insert(
3493            key.clone(),
3494            Arc::new(LoadedWorkspace::new(key.clone(), true)),
3495        );
3496        let reset = mgr
3497            .reset(&key, true)
3498            .expect("force-reset must succeed for pinned workspace");
3499        assert!(reset);
3500    }
3501
3502    /// Cluster-G iter-2 BLOCKER 1 regression: after a successful
3503    /// `reset`, `rebuild_cancelled` MUST be cleared so the next
3504    /// `get_or_load` does not hit the `pre_cancelled && prior_state
3505    /// != Evicted` branch and surface `WorkspaceBuildFailed`. Codex
3506    /// iter-1 review flagged that `evict_to_tombstone_locked` set
3507    /// the flag and `reset` never cleared it, leaving `daemon reset
3508    /// → daemon load` permanently broken.
3509    #[test]
3510    fn reset_clears_rebuild_cancelled_so_next_load_does_not_fail() {
3511        let mgr = WorkspaceManager::new_without_reaper(make_config());
3512        let key = WorkspaceKey::new(
3513            PathBuf::from("/repos/example"),
3514            ProjectRootMode::GitRoot,
3515            0x1,
3516        );
3517        register_workspace(&mgr, &key);
3518        if let Some(ws) = mgr.workspaces.read().get(&key).cloned() {
3519            ws.store_state(crate::workspace::state::WorkspaceState::Loaded);
3520        }
3521        let _ = mgr.reset(&key, false).expect("reset must succeed");
3522        let ws = mgr
3523            .workspaces
3524            .read()
3525            .get(&key)
3526            .cloned()
3527            .expect("entry preserved");
3528        assert!(
3529            !ws.rebuild_cancelled.load(Ordering::Acquire),
3530            "rebuild_cancelled must be CLEARED after reset; otherwise the next \
3531             get_or_load fails with WorkspaceBuildFailed and `daemon reset` is broken"
3532        );
3533    }
3534}