use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[doc(hidden)]
pub mod thread_start_hook {
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::atomic::{AtomicBool, Ordering};
static REGISTRY: Mutex<Vec<(PathBuf, Arc<AtomicBool>)>> = Mutex::new(Vec::new());
pub fn register(workspace_path: PathBuf, flag: Arc<AtomicBool>) {
let mut guard = REGISTRY
.lock()
.expect("thread_start_hook REGISTRY poisoned");
if let Some(slot) = guard.iter_mut().find(|(p, _)| *p == workspace_path) {
slot.1 = flag;
} else {
guard.push((workspace_path, flag));
}
}
pub fn clear(workspace_path: &Path) {
let mut guard = REGISTRY
.lock()
.expect("thread_start_hook REGISTRY poisoned");
guard.retain(|(p, _)| p != workspace_path);
}
pub(super) fn notify(workspace_path: &Path) {
let guard = REGISTRY
.lock()
.expect("thread_start_hook REGISTRY poisoned");
if let Some((_, flag)) = guard.iter().find(|(p, _)| p == workspace_path) {
flag.store(true, Ordering::Release);
}
}
}
use serde_json::Value;
use sqry_core::project::{ProjectRootMode, absolutize_without_resolution, canonicalize_path};
use sqry_core::query::executor::QueryExecutor;
use sqry_mcp::daemon_adapter::WorkspaceContext;
use crate::error::DaemonError;
use crate::workspace::{ServeVerdict, WorkspaceKey, WorkspaceManager};
#[derive(Debug)]
pub(crate) enum ExecuteVerdict {
Fresh {
inner: Value,
state: crate::workspace::WorkspaceState,
},
Stale {
inner: Value,
stale_warning: String,
last_good_at: SystemTime,
last_error: Option<String>,
},
}
fn resolve_path(raw: &Path) -> Result<PathBuf, DaemonError> {
let absolutised =
absolutize_without_resolution(raw).map_err(|e| DaemonError::InvalidArgument {
reason: format!("path_policy: index_root absolutise: {e}"),
})?;
match std::fs::metadata(&absolutised) {
Ok(meta) if meta.is_dir() => {
canonicalize_path(&absolutised).map_err(|e| DaemonError::InvalidArgument {
reason: format!("path_policy: index_root canonicalize: {e}"),
})
}
Ok(_) => Err(DaemonError::InvalidArgument {
reason: "path_policy: index_root exists but is not a directory".to_string(),
}),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(DaemonError::InvalidArgument {
reason: "path_policy: index_root does not exist; daemon/load requires \
an existing directory so a canonical WorkspaceKey can be computed"
.to_string(),
}),
Err(e) => Err(DaemonError::InvalidArgument {
reason: format!("path_policy: index_root stat: {e}"),
}),
}
}
pub(crate) async fn classify_and_execute<F>(
manager: Arc<WorkspaceManager>,
tool_executor: Arc<QueryExecutor>,
tool_timeout: Duration,
path: &str,
run: F,
) -> Result<ExecuteVerdict, DaemonError>
where
F: FnOnce(&WorkspaceContext) -> anyhow::Result<Value> + Send + 'static,
{
let canonical_root = resolve_path(Path::new(path))?;
let key = WorkspaceKey::new(canonical_root.clone(), ProjectRootMode::GitRoot, 0);
let verdict = manager.classify_for_serve(&key, SystemTime::now())?;
match verdict {
ServeVerdict::Fresh { graph, state } => {
let wctx = WorkspaceContext {
workspace_root: canonical_root.clone(),
graph,
executor: tool_executor,
};
let inner = execute_with_timeout(tool_timeout, &canonical_root, wctx, run).await?;
Ok(ExecuteVerdict::Fresh { inner, state })
}
ServeVerdict::Stale {
graph,
age_hours,
last_good_at,
last_error,
} => {
let wctx = WorkspaceContext {
workspace_root: canonical_root.clone(),
graph,
executor: tool_executor,
};
let inner = execute_with_timeout(tool_timeout, &canonical_root, wctx, run).await?;
let stale_warning = render_stale_warning(
&canonical_root,
age_hours,
last_good_at,
last_error.as_deref(),
);
Ok(ExecuteVerdict::Stale {
inner,
stale_warning,
last_good_at,
last_error,
})
}
ServeVerdict::NotReady { state } => Err(DaemonError::WorkspaceBuildFailed {
root: canonical_root,
reason: format!("workspace not ready ({state:?}); call daemon/load first"),
}),
}
}
async fn execute_with_timeout<F>(
tool_timeout: Duration,
canonical_root: &Path,
wctx: WorkspaceContext,
run: F,
) -> Result<Value, DaemonError>
where
F: FnOnce(&WorkspaceContext) -> anyhow::Result<Value> + Send + 'static,
{
let deadline_ms = u64::try_from(tool_timeout.as_millis()).unwrap_or(u64::MAX);
let secs = tool_timeout.as_secs();
let root_owned = canonical_root.to_path_buf();
let hook_path = canonical_root.to_path_buf();
let join_handle = tokio::task::spawn_blocking(move || {
thread_start_hook::notify(&hook_path);
run(&wctx)
});
match tokio::time::timeout(tool_timeout, join_handle).await {
Ok(Ok(Ok(value))) => Ok(value),
Ok(Ok(Err(err))) => Err(DaemonError::Internal(err)),
Ok(Err(join_err)) => Err(DaemonError::Internal(anyhow::anyhow!(
"spawn_blocking join: {join_err}"
))),
Err(_elapsed) => Err(DaemonError::ToolTimeout {
root: root_owned,
secs,
deadline_ms,
}),
}
}
pub(crate) fn render_stale_warning(
root: &Path,
age_hours: u64,
last_good_at: SystemTime,
last_error: Option<&str>,
) -> String {
use chrono::{DateTime, SecondsFormat, Utc};
let rfc3339 = DateTime::<Utc>::from(last_good_at).to_rfc3339_opts(SecondsFormat::Secs, true);
match last_error {
Some(reason) => format!(
"workspace {} served from last-good build at {rfc3339} ({age_hours}h stale); last error: {reason}",
root.display()
),
None => format!(
"workspace {} served from last-good build at {rfc3339} ({age_hours}h stale)",
root.display()
),
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use serde_json::Value;
use super::{classify_and_execute, render_stale_warning};
use crate::config::DaemonConfig;
use crate::error::DaemonError;
use crate::workspace::WorkspaceManager;
use sqry_core::query::executor::QueryExecutor;
#[test]
fn render_stale_warning_with_last_error() {
let root = std::path::Path::new("/tmp/ws");
let last_good = UNIX_EPOCH + Duration::from_secs(1_760_000_000);
let got = render_stale_warning(root, 48, last_good, Some("parse error"));
assert!(got.contains("/tmp/ws"));
assert!(got.contains("48h stale"));
assert!(got.contains("; last error: parse error"));
assert!(got.contains('Z'), "expected RFC3339 UTC-Zulu form: {got}");
}
#[test]
fn render_stale_warning_without_last_error_omits_clause() {
let root = std::path::Path::new("/tmp/ws");
let last_good = UNIX_EPOCH + Duration::from_secs(1_760_000_000);
let got = render_stale_warning(root, 48, last_good, None);
assert!(got.contains("48h stale"));
assert!(
!got.contains("last error"),
"None last_error must omit the clause entirely, got: {got}"
);
}
fn test_manager() -> Arc<WorkspaceManager> {
let config = Arc::new(DaemonConfig::default());
WorkspaceManager::new_without_reaper(config)
}
fn test_executor() -> Arc<QueryExecutor> {
Arc::new(QueryExecutor::new())
}
#[tokio::test]
async fn classify_and_execute_invalid_path_returns_invalid_argument() {
let manager = test_manager();
let executor = test_executor();
let run = |_wctx: &sqry_mcp::daemon_adapter::WorkspaceContext| -> anyhow::Result<Value> {
Ok(Value::Null)
};
let err = classify_and_execute(
manager,
executor,
Duration::from_secs(10),
"/this/path/does/not/exist/for/real",
run,
)
.await
.expect_err("non-existent path must fail");
match err {
DaemonError::InvalidArgument { reason } => {
assert!(
reason.contains("path_policy"),
"expected 'path_policy' prefix, got: {reason}"
);
}
other => panic!("expected InvalidArgument, got: {other:?}"),
}
}
#[tokio::test]
async fn classify_and_execute_notready_returns_workspace_build_failed() {
use sqry_core::project::{ProjectRootMode, canonicalize_path};
let tmp = tempfile::tempdir().unwrap();
let root = canonicalize_path(tmp.path()).unwrap();
let manager = test_manager();
let executor = test_executor();
let key = crate::workspace::WorkspaceKey::new(root.clone(), ProjectRootMode::GitRoot, 0);
manager.insert_workspace_in_state_for_test(key, crate::workspace::WorkspaceState::Loading);
let run = |_wctx: &sqry_mcp::daemon_adapter::WorkspaceContext| -> anyhow::Result<Value> {
Ok(Value::Null)
};
let err = classify_and_execute(
manager,
executor,
Duration::from_secs(10),
root.to_str().unwrap(),
run,
)
.await
.expect_err("NotReady verdict must fail");
match err {
DaemonError::WorkspaceBuildFailed {
root: got_root,
reason,
} => {
assert_eq!(got_root, root);
assert!(
reason.contains("workspace not ready"),
"expected 'workspace not ready' prefix, got: {reason}"
);
assert!(
reason.contains("Loading"),
"expected state Debug in message, got: {reason}"
);
}
other => panic!("expected WorkspaceBuildFailed, got: {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn classify_and_execute_timeout_returns_tool_timeout() {
use sqry_core::project::{ProjectRootMode, canonicalize_path};
let tmp = tempfile::tempdir().unwrap();
let root = canonicalize_path(tmp.path()).unwrap();
let manager = test_manager();
let executor = test_executor();
let key = crate::workspace::WorkspaceKey::new(root.clone(), ProjectRootMode::GitRoot, 0);
manager.insert_workspace_in_state_for_test(key, crate::workspace::WorkspaceState::Loaded);
let run = |_wctx: &sqry_mcp::daemon_adapter::WorkspaceContext| -> anyhow::Result<Value> {
std::thread::sleep(Duration::from_millis(500));
Ok(Value::Null)
};
let err = classify_and_execute(
manager,
executor,
Duration::from_millis(50),
root.to_str().unwrap(),
run,
)
.await
.expect_err("timeout must fire");
match err {
DaemonError::ToolTimeout {
root: got_root,
secs,
deadline_ms,
} => {
assert_eq!(got_root, root);
assert_eq!(secs, 0, "50ms rounds down to 0 whole seconds");
assert_eq!(deadline_ms, 50);
}
other => panic!("expected ToolTimeout, got: {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn classify_and_execute_internal_error_on_run_failure() {
use sqry_core::project::{ProjectRootMode, canonicalize_path};
let tmp = tempfile::tempdir().unwrap();
let root = canonicalize_path(tmp.path()).unwrap();
let manager = test_manager();
let executor = test_executor();
let key = crate::workspace::WorkspaceKey::new(root.clone(), ProjectRootMode::GitRoot, 0);
manager.insert_workspace_in_state_for_test(key, crate::workspace::WorkspaceState::Loaded);
let run = |_wctx: &sqry_mcp::daemon_adapter::WorkspaceContext| -> anyhow::Result<Value> {
Err(anyhow::anyhow!("synthetic closure failure"))
};
let err = classify_and_execute(
manager,
executor,
Duration::from_secs(10),
root.to_str().unwrap(),
run,
)
.await
.expect_err("closure failure must surface");
match err {
DaemonError::Internal(inner) => {
assert!(
inner.to_string().contains("synthetic closure failure"),
"expected closure error to survive, got: {inner}"
);
}
other => panic!("expected Internal, got: {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn classify_and_execute_fresh_path_returns_inner_value() {
use sqry_core::project::{ProjectRootMode, canonicalize_path};
let tmp = tempfile::tempdir().unwrap();
let root = canonicalize_path(tmp.path()).unwrap();
let manager = test_manager();
let executor = test_executor();
let key = crate::workspace::WorkspaceKey::new(root.clone(), ProjectRootMode::GitRoot, 0);
manager.insert_workspace_in_state_for_test(key, crate::workspace::WorkspaceState::Loaded);
let run = |_wctx: &sqry_mcp::daemon_adapter::WorkspaceContext| -> anyhow::Result<Value> {
Ok(serde_json::json!({"hello": "world"}))
};
let verdict = classify_and_execute(
manager,
executor,
Duration::from_secs(10),
root.to_str().unwrap(),
run,
)
.await
.expect("fresh path must succeed");
match verdict {
super::ExecuteVerdict::Fresh { inner, state } => {
assert_eq!(inner, serde_json::json!({"hello": "world"}));
assert_eq!(state, crate::workspace::WorkspaceState::Loaded);
}
other => panic!("expected Fresh, got: {other:?}"),
}
}
#[test]
fn render_stale_warning_epoch_is_well_formed() {
let got =
render_stale_warning(std::path::Path::new("/ws"), 0, SystemTime::UNIX_EPOCH, None);
assert!(got.contains("1970-01-01T00:00:00Z"), "unexpected: {got}");
}
}