Skip to main content

sqry_daemon/workspace/
loaded.rs

1//! [`LoadedWorkspace`] — per-workspace runtime state.
2//!
3//! Corresponds to Task 6 Step 2 of the sqryd plan, plus the
4//! Amendment-2 additions:
5//!
6//! - `memory_high_water_bytes` (§D) — monotonic peak over the loaded
7//!   lifetime; reset only on unload/eviction, never on rebuilds.
8//! - `last_good_at` (§C) — stamped on every successful build; the
9//!   stale-serve router uses this to enforce the
10//!   `stale_serve_max_age_hours` cap and surface JSON-RPC `-32002`
11//!   on expiry.
12//! - `rebuild_cancelled` (§J) — lock-free cancellation signal used by
13//!   the dispatcher's background rebuild task to abort at pass
14//!   boundaries when the workspace is evicted mid-rebuild.
15//! - `rebuild_lane` (§J) — at most one queued rebuild per workspace.
16//! - `rebuild_in_flight` (§J, Task 7 Phase 7b1) — runner-role gate that
17//!   serializes `RebuildDispatcher::handle_changes` callers per
18//!   workspace. Transitions happen under `rebuild_lane` on the normal
19//!   path; `DrainLoopSentinel` is the sole recovery exception.
20//!
21//! [`ArcSwap<CodeGraph>`] owns the published graph. Queries take
22//! `graph.load_full()` to get a stable `Arc<CodeGraph>` that survives
23//! a concurrent `publish_and_retain` swap; the retention reaper is
24//! responsible for eventually dropping the superseded `Arc`.
25//!
26//! `pinned` workspaces are LRU-exempt. They are still counted against
27//! `memory_bytes` / `total_memory` — a pinned workspace cannot push
28//! the daemon over its budget; admission rejects the load / rebuild
29//! per §G.7 if the only way to fit is to evict the pin itself.
30
31use std::{
32    sync::Arc,
33    sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
34    time::{Instant, SystemTime},
35};
36
37use arc_swap::ArcSwap;
38use once_cell::sync::OnceCell;
39use parking_lot::RwLock;
40use sqry_core::graph::CodeGraph;
41use sqry_core::watch::{ChangeSet, LastIndexedGitState};
42use sqry_nl::Translator;
43
44use crate::error::DaemonError;
45
46use super::state::{WorkspaceKey, WorkspaceState};
47
/// Rebuild-lane entry enqueued by the Task 7 `RebuildDispatcher`.
///
/// The dispatcher's per-workspace call path (A2 §J) holds at most one
/// pending rebuild per workspace. When a new `ChangeSet` arrives while
/// another rebuild is in flight, the two are coalesced via
/// [`Self::coalesce_with`] — union of changed files, OR of
/// `git_state_changed`, max of `enqueued_at`, full-rebuild-dominance
/// merge on `git_change_class`, and absorb-None + later-wins merge on
/// `git_state_at_enqueue`.
#[derive(Debug, Clone)]
pub struct PendingRebuild {
    /// The coalesced [`ChangeSet`] waiting to be processed. Carries
    /// every file path observed so far plus the worst-case git-state
    /// classification across all enqueues.
    pub changes: ChangeSet,
    /// Monotonic [`Instant`] of the most recent enqueue (`Instant` is
    /// a monotonic clock, not wall-clock — suitable for ordering and
    /// age arithmetic, not for display). Updated to
    /// `max(prior, incoming)` on every coalesce — A2 §J.2.
    pub enqueued_at: Instant,
    /// Git-state snapshot captured by the watcher bridge at the moment
    /// it received this change (Task 7 Phase 7b2). When the runner
    /// publishes a graph produced from this `PendingRebuild`, it
    /// commits this snapshot to
    /// [`LoadedWorkspace::last_indexed_git_state`] — tying baseline
    /// advance to actual publish consumption rather than a
    /// bridge-side proxy counter.
    ///
    /// `None` for callers that do not attach a git-state snapshot
    /// (direct tests, future IPC `workspace/force_rebuild`). The
    /// runner leaves the baseline untouched when consuming a `None`
    /// entry.
    ///
    /// Merge rule under [`Self::coalesce_with`]: absorb-None from
    /// either side, later wins when both are `Some`.
    pub git_state_at_enqueue: Option<LastIndexedGitState>,
}
83
/// Per-workspace runtime state owned by the
/// [`super::WorkspaceManager::workspaces`] map.
///
/// Mutating state:
///
/// - `graph`, `state`, `memory_bytes`, `memory_high_water_bytes`,
///   `retry_count`, `rebuild_cancelled` are all atomic — queries and
///   status readers can observe them without taking a mutex.
/// - `last_accessed`, `last_error`, `last_good_at` are short-critical-
///   section `RwLock`s — writers are the dispatcher / router at
///   publish/fail time.
/// - `rebuild_lane` is a `tokio::sync::Mutex` per A2 §J.4; the
///   dispatcher holds it briefly to coalesce pending work.
///
/// Construction is expensive only in that [`ArcSwap::new`] allocates
/// one `Arc<CodeGraph>` up front. A fresh workspace is initialised with
/// an empty-but-live [`CodeGraph`]; the real graph is installed by the
/// first `publish_and_retain`.
#[derive(Debug)]
pub struct LoadedWorkspace {
    /// Identity key under which the manager stores this workspace.
    ///
    /// Immutable for the lifetime of the workspace; if the config
    /// fingerprint or root mode change, the workspace is unloaded
    /// under the old key and freshly loaded under a new key.
    pub key: WorkspaceKey,

    /// Published graph. Readers call `graph.load_full()`.
    ///
    /// Only [`crate::workspace::publish::publish_and_retain`] swaps a
    /// new graph in; eviction stores a fresh empty graph so the
    /// ArcSwap remains non-null (simpler than Option-wrapping).
    pub graph: ArcSwap<CodeGraph>,

    /// Current lifecycle state. Stored as `AtomicU8` to keep the
    /// status read path lock-free; round-trip via
    /// [`WorkspaceState::as_u8`] / [`WorkspaceState::from_u8`].
    pub state: std::sync::atomic::AtomicU8,

    /// Monotonic timestamp of the most recent query against this
    /// workspace (`Instant`, not wall-clock — LRU only needs relative
    /// ordering), used by LRU eviction (§G.7). Short critical
    /// section; `RwLock` keeps contention negligible.
    pub last_accessed: RwLock<Instant>,

    /// Current `heap_bytes` of the published graph. Updated on every
    /// successful `publish_and_retain`. Reads use `Relaxed` ordering —
    /// the authoritative aggregate lives on
    /// [`super::admission::AdmissionState`].
    pub memory_bytes: AtomicUsize,

    /// Peak `memory_bytes` observed over this workspace's loaded
    /// lifetime. Per Amendment 2 §D:
    ///
    /// > High-water marks are monotonic over the workspace's loaded
    /// > lifetime — they reset only on unload/eviction (fresh
    /// > LoadedWorkspace), not on rebuilds or backoff.
    ///
    /// Updated via `fetch_max` alongside every `memory_bytes` store.
    pub memory_high_water_bytes: AtomicUsize,

    /// Whether LRU eviction must skip this workspace. Set once at
    /// construction (§G.7); pinned workspaces still count against the
    /// memory budget per the module docs.
    pub pinned: bool,

    /// Most recent build/load error, if any. `None` in the
    /// [`WorkspaceState::Loaded`] steady state. Populated on transition
    /// into [`WorkspaceState::Failed`] and read back by the router
    /// when surfacing `meta.last_error` on stale responses.
    pub last_error: RwLock<Option<DaemonError>>,

    /// Wall-clock (`SystemTime`) of the most recent successful
    /// rebuild. Used by the router to compute `age_hours` for the
    /// `stale_serve_max_age_hours` cap and the JSON-RPC `-32002`
    /// error `error.data.age_hours` payload.
    pub last_good_at: RwLock<Option<SystemTime>>,

    /// Count of consecutive failed rebuilds. Drives the exponential
    /// backoff schedule (§G.7 / plan Step 6: 30s → 60s → 120s → 300s
    /// → 600s). Reset to 0 on every successful publish.
    pub retry_count: AtomicU32,

    /// At most one queued rebuild per workspace. `None` when the
    /// dispatcher's lane is idle. Filled / coalesced by the
    /// dispatcher (Task 7).
    pub rebuild_lane: tokio::sync::Mutex<Option<PendingRebuild>>,

    /// Lock-free cancellation signal for in-flight rebuilds. Set by
    /// `evict_lru` / explicit `unload` and polled by the rebuild
    /// pipeline at each pass boundary. Once set, the running rebuild
    /// aborts, drops its `RebuildReservation`, and never publishes.
    pub rebuild_cancelled: AtomicBool,

    /// Runner-role gate for the per-workspace rebuild serial consumer
    /// (A2 §J.2, Task 7 Phase 7b1). When `true`, exactly one
    /// [`crate::RebuildDispatcher::handle_changes`] call is actively
    /// running the full rebuild pipeline (the Phase B drain loop in
    /// `rebuild.rs`). A concurrent caller observing `true` under the
    /// [`Self::rebuild_lane`] lock MUST park its coalesced
    /// [`PendingRebuild`] in the lane and return `Ok(())` without
    /// executing — the active runner will drain the lane at its next
    /// drain-loop iteration.
    ///
    /// # Invariant
    ///
    /// All normal-path transitions of this flag happen while
    /// [`Self::rebuild_lane`] is held. `DrainLoopSentinel::drop` in
    /// `rebuild.rs` is the sole recovery exception — it stores `false`
    /// without the lane on the unwind path, with the narrow-race
    /// semantics documented on that type.
    ///
    /// # Authorised modifiers
    ///
    /// - `RebuildDispatcher::handle_changes` Phase A (`false → true`
    ///   under lane).
    /// - `RebuildDispatcher::handle_changes` Phase B drain-loop exit
    ///   (`true → false` under lane, when the lane is empty or the
    ///   top-of-loop eviction gate fires).
    /// - `DrainLoopSentinel::drop` panic-safety path (`true → false`,
    ///   bare atomic store, narrow race).
    ///
    /// Nothing in `WorkspaceManager` (`execute_eviction`,
    /// `publish_and_retain`, the retention reaper) touches this flag.
    /// It is dispatcher-local coordination, independent of the
    /// `rebuild_cancelled` eviction signal.
    pub rebuild_in_flight: AtomicBool,

    /// Git state that the currently-published graph was indexed
    /// against (Task 7 Phase 7b2, A2 §I / §J.2).
    ///
    /// Read by the per-workspace watcher bridge as the `last_git_state`
    /// argument to
    /// [`sqry_core::watch::SourceTreeWatcher::wait_for_changes_cancellable`]
    /// so the classifier has a valid baseline on every debounce window.
    ///
    /// Advanced ONLY by [`crate::RebuildDispatcher::execute_one_rebuild`]
    /// after [`crate::WorkspaceManager::publish_and_retain`] succeeds,
    /// using the `git_state_at_enqueue` snapshot attached to the
    /// [`PendingRebuild`] that produced the publish. A failed rebuild
    /// MUST leave this field unchanged so the next
    /// `wait_for_changes_cancellable` call still sees the divergent
    /// state and retries.
    ///
    /// # Invariant
    ///
    /// - Only `execute_one_rebuild`'s successful-publish arm writes
    ///   this field.
    /// - Watcher-side event receipt, cancellation, or rebuild failure
    ///   never write this field.
    /// - The write happens under `parking_lot::RwLock` — short critical
    ///   section, no cross-lock ordering concerns.
    ///
    /// `None` on workspace construction (no rebuild has published
    /// yet). The first successful `execute_one_rebuild` driven by the
    /// watcher bridge (which attaches `git_state_at_enqueue = Some(...)`)
    /// populates this field.
    pub last_indexed_git_state: RwLock<Option<LastIndexedGitState>>,

    /// NL07 — lazily-initialised, per-workspace
    /// [`sqry_nl::Translator`] backing the daemon-hosted `sqry_ask`
    /// MCP tool.
    ///
    /// Init cost is heavy (O(seconds) — N parallel ONNX session
    /// loads from the [`sqry_nl::classifier::ClassifierPool`]) so the
    /// daemon defers it to the first `sqry_ask` call against the
    /// workspace via [`OnceCell::get_or_try_init`]. Subsequent calls
    /// receive a cheap `Arc<Translator>` clone.
    ///
    /// Lifetime: tied to [`LoadedWorkspace`] itself. Workspace
    /// eviction (LRU, explicit unload, daemon shutdown) drops the
    /// `OnceCell`, which drops the `Arc<Translator>`, which drops
    /// the pool's `N` classifier sessions and frees their model
    /// weights. No explicit cleanup required.
    ///
    /// Concurrency: the cell itself is `Send + Sync`. The
    /// `Translator` inside uses an internal sync pool that the
    /// daemon's MCP host calls under `tokio::task::spawn_blocking`
    /// so the daemon's tokio runtime never blocks on `recv()`.
    pub nl_translator: OnceCell<Arc<Translator>>,
}
262
263impl LoadedWorkspace {
264    /// Construct a fresh workspace entry with an empty initial graph.
265    ///
266    /// The empty graph is `Arc`-cheap (sub-kilobyte) so keeping the
267    /// `ArcSwap` non-null for the entire workspace lifetime avoids an
268    /// `Option` layer on the query path. Eviction stores another
269    /// empty graph through the same ArcSwap; re-load overwrites it.
270    #[must_use]
271    pub fn new(key: WorkspaceKey, pinned: bool) -> Self {
272        Self {
273            key,
274            graph: ArcSwap::from_pointee(CodeGraph::new()),
275            state: std::sync::atomic::AtomicU8::new(WorkspaceState::Unloaded.as_u8()),
276            last_accessed: RwLock::new(Instant::now()),
277            memory_bytes: AtomicUsize::new(0),
278            memory_high_water_bytes: AtomicUsize::new(0),
279            pinned,
280            last_error: RwLock::new(None),
281            last_good_at: RwLock::new(None),
282            retry_count: AtomicU32::new(0),
283            rebuild_lane: tokio::sync::Mutex::new(None),
284            rebuild_cancelled: AtomicBool::new(false),
285            rebuild_in_flight: AtomicBool::new(false),
286            last_indexed_git_state: RwLock::new(None),
287            nl_translator: OnceCell::new(),
288        }
289    }
290
291    /// Atomic state read. Round-trips through [`WorkspaceState::from_u8`];
292    /// a discriminant outside the current range panics (it is a
293    /// telemetry-corruption bug, not a recoverable condition).
294    pub fn load_state(&self) -> WorkspaceState {
295        let raw = self.state.load(Ordering::Acquire);
296        WorkspaceState::from_u8(raw)
297            .unwrap_or_else(|| unreachable!("invalid WorkspaceState discriminant {raw}"))
298    }
299
300    /// Atomic state write.
301    pub fn store_state(&self, new_state: WorkspaceState) {
302        self.state.store(new_state.as_u8(), Ordering::Release);
303    }
304
305    /// Update `memory_bytes` and keep `memory_high_water_bytes`
306    /// monotonic. Matches Amendment 2 §D:
307    ///
308    /// > Every time `memory_bytes` is assigned (initial load, full
309    /// > rebuild completion, incremental rebuild's `ArcSwap::store`),
310    /// > immediately call
311    /// > `memory_high_water_bytes.fetch_max(new, Relaxed)`.
312    ///
313    /// Returns the previous `memory_bytes` value so the caller can
314    /// compute the delta the admission accounting needs.
315    pub fn update_memory(&self, new_bytes: usize) -> usize {
316        let prior = self.memory_bytes.swap(new_bytes, Ordering::AcqRel);
317        self.memory_high_water_bytes
318            .fetch_max(new_bytes, Ordering::Relaxed);
319        prior
320    }
321
322    /// Stamp `last_accessed = now` on a query. Held under an `RwLock`
323    /// so the query hot path never blocks — writers contend only with
324    /// other writers.
325    pub fn touch(&self) {
326        *self.last_accessed.write() = Instant::now();
327    }
328
329    /// Record a successful build's wall-clock + reset retry counter.
330    pub fn record_success(&self, at: SystemTime) {
331        *self.last_good_at.write() = Some(at);
332        *self.last_error.write() = None;
333        self.retry_count.store(0, Ordering::Release);
334    }
335
336    /// Record a failed build and return the new retry count. The
337    /// dispatcher uses this to pick the exponential-backoff schedule.
338    pub fn record_failure(&self, err: DaemonError) -> u32 {
339        *self.last_error.write() = Some(err);
340        self.retry_count.fetch_add(1, Ordering::AcqRel) + 1
341    }
342
343    /// Test-only setter for [`Self::last_good_at`].
344    ///
345    /// Task 7 Phase 7c: lets `classify_for_serve` integration tests
346    /// drive the stale-serve age arithmetic against synthetic
347    /// timestamps without needing to run a real rebuild.
348    ///
349    /// `#[doc(hidden)]` to signal "test affordance only" — follows
350    /// the [`crate::TestGate`] / [`crate::TestCapture`] pattern.
351    /// Production code should not call this.
352    #[doc(hidden)]
353    pub fn set_last_good_at_for_test(&self, at: Option<SystemTime>) {
354        *self.last_good_at.write() = at;
355    }
356}
357
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, time::Duration};

    use sqry_core::project::ProjectRootMode;

    use super::*;

    /// Identity-key fixture shared by every test in this module.
    fn make_key() -> WorkspaceKey {
        WorkspaceKey::new(
            PathBuf::from("/repos/example"),
            ProjectRootMode::GitRoot,
            0x1,
        )
    }

    /// Canonical build failure used by the retry-count tests.
    fn build_error() -> DaemonError {
        DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        }
    }

    #[test]
    fn new_workspace_defaults() {
        let workspace = LoadedWorkspace::new(make_key(), false);
        assert_eq!(workspace.load_state(), WorkspaceState::Unloaded);
        assert_eq!(workspace.memory_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(workspace.memory_high_water_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(workspace.retry_count.load(Ordering::Relaxed), 0);
        assert!(!workspace.rebuild_cancelled.load(Ordering::Relaxed));
        assert!(
            !workspace.rebuild_in_flight.load(Ordering::Relaxed),
            "new workspace must start with in_flight=false (no runner)"
        );
        assert!(!workspace.pinned);
        assert!(workspace.last_error.read().is_none());
        assert!(workspace.last_good_at.read().is_none());
    }

    #[test]
    fn state_atomicity_round_trips() {
        let workspace = LoadedWorkspace::new(make_key(), false);
        // Each store must be observable through the atomic read path.
        for next in [
            WorkspaceState::Loading,
            WorkspaceState::Rebuilding,
            WorkspaceState::Failed,
        ] {
            workspace.store_state(next);
            assert_eq!(workspace.load_state(), next);
        }
    }

    #[test]
    fn update_memory_is_monotonic_high_water() {
        let workspace = LoadedWorkspace::new(make_key(), false);
        assert_eq!(workspace.update_memory(1_000), 0);
        assert_eq!(workspace.memory_bytes.load(Ordering::Relaxed), 1_000);
        assert_eq!(
            workspace.memory_high_water_bytes.load(Ordering::Relaxed),
            1_000
        );

        // Grow — both must increase.
        assert_eq!(workspace.update_memory(5_000), 1_000);
        assert_eq!(
            workspace.memory_high_water_bytes.load(Ordering::Relaxed),
            5_000
        );

        // Shrink — current drops, high-water stays.
        assert_eq!(workspace.update_memory(2_000), 5_000);
        assert_eq!(workspace.memory_bytes.load(Ordering::Relaxed), 2_000);
        assert_eq!(
            workspace.memory_high_water_bytes.load(Ordering::Relaxed),
            5_000,
            "high-water mark must be monotonic across rebuilds with smaller graphs",
        );
    }

    #[test]
    fn record_failure_increments_retry_count() {
        let workspace = LoadedWorkspace::new(make_key(), false);
        for expected in 1..=3 {
            assert_eq!(workspace.record_failure(build_error()), expected);
        }
        assert!(workspace.last_error.read().is_some());
    }

    #[test]
    fn record_success_clears_error_and_resets_retry() {
        let workspace = LoadedWorkspace::new(make_key(), false);
        assert_eq!(workspace.record_failure(build_error()), 1);
        workspace.record_success(SystemTime::now());
        assert!(workspace.last_error.read().is_none());
        assert!(workspace.last_good_at.read().is_some());
        assert_eq!(workspace.retry_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn touch_updates_last_accessed() {
        let workspace = LoadedWorkspace::new(make_key(), false);
        let initial = *workspace.last_accessed.read();
        // Small sleep so the second Instant is strictly later.
        std::thread::sleep(Duration::from_millis(5));
        workspace.touch();
        assert!(*workspace.last_accessed.read() > initial);
    }

    #[test]
    fn pinned_flag_is_immutable_via_constructor() {
        assert!(LoadedWorkspace::new(make_key(), true).pinned);
        assert!(!LoadedWorkspace::new(make_key(), false).pinned);
    }
}
468}