Skip to main content

harn_vm/
mcp_host.rs

1//! Supervised external-tool MCP host primitive (harn#2504, A.7).
2//!
3//! Wraps the connection-level state owned by [`crate::mcp_registry`] with
4//! supervision, response caching, cross-server discovery, and allowlist
5//! enforcement. The registry knows *which* servers are declared and how to
6//! lazy-connect them; this module is the runtime that callers actually
7//! reach for spawn/call/stop/discover/reload, and is the single home for
8//! restart budgets, circuit-breaker state, and per-(server, tool, args)
9//! response memoization.
10//!
//! The split is deliberate: [`crate::mcp`] owns the wire protocol,
12//! [`crate::mcp_registry`] owns lifecycle bookkeeping, and this module
13//! owns *policy* — the rules a hosted MCP server must follow before its
14//! calls are dispatched and after they return. Allowlist evaluation is
//! delegated to a swappable [`AllowlistGuard`] callback so harn-serve can
16//! plug `AuthPolicy` in without harn-vm depending on harn-serve.
17
18use std::collections::{BTreeMap, HashMap};
19use std::sync::{Arc, Mutex};
20use std::time::{Duration, Instant};
21
22use serde::{Deserialize, Serialize};
23use serde_json::Value as JsonValue;
24use sha2::{Digest, Sha256};
25
26use crate::mcp::{call_mcp_tool_with_hint, VmMcpClientHandle};
27use crate::mcp_protocol::McpCacheHint;
28use crate::mcp_registry::{self, RegisteredMcpServer};
29use crate::value::VmError;
30
/// Default per-server restart budget. The supervisor allows this many
/// auto-restarts inside [`DEFAULT_RESTART_WINDOW`] before the server is
/// marked ejected; an ejected server fails fast until an explicit
/// `reload()` clears the state.
pub const DEFAULT_MAX_RESTARTS: u32 = 5;
/// Default sliding window (five minutes) over which restart attempts are
/// counted against the restart budget.
pub const DEFAULT_RESTART_WINDOW: Duration = Duration::from_mins(5);

/// Default circuit-breaker thresholds applied to a hosted server when the
/// caller does not override them. Mirrors the
/// `circuit_breaker(name, threshold?, reset_ms?)` stdlib builtin defaults
/// (5 failures, 30s reset) so .harn-side and Rust-side breakers behave
/// consistently.
pub const DEFAULT_CIRCUIT_THRESHOLD: u32 = 5;
/// Default time (30 seconds) the breaker stays open once it has tripped.
pub const DEFAULT_CIRCUIT_RESET: Duration = Duration::from_secs(30);

/// Initial backoff applied between restart attempts. Doubled on every
/// consecutive restart up to [`MAX_RESTART_BACKOFF`].
pub const INITIAL_RESTART_BACKOFF: Duration = Duration::from_millis(100);
/// Upper bound (5 seconds) on the delay between restart attempts.
pub const MAX_RESTART_BACKOFF: Duration = Duration::from_secs(5);

/// Cap on the response cache entries kept per (server, tool) pair so a
/// run-away script with high-arity arguments can't grow the cache without
/// bound. LRU semantics are approximated by dropping the oldest insertion
/// timestamp when the cap is reached.
pub const RESPONSE_CACHE_MAX_ENTRIES_PER_TOOL: usize = 64;
56
/// Per-server supervision state. Holds the restart-budget bookkeeping and
/// circuit-breaker state. Carved off from `mcp_registry::ActiveConnection`
/// so the registry can stay focused on lazy-boot / ref-counting and not
/// gain unrelated knobs.
#[derive(Debug)]
struct SupervisionState {
    /// Timestamps of recent restart attempts, oldest first. Entries older
    /// than this server's `restart_window` are pruned each time a new
    /// restart is recorded.
    restart_attempts: Vec<Instant>,
    /// Consecutive failures since the last success. Reset by a successful
    /// `tools/call`.
    consecutive_failures: u32,
    /// When `Some`, the breaker is open until this instant. While open,
    /// `call` returns `circuit_open` without touching the network. After
    /// the instant elapses, the next call probes (half-open); success
    /// closes the breaker and resets the failure count.
    breaker_opens_until: Option<Instant>,
    /// When `true`, the server has burned through its restart budget and
    /// will not be auto-restarted again until `reload(server)` is invoked.
    ejected: bool,
    /// Per-server breaker threshold (overridable via `spawn()` options).
    circuit_threshold: u32,
    /// Per-server breaker reset window (overridable via `spawn()` options).
    circuit_reset: Duration,
    /// Per-server restart budget (overridable via `spawn()` options).
    max_restarts: u32,
    /// Per-server restart window (overridable via `spawn()` options).
    restart_window: Duration,
}
86
87impl SupervisionState {
88    fn new(policy: SupervisionPolicy) -> Self {
89        Self {
90            restart_attempts: Vec::new(),
91            consecutive_failures: 0,
92            breaker_opens_until: None,
93            ejected: false,
94            circuit_threshold: policy.circuit_threshold,
95            circuit_reset: policy.circuit_reset,
96            max_restarts: policy.max_restarts,
97            restart_window: policy.restart_window,
98        }
99    }
100
101    /// Returns the breaker state at `now`. Transitions Open→HalfOpen
102    /// implicitly when the reset window has elapsed; the caller is
103    /// expected to close (or re-open) the breaker on success/failure.
104    fn breaker_state(&mut self, now: Instant) -> BreakerState {
105        match self.breaker_opens_until {
106            Some(deadline) if now < deadline => BreakerState::Open,
107            Some(_) => BreakerState::HalfOpen,
108            None => BreakerState::Closed,
109        }
110    }
111
112    /// Record a successful call. Closes the breaker and resets the
113    /// failure counter regardless of prior state.
114    fn record_success(&mut self) {
115        self.consecutive_failures = 0;
116        self.breaker_opens_until = None;
117    }
118
119    /// Record a failure. Opens the breaker once consecutive failures hit
120    /// the threshold; subsequent failures stretch the open-window so
121    /// flapping doesn't get free passes.
122    fn record_failure(&mut self, now: Instant) {
123        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
124        if self.consecutive_failures >= self.circuit_threshold {
125            self.breaker_opens_until = Some(now + self.circuit_reset);
126        }
127    }
128
129    /// Record a restart attempt and report whether the server is still
130    /// within its budget. Returns `false` when the budget is exhausted —
131    /// the caller should mark the server ejected and surface a fatal
132    /// error.
133    fn record_restart(&mut self, now: Instant) -> bool {
134        self.prune_restart_window(now);
135        self.restart_attempts.push(now);
136        if self.restart_attempts.len() as u32 > self.max_restarts {
137            self.ejected = true;
138            return false;
139        }
140        true
141    }
142
143    /// Next backoff delay before retrying. Exponential, capped at
144    /// [`MAX_RESTART_BACKOFF`].
145    fn backoff_delay(&self) -> Duration {
146        let attempt = self.restart_attempts.len() as u32;
147        let exp = attempt.saturating_sub(1).min(6);
148        let mul = 1u64 << exp;
149        let nanos = INITIAL_RESTART_BACKOFF.as_nanos() as u64 * mul;
150        Duration::from_nanos(nanos).min(MAX_RESTART_BACKOFF)
151    }
152
153    fn prune_restart_window(&mut self, now: Instant) {
154        let window = self.restart_window;
155        self.restart_attempts
156            .retain(|t| now.duration_since(*t) <= window);
157    }
158
159    fn clear(&mut self) {
160        self.restart_attempts.clear();
161        self.consecutive_failures = 0;
162        self.breaker_opens_until = None;
163        self.ejected = false;
164    }
165}
166
/// Public reading of the circuit breaker state.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum BreakerState {
    /// No open deadline is set; calls are dispatched normally.
    Closed,
    /// The open deadline has not been reached yet; calls fail fast.
    Open,
    /// The open deadline has passed but the breaker has not been closed
    /// by a success yet; calls are let through as probes.
    HalfOpen,
}
175
176impl BreakerState {
177    pub fn as_str(self) -> &'static str {
178        match self {
179            BreakerState::Closed => "closed",
180            BreakerState::Open => "open",
181            BreakerState::HalfOpen => "half_open",
182        }
183    }
184}
185
/// Per-server supervision policy. Each field maps to a knob exposed via
/// `harn.mcp.spawn(..., options)`. Defaults match the constants at the
/// top of this module.
#[derive(Clone, Copy, Debug)]
pub struct SupervisionPolicy {
    /// Number of consecutive failures at which the breaker opens.
    pub circuit_threshold: u32,
    /// How long the breaker stays open after the most recent tripping
    /// failure.
    pub circuit_reset: Duration,
    /// Restart attempts tolerated inside `restart_window` before the
    /// server is ejected.
    pub max_restarts: u32,
    /// Sliding window over which restart attempts are counted.
    pub restart_window: Duration,
}
196
197impl Default for SupervisionPolicy {
198    fn default() -> Self {
199        Self {
200            circuit_threshold: DEFAULT_CIRCUIT_THRESHOLD,
201            circuit_reset: DEFAULT_CIRCUIT_RESET,
202            max_restarts: DEFAULT_MAX_RESTARTS,
203            restart_window: DEFAULT_RESTART_WINDOW,
204        }
205    }
206}
207
/// One cached response keyed by (server, tool, args_hash). `expires_at`
/// is derived from the MCP cache hint that came back with the original
/// result; entries without a positive TTL are never inserted.
#[derive(Clone, Debug)]
struct CachedResponse {
    /// The stored tool result.
    payload: JsonValue,
    /// When the entry was inserted.
    /// NOTE(review): presumably the key for oldest-first eviction — confirm
    /// against the cache insertion helper.
    inserted_at: Instant,
    /// Instant after which the entry must no longer be served.
    expires_at: Instant,
    /// `scope` echoes the MCP `cacheScope` field. The cache is
    /// process-local regardless, so today's logic ignores it — but
    /// `status()` surfaces it via `cache_entries` and a future
    /// per-tenant cache partition (A.2) will key on it.
    #[allow(dead_code)]
    scope: Option<&'static str>,
}
223
/// Decision returned by an [`AllowlistGuard`] before dispatching a call.
/// `Allow` proceeds; `Deny` short-circuits with a typed error built from
/// the supplied reason.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AllowlistDecision {
    /// The spawn/call may proceed.
    Allow,
    /// The spawn/call is refused; `reason` is embedded in the error
    /// message returned to the caller.
    Deny { reason: String },
}
232
/// Pluggable policy hook so harn-serve can wire `AuthPolicy` (per-tenant
/// allowlists) without harn-vm depending on harn-serve. The closure is
/// invoked once per `spawn` / `call`, before any network traffic, with
/// the server name and optional tool name (`None` for spawn checks).
/// `discover()` also consults it (with `None` as the tool) to skip
/// servers that would be denied.
pub type AllowlistGuard = Arc<dyn Fn(&str, Option<&str>) -> AllowlistDecision + Send + Sync>;
238
/// Diagnostic snapshot of a hosted server. Returned by
/// `harn.mcp.status()` and consumed by tests and dashboards.
#[derive(Clone, Debug, Serialize)]
pub struct McpHostStatus {
    /// Registered server name.
    pub name: String,
    /// Transport label as reported by the registry snapshot.
    pub transport: String,
    /// Server URL as reported by the registry snapshot, if any.
    pub url: Option<String>,
    /// Whether the registry reports the server as active.
    pub active: bool,
    /// Whether the server was registered as lazy.
    pub lazy: bool,
    /// Reference count reported by the registry.
    pub ref_count: usize,
    /// Number of restart attempts currently held in the supervision
    /// state's restart history.
    pub restart_count: u32,
    /// Consecutive failures since the last successful call.
    pub consecutive_failures: u32,
    /// Circuit breaker state at the time of the snapshot.
    pub circuit: BreakerState,
    /// Whether the server has been ejected after exhausting its restart
    /// budget.
    pub ejected: bool,
    /// Number of cached response entries for this server (across all
    /// tools).
    pub cache_entries: usize,
    /// Human-readable authenticated identity for connected OAuth-backed HTTP
    /// servers when a vetted identity descriptor can render from the stored
    /// token response.
    pub display_identity: Option<String>,
}
261
/// Options accepted by [`spawn`]. Mirrors the dict surface in
/// `harn.mcp.spawn({...}, options)`.
#[derive(Clone, Debug, Default, Deserialize)]
pub struct SpawnOptions {
    /// When `true`, register the server but do not connect eagerly.
    /// Defaults to `false` — `spawn` always boots so callers get a
    /// fail-fast error if the spec is broken.
    #[serde(default)]
    pub lazy: bool,
    /// Keep-alive grace period (milliseconds) before a fully-released
    /// lazy connection is closed. `None` → close immediately.
    #[serde(default)]
    pub keep_alive_ms: Option<u64>,
    /// Optional path or URL to a server card to associate with the
    /// registration so `mcp_server_card(name)` can resolve it later.
    #[serde(default)]
    pub card: Option<String>,
    /// Supervision policy overrides. Unset fields fall back to the
    /// module-level constants.
    ///
    /// Consecutive-failure count at which the circuit breaker opens.
    #[serde(default)]
    pub circuit_threshold: Option<u32>,
    /// Breaker reset window, in milliseconds.
    #[serde(default)]
    pub circuit_reset_ms: Option<u64>,
    /// Restart budget inside the restart window.
    #[serde(default)]
    pub max_restarts: Option<u32>,
    /// Restart window, in milliseconds.
    #[serde(default)]
    pub restart_window_ms: Option<u64>,
}
290
291impl SpawnOptions {
292    fn into_policy(self) -> (SupervisionPolicy, RegisteredMcpServerMeta) {
293        let default = SupervisionPolicy::default();
294        let policy = SupervisionPolicy {
295            circuit_threshold: self.circuit_threshold.unwrap_or(default.circuit_threshold),
296            circuit_reset: self
297                .circuit_reset_ms
298                .map(Duration::from_millis)
299                .unwrap_or(default.circuit_reset),
300            max_restarts: self.max_restarts.unwrap_or(default.max_restarts),
301            restart_window: self
302                .restart_window_ms
303                .map(Duration::from_millis)
304                .unwrap_or(default.restart_window),
305        };
306        let meta = RegisteredMcpServerMeta {
307            lazy: self.lazy,
308            keep_alive: self.keep_alive_ms.map(Duration::from_millis),
309            card: self.card,
310        };
311        (policy, meta)
312    }
313}
314
/// Registration-side half of [`SpawnOptions`]: the values `spawn` copies
/// into the `RegisteredMcpServer` it hands to the registry.
struct RegisteredMcpServerMeta {
    /// Register without connecting eagerly.
    lazy: bool,
    /// Keep-alive grace period, converted from `keep_alive_ms`.
    keep_alive: Option<Duration>,
    /// Optional server card path or URL.
    card: Option<String>,
}
320
/// Bag of mutable state owned by the process-global host. One per
/// process; tests reset via [`reset_for_tests`].
struct HostInner {
    /// Per-server supervision state. Keys mirror
    /// `mcp_registry::REGISTRY.servers`. Entries are inserted at
    /// `spawn`, removed at `stop`, and reset in place by `reload`.
    supervision: HashMap<String, SupervisionState>,
    /// Per-(server, tool) → args_hash → cached response. The args-hash
    /// keying lets two distinct argument shapes coexist for the same
    /// tool without collisions.
    response_cache: HashMap<(String, String), HashMap<String, CachedResponse>>,
    /// Active allowlist guard, if any. `None` means allow-all.
    allowlist: Option<AllowlistGuard>,
    /// Number of `call`s answered from the response cache. Read back
    /// through the standalone `cache_stats()` helper for telemetry.
    cache_hits: u64,
    /// Number of `call`s that missed the response cache. Read back
    /// through `cache_stats()` alongside `cache_hits`.
    cache_misses: u64,
}
339
340impl HostInner {
341    fn new() -> Self {
342        Self {
343            supervision: HashMap::new(),
344            response_cache: HashMap::new(),
345            allowlist: None,
346            cache_hits: 0,
347            cache_misses: 0,
348        }
349    }
350}
351
/// Process-global host state. Starts as `None` and is populated on first
/// access through `with_inner`.
static HOST: Mutex<Option<HostInner>> = Mutex::new(None);
353
354fn with_inner<F, R>(f: F) -> R
355where
356    F: FnOnce(&mut HostInner) -> R,
357{
358    let mut guard = HOST.lock().expect("mcp host mutex poisoned");
359    if guard.is_none() {
360        *guard = Some(HostInner::new());
361    }
362    f(guard.as_mut().expect("host inner just initialized"))
363}
364
365/// Install (or replace) the allowlist guard. Pass `None` to clear.
366pub fn set_allowlist(guard: Option<AllowlistGuard>) {
367    with_inner(|inner| inner.allowlist = guard);
368}
369
370/// Wipe every host-side bit of state. Called by `reset_thread_local_state`
371/// and by tests that need a clean slate between runs.
372pub fn reset_for_tests() {
373    with_inner(|inner| {
374        inner.supervision.clear();
375        inner.response_cache.clear();
376        inner.allowlist = None;
377        inner.cache_hits = 0;
378        inner.cache_misses = 0;
379    });
380    mcp_registry::reset();
381}
382
/// Cache hit/miss counters. Tests use these to assert that caching is
/// actually engaged; the agent loop's observability layer reads them to
/// emit `harn.mcp.cache.*` metrics.
#[derive(Clone, Copy, Debug)]
pub struct CacheStats {
    /// Calls answered from the response cache.
    pub hits: u64,
    /// Calls that found no usable cache entry.
    pub misses: u64,
}
391
392pub fn cache_stats() -> CacheStats {
393    with_inner(|inner| CacheStats {
394        hits: inner.cache_hits,
395        misses: inner.cache_misses,
396    })
397}
398
399/// Spawn (register + connect) an MCP server. Returns the server name as
400/// the `server_id` — names are unique within the registry and stable
401/// across the process lifetime, so we don't need a separate opaque id.
402pub async fn spawn(spec: JsonValue, options: SpawnOptions) -> Result<String, VmError> {
403    let name = spec
404        .get("name")
405        .and_then(|v| v.as_str())
406        .ok_or_else(|| VmError::Runtime("mcp.spawn: spec must include a `name` field".into()))?
407        .to_string();
408    if name.is_empty() {
409        return Err(VmError::Runtime(
410            "mcp.spawn: spec.name must be a non-empty string".into(),
411        ));
412    }
413
414    if let Some(guard) = current_allowlist() {
415        if let AllowlistDecision::Deny { reason } = guard(&name, None) {
416            return Err(VmError::Runtime(format!(
417                "mcp.spawn({name}): denied by allowlist: {reason}"
418            )));
419        }
420    }
421
422    let (policy, meta) = options.into_policy();
423    mcp_registry::register_servers(vec![RegisteredMcpServer {
424        name: name.clone(),
425        spec: spec.clone(),
426        lazy: meta.lazy,
427        card: meta.card,
428        keep_alive: meta.keep_alive,
429    }]);
430
431    with_inner(|inner| {
432        inner
433            .supervision
434            .insert(name.clone(), SupervisionState::new(policy));
435    });
436
437    if !meta.lazy {
438        // Eager spawn — connect right away so a broken spec fails before
439        // the first call. Lazy servers stay idle until first use.
440        let _ = mcp_registry::ensure_active(&name).await.inspect_err(|_| {
441            with_inner(|inner| {
442                inner.supervision.remove(&name);
443            });
444        })?;
445    }
446
447    Ok(name)
448}
449
450/// Drop a hosted server: tears down the active connection, prunes
451/// supervision + cache entries. The registration itself is left in place
452/// so a subsequent `ensure_active` from declarative `harn.toml` flows
453/// still works; callers that want to fully forget a server should also
454/// re-register a new spec via `spawn`.
455pub fn stop(name: &str) -> Result<(), VmError> {
456    if !mcp_registry::is_registered(name) {
457        return Err(VmError::Runtime(format!(
458            "mcp.stop: no server named '{name}' is hosted"
459        )));
460    }
461    mcp_registry::release(name);
462    with_inner(|inner| {
463        inner.supervision.remove(name);
464        inner.response_cache.retain(|(s, _), _| s != name);
465    });
466    Ok(())
467}
468
469/// Hot-reload a hosted server: drops the active connection but preserves
470/// the registration spec. The next `call` reconnects automatically.
471/// Reset supervision state so a previously-ejected server gets a fresh
472/// budget after the operator fixes the underlying problem.
473pub fn reload(name: &str) -> Result<(), VmError> {
474    if !mcp_registry::is_registered(name) {
475        return Err(VmError::Runtime(format!(
476            "mcp.reload: no server named '{name}' is hosted"
477        )));
478    }
479    mcp_registry::release(name);
480    with_inner(|inner| {
481        if let Some(state) = inner.supervision.get_mut(name) {
482            state.clear();
483        }
484        inner.response_cache.retain(|(s, _), _| s != name);
485    });
486    Ok(())
487}
488
489/// Return the cached tool list for a server, performing a fresh
490/// `tools/list` only when the registered cache hint has expired. The
491/// returned tools are annotated with `_mcp_server` so downstream
492/// indexers (BM25 search, etc.) can match by server.
493pub async fn tools(name: &str) -> Result<Vec<JsonValue>, VmError> {
494    let handle = ensure_or_restart(name).await?;
495    let result = supervised_call(name, || async {
496        handle.call("tools/list", serde_json::json!({})).await
497    })
498    .await?;
499
500    let mut tools = result
501        .get("tools")
502        .and_then(|t| t.as_array())
503        .cloned()
504        .unwrap_or_default();
505    for tool in tools.iter_mut() {
506        if let Some(obj) = tool.as_object_mut() {
507            obj.entry("_mcp_server")
508                .or_insert_with(|| JsonValue::String(name.to_string()));
509        }
510    }
511    // MCP tool-integrity (Layer 0b): pin each tool's schema and flag any whose
512    // description/inputSchema changed since first sighting (rug-pull defense).
513    // The flag rides on the tool dict so the host's approval UI can force
514    // re-approval; harn surfaces the fact, the host decides.
515    let security_policy = crate::security::current_policy();
516    if security_policy.pin_mcp_schemas && !security_policy.server_is_trusted(name) {
517        for tool in tools.iter_mut() {
518            let hash = crate::security::tool_schema_hash(tool);
519            let tool_name = tool
520                .get("name")
521                .and_then(|v| v.as_str())
522                .unwrap_or_default()
523                .to_string();
524            if tool_name.is_empty() {
525                continue;
526            }
527            if crate::security::pin_and_detect_change(name, &tool_name, &hash) {
528                if let Some(obj) = tool.as_object_mut() {
529                    obj.insert("_schema_changed".to_string(), JsonValue::Bool(true));
530                }
531            }
532        }
533    }
534    Ok(tools)
535}
536
/// Invoke a tool with the supervised wrapper applied. Honors:
/// - allowlist (refuses unallowed (server, tool) pairs),
/// - circuit breaker (fails fast when open),
/// - response cache (returns cached payload when the server-declared
///   TTL has not expired and the args hash matches),
/// - auto-restart on transport failure with exponential backoff.
///
/// The checks run in this order: allowlist, call budget, response cache,
/// breaker gate, then the supervised dispatch. A cache hit therefore
/// returns before the breaker (or ejection) is consulted.
///
/// # Errors
///
/// Returns an error when the allowlist denies the pair, the call budget
/// is exhausted, the breaker is open or the server ejected, the
/// connection cannot be established, or the tool call itself fails.
pub async fn call(name: &str, tool: &str, args: JsonValue) -> Result<JsonValue, VmError> {
    if let Some(guard) = current_allowlist() {
        if let AllowlistDecision::Deny { reason } = guard(name, Some(tool)) {
            return Err(VmError::Runtime(format!(
                "mcp.call({name}/{tool}): denied by allowlist: {reason}"
            )));
        }
    }

    // Charge the logical call against any active `@budget(mcp_calls: …)`
    // ceiling before doing work — including cache hits, since the budget
    // caps how many tool calls a dispatch may *issue*, not how many miss
    // the cache. Caps a runaway tool loop at the dispatcher boundary.
    crate::call_budget::charge_mcp_call()?;

    // `now` is captured once and reused for the cache lookup, the breaker
    // gate, and the eventual cache insertion.
    let now = Instant::now();
    let args_hash = hash_args(&args);
    if let Some(payload) = take_cache_hit(name, tool, &args_hash, now) {
        return Ok(payload);
    }
    with_inner(|inner| inner.cache_misses = inner.cache_misses.saturating_add(1));

    breaker_gate(name, now)?;

    let handle = ensure_or_restart(name).await?;
    // `supervised_call` operates on a single `JsonValue` payload so it
    // can be shared with the `tools()` path. Stash the envelope's cache
    // hint in a separate slot the closure can update without breaking
    // that contract.
    let envelope_hint: Arc<Mutex<Option<McpCacheHint>>> = Arc::new(Mutex::new(None));
    let hint_slot = Arc::clone(&envelope_hint);
    // NOTE(review): every invocation of this closure clones the `handle`
    // obtained above, so a retry after an automatic restart reuses that
    // same handle — confirm it stays valid across a registry reconnect.
    let result = supervised_call(name, move || {
        let handle = handle.clone();
        let tool = tool.to_string();
        let args = args.clone();
        let hint_slot = Arc::clone(&hint_slot);
        async move {
            let (content, hint) = call_mcp_tool_with_hint(&handle, &tool, args).await?;
            // A poisoned slot just means the hint is dropped; the call
            // result itself is unaffected.
            if let Ok(mut slot) = hint_slot.lock() {
                *slot = hint;
            }
            Ok(content)
        }
    })
    .await?;

    // Only responses that came back with a cache hint are offered to the
    // response cache.
    let hint = envelope_hint.lock().ok().and_then(|slot| *slot);
    if let Some(hint) = hint {
        insert_cache(name, tool, &args_hash, &result, hint, now);
    }

    Ok(result)
}
596
597/// Cross-server tool discovery — calls `tools/list` against every
598/// registered (and reachable) server and returns a flat list of
599/// `{ server, tool, schema }` entries.
600pub async fn discover() -> Result<Vec<JsonValue>, VmError> {
601    let names: Vec<String> = mcp_registry::snapshot_status()
602        .into_iter()
603        .map(|s| s.name)
604        .collect();
605    let mut out: Vec<JsonValue> = Vec::new();
606    for name in names {
607        // Skip servers that the allowlist (if any) wouldn't even let us
608        // spawn — `discover()` is a tooling primitive, not a probe.
609        if let Some(guard) = current_allowlist() {
610            if matches!(guard(&name, None), AllowlistDecision::Deny { .. }) {
611                continue;
612            }
613        }
614        // Best-effort: a single unreachable server should not poison the
615        // whole discovery sweep. Surface the error inline so callers can
616        // tell why a server's tools are missing.
617        match tools(&name).await {
618            Ok(tools) => {
619                for tool in tools {
620                    let tool_name = tool
621                        .get("name")
622                        .and_then(|v| v.as_str())
623                        .unwrap_or("")
624                        .to_string();
625                    out.push(serde_json::json!({
626                        "server": name,
627                        "tool": tool_name,
628                        "schema": tool,
629                    }));
630                }
631            }
632            Err(err) => {
633                out.push(serde_json::json!({
634                    "server": name,
635                    "error": err.to_string(),
636                }));
637            }
638        }
639    }
640    Ok(out)
641}
642
/// Diagnostic snapshot across all hosted servers.
///
/// Combines the registry's per-server status with this module's
/// supervision and cache bookkeeping, then fills in `display_identity`
/// for active `http` servers that have a URL. Servers are reported in
/// name order (the registry snapshot is collected into a `BTreeMap`).
pub async fn status() -> Vec<McpHostStatus> {
    let registry: BTreeMap<String, mcp_registry::RegistryStatus> = mcp_registry::snapshot_status()
        .into_iter()
        .map(|s| (s.name.clone(), s))
        .collect();
    // Everything that needs the host lock is gathered synchronously here;
    // the identity lookups below await only after the lock is released.
    let mut statuses = with_inner(|inner| {
        let mut out = Vec::new();
        let now = Instant::now();
        for (name, reg) in &registry {
            // Servers without a supervision entry are reported as healthy
            // defaults.
            let (restart_count, consecutive_failures, circuit, ejected) =
                if let Some(state) = inner.supervision.get_mut(name) {
                    let st = state.breaker_state(now);
                    (
                        state.restart_attempts.len() as u32,
                        state.consecutive_failures,
                        st,
                        state.ejected,
                    )
                } else {
                    (0, 0, BreakerState::Closed, false)
                };
            // Sum cached entries over every tool of this server.
            let cache_entries = inner
                .response_cache
                .iter()
                .filter(|((s, _), _)| s == name)
                .map(|(_, v)| v.len())
                .sum();
            out.push(McpHostStatus {
                name: name.clone(),
                transport: reg.transport.clone(),
                url: reg.url.clone(),
                active: reg.active,
                lazy: reg.lazy,
                ref_count: reg.ref_count,
                restart_count,
                consecutive_failures,
                circuit,
                ejected,
                cache_entries,
                display_identity: None,
            });
        }
        out
    });
    // Identity is only looked up for active HTTP servers with a URL.
    for status in &mut statuses {
        if !status.active || status.transport != "http" {
            continue;
        }
        let Some(url) = status.url.as_deref() else {
            continue;
        };
        status.display_identity = crate::mcp_identity::display_identity_from_store(url, None).await;
    }
    statuses
}
699
700fn current_allowlist() -> Option<AllowlistGuard> {
701    with_inner(|inner| inner.allowlist.clone())
702}
703
704fn breaker_gate(name: &str, now: Instant) -> Result<(), VmError> {
705    with_inner(|inner| {
706        let Some(state) = inner.supervision.get_mut(name) else {
707            return Ok(());
708        };
709        if state.ejected {
710            return Err(VmError::Runtime(format!(
711                "mcp.call({name}): server is ejected after exhausting its restart budget; call `harn.mcp.reload({name:?})` to clear"
712            )));
713        }
714        match state.breaker_state(now) {
715            BreakerState::Open => Err(VmError::Runtime(format!(
716                "mcp.call({name}): circuit breaker is open (last {n} consecutive failures); retry after the breaker resets",
717                n = state.consecutive_failures
718            ))),
719            // Closed and HalfOpen both proceed — HalfOpen lets one probe
720            // through and the success path closes the breaker.
721            BreakerState::Closed | BreakerState::HalfOpen => Ok(()),
722        }
723    })
724}
725
726async fn ensure_or_restart(name: &str) -> Result<VmMcpClientHandle, VmError> {
727    // Fast path: the registry already has a live handle.
728    if let Some(handle) = mcp_registry::active_handle(name) {
729        return Ok(handle);
730    }
731
732    // Cold path: the server is registered but the connection is gone
733    // (lazy boot, crashed transport, or `reload()` dropped it). Try to
734    // reconnect through the registry; budget enforcement happens
735    // inside `supervised_call`'s error path on the next failure, not
736    // here.
737    mcp_registry::ensure_active(name).await
738}
739
740/// Run `op` against the hosted server and wrap any error in supervision
741/// bookkeeping: record the failure, attempt an automatic restart if the
742/// budget allows, and try `op` again once. A second failure surfaces.
743async fn supervised_call<F, Fut>(name: &str, op: F) -> Result<JsonValue, VmError>
744where
745    F: Fn() -> Fut,
746    Fut: std::future::Future<Output = Result<JsonValue, VmError>>,
747{
748    let span = tracing::info_span!(
749        "harn.mcp.call",
750        otel.name = "harn.mcp.call",
751        harn.mcp.server = name,
752    );
753    let _enter = span.enter();
754
755    let first = op().await;
756    match first {
757        Ok(v) => {
758            with_inner(|inner| {
759                if let Some(state) = inner.supervision.get_mut(name) {
760                    state.record_success();
761                }
762            });
763            Ok(v)
764        }
765        Err(err) => {
766            let now = Instant::now();
767            let (should_retry, backoff) = with_inner(|inner| {
768                let Some(state) = inner.supervision.get_mut(name) else {
769                    return (false, Duration::ZERO);
770                };
771                state.record_failure(now);
772                // Only auto-restart on transport-shaped errors. The
773                // surface is broad here on purpose — any failure between
774                // "could not write to child stdin" and "server closed
775                // connection" warrants a fresh transport.
776                if !looks_like_transport_failure(&err) {
777                    return (false, Duration::ZERO);
778                }
779                let ok = state.record_restart(now);
780                if !ok {
781                    return (false, Duration::ZERO);
782                }
783                (true, state.backoff_delay())
784            });
785            if !should_retry {
786                tracing::warn!(
787                    server = name,
788                    error = %err,
789                    "harn.mcp.call: failure (no retry)"
790                );
791                return Err(err);
792            }
793
794            tracing::info!(
795                server = name,
796                error = %err,
797                backoff_ms = backoff.as_millis() as u64,
798                "harn.mcp.call: retrying after transport failure"
799            );
800
801            // Force the registry to drop the dead handle so the next
802            // `ensure_or_restart` will reconnect from spec.
803            mcp_registry::release(name);
804            tokio::time::sleep(backoff).await;
805            let _handle = ensure_or_restart(name).await?;
806            let second = op().await;
807            match &second {
808                Ok(_) => with_inner(|inner| {
809                    if let Some(state) = inner.supervision.get_mut(name) {
810                        state.record_success();
811                    }
812                }),
813                Err(err) => with_inner(|inner| {
814                    if let Some(state) = inner.supervision.get_mut(name) {
815                        state.record_failure(Instant::now());
816                    }
817                    tracing::warn!(
818                        server = name,
819                        error = %err,
820                        "harn.mcp.call: second attempt failed"
821                    );
822                }),
823            }
824            second
825        }
826    }
827}
828
829fn looks_like_transport_failure(err: &VmError) -> bool {
830    let text = err.to_string();
831    let needles = [
832        "server closed connection",
833        "disconnected",
834        "MCP read error",
835        "MCP write error",
836        "did not respond to",
837        "MCP flush error",
838        "connect",
839    ];
840    needles.iter().any(|n| text.contains(n))
841}
842
843fn hash_args(args: &JsonValue) -> String {
844    let mut hasher = Sha256::new();
845    let canonical = canonicalize_json(args);
846    hasher.update(canonical.as_bytes());
847    let digest = hasher.finalize();
848    let mut hex = String::with_capacity(digest.len() * 2);
849    for byte in digest {
850        use std::fmt::Write;
851        let _ = write!(&mut hex, "{byte:02x}");
852    }
853    hex
854}
855
856/// Canonical JSON encoding for hash stability: keys sorted, no
857/// insignificant whitespace.
858fn canonicalize_json(value: &JsonValue) -> String {
859    match value {
860        JsonValue::Object(map) => {
861            let mut sorted: Vec<(&String, &JsonValue)> = map.iter().collect();
862            sorted.sort_by(|a, b| a.0.cmp(b.0));
863            let body: Vec<String> = sorted
864                .into_iter()
865                .map(|(k, v)| {
866                    format!(
867                        "{}:{}",
868                        serde_json::to_string(k).unwrap_or_default(),
869                        canonicalize_json(v)
870                    )
871                })
872                .collect();
873            format!("{{{}}}", body.join(","))
874        }
875        JsonValue::Array(items) => {
876            let body: Vec<String> = items.iter().map(canonicalize_json).collect();
877            format!("[{}]", body.join(","))
878        }
879        other => serde_json::to_string(other).unwrap_or_default(),
880    }
881}
882
883fn take_cache_hit(server: &str, tool: &str, args_hash: &str, now: Instant) -> Option<JsonValue> {
884    with_inner(|inner| {
885        let key = (server.to_string(), tool.to_string());
886        let entry = inner.response_cache.get_mut(&key)?;
887        let cached = entry.get(args_hash)?;
888        if now >= cached.expires_at {
889            entry.remove(args_hash);
890            return None;
891        }
892        let payload = cached.payload.clone();
893        inner.cache_hits = inner.cache_hits.saturating_add(1);
894        Some(payload)
895    })
896}
897
898/// Cache a server-supplied response payload under (`server`, `tool`,
899/// `args_hash`) for the TTL the server declared. Insertion is a no-op
900/// when the hint has no positive TTL — server implementations that
901/// want a result memoized must surface a `ttlMs > 0` in the envelope.
902fn insert_cache(
903    server: &str,
904    tool: &str,
905    args_hash: &str,
906    payload: &JsonValue,
907    hint: McpCacheHint,
908    now: Instant,
909) {
910    let Some(ttl_ms) = hint.ttl_ms else {
911        return;
912    };
913    if ttl_ms == 0 {
914        return;
915    }
916    let expires_at = now + Duration::from_millis(ttl_ms);
917    let cached = CachedResponse {
918        payload: payload.clone(),
919        inserted_at: now,
920        expires_at,
921        scope: hint.scope,
922    };
923    with_inner(|inner| {
924        let key = (server.to_string(), tool.to_string());
925        let bucket = inner.response_cache.entry(key).or_default();
926        if bucket.len() >= RESPONSE_CACHE_MAX_ENTRIES_PER_TOOL {
927            // Drop the oldest insertion as a cheap LRU approximation.
928            if let Some(oldest_key) = bucket
929                .iter()
930                .min_by_key(|(_, v)| v.inserted_at)
931                .map(|(k, _)| k.clone())
932            {
933                bucket.remove(&oldest_key);
934            }
935        }
936        bucket.insert(args_hash.to_string(), cached);
937    });
938}
939
/// Test-only helper: cache `payload` when the payload itself carries a
/// cache hint.
///
/// The hint is extracted with [`McpCacheHint::from_result`]; when none is
/// present nothing is inserted, otherwise the call is forwarded to
/// [`insert_cache`].
#[cfg(test)]
fn insert_cache_if_hinted(
    server: &str,
    tool: &str,
    args_hash: &str,
    payload: &JsonValue,
    now: Instant,
) {
    let Some(hint) = McpCacheHint::from_result(payload) else {
        return;
    };
    insert_cache(server, tool, args_hash, payload, hint, now);
}
955
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests in this module; several of them mutate
    /// process-global host state via `reset_for_tests` / `set_allowlist`.
    static TEST_LOCK: Mutex<()> = Mutex::new(());

    /// Acquire [`TEST_LOCK`], recovering from poisoning so one failed test
    /// does not cascade into every test that runs after it.
    fn lock() -> std::sync::MutexGuard<'static, ()> {
        TEST_LOCK.lock().unwrap_or_else(|p| p.into_inner())
    }

    #[test]
    fn supervision_breaker_opens_after_threshold() {
        let _g = lock();
        let mut state = SupervisionState::new(SupervisionPolicy {
            circuit_threshold: 3,
            circuit_reset: Duration::from_millis(100),
            ..SupervisionPolicy::default()
        });
        let t0 = Instant::now();
        assert_eq!(state.breaker_state(t0), BreakerState::Closed);
        state.record_failure(t0);
        state.record_failure(t0);
        assert_eq!(state.breaker_state(t0), BreakerState::Closed);
        state.record_failure(t0);
        assert_eq!(state.breaker_state(t0), BreakerState::Open);
        // After reset window, breaker transitions to half-open.
        assert_eq!(
            state.breaker_state(t0 + Duration::from_millis(200)),
            BreakerState::HalfOpen
        );
    }

    #[test]
    fn supervision_restart_budget_ejects_after_n_attempts() {
        let _g = lock();
        let mut state = SupervisionState::new(SupervisionPolicy {
            max_restarts: 2,
            restart_window: Duration::from_mins(1),
            ..SupervisionPolicy::default()
        });
        let t = Instant::now();
        assert!(state.record_restart(t));
        assert!(state.record_restart(t));
        assert!(!state.record_restart(t));
        assert!(state.ejected);
    }

    #[test]
    fn supervision_backoff_grows_exponentially_then_caps() {
        let _g = lock();
        let mut state = SupervisionState::new(SupervisionPolicy::default());
        let t = Instant::now();
        state.record_restart(t);
        let d1 = state.backoff_delay();
        state.record_restart(t);
        let d2 = state.backoff_delay();
        state.record_restart(t);
        let d3 = state.backoff_delay();
        assert!(
            d2 > d1,
            "second backoff ({d2:?}) should exceed first ({d1:?})"
        );
        assert!(d3 > d2, "third backoff ({d3:?}) should exceed second ({d2:?})");
        for _ in 0..16 {
            state.record_restart(t);
        }
        assert!(state.backoff_delay() <= MAX_RESTART_BACKOFF);
    }

    #[test]
    fn canonical_json_sorts_object_keys() {
        let a = canonicalize_json(&serde_json::json!({"b": 1, "a": 2}));
        let b = canonicalize_json(&serde_json::json!({"a": 2, "b": 1}));
        assert_eq!(a, b);
        // Pin the exact form too: equality alone would also hold if both
        // renderings were empty.
        assert_eq!(a, r#"{"a":2,"b":1}"#);
    }

    #[test]
    fn canonical_json_is_compact_and_sorts_nested_keys() {
        let rendered = canonicalize_json(&serde_json::json!({
            "b": 1,
            "a": {"d": [1, {"z": 0, "y": null}], "c": "x"}
        }));
        assert_eq!(rendered, r#"{"a":{"c":"x","d":[1,{"y":null,"z":0}]},"b":1}"#);
    }

    #[test]
    fn hash_args_is_stable_across_key_order() {
        let h1 = hash_args(&serde_json::json!({"x": 1, "y": [1, 2]}));
        let h2 = hash_args(&serde_json::json!({"y": [1, 2], "x": 1}));
        assert_eq!(h1, h2);
    }

    #[test]
    fn hash_args_is_lowercase_hex_sha256() {
        let digest = hash_args(&serde_json::json!({"x": 1}));
        assert_eq!(digest.len(), 64, "SHA-256 renders as 64 hex characters");
        assert!(
            digest
                .chars()
                .all(|c| c.is_ascii_digit() || ('a'..='f').contains(&c)),
            "digest {digest:?} should be lowercase hex"
        );
    }

    #[test]
    fn cache_insert_and_take_respects_ttl() {
        let _g = lock();
        reset_for_tests();
        let payload = serde_json::json!({
            "ttlMs": 100,
            "cacheScope": "private",
            "value": 1
        });
        let now = Instant::now();
        insert_cache_if_hinted("srv", "ping", "deadbeef", &payload, now);
        let hit = take_cache_hit("srv", "ping", "deadbeef", now);
        assert!(hit.is_some(), "fresh entry should hit");
        let stale = take_cache_hit("srv", "ping", "deadbeef", now + Duration::from_millis(200));
        assert!(stale.is_none(), "expired entry should miss");
    }

    #[test]
    fn cache_entry_expires_exactly_at_ttl_boundary() {
        let _g = lock();
        reset_for_tests();
        let payload = serde_json::json!({
            "ttlMs": 100,
            "cacheScope": "private",
            "value": 1
        });
        let now = Instant::now();
        insert_cache_if_hinted("srv", "ping", "cafe", &payload, now);
        assert!(
            take_cache_hit("srv", "ping", "cafe", now + Duration::from_millis(99)).is_some(),
            "entry should still hit just before its expiry"
        );
        assert!(
            take_cache_hit("srv", "ping", "cafe", now + Duration::from_millis(100)).is_none(),
            "entry should miss once `now` reaches its expiry"
        );
    }

    #[test]
    fn cache_skips_payload_without_hint() {
        let _g = lock();
        reset_for_tests();
        insert_cache_if_hinted(
            "srv",
            "ping",
            "h",
            &serde_json::json!({"value": 1}),
            Instant::now(),
        );
        assert!(take_cache_hit("srv", "ping", "h", Instant::now()).is_none());
    }

    #[test]
    fn allowlist_denies_disallowed_tool() {
        let _g = lock();
        reset_for_tests();
        set_allowlist(Some(Arc::new(|server, tool| {
            if server == "github" && tool == Some("delete_repo") {
                AllowlistDecision::Deny {
                    reason: "destructive tool blocked".into(),
                }
            } else {
                AllowlistDecision::Allow
            }
        })));
        let runtime = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .unwrap();
        let outcome = runtime.block_on(call("github", "delete_repo", serde_json::json!({})));
        // Uninstall the callback before asserting: a failed assertion must
        // not leak the deny rule into tests that run afterwards.
        set_allowlist(None);
        let err = outcome.unwrap_err();
        assert!(err.to_string().contains("denied by allowlist"));
    }

    #[test]
    fn stop_unregistered_server_errors() {
        let _g = lock();
        reset_for_tests();
        let err = stop("nope").unwrap_err();
        assert!(err.to_string().contains("no server named 'nope'"));
    }

    #[test]
    fn supervision_record_success_resets_counters() {
        let _g = lock();
        let mut state = SupervisionState::new(SupervisionPolicy::default());
        let t = Instant::now();
        state.record_failure(t);
        state.record_failure(t);
        state.record_success();
        assert_eq!(state.consecutive_failures, 0);
        assert!(state.breaker_opens_until.is_none());
    }

    #[test]
    fn looks_like_transport_failure_matches_common_errors() {
        // At least one message per marker the classifier searches for.
        let cases = [
            "MCP: server closed connection",
            "MCP: server did not respond to 'tools/call' within 60s",
            "MCP write error: broken pipe",
            "MCP read error: unexpected end of stream",
            "MCP flush error: broken pipe",
            "MCP client is disconnected",
            "failed to connect to MCP server",
        ];
        for msg in cases {
            assert!(
                looks_like_transport_failure(&VmError::Runtime(msg.into())),
                "expected {msg:?} to be classified as transport failure"
            );
        }
        assert!(
            !looks_like_transport_failure(&VmError::Runtime(
                "tool 'foo' rejected arguments".into()
            )),
            "tool-level errors must not trigger an auto-restart"
        );
    }
}