1//! [`WorkspaceManager`] — admission accounting entry points.
2//!
3//! Covers Task 6 Steps 3 / 4 / 4a / 4b / 4c / 4d of the sqryd plan
4//! (Amendment 2 §G.1–§G.7). This file lands the admission-accounting
5//! half of the manager — `reserve_rebuild`, `publish_and_retain`,
6//! `RollbackGuard`, and the retention reaper. Workspace lifecycle
7//! (`get_or_load`, `evict_lru`, `unload`, `status`, Failed-state
8//! handling) lands in Phase 6b.
9//!
10//! ## Lock order (authoritative — referenced by §J.4)
11//!
12//! All code paths that acquire more than one lock MUST follow this
13//! total order; acquiring out of order is a bug enforced by code
14//! review.
15//!
16//! 1. `WorkspaceManager.workspaces: RwLock<HashMap<...>>`
17//! 2. `LoadedWorkspace.rebuild_lane: tokio::sync::Mutex<_>` *(Task 7)*
18//! 3. `WorkspaceManager.admission: parking_lot::Mutex<AdmissionState>`
19//!
20//! `WorkspaceManager.hook: RwLock<SharedHook>` is a disjoint
21//! sibling — it is NEVER acquired while any of the three locks
22//! above are held. In particular, the post-publish hook dispatch
23//! (`hook_snapshot` + `SqrydHook::on_publish`) is fired from
24//! `get_or_load` AFTER dropping `workspaces_guard` so the hook
25//! dispatch, and any re-entrant manager method a hook impl might
26//! call, cannot deadlock against the loader that fired it
27//! (Codex Task 6 Phase 6c iter-2 MAJOR).
28//!
29//! Rules:
30//! - A holder of `admission` may NOT acquire `rebuild_lane` or
31//!   `workspaces` — it is the innermost lock.
32//! - A holder of `rebuild_lane` may NOT acquire `workspaces`.
33//!   `rebuild_lane` is used only for scheduling/coalescing pending
34//!   rebuilds; it is never held across a call that would acquire
35//!   `workspaces` or `admission` beneath it.
36//! - A holder of `workspaces` (reader or writer) may NOT acquire
37//!   `hook`. Hook dispatch happens only after every outer
38//!   workspaces-lock holder has released.
39//! - Eviction iterates `workspaces`, sets the per-workspace atomic
40//!   `rebuild_cancelled` flag (no lock), then acquires `admission`
41//!   alone to update accounting. Eviction never takes `rebuild_lane`.
42//! - The retention reaper acquires only `admission`.
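//!
//! As an illustration of the allowed nesting, here is a minimal sketch
//! (the surrounding function and the `manager` binding are hypothetical;
//! the lock fields are the real ones listed above):
//!
//! ```rust,ignore
//! // OK: `workspaces` (1) taken first, `admission` (3) nested inside it
//! // and released before anything else is touched.
//! let workspaces = manager.workspaces.read();
//! {
//!     let admission = manager.admission.lock();
//!     // accounting reads/writes only; no other manager lock here
//! }
//! drop(workspaces);
//!
//! // NOT OK: `admission` is the innermost lock, so nothing may be
//! // acquired while it is held.
//! // let admission = manager.admission.lock();
//! // let workspaces = manager.workspaces.read(); // order violation
//! ```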
43
44use std::{
45    collections::HashMap,
46    path::Path,
47    sync::{
48        Arc, Weak,
49        atomic::{AtomicU64, Ordering},
50    },
51    time::{Duration, Instant, SystemTime},
52};
53
54use parking_lot::{Mutex, RwLock};
55use sqry_core::graph::{CodeGraph, unified::GraphMemorySize};
56use tokio::task::JoinHandle;
57use tracing::warn;
58
59use crate::{config::DaemonConfig, error::DaemonError};
60
61use super::{
62    admission::{AdmissionState, RetainedEntry},
63    builder::WorkspaceBuilder,
64    hook::{NoOpHook, SharedHook, SqrydHook},
65    loaded::LoadedWorkspace,
66    staleness::{StalenessVerdict, classify_staleness},
67    state::{OldGraphToken, WorkspaceKey, WorkspaceState},
68    status::{DaemonStatus, MemoryStatus, WorkspaceStatus},
69};
70
71// ---------------------------------------------------------------------------
72// ServeVerdict
73// ---------------------------------------------------------------------------
74
75/// Outcome of [`WorkspaceManager::classify_for_serve`].
76///
77/// Task 7 Phase 7c. Rich-enum return so the IPC router (Task 8) can
78/// decide how to shape its response without re-classifying.
79#[derive(Debug, Clone)]
80pub enum ServeVerdict {
81    /// Workspace is healthy; serve from `graph`. Wraps an `Arc` — the
82    /// caller holds a strong reference until it is dropped, independent
83    /// of any subsequent publish or eviction.
84    Fresh {
85        graph: Arc<CodeGraph>,
86        /// Observed workspace state at classification time — either
87        /// [`WorkspaceState::Loaded`] or [`WorkspaceState::Rebuilding`].
88        /// Task 7's envelope populates `meta.workspace_state` from this
89        /// field so clients can tell which flavour of Fresh they
90        /// received (a freshly-loaded snapshot vs. one whose successor
91        /// rebuild is already in flight).
92        state: WorkspaceState,
93    },
94    /// Workspace is in `Failed` state but within the
95    /// `stale_serve_max_age_hours` cap. Serve from `graph` with
96    /// `meta.stale = true` and `age_hours` in the response envelope.
97    Stale {
98        graph: Arc<CodeGraph>,
99        age_hours: u64,
100        /// Timestamp of the last successful build. Task 7 renders this
101        /// into the `_stale_warning` string as RFC3339 / UTC-Zulu.
102        last_good_at: SystemTime,
103        /// Textual diagnostic from the most recent failed build, if any.
104        /// `None` when the workspace has been Failed since the last good
105        /// build but no error text was captured.
106        last_error: Option<String>,
107    },
108    /// Workspace exists in the manager map but is not yet ready to
109    /// serve (`Unloaded` or `Loading`). The IPC router decides what to
110    /// do next (retry-after-delay, enqueue, surface a client-appropriate
111    /// code) — the manager does not prescribe a retry policy.
112    NotReady { state: WorkspaceState },
113}
114
115// ---------------------------------------------------------------------------
116// WorkspaceManager
117// ---------------------------------------------------------------------------
118
119/// Owns every loaded workspace plus the admission-accounting state.
120///
121/// Construction spawns the retention reaper task (§G.3). The handle is
122/// stored so `Drop` can abort it cleanly — on daemon shutdown the
123/// reaper is aborted, then the admission state drops, dropping every
124/// retained `Arc<CodeGraph>` in one pass. No accounting leak, no
125/// dangling `Arc`.
126#[derive(Debug)]
127pub struct WorkspaceManager {
128    /// Immutable daemon configuration — used for the memory budget,
129    /// the reaper interval, and the drain-timeout warning threshold.
130    config: Arc<DaemonConfig>,
131
132    /// Per-workspace state, keyed by [`WorkspaceKey`]. `RwLock` so
133    /// the read-only status path contends only with infrequent
134    /// insert / remove writers.
135    workspaces: RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>>,
136
137    /// Single-mutex admission accounting — see [`AdmissionState`]
138    /// module docs for the §G.5 invariant.
139    admission: Mutex<AdmissionState>,
140
141    /// Join handle of the spawned retention reaper. `Option` so
142    /// `Drop` can `.take().abort()` without requiring `&mut self`.
143    reaper: Mutex<Option<JoinHandle<()>>>,
144
145    /// Instant captured at construction. `daemon/status` reports
146    /// `uptime_seconds` = `Instant::now() - started_at`.
147    started_at: Instant,
148
149    /// Monotonic peak of `AdmissionState::total_committed_bytes`
150    /// observed across the daemon's uptime. Updated via `fetch_max`
151    /// on every admission-mutating operation. Amendment 2 §D.
152    total_memory_high_water: AtomicU64,
153
154    /// Post-publish persistence hook. Defaults to a no-op; Task 9's
155    /// daemon binary installs the production `QueryDbHook` that
156    /// wraps `sqry_db::persistence::save_derived`. Swapped via
157    /// [`Self::set_hook`] at daemon boot after the `QueryDb` is
158    /// constructed.
159    ///
160    /// `RwLock` rather than `ArcSwap` because `SharedHook = Arc<dyn
161    /// Trait + Send + Sync>` is cheap to clone inside the read
162    /// critical section, and the hook is only consulted on publish
163    /// (not on every query) — the RwLock is never a hot path.
164    hook: RwLock<SharedHook>,
165}
166
167impl WorkspaceManager {
168    /// Construct a fresh manager and spawn the retention reaper.
169    ///
170    /// The reaper is spawned on the current Tokio runtime. Callers
171    /// must therefore construct the manager from a Tokio context
172    /// (`#[tokio::main]`, an `async` block driven by `Runtime::block_on`,
173    /// etc.). Tests that don't need the reaper can use
174    /// [`Self::new_without_reaper`].
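    ///
    /// A minimal construction sketch; the `DaemonConfig` value is a
    /// placeholder for whatever the daemon binary actually loads:
    ///
    /// ```rust,ignore
    /// #[tokio::main]
    /// async fn main() {
    ///     let config = Arc::new(DaemonConfig::default()); // assumed constructor
    ///     let manager = WorkspaceManager::new(config);    // spawns the reaper
    ///     // ... install the production hook, start the IPC router, etc.
    /// }
    /// ```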
175    pub fn new(config: Arc<DaemonConfig>) -> Arc<Self> {
176        let mgr = Arc::new(Self {
177            config: Arc::clone(&config),
178            workspaces: RwLock::new(HashMap::new()),
179            admission: Mutex::new(AdmissionState::default()),
180            reaper: Mutex::new(None),
181            started_at: Instant::now(),
182            total_memory_high_water: AtomicU64::new(0),
183            hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
184        });
185        let handle = tokio::spawn(retention_reaper(Arc::downgrade(&mgr)));
186        *mgr.reaper.lock() = Some(handle);
187        mgr
188    }
189
190    /// Like [`Self::new`] but does not spawn the reaper — useful in
191    /// unit tests that drive the retention map synchronously via
192    /// [`Self::reap_once`].
193    #[doc(hidden)]
194    pub fn new_without_reaper(config: Arc<DaemonConfig>) -> Arc<Self> {
195        Arc::new(Self {
196            config,
197            workspaces: RwLock::new(HashMap::new()),
198            admission: Mutex::new(AdmissionState::default()),
199            reaper: Mutex::new(None),
200            started_at: Instant::now(),
201            total_memory_high_water: AtomicU64::new(0),
202            hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
203        })
204    }
205
206    /// Install a post-publish hook. Task 9's daemon binary calls
207    /// this once at startup after constructing the shared
208    /// `QueryDb`; unit tests call it to install a recording hook.
209    /// The old hook is dropped immediately; no retention semantics
210    /// apply.
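    ///
    /// A sketch of a unit test installing a recording hook; `RecordingHook`
    /// and its `SqrydHook` impl are hypothetical:
    ///
    /// ```rust,ignore
    /// let manager = WorkspaceManager::new_without_reaper(config);
    /// let recorder = Arc::new(RecordingHook::default());
    /// manager.set_hook(Arc::clone(&recorder) as SharedHook);
    /// // Publishes from `get_or_load` now reach `recorder.on_publish`.
    /// ```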
211    pub fn set_hook(&self, hook: SharedHook) {
212        *self.hook.write() = hook;
213    }
214
215    /// Snapshot the currently installed hook. Internal — used by
216    /// `get_or_load` (Phase 6c iter-2) after dropping the
217    /// `workspaces.read()` guard so the `on_publish` dispatch
218    /// never nests under `workspaces`. Taking the hook under its
219    /// own short read-lock avoids holding the lock across the
220    /// dispatch so a misbehaving hook cannot block a concurrent
221    /// `set_hook` swap.
222    fn hook_snapshot(&self) -> SharedHook {
223        Arc::clone(&*self.hook.read())
224    }
225
226    /// Memory budget in bytes (derived from `config.memory_limit_mb`).
227    #[must_use]
228    pub fn memory_limit_bytes(&self) -> u64 {
229        self.config.memory_limit_bytes()
230    }
231
232    /// Access to the workspace registry (read-only view).
233    ///
234    /// Intentionally `pub(crate)` and `#[allow(dead_code)]` in Phase 6a:
235    /// Phase 6b consumers (`get_or_load`, `evict_lru`, `status`) are the
236    /// first real callers. Keeping the accessor here documents the
237    /// intended visibility boundary rather than forcing later code to
238    /// reach into the field directly.
239    #[allow(dead_code)]
240    pub(crate) fn workspaces(&self) -> &RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
241        &self.workspaces
242    }
243
244    /// Access to the admission mutex (internal). See
245    /// [`Self::workspaces`] for the `#[allow(dead_code)]` rationale.
246    #[allow(dead_code)]
247    pub(crate) fn admission(&self) -> &Mutex<AdmissionState> {
248        &self.admission
249    }
250
251    /// Look up a loaded workspace by key without acquiring `rebuild_lane`
252    /// or `admission`.
253    ///
254    /// Returns `Some(Arc<LoadedWorkspace>)` if a workspace is currently
255    /// registered under `key`, or `None` otherwise. The `workspaces`
256    /// read guard is acquired and released inside the call — callers
257    /// never observe it nested with any other lock.
258    ///
259    /// Added for the Task 7 [`crate::rebuild::RebuildDispatcher`], whose
260    /// `handle_changes` needs a cheap handle on `Arc<LoadedWorkspace>` as
261    /// a precondition before entering the canonical §J.4 ordered sequence
262    /// (`rebuild_lane` → `admission`). This is *not* part of the
263    /// ordered sequence itself — the §J.4 contract only constrains
264    /// paths that hold more than one lock simultaneously. Here, the
265    /// `workspaces` guard is dropped before the caller takes
266    /// `rebuild_lane`, so there is no nesting. Also used by external test
267    /// harnesses (Task 7 Phase 7b1 `rebuild_runner_gate.rs`) that need to
268    /// inspect workspace-level atomics (`rebuild_in_flight`,
269    /// `rebuild_cancelled`) or the `rebuild_lane` mutex directly.
270    ///
271    /// This is NOT a JSON-RPC surface — the IPC layer should use
272    /// `status()` for point-in-time workspace state. Direct `lookup`
273    /// access bypasses the LRU touch that `status()` performs.
274    #[allow(dead_code)] // Consumed by rebuild.rs once Task 7 `rebuild` module lands.
278    pub fn lookup(&self, key: &WorkspaceKey) -> Option<Arc<LoadedWorkspace>> {
279        self.workspaces.read().get(key).cloned()
280    }
281
282    /// Retention reaper: a single pass over `retained_old`.
283    ///
284    /// Removes entries whose `Arc::strong_count` has dropped to 1 —
285    /// meaning the retained entry is the last holder. Emits a
286    /// one-shot WARN log line when an entry exceeds
287    /// `rebuild_drain_timeout_ms` without dropping.
288    ///
289    /// **This is the only code path that removes tokens from
290    /// `retained_old`.** Any other code that mutates the retention
291    /// map is a violation of §G.3.
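    ///
    /// A test-style sketch of driving the reaper synchronously (workspace
    /// setup elided; `new_without_reaper` skips the background task):
    ///
    /// ```rust,ignore
    /// let manager = WorkspaceManager::new_without_reaper(config);
    /// // ... publish a graph, then drop every outstanding `Arc<CodeGraph>` ...
    /// manager.reap_once();
    /// // Entries whose strong_count fell to 1 are now gone from
    /// // `retained_old`, and their bytes no longer count against the budget.
    /// ```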
292    pub fn reap_once(&self) {
293        let timeout = Duration::from_millis(self.config.rebuild_drain_timeout_ms);
294        let now = Instant::now();
295        let mut to_log: Vec<OldGraphToken> = Vec::new();
296        {
297            let mut state = self.admission.lock();
298            state.retained_old.retain(|token, entry| {
299                if Arc::strong_count(&entry.graph) == 1 {
300                    false // Last holder: drop entry + Arc together.
301                } else {
302                    if !entry.warned_past_timeout
303                        && now.saturating_duration_since(entry.published_at) > timeout
304                    {
305                        entry.warned_past_timeout = true;
306                        to_log.push(*token);
307                    }
308                    true
309                }
310            });
311        }
312        for token in to_log {
313            warn!(
314                token = %token,
315                drain_timeout_ms = self.config.rebuild_drain_timeout_ms,
316                "sqryd retention reaper: retained old graph still held past drain timeout \
317                 (not an accounting deadline — bytes stay accounted until strong_count == 1)",
318            );
319        }
320    }
321
322    /// Amendment 2 §G.1 three-phase reservation protocol.
323    ///
324    /// ```text
325    /// Phase 1 (workspaces read + admission lock):
326    ///     project_total + estimate ≤ limit?  → commit
327    ///     otherwise                          → pick LRU non-pinned
328    ///                                          victims (`for_key` is
329    ///                                          exempt — a workspace
330    ///                                          cannot evict itself)
331    /// Phase 2 (no locks held):
332    ///     for each victim: execute_eviction()
333    /// Phase 3 (admission alone):
334    ///     re-check projected vs limit     → authoritative commit
335    ///     reserved_bytes += estimate     → return RebuildReservation
336    /// ```
337    ///
338    /// Lock order is `workspaces → admission` in Phase 1, nothing in
339    /// Phase 2, `admission` alone in Phase 3. No nesting of
340    /// `rebuild_lane` — Task 7 adds that layer outside this function.
341    ///
342    /// Returns a [`RebuildReservation`] RAII guard on success. On
343    /// `Err`, the admission state is exactly pre-call — either no
344    /// eviction happened (headroom already available) or the
345    /// eviction cleared retained entries but could not fit.
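    ///
    /// A hedged caller sketch (Task 7's dispatcher is the intended caller;
    /// `build_new_graph`, `estimate_bytes`, and the surrounding bindings are
    /// placeholders):
    ///
    /// ```rust,ignore
    /// let reservation = manager.reserve_rebuild(&key, estimate_bytes)?;
    /// let graph = build_new_graph()?;
    /// // Success path: hand the reservation to `publish_and_retain`, which
    /// // converts the reserved bytes into loaded bytes.
    /// let workspaces = manager.workspaces_read();
    /// let ws = Arc::clone(workspaces.get(&key).expect("re-checked by caller"));
    /// let (_token, _published) = manager.publish_and_retain(reservation, &ws, graph);
    /// drop(workspaces);
    /// // Failure path: dropping `reservation` without publishing refunds
    /// // `reserved_bytes` via its Drop impl.
    /// ```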
346    pub fn reserve_rebuild(
347        self: &Arc<Self>,
348        for_key: &WorkspaceKey,
349        working_set_estimate: u64,
350    ) -> Result<RebuildReservation, DaemonError> {
351        let limit = self.memory_limit_bytes();
352
353        // --- Phase 1: peek + plan (holds workspaces → admission) ---
354        //
355        // Task 7 Phase 7b1 tightening: reject if the requester has been
356        // evicted or removed between dispatch and reservation. Both the
357        // membership check and the `rebuild_cancelled` read happen under
358        // the Phase-1 `workspaces.read()` so they serialise against
359        // `execute_eviction`'s `workspaces.write()` (which is held across
360        // both `rebuild_cancelled.store(true)` and `workspaces.remove`).
361        //
362        // Post-serialisation snapshot: the reader sees EITHER pre-eviction
363        // state (`Some(ws)` with `cancelled == false`) OR post-eviction
364        // state (`None` OR `cancelled == true`). Keeping both checks is
365        // belt-and-suspenders against any future eviction-protocol change
366        // that could reorder the two mutations.
367        let victims = {
368            let workspaces = self.workspaces.read();
369
370            let Some(requester_ws) = workspaces.get(for_key) else {
371                return Err(DaemonError::WorkspaceEvicted {
372                    root: for_key.index_root.clone(),
373                });
374            };
375            if requester_ws.rebuild_cancelled.load(Ordering::Acquire) {
376                return Err(DaemonError::WorkspaceEvicted {
377                    root: for_key.index_root.clone(),
378                });
379            }
380
381            let state = self.admission.lock();
382            let projected = state
383                .total_committed_bytes()
384                .saturating_add(working_set_estimate);
385            if projected <= limit {
386                Vec::new() // no victim selection needed
387            } else {
388                let need = projected - limit;
389                Self::plan_eviction(&workspaces, &state, need, for_key)
390            }
391            // Both guards drop here — Phase 2 runs with no locks.
392        };
393
394        // --- Phase 2: execute each eviction with no locks held ---
395        for key in &victims {
396            self.execute_eviction(key);
397        }
398
399        // --- Phase 2.5: opportunistic reap ----------------------
400        //
401        // `execute_eviction` moves the evicted workspace's bytes
402        // from `loaded_bytes` into `retained_old`. If no slow query
403        // still holds the evicted `Arc<CodeGraph>`, the retention
404        // reaper's next tick (25 ms) would free those bytes — but
405        // Phase 3's authoritative re-check runs *now*, before the
406        // reaper gets the chance. Run a synchronous reap pass so
407        // admission sees the free bytes immediately on the common
408        // case of "no outstanding slow queries". Slow-query-held
409        // entries stay retained and still count against the budget,
410        // which is correct per §G.5.
411        if !victims.is_empty() {
412            self.reap_once();
413        }
414
415        // --- Phase 3: authoritative commit (admission alone) ------
416        let mut state = self.admission.lock();
417        let projected = state
418            .total_committed_bytes()
419            .saturating_add(working_set_estimate);
420        if projected > limit {
421            return Err(DaemonError::MemoryBudgetExceeded {
422                limit_bytes: limit,
423                current_bytes: state.loaded_bytes,
424                reserved_bytes: state.reserved_bytes,
425                retained_bytes: state.retained_total_bytes(),
426                requested_bytes: working_set_estimate,
427            });
428        }
429        state.reserved_bytes = state.reserved_bytes.saturating_add(working_set_estimate);
430        self.bump_high_water(&state);
431        drop(state);
432
433        Ok(RebuildReservation {
434            manager: Arc::downgrade(self),
435            bytes: working_set_estimate,
436            released: false,
437        })
438    }
439
440    /// Phase-1 helper: pick the LRU-ordered set of non-pinned
441    /// workspace keys (excluding `for_key`) whose cumulative
442    /// `memory_bytes` meets or exceeds `need`.
443    ///
444    /// Returns keys in eviction order (oldest-first). Callers execute
445    /// evictions in Phase 2 without holding any lock.
446    fn plan_eviction(
447        workspaces: &HashMap<WorkspaceKey, Arc<LoadedWorkspace>>,
448        _state: &AdmissionState,
449        need: u64,
450        for_key: &WorkspaceKey,
451    ) -> Vec<WorkspaceKey> {
452        let mut candidates: Vec<(Instant, u64, WorkspaceKey)> = workspaces
453            .iter()
454            .filter(|(k, ws)| {
455                // Skip the requester (§G.7: a pinned workspace that
456                // exceeds the budget must fail, not evict itself) and
457                // every pinned workspace. Also skip workspaces in
458                // Evicted or Unloaded state — they have no bytes to
459                // reclaim and would be no-ops.
460                **k != *for_key
461                    && !ws.pinned
462                    && ws.load_state() != WorkspaceState::Evicted
463                    && ws.load_state() != WorkspaceState::Unloaded
464            })
465            .map(|(k, ws)| {
466                let last = *ws.last_accessed.read();
467                let bytes = ws.memory_bytes.load(Ordering::Acquire) as u64;
468                (last, bytes, k.clone())
469            })
470            .collect();
471        // Oldest last_accessed first.
472        candidates.sort_by_key(|(ts, _, _)| *ts);
473
474        let mut plan = Vec::new();
475        let mut reclaimed: u64 = 0;
476        for (_, bytes, key) in candidates {
477            if reclaimed >= need {
478                break;
479            }
480            plan.push(key);
481            reclaimed = reclaimed.saturating_add(bytes);
482        }
483        plan
484    }
485
486    /// Execute Phase-2 of an eviction.
487    ///
488    /// Steps, in order:
489    ///
490    /// 1. Swap the workspace's `ArcSwap<CodeGraph>` to an empty
491    ///    placeholder. This releases the old `Arc` from the
492    ///    `ArcSwap` itself — any outstanding slow-query `Arc`s
493    ///    still exist at the same strong count.
494    /// 2. Move those bytes from `loaded_bytes` into `retained_old`
495    ///    (under the admission mutex) — keying on a fresh
496    ///    [`OldGraphToken`]. This preserves the §G.5 invariant:
497    ///    bytes shift from the loaded tier to the retained tier
498    ///    rather than disappearing. The retention reaper frees the
499    ///    entry (and therefore the bytes) when `strong_count` drops
500    ///    to 1, i.e. when every slow query has released its `Arc`.
501    /// 3. Set `rebuild_cancelled = true` so any concurrent
502    ///    `get_or_load` / rebuild running against this workspace
503    ///    observes the signal at its next pass boundary and aborts
504    ///    without publishing.
505    /// 4. Mark the state `Evicted`.
506    /// 5. Remove the workspace from the manager map.
507    ///
508    /// The order is load-bearing: the cancellation flag is set
509    /// *before* the map removal so a concurrent loader that
510    /// re-checks `rebuild_cancelled` after its build (per
511    /// [`Self::get_or_load`]) sees the cancel.
512    ///
513    /// Codex Task 6 Phase 6b iter-1 MAJOR: the pre-fix version
514    /// dropped the evicted `Arc` at function end and subtracted
515    /// bytes from `loaded_bytes` without inserting a retained
516    /// entry — leaking accounting for any graph still held by a
517    /// slow query.
518    fn execute_eviction(&self, key: &WorkspaceKey) {
519        // Hold `workspaces.write()` across the ENTIRE eviction —
520        // from the initial lookup through the final `remove` — so
521        // no concurrent `get_or_load` post-build re-check can
522        // interleave with us. Loaders serialize against eviction
523        // by holding `workspaces.read()` across their own publish
524        // critical section (see `get_or_load` step 7+).
525        //
526        // Lock order is `workspaces → admission` per plan §J.4.
527        // We take `admission` INSIDE this write-lock in Step 2,
528        // which is the outermost-first order the contract
529        // requires.
530        //
531        // Codex Task 6 Phase 6b iter-2 MAJOR: the iter-1 version
532        // took `workspaces.read()` only briefly for the initial
533        // lookup, then dropped it — leaving a window where a
534        // concurrent load's post-build re-check could observe
535        // workspace-still-in-map / cancelled-still-false and then
536        // publish into an already-evicted workspace. Holding
537        // `workspaces.write()` across the full eviction closes
538        // that window.
539        let mut workspaces = self.workspaces.write();
540        let Some(ws) = workspaces.get(key).cloned() else {
541            return; // already unloaded / never loaded
542        };
543
544        // Step 1 — swap the ArcSwap to an empty placeholder. This
545        // transfers the ArcSwap-held strong reference out into `old_arc`.
546        let old_arc = ws.graph.swap(Arc::new(CodeGraph::new()));
547        let prior_bytes_usize = ws.memory_bytes.swap(0, Ordering::AcqRel);
548        let prior_bytes = prior_bytes_usize as u64;
549
550        // Step 2 — move bytes loaded → retained under the admission
551        // mutex. A fresh token keeps this entry distinct from any
552        // retained entries published via `publish_and_retain`.
553        let token = OldGraphToken::new();
554        {
555            let mut state = self.admission.lock();
556            state.loaded_bytes = state.loaded_bytes.saturating_sub(prior_bytes);
557            state.retained_old.insert(
558                token,
559                RetainedEntry {
560                    bytes: prior_bytes,
561                    graph: old_arc,
562                    published_at: Instant::now(),
563                    warned_past_timeout: false,
564                },
565            );
566            self.bump_high_water(&state);
567        }
568
569        // Step 3 — cancellation + state transition (both are per-
570        // workspace atomic stores; safe to interleave with any
571        // concurrent query that already held an `Arc<LoadedWorkspace>`
572        // cloned from the map).
573        ws.rebuild_cancelled.store(true, Ordering::Release);
574        ws.store_state(WorkspaceState::Evicted);
575
576        // Step 4 — remove from the manager map. Still under the
577        // write lock, so no concurrent reader observes the
578        // stale "evicted-but-still-in-map" intermediate state.
579        workspaces.remove(key);
580    }
581
582    /// Load the workspace's graph, building it via `builder` if not
583    /// already present.
584    ///
585    /// Lifecycle gate:
586    ///
587    /// 1. Cache-hit fast path — if the workspace is present AND in
588    ///    [`WorkspaceState::Loaded`], touch + return.
589    /// 2. CAS `Unloaded`/`Evicted`/`Failed` → `Loading`. Exactly one
590    ///    caller wins. If another caller already holds the gate
591    ///    (`Loading`/`Rebuilding`), return an error — Phase 6c /
592    ///    Task 7 will introduce a wait-for-done notify channel.
593    /// 3. The winner arms a [`LoadingGuard`] RAII wrapper that
594    ///    transitions the workspace into [`WorkspaceState::Failed`]
595    ///    on *any* non-success exit (`Err`, early `return`, or
596    ///    panic). This covers the Codex iter-1 MAJOR that a panic
597    ///    from `builder.build()` would leave the workspace stuck
598    ///    in Loading.
599    /// 4. Reserve admission headroom (§G.1 three-phase).
600    /// 5. Build the graph via the injected `builder`.
601    /// 6. Re-check `rebuild_cancelled` + workspace map membership
602    ///    before publishing. If eviction ran during the build, the
603    ///    reservation refunds via RAII and no graph is published.
604    /// 7. Publish via `publish_and_retain`. Disarm the LoadingGuard
605    ///    + record success + touch.
606    /// 8. Release `workspaces_guard`, THEN dispatch the
607    ///    post-publish `SqrydHook`. The hook fires outside every
608    ///    outer manager lock so a hook impl is free to call back
609    ///    into `unload` / `get_or_load` / `set_hook` / `status`
610    ///    without deadlocking against the loader that fired it.
611    ///
612    /// Codex Task 6 Phase 6b iter-1 MAJOR (×2): the pre-fix version
613    /// clobbered a concurrent eviction's `rebuild_cancelled` signal
614    /// and could publish into a workspace already removed from the
615    /// map. The CAS + post-build re-check + LoadingGuard together
616    /// close both holes.
617    ///
618    /// Codex Task 6 Phase 6c iter-2 MAJOR: the pre-fix version
619    /// dispatched the hook from inside `publish_and_retain` while
620    /// the caller still held `workspaces.read()`, giving a hook
621    /// impl that needed `workspaces.write()` (e.g. via `unload`)
622    /// a guaranteed re-entrancy deadlock. Splitting publish and
623    /// hook dispatch into Steps 7 and 8 closes that hole.
624    ///
625    /// # Errors
626    ///
627    /// - [`DaemonError::MemoryBudgetExceeded`] if Phase 3 cannot
628    ///   admit the reservation even after LRU eviction.
629    /// - [`DaemonError::WorkspaceBuildFailed`] surfaced from the
630    ///   builder OR synthesised when a concurrent eviction races
631    ///   the load (`reason = "workspace evicted mid-load"`).
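    ///
    /// A minimal calling sketch; `DiskBuilder`, the estimate value, and the
    /// `key` construction are hypothetical:
    ///
    /// ```rust,ignore
    /// let builder = DiskBuilder::new(&root);
    /// let graph = manager.get_or_load(&key, &builder, 64 * 1024 * 1024)?;
    /// // `graph` is a strong `Arc<CodeGraph>`: it stays valid even if the
    /// // workspace is evicted or republished afterwards.
    /// ```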
632    pub fn get_or_load(
633        self: &Arc<Self>,
634        key: &WorkspaceKey,
635        builder: &dyn WorkspaceBuilder,
636        working_set_estimate: u64,
637    ) -> Result<Arc<CodeGraph>, DaemonError> {
638        // --- Step 1: cache-hit fast path ------------------------
639        {
640            let workspaces = self.workspaces.read();
641            if let Some(ws) = workspaces.get(key)
642                && ws.load_state() == WorkspaceState::Loaded
643            {
644                ws.touch();
645                return Ok(ws.graph.load_full());
646            }
647        }
648
649        // --- Step 2: take the lifecycle gate via state CAS ------
650        let ws = self.get_or_insert_workspace(key);
651        let allowed = [
652            WorkspaceState::Unloaded.as_u8(),
653            WorkspaceState::Failed.as_u8(),
654            WorkspaceState::Evicted.as_u8(),
655        ];
656        let mut acquired = false;
657        for prior in allowed {
658            if ws
659                .state
660                .compare_exchange(
661                    prior,
662                    WorkspaceState::Loading.as_u8(),
663                    Ordering::AcqRel,
664                    Ordering::Acquire,
665                )
666                .is_ok()
667            {
668                acquired = true;
669                break;
670            }
671        }
672        if !acquired {
673            // Someone else already holds the gate (Loading /
674            // Rebuilding) OR raced us into Loaded. Cache-read and
675            // return if Loaded, else surface a transient error.
676            let current = ws.load_state();
677            if current == WorkspaceState::Loaded {
678                ws.touch();
679                return Ok(ws.graph.load_full());
680            }
681            return Err(DaemonError::WorkspaceBuildFailed {
682                root: key.index_root.clone(),
683                reason: format!("workspace load already in progress ({current})"),
684            });
685        }
686        // We own the gate. Clear the cancellation flag AFTER the
687        // CAS: at this point no prior evict can have raced in
688        // front of us because evict takes a shared Arc<LoadedWorkspace>
689        // from the map and sets cancelled *before* removing. If
690        // cancelled is true here, evict ran on the prior state and
691        // we should honour it.
692        let pre_cancelled = ws.rebuild_cancelled.swap(false, Ordering::AcqRel);
693        if pre_cancelled {
694            // Evict raced us out of the allowed-state list. Put
695            // the cancelled flag back, transition to Failed (so
696            // this caller's LoadingGuard doesn't fire), and fail.
697            ws.rebuild_cancelled.store(true, Ordering::Release);
698            ws.store_state(WorkspaceState::Failed);
699            return Err(DaemonError::WorkspaceBuildFailed {
700                root: key.index_root.clone(),
701                reason: "workspace evicted mid-load".to_string(),
702            });
703        }
704
705        // --- Step 3: arm LoadingGuard for panic / early-return --
706        let mut loading = LoadingGuard {
707            ws: &ws,
708            key,
709            armed: true,
710        };
711
712        // --- Step 4: reserve admission headroom ------------------
713        let reservation = self.reserve_rebuild(key, working_set_estimate)?;
714
715        // --- Step 5: build the graph ----------------------------
716        let graph = match builder.build(&key.index_root) {
717            Ok(g) => g,
718            Err(err) => {
719                drop(reservation);
720                // The LoadingGuard will flip us to Failed + record
721                // a synthetic error; overwrite with the builder's
722                // real error for diagnostic fidelity.
723                ws.record_failure(clone_err(&err));
724                loading.armed = false;
725                ws.store_state(WorkspaceState::Failed);
726                return Err(err);
727            }
728        };
729
730        // --- Step 6+7: atomic re-check + publish -------------
731        //
732        // Hold `workspaces.read()` across the final cancellation
733        // / map-membership re-check AND the `publish_and_retain`
734        // call. `execute_eviction` holds `workspaces.write()` for
735        // the duration of every eviction, so the RwLock makes the
736        // publish critical section atomic with respect to
737        // eviction: either eviction has fully completed (the map
738        // lookup fails), or eviction has not started (and cannot
739        // start while we hold the read lock).
740        //
741        // Lock order per plan §J.4: `workspaces → admission`.
742        // `publish_and_retain` takes `admission` internally;
743        // that nests under our `workspaces.read()` correctly.
744        //
745        // Codex Task 6 Phase 6b iter-2 MAJOR: the iter-1 version
746        // released `workspaces.read()` after the map-membership
747        // check and then called `publish_and_retain` unlocked.
748        // Eviction could slip in between the two, satisfying
749        // both re-checks yet still reaching `remove(key)` after
750        // our publish. Holding the read lock across the publish
751        // closes the window.
752        let workspaces_guard = self.workspaces.read();
753
754        // Cancellation check INSIDE the read lock. If cancellation
755        // was set before we grabbed the lock, we still observe it;
756        // if it's set after we release, a future load will see it.
757        if ws.rebuild_cancelled.load(Ordering::Acquire) {
758            drop(workspaces_guard);
759            drop(reservation);
760            ws.record_failure(DaemonError::WorkspaceBuildFailed {
761                root: key.index_root.clone(),
762                reason: "workspace evicted mid-load".to_string(),
763            });
764            loading.armed = false;
765            ws.store_state(WorkspaceState::Failed);
766            return Err(DaemonError::WorkspaceBuildFailed {
767                root: key.index_root.clone(),
768                reason: "workspace evicted mid-load".to_string(),
769            });
770        }
771        if !workspaces_guard.contains_key(key) {
772            drop(workspaces_guard);
773            drop(reservation);
774            ws.record_failure(DaemonError::WorkspaceBuildFailed {
775                root: key.index_root.clone(),
776                reason: "workspace removed mid-load".to_string(),
777            });
778            loading.armed = false;
779            ws.store_state(WorkspaceState::Failed);
780            return Err(DaemonError::WorkspaceBuildFailed {
781                root: key.index_root.clone(),
782                reason: "workspace removed mid-load".to_string(),
783            });
784        }
785
786        // Publish while still holding `workspaces.read()`. An
787        // eviction started in parallel is blocked on
788        // `workspaces.write()` and cannot observe / mutate this
789        // workspace until we release.
790        //
791        // Per Codex Task 6 Phase 6c iter-2 MAJOR: the hook dispatch
792        // is deliberately NOT performed inside `publish_and_retain`
793        // — firing it here would nest `self.hook.read()` under
794        // `workspaces.read()`, creating a re-entrancy deadlock for
795        // any hook impl that calls back into manager methods
796        // needing `workspaces.write()` (e.g. `unload`). The fix
797        // returns the published `Arc<CodeGraph>` from
798        // `publish_and_retain`, releases `workspaces_guard`, and
799        // THEN invokes `on_publish` under a disjoint short-lived
800        // `self.hook.read()` acquisition.
801        let (_token, published_arc) = self.publish_and_retain(reservation, &ws, graph);
802        ws.record_success(std::time::SystemTime::now());
803        ws.store_state(WorkspaceState::Loaded);
804        ws.touch();
805        loading.armed = false;
806        drop(workspaces_guard);
807
808        // Hook fires OUTSIDE every outer lock. The only lock taken
809        // here is `self.hook.read()` (for the brief clone inside
810        // `hook_snapshot`). A hook impl is now free to call any
811        // manager method — including `unload`, which needs
812        // `workspaces.write()` — without deadlocking against the
813        // loader that fired it. The dispatch itself is synchronous;
814        // hook impls are expected to return promptly, scheduling any
815        // heavy work on a background task rather than blocking here.
816        let hook = self.hook_snapshot();
817        hook.on_publish(&key.index_root, Arc::clone(&published_arc));
818
819        Ok(published_arc)
820    }
821
822    /// Look up or insert a [`LoadedWorkspace`] for `key`. Returns
823    /// the shared `Arc` so both the caller and the manager map
824    /// reference the same state.
825    fn get_or_insert_workspace(&self, key: &WorkspaceKey) -> Arc<LoadedWorkspace> {
826        // Read-then-write: try a read first to avoid the write-lock
827        // cost when the entry already exists.
828        if let Some(ws) = self.workspaces.read().get(key) {
829            return Arc::clone(ws);
830        }
831        let mut workspaces = self.workspaces.write();
832        Arc::clone(
833            workspaces
834                .entry(key.clone())
835                .or_insert_with(|| Arc::new(LoadedWorkspace::new(key.clone(), false))),
836        )
837    }
838
839    /// Evict the least-recently-accessed non-pinned workspace, if
840    /// any. Returns the evicted key on success, `None` if there are
841    /// no eligible candidates.
842    pub fn evict_lru(&self) -> Option<WorkspaceKey> {
843        let candidate = {
844            let workspaces = self.workspaces.read();
845            workspaces
846                .iter()
847                .filter(|(_, ws)| {
848                    !ws.pinned
849                        && ws.load_state() != WorkspaceState::Evicted
850                        && ws.load_state() != WorkspaceState::Unloaded
851                })
852                .min_by_key(|(_, ws)| *ws.last_accessed.read())
853                .map(|(k, _)| k.clone())
854        };
855        if let Some(key) = &candidate {
856            self.execute_eviction(key);
857        }
858        candidate
859    }
860
861    /// Explicitly unload a workspace. Equivalent to
862    /// [`Self::execute_eviction`] but callable by the IPC
863    /// `daemon/unload` method and `sqry daemon unload <path>` CLI.
864    /// Returns `true` if the workspace was present, `false` if it
865    /// was already absent.
866    pub fn unload(&self, key: &WorkspaceKey) -> bool {
867        let present = self.workspaces.read().contains_key(key);
868        if present {
869            self.execute_eviction(key);
870        }
871        present
872    }
873
874    /// Find a loaded workspace by its directory path.
875    ///
876    /// Linear scan over all registered workspaces comparing each workspace's
877    /// `index_root` against `path`. Callers (e.g. `daemon/rebuild`) supply a
878    /// canonicalised path but not the full [`WorkspaceKey`].
879    /// O(n) in the number of loaded workspaces; in practice n is small.
880    ///
881    /// Returns `None` if no workspace with a matching root is found.
882    #[must_use]
883    pub fn find_key_and_workspace_by_path(
884        &self,
885        path: &std::path::Path,
886    ) -> Option<(WorkspaceKey, Arc<LoadedWorkspace>)> {
887        let workspaces = self.workspaces.read();
888        workspaces
889            .iter()
890            .find(|(k, _)| k.index_root == path)
891            .map(|(k, ws)| (k.clone(), Arc::clone(ws)))
892    }
893
894    /// Snapshot of daemon-wide status. Point-in-time, non-transactional.
895    pub fn status(&self) -> DaemonStatus {
896        let workspaces_snapshot: Vec<WorkspaceStatus> = {
897            let workspaces = self.workspaces.read();
898            let mut entries: Vec<_> = workspaces
899                .iter()
900                .map(|(k, ws)| WorkspaceStatus {
901                    index_root: k.index_root.clone(),
902                    state: ws.load_state(),
903                    pinned: ws.pinned,
904                    current_bytes: ws.memory_bytes.load(Ordering::Acquire) as u64,
905                    high_water_bytes: ws.memory_high_water_bytes.load(Ordering::Acquire) as u64,
906                    last_good_at: *ws.last_good_at.read(),
907                    last_error: ws.last_error.read().as_ref().map(|e| e.to_string()),
908                    retry_count: ws.retry_count.load(Ordering::Acquire),
909                })
910                .collect();
911            entries.sort_by(|a, b| a.index_root.cmp(&b.index_root));
912            entries
913        };
914
915        let (current_bytes, reserved_bytes, high_water_bytes) = {
916            let state = self.admission.lock();
917            let current = state.total_committed_bytes();
918            let reserved = state.reserved_bytes;
919            // Bump high-water here in case this status read observes a
920            // higher value than the last mutation captured. The explicit
921            // `drop(state)` below keeps the admission lock held across
922            // the `fetch_max`, serialising the high-water update with
923            // any concurrent publish.
924            let peak = self
925                .total_memory_high_water
926                .fetch_max(current, Ordering::AcqRel);
927            let peak = peak.max(current);
928            drop(state);
929            (current, reserved, peak)
930        };
931
932        DaemonStatus {
933            uptime_seconds: self.started_at.elapsed().as_secs(),
934            daemon_version: env!("CARGO_PKG_VERSION").to_string(),
935            memory: MemoryStatus {
936                limit_bytes: self.memory_limit_bytes(),
937                current_bytes,
938                reserved_bytes,
939                high_water_bytes,
940            },
941            workspaces: workspaces_snapshot,
942        }
943    }
944
945    /// Bump the daemon-wide high-water mark using the current
946    /// `AdmissionState`. Must be called with `admission` held.
947    fn bump_high_water(&self, state: &AdmissionState) {
948        let current = state.total_committed_bytes();
949        self.total_memory_high_water
950            .fetch_max(current, Ordering::AcqRel);
951    }
952
953    /// Test-only helper: insert a `LoadedWorkspace` into the manager
954    /// map in a specific state, bypassing `get_or_load`. Used by
955    /// `classify_for_serve` integration tests that need to observe
956    /// the `Unloaded` / `Loading` arms (both states are transient
957    /// during the normal load path).
958    ///
959    /// `#[doc(hidden)]` to signal "test affordance only" — same
960    /// pattern as [`crate::TestGate`] / [`crate::TestCapture`].
961    /// Production code should not call this.
962    #[doc(hidden)]
963    pub fn insert_workspace_in_state_for_test(&self, key: WorkspaceKey, state: WorkspaceState) {
964        let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
965        ws.store_state(state);
966        self.workspaces.write().insert(key, ws);
967    }
968
969    /// Acquire the internal `workspaces` RwLock in read mode.
970    ///
971    /// Task 7 Phase 7c: exposed so
972    /// [`crate::RebuildDispatcher::execute_one_rebuild`] can hold the
973    /// read lock across its cancel/membership re-check and
974    /// [`Self::publish_and_retain`], matching the pattern in
975    /// [`Self::get_or_load`] (Codex Task 6 Phase 6b iter-2 MAJOR — the
976    /// publish critical section MUST exclude concurrent
977    /// [`Self::execute_eviction`] on the same key to avoid
978    /// orphaned-publish / admission-drift).
979    ///
980    /// Callers MUST respect lock order §J.4: acquire `workspaces`
981    /// BEFORE `admission`. The returned guard is released when the
982    /// caller drops it.
983    ///
984    /// `pub(crate)` (iter-2 design Codex MAJOR): the accessor is only
985    /// used within the daemon crate; exposing it publicly would leak
986    /// lock mechanics and broaden the blast radius for future callers
987    /// that might violate the §J.4 discipline.
988    pub(crate) fn workspaces_read(
989        &self,
990    ) -> parking_lot::RwLockReadGuard<'_, HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
991        self.workspaces.read()
992    }
993
994    /// Classify a workspace's readiness to serve a query.
995    ///
996    /// Task 7 Phase 7c. Used by the Task 8 IPC router on every query
997    /// dispatch. Pure-read: no mutations, no `.await` (sync).
998    ///
999    /// # Returns
1000    ///
1001    /// | Workspace state | Map present | Result |
1002    /// |-----------------|-------------|--------|
1003    /// | `Loaded` or `Rebuilding` | yes | `Ok(ServeVerdict::Fresh { graph, state })` |
1004    /// | `Failed`, age < cap (or cap == 0) | yes | `Ok(ServeVerdict::Stale { graph, age_hours, last_good_at, last_error })` |
1005    /// | `Failed`, age >= cap | yes | `Err(WorkspaceStaleExpired { age_hours, cap_hours, last_good_at, last_error })` (→ JSON-RPC -32002) |
1006    /// | `Failed`, no prior good | yes | `Err(WorkspaceBuildFailed { reason })` (→ -32001) |
1007    /// | `Unloaded` or `Loading` | yes | `Ok(ServeVerdict::NotReady { state })` |
1008    /// | `Evicted` | yes (transient window) | `Err(WorkspaceEvicted)` (→ -32004) |
1009    /// | any | no | `Err(WorkspaceEvicted)` (→ -32004) |
1010    ///
1011    /// # Lock order
1012    ///
1013    /// Task 7 Phase 7c feat iter-1 Codex BLOCKER fix: takes
1014    /// `workspaces.read()` across the FULL snapshot — state, graph,
1015    /// last_good, and last_error_text are all captured inside the
1016    /// read critical section. Dropping the read lock before reading
1017    /// the graph would allow `execute_eviction` (which needs
1018    /// `workspaces.write()` for the full graph-swap + state-store +
1019    /// map-remove sequence) to interleave, surfacing the empty
1020    /// post-eviction placeholder graph as a `Fresh` verdict.
1021    ///
1022    /// Does not acquire `admission` or `rebuild_lane`; only
1023    /// `workspaces` + per-workspace field locks. §J.4 order preserved.
1024    ///
1025    /// # Errors
1026    ///
1027    /// Returns the variants listed in the table above.
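    ///
    /// A hedged sketch of how the Task 8 router might consume the verdict
    /// (the `serve_*` / `reply_*` helpers are hypothetical):
    ///
    /// ```rust,ignore
    /// match manager.classify_for_serve(&key, SystemTime::now()) {
    ///     Ok(ServeVerdict::Fresh { graph, state }) => serve(graph, state),
    ///     Ok(ServeVerdict::Stale { graph, age_hours, .. }) => {
    ///         serve_with_stale_warning(graph, age_hours)
    ///     }
    ///     Ok(ServeVerdict::NotReady { state }) => reply_not_ready(state),
    ///     Err(err) => reply_error(err), // maps to -32001 / -32002 / -32004
    /// }
    /// ```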
1028    pub fn classify_for_serve(
1029        &self,
1030        key: &WorkspaceKey,
1031        now: std::time::SystemTime,
1032    ) -> Result<ServeVerdict, DaemonError> {
1033        // Task 7 Phase 7c — feat iter-0 Codex BLOCKER fix: the
1034        // previous iter-0 implementation cloned the workspace Arc and
1035        // dropped `workspaces.read()` BEFORE reading state and graph.
1036        // `execute_eviction` (see `Self::execute_eviction` above)
1037        // holds `workspaces.write()` across:
1038        //   - ws.graph.swap(CodeGraph::new())
1039        //   - admission accounting transfer
1040        //   - ws.rebuild_cancelled.store(true)
1041        //   - ws.store_state(WorkspaceState::Evicted)
1042        //   - workspaces.remove(key)
1043        //
1044        // Without the read-lock hold extending across graph capture,
1045        // a classifier could observe `state == Loaded` but fetch the
1046        // post-eviction empty placeholder graph, returning
1047        // `Fresh { graph: empty }` — a correctness bug.
1048        //
1049        // Iter-1: snapshot every field under the read lock. The
1050        // returned `Arc<CodeGraph>` is a strong reference independent
1051        // of the lock lifetime; dropping the lock after capture is
1052        // safe for the caller.
1053        //
1054        // `last_error` is captured as a display-string (the error
1055        // type is not Clone; see `clone_err` rationale) because
1056        // `NoPriorGood` returns a `WorkspaceBuildFailed { reason }`
1057        // that embeds the stringified prior error.
1058        let snapshot = {
1059            let workspaces = self.workspaces.read();
1060            let Some(ws) = workspaces.get(key).cloned() else {
1061                return Err(DaemonError::WorkspaceEvicted {
1062                    root: key.index_root.clone(),
1063                });
1064            };
1065            let state = ws.load_state();
1066            let graph = ws.graph.load_full();
1067            let last_good = *ws.last_good_at.read();
1068            let last_error_text = ws.last_error.read().as_ref().map(|e| e.to_string());
1069            (state, graph, last_good, last_error_text)
1070            // workspaces.read() dropped here — the (state, graph)
1071            // pair is now a coherent snapshot taken atomically w.r.t.
1072            // execute_eviction's workspaces.write().
1073        };
1074        let (state, graph, last_good, last_error_text) = snapshot;
1075
1076        match state {
1077            WorkspaceState::Loaded | WorkspaceState::Rebuilding => {
1078                Ok(ServeVerdict::Fresh { graph, state })
1079            }
1080            WorkspaceState::Failed => {
1081                let cap = self.config.stale_serve_max_age_hours;
1082                match classify_staleness(last_good, cap, now) {
1083                    StalenessVerdict::NoPriorGood => Err(DaemonError::WorkspaceBuildFailed {
1084                        root: key.index_root.clone(),
1085                        reason: last_error_text
1086                            .unwrap_or_else(|| "no prior successful build".into()),
1087                    }),
1088                    StalenessVerdict::Stale { age_hours } => Ok(ServeVerdict::Stale {
1089                        graph,
1090                        age_hours,
1091                        // Invariant: `classify_staleness` only returns
1092                        // `Stale` when `last_good.is_some()` (see
1093                        // `workspace/staleness.rs:54-73`).
1094                        last_good_at: last_good
1095                            .expect("Stale verdict only emitted when last_good.is_some()"),
1096                        last_error: last_error_text,
1097                    }),
1098                    StalenessVerdict::Expired { age_hours } => {
1099                        Err(DaemonError::WorkspaceStaleExpired {
1100                            root: key.index_root.clone(),
1101                            age_hours,
1102                            cap_hours: cap,
1103                            last_good_at: last_good,
1104                            last_error: last_error_text,
1105                        })
1106                    }
1107                }
1108            }
1109            WorkspaceState::Unloaded | WorkspaceState::Loading => {
1110                Ok(ServeVerdict::NotReady { state })
1111            }
1112            // Transient window between store_state(Evicted) and
1113            // workspaces.remove; same semantics as map-absent.
1114            WorkspaceState::Evicted => Err(DaemonError::WorkspaceEvicted {
1115                root: key.index_root.clone(),
1116            }),
1117        }
1118    }
1119
1120    /// Consume a [`RebuildReservation`] plus a freshly-built
1121    /// [`CodeGraph`] and atomically publish it to the workspace.
1122    ///
1123    /// Implements Amendment 2 §G.2:
1124    ///
1125    /// - Captures the prior `Arc<CodeGraph>` and `memory_bytes` into
1126    ///   a [`RollbackGuard`] **before** any swap — so a panic at any
1127    ///   point before the admission update reverts cleanly.
1128    /// - Swaps the `ArcSwap<CodeGraph>` to the new graph.
1129    /// - Swaps the per-workspace `memory_bytes` to the new size.
1130    /// - Under the admission mutex: moves `bytes_delta` from
1131    ///   `reserved_bytes` into `loaded_bytes`, inserts a
1132    ///   [`RetainedEntry`] holding the old `Arc` until the retention
1133    ///   reaper frees it.
1134    /// - Disarms the [`RollbackGuard`] on success.
1135    ///
1136    /// Sync `fn`. There is no `.await` between the first swap and the
1137    /// admission insert — tokio task cancellation can only interrupt
1138    /// at `.await` points, so this sequence is atomic with respect
1139    /// to cancellation per §G.2.
1140    ///
1141    /// Returns the minted [`OldGraphToken`] for tracing / integration
1142    /// tests, together with an `Arc<CodeGraph>` handle to the freshly
1143    /// published graph. Per Codex Task 6 Phase 6c iter-2 MAJOR the
1144    /// post-publish `SqrydHook` dispatch is NOT performed here —
1145    /// firing `on_publish` under the `workspaces.read()` guard
1146    /// `get_or_load` holds across this call would nest
1147    /// `self.hook.read()` inside `workspaces`, giving hook impls a
1148    /// re-entrancy deadlock hole if they call back into manager
1149    /// methods needing `workspaces.write()`. The caller is
1150    /// responsible for dispatching the hook after dropping every
1151    /// outer workspaces-lock holder.
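    ///
    /// A caller-side sketch mirroring the tail of [`Self::get_or_load`]
    /// (the preceding cancel / membership re-checks are elided):
    ///
    /// ```rust,ignore
    /// let guard = manager.workspaces_read(); // excludes concurrent eviction
    /// let (_token, published) = manager.publish_and_retain(reservation, &ws, graph);
    /// drop(guard);
    /// // Only now dispatch `SqrydHook::on_publish(&key.index_root, published)`,
    /// // outside every workspaces-lock holder.
    /// ```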
1152    pub fn publish_and_retain(
1153        self: &Arc<Self>,
1154        reservation: RebuildReservation,
1155        workspace: &LoadedWorkspace,
1156        new_graph: CodeGraph,
1157    ) -> (OldGraphToken, Arc<CodeGraph>) {
1158        // Compute the new graph's heap bytes before handing it to the
1159        // ArcSwap — once published, a concurrent reader holds it
1160        // alive, and measuring after publish would race with the
1161        // admission update.
1162        let new_bytes_usize = new_graph.heap_bytes();
1163        // `usize as u64` is a no-op on 64-bit and a widen on 32-bit.
1164        let new_bytes = new_bytes_usize as u64;
1165
1166        // Take the reservation by value so this function owns it and
1167        // the Drop impl fires on any unwind path. `released` stays
1168        // `false` until *after* the admission commit succeeds, so a
1169        // panic before or during the admission mutex section refunds
1170        // `reserved_bytes` back to the pool (Codex Task 6 Phase 6a
1171        // iter-1 MAJOR: the previous ordering disarmed before the
1172        // commit and could leak reserved bytes on unwind).
1173        let mut reservation = reservation;
1174        let reservation_bytes = reservation.bytes;
1175
1176        let new_arc = Arc::new(new_graph);
1177        // Clone the Arc BEFORE the swap so the caller can still
1178        // obtain a handle to the published graph after the swap
1179        // moves `new_arc` into the ArcSwap. Re-reading via
1180        // `workspace.graph.load_full()` after the swap would work
1181        // today but is racy against any future swap path that
1182        // could run between the swap and the load — cheaper and
1183        // safer to clone the Arc once.
1184        let published_arc = Arc::clone(&new_arc);
1185        let token = OldGraphToken::new();
1186
1187        // --- RollbackGuard setup --------------------------------
1188        let prior_arc_for_rollback = workspace.graph.load_full();
1189        let prior_bytes = workspace
1190            .memory_bytes
1191            .load(std::sync::atomic::Ordering::Acquire);
1192
1193        let mut rollback = RollbackGuard {
1194            ws: workspace,
1195            prior_arc: Some(prior_arc_for_rollback),
1196            prior_bytes,
1197            armed: true,
1198        };
1199
1200        // --- Non-recoverable zone (no .await; no fallible ops) ---
1201        //
1202        // If any code between this point and `reservation.released = true`
1203        // panics, the following Drop order runs on unwind:
1204        //   1. `rollback` Drop reverts `workspace.graph` and
1205        //      `workspace.memory_bytes` to the pre-swap values
1206        //      (because `armed == true`).
1207        //   2. `reservation` Drop reacquires the admission mutex and
1208        //      refunds `reservation_bytes` back to `reserved_bytes`
1209        //      (because `released == false`).
1210        // This is the §G.5 invariant-preserving rollback described in
1211        // the plan; the reservation refund was missing before the
1212        // iter-1 fix.
1213        let old_arc = workspace.graph.swap(new_arc);
1214        let prev_memory_bytes = workspace.update_memory(new_bytes_usize);
1215        debug_assert_eq!(
1216            prev_memory_bytes, prior_bytes,
1217            "RollbackGuard prior_bytes must match update_memory's returned prior",
1218        );
1219
1220        // --- Admission commit (mutex-only; no other locks) -------
1221        //
1222        // The critical section is ordered so the only *fallible* op —
1223        // `HashMap::insert`, which can allocate on grow and therefore
1224        // panic — runs FIRST, before any admission counter is mutated
1225        // and before the reservation is disarmed. Everything that
1226        // follows (`saturating_*` arithmetic + `reservation.released
1227        // = true`) is guaranteed infallible, so once we reach those
1228        // lines the critical section cannot unwind mid-way and leave
1229        // admission state inconsistent.
1230        //
1231        // Codex Task 6 Phase 6a iter-2 MAJOR: the iter-1 ordering
1232        // disarmed the reservation before `retained_old.insert`
1233        // completed. A panic from the insert would leave
1234        // `reserved_bytes` drained and `loaded_bytes` updated while
1235        // no retained entry existed — rollback reverts ws.graph +
1236        // ws.memory_bytes but cannot refund the reservation
1237        // (released=true). The fix moves insert to the front of the
1238        // section so any unwind preserves the §G.5 invariant.
1239        //
1240        // Pre-build the `RetainedEntry` outside the lock so only the
1241        // `HashMap::insert` itself can allocate; the struct
1242        // construction is a field-by-field move.
1243        let retained_entry = RetainedEntry {
1244            bytes: prev_memory_bytes as u64,
1245            graph: old_arc,
1246            published_at: Instant::now(),
1247            warned_past_timeout: false,
1248        };
1249
1250        let mut state = self.admission.lock();
1251
1252        // Step 1 — fallible. `HashMap::insert` may reallocate; if it
1253        // panics the state is left unchanged (hashbrown's insert is
1254        // exception-safe: a failed grow leaves the map in its prior
1255        // capacity and does not insert the new entry). Unwind drops
1256        // `state` (releasing the mutex), then `rollback` reverts
1257        // ws.graph + ws.memory_bytes, then the `reservation`
1258        // (released=false) refunds `reservation_bytes` from
1259        // `reserved_bytes`. `loaded_bytes` is not mutated because
1260        // the lines below never run.
1261        state.retained_old.insert(token, retained_entry);
1262
1263        // Step 2 — infallible arithmetic (saturating ops on u64).
1264        // Move reservation → loaded. The prior workspace bytes are
1265        // already counted in `loaded_bytes` (they were added the
1266        // last time this workspace published). Swap by subtracting
1267        // the old and adding the new — keeps the §G.5 invariant
1268        // monotonic w.r.t. the commit.
1269        state.reserved_bytes = state.reserved_bytes.saturating_sub(reservation_bytes);
1270        state.loaded_bytes = state
1271            .loaded_bytes
1272            .saturating_sub(prev_memory_bytes as u64)
1273            .saturating_add(new_bytes);
1274
1275        // Step 3 — infallible disarm. The admission commit is
1276        // complete; the reservation's Drop is now a no-op so it
1277        // does not double-refund.
1278        reservation.released = true;
1279        self.bump_high_water(&state);
1280        drop(state);
1281
1282        rollback.armed = false; // disarm on success
1283
1284        // NOTE: `SqrydHook::on_publish` is NOT dispatched here.
1285        // `get_or_load` holds `workspaces.read()` across this call
1286        // (to make the re-check + publish critical section atomic
1287        // with respect to eviction, see that function's Step 6+7
1288        // comment block). Firing the hook here would acquire
1289        // `self.hook.read()` nested under `workspaces`, giving a
1290        // hook impl that calls back into manager methods needing
1291        // `workspaces.write()` (e.g. `unload`) a guaranteed
1292        // deadlock. The caller dispatches the hook after dropping
1293        // `workspaces_guard` — see `get_or_load` post-publish.
1294        //
1295        // `NoOpHook` remains the default; Task 9's daemon binary
1296        // installs the production `QueryDbHook` that wraps
1297        // `sqry_db::persistence::save_derived` with a timeout.
1298        (token, published_arc)
1299    }
1300
1301    /// Abort the retention reaper task, if one was spawned (called
1302    /// from `Drop`). Safe to call from any context; abort is best-effort.
1303    fn shutdown_reaper(&self) {
1304        if let Some(handle) = self.reaper.lock().take() {
1305            handle.abort();
1306        }
1307    }
1308}
1309
1310impl Drop for WorkspaceManager {
1311    fn drop(&mut self) {
1312        self.shutdown_reaper();
1313    }
1314}
1315
1316// ---------------------------------------------------------------------------
1317// LoadingGuard (panic-safety for get_or_load)
1318// ---------------------------------------------------------------------------
1319
1320/// RAII guard that transitions the workspace into
1321/// [`WorkspaceState::Failed`] on any non-success exit from
1322/// [`WorkspaceManager::get_or_load`] — including panics.
1323///
1324/// Codex Task 6 Phase 6b iter-1 MAJOR: without this guard, a panic
1325/// in `builder.build()` would leave the workspace stuck in
1326/// `Loading` with `last_error = None`, permanently blocking
1327/// re-load attempts and corrupting status output.
1328///
1329/// The guard is armed until the final `loaded.armed = false` on
1330/// the success path (after publish succeeds). Every other exit
1331/// path — `Err` from admission, `Err` from builder, panic from
1332/// builder, early returns on the cancellation/map-membership
1333/// re-check — fires `Drop` with `armed == true` and performs the
1334/// Failed-state transition.
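///
/// Arming sketch (the caller shape is an assumption; the real
/// `get_or_load` interleaves admission and state checks around these
/// steps):
///
/// ```ignore
/// let mut guard = LoadingGuard { ws: &ws, key: &key, armed: true };
/// let graph = builder.build(&key.index_root)?; // Err or panic => Drop fires
/// // ... publish succeeded ...
/// guard.armed = false; // success path: Drop becomes a no-op
/// ```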
1335pub(crate) struct LoadingGuard<'a> {
1336    pub(crate) ws: &'a LoadedWorkspace,
1337    pub(crate) key: &'a WorkspaceKey,
1338    pub(crate) armed: bool,
1339}
1340
1341impl<'a> Drop for LoadingGuard<'a> {
1342    fn drop(&mut self) {
1343        if !self.armed {
1344            return;
1345        }
1346        // Only overwrite `last_error` if it hasn't been populated
1347        // with a more specific diagnostic by the explicit `Err`
1348        // branches in `get_or_load` — those set last_error before `armed =
1349        // false`, so seeing None here means we are in the panic
1350        // window or an early-return path that did not record one.
1351        {
1352            let mut slot = self.ws.last_error.write();
1353            if slot.is_none() {
1354                *slot = Some(DaemonError::WorkspaceBuildFailed {
1355                    root: self.key.index_root.clone(),
1356                    reason: "workspace load aborted unexpectedly".to_string(),
1357                });
1358            }
1359        }
1360        self.ws.retry_count.fetch_add(1, Ordering::AcqRel);
1361        self.ws.store_state(WorkspaceState::Failed);
1362    }
1363}
1364
1365/// Clone a [`DaemonError`] for storage on [`LoadedWorkspace::last_error`]
1366/// or for propagation to `handle_changes` error returns in
1367/// [`crate::RebuildDispatcher::execute_one_rebuild`] (Task 7 Phase 7b1).
1368///
1369/// [`DaemonError`] is not `Clone` because some variants wrap
1370/// non-`Clone` types (notably [`std::io::Error`] and
1371/// [`anyhow::Error`]). `last_error` is a diagnostic surface only —
1372/// it is serialised as `e.to_string()` by the status endpoint — so
1373/// reducing the error to a textual form is the right trade-off here.
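///
/// Round-trip sketch (hypothetical values; only the `Display` text is
/// expected to survive, per the contract above):
///
/// ```ignore
/// let err = DaemonError::WorkspaceEvicted { root: PathBuf::from("/repos/x") };
/// let stored = clone_err(&err);
/// assert_eq!(stored.to_string(), err.to_string());
/// ```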
1374pub(crate) fn clone_err(err: &DaemonError) -> DaemonError {
1375    match err {
1376        DaemonError::WorkspaceBuildFailed { root, reason } => DaemonError::WorkspaceBuildFailed {
1377            root: root.clone(),
1378            reason: reason.clone(),
1379        },
1380        DaemonError::WorkspaceStaleExpired {
1381            root,
1382            age_hours,
1383            cap_hours,
1384            last_good_at,
1385            last_error,
1386        } => DaemonError::WorkspaceStaleExpired {
1387            root: root.clone(),
1388            age_hours: *age_hours,
1389            cap_hours: *cap_hours,
1390            // `SystemTime` is `Copy`; `Option<String>` needs `.clone()`.
1391            last_good_at: *last_good_at,
1392            last_error: last_error.clone(),
1393        },
1394        DaemonError::MemoryBudgetExceeded {
1395            limit_bytes,
1396            current_bytes,
1397            reserved_bytes,
1398            retained_bytes,
1399            requested_bytes,
1400        } => DaemonError::MemoryBudgetExceeded {
1401            limit_bytes: *limit_bytes,
1402            current_bytes: *current_bytes,
1403            reserved_bytes: *reserved_bytes,
1404            retained_bytes: *retained_bytes,
1405            requested_bytes: *requested_bytes,
1406        },
1407        DaemonError::WorkspaceEvicted { root } => {
1408            DaemonError::WorkspaceEvicted { root: root.clone() }
1409        }
1410        DaemonError::WorkspaceNotLoaded { root } => {
1411            DaemonError::WorkspaceNotLoaded { root: root.clone() }
1412        }
1413        // Task 8 Phase 8c U5 — tool-dispatch variants surfaced by
1414        // `tool_core::classify_and_execute` (Phase 8c U6). Each
1415        // variant must round-trip cleanly so `classify_for_serve`
1416        // reproduces the original typed error on every read path —
1417        // collapsing any of these into `WorkspaceBuildFailed` would
1418        // break the wire-contract codes registered in
1419        // [`crate::lib`] / the design doc §O.
1420        DaemonError::ToolTimeout {
1421            root,
1422            secs,
1423            deadline_ms,
1424        } => DaemonError::ToolTimeout {
1425            root: root.clone(),
1426            secs: *secs,
1427            deadline_ms: *deadline_ms,
1428        },
1429        DaemonError::InvalidArgument { reason } => DaemonError::InvalidArgument {
1430            reason: reason.clone(),
1431        },
1432        DaemonError::Internal(err) => {
1433            // `anyhow::Error` is not `Clone`; re-create it from its
1434            // full-chain `Display` form (`{:#}`) so every layer of
1435            // the causal chain survives the round-trip. Callers only
1436            // read this via `to_string()` on the status endpoint, so
1437            // losing the typed causes (if any) is acceptable.
1438            DaemonError::Internal(anyhow::anyhow!("{err:#}"))
1439        }
1440        // Task 9 U1 — lifecycle variants (AlreadyRunning, AutoStartTimeout,
1441        // SignalSetup). These errors all fire before IpcServer::bind and
1442        // therefore before any workspace is registered; they should never
1443        // reach `clone_err`. If they somehow do (e.g. a future code path
1444        // stores them in `last_error`), collapse to WorkspaceBuildFailed so
1445        // the clone contract is preserved without losing observability.
1446        DaemonError::AlreadyRunning { socket, lock, .. } => DaemonError::WorkspaceBuildFailed {
1447            root: Path::new("<unknown>").to_path_buf(),
1448            reason: format!(
1449                "daemon already running on socket {} (lock: {})",
1450                socket.display(),
1451                lock.display()
1452            ),
1453        },
1454        DaemonError::AutoStartTimeout {
1455            timeout_secs,
1456            socket,
1457        } => DaemonError::WorkspaceBuildFailed {
1458            root: Path::new("<unknown>").to_path_buf(),
1459            reason: format!(
1460                "daemon did not become ready within {timeout_secs}s on socket {}",
1461                socket.display()
1462            ),
1463        },
1464        DaemonError::SignalSetup { source } => DaemonError::WorkspaceBuildFailed {
1465            root: Path::new("<unknown>").to_path_buf(),
1466            reason: format!("failed to install signal handlers: {source}"),
1467        },
1468        other @ (DaemonError::Config { .. } | DaemonError::Io(_)) => {
1469            DaemonError::WorkspaceBuildFailed {
1470                root: Path::new("<unknown>").to_path_buf(),
1471                reason: other.to_string(),
1472            }
1473        }
1474    }
1475}
1476
1477// ---------------------------------------------------------------------------
1478// RebuildReservation (RAII)
1479// ---------------------------------------------------------------------------
1480
1481/// RAII guard representing an in-flight rebuild's admission headroom.
1482///
1483/// - On the success path, the guard is consumed by
1484///   [`WorkspaceManager::publish_and_retain`], which drains `bytes` from
1485///   `reserved_bytes` and only then sets `released = true`.
1486/// - On any other drop path (rebuild panic, cancellation, early
1487///   return on plugin error) the guard's `Drop` releases the reserved
1488///   bytes back to the admission pool. This keeps the §G.5 invariant
1489///   intact across every exit path.
1490///
1491/// The manager pointer is a [`Weak`] so a guard that outlives its
1492/// manager (e.g. the daemon is dropped mid-rebuild) does not try to
1493/// touch freed memory. A `None` upgrade on drop is silently ignored —
1494/// the manager took its reserved-bytes accounting with it when it dropped.
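///
/// Drop-refund sketch (mirrors the
/// `unconsumed_reservation_refunds_reserved_bytes_on_drop` test below;
/// the byte count is illustrative):
///
/// ```ignore
/// let r = mgr.reserve_rebuild(&key, 250_000)?;
/// assert_eq!(mgr.admission.lock().reserved_bytes, 250_000);
/// drop(r); // never handed to publish_and_retain
/// assert_eq!(mgr.admission.lock().reserved_bytes, 0); // refunded
/// ```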
1495#[must_use = "RebuildReservation must either be consumed by publish_and_retain() \
1496              or intentionally dropped to return its bytes to the admission pool"]
1497pub struct RebuildReservation {
1498    manager: Weak<WorkspaceManager>,
1499    bytes: u64,
1500    released: bool,
1501}
1502
1503impl RebuildReservation {
1504    /// How many bytes this reservation currently holds.
1505    #[must_use]
1506    pub fn bytes(&self) -> u64 {
1507        self.bytes
1508    }
1509}
1510
1511impl std::fmt::Debug for RebuildReservation {
1512    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1513        f.debug_struct("RebuildReservation")
1514            .field("bytes", &self.bytes)
1515            .field("released", &self.released)
1516            .finish()
1517    }
1518}
1519
1520impl Drop for RebuildReservation {
1521    fn drop(&mut self) {
1522        if self.released {
1523            return;
1524        }
1525        if let Some(mgr) = self.manager.upgrade() {
1526            let mut state = mgr.admission.lock();
1527            state.reserved_bytes = state.reserved_bytes.saturating_sub(self.bytes);
1528        }
1529    }
1530}
1531
1532// ---------------------------------------------------------------------------
1533// RollbackGuard (panic-safety for publish_and_retain)
1534// ---------------------------------------------------------------------------
1535
1536/// Panic-safe rollback wrapper used by [`WorkspaceManager::publish_and_retain`].
1537///
1538/// Captures the prior `Arc<CodeGraph>` and the prior `memory_bytes`
1539/// *before* any swap. If the thread unwinds between the swap and the
1540/// admission-mutex acquisition, the guard's `Drop` restores both
1541/// fields — leaving the workspace serving its pre-rebuild graph as if
1542/// the publish never happened.
1543///
1544/// Correctness depends on three contracts:
1545///
1546/// 1. The guard is constructed *before* the `ArcSwap::swap` call.
1547/// 2. `armed` is set to `false` only on the success path, after the
1548///    admission mutex has released.
1549/// 3. No fallible operation (heap allocation failure, etc.) runs
1550///    between the two swaps — otherwise the guard would be asked to
1551///    reverse a partial swap.
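///
/// Geometry sketch (the same shape `publish_and_retain` sets up; see
/// also the `rollback_guard_restores_workspace_on_panic_path` test):
///
/// ```ignore
/// let mut rollback = RollbackGuard {
///     ws: &workspace,
///     prior_arc: Some(workspace.graph.load_full()),
///     prior_bytes: workspace.memory_bytes.load(Ordering::Acquire),
///     armed: true,
/// };
/// // ... swap + update_memory + admission commit (a panic here rolls back) ...
/// rollback.armed = false; // only after the admission mutex releases
/// ```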
1552pub(crate) struct RollbackGuard<'a> {
1553    pub(crate) ws: &'a LoadedWorkspace,
1554    pub(crate) prior_arc: Option<Arc<CodeGraph>>,
1555    pub(crate) prior_bytes: usize,
1556    pub(crate) armed: bool,
1557}
1558
1559impl<'a> Drop for RollbackGuard<'a> {
1560    fn drop(&mut self) {
1561        if !self.armed {
1562            return;
1563        }
1564        if let Some(arc) = self.prior_arc.take() {
1565            self.ws.graph.store(arc);
1566        }
1567        self.ws
1568            .memory_bytes
1569            .store(self.prior_bytes, std::sync::atomic::Ordering::Release);
1570    }
1571}
1572
1573// ---------------------------------------------------------------------------
1574// Retention reaper task
1575// ---------------------------------------------------------------------------
1576
1577/// Long-lived tokio task: polls [`WorkspaceManager::reap_once`] on a
1578/// fixed 25 ms cadence (A2 §G.3).
1579///
1580/// Takes a `Weak<WorkspaceManager>` so a `WorkspaceManager::drop`
1581/// before the task notices the abort signal does not dereference
1582/// freed memory. The first failed `Weak::upgrade` exits the loop
1583/// cleanly.
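///
/// Spawn sketch (an assumption about constructor wiring not shown in
/// this excerpt; the handle is what `shutdown_reaper` later takes and
/// aborts):
///
/// ```ignore
/// let handle = tokio::spawn(retention_reaper(Arc::downgrade(&manager)));
/// *manager.reaper.lock() = Some(handle);
/// ```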
1584async fn retention_reaper(mgr: Weak<WorkspaceManager>) {
1585    let interval = Duration::from_millis(25);
1586    loop {
1587        tokio::time::sleep(interval).await;
1588        let Some(mgr) = mgr.upgrade() else {
1589            return;
1590        };
1591        mgr.reap_once();
1592    }
1593}
1594
1595// ---------------------------------------------------------------------------
1596// Tests
1597// ---------------------------------------------------------------------------
1598
1599#[cfg(test)]
1600mod tests {
1601    use std::{path::PathBuf, sync::atomic::Ordering};
1602
1603    use sqry_core::project::ProjectRootMode;
1604
1605    use crate::config::DaemonConfig;
1606
1607    use super::{
1608        super::{loaded::LoadedWorkspace, state::WorkspaceKey},
1609        *,
1610    };
1611
1612    fn make_config() -> Arc<DaemonConfig> {
1613        // 1 MiB budget keeps the arithmetic tractable in assertions.
1614        Arc::new(DaemonConfig {
1615            memory_limit_mb: 1,
1616            ..DaemonConfig::default()
1617        })
1618    }
1619
1620    fn make_workspace() -> Arc<LoadedWorkspace> {
1621        Arc::new(LoadedWorkspace::new(
1622            WorkspaceKey::new(
1623                PathBuf::from("/repos/example"),
1624                ProjectRootMode::GitRoot,
1625                0x1,
1626            ),
1627            false,
1628        ))
1629    }
1630
1631    /// Register a workspace under `key` on `mgr` so that
1632    /// `reserve_rebuild` sees it present in its Phase-1
1633    /// `workspaces.read()` scope. Phase 7b1 tightens `reserve_rebuild`
1634    /// to reject unregistered keys with `DaemonError::WorkspaceEvicted`,
1635    /// so every admission-level test that expects a reservation (or a
1636    /// memory-budget rejection) must insert a workspace first.
1637    fn register_workspace(mgr: &WorkspaceManager, key: &WorkspaceKey) {
1638        mgr.workspaces.write().insert(
1639            key.clone(),
1640            Arc::new(LoadedWorkspace::new(key.clone(), false)),
1641        );
1642    }
1643
1644    #[test]
1645    fn reserve_rebuild_succeeds_when_headroom_available() {
1646        let mgr = WorkspaceManager::new_without_reaper(make_config());
1647        let key = WorkspaceKey::new(
1648            PathBuf::from("/repos/example"),
1649            ProjectRootMode::GitRoot,
1650            0x1,
1651        );
1652        register_workspace(&mgr, &key);
1653        let reservation = mgr
1654            .reserve_rebuild(&key, 500_000) // 500 kB into 1 MiB budget
1655            .expect("reservation fits");
1656        assert_eq!(reservation.bytes(), 500_000);
1657        assert_eq!(mgr.admission.lock().reserved_bytes, 500_000);
1658        drop(reservation);
1659        assert_eq!(
1660            mgr.admission.lock().reserved_bytes,
1661            0,
1662            "dropping an unconsumed reservation must return its bytes",
1663        );
1664    }
1665
1666    #[test]
1667    fn reserve_rebuild_rejects_oversized_request() {
1668        let mgr = WorkspaceManager::new_without_reaper(make_config());
1669        let key = WorkspaceKey::new(
1670            PathBuf::from("/repos/example"),
1671            ProjectRootMode::GitRoot,
1672            0x1,
1673        );
1674        register_workspace(&mgr, &key);
1675        let err = mgr.reserve_rebuild(&key, 10 * 1024 * 1024).expect_err(
1676            "a reservation bigger than the budget must be rejected with MemoryBudgetExceeded",
1677        );
1678        match err {
1679            DaemonError::MemoryBudgetExceeded {
1680                limit_bytes,
1681                requested_bytes,
1682                ..
1683            } => {
1684                assert_eq!(limit_bytes, 1024 * 1024);
1685                assert_eq!(requested_bytes, 10 * 1024 * 1024);
1686            }
1687            other => panic!("wrong error variant: {other:?}"),
1688        }
1689        assert_eq!(
1690            mgr.admission.lock().reserved_bytes,
1691            0,
1692            "a rejected reservation must not mutate admission state",
1693        );
1694    }
1695
1696    #[test]
1697    fn reserve_rebuild_rejects_when_running_total_would_exceed_budget() {
1698        let mgr = WorkspaceManager::new_without_reaper(make_config());
1699        let key = WorkspaceKey::new(
1700            PathBuf::from("/repos/example"),
1701            ProjectRootMode::GitRoot,
1702            0x1,
1703        );
1704        register_workspace(&mgr, &key);
1705        let a = mgr.reserve_rebuild(&key, 600_000).expect("first fits");
1706        let err = mgr
1707            .reserve_rebuild(&key, 600_000)
1708            .expect_err("second pushes over 1 MiB budget");
1709        match err {
1710            DaemonError::MemoryBudgetExceeded { reserved_bytes, .. } => {
1711                assert_eq!(reserved_bytes, 600_000, "first reservation still held");
1712            }
1713            other => panic!("wrong error variant: {other:?}"),
1714        }
1715        drop(a);
1716    }
1717
1718    #[test]
1719    fn reserve_rebuild_rejects_unknown_key() {
1720        // Task 7 Phase 7b1: unregistered keys must be rejected with
1721        // WorkspaceEvicted instead of succeeding. Prevents publishing
1722        // into an orphaned LoadedWorkspace after a race with eviction.
1723        let mgr = WorkspaceManager::new_without_reaper(make_config());
1724        let key = WorkspaceKey::new(
1725            PathBuf::from("/repos/never-registered"),
1726            ProjectRootMode::GitRoot,
1727            0xDEAD,
1728        );
1729        let err = mgr
1730            .reserve_rebuild(&key, 100_000)
1731            .expect_err("unknown key must surface WorkspaceEvicted");
1732        match err {
1733            DaemonError::WorkspaceEvicted { root } => {
1734                assert_eq!(root, PathBuf::from("/repos/never-registered"));
1735            }
1736            other => panic!("wrong error variant: {other:?}"),
1737        }
1738        assert_eq!(
1739            mgr.admission.lock().reserved_bytes,
1740            0,
1741            "a rejected reservation must not mutate admission state",
1742        );
1743    }
1744
1745    #[test]
1746    fn reserve_rebuild_rejects_cancelled_workspace() {
1747        // Task 7 Phase 7b1: a workspace whose `rebuild_cancelled` flag
1748        // is set (by `execute_eviction`) must be rejected even if still
1749        // present in the map (the two mutations run under the same
1750        // `workspaces.write()` scope, but defensive reads should catch
1751        // either signal).
1752        let mgr = WorkspaceManager::new_without_reaper(make_config());
1753        let key = WorkspaceKey::new(
1754            PathBuf::from("/repos/cancelled"),
1755            ProjectRootMode::GitRoot,
1756            0xCAFE,
1757        );
1758        let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
1759        ws.rebuild_cancelled.store(true, Ordering::Release);
1760        mgr.workspaces.write().insert(key.clone(), ws);
1761
1762        let err = mgr
1763            .reserve_rebuild(&key, 100_000)
1764            .expect_err("cancelled workspace must surface WorkspaceEvicted");
1765        match err {
1766            DaemonError::WorkspaceEvicted { root } => {
1767                assert_eq!(root, PathBuf::from("/repos/cancelled"));
1768            }
1769            other => panic!("wrong error variant: {other:?}"),
1770        }
1771    }
1772
1773    #[test]
1774    fn publish_and_retain_moves_bytes_and_retains_old_arc() {
1775        let mgr = WorkspaceManager::new_without_reaper(make_config());
1776        let ws = make_workspace();
1777        mgr.workspaces
1778            .write()
1779            .insert(ws.key.clone(), Arc::clone(&ws));
1780        let reservation = mgr.reserve_rebuild(&ws.key, 100_000).expect("reserve fits");
1781
1782        // Pre-seed workspace memory_bytes so publish exercises the
1783        // loaded-bytes swap (subtract prior, add new).
1784        ws.memory_bytes.store(50_000, Ordering::Release);
1785        mgr.admission.lock().loaded_bytes = 50_000;
1786
1787        let new_graph = CodeGraph::new();
1788        let new_bytes = new_graph.heap_bytes() as u64;
1789        let (token, _published_arc) = mgr.publish_and_retain(reservation, &ws, new_graph);
1790
1791        let state = mgr.admission.lock();
1792        assert_eq!(
1793            state.reserved_bytes, 0,
1794            "reservation bytes must drain on publish"
1795        );
1796        assert_eq!(
1797            state.loaded_bytes, new_bytes,
1798            "loaded_bytes = prior(50k) - prior(50k) + new(heap_bytes())",
1799        );
1800        assert_eq!(state.retained_old.len(), 1, "exactly one retained entry");
1801        let retained = state.retained_old.get(&token).expect("token present");
1802        assert_eq!(
1803            retained.bytes, 50_000,
1804            "retained bytes is the prior workspace memory_bytes",
1805        );
1806        assert_eq!(
1807            Arc::strong_count(&retained.graph),
1808            1,
1809            "admission map is the sole holder of the old Arc after publish",
1810        );
1811    }
1812
1813    #[test]
1814    fn rollback_guard_restores_workspace_on_panic_path() {
1815        // Synthesise the exact field layout publish_and_retain sets up
1816        // so the guard's Drop behaviour can be exercised directly,
1817        // without the heavy publish path.
1818        let ws = make_workspace();
1819        let old_graph = Arc::new(CodeGraph::new());
1820        ws.graph.store(Arc::clone(&old_graph));
1821        ws.memory_bytes.store(10_000, Ordering::Release);
1822
1823        {
1824            let mut guard = RollbackGuard {
1825                ws: &ws,
1826                prior_arc: Some(Arc::clone(&old_graph)),
1827                prior_bytes: 10_000,
1828                armed: true,
1829            };
1830
1831            // Simulate a partial publish: swap the ArcSwap + memory_bytes.
1832            let stomped = Arc::new(CodeGraph::new());
1833            ws.graph.store(Arc::clone(&stomped));
1834            ws.memory_bytes.store(99_999, Ordering::Release);
1835
1836            // `armed == true` so the guard reverses both fields on drop.
1837            // The guard is deliberately never disarmed, mimicking the panic path.
1838            let _ = &mut guard;
1839        }
1840
1841        // After the guard drops, both fields must match the prior.
1842        let restored = ws.graph.load_full();
1843        assert!(Arc::ptr_eq(&restored, &old_graph));
1844        assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 10_000);
1845    }
1846
1847    #[test]
1848    fn rollback_guard_disarmed_is_noop() {
1849        let ws = make_workspace();
1850        let old_graph = Arc::new(CodeGraph::new());
1851        ws.graph.store(Arc::clone(&old_graph));
1852        ws.memory_bytes.store(10_000, Ordering::Release);
1853
1854        {
1855            let mut guard = RollbackGuard {
1856                ws: &ws,
1857                prior_arc: Some(Arc::clone(&old_graph)),
1858                prior_bytes: 10_000,
1859                armed: true,
1860            };
1861            let stomped = Arc::new(CodeGraph::new());
1862            ws.graph.store(Arc::clone(&stomped));
1863            ws.memory_bytes.store(99_999, Ordering::Release);
1864
1865            // Success path disarms the guard.
1866            guard.armed = false;
1867        }
1868
1869        // State must stay "stomped" — the guard was disarmed.
1870        assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 99_999);
1871    }
1872
1873    #[test]
1874    fn reap_once_drops_last_holder_entries() {
1875        let mgr = WorkspaceManager::new_without_reaper(make_config());
1876        let ws = make_workspace();
1877        mgr.workspaces
1878            .write()
1879            .insert(ws.key.clone(), Arc::clone(&ws));
1880        let reservation = mgr
1881            .reserve_rebuild(&ws.key, 0)
1882            .expect("zero-size reservation always fits");
1883        // Publish-and-retain with a fresh empty graph; the old graph
1884        // becomes retained.
1885        mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
1886        assert_eq!(mgr.admission.lock().retained_old.len(), 1);
1887
1888        // No query holds the old Arc, so the next reap tick frees it.
1889        mgr.reap_once();
1890        assert_eq!(
1891            mgr.admission.lock().retained_old.len(),
1892            0,
1893            "reaper must free entries whose strong_count == 1",
1894        );
1895    }
1896
1897    #[test]
1898    fn reap_once_retains_entries_with_outstanding_holders() {
1899        let mgr = WorkspaceManager::new_without_reaper(make_config());
1900        let ws = make_workspace();
1901        mgr.workspaces
1902            .write()
1903            .insert(ws.key.clone(), Arc::clone(&ws));
1904        let reservation = mgr
1905            .reserve_rebuild(&ws.key, 0)
1906            .expect("zero-size reservation always fits");
1907        mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
1908
1909        // Simulate a slow query holding the retained Arc.
1910        let held = {
1911            let state = mgr.admission.lock();
1912            let token = *state.retained_old.keys().next().expect("one entry");
1913            Arc::clone(&state.retained_old.get(&token).unwrap().graph)
1914        };
1915        assert_eq!(Arc::strong_count(&held), 2);
1916
1917        mgr.reap_once();
1918        assert_eq!(
1919            mgr.admission.lock().retained_old.len(),
1920            1,
1921            "reaper must not drop entries that slow queries still hold",
1922        );
1923        drop(held);
1924
1925        mgr.reap_once();
1926        assert_eq!(
1927            mgr.admission.lock().retained_old.len(),
1928            0,
1929            "reaper frees the entry once the last slow query releases",
1930        );
1931    }
1932
1933    #[test]
1934    fn unconsumed_reservation_refunds_reserved_bytes_on_drop() {
1935        // Regression for Codex Task 6 Phase 6a iter-1 MAJOR:
1936        // if a rebuild panics *between* `reserve_rebuild` and the
1937        // admission-mutex section of `publish_and_retain`, the
1938        // reservation's Drop must refund `reserved_bytes` back to
1939        // the admission pool. A pre-fix bug disarmed the reservation
1940        // too early and leaked bytes on any unwind path.
1941        let mgr = WorkspaceManager::new_without_reaper(make_config());
1942        let ws = make_workspace();
1943        mgr.workspaces
1944            .write()
1945            .insert(ws.key.clone(), Arc::clone(&ws));
1946        let reservation = mgr
1947            .reserve_rebuild(&ws.key, 250_000)
1948            .expect("reservation fits");
1949        assert_eq!(mgr.admission.lock().reserved_bytes, 250_000);
1950
1951        // Simulate a rebuild that panics after reservation but
1952        // before publish by letting the reservation drop on the
1953        // unwind-equivalent code path (explicit drop here; the
1954        // RAII guard fires the same way under `catch_unwind`).
1955        drop(reservation);
1956
1957        assert_eq!(
1958            mgr.admission.lock().reserved_bytes,
1959            0,
1960            "unconsumed reservation must refund reserved_bytes on drop \
1961             (Codex Task 6 Phase 6a iter-1 MAJOR regression)",
1962        );
1963    }
1964
1965    #[test]
1966    fn publish_and_retain_leaves_reservation_fully_disarmed_on_success() {
1967        // Companion to the refund regression: once publish_and_retain
1968        // completes successfully, the reservation must be disarmed —
1969        // otherwise its Drop at scope-exit would double-refund and
1970        // corrupt admission state.
1971        let mgr = WorkspaceManager::new_without_reaper(make_config());
1972        let ws = make_workspace();
1973        mgr.workspaces
1974            .write()
1975            .insert(ws.key.clone(), Arc::clone(&ws));
1976        let reservation = mgr
1977            .reserve_rebuild(&ws.key, 100_000)
1978            .expect("reservation fits");
1979        let admission_before = mgr.admission.lock().reserved_bytes;
1980        assert_eq!(admission_before, 100_000);
1981
1982        // Drive the full commit path. After this returns the
1983        // reservation is already moved into the function, so we can
1984        // only observe the *absence* of any stray refund.
1985        let (_token, _published_arc) = mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
1986        let admission_after = mgr.admission.lock().reserved_bytes;
1987        assert_eq!(
1988            admission_after, 0,
1989            "publish must drain reserved_bytes exactly once, not double-drain or leak",
1990        );
1991
1992        // A fresh reservation should see headroom = budget - loaded - retained;
1993        // if the previous publish leaked reserved_bytes this would fail.
1994        let again = mgr
1995            .reserve_rebuild(&ws.key, 100_000)
1996            .expect("post-publish admission must still admit a same-size reservation");
1997        drop(again);
1998        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
1999    }
2000
2001    #[test]
2002    fn unwind_after_swap_before_admission_commit_restores_full_state() {
2003        // Regression for Codex Task 6 Phase 6a iter-2 MAJOR:
2004        // simulate a panic *between* the ArcSwap swap and the
2005        // admission mutex acquisition. After unwind, the admission
2006        // state must be exactly pre-call: reserved_bytes refunded,
2007        // loaded_bytes untouched, retained_old empty, workspace.graph
2008        // and workspace.memory_bytes restored to their prior values.
2009        //
2010        // We can't inject a panic into the real `publish_and_retain`
2011        // without mocking the allocator, so we reproduce the exact
2012        // Drop-order interaction using the public types: build a
2013        // RollbackGuard + RebuildReservation in the same geometry as
2014        // the real function, run `catch_unwind` over the non-
2015        // recoverable zone, and panic inside it.
2016        use std::panic::{AssertUnwindSafe, catch_unwind};
2017
2018        let mgr = WorkspaceManager::new_without_reaper(make_config());
2019        let ws = Arc::new(LoadedWorkspace::new(
2020            WorkspaceKey::new(
2021                PathBuf::from("/repos/example"),
2022                ProjectRootMode::GitRoot,
2023                0x1,
2024            ),
2025            false,
2026        ));
2027        mgr.workspaces
2028            .write()
2029            .insert(ws.key.clone(), Arc::clone(&ws));
2030
2031        // Pre-seed workspace bytes so we can observe rollback.
2032        let prior_bytes_usize = 50_000usize;
2033        ws.memory_bytes.store(prior_bytes_usize, Ordering::Release);
2034        mgr.admission.lock().loaded_bytes = 50_000;
2035        let prior_arc = ws.graph.load_full();
2036
2037        // Reserve headroom as the real function does.
2038        let reservation = mgr
2039            .reserve_rebuild(&ws.key, 100_000)
2040            .expect("reservation fits");
2041        assert_eq!(mgr.admission.lock().reserved_bytes, 100_000);
2042
2043        let outcome = catch_unwind(AssertUnwindSafe(|| {
2044            // Mirror `publish_and_retain` up to and INCLUDING the
2045            // ArcSwap swap + update_memory, then panic *before* we
2046            // would have acquired the admission mutex. This is the
2047            // exact unwind window the iter-2 finding describes.
2048            let new_arc = Arc::new(CodeGraph::new());
2049            let prior_arc_clone = ws.graph.load_full();
2050            // The guard is armed and has no visible use after this
2051            // point; its Drop is the entire reason the scope exists,
2052            // so the binding is deliberately underscore-prefixed and
2053            // held until the panic unwinds the stack.
2054            let _rollback = RollbackGuard {
2055                ws: &ws,
2056                prior_arc: Some(prior_arc_clone),
2057                prior_bytes: prior_bytes_usize,
2058                armed: true,
2059            };
2060            let _old_arc = ws.graph.swap(new_arc);
2061            let _prev = ws.update_memory(99_999);
2062
2063            // Hand the reservation into the scope so its Drop fires
2064            // on unwind if we never disarm it — which we won't.
2065            let _hold = reservation;
2066
2067            // Simulate the panic site (e.g. retained_old.insert OOM).
2068            panic!("simulated panic inside publish_and_retain");
2069        }));
2070        assert!(outcome.is_err(), "catch_unwind must observe the panic");
2071
2072        // Post-unwind assertions — every piece of admission state and
2073        // every observable piece of workspace state must match the
2074        // pre-call snapshot exactly.
2075        let restored = ws.graph.load_full();
2076        assert!(
2077            Arc::ptr_eq(&restored, &prior_arc),
2078            "RollbackGuard must restore ws.graph to the prior Arc after unwind",
2079        );
2080        assert_eq!(
2081            ws.memory_bytes.load(Ordering::Acquire),
2082            prior_bytes_usize,
2083            "RollbackGuard must restore ws.memory_bytes after unwind",
2084        );
2085        let state = mgr.admission.lock();
2086        assert_eq!(
2087            state.reserved_bytes, 0,
2088            "reservation refund must return reserved_bytes to pre-call value (0)",
2089        );
2090        assert_eq!(
2091            state.loaded_bytes, 50_000,
2092            "loaded_bytes must not be mutated when admission commit is never entered",
2093        );
2094        assert_eq!(
2095            state.retained_old.len(),
2096            0,
2097            "retained_old must be empty when admission commit is never entered",
2098        );
2099    }
2100
2101    // --- Phase 6b: lifecycle primitives --------------------------
2102
2103    fn make_key_at(path: &str, fingerprint: u64) -> WorkspaceKey {
2104        WorkspaceKey::new(PathBuf::from(path), ProjectRootMode::GitRoot, fingerprint)
2105    }
2106
2107    #[test]
2108    fn get_or_load_builds_on_miss_and_caches() {
2109        let mgr = WorkspaceManager::new_without_reaper(make_config());
2110        let key = make_key_at("/repos/example", 0x1);
2111        let builder = super::super::builder::EmptyGraphBuilder;
2112
2113        let g1 = mgr
2114            .get_or_load(&key, &builder, 1_000)
2115            .expect("first load succeeds");
2116        let g2 = mgr
2117            .get_or_load(&key, &builder, 1_000)
2118            .expect("second load hits cache");
2119        assert!(
2120            Arc::ptr_eq(&g1, &g2),
2121            "cache hit must return the same Arc as the initial build",
2122        );
2123    }
2124
2125    #[test]
2126    fn get_or_load_surfaces_builder_failures_and_sets_failed_state() {
2127        let mgr = WorkspaceManager::new_without_reaper(make_config());
2128        let key = make_key_at("/repos/example", 0x1);
2129        let failing = super::super::builder::FailingGraphBuilder::new("simulated plugin panic");
2130
2131        let err = mgr
2132            .get_or_load(&key, &failing, 1_000)
2133            .expect_err("builder failure must bubble up");
2134        match err {
2135            DaemonError::WorkspaceBuildFailed { reason, .. } => {
2136                assert_eq!(reason, "simulated plugin panic");
2137            }
2138            other => panic!("wrong variant: {other:?}"),
2139        }
2140
2141        // Workspace should be in Failed state with retry_count==1.
2142        let workspaces = mgr.workspaces.read();
2143        let ws = workspaces.get(&key).expect("workspace registered");
2144        assert_eq!(ws.load_state(), WorkspaceState::Failed);
2145        assert_eq!(ws.retry_count.load(Ordering::Acquire), 1);
2146        assert!(ws.last_error.read().is_some());
2147        drop(workspaces);
2148
2149        // Admission state must NOT have leaked the reservation —
2150        // RebuildReservation's Drop fires on the error path.
2151        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
2152    }
2153
2154    #[test]
2155    fn evict_lru_picks_oldest_non_pinned_workspace() {
2156        let mgr = WorkspaceManager::new_without_reaper(make_config());
2157        let builder = super::super::builder::EmptyGraphBuilder;
2158
2159        let a = make_key_at("/repos/a", 0x1);
2160        let b = make_key_at("/repos/b", 0x1);
2161        mgr.get_or_load(&a, &builder, 100_000).unwrap();
2162        std::thread::sleep(Duration::from_millis(5));
2163        mgr.get_or_load(&b, &builder, 100_000).unwrap();
2164
2165        // `a` was touched first, so it should be the LRU victim.
2166        let victim = mgr.evict_lru().expect("one candidate");
2167        assert_eq!(victim, a, "oldest workspace must be evicted first");
2168        assert!(
2169            !mgr.workspaces.read().contains_key(&a),
2170            "evicted workspace must be removed from the manager map",
2171        );
2172        assert!(
2173            mgr.workspaces.read().contains_key(&b),
2174            "non-victim workspace must remain",
2175        );
2176    }
2177
2178    #[test]
2179    fn evict_lru_returns_none_when_no_candidates() {
2180        let mgr = WorkspaceManager::new_without_reaper(make_config());
2181        assert!(
2182            mgr.evict_lru().is_none(),
2183            "empty manager has no eviction candidate",
2184        );
2185    }
2186
2187    #[test]
2188    fn evict_lru_skips_pinned_workspaces() {
2189        let mgr = WorkspaceManager::new_without_reaper(make_config());
2190        let builder = super::super::builder::EmptyGraphBuilder;
2191        let pinned_key = make_key_at("/repos/pinned", 0x1);
2192
2193        // Insert a pinned workspace by manually constructing + registering.
2194        {
2195            let mut ws_map = mgr.workspaces.write();
2196            ws_map.insert(
2197                pinned_key.clone(),
2198                Arc::new(LoadedWorkspace::new(
2199                    pinned_key.clone(),
2200                    /*pinned*/ true,
2201                )),
2202            );
2203        }
2204        // And drive it into Loaded state via a no-op publish.
2205        {
2206            let ws = mgr.workspaces.read().get(&pinned_key).unwrap().clone();
2207            ws.store_state(WorkspaceState::Loaded);
2208            ws.touch();
2209        }
2210
2211        // Plus a regular unpinned workspace.
2212        let other = make_key_at("/repos/other", 0x1);
2213        mgr.get_or_load(&other, &builder, 100_000).unwrap();
2214
2215        // Evict should pick `other`, not the pinned one.
2216        let victim = mgr.evict_lru().expect("one candidate");
2217        assert_eq!(victim, other);
2218        assert!(mgr.workspaces.read().contains_key(&pinned_key));
2219    }
2220
2221    #[test]
2222    fn unload_removes_workspace_and_reclaims_bytes() {
2223        let mgr = WorkspaceManager::new_without_reaper(make_config());
2224        let builder = super::super::builder::EmptyGraphBuilder;
2225        let key = make_key_at("/repos/example", 0x1);
2226        mgr.get_or_load(&key, &builder, 100_000).unwrap();
2227        assert!(mgr.workspaces.read().contains_key(&key));
2228
2229        assert!(mgr.unload(&key), "unload must report present");
2230        assert!(!mgr.workspaces.read().contains_key(&key));
2231
2232        assert!(!mgr.unload(&key), "unload on missing key returns false");
2233    }
2234
2235    #[test]
2236    fn status_reflects_loaded_workspaces_and_memory() {
2237        let mgr = WorkspaceManager::new_without_reaper(make_config());
2238        let builder = super::super::builder::EmptyGraphBuilder;
2239        let key = make_key_at("/repos/example", 0x1);
2240        mgr.get_or_load(&key, &builder, 100_000).unwrap();
2241
2242        let status = mgr.status();
2243        assert_eq!(status.daemon_version, env!("CARGO_PKG_VERSION"));
2244        assert_eq!(status.workspaces.len(), 1);
2245        assert_eq!(
2246            status.workspaces[0].index_root,
2247            PathBuf::from("/repos/example")
2248        );
2249        assert_eq!(status.workspaces[0].state, WorkspaceState::Loaded);
2250        assert!(!status.workspaces[0].pinned);
2251        assert_eq!(status.memory.limit_bytes, 1024 * 1024);
2252        // The graph is empty here, but loaded_bytes still tracks the
2253        // entry; high_water must stay >= current_bytes.
2254        assert!(
2255            status.memory.high_water_bytes >= status.memory.current_bytes,
2256            "high_water_bytes must be monotonic wrt current_bytes",
2257        );
2258    }
2259
2260    #[test]
2261    fn reserve_rebuild_triggers_eviction_when_budget_tight() {
2262        // Budget is 1 MiB (from make_config). Fill it with a 700 kB
2263        // workspace, then reserve 600 kB — Phase 1 must pick the
2264        // 700 kB workspace as a victim, Phase 2 evicts it, Phase 3
2265        // commits the reservation.
2266        let mgr = WorkspaceManager::new_without_reaper(make_config());
2267        let victim_key = make_key_at("/repos/victim", 0x1);
2268        let victim = Arc::new(LoadedWorkspace::new(victim_key.clone(), false));
2269        victim.memory_bytes.store(700_000, Ordering::Release);
2270        victim.store_state(WorkspaceState::Loaded);
2271        victim.touch();
2272        mgr.workspaces
2273            .write()
2274            .insert(victim_key.clone(), Arc::clone(&victim));
2275        mgr.admission.lock().loaded_bytes = 700_000;
2276
2277        let new_key = make_key_at("/repos/new", 0x1);
2278        mgr.workspaces.write().insert(
2279            new_key.clone(),
2280            Arc::new(LoadedWorkspace::new(new_key.clone(), false)),
2281        );
2282        let reservation = mgr
2283            .reserve_rebuild(&new_key, 600_000)
2284            .expect("Phase 2 eviction must free headroom");
2285        // Victim is gone from the map.
2286        assert!(!mgr.workspaces.read().contains_key(&victim_key));
2287        // Admission reserved the new bytes.
2288        assert_eq!(mgr.admission.lock().reserved_bytes, 600_000);
2289        drop(reservation);
2290    }
2291
2292    #[test]
2293    fn reserve_rebuild_rejects_when_only_pinned_workspaces_remain() {
2294        // Budget 1 MiB. Pin a 900 kB workspace. Requesting 600 kB
2295        // cannot evict the pin, so Phase 3 must reject.
2296        let mgr = WorkspaceManager::new_without_reaper(make_config());
2297        let pinned_key = make_key_at("/repos/pinned", 0x1);
2298        let pinned = Arc::new(LoadedWorkspace::new(
2299            pinned_key.clone(),
2300            /*pinned*/ true,
2301        ));
2302        pinned.memory_bytes.store(900_000, Ordering::Release);
2303        pinned.store_state(WorkspaceState::Loaded);
2304        mgr.workspaces
2305            .write()
2306            .insert(pinned_key.clone(), Arc::clone(&pinned));
2307        mgr.admission.lock().loaded_bytes = 900_000;
2308
2309        let new_key = make_key_at("/repos/new", 0x1);
2310        mgr.workspaces.write().insert(
2311            new_key.clone(),
2312            Arc::new(LoadedWorkspace::new(new_key.clone(), false)),
2313        );
2314        let err = mgr
2315            .reserve_rebuild(&new_key, 600_000)
2316            .expect_err("pinned workspace makes budget unfittable");
2317        match err {
2318            DaemonError::MemoryBudgetExceeded {
2319                requested_bytes,
2320                current_bytes,
2321                ..
2322            } => {
2323                assert_eq!(requested_bytes, 600_000);
2324                assert_eq!(
2325                    current_bytes, 900_000,
2326                    "pinned workspace bytes still count after Phase 2",
2327                );
2328            }
2329            other => panic!("wrong variant: {other:?}"),
2330        }
2331        // Pinned workspace must still be present.
2332        assert!(mgr.workspaces.read().contains_key(&pinned_key));
2333    }
2334
2335    #[test]
2336    fn execute_eviction_routes_bytes_through_retained_old() {
2337        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #1:
2338        // eviction previously dropped the evicted Arc without
2339        // inserting a retained entry, leaking bytes if a slow
2340        // query still held the graph.
2341        let mgr = WorkspaceManager::new_without_reaper(make_config());
2342        let ws_key = make_key_at("/repos/example", 0x1);
2343        let ws = Arc::new(LoadedWorkspace::new(ws_key.clone(), false));
2344        ws.memory_bytes.store(300_000, Ordering::Release);
2345        ws.store_state(WorkspaceState::Loaded);
2346        mgr.workspaces
2347            .write()
2348            .insert(ws_key.clone(), Arc::clone(&ws));
2349        mgr.admission.lock().loaded_bytes = 300_000;
2350
2351        // Pin the current graph Arc via a simulated slow query
2352        // holder so the retained entry stays past the first reap.
2353        let slow_query_arc = ws.graph.load_full();
2354
2355        mgr.execute_eviction(&ws_key);
2356
2357        let state = mgr.admission.lock();
2358        assert_eq!(
2359            state.loaded_bytes, 0,
2360            "evicted workspace bytes must leave the loaded tier",
2361        );
2362        assert_eq!(
2363            state.retained_total_bytes(),
2364            300_000,
2365            "evicted workspace bytes must enter the retained tier",
2366        );
2367        assert_eq!(state.retained_old.len(), 1);
2368        drop(state);
2369
2370        // The slow query still holds the Arc. A reap does NOT free
2371        // yet — §G.5 is preserved until strong_count == 1.
2372        mgr.reap_once();
2373        assert_eq!(mgr.admission.lock().retained_total_bytes(), 300_000);
2374
2375        // Once the slow query releases, the next reap frees bytes.
2376        drop(slow_query_arc);
2377        mgr.reap_once();
2378        assert_eq!(
2379            mgr.admission.lock().retained_total_bytes(),
2380            0,
2381            "reaper must free retained entry once slow query releases",
2382        );
2383    }
2384
2385    #[test]
2386    fn get_or_load_state_cas_rejects_concurrent_load() {
2387        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #2:
2388        // two loaders must not both run the slow path. The state
2389        // CAS gates exactly one winner.
2390        let mgr = WorkspaceManager::new_without_reaper(make_config());
2391        let key = make_key_at("/repos/example", 0x1);
2392        let ws = mgr.get_or_insert_workspace(&key);
2393        // Simulate another loader holding the gate.
2394        ws.store_state(WorkspaceState::Loading);
2395
2396        let builder = super::super::builder::EmptyGraphBuilder;
2397        let err = mgr
2398            .get_or_load(&key, &builder, 1_000)
2399            .expect_err("concurrent load must be rejected");
2400        match err {
2401            DaemonError::WorkspaceBuildFailed { reason, .. } => {
2402                assert!(
2403                    reason.contains("already in progress"),
2404                    "unexpected reason: {reason}",
2405                );
2406            }
2407            other => panic!("wrong variant: {other:?}"),
2408        }
2409
2410        // Sanity-check that the admission state was not mutated
2411        // by the rejected call.
2412        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
2413    }
2414
2415    #[test]
2416    fn get_or_load_detects_cancellation_between_cas_and_publish() {
2417        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #2
2418        // (cancellation-detection subcase): if rebuild_cancelled was
2419        // set before our CAS — i.e. evict raced in front of us on
2420        // the prior state — get_or_load must honour the signal
2421        // instead of clobbering it and publishing into an evicted
2422        // workspace.
2423        let mgr = WorkspaceManager::new_without_reaper(make_config());
2424        let key = make_key_at("/repos/example", 0x1);
2425        let ws = mgr.get_or_insert_workspace(&key);
2426        // Simulate "evict ran on an earlier state but left the
2427        // workspace in the map": cancellation flag set, state
2428        // Unloaded (so CAS succeeds).
2429        ws.rebuild_cancelled.store(true, Ordering::Release);
2430        ws.store_state(WorkspaceState::Unloaded);
2431
2432        let builder = super::super::builder::EmptyGraphBuilder;
2433        let err = mgr
2434            .get_or_load(&key, &builder, 1_000)
2435            .expect_err("pre-CAS cancellation must be honoured");
2436        match err {
2437            DaemonError::WorkspaceBuildFailed { reason, .. } => {
2438                assert!(
2439                    reason.contains("evicted mid-load"),
2440                    "unexpected reason: {reason}",
2441                );
2442            }
2443            other => panic!("wrong variant: {other:?}"),
2444        }
2445        // rebuild_cancelled must still be true (we didn't clobber).
2446        assert!(ws.rebuild_cancelled.load(Ordering::Acquire));
2447        assert_eq!(ws.load_state(), WorkspaceState::Failed);
2448    }
2449
2450    #[test]
2451    fn get_or_load_loading_guard_recovers_from_builder_panic() {
2452        // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #3:
2453        // a panic from builder.build must not leave the workspace
2454        // stuck in Loading with last_error unset.
2455        use std::panic::{AssertUnwindSafe, catch_unwind};
2456
2457        #[derive(Debug)]
2458        struct PanickingBuilder;
2459        impl WorkspaceBuilder for PanickingBuilder {
2460            fn build(&self, _root: &Path) -> Result<CodeGraph, DaemonError> {
2461                panic!("simulated builder panic");
2462            }
2463        }
2464
2465        let mgr = WorkspaceManager::new_without_reaper(make_config());
2466        let key = make_key_at("/repos/example", 0x1);
2467        let builder = PanickingBuilder;
2468
2469        let outcome = catch_unwind(AssertUnwindSafe(|| {
2470            let _ = mgr.get_or_load(&key, &builder, 1_000);
2471        }));
2472        assert!(outcome.is_err(), "panic must propagate through get_or_load");
2473
2474        let workspaces = mgr.workspaces.read();
2475        let ws = workspaces.get(&key).expect("workspace still registered");
2476        assert_eq!(
2477            ws.load_state(),
2478            WorkspaceState::Failed,
2479            "LoadingGuard must transition Loading → Failed on unwind",
2480        );
2481        assert!(
2482            ws.last_error.read().is_some(),
2483            "LoadingGuard must populate last_error on unwind",
2484        );
2485        assert!(
2486            ws.retry_count.load(Ordering::Acquire) >= 1,
2487            "LoadingGuard must increment retry_count",
2488        );
2489        drop(workspaces);
2490
2491        // Admission: the RebuildReservation Drop on unwind refunds
2492        // reserved_bytes, so the state is clean.
2493        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
2494    }
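
    // A minimal sketch of the Drop-guard pattern the recovery above
    // depends on: a guard that records failure when it is dropped during
    // unwinding, unless the success path disarms it first. The names are
    // hypothetical; the real guard lives in `get_or_load`.
    #[test]
    fn drop_guard_records_failure_on_unwind_sketch() {
        use std::panic::{AssertUnwindSafe, catch_unwind};
        use std::sync::atomic::AtomicBool;

        struct FailureGuard {
            failed: Arc<AtomicBool>,
            armed: bool,
        }
        impl Drop for FailureGuard {
            fn drop(&mut self) {
                if self.armed {
                    // Unwinding reached us before the happy path disarmed
                    // the guard: record the failure.
                    self.failed.store(true, Ordering::Release);
                }
            }
        }

        let failed = Arc::new(AtomicBool::new(false));
        let outcome = catch_unwind(AssertUnwindSafe(|| {
            let _guard = FailureGuard {
                failed: Arc::clone(&failed),
                armed: true,
            };
            // A successful build would disarm the guard here before
            // returning; instead we simulate a builder panic.
            panic!("simulated builder panic");
        }));

        assert!(outcome.is_err(), "the panic must propagate");
        assert!(
            failed.load(Ordering::Acquire),
            "the guard must have recorded the failure on unwind"
        );
    }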
2495
2496    #[test]
2497    fn concurrent_load_and_evict_never_publishes_into_evicted_workspace() {
2498        // Regression for Codex Task 6 Phase 6b iter-2 MAJOR:
2499        // the post-build re-check was not atomic with
2500        // `publish_and_retain`. A concurrent eviction could slip
2501        // in between the re-check and the publish, so we'd end
2502        // up accounting bytes for an evicted workspace.
2503        //
2504        // Stress test: run many iterations of `get_or_load` and
2505        // `execute_eviction` concurrently; every iteration
2506        // should leave the admission state consistent (§G.5),
2507        // the workspace either fully loaded or fully evicted,
2508        // and never in a half-committed "loaded_bytes points at
2509        // a graph that isn't in the map" state.
2510        use std::sync::Barrier;
2511        use std::thread;
2512
2513        const ITERATIONS: usize = 64;
2514        for iter in 0..ITERATIONS {
2515            let mgr = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig {
2516                memory_limit_mb: 64,
2517                ..DaemonConfig::default()
2518            }));
2519            let key = make_key_at("/repos/example", iter as u64);
2520            let builder = Arc::new(super::super::builder::EmptyGraphBuilder);
2521
2522            let start = Arc::new(Barrier::new(2));
2523            let mgr_clone = Arc::clone(&mgr);
2524            let key_clone = key.clone();
2525            let builder_clone = Arc::clone(&builder);
2526            let start_load = Arc::clone(&start);
2527            let loader = thread::spawn(move || {
2528                start_load.wait();
2529                // Intentionally ignore the result — either success
2530                // or failure is valid; we assert post-hoc invariants.
2531                let _ = mgr_clone.get_or_load(&key_clone, &*builder_clone, 100_000);
2532            });
2533
2534            let mgr_clone = Arc::clone(&mgr);
2535            let key_clone = key.clone();
2536            let start_evict = Arc::clone(&start);
2537            let evictor = thread::spawn(move || {
2538                start_evict.wait();
2539                // Run unload against the same key; it may land before
2540                // the load (a no-op), after the publish (a normal evict),
2541                // or mid-load: the interleaving this regression targets.
2542                mgr_clone.unload(&key_clone);
2543            });
2544
2545            loader.join().expect("loader panicked");
2546            evictor.join().expect("evictor panicked");
2547
2548            // Post-hoc invariants:
2549            // 1. The workspace is either Loaded AND in the map, or
2550            //    not in the map at all. No "evicted-but-in-map"
2551            //    intermediate state.
2552        // 2. Admission state is consistent: reserved_bytes must be
2553        //    zero (no in-flight reservations survive the race) and
2554        //    the committed total (loaded_bytes + reserved_bytes +
2555        //    retained_total) must stay within the configured budget
2556        //    (§G.5).
2557            let workspaces = mgr.workspaces.read();
2558            if let Some(ws) = workspaces.get(&key) {
2559                assert_eq!(
2560                    ws.load_state(),
2561                    WorkspaceState::Loaded,
2562                "iter {iter}: workspace in map must be Loaded, not {:?}",
2563                    ws.load_state(),
2564                );
2565            }
2566            drop(workspaces);
2567
2568            let state = mgr.admission.lock();
2569            assert_eq!(
2570                state.reserved_bytes, 0,
2571                "iter {iter}: no reservations should leak after the race"
2572            );
2573            // §G.5 is maintained by the additive counter updates; the
2574            // remaining thing to assert is that the committed total fits
2575            // the budget (the arithmetic is sketched after this test).
2576            assert!(
2577                state.total_committed_bytes() <= mgr.memory_limit_bytes(),
2578                "iter {iter}: total_committed {} over budget {}",
2579                state.total_committed_bytes(),
2580                mgr.memory_limit_bytes(),
2581            );
2582        }
2583    }
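
    // A minimal sketch of the §G.5 budget arithmetic the stress test above
    // asserts, written out with plain integers. The committed-total formula
    // (loaded + reserved + retained) mirrors the invariant comment above;
    // the concrete numbers are made up for illustration.
    #[test]
    fn admission_budget_arithmetic_sketch() {
        // 64 MiB budget, matching the memory_limit_mb used above.
        let budget: u64 = 64 << 20;
        let loaded_bytes: u64 = 40 << 20;
        let reserved_bytes: u64 = 0;
        let retained_total: u64 = 8 << 20;

        let committed = loaded_bytes + reserved_bytes + retained_total;

        // A 10 MiB rebuild reservation still fits under the budget...
        assert!(committed + (10 << 20) <= budget);
        // ...while a 32 MiB one would have to be rejected (or trigger
        // eviction) before it could be reserved.
        assert!(committed + (32 << 20) > budget);
    }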
2584
2585    #[test]
2586    fn publish_fires_installed_hook() {
2587        // Phase 6c iter-2: `get_or_load` must invoke the installed
2588        // SqrydHook once the admission commit succeeds AND after
2589        // releasing `workspaces_guard`. This test drives the full
2590        // load path end-to-end so the fix (moving the hook out of
2591        // `publish_and_retain` and into the caller, outside every
2592        // workspaces-lock holder) is exercised — not just the raw
2593        // `publish_and_retain` critical section.
2594        let mgr = WorkspaceManager::new_without_reaper(make_config());
2595        let hook = super::super::hook::RecordingHook::new();
2596        mgr.set_hook(Arc::clone(&hook) as super::super::hook::SharedHook);
2597
2598        let key = make_key_at("/repos/example", 0x1);
2599        let builder = super::super::builder::EmptyGraphBuilder;
2600        mgr.get_or_load(&key, &builder, 0)
2601            .expect("load on empty builder succeeds");
2602
2603        assert_eq!(
2604            hook.invocation_count(),
2605            1,
2606            "hook must fire exactly once per publish",
2607        );
2608        assert_eq!(
2609            hook.invocation_roots(),
2610            vec![key.index_root.clone()],
2611            "hook must receive the workspace's index_root",
2612        );
2613    }
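
    // A minimal sketch of the surface a hook impl needs. The suite's
    // `RecordingHook` is the real test double; this counting variant is a
    // hypothetical type that only mirrors the `on_publish` signature used
    // by `UnloadingHook` further down.
    #[test]
    fn counting_hook_sketch_observes_a_publish() {
        use super::super::hook::SqrydHook;

        #[derive(Debug, Default)]
        struct CountingHook {
            published: std::sync::atomic::AtomicU64,
        }

        impl SqrydHook for CountingHook {
            fn on_publish(&self, _workspace_root: &Path, _graph: Arc<CodeGraph>) {
                self.published.fetch_add(1, Ordering::AcqRel);
            }
        }

        let hook = CountingHook::default();
        hook.on_publish(Path::new("/repos/example"), Arc::new(CodeGraph::new()));
        assert_eq!(hook.published.load(Ordering::Acquire), 1);
    }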
2614
2615    #[test]
2616    fn set_hook_replaces_prior_hook_for_subsequent_publishes() {
2617        // Phase 6c iter-2: install hook A, load, evict, install
2618        // hook B, load again. Hook A sees one invocation; hook B
2619        // sees one. Driving through `get_or_load` exercises the
2620        // post-`workspaces_guard`-drop dispatch path the iter-2
2621        // fix added.
2622        let mgr = WorkspaceManager::new_without_reaper(make_config());
2623        let hook_a = super::super::hook::RecordingHook::new();
2624        let hook_b = super::super::hook::RecordingHook::new();
2625        let builder = super::super::builder::EmptyGraphBuilder;
2626        let key = make_key_at("/repos/example", 0x1);
2627
2628        mgr.set_hook(Arc::clone(&hook_a) as super::super::hook::SharedHook);
2629        mgr.get_or_load(&key, &builder, 0)
2630            .expect("first load with hook A");
2631
2632        // Evict so the next `get_or_load` rebuilds and re-publishes
2633        // rather than hitting the Loaded-state cache fast path.
2634        mgr.unload(&key);
2635
2636        mgr.set_hook(Arc::clone(&hook_b) as super::super::hook::SharedHook);
2637        mgr.get_or_load(&key, &builder, 0)
2638            .expect("second load with hook B");
2639
2640        assert_eq!(hook_a.invocation_count(), 1);
2641        assert_eq!(hook_b.invocation_count(), 1);
2642    }
2643
2644    #[test]
2645    fn hook_can_call_manager_unload_without_deadlock() {
2646        // Regression for Codex Task 6 Phase 6c iter-1 MAJOR: the
2647        // hook must fire OUTSIDE the `workspaces.read()` guard
2648        // that `get_or_load` holds across `publish_and_retain`,
2649        // so a hook impl that calls back into `manager.unload(key)`
2650        // — which acquires `workspaces.write()` inside
2651        // `execute_eviction` — must NOT deadlock against the
2652        // loader that fired it.
2653        //
2654        // Pre-fix: the hook dispatched from inside
2655        // `publish_and_retain` under the caller's
2656        // `workspaces.read()` guard, so the re-entrant
2657        // `workspaces.write()` in `unload` would block forever.
2658        //
2659        // We run the load on a background thread and fail the
2660        // test if the thread is still alive after a generous
2661        // timeout — that turns any deadlock regression into a
2662        // deterministic failure rather than a stuck runner.
2663        use std::{sync::Weak, thread, time::Duration};
2664
2665        #[derive(Debug)]
2666        struct UnloadingHook {
2667            manager: Weak<WorkspaceManager>,
2668            key: WorkspaceKey,
2669        }
2670
2671        impl super::super::hook::SqrydHook for UnloadingHook {
2672            fn on_publish(&self, _workspace_root: &Path, _graph: Arc<CodeGraph>) {
2673                if let Some(mgr) = self.manager.upgrade() {
2674                    // If the iter-2 fix regressed and this fires
2675                    // under `workspaces.read()`, the `.write()`
2676                    // inside `execute_eviction` deadlocks here
2677                    // and the test's join timeout triggers below.
2678                    let _present = mgr.unload(&self.key);
2679                }
2680            }
2681        }
2682
2683        let mgr = WorkspaceManager::new_without_reaper(make_config());
2684        let key = make_key_at("/repos/example", 0x1);
2685        let builder = super::super::builder::EmptyGraphBuilder;
2686        let hook = Arc::new(UnloadingHook {
2687            manager: Arc::downgrade(&mgr),
2688            key: key.clone(),
2689        });
2690        mgr.set_hook(Arc::clone(&hook) as super::super::hook::SharedHook);
2691
2692        let mgr_for_thread = Arc::clone(&mgr);
2693        let key_for_thread = key.clone();
2694        let builder_for_thread = builder;
2695        let handle = thread::spawn(move || {
2696            mgr_for_thread
2697                .get_or_load(&key_for_thread, &builder_for_thread, 0)
2698                .expect("load succeeds even with re-entrant hook");
2699        });
2700
2701        let deadline = std::time::Instant::now() + Duration::from_secs(10);
2702        while !handle.is_finished() {
2703            if std::time::Instant::now() > deadline {
2704                panic!(
2705                    "get_or_load deadlocked while firing hook \
2706                     (Codex Task 6 Phase 6c iter-2 regression: \
2707                     hook must dispatch outside workspaces.read())",
2708                );
2709            }
2710            thread::sleep(Duration::from_millis(20));
2711        }
2712        handle
2713            .join()
2714            .expect("loader thread completed without panic");
2715
2716        // Hook's `unload` ran, so the workspace must no longer be
2717        // in the manager map.
2718        assert!(
2719            !mgr.workspaces.read().contains_key(&key),
2720            "hook's re-entrant unload must have removed the workspace",
2721        );
2722        // The hook itself doesn't record invocations, so there is no
2723        // counter to assert on; the absence-of-workspace assertion
2724        // above is the positive signal that `on_publish` ran to
2725        // completion.
2726    }
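
    // A minimal sketch of the "snapshot under the lock, drop the guard,
    // then dispatch" ordering the regression above protects, reduced to a
    // plain parking_lot RwLock and a re-entrant `.write()`. Hypothetical
    // names; parking_lot locks are not re-entrant, so holding the read
    // guard across the write below would deadlock on this thread.
    #[test]
    fn dispatch_after_dropping_the_guard_permits_reentrant_write_sketch() {
        use parking_lot::RwLock;

        let registry: RwLock<Vec<String>> = RwLock::new(vec!["workspace-a".to_owned()]);

        // Publish path: capture what the callback needs while holding the
        // read guard, then let the guard drop at the end of the block.
        let snapshot = {
            let guard = registry.read();
            guard.first().cloned()
        };

        if let Some(entry) = snapshot {
            // Re-entrant "unload": safe only because the read guard above
            // has already been released.
            registry.write().retain(|e| *e != entry);
        }
        assert!(registry.read().is_empty());
    }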
2727
2728    #[tokio::test]
2729    async fn retention_reaper_task_eventually_drops_free_entries() {
2730        let mgr = WorkspaceManager::new(make_config());
2731        let ws = make_workspace();
2732        mgr.workspaces
2733            .write()
2734            .insert(ws.key.clone(), Arc::clone(&ws));
2735        let reservation = mgr
2736            .reserve_rebuild(&ws.key, 0)
2737            .expect("zero-size reservation always fits");
2738        mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
2739        assert_eq!(mgr.admission.lock().retained_old.len(), 1);
2740
2741        // Reaper ticks every 25 ms; 200 ms is generous.
2742        for _ in 0..20 {
2743            tokio::time::sleep(Duration::from_millis(10)).await;
2744            if mgr.admission.lock().retained_old.is_empty() {
2745                return;
2746            }
2747        }
2748        panic!("reaper task never freed the entry within 200 ms");
2749    }
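
    // A minimal sketch of the poll-until-deadline shape the reaper test
    // above uses inline, factored into a hypothetical helper. Nothing in
    // the module provides this; it only shows the pattern.
    async fn eventually(mut condition: impl FnMut() -> bool, budget: Duration) -> bool {
        let start = std::time::Instant::now();
        while start.elapsed() < budget {
            if condition() {
                return true;
            }
            tokio::time::sleep(Duration::from_millis(10)).await;
        }
        condition()
    }

    #[tokio::test]
    async fn eventually_helper_sketch() {
        // A condition that is already true resolves on the first poll.
        assert!(eventually(|| true, Duration::from_millis(50)).await);
        // A condition that never becomes true exhausts the budget.
        assert!(!eventually(|| false, Duration::from_millis(50)).await);
    }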
2750}