sqry_daemon/workspace/manager.rs
1//! [`WorkspaceManager`] — admission accounting entry points.
2//!
3//! Covers Task 6 Steps 3 / 4 / 4a / 4b / 4c / 4d of the sqryd plan
4//! (Amendment 2 §G.1–§G.7). This file lands the admission-accounting
5//! half of the manager — `reserve_rebuild`, `publish_and_retain`,
//! `RollbackGuard`, and the retention reaper. The workspace lifecycle
//! (`get_or_load`, `evict_lru`, `unload`, `status`, Failed-state
//! handling) was layered on top in Phase 6b; `get_or_load` lives in
//! this file as well.
9//!
10//! ## Lock order (authoritative — referenced by §J.4)
11//!
12//! All code paths that acquire more than one lock MUST follow this
13//! total order; acquiring out of order is a bug enforced by code
14//! review.
15//!
16//! 1. `WorkspaceManager.workspaces: RwLock<HashMap<...>>`
17//! 2. `LoadedWorkspace.rebuild_lane: tokio::sync::Mutex<_>` *(Task 7)*
18//! 3. `WorkspaceManager.admission: parking_lot::Mutex<AdmissionState>`
19//!
20//! `WorkspaceManager.hook: RwLock<SharedHook>` is a disjoint
21//! sibling — it is NEVER acquired while any of the three locks
22//! above are held. In particular, the post-publish hook dispatch
23//! (`hook_snapshot` + `SqrydHook::on_publish`) is fired from
24//! `get_or_load` AFTER dropping `workspaces_guard` so the hook
25//! dispatch, and any re-entrant manager method a hook impl might
26//! call, cannot deadlock against the loader that fired it
27//! (Codex Task 6 Phase 6c iter-2 MAJOR).
28//!
29//! Rules:
30//! - A holder of `admission` may NOT acquire `rebuild_lane` or
31//! `workspaces` — it is the innermost lock.
32//! - A holder of `rebuild_lane` may NOT acquire `workspaces`.
33//! `rebuild_lane` is used only for scheduling/coalescing pending
34//! rebuilds; it is never held across a call that takes `workspaces`
35//! or `admission` nestedly.
36//! - A holder of `workspaces` (reader or writer) may NOT acquire
37//! `hook`. Hook dispatch happens only after every outer
38//! workspaces-lock holder has released.
//! - Eviction holds `workspaces.write()` for the entire eviction, sets
//!   the per-workspace atomic `rebuild_cancelled` flag, and acquires
//!   `admission` nested inside that guard (the permitted
//!   `workspaces → admission` order) to update accounting. Eviction
//!   never takes `rebuild_lane`.
42//! - The retention reaper acquires only `admission`.
43
44use std::{
45 collections::HashMap,
46 path::Path,
47 sync::{
48 Arc, Weak,
49 atomic::{AtomicU64, Ordering},
50 },
51 time::{Duration, Instant, SystemTime},
52};
53
54use parking_lot::{Mutex, RwLock};
55use sqry_core::graph::{CodeGraph, unified::GraphMemorySize};
56use tokio::task::JoinHandle;
57use tracing::warn;
58
59use crate::{config::DaemonConfig, error::DaemonError};
60
61use super::{
62 admission::{AdmissionState, RetainedEntry},
63 builder::WorkspaceBuilder,
64 hook::{NoOpHook, SharedHook, SqrydHook},
65 loaded::LoadedWorkspace,
66 staleness::{StalenessVerdict, classify_staleness},
67 state::{OldGraphToken, WorkspaceKey, WorkspaceState},
68 status::{DaemonStatus, MemoryStatus, WorkspaceStatus},
69};
70
71// ---------------------------------------------------------------------------
72// ServeVerdict
73// ---------------------------------------------------------------------------
74
75/// Outcome of [`WorkspaceManager::classify_for_serve`].
76///
77/// Task 7 Phase 7c. Rich-enum return so the IPC router (Task 8) can
78/// decide how to shape its response without re-classifying.
79#[derive(Debug, Clone)]
80pub enum ServeVerdict {
81 /// Workspace is healthy; serve from `graph`. Wraps an `Arc` — the
82 /// caller holds a strong reference until it is dropped, independent
83 /// of any subsequent publish or eviction.
84 Fresh {
85 graph: Arc<CodeGraph>,
86 /// Observed workspace state at classification time — either
87 /// [`WorkspaceState::Loaded`] or [`WorkspaceState::Rebuilding`].
88 /// Task 7's envelope populates `meta.workspace_state` from this
89 /// field so clients can tell which flavour of Fresh they
90 /// received (a freshly-loaded snapshot vs. one whose successor
91 /// rebuild is already in flight).
92 state: WorkspaceState,
93 },
94 /// Workspace is in `Failed` state but within the
95 /// `stale_serve_max_age_hours` cap. Serve from `graph` with
96 /// `meta.stale = true` and `age_hours` in the response envelope.
97 Stale {
98 graph: Arc<CodeGraph>,
99 age_hours: u64,
100 /// Timestamp of the last successful build. Task 7 renders this
101 /// into the `_stale_warning` string as RFC3339 / UTC-Zulu.
102 last_good_at: SystemTime,
103 /// Textual diagnostic from the most recent failed build, if any.
104 /// `None` when the workspace has been Failed since the last good
105 /// build but no error text was captured.
106 last_error: Option<String>,
107 },
108 /// Workspace exists in the manager map but is not yet ready to
109 /// serve (`Unloaded` or `Loading`). The IPC router decides what to
110 /// do next (retry-after-delay, enqueue, surface a client-appropriate
111 /// code) — the manager does not prescribe a retry policy.
112 NotReady { state: WorkspaceState },
113}
114
115// ---------------------------------------------------------------------------
116// WorkspaceManager
117// ---------------------------------------------------------------------------
118
119/// Owns every loaded workspace plus the admission-accounting state.
120///
121/// Construction spawns the retention reaper task (§G.3). The handle is
122/// stored so `Drop` can abort it cleanly — on daemon shutdown the
123/// reaper is aborted, then the admission state drops, dropping every
124/// retained `Arc<CodeGraph>` in one pass. No accounting leak, no
125/// dangling `Arc`.
126#[derive(Debug)]
127pub struct WorkspaceManager {
128 /// Immutable daemon configuration — used for the memory budget,
129 /// the reaper interval, and the drain-timeout warning threshold.
130 config: Arc<DaemonConfig>,
131
132 /// Per-workspace state, keyed by [`WorkspaceKey`]. `RwLock` so
133 /// the read-only status path contends only with infrequent
134 /// insert / remove writers.
135 workspaces: RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>>,
136
137 /// Single-mutex admission accounting — see [`AdmissionState`]
138 /// module docs for the §G.5 invariant.
139 admission: Mutex<AdmissionState>,
140
141 /// Join handle of the spawned retention reaper. `Option` so
142 /// `Drop` can `.take().abort()` without requiring `&mut self`.
143 reaper: Mutex<Option<JoinHandle<()>>>,
144
145 /// Instant captured at construction. `daemon/status` reports
146 /// `uptime_seconds` = `Instant::now() - started_at`.
147 started_at: Instant,
148
149 /// Monotonic peak of `AdmissionState::total_committed_bytes`
150 /// observed across the daemon's uptime. Updated via `fetch_max`
151 /// on every admission-mutating operation. Amendment 2 §D.
152 total_memory_high_water: AtomicU64,
153
154 /// Post-publish persistence hook. Defaults to a no-op; Task 9's
155 /// daemon binary installs the production `QueryDbHook` that
156 /// wraps `sqry_db::persistence::save_derived`. Swapped via
157 /// [`Self::set_hook`] at daemon boot after the `QueryDb` is
158 /// constructed.
159 ///
160 /// `RwLock` rather than `ArcSwap` because `SharedHook = Arc<dyn
161 /// Trait + Send + Sync>` is cheap to clone inside the read
162 /// critical section, and the hook is only consulted on publish
163 /// (not on every query) — the RwLock is never a hot path.
164 hook: RwLock<SharedHook>,
165}
166
167impl WorkspaceManager {
168 /// Construct a fresh manager and spawn the retention reaper.
169 ///
170 /// The reaper is spawned on the current Tokio runtime. Callers
171 /// must therefore construct the manager from a Tokio context
172 /// (`#[tokio::main]`, an `async` block driven by `Runtime::block_on`,
173 /// etc.). Tests that don't need the reaper can use
174 /// [`Self::new_without_reaper`].
175 pub fn new(config: Arc<DaemonConfig>) -> Arc<Self> {
176 let mgr = Arc::new(Self {
177 config: Arc::clone(&config),
178 workspaces: RwLock::new(HashMap::new()),
179 admission: Mutex::new(AdmissionState::default()),
180 reaper: Mutex::new(None),
181 started_at: Instant::now(),
182 total_memory_high_water: AtomicU64::new(0),
183 hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
184 });
185 let handle = tokio::spawn(retention_reaper(Arc::downgrade(&mgr)));
186 *mgr.reaper.lock() = Some(handle);
187 mgr
188 }
189
190 /// Like [`Self::new`] but does not spawn the reaper — useful in
191 /// unit tests that drive the retention map synchronously via
192 /// [`Self::reap_once`].
193 #[doc(hidden)]
194 pub fn new_without_reaper(config: Arc<DaemonConfig>) -> Arc<Self> {
195 Arc::new(Self {
196 config,
197 workspaces: RwLock::new(HashMap::new()),
198 admission: Mutex::new(AdmissionState::default()),
199 reaper: Mutex::new(None),
200 started_at: Instant::now(),
201 total_memory_high_water: AtomicU64::new(0),
202 hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
203 })
204 }
205
206 /// Install a post-publish hook. Task 9's daemon binary calls
207 /// this once at startup after constructing the shared
208 /// `QueryDb`; unit tests call it to install a recording hook.
209 /// The old hook is dropped immediately; no retention semantics
210 /// apply.
211 pub fn set_hook(&self, hook: SharedHook) {
212 *self.hook.write() = hook;
213 }
214
215 /// Snapshot the currently installed hook. Internal — used by
216 /// `get_or_load` (Phase 6c iter-2) after dropping the
217 /// `workspaces.read()` guard so the `on_publish` dispatch
218 /// never nests under `workspaces`. Taking the hook under its
219 /// own short read-lock avoids holding the lock across the
220 /// dispatch so a misbehaving hook cannot block a concurrent
221 /// `set_hook` swap.
222 fn hook_snapshot(&self) -> SharedHook {
223 Arc::clone(&*self.hook.read())
224 }
225
226 /// Memory budget in bytes (derived from `config.memory_limit_mb`).
227 #[must_use]
228 pub fn memory_limit_bytes(&self) -> u64 {
229 self.config.memory_limit_bytes()
230 }
231
232 /// Access to the workspace registry (read-only view).
233 ///
234 /// Intentionally `pub(crate)` and `#[allow(dead_code)]` in Phase 6a:
235 /// Phase 6b consumers (`get_or_load`, `evict_lru`, `status`) are the
236 /// first real callers. Keeping the accessor here documents the
237 /// intended visibility boundary rather than forcing later code to
238 /// reach into the field directly.
239 #[allow(dead_code)]
240 pub(crate) fn workspaces(&self) -> &RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
241 &self.workspaces
242 }
243
244 /// Access to the admission mutex (internal). See
245 /// [`Self::workspaces`] for the `#[allow(dead_code)]` rationale.
246 #[allow(dead_code)]
247 pub(crate) fn admission(&self) -> &Mutex<AdmissionState> {
248 &self.admission
249 }
250
251 /// Look up a loaded workspace by key without acquiring `rebuild_lane`
252 /// or `admission`.
253 ///
254 /// Returns `Some(Arc<LoadedWorkspace>)` if a workspace is currently
255 /// registered under `key`, or `None` otherwise. The `workspaces`
256 /// read guard is acquired and released inside the call — callers
257 /// never observe it nested with any other lock.
258 ///
259 /// Added for the Task 7 [`crate::rebuild::RebuildDispatcher`] which
260 /// needs a cheap handle on `Arc<LoadedWorkspace>` as a precondition
261 /// before entering the canonical §J.4 ordered sequence
262 /// (`rebuild_lane` → `admission`). This is *not* part of the
263 /// ordered sequence itself — the §J.4 contract only constrains
264 /// paths that hold more than one lock simultaneously. Here, the
265 /// `workspaces` guard is dropped before the caller takes
266 /// `rebuild_lane`, so there is no nesting.
267 #[allow(dead_code)] // Consumed by rebuild.rs once Task 7 `rebuild` module lands.
268 /// Shared lookup: returns the `Arc<LoadedWorkspace>` keyed by
269 /// `key` if present. Used by `RebuildDispatcher::handle_changes`
270 /// (inside the crate) and by external test harnesses (Task 7
271 /// Phase 7b1 `rebuild_runner_gate.rs`) that need to inspect
272 /// workspace-level atomics (`rebuild_in_flight`, `rebuild_cancelled`)
273 /// or the `rebuild_lane` mutex directly.
274 ///
275 /// This is NOT a JSON-RPC surface — the IPC layer should use
276 /// `status()` for point-in-time workspace state. Direct `lookup`
277 /// access bypasses the LRU touch that `status()` performs.
278 pub fn lookup(&self, key: &WorkspaceKey) -> Option<Arc<LoadedWorkspace>> {
279 self.workspaces.read().get(key).cloned()
280 }
281
282 /// Retention reaper: a single pass over `retained_old`.
283 ///
284 /// Removes entries whose `Arc::strong_count` has dropped to 1 —
285 /// meaning the admission map is the last holder. Emits a
286 /// one-shot WARN log line when an entry exceeds
287 /// `rebuild_drain_timeout_ms` without dropping.
288 ///
289 /// **This is the only code path that removes tokens from
290 /// `retained_old`.** Any other code that mutates the retention
291 /// map is a violation of §G.3.
292 pub fn reap_once(&self) {
293 let timeout = Duration::from_millis(self.config.rebuild_drain_timeout_ms);
294 let now = Instant::now();
295 let mut to_log: Vec<OldGraphToken> = Vec::new();
296 {
297 let mut state = self.admission.lock();
298 state.retained_old.retain(|token, entry| {
299 if Arc::strong_count(&entry.graph) == 1 {
300 false // Last holder: drop entry + Arc together.
301 } else {
302 if !entry.warned_past_timeout
303 && now.saturating_duration_since(entry.published_at) > timeout
304 {
305 entry.warned_past_timeout = true;
306 to_log.push(*token);
307 }
308 true
309 }
310 });
311 }
312 for token in to_log {
313 warn!(
314 token = %token,
315 drain_timeout_ms = self.config.rebuild_drain_timeout_ms,
316 "sqryd retention reaper: retained old graph still held past drain timeout \
317 (not an accounting deadline — bytes stay accounted until strong_count == 1)",
318 );
319 }
320 }
321
322 /// Amendment 2 §G.1 two-phase reservation protocol.
323 ///
324 /// ```text
325 /// Phase 1 (workspaces read + admission read):
326 /// project_total + estimate ≤ limit? → commit
327 /// otherwise → pick LRU non-pinned
328 /// victims (`for_key` is
329 /// exempt — a workspace
330 /// cannot evict itself)
331 /// Phase 2 (no locks held):
332 /// for each victim: execute_eviction()
333 /// Phase 3 (admission alone):
334 /// re-check projected vs limit → authoritative commit
335 /// reserved_bytes += estimate → return RebuildReservation
336 /// ```
337 ///
338 /// Lock order is `workspaces → admission` in Phase 1, nothing in
339 /// Phase 2, `admission` alone in Phase 3. No nesting of
340 /// `rebuild_lane` — Task 7 adds that layer outside this function.
341 ///
342 /// Returns a [`RebuildReservation`] RAII guard on success. On
343 /// `Err`, the admission state is exactly pre-call — either no
344 /// eviction happened (headroom already available) or the
345 /// eviction cleared retained entries but could not fit.
346 pub fn reserve_rebuild(
347 self: &Arc<Self>,
348 for_key: &WorkspaceKey,
349 working_set_estimate: u64,
350 ) -> Result<RebuildReservation, DaemonError> {
351 let limit = self.memory_limit_bytes();
352
353 // --- Phase 1: peek + plan (holds workspaces → admission) ---
354 //
355 // Task 7 Phase 7b1 tightening: reject if the requester has been
356 // evicted or removed between dispatch and reservation. Both the
357 // membership check and the `rebuild_cancelled` read happen under
358 // the Phase-1 `workspaces.read()` so they serialise against
359 // `execute_eviction`'s `workspaces.write()` (which holds across
360 // both `rebuild_cancelled.store(true)` and `workspaces.remove`).
361 //
362 // Post-serialisation snapshot: the reader sees EITHER pre-eviction
363 // state (`Some(ws)` with `cancelled == false`) OR post-eviction
364 // state (`None` OR `cancelled == true`). Keeping both checks is
365 // belt-and-suspenders against any future eviction-protocol change
366 // that could reorder the two mutations.
367 let victims = {
368 let workspaces = self.workspaces.read();
369
370 let Some(requester_ws) = workspaces.get(for_key) else {
371 return Err(DaemonError::WorkspaceEvicted {
372 root: for_key.source_root.clone(),
373 });
374 };
375 if requester_ws.rebuild_cancelled.load(Ordering::Acquire) {
376 return Err(DaemonError::WorkspaceEvicted {
377 root: for_key.source_root.clone(),
378 });
379 }
380
381 let state = self.admission.lock();
382 let projected = state
383 .total_committed_bytes()
384 .saturating_add(working_set_estimate);
385 if projected <= limit {
386 Vec::new() // no victim selection needed
387 } else {
388 let need = projected - limit;
389 Self::plan_eviction(&workspaces, &state, need, for_key)
390 }
391 // Both guards drop here — Phase 2 runs with no locks.
392 };
393
394 // --- Phase 2: execute each eviction with no locks held ---
395 for key in &victims {
396 self.execute_eviction(key);
397 }
398
399 // --- Phase 2.5: opportunistic reap ----------------------
400 //
401 // `execute_eviction` moves the evicted workspace's bytes
402 // from `loaded_bytes` into `retained_old`. If no slow query
403 // still holds the evicted `Arc<CodeGraph>`, the retention
404 // reaper's next tick (25 ms) would free those bytes — but
405 // Phase 3's authoritative re-check runs *now*, before the
406 // reaper gets the chance. Run a synchronous reap pass so
407 // admission sees the free bytes immediately on the common
408 // case of "no outstanding slow queries". Slow-query-held
409 // entries stay retained and still count against the budget,
410 // which is correct per §G.5.
411 if !victims.is_empty() {
412 self.reap_once();
413 }
414
415 // --- Phase 3: authoritative commit (admission alone) ------
416 let mut state = self.admission.lock();
417 let projected = state
418 .total_committed_bytes()
419 .saturating_add(working_set_estimate);
420 if projected > limit {
421 return Err(DaemonError::MemoryBudgetExceeded {
422 limit_bytes: limit,
423 current_bytes: state.loaded_bytes,
424 reserved_bytes: state.reserved_bytes,
425 retained_bytes: state.retained_total_bytes(),
426 requested_bytes: working_set_estimate,
427 });
428 }
429 state.reserved_bytes = state.reserved_bytes.saturating_add(working_set_estimate);
430 self.bump_high_water(&state);
431 drop(state);
432
433 Ok(RebuildReservation {
434 manager: Arc::downgrade(self),
435 bytes: working_set_estimate,
436 released: false,
437 })
438 }
439
440 /// Phase-1 helper: pick the LRU-ordered set of non-pinned
441 /// workspace keys (excluding `for_key`) whose cumulative
442 /// `memory_bytes` meets or exceeds `need`.
443 ///
444 /// Returns keys in eviction order (oldest-first). Callers execute
445 /// evictions in Phase 2 without holding any lock.
446 fn plan_eviction(
447 workspaces: &HashMap<WorkspaceKey, Arc<LoadedWorkspace>>,
448 _state: &AdmissionState,
449 need: u64,
450 for_key: &WorkspaceKey,
451 ) -> Vec<WorkspaceKey> {
452 let mut candidates: Vec<(Instant, u64, WorkspaceKey)> = workspaces
453 .iter()
454 .filter(|(k, ws)| {
455 // Skip the requester (§G.7: a pinned workspace that
456 // exceeds the budget must fail, not evict itself) and
457 // every pinned workspace. Also skip workspaces in
458 // Evicted or Unloaded state — they have no bytes to
459 // reclaim and would be no-ops.
460 **k != *for_key
461 && !ws.pinned
462 && ws.load_state() != WorkspaceState::Evicted
463 && ws.load_state() != WorkspaceState::Unloaded
464 })
465 .map(|(k, ws)| {
466 let last = *ws.last_accessed.read();
467 let bytes = ws.memory_bytes.load(Ordering::Acquire) as u64;
468 (last, bytes, k.clone())
469 })
470 .collect();
471 // Oldest last_accessed first.
472 candidates.sort_by_key(|(ts, _, _)| *ts);
473
474 let mut plan = Vec::new();
475 let mut reclaimed: u64 = 0;
476 for (_, bytes, key) in candidates {
477 if reclaimed >= need {
478 break;
479 }
480 plan.push(key);
481 reclaimed = reclaimed.saturating_add(bytes);
482 }
483 plan
484 }
485
486 /// Execute Phase-2 of an eviction.
487 ///
488 /// Steps, in order:
489 ///
490 /// 1. Swap the workspace's `ArcSwap<CodeGraph>` to an empty
491 /// placeholder. This releases the old `Arc` from the
492 /// `ArcSwap` itself — any outstanding slow-query `Arc`s
493 /// still exist at the same strong count.
494 /// 2. Move those bytes from `loaded_bytes` into `retained_old`
495 /// (under the admission mutex) — keying on a fresh
496 /// [`OldGraphToken`]. This preserves the §G.5 invariant:
497 /// bytes shift from the loaded tier to the retained tier
498 /// rather than disappearing. The retention reaper frees the
499 /// entry (and therefore the bytes) when `strong_count` drops
500 /// to 1, i.e. when every slow query has released its `Arc`.
501 /// 3. Set `rebuild_cancelled = true` so any concurrent
502 /// `get_or_load` / rebuild running against this workspace
503 /// observes the signal at its next pass boundary and aborts
504 /// without publishing.
505 /// 4. Mark the state `Evicted` — and **leave the entry in the
506 /// manager map** as a tombstone. STEP_6 (workspace-aware-
507 /// cross-repo, 2026-04-26): keeping the tombstone is what
508 /// makes per-source-root partial eviction observable through
509 /// `daemon/workspaceStatus`. The aggregate must report
510 /// `state == Evicted` for individually-evicted source roots
511 /// while siblings remain `Loaded`. Removing the entry would
512 /// silently hide the eviction from the aggregate — exactly
513 /// the codex iter-1 BLOCK item.
514 ///
515 /// The order is load-bearing: the cancellation flag is set
516 /// *before* the state transition so a concurrent loader that
517 /// re-checks `rebuild_cancelled` after its build (per
518 /// [`Self::get_or_load`]) sees the cancel.
519 ///
520 /// To **fully unload** a workspace (drop the tombstone too),
521 /// callers route through [`Self::unload`] / `daemon/unload`,
522 /// which calls this function and then explicitly removes the
523 /// map entry. LRU eviction (`evict_lru`, `reserve_rebuild`'s
524 /// Phase 2) keeps the tombstone; only an explicit user-driven
525 /// unload removes it.
526 ///
527 /// Codex Task 6 Phase 6b iter-1 MAJOR: the pre-fix version
528 /// dropped the evicted `Arc` at function end and subtracted
529 /// bytes from `loaded_bytes` without inserting a retained
530 /// entry — leaking accounting for any graph still held by a
531 /// slow query.
532 ///
533 /// Codex STEP_6 iter-1 BLOCK: the pre-fix version unconditionally
534 /// removed the entry from `self.workspaces` after marking it
535 /// `Evicted`, defeating partial-eviction reporting. The
536 /// remove-entry step now lives in [`Self::unload`] alone.
537 fn execute_eviction(&self, key: &WorkspaceKey) {
538 // Hold `workspaces.write()` across the ENTIRE eviction —
539 // from the initial lookup through the final state store —
540 // so no concurrent `get_or_load` post-build re-check can
541 // interleave with us. Loaders serialize against eviction
542 // by holding `workspaces.read()` across their own publish
543 // critical section (see `get_or_load` step 7+).
544 //
545 // Lock order is `workspaces → admission` per plan §J.4.
546 // We take `admission` INSIDE this write-lock in Step 2,
547 // which is the outermost-first order the contract
548 // requires.
549 //
550 // Codex Task 6 Phase 6b iter-2 MAJOR: the iter-1 version
551 // took `workspaces.read()` only briefly for the initial
552 // lookup, then dropped it — leaving a window where a
553 // concurrent load's post-build re-check could observe
554 // workspace-still-in-map / cancelled-still-false and then
555 // publish into an already-evicted workspace. Holding
556 // `workspaces.write()` across the full eviction closes
557 // that window.
558 let mut workspaces = self.workspaces.write();
559 // Steps 1–3 (ArcSwap, admission tier transfer, cancellation
560 // + state store) are factored into the shared helper so
561 // [`Self::unload`] can reuse them under a single
562 // workspaces.write() guard.
563 //
564 // Step 4 (DO NOT remove from `self.workspaces`) is implicit
565 // here — the entry stays in the map as a tombstone. The
566 // tombstone is what STEP_6 partial-eviction reporting
567 // depends on. `unload` (the explicit user-driven path)
568 // removes the entry separately after this function returns.
569 self.evict_to_tombstone_locked(&mut workspaces, key);
570 drop(workspaces);
571 }
572
573 /// Load the workspace's graph, building it via `builder` if not
574 /// already present.
575 ///
576 /// Lifecycle gate:
577 ///
578 /// 1. Cache-hit fast path — if the workspace is present AND in
579 /// [`WorkspaceState::Loaded`], touch + return.
580 /// 2. CAS `Unloaded`/`Evicted`/`Failed` → `Loading`. Exactly one
581 /// caller wins. If another caller already holds the gate
582 /// (`Loading`/`Rebuilding`), return an error — Phase 6c /
583 /// Task 7 will introduce a wait-for-done notify channel.
584 /// 3. The winner arms a [`LoadingGuard`] RAII wrapper that
585 /// transitions the workspace into [`WorkspaceState::Failed`]
586 /// on *any* non-success exit (`Err`, early `return`, or
587 /// panic). This covers the Codex iter-1 MAJOR that a panic
588 /// from `builder.build()` would leave the workspace stuck
589 /// in Loading.
590 /// 4. Reserve admission headroom (§G.1 three-phase).
591 /// 5. Build the graph via the injected `builder`.
592 /// 6. Re-check `rebuild_cancelled` + workspace map membership
593 /// before publishing. If eviction ran during the build, the
594 /// reservation refunds via RAII and no graph is published.
595 /// 7. Publish via `publish_and_retain`. Disarm the LoadingGuard
596 /// + record success + touch.
597 /// 8. Release `workspaces_guard`, THEN dispatch the
598 /// post-publish `SqrydHook`. The hook fires outside every
599 /// outer manager lock so a hook impl is free to call back
600 /// into `unload` / `get_or_load` / `set_hook` / `status`
601 /// without deadlocking against the loader that fired it.
602 ///
603 /// Codex Task 6 Phase 6b iter-1 MAJOR (×2): the pre-fix version
604 /// clobbered a concurrent eviction's `rebuild_cancelled` signal
605 /// and could publish into a workspace already removed from the
606 /// map. The CAS + post-build re-check + LoadingGuard together
607 /// close both holes.
608 ///
609 /// Codex Task 6 Phase 6c iter-2 MAJOR: the pre-fix version
610 /// dispatched the hook from inside `publish_and_retain` while
611 /// the caller still held `workspaces.read()`, giving a hook
612 /// impl that needed `workspaces.write()` (e.g. via `unload`)
613 /// a guaranteed re-entrancy deadlock. Splitting publish and
614 /// hook dispatch into Steps 7 and 8 closes that hole.
615 ///
616 /// # Errors
617 ///
618 /// - [`DaemonError::MemoryBudgetExceeded`] if Phase 3 cannot
619 /// admit the reservation even after LRU eviction.
620 /// - [`DaemonError::WorkspaceBuildFailed`] surfaced from the
621 /// builder OR synthesised when a concurrent eviction races
622 /// the load (`reason = "workspace evicted mid-load"`).
623 pub fn get_or_load(
624 self: &Arc<Self>,
625 key: &WorkspaceKey,
626 builder: &dyn WorkspaceBuilder,
627 working_set_estimate: u64,
628 ) -> Result<Arc<CodeGraph>, DaemonError> {
629 // --- Step 1: cache-hit fast path ------------------------
630 {
631 let workspaces = self.workspaces.read();
632 if let Some(ws) = workspaces.get(key)
633 && ws.load_state() == WorkspaceState::Loaded
634 {
635 ws.touch();
636 return Ok(ws.graph.load_full());
637 }
638 }
639
640 // --- Step 2: take the lifecycle gate via state CAS ------
641 let ws = self.get_or_insert_workspace(key);
642 let allowed = [
643 WorkspaceState::Unloaded.as_u8(),
644 WorkspaceState::Failed.as_u8(),
645 WorkspaceState::Evicted.as_u8(),
646 ];
647 let mut acquired_from: Option<WorkspaceState> = None;
648 for prior in allowed {
649 if ws
650 .state
651 .compare_exchange(
652 prior,
653 WorkspaceState::Loading.as_u8(),
654 Ordering::AcqRel,
655 Ordering::Acquire,
656 )
657 .is_ok()
658 {
659 acquired_from = WorkspaceState::from_u8(prior);
660 break;
661 }
662 }
663 let Some(prior_state) = acquired_from else {
664 // Someone else already holds the gate (Loading /
665 // Rebuilding) OR raced us into Loaded. Cache-read and
666 // return if Loaded, else surface a transient error.
667 let current = ws.load_state();
668 if current == WorkspaceState::Loaded {
669 ws.touch();
670 return Ok(ws.graph.load_full());
671 }
672 return Err(DaemonError::WorkspaceBuildFailed {
673 root: key.source_root.clone(),
674 reason: format!("workspace load already in progress ({current})"),
675 });
676 };
677 // We own the gate. Clear the cancellation flag AFTER the
678 // CAS, but interpret a pre-cleared `cancelled = true`
679 // differently depending on the prior state we won from:
680 //
681 // - Prior = `Evicted`: STEP_6 iter-2. LRU eviction
682 // completed on this entry (workspaces.write() was held
683 // across both the `cancelled.store(true)` and the
684 // `state.store(Evicted)` in `execute_eviction`). The
685 // cancelled flag is a stale residue of that completed
686 // eviction; this `get_or_load` is a fresh reload and
687 // must clear cancelled unconditionally.
688 // - Prior = `Unloaded` / `Failed`: a concurrent eviction
689 // is racing us. The flag is a live cancel signal — the
690 // eviction reached `cancelled.store(true)` before our
691 // CAS but the state had not yet been moved to
692 // `Evicted`. Honour the cancel and fail this load.
693 let pre_cancelled = ws.rebuild_cancelled.swap(false, Ordering::AcqRel);
694 if pre_cancelled && prior_state != WorkspaceState::Evicted {
695 // Evict raced us out of the allowed-state list. Put
696 // the cancelled flag back, transition to Failed (so
697 // this caller's LoadingGuard doesn't fire), and fail.
698 ws.rebuild_cancelled.store(true, Ordering::Release);
699 ws.store_state(WorkspaceState::Failed);
700 return Err(DaemonError::WorkspaceBuildFailed {
701 root: key.source_root.clone(),
702 reason: "workspace evicted mid-load".to_string(),
703 });
704 }
705
706 // --- Step 3: arm LoadingGuard for panic / early-return --
707 let mut loading = LoadingGuard {
708 ws: &ws,
709 key,
710 armed: true,
711 };
712
713 // --- Step 4: reserve admission headroom ------------------
714 let reservation = self.reserve_rebuild(key, working_set_estimate)?;
715
716 // --- Step 5: build the graph ----------------------------
717 let graph = match builder.build(&key.source_root) {
718 Ok(g) => g,
719 Err(err) => {
720 drop(reservation);
721 // The LoadingGuard will flip us to Failed + record
722 // a synthetic error; overwrite with the builder's
723 // real error for diagnostic fidelity.
724 ws.record_failure(clone_err(&err));
725 loading.armed = false;
726 ws.store_state(WorkspaceState::Failed);
727 return Err(err);
728 }
729 };
730
731 // --- Step 6+7: atomic re-check + publish -------------
732 //
733 // Hold `workspaces.read()` across the final cancellation
734 // / map-membership re-check AND the `publish_and_retain`
735 // call. `execute_eviction` holds `workspaces.write()` for
736 // the duration of every eviction, so the RwLock makes the
737 // publish critical section atomic with respect to
738 // eviction: either eviction has fully completed (the map
739 // lookup fails), or eviction has not started (and cannot
740 // start while we hold the read lock).
741 //
742 // Lock order per plan §J.4: `workspaces → admission`.
743 // `publish_and_retain` takes `admission` internally;
744 // that nests under our `workspaces.read()` correctly.
745 //
746 // Codex Task 6 Phase 6b iter-2 MAJOR: the iter-1 version
747 // released `workspaces.read()` after the map-membership
748 // check and then called `publish_and_retain` unlocked.
749 // Eviction could slip in between the two, satisfying
750 // both re-checks yet still reaching `remove(key)` after
751 // our publish. Holding the read lock across the publish
752 // closes the window.
753 let workspaces_guard = self.workspaces.read();
754
755 // Cancellation check INSIDE the read lock. If cancellation
756 // was set before we grabbed the lock, we still observe it;
757 // if it's set after we release, a future load will see it.
758 if ws.rebuild_cancelled.load(Ordering::Acquire) {
759 drop(workspaces_guard);
760 drop(reservation);
761 ws.record_failure(DaemonError::WorkspaceBuildFailed {
762 root: key.source_root.clone(),
763 reason: "workspace evicted mid-load".to_string(),
764 });
765 loading.armed = false;
766 ws.store_state(WorkspaceState::Failed);
767 return Err(DaemonError::WorkspaceBuildFailed {
768 root: key.source_root.clone(),
769 reason: "workspace evicted mid-load".to_string(),
770 });
771 }
772 if !workspaces_guard.contains_key(key) {
773 drop(workspaces_guard);
774 drop(reservation);
775 ws.record_failure(DaemonError::WorkspaceBuildFailed {
776 root: key.source_root.clone(),
777 reason: "workspace removed mid-load".to_string(),
778 });
779 loading.armed = false;
780 ws.store_state(WorkspaceState::Failed);
781 return Err(DaemonError::WorkspaceBuildFailed {
782 root: key.source_root.clone(),
783 reason: "workspace removed mid-load".to_string(),
784 });
785 }
786
787 // Publish while still holding `workspaces.read()`. An
788 // eviction started in parallel is blocked on
789 // `workspaces.write()` and cannot observe / mutate this
790 // workspace until we release.
791 //
792 // Per Codex Task 6 Phase 6c iter-2 MAJOR: the hook dispatch
793 // is deliberately NOT performed inside `publish_and_retain`
794 // — firing it here would nest `self.hook.read()` under
795 // `workspaces.read()`, creating a re-entrancy deadlock for
796 // any hook impl that calls back into manager methods
797 // needing `workspaces.write()` (e.g. `unload`). The fix
798 // returns the published `Arc<CodeGraph>` from
799 // `publish_and_retain`, releases `workspaces_guard`, and
800 // THEN invokes `on_publish` under a disjoint short-lived
801 // `self.hook.read()` acquisition.
802 let (_token, published_arc) = self.publish_and_retain(reservation, &ws, graph);
803 ws.record_success(std::time::SystemTime::now());
804 ws.store_state(WorkspaceState::Loaded);
805 ws.touch();
806 loading.armed = false;
807 drop(workspaces_guard);
808
809 // Hook fires OUTSIDE every outer lock. The only lock taken
810 // here is `self.hook.read()` (for the brief clone inside
811 // `hook_snapshot`). A hook impl is now free to call any
812 // manager method — including `unload`, which needs
813 // `workspaces.write()` — without deadlocking against the
814 // loader that fired it. The dispatch itself is synchronous
815 // but spawn-only: hook impls are expected to return
816 // immediately after scheduling background work.
817 let hook = self.hook_snapshot();
818 hook.on_publish(&key.source_root, Arc::clone(&published_arc));
819
820 Ok(published_arc)
821 }
822
823 /// Look up or insert a [`LoadedWorkspace`] for `key`. Returns
824 /// the shared `Arc` so both the caller and the manager map
825 /// reference the same state.
826 fn get_or_insert_workspace(&self, key: &WorkspaceKey) -> Arc<LoadedWorkspace> {
827 // Upgrade path — try a read first to avoid the write-lock
828 // cost when the entry already exists.
829 if let Some(ws) = self.workspaces.read().get(key) {
830 return Arc::clone(ws);
831 }
832 let mut workspaces = self.workspaces.write();
833 Arc::clone(
834 workspaces
835 .entry(key.clone())
836 .or_insert_with(|| Arc::new(LoadedWorkspace::new(key.clone(), false))),
837 )
838 }
839
840 /// Evict the least-recently-accessed non-pinned workspace, if
841 /// any. Returns the evicted key on success, `None` if there are
842 /// no eligible candidates.
843 pub fn evict_lru(&self) -> Option<WorkspaceKey> {
844 let candidate = {
845 let workspaces = self.workspaces.read();
846 workspaces
847 .iter()
848 .filter(|(_, ws)| {
849 !ws.pinned
850 && ws.load_state() != WorkspaceState::Evicted
851 && ws.load_state() != WorkspaceState::Unloaded
852 })
853 .min_by_key(|(_, ws)| *ws.last_accessed.read())
854 .map(|(k, _)| k.clone())
855 };
856 if let Some(key) = &candidate {
857 self.execute_eviction(key);
858 }
859 candidate
860 }
861
862 /// Explicitly unload a workspace. Drives a full eviction
863 /// (releases graph data + admission accounting via
864 /// [`Self::evict_to_tombstone_locked`]) **and** removes the
865 /// tombstone entry from the manager map atomically under a
866 /// single `workspaces.write()` critical section.
867 ///
868 /// This is the only path that removes the map entry. LRU
869 /// eviction (`evict_lru`, `reserve_rebuild`'s Phase 2) leaves
870 /// the tombstone in place so per-source-root partial-eviction
871 /// state stays observable through `daemon/workspaceStatus` —
872 /// see [`Self::execute_eviction`] doc and STEP_6 iter-1 BLOCK.
873 ///
874 /// Returns `true` if the workspace was present, `false` if it
875 /// was already absent.
876 pub fn unload(&self, key: &WorkspaceKey) -> bool {
877 let mut workspaces = self.workspaces.write();
878 if !workspaces.contains_key(key) {
879 return false;
880 }
881 // Drop graph + admission bytes under the same write lock
882 // we will use for `remove`. Holding the lock across both
883 // operations means external observers see EITHER "entry
884 // present + Loaded" OR "entry absent" — never the "entry
885 // present + Evicted but about to be removed" intermediate
886 // state. (LRU eviction is a separate flow that DOES expose
887 // the Evicted tombstone — that is the STEP_6 contract.)
888 self.evict_to_tombstone_locked(&mut workspaces, key);
889 workspaces.remove(key);
890 true
891 }
892
893 /// Helper: run the eviction body (steps 1–4 of
894 /// [`Self::execute_eviction`]) with the caller's
895 /// `workspaces.write()` guard already held. Used by
896 /// [`Self::unload`] so unloading remains atomic — no observer
897 /// sees the `Evicted`-but-still-in-map intermediate window.
898 ///
899 /// Re-eviction safety mirrors `execute_eviction` — an entry
900 /// already in `Evicted` is left alone.
901 fn evict_to_tombstone_locked(
902 &self,
903 workspaces: &mut HashMap<WorkspaceKey, Arc<LoadedWorkspace>>,
904 key: &WorkspaceKey,
905 ) {
906 let Some(ws) = workspaces.get(key).cloned() else {
907 return;
908 };
909 if ws.load_state() == WorkspaceState::Evicted {
910 return;
911 }
912
913 let old_arc = ws.graph.swap(Arc::new(CodeGraph::new()));
914 let prior_bytes_usize = ws.memory_bytes.swap(0, Ordering::AcqRel);
915 let prior_bytes = prior_bytes_usize as u64;
916
917 let token = OldGraphToken::new();
918 {
919 let mut state = self.admission.lock();
920 state.loaded_bytes = state.loaded_bytes.saturating_sub(prior_bytes);
921 state.retained_old.insert(
922 token,
923 RetainedEntry {
924 bytes: prior_bytes,
925 graph: old_arc,
926 published_at: Instant::now(),
927 warned_past_timeout: false,
928 },
929 );
930 self.bump_high_water(&state);
931 }
932
933 ws.rebuild_cancelled.store(true, Ordering::Release);
934 ws.store_state(WorkspaceState::Evicted);
935 }
936
937 /// Find a loaded workspace by its directory path.
938 ///
939 /// Linear scan over all registered workspaces comparing each workspace's
940 /// `index_root` against `path`. Callers (e.g. `daemon/rebuild`) supply a
941 /// canonicalised path but not the full [`WorkspaceKey`].
942 /// O(n) in the number of loaded workspaces; in practice n is small.
943 ///
944 /// Returns `None` if no workspace with a matching root is found.
945 #[must_use]
946 pub fn find_key_and_workspace_by_path(
947 &self,
948 path: &std::path::Path,
949 ) -> Option<(WorkspaceKey, Arc<LoadedWorkspace>)> {
950 let workspaces = self.workspaces.read();
951 workspaces
952 .iter()
953 .find(|(k, _)| k.source_root == path)
954 .map(|(k, ws)| (k.clone(), Arc::clone(ws)))
955 }
956
957 /// Snapshot of daemon-wide status. Point-in-time, non-transactional.
958 pub fn status(&self) -> DaemonStatus {
959 let workspaces_snapshot: Vec<WorkspaceStatus> = {
960 let workspaces = self.workspaces.read();
961 let mut entries: Vec<_> = workspaces
962 .iter()
963 .map(|(k, ws)| WorkspaceStatus {
964 index_root: k.source_root.clone(),
965 state: ws.load_state(),
966 pinned: ws.pinned,
967 current_bytes: ws.memory_bytes.load(Ordering::Acquire) as u64,
968 high_water_bytes: ws.memory_high_water_bytes.load(Ordering::Acquire) as u64,
969 last_good_at: *ws.last_good_at.read(),
970 last_error: ws.last_error.read().as_ref().map(|e| e.to_string()),
971 retry_count: ws.retry_count.load(Ordering::Acquire),
972 // STEP_12 telemetry: surface both display and machine
973 // identity hex forms when the key carries a logical
974 // workspace_id; anonymous keys leave both as None so
975 // the wire shape is uniform.
976 workspace_id_short: k.workspace_id.as_ref().map(|id| id.as_short_hex()),
977 workspace_id_full: k.workspace_id.as_ref().map(|id| id.as_full_hex()),
978 })
979 .collect();
980 entries.sort_by(|a, b| a.index_root.cmp(&b.index_root));
981 entries
982 };
983
984 let (current_bytes, reserved_bytes, high_water_bytes) = {
985 let state = self.admission.lock();
986 let current = state.total_committed_bytes();
987 let reserved = state.reserved_bytes;
988 // Bump high-water here in case the status read saw a
989 // higher value than the last mutation captured. The
990 // `drop(state)` at the end of this block keeps the
991 // admission lock held across the `fetch_max` — serialising
992 // the high-water update with any concurrent publish.
993 let peak = self
994 .total_memory_high_water
995 .fetch_max(current, Ordering::AcqRel);
996 let peak = peak.max(current);
997 drop(state);
998 (current, reserved, peak)
999 };
1000
1001 DaemonStatus {
1002 uptime_seconds: self.started_at.elapsed().as_secs(),
1003 daemon_version: env!("CARGO_PKG_VERSION").to_string(),
1004 memory: MemoryStatus {
1005 limit_bytes: self.memory_limit_bytes(),
1006 current_bytes,
1007 reserved_bytes,
1008 high_water_bytes,
1009 },
1010 workspaces: workspaces_snapshot,
1011 }
1012 }
1013
1014 /// Aggregate `daemon/workspaceStatus` snapshot for a single
1015 /// `workspace_id` (STEP_6 of the workspace-aware-cross-repo plan).
1016 ///
1017 /// Walks the manager's workspace map, collects every
1018 /// [`WorkspaceKey`] whose `workspace_id == Some(target_id)`, and
1019 /// renders a deterministic per-source-root rollup. Per-source-root
1020 /// LRU eviction means individual entries can carry
1021 /// [`WorkspaceState::Evicted`] while siblings remain
1022 /// [`WorkspaceState::Loaded`] — the aggregate exposes that
1023 /// "partially evicted" shape unchanged via
1024 /// [`sqry_daemon_protocol::WorkspaceIndexStatus::partially_evicted`].
1025 ///
1026 /// Returns `None` when no entry in the map carries the requested
1027 /// `workspace_id`. The IPC layer surfaces that as
1028 /// `DaemonError::WorkspaceNotLoaded`; the manager itself does not
1029 /// classify "no entries" as an error so callers can distinguish a
1030 /// genuinely absent grouping from an empty workspace.
1031 #[must_use]
1032 pub fn workspace_index_status(
1033 &self,
1034 target_id: &sqry_daemon_protocol::WorkspaceId,
1035 ) -> Option<sqry_daemon_protocol::WorkspaceIndexStatus> {
1036 let workspaces = self.workspaces.read();
1037 let mut rows: Vec<sqry_daemon_protocol::WorkspaceSourceRootStatus> = workspaces
1038 .iter()
1039 .filter_map(|(k, ws)| {
1040 k.workspace_id
1041 .as_ref()
1042 .filter(|id| *id == target_id)
1043 .map(|_| sqry_daemon_protocol::WorkspaceSourceRootStatus {
1044 source_root: k.source_root.clone(),
1045 state: ws.load_state(),
1046 current_bytes: ws.memory_bytes.load(Ordering::Acquire) as u64,
1047 // STEP_11_4 — probe `<source_root>/.sqry/classpath/`
1048 // for presence. Status path; never blocks on
1049 // anything heavier than `fs::metadata`. Probe
1050 // failures (permission denied, racy unlink, …)
1051 // collapse to `false`; the LSP-side
1052 // `WorkspaceIndexStatus.warnings` channel surfaces
1053 // the underlying error detail when the daemon's
1054 // workspace builder hits the same probe.
1055 classpath_present: probe_classpath_present(&k.source_root),
1056 })
1057 })
1058 .collect();
1059 if rows.is_empty() {
1060 return None;
1061 }
1062 rows.sort_by(|a, b| a.source_root.cmp(&b.source_root));
1063 Some(sqry_daemon_protocol::WorkspaceIndexStatus {
1064 workspace_id: *target_id,
1065 // STEP_12 — derive the hex display strings here so JSON
1066 // consumers (`sqry daemon status --json`, MCP redaction,
1067 // CI scripts) never have to re-encode the 32-byte digest
1068 // themselves. The two strings are byte-derivative of
1069 // `workspace_id`; they do not introduce a new identity
1070 // axis.
1071 workspace_id_short: target_id.as_short_hex(),
1072 workspace_id_full: target_id.as_full_hex(),
1073 source_roots: rows,
1074 })
1075 }
1076
1077 /// Bump the daemon-wide high-water mark using the current
1078 /// `AdmissionState`. Must be called with `admission` held.
1079 fn bump_high_water(&self, state: &AdmissionState) {
1080 let current = state.total_committed_bytes();
1081 self.total_memory_high_water
1082 .fetch_max(current, Ordering::AcqRel);
1083 }
1084
1085 /// Test-only helper: insert a `LoadedWorkspace` into the manager
1086 /// map in a specific state, bypassing `get_or_load`. Used by
1087 /// `classify_for_serve` integration tests that need to observe
1088 /// the `Unloaded` / `Loading` arms (both states are transient
1089 /// during the normal load path).
1090 ///
1091 /// `#[doc(hidden)]` to signal "test affordance only" — same
1092 /// pattern as [`crate::TestGate`] / [`crate::TestCapture`].
1093 /// Production code should not call this.
1094 #[doc(hidden)]
1095 pub fn insert_workspace_in_state_for_test(&self, key: WorkspaceKey, state: WorkspaceState) {
1096 let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
1097 ws.store_state(state);
1098 self.workspaces.write().insert(key, ws);
1099 }
1100
1101 /// Test-only helper: insert a `LoadedWorkspace` into the manager
1102 /// map with explicit state, pinning, and pre-set `memory_bytes`.
1103 /// STEP_6 LRU + workspace-aggregate tests use this to exercise
1104 /// per-source-root eviction without spinning up a full
1105 /// `RealWorkspaceBuilder` pipeline. Returns the inserted Arc so
1106 /// the caller can keep observing it (e.g. to assert `load_state`
1107 /// after a follow-up mutation).
1108 ///
1109 /// `#[doc(hidden)]` to signal "test affordance only".
1110 #[doc(hidden)]
1111 pub fn insert_workspace_for_test_with_bytes(
1112 &self,
1113 key: WorkspaceKey,
1114 state: WorkspaceState,
1115 pinned: bool,
1116 bytes: usize,
1117 ) -> Arc<LoadedWorkspace> {
1118 let ws = Arc::new(LoadedWorkspace::new(key.clone(), pinned));
1119 ws.store_state(state);
1120 ws.update_memory(bytes);
1121 self.workspaces.write().insert(key, Arc::clone(&ws));
1122 ws
1123 }
1124
1125 /// Acquire the internal `workspaces` RwLock in read mode.
1126 ///
1127 /// Task 7 Phase 7c: exposed so
1128 /// [`crate::RebuildDispatcher::execute_one_rebuild`] can hold the
1129 /// read lock across its cancel/membership re-check and
1130 /// [`Self::publish_and_retain`], matching the pattern in
1131 /// [`Self::get_or_load`] (Codex Task 6 Phase 6b iter-2 MAJOR — the
1132 /// publish critical section MUST exclude concurrent
1133 /// [`Self::execute_eviction`] on the same key to avoid
1134 /// orphaned-publish / admission-drift).
1135 ///
1136 /// Callers MUST respect lock order §J.4: acquire `workspaces`
1137 /// BEFORE `admission`. The returned guard is released when the
1138 /// caller drops it.
1139 ///
1140 /// `pub(crate)` (iter-2 design Codex MAJOR): the accessor is only
1141 /// used within the daemon crate; exposing it publicly would leak
1142 /// lock mechanics and broaden the blast radius for future callers
1143 /// that might violate the §J.4 discipline.
1144 pub(crate) fn workspaces_read(
1145 &self,
1146 ) -> parking_lot::RwLockReadGuard<'_, HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
1147 self.workspaces.read()
1148 }
1149
1150 /// Classify a workspace's readiness to serve a query.
1151 ///
1152 /// Task 7 Phase 7c. Used by the Task 8 IPC router on every query
1153 /// dispatch. Pure-read: no mutations, no `.await` (sync).
1154 ///
1155 /// # Returns
1156 ///
1157 /// | Workspace state | Map present | Result |
1158 /// |-----------------|-------------|--------|
1159 /// | `Loaded` or `Rebuilding` | yes | `Ok(ServeVerdict::Fresh { graph, state })` |
1160 /// | `Failed`, age < cap (or cap == 0) | yes | `Ok(ServeVerdict::Stale { graph, age_hours, last_good_at, last_error })` |
1161 /// | `Failed`, age >= cap | yes | `Err(WorkspaceStaleExpired { age_hours, cap_hours, last_good_at, last_error })` (→ JSON-RPC -32002) |
1162 /// | `Failed`, no prior good | yes | `Err(WorkspaceBuildFailed { reason })` (→ -32001) |
1163 /// | `Unloaded` or `Loading` | yes | `Ok(ServeVerdict::NotReady { state })` |
1164 /// | `Evicted` | yes (transient window) | `Err(WorkspaceEvicted)` (→ -32004) |
1165 /// | any | no | `Err(WorkspaceEvicted)` (→ -32004) |
1166 ///
1167 /// # Lock order
1168 ///
1169 /// Task 7 Phase 7c feat iter-1 Codex BLOCKER fix: takes
1170 /// `workspaces.read()` across the FULL snapshot — state, graph,
1171 /// last_good, and last_error_text are all captured inside the
1172 /// read critical section. Dropping the read lock before reading
1173 /// the graph would allow `execute_eviction` (which needs
1174 /// `workspaces.write()` for the full graph-swap + state-store +
1175 /// map-remove sequence) to interleave, surfacing the empty
1176 /// post-eviction placeholder graph as a `Fresh` verdict.
1177 ///
1178 /// Does not acquire `admission` or `rebuild_lane`; only
1179 /// `workspaces` + per-workspace field locks. §J.4 order preserved.
1180 ///
1181 /// # Errors
1182 ///
1183 /// Returns the variants listed in the table above.
1184 pub fn classify_for_serve(
1185 &self,
1186 key: &WorkspaceKey,
1187 now: std::time::SystemTime,
1188 ) -> Result<ServeVerdict, DaemonError> {
1189 // Task 7 Phase 7c — feat iter-0 Codex BLOCKER fix: the
1190 // previous iter-0 implementation cloned the workspace Arc and
1191 // dropped `workspaces.read()` BEFORE reading state and graph.
1192 // `execute_eviction` (see Self::execute_eviction at line 494)
1193 // holds `workspaces.write()` across:
1194 // - ws.graph.swap(CodeGraph::new())
1195 // - admission accounting transfer
1196 // - ws.rebuild_cancelled.store(true)
1197 // - ws.store_state(WorkspaceState::Evicted)
1198 // - workspaces.remove(key)
1199 //
1200 // Without the read-lock hold extending across graph capture,
1201 // a classifier could observe `state == Loaded` but fetch the
1202 // post-eviction empty placeholder graph, returning
1203 // `Fresh { graph: empty }` — a correctness bug.
1204 //
1205 // Iter-1: snapshot every field under the read lock. The
1206 // returned `Arc<CodeGraph>` is a strong reference independent
1207 // of the lock lifetime; dropping the lock after capture is
1208 // safe for the caller.
1209 //
1210 // `last_error` is captured as a display-string (the error
1211 // type is not Clone; see `clone_err` rationale) because
1212 // `NoPriorGood` returns a `WorkspaceBuildFailed { reason }`
1213 // that embeds the stringified prior error.
1214 let snapshot = {
1215 let workspaces = self.workspaces.read();
1216 let Some(ws) = workspaces.get(key).cloned() else {
1217 return Err(DaemonError::WorkspaceEvicted {
1218 root: key.source_root.clone(),
1219 });
1220 };
1221 let state = ws.load_state();
1222 let graph = ws.graph.load_full();
1223 let last_good = *ws.last_good_at.read();
1224 let last_error_text = ws.last_error.read().as_ref().map(|e| e.to_string());
1225 (state, graph, last_good, last_error_text)
1226 // workspaces.read() dropped here — the (state, graph)
1227 // pair is now a coherent snapshot taken atomically w.r.t.
1228 // execute_eviction's workspaces.write().
1229 };
1230 let (state, graph, last_good, last_error_text) = snapshot;
1231
1232 match state {
1233 WorkspaceState::Loaded | WorkspaceState::Rebuilding => {
1234 Ok(ServeVerdict::Fresh { graph, state })
1235 }
1236 WorkspaceState::Failed => {
1237 let cap = self.config.stale_serve_max_age_hours;
1238 match classify_staleness(last_good, cap, now) {
1239 StalenessVerdict::NoPriorGood => Err(DaemonError::WorkspaceBuildFailed {
1240 root: key.source_root.clone(),
1241 reason: last_error_text
1242 .unwrap_or_else(|| "no prior successful build".into()),
1243 }),
1244 StalenessVerdict::Stale { age_hours } => Ok(ServeVerdict::Stale {
1245 graph,
1246 age_hours,
1247 // Invariant: `classify_staleness` only returns
1248 // `Stale` when `last_good.is_some()` (see
1249 // `workspace/staleness.rs:54-73`).
1250 last_good_at: last_good
1251 .expect("Stale verdict only emitted when last_good.is_some()"),
1252 last_error: last_error_text,
1253 }),
1254 StalenessVerdict::Expired { age_hours } => {
1255 Err(DaemonError::WorkspaceStaleExpired {
1256 root: key.source_root.clone(),
1257 age_hours,
1258 cap_hours: cap,
1259 last_good_at: last_good,
1260 last_error: last_error_text,
1261 })
1262 }
1263 }
1264 }
1265 WorkspaceState::Unloaded | WorkspaceState::Loading => {
1266 Ok(ServeVerdict::NotReady { state })
1267 }
1268 // Transient window between store_state(Evicted) and
1269 // workspaces.remove; same semantics as map-absent.
1270 WorkspaceState::Evicted => Err(DaemonError::WorkspaceEvicted {
1271 root: key.source_root.clone(),
1272 }),
1273 }
1274 }
1275
1276 /// Consume a [`RebuildReservation`] plus a freshly-built
1277 /// [`CodeGraph`] and atomically publish it to the workspace.
1278 ///
1279 /// Implements Amendment 2 §G.2:
1280 ///
1281 /// - Captures the prior `Arc<CodeGraph>` and `memory_bytes` into
1282 /// a [`RollbackGuard`] **before** any swap — so a panic at any
1283 /// point before the admission update reverts cleanly.
1284 /// - Swaps the `ArcSwap<CodeGraph>` to the new graph.
1285 /// - Swaps the per-workspace `memory_bytes` to the new size.
1286 /// - Under the admission mutex: moves `bytes_delta` from
1287 /// `reserved_bytes` into `loaded_bytes`, inserts a
1288 /// [`RetainedEntry`] holding the old `Arc` until the retention
1289 /// reaper frees it.
1290 /// - Disarms the [`RollbackGuard`] on success.
1291 ///
1292 /// Sync `fn`. There is no `.await` between the first swap and the
1293 /// admission insert — tokio task cancellation can only interrupt
1294 /// at `.await` points, so this sequence is atomic with respect
1295 /// to cancellation per §G.2.
1296 ///
1297 /// Returns the minted [`OldGraphToken`] for tracing / integration
1298 /// tests, together with an `Arc<CodeGraph>` handle to the freshly
1299 /// published graph. Per Codex Task 6 Phase 6c iter-2 MAJOR the
1300 /// post-publish `SqrydHook` dispatch is NOT performed here —
1301 /// firing `on_publish` under the `workspaces.read()` guard
1302 /// `get_or_load` holds across this call would nest
1303 /// `self.hook.read()` inside `workspaces`, giving hook impls a
1304 /// re-entrancy deadlock hole if they call back into manager
1305 /// methods needing `workspaces.write()`. The caller is
1306 /// responsible for dispatching the hook after dropping every
1307 /// outer workspaces-lock holder.
1308 pub fn publish_and_retain(
1309 self: &Arc<Self>,
1310 reservation: RebuildReservation,
1311 workspace: &LoadedWorkspace,
1312 new_graph: CodeGraph,
1313 ) -> (OldGraphToken, Arc<CodeGraph>) {
1314 // Compute the new graph's heap bytes before handing it to the
1315 // ArcSwap — once published, a concurrent reader holds it
1316 // alive, and measuring after publish race-races with the
1317 // admission update.
1318 let new_bytes_usize = new_graph.heap_bytes();
1319 // `usize as u64` is a no-op on 64-bit and a widen on 32-bit.
1320 let new_bytes = new_bytes_usize as u64;
1321
1322 // Take the reservation by value so this function owns it and
1323 // the Drop impl fires on any unwind path. `released` stays
1324 // `false` until *after* the admission commit succeeds, so a
1325 // panic before or during the admission mutex section refunds
1326 // `reserved_bytes` back to the pool (Codex Task 6 Phase 6a
1327 // iter-1 MAJOR: the previous ordering disarmed before the
1328 // commit and could leak reserved bytes on unwind).
1329 let mut reservation = reservation;
1330 let reservation_bytes = reservation.bytes;
1331
1332 let new_arc = Arc::new(new_graph);
1333 // Clone the Arc BEFORE the swap so the caller can still
1334 // obtain a handle to the published graph after the swap
1335 // moves `new_arc` into the ArcSwap. Re-reading via
1336 // `workspace.graph.load_full()` after the swap would work
1337 // today but is racy against any future swap path that
1338 // could run between the swap and the load — cheaper and
1339 // safer to clone the Arc once.
1340 let published_arc = Arc::clone(&new_arc);
1341 let token = OldGraphToken::new();
1342
1343 // --- RollbackGuard setup --------------------------------
1344 let prior_arc_for_rollback = workspace.graph.load_full();
1345 let prior_bytes = workspace
1346 .memory_bytes
1347 .load(std::sync::atomic::Ordering::Acquire);
1348
1349 let mut rollback = RollbackGuard {
1350 ws: workspace,
1351 prior_arc: Some(prior_arc_for_rollback),
1352 prior_bytes,
1353 armed: true,
1354 };
1355
1356 // --- Non-recoverable zone (no .await; no fallible ops) ---
1357 //
1358 // If any code between this point and `reservation.released = true`
1359 // panics, the following Drop order runs on unwind:
1360 // 1. `rollback` Drop reverts `workspace.graph` and
1361 // `workspace.memory_bytes` to the pre-swap values
1362 // (because `armed == true`).
1363 // 2. `reservation` Drop reacquires the admission mutex and
1364 // refunds `reservation_bytes` back to `reserved_bytes`
1365 // (because `released == false`).
1366 // This is the §G.5 invariant-preserving rollback described in
1367 // the plan; the reservation refund was missing before the
1368 // iter-1 fix.
1369 let old_arc = workspace.graph.swap(new_arc);
1370 let prev_memory_bytes = workspace.update_memory(new_bytes_usize);
1371 debug_assert_eq!(
1372 prev_memory_bytes, prior_bytes,
1373 "RollbackGuard prior_bytes must match update_memory's returned prior",
1374 );
1375
1376 // --- Admission commit (mutex-only; no other locks) -------
1377 //
1378 // The critical section is ordered so the only *fallible* op —
1379 // `HashMap::insert`, which can allocate on grow and therefore
1380 // panic — runs FIRST, before any admission counter is mutated
1381 // and before the reservation is disarmed. Everything that
1382 // follows (`saturating_*` arithmetic + `reservation.released
1383 // = true`) is guaranteed infallible, so once we reach those
1384 // lines the critical section cannot unwind mid-way and leave
1385 // admission state inconsistent.
1386 //
1387 // Codex Task 6 Phase 6a iter-2 MAJOR: the iter-1 ordering
1388 // disarmed the reservation before `retained_old.insert`
1389 // completed. A panic from the insert would leave
1390 // `reserved_bytes` drained and `loaded_bytes` updated while
1391 // no retained entry existed — rollback reverts ws.graph +
1392 // ws.memory_bytes but cannot refund the reservation
1393 // (released=true). The fix moves insert to the front of the
1394 // section so any unwind preserves the §G.5 invariant.
1395 //
1396 // Pre-build the `RetainedEntry` outside the lock so only the
1397 // `HashMap::insert` itself can allocate; the struct
1398 // construction is a field-by-field move.
1399 let retained_entry = RetainedEntry {
1400 bytes: prev_memory_bytes as u64,
1401 graph: old_arc,
1402 published_at: Instant::now(),
1403 warned_past_timeout: false,
1404 };
1405
1406 let mut state = self.admission.lock();
1407
1408 // Step 1 — fallible. `HashMap::insert` may reallocate; if it
1409 // panics the state is left unchanged (hashbrown's insert is
1410 // exception-safe: a failed grow leaves the map in its prior
1411 // capacity and does not insert the new entry). Unwind drops
1412 // `state` (releasing the mutex), then `rollback` reverts
1413 // ws.graph + ws.memory_bytes, then the `reservation`
1414 // (released=false) refunds `reservation_bytes` from
1415 // `reserved_bytes`. `loaded_bytes` is not mutated because
1416 // the lines below never run.
1417 state.retained_old.insert(token, retained_entry);
1418
1419 // Step 2 — infallible arithmetic (saturating ops on u64).
1420 // Move reservation → loaded. The prior workspace bytes are
1421 // already counted in `loaded_bytes` (they were added the
1422 // last time this workspace published). Swap by subtracting
1423 // the old and adding the new — keeps the §G.5 invariant
1424 // monotonic w.r.t. the commit.
1425 state.reserved_bytes = state.reserved_bytes.saturating_sub(reservation_bytes);
1426 state.loaded_bytes = state
1427 .loaded_bytes
1428 .saturating_sub(prev_memory_bytes as u64)
1429 .saturating_add(new_bytes);
1430
1431 // Step 3 — infallible disarm. The admission commit is
1432 // complete; the reservation's Drop is now a no-op so it
1433 // does not double-refund.
1434 reservation.released = true;
1435 self.bump_high_water(&state);
1436 drop(state);
1437
1438 rollback.armed = false; // disarm on success
1439
1440 // NOTE: `SqrydHook::on_publish` is NOT dispatched here.
1441 // `get_or_load` holds `workspaces.read()` across this call
1442 // (to make the re-check + publish critical section atomic
1443 // with respect to eviction, see that function's Step 6+7
1444 // comment block). Firing the hook here would acquire
1445 // `self.hook.read()` nested under `workspaces`, giving a
1446 // hook impl that calls back into manager methods needing
1447 // `workspaces.write()` (e.g. `unload`) a guaranteed
1448 // deadlock. The caller dispatches the hook after dropping
1449 // `workspaces_guard` — see `get_or_load` post-publish.
1450 //
1451 // `NoOpHook` remains the default; Task 9's daemon binary
1452 // installs the production `QueryDbHook` that wraps
1453 // `sqry_db::persistence::save_derived` with a timeout.
1454 (token, published_arc)
1455 }
1456
1457 /// Release the reaper handle on Drop. Safe to call from any
1458 /// context — abort is a best-effort signal.
1459 fn shutdown_reaper(&self) {
1460 if let Some(handle) = self.reaper.lock().take() {
1461 handle.abort();
1462 }
1463 }
1464}
1465
1466impl Drop for WorkspaceManager {
1467 fn drop(&mut self) {
1468 self.shutdown_reaper();
1469 }
1470}
1471
1472/// STEP_11_4 — probe `<source_root>/.sqry/classpath/` for presence at
1473/// `daemon/workspaceStatus` time.
1474///
1475/// Status path: cheap (`fs::metadata`), never blocks on anything
1476/// heavier, and degrades silently to `false` on any error so a racy
1477/// classpath unlink or a permission denial cannot fail the status
1478/// response. The LSP-side `WorkspaceIndexStatus.warnings` channel
1479/// surfaces the underlying error detail when the daemon's workspace
1480/// builder hits the same probe and wants to record the failure.
1481fn probe_classpath_present(source_root: &std::path::Path) -> bool {
1482 let probe = source_root.join(".sqry").join("classpath");
1483 std::fs::metadata(&probe)
1484 .map(|m| m.is_dir())
1485 .unwrap_or(false)
1486}
1487
1488// ---------------------------------------------------------------------------
1489// LoadingGuard (panic-safety for get_or_load)
1490// ---------------------------------------------------------------------------
1491
1492/// RAII guard that transitions the workspace into
1493/// [`WorkspaceState::Failed`] on any non-success exit from
1494/// [`WorkspaceManager::get_or_load`] — including panics.
1495///
1496/// Codex Task 6 Phase 6b iter-1 MAJOR: without this guard, a panic
1497/// in `builder.build()` would leave the workspace stuck in
1498/// `Loading` with `last_error = None`, permanently blocking
1499/// re-load attempts and corrupting status output.
1500///
1501/// The guard is armed until the final `loaded.armed = false` on
1502/// the success path (after publish succeeds). Every other exit
1503/// path — `Err` from admission, `Err` from builder, panic from
1504/// builder, early returns on the cancellation/map-membership
1505/// re-check — fires `Drop` with `armed == true` and performs the
1506/// Failed-state transition.
1507pub(crate) struct LoadingGuard<'a> {
1508 pub(crate) ws: &'a LoadedWorkspace,
1509 pub(crate) key: &'a WorkspaceKey,
1510 pub(crate) armed: bool,
1511}
1512
1513impl<'a> Drop for LoadingGuard<'a> {
1514 fn drop(&mut self) {
1515 if !self.armed {
1516 return;
1517 }
1518 // Only overwrite `last_error` if it hasn't been populated
1519 // with a more specific diagnostic by the explicit `Err`
1520 // branches above — those set last_error before `armed =
1521 // false`, so seeing None here means we are in the panic
1522 // window or an early-return path that did not record one.
1523 {
1524 let mut slot = self.ws.last_error.write();
1525 if slot.is_none() {
1526 *slot = Some(DaemonError::WorkspaceBuildFailed {
1527 root: self.key.source_root.clone(),
1528 reason: "workspace load aborted unexpectedly".to_string(),
1529 });
1530 }
1531 }
1532 self.ws.retry_count.fetch_add(1, Ordering::AcqRel);
1533 self.ws.store_state(WorkspaceState::Failed);
1534 }
1535}
1536
1537/// Clone a [`DaemonError`] for storage on [`LoadedWorkspace::last_error`]
1538/// or for propagation to `handle_changes` error returns in
1539/// [`crate::RebuildDispatcher::execute_one_rebuild`] (Task 7 Phase 7b1).
1540///
1541/// [`DaemonError`] is not `Clone` because some variants wrap
1542/// non-`Clone` types (notably [`std::io::Error`] and
1543/// [`anyhow::Error`]). `last_error` is a diagnostic surface only —
1544/// it is serialised as `e.to_string()` by the status endpoint — so
1545/// reducing the error to a textual form is the right trade-off here.
1546pub(crate) fn clone_err(err: &DaemonError) -> DaemonError {
1547 match err {
1548 DaemonError::WorkspaceBuildFailed { root, reason } => DaemonError::WorkspaceBuildFailed {
1549 root: root.clone(),
1550 reason: reason.clone(),
1551 },
1552 DaemonError::WorkspaceStaleExpired {
1553 root,
1554 age_hours,
1555 cap_hours,
1556 last_good_at,
1557 last_error,
1558 } => DaemonError::WorkspaceStaleExpired {
1559 root: root.clone(),
1560 age_hours: *age_hours,
1561 cap_hours: *cap_hours,
1562 // `SystemTime` is `Copy`; `Option<String>` needs `.clone()`.
1563 last_good_at: *last_good_at,
1564 last_error: last_error.clone(),
1565 },
1566 DaemonError::MemoryBudgetExceeded {
1567 limit_bytes,
1568 current_bytes,
1569 reserved_bytes,
1570 retained_bytes,
1571 requested_bytes,
1572 } => DaemonError::MemoryBudgetExceeded {
1573 limit_bytes: *limit_bytes,
1574 current_bytes: *current_bytes,
1575 reserved_bytes: *reserved_bytes,
1576 retained_bytes: *retained_bytes,
1577 requested_bytes: *requested_bytes,
1578 },
1579 DaemonError::WorkspaceEvicted { root } => {
1580 DaemonError::WorkspaceEvicted { root: root.clone() }
1581 }
1582 DaemonError::WorkspaceNotLoaded { root } => {
1583 DaemonError::WorkspaceNotLoaded { root: root.clone() }
1584 }
1585 // Task 8 Phase 8c U5 — tool-dispatch variants surfaced by
1586 // `tool_core::classify_and_execute` (Phase 8c U6). Each
1587 // variant must round-trip cleanly so `classify_for_serve`
1588 // reproduces the original typed error on every read path —
1589 // collapsing any of these into `WorkspaceBuildFailed` would
1590 // break the wire-contract codes registered in
1591 // [`crate::lib`] / the design doc §O.
1592 DaemonError::ToolTimeout {
1593 root,
1594 secs,
1595 deadline_ms,
1596 } => DaemonError::ToolTimeout {
1597 root: root.clone(),
1598 secs: *secs,
1599 deadline_ms: *deadline_ms,
1600 },
1601 DaemonError::InvalidArgument { reason } => DaemonError::InvalidArgument {
1602 reason: reason.clone(),
1603 },
1604 DaemonError::Internal(err) => {
1605 // `anyhow::Error` is not `Clone`; re-create it from its
1606 // full-chain `Display` form (`{:#}`) so every layer of
1607 // the causal chain survives the round-trip. Callers only
1608 // read this via `to_string()` on the status endpoint, so
1609 // losing the typed causes (if any) is acceptable.
1610 DaemonError::Internal(anyhow::anyhow!("{err:#}"))
1611 }
1612 // Task 9 U1 — lifecycle variants (AlreadyRunning, AutoStartTimeout,
1613 // SignalSetup). These errors all fire before IpcServer::bind and
1614 // therefore before any workspace is registered; they should never
1615 // reach `clone_err`. If they somehow do (e.g. a future code path
1616 // stores them in `last_error`), collapse to WorkspaceBuildFailed so
1617 // the clone contract is preserved without losing observability.
1618 DaemonError::AlreadyRunning { socket, lock, .. } => DaemonError::WorkspaceBuildFailed {
1619 root: Path::new("<unknown>").to_path_buf(),
1620 reason: format!(
1621 "daemon already running on socket {} (lock: {})",
1622 socket.display(),
1623 lock.display()
1624 ),
1625 },
1626 DaemonError::AutoStartTimeout {
1627 timeout_secs,
1628 socket,
1629 } => DaemonError::WorkspaceBuildFailed {
1630 root: Path::new("<unknown>").to_path_buf(),
1631 reason: format!(
1632 "daemon did not become ready within {timeout_secs}s on socket {}",
1633 socket.display()
1634 ),
1635 },
1636 DaemonError::SignalSetup { source } => DaemonError::WorkspaceBuildFailed {
1637 root: Path::new("<unknown>").to_path_buf(),
1638 reason: format!("failed to install signal handlers: {source}"),
1639 },
1640 other @ (DaemonError::Config { .. } | DaemonError::Io(_)) => {
1641 DaemonError::WorkspaceBuildFailed {
1642 root: Path::new("<unknown>").to_path_buf(),
1643 reason: other.to_string(),
1644 }
1645 }
1646 }
1647}
1648
1649// ---------------------------------------------------------------------------
1650// RebuildReservation (RAII)
1651// ---------------------------------------------------------------------------
1652
1653/// RAII guard representing an in-flight rebuild's admission headroom.
1654///
1655/// - On the success path, the guard is consumed by
1656/// [`WorkspaceManager::publish_and_retain`], which sets
1657/// `released = true` before draining `bytes` from `reserved_bytes`.
1658/// - On any other drop path (rebuild panic, cancellation, early
1659/// return on plugin error) the guard's `Drop` releases the reserved
1660/// bytes back to the admission pool. This keeps the §G.5 invariant
1661/// intact across every exit path.
1662///
1663/// The manager pointer is a [`Weak`] so a guard that outlives its
1664/// manager (e.g. the daemon is dropped mid-rebuild) does not try to
1665/// touch freed memory. A `None` upgrade on drop is silently ignored —
1666/// the manager took the retained bytes with it when it dropped.
1667#[must_use = "RebuildReservation must either be consumed by publish_and_retain() \
1668 or intentionally dropped to return its bytes to the admission pool"]
1669pub struct RebuildReservation {
1670 manager: Weak<WorkspaceManager>,
1671 bytes: u64,
1672 released: bool,
1673}
1674
1675impl RebuildReservation {
1676 /// How many bytes this reservation currently holds.
1677 #[must_use]
1678 pub fn bytes(&self) -> u64 {
1679 self.bytes
1680 }
1681}
1682
1683impl std::fmt::Debug for RebuildReservation {
1684 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1685 f.debug_struct("RebuildReservation")
1686 .field("bytes", &self.bytes)
1687 .field("released", &self.released)
1688 .finish()
1689 }
1690}
1691
1692impl Drop for RebuildReservation {
1693 fn drop(&mut self) {
1694 if self.released {
1695 return;
1696 }
1697 if let Some(mgr) = self.manager.upgrade() {
1698 let mut state = mgr.admission.lock();
1699 state.reserved_bytes = state.reserved_bytes.saturating_sub(self.bytes);
1700 }
1701 }
1702}
1703
1704// ---------------------------------------------------------------------------
1705// RollbackGuard (panic-safety for publish_and_retain)
1706// ---------------------------------------------------------------------------
1707
1708/// Panic-safe rollback wrapper used by [`WorkspaceManager::publish_and_retain`].
1709///
1710/// Captures the prior `Arc<CodeGraph>` and the prior `memory_bytes`
1711/// *before* any swap. If the thread unwinds between the swap and the
1712/// admission-mutex acquisition, the guard's `Drop` restores both
1713/// fields — leaving the workspace serving its pre-rebuild graph as if
1714/// the publish never happened.
1715///
1716/// Correctness depends on three contracts:
1717///
1718/// 1. The guard is constructed *before* the `ArcSwap::swap` call.
1719/// 2. `armed` is set to `false` only on the success path, after the
1720/// admission mutex has released.
1721/// 3. No fallible operation (heap allocation failure, etc.) runs
1722/// between the two swaps — otherwise the guard would be asked to
1723/// reverse a partial swap.
1724pub(crate) struct RollbackGuard<'a> {
1725 pub(crate) ws: &'a LoadedWorkspace,
1726 pub(crate) prior_arc: Option<Arc<CodeGraph>>,
1727 pub(crate) prior_bytes: usize,
1728 pub(crate) armed: bool,
1729}
1730
1731impl<'a> Drop for RollbackGuard<'a> {
1732 fn drop(&mut self) {
1733 if !self.armed {
1734 return;
1735 }
1736 if let Some(arc) = self.prior_arc.take() {
1737 self.ws.graph.store(arc);
1738 }
1739 self.ws
1740 .memory_bytes
1741 .store(self.prior_bytes, std::sync::atomic::Ordering::Release);
1742 }
1743}
1744
1745// ---------------------------------------------------------------------------
1746// Retention reaper task
1747// ---------------------------------------------------------------------------
1748
1749/// Long-lived tokio task: polls [`WorkspaceManager::reap_once`] on a
1750/// fixed 25 ms cadence (A2 §G.3).
1751///
1752/// Takes a `Weak<WorkspaceManager>` so a `WorkspaceManager::drop`
1753/// before the task notices the abort signal does not dereference
1754/// freed memory. The first failed `Weak::upgrade` exits the loop
1755/// cleanly.
1756async fn retention_reaper(mgr: Weak<WorkspaceManager>) {
1757 let interval = Duration::from_millis(25);
1758 loop {
1759 tokio::time::sleep(interval).await;
1760 let Some(mgr) = mgr.upgrade() else {
1761 return;
1762 };
1763 mgr.reap_once();
1764 }
1765}
1766
1767// ---------------------------------------------------------------------------
1768// Tests
1769// ---------------------------------------------------------------------------
1770
1771#[cfg(test)]
1772mod tests {
1773 use std::{path::PathBuf, sync::atomic::Ordering};
1774
1775 use sqry_core::project::ProjectRootMode;
1776
1777 use crate::config::DaemonConfig;
1778
1779 use super::{
1780 super::{loaded::LoadedWorkspace, state::WorkspaceKey},
1781 *,
1782 };
1783
1784 fn make_config() -> Arc<DaemonConfig> {
1785 // 1 MiB budget keeps the arithmetic tractable in assertions.
1786 Arc::new(DaemonConfig {
1787 memory_limit_mb: 1,
1788 ..DaemonConfig::default()
1789 })
1790 }
1791
1792 fn make_workspace() -> Arc<LoadedWorkspace> {
1793 Arc::new(LoadedWorkspace::new(
1794 WorkspaceKey::new(
1795 PathBuf::from("/repos/example"),
1796 ProjectRootMode::GitRoot,
1797 0x1,
1798 ),
1799 false,
1800 ))
1801 }
1802
1803 /// Register a workspace under `key` on `mgr` so that
1804 /// `reserve_rebuild` sees it present in its Phase-1
1805 /// `workspaces.read()` scope. Phase 7b1 tightens `reserve_rebuild`
1806 /// to reject unregistered keys with `DaemonError::WorkspaceEvicted`,
1807 /// so every admission-level test that expects a reservation (or a
1808 /// memory-budget rejection) must insert a workspace first.
1809 fn register_workspace(mgr: &WorkspaceManager, key: &WorkspaceKey) {
1810 mgr.workspaces.write().insert(
1811 key.clone(),
1812 Arc::new(LoadedWorkspace::new(key.clone(), false)),
1813 );
1814 }
1815
1816 #[test]
1817 fn reserve_rebuild_succeeds_when_headroom_available() {
1818 let mgr = WorkspaceManager::new_without_reaper(make_config());
1819 let key = WorkspaceKey::new(
1820 PathBuf::from("/repos/example"),
1821 ProjectRootMode::GitRoot,
1822 0x1,
1823 );
1824 register_workspace(&mgr, &key);
1825 let reservation = mgr
1826 .reserve_rebuild(&key, 500_000) // 500 kB into 1 MiB budget
1827 .expect("reservation fits");
1828 assert_eq!(reservation.bytes(), 500_000);
1829 assert_eq!(mgr.admission.lock().reserved_bytes, 500_000);
1830 drop(reservation);
1831 assert_eq!(
1832 mgr.admission.lock().reserved_bytes,
1833 0,
1834 "dropping an unconsumed reservation must return its bytes",
1835 );
1836 }
1837
1838 #[test]
1839 fn reserve_rebuild_rejects_oversized_request() {
1840 let mgr = WorkspaceManager::new_without_reaper(make_config());
1841 let key = WorkspaceKey::new(
1842 PathBuf::from("/repos/example"),
1843 ProjectRootMode::GitRoot,
1844 0x1,
1845 );
1846 register_workspace(&mgr, &key);
1847 let err = mgr.reserve_rebuild(&key, 10 * 1024 * 1024).expect_err(
1848 "a reservation bigger than the budget must be rejected with MemoryBudgetExceeded",
1849 );
1850 match err {
1851 DaemonError::MemoryBudgetExceeded {
1852 limit_bytes,
1853 requested_bytes,
1854 ..
1855 } => {
1856 assert_eq!(limit_bytes, 1024 * 1024);
1857 assert_eq!(requested_bytes, 10 * 1024 * 1024);
1858 }
1859 other => panic!("wrong error variant: {other:?}"),
1860 }
1861 assert_eq!(
1862 mgr.admission.lock().reserved_bytes,
1863 0,
1864 "a rejected reservation must not mutate admission state",
1865 );
1866 }
1867
1868 #[test]
1869 fn reserve_rebuild_rejects_when_running_total_would_exceed_budget() {
1870 let mgr = WorkspaceManager::new_without_reaper(make_config());
1871 let key = WorkspaceKey::new(
1872 PathBuf::from("/repos/example"),
1873 ProjectRootMode::GitRoot,
1874 0x1,
1875 );
1876 register_workspace(&mgr, &key);
1877 let a = mgr.reserve_rebuild(&key, 600_000).expect("first fits");
1878 let err = mgr
1879 .reserve_rebuild(&key, 600_000)
1880 .expect_err("second pushes over 1 MiB budget");
1881 match err {
1882 DaemonError::MemoryBudgetExceeded { reserved_bytes, .. } => {
1883 assert_eq!(reserved_bytes, 600_000, "first reservation still held");
1884 }
1885 other => panic!("wrong error variant: {other:?}"),
1886 }
1887 drop(a);
1888 }
1889
1890 #[test]
1891 fn reserve_rebuild_rejects_unknown_key() {
1892 // Task 7 Phase 7b1: unregistered keys must be rejected with
1893 // WorkspaceEvicted instead of succeeding. Prevents publishing
1894 // into an orphaned LoadedWorkspace after a race with eviction.
1895 let mgr = WorkspaceManager::new_without_reaper(make_config());
1896 let key = WorkspaceKey::new(
1897 PathBuf::from("/repos/never-registered"),
1898 ProjectRootMode::GitRoot,
1899 0xDEAD,
1900 );
1901 let err = mgr
1902 .reserve_rebuild(&key, 100_000)
1903 .expect_err("unknown key must surface WorkspaceEvicted");
1904 match err {
1905 DaemonError::WorkspaceEvicted { root } => {
1906 assert_eq!(root, PathBuf::from("/repos/never-registered"));
1907 }
1908 other => panic!("wrong error variant: {other:?}"),
1909 }
1910 assert_eq!(
1911 mgr.admission.lock().reserved_bytes,
1912 0,
1913 "a rejected reservation must not mutate admission state",
1914 );
1915 }
1916
1917 #[test]
1918 fn reserve_rebuild_rejects_cancelled_workspace() {
1919 // Task 7 Phase 7b1: a workspace whose `rebuild_cancelled` flag
1920 // is set (by `execute_eviction`) must be rejected even if still
1921 // present in the map (the two mutations run under the same
1922 // `workspaces.write()` scope, but defensive reads should catch
1923 // either signal).
1924 let mgr = WorkspaceManager::new_without_reaper(make_config());
1925 let key = WorkspaceKey::new(
1926 PathBuf::from("/repos/cancelled"),
1927 ProjectRootMode::GitRoot,
1928 0xCAFE,
1929 );
1930 let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
1931 ws.rebuild_cancelled.store(true, Ordering::Release);
1932 mgr.workspaces.write().insert(key.clone(), ws);
1933
1934 let err = mgr
1935 .reserve_rebuild(&key, 100_000)
1936 .expect_err("cancelled workspace must surface WorkspaceEvicted");
1937 match err {
1938 DaemonError::WorkspaceEvicted { root } => {
1939 assert_eq!(root, PathBuf::from("/repos/cancelled"));
1940 }
1941 other => panic!("wrong error variant: {other:?}"),
1942 }
1943 }
1944
    #[test]
    fn publish_and_retain_moves_bytes_and_retains_old_arc() {
        // End-to-end accounting check for a successful publish:
        // - the reservation drains;
        // - loaded_bytes swaps prior-for-new;
        // - the displaced graph is parked in `retained_old` under the
        //   returned token.
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let ws = make_workspace();
        mgr.workspaces
            .write()
            .insert(ws.key.clone(), Arc::clone(&ws));
        let reservation = mgr.reserve_rebuild(&ws.key, 100_000).expect("reserve fits");

        // Pre-seed workspace memory_bytes so publish exercises the
        // loaded-bytes swap (subtract prior, add new).
        ws.memory_bytes.store(50_000, Ordering::Release);
        mgr.admission.lock().loaded_bytes = 50_000;

        let new_graph = CodeGraph::new();
        let new_bytes = new_graph.heap_bytes() as u64;
        let (token, _published_arc) = mgr.publish_and_retain(reservation, &ws, new_graph);

        // The admission guard is held for the rest of the test, so every
        // assertion below reads one consistent snapshot.
        let state = mgr.admission.lock();
        assert_eq!(
            state.reserved_bytes, 0,
            "reservation bytes must drain on publish"
        );
        assert_eq!(
            state.loaded_bytes, new_bytes,
            "loaded_bytes = prior(50k) - prior(50k) + new(heap_bytes())",
        );
        assert_eq!(state.retained_old.len(), 1, "exactly one retained entry");
        let retained = state.retained_old.get(&token).expect("token present");
        assert_eq!(
            retained.bytes, 50_000,
            "retained bytes is the prior workspace memory_bytes",
        );
        assert_eq!(
            Arc::strong_count(&retained.graph),
            1,
            "admission map is the sole holder of the old Arc after publish",
        );
    }
1984
1985 #[test]
1986 fn rollback_guard_restores_workspace_on_panic_path() {
1987 // Synthesise the exact field layout publish_and_retain sets up
1988 // so the guard's Drop behaviour can be exercised directly,
1989 // without the heavy publish path.
1990 let ws = make_workspace();
1991 let old_graph = Arc::new(CodeGraph::new());
1992 ws.graph.store(Arc::clone(&old_graph));
1993 ws.memory_bytes.store(10_000, Ordering::Release);
1994
1995 {
1996 let mut guard = RollbackGuard {
1997 ws: &ws,
1998 prior_arc: Some(Arc::clone(&old_graph)),
1999 prior_bytes: 10_000,
2000 armed: true,
2001 };
2002
2003 // Simulate a partial publish: swap the ArcSwap + memory_bytes.
2004 let stomped = Arc::new(CodeGraph::new());
2005 ws.graph.store(Arc::clone(&stomped));
2006 ws.memory_bytes.store(99_999, Ordering::Release);
2007
2008 // `armed == true` so the guard reverses both fields on drop.
2009 // Flip the disarm check intentionally OFF — mimics panic path.
2010 let _ = &mut guard;
2011 }
2012
2013 // After the guard drops, both fields must match the prior.
2014 let restored = ws.graph.load_full();
2015 assert!(Arc::ptr_eq(&restored, &old_graph));
2016 assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 10_000);
2017 }
2018
2019 #[test]
2020 fn rollback_guard_disarmed_is_noop() {
2021 let ws = make_workspace();
2022 let old_graph = Arc::new(CodeGraph::new());
2023 ws.graph.store(Arc::clone(&old_graph));
2024 ws.memory_bytes.store(10_000, Ordering::Release);
2025
2026 {
2027 let mut guard = RollbackGuard {
2028 ws: &ws,
2029 prior_arc: Some(Arc::clone(&old_graph)),
2030 prior_bytes: 10_000,
2031 armed: true,
2032 };
2033 let stomped = Arc::new(CodeGraph::new());
2034 ws.graph.store(Arc::clone(&stomped));
2035 ws.memory_bytes.store(99_999, Ordering::Release);
2036
2037 // Success path disarms the guard.
2038 guard.armed = false;
2039 }
2040
2041 // State must stay "stomped" — the guard was disarmed.
2042 assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 99_999);
2043 }
2044
    #[test]
    fn reap_once_drops_last_holder_entries() {
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let ws = make_workspace();
        mgr.workspaces
            .write()
            .insert(ws.key.clone(), Arc::clone(&ws));
        let reservation = mgr
            .reserve_rebuild(&ws.key, 0)
            .expect("zero-size reservation always fits");
        // Publish-and-retain with a fresh empty graph; the old graph
        // becomes retained. The returned (token, published Arc) pair is
        // discarded at the end of this statement; only the retained
        // entry matters here.
        mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
        assert_eq!(mgr.admission.lock().retained_old.len(), 1);

        // No query holds the old Arc, so the next reap tick frees it.
        mgr.reap_once();
        assert_eq!(
            mgr.admission.lock().retained_old.len(),
            0,
            "reaper must free entries whose strong_count == 1",
        );
    }
2068
    #[test]
    fn reap_once_retains_entries_with_outstanding_holders() {
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let ws = make_workspace();
        mgr.workspaces
            .write()
            .insert(ws.key.clone(), Arc::clone(&ws));
        let reservation = mgr
            .reserve_rebuild(&ws.key, 0)
            .expect("zero-size reservation always fits");
        mgr.publish_and_retain(reservation, &ws, CodeGraph::new());

        // Simulate a slow query holding the retained Arc. The clone is taken
        // inside a block so the admission guard is released before
        // `reap_once` runs. `reap_once` edits `retained_old`, and the
        // parking_lot admission mutex is not re-entrant.
        let held = {
            let state = mgr.admission.lock();
            let token = *state.retained_old.keys().next().expect("one entry");
            Arc::clone(&state.retained_old.get(&token).unwrap().graph)
        };
        assert_eq!(Arc::strong_count(&held), 2);

        mgr.reap_once();
        assert_eq!(
            mgr.admission.lock().retained_old.len(),
            1,
            "reaper must not drop entries that slow queries still hold",
        );
        // Releasing the "slow query" leaves the admission map as sole holder.
        drop(held);

        mgr.reap_once();
        assert_eq!(
            mgr.admission.lock().retained_old.len(),
            0,
            "reaper frees the entry once the last slow query releases",
        );
    }
2104
    #[test]
    fn unconsumed_reservation_refunds_reserved_bytes_on_drop() {
        // Regression for Codex Task 6 Phase 6a iter-1 MAJOR:
        // if a rebuild panics *between* `reserve_rebuild` and the
        // admission-mutex section of `publish_and_retain`, the
        // reservation's Drop must refund `reserved_bytes` back to
        // the admission pool. A pre-fix bug disarmed the reservation
        // too early and leaked bytes on any unwind path.
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let ws = make_workspace();
        mgr.workspaces
            .write()
            .insert(ws.key.clone(), Arc::clone(&ws));
        let reservation = mgr
            .reserve_rebuild(&ws.key, 250_000)
            .expect("reservation fits");
        assert_eq!(mgr.admission.lock().reserved_bytes, 250_000);

        // Simulate a rebuild that panics after reservation but
        // before publish by letting the reservation drop on the
        // unwind-equivalent code path (explicit drop here; the
        // RAII guard fires the same way under `catch_unwind`).
        // No admission guard is alive here, so Drop can take the lock.
        drop(reservation);

        assert_eq!(
            mgr.admission.lock().reserved_bytes,
            0,
            "unconsumed reservation must refund reserved_bytes on drop \
             (Codex Task 6 Phase 6a iter-1 MAJOR regression)",
        );
    }
2136
    #[test]
    fn publish_and_retain_leaves_reservation_fully_disarmed_on_success() {
        // Companion to the refund regression: once publish_and_retain
        // completes successfully, the reservation must be disarmed —
        // otherwise its Drop at scope-exit would double-refund and
        // corrupt admission state.
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let ws = make_workspace();
        mgr.workspaces
            .write()
            .insert(ws.key.clone(), Arc::clone(&ws));
        let reservation = mgr
            .reserve_rebuild(&ws.key, 100_000)
            .expect("reservation fits");
        let admission_before = mgr.admission.lock().reserved_bytes;
        assert_eq!(admission_before, 100_000);

        // Drive the full commit path. After this returns the
        // reservation is already moved into the function, so we can
        // only observe the *absence* of any stray refund.
        let (_token, _published_arc) = mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
        let admission_after = mgr.admission.lock().reserved_bytes;
        assert_eq!(
            admission_after, 0,
            "publish must drain reserved_bytes exactly once, not double-drain or leak",
        );

        // A fresh reservation should see headroom = budget - loaded - retained;
        // if the previous publish leaked reserved_bytes this would fail.
        let again = mgr
            .reserve_rebuild(&ws.key, 100_000)
            .expect("post-publish admission must still admit a same-size reservation");
        // Dropping the unconsumed reservation must bring the counter back to 0.
        drop(again);
        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
    }
2172
    #[test]
    fn unwind_after_swap_before_admission_commit_restores_full_state() {
        // Regression for Codex Task 6 Phase 6a iter-2 MAJOR:
        // simulate a panic *between* the ArcSwap swap and the
        // admission mutex acquisition. After unwind, the admission
        // state must be exactly pre-call: reserved_bytes refunded,
        // loaded_bytes untouched, retained_old empty, workspace.graph
        // and workspace.memory_bytes restored to their prior values.
        //
        // We can't inject a panic into the real `publish_and_retain`
        // without mocking the allocator, so we reproduce the exact
        // Drop-order interaction using the public types: build a
        // RollbackGuard + RebuildReservation in the same geometry as
        // the real function, run `catch_unwind` over the non-
        // recoverable zone, and panic inside it.
        use std::panic::{AssertUnwindSafe, catch_unwind};

        let mgr = WorkspaceManager::new_without_reaper(make_config());
        // Same key that `make_workspace()` uses, constructed inline here.
        let ws = Arc::new(LoadedWorkspace::new(
            WorkspaceKey::new(
                PathBuf::from("/repos/example"),
                ProjectRootMode::GitRoot,
                0x1,
            ),
            false,
        ));
        mgr.workspaces
            .write()
            .insert(ws.key.clone(), Arc::clone(&ws));

        // Pre-seed workspace bytes so we can observe rollback.
        let prior_bytes_usize = 50_000usize;
        ws.memory_bytes.store(prior_bytes_usize, Ordering::Release);
        mgr.admission.lock().loaded_bytes = 50_000;
        let prior_arc = ws.graph.load_full();

        // Reserve headroom as the real function does.
        let reservation = mgr
            .reserve_rebuild(&ws.key, 100_000)
            .expect("reservation fits");
        assert_eq!(mgr.admission.lock().reserved_bytes, 100_000);

        let outcome = catch_unwind(AssertUnwindSafe(|| {
            // Mirror `publish_and_retain` up to and INCLUDING the
            // ArcSwap swap + update_memory, then panic *before* we
            // would have acquired the admission mutex. This is the
            // exact unwind window the iter-2 finding describes.
            let new_arc = Arc::new(CodeGraph::new());
            let prior_arc_clone = ws.graph.load_full();
            // The guard is armed and has no visible use after this
            // point; its Drop is the entire reason the scope exists,
            // so the binding is deliberately underscore-prefixed and
            // held until the panic unwinds the stack.
            let _rollback = RollbackGuard {
                ws: &ws,
                prior_arc: Some(prior_arc_clone),
                prior_bytes: prior_bytes_usize,
                armed: true,
            };
            let _old_arc = ws.graph.swap(new_arc);
            let _prev = ws.update_memory(99_999);

            // Hand the reservation into the scope so its Drop fires
            // on unwind if we never disarm it — which we won't.
            let _hold = reservation;

            // On unwind, the live locals drop in reverse declaration order:
            // `_hold` (refunds reserved_bytes), `_prev`, `_old_arc`, and
            // finally `_rollback` (restores ws.graph and ws.memory_bytes).
            // Simulate the panic site (e.g. retained_old.insert OOM).
            panic!("simulated panic inside publish_and_retain");
        }));
        assert!(outcome.is_err(), "catch_unwind must observe the panic");

        // Post-unwind assertions — every piece of admission state and
        // every observable piece of workspace state must match the
        // pre-call snapshot exactly.
        let restored = ws.graph.load_full();
        assert!(
            Arc::ptr_eq(&restored, &prior_arc),
            "RollbackGuard must restore ws.graph to the prior Arc after unwind",
        );
        assert_eq!(
            ws.memory_bytes.load(Ordering::Acquire),
            prior_bytes_usize,
            "RollbackGuard must restore ws.memory_bytes after unwind",
        );
        let state = mgr.admission.lock();
        assert_eq!(
            state.reserved_bytes, 0,
            "reservation refund must return reserved_bytes to pre-call value (0)",
        );
        assert_eq!(
            state.loaded_bytes, 50_000,
            "loaded_bytes must not be mutated when admission commit is never entered",
        );
        assert_eq!(
            state.retained_old.len(),
            0,
            "retained_old must be empty when admission commit is never entered",
        );
    }
2272
2273 // --- Phase 6b: lifecycle primitives --------------------------
2274
2275 fn make_key_at(path: &str, fingerprint: u64) -> WorkspaceKey {
2276 WorkspaceKey::new(PathBuf::from(path), ProjectRootMode::GitRoot, fingerprint)
2277 }
2278
2279 #[test]
2280 fn get_or_load_builds_on_miss_and_caches() {
2281 let mgr = WorkspaceManager::new_without_reaper(make_config());
2282 let key = make_key_at("/repos/example", 0x1);
2283 let builder = super::super::builder::EmptyGraphBuilder;
2284
2285 let g1 = mgr
2286 .get_or_load(&key, &builder, 1_000)
2287 .expect("first load succeeds");
2288 let g2 = mgr
2289 .get_or_load(&key, &builder, 1_000)
2290 .expect("second load hits cache");
2291 assert!(
2292 Arc::ptr_eq(&g1, &g2),
2293 "cache hit must return the same Arc as the initial build",
2294 );
2295 }
2296
    #[test]
    fn get_or_load_surfaces_builder_failures_and_sets_failed_state() {
        // A builder error must reach the caller unchanged. It must also
        // leave the workspace registered in Failed state with a diagnostic,
        // and must not leak the admission reservation.
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let key = make_key_at("/repos/example", 0x1);
        let failing = super::super::builder::FailingGraphBuilder::new("simulated plugin panic");

        let err = mgr
            .get_or_load(&key, &failing, 1_000)
            .expect_err("builder failure must bubble up");
        match err {
            DaemonError::WorkspaceBuildFailed { reason, .. } => {
                assert_eq!(reason, "simulated plugin panic");
            }
            other => panic!("wrong variant: {other:?}"),
        }

        // Workspace should be in Failed state with retry_count==1.
        let workspaces = mgr.workspaces.read();
        let ws = workspaces.get(&key).expect("workspace registered");
        assert_eq!(ws.load_state(), WorkspaceState::Failed);
        assert_eq!(ws.retry_count.load(Ordering::Acquire), 1);
        assert!(ws.last_error.read().is_some());
        // Release the map read guard before inspecting admission state.
        drop(workspaces);

        // Admission state must NOT have leaked the reservation —
        // RebuildReservation's Drop fires on the error path.
        assert_eq!(mgr.admission.lock().reserved_bytes, 0);
    }
2325
    #[test]
    fn evict_lru_picks_oldest_non_pinned_workspace() {
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let builder = super::super::builder::EmptyGraphBuilder;

        let a = make_key_at("/repos/a", 0x1);
        let b = make_key_at("/repos/b", 0x1);
        mgr.get_or_load(&a, &builder, 100_000).unwrap();
        // Short pause so `b` is loaded strictly after `a`. This presumably
        // gives `b` a later access timestamp for the LRU ordering (TODO
        // confirm the timestamp resolution in `touch`).
        std::thread::sleep(Duration::from_millis(5));
        mgr.get_or_load(&b, &builder, 100_000).unwrap();

        // `a` was touched first, so it should be the LRU victim.
        let victim = mgr.evict_lru().expect("one candidate");
        assert_eq!(victim, a, "oldest workspace must be evicted first");
        // STEP_6 iter-2 contract change: LRU eviction keeps the
        // tombstone in the map (state == Evicted) so partial-
        // eviction reporting via `daemon/workspaceStatus` can
        // still surface the source root. Only `unload` removes
        // the entry.
        let workspaces = mgr.workspaces.read();
        let evicted_ws = workspaces
            .get(&a)
            .expect("LRU victim stays as tombstone in the manager map");
        assert_eq!(
            evicted_ws.load_state(),
            WorkspaceState::Evicted,
            "LRU victim must transition to Evicted, not be removed",
        );
        assert!(
            workspaces.contains_key(&b),
            "non-victim workspace must remain",
        );
    }
2359
2360 #[test]
2361 fn evict_lru_returns_none_when_no_candidates() {
2362 let mgr = WorkspaceManager::new_without_reaper(make_config());
2363 assert!(
2364 mgr.evict_lru().is_none(),
2365 "empty manager has no eviction candidate",
2366 );
2367 }
2368
    #[test]
    fn evict_lru_skips_pinned_workspaces() {
        let mgr = WorkspaceManager::new_without_reaper(make_config());
        let builder = super::super::builder::EmptyGraphBuilder;
        let pinned_key = make_key_at("/repos/pinned", 0x1);

        // Insert a pinned workspace by manually constructing + registering.
        {
            let mut ws_map = mgr.workspaces.write();
            ws_map.insert(
                pinned_key.clone(),
                Arc::new(LoadedWorkspace::new(
                    pinned_key.clone(),
                    /*pinned*/ true,
                )),
            );
        }
        // Mark it Loaded and touch it directly; no publish is involved.
        // Because it is touched before `other` is loaded, the pinned
        // workspace is presumably the older one. An LRU pass that ignored
        // pinning would therefore choose it.
        {
            let ws = mgr.workspaces.read().get(&pinned_key).unwrap().clone();
            ws.store_state(WorkspaceState::Loaded);
            ws.touch();
        }

        // Plus a regular unpinned workspace.
        let other = make_key_at("/repos/other", 0x1);
        mgr.get_or_load(&other, &builder, 100_000).unwrap();

        // Evict should pick `other`, not the pinned one.
        let victim = mgr.evict_lru().expect("one candidate");
        assert_eq!(victim, other);
        assert!(mgr.workspaces.read().contains_key(&pinned_key));
    }
2402
#[test]
fn unload_removes_workspace_and_reclaims_bytes() {
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);
    mgr.get_or_load(&key, &builder, 100_000).unwrap();
    assert!(mgr.workspaces.read().contains_key(&key));

    assert!(mgr.unload(&key), "unload must report present");
    assert!(!mgr.workspaces.read().contains_key(&key));

    // The test name promises byte reclamation. With the only workspace
    // unloaded, nothing may remain in the loaded tier or as an in-flight
    // reservation. The retained tier is deliberately not checked: eviction
    // routes old graphs there until a reap frees them.
    {
        let state = mgr.admission.lock();
        assert_eq!(
            state.loaded_bytes, 0,
            "unload must release the workspace's loaded-tier bytes",
        );
        assert_eq!(
            state.reserved_bytes, 0,
            "unload must not leave a reservation behind",
        );
    }

    assert!(!mgr.unload(&key), "unload on missing key returns false");
}
2416
#[test]
fn status_reflects_loaded_workspaces_and_memory() {
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let empty_builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);
    mgr.get_or_load(&key, &empty_builder, 100_000).unwrap();

    let snapshot = mgr.status();
    assert_eq!(snapshot.daemon_version, env!("CARGO_PKG_VERSION"));

    // Exactly one workspace is reported, and it mirrors what was loaded.
    assert_eq!(snapshot.workspaces.len(), 1);
    let entry = &snapshot.workspaces[0];
    assert_eq!(entry.index_root, PathBuf::from("/repos/example"));
    assert_eq!(entry.state, WorkspaceState::Loaded);
    assert!(!entry.pinned);

    // The budget comes straight from `make_config`. The graph is empty,
    // so `current_bytes` itself is not pinned to a value; only the
    // ordering between the high-water mark and the current total is
    // asserted.
    let memory = &snapshot.memory;
    assert_eq!(memory.limit_bytes, 1024 * 1024);
    assert!(
        memory.high_water_bytes >= memory.current_bytes,
        "high_water_bytes must be monotonic wrt current_bytes",
    );
}
2441
#[test]
fn reserve_rebuild_triggers_eviction_when_budget_tight() {
    // `make_config` gives a 1 MiB budget. A resident 700 kB workspace
    // plus a 600 kB request does not fit. Phase 1 of `reserve_rebuild`
    // must pick the resident workspace, Phase 2 evicts it, and Phase 3
    // commits the reservation.
    let mgr = WorkspaceManager::new_without_reaper(make_config());

    let resident_key = make_key_at("/repos/victim", 0x1);
    let resident = Arc::new(LoadedWorkspace::new(resident_key.clone(), false));
    resident.memory_bytes.store(700_000, Ordering::Release);
    resident.store_state(WorkspaceState::Loaded);
    resident.touch();
    {
        let mut map = mgr.workspaces.write();
        map.insert(resident_key.clone(), Arc::clone(&resident));
    }
    mgr.admission.lock().loaded_bytes = 700_000;

    let incoming_key = make_key_at("/repos/new", 0x1);
    let incoming = Arc::new(LoadedWorkspace::new(incoming_key.clone(), false));
    mgr.workspaces.write().insert(incoming_key.clone(), incoming);

    let reservation = mgr
        .reserve_rebuild(&incoming_key, 600_000)
        .expect("Phase 2 eviction must free headroom");

    // Phase 2 eviction keeps the victim in the map as an `Evicted`
    // tombstone. Its accounting moved to `retained_old`, so it holds
    // zero resident bytes while staying visible to
    // `daemon/workspaceStatus`.
    {
        let map = mgr.workspaces.read();
        let tombstone = map
            .get(&resident_key)
            .expect("victim stays as tombstone");
        assert_eq!(tombstone.load_state(), WorkspaceState::Evicted);
        assert_eq!(
            tombstone.memory_bytes.load(Ordering::Acquire),
            0,
            "evicted tombstone must hold no resident bytes",
        );
    }

    // The incoming workspace's bytes are now held as a reservation.
    assert_eq!(mgr.admission.lock().reserved_bytes, 600_000);
    drop(reservation);
}
2487
#[test]
fn reserve_rebuild_rejects_when_only_pinned_workspaces_remain() {
    // Budget 1 MiB. Pin a 900 kB workspace. A 600 kB request cannot
    // evict the pin, so Phase 3 must reject.
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let pinned_key = make_key_at("/repos/pinned", 0x1);
    let pinned = Arc::new(LoadedWorkspace::new(
        pinned_key.clone(),
        /*pinned*/ true,
    ));
    pinned.memory_bytes.store(900_000, Ordering::Release);
    pinned.store_state(WorkspaceState::Loaded);
    mgr.workspaces
        .write()
        .insert(pinned_key.clone(), Arc::clone(&pinned));
    mgr.admission.lock().loaded_bytes = 900_000;

    let new_key = make_key_at("/repos/new", 0x1);
    mgr.workspaces.write().insert(
        new_key.clone(),
        Arc::new(LoadedWorkspace::new(new_key.clone(), false)),
    );
    let err = mgr
        .reserve_rebuild(&new_key, 600_000)
        .expect_err("pinned workspace makes budget unfittable");
    match err {
        DaemonError::MemoryBudgetExceeded {
            requested_bytes,
            current_bytes,
            ..
        } => {
            assert_eq!(requested_bytes, 600_000);
            assert_eq!(
                current_bytes, 900_000,
                "pinned workspace bytes still count after Phase 2",
            );
        }
        other => panic!("wrong variant: {other:?}"),
    }

    // A rejected request must not leave a reservation behind.
    assert_eq!(
        mgr.admission.lock().reserved_bytes,
        0,
        "rejected reservation must not leak reserved_bytes",
    );

    // Eviction keeps tombstones in the map, so membership alone does not
    // prove the pin was honoured. The pinned workspace must still be
    // Loaded with its bytes resident.
    assert!(mgr.workspaces.read().contains_key(&pinned_key));
    assert_eq!(
        pinned.load_state(),
        WorkspaceState::Loaded,
        "pinned workspace must not be evicted",
    );
    assert_eq!(pinned.memory_bytes.load(Ordering::Acquire), 900_000);
}
2530
#[test]
fn execute_eviction_routes_bytes_through_retained_old() {
    // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #1. Eviction used
    // to drop the evicted Arc without recording a retained entry, so the
    // bytes leaked out of accounting whenever a slow query still held
    // the graph.
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let ws_key = make_key_at("/repos/example", 0x1);
    let workspace = Arc::new(LoadedWorkspace::new(ws_key.clone(), false));
    workspace.memory_bytes.store(300_000, Ordering::Release);
    workspace.store_state(WorkspaceState::Loaded);
    mgr.workspaces
        .write()
        .insert(ws_key.clone(), Arc::clone(&workspace));
    mgr.admission.lock().loaded_bytes = 300_000;

    // Simulate a slow query that still holds the current graph, so the
    // retained entry survives the first reap.
    let in_flight_query = workspace.graph.load_full();

    mgr.execute_eviction(&ws_key);

    {
        let admission = mgr.admission.lock();
        assert_eq!(
            admission.loaded_bytes, 0,
            "evicted workspace bytes must leave the loaded tier",
        );
        assert_eq!(
            admission.retained_total_bytes(),
            300_000,
            "evicted workspace bytes must enter the retained tier",
        );
        assert_eq!(admission.retained_old.len(), 1);
    }

    // While the query is alive, a reap must keep the entry. §G.5 is
    // preserved until strong_count == 1.
    mgr.reap_once();
    assert_eq!(mgr.admission.lock().retained_total_bytes(), 300_000);

    // After the query releases its Arc, the next reap frees the bytes.
    drop(in_flight_query);
    mgr.reap_once();
    assert_eq!(
        mgr.admission.lock().retained_total_bytes(),
        0,
        "reaper must free retained entry once slow query releases",
    );
}
2580
#[test]
fn get_or_load_state_cas_rejects_concurrent_load() {
    // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #2:
    // two loaders must not both run the slow path. The state
    // CAS gates exactly one winner.
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let ws = mgr.get_or_insert_workspace(&key);
    // Simulate another loader holding the gate.
    ws.store_state(WorkspaceState::Loading);

    let builder = super::super::builder::EmptyGraphBuilder;
    let err = mgr
        .get_or_load(&key, &builder, 1_000)
        .expect_err("concurrent load must be rejected");
    match err {
        DaemonError::WorkspaceBuildFailed { reason, .. } => {
            assert!(
                reason.contains("already in progress"),
                "unexpected reason: {reason}",
            );
        }
        other => panic!("wrong variant: {other:?}"),
    }

    // The rejected caller must leave everything owned by the (simulated)
    // winning loader alone. The Loading gate stays held, and the
    // admission state is not mutated.
    assert_eq!(
        ws.load_state(),
        WorkspaceState::Loading,
        "rejected loader must not touch the winner's Loading state",
    );
    assert_eq!(mgr.admission.lock().reserved_bytes, 0);
}
2610
#[test]
fn get_or_load_detects_cancellation_between_cas_and_publish() {
    // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #2, covering the
    // cancellation-detection subcase. Suppose `rebuild_cancelled` was
    // raised before our CAS, meaning an evict raced ahead of us on the
    // prior state. Then `get_or_load` must honour that signal rather than
    // clear it and publish into an evicted workspace.
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let ws = mgr.get_or_insert_workspace(&key);

    // Pretend an eviction ran against an earlier state but left the
    // workspace registered. The cancellation flag is up while the state
    // reads Unloaded, so the loader's CAS still succeeds.
    ws.rebuild_cancelled.store(true, Ordering::Release);
    ws.store_state(WorkspaceState::Unloaded);

    let builder = super::super::builder::EmptyGraphBuilder;
    let failure = mgr
        .get_or_load(&key, &builder, 1_000)
        .expect_err("pre-CAS cancellation must be honoured");
    if let DaemonError::WorkspaceBuildFailed { reason, .. } = &failure {
        assert!(
            reason.contains("evicted mid-load"),
            "unexpected reason: {reason}",
        );
    } else {
        panic!("wrong variant: {failure:?}");
    }

    // The flag survives (no clobbering) and the workspace lands in Failed.
    assert!(ws.rebuild_cancelled.load(Ordering::Acquire));
    assert_eq!(ws.load_state(), WorkspaceState::Failed);
}
2645
#[test]
fn get_or_load_loading_guard_recovers_from_builder_panic() {
    // Regression for Codex Task 6 Phase 6b iter-1 MAJOR #3. A panic
    // raised inside `builder.build` must not strand the workspace in
    // Loading with `last_error` unset.
    use std::panic::{self, AssertUnwindSafe};

    #[derive(Debug)]
    struct PanickingBuilder;

    impl WorkspaceBuilder for PanickingBuilder {
        fn build(&self, _root: &Path) -> Result<CodeGraph, DaemonError> {
            panic!("simulated builder panic");
        }
    }

    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let exploding = PanickingBuilder;

    let unwound = panic::catch_unwind(AssertUnwindSafe(|| {
        let _ = mgr.get_or_load(&key, &exploding, 1_000);
    }))
    .is_err();
    assert!(unwound, "panic must propagate through get_or_load");

    {
        let workspaces = mgr.workspaces.read();
        let ws = workspaces.get(&key).expect("workspace still registered");
        assert_eq!(
            ws.load_state(),
            WorkspaceState::Failed,
            "LoadingGuard must transition Loading → Failed on unwind",
        );
        assert!(
            ws.last_error.read().is_some(),
            "LoadingGuard must populate last_error on unwind",
        );
        assert!(
            ws.retry_count.load(Ordering::Acquire) >= 1,
            "LoadingGuard must increment retry_count",
        );
    }

    // Admission is clean because the RebuildReservation dropped during
    // unwinding and refunded `reserved_bytes`.
    assert_eq!(mgr.admission.lock().reserved_bytes, 0);
}
2691
#[test]
fn concurrent_load_and_evict_never_publishes_into_evicted_workspace() {
    // Regression for Codex Task 6 Phase 6b iter-2 MAJOR:
    // the post-build re-check was not atomic with
    // `publish_and_retain`. A concurrent eviction could slip
    // in between the re-check and the publish, so we'd end
    // up accounting bytes for an evicted workspace.
    //
    // Stress test: race `get_or_load` against `unload` (which
    // evicts through `execute_eviction`) for many iterations.
    // Each iteration must end in one of two clean states:
    // - fully loaded: in the map and Loaded;
    // - fully gone: absent from the map, with no bytes left in
    //   the loaded tier.
    // It must never end half-committed, with `loaded_bytes`
    // pointing at a graph that isn't in the map.
    use std::sync::Barrier;
    use std::thread;

    const ITERATIONS: usize = 64;
    for iter in 0..ITERATIONS {
        let mgr = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig {
            memory_limit_mb: 64,
            ..DaemonConfig::default()
        }));
        let key = make_key_at("/repos/example", iter as u64);
        let builder = Arc::new(super::super::builder::EmptyGraphBuilder);

        let start = Arc::new(Barrier::new(2));
        let mgr_clone = Arc::clone(&mgr);
        let key_clone = key.clone();
        let builder_clone = Arc::clone(&builder);
        let start_load = Arc::clone(&start);
        let loader = thread::spawn(move || {
            start_load.wait();
            // Intentionally ignore the result — either success
            // or failure is valid; we assert post-hoc invariants.
            let _ = mgr_clone.get_or_load(&key_clone, &*builder_clone, 100_000);
        });

        let mgr_clone = Arc::clone(&mgr);
        let key_clone = key.clone();
        let start_evict = Arc::clone(&start);
        let evictor = thread::spawn(move || {
            start_evict.wait();
            // Run unload against the same key; either it races
            // ahead of the loader (no-op), or evicts after the
            // loader publishes.
            mgr_clone.unload(&key_clone);
        });

        loader.join().expect("loader panicked");
        evictor.join().expect("evictor panicked");

        // Invariant 1: a workspace still in the map must be Loaded,
        // with no "evicted-but-in-map" intermediate state. Snapshot map
        // membership and release `workspaces` before taking `admission`.
        let still_mapped = {
            let workspaces = mgr.workspaces.read();
            match workspaces.get(&key) {
                Some(ws) => {
                    assert_eq!(
                        ws.load_state(),
                        WorkspaceState::Loaded,
                        "iter {iter}: workspace in map must be Loaded, not {}",
                        ws.load_state(),
                    );
                    true
                }
                None => false,
            }
        };

        let state = mgr.admission.lock();
        // Invariant 2: no reservation outlives the race.
        assert_eq!(
            state.reserved_bytes, 0,
            "iter {iter}: no reservations should leak after the race"
        );
        // Invariant 3: once the workspace is gone from the map, none of
        // its bytes may remain in the loaded tier. A non-zero value here
        // is exactly the regression: a publish landed on a workspace
        // that `unload` had already evicted.
        if !still_mapped {
            assert_eq!(
                state.loaded_bytes, 0,
                "iter {iter}: loaded_bytes must be zero once the workspace is unloaded",
            );
        }
        // Invariant 4: the committed total stays within the budget.
        assert!(
            state.total_committed_bytes() <= mgr.memory_limit_bytes(),
            "iter {iter}: total_committed {} over budget {}",
            state.total_committed_bytes(),
            mgr.memory_limit_bytes(),
        );
    }
}
2780
#[test]
fn publish_fires_installed_hook() {
    // Phase 6c iter-2: once the admission commit succeeds and
    // `workspaces_guard` has been released, `get_or_load` must call the
    // installed SqrydHook. This test drives the whole load path
    // end-to-end rather than only the raw `publish_and_retain` critical
    // section. That exercises the fix that moved the hook out of
    // `publish_and_retain` and into the caller, outside every
    // workspaces-lock holder.
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let recorder = super::super::hook::RecordingHook::new();
    mgr.set_hook(Arc::clone(&recorder) as super::super::hook::SharedHook);

    let empty_builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);
    mgr.get_or_load(&key, &empty_builder, 0)
        .expect("load on empty builder succeeds");

    let expected_roots = vec![key.source_root.clone()];
    assert_eq!(
        recorder.invocation_count(),
        1,
        "hook must fire exactly once per publish",
    );
    assert_eq!(
        recorder.invocation_roots(),
        expected_roots,
        "hook must receive the workspace's index_root",
    );
}
2810
#[test]
fn set_hook_replaces_prior_hook_for_subsequent_publishes() {
    // Phase 6c iter-2: install hook A, load, unload, install
    // hook B, load again. Hook A sees one invocation; hook B
    // sees one. Driving through `get_or_load` exercises the
    // post-`workspaces_guard`-drop dispatch path the iter-2
    // fix added.
    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let hook_a = super::super::hook::RecordingHook::new();
    let hook_b = super::super::hook::RecordingHook::new();
    let builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);

    mgr.set_hook(Arc::clone(&hook_a) as super::super::hook::SharedHook);
    mgr.get_or_load(&key, &builder, 0)
        .expect("first load with hook A");

    // Unload so the next `get_or_load` rebuilds and re-publishes
    // rather than hitting the Loaded-state cache fast path. Check the
    // return value: a silent no-op here would let the second load hit
    // the cache, and the failure would surface only as a confusing
    // hook B count mismatch.
    assert!(
        mgr.unload(&key),
        "unload must remove the workspace loaded with hook A",
    );

    mgr.set_hook(Arc::clone(&hook_b) as super::super::hook::SharedHook);
    mgr.get_or_load(&key, &builder, 0)
        .expect("second load with hook B");

    assert_eq!(hook_a.invocation_count(), 1);
    assert_eq!(hook_b.invocation_count(), 1);
}
2839
#[test]
fn hook_can_call_manager_unload_without_deadlock() {
    // Regression for Codex Task 6 Phase 6c iter-1 MAJOR: the
    // hook must fire OUTSIDE the `workspaces.read()` guard
    // that `get_or_load` holds across `publish_and_retain`,
    // so a hook impl that calls back into `manager.unload(key)`
    // — which acquires `workspaces.write()` inside
    // `execute_eviction` — must NOT deadlock against the
    // loader that fired it.
    //
    // Pre-fix: the hook dispatched from inside
    // `publish_and_retain` under the caller's
    // `workspaces.read()` guard, so the re-entrant
    // `workspaces.write()` in `unload` would block forever.
    //
    // We run the load on a background thread and fail the
    // test if the thread is still alive after a generous
    // timeout — that turns any deadlock regression into a
    // deterministic failure rather than a stuck runner.
    use std::{
        sync::{Weak, atomic::AtomicUsize},
        thread,
        time::{Duration, Instant},
    };

    #[derive(Debug)]
    struct UnloadingHook {
        manager: Weak<WorkspaceManager>,
        key: WorkspaceKey,
        // Counts `on_publish` calls so the test can verify the hook
        // actually fired exactly once.
        invocations: AtomicUsize,
    }

    impl super::super::hook::SqrydHook for UnloadingHook {
        fn on_publish(&self, _workspace_root: &Path, _graph: Arc<CodeGraph>) {
            self.invocations.fetch_add(1, Ordering::SeqCst);
            if let Some(mgr) = self.manager.upgrade() {
                // If the iter-2 fix regressed and this fires
                // under `workspaces.read()`, the `.write()`
                // inside `execute_eviction` deadlocks here
                // and the test's join timeout triggers below.
                let _present = mgr.unload(&self.key);
            }
        }
    }

    let mgr = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let builder = super::super::builder::EmptyGraphBuilder;
    let hook = Arc::new(UnloadingHook {
        manager: Arc::downgrade(&mgr),
        key: key.clone(),
        invocations: AtomicUsize::new(0),
    });
    mgr.set_hook(Arc::clone(&hook) as super::super::hook::SharedHook);

    let mgr_for_thread = Arc::clone(&mgr);
    let key_for_thread = key.clone();
    let handle = thread::spawn(move || {
        mgr_for_thread
            .get_or_load(&key_for_thread, &builder, 0)
            .expect("load succeeds even with re-entrant hook");
    });

    let deadline = Instant::now() + Duration::from_secs(10);
    while !handle.is_finished() {
        if Instant::now() > deadline {
            panic!(
                "get_or_load deadlocked while firing hook \
                 (Codex Task 6 Phase 6c iter-2 regression: \
                 hook must dispatch outside workspaces.read())",
            );
        }
        thread::sleep(Duration::from_millis(20));
    }
    handle
        .join()
        .expect("loader thread completed without panic");

    // The hook's re-entrant `unload` ran, so the workspace must no
    // longer be in the manager map.
    assert!(
        !mgr.workspaces.read().contains_key(&key),
        "hook's re-entrant unload must have removed the workspace",
    );
    // And the hook fired exactly once for the single publish.
    assert_eq!(
        hook.invocations.load(Ordering::SeqCst),
        1,
        "hook must fire exactly once per publish",
    );
}
2923
2924 #[tokio::test]
2925 async fn retention_reaper_task_eventually_drops_free_entries() {
2926 let mgr = WorkspaceManager::new(make_config());
2927 let ws = make_workspace();
2928 mgr.workspaces
2929 .write()
2930 .insert(ws.key.clone(), Arc::clone(&ws));
2931 let reservation = mgr
2932 .reserve_rebuild(&ws.key, 0)
2933 .expect("zero-size reservation always fits");
2934 mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
2935 assert_eq!(mgr.admission.lock().retained_old.len(), 1);
2936
2937 // Reaper ticks every 25 ms; 200 ms is generous.
2938 for _ in 0..20 {
2939 tokio::time::sleep(Duration::from_millis(10)).await;
2940 if mgr.admission.lock().retained_old.is_empty() {
2941 return;
2942 }
2943 }
2944 panic!("reaper task never freed the entry within 200 ms");
2945 }
2946}