use std::{
collections::HashMap,
path::Path,
sync::{
Arc, Weak,
atomic::{AtomicU64, Ordering},
},
time::{Duration, Instant, SystemTime},
};
use parking_lot::{Mutex, RwLock};
use sqry_core::graph::{CodeGraph, unified::GraphMemorySize};
use tokio::task::JoinHandle;
use tracing::warn;
use crate::{config::DaemonConfig, error::DaemonError};
use super::{
admission::{AdmissionState, RetainedEntry},
builder::WorkspaceBuilder,
hook::{NoOpHook, SharedHook, SqrydHook},
loaded::LoadedWorkspace,
staleness::{StalenessVerdict, classify_staleness},
state::{OldGraphToken, WorkspaceKey, WorkspaceState},
status::{DaemonStatus, MemoryStatus, WorkspaceStatus},
};
/// Outcome of classifying a workspace for serving a query
/// (see `WorkspaceManager::classify_for_serve`).
#[derive(Debug, Clone)]
pub enum ServeVerdict {
/// Workspace is Loaded or Rebuilding: serve this graph as-is.
Fresh {
graph: Arc<CodeGraph>,
state: WorkspaceState,
},
/// Last build failed, but a prior good graph is still within the
/// stale-serve age cap: serve it together with staleness metadata.
Stale {
graph: Arc<CodeGraph>,
age_hours: u64,
last_good_at: SystemTime,
last_error: Option<String>,
},
/// Workspace exists but has no servable graph yet (Unloaded or Loading).
NotReady { state: WorkspaceState },
}
/// Owns all loaded workspaces, enforces the daemon-wide memory budget via
/// the admission state, and drives a background retention reaper.
#[derive(Debug)]
pub struct WorkspaceManager {
// Daemon configuration (memory limit, drain timeout, staleness cap).
config: Arc<DaemonConfig>,
// All known workspaces, keyed by WorkspaceKey.
workspaces: RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>>,
// Byte accounting: loaded, reserved, and retained-old graph bytes.
admission: Mutex<AdmissionState>,
// Handle to the retention_reaper task; aborted in Drop (shutdown_reaper).
reaper: Mutex<Option<JoinHandle<()>>>,
// Process start instant; reported as uptime_seconds in status().
started_at: Instant,
// Peak total committed bytes, bumped monotonically via fetch_max.
total_memory_high_water: AtomicU64,
// Publish callback; replaceable at runtime through set_hook().
hook: RwLock<SharedHook>,
}
impl WorkspaceManager {
/// Creates a manager and spawns the background retention reaper.
///
/// The reaper task holds only a `Weak` reference (downgraded before the
/// spawn) so it does not keep the manager alive; `Drop` aborts the task.
/// Must be called inside a tokio runtime (uses `tokio::spawn`).
pub fn new(config: Arc<DaemonConfig>) -> Arc<Self> {
let mgr = Arc::new(Self {
config: Arc::clone(&config),
workspaces: RwLock::new(HashMap::new()),
admission: Mutex::new(AdmissionState::default()),
reaper: Mutex::new(None),
started_at: Instant::now(),
total_memory_high_water: AtomicU64::new(0),
hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
});
// Weak handle: the task exits once the last strong Arc is dropped.
let handle = tokio::spawn(retention_reaper(Arc::downgrade(&mgr)));
*mgr.reaper.lock() = Some(handle);
mgr
}
/// Test-only constructor: identical to [`WorkspaceManager::new`] except
/// that no background reaper task is spawned, so it is safe to call
/// outside a tokio runtime.
#[doc(hidden)]
pub fn new_without_reaper(config: Arc<DaemonConfig>) -> Arc<Self> {
    let manager = Self {
        config,
        workspaces: RwLock::new(HashMap::new()),
        admission: Mutex::new(AdmissionState::default()),
        reaper: Mutex::new(None),
        started_at: Instant::now(),
        total_memory_high_water: AtomicU64::new(0),
        hook: RwLock::new(Arc::new(NoOpHook) as SharedHook),
    };
    Arc::new(manager)
}
/// Installs a new publish hook; it applies to every subsequent publish.
pub fn set_hook(&self, hook: SharedHook) {
    let mut slot = self.hook.write();
    *slot = hook;
}
/// Clones the currently installed hook so callers can invoke it without
/// holding the hook lock.
fn hook_snapshot(&self) -> SharedHook {
    let guard = self.hook.read();
    Arc::clone(&*guard)
}
/// Daemon-wide memory budget in bytes; delegates to the config so there
/// is a single source of truth.
#[must_use]
pub fn memory_limit_bytes(&self) -> u64 {
self.config.memory_limit_bytes()
}
/// Crate-internal access to the workspace map (tests/diagnostics).
#[allow(dead_code)]
pub(crate) fn workspaces(&self) -> &RwLock<HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
&self.workspaces
}
/// Crate-internal access to the admission accounting state.
#[allow(dead_code)]
pub(crate) fn admission(&self) -> &Mutex<AdmissionState> {
&self.admission
}
/// Returns the workspace registered under `key`, if any.
#[allow(dead_code)]
pub fn lookup(&self, key: &WorkspaceKey) -> Option<Arc<LoadedWorkspace>> {
    let map = self.workspaces.read();
    map.get(key).map(Arc::clone)
}
/// One reaper pass: drops retained old graphs whose last external holder
/// has gone, and warns (once per entry) about graphs still held past the
/// configured drain timeout.
pub fn reap_once(&self) {
let timeout = Duration::from_millis(self.config.rebuild_drain_timeout_ms);
let now = Instant::now();
// Tokens to warn about; logging is deferred until the lock is released.
let mut to_log: Vec<OldGraphToken> = Vec::new();
{
let mut state = self.admission.lock();
state.retained_old.retain(|token, entry| {
// strong_count == 1 means the admission map is the sole holder,
// so removing the entry frees the old graph and its bytes.
if Arc::strong_count(&entry.graph) == 1 {
false } else {
if !entry.warned_past_timeout
&& now.saturating_duration_since(entry.published_at) > timeout
{
// Flag so each entry is warned about at most once.
entry.warned_past_timeout = true;
to_log.push(*token);
}
true
}
});
}
// Emit warnings outside the admission lock.
for token in to_log {
warn!(
token = %token,
drain_timeout_ms = self.config.rebuild_drain_timeout_ms,
"sqryd retention reaper: retained old graph still held past drain timeout \
(not an accounting deadline — bytes stay accounted until strong_count == 1)",
);
}
}
/// Reserves `working_set_estimate` bytes in the admission pool ahead of a
/// rebuild of `for_key`, evicting LRU victims if the budget would
/// otherwise be exceeded.
///
/// # Errors
/// - `WorkspaceEvicted` when the key is unknown or its rebuild has been
///   cancelled.
/// - `MemoryBudgetExceeded` when the estimate still does not fit after
///   eviction and an immediate reaper pass.
pub fn reserve_rebuild(
self: &Arc<Self>,
for_key: &WorkspaceKey,
working_set_estimate: u64,
) -> Result<RebuildReservation, DaemonError> {
let limit = self.memory_limit_bytes();
// Phase 1: plan victims under the read lock + admission lock; both
// are released before any eviction executes.
let victims = {
let workspaces = self.workspaces.read();
let Some(requester_ws) = workspaces.get(for_key) else {
return Err(DaemonError::WorkspaceEvicted {
root: for_key.index_root.clone(),
});
};
if requester_ws.rebuild_cancelled.load(Ordering::Acquire) {
return Err(DaemonError::WorkspaceEvicted {
root: for_key.index_root.clone(),
});
}
let state = self.admission.lock();
let projected = state
.total_committed_bytes()
.saturating_add(working_set_estimate);
if projected <= limit {
Vec::new() } else {
let need = projected - limit;
Self::plan_eviction(&workspaces, &state, need, for_key)
}
};
// Phase 2: execute evictions (each takes the write lock internally).
for key in &victims {
self.execute_eviction(key);
}
if !victims.is_empty() {
// Give freshly retained graphs a chance to be freed right away.
self.reap_once();
}
// Phase 3: re-check under the lock — accounting may have changed
// between planning and now.
let mut state = self.admission.lock();
let projected = state
.total_committed_bytes()
.saturating_add(working_set_estimate);
if projected > limit {
return Err(DaemonError::MemoryBudgetExceeded {
limit_bytes: limit,
current_bytes: state.loaded_bytes,
reserved_bytes: state.reserved_bytes,
retained_bytes: state.retained_total_bytes(),
requested_bytes: working_set_estimate,
});
}
state.reserved_bytes = state.reserved_bytes.saturating_add(working_set_estimate);
self.bump_high_water(&state);
drop(state);
// The returned token refunds reserved_bytes on Drop unless
// publish_and_retain marks it released first.
Ok(RebuildReservation {
manager: Arc::downgrade(self),
bytes: working_set_estimate,
released: false,
})
}
/// Plans an LRU eviction set expected to reclaim at least `need` bytes.
///
/// Candidates exclude the requester (`for_key`), pinned workspaces, and
/// workspaces that hold no published graph (Evicted/Unloaded). Victims
/// are chosen oldest-`last_accessed` first until the running byte total
/// reaches `need` (or candidates run out). Byte counts are a snapshot of
/// each workspace's `memory_bytes` at planning time.
///
/// Fix: the original called `ws.load_state()` twice per candidate — two
/// separate atomic loads that could observe different states for the
/// same workspace mid-filter; the state is now read exactly once.
fn plan_eviction(
    workspaces: &HashMap<WorkspaceKey, Arc<LoadedWorkspace>>,
    _state: &AdmissionState,
    need: u64,
    for_key: &WorkspaceKey,
) -> Vec<WorkspaceKey> {
    let mut candidates: Vec<(Instant, u64, WorkspaceKey)> = workspaces
        .iter()
        .filter(|(k, ws)| {
            // Single atomic read; both comparisons see the same value.
            let state = ws.load_state();
            **k != *for_key
                && !ws.pinned
                && state != WorkspaceState::Evicted
                && state != WorkspaceState::Unloaded
        })
        .map(|(k, ws)| {
            let last = *ws.last_accessed.read();
            let bytes = ws.memory_bytes.load(Ordering::Acquire) as u64;
            (last, bytes, k.clone())
        })
        .collect();
    // LRU order: least recently accessed first.
    candidates.sort_by_key(|(ts, _, _)| *ts);
    let mut plan = Vec::new();
    let mut reclaimed: u64 = 0;
    for (_, bytes, key) in candidates {
        if reclaimed >= need {
            break;
        }
        plan.push(key);
        reclaimed = reclaimed.saturating_add(bytes);
    }
    plan
}
/// Evicts one workspace: swaps in an empty graph, moves its bytes from
/// loaded accounting into `retained_old` (freed later by the reaper once
/// the last external holder drops), cancels any in-flight rebuild, and
/// removes the map entry. The workspaces write lock is held throughout.
fn execute_eviction(&self, key: &WorkspaceKey) {
let mut workspaces = self.workspaces.write();
let Some(ws) = workspaces.get(key).cloned() else {
return; };
// New readers now observe an empty graph.
let old_arc = ws.graph.swap(Arc::new(CodeGraph::new()));
let prior_bytes_usize = ws.memory_bytes.swap(0, Ordering::AcqRel);
let prior_bytes = prior_bytes_usize as u64;
let token = OldGraphToken::new();
{
let mut state = self.admission.lock();
// Bytes move loaded -> retained; the committed total is unchanged
// until the reaper frees the retained entry.
state.loaded_bytes = state.loaded_bytes.saturating_sub(prior_bytes);
state.retained_old.insert(
token,
RetainedEntry {
bytes: prior_bytes,
graph: old_arc,
published_at: Instant::now(),
warned_past_timeout: false,
},
);
self.bump_high_water(&state);
}
// Tell any in-flight load/rebuild for this workspace to abort.
ws.rebuild_cancelled.store(true, Ordering::Release);
ws.store_state(WorkspaceState::Evicted);
workspaces.remove(key);
}
/// Returns the workspace's graph, building it via `builder` on a miss.
///
/// Exactly one caller wins the transition into `Loading` (via CAS from a
/// restartable state); concurrent callers either get the published graph
/// or a "load already in progress" error. A `LoadingGuard` forces the
/// workspace into `Failed` if this function unwinds mid-load.
///
/// # Errors
/// Propagates builder failures, admission rejections, and mid-load
/// eviction/removal as `DaemonError`s.
pub fn get_or_load(
self: &Arc<Self>,
key: &WorkspaceKey,
builder: &dyn WorkspaceBuilder,
working_set_estimate: u64,
) -> Result<Arc<CodeGraph>, DaemonError> {
// Fast path: already loaded — bump the LRU clock and return.
{
let workspaces = self.workspaces.read();
if let Some(ws) = workspaces.get(key)
&& ws.load_state() == WorkspaceState::Loaded
{
ws.touch();
return Ok(ws.graph.load_full());
}
}
let ws = self.get_or_insert_workspace(key);
// States from which this caller may claim the load.
let allowed = [
WorkspaceState::Unloaded.as_u8(),
WorkspaceState::Failed.as_u8(),
WorkspaceState::Evicted.as_u8(),
];
let mut acquired = false;
for prior in allowed {
if ws
.state
.compare_exchange(
prior,
WorkspaceState::Loading.as_u8(),
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
acquired = true;
break;
}
}
if !acquired {
// Another caller owns the state: serve their published graph if
// it finished, otherwise surface the in-progress load.
let current = ws.load_state();
if current == WorkspaceState::Loaded {
ws.touch();
return Ok(ws.graph.load_full());
}
return Err(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
reason: format!("workspace load already in progress ({current})"),
});
}
// Clear a stale cancellation flag; if it was set, the workspace was
// evicted before we claimed Loading — restore the flag and fail.
let pre_cancelled = ws.rebuild_cancelled.swap(false, Ordering::AcqRel);
if pre_cancelled {
ws.rebuild_cancelled.store(true, Ordering::Release);
ws.store_state(WorkspaceState::Failed);
return Err(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
reason: "workspace evicted mid-load".to_string(),
});
}
// From here on, an unexpected unwind marks the workspace Failed.
let mut loading = LoadingGuard {
ws: &ws,
key,
armed: true,
};
let reservation = self.reserve_rebuild(key, working_set_estimate)?;
let graph = match builder.build(&key.index_root) {
Ok(g) => g,
Err(err) => {
// Refund the reservation, record the failure, disarm the guard
// (Failed is set explicitly) and bubble the original error up.
drop(reservation);
ws.record_failure(clone_err(&err));
loading.armed = false;
ws.store_state(WorkspaceState::Failed);
return Err(err);
}
};
// Hold the read lock across the publish checks so execute_eviction
// (which needs the write lock) cannot interleave with them.
let workspaces_guard = self.workspaces.read();
if ws.rebuild_cancelled.load(Ordering::Acquire) {
drop(workspaces_guard);
drop(reservation);
ws.record_failure(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
reason: "workspace evicted mid-load".to_string(),
});
loading.armed = false;
ws.store_state(WorkspaceState::Failed);
return Err(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
reason: "workspace evicted mid-load".to_string(),
});
}
if !workspaces_guard.contains_key(key) {
drop(workspaces_guard);
drop(reservation);
ws.record_failure(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
reason: "workspace removed mid-load".to_string(),
});
loading.armed = false;
ws.store_state(WorkspaceState::Failed);
return Err(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
reason: "workspace removed mid-load".to_string(),
});
}
// Publish; this consumes the reservation.
let (_token, published_arc) = self.publish_and_retain(reservation, &ws, graph);
ws.record_success(std::time::SystemTime::now());
ws.store_state(WorkspaceState::Loaded);
ws.touch();
loading.armed = false;
// Release the read lock before invoking the external hook.
drop(workspaces_guard);
let hook = self.hook_snapshot();
hook.on_publish(&key.index_root, Arc::clone(&published_arc));
Ok(published_arc)
}
/// Returns the workspace for `key`, creating and registering a fresh
/// unpinned one if absent. Optimistic read first; falls back to the
/// write lock's entry API on a miss.
fn get_or_insert_workspace(&self, key: &WorkspaceKey) -> Arc<LoadedWorkspace> {
    {
        let map = self.workspaces.read();
        if let Some(existing) = map.get(key) {
            return Arc::clone(existing);
        }
    }
    let mut map = self.workspaces.write();
    let slot = map
        .entry(key.clone())
        .or_insert_with(|| Arc::new(LoadedWorkspace::new(key.clone(), false)));
    Arc::clone(slot)
}
/// Evicts the least-recently-used non-pinned workspace holding a graph.
/// Returns the victim's key, or `None` when nothing is evictable.
///
/// The candidate is picked under the read lock, which is released before
/// `execute_eviction` takes the write lock.
///
/// Fix: the original called `ws.load_state()` twice in the filter — two
/// separate atomic loads that could observe different states for the
/// same workspace; the state is now read exactly once per candidate.
pub fn evict_lru(&self) -> Option<WorkspaceKey> {
    let candidate = {
        let workspaces = self.workspaces.read();
        workspaces
            .iter()
            .filter(|(_, ws)| {
                // Single atomic read; both comparisons see the same value.
                let state = ws.load_state();
                !ws.pinned
                    && state != WorkspaceState::Evicted
                    && state != WorkspaceState::Unloaded
            })
            .min_by_key(|(_, ws)| *ws.last_accessed.read())
            .map(|(k, _)| k.clone())
    };
    if let Some(key) = &candidate {
        self.execute_eviction(key);
    }
    candidate
}
/// Evicts the workspace for `key` if it is registered. Returns whether
/// the key was present at the time of the check.
pub fn unload(&self, key: &WorkspaceKey) -> bool {
    if !self.workspaces.read().contains_key(key) {
        return false;
    }
    self.execute_eviction(key);
    true
}
/// Finds the registered workspace whose key's `index_root` equals `path`.
#[must_use]
pub fn find_key_and_workspace_by_path(
    &self,
    path: &std::path::Path,
) -> Option<(WorkspaceKey, Arc<LoadedWorkspace>)> {
    let map = self.workspaces.read();
    for (key, ws) in map.iter() {
        if key.index_root == path {
            return Some((key.clone(), Arc::clone(ws)));
        }
    }
    None
}
/// Builds a point-in-time status snapshot of the daemon.
///
/// Note: taking a snapshot also folds the current committed total into
/// the daemon-wide high-water mark (fetch_max), so the reported peak is
/// never below `current_bytes`.
pub fn status(&self) -> DaemonStatus {
let workspaces_snapshot: Vec<WorkspaceStatus> = {
let workspaces = self.workspaces.read();
let mut entries: Vec<_> = workspaces
.iter()
.map(|(k, ws)| WorkspaceStatus {
index_root: k.index_root.clone(),
state: ws.load_state(),
pinned: ws.pinned,
current_bytes: ws.memory_bytes.load(Ordering::Acquire) as u64,
high_water_bytes: ws.memory_high_water_bytes.load(Ordering::Acquire) as u64,
last_good_at: *ws.last_good_at.read(),
last_error: ws.last_error.read().as_ref().map(|e| e.to_string()),
retry_count: ws.retry_count.load(Ordering::Acquire),
})
.collect();
// Deterministic ordering for consumers and tests.
entries.sort_by(|a, b| a.index_root.cmp(&b.index_root));
entries
};
let (current_bytes, reserved_bytes, high_water_bytes) = {
let state = self.admission.lock();
let current = state.total_committed_bytes();
let reserved = state.reserved_bytes;
// fetch_max returns the PREVIOUS peak; max with current so this
// very snapshot is included in the reported peak.
let peak = self
.total_memory_high_water
.fetch_max(current, Ordering::AcqRel);
let peak = peak.max(current);
drop(state);
(current, reserved, peak)
};
DaemonStatus {
uptime_seconds: self.started_at.elapsed().as_secs(),
daemon_version: env!("CARGO_PKG_VERSION").to_string(),
memory: MemoryStatus {
limit_bytes: self.memory_limit_bytes(),
current_bytes,
reserved_bytes,
high_water_bytes,
},
workspaces: workspaces_snapshot,
}
}
/// Raises the daemon-wide high-water mark to the current committed total.
/// Taking `&AdmissionState` means the caller necessarily holds the
/// admission lock, so the total cannot change under us.
fn bump_high_water(&self, state: &AdmissionState) {
let current = state.total_committed_bytes();
self.total_memory_high_water
.fetch_max(current, Ordering::AcqRel);
}
/// Test helper: registers a fresh unpinned workspace already forced into
/// the given `state`, bypassing the normal load path.
#[doc(hidden)]
pub fn insert_workspace_in_state_for_test(&self, key: WorkspaceKey, state: WorkspaceState) {
    let workspace = LoadedWorkspace::new(key.clone(), false);
    workspace.store_state(state);
    self.workspaces.write().insert(key, Arc::new(workspace));
}
/// Crate-internal read guard over the workspace map; held until dropped.
pub(crate) fn workspaces_read(
&self,
) -> parking_lot::RwLockReadGuard<'_, HashMap<WorkspaceKey, Arc<LoadedWorkspace>>> {
self.workspaces.read()
}
/// Decides how a query against `key` should be served right now.
///
/// A snapshot (state, graph, last-good time, last error) is taken under
/// the read lock, then classified lock-free:
/// - Loaded/Rebuilding  -> `Fresh`
/// - Failed             -> `Stale` while within the age cap, otherwise an
///   error (no prior good build, or expired past the cap)
/// - Unloaded/Loading   -> `NotReady`
/// - Evicted / unknown  -> `WorkspaceEvicted`
pub fn classify_for_serve(
&self,
key: &WorkspaceKey,
now: std::time::SystemTime,
) -> Result<ServeVerdict, DaemonError> {
let snapshot = {
let workspaces = self.workspaces.read();
let Some(ws) = workspaces.get(key).cloned() else {
return Err(DaemonError::WorkspaceEvicted {
root: key.index_root.clone(),
});
};
let state = ws.load_state();
let graph = ws.graph.load_full();
let last_good = *ws.last_good_at.read();
let last_error_text = ws.last_error.read().as_ref().map(|e| e.to_string());
(state, graph, last_good, last_error_text)
};
let (state, graph, last_good, last_error_text) = snapshot;
match state {
WorkspaceState::Loaded | WorkspaceState::Rebuilding => {
Ok(ServeVerdict::Fresh { graph, state })
}
WorkspaceState::Failed => {
let cap = self.config.stale_serve_max_age_hours;
match classify_staleness(last_good, cap, now) {
StalenessVerdict::NoPriorGood => Err(DaemonError::WorkspaceBuildFailed {
root: key.index_root.clone(),
// Prefer the recorded build error over the generic reason.
reason: last_error_text
.unwrap_or_else(|| "no prior successful build".into()),
}),
StalenessVerdict::Stale { age_hours } => Ok(ServeVerdict::Stale {
graph,
age_hours,
last_good_at: last_good
.expect("Stale verdict only emitted when last_good.is_some()"),
last_error: last_error_text,
}),
StalenessVerdict::Expired { age_hours } => {
Err(DaemonError::WorkspaceStaleExpired {
root: key.index_root.clone(),
age_hours,
cap_hours: cap,
last_good_at: last_good,
last_error: last_error_text,
})
}
}
}
WorkspaceState::Unloaded | WorkspaceState::Loading => {
Ok(ServeVerdict::NotReady { state })
}
WorkspaceState::Evicted => Err(DaemonError::WorkspaceEvicted {
root: key.index_root.clone(),
}),
}
}
/// Publishes `new_graph` on `workspace` and retains the old graph for
/// slow readers; consumes `reservation` (its bytes drain into loaded
/// accounting instead of being refunded on drop).
///
/// A `RollbackGuard` restores the workspace's graph and byte counter if
/// this unwinds between the swap and the admission commit.
pub fn publish_and_retain(
self: &Arc<Self>,
reservation: RebuildReservation,
workspace: &LoadedWorkspace,
new_graph: CodeGraph,
) -> (OldGraphToken, Arc<CodeGraph>) {
let new_bytes_usize = new_graph.heap_bytes();
let new_bytes = new_bytes_usize as u64;
let mut reservation = reservation;
let reservation_bytes = reservation.bytes;
let new_arc = Arc::new(new_graph);
let published_arc = Arc::clone(&new_arc);
let token = OldGraphToken::new();
// Capture pre-swap state so an unwind can restore it exactly.
let prior_arc_for_rollback = workspace.graph.load_full();
let prior_bytes = workspace
.memory_bytes
.load(std::sync::atomic::Ordering::Acquire);
let mut rollback = RollbackGuard {
ws: workspace,
prior_arc: Some(prior_arc_for_rollback),
prior_bytes,
armed: true,
};
// Point of publication: readers now observe the new graph.
let old_arc = workspace.graph.swap(new_arc);
let prev_memory_bytes = workspace.update_memory(new_bytes_usize);
debug_assert_eq!(
prev_memory_bytes, prior_bytes,
"RollbackGuard prior_bytes must match update_memory's returned prior",
);
// The old graph stays accounted under retained_old until the reaper
// sees its strong_count drop to 1.
let retained_entry = RetainedEntry {
bytes: prev_memory_bytes as u64,
graph: old_arc,
published_at: Instant::now(),
warned_past_timeout: false,
};
let mut state = self.admission.lock();
state.retained_old.insert(token, retained_entry);
// Drain the reservation and move loaded accounting old -> new.
state.reserved_bytes = state.reserved_bytes.saturating_sub(reservation_bytes);
state.loaded_bytes = state
.loaded_bytes
.saturating_sub(prev_memory_bytes as u64)
.saturating_add(new_bytes);
// Mark released so the reservation's Drop does not refund again.
reservation.released = true;
self.bump_high_water(&state);
drop(state);
// Commit succeeded; the rollback must not fire.
rollback.armed = false;
(token, published_arc)
}
/// Aborts the background reaper task, if one was ever spawned.
fn shutdown_reaper(&self) {
    let handle = self.reaper.lock().take();
    if let Some(task) = handle {
        task.abort();
    }
}
}
impl Drop for WorkspaceManager {
/// Aborts the retention reaper task so it does not outlive the manager.
fn drop(&mut self) {
self.shutdown_reaper();
}
}
/// Drop guard armed while a workspace load is in flight: if it drops
/// while still armed (e.g. an unexpected unwind), it records a failure
/// and forces the workspace into the Failed state. Disarm by setting
/// `armed = false` once the outcome has been handled explicitly.
pub(crate) struct LoadingGuard<'a> {
pub(crate) ws: &'a LoadedWorkspace,
pub(crate) key: &'a WorkspaceKey,
pub(crate) armed: bool,
}
impl<'a> Drop for LoadingGuard<'a> {
    /// Fires only while still armed: records a generic abort error
    /// (without clobbering a more specific one already present), bumps
    /// the retry counter, and marks the workspace Failed.
    fn drop(&mut self) {
        if !self.armed {
            return;
        }
        let mut slot = self.ws.last_error.write();
        if slot.is_none() {
            *slot = Some(DaemonError::WorkspaceBuildFailed {
                root: self.key.index_root.clone(),
                reason: "workspace load aborted unexpectedly".to_string(),
            });
        }
        // Release the error lock before touching the atomics.
        drop(slot);
        self.ws.retry_count.fetch_add(1, Ordering::AcqRel);
        self.ws.store_state(WorkspaceState::Failed);
    }
}
/// Hand-rolled deep clone of a `DaemonError` so the error can be stored
/// on a workspace while the original is returned to the caller.
///
/// Variants that carry only cloneable data are reproduced exactly.
/// Variants that make no sense to persist per-workspace (AlreadyRunning,
/// AutoStartTimeout, SignalSetup, Config, Io) are lossily folded into
/// `WorkspaceBuildFailed` with a descriptive reason and a placeholder
/// `<unknown>` root.
pub(crate) fn clone_err(err: &DaemonError) -> DaemonError {
match err {
DaemonError::WorkspaceBuildFailed { root, reason } => DaemonError::WorkspaceBuildFailed {
root: root.clone(),
reason: reason.clone(),
},
DaemonError::WorkspaceStaleExpired {
root,
age_hours,
cap_hours,
last_good_at,
last_error,
} => DaemonError::WorkspaceStaleExpired {
root: root.clone(),
age_hours: *age_hours,
cap_hours: *cap_hours,
last_good_at: *last_good_at,
last_error: last_error.clone(),
},
DaemonError::MemoryBudgetExceeded {
limit_bytes,
current_bytes,
reserved_bytes,
retained_bytes,
requested_bytes,
} => DaemonError::MemoryBudgetExceeded {
limit_bytes: *limit_bytes,
current_bytes: *current_bytes,
reserved_bytes: *reserved_bytes,
retained_bytes: *retained_bytes,
requested_bytes: *requested_bytes,
},
DaemonError::WorkspaceEvicted { root } => {
DaemonError::WorkspaceEvicted { root: root.clone() }
}
DaemonError::WorkspaceNotLoaded { root } => {
DaemonError::WorkspaceNotLoaded { root: root.clone() }
}
DaemonError::ToolTimeout {
root,
secs,
deadline_ms,
} => DaemonError::ToolTimeout {
root: root.clone(),
secs: *secs,
deadline_ms: *deadline_ms,
},
DaemonError::InvalidArgument { reason } => DaemonError::InvalidArgument {
reason: reason.clone(),
},
// anyhow errors cannot be cloned; re-render the alternate-format
// chain ({:#}) as a fresh anyhow error.
DaemonError::Internal(err) => {
DaemonError::Internal(anyhow::anyhow!("{err:#}"))
}
DaemonError::AlreadyRunning { socket, lock, .. } => DaemonError::WorkspaceBuildFailed {
root: Path::new("<unknown>").to_path_buf(),
reason: format!(
"daemon already running on socket {} (lock: {})",
socket.display(),
lock.display()
),
},
DaemonError::AutoStartTimeout {
timeout_secs,
socket,
} => DaemonError::WorkspaceBuildFailed {
root: Path::new("<unknown>").to_path_buf(),
reason: format!(
"daemon did not become ready within {timeout_secs}s on socket {}",
socket.display()
),
},
DaemonError::SignalSetup { source } => DaemonError::WorkspaceBuildFailed {
root: Path::new("<unknown>").to_path_buf(),
reason: format!("failed to install signal handlers: {source}"),
},
other @ (DaemonError::Config { .. } | DaemonError::Io(_)) => {
DaemonError::WorkspaceBuildFailed {
root: Path::new("<unknown>").to_path_buf(),
reason: other.to_string(),
}
}
}
}
/// RAII token for bytes reserved in the admission pool ahead of a rebuild.
#[must_use = "RebuildReservation must either be consumed by publish_and_retain() \
or intentionally dropped to return its bytes to the admission pool"]
pub struct RebuildReservation {
// Weak: the manager may be dropped while a reservation is outstanding.
manager: Weak<WorkspaceManager>,
// Number of reserved bytes this token accounts for.
bytes: u64,
// Set by publish_and_retain after it drains reserved_bytes itself, so
// Drop does not refund the same bytes a second time.
released: bool,
}
impl RebuildReservation {
/// Number of bytes this reservation holds in the admission pool.
#[must_use]
pub fn bytes(&self) -> u64 {
self.bytes
}
}
impl std::fmt::Debug for RebuildReservation {
/// Manual Debug: the Weak `manager` handle has no useful rendering, so
/// only `bytes` and `released` are shown.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RebuildReservation")
.field("bytes", &self.bytes)
.field("released", &self.released)
.finish()
}
}
impl Drop for RebuildReservation {
    /// Refunds the reserved bytes unless publish_and_retain already
    /// drained them (`released`) or the manager is gone.
    fn drop(&mut self) {
        if self.released {
            return;
        }
        let Some(mgr) = self.manager.upgrade() else {
            return;
        };
        let mut state = mgr.admission.lock();
        state.reserved_bytes = state.reserved_bytes.saturating_sub(self.bytes);
    }
}
/// Drop guard that restores a workspace's pre-publish graph and byte
/// counter if publish_and_retain unwinds between the swap and the
/// admission commit. Disarmed (`armed = false`) once the commit lands.
pub(crate) struct RollbackGuard<'a> {
pub(crate) ws: &'a LoadedWorkspace,
// The Arc published before the swap; put back on rollback.
pub(crate) prior_arc: Option<Arc<CodeGraph>>,
// memory_bytes value before the swap; restored on rollback.
pub(crate) prior_bytes: usize,
pub(crate) armed: bool,
}
impl<'a> Drop for RollbackGuard<'a> {
/// While armed, restores the prior graph Arc and byte counter so an
/// unwind leaves the workspace in its pre-publish state.
fn drop(&mut self) {
if !self.armed {
return;
}
if let Some(arc) = self.prior_arc.take() {
self.ws.graph.store(arc);
}
self.ws
.memory_bytes
.store(self.prior_bytes, std::sync::atomic::Ordering::Release);
}
}
/// Background task: runs one reaper pass every 25ms until the owning
/// manager is dropped (i.e. the Weak upgrade fails).
async fn retention_reaper(mgr: Weak<WorkspaceManager>) {
    const TICK: Duration = Duration::from_millis(25);
    loop {
        tokio::time::sleep(TICK).await;
        match mgr.upgrade() {
            Some(manager) => manager.reap_once(),
            None => return,
        }
    }
}
#[cfg(test)]
mod tests {
use std::{path::PathBuf, sync::atomic::Ordering};
use sqry_core::project::ProjectRootMode;
use crate::config::DaemonConfig;
use super::{
super::{loaded::LoadedWorkspace, state::WorkspaceKey},
*,
};
/// Config with a 1 MiB budget so admission tests can overflow it cheaply.
fn make_config() -> Arc<DaemonConfig> {
    let config = DaemonConfig {
        memory_limit_mb: 1,
        ..DaemonConfig::default()
    };
    Arc::new(config)
}
/// Fresh unpinned workspace at a fixed key for accounting tests.
fn make_workspace() -> Arc<LoadedWorkspace> {
    let key = WorkspaceKey::new(
        PathBuf::from("/repos/example"),
        ProjectRootMode::GitRoot,
        0x1,
    );
    Arc::new(LoadedWorkspace::new(key, false))
}
/// Inserts a fresh unpinned workspace for `key` directly into the map,
/// bypassing get_or_load.
fn register_workspace(mgr: &WorkspaceManager, key: &WorkspaceKey) {
    let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
    mgr.workspaces.write().insert(key.clone(), ws);
}
// Happy path: a reservation within the budget bumps reserved_bytes, and
// dropping it unconsumed refunds the bytes.
#[test]
fn reserve_rebuild_succeeds_when_headroom_available() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = WorkspaceKey::new(
PathBuf::from("/repos/example"),
ProjectRootMode::GitRoot,
0x1,
);
register_workspace(&mgr, &key);
let reservation = mgr
.reserve_rebuild(&key, 500_000) .expect("reservation fits");
assert_eq!(reservation.bytes(), 500_000);
assert_eq!(mgr.admission.lock().reserved_bytes, 500_000);
drop(reservation);
assert_eq!(
mgr.admission.lock().reserved_bytes,
0,
"dropping an unconsumed reservation must return its bytes",
);
}
// A request larger than the whole 1 MiB budget is rejected with
// MemoryBudgetExceeded and leaves admission state untouched.
#[test]
fn reserve_rebuild_rejects_oversized_request() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = WorkspaceKey::new(
PathBuf::from("/repos/example"),
ProjectRootMode::GitRoot,
0x1,
);
register_workspace(&mgr, &key);
let err = mgr.reserve_rebuild(&key, 10 * 1024 * 1024).expect_err(
"a reservation bigger than the budget must be rejected with MemoryBudgetExceeded",
);
match err {
DaemonError::MemoryBudgetExceeded {
limit_bytes,
requested_bytes,
..
} => {
assert_eq!(limit_bytes, 1024 * 1024);
assert_eq!(requested_bytes, 10 * 1024 * 1024);
}
other => panic!("wrong error variant: {other:?}"),
}
assert_eq!(
mgr.admission.lock().reserved_bytes,
0,
"a rejected reservation must not mutate admission state",
);
}
// Two 600k reservations against a 1 MiB budget: the second is rejected
// while the first remains accounted.
#[test]
fn reserve_rebuild_rejects_when_running_total_would_exceed_budget() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = WorkspaceKey::new(
PathBuf::from("/repos/example"),
ProjectRootMode::GitRoot,
0x1,
);
register_workspace(&mgr, &key);
let a = mgr.reserve_rebuild(&key, 600_000).expect("first fits");
let err = mgr
.reserve_rebuild(&key, 600_000)
.expect_err("second pushes over 1 MiB budget");
match err {
DaemonError::MemoryBudgetExceeded { reserved_bytes, .. } => {
assert_eq!(reserved_bytes, 600_000, "first reservation still held");
}
other => panic!("wrong error variant: {other:?}"),
}
drop(a);
}
// Reserving for a key that was never registered surfaces
// WorkspaceEvicted and mutates nothing.
#[test]
fn reserve_rebuild_rejects_unknown_key() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = WorkspaceKey::new(
PathBuf::from("/repos/never-registered"),
ProjectRootMode::GitRoot,
0xDEAD,
);
let err = mgr
.reserve_rebuild(&key, 100_000)
.expect_err("unknown key must surface WorkspaceEvicted");
match err {
DaemonError::WorkspaceEvicted { root } => {
assert_eq!(root, PathBuf::from("/repos/never-registered"));
}
other => panic!("wrong error variant: {other:?}"),
}
assert_eq!(
mgr.admission.lock().reserved_bytes,
0,
"a rejected reservation must not mutate admission state",
);
}
// A workspace whose rebuild_cancelled flag is set is treated as evicted
// for reservation purposes.
#[test]
fn reserve_rebuild_rejects_cancelled_workspace() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = WorkspaceKey::new(
PathBuf::from("/repos/cancelled"),
ProjectRootMode::GitRoot,
0xCAFE,
);
let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
ws.rebuild_cancelled.store(true, Ordering::Release);
mgr.workspaces.write().insert(key.clone(), ws);
let err = mgr
.reserve_rebuild(&key, 100_000)
.expect_err("cancelled workspace must surface WorkspaceEvicted");
match err {
DaemonError::WorkspaceEvicted { root } => {
assert_eq!(root, PathBuf::from("/repos/cancelled"));
}
other => panic!("wrong error variant: {other:?}"),
}
}
// publish_and_retain must drain the reservation, swap the prior bytes
// out of loaded accounting for the new graph's bytes, and park the old
// Arc in retained_old as its sole holder.
#[test]
fn publish_and_retain_moves_bytes_and_retains_old_arc() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let ws = make_workspace();
mgr.workspaces
.write()
.insert(ws.key.clone(), Arc::clone(&ws));
let reservation = mgr.reserve_rebuild(&ws.key, 100_000).expect("reserve fits");
// Simulate a previously published 50k graph.
ws.memory_bytes.store(50_000, Ordering::Release);
mgr.admission.lock().loaded_bytes = 50_000;
let new_graph = CodeGraph::new();
let new_bytes = new_graph.heap_bytes() as u64;
let (token, _published_arc) = mgr.publish_and_retain(reservation, &ws, new_graph);
let state = mgr.admission.lock();
assert_eq!(
state.reserved_bytes, 0,
"reservation bytes must drain on publish"
);
assert_eq!(
state.loaded_bytes, new_bytes,
"loaded_bytes = prior(50k) - prior(50k) + new(heap_bytes())",
);
assert_eq!(state.retained_old.len(), 1, "exactly one retained entry");
let retained = state.retained_old.get(&token).expect("token present");
assert_eq!(
retained.bytes, 50_000,
"retained bytes is the prior workspace memory_bytes",
);
assert_eq!(
Arc::strong_count(&retained.graph),
1,
"admission map is the sole holder of the old Arc after publish",
);
}
// An armed RollbackGuard dropped at scope exit restores both the graph
// Arc and memory_bytes to their captured prior values.
#[test]
fn rollback_guard_restores_workspace_on_panic_path() {
let ws = make_workspace();
let old_graph = Arc::new(CodeGraph::new());
ws.graph.store(Arc::clone(&old_graph));
ws.memory_bytes.store(10_000, Ordering::Release);
{
let mut guard = RollbackGuard {
ws: &ws,
prior_arc: Some(Arc::clone(&old_graph)),
prior_bytes: 10_000,
armed: true,
};
let stomped = Arc::new(CodeGraph::new());
ws.graph.store(Arc::clone(&stomped));
ws.memory_bytes.store(99_999, Ordering::Release);
// Keep the guard mutable/used; it drops armed at end of scope.
let _ = &mut guard;
}
let restored = ws.graph.load_full();
assert!(Arc::ptr_eq(&restored, &old_graph));
assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 10_000);
}
// Disarming the guard before scope exit must leave the stomped values
// in place (no rollback).
#[test]
fn rollback_guard_disarmed_is_noop() {
let ws = make_workspace();
let old_graph = Arc::new(CodeGraph::new());
ws.graph.store(Arc::clone(&old_graph));
ws.memory_bytes.store(10_000, Ordering::Release);
{
let mut guard = RollbackGuard {
ws: &ws,
prior_arc: Some(Arc::clone(&old_graph)),
prior_bytes: 10_000,
armed: true,
};
let stomped = Arc::new(CodeGraph::new());
ws.graph.store(Arc::clone(&stomped));
ws.memory_bytes.store(99_999, Ordering::Release);
guard.armed = false;
}
assert_eq!(ws.memory_bytes.load(Ordering::Acquire), 99_999);
}
// Once the admission map is the sole holder of a retained graph, a
// reaper pass frees the entry.
#[test]
fn reap_once_drops_last_holder_entries() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let ws = make_workspace();
mgr.workspaces
.write()
.insert(ws.key.clone(), Arc::clone(&ws));
let reservation = mgr
.reserve_rebuild(&ws.key, 0)
.expect("zero-size reservation always fits");
mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
assert_eq!(mgr.admission.lock().retained_old.len(), 1);
mgr.reap_once();
assert_eq!(
mgr.admission.lock().retained_old.len(),
0,
"reaper must free entries whose strong_count == 1",
);
}
// While an external clone of the retained Arc exists (a "slow query"),
// the reaper must keep the entry; it frees it after the clone drops.
#[test]
fn reap_once_retains_entries_with_outstanding_holders() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let ws = make_workspace();
mgr.workspaces
.write()
.insert(ws.key.clone(), Arc::clone(&ws));
let reservation = mgr
.reserve_rebuild(&ws.key, 0)
.expect("zero-size reservation always fits");
mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
// Take an extra strong reference to simulate an in-flight query.
let held = {
let state = mgr.admission.lock();
let token = *state.retained_old.keys().next().expect("one entry");
Arc::clone(&state.retained_old.get(&token).unwrap().graph)
};
assert_eq!(Arc::strong_count(&held), 2);
mgr.reap_once();
assert_eq!(
mgr.admission.lock().retained_old.len(),
1,
"reaper must not drop entries that slow queries still hold",
);
drop(held);
mgr.reap_once();
assert_eq!(
mgr.admission.lock().retained_old.len(),
0,
"reaper frees the entry once the last slow query releases",
);
}
// Regression: dropping a reservation that was never consumed by a
// publish must refund reserved_bytes exactly.
#[test]
fn unconsumed_reservation_refunds_reserved_bytes_on_drop() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let ws = make_workspace();
mgr.workspaces
.write()
.insert(ws.key.clone(), Arc::clone(&ws));
let reservation = mgr
.reserve_rebuild(&ws.key, 250_000)
.expect("reservation fits");
assert_eq!(mgr.admission.lock().reserved_bytes, 250_000);
drop(reservation);
assert_eq!(
mgr.admission.lock().reserved_bytes,
0,
"unconsumed reservation must refund reserved_bytes on drop \
(Codex Task 6 Phase 6a iter-1 MAJOR regression)",
);
}
// A consumed reservation must drain reserved_bytes exactly once — no
// leak, and no double-drain from its Drop impl afterwards.
#[test]
fn publish_and_retain_leaves_reservation_fully_disarmed_on_success() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let ws = make_workspace();
mgr.workspaces
.write()
.insert(ws.key.clone(), Arc::clone(&ws));
let reservation = mgr
.reserve_rebuild(&ws.key, 100_000)
.expect("reservation fits");
let admission_before = mgr.admission.lock().reserved_bytes;
assert_eq!(admission_before, 100_000);
let (_token, _published_arc) = mgr.publish_and_retain(reservation, &ws, CodeGraph::new());
let admission_after = mgr.admission.lock().reserved_bytes;
assert_eq!(
admission_after, 0,
"publish must drain reserved_bytes exactly once, not double-drain or leak",
);
// The pool must still admit a same-size reservation afterwards.
let again = mgr
.reserve_rebuild(&ws.key, 100_000)
.expect("post-publish admission must still admit a same-size reservation");
drop(again);
assert_eq!(mgr.admission.lock().reserved_bytes, 0);
}
// Simulates a panic between the graph swap and the admission commit:
// the RollbackGuard must restore the workspace, the reservation's Drop
// must refund its bytes, and admission must be otherwise untouched.
#[test]
fn unwind_after_swap_before_admission_commit_restores_full_state() {
use std::panic::{AssertUnwindSafe, catch_unwind};
let mgr = WorkspaceManager::new_without_reaper(make_config());
let ws = Arc::new(LoadedWorkspace::new(
WorkspaceKey::new(
PathBuf::from("/repos/example"),
ProjectRootMode::GitRoot,
0x1,
),
false,
));
mgr.workspaces
.write()
.insert(ws.key.clone(), Arc::clone(&ws));
let prior_bytes_usize = 50_000usize;
ws.memory_bytes.store(prior_bytes_usize, Ordering::Release);
mgr.admission.lock().loaded_bytes = 50_000;
let prior_arc = ws.graph.load_full();
let reservation = mgr
.reserve_rebuild(&ws.key, 100_000)
.expect("reservation fits");
assert_eq!(mgr.admission.lock().reserved_bytes, 100_000);
let outcome = catch_unwind(AssertUnwindSafe(|| {
let new_arc = Arc::new(CodeGraph::new());
let prior_arc_clone = ws.graph.load_full();
let _rollback = RollbackGuard {
ws: &ws,
prior_arc: Some(prior_arc_clone),
prior_bytes: prior_bytes_usize,
armed: true,
};
let _old_arc = ws.graph.swap(new_arc);
let _prev = ws.update_memory(99_999);
// Move the reservation into the closure so its Drop runs on unwind.
let _hold = reservation;
panic!("simulated panic inside publish_and_retain");
}));
assert!(outcome.is_err(), "catch_unwind must observe the panic");
let restored = ws.graph.load_full();
assert!(
Arc::ptr_eq(&restored, &prior_arc),
"RollbackGuard must restore ws.graph to the prior Arc after unwind",
);
assert_eq!(
ws.memory_bytes.load(Ordering::Acquire),
prior_bytes_usize,
"RollbackGuard must restore ws.memory_bytes after unwind",
);
let state = mgr.admission.lock();
assert_eq!(
state.reserved_bytes, 0,
"reservation refund must return reserved_bytes to pre-call value (0)",
);
assert_eq!(
state.loaded_bytes, 50_000,
"loaded_bytes must not be mutated when admission commit is never entered",
);
assert_eq!(
state.retained_old.len(),
0,
"retained_old must be empty when admission commit is never entered",
);
}
/// Shorthand for building a GitRoot-mode key at `path` with `fingerprint`.
fn make_key_at(path: &str, fingerprint: u64) -> WorkspaceKey {
WorkspaceKey::new(PathBuf::from(path), ProjectRootMode::GitRoot, fingerprint)
}
// Second get_or_load for the same key must hit the cache and return the
// identical Arc, not rebuild.
#[test]
fn get_or_load_builds_on_miss_and_caches() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = make_key_at("/repos/example", 0x1);
let builder = super::super::builder::EmptyGraphBuilder;
let g1 = mgr
.get_or_load(&key, &builder, 1_000)
.expect("first load succeeds");
let g2 = mgr
.get_or_load(&key, &builder, 1_000)
.expect("second load hits cache");
assert!(
Arc::ptr_eq(&g1, &g2),
"cache hit must return the same Arc as the initial build",
);
}
// A failing builder must bubble its error, mark the workspace Failed
// with the error recorded, bump retry_count, and refund the reservation.
#[test]
fn get_or_load_surfaces_builder_failures_and_sets_failed_state() {
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = make_key_at("/repos/example", 0x1);
let failing = super::super::builder::FailingGraphBuilder::new("simulated plugin panic");
let err = mgr
.get_or_load(&key, &failing, 1_000)
.expect_err("builder failure must bubble up");
match err {
DaemonError::WorkspaceBuildFailed { reason, .. } => {
assert_eq!(reason, "simulated plugin panic");
}
other => panic!("wrong variant: {other:?}"),
}
let workspaces = mgr.workspaces.read();
let ws = workspaces.get(&key).expect("workspace registered");
assert_eq!(ws.load_state(), WorkspaceState::Failed);
assert_eq!(ws.retry_count.load(Ordering::Acquire), 1);
assert!(ws.last_error.read().is_some());
drop(workspaces);
assert_eq!(mgr.admission.lock().reserved_bytes, 0);
}
#[test]
fn evict_lru_picks_oldest_non_pinned_workspace() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let builder = super::super::builder::EmptyGraphBuilder;
    let older = make_key_at("/repos/a", 0x1);
    let newer = make_key_at("/repos/b", 0x1);
    manager.get_or_load(&older, &builder, 100_000).unwrap();
    // Small sleep so the two workspaces get distinguishable last-touch instants.
    std::thread::sleep(Duration::from_millis(5));
    manager.get_or_load(&newer, &builder, 100_000).unwrap();
    let victim = manager.evict_lru().expect("one candidate");
    assert_eq!(victim, older, "oldest workspace must be evicted first");
    let map = manager.workspaces.read();
    assert!(
        !map.contains_key(&older),
        "evicted workspace must be removed from the manager map",
    );
    assert!(
        map.contains_key(&newer),
        "non-victim workspace must remain",
    );
}
#[test]
fn evict_lru_returns_none_when_no_candidates() {
    // With no workspaces loaded there is nothing to evict.
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let verdict = manager.evict_lru();
    assert!(
        verdict.is_none(),
        "empty manager has no eviction candidate",
    );
}
#[test]
fn evict_lru_skips_pinned_workspaces() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let builder = super::super::builder::EmptyGraphBuilder;
    // Hand-insert a pinned workspace and mark it Loaded + recently touched so
    // it would otherwise be a valid (even preferable) LRU victim.
    let pinned_key = make_key_at("/repos/pinned", 0x1);
    let pinned = Arc::new(LoadedWorkspace::new(pinned_key.clone(), true));
    manager
        .workspaces
        .write()
        .insert(pinned_key.clone(), Arc::clone(&pinned));
    pinned.store_state(WorkspaceState::Loaded);
    pinned.touch();
    // Load an ordinary workspace; it is the only legal eviction target.
    let other = make_key_at("/repos/other", 0x1);
    manager.get_or_load(&other, &builder, 100_000).unwrap();
    let victim = manager.evict_lru().expect("one candidate");
    assert_eq!(victim, other);
    assert!(manager.workspaces.read().contains_key(&pinned_key));
}
#[test]
fn unload_removes_workspace_and_reclaims_bytes() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);
    manager.get_or_load(&key, &builder, 100_000).unwrap();
    assert!(manager.workspaces.read().contains_key(&key));
    // First unload removes the entry; a second call is a no-op on a missing key.
    assert!(manager.unload(&key), "unload must report present");
    assert!(!manager.workspaces.read().contains_key(&key));
    assert!(!manager.unload(&key), "unload on missing key returns false");
}
#[test]
fn status_reflects_loaded_workspaces_and_memory() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);
    manager.get_or_load(&key, &builder, 100_000).unwrap();
    let status = manager.status();
    assert_eq!(status.daemon_version, env!("CARGO_PKG_VERSION"));
    assert_eq!(status.workspaces.len(), 1);
    // Examine the single workspace entry.
    let ws_status = &status.workspaces[0];
    assert_eq!(ws_status.index_root, PathBuf::from("/repos/example"));
    assert_eq!(ws_status.state, WorkspaceState::Loaded);
    assert!(!ws_status.pinned);
    assert_eq!(status.memory.limit_bytes, 1024 * 1024);
    assert!(
        status.memory.high_water_bytes >= status.memory.current_bytes,
        "high_water_bytes must be monotonic wrt current_bytes",
    );
}
#[test]
fn reserve_rebuild_triggers_eviction_when_budget_tight() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    // Seed a 700 KB evictable workspace that consumes most of the budget.
    let victim_key = make_key_at("/repos/victim", 0x1);
    let victim = Arc::new(LoadedWorkspace::new(victim_key.clone(), false));
    victim.memory_bytes.store(700_000, Ordering::Release);
    victim.store_state(WorkspaceState::Loaded);
    victim.touch();
    manager
        .workspaces
        .write()
        .insert(victim_key.clone(), Arc::clone(&victim));
    manager.admission.lock().loaded_bytes = 700_000;
    // Register a fresh workspace whose rebuild needs 600 KB of headroom;
    // that only fits if Phase 2 evicts the victim.
    let new_key = make_key_at("/repos/new", 0x1);
    let fresh = Arc::new(LoadedWorkspace::new(new_key.clone(), false));
    manager.workspaces.write().insert(new_key.clone(), fresh);
    let reservation = manager
        .reserve_rebuild(&new_key, 600_000)
        .expect("Phase 2 eviction must free headroom");
    assert!(!manager.workspaces.read().contains_key(&victim_key));
    assert_eq!(manager.admission.lock().reserved_bytes, 600_000);
    drop(reservation);
}
#[test]
fn reserve_rebuild_rejects_when_only_pinned_workspaces_remain() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    // A pinned 900 KB workspace occupies the budget and must not be evicted.
    let pinned_key = make_key_at("/repos/pinned", 0x1);
    let pinned = Arc::new(LoadedWorkspace::new(pinned_key.clone(), true));
    pinned.memory_bytes.store(900_000, Ordering::Release);
    pinned.store_state(WorkspaceState::Loaded);
    manager
        .workspaces
        .write()
        .insert(pinned_key.clone(), Arc::clone(&pinned));
    manager.admission.lock().loaded_bytes = 900_000;
    let new_key = make_key_at("/repos/new", 0x1);
    manager.workspaces.write().insert(
        new_key.clone(),
        Arc::new(LoadedWorkspace::new(new_key.clone(), false)),
    );
    // 600 KB cannot fit: Phase 2 finds no evictable candidate and must fail.
    let err = manager
        .reserve_rebuild(&new_key, 600_000)
        .expect_err("pinned workspace makes budget unfittable");
    match err {
        DaemonError::MemoryBudgetExceeded {
            requested_bytes,
            current_bytes,
            ..
        } => {
            assert_eq!(requested_bytes, 600_000);
            assert_eq!(
                current_bytes, 900_000,
                "pinned workspace bytes still count after Phase 2",
            );
        }
        unexpected => panic!("wrong variant: {unexpected:?}"),
    }
    assert!(manager.workspaces.read().contains_key(&pinned_key));
}
#[test]
fn execute_eviction_routes_bytes_through_retained_old() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let ws = Arc::new(LoadedWorkspace::new(key.clone(), false));
    ws.memory_bytes.store(300_000, Ordering::Release);
    ws.store_state(WorkspaceState::Loaded);
    manager
        .workspaces
        .write()
        .insert(key.clone(), Arc::clone(&ws));
    manager.admission.lock().loaded_bytes = 300_000;
    // Simulate an in-flight slow query holding the graph Arc alive.
    let live_reader = ws.graph.load_full();
    manager.execute_eviction(&key);
    {
        let state = manager.admission.lock();
        assert_eq!(
            state.loaded_bytes, 0,
            "evicted workspace bytes must leave the loaded tier",
        );
        assert_eq!(
            state.retained_total_bytes(),
            300_000,
            "evicted workspace bytes must enter the retained tier",
        );
        assert_eq!(state.retained_old.len(), 1);
    }
    // While the reader is alive the reaper must not free the retained entry.
    manager.reap_once();
    assert_eq!(manager.admission.lock().retained_total_bytes(), 300_000);
    // Dropping the last outside reference makes the entry reclaimable.
    drop(live_reader);
    manager.reap_once();
    assert_eq!(
        manager.admission.lock().retained_total_bytes(),
        0,
        "reaper must free retained entry once slow query releases",
    );
}
#[test]
fn get_or_load_state_cas_rejects_concurrent_load() {
    // Pre-set the workspace to Loading so get_or_load's state CAS loses.
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let ws = manager.get_or_insert_workspace(&key);
    ws.store_state(WorkspaceState::Loading);
    let builder = super::super::builder::EmptyGraphBuilder;
    let err = manager
        .get_or_load(&key, &builder, 1_000)
        .expect_err("concurrent load must be rejected");
    match err {
        DaemonError::WorkspaceBuildFailed { reason, .. } => assert!(
            reason.contains("already in progress"),
            "unexpected reason: {reason}",
        ),
        unexpected => panic!("wrong variant: {unexpected:?}"),
    }
    // The rejected attempt must not leave a reservation behind.
    assert_eq!(manager.admission.lock().reserved_bytes, 0);
}
#[test]
fn get_or_load_detects_cancellation_between_cas_and_publish() {
    // Flag the workspace as cancelled before the load even starts.
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let ws = manager.get_or_insert_workspace(&key);
    ws.rebuild_cancelled.store(true, Ordering::Release);
    ws.store_state(WorkspaceState::Unloaded);
    let builder = super::super::builder::EmptyGraphBuilder;
    let err = manager
        .get_or_load(&key, &builder, 1_000)
        .expect_err("pre-CAS cancellation must be honoured");
    match err {
        DaemonError::WorkspaceBuildFailed { reason, .. } => assert!(
            reason.contains("evicted mid-load"),
            "unexpected reason: {reason}",
        ),
        unexpected => panic!("wrong variant: {unexpected:?}"),
    }
    // The cancel flag stays set and the workspace lands in Failed.
    assert!(ws.rebuild_cancelled.load(Ordering::Acquire));
    assert_eq!(ws.load_state(), WorkspaceState::Failed);
}
#[test]
fn get_or_load_loading_guard_recovers_from_builder_panic() {
    use std::panic::{AssertUnwindSafe, catch_unwind};
    // Builder that always unwinds, exercising the LoadingGuard drop path.
    #[derive(Debug)]
    struct PanickingBuilder;
    impl WorkspaceBuilder for PanickingBuilder {
        fn build(&self, _root: &Path) -> Result<CodeGraph, DaemonError> {
            panic!("simulated builder panic");
        }
    }
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let key = make_key_at("/repos/example", 0x1);
    let builder = PanickingBuilder;
    let unwound = catch_unwind(AssertUnwindSafe(|| {
        let _ = manager.get_or_load(&key, &builder, 1_000);
    }));
    assert!(unwound.is_err(), "panic must propagate through get_or_load");
    {
        let map = manager.workspaces.read();
        let ws = map.get(&key).expect("workspace still registered");
        assert_eq!(
            ws.load_state(),
            WorkspaceState::Failed,
            "LoadingGuard must transition Loading → Failed on unwind",
        );
        assert!(
            ws.last_error.read().is_some(),
            "LoadingGuard must populate last_error on unwind",
        );
        assert!(
            ws.retry_count.load(Ordering::Acquire) >= 1,
            "LoadingGuard must increment retry_count",
        );
    }
    // No reservation may leak across the unwind.
    assert_eq!(manager.admission.lock().reserved_bytes, 0);
}
#[test]
fn concurrent_load_and_evict_never_publishes_into_evicted_workspace() {
    // Races a loader thread against an evictor thread 64 times and checks the
    // invariants that must hold regardless of interleaving:
    //   - any workspace left in the map is fully Loaded,
    //   - no admission reservation leaks,
    //   - committed bytes stay within the memory budget.
    use std::sync::Barrier;
    use std::thread;
    const ITERATIONS: usize = 64;
    for iter in 0..ITERATIONS {
        let mgr = WorkspaceManager::new_without_reaper(Arc::new(DaemonConfig {
            memory_limit_mb: 64,
            ..DaemonConfig::default()
        }));
        // Fresh fingerprint per iteration so keys never collide across runs.
        let key = make_key_at("/repos/example", iter as u64);
        let builder = Arc::new(super::super::builder::EmptyGraphBuilder);
        // Barrier releases both threads at once to maximise overlap.
        let start = Arc::new(Barrier::new(2));
        let mgr_clone = Arc::clone(&mgr);
        let key_clone = key.clone();
        let builder_clone = Arc::clone(&builder);
        let start_load = Arc::clone(&start);
        let loader = thread::spawn(move || {
            start_load.wait();
            let _ = mgr_clone.get_or_load(&key_clone, &*builder_clone, 100_000);
        });
        let mgr_clone = Arc::clone(&mgr);
        let key_clone = key.clone();
        let start_evict = Arc::clone(&start);
        let evictor = thread::spawn(move || {
            start_evict.wait();
            mgr_clone.unload(&key_clone);
        });
        loader.join().expect("loader panicked");
        evictor.join().expect("evictor panicked");
        let workspaces = mgr.workspaces.read();
        if let Some(ws) = workspaces.get(&key) {
            // Fix: read the state exactly once. The original called
            // ws.load_state() a second time inside the failure message; in a
            // race test that second read can observe a different value than
            // the one compared, yielding a misleading diagnostic.
            let observed = ws.load_state();
            assert_eq!(
                observed,
                WorkspaceState::Loaded,
                "iter {iter}: workspace in map must be Loaded, not {observed}",
            );
        }
        drop(workspaces);
        let state = mgr.admission.lock();
        assert_eq!(
            state.reserved_bytes, 0,
            "iter {iter}: no reservations should leak after the race"
        );
        assert!(
            state.total_committed_bytes() <= mgr.memory_limit_bytes(),
            "iter {iter}: total_committed {} over budget {}",
            state.total_committed_bytes(),
            mgr.memory_limit_bytes(),
        );
    }
}
#[test]
fn publish_fires_installed_hook() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let recorder = super::super::hook::RecordingHook::new();
    manager.set_hook(Arc::clone(&recorder) as super::super::hook::SharedHook);
    let key = make_key_at("/repos/example", 0x1);
    let builder = super::super::builder::EmptyGraphBuilder;
    manager
        .get_or_load(&key, &builder, 0)
        .expect("load on empty builder succeeds");
    // One publish => exactly one hook invocation, carrying the workspace root.
    assert_eq!(
        recorder.invocation_count(),
        1,
        "hook must fire exactly once per publish",
    );
    assert_eq!(
        recorder.invocation_roots(),
        vec![key.index_root.clone()],
        "hook must receive the workspace's index_root",
    );
}
#[test]
fn set_hook_replaces_prior_hook_for_subsequent_publishes() {
    let manager = WorkspaceManager::new_without_reaper(make_config());
    let first_hook = super::super::hook::RecordingHook::new();
    let second_hook = super::super::hook::RecordingHook::new();
    let builder = super::super::builder::EmptyGraphBuilder;
    let key = make_key_at("/repos/example", 0x1);
    // Publish once under hook A, swap in hook B, then publish again.
    manager.set_hook(Arc::clone(&first_hook) as super::super::hook::SharedHook);
    manager
        .get_or_load(&key, &builder, 0)
        .expect("first load with hook A");
    manager.unload(&key);
    manager.set_hook(Arc::clone(&second_hook) as super::super::hook::SharedHook);
    manager
        .get_or_load(&key, &builder, 0)
        .expect("second load with hook B");
    // Each hook observed exactly the publish that happened while installed.
    assert_eq!(first_hook.invocation_count(), 1);
    assert_eq!(second_hook.invocation_count(), 1);
}
// Regression test: the publish hook re-enters the manager (it calls unload on
// the workspace being published). If the manager fired the hook while still
// holding its workspaces lock, this would deadlock; the poll loop below turns
// that hang into a test failure instead of wedging the harness.
#[test]
fn hook_can_call_manager_unload_without_deadlock() {
use std::{sync::Weak, thread, time::Duration};
// Hook that re-enters the manager from on_publish. A Weak back-reference
// avoids a manager -> hook -> manager Arc cycle.
#[derive(Debug)]
struct UnloadingHook {
manager: Weak<WorkspaceManager>,
key: WorkspaceKey,
}
impl super::super::hook::SqrydHook for UnloadingHook {
fn on_publish(&self, _workspace_root: &Path, _graph: Arc<CodeGraph>) {
if let Some(mgr) = self.manager.upgrade() {
let _present = mgr.unload(&self.key);
}
}
}
let mgr = WorkspaceManager::new_without_reaper(make_config());
let key = make_key_at("/repos/example", 0x1);
let builder = super::super::builder::EmptyGraphBuilder;
let hook = Arc::new(UnloadingHook {
manager: Arc::downgrade(&mgr),
key: key.clone(),
});
mgr.set_hook(Arc::clone(&hook) as super::super::hook::SharedHook);
// Run the load on a separate thread so the main thread can detect a hang.
let mgr_for_thread = Arc::clone(&mgr);
let key_for_thread = key.clone();
let builder_for_thread = builder;
let handle = thread::spawn(move || {
mgr_for_thread
.get_or_load(&key_for_thread, &builder_for_thread, 0)
.expect("load succeeds even with re-entrant hook");
});
// Poll with a 10 s deadline: a deadlocked loader never finishes on its own.
let deadline = std::time::Instant::now() + Duration::from_secs(10);
while !handle.is_finished() {
if std::time::Instant::now() > deadline {
panic!(
"get_or_load deadlocked while firing hook \
(Codex Task 6 Phase 6c iter-2 regression: \
hook must dispatch outside workspaces.read())",
);
}
thread::sleep(Duration::from_millis(20));
}
handle
.join()
.expect("loader thread completed without panic");
// The hook fired during publish and removed the freshly-loaded workspace.
assert!(
!mgr.workspaces.read().contains_key(&key),
"hook's re-entrant unload must have removed the workspace",
);
}
#[tokio::test]
async fn retention_reaper_task_eventually_drops_free_entries() {
    // `new` (unlike `new_without_reaper`) spawns the background reaper task.
    let manager = WorkspaceManager::new(make_config());
    let ws = make_workspace();
    manager
        .workspaces
        .write()
        .insert(ws.key.clone(), Arc::clone(&ws));
    let reservation = manager
        .reserve_rebuild(&ws.key, 0)
        .expect("zero-size reservation always fits");
    manager.publish_and_retain(reservation, &ws, CodeGraph::new());
    assert_eq!(manager.admission.lock().retained_old.len(), 1);
    // Poll up to 20 × 10 ms for the reaper to free the unreferenced entry.
    for _ in 0..20 {
        tokio::time::sleep(Duration::from_millis(10)).await;
        if manager.admission.lock().retained_old.is_empty() {
            return;
        }
    }
    panic!("reaper task never freed the entry within 200 ms");
}
}