grex_cli/cli/verbs/serve.rs
1//! `grex serve` — launch the MCP stdio server (feat-m7-1 stage 8).
2//!
3//! Wires the rmcp framework (built in `grex-mcp`) to the actual stdio
4//! transport on top of a Tokio runtime. The runtime is constructed
5//! per-invocation: `cli::run` is sync, but rmcp's stdio loop is async,
6//! so we bridge with a current-thread runtime started on demand.
7//!
8//! ## Tracing discipline
9//!
10//! `grex-mcp::GrexMcpServer::run` reinstalls a stderr-pinned subscriber
11//! using `EnvFilter::try_from_default_env()` with `info` fallback. To
12//! silence rmcp's "Service initialized as server" INFO line in
13//! production, `main.rs` defaults `RUST_LOG` to `grex=info,rmcp=warn`
14//! when no env var is present (Stage 5 wiring note #7). Tests can
15//! override with `RUST_LOG=…`.
16
17use crate::cli::args::{GlobalFlags, ServeArgs};
18use anyhow::{Context, Result};
19use grex_core::manifest::{ensure_event_log_migrated, find_workspace_root};
20use grex_core::{Registry, Scheduler};
21use grex_mcp::{GrexMcpServer, ServerState};
22use tokio_util::sync::CancellationToken;
23
24// `_cancel` is intentionally unused here. The verb-level CancellationToken from
25// `cli::run` is plumbed for future global-shutdown wiring (Stage 8+). Per-request
26// cancellation is handled by rmcp's internal local_ct_pool (see grex-mcp Stage 7
27// commit and lib.rs comment block above serve()).
28pub fn run(args: ServeArgs, _global: &GlobalFlags, _cancel: &CancellationToken) -> Result<()> {
29 crate::cli::deprecation::warn_workspace_alias_used();
30 let workspace = match args.pack {
31 Some(p) => p,
32 None => {
33 let cwd = std::env::current_dir().context("resolve cwd for --pack default")?;
34 // Walk up from cwd to find a pack-root marker — fixes the
35 // v1.x cwd-relative bug for `grex serve` invoked from a
36 // subdir of the pack root.
37 find_workspace_root(&cwd)
38 }
39 };
40 let manifest_path = match args.manifest {
41 Some(p) => p,
42 None => ensure_event_log_migrated(&workspace).context("migrate v1.x event log")?,
43 };
44 let parallel = resolve_parallel(args.parallel);
45
46 let scheduler = Scheduler::new(parallel);
47 let registry = Registry::default();
48 let state = ServerState::new(scheduler, registry, manifest_path, workspace);
49 let server = GrexMcpServer::new(state);
50
51 // Bridge sync `cli::run` → async rmcp loop. A fresh single-thread
52 // runtime is sufficient: the server has no other in-process work,
53 // and rmcp drives its own request fan-out via tokio::spawn.
54 let rt = tokio::runtime::Builder::new_current_thread()
55 .enable_all()
56 .build()
57 .context("build tokio runtime for grex serve")?;
58
59 rt.block_on(async move {
60 let transport = rmcp::transport::stdio();
61 // Pre-`run` info line so stderr has at least one tracing event
62 // even with the default `grex=info,rmcp=warn` filter that hides
63 // rmcp's "Service initialized as server" log. Useful for ops
64 // visibility (PID, parallel cap) and asserted by 8.T3.
65 tracing::info!(
66 target: "grex",
67 parallel,
68 "grex serve: MCP stdio transport ready",
69 );
70 server.run(transport).await.context("grex-mcp server exited with error")
71 })
72}
73
/// Turn the optional `--parallel` value into a concrete worker count.
///
/// An explicit value is passed through unchanged (widened from `u32` to
/// `usize`). When the flag is unset, the count comes from
/// `std::thread::available_parallelism()`; if that query itself fails
/// (uncommon — e.g. restricted VMs or sandboxed CI) the conservative
/// floor of `1` is used. Matches the harness contract in
/// `.omne/cfg/concurrency.md`.
fn resolve_parallel(opt: Option<u32>) -> usize {
    // Caller-supplied value: no clamping or validation is applied here.
    if let Some(requested) = opt {
        return requested as usize;
    }

    // Flag unset: ask the OS, falling back to a single worker on error.
    match std::thread::available_parallelism() {
        Ok(detected) => detected.get(),
        Err(_) => 1,
    }
}