sqry_daemon/workspace/loaded.rs
1//! [`LoadedWorkspace`] — per-workspace runtime state.
2//!
3//! Corresponds to Task 6 Step 2 of the sqryd plan, plus the
4//! Amendment-2 additions:
5//!
6//! - `memory_high_water_bytes` (§D) — monotonic peak over the loaded
7//! lifetime; reset only on unload/eviction, never on rebuilds.
8//! - `last_good_at` (§C) — stamped on every successful build; the
9//! stale-serve router uses this to enforce the
10//! `stale_serve_max_age_hours` cap and surface JSON-RPC `-32002`
11//! on expiry.
12//! - `rebuild_cancelled` (§J) — lock-free cancellation signal used by
13//! the dispatcher's background rebuild task to abort at pass
14//! boundaries when the workspace is evicted mid-rebuild.
15//! - `rebuild_lane` (§J) — at most one queued rebuild per workspace.
16//! - `rebuild_in_flight` (§J, Task 7 Phase 7b1) — runner-role gate that
17//! serializes `RebuildDispatcher::handle_changes` callers per
18//! workspace. Transitions happen under `rebuild_lane` on the normal
19//! path; `DrainLoopSentinel` is the sole recovery exception.
20//!
21//! [`ArcSwap<CodeGraph>`] owns the published graph. Queries take
22//! `graph.load_full()` to get a stable `Arc<CodeGraph>` that survives
23//! a concurrent `publish_and_retain` swap; the retention reaper is
24//! responsible for eventually dropping the superseded `Arc`.
25//!
26//! `pinned` workspaces are LRU-exempt. They are still counted against
27//! `memory_bytes` / `total_memory` — a pinned workspace cannot push
28//! the daemon over its budget; admission rejects the load / rebuild
29//! per §G.7 if the only way to fit is to evict the pin itself.
30
31use std::{
32 sync::Arc,
33 sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
34 time::{Instant, SystemTime},
35};
36
37use arc_swap::ArcSwap;
38use once_cell::sync::OnceCell;
39use parking_lot::RwLock;
40use sqry_core::graph::CodeGraph;
41use sqry_core::watch::{ChangeSet, LastIndexedGitState};
42use sqry_nl::Translator;
43
44use crate::error::DaemonError;
45
46use super::state::{WorkspaceKey, WorkspaceState};
47
48/// Rebuild-lane entry enqueued by the Task 7 `RebuildDispatcher`.
49///
50/// The dispatcher's per-workspace call path (A2 §J) holds at most one
51/// pending rebuild per workspace. When a new `ChangeSet` arrives while
52/// another rebuild is in flight, the two are coalesced via
53/// [`Self::coalesce_with`] — union of changed files, OR of
54/// `git_state_changed`, max of `enqueued_at`, full-rebuild-dominance
55/// merge on `git_change_class`, and absorb-None + later-wins merge on
56/// `git_state_at_enqueue`.
/// Rebuild-lane entry enqueued by the Task 7 `RebuildDispatcher`.
///
/// The dispatcher's per-workspace call path (A2 §J) holds at most one
/// pending rebuild per workspace. When a new `ChangeSet` arrives while
/// another rebuild is in flight, the two are coalesced via
/// [`Self::coalesce_with`] — union of changed files, OR of
/// `git_state_changed`, max of `enqueued_at`, full-rebuild-dominance
/// merge on `git_change_class`, and absorb-None + later-wins merge on
/// `git_state_at_enqueue`.
#[derive(Debug, Clone)]
pub struct PendingRebuild {
    /// The coalesced [`ChangeSet`] waiting to be processed. Carries
    /// every file path observed so far plus the worst-case git-state
    /// classification across all enqueues.
    pub changes: ChangeSet,
    /// `Instant` of the most recent enqueue (monotonic clock, not
    /// wall-clock — suitable for debounce/age arithmetic, not for
    /// display). Updated to `max(prior, incoming)` on every coalesce
    /// — A2 §J.2.
    pub enqueued_at: Instant,
    /// Git-state snapshot captured by the watcher bridge at the moment
    /// it received this change (Task 7 Phase 7b2). When the runner
    /// publishes a graph produced from this `PendingRebuild`, it
    /// commits this snapshot to
    /// [`LoadedWorkspace::last_indexed_git_state`] — tying baseline
    /// advance to actual publish consumption rather than a
    /// bridge-side proxy counter.
    ///
    /// `None` for callers that do not attach a git-state snapshot
    /// (direct tests, future IPC `workspace/force_rebuild`). The
    /// runner leaves the baseline untouched when consuming a `None`
    /// entry.
    ///
    /// Merge rule under [`Self::coalesce_with`]: absorb-None from
    /// either side, later wins when both are `Some`.
    pub git_state_at_enqueue: Option<LastIndexedGitState>,
}
83
84/// Per-workspace runtime state owned by the
85/// [`super::WorkspaceManager::workspaces`] map.
86///
87/// Mutating state:
88///
89/// - `graph`, `state`, `memory_bytes`, `memory_high_water_bytes`,
90/// `retry_count`, `rebuild_cancelled` are all atomic — queries and
91/// status readers can observe them without taking a mutex.
92/// - `last_accessed`, `last_error`, `last_good_at` are short-critical-
93/// section `RwLock`s — writers are the dispatcher / router at
94/// publish/fail time.
95/// - `rebuild_lane` is a `tokio::sync::Mutex` per A2 §J.4; the
96/// dispatcher holds it briefly to coalesce pending work.
97///
98/// Construction is expensive only in that [`ArcSwap::new`] allocates
99/// one `Arc<CodeGraph>` up front. A fresh workspace is initialised with
100/// an empty-but-live [`CodeGraph`]; the real graph is installed by the
101/// first `publish_and_retain`.
/// Per-workspace runtime state owned by the
/// [`super::WorkspaceManager::workspaces`] map.
///
/// Mutating state:
///
/// - `graph`, `state`, `memory_bytes`, `memory_high_water_bytes`,
///   `retry_count`, `rebuild_cancelled` are all atomic — queries and
///   status readers can observe them without taking a mutex.
/// - `last_accessed`, `last_error`, `last_good_at` are short-critical-
///   section `RwLock`s — writers are the dispatcher / router at
///   publish/fail time.
/// - `rebuild_lane` is a `tokio::sync::Mutex` per A2 §J.4; the
///   dispatcher holds it briefly to coalesce pending work.
///
/// Construction is expensive only in that [`ArcSwap::new`] allocates
/// one `Arc<CodeGraph>` up front. A fresh workspace is initialised with
/// an empty-but-live [`CodeGraph`]; the real graph is installed by the
/// first `publish_and_retain`.
#[derive(Debug)]
pub struct LoadedWorkspace {
    /// Identity key under which the manager stores this workspace.
    ///
    /// Immutable for the lifetime of the workspace; if the config
    /// fingerprint or root mode change, the workspace is unloaded
    /// under the old key and freshly loaded under a new key.
    pub key: WorkspaceKey,

    /// Published graph. Readers call `graph.load_full()`.
    ///
    /// Only [`crate::workspace::publish::publish_and_retain`] swaps a
    /// new graph in; eviction stores a fresh empty graph so the
    /// ArcSwap remains non-null (simpler than Option-wrapping).
    pub graph: ArcSwap<CodeGraph>,

    /// Current lifecycle state. Stored as `AtomicU8` to keep the
    /// status read path lock-free; round-trip via
    /// [`WorkspaceState::as_u8`] / [`WorkspaceState::from_u8`].
    pub state: std::sync::atomic::AtomicU8,

    /// Most recent `Instant` (monotonic clock, not wall-clock) at
    /// which a query observed this workspace; used by LRU eviction
    /// (§G.7). Short critical section; `RwLock` keeps contention
    /// negligible.
    pub last_accessed: RwLock<Instant>,

    /// Current `heap_bytes` of the published graph. Updated on every
    /// successful `publish_and_retain`. Reads use `Relaxed` ordering —
    /// the authoritative aggregate lives on
    /// [`super::admission::AdmissionState`].
    pub memory_bytes: AtomicUsize,

    /// Peak `memory_bytes` observed over this workspace's loaded
    /// lifetime. Per Amendment 2 §D:
    ///
    /// > High-water marks are monotonic over the workspace's loaded
    /// > lifetime — they reset only on unload/eviction (fresh
    /// > LoadedWorkspace), not on rebuilds or backoff.
    ///
    /// Updated via `fetch_max` alongside every `memory_bytes` store.
    pub memory_high_water_bytes: AtomicUsize,

    /// Whether LRU eviction must skip this workspace.
    pub pinned: bool,

    /// Most recent build/load error, if any. `None` in the
    /// [`WorkspaceState::Loaded`] steady state. Populated on transition
    /// into [`WorkspaceState::Failed`] and read back by the router
    /// when surfacing `meta.last_error` on stale responses.
    pub last_error: RwLock<Option<DaemonError>>,

    /// Wall-clock (`SystemTime`) of the most recent successful
    /// rebuild. Used by the router to compute `age_hours` for the
    /// `stale_serve_max_age_hours` cap and the JSON-RPC `-32002`
    /// error `error.data.age_hours` payload.
    pub last_good_at: RwLock<Option<SystemTime>>,

    /// Count of consecutive failed rebuilds. Drives the exponential
    /// backoff schedule (§G.7 / plan Step 6: 30s → 60s → 120s → 300s
    /// → 600s). Reset to 0 on every successful publish.
    pub retry_count: AtomicU32,

    /// At most one queued rebuild per workspace. `None` when the
    /// dispatcher's lane is idle. Filled / coalesced by the
    /// dispatcher (Task 7).
    pub rebuild_lane: tokio::sync::Mutex<Option<PendingRebuild>>,

    /// Lock-free cancellation signal for in-flight rebuilds. Set by
    /// `evict_lru` / explicit `unload` and polled by the rebuild
    /// pipeline at each pass boundary. Once set, the running rebuild
    /// aborts, drops its `RebuildReservation`, and never publishes.
    pub rebuild_cancelled: AtomicBool,

    /// Runner-role gate for the per-workspace rebuild serial consumer
    /// (A2 §J.2, Task 7 Phase 7b1). When `true`, exactly one
    /// [`crate::RebuildDispatcher::handle_changes`] call is actively
    /// running the full rebuild pipeline (the Phase B drain loop in
    /// `rebuild.rs`). A concurrent caller observing `true` under the
    /// [`Self::rebuild_lane`] lock MUST park its coalesced
    /// [`PendingRebuild`] in the lane and return `Ok(())` without
    /// executing — the active runner will drain the lane at its next
    /// drain-loop iteration.
    ///
    /// # Invariant
    ///
    /// All normal-path transitions of this flag happen while
    /// [`Self::rebuild_lane`] is held. `DrainLoopSentinel::drop` in
    /// `rebuild.rs` is the sole recovery exception — it stores `false`
    /// without the lane on the unwind path, with the narrow-race
    /// semantics documented on that type.
    ///
    /// # Authorised modifiers
    ///
    /// - `RebuildDispatcher::handle_changes` Phase A (`false → true`
    ///   under lane).
    /// - `RebuildDispatcher::handle_changes` Phase B drain-loop exit
    ///   (`true → false` under lane, when the lane is empty or the
    ///   top-of-loop eviction gate fires).
    /// - `DrainLoopSentinel::drop` panic-safety path (`true → false`,
    ///   bare atomic store, narrow race).
    ///
    /// Nothing in `WorkspaceManager` (`execute_eviction`,
    /// `publish_and_retain`, the retention reaper) touches this flag.
    /// It is dispatcher-local coordination, independent of the
    /// `rebuild_cancelled` eviction signal.
    pub rebuild_in_flight: AtomicBool,

    /// Git state that the currently-published graph was indexed
    /// against (Task 7 Phase 7b2, A2 §I / §J.2).
    ///
    /// Read by the per-workspace watcher bridge as the `last_git_state`
    /// argument to
    /// [`sqry_core::watch::SourceTreeWatcher::wait_for_changes_cancellable`]
    /// so the classifier has a valid baseline on every debounce window.
    ///
    /// Advanced ONLY by [`crate::RebuildDispatcher::execute_one_rebuild`]
    /// after [`crate::WorkspaceManager::publish_and_retain`] succeeds,
    /// using the `git_state_at_enqueue` snapshot attached to the
    /// [`PendingRebuild`] that produced the publish. A failed rebuild
    /// MUST leave this field unchanged so the next
    /// `wait_for_changes_cancellable` call still sees the divergent
    /// state and retries.
    ///
    /// # Invariant
    ///
    /// - Only `execute_one_rebuild`'s successful-publish arm writes
    ///   this field.
    /// - Watcher-side event receipt, cancellation, or rebuild failure
    ///   never write this field.
    /// - The write happens under `parking_lot::RwLock` — short critical
    ///   section, no cross-lock ordering concerns.
    ///
    /// `None` on workspace construction (no rebuild has published
    /// yet). The first successful `execute_one_rebuild` driven by the
    /// watcher bridge (which attaches `git_state_at_enqueue = Some(...)`)
    /// populates this field.
    pub last_indexed_git_state: RwLock<Option<LastIndexedGitState>>,

    /// NL07 — lazily-initialised, per-workspace
    /// [`sqry_nl::Translator`] backing the daemon-hosted `sqry_ask`
    /// MCP tool.
    ///
    /// Init cost is heavy (O(seconds) — N parallel ONNX session
    /// loads from the [`sqry_nl::classifier::ClassifierPool`]) so the
    /// daemon defers it to the first `sqry_ask` call against the
    /// workspace via [`OnceCell::get_or_try_init`]. Subsequent calls
    /// receive a cheap `Arc<Translator>` clone.
    ///
    /// Lifetime: tied to [`LoadedWorkspace`] itself. Workspace
    /// eviction (LRU, explicit unload, daemon shutdown) drops the
    /// `OnceCell`, which drops the `Arc<Translator>`, which drops
    /// the pool's `N` classifier sessions and frees their model
    /// weights. No explicit cleanup required.
    ///
    /// Concurrency: the cell itself is `Send + Sync`. The
    /// `Translator` inside uses an internal sync pool that the
    /// daemon's MCP host calls under `tokio::task::spawn_blocking`
    /// so the daemon's tokio runtime never blocks on `recv()`.
    pub nl_translator: OnceCell<Arc<Translator>>,
}
262
263impl LoadedWorkspace {
264 /// Construct a fresh workspace entry with an empty initial graph.
265 ///
266 /// The empty graph is `Arc`-cheap (sub-kilobyte) so keeping the
267 /// `ArcSwap` non-null for the entire workspace lifetime avoids an
268 /// `Option` layer on the query path. Eviction stores another
269 /// empty graph through the same ArcSwap; re-load overwrites it.
270 #[must_use]
271 pub fn new(key: WorkspaceKey, pinned: bool) -> Self {
272 Self {
273 key,
274 graph: ArcSwap::from_pointee(CodeGraph::new()),
275 state: std::sync::atomic::AtomicU8::new(WorkspaceState::Unloaded.as_u8()),
276 last_accessed: RwLock::new(Instant::now()),
277 memory_bytes: AtomicUsize::new(0),
278 memory_high_water_bytes: AtomicUsize::new(0),
279 pinned,
280 last_error: RwLock::new(None),
281 last_good_at: RwLock::new(None),
282 retry_count: AtomicU32::new(0),
283 rebuild_lane: tokio::sync::Mutex::new(None),
284 rebuild_cancelled: AtomicBool::new(false),
285 rebuild_in_flight: AtomicBool::new(false),
286 last_indexed_git_state: RwLock::new(None),
287 nl_translator: OnceCell::new(),
288 }
289 }
290
291 /// Atomic state read. Round-trips through [`WorkspaceState::from_u8`];
292 /// a discriminant outside the current range panics (it is a
293 /// telemetry-corruption bug, not a recoverable condition).
294 pub fn load_state(&self) -> WorkspaceState {
295 let raw = self.state.load(Ordering::Acquire);
296 WorkspaceState::from_u8(raw)
297 .unwrap_or_else(|| unreachable!("invalid WorkspaceState discriminant {raw}"))
298 }
299
300 /// Atomic state write.
301 pub fn store_state(&self, new_state: WorkspaceState) {
302 self.state.store(new_state.as_u8(), Ordering::Release);
303 }
304
305 /// Update `memory_bytes` and keep `memory_high_water_bytes`
306 /// monotonic. Matches Amendment 2 §D:
307 ///
308 /// > Every time `memory_bytes` is assigned (initial load, full
309 /// > rebuild completion, incremental rebuild's `ArcSwap::store`),
310 /// > immediately call
311 /// > `memory_high_water_bytes.fetch_max(new, Relaxed)`.
312 ///
313 /// Returns the previous `memory_bytes` value so the caller can
314 /// compute the delta the admission accounting needs.
315 pub fn update_memory(&self, new_bytes: usize) -> usize {
316 let prior = self.memory_bytes.swap(new_bytes, Ordering::AcqRel);
317 self.memory_high_water_bytes
318 .fetch_max(new_bytes, Ordering::Relaxed);
319 prior
320 }
321
322 /// Stamp `last_accessed = now` on a query. Held under an `RwLock`
323 /// so the query hot path never blocks — writers contend only with
324 /// other writers.
325 pub fn touch(&self) {
326 *self.last_accessed.write() = Instant::now();
327 }
328
329 /// Record a successful build's wall-clock + reset retry counter.
330 pub fn record_success(&self, at: SystemTime) {
331 *self.last_good_at.write() = Some(at);
332 *self.last_error.write() = None;
333 self.retry_count.store(0, Ordering::Release);
334 }
335
336 /// Record a failed build and return the new retry count. The
337 /// dispatcher uses this to pick the exponential-backoff schedule.
338 pub fn record_failure(&self, err: DaemonError) -> u32 {
339 *self.last_error.write() = Some(err);
340 self.retry_count.fetch_add(1, Ordering::AcqRel) + 1
341 }
342
343 /// Test-only setter for [`Self::last_good_at`].
344 ///
345 /// Task 7 Phase 7c: lets `classify_for_serve` integration tests
346 /// drive the stale-serve age arithmetic against synthetic
347 /// timestamps without needing to run a real rebuild.
348 ///
349 /// `#[doc(hidden)]` to signal "test affordance only" — follows
350 /// the [`crate::TestGate`] / [`crate::TestCapture`] pattern.
351 /// Production code should not call this.
352 #[doc(hidden)]
353 pub fn set_last_good_at_for_test(&self, at: Option<SystemTime>) {
354 *self.last_good_at.write() = at;
355 }
356}
357
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, time::Duration};

    use sqry_core::project::ProjectRootMode;

    use super::*;

    /// Shared fixture key; discriminating values don't matter, only
    /// that `LoadedWorkspace::new` accepts them.
    fn make_key() -> WorkspaceKey {
        WorkspaceKey::new(
            PathBuf::from("/repos/example"),
            ProjectRootMode::GitRoot,
            0x1,
        )
    }

    /// Every field initialised by `new` must start in its documented
    /// idle state — including the Phase 7b2 git-state baseline, the
    /// NL07 translator cell, and the rebuild lane.
    #[test]
    fn new_workspace_defaults() {
        let ws = LoadedWorkspace::new(make_key(), false);
        assert_eq!(ws.load_state(), WorkspaceState::Unloaded);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(ws.retry_count.load(Ordering::Relaxed), 0);
        assert!(!ws.rebuild_cancelled.load(Ordering::Relaxed));
        assert!(
            !ws.rebuild_in_flight.load(Ordering::Relaxed),
            "new workspace must start with in_flight=false (no runner)"
        );
        assert!(!ws.pinned);
        assert!(ws.last_error.read().is_none());
        assert!(ws.last_good_at.read().is_none());
        assert!(
            ws.last_indexed_git_state.read().is_none(),
            "baseline must be unset until the first successful publish"
        );
        assert!(
            ws.nl_translator.get().is_none(),
            "translator must stay uninitialised until first sqry_ask"
        );
        assert!(
            ws.rebuild_lane
                .try_lock()
                .expect("fresh lane must be uncontended")
                .is_none(),
            "new workspace must start with an empty rebuild lane"
        );
    }

    #[test]
    fn state_atomicity_round_trips() {
        let ws = LoadedWorkspace::new(make_key(), false);
        ws.store_state(WorkspaceState::Loading);
        assert_eq!(ws.load_state(), WorkspaceState::Loading);
        ws.store_state(WorkspaceState::Rebuilding);
        assert_eq!(ws.load_state(), WorkspaceState::Rebuilding);
        ws.store_state(WorkspaceState::Failed);
        assert_eq!(ws.load_state(), WorkspaceState::Failed);
    }

    #[test]
    fn update_memory_is_monotonic_high_water() {
        let ws = LoadedWorkspace::new(make_key(), false);
        assert_eq!(ws.update_memory(1_000), 0);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 1_000);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 1_000);

        // Grow — both must increase.
        assert_eq!(ws.update_memory(5_000), 1_000);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 5_000);

        // Shrink — current drops, high-water stays.
        assert_eq!(ws.update_memory(2_000), 5_000);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 2_000);
        assert_eq!(
            ws.memory_high_water_bytes.load(Ordering::Relaxed),
            5_000,
            "high-water mark must be monotonic across rebuilds with smaller graphs",
        );
    }

    #[test]
    fn record_failure_increments_retry_count() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let err = || DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        };
        assert_eq!(ws.record_failure(err()), 1);
        assert_eq!(ws.record_failure(err()), 2);
        assert_eq!(ws.record_failure(err()), 3);
        assert!(ws.last_error.read().is_some());
    }

    #[test]
    fn record_success_clears_error_and_resets_retry() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let err = DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        };
        assert_eq!(ws.record_failure(err), 1);
        ws.record_success(SystemTime::now());
        assert!(ws.last_error.read().is_none());
        assert!(ws.last_good_at.read().is_some());
        assert_eq!(ws.retry_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn touch_updates_last_accessed() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let before = *ws.last_accessed.read();
        // Small sleep so the second Instant is strictly later.
        std::thread::sleep(Duration::from_millis(5));
        ws.touch();
        let after = *ws.last_accessed.read();
        assert!(after > before);
    }

    #[test]
    fn pinned_flag_is_immutable_via_constructor() {
        let pinned = LoadedWorkspace::new(make_key(), true);
        assert!(pinned.pinned);
        let unpinned = LoadedWorkspace::new(make_key(), false);
        assert!(!unpinned.pinned);
    }
}
468}