// sqry_daemon/workspace/loaded.rs
1//! [`LoadedWorkspace`] — per-workspace runtime state.
2//!
3//! Corresponds to Task 6 Step 2 of the sqryd plan, plus the
4//! Amendment-2 additions:
5//!
6//! - `memory_high_water_bytes` (§D) — monotonic peak over the loaded
7//! lifetime; reset only on unload/eviction, never on rebuilds.
8//! - `last_good_at` (§C) — stamped on every successful build; the
9//! stale-serve router uses this to enforce the
10//! `stale_serve_max_age_hours` cap and surface JSON-RPC `-32002`
11//! on expiry.
12//! - `rebuild_cancelled` (§J) — lock-free cancellation signal used by
13//! the dispatcher's background rebuild task to abort at pass
14//! boundaries when the workspace is evicted mid-rebuild.
15//! - `rebuild_lane` (§J) — at most one queued rebuild per workspace.
16//! - `rebuild_in_flight` (§J, Task 7 Phase 7b1) — runner-role gate that
17//! serializes `RebuildDispatcher::handle_changes` callers per
18//! workspace. Transitions happen under `rebuild_lane` on the normal
19//! path; `DrainLoopSentinel` is the sole recovery exception.
20//!
21//! [`ArcSwap<CodeGraph>`] owns the published graph. Queries take
22//! `graph.load_full()` to get a stable `Arc<CodeGraph>` that survives
23//! a concurrent `publish_and_retain` swap; the retention reaper is
24//! responsible for eventually dropping the superseded `Arc`.
25//!
26//! `pinned` workspaces are LRU-exempt. They are still counted against
27//! `memory_bytes` / `total_memory` — a pinned workspace cannot push
28//! the daemon over its budget; admission rejects the load / rebuild
29//! per §G.7 if the only way to fit is to evict the pin itself.
30
31use std::{
32 sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
33 time::{Instant, SystemTime},
34};
35
36use arc_swap::ArcSwap;
37use parking_lot::RwLock;
38use sqry_core::graph::CodeGraph;
39use sqry_core::watch::{ChangeSet, LastIndexedGitState};
40
41use crate::error::DaemonError;
42
43use super::state::{WorkspaceKey, WorkspaceState};
44
/// Rebuild-lane entry enqueued by the Task 7 `RebuildDispatcher`.
///
/// The dispatcher's per-workspace call path (A2 §J) holds at most one
/// pending rebuild per workspace. When a new `ChangeSet` arrives while
/// another rebuild is in flight, the two are coalesced via
/// [`Self::coalesce_with`] — union of changed files, OR of
/// `git_state_changed`, max of `enqueued_at`, full-rebuild-dominance
/// merge on `git_change_class`, and absorb-None + later-wins merge on
/// `git_state_at_enqueue`.
#[derive(Debug, Clone)]
pub struct PendingRebuild {
    /// The coalesced [`ChangeSet`] waiting to be processed. Carries
    /// every file path observed so far plus the worst-case git-state
    /// classification across all enqueues.
    pub changes: ChangeSet,
    /// Monotonic [`Instant`] of the most recent enqueue (`Instant` is
    /// monotonic clock time, not wall-clock — compare
    /// [`LoadedWorkspace::last_good_at`], which uses `SystemTime`).
    /// Updated to `max(prior, incoming)` on every coalesce — A2 §J.2.
    pub enqueued_at: Instant,
    /// Git-state snapshot captured by the watcher bridge at the moment
    /// it received this change (Task 7 Phase 7b2). When the runner
    /// publishes a graph produced from this `PendingRebuild`, it
    /// commits this snapshot to
    /// [`LoadedWorkspace::last_indexed_git_state`] — tying baseline
    /// advance to actual publish consumption rather than a
    /// bridge-side proxy counter.
    ///
    /// `None` for callers that do not attach a git-state snapshot
    /// (direct tests, future IPC `workspace/force_rebuild`). The
    /// runner leaves the baseline untouched when consuming a `None`
    /// entry.
    ///
    /// Merge rule under [`Self::coalesce_with`]: absorb-None from
    /// either side, later wins when both are `Some`.
    pub git_state_at_enqueue: Option<LastIndexedGitState>,
}
80
/// Per-workspace runtime state owned by the
/// [`super::WorkspaceManager::workspaces`] map.
///
/// Mutating state:
///
/// - `state`, `memory_bytes`, `memory_high_water_bytes`,
///   `retry_count`, `rebuild_cancelled`, and `rebuild_in_flight` are
///   atomics, and `graph` is a lock-free `ArcSwap` — queries and
///   status readers can observe all of them without taking a mutex.
/// - `last_accessed`, `last_error`, `last_good_at`, and
///   `last_indexed_git_state` are short-critical-section `RwLock`s —
///   writers are the dispatcher / router at publish/fail time.
/// - `rebuild_lane` is a `tokio::sync::Mutex` per A2 §J.4; the
///   dispatcher holds it briefly to coalesce pending work.
///
/// Construction is expensive only in that [`ArcSwap::from_pointee`]
/// allocates one `Arc<CodeGraph>` up front. A fresh workspace is
/// initialised with an empty-but-live [`CodeGraph`]; the real graph is
/// installed by the first `publish_and_retain`.
#[derive(Debug)]
pub struct LoadedWorkspace {
    /// Identity key under which the manager stores this workspace.
    ///
    /// Immutable for the lifetime of the workspace; if the config
    /// fingerprint or root mode change, the workspace is unloaded
    /// under the old key and freshly loaded under a new key.
    pub key: WorkspaceKey,

    /// Published graph. Readers call `graph.load_full()`.
    ///
    /// Only [`crate::workspace::publish::publish_and_retain`] swaps a
    /// new graph in; eviction stores a fresh empty graph so the
    /// ArcSwap remains non-null (simpler than Option-wrapping).
    pub graph: ArcSwap<CodeGraph>,

    /// Current lifecycle state. Stored as `AtomicU8` to keep the
    /// status read path lock-free; round-trip via
    /// [`WorkspaceState::as_u8`] / [`WorkspaceState::from_u8`].
    pub state: std::sync::atomic::AtomicU8,

    /// Last wall-clock time a query observed this workspace, used by
    /// LRU eviction (§G.7). Short critical section; `RwLock` keeps
    /// contention negligible.
    pub last_accessed: RwLock<Instant>,

    /// Current `heap_bytes` of the published graph. Updated on every
    /// successful `publish_and_retain`. Reads use `Relaxed` ordering —
    /// the authoritative aggregate lives on
    /// [`super::admission::AdmissionState`].
    pub memory_bytes: AtomicUsize,

    /// Peak `memory_bytes` observed over this workspace's loaded
    /// lifetime. Per Amendment 2 §D:
    ///
    /// > High-water marks are monotonic over the workspace's loaded
    /// > lifetime — they reset only on unload/eviction (fresh
    /// > LoadedWorkspace), not on rebuilds or backoff.
    ///
    /// Updated via `fetch_max` alongside every `memory_bytes` store.
    pub memory_high_water_bytes: AtomicUsize,

    /// Whether LRU eviction must skip this workspace.
    pub pinned: bool,

    /// Most recent build/load error, if any. `None` in the
    /// [`WorkspaceState::Loaded`] steady state. Populated on transition
    /// into [`WorkspaceState::Failed`] and read back by the router
    /// when surfacing `meta.last_error` on stale responses.
    pub last_error: RwLock<Option<DaemonError>>,

    /// Wall-clock of the most recent successful rebuild. Used by the
    /// router to compute `age_hours` for the
    /// `stale_serve_max_age_hours` cap and the JSON-RPC `-32002`
    /// error `error.data.age_hours` payload.
    pub last_good_at: RwLock<Option<SystemTime>>,

    /// Count of consecutive failed rebuilds. Drives the exponential
    /// backoff schedule (§G.7 / plan Step 6: 30s → 60s → 120s → 300s
    /// → 600s). Reset to 0 on every successful publish.
    pub retry_count: AtomicU32,

    /// At most one queued rebuild per workspace. `None` when the
    /// dispatcher's lane is idle. Filled / coalesced by the
    /// dispatcher (Task 7).
    pub rebuild_lane: tokio::sync::Mutex<Option<PendingRebuild>>,

    /// Lock-free cancellation signal for in-flight rebuilds. Set by
    /// `evict_lru` / explicit `unload` and polled by the rebuild
    /// pipeline at each pass boundary. Once set, the running rebuild
    /// aborts, drops its `RebuildReservation`, and never publishes.
    pub rebuild_cancelled: AtomicBool,

    /// Runner-role gate for the per-workspace rebuild serial consumer
    /// (A2 §J.2, Task 7 Phase 7b1). When `true`, exactly one
    /// [`crate::RebuildDispatcher::handle_changes`] call is actively
    /// running the full rebuild pipeline (the Phase B drain loop in
    /// `rebuild.rs`). A concurrent caller observing `true` under the
    /// [`Self::rebuild_lane`] lock MUST park its coalesced
    /// [`PendingRebuild`] in the lane and return `Ok(())` without
    /// executing — the active runner will drain the lane at its next
    /// drain-loop iteration.
    ///
    /// # Invariant
    ///
    /// All normal-path transitions of this flag happen while
    /// [`Self::rebuild_lane`] is held. `DrainLoopSentinel::drop` in
    /// `rebuild.rs` is the sole recovery exception — it stores `false`
    /// without the lane on the unwind path, with the narrow-race
    /// semantics documented on that type.
    ///
    /// # Authorised modifiers
    ///
    /// - `RebuildDispatcher::handle_changes` Phase A (`false → true`
    ///   under lane).
    /// - `RebuildDispatcher::handle_changes` Phase B drain-loop exit
    ///   (`true → false` under lane, when the lane is empty or the
    ///   top-of-loop eviction gate fires).
    /// - `DrainLoopSentinel::drop` panic-safety path (`true → false`,
    ///   bare atomic store, narrow race).
    ///
    /// Nothing in `WorkspaceManager` (`execute_eviction`,
    /// `publish_and_retain`, the retention reaper) touches this flag.
    /// It is dispatcher-local coordination, independent of the
    /// `rebuild_cancelled` eviction signal.
    pub rebuild_in_flight: AtomicBool,

    /// Git state that the currently-published graph was indexed
    /// against (Task 7 Phase 7b2, A2 §I / §J.2).
    ///
    /// Read by the per-workspace watcher bridge as the `last_git_state`
    /// argument to
    /// [`sqry_core::watch::SourceTreeWatcher::wait_for_changes_cancellable`]
    /// so the classifier has a valid baseline on every debounce window.
    ///
    /// Advanced ONLY by [`crate::RebuildDispatcher::execute_one_rebuild`]
    /// after [`crate::WorkspaceManager::publish_and_retain`] succeeds,
    /// using the `git_state_at_enqueue` snapshot attached to the
    /// [`PendingRebuild`] that produced the publish. A failed rebuild
    /// MUST leave this field unchanged so the next
    /// `wait_for_changes_cancellable` call still sees the divergent
    /// state and retries.
    ///
    /// # Invariant
    ///
    /// - Only `execute_one_rebuild`'s successful-publish arm writes
    ///   this field.
    /// - Watcher-side event receipt, cancellation, or rebuild failure
    ///   never write this field.
    /// - The write happens under `parking_lot::RwLock` — short critical
    ///   section, no cross-lock ordering concerns.
    ///
    /// `None` on workspace construction (no rebuild has published
    /// yet). The first successful `execute_one_rebuild` driven by the
    /// watcher bridge (which attaches `git_state_at_enqueue = Some(...)`)
    /// populates this field.
    pub last_indexed_git_state: RwLock<Option<LastIndexedGitState>>,
}
237
238impl LoadedWorkspace {
239 /// Construct a fresh workspace entry with an empty initial graph.
240 ///
241 /// The empty graph is `Arc`-cheap (sub-kilobyte) so keeping the
242 /// `ArcSwap` non-null for the entire workspace lifetime avoids an
243 /// `Option` layer on the query path. Eviction stores another
244 /// empty graph through the same ArcSwap; re-load overwrites it.
245 #[must_use]
246 pub fn new(key: WorkspaceKey, pinned: bool) -> Self {
247 Self {
248 key,
249 graph: ArcSwap::from_pointee(CodeGraph::new()),
250 state: std::sync::atomic::AtomicU8::new(WorkspaceState::Unloaded.as_u8()),
251 last_accessed: RwLock::new(Instant::now()),
252 memory_bytes: AtomicUsize::new(0),
253 memory_high_water_bytes: AtomicUsize::new(0),
254 pinned,
255 last_error: RwLock::new(None),
256 last_good_at: RwLock::new(None),
257 retry_count: AtomicU32::new(0),
258 rebuild_lane: tokio::sync::Mutex::new(None),
259 rebuild_cancelled: AtomicBool::new(false),
260 rebuild_in_flight: AtomicBool::new(false),
261 last_indexed_git_state: RwLock::new(None),
262 }
263 }
264
265 /// Atomic state read. Round-trips through [`WorkspaceState::from_u8`];
266 /// a discriminant outside the current range panics (it is a
267 /// telemetry-corruption bug, not a recoverable condition).
268 pub fn load_state(&self) -> WorkspaceState {
269 let raw = self.state.load(Ordering::Acquire);
270 WorkspaceState::from_u8(raw)
271 .unwrap_or_else(|| unreachable!("invalid WorkspaceState discriminant {raw}"))
272 }
273
274 /// Atomic state write.
275 pub fn store_state(&self, new_state: WorkspaceState) {
276 self.state.store(new_state.as_u8(), Ordering::Release);
277 }
278
279 /// Update `memory_bytes` and keep `memory_high_water_bytes`
280 /// monotonic. Matches Amendment 2 §D:
281 ///
282 /// > Every time `memory_bytes` is assigned (initial load, full
283 /// > rebuild completion, incremental rebuild's `ArcSwap::store`),
284 /// > immediately call
285 /// > `memory_high_water_bytes.fetch_max(new, Relaxed)`.
286 ///
287 /// Returns the previous `memory_bytes` value so the caller can
288 /// compute the delta the admission accounting needs.
289 pub fn update_memory(&self, new_bytes: usize) -> usize {
290 let prior = self.memory_bytes.swap(new_bytes, Ordering::AcqRel);
291 self.memory_high_water_bytes
292 .fetch_max(new_bytes, Ordering::Relaxed);
293 prior
294 }
295
296 /// Stamp `last_accessed = now` on a query. Held under an `RwLock`
297 /// so the query hot path never blocks — writers contend only with
298 /// other writers.
299 pub fn touch(&self) {
300 *self.last_accessed.write() = Instant::now();
301 }
302
303 /// Record a successful build's wall-clock + reset retry counter.
304 pub fn record_success(&self, at: SystemTime) {
305 *self.last_good_at.write() = Some(at);
306 *self.last_error.write() = None;
307 self.retry_count.store(0, Ordering::Release);
308 }
309
310 /// Record a failed build and return the new retry count. The
311 /// dispatcher uses this to pick the exponential-backoff schedule.
312 pub fn record_failure(&self, err: DaemonError) -> u32 {
313 *self.last_error.write() = Some(err);
314 self.retry_count.fetch_add(1, Ordering::AcqRel) + 1
315 }
316
317 /// Test-only setter for [`Self::last_good_at`].
318 ///
319 /// Task 7 Phase 7c: lets `classify_for_serve` integration tests
320 /// drive the stale-serve age arithmetic against synthetic
321 /// timestamps without needing to run a real rebuild.
322 ///
323 /// `#[doc(hidden)]` to signal "test affordance only" — follows
324 /// the [`crate::TestGate`] / [`crate::TestCapture`] pattern.
325 /// Production code should not call this.
326 #[doc(hidden)]
327 pub fn set_last_good_at_for_test(&self, at: Option<SystemTime>) {
328 *self.last_good_at.write() = at;
329 }
330}
331
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, time::Duration};

    use sqry_core::project::ProjectRootMode;

    use super::*;

    /// Shared fixture key — the concrete values are arbitrary.
    fn make_key() -> WorkspaceKey {
        WorkspaceKey::new(
            PathBuf::from("/repos/example"),
            ProjectRootMode::GitRoot,
            0x1,
        )
    }

    #[test]
    fn new_workspace_defaults() {
        let ws = LoadedWorkspace::new(make_key(), false);

        // Lifecycle + counters all start zeroed.
        assert_eq!(ws.load_state(), WorkspaceState::Unloaded);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(ws.retry_count.load(Ordering::Relaxed), 0);

        // Coordination flags start cleared.
        assert!(!ws.rebuild_cancelled.load(Ordering::Relaxed));
        assert!(
            !ws.rebuild_in_flight.load(Ordering::Relaxed),
            "new workspace must start with in_flight=false (no runner)"
        );
        assert!(!ws.pinned);

        // No error or success has been recorded yet.
        assert!(ws.last_error.read().is_none());
        assert!(ws.last_good_at.read().is_none());
    }

    #[test]
    fn state_atomicity_round_trips() {
        let ws = LoadedWorkspace::new(make_key(), false);
        // Store a state, then read it straight back through the atomic.
        let round_trip = |s: WorkspaceState| {
            ws.store_state(s);
            ws.load_state()
        };
        assert_eq!(round_trip(WorkspaceState::Loading), WorkspaceState::Loading);
        assert_eq!(
            round_trip(WorkspaceState::Rebuilding),
            WorkspaceState::Rebuilding
        );
        assert_eq!(round_trip(WorkspaceState::Failed), WorkspaceState::Failed);
    }

    #[test]
    fn update_memory_is_monotonic_high_water() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let current = |w: &LoadedWorkspace| w.memory_bytes.load(Ordering::Relaxed);
        let peak = |w: &LoadedWorkspace| w.memory_high_water_bytes.load(Ordering::Relaxed);

        // First assignment: both counters follow.
        assert_eq!(ws.update_memory(1_000), 0);
        assert_eq!(current(&ws), 1_000);
        assert_eq!(peak(&ws), 1_000);

        // Growth: both counters rise.
        assert_eq!(ws.update_memory(5_000), 1_000);
        assert_eq!(peak(&ws), 5_000);

        // Shrink: current drops, peak holds.
        assert_eq!(ws.update_memory(2_000), 5_000);
        assert_eq!(current(&ws), 2_000);
        assert_eq!(
            peak(&ws),
            5_000,
            "high-water mark must be monotonic across rebuilds with smaller graphs",
        );
    }

    #[test]
    fn record_failure_increments_retry_count() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let boom = || DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        };
        // Each failure bumps the consecutive-failure counter by one.
        for expected in 1..=3u32 {
            assert_eq!(ws.record_failure(boom()), expected);
        }
        assert!(ws.last_error.read().is_some());
    }

    #[test]
    fn record_success_clears_error_and_resets_retry() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let failure = DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        };
        assert_eq!(ws.record_failure(failure), 1);

        // A success wipes the error, stamps the timestamp, zeroes retries.
        ws.record_success(SystemTime::now());
        assert!(ws.last_error.read().is_none());
        assert!(ws.last_good_at.read().is_some());
        assert_eq!(ws.retry_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn touch_updates_last_accessed() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let before = *ws.last_accessed.read();
        // Small sleep so the refreshed Instant is strictly later.
        std::thread::sleep(Duration::from_millis(5));
        ws.touch();
        assert!(*ws.last_accessed.read() > before);
    }

    #[test]
    fn pinned_flag_is_immutable_via_constructor() {
        // The flag is fixed at construction; there is no setter.
        assert!(LoadedWorkspace::new(make_key(), true).pinned);
        assert!(!LoadedWorkspace::new(make_key(), false).pinned);
    }
}
442}