use std::{
collections::HashMap,
path::PathBuf,
sync::{
Arc, OnceLock,
atomic::{AtomicBool, AtomicU8, AtomicU64, AtomicUsize, Ordering},
},
time::{Duration, Instant, SystemTime},
};
use sqry_core::graph::{
CodeGraph, GraphBuilderError,
unified::{
build::{
BuildConfig, CancellationToken, build_unified_graph_cancellable,
compute_reverse_dep_closure, incremental_rebuild,
},
memory::GraphMemorySize,
},
};
use sqry_core::plugin::PluginManager;
use sqry_core::watch::{ChangeSet, GitChangeClass, LastIndexedGitState, SourceTreeWatcher};
use tokio::task::JoinHandle;
use crate::{
config::{DaemonConfig, ESTIMATE_FINAL_PER_FILE_BYTES, ESTIMATE_STAGING_PER_FILE_BYTES},
error::DaemonError,
workspace::{
LoadedWorkspace, PendingRebuild, WorkingSetInputs, WorkspaceKey, WorkspaceManager,
WorkspaceState, clone_err, working_set_estimate,
},
};
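/// Strategy for one rebuild pass. The `u8` encoding feeds the dispatcher's
/// `last_mode: AtomicU8`, where `0` means "no rebuild dispatched yet"; that is
/// why [`RebuildMode::from_u8`] maps `0` (and anything else unknown) to `None`.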
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RebuildMode {
Full,
Incremental,
}
impl RebuildMode {
const fn as_u8(self) -> u8 {
match self {
Self::Full => 1,
Self::Incremental => 2,
}
}
const fn from_u8(raw: u8) -> Option<Self> {
match raw {
1 => Some(Self::Full),
2 => Some(Self::Incremental),
_ => None,
}
}
}
impl PendingRebuild {
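    /// Merge an earlier pending rebuild with a `later` batch that arrived
    /// while the drain lane was occupied: changed files are unioned and
    /// deduplicated via a `BTreeSet`, `git_state_changed` is OR-ed, the git
    /// change class goes through [`merge_git_class`], and the later enqueue
    /// time and git snapshot win.
    ///
    /// A minimal sketch of the intent (not compiled; `ChangeSet`
    /// construction elided):
    ///
    /// ```ignore
    /// let merged = earlier.coalesce_with(later);
    /// // merged.changes.changed_files: sorted union of both file lists
    /// // merged.git_state_at_enqueue:  later's snapshot, if it has one
    /// ```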
#[must_use]
pub fn coalesce_with(self, later: PendingRebuild) -> PendingRebuild {
let mut file_set: std::collections::BTreeSet<PathBuf> =
self.changes.changed_files.into_iter().collect();
file_set.extend(later.changes.changed_files);
let changed_files: Vec<PathBuf> = file_set.into_iter().collect();
let git_state_changed = self.changes.git_state_changed || later.changes.git_state_changed;
let git_change_class = merge_git_class(
self.changes.git_change_class,
later.changes.git_change_class,
);
let enqueued_at = self.enqueued_at.max(later.enqueued_at);
let git_state_at_enqueue = later.git_state_at_enqueue.or(self.git_state_at_enqueue);
PendingRebuild {
changes: ChangeSet {
changed_files,
git_state_changed,
git_change_class,
},
enqueued_at,
git_state_at_enqueue,
}
}
}
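/// Merge two optional git change classes from coalesced batches: any class
/// that requires a full rebuild dominates and canonicalises to
/// `TreeDiverged`; among non-full classes the later one wins; `None` is the
/// identity. A sketch mirroring the unit tests below:
///
/// ```ignore
/// assert_eq!(
///     merge_git_class(Some(GitChangeClass::BranchSwitch), Some(GitChangeClass::Noise)),
///     Some(GitChangeClass::TreeDiverged),
/// );
/// assert_eq!(
///     merge_git_class(Some(GitChangeClass::LocalCommit), Some(GitChangeClass::Noise)),
///     Some(GitChangeClass::Noise),
/// );
/// ```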
fn merge_git_class(a: Option<GitChangeClass>, b: Option<GitChangeClass>) -> Option<GitChangeClass> {
let requires_full = a.is_some_and(GitChangeClass::requires_full_rebuild)
|| b.is_some_and(GitChangeClass::requires_full_rebuild);
if requires_full {
return Some(GitChangeClass::TreeDiverged);
}
b.or(a)
}
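/// Pick a rebuild strategy for one change batch. The ladder: an empty batch
/// short-circuits to `Incremental` (an effectively empty pass); a git class
/// requiring a full rebuild, or more changed files than
/// `incremental_threshold`, forces `Full`; otherwise the reverse-dependency
/// closure of the changed files is compared against `closure_limit_percent`
/// of the total file count. For example, with 1_000 files and a limit of
/// 20 percent, a closure touching more than 200 files falls back to `Full`.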
#[must_use]
pub fn decide_mode(config: &DaemonConfig, changes: &ChangeSet, graph: &CodeGraph) -> RebuildMode {
if changes.is_empty() {
return RebuildMode::Incremental;
}
if changes.requires_full_rebuild() {
return RebuildMode::Full;
}
if changes.changed_files.len() > config.incremental_threshold {
return RebuildMode::Full;
}
let file_ids: Vec<_> = changes
.changed_files
.iter()
.filter_map(|p| graph.files().get(p))
.collect();
let closure = compute_reverse_dep_closure(&file_ids, graph);
let file_count = graph.files().len();
let limit = file_count.saturating_mul(config.closure_limit_percent as usize) / 100;
if closure.len() > limit {
RebuildMode::Full
} else {
RebuildMode::Incremental
}
}
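/// Estimate peak memory for a rebuild so a reservation can be taken up
/// front: `Full` assumes the new graph ends up roughly the size of the prior
/// one and stages every file; `Incremental` adds a per-file final-size
/// estimate for the n changed files and stages only those. The prior graph's
/// interner size is passed along as the snapshot term of
/// [`working_set_estimate`].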
fn compute_working_set_estimate(prior: &CodeGraph, changes: &ChangeSet, mode: RebuildMode) -> u64 {
let prior_bytes = prior.heap_bytes() as u64;
let interner_bytes = prior.strings().heap_bytes() as u64;
let file_count = prior.files().len() as u64;
let (final_estimate, staging_file_count) = match mode {
RebuildMode::Full => (prior_bytes, file_count),
RebuildMode::Incremental => {
let n = changes.changed_files.len() as u64;
let final_est =
prior_bytes.saturating_add(n.saturating_mul(ESTIMATE_FINAL_PER_FILE_BYTES));
(final_est, n)
}
};
let staging = staging_file_count.saturating_mul(ESTIMATE_STAGING_PER_FILE_BYTES);
working_set_estimate(WorkingSetInputs {
new_graph_final_estimate: final_estimate,
staging_overhead: staging,
interner_snapshot_bytes: interner_bytes,
})
}
#[doc(hidden)]
#[derive(Debug)]
pub struct TestGate {
pub hold: AtomicUsize,
pub release: tokio::sync::Notify,
}
#[doc(hidden)]
#[derive(Debug, Default)]
pub struct TestCapture {
pub iterations: parking_lot::Mutex<Vec<CapturedIteration>>,
pub post_reservation_hold: AtomicUsize,
pub post_reservation_reached: tokio::sync::Notify,
pub post_reservation_release: tokio::sync::Notify,
pub publish_path_evictions: AtomicUsize,
pub pass_boundary_cancellations: AtomicUsize,
pub suppress_forwarder: AtomicBool,
pub precancel_token_for_pass_boundary: AtomicBool,
pub post_reservation_reached_flag: AtomicBool,
}
impl TestCapture {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn publish_path_evictions(&self) -> usize {
self.publish_path_evictions.load(Ordering::Acquire)
}
#[must_use]
pub fn pass_boundary_cancellations(&self) -> usize {
self.pass_boundary_cancellations.load(Ordering::Acquire)
}
pub fn arm_post_reservation_hold(&self) {
self.post_reservation_hold.fetch_add(1, Ordering::AcqRel);
}
pub fn release_post_reservation(&self) {
self.post_reservation_release.notify_one();
}
pub async fn wait_until_post_reservation(&self) {
if self.post_reservation_reached_flag.load(Ordering::Acquire) {
return;
}
        // `post_reservation_reached` is signalled with `notify_waiters`,
        // which does not store a permit, so register as a waiter (`enable`)
        // *before* re-checking the flag; otherwise a notification landing
        // between the check and the first poll would be lost.
        let notified = self.post_reservation_reached.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        if self.post_reservation_reached_flag.load(Ordering::Acquire) {
            return;
        }
        notified.await;
}
pub fn reset_post_reservation_reached(&self) {
self.post_reservation_reached_flag
.store(false, Ordering::Release);
}
}
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct CapturedIteration {
pub changeset: ChangeSet,
pub mode: RebuildMode,
pub git_state_at_enqueue: Option<LastIndexedGitState>,
pub started_at: Instant,
}
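/// One live watcher pair for a workspace. Dropping a tokio `JoinHandle`
/// detaches the task rather than aborting it, so the handles are retained
/// only to keep ownership explicit; liveness is tracked through `live`, and
/// `generation` lets a finished task reap only its own entry.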
struct WatcherEntry {
generation: u64,
live: Arc<AtomicBool>,
#[allow(dead_code)]
async_handle: JoinHandle<()>,
#[allow(dead_code)]
blocking_handle: JoinHandle<()>,
}
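/// Drives per-workspace rebuilds: mode selection, memory reservation,
/// cancellable builds on the blocking pool, publication under the workspace
/// map lock, and the watcher tasks that feed change batches in.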
pub struct RebuildDispatcher {
manager: Arc<WorkspaceManager>,
config: Arc<DaemonConfig>,
plugins: Arc<PluginManager>,
build_config: BuildConfig,
dispatched_count: AtomicU64,
last_mode: AtomicU8,
watchers: parking_lot::Mutex<HashMap<WorkspaceKey, WatcherEntry>>,
next_watcher_generation: AtomicU64,
#[doc(hidden)]
test_gate: OnceLock<Arc<TestGate>>,
#[doc(hidden)]
test_capture: OnceLock<Arc<TestCapture>>,
}
impl std::fmt::Debug for RebuildDispatcher {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RebuildDispatcher")
.field(
"dispatched_count",
&self.dispatched_count.load(Ordering::Relaxed),
)
.field("last_mode", &self.last_mode())
.field("memory_limit_mb", &self.config.memory_limit_mb)
.finish_non_exhaustive()
}
}
impl RebuildDispatcher {
#[must_use]
pub fn new(
manager: Arc<WorkspaceManager>,
config: Arc<DaemonConfig>,
plugins: Arc<PluginManager>,
) -> Arc<Self> {
Arc::new(Self {
manager,
config,
plugins,
build_config: BuildConfig::default(),
dispatched_count: AtomicU64::new(0),
last_mode: AtomicU8::new(0),
watchers: parking_lot::Mutex::new(HashMap::new()),
next_watcher_generation: AtomicU64::new(0),
test_gate: OnceLock::new(),
test_capture: OnceLock::new(),
})
}
#[doc(hidden)]
pub fn install_test_gate(&self, gate: Arc<TestGate>) -> Result<(), Arc<TestGate>> {
self.test_gate.set(gate)
}
#[doc(hidden)]
pub fn install_test_capture(&self, capture: Arc<TestCapture>) -> Result<(), Arc<TestCapture>> {
self.test_capture.set(capture)
}
async fn gate_check(&self) {
let Some(gate) = self.test_gate.get() else {
return;
};
if gate.hold.load(Ordering::Acquire) == 0 {
return;
}
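        // `release` is signalled with `notify_one`, which stores a permit
        // when no waiter is registered, so create-check-await cannot lose a
        // wakeup here (contrast `notify_waiters` in
        // `TestCapture::wait_until_post_reservation`, which needs `enable`).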
let notified = gate.release.notified();
tokio::pin!(notified);
if gate.hold.load(Ordering::Acquire) > 0 {
notified.await;
gate.hold.fetch_sub(1, Ordering::AcqRel);
}
}
#[must_use]
pub fn dispatched_count(&self) -> u64 {
self.dispatched_count.load(Ordering::Relaxed)
}
#[must_use]
pub fn last_mode(&self) -> Option<RebuildMode> {
RebuildMode::from_u8(self.last_mode.load(Ordering::Relaxed))
}
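    /// Enqueue a change batch with no git snapshot attached. The watcher path
    /// uses [`Self::handle_changes_with_git_state`] instead, which also
    /// advances `last_indexed_git_state` once the rebuild publishes.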
pub async fn handle_changes(
&self,
key: &WorkspaceKey,
changes: ChangeSet,
) -> Result<(), DaemonError> {
self.handle_changes_inner(
key,
PendingRebuild {
changes,
enqueued_at: Instant::now(),
git_state_at_enqueue: None,
},
)
.await
}
pub async fn handle_changes_with_git_state(
&self,
key: &WorkspaceKey,
changes: ChangeSet,
git_state: LastIndexedGitState,
) -> Result<(), DaemonError> {
self.handle_changes_inner(
key,
PendingRebuild {
changes,
enqueued_at: Instant::now(),
git_state_at_enqueue: Some(git_state),
},
)
.await
}
async fn handle_changes_inner(
&self,
key: &WorkspaceKey,
incoming: PendingRebuild,
) -> Result<(), DaemonError> {
let ws: Arc<LoadedWorkspace> =
self.manager
.lookup(key)
.ok_or_else(|| DaemonError::WorkspaceEvicted {
root: key.source_root.clone(),
})?;
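        // Single-flight drain protocol: coalesce with anything parked in the
        // lane, then race to become the drainer via CAS on
        // `rebuild_in_flight`. The loser parks the coalesced batch and
        // returns; the current drainer picks it up at its next pass boundary.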
let mut current: PendingRebuild = {
let mut lane_guard = ws.rebuild_lane.lock().await;
let coalesced = match lane_guard.take() {
Some(prior) => prior.coalesce_with(incoming),
None => incoming,
};
match ws.rebuild_in_flight.compare_exchange(
false,
true,
Ordering::AcqRel,
Ordering::Acquire,
) {
                Ok(_) => coalesced,
                Err(_) => {
*lane_guard = Some(coalesced);
return Ok(());
}
}
};
let mut sentinel = DrainLoopSentinel {
ws: Arc::clone(&ws),
armed: true,
};
let mut last_result: Result<(), DaemonError>;
loop {
if ws.rebuild_cancelled.swap(false, Ordering::AcqRel) {
let mut lane_guard = ws.rebuild_lane.lock().await;
let _dropped: Option<PendingRebuild> = lane_guard.take();
ws.rebuild_in_flight.store(false, Ordering::Release);
sentinel.armed = false;
if ws.load_state() == WorkspaceState::Rebuilding {
ws.store_state(WorkspaceState::Unloaded);
}
return Err(DaemonError::WorkspaceEvicted {
root: key.source_root.clone(),
});
}
last_result = self
.execute_one_rebuild(key, &ws, current.changes, current.git_state_at_enqueue)
.await;
let next: Option<PendingRebuild> = {
let mut lane_guard = ws.rebuild_lane.lock().await;
match lane_guard.take() {
Some(next) => Some(next),
None => {
ws.rebuild_in_flight.store(false, Ordering::Release);
sentinel.armed = false;
None
}
}
};
match next {
Some(n) => current = n,
None => return last_result,
}
}
}
async fn execute_one_rebuild(
&self,
key: &WorkspaceKey,
ws: &Arc<LoadedWorkspace>,
changes: ChangeSet,
git_state_at_enqueue: Option<LastIndexedGitState>,
) -> Result<(), DaemonError> {
let prior_graph: Arc<CodeGraph> = ws.graph.load_full();
let mode = decide_mode(&self.config, &changes, &prior_graph);
self.store_last_mode(mode);
ws.store_state(WorkspaceState::Rebuilding);
if let Some(cap) = self.test_capture.get() {
cap.iterations.lock().push(CapturedIteration {
changeset: changes.clone(),
mode,
git_state_at_enqueue: git_state_at_enqueue.clone(),
started_at: Instant::now(),
});
}
self.gate_check().await;
let estimate = compute_working_set_estimate(&prior_graph, &changes, mode);
let reservation = match self.manager.reserve_rebuild(key, estimate) {
Ok(r) => r,
Err(e) => {
self.record_and_transition_on_err(ws, &e);
return Err(e);
}
};
self.post_reservation_check().await;
let new_graph: CodeGraph = match self
.execute_rebuild(key, ws, &prior_graph, mode, changes)
.await
{
Ok(g) => g,
Err(e) => {
drop(reservation);
if matches!(e, DaemonError::WorkspaceEvicted { .. })
&& let Some(cap) = self.test_capture.get()
{
cap.pass_boundary_cancellations
.fetch_add(1, Ordering::AcqRel);
}
self.record_and_transition_on_err(ws, &e);
return Err(e);
}
};
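        // Publish while holding the workspace-map read lock so an eviction
        // (which needs the write side) cannot slip in between the
        // cancellation/membership checks and `publish_and_retain`.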
let publish_result = {
let workspaces_guard = self.manager.workspaces_read();
if ws.rebuild_cancelled.load(Ordering::Acquire) {
drop(workspaces_guard);
drop(reservation);
if let Some(cap) = self.test_capture.get() {
cap.publish_path_evictions.fetch_add(1, Ordering::AcqRel);
}
return Err(DaemonError::WorkspaceEvicted {
root: key.source_root.clone(),
});
}
if !workspaces_guard.contains_key(key) {
drop(workspaces_guard);
drop(reservation);
if let Some(cap) = self.test_capture.get() {
cap.publish_path_evictions.fetch_add(1, Ordering::AcqRel);
}
return Err(DaemonError::WorkspaceEvicted {
root: key.source_root.clone(),
});
}
let (_token, published_arc) =
match self.manager.publish_and_retain(reservation, ws, new_graph) {
Ok((token, arc)) => (token, arc),
Err(e) => {
self.record_and_transition_on_err(ws, &e);
return Err(e);
}
};
published_arc
};
if let Some(git_state) = git_state_at_enqueue {
*ws.last_indexed_git_state.write() = Some(git_state);
}
ws.record_success(SystemTime::now());
ws.store_state(WorkspaceState::Loaded);
self.dispatched_count.fetch_add(1, Ordering::Relaxed);
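        // `publish_result` is the retained Arc from `publish_and_retain`;
        // hold it through the success bookkeeping above before letting go.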
let _ = publish_result;
Ok(())
}
fn record_and_transition_on_err(&self, ws: &LoadedWorkspace, err: &DaemonError) {
if matches!(err, DaemonError::WorkspaceEvicted { .. }) {
if ws.load_state() == WorkspaceState::Rebuilding {
ws.store_state(WorkspaceState::Unloaded);
}
return;
}
ws.record_failure(clone_err(err));
ws.store_state(WorkspaceState::Failed);
}
async fn execute_rebuild(
&self,
key: &WorkspaceKey,
ws: &Arc<LoadedWorkspace>,
prior: &Arc<CodeGraph>,
mode: RebuildMode,
changes: ChangeSet,
) -> Result<CodeGraph, DaemonError> {
let root = key.source_root.clone();
let plugins = Arc::clone(&self.plugins);
let cfg = self.build_config.clone();
let prior_for_blocking = Arc::clone(prior);
let root_for_err = root.clone();
let token = CancellationToken::new();
let forwarder_handle = if self
.test_capture
.get()
.is_some_and(|cap| cap.suppress_forwarder.load(Ordering::Acquire))
{
None
} else {
Some(spawn_cancellation_forwarder(Arc::clone(ws), token.clone()))
};
if self.test_capture.get().is_some_and(|cap| {
cap.precancel_token_for_pass_boundary
.load(Ordering::Acquire)
}) {
token.cancel();
}
let token_for_blocking = token.clone();
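        // Run the CPU-bound build off the async runtime. The forwarder task
        // (unless suppressed by tests) polls `rebuild_cancelled` and trips
        // the token so the blocking pass can observe cancellation mid-build;
        // it is aborted once the build joins either way.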
let join_result = tokio::task::spawn_blocking(move || {
execute_rebuild_blocking(
&root,
&prior_for_blocking,
mode,
changes,
&plugins,
&cfg,
&token_for_blocking,
)
})
.await;
if let Some(handle) = forwarder_handle {
handle.abort();
}
match join_result {
Ok(Ok(graph)) => Ok(graph),
Ok(Err(e)) => Err(e),
Err(join_err) => Err(DaemonError::WorkspaceBuildFailed {
root: root_for_err,
reason: format!("spawn_blocking join error: {join_err}"),
}),
}
}
async fn post_reservation_check(&self) {
let Some(cap) = self.test_capture.get() else {
return;
};
cap.post_reservation_reached_flag
.store(true, Ordering::Release);
cap.post_reservation_reached.notify_waiters();
if cap.post_reservation_hold.load(Ordering::Acquire) == 0 {
return;
}
let notified = cap.post_reservation_release.notified();
if cap.post_reservation_hold.load(Ordering::Acquire) > 0 {
notified.await;
cap.post_reservation_hold.fetch_sub(1, Ordering::AcqRel);
}
}
fn store_last_mode(&self, mode: RebuildMode) {
self.last_mode.store(mode.as_u8(), Ordering::Relaxed);
}
pub async fn ensure_watching(
self: &Arc<Self>,
key: &WorkspaceKey,
ws: Arc<LoadedWorkspace>,
root: PathBuf,
) -> Result<(), DaemonError> {
let mut watchers = self.watchers.lock();
if let Some(entry) = watchers.get(key)
&& entry.live.load(Ordering::Acquire)
{
return Ok(());
}
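        // Watcher topology: a blocking loop owns the filesystem watcher and
        // forwards debounced (ChangeSet, git state) batches over a bounded
        // channel to an async loop that dispatches rebuilds. The generation
        // counter lets the async loop reap only its own map entry on exit.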
let generation = self.next_watcher_generation.fetch_add(1, Ordering::Relaxed);
let watcher = SourceTreeWatcher::new(&root).map_err(|e| {
DaemonError::Io(std::io::Error::other(format!(
"failed to create watcher for {}: {e:#}",
root.display()
)))
})?;
{
let mut baseline = ws.last_indexed_git_state.write();
if baseline.is_none() {
*baseline = Some(watcher.git_state().current_state());
}
}
let (tx, rx) = tokio::sync::mpsc::channel::<(ChangeSet, LastIndexedGitState)>(16);
let debounce = Duration::from_millis(self.config.debounce_ms);
let cancel_poll_period = Duration::from_millis(100);
let live = Arc::new(AtomicBool::new(true));
let blocking_handle = {
let ws = Arc::clone(&ws);
tokio::task::spawn_blocking(move || {
watch_loop_blocking(&watcher, &tx, &ws, debounce, cancel_poll_period);
})
};
let async_handle = {
let dispatcher = Arc::clone(self);
let key = key.clone();
let ws = Arc::clone(&ws);
let live_for_task = Arc::clone(&live);
tokio::spawn(async move {
dispatch_loop_async(&dispatcher, &key, &ws, rx).await;
live_for_task.store(false, Ordering::Release);
dispatcher.reap_watcher(&key, generation);
})
};
watchers.remove(key);
watchers.insert(
key.clone(),
WatcherEntry {
generation,
live,
async_handle,
blocking_handle,
},
);
Ok(())
}
pub(crate) fn reap_watcher(&self, key: &WorkspaceKey, my_generation: u64) {
let mut watchers = self.watchers.lock();
if let Some(entry) = watchers.get(key)
&& entry.generation == my_generation
{
watchers.remove(key);
}
}
#[doc(hidden)]
#[must_use]
pub fn watchers_len(&self) -> usize {
self.watchers.lock().len()
}
}
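/// Blocking side of the watcher pair: waits for debounced change batches,
/// drops empty or graph-neutral ones, and forwards the rest with a fresh git
/// snapshot. Exits on cancellation, watcher error, or a dropped receiver.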
fn watch_loop_blocking(
watcher: &SourceTreeWatcher,
tx: &tokio::sync::mpsc::Sender<(ChangeSet, LastIndexedGitState)>,
ws: &LoadedWorkspace,
debounce: Duration,
cancel_poll_period: Duration,
) {
loop {
let last_git = ws.last_indexed_git_state.read().clone();
match watcher.wait_for_changes_cancellable(
debounce,
last_git.as_ref(),
&ws.rebuild_cancelled,
cancel_poll_period,
) {
Ok(None) => {
tracing::info!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
"watcher cancelled; terminating blocking loop"
);
break;
}
Err(e) => {
tracing::error!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
error = %e,
"watcher channel disconnected; terminating blocking loop"
);
break;
}
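            // Every empty-file batch is skipped, including ones whose git
            // class would normally force a full rebuild: with no file deltas
            // the signal is treated as TOCTOU noise or a graph-neutral git
            // move, and the indexed baseline only advances after a
            // successful rebuild anyway.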
Ok(Some(cs)) if cs.is_empty() => {
continue;
}
Ok(Some(cs)) if cs.changed_files.is_empty() && !cs.requires_full_rebuild() => {
continue;
}
Ok(Some(cs)) if cs.changed_files.is_empty() && cs.requires_full_rebuild() => {
tracing::debug!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
git_class = ?cs.git_change_class,
"skipping empty-files full-rebuild signal: TOCTOU or graph-neutral git move"
);
continue;
}
Ok(Some(cs)) => {
let new_git_state = watcher.git_state().current_state();
if tx.blocking_send((cs, new_git_state)).is_err() {
tracing::debug!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
"async dispatcher task dropped receiver; terminating blocking loop"
);
break;
}
}
}
}
}
async fn dispatch_loop_async(
dispatcher: &Arc<RebuildDispatcher>,
key: &WorkspaceKey,
ws: &LoadedWorkspace,
mut rx: tokio::sync::mpsc::Receiver<(ChangeSet, LastIndexedGitState)>,
) {
loop {
let Some((cs, new_git_state)) = rx.recv().await else {
tracing::debug!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
"watcher channel closed; terminating async dispatcher"
);
break;
};
match dispatcher
.handle_changes_with_git_state(key, cs, new_git_state)
.await
{
            Ok(()) => {}
Err(DaemonError::WorkspaceEvicted { .. }) => {
tracing::info!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
"workspace evicted; terminating async dispatcher"
);
break;
}
Err(e) => {
tracing::warn!(
target: "sqry_daemon::watch",
workspace = %ws.key.source_root.display(),
error = %e,
"rebuild failed; baseline unchanged, retrying on next change"
);
}
}
}
}
fn execute_rebuild_blocking(
root: &std::path::Path,
prior: &Arc<CodeGraph>,
mode: RebuildMode,
changes: ChangeSet,
plugins: &PluginManager,
cfg: &BuildConfig,
cancellation: &CancellationToken,
) -> Result<CodeGraph, DaemonError> {
match mode {
RebuildMode::Full => {
match build_unified_graph_cancellable(root, plugins, cfg, cancellation) {
Ok(graph) => Ok(graph),
Err(e) => Err(map_graph_builder_err(e, root.to_path_buf(), "full rebuild")),
}
}
RebuildMode::Incremental => {
let paths: &[PathBuf] = &changes.changed_files;
let file_ids: Vec<_> = paths.iter().filter_map(|p| prior.files().get(p)).collect();
let closure = compute_reverse_dep_closure(&file_ids, prior.as_ref());
incremental_rebuild(prior.as_ref(), paths, &closure, plugins, cfg, cancellation)
.map_err(|e| map_graph_builder_err(e, root.to_path_buf(), "incremental rebuild"))
}
}
}
fn map_graph_builder_err(err: GraphBuilderError, root: PathBuf, stage: &str) -> DaemonError {
match err {
GraphBuilderError::Cancelled => DaemonError::WorkspaceEvicted { root },
other => DaemonError::WorkspaceBuildFailed {
root,
reason: format!("{stage}: {other}"),
},
}
}
const CANCEL_FORWARDER_POLL_MS: u64 = 50;
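/// Bridge the workspace-level `rebuild_cancelled` flag into the build's
/// `CancellationToken` by polling every `CANCEL_FORWARDER_POLL_MS` ms; the
/// caller aborts this task once the blocking build joins.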
fn spawn_cancellation_forwarder(
ws: Arc<LoadedWorkspace>,
token: CancellationToken,
) -> JoinHandle<()> {
tokio::spawn(async move {
loop {
if ws.rebuild_cancelled.load(Ordering::Acquire) {
token.cancel();
return;
}
            tokio::time::sleep(Duration::from_millis(CANCEL_FORWARDER_POLL_MS)).await;
}
})
}
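/// Drop guard for the drain loop in `handle_changes_inner`: if the loop
/// unwinds while still armed (e.g. a panic inside a pass), `Drop` releases
/// `rebuild_in_flight` so the workspace cannot be wedged with a permanently
/// set flag.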
struct DrainLoopSentinel {
ws: Arc<LoadedWorkspace>,
armed: bool,
}
impl Drop for DrainLoopSentinel {
fn drop(&mut self) {
if !self.armed {
return;
}
tracing::error!(
target: "sqry_daemon::rebuild",
workspace = %self.ws.key.source_root.display(),
"handle_changes unwound with armed DrainLoopSentinel — \
releasing rebuild_in_flight defensively; any parked \
PendingRebuild will be processed on the next dispatch"
);
self.ws.rebuild_in_flight.store(false, Ordering::Release);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rebuild_mode_u8_roundtrip() {
for mode in [RebuildMode::Full, RebuildMode::Incremental] {
let encoded = mode.as_u8();
assert_eq!(RebuildMode::from_u8(encoded), Some(mode));
}
assert_eq!(RebuildMode::from_u8(0), None);
assert_eq!(RebuildMode::from_u8(3), None);
assert_eq!(RebuildMode::from_u8(255), None);
}
#[test]
fn merge_git_class_full_rebuild_dominance_canonicalises_to_tree_diverged() {
for full_variant in [GitChangeClass::BranchSwitch, GitChangeClass::TreeDiverged] {
for non_full in [
None,
Some(GitChangeClass::LocalCommit),
Some(GitChangeClass::Noise),
] {
assert_eq!(
merge_git_class(Some(full_variant), non_full),
Some(GitChangeClass::TreeDiverged),
);
assert_eq!(
merge_git_class(non_full, Some(full_variant)),
Some(GitChangeClass::TreeDiverged),
);
}
}
}
#[test]
fn merge_git_class_non_full_later_wins() {
assert_eq!(
merge_git_class(
Some(GitChangeClass::LocalCommit),
Some(GitChangeClass::Noise)
),
Some(GitChangeClass::Noise),
);
assert_eq!(
merge_git_class(
Some(GitChangeClass::Noise),
Some(GitChangeClass::LocalCommit)
),
Some(GitChangeClass::LocalCommit),
);
}
#[test]
fn merge_git_class_absorbs_none_symmetrically() {
assert_eq!(merge_git_class(None, None), None);
assert_eq!(
merge_git_class(None, Some(GitChangeClass::Noise)),
Some(GitChangeClass::Noise),
);
assert_eq!(
merge_git_class(Some(GitChangeClass::LocalCommit), None),
Some(GitChangeClass::LocalCommit),
);
}
fn make_sentinel_workspace() -> Arc<LoadedWorkspace> {
use sqry_core::project::ProjectRootMode;
Arc::new(LoadedWorkspace::new(
WorkspaceKey::new(
std::path::PathBuf::from("/repos/sentinel-test"),
ProjectRootMode::GitRoot,
0xBEEF,
),
false,
))
}
#[test]
fn drain_loop_sentinel_disarmed_is_noop() {
let ws = make_sentinel_workspace();
ws.rebuild_in_flight.store(false, Ordering::Release);
{
let sentinel = DrainLoopSentinel {
ws: Arc::clone(&ws),
armed: false,
};
drop(sentinel);
}
assert!(
!ws.rebuild_in_flight.load(Ordering::Acquire),
"disarmed sentinel must not flip the flag"
);
}
#[test]
fn drain_loop_sentinel_armed_releases_in_flight_on_drop() {
let ws = make_sentinel_workspace();
ws.rebuild_in_flight.store(true, Ordering::Release);
{
let sentinel = DrainLoopSentinel {
ws: Arc::clone(&ws),
armed: true,
};
drop(sentinel);
}
assert!(
!ws.rebuild_in_flight.load(Ordering::Acquire),
"armed sentinel Drop must release rebuild_in_flight"
);
}
fn make_dispatcher_for_gate_test() -> Arc<RebuildDispatcher> {
let config = Arc::new(crate::config::DaemonConfig::default());
let manager = crate::workspace::WorkspaceManager::new_without_reaper(Arc::clone(&config));
let plugins = Arc::new(sqry_plugin_registry::create_plugin_manager());
RebuildDispatcher::new(manager, config, plugins)
}
#[tokio::test]
async fn gate_check_is_noop_when_no_gate_installed() {
let dispatcher = make_dispatcher_for_gate_test();
tokio::time::timeout(Duration::from_millis(50), dispatcher.gate_check())
.await
.expect("gate_check with no installed gate must return immediately");
}
#[tokio::test]
async fn gate_check_blocks_then_decrements_hold_on_release() {
let dispatcher = make_dispatcher_for_gate_test();
let gate = Arc::new(TestGate {
hold: AtomicUsize::new(1),
release: tokio::sync::Notify::new(),
});
dispatcher
.install_test_gate(Arc::clone(&gate))
.expect("first install must succeed");
let dispatcher_for_task = Arc::clone(&dispatcher);
let blocked = tokio::spawn(async move { dispatcher_for_task.gate_check().await });
tokio::time::sleep(Duration::from_millis(25)).await;
assert!(
!blocked.is_finished(),
"gate_check must block while hold > 0"
);
gate.release.notify_one();
tokio::time::timeout(Duration::from_millis(500), blocked)
.await
.expect("gate_check must complete promptly after release")
.expect("task panicked");
assert_eq!(
gate.hold.load(Ordering::Acquire),
0,
"gate release must decrement hold"
);
tokio::time::timeout(Duration::from_millis(50), dispatcher.gate_check())
.await
.expect("gate_check with hold==0 must return immediately without awaiting");
}
#[test]
fn record_and_transition_on_err_preserves_evicted_state() {
let dispatcher = make_dispatcher_for_gate_test();
let ws = Arc::new(LoadedWorkspace::new(
crate::workspace::state::WorkspaceKey::new(
std::path::PathBuf::from("/repo"),
sqry_core::project::ProjectRootMode::GitRoot,
0x1,
),
false,
));
ws.store_state(crate::workspace::state::WorkspaceState::Evicted);
let err = DaemonError::WorkspaceEvicted {
root: std::path::PathBuf::from("/repo"),
};
dispatcher.record_and_transition_on_err(&ws, &err);
assert_eq!(
ws.load_state(),
crate::workspace::state::WorkspaceState::Evicted,
"eviction-path WorkspaceEvicted must NOT transition state"
);
}
#[test]
fn record_and_transition_on_err_unloads_reset_in_iteration_cancel() {
let dispatcher = make_dispatcher_for_gate_test();
let ws = Arc::new(LoadedWorkspace::new(
crate::workspace::state::WorkspaceKey::new(
std::path::PathBuf::from("/repo"),
sqry_core::project::ProjectRootMode::GitRoot,
0x1,
),
false,
));
ws.store_state(crate::workspace::state::WorkspaceState::Rebuilding);
let err = DaemonError::WorkspaceEvicted {
root: std::path::PathBuf::from("/repo"),
};
dispatcher.record_and_transition_on_err(&ws, &err);
assert_eq!(
ws.load_state(),
crate::workspace::state::WorkspaceState::Unloaded,
"in-iteration reset cancellation must transition Rebuilding → Unloaded; \
without this, daemon reset → daemon load cannot recover"
);
}
#[test]
fn record_and_transition_on_err_failed_for_non_eviction_errors() {
let dispatcher = make_dispatcher_for_gate_test();
let ws = Arc::new(LoadedWorkspace::new(
crate::workspace::state::WorkspaceKey::new(
std::path::PathBuf::from("/repo"),
sqry_core::project::ProjectRootMode::GitRoot,
0x1,
),
false,
));
ws.store_state(crate::workspace::state::WorkspaceState::Rebuilding);
let err = DaemonError::Internal(anyhow::anyhow!("plugin panic"));
dispatcher.record_and_transition_on_err(&ws, &err);
assert_eq!(
ws.load_state(),
crate::workspace::state::WorkspaceState::Failed,
"non-eviction errors must transition to Failed"
);
}
}