grex-mcp 1.3.1

Agent-native MCP server for grex, the nested meta-repo manager: exposes the 11 core verbs as MCP tools.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
//! `sync` tool — drive the M3 Stage B end-to-end pipeline.
//!
//! This is the only verb (besides `teardown`, which is NOT exposed as an
//! MCP tool) that has a real core implementation today. It bridges
//! [`grex_core::sync::run`] (synchronous, blocking) onto the async
//! rmcp dispatch via [`tokio::task::spawn_blocking`]. Cancellation token
//! comes from the per-request `RequestContext::ct` plumbed in Stage 7
//! and threaded through `tool_router`'s `&self` shim.

use crate::error::{packop_error, CancelledExt};
use grex_core::sync::{self, SyncOptions};
use rmcp::{
    handler::server::wrapper::Parameters,
    model::{CallToolResult, Content},
    ErrorData as McpError,
};
use schemars::JsonSchema;
use serde::Deserialize;
use std::path::PathBuf;
use tokio_util::sync::CancellationToken;

/// Args for `sync`. Mirrors the CLI's `--json` shape — JSON-only fields,
/// no positional args. `pack_root` is required at the MCP edge because
/// the legacy "no-arg stub print" branch makes no sense for an agent.
///
/// Wire names are camelCase (`packRoot`, `dryRun`, `noValidate`,
/// `retainDays`, ...), except `ref_override`, which travels as `ref`.
/// Unknown fields are rejected (`deny_unknown_fields`).
//
// TODO(v2.0.0): deprecate this envelope shape — the v1.2.0 walker
// (Stage 1.m and beyond) introduces additional `SyncOptions` knobs
// (`force_prune`, `force_prune_with_ignored`, `migrate_lockfile`,
// `recurse`, `max_depth`) that should land on a future `grex.sync`
// v2 envelope rather than further widening this v1 surface. The
// `#[deprecated]` attribute is intentionally NOT applied yet — there
// is no v2 envelope to point callers at, so emitting a deprecation
// warning today would be noise without a migration target. When the
// v2 envelope ships, replace this comment with
// `#[deprecated(since = "2.0.0", note = "Use grex.sync v2 envelope")]`
// on this struct and on its handler entry point.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(deny_unknown_fields, rename_all = "camelCase")]
#[non_exhaustive]
pub struct SyncParams {
    /// Pack root: directory holding `.grex/pack.yaml` or the YAML file itself.
    ///
    /// Required on the wire (no `#[serde(default)]`), even when `pack` is
    /// also supplied. In that case `pack` overrides this value as the
    /// sync target.
    pub pack_root: PathBuf,
    /// Preferred pack-root field, added in v1.3.0. When present it takes
    /// precedence over `packRoot` as the sync target; when absent the
    /// handler falls back to `packRoot`.
    ///
    /// This field is unrelated to `workspace`, which names the directory
    /// for cloned children and is forwarded separately to `SyncOptions`.
    #[serde(default)]
    pub pack: Option<PathBuf>,
    /// Workspace directory for cloned children.
    #[serde(default)]
    pub workspace: Option<PathBuf>,
    /// Plan actions without touching the filesystem.
    #[serde(default)]
    pub dry_run: bool,
    /// Skip plan-phase validators. Debug-only escape hatch.
    #[serde(default)]
    pub no_validate: bool,
    /// Override the default ref for every pack. Wire name: `ref`.
    #[serde(default, rename = "ref")]
    pub ref_override: Option<String>,
    /// Restrict sync to packs whose path matches one of the globs.
    /// An empty list (the default) means no restriction.
    #[serde(default)]
    pub only: Vec<String>,
    /// Re-execute every pack even when its `actions_hash` is unchanged.
    #[serde(default)]
    pub force: bool,
    /// Max parallel pack ops. `None` → core default; `0` → unbounded; `1` → serial.
    ///
    /// NOTE(review): this field is deserialised but `build_opts` in this
    /// module never reads it, so it currently has no effect on the request.
    /// Concurrency at the MCP edge is bounded by the server's shared
    /// scheduler instead. Confirm whether it should be forwarded.
    #[serde(default)]
    pub parallel: Option<u32>,
    /// Quarantine retention window in days for the meta-sync GC sweep.
    /// `None` preserves v1.2.1 indefinite-retention behavior; `Some(N)`
    /// triggers a best-effort sweep at the start of every meta sync. See
    /// [`grex_core::sync::SyncOptions::retain_days`] for the contract.
    /// Mirrors the CLI's `--retain-days N` flag (added in v1.2.5).
    #[serde(default)]
    pub retain_days: Option<u32>,
}

pub(crate) async fn handle(
    state: &crate::ServerState,
    p: Parameters<SyncParams>,
    cancel: CancellationToken,
) -> Result<CallToolResult, McpError> {
    run_with_cancel(state, p.0, cancel).await
}

/// Body of the `sync` tool.
///
/// Acquires a permit from the shared scheduler and runs
/// [`grex_core::sync::run`] on a blocking thread. It then races that
/// thread against the per-request cancellation token.
///
/// # Errors
///
/// Returns `Err` (the `CancelledExt` envelope, `-32800 RequestCancelled`)
/// only when `cancel` fires. That can happen while waiting for a scheduler
/// permit or while the blocking sync is in flight. Core sync failures and
/// blocking-task failures (e.g. a panic inside `sync::run`) are returned as
/// `Ok` `pack_op` error envelopes, so the agent sees a tool-level error
/// rather than a JSON-RPC error.
async fn run_with_cancel(
    state: &crate::ServerState,
    p: SyncParams,
    cancel: CancellationToken,
) -> Result<CallToolResult, McpError> {
    // Test-only block-until-cancelled hook, gated behind the `test-hooks`
    // cargo feature so it compiles out of release `grex serve` binaries
    // entirely (no exposed test surface, no runtime atomic load).
    // The cancellation integration test
    // (`crates/grex-mcp/tests/cancellation.rs::notifications_cancelled_aborts_inflight_sync`)
    // enables `test-hooks` via the dev-dep self-edge in `Cargo.toml`. It then
    // flips the toggle on, so the in-flight handler awaits its per-request
    // `CancellationToken` instead of running the (microseconds-fast)
    // `sync::run` and losing the race against `notifications/cancelled`.
    #[cfg(any(test, feature = "test-hooks"))]
    if test_hooks::block_until_cancelled() {
        cancel.cancelled().await;
        return Err(McpError::from(CancelledExt));
    }

    // feat-m7-2 Stage 7 — bound the MCP edge by the shared
    // `Scheduler` so concurrent `tools/call sync` invocations never
    // over-subscribe past `--parallel N`. This is the FIRST production
    // consumer of `Scheduler::acquire_cancellable` (m7-1 Stage 3 added
    // the method; m7-1 Stage 5 wired the scheduler into `ServerState`).
    // The permit lives until this function returns. On the normal path
    // that is after the `spawn_blocking(sync::run)` join completes, so the
    // bound is observable from the outside as the in-flight `tools/call`
    // count, not just the queued-into-spawn_blocking count. On the
    // cancellation path the function returns -32800 without joining the
    // blocking task. The permit is therefore released while `sync::run` may
    // still be winding down on its OS thread (see the leak-window note
    // below). Cancellation before a permit is granted maps to
    // `-32800 RequestCancelled` via the existing `CancelledExt` envelope.
    let _permit = state
        .scheduler
        .acquire_cancellable(&cancel)
        .await
        .map_err(|_| McpError::from(CancelledExt))?;

    // Test-only stress-barrier hook (feat-m7-2 Stage 6). When a
    // `tokio::sync::Barrier` has been installed via
    // `__test_set_stress_barrier`, every handler invocation increments
    // a shared in-flight counter and awaits the barrier (which releases
    // simultaneously across all parked handlers + the test thread). It
    // then decrements the counter on its way out. The L4 stress harness
    // (`crates/grex-mcp/tests/stress.rs`) uses this to pin the in-flight
    // population at exactly PARALLEL handlers and assert the scheduler
    // never over-subscribes. Same `cfg(any(test, feature = "test-hooks"))`
    // gate as the cancellation hook above, so zero footprint in release
    // `grex serve`. Stage 7: now sits AFTER the permit-acquire so only
    // PARALLEL handlers ever park here; the rest queue at the
    // semaphore.
    #[cfg(any(test, feature = "test-hooks"))]
    let _stress_guard = test_hooks::stress_barrier_enter().await;

    let opts = build_opts(&p);
    // v1.3.0 — `pack` takes precedence over the legacy `pack_root`. Both
    // fields resolve to the same downstream value (the sync target). The
    // separate `workspace` field continues to denote the cloned-children
    // directory and is propagated via `SyncOptions::with_workspace` inside
    // `build_opts`. `p` is owned and not used past this point, so the paths
    // are moved out rather than cloned.
    let pack_root = p.pack.unwrap_or(p.pack_root);

    // `sync::run` is sync and may block on filesystem / git. Push it onto a
    // blocking thread so the rmcp dispatcher's reactor stays responsive. The
    // join handle is `select!`'d against `cancel.cancelled()` so the request
    // can return -32800 promptly. The OS thread continues briefly, per the
    // documented leak window in `pack_lock::acquire_cancellable`. `sync::run`
    // also receives its own clone of the token so it can observe the
    // cancellation.
    let worker_cancel = cancel.clone();
    let handle = tokio::task::spawn_blocking(move || sync::run(&pack_root, &opts, &worker_cancel));

    // `biased` polls cancellation first, so a token that is already
    // cancelled wins over a join that completes in the same poll.
    let outcome = tokio::select! {
        biased;
        _ = cancel.cancelled() => return Err(McpError::from(CancelledExt)),
        joined = handle => joined,
    };

    match outcome {
        Ok(Ok(report)) => Ok(success_envelope(&report)),
        Ok(Err(err)) => Ok(packop_error(&err.to_string())),
        Err(join_err) => Ok(packop_error(&format!("internal: blocking task failed: {join_err}"))),
    }
}

/// Test-only knobs consulted by `run_with_cancel` when compiled with
/// `cfg(test)` or the `test-hooks` cargo feature.
///
/// * `block_until_cancelled`: when on, the handler awaits its
///   cancellation token and returns -32800 instead of running
///   `sync::run`. This is the deterministic substitute for a slow git
///   fetch that the Stage 7 task description anticipated.
/// * The stress barrier (feat-m7-2 Stage 6) tracks the in-flight handler
///   count and its high-water mark, and optionally parks every handler
///   on a shared `tokio::sync::Barrier`.
///
/// The whole module is gated behind `cfg(any(test, feature = "test-hooks"))`
/// so neither the atomics nor the setters ship in default-feature release
/// builds.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
mod test_hooks {
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::sync::{Arc, Mutex};
    use tokio::sync::Barrier;

    static BLOCK: AtomicBool = AtomicBool::new(false);

    /// Whether the block-until-cancelled hook is currently switched on.
    pub fn block_until_cancelled() -> bool {
        BLOCK.load(Ordering::SeqCst)
    }

    /// Switch the block-until-cancelled hook on or off.
    pub fn set_block_until_cancelled(v: bool) {
        BLOCK.store(v, Ordering::SeqCst);
    }

    // ---- Stress barrier hook (feat-m7-2 Stage 6) ----
    //
    // Holds an optional `Arc<Barrier>` plus an `AtomicUsize` recording
    // the high-water in-flight count observed across all handler
    // invocations. The test installs the barrier (sized N+1 so the test
    // thread is the +1 releaser), then drives N concurrent `tools/call
    // sync` requests. Each handler hits `stress_barrier_enter`, bumps
    // the in-flight counter, awaits the barrier, and on guard-drop
    // decrements. When the post-`Barrier::wait()` snapshot equals
    // PARALLEL exactly, the contract holds.

    static IN_FLIGHT: AtomicUsize = AtomicUsize::new(0);
    static HIGH_WATER: AtomicUsize = AtomicUsize::new(0);

    fn barrier_slot() -> &'static Mutex<Option<Arc<Barrier>>> {
        static SLOT: Mutex<Option<Arc<Barrier>>> = Mutex::new(None);
        &SLOT
    }

    /// Install (`Some`) or clear (`None`) the shared stress barrier.
    pub fn set_stress_barrier(b: Option<Arc<Barrier>>) {
        *barrier_slot().lock().expect("stress barrier slot poisoned") = b;
    }

    /// Zero both the in-flight counter and the high-water mark.
    pub fn reset_stress_metrics() {
        IN_FLIGHT.store(0, Ordering::SeqCst);
        HIGH_WATER.store(0, Ordering::SeqCst);
    }

    /// Highest in-flight count observed since the last reset.
    pub fn stress_high_water() -> usize {
        HIGH_WATER.load(Ordering::SeqCst)
    }

    /// RAII guard returned by [`stress_barrier_enter`]. Decrements
    /// `IN_FLIGHT` when dropped so the counter reflects live handlers
    /// only. The high-water mark is monotone and is never decremented.
    pub struct StressGuard {
        _private: (),
    }

    impl Drop for StressGuard {
        fn drop(&mut self) {
            IN_FLIGHT.fetch_sub(1, Ordering::SeqCst);
        }
    }

    /// Increment the in-flight counter, refresh the high-water mark,
    /// and (if a barrier is installed) await it. Returns a guard whose
    /// `Drop` decrements the in-flight counter. Cheap no-op when no
    /// barrier is installed (the common case, since only the L4 stress
    /// harness installs one).
    pub async fn stress_barrier_enter() -> StressGuard {
        let now = IN_FLIGHT.fetch_add(1, Ordering::SeqCst) + 1;
        // Build the guard BEFORE the first `.await`. If the caller's future
        // is dropped while parked on the barrier, the guard's `Drop` still
        // decrements `IN_FLIGHT`, so the counter cannot leak an entry.
        let guard = StressGuard { _private: () };

        // `fetch_max` updates the high-water mark atomically in a single
        // call (stable on `AtomicUsize` since Rust 1.45), so no CAS loop is
        // needed.
        HIGH_WATER.fetch_max(now, Ordering::SeqCst);

        // Clone the `Arc` out so the std `Mutex` guard is dropped at the end
        // of this statement, before the `.await` below.
        let barrier = barrier_slot().lock().expect("stress barrier slot poisoned").clone();
        if let Some(b) = barrier {
            b.wait().await;
        }
        guard
    }
}

/// Hidden, test-only switch for the block-until-cancelled hook.
///
/// Pass `true` to make every subsequent `sync` call await its cancellation
/// token and return -32800 instead of running `sync::run`. Pass `false` to
/// restore normal behaviour. Compiled out unless `cfg(test)` or the
/// `test-hooks` cargo feature is active.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_set_block_until_cancelled(v: bool) {
    self::test_hooks::set_block_until_cancelled(v)
}

/// Hidden, test-only installer for the L4 stress barrier (feat-m7-2 Stage 6).
///
/// `Some(barrier)` installs it, so every `sync` handler parks on it after
/// acquiring its scheduler permit. `None` clears it once the stress test has
/// released. Size the barrier as `Barrier::new(PARALLEL + 1)`: N handlers
/// plus the test thread.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_set_stress_barrier(b: Option<std::sync::Arc<tokio::sync::Barrier>>) {
    self::test_hooks::set_stress_barrier(b)
}

/// Hidden, test-only reset that zeroes both the stress in-flight counter
/// and its high-water mark. Invoke it at the start of each stress case so
/// counts left over from an earlier run cannot leak into the next one.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_reset_stress_metrics() {
    self::test_hooks::reset_stress_metrics()
}

/// Hidden, test-only read of the largest in-flight handler count the
/// stress barrier has seen. The value only ever grows between calls to
/// `__test_reset_stress_metrics`.
#[cfg(any(test, feature = "test-hooks"))]
#[doc(hidden)]
pub fn __test_stress_high_water() -> usize {
    self::test_hooks::stress_high_water()
}

/// Translate the MCP `SyncParams` envelope into core `SyncOptions`.
///
/// `no_validate` is inverted into `with_validate`. An empty `only` list is
/// passed to core as `None` rather than as an empty pattern list.
/// `pack` / `pack_root` are not consumed here; the handler resolves the
/// sync target separately.
///
/// NOTE(review): `p.parallel` is not forwarded to `SyncOptions`, so the
/// per-request `parallel` knob currently has no effect. Confirm whether
/// that is intentional (the MCP edge is bounded by the shared scheduler).
fn build_opts(p: &SyncParams) -> SyncOptions {
    let only_patterns = (!p.only.is_empty()).then(|| p.only.clone());
    let base = SyncOptions::new()
        .with_dry_run(p.dry_run)
        .with_validate(!p.no_validate);
    base.with_workspace(p.workspace.clone())
        .with_ref_override(p.ref_override.clone())
        .with_only_patterns(only_patterns)
        .with_force(p.force)
        .with_retain_days(p.retain_days)
}

/// Render a finished sync report as a single-text-item success result.
///
/// The summary line is `sync ok: <steps> step(s); halted=<bool>`, where
/// `halted` reports whether the report carries a halt reason.
fn success_envelope(report: &grex_core::sync::SyncReport) -> CallToolResult {
    let step_count = report.steps.len();
    let halted = report.halted.is_some();
    let summary = format!("sync ok: {step_count} step(s); halted={halted}");
    CallToolResult::success(vec![Content::text(summary)])
}

#[cfg(test)]
mod tests {
    use super::*;
    use rmcp::handler::server::tool::schema_for_type;

    /// Baseline `SyncParams` for the tests below: dry-run with plan-phase
    /// validators skipped, every other knob at its serde default.
    fn params_for(pack_root: PathBuf) -> SyncParams {
        SyncParams {
            pack_root,
            pack: None,
            workspace: None,
            dry_run: true,
            no_validate: true,
            ref_override: None,
            only: Vec::new(),
            force: false,
            parallel: None,
            retain_days: None,
        }
    }

    #[test]
    fn sync_params_schema_resolves() {
        let _ = schema_for_type::<SyncParams>();
    }

    /// Despite the name, this exercises the error path. Against a
    /// non-existent pack root the core returns a `SyncError`, which the
    /// handler maps to a `pack_op` tool envelope. The goal is "handler runs
    /// and returns a structured envelope instead of a JSON-RPC error"; the
    /// exact error text is core's domain.
    #[tokio::test]
    async fn sync_happy_path_returns_envelope() {
        let s = crate::ServerState::for_tests();
        let p = params_for(std::env::temp_dir().join("grex-mcp-nonexistent-pack"));
        let r = handle(&s, Parameters(p), CancellationToken::new()).await.unwrap();
        // We expect failure, since the pack root does not exist. Either way
        // the tool MUST return Ok(envelope), not a JSON-RPC -32xxx.
        // NOTE(review): if `CallToolResult::success` also populates
        // `is_error` (e.g. with `Some(false)`), this check cannot tell
        // success from failure. Consider asserting `Some(true)` once the
        // `packop_error` envelope shape is confirmed.
        assert!(r.is_error.is_some(), "must set isError flag");
    }

    /// Codex MEDIUM #5 / v1.2.5 — `retain_days` must propagate from the
    /// MCP `SyncParams` envelope into the core `SyncOptions` so MCP
    /// callers reach feature parity with the CLI's `--retain-days N`
    /// flag. None and Some(N) both round-trip verbatim.
    #[test]
    fn build_opts_propagates_retain_days() {
        let mut p = params_for(PathBuf::from("/tmp/grex-mcp-retain-days-fixture"));
        p.retain_days = Some(30);
        let opts = build_opts(&p);
        assert_eq!(opts.retain_days, Some(30), "Some(30) must propagate verbatim");

        p.retain_days = None;
        let opts_none = build_opts(&p);
        assert_eq!(opts_none.retain_days, None, "None must propagate verbatim");
    }

    /// Companion to the round-trip test: confirm `retainDays` deserialises
    /// from the camelCase JSON-RPC wire shape (matching the rest of
    /// `SyncParams`'s `rename_all = "camelCase"` discipline).
    #[test]
    fn sync_params_deserialises_retain_days_camel_case() {
        let json = serde_json::json!({
            "packRoot": "/tmp/grex-mcp-retain-days-wire",
            "retainDays": 45_u32,
        });
        let p: SyncParams = serde_json::from_value(json).expect("camelCase retainDays parses");
        assert_eq!(p.retain_days, Some(45));
    }

    /// v1.3.0 — `pack` is the primary pack-root field and wins over the
    /// still-required `packRoot` when both are supplied.
    ///
    /// `workspace` is a separate field (the cloned-children directory) and
    /// plays no part in the precedence rule. It is included here to pin that
    /// all three fields deserialise side by side without clobbering each
    /// other. The resolution half duplicates the `pack`-over-`pack_root`
    /// precedence applied in `run_with_cancel`, so keep the two in sync.
    #[test]
    fn mcp_sync_pack_field_takes_precedence_over_workspace() {
        let pack_path = PathBuf::from("/tmp/grex-mcp-pack-precedence-pack");
        let workspace_path = PathBuf::from("/tmp/grex-mcp-pack-precedence-workspace");
        let pack_root_path = PathBuf::from("/tmp/grex-mcp-pack-precedence-packroot");

        // Wire-level: camelCase deserialisation accepts `pack`, `workspace`
        // and `packRoot` together and keeps each value in its own field.
        let json = serde_json::json!({
            "packRoot": pack_root_path,
            "pack": pack_path,
            "workspace": workspace_path,
        });
        let p: SyncParams = serde_json::from_value(json).expect("pack + workspace parses");
        assert_eq!(p.pack.as_deref(), Some(pack_path.as_path()));
        assert_eq!(p.workspace.as_deref(), Some(workspace_path.as_path()));
        assert_eq!(p.pack_root, pack_root_path);

        // Resolution-level: the sync target is the value supplied via
        // `pack`, not the `pack_root` fallback.
        let resolved = p.pack.clone().unwrap_or_else(|| p.pack_root.clone());
        assert_eq!(resolved, pack_path, "pack must win over pack_root at resolution time");
    }
}