// sqry_daemon/workspace/loaded.rs
//! [`LoadedWorkspace`] — per-workspace runtime state.
//!
//! Corresponds to Task 6 Step 2 of the sqryd plan, plus the
//! Amendment-2 additions:
//!
//! - `memory_high_water_bytes` (§D) — monotonic peak over the loaded
//!   lifetime; reset only on unload/eviction, never on rebuilds.
//! - `last_good_at` (§C) — stamped on every successful build; the
//!   stale-serve router uses this to enforce the
//!   `stale_serve_max_age_hours` cap and surface JSON-RPC `-32002`
//!   on expiry.
//! - `rebuild_cancelled` (§J) — lock-free cancellation signal used by
//!   the dispatcher's background rebuild task to abort at pass
//!   boundaries when the workspace is evicted mid-rebuild.
//! - `rebuild_lane` (§J) — at most one queued rebuild per workspace.
//! - `rebuild_in_flight` (§J, Task 7 Phase 7b1) — runner-role gate that
//!   serializes `RebuildDispatcher::handle_changes` callers per
//!   workspace. Transitions happen under `rebuild_lane` on the normal
//!   path; `DrainLoopSentinel` is the sole recovery exception.
//!
//! [`ArcSwap<CodeGraph>`] owns the published graph. Queries take
//! `graph.load_full()` to get a stable `Arc<CodeGraph>` that survives
//! a concurrent `publish_and_retain` swap; the retention reaper is
//! responsible for eventually dropping the superseded `Arc`.
//!
//! `pinned` workspaces are LRU-exempt. They are still counted against
//! `memory_bytes` / `total_memory` — a pinned workspace cannot push
//! the daemon over its budget; admission rejects the load / rebuild
//! per §G.7 if the only way to fit is to evict the pin itself.

use std::{
    sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering},
    time::{Instant, SystemTime},
};

use arc_swap::ArcSwap;
use parking_lot::RwLock;
use sqry_core::graph::CodeGraph;
use sqry_core::watch::{ChangeSet, LastIndexedGitState};

use crate::error::DaemonError;

use super::state::{WorkspaceKey, WorkspaceState};
44
45/// Rebuild-lane entry enqueued by the Task 7 `RebuildDispatcher`.
46///
47/// The dispatcher's per-workspace call path (A2 §J) holds at most one
48/// pending rebuild per workspace. When a new `ChangeSet` arrives while
49/// another rebuild is in flight, the two are coalesced via
50/// [`Self::coalesce_with`] — union of changed files, OR of
51/// `git_state_changed`, max of `enqueued_at`, full-rebuild-dominance
52/// merge on `git_change_class`, and absorb-None + later-wins merge on
53/// `git_state_at_enqueue`.
54#[derive(Debug, Clone)]
55pub struct PendingRebuild {
56    /// The coalesced [`ChangeSet`] waiting to be processed. Carries
57    /// every file path observed so far plus the worst-case git-state
58    /// classification across all enqueues.
59    pub changes: ChangeSet,
60    /// Wall-clock `Instant` of the most recent enqueue. Updated to
61    /// `max(prior, incoming)` on every coalesce — A2 §J.2.
62    pub enqueued_at: Instant,
63    /// Git-state snapshot captured by the watcher bridge at the moment
64    /// it received this change (Task 7 Phase 7b2). When the runner
65    /// publishes a graph produced from this `PendingRebuild`, it
66    /// commits this snapshot to
67    /// [`LoadedWorkspace::last_indexed_git_state`] — tying baseline
68    /// advance to actual publish consumption rather than a
69    /// bridge-side proxy counter.
70    ///
71    /// `None` for callers that do not attach a git-state snapshot
72    /// (direct tests, future IPC `workspace/force_rebuild`). The
73    /// runner leaves the baseline untouched when consuming a `None`
74    /// entry.
75    ///
76    /// Merge rule under [`Self::coalesce_with`]: absorb-None from
77    /// either side, later wins when both are `Some`.
78    pub git_state_at_enqueue: Option<LastIndexedGitState>,
79}
80
81/// Per-workspace runtime state owned by the
82/// [`super::WorkspaceManager::workspaces`] map.
83///
84/// Mutating state:
85///
86/// - `graph`, `state`, `memory_bytes`, `memory_high_water_bytes`,
87///   `retry_count`, `rebuild_cancelled` are all atomic — queries and
88///   status readers can observe them without taking a mutex.
89/// - `last_accessed`, `last_error`, `last_good_at` are short-critical-
90///   section `RwLock`s — writers are the dispatcher / router at
91///   publish/fail time.
92/// - `rebuild_lane` is a `tokio::sync::Mutex` per A2 §J.4; the
93///   dispatcher holds it briefly to coalesce pending work.
94///
95/// Construction is expensive only in that [`ArcSwap::new`] allocates
96/// one `Arc<CodeGraph>` up front. A fresh workspace is initialised with
97/// an empty-but-live [`CodeGraph`]; the real graph is installed by the
98/// first `publish_and_retain`.
99#[derive(Debug)]
100pub struct LoadedWorkspace {
101    /// Identity key under which the manager stores this workspace.
102    ///
103    /// Immutable for the lifetime of the workspace; if the config
104    /// fingerprint or root mode change, the workspace is unloaded
105    /// under the old key and freshly loaded under a new key.
106    pub key: WorkspaceKey,
107
108    /// Published graph. Readers call `graph.load_full()`.
109    ///
110    /// Only [`crate::workspace::publish::publish_and_retain`] swaps a
111    /// new graph in; eviction stores a fresh empty graph so the
112    /// ArcSwap remains non-null (simpler than Option-wrapping).
113    pub graph: ArcSwap<CodeGraph>,
114
115    /// Current lifecycle state. Stored as `AtomicU8` to keep the
116    /// status read path lock-free; round-trip via
117    /// [`WorkspaceState::as_u8`] / [`WorkspaceState::from_u8`].
118    pub state: std::sync::atomic::AtomicU8,
119
120    /// Last wall-clock time a query observed this workspace, used by
121    /// LRU eviction (§G.7). Short critical section; `RwLock` keeps
122    /// contention negligible.
123    pub last_accessed: RwLock<Instant>,
124
125    /// Current `heap_bytes` of the published graph. Updated on every
126    /// successful `publish_and_retain`. Reads use `Relaxed` ordering —
127    /// the authoritative aggregate lives on
128    /// [`super::admission::AdmissionState`].
129    pub memory_bytes: AtomicUsize,
130
131    /// Peak `memory_bytes` observed over this workspace's loaded
132    /// lifetime. Per Amendment 2 §D:
133    ///
134    /// > High-water marks are monotonic over the workspace's loaded
135    /// > lifetime — they reset only on unload/eviction (fresh
136    /// > LoadedWorkspace), not on rebuilds or backoff.
137    ///
138    /// Updated via `fetch_max` alongside every `memory_bytes` store.
139    pub memory_high_water_bytes: AtomicUsize,
140
141    /// Whether LRU eviction must skip this workspace.
142    pub pinned: bool,
143
144    /// Most recent build/load error, if any. `None` in the
145    /// [`WorkspaceState::Loaded`] steady state. Populated on transition
146    /// into [`WorkspaceState::Failed`] and read back by the router
147    /// when surfacing `meta.last_error` on stale responses.
148    pub last_error: RwLock<Option<DaemonError>>,
149
150    /// Wall-clock of the most recent successful rebuild. Used by the
151    /// router to compute `age_hours` for the
152    /// `stale_serve_max_age_hours` cap and the JSON-RPC `-32002`
153    /// error `error.data.age_hours` payload.
154    pub last_good_at: RwLock<Option<SystemTime>>,
155
156    /// Count of consecutive failed rebuilds. Drives the exponential
157    /// backoff schedule (§G.7 / plan Step 6: 30s → 60s → 120s → 300s
158    /// → 600s). Reset to 0 on every successful publish.
159    pub retry_count: AtomicU32,
160
161    /// At most one queued rebuild per workspace. `None` when the
162    /// dispatcher's lane is idle. Filled / coalesced by the
163    /// dispatcher (Task 7).
164    pub rebuild_lane: tokio::sync::Mutex<Option<PendingRebuild>>,
165
166    /// Lock-free cancellation signal for in-flight rebuilds. Set by
167    /// `evict_lru` / explicit `unload` and polled by the rebuild
168    /// pipeline at each pass boundary. Once set, the running rebuild
169    /// aborts, drops its `RebuildReservation`, and never publishes.
170    pub rebuild_cancelled: AtomicBool,
171
172    /// Runner-role gate for the per-workspace rebuild serial consumer
173    /// (A2 §J.2, Task 7 Phase 7b1). When `true`, exactly one
174    /// [`crate::RebuildDispatcher::handle_changes`] call is actively
175    /// running the full rebuild pipeline (the Phase B drain loop in
176    /// `rebuild.rs`). A concurrent caller observing `true` under the
177    /// [`Self::rebuild_lane`] lock MUST park its coalesced
178    /// [`PendingRebuild`] in the lane and return `Ok(())` without
179    /// executing — the active runner will drain the lane at its next
180    /// drain-loop iteration.
181    ///
182    /// # Invariant
183    ///
184    /// All normal-path transitions of this flag happen while
185    /// [`Self::rebuild_lane`] is held. `DrainLoopSentinel::drop` in
186    /// `rebuild.rs` is the sole recovery exception — it stores `false`
187    /// without the lane on the unwind path, with the narrow-race
188    /// semantics documented on that type.
189    ///
190    /// # Authorised modifiers
191    ///
192    /// - `RebuildDispatcher::handle_changes` Phase A (`false → true`
193    ///   under lane).
194    /// - `RebuildDispatcher::handle_changes` Phase B drain-loop exit
195    ///   (`true → false` under lane, when the lane is empty or the
196    ///   top-of-loop eviction gate fires).
197    /// - `DrainLoopSentinel::drop` panic-safety path (`true → false`,
198    ///   bare atomic store, narrow race).
199    ///
200    /// Nothing in `WorkspaceManager` (`execute_eviction`,
201    /// `publish_and_retain`, the retention reaper) touches this flag.
202    /// It is dispatcher-local coordination, independent of the
203    /// `rebuild_cancelled` eviction signal.
204    pub rebuild_in_flight: AtomicBool,
205
206    /// Git state that the currently-published graph was indexed
207    /// against (Task 7 Phase 7b2, A2 §I / §J.2).
208    ///
209    /// Read by the per-workspace watcher bridge as the `last_git_state`
210    /// argument to
211    /// [`sqry_core::watch::SourceTreeWatcher::wait_for_changes_cancellable`]
212    /// so the classifier has a valid baseline on every debounce window.
213    ///
214    /// Advanced ONLY by [`crate::RebuildDispatcher::execute_one_rebuild`]
215    /// after [`crate::WorkspaceManager::publish_and_retain`] succeeds,
216    /// using the `git_state_at_enqueue` snapshot attached to the
217    /// [`PendingRebuild`] that produced the publish. A failed rebuild
218    /// MUST leave this field unchanged so the next
219    /// `wait_for_changes_cancellable` call still sees the divergent
220    /// state and retries.
221    ///
222    /// # Invariant
223    ///
224    /// - Only `execute_one_rebuild`'s successful-publish arm writes
225    ///   this field.
226    /// - Watcher-side event receipt, cancellation, or rebuild failure
227    ///   never write this field.
228    /// - The write happens under `parking_lot::RwLock` — short critical
229    ///   section, no cross-lock ordering concerns.
230    ///
231    /// `None` on workspace construction (no rebuild has published
232    /// yet). The first successful `execute_one_rebuild` driven by the
233    /// watcher bridge (which attaches `git_state_at_enqueue = Some(...)`)
234    /// populates this field.
235    pub last_indexed_git_state: RwLock<Option<LastIndexedGitState>>,
236}
237
238impl LoadedWorkspace {
239    /// Construct a fresh workspace entry with an empty initial graph.
240    ///
241    /// The empty graph is `Arc`-cheap (sub-kilobyte) so keeping the
242    /// `ArcSwap` non-null for the entire workspace lifetime avoids an
243    /// `Option` layer on the query path. Eviction stores another
244    /// empty graph through the same ArcSwap; re-load overwrites it.
245    #[must_use]
246    pub fn new(key: WorkspaceKey, pinned: bool) -> Self {
247        Self {
248            key,
249            graph: ArcSwap::from_pointee(CodeGraph::new()),
250            state: std::sync::atomic::AtomicU8::new(WorkspaceState::Unloaded.as_u8()),
251            last_accessed: RwLock::new(Instant::now()),
252            memory_bytes: AtomicUsize::new(0),
253            memory_high_water_bytes: AtomicUsize::new(0),
254            pinned,
255            last_error: RwLock::new(None),
256            last_good_at: RwLock::new(None),
257            retry_count: AtomicU32::new(0),
258            rebuild_lane: tokio::sync::Mutex::new(None),
259            rebuild_cancelled: AtomicBool::new(false),
260            rebuild_in_flight: AtomicBool::new(false),
261            last_indexed_git_state: RwLock::new(None),
262        }
263    }
264
265    /// Atomic state read. Round-trips through [`WorkspaceState::from_u8`];
266    /// a discriminant outside the current range panics (it is a
267    /// telemetry-corruption bug, not a recoverable condition).
268    pub fn load_state(&self) -> WorkspaceState {
269        let raw = self.state.load(Ordering::Acquire);
270        WorkspaceState::from_u8(raw)
271            .unwrap_or_else(|| unreachable!("invalid WorkspaceState discriminant {raw}"))
272    }
273
274    /// Atomic state write.
275    pub fn store_state(&self, new_state: WorkspaceState) {
276        self.state.store(new_state.as_u8(), Ordering::Release);
277    }
278
279    /// Update `memory_bytes` and keep `memory_high_water_bytes`
280    /// monotonic. Matches Amendment 2 §D:
281    ///
282    /// > Every time `memory_bytes` is assigned (initial load, full
283    /// > rebuild completion, incremental rebuild's `ArcSwap::store`),
284    /// > immediately call
285    /// > `memory_high_water_bytes.fetch_max(new, Relaxed)`.
286    ///
287    /// Returns the previous `memory_bytes` value so the caller can
288    /// compute the delta the admission accounting needs.
289    pub fn update_memory(&self, new_bytes: usize) -> usize {
290        let prior = self.memory_bytes.swap(new_bytes, Ordering::AcqRel);
291        self.memory_high_water_bytes
292            .fetch_max(new_bytes, Ordering::Relaxed);
293        prior
294    }
295
296    /// Stamp `last_accessed = now` on a query. Held under an `RwLock`
297    /// so the query hot path never blocks — writers contend only with
298    /// other writers.
299    pub fn touch(&self) {
300        *self.last_accessed.write() = Instant::now();
301    }
302
303    /// Record a successful build's wall-clock + reset retry counter.
304    pub fn record_success(&self, at: SystemTime) {
305        *self.last_good_at.write() = Some(at);
306        *self.last_error.write() = None;
307        self.retry_count.store(0, Ordering::Release);
308    }
309
310    /// Record a failed build and return the new retry count. The
311    /// dispatcher uses this to pick the exponential-backoff schedule.
312    pub fn record_failure(&self, err: DaemonError) -> u32 {
313        *self.last_error.write() = Some(err);
314        self.retry_count.fetch_add(1, Ordering::AcqRel) + 1
315    }
316
317    /// Test-only setter for [`Self::last_good_at`].
318    ///
319    /// Task 7 Phase 7c: lets `classify_for_serve` integration tests
320    /// drive the stale-serve age arithmetic against synthetic
321    /// timestamps without needing to run a real rebuild.
322    ///
323    /// `#[doc(hidden)]` to signal "test affordance only" — follows
324    /// the [`crate::TestGate`] / [`crate::TestCapture`] pattern.
325    /// Production code should not call this.
326    #[doc(hidden)]
327    pub fn set_last_good_at_for_test(&self, at: Option<SystemTime>) {
328        *self.last_good_at.write() = at;
329    }
330}
331
#[cfg(test)]
mod tests {
    use std::{path::PathBuf, time::Duration};

    use sqry_core::project::ProjectRootMode;

    use super::*;

    fn make_key() -> WorkspaceKey {
        WorkspaceKey::new(
            PathBuf::from("/repos/example"),
            ProjectRootMode::GitRoot,
            0x1,
        )
    }

    #[test]
    fn new_workspace_defaults() {
        let ws = LoadedWorkspace::new(make_key(), false);
        assert_eq!(ws.load_state(), WorkspaceState::Unloaded);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(ws.retry_count.load(Ordering::Relaxed), 0);
        assert!(!ws.rebuild_cancelled.load(Ordering::Relaxed));
        assert!(
            !ws.rebuild_in_flight.load(Ordering::Relaxed),
            "new workspace must start with in_flight=false (no runner)"
        );
        assert!(!ws.pinned);
        assert!(ws.last_error.read().is_none());
        assert!(ws.last_good_at.read().is_none());
    }

    #[test]
    fn state_atomicity_round_trips() {
        let ws = LoadedWorkspace::new(make_key(), false);
        ws.store_state(WorkspaceState::Loading);
        assert_eq!(ws.load_state(), WorkspaceState::Loading);
        ws.store_state(WorkspaceState::Rebuilding);
        assert_eq!(ws.load_state(), WorkspaceState::Rebuilding);
        ws.store_state(WorkspaceState::Failed);
        assert_eq!(ws.load_state(), WorkspaceState::Failed);
    }

    #[test]
    fn update_memory_is_monotonic_high_water() {
        let ws = LoadedWorkspace::new(make_key(), false);
        assert_eq!(ws.update_memory(1_000), 0);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 1_000);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 1_000);

        // Grow — both must increase.
        assert_eq!(ws.update_memory(5_000), 1_000);
        assert_eq!(ws.memory_high_water_bytes.load(Ordering::Relaxed), 5_000);

        // Shrink — current drops, high-water stays.
        assert_eq!(ws.update_memory(2_000), 5_000);
        assert_eq!(ws.memory_bytes.load(Ordering::Relaxed), 2_000);
        assert_eq!(
            ws.memory_high_water_bytes.load(Ordering::Relaxed),
            5_000,
            "high-water mark must be monotonic across rebuilds with smaller graphs",
        );
    }

    #[test]
    fn record_failure_increments_retry_count() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let err = || DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        };
        assert_eq!(ws.record_failure(err()), 1);
        assert_eq!(ws.record_failure(err()), 2);
        assert_eq!(ws.record_failure(err()), 3);
        assert!(ws.last_error.read().is_some());
    }

    #[test]
    fn record_success_clears_error_and_resets_retry() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let err = DaemonError::WorkspaceBuildFailed {
            root: PathBuf::from("/repos/example"),
            reason: "boom".into(),
        };
        assert_eq!(ws.record_failure(err), 1);
        ws.record_success(SystemTime::now());
        assert!(ws.last_error.read().is_none());
        assert!(ws.last_good_at.read().is_some());
        assert_eq!(ws.retry_count.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn touch_updates_last_accessed() {
        let ws = LoadedWorkspace::new(make_key(), false);
        let before = *ws.last_accessed.read();
        // Small sleep so the second Instant is strictly later.
        std::thread::sleep(Duration::from_millis(5));
        ws.touch();
        let after = *ws.last_accessed.read();
        assert!(after > before);
    }

    #[test]
    fn pinned_flag_is_immutable_via_constructor() {
        let pinned = LoadedWorkspace::new(make_key(), true);
        assert!(pinned.pinned);
        let unpinned = LoadedWorkspace::new(make_key(), false);
        assert!(!unpinned.pinned);
    }
}