use crate::error::{packop_error, CancelledExt};
use grex_core::sync::{self, SyncOptions};
use rmcp::{
handler::server::wrapper::Parameters,
model::{CallToolResult, Content},
ErrorData as McpError,
};
use schemars::JsonSchema;
use serde::Deserialize;
use std::path::PathBuf;
use tokio_util::sync::CancellationToken;
// Request parameters for the sync handler in this module.
//
// Deserialised from a camelCase JSON object (`rename_all = "camelCase"`);
// unknown keys are rejected (`deny_unknown_fields`). Only `packRoot` is
// required: every other field carries `#[serde(default)]` and falls back to
// its `Default` value when absent.
//
// NOTE: plain `//` comments are used here on purpose. `///` doc comments on a
// `JsonSchema`-derived type are picked up as schema descriptions, which would
// change the generated schema.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
#[non_exhaustive]
pub struct SyncParams {
// Pack root path; passed as the first argument to `sync::run`. Required.
pub pack_root: PathBuf,
// Optional workspace path, forwarded to `SyncOptions::with_workspace`.
#[serde(default)]
pub workspace: Option<PathBuf>,
// Forwarded to `SyncOptions::with_dry_run`. Defaults to `false`.
#[serde(default)]
pub dry_run: bool,
// Inverted before forwarding: `build_opts` calls `with_validate(!no_validate)`.
// Defaults to `false`, i.e. validation is requested unless this is set.
#[serde(default)]
pub no_validate: bool,
// Wire name is `ref` (the explicit `rename` overrides the camelCase rule).
// Forwarded to `SyncOptions::with_ref_override`.
#[serde(default, rename = "ref")]
pub ref_override: Option<String>,
// Forwarded to `SyncOptions::with_only_patterns`; an empty list is sent as `None`.
#[serde(default)]
pub only: Vec<String>,
// Forwarded to `SyncOptions::with_force`. Defaults to `false`.
#[serde(default)]
pub force: bool,
// Accepted on the wire, but `build_opts` in this file does not forward it.
// NOTE(review): confirm whether dropping `parallel` here is intentional.
#[serde(default)]
pub parallel: Option<u32>,
// Wire name `retainDays`. Forwarded verbatim to `SyncOptions::with_retain_days`.
#[serde(default)]
pub retain_days: Option<u32>,
}
pub(crate) async fn handle(
state: &crate::ServerState,
p: Parameters<SyncParams>,
cancel: CancellationToken,
) -> Result<CallToolResult, McpError> {
run_with_cancel(state, p.0, cancel).await
}
/// Runs one sync request, racing the blocking work against `cancel`.
///
/// Steps, in order:
/// 1. (test builds only) if the block-until-cancelled hook is set, wait for
///    cancellation and return the cancelled error without doing any work;
/// 2. acquire a permit from `state.scheduler`, giving up if `cancel` fires;
/// 3. (test builds only) register with the stress instrumentation;
/// 4. run `sync::run` on the blocking pool and wait for either its result or
///    cancellation, with cancellation taking priority.
///
/// # Errors
///
/// Returns `Err(McpError::from(CancelledExt))` when the request is cancelled
/// (any failure of `acquire_cancellable` is also reported this way).
/// Failures of the sync itself, or of the blocking task, are *not* `Err`:
/// they are returned as `Ok(packop_error(..))` tool results.
async fn run_with_cancel(
    state: &crate::ServerState,
    p: SyncParams,
    cancel: CancellationToken,
) -> Result<CallToolResult, McpError> {
    // Test hook: park until the caller cancels, so cancellation handling can
    // be exercised without running a real sync.
    #[cfg(any(test, feature = "test-hooks"))]
    if test_hooks::block_until_cancelled() {
        cancel.cancelled().await;
        return Err(McpError::from(CancelledExt));
    }

    // Held until this function returns.
    // NOTE(review): on the cancellation path below the permit is released
    // while the blocking closure may still be running — confirm that this is
    // acceptable for the scheduler's concurrency bound.
    let _permit = state
        .scheduler
        .acquire_cancellable(&cancel)
        .await
        .map_err(|_| McpError::from(CancelledExt))?;

    #[cfg(any(test, feature = "test-hooks"))]
    let _stress_guard = test_hooks::stress_barrier_enter().await;

    let opts = build_opts(&p);
    // `p` is owned and not used past this point, so the path is moved into
    // the worker instead of cloned.
    let pack_root = p.pack_root;
    let worker_cancel = cancel.clone();
    let worker = tokio::task::spawn_blocking(move || sync::run(&pack_root, &opts, &worker_cancel));

    let joined = tokio::select! {
        // `biased` polls the branches in order, so a pending cancellation
        // wins even when the worker has finished as well.
        biased;
        // Returning drops the `JoinHandle`; the blocking closure is not
        // aborted by that and keeps running until `sync::run` returns
        // (presumably soon, since it was handed a clone of the token).
        _ = cancel.cancelled() => return Err(McpError::from(CancelledExt)),
        joined = worker => joined,
    };

    Ok(match joined {
        Ok(Ok(report)) => success_envelope(&report),
        Ok(Err(err)) => packop_error(&err.to_string()),
        Err(join_err) => packop_error(&format!("internal: blocking task failed: {join_err}")),
    })
}
/// Process-global instrumentation used by tests of the sync handler.
///
/// Compiled only for unit tests or when the `test-hooks` feature is enabled.
/// All state lives in statics, so tests that use these hooks share it.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
mod test_hooks {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use tokio::sync::Barrier;

    /// When `true`, `run_with_cancel` waits for cancellation instead of syncing.
    static BLOCK: AtomicBool = AtomicBool::new(false);

    /// Reads the block-until-cancelled flag.
    pub fn block_until_cancelled() -> bool {
        BLOCK.load(Ordering::SeqCst)
    }

    /// Sets or clears the block-until-cancelled flag.
    pub fn set_block_until_cancelled(v: bool) {
        BLOCK.store(v, Ordering::SeqCst);
    }

    /// Number of live [`StressGuard`]s.
    static IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);
    /// Largest `IN_FLIGHT` value observed since the last reset.
    static HIGH_WATER: AtomicUsize = AtomicUsize::new(0);

    /// Storage for the optional barrier installed by a stress test.
    fn barrier_slot() -> &'static Mutex<Option<Arc<Barrier>>> {
        static SLOT: Mutex<Option<Arc<Barrier>>> = Mutex::new(None);
        &SLOT
    }

    /// Installs (`Some`) or removes (`None`) the stress barrier.
    ///
    /// # Panics
    ///
    /// Panics if the slot mutex is poisoned.
    pub fn set_stress_barrier(b: Option<Arc<Barrier>>) {
        *barrier_slot().lock().expect("stress barrier slot poisoned") = b;
    }

    /// Zeroes both counters.
    ///
    /// Call this only while no [`StressGuard`] is alive: a guard dropped
    /// after the reset would decrement `IN_FLIGHT` below zero and wrap.
    pub fn reset_stress_metrics() {
        IN_FLIGHT.store(0, Ordering::SeqCst);
        HIGH_WATER.store(0, Ordering::SeqCst);
    }

    /// Returns the highest in-flight count seen since the last reset.
    pub fn stress_high_water() -> usize {
        HIGH_WATER.load(Ordering::SeqCst)
    }

    /// Marks one call as in flight; dropping it decrements `IN_FLIGHT`.
    pub struct StressGuard {
        _private: (),
    }

    impl Drop for StressGuard {
        fn drop(&mut self) {
            IN_FLIGHT.fetch_sub(1, Ordering::SeqCst);
        }
    }

    /// Registers the caller as in flight, records the high-water mark, and
    /// then waits on the stress barrier if one is installed.
    ///
    /// # Panics
    ///
    /// Panics if the slot mutex is poisoned.
    pub async fn stress_barrier_enter() -> StressGuard {
        let prev = IN_FLIGHT.fetch_add(1, Ordering::SeqCst);
        // Create the guard before the await below: if this future is dropped
        // while parked at the barrier, the guard's `Drop` still undoes the
        // increment instead of leaking it.
        let guard = StressGuard { _private: () };
        let now = prev + 1;
        HIGH_WATER.fetch_max(now, Ordering::SeqCst);
        // The mutex guard is a temporary of this statement, so the lock is
        // released before the await.
        let barrier = barrier_slot().lock().expect("stress barrier slot poisoned").clone();
        if let Some(b) = barrier {
            b.wait().await;
        }
        guard
    }
}
/// Test hook: sets or clears the flag that makes `run_with_cancel` wait for
/// cancellation (and then return the cancelled error) instead of syncing.
///
/// The flag is a process-wide static, so it affects every later call until
/// it is cleared again.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_set_block_until_cancelled(v: bool) {
test_hooks::set_block_until_cancelled(v);
}
/// Test hook: installs (`Some`) or removes (`None`) the barrier that each
/// sync call waits on after it has acquired its scheduler permit.
///
/// Panics if the mutex guarding the barrier slot is poisoned.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_set_stress_barrier(b: Option<std::sync::Arc<tokio::sync::Barrier>>) {
test_hooks::set_stress_barrier(b);
}
/// Test hook: resets the in-flight counter and the high-water mark to zero.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_reset_stress_metrics() {
test_hooks::reset_stress_metrics();
}
/// Test hook: returns the largest number of simultaneously in-flight sync
/// calls recorded since the metrics were last reset.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_stress_high_water() -> usize {
test_hooks::stress_high_water()
}
/// Translates wire-level [`SyncParams`] into a core [`SyncOptions`] value.
///
/// `pack_root` is not part of the options; the caller passes it to
/// `sync::run` separately.
fn build_opts(p: &SyncParams) -> SyncOptions {
    // An empty `only` list is forwarded as `None`, not as `Some(vec![])`.
    let only_patterns = (!p.only.is_empty()).then(|| p.only.clone());
    // The wire flag is negative ("no validate"); the builder takes the positive.
    let validate = !p.no_validate;

    // NOTE(review): `p.parallel` is not forwarded to the options here —
    // confirm whether that is intentional.
    SyncOptions::new()
        .with_dry_run(p.dry_run)
        .with_validate(validate)
        .with_workspace(p.workspace.clone())
        .with_ref_override(p.ref_override.clone())
        .with_only_patterns(only_patterns)
        .with_force(p.force)
        .with_retain_days(p.retain_days)
}
/// Builds the successful tool result for a finished sync.
///
/// The result carries one text item reporting how many steps the report
/// contains and whether its `halted` field is set.
fn success_envelope(report: &grex_core::sync::SyncReport) -> CallToolResult {
    let step_count = report.steps.len();
    let halted = report.halted.is_some();
    let summary = format!("sync ok: {step_count} step(s); halted={halted}");
    CallToolResult::success(vec![Content::text(summary)])
}
#[cfg(test)]
mod tests {
    use super::*;
    use rmcp::handler::server::tool::schema_for_type;

    /// Builds dry-run params with validation disabled and every optional
    /// field unset except `retain_days`.
    fn dry_run_params(pack_root: PathBuf, retain_days: Option<u32>) -> SyncParams {
        SyncParams {
            pack_root,
            workspace: None,
            dry_run: true,
            no_validate: true,
            ref_override: None,
            only: Vec::new(),
            force: false,
            parallel: None,
            retain_days,
        }
    }

    /// Generating the JSON schema for the params type must not panic.
    #[test]
    fn sync_params_schema_resolves() {
        let _ = schema_for_type::<SyncParams>();
    }

    /// The handler must return `Ok` with a result whose `is_error` field is
    /// populated. The assertion checks only that the field is present, not
    /// its value.
    #[tokio::test]
    async fn sync_happy_path_returns_envelope() {
        let state = crate::ServerState::for_tests();
        let pack_root = std::env::temp_dir().join("grex-mcp-nonexistent-pack");
        let params = dry_run_params(pack_root, None);

        let result = handle(&state, Parameters(params), CancellationToken::new())
            .await
            .unwrap();

        assert!(result.is_error.is_some(), "must set isError flag");
    }

    /// `retain_days` must reach the core options unchanged, set or unset.
    #[test]
    fn build_opts_propagates_retain_days() {
        let pack_root = PathBuf::from("/tmp/grex-mcp-retain-days-fixture");
        let mut params = dry_run_params(pack_root, Some(30));

        let with_value = build_opts(&params);
        assert_eq!(with_value.retain_days, Some(30), "Some(30) must propagate verbatim");

        params.retain_days = None;
        let without_value = build_opts(&params);
        assert_eq!(without_value.retain_days, None, "None must propagate verbatim");
    }

    /// The wire name for `retain_days` is the camelCase `retainDays`.
    #[test]
    fn sync_params_deserialises_retain_days_camel_case() {
        let wire = serde_json::json!({
            "packRoot": "/tmp/grex-mcp-retain-days-wire",
            "retainDays": 45_u32,
        });

        let parsed: SyncParams =
            serde_json::from_value(wire).expect("camelCase retainDays parses");

        assert_eq!(parsed.retain_days, Some(45));
    }
}